From 36900f1d03c2e44f74b7da2f5d4b4dcdc2cc34a8 Mon Sep 17 00:00:00 2001 From: Tonis Tiigi Date: Wed, 21 Dec 2016 11:29:39 -0800 Subject: [PATCH 01/13] Fix liverestore/restartmanager panic in v1.12 Signed-off-by: Tonis Tiigi --- integration-cli/docker_cli_daemon_test.go | 32 +++++++++++++++++++++++ libcontainerd/client_linux.go | 2 ++ 2 files changed, 34 insertions(+) diff --git a/integration-cli/docker_cli_daemon_test.go b/integration-cli/docker_cli_daemon_test.go index 7508f9fc678f6..1fd35e2842633 100644 --- a/integration-cli/docker_cli_daemon_test.go +++ b/integration-cli/docker_cli_daemon_test.go @@ -2717,3 +2717,35 @@ func (s *DockerDaemonSuite) TestRunWithRuntimeFromCommandLine(c *check.C) { out, err = s.d.Cmd("run", "--rm", "--runtime=runc", "busybox", "ls") c.Assert(err, check.IsNil, check.Commentf(out)) } + +// #29598 +func (s *DockerDaemonSuite) TestRestartPolicyWithLiveRestore(c *check.C) { + testRequires(c, SameHostDaemon, DaemonIsLinux) + c.Assert(s.d.StartWithBusybox("--live-restore"), check.IsNil) + + out, err := s.d.Cmd("run", "-d", "--restart", "always", "busybox", "top") + c.Assert(err, check.IsNil, check.Commentf("Output: %s", out)) + id := strings.TrimSpace(out) + + c.Assert(s.d.Restart("--live-restore"), check.IsNil) + + c.Assert(s.d.waitRun(id), check.IsNil) + + pid, err := s.d.Cmd("inspect", "-f", "{{.State.Pid}}", id) + c.Assert(err, check.IsNil) + pidint, err := strconv.Atoi(strings.TrimSpace(pid)) + c.Assert(err, check.IsNil) + c.Assert(syscall.Kill(pidint, syscall.SIGKILL), check.IsNil) + + // This test is only for v1.12 and only checks that killing of a process + // doesn't cause a panic. Actual issue is fixed in v1.13 with a proper test. + calls := 0 + for range time.NewTicker(500 * time.Millisecond).C { + out, err := s.d.inspectFilter(id, "json .Id") + c.Assert(err, checker.IsNil, check.Commentf(out)) + calls++ + if calls >= 10 { + break + } + } +} diff --git a/libcontainerd/client_linux.go b/libcontainerd/client_linux.go index 39b0999d3f4ec..1ea8922020bf4 100644 --- a/libcontainerd/client_linux.go +++ b/libcontainerd/client_linux.go @@ -424,6 +424,7 @@ func (clnt *client) restore(cont *containerd.Container, lastEvent *containerd.Ev container := clnt.newContainer(cont.BundlePath, options...) container.systemPid = systemPid(cont) + container.attachStdio = attachStdio var terminal bool for _, p := range cont.Processes { @@ -596,6 +597,7 @@ func (clnt *client) Restore(containerID string, attachStdio StdioCallback, optio w := clnt.getOrCreateExitNotifier(containerID) clnt.lock(cont.Id) container := clnt.newContainer(cont.BundlePath) + container.attachStdio = attachStdio container.systemPid = systemPid(cont) clnt.appendContainer(container) clnt.unlock(cont.Id) From 85c245a239797a1571678960144bb2f40e9cf163 Mon Sep 17 00:00:00 2001 From: Kenfe-Mickael Laventure Date: Tue, 10 Jan 2017 11:17:27 -0800 Subject: [PATCH 02/13] Update runc to 50a19c6ff828c58e5dab13830bd3dacde268afe5 Signed-off-by: Kenfe-Mickael Laventure --- Dockerfile | 2 +- Dockerfile.aarch64 | 2 +- Dockerfile.armhf | 2 +- Dockerfile.gccgo | 2 +- Dockerfile.ppc64le | 2 +- Dockerfile.s390x | 2 +- Dockerfile.simple | 2 +- hack/vendor.sh | 2 +- 8 files changed, 8 insertions(+), 8 deletions(-) diff --git a/Dockerfile b/Dockerfile index ea15ae46c49db..40ab8ffbd659d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -242,7 +242,7 @@ RUN set -x \ && rm -rf "$GOPATH" # Install runc -ENV RUNC_COMMIT f59ba3cdd76fdc08c004f42aa915996f6f420899 +ENV RUNC_COMMIT 50a19c6ff828c58e5dab13830bd3dacde268afe5 RUN set -x \ && export GOPATH="$(mktemp -d)" \ && git clone https://github.com/docker/runc.git "$GOPATH/src/github.com/opencontainers/runc" \ diff --git a/Dockerfile.aarch64 b/Dockerfile.aarch64 index 6e5f48307e828..abbb71ab917f4 100644 --- a/Dockerfile.aarch64 +++ b/Dockerfile.aarch64 @@ -181,7 +181,7 @@ RUN set -x \ && rm -rf "$GOPATH" # Install runc -ENV RUNC_COMMIT f59ba3cdd76fdc08c004f42aa915996f6f420899 +ENV RUNC_COMMIT 50a19c6ff828c58e5dab13830bd3dacde268afe5 RUN set -x \ && export GOPATH="$(mktemp -d)" \ && git clone https://github.com/docker/runc.git "$GOPATH/src/github.com/opencontainers/runc" \ diff --git a/Dockerfile.armhf b/Dockerfile.armhf index aa587a75b57d5..a1269c83d9e20 100644 --- a/Dockerfile.armhf +++ b/Dockerfile.armhf @@ -190,7 +190,7 @@ RUN set -x \ && rm -rf "$GOPATH" # Install runc -ENV RUNC_COMMIT f59ba3cdd76fdc08c004f42aa915996f6f420899 +ENV RUNC_COMMIT 50a19c6ff828c58e5dab13830bd3dacde268afe5 RUN set -x \ && export GOPATH="$(mktemp -d)" \ && git clone https://github.com/docker/runc.git "$GOPATH/src/github.com/opencontainers/runc" \ diff --git a/Dockerfile.gccgo b/Dockerfile.gccgo index a032c51304d6a..9f79ce45501ea 100644 --- a/Dockerfile.gccgo +++ b/Dockerfile.gccgo @@ -74,7 +74,7 @@ WORKDIR /go/src/github.com/docker/docker ENV DOCKER_BUILDTAGS apparmor seccomp selinux # Install runc -ENV RUNC_COMMIT f59ba3cdd76fdc08c004f42aa915996f6f420899 +ENV RUNC_COMMIT 50a19c6ff828c58e5dab13830bd3dacde268afe5 RUN set -x \ && export GOPATH="$(mktemp -d)" \ && git clone https://github.com/docker/runc.git "$GOPATH/src/github.com/opencontainers/runc" \ diff --git a/Dockerfile.ppc64le b/Dockerfile.ppc64le index 746344730f8cb..fdce4eb649a3d 100644 --- a/Dockerfile.ppc64le +++ b/Dockerfile.ppc64le @@ -195,7 +195,7 @@ RUN set -x \ && rm -rf "$GOPATH" # Install runc -ENV RUNC_COMMIT f59ba3cdd76fdc08c004f42aa915996f6f420899 +ENV RUNC_COMMIT 50a19c6ff828c58e5dab13830bd3dacde268afe5 RUN set -x \ && export GOPATH="$(mktemp -d)" \ && git clone https://github.com/docker/runc.git "$GOPATH/src/github.com/opencontainers/runc" \ diff --git a/Dockerfile.s390x b/Dockerfile.s390x index 6382c82c7cf3a..346afa839356b 100644 --- a/Dockerfile.s390x +++ b/Dockerfile.s390x @@ -197,7 +197,7 @@ RUN set -x \ && rm -rf "$GOPATH" # Install runc -ENV RUNC_COMMIT f59ba3cdd76fdc08c004f42aa915996f6f420899 +ENV RUNC_COMMIT 50a19c6ff828c58e5dab13830bd3dacde268afe5 RUN set -x \ && export GOPATH="$(mktemp -d)" \ && git clone https://github.com/docker/runc.git "$GOPATH/src/github.com/opencontainers/runc" \ diff --git a/Dockerfile.simple b/Dockerfile.simple index 07edadd0ce65c..8aeb6e681c193 100644 --- a/Dockerfile.simple +++ b/Dockerfile.simple @@ -57,7 +57,7 @@ ENV GOPATH /go:/go/src/github.com/docker/docker/vendor ENV CGO_LDFLAGS -L/lib # Install runc -ENV RUNC_COMMIT f59ba3cdd76fdc08c004f42aa915996f6f420899 +ENV RUNC_COMMIT 50a19c6ff828c58e5dab13830bd3dacde268afe5 RUN set -x \ && export GOPATH="$(mktemp -d)" \ && git clone https://github.com/docker/runc.git "$GOPATH/src/github.com/opencontainers/runc" \ diff --git a/hack/vendor.sh b/hack/vendor.sh index a6fc6d29cb913..8ff5972f800a4 100755 --- a/hack/vendor.sh +++ b/hack/vendor.sh @@ -102,7 +102,7 @@ clone git github.com/miekg/pkcs11 df8ae6ca730422dba20c768ff38ef7d79077a59f clone git github.com/docker/go v1.5.1-1-1-gbaf439e clone git github.com/agl/ed25519 d2b94fd789ea21d12fac1a4443dd3a3f79cda72c -clone git github.com/opencontainers/runc f59ba3cdd76fdc08c004f42aa915996f6f420899 https://github.com/docker/runc.git # libcontainer +clone git github.com/opencontainers/runc 50a19c6ff828c58e5dab13830bd3dacde268afe5 https://github.com/docker/runc.git # libcontainer clone git github.com/opencontainers/specs 1c7c27d043c2a5e513a44084d2b10d77d1402b8c # specs clone git github.com/seccomp/libseccomp-golang 32f571b70023028bd57d9288c20efbcb237f3ce0 # libcontainer deps (see src/github.com/opencontainers/runc/Godeps/Godeps.json) From 94dfa200668a2520d1fbd6a5a540887beebc28df Mon Sep 17 00:00:00 2001 From: Kenfe-Mickael Laventure Date: Tue, 10 Jan 2017 11:26:47 -0800 Subject: [PATCH 03/13] Bump version to 1.12.6 Signed-off-by: Kenfe-Mickael Laventure --- CHANGELOG.md | 70 ++++++++++++++++++++++++++++++++++++++++++++++++++++ VERSION | 2 +- 2 files changed, 71 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cdfb27b2d5bb4..4fb49a8cc2a36 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,76 @@ information on the list of deprecated flags and APIs please have a look at https://docs.docker.com/engine/deprecated/ where target removal dates can also be found. +## 1.12.6 (2017-01-10) + +**IMPORTANT**: Docker 1.12 ships with an updated systemd unit file for rpm +based installs (which includes RHEL, Fedora, CentOS, and Oracle Linux 7). When +upgrading from an older version of docker, the upgrade process may not +automatically install the updated version of the unit file, or fail to start +the docker service if; + +- the systemd unit file (`/usr/lib/systemd/system/docker.service`) contains local changes, or +- a systemd drop-in file is present, and contains `-H fd://` in the `ExecStart` directive + +Starting the docker service will produce an error: + + Failed to start docker.service: Unit docker.socket failed to load: No such file or directory. + +or + + no sockets found via socket activation: make sure the service was started by systemd. + +To resolve this: + +- Backup the current version of the unit file, and replace the file with the + [version that ships with docker 1.12](https://raw.githubusercontent.com/docker/docker/v1.12.0/contrib/init/systemd/docker.service.rpm) +- Remove the `Requires=docker.socket` directive from the `/usr/lib/systemd/system/docker.service` file if present +- Remove `-H fd://` from the `ExecStart` directive (both in the main unit file, and in any drop-in files present). + +After making those changes, run `sudo systemctl daemon-reload`, and `sudo +systemctl restart docker` to reload changes and (re)start the docker daemon. + +**NOTE**: Docker 1.12.5 will correctly validate that either an IPv6 subnet is provided or +that the IPAM driver can provide one when you specify the `--ipv6` option. + +If you are currently using the `--ipv6` option _without_ specifying the +`--fixed-cidr-v6` option, the Docker daemon will refuse to start with the +following message: + +```none +Error starting daemon: Error initializing network controller: Error creating + default "bridge" network: failed to parse pool request + for address space "LocalDefault" pool " subpool ": + could not find an available, non-overlapping IPv6 address + pool among the defaults to assign to the network +``` + +To resolve this error, either remove the `--ipv6` flag (to preserve the same +behavior as in Docker 1.12.3 and earlier), or provide an IPv6 subnet as the +value of the `--fixed-cidr-v6` flag. + +In a similar way, if you specify the `--ipv6` flag when creating a network +with the default IPAM driver, without providing an IPv6 `--subnet`, network +creation will fail with the following message: + +```none +Error response from daemon: failed to parse pool request for address space + "LocalDefault" pool "" subpool "": could not find an + available, non-overlapping IPv6 address pool among + the defaults to assign to the network +``` + +To resolve this, either remove the `--ipv6` flag (to preserve the same behavior +as in Docker 1.12.3 and earlier), or provide an IPv6 subnet as the value of the +`--subnet` flag. + +The network network creation will instead succeed if you use an external IPAM driver +which supports automatic allocation of IPv6 subnets. + +### Runtime + +- Fix runC privilege escalation (CVE-2016-9962) + ## 1.12.5 (2016-12-15) **IMPORTANT**: Docker 1.12 ships with an updated systemd unit file for rpm diff --git a/VERSION b/VERSION index e0a6b34fb0aa0..456e5c4ad803e 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.12.5 +1.12.6 From c529d3d7ee22597d13f56c2cf8073bd6dbbe4c18 Mon Sep 17 00:00:00 2001 From: Adrien Duermael Date: Fri, 13 Jan 2017 11:44:11 -0800 Subject: [PATCH 04/13] docs: removed links to stack_tasks & stack_ls `docsreference/commandline/stack_tasks.md ` & `docsreference/commandline/stack_ls.md` do not exist. I need this to be merge to solve that issue: https://github.com/docker/docker.github.io/issues/1068, and merge new CI tests for the docs: https://github.com/docker/docker.github.io/pull/1052 Signed-off-by: Adrien Duermael --- docs/reference/commandline/deploy.md | 1 - docs/reference/commandline/stack_config.md | 1 - docs/reference/commandline/stack_deploy.md | 1 - docs/reference/commandline/stack_rm.md | 1 - docs/reference/commandline/stack_services.md | 1 - 5 files changed, 5 deletions(-) diff --git a/docs/reference/commandline/deploy.md b/docs/reference/commandline/deploy.md index 34cf39882f310..b0aed6f6eb0f8 100644 --- a/docs/reference/commandline/deploy.md +++ b/docs/reference/commandline/deploy.md @@ -52,4 +52,3 @@ axqh55ipl40h vossibility-stack_vossibility-collector 1 icecrime/vossibility-co * [stack config](stack_config.md) * [stack deploy](stack_deploy.md) * [stack rm](stack_rm.md) -* [stack tasks](stack_tasks.md) diff --git a/docs/reference/commandline/stack_config.md b/docs/reference/commandline/stack_config.md index bc759c163e661..88b716242a7e6 100644 --- a/docs/reference/commandline/stack_config.md +++ b/docs/reference/commandline/stack_config.md @@ -26,4 +26,3 @@ Displays the configuration of a stack. * [stack rm](stack_rm.md) * [stack services](stack_services.md) * [stack ps](stack_ps.md) -* [stack ls](stack_ls.md) diff --git a/docs/reference/commandline/stack_deploy.md b/docs/reference/commandline/stack_deploy.md index 51b3e1fc5879c..36bdace74c939 100644 --- a/docs/reference/commandline/stack_deploy.md +++ b/docs/reference/commandline/stack_deploy.md @@ -55,4 +55,3 @@ axqh55ipl40h vossibility-stack_vossibility-collector 1 icecrime/vossibility-co * [stack rm](stack_rm.md) * [stack services](stack_services.md) * [stack ps](stack_ps.md) -* [stack ls](stack_ls.md) diff --git a/docs/reference/commandline/stack_rm.md b/docs/reference/commandline/stack_rm.md index 60aca06f9d171..361c0b3c32ca9 100644 --- a/docs/reference/commandline/stack_rm.md +++ b/docs/reference/commandline/stack_rm.md @@ -29,4 +29,3 @@ a manager node. * [stack deploy](stack_deploy.md) * [stack services](stack_services.md) * [stack ps](stack_ps.md) -* [stack ls](stack_ls.md) diff --git a/docs/reference/commandline/stack_services.md b/docs/reference/commandline/stack_services.md index a1a2ec07ef7a4..032f816529633 100644 --- a/docs/reference/commandline/stack_services.md +++ b/docs/reference/commandline/stack_services.md @@ -60,4 +60,3 @@ The currently supported filters are: * [stack deploy](stack_deploy.md) * [stack rm](stack_rm.md) * [stack ps](stack_ps.md) -* [stack ls](stack_ls.md) From 915f6046bca3569bc2e557b564832ebdd31365f5 Mon Sep 17 00:00:00 2001 From: Adrien Duermael Date: Fri, 13 Jan 2017 11:52:09 -0800 Subject: [PATCH 05/13] fixed broken links in docs I need this to be merge to solve that issue: https://github.com/docker/docker.github.io/issues/1068, and merge new CI tests for the docs: https://github.com/docker/docker.github.io/pull/1052 Signed-off-by: Adrien Duermael --- docs/reference/commandline/login.md | 2 +- docs/understanding-docker.md | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/reference/commandline/login.md b/docs/reference/commandline/login.md index 7fe8d6a43e1ac..647284636722d 100644 --- a/docs/reference/commandline/login.md +++ b/docs/reference/commandline/login.md @@ -29,7 +29,7 @@ adding the server name. `docker login` requires user to use `sudo` or be `root`, except when: 1. connecting to a remote daemon, such as a `docker-machine` provisioned `docker engine`. -2. user is added to the `docker` group. This will impact the security of your system; the `docker` group is `root` equivalent. See [Docker Daemon Attack Surface](/security/security/#docker-daemon-attack-surface) for details. +2. user is added to the `docker` group. This will impact the security of your system; the `docker` group is `root` equivalent. See [Docker Daemon Attack Surface](/engine/security/security/#docker-daemon-attack-surface) for details. You can log into any public or private repository for which you have credentials. When you log in, the command stores encoded credentials in diff --git a/docs/understanding-docker.md b/docs/understanding-docker.md index 08efa688f848d..ea758261509b4 100644 --- a/docs/understanding-docker.md +++ b/docs/understanding-docker.md @@ -208,8 +208,8 @@ existing images and pull them from the registry to a host. [Docker Hub](http://hub.docker.com) is a public Docker registry which serves a huge collection of existing images and allows you to contribute your own. For more information, go to -[Docker Registry](https://docs.docker.com/registry/overview/) and -[Docker Trusted Registry](https://docs.docker.com/docker-trusted-registry/overview/). +[Docker Registry](https://docs.docker.com/registry/) and +[Docker Trusted Registry](https://docs.docker.com/datacenter/dtr/2.0/). [Docker store](http://store.docker.com) allows you to buy and sell Docker images. For image, you can buy a Docker image containing an application or service from From 5469d0f7995bf47bfab9e1e216006dc12a858f81 Mon Sep 17 00:00:00 2001 From: "yinzhuo.dingyz" Date: Thu, 29 Dec 2016 19:32:04 +0800 Subject: [PATCH 06/13] add alilogs to 1.12.x --- .gitignore | 1 + daemon/logdrivers_linux.go | 1 + daemon/logger/alilogs/alilogs.go | 295 +++++++++++++++++++++++++++++++ daemon/logger/alilogs/logapi.go | 72 ++++++++ vendor.conf | 145 +++++++++++++++ 5 files changed, 514 insertions(+) create mode 100644 daemon/logger/alilogs/alilogs.go create mode 100644 daemon/logger/alilogs/logapi.go create mode 100644 vendor.conf diff --git a/.gitignore b/.gitignore index 43aa9227e2320..f43b1e9423b27 100644 --- a/.gitignore +++ b/.gitignore @@ -27,4 +27,5 @@ docs/changed-files man/man1 man/man5 man/man8 +vendor/ vendor/pkg/ diff --git a/daemon/logdrivers_linux.go b/daemon/logdrivers_linux.go index 89fe49a858831..ea34eb9204852 100644 --- a/daemon/logdrivers_linux.go +++ b/daemon/logdrivers_linux.go @@ -3,6 +3,7 @@ package daemon import ( // Importing packages here only to make sure their init gets called and // therefore they register themselves to the logdriver factory. + _ "github.com/docker/docker/daemon/logger/alilogs" _ "github.com/docker/docker/daemon/logger/awslogs" _ "github.com/docker/docker/daemon/logger/fluentd" _ "github.com/docker/docker/daemon/logger/gcplogs" diff --git a/daemon/logger/alilogs/alilogs.go b/daemon/logger/alilogs/alilogs.go new file mode 100644 index 0000000000000..4fde6a97cbec4 --- /dev/null +++ b/daemon/logger/alilogs/alilogs.go @@ -0,0 +1,295 @@ +// Package alilogs provides the logdriver for forwarding container logs to Ali Log Service + +package alilogs + +import ( + "fmt" + "sync" + "time" + + "github.com/Sirupsen/logrus" + "github.com/docker/docker/daemon/logger" + "github.com/galaxydi/go-loghub" + "github.com/golang/protobuf/proto" +) + +const ( + name = "alilogs" + endpointKey = "alilogs-endpoint" + projectKey = "alilogs-project" + logstoreKey = "alilogs-logstore" + + topicEnvKey = "topic" + serviceEnvKey = "serviceName" + functionEnvKey = "functionName" + requestIDEnvKey = "requestID" + + accessKeyIDEnvKey = "accessKeyID" + accessKeySecretEnvKey = "accessKeySecret" + sessionTokenEnvKey = "sessionToken" + + batchPublishFrequency = 5 * time.Second + + //PutLogs接口每次可以写入的日志数据量上限为3MB或者4096条 + maximumBytesPerPut = 3145728 + maximumLogsPerPut = 4096 +) + +type logStream struct { + topic string + serviceName string + functionName string + requestID string + extraLogContents []*sls.LogContent + client AliLogAPI + messages chan *logger.Message + lock sync.RWMutex + closed bool +} + +// init registers the alilogs driver +func init() { + if err := logger.RegisterLogDriver(name, New); err != nil { + logrus.Fatal(err) + } + if err := logger.RegisterLogOptValidator(name, ValidateLogOpt); err != nil { + logrus.Fatal(err) + } +} + +// New creates an alilogs logger using the configuration passed in on the context +func New(ctx logger.Context) (logger.Logger, error) { + endpoint := ctx.Config[endpointKey] + projectName := ctx.Config[projectKey] + logstoreName := ctx.Config[logstoreKey] + extraContents := []*sls.LogContent{} + accessKeyID := "" + accessKeySecret := "" + sessionToken := "" + topicName := "" + serviceName := "" + functionName := "" + requestID := "" + + extra := ctx.ExtraAttributes(nil) + value, ok := extra[accessKeyIDEnvKey] + if ok { + accessKeyID = value + } else { + return nil, fmt.Errorf("must specify a value for env '%s'", accessKeyIDEnvKey) + } + value, ok = extra[accessKeySecretEnvKey] + if ok { + accessKeySecret = value + } else { + return nil, fmt.Errorf("must specify a value for env '%s'", accessKeySecretEnvKey) + } + + if value, ok = extra[sessionTokenEnvKey]; ok { + sessionToken = value + } + + if value, ok = extra[topicEnvKey]; ok { + topicName = value + } + + // extra attributes in log record + if value, ok = extra[serviceEnvKey]; ok { + serviceName = value + serviceNameContent := &sls.LogContent{ + Key: proto.String(serviceEnvKey), + Value: proto.String(serviceName), + } + extraContents = append(extraContents, serviceNameContent) + } + if value, ok = extra[functionEnvKey]; ok { + functionName = value + functionNameContent := &sls.LogContent{ + Key: proto.String(functionEnvKey), + Value: proto.String(functionName), + } + extraContents = append(extraContents, functionNameContent) + } + if value, ok = extra[requestIDEnvKey]; ok { + requestID = value + requestIDContent := &sls.LogContent{ + Key: proto.String(requestIDEnvKey), + Value: proto.String(requestID), + } + extraContents = append(extraContents, requestIDContent) + } + + aliLogClient, err := NewAliLogClient(endpoint, projectName, logstoreName, accessKeyID, accessKeySecret, sessionToken) + if err != nil { + return nil, err + } + containerStream := &logStream{ + topic: topicName, + serviceName: serviceName, + functionName: functionName, + requestID: requestID, + extraLogContents: extraContents, + client: aliLogClient, + messages: make(chan *logger.Message, maximumLogsPerPut), + } + + go containerStream.collectLogs() + return containerStream, nil +} + +// Name returns the name of ali logging driver +func (ls *logStream) Name() string { + return name +} + +// Log submits messages for logging by an instance of the alilogs logging driver +func (ls *logStream) Log(msg *logger.Message) error { + ls.lock.RLock() + defer ls.lock.RUnlock() + if !ls.closed { + // buffer up the data, making sure to copy the Line data + ls.messages <- msg + } + return nil +} + +// Close closes the instance of the alilogs logging driver +func (ls *logStream) Close() error { + ls.lock.Lock() + defer ls.lock.Unlock() + if !ls.closed { + close(ls.messages) + } + ls.closed = true + return nil +} + +// newTicker is used for time-based batching. newTicker is a variable such +// that the implementation can be swapped out for unit tests. +var newTicker = func(freq time.Duration) *time.Ticker { + return time.NewTicker(freq) +} + +// PutLogs executes as a goroutine to perform put logs for +// submission to the logstore. Batching is performed on time- and size- +// bases. Time-based batching occurs at a 5 second interval (defined in the +// batchPublishFrequency const). Size-based batching is performed on the +// maximum number of logs per batch (defined in maximumLogsPerPut) and +// the maximum number of total bytes in a batch (defined in +// maximumBytesPerPut). +func (ls *logStream) collectLogs() { + aliLogClient := ls.client.(*AliLogClient) + logGroup := sls.LogGroup{ + Topic: proto.String(ls.topic), + Logs: []*sls.Log{}, + } + timer := newTicker(batchPublishFrequency) + for { + select { + case <-timer.C: + ls.publishLogs(&logGroup) + logrus.WithFields(logrus.Fields{ + "endpoint": aliLogClient.Endpoint, + "project": aliLogClient.ProjectName, + "logstore": aliLogClient.LogstoreName, + "serviceName": ls.serviceName, + "functionName": ls.functionName, + "requestID": ls.requestID, + "published log number": len(logGroup.Logs), + "published log size": logGroup.Size(), + }).Debug("publish log when timer timeout") + logGroup.Reset() + logGroup.Topic = proto.String(ls.topic) + case msg, more := <-ls.messages: + if !more { + ls.publishLogs(&logGroup) + logrus.WithFields(logrus.Fields{ + "endpoint": aliLogClient.Endpoint, + "project": aliLogClient.ProjectName, + "logstore": aliLogClient.LogstoreName, + "serviceName": ls.serviceName, + "functionName": ls.functionName, + "reuestID": ls.requestID, + "published log number": len(logGroup.Logs), + "published log size": logGroup.Size(), + }).Debug("publish log when no more logs") + return + } + unprocessedLine := msg.Line + logMsg := &sls.LogContent{ + Key: proto.String("message"), + Value: proto.String(string(unprocessedLine)), + } + contents := ls.extraLogContents + contents = append(contents, logMsg) + logRecord := sls.Log{ + Time: proto.Uint32(uint32(time.Now().Unix())), + Contents: contents, + } + if len(unprocessedLine) > 0 { + if (len(logGroup.Logs) >= maximumLogsPerPut) || (logGroup.Size()+logRecord.Size() > maximumBytesPerPut) { + // Publish an existing batch if it's already over the maximum number of logs or if adding this + // line would push it over the maximum number of total bytes. + ls.publishLogs(&logGroup) + logrus.WithFields(logrus.Fields{ + "endpoint": aliLogClient.Endpoint, + "project": aliLogClient.ProjectName, + "logstore": aliLogClient.LogstoreName, + "serviceName": ls.serviceName, + "functionName": ls.functionName, + "requestID": ls.requestID, + "published log number": len(logGroup.Logs), + "published log size": logGroup.Size(), + }).Debug("publish logs when touch the limit") + logGroup.Reset() + logGroup.Topic = proto.String(ls.topic) + } + logGroup.Logs = append(logGroup.Logs, &logRecord) + } + } + } +} + +// publishLogs calls PutLogs for a given LogGroup +func (ls *logStream) publishLogs(lg *sls.LogGroup) { + err := ls.client.PutLogs(lg) + if err != nil { + if serviceErr, ok := err.(sls.Error); ok { + aliLogClient := ls.client.(*AliLogClient) + logrus.WithFields(logrus.Fields{ + "errorCode": serviceErr.Code, + "errorMessage": serviceErr.Message, + "endpoint": aliLogClient.Endpoint, + "project": aliLogClient.ProjectName, + "logstore": aliLogClient.LogstoreName, + "serviceName": ls.serviceName, + "functionName": ls.functionName, + "requestId": ls.requestID, + }).Error("PutLogs occurs sls error") + } else { + logrus.Error(err) + } + } +} + +// ValidateLogOpt looks for alilogs-specific log options +func ValidateLogOpt(cfg map[string]string) error { + for key := range cfg { + switch key { + case "env": + case endpointKey, projectKey, logstoreKey: + default: + return fmt.Errorf("unknown log opt '%s' for %s log driver", key, name) + } + } + if cfg[endpointKey] == "" { + return fmt.Errorf("must specify a value for log opt '%s'", endpointKey) + } + if cfg[projectKey] == "" { + return fmt.Errorf("must specify a value for log opt '%s'", projectKey) + } + if cfg[logstoreKey] == "" { + return fmt.Errorf("must specify a value for log opt '%s'", logstoreKey) + } + return nil +} diff --git a/daemon/logger/alilogs/logapi.go b/daemon/logger/alilogs/logapi.go new file mode 100644 index 0000000000000..7292d36975cf8 --- /dev/null +++ b/daemon/logger/alilogs/logapi.go @@ -0,0 +1,72 @@ +// Pakcage alilogs api interface + +package alilogs + +import ( + "errors" + + "github.com/Sirupsen/logrus" + "github.com/galaxydi/go-loghub" +) + +// AliLogAPI define log api interface +type AliLogAPI interface { + PutLogs(*sls.LogGroup) error +} + +// AliLogClient implements AliLogAPI interface +type AliLogClient struct { + Endpoint string + ProjectName string + LogstoreName string + accessKeyID string + accessKeySecret string + sessionToken string + project *sls.LogProject + logstore *sls.LogStore +} + +// PutLogs implements ali PutLogs method +func (client *AliLogClient) PutLogs(logGroup *sls.LogGroup) error { + return client.logstore.PutLogs(logGroup) +} + +// NewAliLogClient ... +func NewAliLogClient(serviceEndpoint, projectName, logstoreName, accessID, accessSecret, token string) (AliLogAPI, error) { + client := AliLogClient{} + client.Endpoint = serviceEndpoint + client.ProjectName = projectName + client.LogstoreName = logstoreName + client.accessKeyID = accessID + client.accessKeySecret = accessSecret + client.sessionToken = token + + logrus.WithFields(logrus.Fields{ + "endpoint": serviceEndpoint, + "projectName": projectName, + "logstoreName": logstoreName, + }).Info("Created alilogs client") + + logProject, err := sls.NewLogProject(projectName, serviceEndpoint, accessID, accessSecret) + if err != nil { + logrus.WithFields(logrus.Fields{ + "error": err, + }).Error("Could not get ali log project") + return nil, errors.New("Could not get ali log project") + } + if client.sessionToken != "" { + logProject.WithToken(client.sessionToken) + } + client.project = logProject + + logStore, err := client.project.GetLogStore(logstoreName) + if err != nil { + logrus.WithFields(logrus.Fields{ + "error": err, + }).Error("Could not get ali logstore") + return nil, errors.New("Could not get ali logstore") + } + client.logstore = logStore + + return &client, nil +} diff --git a/vendor.conf b/vendor.conf new file mode 100644 index 0000000000000..e50aa273bd445 --- /dev/null +++ b/vendor.conf @@ -0,0 +1,145 @@ +# the following lines are in sorted order, FYI +github.com/Azure/go-ansiterm 388960b655244e76e24c75f48631564eaefade62 +github.com/Microsoft/hcsshim v0.5.9 +github.com/Microsoft/go-winio v0.3.7 +github.com/Sirupsen/logrus f76d643702a30fbffecdfe50831e11881c96ceb3 https://github.com/aaronlehmann/logrus +github.com/davecgh/go-spew 6d212800a42e8ab5c146b8ace3490ee17e5225f9 +github.com/docker/libtrust 9cbd2a1374f46905c68a4eb3694a130610adc62a +github.com/go-check/check 4ed411733c5785b40214c70bce814c3a3a689609 https://github.com/cpuguy83/check.git +github.com/gorilla/context v1.1 +github.com/gorilla/mux v1.1 +github.com/kr/pty 5cf931ef8f +github.com/mattn/go-shellwords v1.0.0 +github.com/mattn/go-sqlite3 v1.1.0 +github.com/tchap/go-patricia v2.2.6 +github.com/vdemeester/shakers 24d7f1d6a71aa5d9cbe7390e4afb66b7eef9e1b3 +# forked golang.org/x/net package includes a patch for lazy loading trace templates +golang.org/x/net 2beffdc2e92c8a3027590f898fe88f69af48a3f8 https://github.com/tonistiigi/net.git +golang.org/x/sys 8f0908ab3b2457e2e15403d3697c9ef5cb4b57a9 +github.com/docker/go-units 8a7beacffa3009a9ac66bad506b18ffdd110cf97 +github.com/docker/go-connections 4ccf312bf1d35e5dbda654e57a9be4c3f3cd0366 + +github.com/RackSec/srslog 456df3a81436d29ba874f3590eeeee25d666f8a5 +github.com/imdario/mergo 0.2.1 + +#get libnetwork packages +github.com/docker/libnetwork b908488a139e81cb8c4091cd836745aeb4d813a4 +github.com/docker/go-events 18b43f1bc85d9cdd42c05a6cd2d444c7a200a894 +github.com/armon/go-radix e39d623f12e8e41c7b5529e9a9dd67a1e2261f80 +github.com/armon/go-metrics eb0af217e5e9747e41dd5303755356b62d28e3ec +github.com/hashicorp/go-msgpack 71c2886f5a673a35f909803f38ece5810165097b +github.com/hashicorp/memberlist 88ac4de0d1a0ca6def284b571342db3b777a4c37 +github.com/hashicorp/go-multierror fcdddc395df1ddf4247c69bd436e84cfa0733f7e +github.com/hashicorp/serf 598c54895cc5a7b1a24a398d635e8c0ea0959870 +github.com/docker/libkv 1d8431073ae03cdaedb198a89722f3aab6d418ef +github.com/vishvananda/netns 604eaf189ee867d8c147fafc28def2394e878d25 +github.com/vishvananda/netlink 482f7a52b758233521878cb6c5904b6bd63f3457 +github.com/BurntSushi/toml f706d00e3de6abe700c994cdd545a1a4915af060 +github.com/samuel/go-zookeeper d0e0d8e11f318e000a8cc434616d69e329edc374 +github.com/deckarep/golang-set ef32fa3046d9f249d399f98ebaf9be944430fd1d +github.com/coreos/etcd 3a49cbb769ebd8d1dd25abb1e83386e9883a5707 +github.com/ugorji/go f1f1a805ed361a0e078bb537e4ea78cd37dcf065 +github.com/hashicorp/consul v0.5.2 +github.com/boltdb/bolt fff57c100f4dea1905678da7e90d92429dff2904 +github.com/miekg/dns 75e6e86cc601825c5dbcd4e0c209eab180997cd7 + +# get graph and distribution packages +github.com/docker/distribution 28602af35aceda2f8d571bad7ca37a54cf0250bc +github.com/vbatts/tar-split v0.10.1 + +# get go-zfs packages +github.com/mistifyio/go-zfs 22c9b32c84eb0d0c6f4043b6e90fc94073de92fa +github.com/pborman/uuid v1.0 + +# get desired notary commit, might also need to be updated in Dockerfile +github.com/docker/notary v0.4.2 + +google.golang.org/grpc v1.0.2 +github.com/miekg/pkcs11 df8ae6ca730422dba20c768ff38ef7d79077a59f +github.com/docker/go v1.5.1-1-1-gbaf439e +github.com/agl/ed25519 d2b94fd789ea21d12fac1a4443dd3a3f79cda72c + +github.com/opencontainers/runc 51371867a01c467f08af739783b8beafc15 # libcontainer +github.com/opencontainers/runtime-spec 1c7c27d043c2a5e513a44084d2b10d77d1402b8c # specs +github.com/seccomp/libseccomp-golang 32f571b70023028bd57d9288c20efbcb237f3ce0 +# libcontainer deps (see src/github.com/opencontainers/runc/Godeps/Godeps.json) +github.com/coreos/go-systemd v4 +github.com/godbus/dbus v4.0.0 +github.com/syndtr/gocapability 2c00daeb6c3b45114c80ac44119e7b8801fdd852 +github.com/golang/protobuf 1f49d83d9aa00e6ce4fc8258c71cc7786aec968a + +# gelf logging driver deps +github.com/Graylog2/go-gelf aab2f594e4585d43468ac57287b0dece9d806883 + +github.com/fluent/fluent-logger-golang v1.2.1 +# fluent-logger-golang deps +github.com/philhofer/fwd 899e4efba8eaa1fea74175308f3fae18ff3319fa +github.com/tinylib/msgp 75ee40d2601edf122ef667e2a07d600d4c44490c + +# fsnotify +github.com/fsnotify/fsnotify v1.2.11 + +# alilogs deps +github.com/galaxydi/go-loghub d52b6d91786547a03aadec3eabb867189fd96df3 +github.com/cloudflare/golz4 ef862a3cdc58a6f1fee4e3af3d44fbe279194cde +github.com/golang/glog 23def4e6c14b4da8ac2ed8007337bc5eb5007998 + +# awslogs deps +github.com/aws/aws-sdk-go v1.4.22 +github.com/go-ini/ini 060d7da055ba6ec5ea7a31f116332fe5efa04ce0 +github.com/jmespath/go-jmespath 0b12d6b521d83fc7f755e7cfc1b1fbdd35a01a74 + +# logentries +github.com/bsphere/le_go d3308aafe090956bc89a65f0769f58251a1b4f03 + +# gcplogs deps +golang.org/x/oauth2 2baa8a1b9338cf13d9eeb27696d761155fa480be +google.golang.org/api dc6d2353af16e2a2b0ff6986af051d473a4ed468 +google.golang.org/cloud dae7e3d993bc3812a2185af60552bb6b847e52a0 + +# native credentials +github.com/docker/docker-credential-helpers f72c04f1d8e71959a6d103f808c50ccbad79b9fd + +# containerd +github.com/docker/containerd 03e5862ec0d8d3b3f750e19fca3ee367e13c090e +github.com/tonistiigi/fifo 1405643975692217d6720f8b54aeee1bf2cd5cf4 + +# cluster +github.com/docker/swarmkit 9e4bd71a1690cd27400714fcd98c329b752b5c4c +github.com/golang/mock bd3c8e81be01eef76d4b503f5e687d2d1354d2d9 +github.com/gogo/protobuf v0.3 +github.com/cloudflare/cfssl 7fb22c8cba7ecaf98e4082d22d65800cf45e042a +github.com/google/certificate-transparency d90e65c3a07988180c5b1ece71791c0b6506826e +golang.org/x/crypto 3fbbcd23f1cb824e69491a5930cfeff09b12f4d2 +golang.org/x/time a4bde12657593d5e90d0533a3e4fd95e635124cb +github.com/mreiferson/go-httpclient 63fe23f7434723dc904c901043af07931f293c47 +github.com/hashicorp/go-memdb 608dda3b1410a73eaf3ac8b517c9ae7ebab6aa87 https://github.com/floridoo/go-memdb +github.com/hashicorp/go-immutable-radix 8e8ed81f8f0bf1bdd829593fdd5c29922c1ea990 +github.com/hashicorp/golang-lru a0d98a5f288019575c6d1f4bb1573fef2d1fcdc4 +github.com/coreos/pkg fa29b1d70f0beaddd4c7021607cc3c3be8ce94b8 +github.com/pivotal-golang/clock 3fd3c1944c59d9742e1cd333672181cd1a6f9fa0 +github.com/prometheus/client_golang 52437c81da6b127a9925d17eb3a382a2e5fd395e +github.com/beorn7/perks 4c0e84591b9aa9e6dcfdf3e020114cd81f89d5f9 +github.com/prometheus/client_model fa8ad6fec33561be4280a8f0514318c79d7f6cb6 +github.com/prometheus/common ebdfc6da46522d58825777cf1f90490a5b1ef1d8 +github.com/prometheus/procfs abf152e5f3e97f2fafac028d2cc06c1feb87ffa5 +bitbucket.org/ww/goautoneg 75cd24fc2f2c2a2088577d12123ddee5f54e0675 +github.com/matttproud/golang_protobuf_extensions fc2b8d3a73c4867e51861bbdd5ae3c1f0869dd6a +github.com/pkg/errors 839d9e913e063e28dfd0e6c7b7512793e0a48be9 + +# cli +github.com/spf13/cobra v1.5 https://github.com/dnephin/cobra.git +github.com/spf13/pflag dabebe21bf790f782ea4c7bbd2efc430de182afd +github.com/inconshreveable/mousetrap 76626ae9c91c4f2a10f34cad8ce83ea42c93bb75 +github.com/flynn-archive/go-shlex 3f9db97f856818214da2e1057f8ad84803971cff + +# metrics +github.com/docker/go-metrics 86138d05f285fd9737a99bee2d9be30866b59d72 + +# composefile +github.com/aanand/compose-file a3e58764f50597b6217fec07e9bff7225c4a1719 +github.com/mitchellh/mapstructure f3009df150dadf309fdee4a54ed65c124afad715 +github.com/xeipuuv/gojsonpointer e0fe6f68307607d540ed8eac07a342c33fa1b54a +github.com/xeipuuv/gojsonreference e02fc20de94c78484cd5ffb007f8af96be030a45 +github.com/xeipuuv/gojsonschema 93e72a773fade158921402d6a24c819b48aba29d +gopkg.in/yaml.v2 a83829b6f1293c91addabc89d0571c246397bbf4 From 812c6f3a0d66b30c5781df3c1c97fce8cf19b769 Mon Sep 17 00:00:00 2001 From: "yinzhuo.dingyz" Date: Sat, 31 Dec 2016 17:27:55 +0800 Subject: [PATCH 07/13] add extra attrs in log message --- daemon/logger/alilogs/alilogs.go | 107 ++++++++++++++----------------- daemon/logger/alilogs/logapi.go | 43 +++++-------- 2 files changed, 64 insertions(+), 86 deletions(-) diff --git a/daemon/logger/alilogs/alilogs.go b/daemon/logger/alilogs/alilogs.go index 4fde6a97cbec4..183804df7faa5 100644 --- a/daemon/logger/alilogs/alilogs.go +++ b/daemon/logger/alilogs/alilogs.go @@ -13,33 +13,51 @@ import ( "github.com/golang/protobuf/proto" ) +/* +Ali logging driver usage + docker run -d --name logger2 \ + --log-driver alilogs \ + --log-opt alilogs-endpoint=cn-hangzhou.log.aliyuncs.com \ + --log-opt alilogs-project=test_project \ + --log-opt alilogs-logstore=test-logstore \ + + // You can add these extra attributes to log message + --log-opt labels=attr1,attr2,attr3 \ + --label attr1=attr1 \ + --label attr2=attr2 \ + --label attr3=attr3 \ + + // You assign these environment variables for alilogs logging driver to work + // "securityToken" and "topic" are optinal + --log-opt env=accessKeyID,accessKeySecret,securityToken,topic \ + --env "accessKeyID=xxx" \ + --env "accessKeySecret=xxx" \ + --env "securityToken=xxx" \ + --env "topic=demo_topic" \ + log-producer +*/ + const ( name = "alilogs" endpointKey = "alilogs-endpoint" projectKey = "alilogs-project" logstoreKey = "alilogs-logstore" - - topicEnvKey = "topic" - serviceEnvKey = "serviceName" - functionEnvKey = "functionName" - requestIDEnvKey = "requestID" + envKey = "env" + labelsKey = "labels" accessKeyIDEnvKey = "accessKeyID" accessKeySecretEnvKey = "accessKeySecret" - sessionTokenEnvKey = "sessionToken" + securityTokenEnvKey = "securityToken" + topicEnvKey = "topic" + // PutLogs limit in Loghub, 3MB or 4096 records per put batchPublishFrequency = 5 * time.Second - - //PutLogs接口每次可以写入的日志数据量上限为3MB或者4096条 - maximumBytesPerPut = 3145728 - maximumLogsPerPut = 4096 + maximumBytesPerPut = 3145728 + maximumLogsPerPut = 4096 ) type logStream struct { topic string - serviceName string - functionName string - requestID string extraLogContents []*sls.LogContent client AliLogAPI messages chan *logger.Message @@ -65,69 +83,51 @@ func New(ctx logger.Context) (logger.Logger, error) { extraContents := []*sls.LogContent{} accessKeyID := "" accessKeySecret := "" - sessionToken := "" + securityToken := "" topicName := "" - serviceName := "" - functionName := "" - requestID := "" extra := ctx.ExtraAttributes(nil) value, ok := extra[accessKeyIDEnvKey] if ok { accessKeyID = value + delete(extra, accessKeyIDEnvKey) } else { return nil, fmt.Errorf("must specify a value for env '%s'", accessKeyIDEnvKey) } + value, ok = extra[accessKeySecretEnvKey] if ok { accessKeySecret = value + delete(extra, accessKeySecretEnvKey) } else { return nil, fmt.Errorf("must specify a value for env '%s'", accessKeySecretEnvKey) } - if value, ok = extra[sessionTokenEnvKey]; ok { - sessionToken = value + if value, ok = extra[securityTokenEnvKey]; ok { + securityToken = value + delete(extra, securityTokenEnvKey) } if value, ok = extra[topicEnvKey]; ok { topicName = value + delete(extra, topicEnvKey) } - // extra attributes in log record - if value, ok = extra[serviceEnvKey]; ok { - serviceName = value - serviceNameContent := &sls.LogContent{ - Key: proto.String(serviceEnvKey), - Value: proto.String(serviceName), - } - extraContents = append(extraContents, serviceNameContent) - } - if value, ok = extra[functionEnvKey]; ok { - functionName = value - functionNameContent := &sls.LogContent{ - Key: proto.String(functionEnvKey), - Value: proto.String(functionName), - } - extraContents = append(extraContents, functionNameContent) - } - if value, ok = extra[requestIDEnvKey]; ok { - requestID = value - requestIDContent := &sls.LogContent{ - Key: proto.String(requestIDEnvKey), - Value: proto.String(requestID), + // add extra contents to log record + for key, value := range extra { + logContent := &sls.LogContent{ + Key: proto.String(key), + Value: proto.String(value), } - extraContents = append(extraContents, requestIDContent) + extraContents = append(extraContents, logContent) } - aliLogClient, err := NewAliLogClient(endpoint, projectName, logstoreName, accessKeyID, accessKeySecret, sessionToken) + aliLogClient, err := NewAliLogClient(endpoint, projectName, logstoreName, accessKeyID, accessKeySecret, securityToken) if err != nil { return nil, err } containerStream := &logStream{ topic: topicName, - serviceName: serviceName, - functionName: functionName, - requestID: requestID, extraLogContents: extraContents, client: aliLogClient, messages: make(chan *logger.Message, maximumLogsPerPut), @@ -192,9 +192,6 @@ func (ls *logStream) collectLogs() { "endpoint": aliLogClient.Endpoint, "project": aliLogClient.ProjectName, "logstore": aliLogClient.LogstoreName, - "serviceName": ls.serviceName, - "functionName": ls.functionName, - "requestID": ls.requestID, "published log number": len(logGroup.Logs), "published log size": logGroup.Size(), }).Debug("publish log when timer timeout") @@ -207,9 +204,6 @@ func (ls *logStream) collectLogs() { "endpoint": aliLogClient.Endpoint, "project": aliLogClient.ProjectName, "logstore": aliLogClient.LogstoreName, - "serviceName": ls.serviceName, - "functionName": ls.functionName, - "reuestID": ls.requestID, "published log number": len(logGroup.Logs), "published log size": logGroup.Size(), }).Debug("publish log when no more logs") @@ -235,9 +229,6 @@ func (ls *logStream) collectLogs() { "endpoint": aliLogClient.Endpoint, "project": aliLogClient.ProjectName, "logstore": aliLogClient.LogstoreName, - "serviceName": ls.serviceName, - "functionName": ls.functionName, - "requestID": ls.requestID, "published log number": len(logGroup.Logs), "published log size": logGroup.Size(), }).Debug("publish logs when touch the limit") @@ -262,9 +253,6 @@ func (ls *logStream) publishLogs(lg *sls.LogGroup) { "endpoint": aliLogClient.Endpoint, "project": aliLogClient.ProjectName, "logstore": aliLogClient.LogstoreName, - "serviceName": ls.serviceName, - "functionName": ls.functionName, - "requestId": ls.requestID, }).Error("PutLogs occurs sls error") } else { logrus.Error(err) @@ -276,8 +264,7 @@ func (ls *logStream) publishLogs(lg *sls.LogGroup) { func ValidateLogOpt(cfg map[string]string) error { for key := range cfg { switch key { - case "env": - case endpointKey, projectKey, logstoreKey: + case endpointKey, projectKey, logstoreKey, labelsKey, envKey: default: return fmt.Errorf("unknown log opt '%s' for %s log driver", key, name) } diff --git a/daemon/logger/alilogs/logapi.go b/daemon/logger/alilogs/logapi.go index 7292d36975cf8..67bd2d2b8736f 100644 --- a/daemon/logger/alilogs/logapi.go +++ b/daemon/logger/alilogs/logapi.go @@ -16,57 +16,48 @@ type AliLogAPI interface { // AliLogClient implements AliLogAPI interface type AliLogClient struct { - Endpoint string - ProjectName string - LogstoreName string - accessKeyID string - accessKeySecret string - sessionToken string - project *sls.LogProject - logstore *sls.LogStore + Endpoint string + ProjectName string + LogstoreName string + logStore *sls.LogStore } // PutLogs implements ali PutLogs method func (client *AliLogClient) PutLogs(logGroup *sls.LogGroup) error { - return client.logstore.PutLogs(logGroup) + return client.logStore.PutLogs(logGroup) } // NewAliLogClient ... -func NewAliLogClient(serviceEndpoint, projectName, logstoreName, accessID, accessSecret, token string) (AliLogAPI, error) { +func NewAliLogClient(endpoint, projectName, logstoreName, accessKeyID, accessKeySecret, securityToken string) (AliLogAPI, error) { client := AliLogClient{} - client.Endpoint = serviceEndpoint + client.Endpoint = endpoint client.ProjectName = projectName client.LogstoreName = logstoreName - client.accessKeyID = accessID - client.accessKeySecret = accessSecret - client.sessionToken = token - logrus.WithFields(logrus.Fields{ - "endpoint": serviceEndpoint, - "projectName": projectName, - "logstoreName": logstoreName, - }).Info("Created alilogs client") - - logProject, err := sls.NewLogProject(projectName, serviceEndpoint, accessID, accessSecret) + logProject, err := sls.NewLogProject(projectName, endpoint, accessKeyID, accessKeySecret) if err != nil { logrus.WithFields(logrus.Fields{ "error": err, }).Error("Could not get ali log project") return nil, errors.New("Could not get ali log project") } - if client.sessionToken != "" { - logProject.WithToken(client.sessionToken) + if securityToken != "" { + logProject.WithToken(securityToken) } - client.project = logProject - logStore, err := client.project.GetLogStore(logstoreName) + client.logStore, err = logProject.GetLogStore(logstoreName) if err != nil { logrus.WithFields(logrus.Fields{ "error": err, }).Error("Could not get ali logstore") return nil, errors.New("Could not get ali logstore") } - client.logstore = logStore + + logrus.WithFields(logrus.Fields{ + "endpoint": endpoint, + "projectName": projectName, + "logstoreName": logstoreName, + }).Info("Created alilogs client") return &client, nil } From acccf0c66802991041f023e4e8447bc3d7eeda24 Mon Sep 17 00:00:00 2001 From: "yinzhuo.dingyz" Date: Sat, 31 Dec 2016 17:32:19 +0800 Subject: [PATCH 08/13] refine comments --- daemon/logger/alilogs/alilogs.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/daemon/logger/alilogs/alilogs.go b/daemon/logger/alilogs/alilogs.go index 183804df7faa5..385b70f193ab0 100644 --- a/daemon/logger/alilogs/alilogs.go +++ b/daemon/logger/alilogs/alilogs.go @@ -15,7 +15,7 @@ import ( /* Ali logging driver usage - docker run -d --name logger2 \ + docker run -d --name test-logger \ --log-driver alilogs \ --log-opt alilogs-endpoint=cn-hangzhou.log.aliyuncs.com \ --log-opt alilogs-project=test_project \ @@ -23,9 +23,9 @@ Ali logging driver usage // You can add these extra attributes to log message --log-opt labels=attr1,attr2,attr3 \ - --label attr1=attr1 \ - --label attr2=attr2 \ - --label attr3=attr3 \ + --label attr1=attr1Value \ + --label attr2=attr2Value \ + --label attr3=attr3Value \ // You assign these environment variables for alilogs logging driver to work // "securityToken" and "topic" are optinal From 31ab9ec109705226ef1110160d476d95d26a4d74 Mon Sep 17 00:00:00 2001 From: "yinzhuo.dingyz" Date: Wed, 4 Jan 2017 11:26:44 +0800 Subject: [PATCH 09/13] add ut --- daemon/logger/alilogs/alilogs.go | 185 +++++++------ daemon/logger/alilogs/alilogs_test.go | 316 ++++++++++++++++++++++ daemon/logger/alilogs/logapi.go | 61 +++-- daemon/logger/alilogs/mock_client_test.go | 45 +++ 4 files changed, 501 insertions(+), 106 deletions(-) create mode 100644 daemon/logger/alilogs/alilogs_test.go create mode 100644 daemon/logger/alilogs/mock_client_test.go diff --git a/daemon/logger/alilogs/alilogs.go b/daemon/logger/alilogs/alilogs.go index 385b70f193ab0..0d533a7dac2ca 100644 --- a/daemon/logger/alilogs/alilogs.go +++ b/daemon/logger/alilogs/alilogs.go @@ -57,14 +57,26 @@ const ( ) type logStream struct { + endpoint string + projectName string + logstoreName string topic string extraLogContents []*sls.LogContent client AliLogAPI + logGroup *sls.LogGroup messages chan *logger.Message lock sync.RWMutex closed bool } +type contextParams struct { + accessKeyID string + accessKeySecret string + securityToken string + topicName string + extraContents []*sls.LogContent +} + // init registers the alilogs driver func init() { if err := logger.RegisterLogDriver(name, New); err != nil { @@ -80,57 +92,27 @@ func New(ctx logger.Context) (logger.Logger, error) { endpoint := ctx.Config[endpointKey] projectName := ctx.Config[projectKey] logstoreName := ctx.Config[logstoreKey] - extraContents := []*sls.LogContent{} - accessKeyID := "" - accessKeySecret := "" - securityToken := "" - topicName := "" - - extra := ctx.ExtraAttributes(nil) - value, ok := extra[accessKeyIDEnvKey] - if ok { - accessKeyID = value - delete(extra, accessKeyIDEnvKey) - } else { - return nil, fmt.Errorf("must specify a value for env '%s'", accessKeyIDEnvKey) - } - - value, ok = extra[accessKeySecretEnvKey] - if ok { - accessKeySecret = value - delete(extra, accessKeySecretEnvKey) - } else { - return nil, fmt.Errorf("must specify a value for env '%s'", accessKeySecretEnvKey) - } - if value, ok = extra[securityTokenEnvKey]; ok { - securityToken = value - delete(extra, securityTokenEnvKey) - } - - if value, ok = extra[topicEnvKey]; ok { - topicName = value - delete(extra, topicEnvKey) - } - - // add extra contents to log record - for key, value := range extra { - logContent := &sls.LogContent{ - Key: proto.String(key), - Value: proto.String(value), - } - extraContents = append(extraContents, logContent) + contextInput, err := parseContext(&ctx) + if err != nil { + return nil, err } - - aliLogClient, err := NewAliLogClient(endpoint, projectName, logstoreName, accessKeyID, accessKeySecret, securityToken) + aliLogClient, err := NewAliLogClient(endpoint, projectName, logstoreName, contextInput.accessKeyID, contextInput.accessKeySecret, contextInput.securityToken) if err != nil { return nil, err } containerStream := &logStream{ - topic: topicName, - extraLogContents: extraContents, + endpoint: endpoint, + projectName: projectName, + logstoreName: logstoreName, + topic: contextInput.topicName, + extraLogContents: contextInput.extraContents, client: aliLogClient, - messages: make(chan *logger.Message, maximumLogsPerPut), + logGroup: &sls.LogGroup{ + Topic: proto.String(contextInput.topicName), + Logs: []*sls.Log{}, + }, + messages: make(chan *logger.Message, maximumLogsPerPut), } go containerStream.collectLogs() @@ -170,7 +152,7 @@ var newTicker = func(freq time.Duration) *time.Ticker { return time.NewTicker(freq) } -// PutLogs executes as a goroutine to perform put logs for +// collectLogs executes as a goroutine to perform put logs for // submission to the logstore. Batching is performed on time- and size- // bases. Time-based batching occurs at a 5 second interval (defined in the // batchPublishFrequency const). Size-based batching is performed on the @@ -178,34 +160,31 @@ var newTicker = func(freq time.Duration) *time.Ticker { // the maximum number of total bytes in a batch (defined in // maximumBytesPerPut). func (ls *logStream) collectLogs() { - aliLogClient := ls.client.(*AliLogClient) - logGroup := sls.LogGroup{ - Topic: proto.String(ls.topic), - Logs: []*sls.Log{}, - } timer := newTicker(batchPublishFrequency) for { select { case <-timer.C: - ls.publishLogs(&logGroup) + ls.publishLogs() logrus.WithFields(logrus.Fields{ - "endpoint": aliLogClient.Endpoint, - "project": aliLogClient.ProjectName, - "logstore": aliLogClient.LogstoreName, - "published log number": len(logGroup.Logs), - "published log size": logGroup.Size(), + "time trigger": "send data", + "endpoint": ls.endpoint, + "project": ls.projectName, + "logstore": ls.logstoreName, + "published log number": len(ls.logGroup.Logs), + "published log size": ls.logGroup.Size(), }).Debug("publish log when timer timeout") - logGroup.Reset() - logGroup.Topic = proto.String(ls.topic) + ls.logGroup.Reset() + ls.logGroup.Topic = proto.String(ls.topic) case msg, more := <-ls.messages: if !more { - ls.publishLogs(&logGroup) + ls.publishLogs() logrus.WithFields(logrus.Fields{ - "endpoint": aliLogClient.Endpoint, - "project": aliLogClient.ProjectName, - "logstore": aliLogClient.LogstoreName, - "published log number": len(logGroup.Logs), - "published log size": logGroup.Size(), + "no more log": "send data", + "endpoint": ls.endpoint, + "project": ls.projectName, + "logstore": ls.logstoreName, + "published log number": len(ls.logGroup.Logs), + "published log size": ls.logGroup.Size(), }).Debug("publish log when no more logs") return } @@ -221,38 +200,38 @@ func (ls *logStream) collectLogs() { Contents: contents, } if len(unprocessedLine) > 0 { - if (len(logGroup.Logs) >= maximumLogsPerPut) || (logGroup.Size()+logRecord.Size() > maximumBytesPerPut) { + if (len(ls.logGroup.Logs) >= maximumLogsPerPut) || (ls.logGroup.Size()+logRecord.Size() > maximumBytesPerPut) { // Publish an existing batch if it's already over the maximum number of logs or if adding this // line would push it over the maximum number of total bytes. - ls.publishLogs(&logGroup) + ls.publishLogs() logrus.WithFields(logrus.Fields{ - "endpoint": aliLogClient.Endpoint, - "project": aliLogClient.ProjectName, - "logstore": aliLogClient.LogstoreName, - "published log number": len(logGroup.Logs), - "published log size": logGroup.Size(), + "get limit": "send data", + "endpoint": ls.endpoint, + "project": ls.projectName, + "logstore": ls.logstoreName, + "published log number": len(ls.logGroup.Logs), + "published log size": ls.logGroup.Size(), }).Debug("publish logs when touch the limit") - logGroup.Reset() - logGroup.Topic = proto.String(ls.topic) + ls.logGroup.Reset() + ls.logGroup.Topic = proto.String(ls.topic) } - logGroup.Logs = append(logGroup.Logs, &logRecord) + ls.logGroup.Logs = append(ls.logGroup.Logs, &logRecord) } } } } // publishLogs calls PutLogs for a given LogGroup -func (ls *logStream) publishLogs(lg *sls.LogGroup) { - err := ls.client.PutLogs(lg) +func (ls *logStream) publishLogs() { + err := ls.client.PutLogs(ls.logGroup) if err != nil { if serviceErr, ok := err.(sls.Error); ok { - aliLogClient := ls.client.(*AliLogClient) logrus.WithFields(logrus.Fields{ "errorCode": serviceErr.Code, "errorMessage": serviceErr.Message, - "endpoint": aliLogClient.Endpoint, - "project": aliLogClient.ProjectName, - "logstore": aliLogClient.LogstoreName, + "endpoint": ls.endpoint, + "project": ls.projectName, + "logstore": ls.logstoreName, }).Error("PutLogs occurs sls error") } else { logrus.Error(err) @@ -280,3 +259,49 @@ func ValidateLogOpt(cfg map[string]string) error { } return nil } + +func parseContext(ctx *logger.Context) (*contextParams, error) { + input := &contextParams{ + accessKeyID: "", + accessKeySecret: "", + securityToken: "", + topicName: "", + extraContents: []*sls.LogContent{}, + } + extra := ctx.ExtraAttributes(nil) + value, ok := extra[accessKeyIDEnvKey] + if ok { + input.accessKeyID = value + delete(extra, accessKeyIDEnvKey) + } else { + return nil, fmt.Errorf("must specify a value for env '%s'", accessKeyIDEnvKey) + } + + value, ok = extra[accessKeySecretEnvKey] + if ok { + input.accessKeySecret = value + delete(extra, accessKeySecretEnvKey) + } else { + return nil, fmt.Errorf("must specify a value for env '%s'", accessKeySecretEnvKey) + } + + if value, ok = extra[securityTokenEnvKey]; ok { + input.securityToken = value + delete(extra, securityTokenEnvKey) + } + + if value, ok = extra[topicEnvKey]; ok { + input.topicName = value + delete(extra, topicEnvKey) + } + + // add extra contents to log record + for key, value := range extra { + logContent := &sls.LogContent{ + Key: proto.String(key), + Value: proto.String(value), + } + input.extraContents = append(input.extraContents, logContent) + } + return input, nil +} diff --git a/daemon/logger/alilogs/alilogs_test.go b/daemon/logger/alilogs/alilogs_test.go new file mode 100644 index 0000000000000..373b217c743df --- /dev/null +++ b/daemon/logger/alilogs/alilogs_test.go @@ -0,0 +1,316 @@ +package alilogs + +import ( + "testing" + "time" + + "sync" + + "github.com/docker/docker/daemon/logger" + sls "github.com/galaxydi/go-loghub" + "github.com/gogo/protobuf/proto" +) + +func TestCollectLogsNumberLimit(t *testing.T) { + extraContents := []*sls.LogContent{} + mockClient := NewMockClient() + mockClient.ErrType = 0 + + stream := &logStream{ + endpoint: "test-endpoint", + projectName: "test-project", + logstoreName: "test-logstore", + topic: "demo_topic", + extraLogContents: extraContents, + client: mockClient, + logGroup: &sls.LogGroup{ + Topic: proto.String("demo_topic"), + Logs: []*sls.Log{}, + }, + messages: make(chan *logger.Message, maximumLogsPerPut), + } + + go stream.collectLogs() + + var wg sync.WaitGroup + wg.Add(maximumLogsPerPut + 102) + + for i := 0; i < maximumLogsPerPut+102; i++ { + go worker(stream, &wg) + } + wg.Wait() + time.Sleep(batchPublishFrequency) + stream.Close() + if mockClient.Topic != "demo_topic" { + t.Errorf("check topic fail, expect:%s, actual:%s", stream.topic, mockClient.Topic) + } + if len(mockClient.Logs) != maximumLogsPerPut+102 { + t.Errorf("check log number fail, expect:%v, actual:%v", maximumLogsPerPut+102, len(mockClient.Logs)) + } + +} + +func worker(stream *logStream, wg *sync.WaitGroup) { + stream.Log(&logger.Message{ + Line: []byte("test log"), + Timestamp: time.Time{}, + }) + wg.Done() +} + +func TestValidateOpt(t *testing.T) { + // endpointKey, projectKey, logstoreKey, labelsKey, envKey + opt := map[string]string{} + opt[endpointKey] = "" + err := ValidateLogOpt(opt) + if err == nil { + t.Errorf("check log opt fail: %v", opt) + } + + opt[endpointKey] = "test-endpoint" + opt[projectKey] = "" + err = ValidateLogOpt(opt) + if err == nil { + t.Errorf("check log opt fail: %v", opt) + } + + opt[projectKey] = "test-project" + opt[logstoreKey] = "" + err = ValidateLogOpt(opt) + if err == nil { + t.Errorf("check log opt fail: %v", opt) + } + + opt[logstoreKey] = "test-logstore" + opt[labelsKey] = "attr1,attr2" + opt[envKey] = "e1=v1,e2=v2" + err = ValidateLogOpt(opt) + if err != nil { + t.Errorf("check log opt fail: %v", opt) + } + + opt["error-key"] = "unsupported" + err = ValidateLogOpt(opt) + if err == nil { + t.Errorf("check log opt fail: %v", opt) + } +} + +func TestCollectLogsSimple(t *testing.T) { + ec1 := &sls.LogContent{ + Key: proto.String("ex1"), + Value: proto.String("ex1 value"), + } + ec2 := &sls.LogContent{ + Key: proto.String("ex2"), + Value: proto.String("ex2 value"), + } + extraContents := []*sls.LogContent{ec1, ec2} + mockClient := NewMockClient() + mockClient.ErrType = 0 + stream := &logStream{ + endpoint: "test-endpoint", + projectName: "test-project", + logstoreName: "test-logstore", + topic: "demo_topic", + extraLogContents: extraContents, + client: mockClient, + logGroup: &sls.LogGroup{ + Topic: proto.String("demo_topic"), + Logs: []*sls.Log{}, + }, + messages: make(chan *logger.Message, maximumLogsPerPut), + } + + go stream.collectLogs() + stream.Log(&logger.Message{ + Line: []byte("this is test log 1"), + Timestamp: time.Time{}, + }) + stream.Log(&logger.Message{ + Line: []byte("this is test log 2"), + Timestamp: time.Time{}, + }) + stream.Log(&logger.Message{ + Line: []byte("this is test log 3"), + Timestamp: time.Time{}, + }) + time.Sleep(batchPublishFrequency) + stream.Close() + if len(mockClient.Logs) != 3 { + t.Errorf("should be 3 number logs, actual log numbers: %v", len(mockClient.Logs)) + } +} + +func TestPublishLogs(t *testing.T) { + ec1 := &sls.LogContent{ + Key: proto.String("ex1"), + Value: proto.String("ex1 value"), + } + ec2 := &sls.LogContent{ + Key: proto.String("ex2"), + Value: proto.String("ex2 value"), + } + extraContents := []*sls.LogContent{ec1, ec2} + mockClient := NewMockClient() + stream := &logStream{ + endpoint: "test-endpoint", + projectName: "test-project", + logstoreName: "test-logstore", + topic: "demo_topic", + extraLogContents: extraContents, + client: mockClient, + logGroup: &sls.LogGroup{ + Topic: proto.String("demo_topic"), + Logs: []*sls.Log{}, + }, + messages: make(chan *logger.Message, maximumLogsPerPut), + } + + logMsg := &sls.LogContent{ + Key: proto.String("message"), + Value: proto.String(string("this is a log")), + } + contents := stream.extraLogContents + contents = append(contents, logMsg) + logRecord := sls.Log{ + Time: proto.Uint32(uint32(time.Now().Unix())), + Contents: contents, + } + stream.logGroup.Logs = append(stream.logGroup.Logs, &logRecord) + mockClient.ErrType = 0 + stream.publishLogs() + + mockClient.ErrType = 1 + stream.publishLogs() + + mockClient.ErrType = 2 + stream.publishLogs() +} + +func TestNewContainerStream(t *testing.T) { + extraContents := []*sls.LogContent{} + containerStream := &logStream{ + topic: "demo_topic", + extraLogContents: extraContents, + client: NewMockClient(), + messages: make(chan *logger.Message, maximumLogsPerPut), + } + if containerStream == nil { + t.Errorf("failed to new containerStream\n") + } + if containerStream.Name() != "alilogs" { + t.Errorf("error logger name: %s", containerStream.Name()) + } + + containerStream.Log(&logger.Message{ + Line: []byte("this is one log"), + Timestamp: time.Time{}, + }) + msg := containerStream.messages + if msg == nil { + t.Errorf("stream should has one log") + } + err := containerStream.Close() + if err != nil { + t.Errorf("stream should be close successful, err: %v", err) + } + if containerStream.closed != true { + t.Errorf("stream should be closed, close flag: %v", containerStream.closed) + } +} + +func TestParseContext(t *testing.T) { + envSlice := []string{"accessKeyID=mock_id", "accessKeySecret=mock_key", "securityToken=mock_token", "topic=mock_topic"} + labelMap := map[string]string{} + labelMap["a1"] = "v1" + labelMap["a2"] = "v2" + labelMap["a3"] = "v3" + + ctx := logger.Context{ + Config: map[string]string{ + endpointKey: "log.cn-hangzhou.aliyuncs.com", + projectKey: "test-project", + logstoreKey: "test-logstore", + "env": "accessKeyID,accessKeySecret,securityToken,topic", + "labels": "a1,a2,a3", + }, + ContainerEnv: envSlice, + ContainerLabels: labelMap, + } + input, err := parseContext(&ctx) + if err != nil { + t.Errorf("failed to parse context") + } + if input.accessKeyID != "mock_id" { + t.Errorf("parse accessKeyID fail:%s", input.accessKeyID) + } + if input.accessKeySecret != "mock_key" { + t.Errorf("parse accessKeySecret fail:%s", input.accessKeySecret) + } + if input.topicName != "mock_topic" { + t.Errorf("parse topic fail:%s", input.topicName) + } + if len(input.extraContents) != 3 { + t.Errorf("parse extraContents fail:%v", input.extraContents) + } +} + +func TestParseContextError(t *testing.T) { + envSlice := []string{"accessKeySecret=mock_key", "securityToken=mock_token", "topic=mock_topic"} + labelMap := map[string]string{} + labelMap["a1"] = "v1" + + ctx := logger.Context{ + Config: map[string]string{ + endpointKey: "log.cn-hangzhou.aliyuncs.com", + projectKey: "test-project", + logstoreKey: "test-logstore", + "env": "accessKeyID,accessKeySecret,securityToken,topic", + "labels": "a1,a2,a3", + }, + ContainerEnv: envSlice, + ContainerLabels: labelMap, + } + _, err := parseContext(&ctx) + if err == nil { + t.Errorf("invalid accessKeyID") + } + + envSlice = []string{"accessKeyID=mock_id", "securityToken=mock_token", "topic=mock_topic"} + ctx = logger.Context{ + Config: map[string]string{ + endpointKey: "log.cn-hangzhou.aliyuncs.com", + projectKey: "test-project", + logstoreKey: "test-logstore", + "env": "accessKeyID,accessKeySecret,securityToken,topic", + "labels": "a1,a2,a3", + }, + ContainerEnv: envSlice, + ContainerLabels: labelMap, + } + _, err = parseContext(&ctx) + if err == nil { + t.Errorf("invalid accessKeySecret") + } + + envSlice = []string{"accessKeyID=mock_id", "accessKeySecret=mock_key"} + ctx = logger.Context{ + Config: map[string]string{ + endpointKey: "log.cn-hangzhou.aliyuncs.com", + projectKey: "test-project", + logstoreKey: "test-logstore", + "env": "accessKeyID,accessKeySecret,securityToken,topic", + "labels": "a1,a2,a3", + }, + ContainerEnv: envSlice, + ContainerLabels: labelMap, + } + input, _ := parseContext(&ctx) + if input.securityToken != "" { + t.Errorf("token should be empty") + } + if input.topicName != "" { + t.Errorf("topic should be empty") + } +} diff --git a/daemon/logger/alilogs/logapi.go b/daemon/logger/alilogs/logapi.go index 67bd2d2b8736f..78c9a2cf4934d 100644 --- a/daemon/logger/alilogs/logapi.go +++ b/daemon/logger/alilogs/logapi.go @@ -16,10 +16,25 @@ type AliLogAPI interface { // AliLogClient implements AliLogAPI interface type AliLogClient struct { - Endpoint string - ProjectName string - LogstoreName string - logStore *sls.LogStore + logStore *sls.LogStore +} + +// NewAliLogClient ... +func NewAliLogClient(endpoint, projectName, logstoreName, accessKeyID, accessKeySecret, securityToken string) (AliLogAPI, error) { + client := AliLogClient{} + logStore, err := client.getLogStore(endpoint, projectName, logstoreName, accessKeyID, accessKeySecret, securityToken) + if err != nil { + return nil, err + } + client.logStore = logStore + + logrus.WithFields(logrus.Fields{ + "endpoint": endpoint, + "projectName": projectName, + "logstoreName": logstoreName, + }).Info("Created alilogs client") + + return &client, nil } // PutLogs implements ali PutLogs method @@ -27,13 +42,22 @@ func (client *AliLogClient) PutLogs(logGroup *sls.LogGroup) error { return client.logStore.PutLogs(logGroup) } -// NewAliLogClient ... -func NewAliLogClient(endpoint, projectName, logstoreName, accessKeyID, accessKeySecret, securityToken string) (AliLogAPI, error) { - client := AliLogClient{} - client.Endpoint = endpoint - client.ProjectName = projectName - client.LogstoreName = logstoreName +func (client *AliLogClient) getLogStore(endpoint, projectName, logstoreName, accessKeyID, accessKeySecret, securityToken string) (*sls.LogStore, error) { + logProject, err := client.getLogProject(projectName, endpoint, accessKeyID, accessKeySecret, securityToken) + if err != nil { + return nil, err + } + logStore, err := logProject.GetLogStore(logstoreName) + if err != nil { + logrus.WithFields(logrus.Fields{ + "error": err, + }).Error("Could not get ali logstore") + return nil, errors.New("Could not get ali logstore") + } + return logStore, nil +} +func (client *AliLogClient) getLogProject(projectName, endpoint, accessKeyID, accessKeySecret, securityToken string) (*sls.LogProject, error) { logProject, err := sls.NewLogProject(projectName, endpoint, accessKeyID, accessKeySecret) if err != nil { logrus.WithFields(logrus.Fields{ @@ -44,20 +68,5 @@ func NewAliLogClient(endpoint, projectName, logstoreName, accessKeyID, accessKey if securityToken != "" { logProject.WithToken(securityToken) } - - client.logStore, err = logProject.GetLogStore(logstoreName) - if err != nil { - logrus.WithFields(logrus.Fields{ - "error": err, - }).Error("Could not get ali logstore") - return nil, errors.New("Could not get ali logstore") - } - - logrus.WithFields(logrus.Fields{ - "endpoint": endpoint, - "projectName": projectName, - "logstoreName": logstoreName, - }).Info("Created alilogs client") - - return &client, nil + return logProject, nil } diff --git a/daemon/logger/alilogs/mock_client_test.go b/daemon/logger/alilogs/mock_client_test.go new file mode 100644 index 0000000000000..83a76050f9bad --- /dev/null +++ b/daemon/logger/alilogs/mock_client_test.go @@ -0,0 +1,45 @@ +package alilogs + +import ( + "fmt" + + sls "github.com/galaxydi/go-loghub" +) + +// MockClient is an autogenerated mock type for the AliLogAPI type +type MockClient struct { + ErrType int + Topic string + Logs []*sls.Log +} + +func NewMockClient() *MockClient { + client := &MockClient{ + ErrType: 0, + Topic: "", + Logs: []*sls.Log{}, + } + return client +} + +// PutLogs provides a mock function with given fields: _a0 +func (client *MockClient) PutLogs(lg *sls.LogGroup) error { + switch client.ErrType { + case 0: + // no error + client.Topic = lg.GetTopic() + for _, v := range lg.GetLogs() { + client.Logs = append(client.Logs, v) + } + return nil + case 1: + slsErr := &sls.Error{ + Code: "InternalServerError", + Message: "loghub service is not avaliable", + } + return slsErr + default: + err := fmt.Errorf("unknown error") + return err + } +} From d3ffda442c143d38d4bfaa06dea64463319d0412 Mon Sep 17 00:00:00 2001 From: "yinzhuo.dingyz" Date: Fri, 6 Jan 2017 15:10:22 +0800 Subject: [PATCH 10/13] fix comments --- daemon/logger/alilogs/alilogs.go | 53 +++++++++++------------ daemon/logger/alilogs/alilogs_test.go | 27 ++++++++---- daemon/logger/alilogs/mock_client_test.go | 26 ++++++----- 3 files changed, 61 insertions(+), 45 deletions(-) diff --git a/daemon/logger/alilogs/alilogs.go b/daemon/logger/alilogs/alilogs.go index 0d533a7dac2ca..eec7ddeb82be3 100644 --- a/daemon/logger/alilogs/alilogs.go +++ b/daemon/logger/alilogs/alilogs.go @@ -160,32 +160,32 @@ var newTicker = func(freq time.Duration) *time.Ticker { // the maximum number of total bytes in a batch (defined in // maximumBytesPerPut). func (ls *logStream) collectLogs() { + le := logrus.WithFields(logrus.Fields{ + "endpoint": ls.endpoint, + "project": ls.projectName, + "logstore": ls.logstoreName, + }) + timer := newTicker(batchPublishFrequency) for { select { case <-timer.C: ls.publishLogs() - logrus.WithFields(logrus.Fields{ - "time trigger": "send data", - "endpoint": ls.endpoint, - "project": ls.projectName, - "logstore": ls.logstoreName, - "published log number": len(ls.logGroup.Logs), - "published log size": ls.logGroup.Size(), - }).Debug("publish log when timer timeout") + le.WithFields(logrus.Fields{ + "trigger": "time", + "count": len(ls.logGroup.Logs), + "size": ls.logGroup.Size(), + }).Debug("") ls.logGroup.Reset() ls.logGroup.Topic = proto.String(ls.topic) case msg, more := <-ls.messages: if !more { ls.publishLogs() logrus.WithFields(logrus.Fields{ - "no more log": "send data", - "endpoint": ls.endpoint, - "project": ls.projectName, - "logstore": ls.logstoreName, - "published log number": len(ls.logGroup.Logs), - "published log size": ls.logGroup.Size(), - }).Debug("publish log when no more logs") + "trigger": "EOF", + "count": len(ls.logGroup.Logs), + "size": ls.logGroup.Size(), + }).Debug("") return } unprocessedLine := msg.Line @@ -205,13 +205,10 @@ func (ls *logStream) collectLogs() { // line would push it over the maximum number of total bytes. ls.publishLogs() logrus.WithFields(logrus.Fields{ - "get limit": "send data", - "endpoint": ls.endpoint, - "project": ls.projectName, - "logstore": ls.logstoreName, - "published log number": len(ls.logGroup.Logs), - "published log size": ls.logGroup.Size(), - }).Debug("publish logs when touch the limit") + "trigger": "size", + "count": len(ls.logGroup.Logs), + "size": ls.logGroup.Size(), + }).Debug("") ls.logGroup.Reset() ls.logGroup.Topic = proto.String(ls.topic) } @@ -225,16 +222,18 @@ func (ls *logStream) collectLogs() { func (ls *logStream) publishLogs() { err := ls.client.PutLogs(ls.logGroup) if err != nil { + le := logrus.WithFields(logrus.Fields{ + "endpoint": ls.endpoint, + "project": ls.projectName, + "logstore": ls.logstoreName, + }) if serviceErr, ok := err.(sls.Error); ok { - logrus.WithFields(logrus.Fields{ + le.WithFields(logrus.Fields{ "errorCode": serviceErr.Code, "errorMessage": serviceErr.Message, - "endpoint": ls.endpoint, - "project": ls.projectName, - "logstore": ls.logstoreName, }).Error("PutLogs occurs sls error") } else { - logrus.Error(err) + le.Error("PutLogs occurs err:", err) } } } diff --git a/daemon/logger/alilogs/alilogs_test.go b/daemon/logger/alilogs/alilogs_test.go index 373b217c743df..29c0fc7aaf433 100644 --- a/daemon/logger/alilogs/alilogs_test.go +++ b/daemon/logger/alilogs/alilogs_test.go @@ -1,11 +1,10 @@ package alilogs import ( + "sync" "testing" "time" - "sync" - "github.com/docker/docker/daemon/logger" sls "github.com/galaxydi/go-loghub" "github.com/gogo/protobuf/proto" @@ -14,7 +13,7 @@ import ( func TestCollectLogsNumberLimit(t *testing.T) { extraContents := []*sls.LogContent{} mockClient := NewMockClient() - mockClient.ErrType = 0 + mockClient.ErrType = NoError stream := &logStream{ endpoint: "test-endpoint", @@ -107,7 +106,6 @@ func TestCollectLogsSimple(t *testing.T) { } extraContents := []*sls.LogContent{ec1, ec2} mockClient := NewMockClient() - mockClient.ErrType = 0 stream := &logStream{ endpoint: "test-endpoint", projectName: "test-project", @@ -122,7 +120,15 @@ func TestCollectLogsSimple(t *testing.T) { messages: make(chan *logger.Message, maximumLogsPerPut), } + ticks := make(chan time.Time) + newTicker = func(_ time.Duration) *time.Ticker { + return &time.Ticker{ + C: ticks, + } + } + go stream.collectLogs() + stream.Log(&logger.Message{ Line: []byte("this is test log 1"), Timestamp: time.Time{}, @@ -135,8 +141,13 @@ func TestCollectLogsSimple(t *testing.T) { Line: []byte("this is test log 3"), Timestamp: time.Time{}, }) - time.Sleep(batchPublishFrequency) + + ticks <- time.Time{} stream.Close() + + // Wait a moment for the logs were writted into mockClient + time.Sleep(1 * time.Second) + if len(mockClient.Logs) != 3 { t.Errorf("should be 3 number logs, actual log numbers: %v", len(mockClient.Logs)) } @@ -178,13 +189,13 @@ func TestPublishLogs(t *testing.T) { Contents: contents, } stream.logGroup.Logs = append(stream.logGroup.Logs, &logRecord) - mockClient.ErrType = 0 + mockClient.ErrType = NoError stream.publishLogs() - mockClient.ErrType = 1 + mockClient.ErrType = InternalServerError stream.publishLogs() - mockClient.ErrType = 2 + mockClient.ErrType = UnknownError stream.publishLogs() } diff --git a/daemon/logger/alilogs/mock_client_test.go b/daemon/logger/alilogs/mock_client_test.go index 83a76050f9bad..e80bc7036da6c 100644 --- a/daemon/logger/alilogs/mock_client_test.go +++ b/daemon/logger/alilogs/mock_client_test.go @@ -6,16 +6,24 @@ import ( sls "github.com/galaxydi/go-loghub" ) +type errorType int + +const ( + NoError errorType = iota + InternalServerError + UnknownError +) + // MockClient is an autogenerated mock type for the AliLogAPI type type MockClient struct { - ErrType int + ErrType errorType Topic string Logs []*sls.Log } func NewMockClient() *MockClient { client := &MockClient{ - ErrType: 0, + ErrType: NoError, Topic: "", Logs: []*sls.Log{}, } @@ -25,21 +33,19 @@ func NewMockClient() *MockClient { // PutLogs provides a mock function with given fields: _a0 func (client *MockClient) PutLogs(lg *sls.LogGroup) error { switch client.ErrType { - case 0: - // no error + case NoError: client.Topic = lg.GetTopic() - for _, v := range lg.GetLogs() { - client.Logs = append(client.Logs, v) - } + client.Logs = append(client.Logs, lg.GetLogs()...) return nil - case 1: + case InternalServerError: slsErr := &sls.Error{ Code: "InternalServerError", Message: "loghub service is not avaliable", } return slsErr + case UnknownError: + return fmt.Errorf("Unknown error") default: - err := fmt.Errorf("unknown error") - return err + return fmt.Errorf("Unknown error") } } From 3907d6ba05cd90d0aac0d7d08c401f8bf14f82b6 Mon Sep 17 00:00:00 2001 From: "yinzhuo.dingyz" Date: Sat, 4 Mar 2017 16:08:11 +0800 Subject: [PATCH 11/13] refine error message --- daemon/logger/alilogs/alilogs.go | 1 - daemon/logger/alilogs/logapi.go | 11 +++++++---- vendor.conf | 2 +- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/daemon/logger/alilogs/alilogs.go b/daemon/logger/alilogs/alilogs.go index eec7ddeb82be3..42f7628c44455 100644 --- a/daemon/logger/alilogs/alilogs.go +++ b/daemon/logger/alilogs/alilogs.go @@ -1,5 +1,4 @@ // Package alilogs provides the logdriver for forwarding container logs to Ali Log Service - package alilogs import ( diff --git a/daemon/logger/alilogs/logapi.go b/daemon/logger/alilogs/logapi.go index 78c9a2cf4934d..2f8d5bfe9f1fa 100644 --- a/daemon/logger/alilogs/logapi.go +++ b/daemon/logger/alilogs/logapi.go @@ -4,6 +4,7 @@ package alilogs import ( "errors" + "fmt" "github.com/Sirupsen/logrus" "github.com/galaxydi/go-loghub" @@ -49,10 +50,11 @@ func (client *AliLogClient) getLogStore(endpoint, projectName, logstoreName, acc } logStore, err := logProject.GetLogStore(logstoreName) if err != nil { + errorMsg := fmt.Sprintf("get logstore fail due to '%s'", err.Error()) logrus.WithFields(logrus.Fields{ "error": err, - }).Error("Could not get ali logstore") - return nil, errors.New("Could not get ali logstore") + }).Error(errorMsg) + return nil, errors.New(errorMsg) } return logStore, nil } @@ -60,10 +62,11 @@ func (client *AliLogClient) getLogStore(endpoint, projectName, logstoreName, acc func (client *AliLogClient) getLogProject(projectName, endpoint, accessKeyID, accessKeySecret, securityToken string) (*sls.LogProject, error) { logProject, err := sls.NewLogProject(projectName, endpoint, accessKeyID, accessKeySecret) if err != nil { + errorMsg := fmt.Sprintf("get project fail due to '%s'", err.Error()) logrus.WithFields(logrus.Fields{ "error": err, - }).Error("Could not get ali log project") - return nil, errors.New("Could not get ali log project") + }).Error(errorMsg) + return nil, errors.New(errorMsg) } if securityToken != "" { logProject.WithToken(securityToken) diff --git a/vendor.conf b/vendor.conf index e50aa273bd445..dd1ec84a539d3 100644 --- a/vendor.conf +++ b/vendor.conf @@ -80,7 +80,7 @@ github.com/tinylib/msgp 75ee40d2601edf122ef667e2a07d600d4c44490c github.com/fsnotify/fsnotify v1.2.11 # alilogs deps -github.com/galaxydi/go-loghub d52b6d91786547a03aadec3eabb867189fd96df3 +github.com/galaxydi/go-loghub 174412b0a261627ee66637bdfcc9a17dc951b800 github.com/cloudflare/golz4 ef862a3cdc58a6f1fee4e3af3d44fbe279194cde github.com/golang/glog 23def4e6c14b4da8ac2ed8007337bc5eb5007998 From 158096740932842202b219d40407dd6f9a204fab Mon Sep 17 00:00:00 2001 From: "yinzhuo.dingyz" Date: Mon, 17 Apr 2017 16:25:16 +0800 Subject: [PATCH 12/13] fix error message --- daemon/logger/alilogs/alilogs.go | 4 ++-- daemon/logger/alilogs/logapi.go | 3 ++- daemon/logger/logger.go | 2 +- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/daemon/logger/alilogs/alilogs.go b/daemon/logger/alilogs/alilogs.go index 42f7628c44455..b31e5e0bac4db 100644 --- a/daemon/logger/alilogs/alilogs.go +++ b/daemon/logger/alilogs/alilogs.go @@ -7,7 +7,7 @@ import ( "time" "github.com/Sirupsen/logrus" - "github.com/docker/docker/daemon/logger" + "github.com/galaxydi/docker/daemon/logger" "github.com/galaxydi/go-loghub" "github.com/golang/protobuf/proto" ) @@ -27,7 +27,7 @@ Ali logging driver usage --label attr3=attr3Value \ // You assign these environment variables for alilogs logging driver to work - // "securityToken" and "topic" are optinal + // "securityToken" and "topic" are optional --log-opt env=accessKeyID,accessKeySecret,securityToken,topic \ --env "accessKeyID=xxx" \ --env "accessKeySecret=xxx" \ diff --git a/daemon/logger/alilogs/logapi.go b/daemon/logger/alilogs/logapi.go index 2f8d5bfe9f1fa..67c7f7b272266 100644 --- a/daemon/logger/alilogs/logapi.go +++ b/daemon/logger/alilogs/logapi.go @@ -50,7 +50,8 @@ func (client *AliLogClient) getLogStore(endpoint, projectName, logstoreName, acc } logStore, err := logProject.GetLogStore(logstoreName) if err != nil { - errorMsg := fmt.Sprintf("get logstore fail due to '%s'", err.Error()) + // return loghub error message directly + errorMsg := fmt.Sprintf("Could not get ali logstore %s from project %s due to '%v'", logstoreName, projectName, err) logrus.WithFields(logrus.Fields{ "error": err, }).Error(errorMsg) diff --git a/daemon/logger/logger.go b/daemon/logger/logger.go index 77c6c90ce0a42..74fd5f788e666 100644 --- a/daemon/logger/logger.go +++ b/daemon/logger/logger.go @@ -14,7 +14,7 @@ import ( "sync" "time" - "github.com/docker/docker/pkg/jsonlog" + "github.com/galaxydi/docker/pkg/jsonlog" ) // ErrReadLogsNotSupported is returned when the logger does not support reading logs. From da70ec7deaf1c4c4bf300bdfef968723bfbc771c Mon Sep 17 00:00:00 2001 From: "yiming.wangym" Date: Tue, 18 Apr 2017 06:25:05 -0700 Subject: [PATCH 13/13] change the import path --- daemon/logger/alilogs/alilogs.go | 2 +- daemon/logger/logger.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/daemon/logger/alilogs/alilogs.go b/daemon/logger/alilogs/alilogs.go index b31e5e0bac4db..c6959b165edd6 100644 --- a/daemon/logger/alilogs/alilogs.go +++ b/daemon/logger/alilogs/alilogs.go @@ -7,7 +7,7 @@ import ( "time" "github.com/Sirupsen/logrus" - "github.com/galaxydi/docker/daemon/logger" + "github.com/docker/docker/daemon/logger" "github.com/galaxydi/go-loghub" "github.com/golang/protobuf/proto" ) diff --git a/daemon/logger/logger.go b/daemon/logger/logger.go index 74fd5f788e666..77c6c90ce0a42 100644 --- a/daemon/logger/logger.go +++ b/daemon/logger/logger.go @@ -14,7 +14,7 @@ import ( "sync" "time" - "github.com/galaxydi/docker/pkg/jsonlog" + "github.com/docker/docker/pkg/jsonlog" ) // ErrReadLogsNotSupported is returned when the logger does not support reading logs.