feat: Add sharded log
Closes: #10
Signed-off-by: Michael Gasch <mgasch@vmware.com>
Michael Gasch committed Feb 7, 2022
1 parent d5b2fcd commit c03dec5
Showing 11 changed files with 981 additions and 10 deletions.
6 changes: 5 additions & 1 deletion README.md
@@ -21,6 +21,7 @@ structure modeled as a *Log*.
read more about the ideas behind `memlog` please see ["The Log: What every
software engineer should know about real-time data's unifying
abstraction"](https://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should-know-about-real-time-datas-unifying).

## Motivation

I keep hitting the same user story (use case) over and over again: one or more
@@ -117,9 +118,12 @@ One is not constrained by just creating **one** `Log`. For certain use cases,
creating multiple `Logs` might be useful. For example:

- Manage completely different data sets/sizes in the same process
- Setting different `Log` sizes (i.e. retention times), e.g. premium users will
  have access to a larger *history* of `Records` (see the sketch after this list)
- Partitioning input data by type or *key*
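
As a rough sketch of the second point above, two logs with different retention
(segment) sizes can coexist in the same process. The concrete sizes below are
illustrative assumptions, not recommendations:

```go
package main

import (
	"context"
	"fmt"
	"os"

	"github.com/embano1/memlog"
)

func main() {
	ctx := context.Background()

	// smaller history for standard users (size is an example value)
	standard, err := memlog.New(ctx, memlog.WithMaxSegmentSize(128))
	if err != nil {
		fmt.Printf("create log: %v", err)
		os.Exit(1)
	}

	// larger history for premium users (size is an example value)
	premium, err := memlog.New(ctx, memlog.WithMaxSegmentSize(4096))
	if err != nil {
		fmt.Printf("create log: %v", err)
		os.Exit(1)
	}

	if _, err := standard.Write(ctx, []byte("standard event")); err != nil {
		fmt.Printf("write: %v", err)
		os.Exit(1)
	}
	if _, err := premium.Write(ctx, []byte("premium event")); err != nil {
		fmt.Printf("write: %v", err)
		os.Exit(1)
	}
}
```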

💡 For use cases where you want to partition (shard) the log by `key(s)`, consider
using the specialised [`sharded.Log`](sharded/README.md).

## Example

2 changes: 1 addition & 1 deletion options.go
@@ -9,7 +9,7 @@ import (
const (
// DefaultStartOffset is the start offset of the log
DefaultStartOffset = Offset(0)
// DefaultSegmentSize is the segment size of the log
// DefaultSegmentSize is the segment size, i.e. number of offsets, in the log
DefaultSegmentSize = 1024
// DefaultMaxRecordDataBytes is the maximum data (payload) size of a record
DefaultMaxRecordDataBytes = 1024 << 10 // 1MiB
92 changes: 92 additions & 0 deletions sharded/README.md
@@ -0,0 +1,92 @@
# About

## tl;dr

A purpose-built implementation of `memlog.Log` with support for sharding
(partitioning) `Records` by `key`.

# Usage

The `sharded.Log` is built on `memlog.Log` and provides a similar API for
reading and writing data (`Records`), `Offset` handling, etc.

The `Read()` and `Write()` methods accept a sharding `key` to distribute the
`Records` based on a (configurable) sharding strategy.

Unless specified otherwise, the default sharding strategy uses Golang's
[`fnv.New32a`](https://pkg.go.dev/hash/fnv#New32a) to retrieve a hash and find
the corresponding `Shard` using a *modulo* operation based on the number of
(configurable) `Shards` in the `Log`.
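
For illustration only, the hash-and-modulo idea boils down to something like the
sketch below; `hashShard` is a made-up helper, not part of the package:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// hashShard maps a key to one of numShards shards using FNV-1a and modulo.
// This is an illustrative sketch, not the package's internal implementation.
func hashShard(key []byte, numShards uint32) (uint32, error) {
	if numShards == 0 {
		return 0, fmt.Errorf("numShards must be greater than zero")
	}
	h := fnv.New32a()
	if _, err := h.Write(key); err != nil {
		return 0, err
	}
	return h.Sum32() % numShards, nil
}

func main() {
	for _, key := range []string{"users", "groups"} {
		shard, _ := hashShard([]byte(key), 10)
		fmt.Printf("key %q -> shard %d\n", key, shard)
	}
}
```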

💡 Depending on the number of `Shards`, number of distinct `keys` and their
*hashes*, multiple `keys` might be stored in the same `Shard`. If strict `key`
separation is required, a custom `Sharder` can be implemented. For convenience,
a `KeySharder` is provided.
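
If strict separation is required and the provided `KeySharder` does not fit, a
custom `Sharder` could pin keys to shards explicitly. The sketch below is
illustrative only; the `Shard` method signature is an assumption inferred from
how `Log` calls its `Sharder` (see `sharded/log.go` in this commit), not the
package's documented interface:

```go
package main

import "fmt"

// staticSharder pins each known key to a dedicated shard index. Sketch only:
// the method signature is assumed and may not match the package's Sharder.
type staticSharder struct {
	keys map[string]uint // key -> shard index
}

func (s *staticSharder) Shard(key []byte, numShards uint) (uint, error) {
	shard, ok := s.keys[string(key)]
	if !ok {
		return 0, fmt.Errorf("unknown key: %q", key)
	}
	if shard >= numShards {
		return 0, fmt.Errorf("shard %d out of range (%d shards)", shard, numShards)
	}
	return shard, nil
}

func main() {
	s := &staticSharder{keys: map[string]uint{"users": 0, "groups": 1}}
	shard, err := s.Shard([]byte("groups"), 2)
	fmt.Println(shard, err)
}
```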

See [pkg.go.dev](https://pkg.go.dev/github.com/embano1/memlog/sharded) for the
API reference and examples.

## Example

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"os"

	"github.com/embano1/memlog"
	"github.com/embano1/memlog/sharded"
)

func main() {
	ctx := context.Background()

	// the default number of shards (1000) is sufficient to assign a shard per key
	// for this example (i.e. no key overlap within a shard)
	l, err := sharded.New(ctx)
	if err != nil {
		fmt.Printf("create log: %v", err)
		os.Exit(1)
	}

	data := map[string][]string{
		"users":  {"tom", "sarah", "ajit"},
		"groups": {"friends", "family", "colleagues"},
	}

	for key, vals := range data {
		for _, val := range vals {
			_, err := l.Write(ctx, []byte(key), []byte(val))
			if err != nil {
				fmt.Printf("write: %v", err)
				os.Exit(1)
			}
		}
	}

	fmt.Println("reading all users...")
	offset := memlog.Offset(0)
	for {
		record, err := l.Read(ctx, []byte("users"), offset)
		if err != nil {
			if errors.Is(err, memlog.ErrFutureOffset) {
				break // end of log
			}
			fmt.Printf("read: %v", err)
			os.Exit(1)
		}

		fmt.Printf("- %s\n", string(record.Data))
		offset++
	}

	// Output: reading all users...
	// - tom
	// - sarah
	// - ajit
}
```
80 changes: 80 additions & 0 deletions sharded/example_sharder_test.go
@@ -0,0 +1,80 @@
package sharded_test

import (
	"context"
	"errors"
	"fmt"
	"os"

	"github.com/embano1/memlog"
	"github.com/embano1/memlog/sharded"
)

func Example_sharder() {
	ctx := context.Background()

	keys := []string{"galaxies", "planets"}
	ks := sharded.NewKeySharder(keys)

	opts := []sharded.Option{
		sharded.WithNumShards(2), // must be >=len(keys)
		sharded.WithSharder(ks),
	}
	l, err := sharded.New(ctx, opts...)
	if err != nil {
		fmt.Printf("create log: %v", err)
		os.Exit(1)
	}

	data := map[string][]string{
		keys[0]: {"Centaurus A", "Andromeda", "Eye of Sauron"},
		keys[1]: {"Mercury", "Venus", "Earth", "Mars", "Jupiter", "Saturn", "Uranus", "Neptune"},
	}

	for key, vals := range data {
		for _, val := range vals {
			_, err := l.Write(ctx, []byte(key), []byte(val))
			if err != nil {
				fmt.Printf("write: %v", err)
				os.Exit(1)
			}
		}
	}

KEYS:
	for _, key := range keys {
		fmt.Printf("reading all %s...\n", key)

		offset := memlog.Offset(0)
		for {
			read, err := l.Read(ctx, []byte(key), offset)
			if err != nil {
				if errors.Is(err, memlog.ErrFutureOffset) {
					fmt.Println()
					continue KEYS
				}
				fmt.Printf("read: %v", err)
				os.Exit(1)
			}

			fmt.Printf("- %s\n", string(read.Data))
			offset++
		}
	}

	// Output: reading all galaxies...
	// - Centaurus A
	// - Andromeda
	// - Eye of Sauron
	//
	// reading all planets...
	// - Mercury
	// - Venus
	// - Earth
	// - Mars
	// - Jupiter
	// - Saturn
	// - Uranus
	// - Neptune
}
59 changes: 59 additions & 0 deletions sharded/example_test.go
@@ -0,0 +1,59 @@
package sharded_test

import (
	"context"
	"errors"
	"fmt"
	"os"

	"github.com/embano1/memlog"
	"github.com/embano1/memlog/sharded"
)

func Example() {
	ctx := context.Background()

	// the default number of shards (1000) is sufficient to assign a shard per key
	// for this example (i.e. no key overlap within a shard)
	l, err := sharded.New(ctx)
	if err != nil {
		fmt.Printf("create log: %v", err)
		os.Exit(1)
	}

	data := map[string][]string{
		"users":  {"tom", "sarah", "ajit"},
		"groups": {"friends", "family", "colleagues"},
	}

	for key, vals := range data {
		for _, val := range vals {
			_, err := l.Write(ctx, []byte(key), []byte(val))
			if err != nil {
				fmt.Printf("write: %v", err)
				os.Exit(1)
			}
		}
	}

	fmt.Println("reading all users...")
	offset := memlog.Offset(0)
	for {
		record, err := l.Read(ctx, []byte("users"), offset)
		if err != nil {
			if errors.Is(err, memlog.ErrFutureOffset) {
				break // end of log
			}
			fmt.Printf("read: %v", err)
			os.Exit(1)
		}

		fmt.Printf("- %s\n", string(record.Data))
		offset++
	}

	// Output: reading all users...
	// - tom
	// - sarah
	// - ajit
}
108 changes: 108 additions & 0 deletions sharded/log.go
@@ -0,0 +1,108 @@
package sharded

import (
	"context"
	"errors"
	"fmt"

	"github.com/benbjohnson/clock"

	"github.com/embano1/memlog"
)

type config struct {
	shards uint

	// memlog.Log settings
	startOffset   memlog.Offset
	segmentSize   int // offsets per segment
	maxRecordSize int // bytes
}

// Log is a sharded log implementation on top of memlog.Log. It uses a
// configurable sharding strategy (see Sharder interface) during reads and
// writes.
type Log struct {
	sharder Sharder
	clock   clock.Clock
	conf    config
	shards  []*memlog.Log
}

// New creates a new sharded log which can be customized with options. If not
// specified, the default sharding strategy uses fnv.New32a for key hashing.
func New(ctx context.Context, options ...Option) (*Log, error) {
	var l Log

	// apply defaults
	for _, opt := range defaultOptions {
		if err := opt(&l); err != nil {
			return nil, fmt.Errorf("configure log default option: %v", err)
		}
	}

	// apply custom settings
	for _, opt := range options {
		if err := opt(&l); err != nil {
			return nil, fmt.Errorf("configure log custom option: %v", err)
		}
	}

	shards := l.conf.shards
	l.shards = make([]*memlog.Log, shards)
	opts := []memlog.Option{
		memlog.WithClock(l.clock),
		memlog.WithMaxRecordDataSize(l.conf.maxRecordSize),
		memlog.WithStartOffset(l.conf.startOffset),
		memlog.WithMaxSegmentSize(l.conf.segmentSize),
	}

	for i := 0; i < int(shards); i++ {
		ml, err := memlog.New(ctx, opts...)
		if err != nil {
			return nil, fmt.Errorf("create shard: %w", err)
		}
		l.shards[i] = ml
	}

	return &l, nil
}

// Write writes data to the log using the specified key for sharding
func (l *Log) Write(ctx context.Context, key []byte, data []byte) (memlog.Offset, error) {
	if key == nil {
		return -1, errors.New("invalid key")
	}

	shard, err := l.sharder.Shard(key, l.conf.shards)
	if err != nil {
		return -1, fmt.Errorf("get shard: %w", err)
	}

	offset, err := l.shards[shard].Write(ctx, data)
	if err != nil {
		return offset, fmt.Errorf("write to shard: %w", err)
	}

	return offset, nil
}

// Read reads a record from the log at offset using the specified key for shard
// lookup
func (l *Log) Read(ctx context.Context, key []byte, offset memlog.Offset) (memlog.Record, error) {
	if key == nil {
		return memlog.Record{}, errors.New("invalid key")
	}

	shard, err := l.sharder.Shard(key, l.conf.shards)
	if err != nil {
		return memlog.Record{}, fmt.Errorf("get shard: %w", err)
	}

	r, err := l.shards[shard].Read(ctx, offset)
	if err != nil {
		return memlog.Record{}, fmt.Errorf("read from shard: %w", err)
	}

	return r, nil
}
