Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

limit build concurrency according to --parallel #10133

Merged
merged 2 commits into from Jan 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 8 additions & 0 deletions cmd/compose/compose.go
Expand Up @@ -22,6 +22,7 @@ import (
"os"
"os/signal"
"path/filepath"
"strconv"
"strings"
"syscall"

Expand Down Expand Up @@ -324,6 +325,13 @@ func RootCommand(streams api.Streams, backend api.Service) *cobra.Command { //no
return err
}
}
if v, ok := os.LookupEnv("COMPOSE_PARALLEL_LIMIT"); ok && !cmd.Flags().Changed("parallel") {
i, err := strconv.Atoi(v)
if err != nil {
return fmt.Errorf("COMPOSE_PARALLEL_LIMIT must be an integer (found: %q)", v)
}
parallel = i
}
if parallel > 0 {
backend.MaxConcurrency(parallel)
}
Expand Down
25 changes: 14 additions & 11 deletions pkg/compose/build.go
Expand Up @@ -51,14 +51,16 @@ func (s *composeService) build(ctx context.Context, project *types.Project, opti
opts := map[string]build.Options{}
args := flatten(options.Args.Resolve(envResolver(project.Environment)))

services, err := project.GetServices(options.Services...)
if err != nil {
return err
}

for _, service := range services {
return InDependencyOrder(ctx, project, func(ctx context.Context, name string) error {
if len(options.Services) > 0 && !utils.Contains(options.Services, name) {
return nil
}
service, err := project.GetService(name)
if err != nil {
return err
}
if service.Build == nil {
continue
return nil
}
imageName := api.GetImageNameOrDefault(service, project.Name)
buildOptions, err := s.toBuildOptions(project, service, imageName, options.SSHs)
Expand Down Expand Up @@ -91,10 +93,11 @@ func (s *composeService) build(ctx context.Context, project *types.Project, opti
}}
}
opts[imageName] = buildOptions
}

_, err = s.doBuild(ctx, project, opts, options.Progress)
return err
_, err = s.doBuild(ctx, project, opts, options.Progress)
return err
}, func(traversal *graphTraversal) {
traversal.maxConcurrency = s.maxConcurrency
})
}

func (s *composeService) ensureImagesExists(ctx context.Context, project *types.Project, quietPull bool) error {
Expand Down
9 changes: 8 additions & 1 deletion pkg/compose/dependencies.go
Expand Up @@ -47,7 +47,8 @@ type graphTraversal struct {
targetServiceStatus ServiceStatus
adjacentServiceStatusToSkip ServiceStatus

visitorFn func(context.Context, string) error
visitorFn func(context.Context, string) error
maxConcurrency int
}

func upDirectionTraversal(visitorFn func(context.Context, string) error) *graphTraversal {
Expand Down Expand Up @@ -79,6 +80,9 @@ func InDependencyOrder(ctx context.Context, project *types.Project, fn func(cont
return err
}
t := upDirectionTraversal(fn)
for _, option := range options {
option(t)
}
return t.visit(ctx, graph)
}

Expand All @@ -96,6 +100,9 @@ func (t *graphTraversal) visit(ctx context.Context, g *Graph) error {
nodes := t.extremityNodesFn(g)

eg, ctx := errgroup.WithContext(ctx)
if t.maxConcurrency > 0 {
eg.SetLimit(t.maxConcurrency)
}
t.run(ctx, g, eg, nodes)

return eg.Wait()
Expand Down