diff --git a/docker.go b/docker.go index efb324aa54..7a66b70f9e 100644 --- a/docker.go +++ b/docker.go @@ -177,6 +177,7 @@ func (c *DockerContainer) Start(ctx context.Context) error { // Terminate is used to kill the container. It is usually triggered by as defer function. func (c *DockerContainer) Terminate(ctx context.Context) error { + c.StopLogProducer() select { // close reaper if it was created case c.terminationSignal <- true: @@ -226,13 +227,7 @@ func (c *DockerContainer) Logs(ctx context.Context) (io.ReadCloser, error) { // FollowOutput adds a LogConsumer to be sent logs from the container's // STDOUT and STDERR func (c *DockerContainer) FollowOutput(consumer LogConsumer) { - if c.consumers == nil { - c.consumers = []LogConsumer{ - consumer, - } - } else { - c.consumers = append(c.consumers, consumer) - } + c.consumers = append(c.consumers, consumer) } // Name gets the name of the container. @@ -398,7 +393,13 @@ func (c *DockerContainer) CopyFileToContainer(ctx context.Context, hostFilePath // StartLogProducer will start a concurrent process that will continuously read logs // from the container and will send them to each added LogConsumer func (c *DockerContainer) StartLogProducer(ctx context.Context) error { - go func() { + if c.stopProducer != nil { + return errors.New("log producer already started") + } + + c.stopProducer = make(chan bool) + + go func(stop <-chan bool) { since := "" // if the socket is closed we will make additional logs request with updated Since timestamp BEGIN: @@ -421,7 +422,7 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context) error { for { select { - case <-c.stopProducer: + case <-stop: err := r.Close() if err != nil { // we can't close the read closer, this should never happen @@ -470,7 +471,7 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context) error { } } } - }() + }(c.stopProducer) return nil } @@ -478,7 +479,10 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context) error { // StopLogProducer will stop the concurrent process that is reading logs // and sending them to each added LogConsumer func (c *DockerContainer) StopLogProducer() error { - c.stopProducer <- true + if c.stopProducer != nil { + c.stopProducer <- true + c.stopProducer = nil + } return nil } @@ -758,7 +762,7 @@ func (p *DockerProvider) CreateContainer(ctx context.Context, req ContainerReque provider: p, terminationSignal: termSignal, skipReaper: req.SkipReaper, - stopProducer: make(chan bool), + stopProducer: nil, } return c, nil diff --git a/docs/features/follow_logs.md b/docs/features/follow_logs.md index 8c626c27ad..4fa93cd01c 100644 --- a/docs/features/follow_logs.md +++ b/docs/features/follow_logs.md @@ -1,14 +1,11 @@ # Following Container Logs If you wish to follow container logs, you can set up `LogConsumer`s. The log -following functionality follows a producer-consumer model. You will need to -explicitly start and stop the producer. As logs are written to either `stdout`, +following functionality follows a producer-consumer model. As logs are written to either `stdout`, or `stderr` (`stdin` is not supported) they will be forwarded (produced) to any associated `LogConsumer`s. You can associate `LogConsumer`s with the `.FollowOutput` function. -**Please note** if you start the producer you should always stop it explicitly. - For example, this consumer will just add logs to a slice ```go @@ -26,13 +23,13 @@ g := TestLogConsumer{ Msgs: []string{}, } +c.FollowOutput(&g) // must be called before StarLogProducer + err := c.StartLogProducer(ctx) if err != nil { // do something with err } -c.FollowOutput(&g) - // some stuff happens... err = c.StopLogProducer() @@ -41,3 +38,5 @@ if err != nil { } ``` +`LogProducer` is stopped in `c.Terminate()`. It can be done manually during container lifecycle +using `c.StopLogProducer()`. For a particular container, only one `LogProducer` can be active at time diff --git a/logconsumer_test.go b/logconsumer_test.go index b251349e40..d03916cc5c 100644 --- a/logconsumer_test.go +++ b/logconsumer_test.go @@ -3,6 +3,7 @@ package testcontainers import ( "context" "fmt" + "github.com/stretchr/testify/require" "net/http" "testing" "time" @@ -19,23 +20,16 @@ type TestLogConsumer struct { } func (g *TestLogConsumer) Accept(l Log) { - if string(l.Content) == fmt.Sprintf("echo %s\n", lastMessage) { + s := string(l.Content) + if s == fmt.Sprintf("echo %s\n", lastMessage) { g.Ack <- true return } - g.Msgs = append(g.Msgs, string(l.Content)) + g.Msgs = append(g.Msgs, s) } func Test_LogConsumerGetsCalled(t *testing.T) { - t.Skip("This test is randomly failing for different versions of Go") - /* - send one request at a time to a server that will - print whatever was sent in the "echo" parameter, the log - consumer should get all of the messages and append them - to the Msgs slice - */ - ctx := context.Background() req := ContainerRequest{ FromDockerfile: FromDockerfile{ @@ -52,56 +46,38 @@ func Test_LogConsumerGetsCalled(t *testing.T) { } c, err := GenericContainer(ctx, gReq) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) ep, err := c.Endpoint(ctx, "http") - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) g := TestLogConsumer{ Msgs: []string{}, Ack: make(chan bool), } - err = c.StartLogProducer(ctx) - if err != nil { - t.Fatal(err) - } - c.FollowOutput(&g) + err = c.StartLogProducer(ctx) + require.NoError(t, err) + _, err = http.Get(ep + "/stdout?echo=hello") - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) _, err = http.Get(ep + "/stdout?echo=there") - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) - time.Sleep(10 * time.Second) - - _, err = http.Get(ep + fmt.Sprintf("/stdout?echo=%s", lastMessage)) - if err != nil { - t.Fatal(err) - } + _, err = http.Get(ep + "/stdout?echo=" + lastMessage) + require.NoError(t, err) select { case <-g.Ack: case <-time.After(5 * time.Second): t.Fatal("never received final log message") } - c.StopLogProducer() - - // get rid of the server "ready" log - g.Msgs = g.Msgs[1:] - - assert.DeepEqual(t, []string{"echo hello\n", "echo there\n"}, g.Msgs) - c.Terminate(ctx) + assert.NilError(t, c.StopLogProducer()) + assert.DeepEqual(t, []string{"ready\n", "echo hello\n", "echo there\n"}, g.Msgs) + assert.NilError(t, c.Terminate(ctx)) } type TestLogTypeConsumer struct { @@ -135,49 +111,137 @@ func Test_ShouldRecognizeLogTypes(t *testing.T) { } c, err := GenericContainer(ctx, gReq) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) ep, err := c.Endpoint(ctx, "http") - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) g := TestLogTypeConsumer{ LogTypes: map[string]string{}, Ack: make(chan bool), } - err = c.StartLogProducer(ctx) - if err != nil { - t.Fatal(err) - } - c.FollowOutput(&g) + err = c.StartLogProducer(ctx) + require.NoError(t, err) + _, err = http.Get(ep + "/stdout?echo=this-is-stdout") - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) _, err = http.Get(ep + "/stderr?echo=this-is-stderr") - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) - _, err = http.Get(ep + fmt.Sprintf("/stdout?echo=%s", lastMessage)) - if err != nil { - t.Fatal(err) - } + _, err = http.Get(ep + "/stdout?echo=" + lastMessage) + require.NoError(t, err) <-g.Ack - c.StopLogProducer() + assert.NilError(t, c.StopLogProducer()) assert.DeepEqual(t, map[string]string{ StdoutLog: "echo this-is-stdout\n", StderrLog: "echo this-is-stderr\n", }, g.LogTypes) - c.Terminate(ctx) + assert.NilError(t, c.Terminate(ctx)) +} + +func Test_MultipleLogConsumers(t *testing.T) { + ctx := context.Background() + req := ContainerRequest{ + FromDockerfile: FromDockerfile{ + Context: "./testresources/", + Dockerfile: "echoserver.Dockerfile", + }, + ExposedPorts: []string{"8080/tcp"}, + WaitingFor: wait.ForLog("ready"), + } + + gReq := GenericContainerRequest{ + ContainerRequest: req, + Started: true, + } + c, err := GenericContainer(ctx, gReq) + require.NoError(t, err) + + ep, err := c.Endpoint(ctx, "http") + require.NoError(t, err) + + first := TestLogConsumer{Msgs: []string{}, Ack: make(chan bool)} + second := TestLogConsumer{Msgs: []string{}, Ack: make(chan bool)} + + c.FollowOutput(&first) + c.FollowOutput(&second) + + err = c.StartLogProducer(ctx) + require.NoError(t, err) + + _, err = http.Get(ep + "/stdout?echo=mlem") + require.NoError(t, err) + + _, err = http.Get(ep + "/stdout?echo=" + lastMessage) + require.NoError(t, err) + + <-first.Ack + <-second.Ack + assert.NilError(t, c.StopLogProducer()) + + assert.DeepEqual(t, []string{"ready\n", "echo mlem\n"}, first.Msgs) + assert.DeepEqual(t, []string{"ready\n", "echo mlem\n"}, second.Msgs) + assert.NilError(t, c.Terminate(ctx)) +} + +func Test_StartStop(t *testing.T) { + ctx := context.Background() + req := ContainerRequest{ + FromDockerfile: FromDockerfile{ + Context: "./testresources/", + Dockerfile: "echoserver.Dockerfile", + }, + ExposedPorts: []string{"8080/tcp"}, + WaitingFor: wait.ForLog("ready"), + } + + gReq := GenericContainerRequest{ + ContainerRequest: req, + Started: true, + } + + c, err := GenericContainer(ctx, gReq) + require.NoError(t, err) + + ep, err := c.Endpoint(ctx, "http") + require.NoError(t, err) + + g := TestLogConsumer{Msgs: []string{}, Ack: make(chan bool)} + + c.FollowOutput(&g) + + require.NoError(t, c.StopLogProducer(), "nothing should happen even if the producer is not started") + require.NoError(t, c.StartLogProducer(ctx)) + require.Error(t, c.StartLogProducer(ctx), "log producer is already started") + + _, err = http.Get(ep + "/stdout?echo=mlem") + require.NoError(t, err) + + require.NoError(t, c.StopLogProducer()) + require.NoError(t, c.StartLogProducer(ctx)) + + _, err = http.Get(ep + "/stdout?echo=mlem2") + require.NoError(t, err) + + _, err = http.Get(ep + "/stdout?echo=" + lastMessage) + require.NoError(t, err) + + <-g.Ack + // Do not close producer here, let's delegate it to c.Terminate + + assert.DeepEqual(t, []string{ + "ready\n", + "echo mlem\n", + "ready\n", + "echo mlem\n", + "echo mlem2\n", + }, g.Msgs) + assert.NilError(t, c.Terminate(ctx)) } diff --git a/testresources/echoserver.go b/testresources/echoserver.go index 0ab8d5c31f..9d391737c9 100644 --- a/testresources/echoserver.go +++ b/testresources/echoserver.go @@ -24,6 +24,7 @@ func echoHandler(destination *os.File) http.HandlerFunc { l := log.New(destination, "echo ", 0) l.Println(echo) + destination.Sync() rw.WriteHeader(http.StatusAccepted) }