Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
Signed-off-by: fahed dorgaa <fahed.dorgaa@gmail.com>
  • Loading branch information
fahedouch committed Apr 24, 2024
1 parent 8837e4f commit 90883dc
Showing 1 changed file with 35 additions and 57 deletions.
92 changes: 35 additions & 57 deletions cmd/containerd-shim-runc-v2/process/io.go
Expand Up @@ -27,6 +27,10 @@ import (
"os"
"os/exec"
"path/filepath"
"sync"
"sync/atomic"
"syscall"
"time"

"github.com/containerd/containerd/v2/defaults"
"github.com/containerd/containerd/v2/pkg/cio"
Expand All @@ -35,11 +39,6 @@ import (
"github.com/containerd/fifo"
runc "github.com/containerd/go-runc"
"github.com/containerd/log"

"sync"
"sync/atomic"
"syscall"
"time"
)

const binaryIOProcTermTimeout = 12 * time.Second // Give logger process solid 10 seconds for cleanup
Expand Down Expand Up @@ -256,33 +255,33 @@ func (c *countingWriteCloser) Close() error {
return c.WriteCloser.Close()
}

type PipesReader struct {
Stdout io.ReadCloser
Stderr io.ReadCloser
type pipesReader struct {
stdout io.ReadCloser
stderr io.ReadCloser
}

type PipesWriter struct {
Stdout io.WriteCloser
Stderr io.WriteCloser
type pipesWriter struct {
stdout io.WriteCloser
stderr io.WriteCloser
}

// NewBinaryIO runs a custom binary process for pluggable shim logging
func NewBinaryIO(ctx context.Context, id string, uri *url.URL) (_ runc.IO, err error) {
func NewBinaryIO(ctx context.Context, id string, uri *url.URL) (_ runc.IO, retErr error) {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, err
}

var closers []func() error
defer func() {
if err == nil {
if retErr == nil {
return
}
result := []error{err}
result := []error{retErr}
for _, fn := range closers {
result = append(result, fn())
}
err = errors.Join(result...)
retErr = errors.Join(result...)
}()

out, err := newPipe()
Expand Down Expand Up @@ -364,7 +363,7 @@ func NewAttachableBinary(ctx context.Context, id string, uri *url.URL) (*cio.FIF
if err != nil {
return nil, err
}
closers = append(closers, []closer{pipesAttachableFifosPipes.Stdout.Close, pipesAttachableFifosPipes.Stderr.Close}...)
closers = append(closers, []closer{pipesAttachableFifosPipes.stdout.Close, pipesAttachableFifosPipes.stderr.Close}...)

binaryOut, err := newPipe()
if err != nil {
Expand Down Expand Up @@ -408,32 +407,32 @@ func NewAttachableBinary(ctx context.Context, id string, uri *url.URL) (*cio.FIF
if _, err := r.Read(b); err != nil && err != io.EOF {
return nil, fmt.Errorf("failed to read from logging binary: %w", err)
}
stdoutWriters := io.MultiWriter(binaryOut.w, pipesAttachableFifosPipes.Stdout)
stderrWriters := io.MultiWriter(binarySerr.w, pipesAttachableFifosPipes.Stderr)
stdoutWriters := io.MultiWriter(binaryOut.w, pipesAttachableFifosPipes.stdout)
stderrWriters := io.MultiWriter(binarySerr.w, pipesAttachableFifosPipes.stderr)

var wg sync.WaitGroup
wg.Add(2)
go func() {
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
if _, err := io.Copy(stdoutWriters, binaryFifosPipes.Stdout); err != nil {
if _, err := io.Copy(stdoutWriters, binaryFifosPipes.stdout); err != nil {
log.G(ctx).Debug(err.Error())
log.G(ctx).Warn("error copying stdout")
}
wg.Done()
binaryOut.w.Close()
pipesAttachableFifosPipes.Stdout.Close()
pipesAttachableFifosPipes.stdout.Close()
}()
go func() {
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
if _, err := io.Copy(stderrWriters, binaryFifosPipes.Stderr); err != nil {
if _, err := io.Copy(stderrWriters, binaryFifosPipes.stderr); err != nil {
log.G(ctx).Debug(err.Error())
log.G(ctx).Warn("error copying stdout")
}
wg.Done()
binarySerr.w.Close()
pipesAttachableFifosPipes.Stderr.Close()
pipesAttachableFifosPipes.stderr.Close()
}()
go func() {
wg.Wait()
Expand Down Expand Up @@ -465,62 +464,41 @@ func NewAttachableBinary(ctx context.Context, id string, uri *url.URL) (*cio.FIF
return binaryFifos, nil
}

func openBinaryFifos(ctx context.Context, fifos *cio.FIFOSet) (f PipesReader, retErr error) {
defer func() {
if retErr != nil {
fifos.Close()
}
}()
func openBinaryFifos(ctx context.Context, fifos *cio.FIFOSet) (f pipesReader, retErr error) {

if fifos.Stdout != "" {
if f.Stdout, retErr = fifo.OpenFifo(ctx, fifos.Stdout, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); retErr != nil {
if f.stdout, retErr = fifo.OpenFifo(ctx, fifos.Stdout, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); retErr != nil {
return f, fmt.Errorf("failed to open stdout fifo: %w", retErr)
}
defer func() {
if retErr != nil && f.Stdout != nil {
f.Stdout.Close()
}
}()
}
if fifos.Stderr != "" {
if f.Stderr, retErr = fifo.OpenFifo(ctx, fifos.Stderr, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); retErr != nil {
return f, fmt.Errorf("failed to open stderr fifo: %w", retErr)
}
defer func() {
if retErr != nil && f.Stderr != nil {
f.Stderr.Close()
if retErr != nil {
f.stdout.Close()
}
}()
if f.stderr, retErr = fifo.OpenFifo(ctx, fifos.Stderr, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); retErr != nil {
return f, fmt.Errorf("failed to open stderr fifo: %w", retErr)
}
}
return f, nil
}

func openAttachableFifos(ctx context.Context, fifos *cio.FIFOSet) (f PipesWriter, retErr error) {
defer func() {
if retErr != nil {
fifos.Close()
}
}()

func openAttachableFifos(ctx context.Context, fifos *cio.FIFOSet) (f pipesWriter, retErr error) {
if fifos.Stdout != "" {
if f.Stdout, retErr = fifo.OpenFifo(ctx, fifos.Stdout, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); retErr != nil {
if f.stdout, retErr = fifo.OpenFifo(ctx, fifos.Stdout, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); retErr != nil {
return f, fmt.Errorf("failed to open stdout fifo: %w", retErr)
}
defer func() {
if retErr != nil && f.Stdout != nil {
f.Stdout.Close()
}
}()
}
if fifos.Stderr != "" {
if f.Stderr, retErr = fifo.OpenFifo(ctx, fifos.Stderr, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); retErr != nil {
return f, fmt.Errorf("failed to open stderr fifo: %w", retErr)
}
defer func() {
if retErr != nil && f.Stderr != nil {
f.Stderr.Close()
if retErr != nil {
f.stdout.Close()
}
}()
if f.stderr, retErr = fifo.OpenFifo(ctx, fifos.Stderr, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); retErr != nil {
return f, fmt.Errorf("failed to open stderr fifo: %w", retErr)
}
}
return f, nil
}
Expand Down

0 comments on commit 90883dc

Please sign in to comment.