Skip to content

Commit

Permalink
Merge pull request moby#5 from crosbymichael/update-events
Browse files Browse the repository at this point in the history
Update events to consume the json stream and not buffer in memory
  • Loading branch information
samalba committed Apr 25, 2014
2 parents f3649e5 + 33ecf7a commit 8980ee4
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 54 deletions.
78 changes: 24 additions & 54 deletions dockerclient.go
Expand Up @@ -5,11 +5,10 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"net"
"log"
"net/http"
"net/url"
"sync/atomic"
"time"
)

type DockerClient struct {
Expand All @@ -18,6 +17,8 @@ type DockerClient struct {
monitorEvents int32
}

type Callback func(*Event, ...interface{})

func NewDockerClient(daemonUrl string) (*DockerClient, error) {
u, err := url.Parse(daemonUrl)
if err != nil {
Expand All @@ -27,22 +28,6 @@ func NewDockerClient(daemonUrl string) (*DockerClient, error) {
return &DockerClient{u, httpClient, 0}, nil
}

func newHTTPClient(u *url.URL) *http.Client {
httpTransport := &http.Transport{}
if u.Scheme == "unix" {
socketPath := u.Path
unixDial := func(proto string, addr string) (net.Conn, error) {
return net.Dial("unix", socketPath)
}
httpTransport.Dial = unixDial
// Override the main URL object so the HTTP lib won't complain
u.Scheme = "http"
u.Host = "unix.sock"
}
u.Path = ""
return &http.Client{Transport: httpTransport}
}

func (client *DockerClient) doRequest(method string, path string, body []byte) ([]byte, error) {
b := bytes.NewBuffer(body)
req, err := http.NewRequest(method, client.URL.String()+path, b)
Expand Down Expand Up @@ -150,44 +135,29 @@ func (client *DockerClient) KillContainer(id string) error {
return nil
}

func (client *DockerClient) StartMonitorEvents(cb func(*Event, ...interface{}), args ...interface{}) {
func (client *DockerClient) StartMonitorEvents(cb Callback, args ...interface{}) {
atomic.StoreInt32(&client.monitorEvents, 1)
wait := 100 * time.Millisecond
buffer := make([]byte, 4096)
var running int32 = 1
go func() {
for running > 0 {
running = atomic.LoadInt32(&client.monitorEvents)
if running == 0 {
break
}
uri := client.URL.String() + "/v1.10/events"
resp, err := client.HTTPClient.Get(uri)
if err != nil {
time.Sleep(wait)
continue
}
if resp.StatusCode >= 300 {
resp.Body.Close()
time.Sleep(wait)
continue
}
for {
nBytes, err := resp.Body.Read(buffer)
if err != nil {
resp.Body.Close()
time.Sleep(wait)
break
}
event := &Event{}
err = json.Unmarshal(buffer[:nBytes], event)
if err == nil {
cb(event, args...)
}
}
time.Sleep(wait)
go client.getEvents(cb, args...)
}

func (client *DockerClient) getEvents(cb Callback, args ...interface{}) {
uri := client.URL.String() + "/v1.10/events"
resp, err := client.HTTPClient.Get(uri)
if err != nil {
log.Println(err)
return
}
defer resp.Body.Close()

dec := json.NewDecoder(resp.Body)
for atomic.LoadInt32(&client.monitorEvents) > 0 {
var event *Event
if err := dec.Decode(&event); err != nil {
log.Println(err)
return
}
}()
cb(event, args...)
}
}

func (client *DockerClient) StopAllMonitorEvents() {
Expand Down
32 changes: 32 additions & 0 deletions examples/events.go
@@ -0,0 +1,32 @@
package main

import (
"github.com/samalba/dockerclient"
"log"
"os"
"os/signal"
"syscall"
)

func eventCallback(e *dockerclient.Event, args ...interface{}) {
log.Println(e)
}

func waitForInterrupt() {
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT)
for _ = range sigChan {
os.Exit(0)
}
}

func main() {
docker, err := dockerclient.NewDockerClient(os.Getenv("DOCKER_HOST"))
if err != nil {
log.Fatal(err)
}

docker.StartMonitorEvents(eventCallback)

waitForInterrupt()
}
23 changes: 23 additions & 0 deletions utils.go
@@ -0,0 +1,23 @@
package dockerclient

import (
"net"
"net/http"
"net/url"
)

func newHTTPClient(u *url.URL) *http.Client {
httpTransport := &http.Transport{}
if u.Scheme == "unix" {
socketPath := u.Path
unixDial := func(proto string, addr string) (net.Conn, error) {
return net.Dial("unix", socketPath)
}
httpTransport.Dial = unixDial
// Override the main URL object so the HTTP lib won't complain
u.Scheme = "http"
u.Host = "unix.sock"
}
u.Path = ""
return &http.Client{Transport: httpTransport}
}

0 comments on commit 8980ee4

Please sign in to comment.