Skip to content

Commit

Permalink
support attachable binary I/O
Browse files Browse the repository at this point in the history
Signed-off-by: fahed dorgaa <fahed.dorgaa@gmail.com>
  • Loading branch information
fahedouch committed Apr 1, 2024
1 parent 124456e commit 8659051
Show file tree
Hide file tree
Showing 2 changed files with 223 additions and 6 deletions.
223 changes: 219 additions & 4 deletions cmd/containerd-shim-runc-v2/process/io.go
Expand Up @@ -27,16 +27,19 @@ import (
"os"
"os/exec"
"path/filepath"
"sync"
"sync/atomic"
"syscall"
"time"

"github.com/containerd/containerd/v2/defaults"
"github.com/containerd/containerd/v2/pkg/cio"
"github.com/containerd/containerd/v2/pkg/namespaces"
"github.com/containerd/containerd/v2/pkg/stdio"
"github.com/containerd/fifo"
runc "github.com/containerd/go-runc"
"github.com/containerd/log"

"sync"
"sync/atomic"
"syscall"
"time"
)

const binaryIOProcTermTimeout = 12 * time.Second // Give logger process solid 10 seconds for cleanup
Expand Down Expand Up @@ -107,6 +110,15 @@ func createIO(ctx context.Context, id string, ioUID, ioGID int, stdio stdio.Stdi
pio.io, err = runc.NewPipeIO(ioUID, ioGID, withConditionalIO(stdio))
case "binary":
pio.io, err = NewBinaryIO(ctx, id, u)
case "attachablebinary":
fifoSet, err := newAttachableBinaryIO(ctx, id, u)
if err != nil {
return nil, err
}
pio.stdio.Stdout = fifoSet.Stdout
pio.stdio.Stderr = fifoSet.Stderr
pio.copy = true
pio.io, err = runc.NewPipeIO(ioUID, ioGID, withConditionalIO(stdio))

Check failure on line 121 in cmd/containerd-shim-runc-v2/process/io.go

View workflow job for this annotation

GitHub Actions / Linters (ubuntu-22.04)

ineffectual assignment to err (ineffassign)
case "file":
filePath := u.Path
if err := os.MkdirAll(filepath.Dir(filePath), 0755); err != nil {
Expand Down Expand Up @@ -243,6 +255,16 @@ func (c *countingWriteCloser) Close() error {
return c.WriteCloser.Close()
}

type PipesReader struct {
Stdout io.ReadCloser
Stderr io.ReadCloser
}

type PipesWriter struct {
Stdout io.WriteCloser
Stderr io.WriteCloser
}

// NewBinaryIO runs a custom binary process for pluggable shim logging
func NewBinaryIO(ctx context.Context, id string, uri *url.URL) (_ runc.IO, err error) {
ns, err := namespaces.NamespaceRequired(ctx)
Expand Down Expand Up @@ -306,6 +328,199 @@ func NewBinaryIO(ctx context.Context, id string, uri *url.URL) (_ runc.IO, err e
err: serr,
}, nil
}
func newAttachableBinaryIO(ctx context.Context, id string, uri *url.URL) (*cio.FIFOSet, error) {
binaryFifos, err := cio.NewFIFOSetInDir(defaults.DefaultFIFODir, id, true)
if err != nil {
return nil, fmt.Errorf("failed to create birany fifos: %w", err)
}

binaryFifosPipes, err := openBinaryFifos(ctx, binaryFifos)
if err != nil {
return nil, err
}

attachableFifos, err := cio.NewFIFOSetInDir(filepath.Join(defaults.DefaultFIFODir, "attach"), id, true)
if err != nil {
return nil, fmt.Errorf("failed to create attachable fifos: %w", err)
}
type closer func() error
var closers []closer

defer func() {
if err == nil {
return
}
result := []error{err}
for _, fn := range closers {
result = append(result, fn())
}
err = errors.Join(result...)
}()

pipesAttachableFifosPipes, err := openAttachableFifos(ctx, attachableFifos)
if err != nil {
return nil, err
}
closers = append(closers, []closer{pipesAttachableFifosPipes.Stdout.Close, pipesAttachableFifosPipes.Stderr.Close}...)

binaryOut, err := newPipe()
if err != nil {
return nil, fmt.Errorf("failed to create binary stdout pipes: %w", err)
}
closers = append(closers, binaryOut.Close)

binarySerr, err := newPipe()
if err != nil {
return nil, fmt.Errorf("failed to create binary stderr pipes: %w", err)
}
closers = append(closers, binarySerr.Close)

r, w, err := os.Pipe()
if err != nil {
return nil, err
}
closers = append(closers, r.Close, w.Close)

ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, err
}

cmd := NewBinaryCmd(uri, id, ns)
cmd.ExtraFiles = append(cmd.ExtraFiles, binaryOut.r, binarySerr.r, w)
// don't need to register this with the reaper or wait when
// running inside a shim
if err := cmd.Start(); err != nil {
return nil, fmt.Errorf("failed to start binary process: %w", err)
}
closers = append(closers, func() error { return cmd.Process.Kill() })

// close our side of the pipe after start
if err := w.Close(); err != nil {
return nil, fmt.Errorf("failed to close write pipe after start: %w", err)
}

// wait for the logging binary to be ready
b := make([]byte, 1)
if _, err := r.Read(b); err != nil && err != io.EOF {
return nil, fmt.Errorf("failed to read from logging binary: %w", err)
}
stdoutWriters := io.MultiWriter(binaryOut.w, pipesAttachableFifosPipes.Stdout)
stderrWriters := io.MultiWriter(binarySerr.w, pipesAttachableFifosPipes.Stderr)

var wg sync.WaitGroup
go func() {
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
if _, err := io.Copy(stdoutWriters, binaryFifosPipes.Stdout); err != nil {
log.G(ctx).Debug(err.Error())
log.G(ctx).Warn("error copying stdout")
}
wg.Done()
binaryOut.w.Close()
pipesAttachableFifosPipes.Stdout.Close()
}()
go func() {
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
if _, err := io.Copy(stderrWriters, binaryFifosPipes.Stderr); err != nil {
log.G(ctx).Debug(err.Error())
log.G(ctx).Warn("error copying stdout")
}
wg.Done()
binarySerr.w.Close()
pipesAttachableFifosPipes.Stderr.Close()
}()
wg.Add(2)
go func() {
wg.Wait()
// Send SIGTERM first, so logger process has a chance to flush and exit properly
if err := cmd.Process.Signal(syscall.SIGTERM); err != nil {
log.L.WithError(err).Warn("failed to send SIGTERM")
if err := cmd.Process.Kill(); err != nil {
log.L.WithError(err).Warn("failed to kill process after faulty SIGTERM")
}
return
}

done := make(chan error, 1)
go func() {
done <- cmd.Wait()
}()

select {
case err := <-done:
log.L.WithError(err).Warn("failed to kill shim logger process")
case <-time.After(binaryIOProcTermTimeout):
log.L.Warn("failed to wait for shim logger process to exit, killing")
err := cmd.Process.Kill()
if err != nil {
log.L.WithError(err).Warn("failed to kill shim logger process")
}
}
}()
return binaryFifos, nil
}

func openBinaryFifos(ctx context.Context, fifos *cio.FIFOSet) (f PipesReader, retErr error) {
defer func() {
if retErr != nil {
fifos.Close()
}
}()

if fifos.Stdout != "" {
if f.Stdout, retErr = fifo.OpenFifo(ctx, fifos.Stdout, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); retErr != nil {
return f, fmt.Errorf("failed to open stdout fifo: %w", retErr)
}
defer func() {
if retErr != nil && f.Stdout != nil {
f.Stdout.Close()
}
}()
}
if fifos.Stderr != "" {
if f.Stderr, retErr = fifo.OpenFifo(ctx, fifos.Stderr, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); retErr != nil {
return f, fmt.Errorf("failed to open stderr fifo: %w", retErr)
}
defer func() {
if retErr != nil && f.Stderr != nil {
f.Stderr.Close()
}
}()
}
return f, nil
}

func openAttachableFifos(ctx context.Context, fifos *cio.FIFOSet) (f PipesWriter, retErr error) {
defer func() {
if retErr != nil {
fifos.Close()
}
}()

if fifos.Stdout != "" {
if f.Stdout, retErr = fifo.OpenFifo(ctx, fifos.Stdout, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); retErr != nil {
return f, fmt.Errorf("failed to open stdout fifo: %w", retErr)
}
defer func() {
if retErr != nil && f.Stdout != nil {
f.Stdout.Close()
}
}()
}
if fifos.Stderr != "" {
if f.Stderr, retErr = fifo.OpenFifo(ctx, fifos.Stderr, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); retErr != nil {
return f, fmt.Errorf("failed to open stderr fifo: %w", retErr)
}
defer func() {
if retErr != nil && f.Stderr != nil {
f.Stderr.Close()
}
}()
}
return f, nil
}

type binaryIO struct {
cmd *exec.Cmd
Expand Down
6 changes: 4 additions & 2 deletions cmd/containerd-shim-runc-v2/process/io_test.go
Expand Up @@ -20,11 +20,13 @@ package process

import (
"context"

"github.com/containerd/containerd/v2/pkg/namespaces"

"net/url"
"os"
"testing"

"github.com/containerd/containerd/v2/pkg/namespaces"
"testing"
)

func TestNewBinaryIO(t *testing.T) {
Expand Down

0 comments on commit 8659051

Please sign in to comment.