Skip to content

Commit

Permalink
cri: support io by streaming api
Browse files Browse the repository at this point in the history
Signed-off-by: Abel Feng <fshb1988@gmail.com>
  • Loading branch information
abel-von committed May 7, 2024
1 parent a26c686 commit 8f73d56
Show file tree
Hide file tree
Showing 8 changed files with 363 additions and 38 deletions.
121 changes: 121 additions & 0 deletions core/transfer/streaming/reader.go
@@ -0,0 +1,121 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package streaming

import (
"context"
"errors"
"fmt"
"io"

transferapi "github.com/containerd/containerd/api/types/transfer"
"github.com/containerd/containerd/v2/core/streaming"
"github.com/containerd/typeurl/v2"
)

type readByteStream struct {
ctx context.Context
stream streaming.Stream
window int32
updated chan struct{}
errCh chan error
remaining []byte
}

func ReadByteStream(ctx context.Context, stream streaming.Stream) io.ReadCloser {
rbs := &readByteStream{
ctx: ctx,
stream: stream,
window: 0,
errCh: make(chan error),
updated: make(chan struct{}, 1),
}
go func() {
for {
if rbs.window >= windowSize {
select {
case <-ctx.Done():
return
case <-rbs.updated:
continue
}
}
update := &transferapi.WindowUpdate{
Update: windowSize,
}
anyType, err := typeurl.MarshalAny(update)
if err != nil {
rbs.errCh <- err
return
}
if err := stream.Send(anyType); err == nil {
rbs.window += windowSize
} else if !errors.Is(err, io.EOF) {
rbs.errCh <- err
}
}

}()
return rbs
}

func (r *readByteStream) Read(p []byte) (n int, err error) {
plen := len(p)
if len(r.remaining) > 0 {
copied := copy(p, r.remaining)
if len(r.remaining) > plen {
r.remaining = r.remaining[plen:]
} else {
r.remaining = nil
}
return copied, nil
}
select {
case <-r.ctx.Done():
return 0, r.ctx.Err()
case err := <-r.errCh:
return 0, err
default:
}
anyType, err := r.stream.Recv()
if err != nil {
return 0, err
}
i, err := typeurl.UnmarshalAny(anyType)
if err != nil {
return 0, err
}
switch v := i.(type) {
case *transferapi.Data:
n := copy(p, v.Data)
if len(v.Data) > plen {
r.remaining = v.Data[plen:]
}
r.window = r.window - int32(n)
if r.window < windowSize {
r.updated <- struct{}{}
}
return n, nil
default:
return 0, fmt.Errorf("stream received error type %v", v)
}

}

func (r *readByteStream) Close() error {
return r.stream.Close()
}
16 changes: 16 additions & 0 deletions internal/cri/config/config.go
Expand Up @@ -71,6 +71,10 @@ const (
// DefaultSandboxImage is the default image to use for sandboxes when empty or
// for default configurations.
DefaultSandboxImage = "registry.k8s.io/pause:3.9"
// IOTypeNamedPipe is container io implemented by creating named pipe
IOTypeNamedPipe = "named_pipe"
// IOTypeStreaming is container io implemented by connecting the streaming api to sandbox endpoint
IOTypeStreaming = "streaming"
)

// Runtime struct to contain the type(ID), engine, and root variables for a default runtime
Expand Down Expand Up @@ -116,6 +120,11 @@ type Runtime struct {
// shim - means use whatever Controller implementation provided by shim (e.g. use RemoteController).
// podsandbox - means use Controller implementation from sbserver podsandbox package.
Sandboxer string `toml:"sandboxer" json:"sandboxer"`
// IOType defines how containerd transfer the io streams of the container
// if it is not set, the named pipe will be created for the container
// we can also set it to "streaming" to create a stream by streaming api,
// and use it as a channel to transfer the io stream
IOType string `toml:"io_type" json:"io_type"`
}

// ContainerdConfig contains toml config related to containerd
Expand Down Expand Up @@ -527,6 +536,13 @@ func ValidateRuntimeConfig(ctx context.Context, c *RuntimeConfig) ([]deprecation
r.Sandboxer = string(ModePodSandbox)
c.ContainerdConfig.Runtimes[k] = r
}

