Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query blocks with ChangeStream #376

Open
jinq0123 opened this issue Sep 11, 2019 · 2 comments
Open

Query blocks with ChangeStream #376

jinq0123 opened this issue Sep 11, 2019 · 2 comments

Comments

@jinq0123
Copy link

We use the issue tracker to track bugs with mgo - if you have a usage question,
it's best to try Stack Overflow :)

Replace this text with your description, and please answer the questions below
before submitting your issue to help us out. Thanks!


What version of MongoDB are you using (mongod --version)?

db version v4.2.0
git version: a4b751dcf51dd249c5865812b390cfd1c0129c30
allocator: tcmalloc
modules: none
build environment:
    distmod: 2012plus
    distarch: x86_64
    target_arch: x86_64

What version of Go are you using (go version)?

go version go1.12.7 windows/amd64

What operating system and processor architecture are you using (go env)?

set GOARCH=amd64
set GOBIN=
set GOCACHE=C:\Users\jinqing\AppData\Local\go-build
set GOEXE=.exe
set GOFLAGS=
set GOHOSTARCH=amd64
set GOHOSTOS=windows
set GOOS=windows
set GOPATH=D:\gopath
set GOPROXY=https://goproxy.cn
set GORACE=
set GOROOT=D:\Go
set GOTMPDIR=
set GOTOOLDIR=D:\Go\pkg\tool\windows_amd64
set GCCGO=gccgo
set CC=gcc
set CXX=g++
set CGO_ENABLED=1
set GOMOD=
set CGO_CFLAGS=-g -O2
set CGO_CPPFLAGS=
set CGO_CXXFLAGS=-g -O2
set CGO_FFLAGS=-g -O2
set CGO_LDFLAGS=-g -O2
set PKG_CONFIG=pkg-config
set GOGCCFLAGS=-m64 -mthreads -fmessage-length=0 -fdebug-prefix-map=C:\Users\jinqing\AppData\Local\Temp\go-build075823922=/tmp/go-build -gno-record-gcc-switches

What did you do?

The program watches a change stream in backgroud, and the query will block.

Setup a local mongodb:

mkdir d:\data\db
D:\Tool\mongodb-win32-x86_64-2012plus-4.2.0\bin
λ mongo --nodb
MongoDB shell version v4.2.0
> cluster = new SharingTest({shards: 3})

Then run the test program:

package main

import (
	"fmt"
	"os"
	"time"

	"github.com/globalsign/mgo"
)

func main() {
	session, err := mgo.Dial("mongodb://localhost:20006/test")
	if err != nil {
		fmt.Println("Dial error: ", err)
		return
	}

	go loopWatch(session)
	for {
		time.Sleep(time.Second)
		query(session)
	}
}

func query(session *mgo.Session) {
	c := session.DB("").C("test1")
	result := struct{}{}

	startTime := time.Now()
	err := c.Find(nil).One(&result)
	cost := time.Since(startTime)
	fmt.Printf("Query takes: %v, result: %v, err: %v\n", cost, result, err)
}

func loopWatch(session *mgo.Session) {
	for {
		watch(session)
	}
}

func watch(session *mgo.Session) {
	coll := session.DB("").C("test2")
	changeStream, err := coll.Watch(nil, mgo.ChangeStreamOptions{
		MaxAwaitTimeMS: 5 * time.Second,
	})
	if err != nil {
		fmt.Println("failed to watch: ", err)
		os.Exit(1)
	}
	defer changeStream.Close()

	//Handling change stream in a cycle
	for {
		//making a struct for unmarshalling
		changeDoc := struct {
			OperationType string `json:"operationType" bson:"operationType"`
		}{}

		//getting next item from the steam
		fmt.Println("Change stream next...")
		ok := changeStream.Next(&changeDoc)
		if ok {
			fmt.Println("got change")
			continue
		}

		//if data from the stream wasn't unmarshaled, we get ok == false as a result
		//so we need to call Err() method to get info why
		//it'll be nil if we just have no data
		err := changeStream.Err()
		if err == nil {
			continue
		}

		//if err is not nil, it means something bad happened, let's finish our func
		return
	} // for
}

Output:

D:\jinqing\Test\go\mgochg
λ mgochg.exe
Change stream next...
Change stream next...
Query takes: 4.0112788s, result: {}, err: not found
Change stream next...
Query takes: 4.0022317s, result: {}, err: not found
Change stream next...
Query takes: 4.0022971s, result: {}, err: not found
Change stream next...
Query takes: 4.0003046s, result: {}, err: not found
Change stream next...
Query takes: 4.0032182s, result: {}, err: not found
Change stream next...
Query takes: 4.0043058s, result: {}, err: not found
...

Can you reproduce the issue on the latest development branch?

@fananchong
Copy link

The correct way to use should look like this:

// Copy the session - if needed this will dial a new connection which
// can later be reused.
//
// Calling close returns the connection to the pool.
conn := session.Copy()
defer conn.Close()

// Do something(s) with the connection
_, _ = conn.DB("").C("my_data").Count()

@jinq0123
Copy link
Author

Same result using copied session in Eventual mode:

	session.SetMode(mgo.Eventual, true)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants