Skip to content

Commit

Permalink
fix: Following Container Logs feature fixes
Browse files Browse the repository at this point in the history
* fix the doc: `c.FollowOutput()` MUST be called before
  `c.StartLogProducer()` due to date race
* do not allow multiple `c.StartLogProducer()` without calling a
  `c.StopLogProducer()`
* run `c.StopLogProducer()` in `c.Terminate()` to reduce risk of an
   accidental goroutine leak
* fix tests, write new tests
  • Loading branch information
Krystian Chmura committed Oct 12, 2021
1 parent a5a9803 commit 65c2409
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 81 deletions.
28 changes: 16 additions & 12 deletions docker.go
Expand Up @@ -177,6 +177,7 @@ func (c *DockerContainer) Start(ctx context.Context) error {

// Terminate is used to kill the container. It is usually triggered by as defer function.
func (c *DockerContainer) Terminate(ctx context.Context) error {
c.StopLogProducer()
select {
// close reaper if it was created
case c.terminationSignal <- true:
Expand Down Expand Up @@ -226,13 +227,7 @@ func (c *DockerContainer) Logs(ctx context.Context) (io.ReadCloser, error) {
// FollowOutput adds a LogConsumer to be sent logs from the container's
// STDOUT and STDERR
func (c *DockerContainer) FollowOutput(consumer LogConsumer) {
if c.consumers == nil {
c.consumers = []LogConsumer{
consumer,
}
} else {
c.consumers = append(c.consumers, consumer)
}
c.consumers = append(c.consumers, consumer)
}

// Name gets the name of the container.
Expand Down Expand Up @@ -398,7 +393,13 @@ func (c *DockerContainer) CopyFileToContainer(ctx context.Context, hostFilePath
// StartLogProducer will start a concurrent process that will continuously read logs
// from the container and will send them to each added LogConsumer
func (c *DockerContainer) StartLogProducer(ctx context.Context) error {
go func() {
if c.stopProducer != nil {
return errors.New("log producer already started")
}

c.stopProducer = make(chan bool)

go func(stop <-chan bool) {
since := ""
// if the socket is closed we will make additional logs request with updated Since timestamp
BEGIN:
Expand All @@ -421,7 +422,7 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context) error {

for {
select {
case <-c.stopProducer:
case <-stop:
err := r.Close()
if err != nil {
// we can't close the read closer, this should never happen
Expand Down Expand Up @@ -470,15 +471,18 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context) error {
}
}
}
}()
}(c.stopProducer)

return nil
}

// StopLogProducer will stop the concurrent process that is reading logs
// and sending them to each added LogConsumer
func (c *DockerContainer) StopLogProducer() error {
c.stopProducer <- true
if c.stopProducer != nil {
c.stopProducer <- true
c.stopProducer = nil
}
return nil
}

Expand Down Expand Up @@ -758,7 +762,7 @@ func (p *DockerProvider) CreateContainer(ctx context.Context, req ContainerReque
provider: p,
terminationSignal: termSignal,
skipReaper: req.SkipReaper,
stopProducer: make(chan bool),
stopProducer: nil,
}

return c, nil
Expand Down
11 changes: 5 additions & 6 deletions docs/features/follow_logs.md
@@ -1,14 +1,11 @@
# Following Container Logs

If you wish to follow container logs, you can set up `LogConsumer`s. The log
following functionality follows a producer-consumer model. You will need to
explicitly start and stop the producer. As logs are written to either `stdout`,
following functionality follows a producer-consumer model. As logs are written to either `stdout`,
or `stderr` (`stdin` is not supported) they will be forwarded (produced) to any
associated `LogConsumer`s. You can associate `LogConsumer`s with the
`.FollowOutput` function.

**Please note** if you start the producer you should always stop it explicitly.

For example, this consumer will just add logs to a slice

```go
Expand All @@ -26,13 +23,13 @@ g := TestLogConsumer{
Msgs: []string{},
}

c.FollowOutput(&g) // must be called before StarLogProducer

err := c.StartLogProducer(ctx)
if err != nil {
// do something with err
}

c.FollowOutput(&g)

// some stuff happens...

err = c.StopLogProducer()
Expand All @@ -41,3 +38,5 @@ if err != nil {
}
```

`LogProducer` is stopped in `c.Terminate()`. It can be done manually during container lifecycle
using `c.StopLogProducer()`. For a particular container, only one `LogProducer` can be active at time
190 changes: 127 additions & 63 deletions logconsumer_test.go
Expand Up @@ -3,6 +3,7 @@ package testcontainers
import (
"context"
"fmt"
"github.com/stretchr/testify/require"
"net/http"
"testing"
"time"
Expand All @@ -19,23 +20,16 @@ type TestLogConsumer struct {
}

func (g *TestLogConsumer) Accept(l Log) {
if string(l.Content) == fmt.Sprintf("echo %s\n", lastMessage) {
s := string(l.Content)
if s == fmt.Sprintf("echo %s\n", lastMessage) {
g.Ack <- true
return
}

g.Msgs = append(g.Msgs, string(l.Content))
g.Msgs = append(g.Msgs, s)
}

func Test_LogConsumerGetsCalled(t *testing.T) {
t.Skip("This test is randomly failing for different versions of Go")
/*
send one request at a time to a server that will
print whatever was sent in the "echo" parameter, the log
consumer should get all of the messages and append them
to the Msgs slice
*/

ctx := context.Background()
req := ContainerRequest{
FromDockerfile: FromDockerfile{
Expand All @@ -52,56 +46,38 @@ func Test_LogConsumerGetsCalled(t *testing.T) {
}

c, err := GenericContainer(ctx, gReq)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)

ep, err := c.Endpoint(ctx, "http")
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)

g := TestLogConsumer{
Msgs: []string{},
Ack: make(chan bool),
}

err = c.StartLogProducer(ctx)
if err != nil {
t.Fatal(err)
}

c.FollowOutput(&g)

err = c.StartLogProducer(ctx)
require.NoError(t, err)

_, err = http.Get(ep + "/stdout?echo=hello")
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)

_, err = http.Get(ep + "/stdout?echo=there")
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)

time.Sleep(10 * time.Second)

_, err = http.Get(ep + fmt.Sprintf("/stdout?echo=%s", lastMessage))
if err != nil {
t.Fatal(err)
}
_, err = http.Get(ep + "/stdout?echo=" + lastMessage)
require.NoError(t, err)

select {
case <-g.Ack:
case <-time.After(5 * time.Second):
t.Fatal("never received final log message")
}
c.StopLogProducer()

// get rid of the server "ready" log
g.Msgs = g.Msgs[1:]

assert.DeepEqual(t, []string{"echo hello\n", "echo there\n"}, g.Msgs)
c.Terminate(ctx)
assert.NilError(t, c.StopLogProducer())
assert.DeepEqual(t, []string{"ready\n", "echo hello\n", "echo there\n"}, g.Msgs)
assert.NilError(t, c.Terminate(ctx))
}

type TestLogTypeConsumer struct {
Expand Down Expand Up @@ -135,49 +111,137 @@ func Test_ShouldRecognizeLogTypes(t *testing.T) {
}

c, err := GenericContainer(ctx, gReq)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)

ep, err := c.Endpoint(ctx, "http")
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)

g := TestLogTypeConsumer{
LogTypes: map[string]string{},
Ack: make(chan bool),
}

err = c.StartLogProducer(ctx)
if err != nil {
t.Fatal(err)
}

c.FollowOutput(&g)

err = c.StartLogProducer(ctx)
require.NoError(t, err)

_, err = http.Get(ep + "/stdout?echo=this-is-stdout")
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)

_, err = http.Get(ep + "/stderr?echo=this-is-stderr")
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)

_, err = http.Get(ep + fmt.Sprintf("/stdout?echo=%s", lastMessage))
if err != nil {
t.Fatal(err)
}
_, err = http.Get(ep + "/stdout?echo=" + lastMessage)
require.NoError(t, err)

<-g.Ack
c.StopLogProducer()
assert.NilError(t, c.StopLogProducer())

assert.DeepEqual(t, map[string]string{
StdoutLog: "echo this-is-stdout\n",
StderrLog: "echo this-is-stderr\n",
}, g.LogTypes)
c.Terminate(ctx)
assert.NilError(t, c.Terminate(ctx))
}

func Test_MultipleLogConsumers(t *testing.T) {
ctx := context.Background()
req := ContainerRequest{
FromDockerfile: FromDockerfile{
Context: "./testresources/",
Dockerfile: "echoserver.Dockerfile",
},
ExposedPorts: []string{"8080/tcp"},
WaitingFor: wait.ForLog("ready"),
}

gReq := GenericContainerRequest{
ContainerRequest: req,
Started: true,
}

c, err := GenericContainer(ctx, gReq)
require.NoError(t, err)

ep, err := c.Endpoint(ctx, "http")
require.NoError(t, err)

first := TestLogConsumer{Msgs: []string{}, Ack: make(chan bool)}
second := TestLogConsumer{Msgs: []string{}, Ack: make(chan bool)}

c.FollowOutput(&first)
c.FollowOutput(&second)

err = c.StartLogProducer(ctx)
require.NoError(t, err)

_, err = http.Get(ep + "/stdout?echo=mlem")
require.NoError(t, err)

_, err = http.Get(ep + "/stdout?echo=" + lastMessage)
require.NoError(t, err)

<-first.Ack
<-second.Ack
assert.NilError(t, c.StopLogProducer())

assert.DeepEqual(t, []string{"ready\n", "echo mlem\n"}, first.Msgs)
assert.DeepEqual(t, []string{"ready\n", "echo mlem\n"}, second.Msgs)
assert.NilError(t, c.Terminate(ctx))
}

func Test_StartStop(t *testing.T) {
ctx := context.Background()
req := ContainerRequest{
FromDockerfile: FromDockerfile{
Context: "./testresources/",
Dockerfile: "echoserver.Dockerfile",
},
ExposedPorts: []string{"8080/tcp"},
WaitingFor: wait.ForLog("ready"),
}

gReq := GenericContainerRequest{
ContainerRequest: req,
Started: true,
}

c, err := GenericContainer(ctx, gReq)
require.NoError(t, err)

ep, err := c.Endpoint(ctx, "http")
require.NoError(t, err)

g := TestLogConsumer{Msgs: []string{}, Ack: make(chan bool)}

c.FollowOutput(&g)

require.NoError(t, c.StopLogProducer(), "nothing should happen even if the producer is not started")
require.NoError(t, c.StartLogProducer(ctx))
require.Error(t, c.StartLogProducer(ctx), "log producer is already started")

_, err = http.Get(ep + "/stdout?echo=mlem")
require.NoError(t, err)

require.NoError(t, c.StopLogProducer())
require.NoError(t, c.StartLogProducer(ctx))

_, err = http.Get(ep + "/stdout?echo=mlem2")
require.NoError(t, err)

_, err = http.Get(ep + "/stdout?echo=" + lastMessage)
require.NoError(t, err)

<-g.Ack
// Do not close producer here, let's delegate it to c.Terminate

assert.DeepEqual(t, []string{
"ready\n",
"echo mlem\n",
"ready\n",
"echo mlem\n",
"echo mlem2\n",
}, g.Msgs)
assert.NilError(t, c.Terminate(ctx))
}
1 change: 1 addition & 0 deletions testresources/echoserver.go
Expand Up @@ -24,6 +24,7 @@ func echoHandler(destination *os.File) http.HandlerFunc {
l := log.New(destination, "echo ", 0)

l.Println(echo)
destination.Sync()

rw.WriteHeader(http.StatusAccepted)
}
Expand Down

0 comments on commit 65c2409

Please sign in to comment.