if len(r.IOType) == 0 {
r.IOType = IOTypeNamedPipe
}
if r.IOType != IOTypeStreaming && r.IOType != IOTypeNamedPipe {
return warnings, errors.New("`io_type` can only be `streaming` or `named_pipe`")
}
}

// Validation for drain_exec_sync_io_timeout
Expand Down
23 changes: 19 additions & 4 deletions internal/cri/io/container_io.go
Expand Up @@ -18,14 +18,15 @@ package io

import (
"errors"
"fmt"
"io"
"strings"
"sync"

"github.com/containerd/containerd/v2/pkg/cio"
"github.com/containerd/log"

"github.com/containerd/containerd/v2/internal/cri/util"
"github.com/containerd/containerd/v2/pkg/cio"
cioutil "github.com/containerd/containerd/v2/pkg/ioutil"
)

Expand All @@ -39,7 +40,7 @@ type ContainerIO struct {
id string

fifos *cio.FIFOSet
*stdioPipes
*stdioStream

stdoutGroup *cioutil.WriterGroup
stderrGroup *cioutil.WriterGroup
Expand Down Expand Up @@ -71,6 +72,20 @@ func WithNewFIFOs(root string, tty, stdin bool) ContainerIOOpts {
}
}

// WithStreams creates new streams for the container io.
func WithStreams(address string, tty, stdin bool) ContainerIOOpts {
return func(c *ContainerIO) error {
if address == "" {
return fmt.Errorf("address can not be empty for io stream")
}
fifos, err := newStreams(address, c.id, tty, stdin)
if err != nil {
return err
}
return WithFIFOs(fifos)(c)
}
}

// NewContainerIO creates container io.
func NewContainerIO(id string, opts ...ContainerIOOpts) (_ *ContainerIO, err error) {
c := &ContainerIO{
Expand All @@ -87,11 +102,11 @@ func NewContainerIO(id string, opts ...ContainerIOOpts) (_ *ContainerIO, err err
return nil, errors.New("fifos are not set")
}
// Create actual fifos.
stdio, closer, err := newStdioPipes(c.fifos)
stdio, closer, err := newStdioStream(c.fifos)
if err != nil {
return nil, err
}
c.stdioPipes = stdio
c.stdioStream = stdio
c.closer = closer
return c, nil
}
Expand Down
37 changes: 28 additions & 9 deletions internal/cri/io/exec_io.go
Expand Up @@ -20,36 +20,55 @@ import (
"io"
"sync"

"github.com/containerd/log"

"github.com/containerd/containerd/v2/pkg/cio"
cioutil "github.com/containerd/containerd/v2/pkg/ioutil"
"github.com/containerd/log"
)

// ExecIO holds the exec io.
type ExecIO struct {
id string
fifos *cio.FIFOSet
*stdioPipes
*stdioStream
closer *wgCloser
}

var _ cio.IO = &ExecIO{}

// NewExecIO creates exec io.
func NewExecIO(id, root string, tty, stdin bool) (*ExecIO, error) {
// NewFifoExecIO creates exec io by named pipes.
func NewFifoExecIO(id, root string, tty, stdin bool) (*ExecIO, error) {
fifos, err := newFifos(root, id, tty, stdin)
if err != nil {
return nil, err
}
stdio, closer, err := newStdioPipes(fifos)
stdio, closer, err := newStdioStream(fifos)
if err != nil {
return nil, err
}
return &ExecIO{
id: id,
fifos: fifos,
stdioStream: stdio,
closer: closer,
}, nil
}

// NewStreamExecIO creates exec io with streaming.
func NewStreamExecIO(id, address string, tty, stdin bool) (*ExecIO, error) {
fifos, err := newStreams(address, id, tty, stdin)
if err != nil {
return nil, err
}
stdio, closer, err := newStdioStream(fifos)
if err != nil {
return nil, err
}
return &ExecIO{
id: id,
fifos: fifos,
stdioPipes: stdio,
closer: closer,
id: id,
fifos: fifos,
stdioStream: stdio,
closer: closer,
}, nil
}

Expand Down

0 comments on commit 8f73d56

Please sign in to comment.