Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

shimv2: support attachable binary I/O #10020

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
196 changes: 196 additions & 0 deletions cmd/containerd-shim-runc-v2/process/io.go
Expand Up @@ -32,6 +32,8 @@ import (
"syscall"
"time"

"github.com/containerd/containerd/v2/defaults"
"github.com/containerd/containerd/v2/pkg/cio"
"github.com/containerd/containerd/v2/pkg/namespaces"
"github.com/containerd/containerd/v2/pkg/stdio"
"github.com/containerd/fifo"
Expand Down Expand Up @@ -107,6 +109,16 @@ func createIO(ctx context.Context, id string, ioUID, ioGID int, stdio stdio.Stdi
pio.io, err = runc.NewPipeIO(ioUID, ioGID, withConditionalIO(stdio))
case "binary":
pio.io, err = NewBinaryIO(ctx, id, u)
case "attachablebinary":
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needs godoc and test

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

test added.
I am not really sure where to add godoc cc @AkihiroSuda

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

godoc added

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we can merge attachablebinary with existing one, binary, by using reserve param key like _attach=true.

cc @AkihiroSuda @dmcgowan

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I intended to merge them but I think it will introduce a non-negligible complexity and more responsibilities to NewBinaryIO() func so I prefer to make a seperate function

Copy link
Member

@mxpv mxpv Apr 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we can merge attachablebinary with existing one, binary, by using reserve param key like _attach=true.

That was my thinking too. "attachablebinary" is extended case of "binary", I like the idea of introducing attach flag. The code is indeed complex and needs some refactoring.

var fifoSet *cio.FIFOSet
fifoSet, err = NewAttachableBinary(ctx, id, u)
if err != nil {
return nil, err
}
pio.stdio.Stdout = fifoSet.Stdout
pio.stdio.Stderr = fifoSet.Stderr
pio.copy = true
pio.io, err = runc.NewPipeIO(ioUID, ioGID, withConditionalIO(stdio))
case "file":
filePath := u.Path
if err := os.MkdirAll(filepath.Dir(filePath), 0755); err != nil {
Expand Down Expand Up @@ -243,6 +255,16 @@ func (c *countingWriteCloser) Close() error {
return c.WriteCloser.Close()
}

type pipesReader struct {
stdout io.ReadCloser
stderr io.ReadCloser
}

type pipesWriter struct {
stdout io.WriteCloser
stderr io.WriteCloser
}

// NewBinaryIO runs a custom binary process for pluggable shim logging
func NewBinaryIO(ctx context.Context, id string, uri *url.URL) (_ runc.IO, err error) {
ns, err := namespaces.NamespaceRequired(ctx)
Expand Down Expand Up @@ -307,6 +329,180 @@ func NewBinaryIO(ctx context.Context, id string, uri *url.URL) (_ runc.IO, err e
}, nil
}

// NewAttachableBinary runs a custom binary process and opens attachable fifos for pluggable shim logging
func NewAttachableBinary(ctx context.Context, id string, uri *url.URL) (_ *cio.FIFOSet, err error) {
binaryFifos, err := cio.NewFIFOSetInDir(defaults.DefaultFIFODir, id, true)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why terminal=true here?

if err != nil {
return nil, fmt.Errorf("failed to create birany fifos: %w", err)
}

binaryFifosPipes, err := openBinaryFifos(ctx, binaryFifos)
if err != nil {
return nil, err
}

attachableFifos, err := cio.NewFIFOSetInDir(filepath.Join(defaults.DefaultFIFODir, "attach"), id, true)
if err != nil {
return nil, fmt.Errorf("failed to create attachable fifos: %w", err)
}
type closer func() error
var closers []closer

defer func() {
if err == nil {
return
}
result := []error{err}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please use named var defined in return parameters.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is better now ?

for _, fn := range closers {
result = append(result, fn())
}
err = errors.Join(result...)
}()

pipesAttachableFifosPipes, err := openAttachableFifos(ctx, attachableFifos)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will it be problematic if there are no consumers of these FIFOs? Pipe has limited capacity and when it's full the writing side may block or fail.

Copy link
Member Author

@fahedouch fahedouch May 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

by default fifo would block when it's full, but with O_NONBLOCK flag coming data will be lost and new data will be queued to the writer(fifo), so attaching tails logs and this is the intended behavior

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

according to pipe(7):

A pipe has a limited capacity. If the pipe is full, then a
write(2) will block or fail, depending on whether the O_NONBLOCK
flag is set (see below).

IIRC non-block flag will simply result in write failure rather than blocking the caller. Either way the result is not favorable.

Not sure if i misunderstood this :)

if err != nil {
return nil, err
}
closers = append(closers, []closer{pipesAttachableFifosPipes.stdout.Close, pipesAttachableFifosPipes.stderr.Close}...)

binaryOut, err := newPipe()
if err != nil {
return nil, fmt.Errorf("failed to create binary stdout pipes: %w", err)
}
closers = append(closers, binaryOut.Close)

binarySerr, err := newPipe()
if err != nil {
return nil, fmt.Errorf("failed to create binary stderr pipes: %w", err)
}
closers = append(closers, binarySerr.Close)

r, w, err := os.Pipe()
if err != nil {
return nil, err
}
closers = append(closers, r.Close, w.Close)

ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, err
}

cmd := NewBinaryCmd(uri, id, ns)
cmd.ExtraFiles = append(cmd.ExtraFiles, binaryOut.r, binarySerr.r, w)
// don't need to register this with the reaper or wait when
// running inside a shim
if err := cmd.Start(); err != nil {
return nil, fmt.Errorf("failed to start binary process: %w", err)
}
closers = append(closers, func() error { return cmd.Process.Kill() })

// close our side of the pipe after start
if err := w.Close(); err != nil {
return nil, fmt.Errorf("failed to close write pipe after start: %w", err)
}

// wait for the logging binary to be ready
b := make([]byte, 1)
if _, err := r.Read(b); err != nil && err != io.EOF {
return nil, fmt.Errorf("failed to read from logging binary: %w", err)
}
stdoutWriters := io.MultiWriter(binaryOut.w, pipesAttachableFifosPipes.stdout)
stderrWriters := io.MultiWriter(binarySerr.w, pipesAttachableFifosPipes.stderr)

var wg sync.WaitGroup
wg.Add(2)
go func() {
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
if _, err := io.Copy(stdoutWriters, binaryFifosPipes.stdout); err != nil {
log.G(ctx).Debug(err.Error())
log.G(ctx).Warn("error copying stdout")
}
wg.Done()
binaryOut.w.Close()
pipesAttachableFifosPipes.stdout.Close()
}()
go func() {
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
if _, err := io.Copy(stderrWriters, binaryFifosPipes.stderr); err != nil {
log.G(ctx).Debug(err.Error())
log.G(ctx).Warn("error copying stdout")
}
wg.Done()
binarySerr.w.Close()
pipesAttachableFifosPipes.stderr.Close()
}()
go func() {
wg.Wait()
// Send SIGTERM first, so logger process has a chance to flush and exit properly
if err := cmd.Process.Signal(syscall.SIGTERM); err != nil {
log.L.WithError(err).Warn("failed to send SIGTERM")
if err := cmd.Process.Kill(); err != nil {
log.L.WithError(err).Warn("failed to kill process after faulty SIGTERM")
}
return
}

done := make(chan error, 1)
go func() {
done <- cmd.Wait()
}()

select {
case err := <-done:
log.L.WithError(err).Warn("failed to kill shim logger process")
case <-time.After(binaryIOProcTermTimeout):
log.L.Warn("failed to wait for shim logger process to exit, killing")
err := cmd.Process.Kill()
if err != nil {
log.L.WithError(err).Warn("failed to kill shim logger process")
}
}
}()
return binaryFifos, nil
}

func openBinaryFifos(ctx context.Context, fifos *cio.FIFOSet) (f pipesReader, retErr error) {

if fifos.Stdout != "" {
if f.stdout, retErr = fifo.OpenFifo(ctx, fifos.Stdout, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); retErr != nil {
return f, fmt.Errorf("failed to open stdout fifo: %w", retErr)
}
}
if fifos.Stderr != "" {
defer func() {
if retErr != nil {
f.stdout.Close()
}
}()
if f.stderr, retErr = fifo.OpenFifo(ctx, fifos.Stderr, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); retErr != nil {
return f, fmt.Errorf("failed to open stderr fifo: %w", retErr)
}
}
return f, nil
}

func openAttachableFifos(ctx context.Context, fifos *cio.FIFOSet) (f pipesWriter, retErr error) {
if fifos.Stdout != "" {
if f.stdout, retErr = fifo.OpenFifo(ctx, fifos.Stdout, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); retErr != nil {
return f, fmt.Errorf("failed to open stdout fifo: %w", retErr)
}
}
if fifos.Stderr != "" {
defer func() {
if retErr != nil {
f.stdout.Close()
}
}()
if f.stderr, retErr = fifo.OpenFifo(ctx, fifos.Stderr, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); retErr != nil {
return f, fmt.Errorf("failed to open stderr fifo: %w", retErr)
}
}
return f, nil
}

type binaryIO struct {
cmd *exec.Cmd
out, err *pipe
Expand Down
20 changes: 20 additions & 0 deletions cmd/containerd-shim-runc-v2/process/io_test.go
Expand Up @@ -25,6 +25,7 @@ import (
"testing"

"github.com/containerd/containerd/v2/pkg/namespaces"
"github.com/containerd/containerd/v2/pkg/testutil"
)

func TestNewBinaryIO(t *testing.T) {
Expand All @@ -49,6 +50,25 @@ func TestNewBinaryIO(t *testing.T) {
}
}

func TestNewAttachableBinaryIO(t *testing.T) {
testutil.RequiresRoot(t)

ctx := namespaces.WithNamespace(context.Background(), "test")
uri, _ := url.Parse("attachablebinary:///bin/echo?test")

before := descriptorCount(t)

_, err := NewAttachableBinary(ctx, "1", uri)
if err != nil {
t.Fatal(err)
}

after := descriptorCount(t)
if after != before+9 {
t.Fatalf("descriptors weren't created correctly (%d != %d -1)", before, after)
}
}

func TestNewBinaryIOCleanup(t *testing.T) {
ctx := namespaces.WithNamespace(context.Background(), "test")
uri, _ := url.Parse("binary:///not/existing")
Expand Down