[go: up one dir, main page]

Skip to content

Commit

Permalink
push to CAPI in go routine (crowdsecurity#489)
Browse files Browse the repository at this point in the history
Co-authored-by: AlteredCoder <AlteredCoder>
  • Loading branch information
AlteredCoder authored Nov 30, 2020
1 parent fa11a94 commit c6eb2af
Showing 1 changed file with 35 additions and 16 deletions.
51 changes: 35 additions & 16 deletions pkg/apiserver/apic.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,24 +151,16 @@ func (a *apic) Push() error {
if len(cache) == 0 {
return nil
}
err := a.Send(&cache)
return err
go a.Send(&cache)
return nil
case <-ticker.C:
if len(cache) > 0 {
a.mu.Lock()
cacheCopy := cache
cache = make(models.AddSignalsRequest, 0)
a.mu.Unlock()
log.Infof("Signal push: %d signals to push", len(cacheCopy))
err := a.Send(&cacheCopy)
if err != nil {
log.Errorf("while sending signal to Central API : %s", err)
log.Debugf("dump: %+v", cacheCopy)
/*
even in case of error, we don't want to return here, or we need to kill everything.
this go-routine is in charge of pushing the signals to LAPI and is emptying the CAPIChan
*/
}
go a.Send(&cacheCopy)
}
case alerts := <-a.alertToPush:
var signals []*models.AddSignalsRequestItem
Expand All @@ -182,7 +174,7 @@ func (a *apic) Push() error {
}
}

func (a *apic) Send(cache *models.AddSignalsRequest) error {
func (a *apic) Send(cacheOrig *models.AddSignalsRequest) {
/*we do have a problem with this :
The apic.Push background routine reads from alertToPush chan.
This chan is filled by Controller.CreateAlert
Expand All @@ -194,10 +186,37 @@ func (a *apic) Send(cache *models.AddSignalsRequest) error {
I don't know enough about gin to tell how much of an issue it can be.
*/
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, _, err := a.apiClient.Signal.Add(ctx, cache)
return err
var cache []*models.AddSignalsRequestItem = *cacheOrig
var send models.AddSignalsRequest

bulkSize := 50
pageStart := 0
pageEnd := bulkSize

for {

if pageEnd >= len(cache) {
send = cache[pageStart:]
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, _, err := a.apiClient.Signal.Add(ctx, &send)
if err != nil {
log.Errorf("Error while sending final chunk to central API : %s", err)
return
}
break
}
send = cache[pageStart:pageEnd]
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, _, err := a.apiClient.Signal.Add(ctx, &send)
if err != nil {
//we log it here as well, because the return value of func might be discarded
log.Errorf("Error while sending chunk to central API : %s", err)
}
pageStart += bulkSize
pageEnd += bulkSize
}
}

func (a *apic) PullTop() error {
Expand Down

0 comments on commit c6eb2af

Please sign in to comment.