Skip to content

Commit

Permalink
Shutdown processing when session expires.
Browse files Browse the repository at this point in the history
At present the whole loader will shutdown if session expires for even one routine. We are doing this so that we get to know of the problem fast enough as we get Crashloop alerts on continuous restarts. Later we can change the session timeouts to restart the ConsumeClaim and track the error rate prometheus

#150 (comment)
  • Loading branch information
alok87 committed Mar 4, 2021
1 parent e6068ad commit c928ddb
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 7 deletions.
15 changes: 11 additions & 4 deletions redshiftsink/pkg/redshiftbatcher/batch_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,14 +177,21 @@ func removeEmptyNullValues(value map[string]*string) map[string]*string {
return value
}

// TODO: get rid of this https://github.com/herryg91/gobatch/issues/2
func (b *batchProcessor) ctxCancelled(ctx context.Context) bool {
select {
case <-ctx.Done():
err := ctx.Err()
klog.Warningf("Batch processing stopped, ctx done, ctxErr: %v", err)
klog.V(2).Infof(
"topic:%s, batchId:%d, lastCommittedOffset:%d: Cancelled.\n",
klog.Warningf("Processing stopped! main ctx done, ctxErr: %v", err)
klog.Warningf(
"%s, batchId:%d, lastCommitted:%d: main ctx done. Cancelled.\n",
b.topic, b.batchId, b.lastCommittedOffset,
)
return true
case <-b.session.Context().Done():
err := ctx.Err()
klog.Warningf("Processing stopped! ctx done, ctxErr: %v", err)
klog.Warningf(
"%s, batchId:%d, lastCommitted:%d: session ctx done. Cancelled.\n",
b.topic, b.batchId, b.lastCommittedOffset,
)
return true
Expand Down
15 changes: 12 additions & 3 deletions redshiftsink/pkg/redshiftloader/load_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,21 @@ func newLoadProcessor(
}
}

// TODO: get rid of this https://github.com/herryg91/gobatch/issues/2
func (b *loadProcessor) ctxCancelled(ctx context.Context) bool {
select {
case <-ctx.Done():
klog.Infof(
"%s, batchId:%d, lastCommittedOffset:%d: Cancelled.\n",
err := ctx.Err()
klog.Warningf("Processing stopped! main ctx done, ctxErr: %v", err)
klog.Warningf(
"%s, batchId:%d, lastCommitted:%d: main ctx done. Cancelled.\n",
b.topic, b.batchId, b.lastCommittedOffset,
)
return true
case <-b.session.Context().Done():
err := ctx.Err()
klog.Warningf("Processing stopped! ctx done, ctxErr: %v", err)
klog.Warningf(
"%s, batchId:%d, lastCommitted:%d: session ctx done. Cancelled.\n",
b.topic, b.batchId, b.lastCommittedOffset,
)
return true
Expand Down

0 comments on commit c928ddb

Please sign in to comment.