Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

build: revert deletion of influx CLI code #21919

Merged
merged 6 commits into from
Jul 22, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
48 changes: 46 additions & 2 deletions .goreleaser.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,32 @@
project_name: influxdb2
builds:
- id: influx
goos:
- linux
- darwin
- windows
goarch:
- amd64
- arm64
ignore:
- goos: darwin
goarch: arm64
- goos: windows
goarch: arm64
main: ./cmd/influx/
flags:
- -tags={{if eq .Os "linux"}}osusergo,netgo,static_build{{if not (eq .Arch "amd64")}},noasm{{end}}{{end}}
- -buildmode={{if eq .Os "windows"}}exe{{else}}pie{{end}}
env:
- GO111MODULE=on
- CGO_ENABLED=1
- CC=xcc
- PKG_CONFIG=$GOPATH/bin/pkg-config
- MACOSX_DEPLOYMENT_TARGET=10.11
ldflags:
- -s -w -X main.version={{.Version}} -X main.commit={{.ShortCommit}} -X main.date={{.Date}} {{if eq .Os "linux"}}-extldflags "-fno-PIC -static -Wl,-z,stack-size=8388608"{{end}}
binary: influx

- id: influxd
goos:
- linux
Expand Down Expand Up @@ -31,7 +58,7 @@ builds:

nfpms:
- id: "influxdb2"
builds: ["influxd"]
builds: ["influx", "influxd"]
formats:
- deb
- rpm
Expand Down Expand Up @@ -69,7 +96,18 @@ nfpms:
license: MIT

archives:
- id: influxd
- id: influx_only
builds: ["influx"]
format: tar.gz
format_overrides:
- goos: windows
format: zip
wrap_in_directory: true
name_template: "influxdb2-client-{{ .Version }}-{{ .Os }}-{{ .Arch }}"
files:
- LICENSE
- README.md
- id: influx_and_influxd
format: tar.gz
format_overrides:
- goos: windows
Expand All @@ -93,6 +131,9 @@ checksum:
dockers:
- goos: linux
goarch: amd64
binaries:
- influxd
- influx
image_templates:
- "quay.io/influxdb/influxdb-amd64:{{ .Tag }}"
dockerfile: docker/influxd/Dockerfile
Expand All @@ -103,6 +144,9 @@ dockers:
use_buildx: true
- goos: linux
goarch: arm64
binaries:
- influxd
- influx
image_templates:
- "quay.io/influxdb/influxdb-arm64v8:{{ .Tag }}"
dockerfile: docker/influxd/Dockerfile
Expand Down
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ RUN make
FROM debian:stretch-slim AS influx

COPY --from=dbuild-all /code/bin/linux/influxd /usr/bin/influxd
COPY --from=dbuild-all /code/bin/linux/influx /usr/bin/influx

EXPOSE 8086

