Skip to content

Commit

Permalink
fix deadlock waiting for attached-dependencies
Browse files Browse the repository at this point in the history
Signed-off-by: Nicolas De Loof <nicolas.deloof@gmail.com>
  • Loading branch information
ndeloof committed Dec 7, 2022
1 parent 6b4ad0d commit 9fa1945
Show file tree
Hide file tree
Showing 9 changed files with 97 additions and 40 deletions.
1 change: 1 addition & 0 deletions cmd/compose/up.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ func runUp(ctx context.Context, backend api.Service, createOptions createOptions
AttachTo: attachTo,
ExitCodeFrom: upOptions.exitCodeFrom,
CascadeStop: upOptions.cascadeStop,
Services: services,
Wait: upOptions.wait,
},
})
Expand Down
5 changes: 4 additions & 1 deletion pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,10 @@ type LogConsumer interface {
}

// ContainerEventListener is a callback to process ContainerEvent from services
type ContainerEventListener func(event ContainerEvent)
type ContainerEventListener interface {
HandleEvent(event ContainerEvent)
Stop()
}

// ContainerEvent notify an event has been collected on source container implementing Service
type ContainerEvent struct {
Expand Down
6 changes: 3 additions & 3 deletions pkg/compose/attach.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (s *composeService) attach(ctx context.Context, project *types.Project, lis
return nil, err
}

containers.sorted() // This enforce predictable colors assignment
containers.sorted() // Enforce a predictable colors assignment

var names []string
for _, c := range containers {
Expand All @@ -60,14 +60,14 @@ func (s *composeService) attachContainer(ctx context.Context, container moby.Con
serviceName := container.Labels[api.ServiceLabel]
containerName := getContainerNameWithoutProject(container)

listener(api.ContainerEvent{
listener.HandleEvent(api.ContainerEvent{
Type: api.ContainerEventAttach,
Container: containerName,
Service: serviceName,
})

w := utils.GetWriter(func(line string) {
listener(api.ContainerEvent{
listener.HandleEvent(api.ContainerEvent{
Type: api.ContainerEventLog,
Container: containerName,
Service: serviceName,
Expand Down
2 changes: 1 addition & 1 deletion pkg/compose/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (s *composeService) Logs(
})

eg.Go(func() error {
return s.watchContainers(ctx, projectName, options.Services, printer.HandleEvent, containers, func(c types.Container) error {
return s.watchContainers(ctx, projectName, options.Services, options.Services, printer, containers, func(c types.Container) error {
printer.HandleEvent(api.ContainerEvent{
Type: api.ContainerEventAttach,
Container: getContainerNameWithoutProject(c),
Expand Down
12 changes: 7 additions & 5 deletions pkg/compose/printer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,13 @@ import (
type logPrinter interface {
HandleEvent(event api.ContainerEvent)
Run(ctx context.Context, cascadeStop bool, exitCodeFrom string, stopFn func() error) (int, error)
Cancel()
Stop()
}

type printer struct {
queue chan api.ContainerEvent
consumer api.LogConsumer
stopC chan struct{}
}

// newLogPrinter builds a LogPrinter passing containers logs to LogConsumer
Expand All @@ -41,14 +42,13 @@ func newLogPrinter(consumer api.LogConsumer) logPrinter {
printer := printer{
consumer: consumer,
queue: queue,
stopC: make(chan struct{}, 1),
}
return &printer
}

func (p *printer) Cancel() {
p.queue <- api.ContainerEvent{
Type: api.UserCancel,
}
func (p *printer) Stop() {
p.stopC <- struct{}{}
}

func (p *printer) HandleEvent(event api.ContainerEvent) {
Expand All @@ -66,6 +66,8 @@ func (p *printer) Run(ctx context.Context, cascadeStop bool, exitCodeFrom string
select {
case <-ctx.Done():
return exitCode, ctx.Err()
case <-p.stopC:
return exitCode, nil
case event := <-p.queue:
container := event.Container
switch event.Type {
Expand Down
89 changes: 62 additions & 27 deletions pkg/compose/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,16 @@ import (

"github.com/docker/compose/v2/pkg/api"
"github.com/docker/compose/v2/pkg/progress"
"github.com/docker/compose/v2/pkg/utils"
)

func (s *composeService) Start(ctx context.Context, projectName string, options api.StartOptions) error {
return progress.Run(ctx, func(ctx context.Context) error {
return s.start(ctx, strings.ToLower(projectName), options, nil)
return s.start(ctx, strings.ToLower(projectName), options, nil, nil)
})
}

func (s *composeService) start(ctx context.Context, projectName string, options api.StartOptions, listener api.ContainerEventListener) error {
func (s *composeService) start(ctx context.Context, projectName string, options api.StartOptions, listener api.ContainerEventListener, stopC chan struct{}) error {
project := options.Project
if project == nil {
var containers Containers
Expand All @@ -57,15 +58,17 @@ func (s *composeService) start(ctx context.Context, projectName string, options
}
}

watchCtx, cancelFunc := context.WithCancel(ctx)
defer cancelFunc()
eg, ctx := errgroup.WithContext(ctx)
if listener != nil {
attached, err := s.attach(ctx, project, listener, options.AttachTo)
containers, err := s.attach(ctx, project, listener, options.AttachTo)
if err != nil {
return err
}

eg.Go(func() error {
return s.watchContainers(context.Background(), project.Name, options.AttachTo, listener, attached, func(container moby.Container) error {
return s.watchContainers(watchCtx, project.Name, options.AttachTo, options.Services, listener, containers, func(container moby.Container) error {
return s.attachContainer(ctx, container, listener)
})
})
Expand Down Expand Up @@ -96,7 +99,23 @@ func (s *composeService) start(ctx context.Context, projectName string, options
}
}

return eg.Wait()
doneCh := make(chan error)
go func() {
// errgroup unfortunately don't let us use `case <-ctx.Done()` to detect all tasks completed
err = eg.Wait()
doneCh <- err
}()

select {
case <-stopC:
cancelFunc()
return nil
case err := <-doneCh:
if listener != nil {
listener.Stop()
}
return err
}
}

// getDependencyCondition checks if service is depended on by other services
Expand All @@ -116,10 +135,21 @@ func getDependencyCondition(service types.ServiceConfig, project *types.Project)
type containerWatchFn func(container moby.Container) error

// watchContainers uses engine events to capture container start/die and notify ContainerEventListener
func (s *composeService) watchContainers(ctx context.Context, projectName string, services []string, listener api.ContainerEventListener, containers Containers, onStart containerWatchFn) error {
func (s *composeService) watchContainers(ctx context.Context, projectName string,
services []string, required []string,
listener api.ContainerEventListener, containers Containers, onStart containerWatchFn) error {
if len(required) == 0 {
required = services
}
// watched track all containers we are watching and how many times they restarted
watched := map[string]int{}
// count track the number of required containers we watch
count := 0
for _, c := range containers {
watched[c.ID] = 0
if utils.Contains(required, c.Labels[api.ServiceLabel]) {
count++
}
}

ctx, stop := context.WithCancel(ctx)
Expand All @@ -142,32 +172,34 @@ func (s *composeService) watchContainers(ctx context.Context, projectName string
Labels: inspected.Config.Labels,
}
name := getContainerNameWithoutProject(container)
service := container.Labels[api.ServiceLabel]

if event.Status == "stop" {
listener(api.ContainerEvent{
switch event.Status {
case "stop":
listener.HandleEvent(api.ContainerEvent{
Type: api.ContainerEventStopped,
Container: name,
Service: container.Labels[api.ServiceLabel],
Service: service,
})

delete(watched, container.ID)
if len(watched) == 0 {
// all project containers stopped, we're done
stop()
if utils.Contains(required, service) {
count--
if count == 0 {
// all project containers stopped, we're done
stop()
}
}
return nil
}

if event.Status == "die" {
case "die":
restarted := watched[container.ID]
watched[container.ID] = restarted + 1
// Container terminated.
willRestart := willContainerRestart(inspected, restarted)

listener(api.ContainerEvent{
listener.HandleEvent(api.ContainerEvent{
Type: api.ContainerEventExit,
Container: name,
Service: container.Labels[api.ServiceLabel],
Service: service,
ExitCode: inspected.State.ExitCode,
Restarting: willRestart,
})
Expand All @@ -177,19 +209,22 @@ func (s *composeService) watchContainers(ctx context.Context, projectName string
delete(watched, container.ID)
}

if len(watched) == 0 {
// all project containers stopped, we're done
stop()
if utils.Contains(required, service) {
count--
if count == 0 {
// all project containers stopped, we're done
stop()
}
}
return nil
}

if event.Status == "start" {
count, ok := watched[container.ID]
mustAttach := ok && count > 0 // Container restarted, need to re-attach
case "start":
restarted, ok := watched[container.ID]
mustAttach := ok && restarted > 0 // Container restarted, need to re-attach
if !ok {
// A new container has just been added to service by scale
watched[container.ID] = 0
if utils.Contains(required, service) {
count++
}
mustAttach = true
}
if mustAttach {
Expand Down
11 changes: 8 additions & 3 deletions pkg/compose/up.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, options
return err
}
if options.Start.Attach == nil {
return s.start(ctx, project.Name, options.Start, nil)
return s.start(ctx, project.Name, options.Start, nil, nil)
}
return nil
})
Expand All @@ -51,6 +51,7 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, options
}

printer := newLogPrinter(options.Start.Attach)
stopC := make(chan struct{})

signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
Expand All @@ -74,9 +75,13 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, options
}
go func() {
<-signalChan
printer.Cancel()
printer.HandleEvent(api.ContainerEvent{
Type: api.UserCancel,
})
fmt.Println("Gracefully stopping... (press Ctrl+C again to force)")
stopC <- struct{}{}
stopFunc() //nolint:errcheck
printer.Stop()
}()

var exitCode int
Expand All @@ -87,7 +92,7 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, options
return err
})

err = s.start(ctx, project.Name, options.Start, printer.HandleEvent)
err = s.start(ctx, project.Name, options.Start, printer, stopC)
if err != nil {
return err
}
Expand Down
10 changes: 10 additions & 0 deletions pkg/e2e/compose_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,3 +259,13 @@ networks:
name: compose-e2e-convert-interpolate_default`, filepath.Join(wd, "fixtures", "simple-build-test", "nginx-build")), ExitCode: 0})
})
}

func TestStopWithDependeciesAttached(t *testing.T) {
const projectName = "compose-e2e-stop-with-deps"
c := NewParallelCLI(t, WithEnv("COMMAND=echo hello"))

t.Run("up", func(t *testing.T) {
res := c.RunDockerComposeCmd(t, "-f", "./fixtures/dependencies/compose.yaml", "-p", projectName, "up", "--attach-dependencies", "foo")
res.Assert(t, icmd.Expected{Out: "exited with code 0"})
})
}
1 change: 1 addition & 0 deletions pkg/e2e/fixtures/dependencies/compose.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
services:
foo:
image: nginx:alpine
command: "${COMMAND}"
depends_on:
- bar

Expand Down

0 comments on commit 9fa1945

Please sign in to comment.