Skip to content

Commit

Permalink
Merge pull request moby#6 from squaremo/net_ext_plus_plugins
Browse files Browse the repository at this point in the history
Add in (rebased) plugin branch
  • Loading branch information
squaremo committed Apr 14, 2015
2 parents 2018905 + 0a55542 commit 62b114c
Show file tree
Hide file tree
Showing 11 changed files with 322 additions and 5 deletions.
40 changes: 40 additions & 0 deletions Gunfile
@@ -0,0 +1,40 @@

# temporary Gunfile for plugins development on OS X.
# curious to use it yourself?
# https://github.com/gliderlabs/glidergun

init() {
cmd-export build
cmd-export push
cmd-export restore
cmd-export backup
cmd-export install
}

build() {
make BINDDIR=.
}

install() {
cp bundles/1.5.0-plugins/cross/darwin/amd64/docker-1.5.0-plugins \
~/Projects/.bin/docker
}

push() {
cat bundles/1.5.0-plugins/binary/docker-1.5.0-plugins \
| boot2docker ssh "cat > docker"
boot2docker ssh "
chmod +x ./docker
sudo mv ./docker /usr/local/bin/docker
sudo /etc/init.d/docker restart
"
}

backup() {
boot2docker ssh "cp /usr/local/bin/docker /usr/local/bin/docker.backup"
}

restore() {
boot2docker ssh "cp /usr/local/bin/docker.backup /usr/local/bin/docker"
boot2docker ssh "sudo /etc/init.d/docker restart"
}
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
1.5.0-dev
1.5.0-plugins
56 changes: 56 additions & 0 deletions daemon/container.go
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"io"
"io/ioutil"
"net"
"os"
"path"
"path/filepath"
Expand Down Expand Up @@ -40,6 +41,7 @@ import (
"github.com/docker/docker/pkg/stringid"
"github.com/docker/docker/pkg/symlink"
"github.com/docker/docker/pkg/ulimit"
"github.com/docker/docker/plugins"
"github.com/docker/docker/runconfig"
"github.com/docker/docker/utils"
)
Expand Down Expand Up @@ -1471,9 +1473,59 @@ func (container *Container) waitForStart() error {
return err
}

if container.hostConfig.Plugin {
if err := container.waitForPluginSock(); err != nil {
return err
}
}

return nil
}

func (container *Container) waitForPluginSock() error {
pluginSock, err := container.getPluginSocketPath()
if err != nil {
return err
}

chConn := make(chan net.Conn)
chStop := make(chan struct{})
go func() {
logrus.Debugf("waiting for plugin socket at: %s", pluginSock)
for {
addr, err := net.ResolveUnixAddr("unix", pluginSock)
if err != nil {
logrus.Debugf("bad addr: %s", err)
}
conn, err := net.DialUnix("unix", nil, addr)
// If the file doesn't exist yet, that's ok, maybe plugin hasn't created it yet
if err != nil {
select {
case <-chStop:
return
default:
logrus.Debugf("retrying. got: %s", err)
time.Sleep(time.Second)
continue
}
}
logrus.Debugf("got plugin socket")
chConn <- conn
return
}
}()

select {
case conn := <-chConn:
// We can close this net.Conn since the plugin system will establish it's own connection
conn.Close()
return plugins.Repo.RegisterPlugin(pluginSock)
case <-time.After(5 * time.Second):
chStop <- struct{}{}
return fmt.Errorf("connection to plugin sock timed out")
}
}

