Skip to content

Commit

Permalink
detect required service are gone to stop watching
Browse files Browse the repository at this point in the history
explicit API to stop the log printer

Signed-off-by: Nicolas De Loof <nicolas.deloof@gmail.com>
  • Loading branch information
ndeloof committed Dec 7, 2022
1 parent 6b4ad0d commit 76c3208
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 45 deletions.
1 change: 1 addition & 0 deletions cmd/compose/up.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ func runUp(ctx context.Context, backend api.Service, createOptions createOptions
ExitCodeFrom: upOptions.exitCodeFrom,
CascadeStop: upOptions.cascadeStop,
Wait: upOptions.wait,
Services: services,
},
})
}
Expand Down
11 changes: 11 additions & 0 deletions pkg/compose/containers.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,3 +141,14 @@ func (containers Containers) sorted() Containers {
})
return containers
}

func (containers Containers) remove(id string) Containers {
for i, c := range containers {
if c.ID == id {
l := len(containers) - 1
containers[i] = containers[l]
return containers[:l]
}
}
return containers
}
19 changes: 8 additions & 11 deletions pkg/compose/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,19 +65,16 @@ func (s *composeService) Logs(

if options.Follow {
printer := newLogPrinter(consumer)
eg.Go(func() error {
for _, c := range containers {
printer.HandleEvent(api.ContainerEvent{
Type: api.ContainerEventAttach,
Container: getContainerNameWithoutProject(c),
Service: c.Labels[api.ServiceLabel],
})
}
return nil
})
for _, c := range containers {
printer.HandleEvent(api.ContainerEvent{
Type: api.ContainerEventAttach,
Container: getContainerNameWithoutProject(c),
Service: c.Labels[api.ServiceLabel],
})
}

eg.Go(func() error {
return s.watchContainers(ctx, projectName, options.Services, printer.HandleEvent, containers, func(c types.Container) error {
return s.watchContainers(ctx, projectName, options.Services, nil, printer.HandleEvent, containers, func(c types.Container) error {
printer.HandleEvent(api.ContainerEvent{
Type: api.ContainerEventAttach,
Container: getContainerNameWithoutProject(c),
Expand Down
10 changes: 10 additions & 0 deletions pkg/compose/printer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,23 @@ type logPrinter interface {
HandleEvent(event api.ContainerEvent)
Run(ctx context.Context, cascadeStop bool, exitCodeFrom string, stopFn func() error) (int, error)
Cancel()
Stop()
}

type printer struct {
queue chan api.ContainerEvent
consumer api.LogConsumer
stopCh chan struct{}
}

// newLogPrinter builds a LogPrinter passing containers logs to LogConsumer
func newLogPrinter(consumer api.LogConsumer) logPrinter {
queue := make(chan api.ContainerEvent)
stopCh := make(chan struct{}, 1) // printer MAY stop on his own, so Stop MUST not be blocking
printer := printer{
consumer: consumer,
queue: queue,
stopCh: stopCh,
}
return &printer
}
Expand All @@ -51,6 +55,10 @@ func (p *printer) Cancel() {
}
}

func (p *printer) Stop() {
p.stopCh <- struct{}{}
}

func (p *printer) HandleEvent(event api.ContainerEvent) {
p.queue <- event
}
Expand All @@ -64,6 +72,8 @@ func (p *printer) Run(ctx context.Context, cascadeStop bool, exitCodeFrom string
containers := map[string]struct{}{}
for {
select {
case <-p.stopCh:
return exitCode, nil
case <-ctx.Done():
return exitCode, ctx.Err()
case event := <-p.queue:
Expand Down
58 changes: 27 additions & 31 deletions pkg/compose/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"strings"

"github.com/compose-spec/compose-go/types"
"github.com/docker/compose/v2/pkg/utils"
moby "github.com/docker/docker/api/types"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -50,13 +51,6 @@ func (s *composeService) start(ctx context.Context, projectName string, options
}
}

if len(options.Services) > 0 {
err := project.ForServices(options.Services)
if err != nil {
return err
}
}

eg, ctx := errgroup.WithContext(ctx)
if listener != nil {
attached, err := s.attach(ctx, project, listener, options.AttachTo)
Expand All @@ -65,7 +59,7 @@ func (s *composeService) start(ctx context.Context, projectName string, options
}

eg.Go(func() error {
return s.watchContainers(context.Background(), project.Name, options.AttachTo, listener, attached, func(container moby.Container) error {
return s.watchContainers(context.Background(), project.Name, options.AttachTo, options.Services, listener, attached, func(container moby.Container) error {
return s.attachContainer(ctx, container, listener)
})
})
Expand Down Expand Up @@ -116,9 +110,19 @@ func getDependencyCondition(service types.ServiceConfig, project *types.Project)
type containerWatchFn func(container moby.Container) error

// watchContainers uses engine events to capture container start/die and notify ContainerEventListener
func (s *composeService) watchContainers(ctx context.Context, projectName string, services []string, listener api.ContainerEventListener, containers Containers, onStart containerWatchFn) error {
watched := map[string]int{}
func (s *composeService) watchContainers(ctx context.Context, projectName string, services []string, required []string, listener api.ContainerEventListener, containers Containers, onStart containerWatchFn) error {
if len(required) == 0 {
required = services
}

var (
expected Containers
watched = map[string]int{}
)
for _, c := range containers {
if utils.Contains(required, c.Labels[api.ServiceLabel]) {
expected = append(expected, c)
}
watched[c.ID] = 0
}

Expand All @@ -143,22 +147,18 @@ func (s *composeService) watchContainers(ctx context.Context, projectName string
}
name := getContainerNameWithoutProject(container)

if event.Status == "stop" {
service := container.Labels[api.ServiceLabel]
switch event.Status {
case "stop":
listener(api.ContainerEvent{
Type: api.ContainerEventStopped,
Container: name,
Service: container.Labels[api.ServiceLabel],
Service: service,
})

delete(watched, container.ID)
if len(watched) == 0 {
// all project containers stopped, we're done
stop()
}
return nil
}

if event.Status == "die" {
expected = expected.remove(container.ID)
case "die":
restarted := watched[container.ID]
watched[container.ID] = restarted + 1
// Container terminated.
Expand All @@ -167,29 +167,23 @@ func (s *composeService) watchContainers(ctx context.Context, projectName string
listener(api.ContainerEvent{
Type: api.ContainerEventExit,
Container: name,
Service: container.Labels[api.ServiceLabel],
Service: service,
ExitCode: inspected.State.ExitCode,
Restarting: willRestart,
})

if !willRestart {
// we're done with this one
delete(watched, container.ID)
expected = expected.remove(container.ID)
}

if len(watched) == 0 {
// all project containers stopped, we're done
stop()
}
return nil
}

if event.Status == "start" {
case "start":
count, ok := watched[container.ID]
mustAttach := ok && count > 0 // Container restarted, need to re-attach
if !ok {
// A new container has just been added to service by scale
watched[container.ID] = 0
expected = append(expected, container)
mustAttach = true
}
if mustAttach {
Expand All @@ -200,7 +194,9 @@ func (s *composeService) watchContainers(ctx context.Context, projectName string
}
}
}

if len(expected) == 0 {
stop()
}
return nil
},
})
Expand Down
6 changes: 3 additions & 3 deletions pkg/compose/up.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,10 @@ import (
"os/signal"
"syscall"

"github.com/docker/compose/v2/pkg/api"
"github.com/docker/compose/v2/pkg/progress"

"github.com/compose-spec/compose-go/types"
"github.com/docker/cli/cli"
"github.com/docker/compose/v2/pkg/api"
"github.com/docker/compose/v2/pkg/progress"
"golang.org/x/sync/errgroup"
)

Expand Down Expand Up @@ -92,6 +91,7 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, options
return err
}

printer.Stop()
err = eg.Wait()
if exitCode != 0 {
errMsg := ""
Expand Down
2 changes: 2 additions & 0 deletions pkg/e2e/fixtures/dependencies/compose.yaml
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
services:
foo:
image: nginx:alpine
command: "${COMMAND}"
depends_on:
- bar

bar:
image: nginx:alpine
scale: 2

0 comments on commit 76c3208

Please sign in to comment.