diff --git a/README.md b/README.md index 2ed19bb..811d6c7 100644 --- a/README.md +++ b/README.md @@ -21,6 +21,7 @@ structure modeled as a *Log*. read more about the ideas behind `memlog` please see ["The Log: What every software engineer should know about real-time data's unifying abstraction"](https://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should-know-about-real-time-datas-unifying). + ## Motivation I keep hitting the same user story (use case) over and over again: one or more @@ -117,9 +118,12 @@ One is not constrained by just creating **one** `Log`. For certain use cases, creating multiple `Logs` might be useful. For example: - Manage completely different data sets/sizes in the same process -- Partitioning input data by type or *key* - Setting different `Log` sizes (i.e. retention times), e.g. premium users will have access to a larger *history* of `Records` +- Partitioning input data by type or *key* + +💡 For use cases where you want to order the log by `key(s)`, consider using the +specialised [`sharded.Log`](sharded/README.md). ## Example diff --git a/options.go b/options.go index 0c1c3c6..5292e32 100644 --- a/options.go +++ b/options.go @@ -9,7 +9,7 @@ import ( const ( // DefaultStartOffset is the start offset of the log DefaultStartOffset = Offset(0) - // DefaultSegmentSize is the segment size of the log + // DefaultSegmentSize is the segment size, i.e. number of offsets, in the log DefaultSegmentSize = 1024 // DefaultMaxRecordDataBytes is the maximum data (payload) size of a record DefaultMaxRecordDataBytes = 1024 << 10 // 1MiB diff --git a/sharded/README.md b/sharded/README.md new file mode 100644 index 0000000..f15ffc3 --- /dev/null +++ b/sharded/README.md @@ -0,0 +1,92 @@ +# About + +## tl;dr + +A purpose-built implementation of `memlog.Log` with support for sharding +(partitioning) `Records` by `key`. + +# Usage + +The `sharded.Log` is built on `memlog.Log` and provides a similar API for +reading and writing data (`Records`), `Offset` handling, etc. + +The `Read()` and `Write()` methods accept a sharding `key` to distribute the +`Records` based on a (configurable) sharding strategy. + +Unless specified otherwise, the default sharding strategy uses Golang's +[`fnv.New32a`](https://pkg.go.dev/hash/fnv#New32a) to retrieve a hash and find +the corresponding `Shard` using a *modulo* operation based on the number of +(configurable) `Shards` in the `Log`. + +💡 Depending on the number of `Shards`, number of distinct `keys` and their +*hashes*, multiple `keys` might be stored in the same `Shard`. If strict `key` +separation is required, a custom `Sharder` can be implemented. For convenience, +a `KeySharder` is provided. + +See [pkg.go.dev](https://pkg.go.dev/github.com/embano1/memlog/sharded) for the +API reference and examples. + +## Example + +```go +package main + +import ( + "context" + "errors" + "fmt" + "os" + + "github.com/embano1/memlog" + "github.com/embano1/memlog/sharded" +) + +func main() { + ctx := context.Background() + + // the default number of shards (1000) is sufficient to assign a shard per key + // for this example (i.e. 
no key overlap within a shard) + l, err := sharded.New(ctx) + if err != nil { + fmt.Printf("create log: %v", err) + os.Exit(1) + } + + data := map[string][]string{ + "users": {"tom", "sarah", "ajit"}, + "groups": {"friends", "family", "colleagues"}, + } + + for key, vals := range data { + for _, val := range vals { + _, err := l.Write(ctx, []byte(key), []byte(val)) + if err != nil { + fmt.Printf("write: %v", err) + os.Exit(1) + } + } + } + + fmt.Println("reading all users...") + offset := memlog.Offset(0) + for { + record, err := l.Read(ctx, []byte("users"), offset) + if err != nil { + if errors.Is(err, memlog.ErrFutureOffset) { + break // end of log + } + fmt.Printf("read: %v", err) + os.Exit(1) + } + + fmt.Printf("- %s\n", string(record.Data)) + offset++ + } + + // Output: reading all users... + // - tom + // - sarah + // - ajit +} + +``` \ No newline at end of file diff --git a/sharded/example_sharder_test.go b/sharded/example_sharder_test.go new file mode 100644 index 0000000..4585f0b --- /dev/null +++ b/sharded/example_sharder_test.go @@ -0,0 +1,80 @@ +package sharded_test + +import ( + "context" + "errors" + "fmt" + "os" + + "github.com/embano1/memlog" + "github.com/embano1/memlog/sharded" +) + +func Example_sharder() { + ctx := context.Background() + + keys := []string{"galaxies", "planets"} + ks := sharded.NewKeySharder(keys) + + opts := []sharded.Option{ + sharded.WithNumShards(2), // must be >=len(keys) + sharded.WithSharder(ks), + } + l, err := sharded.New(ctx, opts...) + if err != nil { + fmt.Printf("create log: %v", err) + os.Exit(1) + } + + data := map[string][]string{ + keys[0]: {"Centaurus A", "Andromeda", "Eye of Sauron"}, + keys[1]: {"Mercury", "Venus", "Earth", "Mars", "Jupiter", "Saturn", "Uranus", "Neptune"}, + } + + for key, vals := range data { + for _, val := range vals { + _, err := l.Write(ctx, []byte(key), []byte(val)) + if err != nil { + fmt.Printf("write: %v", err) + os.Exit(1) + } + } + } + +KEYS: + for _, key := range keys { + fmt.Printf("reading all %s...\n", key) + + offset := memlog.Offset(0) + for { + read, err := l.Read(ctx, []byte(key), offset) + if err != nil { + if errors.Is(err, memlog.ErrFutureOffset) { + fmt.Println() + continue KEYS + } + fmt.Printf("read: %v", err) + os.Exit(1) + } + + fmt.Printf("- %s\n", string(read.Data)) + offset++ + } + + } + + // Output: reading all galaxies... + // - Centaurus A + // - Andromeda + // - Eye of Sauron + // + // reading all planets... + // - Mercury + // - Venus + // - Earth + // - Mars + // - Jupiter + // - Saturn + // - Uranus + // - Neptune +} diff --git a/sharded/example_test.go b/sharded/example_test.go new file mode 100644 index 0000000..c627d8f --- /dev/null +++ b/sharded/example_test.go @@ -0,0 +1,59 @@ +package sharded_test + +import ( + "context" + "errors" + "fmt" + "os" + + "github.com/embano1/memlog" + "github.com/embano1/memlog/sharded" +) + +func Example() { + ctx := context.Background() + + // the default number of shards (1000) is sufficient to assign a shard per key + // for this example (i.e. 
no key overlap within a shard) + l, err := sharded.New(ctx) + if err != nil { + fmt.Printf("create log: %v", err) + os.Exit(1) + } + + data := map[string][]string{ + "users": {"tom", "sarah", "ajit"}, + "groups": {"friends", "family", "colleagues"}, + } + + for key, vals := range data { + for _, val := range vals { + _, err := l.Write(ctx, []byte(key), []byte(val)) + if err != nil { + fmt.Printf("write: %v", err) + os.Exit(1) + } + } + } + + fmt.Println("reading all users...") + offset := memlog.Offset(0) + for { + record, err := l.Read(ctx, []byte("users"), offset) + if err != nil { + if errors.Is(err, memlog.ErrFutureOffset) { + break // end of log + } + fmt.Printf("read: %v", err) + os.Exit(1) + } + + fmt.Printf("- %s\n", string(record.Data)) + offset++ + } + + // Output: reading all users... + // - tom + // - sarah + // - ajit +} diff --git a/sharded/log.go b/sharded/log.go new file mode 100644 index 0000000..4475052 --- /dev/null +++ b/sharded/log.go @@ -0,0 +1,108 @@ +package sharded + +import ( + "context" + "errors" + "fmt" + + "github.com/benbjohnson/clock" + + "github.com/embano1/memlog" +) + +type config struct { + shards uint + + // memlog.Log settings + startOffset memlog.Offset + segmentSize int // offsets per segment + maxRecordSize int // bytes +} + +// Log is a sharded log implementation on top of memlog.Log. It uses a +// configurable sharding strategy (see Sharder interface) during reads and +// writes. +type Log struct { + sharder Sharder + clock clock.Clock + conf config + shards []*memlog.Log +} + +// New creates a new sharded log which can be customized with options. If not +// specified, the default sharding strategy uses fnv.New32a for key hashing. +func New(ctx context.Context, options ...Option) (*Log, error) { + var l Log + + // apply defaults + for _, opt := range defaultOptions { + if err := opt(&l); err != nil { + return nil, fmt.Errorf("configure log default option: %v", err) + } + } + + // apply custom settings + for _, opt := range options { + if err := opt(&l); err != nil { + return nil, fmt.Errorf("configure log custom option: %v", err) + } + } + + shards := l.conf.shards + l.shards = make([]*memlog.Log, shards) + opts := []memlog.Option{ + memlog.WithClock(l.clock), + memlog.WithMaxRecordDataSize(l.conf.maxRecordSize), + memlog.WithStartOffset(l.conf.startOffset), + memlog.WithMaxSegmentSize(l.conf.segmentSize), + } + + for i := 0; i < int(shards); i++ { + ml, err := memlog.New(ctx, opts...) 
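+		// each shard is an independent memlog.Log configured with the same
+		// start offset, segment size and record size limits set above; abort
+		// if any shard cannot be created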
+ if err != nil { + return nil, fmt.Errorf("create shard: %w", err) + } + l.shards[i] = ml + } + + return &l, nil +} + +// Write writes data to the log using the specified key for sharding +func (l *Log) Write(ctx context.Context, key []byte, data []byte) (memlog.Offset, error) { + if key == nil { + return -1, errors.New("invalid key") + } + + shard, err := l.sharder.Shard(key, l.conf.shards) + if err != nil { + return -1, fmt.Errorf("get shard: %w", err) + } + + offset, err := l.shards[shard].Write(ctx, data) + if err != nil { + return offset, fmt.Errorf("write to shard: %w", err) + } + + return offset, nil +} + +// Read reads a record from the log at offset using the specified key for shard +// lookup +func (l *Log) Read(ctx context.Context, key []byte, offset memlog.Offset) (memlog.Record, error) { + if key == nil { + return memlog.Record{}, errors.New("invalid key") + } + + shard, err := l.sharder.Shard(key, l.conf.shards) + if err != nil { + return memlog.Record{}, fmt.Errorf("get shard: %w", err) + } + + r, err := l.shards[shard].Read(ctx, offset) + if err != nil { + return memlog.Record{}, fmt.Errorf("read from shard: %w", err) + } + + return r, nil +} diff --git a/sharded/log_internal_test.go b/sharded/log_internal_test.go new file mode 100644 index 0000000..490abd4 --- /dev/null +++ b/sharded/log_internal_test.go @@ -0,0 +1,103 @@ +package sharded + +import ( + "context" + "testing" + + "github.com/benbjohnson/clock" + "gotest.tools/v3/assert" + + "github.com/embano1/memlog" +) + +func TestNew(t *testing.T) { + t.Run("fails to create new log", func(t *testing.T) { + const ( + defaultShards = 10 + defaultStart = 0 + defaultSegSize = 10 + ) + + testCases := []struct { + name string + shards int + sharder Sharder + clock clock.Clock + start memlog.Offset // log start + segSize int + wantErr string + }{ + { + name: "fails with invalid shard count", + shards: 0, + sharder: &defaultSharder{}, + clock: clock.NewMock(), + start: defaultStart, + segSize: defaultSegSize, + wantErr: "must be greater than 1", + }, + { + name: "fails with invalid segment size", + shards: defaultShards, + sharder: &defaultSharder{}, + clock: clock.NewMock(), + start: defaultStart, + segSize: 0, + wantErr: "must be greater than 0", + }, + { + name: "fails with invalid start offset", + shards: defaultShards, + sharder: &defaultSharder{}, + clock: clock.NewMock(), + start: -10, + segSize: defaultSegSize, + wantErr: "must not be negative", + }, + { + name: "fails with invalid clock", + shards: defaultShards, + sharder: &defaultSharder{}, + clock: nil, + start: defaultStart, + segSize: defaultSegSize, + wantErr: "must not be nil", + }, + { + name: "fails with invalid sharder", + shards: defaultShards, + sharder: nil, + clock: clock.NewMock(), + start: defaultStart, + segSize: defaultSegSize, + wantErr: "must not be nil", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ctx := context.Background() + opts := []Option{ + WithNumShards(uint(tc.shards)), + WithClock(tc.clock), + WithStartOffset(tc.start), + WithMaxSegmentSize(tc.segSize), + WithSharder(tc.sharder), + } + l, err := New(ctx, opts...) 
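+				// an invalid option must surface its validation error and leave the log nil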
+ assert.ErrorContains(t, err, tc.wantErr) + assert.DeepEqual(t, l, (*Log)(nil)) + }) + } + }) + + t.Run("successfully creates new log with defaults", func(t *testing.T) { + l, err := New(context.Background()) + assert.NilError(t, err) + assert.Assert(t, l.conf.startOffset == DefaultStartOffset) + assert.Assert(t, l.conf.segmentSize == DefaultSegmentSize) + assert.Assert(t, l.conf.shards == DefaultShards) + assert.Assert(t, l.conf.maxRecordSize == DefaultMaxRecordDataBytes) + assert.Assert(t, len(l.shards) == DefaultShards) + }) +} diff --git a/sharded/log_test.go b/sharded/log_test.go new file mode 100644 index 0000000..da154eb --- /dev/null +++ b/sharded/log_test.go @@ -0,0 +1,343 @@ +package sharded_test + +import ( + "context" + "encoding/json" + "errors" + "strconv" + "sync" + "sync/atomic" + "testing" + + "github.com/benbjohnson/clock" + "gotest.tools/v3/assert" + + "github.com/embano1/memlog" + "github.com/embano1/memlog/sharded" +) + +const ( + defaultShards = 10 + defaultStart = 0 + defaultSegSize = 10 +) + +func TestLog_Read_SingleRecord(t *testing.T) { + testCases := []struct { + name string + + clock clock.Clock + shards int + sharder sharded.Sharder + start memlog.Offset // log start + segSize int + records map[string][][]byte + + offset memlog.Offset // read start offset + keys []string // read keys + wantRecords int // total number records read + wantReadErr string + wantWriteErr string + }{ + { + name: "read fails with out of range error", + clock: clock.NewMock(), + shards: defaultShards, + sharder: sharded.NewKeySharder([]string{"users"}), + start: defaultStart, + segSize: defaultSegSize, + records: newTestDataMap(t, 100, "users"), + offset: 0, // purged + keys: []string{"users"}, + wantRecords: 0, + wantReadErr: "out of range", + wantWriteErr: "", + }, + { + name: "read fails with future offset error", + clock: clock.NewMock(), + shards: defaultShards, + sharder: sharded.NewKeySharder([]string{"users"}), + start: defaultStart, + segSize: defaultSegSize, + records: newTestDataMap(t, 100, "users"), + offset: 101, // future + keys: []string{"users"}, + wantRecords: 0, + wantReadErr: "future offset", + wantWriteErr: "", + }, + { + name: "read fails due to invalid offset", + clock: clock.NewMock(), + shards: defaultShards, + sharder: sharded.NewKeySharder([]string{"users"}), + start: defaultStart, + segSize: defaultSegSize, + records: newTestDataMap(t, 100, "users"), + offset: -10, + keys: []string{"users"}, + wantRecords: 0, + wantReadErr: "out of range", + wantWriteErr: "", + }, + { + name: "read fails due to non-existing shard key", + clock: clock.NewMock(), + shards: defaultShards, + sharder: sharded.NewKeySharder([]string{"users"}), + start: defaultStart, + segSize: defaultSegSize, + records: newTestDataMap(t, 100, "users"), + offset: 0, + keys: []string{"groups"}, + wantRecords: 0, + wantReadErr: "shard not found", + wantWriteErr: "", + }, + { + name: "read fails due to key shard count mismatch", + clock: clock.NewMock(), + shards: 2, // must be smaller than key count + sharder: sharded.NewKeySharder([]string{"users", "groups", "machines"}), + start: defaultStart, + segSize: defaultSegSize, + records: newTestDataMap(t, 100, "users", "groups", "machines"), + offset: 0, + keys: []string{"users", "groups", "machines"}, + wantRecords: 0, + wantReadErr: "greater than available shards", + wantWriteErr: "greater than available shards", + }, + { + name: "read succeeds, one key, start offset 0, read offset 0", + clock: clock.NewMock(), + shards: defaultShards, + sharder: 
sharded.NewKeySharder([]string{"users"}), + start: defaultStart, + segSize: defaultSegSize, + records: newTestDataMap(t, 10, "users"), + offset: 0, + keys: []string{"users"}, + wantRecords: 1, + wantReadErr: "", + wantWriteErr: "", + }, + { + name: "read succeeds, one key, start offset 100, read offset 100", + clock: clock.NewMock(), + shards: defaultShards, + sharder: sharded.NewKeySharder([]string{"users"}), + start: 100, + segSize: defaultSegSize, + records: newTestDataMap(t, 10, "users"), + offset: 100, + keys: []string{"users"}, + wantRecords: 1, + wantReadErr: "", + wantWriteErr: "", + }, + { + name: "read succeeds, two keys, read offset 5", + clock: clock.NewMock(), + shards: defaultShards, + sharder: sharded.NewKeySharder([]string{"users", "groups"}), + start: defaultStart, + segSize: defaultSegSize, + records: newTestDataMap(t, 10, "users", "groups"), + offset: 5, + keys: []string{"users", "groups"}, + wantRecords: 2, + wantReadErr: "", + wantWriteErr: "", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ctx := context.Background() + opts := []sharded.Option{ + sharded.WithNumShards(uint(tc.shards)), + sharded.WithClock(tc.clock), + sharded.WithStartOffset(tc.start), + sharded.WithMaxSegmentSize(tc.segSize), + sharded.WithSharder(tc.sharder), + } + l, err := sharded.New(ctx, opts...) + assert.NilError(t, err) + + // seed log + for k, records := range tc.records { + for _, r := range records { + _, err := l.Write(ctx, []byte(k), r) + if tc.wantWriteErr != "" { + assert.ErrorContains(t, err, tc.wantWriteErr) + } else { + assert.NilError(t, err) + } + } + } + + records := 0 + for _, k := range tc.keys { + r, err := l.Read(ctx, []byte(k), tc.offset) + if tc.wantReadErr != "" { + assert.ErrorContains(t, err, tc.wantReadErr) + } else { + assert.NilError(t, err) + } + + if !r.Metadata.Created.IsZero() { + // valid record + records++ + } + } + + assert.Equal(t, records, tc.wantRecords) + }) + } +} + +func TestLog_Read_AllRecords_Concurrent(t *testing.T) { + keys := []string{"users", "groups", "machines", "clusters", "datacenters"} + + ctx := context.Background() + opts := []sharded.Option{ + sharded.WithNumShards(uint(defaultShards)), + sharded.WithStartOffset(defaultStart), + sharded.WithMaxSegmentSize(defaultSegSize), + sharded.WithSharder(sharded.NewKeySharder(keys)), + } + l, err := sharded.New(ctx, opts...) + assert.NilError(t, err) + + // seed log + data := newTestDataMap(t, defaultSegSize, keys...) + for k, records := range data { + for _, r := range records { + _, err := l.Write(ctx, []byte(k), r) + assert.NilError(t, err) + } + } + + var ( + got int32 + wg sync.WaitGroup + ) + + for _, k := range keys { + wg.Add(1) + go func(key string) { + defer wg.Done() + + offset := memlog.Offset(defaultStart) + for { + r, err := l.Read(ctx, []byte(key), offset) + if err != nil { + if errors.Is(err, memlog.ErrFutureOffset) { + break + } + } + assert.NilError(t, err) // catches all err cases + assert.Equal(t, r.Metadata.Offset, offset) + + atomic.AddInt32(&got, 1) + offset++ + } + }(k) + } + + wg.Wait() + + want := len(keys) * defaultSegSize + assert.Equal(t, got, int32(want)) +} + +func TestLog_Read_AllRecords(t *testing.T) { + keys := []string{"friends", "family", "colleagues"} + + ctx := context.Background() + opts := []sharded.Option{ + sharded.WithNumShards(uint(defaultShards)), + sharded.WithStartOffset(defaultStart), + sharded.WithMaxSegmentSize(defaultSegSize), + } + + // uses default sharder + l, err := sharded.New(ctx, opts...) 
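+	// the hash-based default sharder may place several keys in the same shard,
+	// so the read loop below only counts records matching the current key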
+ assert.NilError(t, err) + + // seed log + data := newTestDataMap(t, defaultSegSize, keys...) + for k, records := range data { + for _, r := range records { + _, err := l.Write(ctx, []byte(k), r) + assert.NilError(t, err) + } + } + + var got int + for _, k := range keys { + offset := memlog.Offset(defaultStart) + for { + r, err := l.Read(ctx, []byte(k), offset) + if err != nil { + if errors.Is(err, memlog.ErrFutureOffset) { + break + } + } + assert.NilError(t, err) // catches all err cases + assert.Assert(t, r.Metadata.Created.IsZero() == false) // must be valid record + assert.Equal(t, r.Metadata.Offset, offset) + + hasKey := func(key string) bool { + // matches testdata + d := struct { + Key string `json:"key,omitempty"` + }{} + + err := json.Unmarshal(r.Data, &d) + assert.NilError(t, err) + return d.Key == key + } + + // default sharder is hash-based so a shard (memlog.Log) could contain multiple + // keys, only count unique items + if hasKey(k) { + got++ + } + offset++ + } + } + + want := len(keys) * defaultSegSize + assert.Equal(t, got, want) +} + +func newTestData(t *testing.T, id, key string) []byte { + r := map[string]string{ + "id": id, + "key": key, + "type": "record.created.event.v0", + "source": "/api/v1/memlog_test", + } + + b, err := json.Marshal(r) + assert.NilError(t, err) + + return b +} + +// map of key/records, creates "count" records per key +func newTestDataMap(t *testing.T, count int, keys ...string) map[string][][]byte { + t.Helper() + + records := make(map[string][][]byte) + for i := 0; i < count; i++ { + for _, k := range keys { + records[k] = append(records[k], newTestData(t, strconv.Itoa(i+1), k)) + } + } + + return records +} diff --git a/sharded/options.go b/sharded/options.go new file mode 100644 index 0000000..f7991e1 --- /dev/null +++ b/sharded/options.go @@ -0,0 +1,103 @@ +package sharded + +import ( + "errors" + + "github.com/benbjohnson/clock" + + "github.com/embano1/memlog" +) + +const ( + // DefaultShards is the number of shards unless specified otherwise + DefaultShards = 1000 + // DefaultStartOffset is the start offset in a shard + DefaultStartOffset = memlog.DefaultStartOffset + // DefaultSegmentSize is the segment size, i.e. number of offsets, in a shard + DefaultSegmentSize = memlog.DefaultSegmentSize + // DefaultMaxRecordDataBytes is the maximum data (payload) size of a record in a shard + DefaultMaxRecordDataBytes = memlog.DefaultMaxRecordDataBytes +) + +// Option customizes a log +type Option func(*Log) error + +var defaultOptions = []Option{ + WithClock(clock.New()), + WithMaxRecordDataSize(DefaultMaxRecordDataBytes), + WithMaxSegmentSize(DefaultSegmentSize), + WithNumShards(DefaultShards), + WithSharder(newDefaultSharder()), + WithStartOffset(DefaultStartOffset), +} + +// WithClock uses the specified clock for setting record timestamps +func WithClock(c clock.Clock) Option { + return func(log *Log) error { + if c == nil { + return errors.New("clock must not be nil") + } + + log.clock = c + return nil + } +} + +// WithMaxRecordDataSize sets the maximum record data (payload) size in bytes in +// each shard +func WithMaxRecordDataSize(size int) Option { + return func(log *Log) error { + if size <= 0 { + return errors.New("size must be greater than 0") + } + log.conf.maxRecordSize = size + return nil + } +} + +// WithMaxSegmentSize sets the maximum size, i.e. number of offsets, in each shard. +// Must be greater than 0. 
+func WithMaxSegmentSize(size int) Option { + return func(log *Log) error { + if size <= 0 { + return errors.New("size must be greater than 0") + } + log.conf.segmentSize = size + return nil + } +} + +// WithNumShards sets the number of shards in a log +func WithNumShards(n uint) Option { + return func(log *Log) error { + // sharded log with < 2 shards is a standard log + if n < 2 { + return errors.New("number of shards must be greater than 1") + } + log.conf.shards = n + return nil + } +} + +// WithSharder uses the specified sharder for key sharding +func WithSharder(s Sharder) Option { + return func(log *Log) error { + if s == nil { + return errors.New("sharder must not be nil") + } + log.sharder = s + return nil + } +} + +// WithStartOffset sets the start offset of each shard. Must be equal or greater +// than 0. +func WithStartOffset(offset memlog.Offset) Option { + return func(log *Log) error { + if offset < 0 { + return errors.New("start offset must not be negative") + } + log.conf.startOffset = offset + return nil + } +} diff --git a/sharded/shard.go b/sharded/shard.go new file mode 100644 index 0000000..a08e442 --- /dev/null +++ b/sharded/shard.go @@ -0,0 +1,87 @@ +package sharded + +import ( + "errors" + "fmt" + "hash" + "hash/fnv" + "sync" +) + +// Sharder returns a shard for the specified key and number of shards in the +// log. The number of shards specified is expected to match the number of shards +// during log creation to avoid undefined behavior. +type Sharder interface { + Shard(key []byte, shards uint) (uint, error) +} + +type defaultSharder struct { + sync.Mutex + hash32 hash.Hash32 +} + +func newDefaultSharder() *defaultSharder { + return &defaultSharder{ + hash32: fnv.New32a(), + } +} + +func (d *defaultSharder) Shard(key []byte, shards uint) (uint, error) { + h, err := d.hash(key) + if err != nil { + return 0, fmt.Errorf("hash key: %w", err) + } + + shard := int32(h) % int32(shards) + if shard < 0 { + shard = -shard + } + return uint(shard), nil +} + +func (d *defaultSharder) hash(key []byte) (uint32, error) { + d.Lock() + defer d.Unlock() + + d.hash32.Reset() + _, err := d.hash32.Write(key) + if err != nil { + return 0, err + } + + return d.hash32.Sum32(), nil +} + +// KeySharder assigns a shard per unique key +type KeySharder struct { + mu sync.RWMutex + shards map[string]uint +} + +// NewKeySharder creates a new key-based Sharder, assigning a shard to each +// unique key. The caller must ensure that there are at least len(keys) shards +// available in the log. 
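+// Shard returns a "shard not found" error for any key that was not passed to
+// NewKeySharder.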
+func NewKeySharder(keys []string) *KeySharder { + ks := KeySharder{shards: map[string]uint{}} + for shard, key := range keys { + ks.shards[key] = uint(shard) + } + + return &ks +} + +// Shard implements Sharder interface +func (k *KeySharder) Shard(key []byte, shards uint) (uint, error) { + k.mu.RLock() + defer k.mu.RUnlock() + + if len(k.shards) > int(shards) { + return 0, fmt.Errorf("number of keys greater than available shards") + } + + if s, ok := k.shards[string(key)]; ok { + return s, nil + } + + return 0, errors.New("shard not found") +} diff --git a/stream_test.go b/stream_test.go index bb54261..126e500 100644 --- a/stream_test.go +++ b/stream_test.go @@ -37,8 +37,6 @@ func TestLog_Stream(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - t.Parallel() - testData := NewTestDataSlice(t, tc.segSize) opts := []Option{ WithStartOffset(tc.logStart), @@ -119,8 +117,6 @@ func TestLog_Stream(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - t.Parallel() - testData := NewTestDataSlice(t, tc.writeRecords) opts := []Option{ WithStartOffset(tc.logStart), @@ -190,8 +186,6 @@ func TestLog_Stream(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - t.Parallel() - testData := NewTestDataSlice(t, tc.writeRecords) opts := []Option{ WithStartOffset(tc.logStart), @@ -247,8 +241,6 @@ func TestLog_Stream(t *testing.T) { }) t.Run("two stream receivers, starting at different offsets until stream cancelled", func(t *testing.T) { - t.Parallel() - const ( logStart = Offset(0) segSize = 1000