Skip to content

Commit

Permalink
use context to correctly close stream connections (#368)
Browse files Browse the repository at this point in the history
* use context to correctly close stream connections

* remove print statements
  • Loading branch information
ffalor committed Sep 8, 2023
1 parent e3f8530 commit 9a96b10
Showing 1 changed file with 56 additions and 36 deletions.
92 changes: 56 additions & 36 deletions falcon/api_streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"errors"
"fmt"
"net/http"
"os"
"time"

"github.com/crowdstrike/gofalcon/falcon/client"
Expand All @@ -17,27 +16,31 @@ import (

// StreamingHandle is higher order type that allows for easy use of CrowdStrike Falcon Streaming API
type StreamingHandle struct {
ctx context.Context
client *client.CrowdStrikeAPISpecification
appId string
offset uint64
stream *models.MainAvailableStreamV2
Events chan *streaming_models.EventItem
Errors chan StreamingError
HTTPClient *http.Client
ctx context.Context
ctxCancelFunc context.CancelFunc
client *client.CrowdStrikeAPISpecification
appId string
offset uint64
stream *models.MainAvailableStreamV2
Events chan *streaming_models.EventItem
Errors chan StreamingError
HTTPClient *http.Client
}

// newStream initializes new StreamingHandle and connects to the Streaming API using the provided http.Client.
func newStream(ctx context.Context, client *client.CrowdStrikeAPISpecification, appId string, stream *models.MainAvailableStreamV2, offset uint64, httpClient *http.Client) (*StreamingHandle, error) {
ctx, cancel := context.WithCancel(ctx)

sh := &StreamingHandle{
ctx: ctx,
client: client,
appId: appId,
stream: stream,
offset: offset,
Events: make(chan *streaming_models.EventItem),
Errors: make(chan StreamingError),
HTTPClient: httpClient,
ctx: ctx,
ctxCancelFunc: cancel,
client: client,
appId: appId,
stream: stream,
offset: offset,
Events: make(chan *streaming_models.EventItem),
Errors: make(chan StreamingError),
HTTPClient: httpClient,
}
sh.maintainSession()
err := sh.open()
Expand Down Expand Up @@ -109,24 +112,40 @@ func (sh *StreamingHandle) open() error {

sh.Events = make(chan *streaming_models.EventItem)
go func() {
dec := json.NewDecoder(resp.Body)
for dec.More() {
var detection streaming_models.EventItem
//dec.DisallowUnknownFields()
err := dec.Decode(&detection)
defer func() {
err := resp.Body.Close()

if err != nil {
sh.Errors <- StreamingError{Fatal: false, Err: err}
sh.Errors <- StreamingError{
Fatal: false,
Err: err,
}
}

sh.Errors <- StreamingError{
Fatal: true,
Err: errors.New("streaming connection closed"),
}

close(sh.Errors)
close(sh.Events)
}()

dec := json.NewDecoder(resp.Body)
for {
select {
case <-sh.ctx.Done():
return
default:
if dec.More() {
var detection streaming_models.EventItem
err := dec.Decode(&detection)
if err != nil {
sh.Errors <- StreamingError{Fatal: false, Err: err}
}
sh.Events <- &detection
}
}
sh.Events <- &detection
}
sh.Errors <- StreamingError{
Fatal: true,
Err: errors.New("streaming connection closed"),
}
close(sh.Events)
err = resp.Body.Close()
if err != nil {
fmt.Fprintf(os.Stderr, "Error while closing the streaming connection: %v", err)
}
}()

Expand All @@ -135,9 +154,10 @@ func (sh *StreamingHandle) open() error {

// Close the StreamingHandle after use
func (sh *StreamingHandle) Close() {
close(sh.Errors)
sh.Errors = nil
sh.HTTPClient.CloseIdleConnections()
sh.ctxCancelFunc()
if sh.HTTPClient != nil {
sh.HTTPClient.CloseIdleConnections()
}
}

func (sh *StreamingHandle) url() string {
Expand Down

0 comments on commit 9a96b10

Please sign in to comment.