Skip to content

Commit

Permalink
feat: compaction support (#282)
Browse files Browse the repository at this point in the history
* feat: compaction support
* docs: add jetstream note for NATS backend compaction
* fix: handle nil compaction response

Signed-off-by: Tyler Gillson <tyler.gillson@gmail.com>
  • Loading branch information
TylerGillson committed May 9, 2024
1 parent 5a50c68 commit edd8f35
Show file tree
Hide file tree
Showing 10 changed files with 47 additions and 9 deletions.
9 changes: 9 additions & 0 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type Client interface {
Create(ctx context.Context, key string, value []byte) error
Update(ctx context.Context, key string, revision int64, value []byte) error
Delete(ctx context.Context, key string, revision int64) error
Compact(ctx context.Context, revision int64) (int64, error)
Close() error
}

Expand Down Expand Up @@ -144,6 +145,14 @@ func (c *client) Delete(ctx context.Context, key string, revision int64) error {
return nil
}

func (c *client) Compact(ctx context.Context, revision int64) (int64, error) {
resp, err := c.c.Compact(ctx, revision)
if resp != nil {
return resp.Header.GetRevision(), err
}
return 0, err
}

func (c *client) Close() error {
return c.c.Close()
}
5 changes: 5 additions & 0 deletions pkg/drivers/nats/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,3 +427,8 @@ func (b *Backend) Watch(ctx context.Context, prefix string, startRevision int64)
CurrentRevision: rev,
}
}

// Compact is a no-op / not implemented. Revision history is managed by the jetstream bucket.
func (b *Backend) Compact(ctx context.Context, revision int64) (int64, error) {
return revision, nil
}
5 changes: 5 additions & 0 deletions pkg/drivers/nats/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,3 +120,8 @@ func (b *BackendLogger) DbSize(ctx context.Context) (int64, error) {
func (b *BackendLogger) CurrentRevision(ctx context.Context) (int64, error) {
return b.backend.CurrentRevision(ctx)
}

// Compact is a no-op / not implemented. Revision history is managed by the jetstream bucket.
func (b *BackendLogger) Compact(ctx context.Context, revision int64) (int64, error) {
return revision, nil
}
5 changes: 5 additions & 0 deletions pkg/logstructured/logstructured.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type Log interface {
Watch(ctx context.Context, prefix string) <-chan []*server.Event
Append(ctx context.Context, event *server.Event) (int64, error)
DbSize(ctx context.Context) (int64, error)
Compact(ctx context.Context, revision int64) (int64, error)
}

type ttlEventKV struct {
Expand Down Expand Up @@ -496,3 +497,7 @@ func (l *LogStructured) DbSize(ctx context.Context) (int64, error) {
func (l *LogStructured) CurrentRevision(ctx context.Context) (int64, error) {
return l.log.CurrentRevision(ctx)
}

func (l *LogStructured) Compact(ctx context.Context, revision int64) (int64, error) {
return l.log.Compact(ctx, revision)
}
4 changes: 4 additions & 0 deletions pkg/logstructured/sqllog/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,3 +612,7 @@ func safeCompactRev(targetCompactRev int64, currentRev int64) int64 {
func (s *SQLLog) DbSize(ctx context.Context) (int64, error) {
return s.d.GetSize(ctx)
}

func (s *SQLLog) Compact(ctx context.Context, revision int64) (int64, error) {
return s.d.Compact(ctx, revision)
}
11 changes: 10 additions & 1 deletion pkg/server/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,15 @@ import (
"go.etcd.io/etcd/api/v3/mvccpb"
)

func (l *LimitedServer) Compact(ctx context.Context, r *etcdserverpb.CompactionRequest) (*etcdserverpb.CompactionResponse, error) {
rev, err := l.backend.Compact(ctx, r.Revision)
return &etcdserverpb.CompactionResponse{
Header: &etcdserverpb.ResponseHeader{
Revision: rev,
},
}, err
}

func isCompact(txn *etcdserverpb.TxnRequest) bool {
// See https://github.com/kubernetes/kubernetes/blob/442a69c3bdf6fe8e525b05887e57d89db1e2f3a5/staging/src/k8s.io/apiserver/pkg/storage/etcd3/compact.go#L72
return len(txn.Compare) == 1 &&
Expand All @@ -19,7 +28,7 @@ func isCompact(txn *etcdserverpb.TxnRequest) bool {
string(txn.Compare[0].Key) == "compact_rev_key"
}

func (l *LimitedServer) compact(ctx context.Context) (*etcdserverpb.TxnResponse, error) {
func (l *LimitedServer) compact() (*etcdserverpb.TxnResponse, error) {
// return comparison failure so that the apiserver does not bother compacting
return &etcdserverpb.TxnResponse{
Header: &etcdserverpb.ResponseHeader{},
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func isCreate(txn *etcdserverpb.TxnRequest) *etcdserverpb.PutRequest {
return nil
}

func (l *LimitedServer) create(ctx context.Context, put *etcdserverpb.PutRequest, txn *etcdserverpb.TxnRequest) (*etcdserverpb.TxnResponse, error) {
func (l *LimitedServer) create(ctx context.Context, put *etcdserverpb.PutRequest) (*etcdserverpb.TxnResponse, error) {
if put.IgnoreLease {
return nil, unsupported("ignoreLease")
} else if put.IgnoreValue {
Expand Down
10 changes: 5 additions & 5 deletions pkg/server/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,9 @@ func (k *KVServerBridge) Txn(ctx context.Context, r *etcdserverpb.TxnRequest) (*
}

func (k *KVServerBridge) Compact(ctx context.Context, r *etcdserverpb.CompactionRequest) (*etcdserverpb.CompactionResponse, error) {
return &etcdserverpb.CompactionResponse{
Header: &etcdserverpb.ResponseHeader{
Revision: r.Revision,
},
}, nil
res, err := k.limited.Compact(ctx, r)
if err != nil {
logrus.Errorf("error in compact %s: %v", r, err)
}
return res, err
}
4 changes: 2 additions & 2 deletions pkg/server/limited.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func txnHeader(rev int64) *etcdserverpb.ResponseHeader {

func (l *LimitedServer) Txn(ctx context.Context, txn *etcdserverpb.TxnRequest) (*etcdserverpb.TxnResponse, error) {
if put := isCreate(txn); put != nil {
return l.create(ctx, put, txn)
return l.create(ctx, put)
}
if rev, key, ok := isDelete(txn); ok {
return l.delete(ctx, key, rev)
Expand All @@ -37,7 +37,7 @@ func (l *LimitedServer) Txn(ctx context.Context, txn *etcdserverpb.TxnRequest) (
return l.update(ctx, rev, key, value, lease)
}
if isCompact(txn) {
return l.compact(ctx)
return l.compact()
}
return nil, ErrNotSupported
}
Expand Down
1 change: 1 addition & 0 deletions pkg/server/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type Backend interface {
Watch(ctx context.Context, key string, revision int64) WatchResult
DbSize(ctx context.Context) (int64, error)
CurrentRevision(ctx context.Context) (int64, error)
Compact(ctx context.Context, revision int64) (int64, error)
}

type Dialect interface {
Expand Down

0 comments on commit edd8f35

Please sign in to comment.