func (container *Container) allocatePort(eng *engine.Engine, port nat.Port, bindings nat.PortMap) error {
binding := bindings[port]
if container.hostConfig.PublishAllPorts && len(binding) == 0 {
Expand Down Expand Up @@ -1560,3 +1612,7 @@ func (c *Container) GetEndpoint(id string) (int, *Endpoint) {
}
return -1, nil
}

func (container *Container) getPluginSocketPath() (string, error) {
return container.getRootResourcePath(filepath.Join("p", "p.s"))
}
1 change: 1 addition & 0 deletions daemon/daemon.go
Expand Up @@ -319,6 +319,7 @@ func (daemon *Daemon) restore() error {
return err
}

// TODO: sort out plugins from normal containers and load plugins first
for _, v := range dir {
id := v.Name()
container, err := daemon.load(id)
Expand Down
20 changes: 17 additions & 3 deletions daemon/volumes.go
Expand Up @@ -131,7 +131,7 @@ func (container *Container) registerVolumes() {
if rw, exists := container.VolumesRW[path]; exists {
writable = rw
}
v, err := container.daemon.volumes.FindOrCreateVolume(path, writable)
v, err := container.daemon.volumes.FindOrCreateVolume(path, container.ID, writable)
if err != nil {
logrus.Debugf("error registering volume %s: %v", path, err)
continue
Expand Down Expand Up @@ -164,7 +164,7 @@ func (container *Container) parseVolumeMountConfig() (map[string]*Mount, error)
return nil, fmt.Errorf("Duplicate volume %q: %q already in use, mounted from %q", path, mountToPath, m.volume.Path)
}
// Check if a volume already exists for this and use it
vol, err := container.daemon.volumes.FindOrCreateVolume(path, writable)
vol, err := container.daemon.volumes.FindOrCreateVolume(path, container.ID, writable)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -199,7 +199,7 @@ func (container *Container) parseVolumeMountConfig() (map[string]*Mount, error)
}
}

vol, err := container.daemon.volumes.FindOrCreateVolume("", true)
vol, err := container.daemon.volumes.FindOrCreateVolume("", container.ID, true)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -323,6 +323,20 @@ func validMountMode(mode string) bool {
func (container *Container) setupMounts() error {
mounts := []execdriver.Mount{}

if container.hostConfig.Plugin {
// We are going to create this socket then close/unlink it so it can be
// bind-mounted into the container and used by the plugin process.
socketPath, err := container.getPluginSocketPath()
if err != nil {
return err
}
pluginDir := filepath.Dir(socketPath)
if err := os.MkdirAll(pluginDir, 0700); err != nil {
return err
}
mounts = append(mounts, execdriver.Mount{Source: pluginDir, Destination: "/var/run/docker-plugin", Writable: true, Private: true})
}

// Mount user specified volumes
// Note, these are not private because you may want propagation of (un)mounts from host
// volumes. For instance if you use -v /usr:/usr and the host later mounts /usr/share you
Expand Down
64 changes: 64 additions & 0 deletions plugins/client.go
@@ -0,0 +1,64 @@
package plugins

import (
"bytes"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"net/http/httputil"
"time"

log "github.com/Sirupsen/logrus"
"github.com/docker/docker/pkg/ioutils"
)

const pluginApiVersion = "v1"

func connect(addr string) (*httputil.ClientConn, error) {
c, err := net.DialTimeout("unix", addr, 30*time.Second)
if err != nil {
return nil, err
}
return httputil.NewClientConn(c, nil), nil
}

func call(addr, method, path string, data interface{}) (io.ReadCloser, error) {
client, err := connect(addr)
if err != nil {
return nil, err
}

reqBody, err := json.Marshal(data)
if err != nil {
return nil, err
}

log.Debugf("sending request for extension:\n%s", string(reqBody))
path = "/" + pluginApiVersion + "/" + path
req, err := http.NewRequest(method, path, bytes.NewBuffer(reqBody))
if err != nil {
client.Close()
return nil, err
}
req.Header.Set("Content-Type", "application/json")

resp, err := client.Do(req)
if err != nil {
client.Close()
return nil, err
}

// FIXME: this should be better defined
if resp.StatusCode >= 400 {
return nil, fmt.Errorf("got bad status: %s", resp.Status)
}

return ioutils.NewReadCloserWrapper(resp.Body, func() error {
if err := resp.Body.Close(); err != nil {
return err
}
return client.Close()
}), nil
}
36 changes: 36 additions & 0 deletions plugins/plugin.go
@@ -0,0 +1,36 @@
package plugins

import (
"encoding/json"
"io"
)

type Plugin struct {
addr string
kind string
}

type handshakeResp struct {
InterestedIn []string
Name string
Author string
Org string
Website string
}

func (p *Plugin) Call(method, path string, data interface{}) (io.ReadCloser, error) {
path = p.kind + "/" + path
return call(p.addr, method, path, data)
}

func (p *Plugin) handshake() (*handshakeResp, error) {
// Don't use the local `call` because this shouldn't be namespaced
respBody, err := call(p.addr, "POST", "handshake", nil)
if err != nil {
return nil, err
}
defer respBody.Close()

var data handshakeResp
return &data, json.NewDecoder(respBody).Decode(&data)
}
61 changes: 61 additions & 0 deletions plugins/repository.go
@@ -0,0 +1,61 @@
package plugins

import (
"errors"
"fmt"
)

// Temporary singleton
var Repo = NewRepository()

var ErrNotRegistered = errors.New("plugin type is not registered")

type Repository struct {
plugins map[string]Plugins
}

type Plugins []*Plugin

func (repository *Repository) GetPlugins(kind string) (Plugins, error) {
plugins, exists := repository.plugins[kind]
// TODO: check whether 'kind' is a supportedPluginType
if !exists {
// If no plugins have been registered for this kind yet, that's
// OK. Just set and return an empty list.
repository.plugins[kind] = make([]*Plugin, 0)
return repository.plugins[kind], nil
}
return plugins, nil
}

var supportedPluginTypes = map[string]struct{}{
"volume": {},
}

func NewRepository() *Repository {
return &Repository{
plugins: make(map[string]Plugins),
}
}

func (repository *Repository) RegisterPlugin(addr string) error {
plugin := &Plugin{addr: addr}
resp, err := plugin.handshake()
if err != nil {
return fmt.Errorf("error in plugin handshake: %v", err)
}

for _, interest := range resp.InterestedIn {
if _, exists := supportedPluginTypes[interest]; !exists {
return fmt.Errorf("plugin type %s is not supported", interest)
}

if _, exists := repository.plugins[interest]; !exists {
repository.plugins[interest] = []*Plugin{}
}
plugin.kind = interest
repository.plugins[interest] = append(repository.plugins[interest], plugin)
}

return nil
}
2 changes: 2 additions & 0 deletions runconfig/hostconfig.go
Expand Up @@ -132,6 +132,7 @@ type HostConfig struct {
Ulimits []*ulimit.Ulimit
LogConfig LogConfig
CgroupParent string // Parent cgroup.
Plugin bool
}

// This is used by the create command when you want to set both the
Expand Down Expand Up @@ -184,6 +185,7 @@ func ContainerHostConfigFromJob(job *engine.Job) *HostConfig {
PidMode: PidMode(job.Getenv("PidMode")),
ReadonlyRootfs: job.GetenvBool("ReadonlyRootfs"),
CgroupParent: job.Getenv("CgroupParent"),
Plugin: job.GetenvBool("Plugin"),
}

// FIXME: This is for backward compatibility, if people use `Cpuset`
Expand Down
2 changes: 2 additions & 0 deletions runconfig/parse.go
Expand Up @@ -73,6 +73,7 @@ func Parse(cmd *flag.FlagSet, args []string) (*Config, *HostConfig, *flag.FlagSe
flReadonlyRootfs = cmd.Bool([]string{"-read-only"}, false, "Mount the container's root filesystem as read only")
flLoggingDriver = cmd.String([]string{"-log-driver"}, "", "Logging driver for container")
flCgroupParent = cmd.String([]string{"-cgroup-parent"}, "", "Optional parent cgroup for the container")
flPlugin = cmd.Bool([]string{"-plugin"}, false, "Enable plugin mode!")
)

cmd.Var(&flAttach, []string{"a", "-attach"}, "Attach to STDIN, STDOUT or STDERR")
Expand Down Expand Up @@ -337,6 +338,7 @@ func Parse(cmd *flag.FlagSet, args []string) (*Config, *HostConfig, *flag.FlagSe
Ulimits: flUlimits.GetList(),
LogConfig: LogConfig{Type: *flLoggingDriver},
CgroupParent: *flCgroupParent,
Plugin: *flPlugin,
}

// When allocating stdin in attached mode, close stdin at client disconnect
Expand Down

0 comments on commit 62b114c

Please sign in to comment.