Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Gasch <mgasch@vmware.com>
  • Loading branch information
Michael Gasch committed Jan 18, 2022
1 parent f688f35 commit fffd230
Show file tree
Hide file tree
Showing 4 changed files with 304 additions and 0 deletions.
96 changes: 96 additions & 0 deletions sharded/log.go
@@ -0,0 +1,96 @@
package sharded

import (
"context"
"fmt"

"github.com/benbjohnson/clock"

"github.com/embano1/memlog"
)

const (
// maxShards is the largest number of shards New accepts.
maxShards = 1000
)

// config holds the settings of a sharded Log.
type config struct {
// number of shards; set by New from its shards argument
shards uint

// memlog.Log settings
startOffset memlog.Offset
segmentSize int // offsets per segment
maxRecordSize int // bytes
}

// Log is a log that is split into multiple shards, each backed by its own
// memlog.Log. The embedded Sharder selects the shard for a given key; embedding
// it also promotes its Shard method onto Log.
type Log struct {
Sharder
clock clock.Clock // set via the WithClock option
conf config
logs []*memlog.Log // one entry per shard, created in New
}

// New creates a sharded log with the given number of shards. shards must be in
// the range [1, maxShards]. The package defaults are applied first, followed by
// the custom options in the order given, so a custom option overrides the
// corresponding default.
//
// Every shard is an independent memlog.Log created with ctx and with the
// resulting clock, start offset, segment size and maximum record size. An error
// is returned if shards is out of range, an option fails or a shard cannot be
// created.
func New(ctx context.Context, shards uint, options ...Option) (*Log, error) {
	if shards < 1 {
		return nil, fmt.Errorf("shards must be greater than 0")
	}

	if shards > maxShards {
		return nil, fmt.Errorf("shards must not be greater than %d", maxShards)
	}

	var l Log
	l.conf.shards = shards

	// apply defaults
	for _, opt := range defaultOptions {
		if err := opt(&l); err != nil {
			return nil, fmt.Errorf("configure log default option: %w", err)
		}
	}

	// apply custom settings
	for _, opt := range options {
		if err := opt(&l); err != nil {
			return nil, fmt.Errorf("configure log custom option: %w", err)
		}
	}

	// Forward the effective settings to every shard. Without this the shards
	// would silently run with the memlog defaults, ignoring all options.
	shardOpts := []memlog.Option{
		memlog.WithClock(l.clock),
		memlog.WithMaxRecordDataSize(l.conf.maxRecordSize),
		memlog.WithMaxSegmentSize(l.conf.segmentSize),
		memlog.WithStartOffset(l.conf.startOffset),
	}

	l.logs = make([]*memlog.Log, shards)
	for i := range l.logs {
		ml, err := memlog.New(ctx, shardOpts...)
		if err != nil {
			return nil, fmt.Errorf("create shard: %w", err)
		}
		l.logs[i] = ml
	}

	return &l, nil
}

// Write appends data to the shard that the Sharder selects for key and returns
// the offset reported by that shard. Offsets are per shard.
//
// An error is returned if the shard cannot be determined, if the Sharder
// returns an index outside of the existing shards, or if the write to the
// shard fails.
func (l *Log) Write(ctx context.Context, key []byte, data []byte) (memlog.Offset, error) {
	shard, err := l.Shard(key, l.conf.shards)
	if err != nil {
		return -1, fmt.Errorf("get shard: %w", err)
	}

	// The Sharder is pluggable, so do not trust the index blindly: an
	// out-of-range value would otherwise panic on the slice access below.
	if shard >= uint(len(l.logs)) {
		return -1, fmt.Errorf("get shard: shard %d out of range (log has %d shards)", shard, len(l.logs))
	}

	offset, err := l.logs[shard].Write(ctx, data)
	if err != nil {
		return offset, fmt.Errorf("write to shard: %w", err)
	}

	return offset, nil
}

