fever / commit 5336127
New upstream version 1.0.16 (Sascha Steinbiss)
8 changed files with 103 additions and 28 deletions.
 jobs:
   build:
     docker:
-      - image: circleci/golang:1.10-stretch
-
+      - image: circleci/golang:latest
+        environment:
+          GO111MODULE: "on"
     working_directory: /go/src/github.com/DCSO/fever
     steps:
       - checkout
       - run:
           name: Install test dependencies
           command: 'sudo apt-get update && sudo apt-get install redis-server -y'
-
       - run: go get -v -t -d ./...
       - run: go test -v ./...
+module github.com/DCSO/fever
+
+go 1.14
+
+require (
+	github.com/DCSO/bloom v0.2.3
+	github.com/DCSO/fluxline v0.0.0-20200907065040-78686e5e68f6
+	github.com/NeowayLabs/wabbit v0.0.0-20201021105516-ded4a9ef19d2
+	github.com/Showmax/go-fqdn v1.0.0 // indirect
+	github.com/buger/jsonparser v1.1.1
+	github.com/fsouza/go-dockerclient v1.7.1 // indirect
+	github.com/garyburd/redigo v1.6.2
+	github.com/golang/protobuf v1.4.3
+	github.com/gomodule/redigo v1.8.3 // indirect
+	github.com/jinzhu/inflection v1.0.0 // indirect
+	github.com/mitchellh/go-homedir v1.1.0
+	github.com/onsi/ginkgo v1.15.0 // indirect
+	github.com/onsi/gomega v1.10.5 // indirect
+	github.com/patrickmn/go-cache v2.1.0+incompatible
+	github.com/pborman/uuid v1.2.1 // indirect
+	github.com/sirupsen/logrus v1.8.0
+	github.com/spf13/cobra v1.1.3
+	github.com/spf13/viper v1.7.1
+	github.com/streadway/amqp v1.0.0
+	github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203
+	github.com/tiago4orion/conjure v0.0.0-20150908101743-93cb30b9d218 // indirect
+	github.com/yl2chen/cidranger v1.0.2
+	google.golang.org/grpc v1.35.0
+	gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22
+	gopkg.in/pg.v5 v5.3.3
+)
 import (
 	"bytes"
 	"encoding/json"
+	"fmt"
 	"os"
 	"sync"
 	"time"
 func (a *FlowAggregator) Consume(e *types.Entry) error {
 	a.StringBuf.Write([]byte(e.SrcIP))
 	a.StringBuf.Write([]byte(e.DestIP))
-	a.StringBuf.Write([]byte(string(e.DestPort)))
+	a.StringBuf.Write([]byte(fmt.Sprint(e.DestPort)))
 	a.countFlow(a.StringBuf.String(), e)
 	a.StringBuf.Reset()
 	return nil
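For context on the FlowAggregator change above: in Go, converting an integer to a string yields the character with that code point, not its decimal text, so the old key-building line folded every destination port into a single rune. A tiny standalone illustration, not part of the diff; the variable name port is just for the example:

package main

import "fmt"

func main() {
	port := 80
	// A plain integer-to-string conversion gives the UTF-8 encoding of the
	// code point, so 80 becomes "P" rather than the text "80".
	fmt.Printf("%q\n", string(rune(port))) // "P"
	// fmt.Sprint renders the decimal representation, which is what a
	// per-flow hash key needs to stay unique per port.
	fmt.Printf("%q\n", fmt.Sprint(port)) // "80"
}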
 			if fh.StatsEncoder == nil || time.Since(sTime) < fh.StatsEncoder.SubmitPeriod {
 				continue
 			}
+			// Lock the current measurements for submission. Since this is a blocking
+			// operation, we don't want this to depend on how long submitter.Submit()
+			// takes but keep it independent of that. Hence we take the time to create
+			// a local copy of the counter to be able to reset and release the live
+			// one as quickly as possible.
 			fh.Lock.Lock()
-			fh.PerfStats.ForwardedPerSec /= uint64(fh.StatsEncoder.SubmitPeriod.Seconds())
-			fh.StatsEncoder.Submit(fh.PerfStats)
+			// Make our own copy of the current counter
+			myStats := ForwardHandlerPerfStats{
+				ForwardedPerSec: fh.PerfStats.ForwardedPerSec,
+			}
+			myStats.ForwardedPerSec /= uint64(fh.StatsEncoder.SubmitPeriod.Seconds())
+			// Reset live counter
 			fh.PerfStats.ForwardedPerSec = 0
+			// Release live counter to not block further events
+			fh.Lock.Unlock()
+
+			fh.StatsEncoder.Submit(myStats)
 			sTime = time.Now()
-			fh.Lock.Unlock()
 		}
 	}
 }
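The added comments describe a snapshot-then-submit pattern: hold the lock only long enough to copy and reset the live counter, and run the potentially slow Submit() outside the critical section. A minimal, generic sketch of that pattern; mu, forwarded and submitPeriod are illustrative names, not fever identifiers:

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var mu sync.Mutex
	var forwarded uint64

	// Event path: increments happen under the lock.
	go func() {
		for {
			mu.Lock()
			forwarded++
			mu.Unlock()
			time.Sleep(time.Millisecond)
		}
	}()

	submitPeriod := 100 * time.Millisecond
	for i := 0; i < 3; i++ {
		time.Sleep(submitPeriod)
		// Snapshot and reset under the lock, then release it immediately
		// so event processing is never blocked by a slow submission.
		mu.Lock()
		snapshot := forwarded
		forwarded = 0
		mu.Unlock()
		// The potentially slow "submission" runs outside the critical section.
		fmt.Println("forwarded/s:", float64(snapshot)/submitPeriod.Seconds())
	}
}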
 	e = makeEvent("alert", "foo3")
 	fh.Consume(&e)
 
+	// wait for socket consumer to receive all
+	wg.Wait()
+
 	// stop forwarding handler
 	scChan := make(chan bool)
 	fh.Stop(scChan)
 	<-scChan
-
-	// wait for socket consumer to receive all
-	wg.Wait()
 
 	if len(coll) != 2 {
 		t.Fatalf("unexpected number of alerts: %d != 2", len(coll))
 	e = makeEvent("alert", "foo2")
 	fh.Consume(&e)
 
+	// wait for socket consumer to receive all
+	wg.Wait()
+
 	// stop forwarding handler
 	scChan := make(chan bool)
 	fh.Stop(scChan)
 	<-scChan
 
-	// wait for socket consumer to receive all
-	wg.Wait()
-
 	if len(coll) != 2 {
 		t.Fatalf("unexpected number of alerts: %d != 2", len(coll))
 	}
 	e = makeEvent("alert", "foo3")
 	fh.Consume(&e)
 
+	wg.Wait()
+
 	// stop forwarding handler
 	scChan := make(chan bool)
 	fh.Stop(scChan)
...
 	// stop socket consumer
 	inputListener.Close()
 	close(clCh)
-	wg.Wait()
 
 	if len(coll) != 3 {
 		t.Fatalf("unexpected number of alerts: %d != 3", len(coll))
 	return []string{"not applicable"}
 }
 
-// Consume simply emits ths consumed entry on the default output channel
+// Consume simply emits the consumed entry on the default output channel
 func (h *DBHandler) Consume(e *types.Entry) error {
 	h.OutChan <- *e
 	return nil
 			if ad.StatsEncoder == nil || time.Since(sTime) < ad.StatsEncoder.SubmitPeriod {
 				continue
 			}
+			// Lock the current measurements for submission. Since this is a blocking
+			// operation, we don't want this to depend on how long submitter.Submit()
+			// takes but keep it independent of that. Hence we take the time to create
+			// a local copy of the counter to be able to reset and release the live
+			// one as quickly as possible.
 			ad.Lock.Lock()
-			ad.PerfStats.DispatchedPerSec /= uint64(ad.StatsEncoder.SubmitPeriod.Seconds())
-			ad.StatsEncoder.Submit(ad.PerfStats)
+			// Make our own copy of the current counter
+			myStats := HandlerDispatcherPerfStats{
+				DispatchedPerSec: ad.PerfStats.DispatchedPerSec,
+			}
+			myStats.DispatchedPerSec /= uint64(ad.StatsEncoder.SubmitPeriod.Seconds())
+			// Reset live counter
 			ad.PerfStats.DispatchedPerSec = 0
+			// Release live counter to not block further events
+			ad.Lock.Unlock()
+
+			ad.StatsEncoder.Submit(myStats)
 			sTime = time.Now()
-			ad.Lock.Unlock()
 		}
 	}
 }
 	// send flow notification with matching flow ID to signal end of flow
 	notifyChan <- flowEntry
 
+	// wait for socket consumer to receive all
+	wg.Wait()
+
 	// stop forwarding handler
 	scChan := make(chan bool)
 	fh.Stop(scChan)
...
 	// we can now close this
 	grpcServer.Close()
 
-	// wait for socket consumer to receive all
-	wg.Wait()
-
 	return coll
 }
 
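All of the test hunks above make the same move: wg.Wait() now runs before the forwarding handler and its socket or gRPC endpoint are shut down, so the collecting goroutine has appended every expected event to coll before the length assertions execute. A condensed sketch of that synchronization order; events, coll and the counts here are made up for illustration:

package main

import (
	"fmt"
	"sync"
)

func main() {
	events := make(chan string, 3)
	var coll []string
	var wg sync.WaitGroup
	wg.Add(3) // we expect exactly three events to arrive

	// Collector goroutine, standing in for the socket consumer in the tests.
	go func() {
		for e := range events {
			coll = append(coll, e)
			wg.Done()
		}
	}()

	events <- "foo1"
	events <- "foo2"
	events <- "foo3"

	// Wait until the collector has received everything before tearing
	// anything down or inspecting coll; otherwise the length check races
	// with the goroutine that is still appending.
	wg.Wait()
	close(events)

	if len(coll) != 3 {
		fmt.Printf("unexpected number of events: %d != 3\n", len(coll))
	}
}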
 }
 
 func (a *UnicornAggregator) submit(submitter util.StatsSubmitter, dummyMode bool) {
+	// Lock the current measurements for submission. Since this is a blocking
+	// operation, we don't want this to depend on how long submitter.Submit()
+	// takes but keep it independent of that. Hence we take the time to create
+	// a local copy of the aggregate to be able to reset and release the live
+	// one as quickly as possible.
 	a.UnicornTuplesMutex.Lock()
 	a.UnicornProxyMapMutex.Lock()
-	jsonString, myerror := json.Marshal(a.Aggregate)
+	// Make our own copy of the current aggregate, claiming ownership of the
+	// maps with the measurements
+	myAgg := UnicornAggregate{
+		SensorID:       a.Aggregate.SensorID,
+		TimestampStart: a.Aggregate.TimestampStart,
+		TimestampEnd:   a.Aggregate.TimestampEnd,
+		ProxyMap:       a.Aggregate.ProxyMap,
+		FlowTuples:     a.Aggregate.FlowTuples,
+	}
+	// Replace live maps with empty ones
+	a.Aggregate.FlowTuples = make(map[string](map[string]int64))
+	a.Aggregate.ProxyMap = make(map[string](map[string]int64))
+	// Release aggregate to not block further blocking ops on map contents
+	a.UnicornTuplesMutex.Unlock()
+	a.UnicornProxyMapMutex.Unlock()
+
+	jsonString, myerror := json.Marshal(myAgg)
 	if myerror == nil {
 		a.Logger.WithFields(log.Fields{
-			"flowtuples":   len(a.Aggregate.FlowTuples),
-			"http-destips": len(a.Aggregate.ProxyMap)},
+			"flowtuples":   len(myAgg.FlowTuples),
+			"http-destips": len(myAgg.ProxyMap)},
 		).Info("preparing to submit")
 		submitter.Submit(jsonString, "unicorn", "application/json")
 	} else {
 		a.Logger.Warn("error marshaling JSON for metadata aggregation")
 	}
-	a.Aggregate.FlowTuples = make(map[string](map[string]int64))
-	a.Aggregate.ProxyMap = make(map[string](map[string]int64))
-	a.UnicornTuplesMutex.Unlock()
-	a.UnicornProxyMapMutex.Unlock()
+
 }
 
 // CountFlowTuple increments the flow tuple counter for the given key.
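The rewritten submit() hands the live maps over to a local copy and installs fresh empty maps before unlocking, so JSON marshaling and submission no longer hold the two mutexes. A stripped-down sketch of that ownership hand-off; the aggregate type, its field tag and the sample data are illustrative, not the fever definitions:

package main

import (
	"encoding/json"
	"fmt"
	"sync"
)

type aggregate struct {
	FlowTuples map[string]map[string]int64 `json:"flow_tuples"`
}

func main() {
	var mu sync.Mutex
	live := aggregate{FlowTuples: map[string]map[string]int64{
		"10.0.0.1_10.0.0.2_443": {"count": 12},
	}}

	// Snapshot: take over the current maps, then point the live aggregate at
	// fresh empty ones so the hot path can keep counting while we serialize
	// the old data.
	mu.Lock()
	snapshot := aggregate{FlowTuples: live.FlowTuples}
	live.FlowTuples = make(map[string]map[string]int64)
	mu.Unlock()

	// Marshal and "submit" outside the critical section.
	out, err := json.Marshal(snapshot)
	if err != nil {
		fmt.Println("marshal error:", err)
		return
	}
	fmt.Println(string(out))
}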