diff --git a/sharded/log.go b/sharded/log.go new file mode 100644 index 0000000..d870be7 --- /dev/null +++ b/sharded/log.go @@ -0,0 +1,96 @@ +package sharded + +import ( + "context" + "fmt" + + "github.com/benbjohnson/clock" + + "github.com/embano1/memlog" +) + +const ( + maxShards = 1000 +) + +type config struct { + shards uint + + // memlog.Log settings + startOffset memlog.Offset + segmentSize int // offsets per segment + maxRecordSize int // bytes +} + +type Log struct { + Sharder + clock clock.Clock + conf config + logs []*memlog.Log +} + +func New(ctx context.Context, shards uint, options ...Option) (*Log, error) { + if shards < 1 { + return nil, fmt.Errorf("shards must be greater than 0") + } + + if shards > maxShards { + return nil, fmt.Errorf("shards must not be greater than %d", maxShards) + } + + var l Log + l.conf.shards = shards + + // apply defaults + for _, opt := range defaultOptions { + if err := opt(&l); err != nil { + return nil, fmt.Errorf("configure log default option: %v", err) + } + } + + // apply custom settings + for _, opt := range options { + if err := opt(&l); err != nil { + return nil, fmt.Errorf("configure log custom option: %v", err) + } + } + + l.logs = make([]*memlog.Log, shards) + for i := 0; i < int(shards); i++ { + ml, err := memlog.New(ctx) + if err != nil { + return nil, fmt.Errorf("create shard: %w", err) + } + l.logs[i] = ml + } + + return &l, nil +} + +func (l *Log) Write(ctx context.Context, key []byte, data []byte) (memlog.Offset, error) { + shard, err := l.Shard(key, l.conf.shards) + if err != nil { + return -1, fmt.Errorf("get shard: %w", err) + } + + offset, err := l.logs[shard].Write(ctx, data) + if err != nil { + return offset, fmt.Errorf("write to shard: %w", err) + } + + return offset, nil +} + +func (l *Log) Read(ctx context.Context, key []byte, offset memlog.Offset) (memlog.Record, error) { + shard, err := l.Shard(key, l.conf.shards) + if err != nil { + return memlog.Record{}, fmt.Errorf("get shard: %w", err) + } + + r, err := l.logs[shard].Read(ctx, offset) + if err != nil { + return memlog.Record{}, fmt.Errorf("read from shard: %w", err) + } + + return r, nil +} diff --git a/sharded/log_test.go b/sharded/log_test.go new file mode 100644 index 0000000..c572146 --- /dev/null +++ b/sharded/log_test.go @@ -0,0 +1,73 @@ +package sharded + +import ( + "context" + "errors" + "fmt" + "testing" + + "gotest.tools/v3/assert" + + "github.com/embano1/memlog" +) + +func TestLog(t *testing.T) { + ctx := context.Background() + kvs := map[string][]string{ + "group": {"michael", "hannah", "bjarne", "soenke"}, + "groups": {"family", "friends", "colleagues"}, + "Z": {"ent inc.", "vmware", "google", "microsoft"}, + "K": {"ent inc.", "vmware", "google", "microsoft"}, + "A": {"ent inc.", "vmware", "google", "microsoft"}, + } + + l, err := New(ctx, 20) + assert.NilError(t, err) + + for k, vals := range kvs { + for _, v := range vals { + _, err = l.Write(ctx, []byte(k), []byte(v)) + assert.NilError(t, err) + // fmt.Printf("key: %s\tval: %s\toffset: %d\n", k, v, offset) + } + } + + for i := range l.logs { + offset := memlog.Offset(0) + for { + r, err := l.logs[i].Read(ctx, offset) + if errors.Is(err, memlog.ErrFutureOffset) { + break + } + assert.NilError(t, err) + fmt.Printf("log: %d\trecord: %v\tdata: %s\n", i, r.Metadata, r.Data) + offset++ + } + } + + offset := 0 + for { + r, err := l.Read(ctx, []byte("groups"), memlog.Offset(offset)) + if errors.Is(err, memlog.ErrFutureOffset) { + break + } + assert.NilError(t, err) + fmt.Printf("key: %s\trecord: %v\tdata:%s\n", "groups", r.Metadata, r.Data) + offset++ + } +} + +/* for k := range kvs { + offset := 0 + for { + r, err := l.Read(ctx, []byte(k), memlog.Offset(offset)) + if errors.Is(err, memlog.ErrFutureOffset) { + break + } + assert.NilError(t, err) + fmt.Printf("key: %s\trecord: %v\tdata:%s\n", k, r.Metadata, r.Data) + offset++ + } + + } +*/ diff --git a/sharded/options.go b/sharded/options.go new file mode 100644 index 0000000..c5fee53 --- /dev/null +++ b/sharded/options.go @@ -0,0 +1,86 @@ +package sharded + +import ( + "errors" + + "github.com/benbjohnson/clock" + + "github.com/embano1/memlog" +) + +const ( + // DefaultStartOffset is the start offset of each shard + DefaultStartOffset = memlog.DefaultStartOffset + // DefaultSegmentSize is the segment size of each shard + DefaultSegmentSize = memlog.DefaultSegmentSize + // DefaultMaxRecordDataBytes is the maximum data (payload) size of a record + DefaultMaxRecordDataBytes = memlog.DefaultMaxRecordDataBytes +) + +// Option customizes a log +type Option func(*Log) error + +var defaultOptions = []Option{ + WithClock(clock.New()), + WithMaxRecordDataSize(DefaultMaxRecordDataBytes), + WithMaxSegmentSize(DefaultSegmentSize), + WithSharder(newDefaultSharder()), + WithStartOffset(DefaultStartOffset), +} + +// WithClock uses the specified clock for setting record timestamps +func WithClock(c clock.Clock) Option { + return func(log *Log) error { + if c == nil { + return errors.New("clock must not be nil") + } + + log.clock = c + return nil + } +} + +// WithMaxRecordDataSize sets the maximum record data (payload) size in bytes +func WithMaxRecordDataSize(size int) Option { + return func(log *Log) error { + if size <= 0 { + return errors.New("size must be greater than 0") + } + log.conf.maxRecordSize = size + return nil + } +} + +// WithMaxSegmentSize sets the maximum size, i.e. number of offsets, in a log +// segment. Must be greater than 0. +func WithMaxSegmentSize(size int) Option { + return func(log *Log) error { + if size <= 0 { + return errors.New("size must be greater than 0") + } + log.conf.segmentSize = size + return nil + } +} + +func WithSharder(s Sharder) Option { + return func(log *Log) error { + if s == nil { + return errors.New("sharder must not be nil") + } + log.Sharder = s + return nil + } +} + +// WithStartOffset sets the start offset of the log. Must be equal or greater +// than 0. +func WithStartOffset(offset memlog.Offset) Option { + return func(log *Log) error { + if offset < 0 { + return errors.New("start offset must not be negative") + } + log.conf.startOffset = offset + return nil + } +} diff --git a/sharded/sharder.go b/sharded/sharder.go new file mode 100644 index 0000000..6ce2292 --- /dev/null +++ b/sharded/sharder.go @@ -0,0 +1,49 @@ +package sharded + +import ( + "fmt" + "hash" + "hash/fnv" + "sync" +) + +type Sharder interface { + Shard(key []byte, shards uint) (uint, error) +} + +type defaultSharder struct { + sync.Mutex + hasher hash.Hash32 +} + +func newDefaultSharder() *defaultSharder { + return &defaultSharder{ + hasher: fnv.New32a(), + } +} + +func (d *defaultSharder) Shard(key []byte, shards uint) (uint, error) { + h, err := d.hash(key) + if err != nil { + return 0, fmt.Errorf("hash key: %w", err) + } + + shard := int32(h) % int32(shards) + if shard < 0 { + shard = -shard + } + return uint(shard), nil +} + +func (d *defaultSharder) hash(key []byte) (uint32, error) { + d.Lock() + defer d.Unlock() + + d.hasher.Reset() + _, err := d.hasher.Write(key) + if err != nil { + return 0, err + } + + return d.hasher.Sum32(), nil +}