Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix deadlock waiting for attached-dependencies #10029

Merged
merged 1 commit into from
Dec 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/compose/up.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ func runUp(ctx context.Context, backend api.Service, createOptions createOptions
ExitCodeFrom: upOptions.exitCodeFrom,
CascadeStop: upOptions.cascadeStop,
Wait: upOptions.wait,
Services: services,
},
})
}
Expand Down
11 changes: 11 additions & 0 deletions pkg/compose/containers.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,3 +141,14 @@ func (containers Containers) sorted() Containers {
})
return containers
}

func (containers Containers) remove(id string) Containers {
for i, c := range containers {
if c.ID == id {
l := len(containers) - 1
containers[i] = containers[l]
return containers[:l]
}
}
return containers
}
19 changes: 8 additions & 11 deletions pkg/compose/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,19 +65,16 @@ func (s *composeService) Logs(

if options.Follow {
printer := newLogPrinter(consumer)
eg.Go(func() error {
for _, c := range containers {
printer.HandleEvent(api.ContainerEvent{
Type: api.ContainerEventAttach,
Container: getContainerNameWithoutProject(c),
Service: c.Labels[api.ServiceLabel],
})
}
return nil
})
for _, c := range containers {
printer.HandleEvent(api.ContainerEvent{
Type: api.ContainerEventAttach,
Container: getContainerNameWithoutProject(c),
Service: c.Labels[api.ServiceLabel],
})
}

eg.Go(func() error {
return s.watchContainers(ctx, projectName, options.Services, printer.HandleEvent, containers, func(c types.Container) error {
return s.watchContainers(ctx, projectName, options.Services, nil, printer.HandleEvent, containers, func(c types.Container) error {
printer.HandleEvent(api.ContainerEvent{
Type: api.ContainerEventAttach,
Container: getContainerNameWithoutProject(c),
Expand Down
10 changes: 10 additions & 0 deletions pkg/compose/printer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,23 @@ type logPrinter interface {
HandleEvent(event api.ContainerEvent)
Run(ctx context.Context, cascadeStop bool, exitCodeFrom string, stopFn func() error) (int, error)
Cancel()
Stop()
}

type printer struct {
queue chan api.ContainerEvent
consumer api.LogConsumer
stopCh chan struct{}
}

// newLogPrinter builds a LogPrinter passing containers logs to LogConsumer
func newLogPrinter(consumer api.LogConsumer) logPrinter {
queue := make(chan api.ContainerEvent)
stopCh := make(chan struct{}, 1) // printer MAY stop on his own, so Stop MUST not be blocking
printer := printer{
consumer: consumer,
queue: queue,
stopCh: stopCh,
}
return &printer
}
Expand All @@ -51,6 +55,10 @@ func (p *printer) Cancel() {
}
}

func (p *printer) Stop() {
p.stopCh <- struct{}{}
}

func (p *printer) HandleEvent(event api.ContainerEvent) {
p.queue <- event
}
Expand All @@ -64,6 +72,8 @@ func (p *printer) Run(ctx context.Context, cascadeStop bool, exitCodeFrom string
containers := map[string]struct{}{}
for {
select {
case <-p.stopCh:
return exitCode, nil
case <-ctx.Done():
return exitCode, ctx.Err()
case event := <-p.queue:
Expand Down
59 changes: 28 additions & 31 deletions pkg/compose/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"strings"

"github.com/compose-spec/compose-go/types"
"github.com/docker/compose/v2/pkg/utils"
moby "github.com/docker/docker/api/types"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -50,13 +51,6 @@ func (s *composeService) start(ctx context.Context, projectName string, options
}
}

if len(options.Services) > 0 {
err := project.ForServices(options.Services)
if err != nil {
return err
}
}

eg, ctx := errgroup.WithContext(ctx)
if listener != nil {
attached, err := s.attach(ctx, project, listener, options.AttachTo)
Expand All @@ -65,7 +59,7 @@ func (s *composeService) start(ctx context.Context, projectName string, options
}

eg.Go(func() error {
return s.watchContainers(context.Background(), project.Name, options.AttachTo, listener, attached, func(container moby.Container) error {
return s.watchContainers(context.Background(), project.Name, options.AttachTo, options.Services, listener, attached, func(container moby.Container) error {
return s.attachContainer(ctx, container, listener)
})
})
Expand Down Expand Up @@ -116,9 +110,20 @@ func getDependencyCondition(service types.ServiceConfig, project *types.Project)
type containerWatchFn func(container moby.Container) error

// watchContainers uses engine events to capture container start/die and notify ContainerEventListener
func (s *composeService) watchContainers(ctx context.Context, projectName string, services []string, listener api.ContainerEventListener, containers Containers, onStart containerWatchFn) error {
watched := map[string]int{}
func (s *composeService) watchContainers(ctx context.Context, projectName string, services, required []string,
listener api.ContainerEventListener, containers Containers, onStart containerWatchFn) error {
if len(required) == 0 {
required = services
}

var (
expected Containers
watched = map[string]int{}
)
for _, c := range containers {
if utils.Contains(required, c.Labels[api.ServiceLabel]) {
expected = append(expected, c)
}
watched[c.ID] = 0
}

Expand All @@ -143,22 +148,18 @@ func (s *composeService) watchContainers(ctx context.Context, projectName string
}
name := getContainerNameWithoutProject(container)

if event.Status == "stop" {
service := container.Labels[api.ServiceLabel]
switch event.Status {
case "stop":
listener(api.ContainerEvent{
Type: api.ContainerEventStopped,
Container: name,
Service: container.Labels[api.ServiceLabel],
Service: service,
})

delete(watched, container.ID)
if len(watched) == 0 {
// all project containers stopped, we're done
stop()
}
return nil
}

if event.Status == "die" {
expected = expected.remove(container.ID)
case "die":
restarted := watched[container.ID]
watched[container.ID] = restarted + 1
// Container terminated.
Expand All @@ -167,29 +168,23 @@ func (s *composeService) watchContainers(ctx context.Context, projectName string
listener(api.ContainerEvent{
Type: api.ContainerEventExit,
Container: name,
Service: container.Labels[api.ServiceLabel],
Service: service,
ExitCode: inspected.State.ExitCode,
Restarting: willRestart,
})

if !willRestart {
// we're done with this one
delete(watched, container.ID)
expected = expected.remove(container.ID)
}

if len(watched) == 0 {
// all project containers stopped, we're done
stop()
}
return nil
}

if event.Status == "start" {
case "start":
count, ok := watched[container.ID]
mustAttach := ok && count > 0 // Container restarted, need to re-attach
if !ok {
// A new container has just been added to service by scale
watched[container.ID] = 0
expected = append(expected, container)
mustAttach = true
}
if mustAttach {
Expand All @@ -200,7 +195,9 @@ func (s *composeService) watchContainers(ctx context.Context, projectName string
}
}
}

if len(expected) == 0 {
stop()
}
return nil
},
})
Expand Down
6 changes: 3 additions & 3 deletions pkg/compose/up.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,10 @@ import (
"os/signal"
"syscall"

"github.com/docker/compose/v2/pkg/api"
"github.com/docker/compose/v2/pkg/progress"

"github.com/compose-spec/compose-go/types"
"github.com/docker/cli/cli"
"github.com/docker/compose/v2/pkg/api"
"github.com/docker/compose/v2/pkg/progress"
"golang.org/x/sync/errgroup"
)

Expand Down Expand Up @@ -92,6 +91,7 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, options
return err
}

printer.Stop()
err = eg.Wait()
if exitCode != 0 {
errMsg := ""
Expand Down
10 changes: 10 additions & 0 deletions pkg/e2e/compose_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,3 +259,13 @@ networks:
name: compose-e2e-convert-interpolate_default`, filepath.Join(wd, "fixtures", "simple-build-test", "nginx-build")), ExitCode: 0})
})
}

func TestStopWithDependeciesAttached(t *testing.T) {
const projectName = "compose-e2e-stop-with-deps"
c := NewParallelCLI(t, WithEnv("COMMAND=echo hello"))

t.Run("up", func(t *testing.T) {
res := c.RunDockerComposeCmd(t, "-f", "./fixtures/dependencies/compose.yaml", "-p", projectName, "up", "--attach-dependencies", "foo")
res.Assert(t, icmd.Expected{Out: "exited with code 0"})
})
}
2 changes: 2 additions & 0 deletions pkg/e2e/fixtures/dependencies/compose.yaml
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
services:
foo:
image: nginx:alpine
command: "${COMMAND}"
depends_on:
- bar

bar:
image: nginx:alpine
scale: 2