Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #22 from embano1/issue-10
Add `sharded.Log`
- Loading branch information
Showing
11 changed files
with
981 additions
and
10 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
# About | ||
|
||
## tl;dr | ||
|
||
A purpose-built implementation of `memlog.Log` with support for sharding | ||
(partitioning) `Records` by `key`. | ||
|
||
# Usage | ||
|
||
The `sharded.Log` is built on `memlog.Log` and provides a similar API for | ||
reading and writing data (`Records`), `Offset` handling, etc. | ||
|
||
The `Read()` and `Write()` methods accept a sharding `key` to distribute the | ||
`Records` based on a (configurable) sharding strategy. | ||
|
||
Unless specified otherwise, the default sharding strategy uses Go's | ||
[`fnv.New32a`](https://pkg.go.dev/hash/fnv#New32a) to hash the `key` and finds | ||
the corresponding `Shard` using a *modulo* operation based on the (configurable) | ||
number of `Shards` in the `Log`. | ||
|
||
💡 Depending on the number of `Shards`, number of distinct `keys` and their | ||
*hashes*, multiple `keys` might be stored in the same `Shard`. If strict `key` | ||
separation is required, a custom `Sharder` can be implemented. For convenience, | ||
a `KeySharder` is provided. | ||
|
||
See [pkg.go.dev](https://pkg.go.dev/github.com/embano1/memlog/sharded) for the | ||
API reference and examples. | ||
|
||
## Example | ||
|
||
```go | ||
package main | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"fmt" | ||
"os" | ||
|
||
"github.com/embano1/memlog" | ||
"github.com/embano1/memlog/sharded" | ||
) | ||
|
||
func main() { | ||
ctx := context.Background() | ||
|
||
// the default number of shards (1000) is sufficient to assign a shard per key | ||
// for this example (i.e. no key overlap within a shard) | ||
l, err := sharded.New(ctx) | ||
if err != nil { | ||
fmt.Printf("create log: %v", err) | ||
os.Exit(1) | ||
} | ||
|
||
data := map[string][]string{ | ||
"users": {"tom", "sarah", "ajit"}, | ||
"groups": {"friends", "family", "colleagues"}, | ||
} | ||
|
||
for key, vals := range data { | ||
for _, val := range vals { | ||
_, err := l.Write(ctx, []byte(key), []byte(val)) | ||
if err != nil { | ||
fmt.Printf("write: %v", err) | ||
os.Exit(1) | ||
} | ||
} | ||
} | ||
|
||
fmt.Println("reading all users...") | ||
offset := memlog.Offset(0) | ||
for { | ||
record, err := l.Read(ctx, []byte("users"), offset) | ||
if err != nil { | ||
if errors.Is(err, memlog.ErrFutureOffset) { | ||
break // end of log | ||
} | ||
fmt.Printf("read: %v", err) | ||
os.Exit(1) | ||
} | ||
|
||
fmt.Printf("- %s\n", string(record.Data)) | ||
offset++ | ||
} | ||
|
||
// Output: reading all users... | ||
// - tom | ||
// - sarah | ||
// - ajit | ||
} | ||
|
||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
package sharded_test | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"fmt" | ||
"os" | ||
|
||
"github.com/embano1/memlog" | ||
"github.com/embano1/memlog/sharded" | ||
) | ||
|
||
func Example_sharder() { | ||
ctx := context.Background() | ||
|
||
keys := []string{"galaxies", "planets"} | ||
ks := sharded.NewKeySharder(keys) | ||
|
||
opts := []sharded.Option{ | ||
sharded.WithNumShards(2), // must be >=len(keys) | ||
sharded.WithSharder(ks), | ||
} | ||
l, err := sharded.New(ctx, opts...) | ||
if err != nil { | ||
fmt.Printf("create log: %v", err) | ||
os.Exit(1) | ||
} | ||
|
||
data := map[string][]string{ | ||
keys[0]: {"Centaurus A", "Andromeda", "Eye of Sauron"}, | ||
keys[1]: {"Mercury", "Venus", "Earth", "Mars", "Jupiter", "Saturn", "Uranus", "Neptune"}, | ||
} | ||
|
||
for key, vals := range data { | ||
for _, val := range vals { | ||
_, err := l.Write(ctx, []byte(key), []byte(val)) | ||
if err != nil { | ||
fmt.Printf("write: %v", err) | ||
os.Exit(1) | ||
} | ||
} | ||
} | ||
|
||
KEYS: | ||
for _, key := range keys { | ||
fmt.Printf("reading all %s...\n", key) | ||
|
||
offset := memlog.Offset(0) | ||
for { | ||
read, err := l.Read(ctx, []byte(key), offset) | ||
if err != nil { | ||
if errors.Is(err, memlog.ErrFutureOffset) { | ||
fmt.Println() | ||
continue KEYS | ||
} | ||
fmt.Printf("read: %v", err) | ||
os.Exit(1) | ||
} | ||
|
||
fmt.Printf("- %s\n", string(read.Data)) | ||
offset++ | ||
} | ||
|
||
} | ||
|
||
// Output: reading all galaxies... | ||
// - Centaurus A | ||
// - Andromeda | ||
// - Eye of Sauron | ||
// | ||
// reading all planets... | ||
// - Mercury | ||
// - Venus | ||
// - Earth | ||
// - Mars | ||
// - Jupiter | ||
// - Saturn | ||
// - Uranus | ||
// - Neptune | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
package sharded_test | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"fmt" | ||
"os" | ||
|
||
"github.com/embano1/memlog" | ||
"github.com/embano1/memlog/sharded" | ||
) | ||
|
||
func Example() { | ||
ctx := context.Background() | ||
|
||
// the default number of shards (1000) is sufficient to assign a shard per key | ||
// for this example (i.e. no key overlap within a shard) | ||
l, err := sharded.New(ctx) | ||
if err != nil { | ||
fmt.Printf("create log: %v", err) | ||
os.Exit(1) | ||
} | ||
|
||
data := map[string][]string{ | ||
"users": {"tom", "sarah", "ajit"}, | ||
"groups": {"friends", "family", "colleagues"}, | ||
} | ||
|
||
for key, vals := range data { | ||
for _, val := range vals { | ||
_, err := l.Write(ctx, []byte(key), []byte(val)) | ||
if err != nil { | ||
fmt.Printf("write: %v", err) | ||
os.Exit(1) | ||
} | ||
} | ||
} | ||
|
||
fmt.Println("reading all users...") | ||
offset := memlog.Offset(0) | ||
for { | ||
record, err := l.Read(ctx, []byte("users"), offset) | ||
if err != nil { | ||
if errors.Is(err, memlog.ErrFutureOffset) { | ||
break // end of log | ||
} | ||
fmt.Printf("read: %v", err) | ||
os.Exit(1) | ||
} | ||
|
||
fmt.Printf("- %s\n", string(record.Data)) | ||
offset++ | ||
} | ||
|
||
// Output: reading all users... | ||
// - tom | ||
// - sarah | ||
// - ajit | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,108 @@ | ||
package sharded | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"fmt" | ||
|
||
"github.com/benbjohnson/clock" | ||
|
||
"github.com/embano1/memlog" | ||
) | ||
|
||
type config struct { | ||
shards uint | ||
|
||
// memlog.Log settings | ||
startOffset memlog.Offset | ||
segmentSize int // offsets per segment | ||
maxRecordSize int // bytes | ||
} | ||
|
||
// Log is a sharded log implementation on top of memlog.Log. It uses a | ||
// configurable sharding strategy (see Sharder interface) during reads and | ||
// writes. | ||
type Log struct { | ||
sharder Sharder | ||
clock clock.Clock | ||
conf config | ||
shards []*memlog.Log | ||
} | ||
|
||
// New creates a new sharded log which can be customized with options. If not | ||
// specified, the default sharding strategy uses fnv.New32a for key hashing. | ||
func New(ctx context.Context, options ...Option) (*Log, error) { | ||
var l Log | ||
|
||
// apply defaults | ||
for _, opt := range defaultOptions { | ||
if err := opt(&l); err != nil { | ||
return nil, fmt.Errorf("configure log default option: %v", err) | ||
} | ||
} | ||
|
||
// apply custom settings | ||
for _, opt := range options { | ||
if err := opt(&l); err != nil { | ||
return nil, fmt.Errorf("configure log custom option: %v", err) | ||
} | ||
} | ||
|
||
shards := l.conf.shards | ||
l.shards = make([]*memlog.Log, shards) | ||
opts := []memlog.Option{ | ||
memlog.WithClock(l.clock), | ||
memlog.WithMaxRecordDataSize(l.conf.maxRecordSize), | ||
memlog.WithStartOffset(l.conf.startOffset), | ||
memlog.WithMaxSegmentSize(l.conf.segmentSize), | ||
} | ||
|
||
for i := 0; i < int(shards); i++ { | ||
ml, err := memlog.New(ctx, opts...) | ||
if err != nil { | ||
return nil, fmt.Errorf("create shard: %w", err) | ||
} | ||
l.shards[i] = ml | ||
} | ||
|
||
return &l, nil | ||
} | ||
|
||
// Write writes data to the log using the specified key for sharding | ||
func (l *Log) Write(ctx context.Context, key []byte, data []byte) (memlog.Offset, error) { | ||
if key == nil { | ||
return -1, errors.New("invalid key") | ||
} | ||
|
||
shard, err := l.sharder.Shard(key, l.conf.shards) | ||
if err != nil { | ||
return -1, fmt.Errorf("get shard: %w", err) | ||
} | ||
|
||
offset, err := l.shards[shard].Write(ctx, data) | ||
if err != nil { | ||
return offset, fmt.Errorf("write to shard: %w", err) | ||
} | ||
|
||
return offset, nil | ||
} | ||
|
||
// Read reads a record from the log at offset using the specified key for shard | ||
// lookup | ||
func (l *Log) Read(ctx context.Context, key []byte, offset memlog.Offset) (memlog.Record, error) { | ||
if key == nil { | ||
return memlog.Record{}, errors.New("invalid key") | ||
} | ||
|
||
shard, err := l.sharder.Shard(key, l.conf.shards) | ||
if err != nil { | ||
return memlog.Record{}, fmt.Errorf("get shard: %w", err) | ||
} | ||
|
||
r, err := l.shards[shard].Read(ctx, offset) | ||
if err != nil { | ||
return memlog.Record{}, fmt.Errorf("read from shard: %w", err) | ||
} | ||
|
||
return r, nil | ||
} |
Oops, something went wrong.