Expand Down
7 changes: 7 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ SOURCES_NO_VENDOR := $(shell find . -path ./vendor -prune -o -name "*.go" -not -

# List of binary cmds to build
CMDS := \
bin/$(GOOS)/influx \
bin/$(GOOS)/influxd

all: generate $(CMDS)
Expand All @@ -74,8 +75,14 @@ all: generate $(CMDS)
bin/$(GOOS)/influxd: $(SOURCES)
$(GO_BUILD) -o $@ ./cmd/$(shell basename "$@")

bin/$(GOOS)/influx: $(SOURCES)
$(GO_BUILD_SM) -o $@ ./cmd/$(shell basename "$@")

# Ease of use build for just the go binary
influxd: bin/$(GOOS)/influxd

influx: bin/$(GOOS)/influx

static/data/build: scripts/fetch-ui-assets.sh
./scripts/fetch-ui-assets.sh

Expand Down
251 changes: 251 additions & 0 deletions backup/backup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
package backup

import (
"compress/gzip"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"time"

"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/bolt"
"github.com/influxdata/influxdb/v2/tenant"
"github.com/influxdata/influxdb/v2/v1/services/meta"
"go.uber.org/zap"
)

type Request struct {
// Organization to backup.
// If not set, all orgs will be included.
OrgID influxdb.ID
Org string

// Bucket to backup.
// If not set, all buckets within the org filter will be included.
BucketID influxdb.ID
Bucket string

// Path to the directory where backup files should be written.
Path string
}

type backupRunner struct {
baseName string
backupSvc influxdb.BackupService
tenantService influxdb.TenantService
metaClient *meta.Client
log *zap.Logger
}

func RunBackup(ctx context.Context, req Request, svc influxdb.BackupService, log *zap.Logger) error {
if err := os.MkdirAll(req.Path, 0777); err != nil {
return err
}

var manifest influxdb.Manifest
runner := backupRunner{
baseName: time.Now().UTC().Format(influxdb.BackupFilenamePattern),
backupSvc: svc,
log: log,
}

manifest.KV.FileName = fmt.Sprintf("%s.bolt", runner.baseName)
kvPath := filepath.Join(req.Path, manifest.KV.FileName)
if err := runner.backupKV(ctx, kvPath); err != nil {
return err
}

fi, err := os.Stat(kvPath)
if err != nil {
return fmt.Errorf("failed to inspect local KV backup at %q: %w", kvPath, err)
}
manifest.KV.Size = fi.Size()

// Inspect the backed-up KV data so we can iterate through orgs & buckets.
kvStore := bolt.NewKVStore(runner.log, kvPath)
if err := kvStore.Open(ctx); err != nil {
return err
}
defer kvStore.Close()

runner.tenantService = tenant.NewService(tenant.NewStore(kvStore))
runner.metaClient = meta.NewClient(meta.NewConfig(), kvStore)
if err := runner.metaClient.Open(); err != nil {
return err
}
defer runner.metaClient.Close()

manifest.Files, err = runner.findShards(ctx, req)
if err != nil {
return err
}

for i := range manifest.Files {
if err := runner.backupShard(ctx, &manifest.Files[i], req); err != nil {
return err
}
}

manifestPath := filepath.Join(req.Path, fmt.Sprintf("%s.manifest", runner.baseName))
if err := runner.writeManifest(manifest, manifestPath); err != nil {
return fmt.Errorf("failed to write backup manfiest to %q: %w", manifestPath, err)
}

log.Info("Backup complete", zap.String("path", req.Path))
return nil
}

func (r *backupRunner) backupKV(ctx context.Context, path string) error {
r.log.Info("Backing up KV store", zap.String("path", path))
f, err := os.Create(path)
if err != nil {
return fmt.Errorf("failed to open local KV backup file at %q: %w", path, err)
}

// Stream bolt file from server, sync, and ensure file closes correctly.
if err := r.backupSvc.BackupKVStore(ctx, f); err != nil {
_ = f.Close()
return fmt.Errorf("failed to download KV backup: %w", err)
}
if err := f.Sync(); err != nil {
_ = f.Close()
return fmt.Errorf("failed to flush KV backup to local disk: %w", err)
}
if err := f.Close(); err != nil {
return fmt.Errorf("failed to close local KV backup at %q: %w", path, err)
}

return nil
}

func (r *backupRunner) findShards(ctx context.Context, req Request) ([]influxdb.ManifestEntry, error) {
filter := influxdb.OrganizationFilter{}
if req.OrgID.Valid() {
filter.ID = &req.OrgID
}
if req.Org != "" {
filter.Name = &req.Org
}

orgs, _, err := r.tenantService.FindOrganizations(ctx, filter)
if err != nil {
return nil, fmt.Errorf("failed to find matching organizations: %w", err)
}

var entries []influxdb.ManifestEntry
for _, org := range orgs {
r.log.Info("Backing up organization", zap.String("id", org.ID.String()), zap.String("name", org.Name))
oentries, err := r.findOrgShards(ctx, org, req)
if err != nil {
return nil, err
}
entries = append(entries, oentries...)
}

return entries, nil
}

func (r *backupRunner) findOrgShards(ctx context.Context, org *influxdb.Organization, req Request) ([]influxdb.ManifestEntry, error) {
filter := influxdb.BucketFilter{OrganizationID: &org.ID}
if req.BucketID.Valid() {
filter.ID = &req.BucketID
}
if req.Bucket != "" {
filter.Name = &req.Bucket
}
buckets, _, err := r.tenantService.FindBuckets(ctx, filter)
if err != nil {
return nil, fmt.Errorf("failed to find matching buckets: %w", err)
}

var entries []influxdb.ManifestEntry
for _, bucket := range buckets {
r.log.Info("Backing up bucket", zap.String("id", bucket.ID.String()), zap.String("name", bucket.Name))

// Lookup matching database from the bucket.
dbi := r.metaClient.Database(bucket.ID.String())
if dbi == nil {
return nil, fmt.Errorf("bucket database not found: %s", bucket.ID.String())
}

// Collect info for each shard in the DB.
for _, rpi := range dbi.RetentionPolicies {
for _, sg := range rpi.ShardGroups {
if sg.Deleted() {
continue
}

for _, sh := range sg.Shards {
entries = append(entries, influxdb.ManifestEntry{
OrganizationID: org.ID.String(),
OrganizationName: org.Name,
BucketID: bucket.ID.String(),
BucketName: bucket.Name,
ShardID: sh.ID,
})
}
}
}
}

return entries, nil
}

func (r *backupRunner) backupShard(ctx context.Context, shardInfo *influxdb.ManifestEntry, req Request) error {
shardInfo.FileName = fmt.Sprintf("%s.s%d.tar.gz", r.baseName, shardInfo.ShardID)
path := filepath.Join(req.Path, shardInfo.FileName)
r.log.Info("Backing up shard", zap.Uint64("id", shardInfo.ShardID), zap.String("path", path))

f, err := os.Create(path)
if err != nil {
return fmt.Errorf("failed to open local shard backup at %q: %w", path, err)
}
gw := gzip.NewWriter(f)

// Stream file from server, sync, and ensure file closes correctly.
if err := r.backupSvc.BackupShard(ctx, gw, shardInfo.ShardID, time.Time{}); err != nil {
_ = gw.Close()
_ = f.Close()

if influxdb.ErrorCode(err) == influxdb.ENotFound {
r.log.Warn("Shard removed during backup", zap.Uint64("id", shardInfo.ShardID))
return nil
}
return fmt.Errorf("failed to download shard backup: %w", err)
}
if err := gw.Close(); err != nil {
_ = f.Close()
return fmt.Errorf("failed to flush GZIP footer to local shard backup: %w", err)
}
if err := f.Sync(); err != nil {
_ = f.Close()
return fmt.Errorf("failed to flush shard backup to local disk: %w", err)
}
if err := f.Close(); err != nil {
return fmt.Errorf("failed to close local shard backup at %q: %w", path, err)
}

// Use downloaded file's info to fill in remaining pieces of manifest.
fi, err := os.Stat(path)
if err != nil {
return fmt.Errorf("failed to inspect local shard backup at %q: %w", path, err)
}
shardInfo.Size = fi.Size()
shardInfo.LastModified = fi.ModTime().UTC()

return nil
}

func (r *backupRunner) writeManifest(manifest influxdb.Manifest, path string) error {
r.log.Info("Writing manifest", zap.String("path", path))

buf, err := json.MarshalIndent(manifest, "", " ")
if err != nil {
return err
}
buf = append(buf, '\n')
return ioutil.WriteFile(path, buf, 0600)
}