Skip to content

Commit

Permalink
GODRIVER-2489 createPipelineOptionsDoc: return err (#1036)
Browse files Browse the repository at this point in the history
  • Loading branch information
prestonvasquez authored and matthewdale committed Aug 2, 2022
1 parent bdfc953 commit 06579a7
Showing 1 changed file with 12 additions and 11 deletions.
23 changes: 12 additions & 11 deletions mongo/change_stream.go
Expand Up @@ -256,9 +256,9 @@ func (cs *ChangeStream) executeOperation(ctx context.Context, resuming bool) err
if resuming {
cs.replaceOptions(cs.wireVersion)

csOptDoc := cs.createPipelineOptionsDoc()
if cs.err != nil {
return cs.Err()
csOptDoc, err := cs.createPipelineOptionsDoc()
if err != nil {
return err
}
pipIdx, pipDoc := bsoncore.AppendDocumentStart(nil)
pipDoc = bsoncore.AppendDocumentElement(pipDoc, "$changeStream", csOptDoc)
Expand Down Expand Up @@ -386,9 +386,10 @@ func (cs *ChangeStream) buildPipelineSlice(pipeline interface{}) error {
cs.pipelineSlice = make([]bsoncore.Document, 0, val.Len()+1)

csIdx, csDoc := bsoncore.AppendDocumentStart(nil)
csDocTemp := cs.createPipelineOptionsDoc()
if cs.err != nil {
return cs.err

csDocTemp, err := cs.createPipelineOptionsDoc()
if err != nil {
return err
}
csDoc = bsoncore.AppendDocumentElement(csDoc, "$changeStream", csDocTemp)
csDoc, cs.err = bsoncore.AppendDocumentEnd(csDoc, csIdx)
Expand All @@ -410,7 +411,7 @@ func (cs *ChangeStream) buildPipelineSlice(pipeline interface{}) error {
return cs.err
}

func (cs *ChangeStream) createPipelineOptionsDoc() bsoncore.Document {
func (cs *ChangeStream) createPipelineOptionsDoc() (bsoncore.Document, error) {
plDocIdx, plDoc := bsoncore.AppendDocumentStart(nil)

if cs.streamType == ClientStream {
Expand All @@ -434,7 +435,7 @@ func (cs *ChangeStream) createPipelineOptionsDoc() bsoncore.Document {
var raDoc bsoncore.Document
raDoc, cs.err = transformBsoncoreDocument(cs.registry, cs.options.ResumeAfter, true, "resumeAfter")
if cs.err != nil {
return nil
return nil, cs.err
}

plDoc = bsoncore.AppendDocumentElement(plDoc, "resumeAfter", raDoc)
Expand All @@ -448,7 +449,7 @@ func (cs *ChangeStream) createPipelineOptionsDoc() bsoncore.Document {
var saDoc bsoncore.Document
saDoc, cs.err = transformBsoncoreDocument(cs.registry, cs.options.StartAfter, true, "startAfter")
if cs.err != nil {
return nil
return nil, cs.err
}

plDoc = bsoncore.AppendDocumentElement(plDoc, "startAfter", saDoc)
Expand All @@ -464,10 +465,10 @@ func (cs *ChangeStream) createPipelineOptionsDoc() bsoncore.Document {
}

if plDoc, cs.err = bsoncore.AppendDocumentEnd(plDoc, plDocIdx); cs.err != nil {
return nil
return nil, cs.err
}

return plDoc
return plDoc, nil
}

func (cs *ChangeStream) pipelineToBSON() (bsoncore.Document, error) {
Expand Down

0 comments on commit 06579a7

Please sign in to comment.