Skip to content

Commit

Permalink
support attachable binary I/O
Browse files Browse the repository at this point in the history
Signed-off-by: fahed dorgaa <fahed.dorgaa@gmail.com>
  • Loading branch information
fahedouch committed Apr 24, 2024
1 parent 124456e commit cd48cde
Show file tree
Hide file tree
Showing 2 changed files with 220 additions and 4 deletions.
204 changes: 200 additions & 4 deletions cmd/containerd-shim-runc-v2/process/io.go
Expand Up @@ -32,6 +32,8 @@ import (
"syscall"
"time"

"github.com/containerd/containerd/v2/defaults"
"github.com/containerd/containerd/v2/pkg/cio"
"github.com/containerd/containerd/v2/pkg/namespaces"
"github.com/containerd/containerd/v2/pkg/stdio"
"github.com/containerd/fifo"
Expand Down Expand Up @@ -107,6 +109,16 @@ func createIO(ctx context.Context, id string, ioUID, ioGID int, stdio stdio.Stdi
pio.io, err = runc.NewPipeIO(ioUID, ioGID, withConditionalIO(stdio))
case "binary":
pio.io, err = NewBinaryIO(ctx, id, u)
case "attachablebinary":
var fifoSet *cio.FIFOSet
fifoSet, err = NewAttachableBinary(ctx, id, u)
if err != nil {
return nil, err
}
pio.stdio.Stdout = fifoSet.Stdout
pio.stdio.Stderr = fifoSet.Stderr
pio.copy = true
pio.io, err = runc.NewPipeIO(ioUID, ioGID, withConditionalIO(stdio))
case "file":
filePath := u.Path
if err := os.MkdirAll(filepath.Dir(filePath), 0755); err != nil {
Expand Down Expand Up @@ -243,23 +255,33 @@ func (c *countingWriteCloser) Close() error {
return c.WriteCloser.Close()
}

type pipesReader struct {
stdout io.ReadCloser
stderr io.ReadCloser
}

type pipesWriter struct {
stdout io.WriteCloser
stderr io.WriteCloser
}

// NewBinaryIO runs a custom binary process for pluggable shim logging
func NewBinaryIO(ctx context.Context, id string, uri *url.URL) (_ runc.IO, err error) {
func NewBinaryIO(ctx context.Context, id string, uri *url.URL) (_ runc.IO, retErr error) {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, err
}

var closers []func() error
defer func() {
if err == nil {
if retErr == nil {
return
}
result := []error{err}
result := []error{retErr}
for _, fn := range closers {
result = append(result, fn())
}
err = errors.Join(result...)
retErr = errors.Join(result...)
}()

out, err := newPipe()
Expand Down Expand Up @@ -307,6 +329,180 @@ func NewBinaryIO(ctx context.Context, id string, uri *url.URL) (_ runc.IO, err e
}, nil
}

// NewAttachableBinary runs a custom binary process and opens attachable fifos for pluggable shim logging
func NewAttachableBinary(ctx context.Context, id string, uri *url.URL) (*cio.FIFOSet, error) {
binaryFifos, err := cio.NewFIFOSetInDir(defaults.DefaultFIFODir, id, true)
if err != nil {
return nil, fmt.Errorf("failed to create birany fifos: %w", err)
}

binaryFifosPipes, err := openBinaryFifos(ctx, binaryFifos)
if err != nil {
return nil, err
}

attachableFifos, err := cio.NewFIFOSetInDir(filepath.Join(defaults.DefaultFIFODir, "attach"), id, true)
if err != nil {
return nil, fmt.Errorf("failed to create attachable fifos: %w", err)
}
type closer func() error
var closers []closer

defer func() {
if err == nil {
return
}
result := []error{err}
for _, fn := range closers {
result = append(result, fn())
}
err = errors.Join(result...)
}()

pipesAttachableFifosPipes, err := openAttachableFifos(ctx, attachableFifos)
if err != nil {
return nil, err
}
closers = append(closers, []closer{pipesAttachableFifosPipes.stdout.Close, pipesAttachableFifosPipes.stderr.Close}...)

binaryOut, err := newPipe()
if err != nil {
return nil, fmt.Errorf("failed to create binary stdout pipes: %w", err)
}
closers = append(closers, binaryOut.Close)

binarySerr, err := newPipe()
if err != nil {
return nil, fmt.Errorf("failed to create binary stderr pipes: %w", err)
}
closers = append(closers, binarySerr.Close)

r, w, err := os.Pipe()
if err != nil {
return nil, err
}
closers = append(closers, r.Close, w.Close)

ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, err
}

cmd := NewBinaryCmd(uri, id, ns)
cmd.ExtraFiles = append(cmd.ExtraFiles, binaryOut.r, binarySerr.r, w)
// don't need to register this with the reaper or wait when
// running inside a shim
if err := cmd.Start(); err != nil {
return nil, fmt.Errorf("failed to start binary process: %w", err)
}
closers = append(closers, func() error { return cmd.Process.Kill() })

// close our side of the pipe after start
if err := w.Close(); err != nil {
return nil, fmt.Errorf("failed to close write pipe after start: %w", err)
}

// wait for the logging binary to be ready
b := make([]byte, 1)
if _, err := r.Read(b); err != nil && err != io.EOF {
return nil, fmt.Errorf("failed to read from logging binary: %w", err)
}
stdoutWriters := io.MultiWriter(binaryOut.w, pipesAttachableFifosPipes.stdout)
stderrWriters := io.MultiWriter(binarySerr.w, pipesAttachableFifosPipes.stderr)

var wg sync.WaitGroup
wg.Add(2)
go func() {
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
if _, err := io.Copy(stdoutWriters, binaryFifosPipes.stdout); err != nil {
log.G(ctx).Debug(err.Error())
log.G(ctx).Warn("error copying stdout")
}
wg.Done()
binaryOut.w.Close()
pipesAttachableFifosPipes.stdout.Close()
}()
go func() {
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
if _, err := io.Copy(stderrWriters, binaryFifosPipes.stderr); err != nil {
log.G(ctx).Debug(err.Error())
log.G(ctx).Warn("error copying stdout")
}
wg.Done()
binarySerr.w.Close()
pipesAttachableFifosPipes.stderr.Close()
}()
go func() {
wg.Wait()
// Send SIGTERM first, so logger process has a chance to flush and exit properly
if err := cmd.Process.Signal(syscall.SIGTERM); err != nil {
log.L.WithError(err).Warn("failed to send SIGTERM")
if err := cmd.Process.Kill(); err != nil {
log.L.WithError(err).Warn("failed to kill process after faulty SIGTERM")
}
return
}

done := make(chan error, 1)
go func() {
done <- cmd.Wait()
}()

select {
case err := <-done:
log.L.WithError(err).Warn("failed to kill shim logger process")
case <-time.After(binaryIOProcTermTimeout):
log.L.Warn("failed to wait for shim logger process to exit, killing")
err := cmd.Process.Kill()
if err != nil {
log.L.WithError(err).Warn("failed to kill shim logger process")
}
}
}()
return binaryFifos, nil
}

func openBinaryFifos(ctx context.Context, fifos *cio.FIFOSet) (f pipesReader, retErr error) {

if fifos.Stdout != "" {
if f.stdout, retErr = fifo.OpenFifo(ctx, fifos.Stdout, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); retErr != nil {
return f, fmt.Errorf("failed to open stdout fifo: %w", retErr)
}
}
if fifos.Stderr != "" {
defer func() {
if retErr != nil {
f.stdout.Close()
}
}()
if f.stderr, retErr = fifo.OpenFifo(ctx, fifos.Stderr, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); retErr != nil {
return f, fmt.Errorf("failed to open stderr fifo: %w", retErr)
}
}
return f, nil
}

func openAttachableFifos(ctx context.Context, fifos *cio.FIFOSet) (f pipesWriter, retErr error) {
if fifos.Stdout != "" {
if f.stdout, retErr = fifo.OpenFifo(ctx, fifos.Stdout, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); retErr != nil {
return f, fmt.Errorf("failed to open stdout fifo: %w", retErr)
}
}
if fifos.Stderr != "" {
defer func() {
if retErr != nil {
f.stdout.Close()
}
}()
if f.stderr, retErr = fifo.OpenFifo(ctx, fifos.Stderr, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); retErr != nil {
return f, fmt.Errorf("failed to open stderr fifo: %w", retErr)
}
}
return f, nil
}

type binaryIO struct {
cmd *exec.Cmd
out, err *pipe
Expand Down
20 changes: 20 additions & 0 deletions cmd/containerd-shim-runc-v2/process/io_test.go
Expand Up @@ -25,6 +25,7 @@ import (
"testing"

"github.com/containerd/containerd/v2/pkg/namespaces"
"github.com/containerd/containerd/v2/pkg/testutil"
)

func TestNewBinaryIO(t *testing.T) {
Expand All @@ -49,6 +50,25 @@ func TestNewBinaryIO(t *testing.T) {
}
}

func TestNewAttachableBinaryIO(t *testing.T) {
testutil.RequiresRoot(t)

ctx := namespaces.WithNamespace(context.Background(), "test")
uri, _ := url.Parse("attachablebinary:///bin/echo?test")

before := descriptorCount(t)

_, err := NewAttachableBinary(ctx, "1", uri)
if err != nil {
t.Fatal(err)
}

after := descriptorCount(t)
if after != before+9 {
t.Fatalf("descriptors weren't created correctly (%d != %d -1)", before, after)
}
}

func TestNewBinaryIOCleanup(t *testing.T) {
ctx := namespaces.WithNamespace(context.Background(), "test")
uri, _ := url.Parse("binary:///not/existing")
Expand Down

0 comments on commit cd48cde

Please sign in to comment.