// Read returns the record stored at offset in the shard that the Sharder
// selects for key. Offsets are per shard.
//
// An error is returned if the shard cannot be determined, if the Sharder
// returns an index outside of the existing shards, or if reading from the
// shard fails. The shard's error is wrapped, so callers can inspect it with
// errors.Is.
func (l *Log) Read(ctx context.Context, key []byte, offset memlog.Offset) (memlog.Record, error) {
	shard, err := l.Shard(key, l.conf.shards)
	if err != nil {
		return memlog.Record{}, fmt.Errorf("get shard: %w", err)
	}

	// The Sharder is pluggable, so do not trust the index blindly: an
	// out-of-range value would otherwise panic on the slice access below.
	if shard >= uint(len(l.logs)) {
		return memlog.Record{}, fmt.Errorf("get shard: shard %d out of range (log has %d shards)", shard, len(l.logs))
	}

	r, err := l.logs[shard].Read(ctx, offset)
	if err != nil {
		return memlog.Record{}, fmt.Errorf("read from shard: %w", err)
	}

	return r, nil
}
73 changes: 73 additions & 0 deletions sharded/log_test.go
@@ -0,0 +1,73 @@
package sharded

import (
"context"
"errors"
"fmt"
"testing"

"gotest.tools/v3/assert"

"github.com/embano1/memlog"
)

// TestLog writes several values under a handful of keys into a log with 20
// shards, then prints the content of every shard followed by the records of
// the shard that the key "groups" maps to.
func TestLog(t *testing.T) {
	ctx := context.Background()
	kvs := map[string][]string{
		"group":  {"michael", "hannah", "bjarne", "soenke"},
		"groups": {"family", "friends", "colleagues"},
		"Z":      {"ent inc.", "vmware", "google", "microsoft"},
		"K":      {"ent inc.", "vmware", "google", "microsoft"},
		"A":      {"ent inc.", "vmware", "google", "microsoft"},
	}

	l, err := New(ctx, 20)
	assert.NilError(t, err)

	// write all values
	for k, vals := range kvs {
		key := []byte(k)
		for _, v := range vals {
			_, err = l.Write(ctx, key, []byte(v))
			assert.NilError(t, err)
		}
	}

	// dump every shard until its end is reached
	for i, shard := range l.logs {
		for offset := memlog.Offset(0); ; offset++ {
			r, err := shard.Read(ctx, offset)
			if errors.Is(err, memlog.ErrFutureOffset) {
				break
			}
			assert.NilError(t, err)
			fmt.Printf("log: %d\trecord: %v\tdata: %s\n", i, r.Metadata, r.Data)
		}
	}

	// dump the shard of a single key through the sharded log
	const key = "groups"
	for offset := memlog.Offset(0); ; offset++ {
		r, err := l.Read(ctx, []byte(key), offset)
		if errors.Is(err, memlog.ErrFutureOffset) {
			break
		}
		assert.NilError(t, err)
		fmt.Printf("key: %s\trecord: %v\tdata:%s\n", key, r.Metadata, r.Data)
	}
}

/* for k := range kvs {
offset := 0
for {
r, err := l.Read(ctx, []byte(k), memlog.Offset(offset))
if errors.Is(err, memlog.ErrFutureOffset) {
break
}
assert.NilError(t, err)
fmt.Printf("key: %s\trecord: %v\tdata:%s\n", k, r.Metadata, r.Data)
offset++
}
}
*/
86 changes: 86 additions & 0 deletions sharded/options.go
@@ -0,0 +1,86 @@
package sharded

import (
"errors"

"github.com/benbjohnson/clock"

"github.com/embano1/memlog"
)

const (
// DefaultStartOffset is the default start offset of each shard. It equals
// memlog.DefaultStartOffset.
DefaultStartOffset = memlog.DefaultStartOffset
// DefaultSegmentSize is the default segment size of each shard. It equals
// memlog.DefaultSegmentSize.
DefaultSegmentSize = memlog.DefaultSegmentSize
// DefaultMaxRecordDataBytes is the default maximum data (payload) size of a
// record. It equals memlog.DefaultMaxRecordDataBytes.
DefaultMaxRecordDataBytes = memlog.DefaultMaxRecordDataBytes
)

// Option customizes a Log. It receives the Log being configured and returns an
// error if the requested setting is invalid.
type Option func(*Log) error

// defaultOptions are the options New applies before any custom options.
var defaultOptions = []Option{
WithClock(clock.New()),
WithMaxRecordDataSize(DefaultMaxRecordDataBytes),
WithMaxSegmentSize(DefaultSegmentSize),
WithSharder(newDefaultSharder()),
WithStartOffset(DefaultStartOffset),
}

