Skip to content

Commit

Permalink
cri: support io by streaming api
Browse files Browse the repository at this point in the history
Signed-off-by: Abel Feng <fshb1988@gmail.com>
  • Loading branch information
abel-von committed Mar 14, 2024
1 parent 1572d55 commit a9bb07d
Show file tree
Hide file tree
Showing 7 changed files with 231 additions and 36 deletions.
5 changes: 5 additions & 0 deletions internal/cri/config/config.go
Expand Up @@ -116,6 +116,11 @@ type Runtime struct {
// shim - means use whatever Controller implementation provided by shim (e.g. use RemoteController).
// podsandbox - means use Controller implementation from sbserver podsandbox package.
Sandboxer string `toml:"sandboxer" json:"sandboxer"`
// IOType defines how containerd transfer the io streams of the container
// if it is not set, the named pipe will be created for the container
// we can also set it to "streaming" to create a stream by streaming api,
// and use it as a channel to transfer the io stream
IOType string `toml:"io_type" json:"io_type"`
}

// ContainerdConfig contains toml config related to containerd
Expand Down
21 changes: 18 additions & 3 deletions internal/cri/io/container_io.go
Expand Up @@ -18,6 +18,7 @@ package io

import (
"errors"
"fmt"
"io"
"strings"
"sync"
Expand All @@ -39,7 +40,7 @@ type ContainerIO struct {
id string

fifos *cio.FIFOSet
*stdioPipes
*stdioStream

stdoutGroup *cioutil.WriterGroup
stderrGroup *cioutil.WriterGroup
Expand Down Expand Up @@ -71,6 +72,20 @@ func WithNewFIFOs(root string, tty, stdin bool) ContainerIOOpts {
}
}

// WithStreams creates new streams for the container io.
func WithStreams(address, protocol string, tty, stdin bool) ContainerIOOpts {
return func(c *ContainerIO) error {
if address == "" || protocol == "" {
return fmt.Errorf("address and protocol can not be empty for io stream")
}
fifos, err := newStreams(address, protocol, c.id, tty, stdin)
if err != nil {
return err
}
return WithFIFOs(fifos)(c)
}
}

// NewContainerIO creates container io.
func NewContainerIO(id string, opts ...ContainerIOOpts) (_ *ContainerIO, err error) {
c := &ContainerIO{
Expand All @@ -87,11 +102,11 @@ func NewContainerIO(id string, opts ...ContainerIOOpts) (_ *ContainerIO, err err
return nil, errors.New("fifos are not set")
}
// Create actual fifos.
stdio, closer, err := newStdioPipes(c.fifos)
stdio, closer, err := newStdioStream(c.fifos)
if err != nil {
return nil, err
}
c.stdioPipes = stdio
c.stdioStream = stdio
c.closer = closer
return c, nil
}
Expand Down
34 changes: 26 additions & 8 deletions internal/cri/io/exec_io.go
Expand Up @@ -29,27 +29,45 @@ import (
type ExecIO struct {
id string
fifos *cio.FIFOSet
*stdioPipes
*stdioStream
closer *wgCloser
}

var _ cio.IO = &ExecIO{}

// NewExecIO creates exec io.
func NewExecIO(id, root string, tty, stdin bool) (*ExecIO, error) {
// NewFifoExecIO creates exec io by named pipes.
func NewFifoExecIO(id, root string, tty, stdin bool) (*ExecIO, error) {
fifos, err := newFifos(root, id, tty, stdin)
if err != nil {
return nil, err
}
stdio, closer, err := newStdioPipes(fifos)
stdio, closer, err := newStdioStream(fifos)
if err != nil {
return nil, err
}
return &ExecIO{
id: id,
fifos: fifos,
stdioPipes: stdio,
closer: closer,
id: id,
fifos: fifos,
stdioStream: stdio,
closer: closer,
}, nil
}

// NewStreamExecIO creates exec io with streaming.
func NewStreamExecIO(id, address, protocol string, tty, stdin bool) (*ExecIO, error) {
fifos, err := newStreams(address, protocol, id, tty, stdin)
if err != nil {
return nil, err
}
stdio, closer, err := newStdioStream(fifos)
if err != nil {
return nil, err
}
return &ExecIO{
id: id,
fifos: fifos,
stdioStream: stdio,
closer: closer,
}, nil
}

Expand Down
166 changes: 148 additions & 18 deletions internal/cri/io/helpers.go
Expand Up @@ -18,14 +18,26 @@ package io

import (
"context"
"fmt"
"io"
"net/url"
"os"
"path/filepath"
"strings"
"sync"
"syscall"
"time"

"github.com/containerd/containerd/v2/pkg/cio"
"github.com/containerd/ttrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"

"github.com/containerd/containerd/v2/client"
streaming2 "github.com/containerd/containerd/v2/core/streaming"
"github.com/containerd/containerd/v2/core/transfer/streaming"
"github.com/containerd/containerd/v2/pkg/cio"
"github.com/containerd/containerd/v2/pkg/shim"
)

// AttachOptions specifies how to attach to a container.
Expand Down Expand Up @@ -88,19 +100,35 @@ func newFifos(root, id string, tty, stdin bool) (*cio.FIFOSet, error) {
return fifos, nil
}

type stdioPipes struct {
// newStreams init streams for io of container.
func newStreams(address, protocol, id string, tty, stdin bool) (*cio.FIFOSet, error) {
fifos := cio.NewFIFOSet(cio.Config{}, func() error { return nil })
if stdin {
streamID := id + "-stdin"
fifos.Stdin = fmt.Sprintf("streaming:%s?protocol=%s&id=%s", address, protocol, streamID)
}
stdoutStreamID := id + "-stdout"
fifos.Stdout = fmt.Sprintf("streaming:%s?protocol=%s&id=%s", address, protocol, stdoutStreamID)
if !tty {
stderrStreamID := id + "-stderr"
fifos.Stderr = fmt.Sprintf("streaming:%s?protocol=%s&id=%s", address, protocol, stderrStreamID)
}
fifos.Terminal = tty
return fifos, nil
}

type stdioStream struct {
stdin io.WriteCloser
stdout io.ReadCloser
stderr io.ReadCloser
}

// newStdioPipes creates actual fifos for stdio.
func newStdioPipes(fifos *cio.FIFOSet) (_ *stdioPipes, _ *wgCloser, err error) {
// newStdioStream creates actual fifos for stdio.
func newStdioStream(fifos *cio.FIFOSet) (_ *stdioStream, _ *wgCloser, err error) {
var (
f io.ReadWriteCloser
set []io.Closer
ctx, cancel = context.WithCancel(context.Background())
p = &stdioPipes{}
p = &stdioStream{}
)
defer func() {
if err != nil {
Expand All @@ -112,27 +140,30 @@ func newStdioPipes(fifos *cio.FIFOSet) (_ *stdioPipes, _ *wgCloser, err error) {
}()

if fifos.Stdin != "" {
if f, err = openPipe(ctx, fifos.Stdin, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
return nil, nil, err
in, err := openStdin(ctx, fifos.Stdin)
if err != nil {
return nil, nil, fmt.Errorf("failed to open stdin, %w", err)
}
p.stdin = f
set = append(set, f)
p.stdin = in
set = append(set, in)
}

if fifos.Stdout != "" {
if f, err = openPipe(ctx, fifos.Stdout, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
return nil, nil, err
out, err := openOutput(ctx, fifos.Stdout)
if err != nil {
return nil, nil, fmt.Errorf("failed to open stdout, %w", err)
}
p.stdout = f
set = append(set, f)
p.stdout = out
set = append(set, out)
}

if fifos.Stderr != "" {
if f, err = openPipe(ctx, fifos.Stderr, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
return nil, nil, err
out, err := openOutput(ctx, fifos.Stderr)
if err != nil {
return nil, nil, fmt.Errorf("failed to open stderr, %w", err)
}
p.stderr = f
set = append(set, f)
p.stderr = out
set = append(set, out)
}

return p, &wgCloser{
Expand All @@ -142,3 +173,102 @@ func newStdioPipes(fifos *cio.FIFOSet) (_ *stdioPipes, _ *wgCloser, err error) {
cancel: cancel,
}, nil
}

func openStdin(ctx context.Context, address string) (io.WriteCloser, error) {
ok := strings.Contains(address, "://")
if !ok {
return openPipe(ctx, address, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700)
}

realURL, ok := strings.CutPrefix(address, "streaming:")
if !ok {
// TODO, support connect stream other than streaming
return nil, fmt.Errorf("only streamimg:<address>?protocol=xxx&id=xxx supported")
}

return openStdinStream(ctx, realURL)
}

func openStdinStream(ctx context.Context, url string) (io.WriteCloser, error) {
stream, err := openStream(ctx, url)
if err != nil {
return nil, err
}
return streaming.WriteByteStream(ctx, stream), nil
}

func openOutput(ctx context.Context, address string) (io.ReadCloser, error) {
ok := strings.Contains(address, "://")
if !ok {
return openPipe(ctx, address, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700)
}

realURL, ok := strings.CutPrefix(address, "streaming:")
if !ok {
// TODO, support connect stream other than streaming
return nil, fmt.Errorf("only streamimg:<address>?protocol=xxx&id=xxx supported")
}

return openOutputStream(ctx, realURL)
}

func openOutputStream(ctx context.Context, url string) (io.ReadCloser, error) {
stream, err := openStream(ctx, url)
if err != nil {
return nil, err
}
return streaming.ReadByteStream(ctx, stream), nil
}

func openStream(ctx context.Context, urlStr string) (streaming2.Stream, error) {
u, err := url.Parse(urlStr)
if err != nil {
return nil, fmt.Errorf("address url parse error: %v", err)
}
protocol := u.Query().Get("protocol")
if protocol == "" {
return nil, fmt.Errorf("no protocol in url queries")
}
id := u.Query().Get("id")
if id == "" {
return nil, fmt.Errorf("no stream id in url queries")
}
realAddress := fmt.Sprintf("%s://%s/%s", u.Scheme, u.Host, u.Path)
conn, err := shim.AnonReconnectDialer(realAddress, 100*time.Second)
if err != nil {
return nil, fmt.Errorf("failed to connect the stream %v", err)
}
var stream streaming2.Stream

switch protocol {
case "ttrpc":
c := ttrpc.NewClient(conn)
streamCreator := client.NewTTRPCStreamCreator(c)
stream, err = streamCreator.Create(ctx, id)
if err != nil {
return nil, err
}
return stream, nil

case "grpc":
ctx, cancel := context.WithTimeout(ctx, time.Second*100)
defer cancel()

gopts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(),
}
conn, err := grpc.DialContext(ctx, realAddress, gopts...)
if err != nil {
return nil, err
}
streamCreator := client.NewGRPCStreamCreator(conn)
stream, err = streamCreator.Create(ctx, id)
if err != nil {
return nil, err
}
return stream, nil
default:
return nil, fmt.Errorf("protocol not supported")
}
}
11 changes: 9 additions & 2 deletions internal/cri/server/container_create.go
Expand Up @@ -247,8 +247,15 @@ func (c *criService) CreateContainer(ctx context.Context, r *runtime.CreateConta
sandboxConfig.GetLogDirectory(), config.GetLogPath())
}

containerIO, err := cio.NewContainerIO(id,
cio.WithNewFIFOs(volatileContainerRootDir, config.GetTty(), config.GetStdin()))
var containerIO *cio.ContainerIO
switch ociRuntime.IOType {
case "streaming":
containerIO, err = cio.NewContainerIO(id,
cio.WithStreams(sandbox.Endpoint.Address, sandbox.Endpoint.Protocol, config.GetTty(), config.GetStdin()))
default:
containerIO, err = cio.NewContainerIO(id,
cio.WithNewFIFOs(volatileContainerRootDir, config.GetTty(), config.GetStdin()))
}
if err != nil {
return nil, fmt.Errorf("failed to create container io: %w", err)
}
Expand Down

0 comments on commit a9bb07d

Please sign in to comment.