// WithClock configures the clock used for setting record timestamps. Applying
// the option fails if c is nil.
func WithClock(c clock.Clock) Option {
	return func(l *Log) error {
		if c != nil {
			l.clock = c
			return nil
		}

		return errors.New("clock must not be nil")
	}
}

// WithMaxRecordDataSize configures the maximum size, in bytes, of the data
// (payload) of a record. Applying the option fails unless size is greater
// than 0.
func WithMaxRecordDataSize(size int) Option {
	return func(l *Log) error {
		if size < 1 {
			return errors.New("size must be greater than 0")
		}

		l.conf.maxRecordSize = size

		return nil
	}
}

// WithMaxSegmentSize configures the maximum number of offsets a log segment
// holds. Applying the option fails unless size is greater than 0.
func WithMaxSegmentSize(size int) Option {
	apply := func(l *Log) error {
		switch {
		case size <= 0:
			return errors.New("size must be greater than 0")
		default:
			l.conf.segmentSize = size
			return nil
		}
	}

	return apply
}

// WithSharder configures the Sharder the log uses to map keys to shards.
// Applying the option fails if s is nil.
func WithSharder(s Sharder) Option {
	return func(l *Log) error {
		if s != nil {
			l.Sharder = s
			return nil
		}

		return errors.New("sharder must not be nil")
	}
}

// WithStartOffset configures the start offset of the log. Applying the option
// fails if offset is negative.
func WithStartOffset(offset memlog.Offset) Option {
	return func(l *Log) error {
		if offset >= 0 {
			l.conf.startOffset = offset
			return nil
		}

		return errors.New("start offset must not be negative")
	}
}
49 changes: 49 additions & 0 deletions sharded/sharder.go
@@ -0,0 +1,49 @@
package sharded

import (
"fmt"
"hash"
"hash/fnv"
"sync"
)

// Sharder maps a key to a shard.
type Sharder interface {
// Shard returns the shard for key given the total number of shards, or an
// error. Log uses the returned value as an index into its shards.
// NOTE(review): the result is presumably expected to be in [0, shards) —
// confirm and document for custom implementations.
Shard(key []byte, shards uint) (uint, error)
}

// defaultSharder is a Sharder that derives the shard from the 32-bit FNV-1a
// hash of the key. It is safe for concurrent use.
type defaultSharder struct {
	// mu guards hasher, which is stateful and reused across calls. It is a
	// named field so that Lock/Unlock do not become part of the type's method
	// set.
	mu     sync.Mutex
	hasher hash.Hash32
}

// newDefaultSharder returns a defaultSharder that is ready to use.
func newDefaultSharder() *defaultSharder {
	return &defaultSharder{
		hasher: fnv.New32a(),
	}
}

// Shard returns the shard for key, a value in [0, shards). The same key and
// shard count always produce the same shard. An error is returned if shards is
// 0 or if the key cannot be hashed.
func (d *defaultSharder) Shard(key []byte, shards uint) (uint, error) {
	// guard against an integer divide by zero below
	if shards == 0 {
		return 0, fmt.Errorf("shards must be greater than 0")
	}

	h, err := d.hash(key)
	if err != nil {
		return 0, fmt.Errorf("hash key: %w", err)
	}

	// The shard is the magnitude of the hash, interpreted as a signed 32-bit
	// value, modulo shards. Widening to 64 bits keeps the negation of the most
	// negative value well-defined and avoids truncating shards to 32 bits,
	// which would turn large shard counts into zero or negative divisors.
	v := int64(int32(h))
	if v < 0 {
		v = -v
	}

	return uint(uint64(v) % uint64(shards)), nil
}

// hash returns the 32-bit FNV-1a hash of key.
func (d *defaultSharder) hash(key []byte) (uint32, error) {
	d.mu.Lock()
	defer d.mu.Unlock()

	d.hasher.Reset()
	if _, err := d.hasher.Write(key); err != nil {
		return 0, err
	}

	return d.hasher.Sum32(), nil
}

0 comments on commit fffd230

Please sign in to comment.