Add sharded.Log #22

Merged
merged 1 commit on Feb 7, 2022
6 changes: 5 additions & 1 deletion README.md
@@ -21,6 +21,7 @@ structure modeled as a *Log*.
read more about the ideas behind `memlog` please see ["The Log: What every
software engineer should know about real-time data's unifying
abstraction"](https://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should-know-about-real-time-datas-unifying).

## Motivation

I keep hitting the same user story (use case) over and over again: one or more
@@ -117,9 +118,12 @@ One is not constrained by just creating **one** `Log`. For certain use cases,
creating multiple `Logs` might be useful. For example:

- Managing completely different data sets/sizes in the same process
- Setting different `Log` sizes (i.e. retention times), e.g. premium users will
have access to a larger *history* of `Records`
- Partitioning input data by type or *key*

💡 For use cases where you want to order the log by `key(s)`, consider using the
specialised [`sharded.Log`](sharded/README.md).

## Example

2 changes: 1 addition & 1 deletion options.go
@@ -9,7 +9,7 @@ import (
const (
	// DefaultStartOffset is the start offset of the log
	DefaultStartOffset = Offset(0)
	// DefaultSegmentSize is the segment size, i.e. number of offsets, in the log
	DefaultSegmentSize = 1024
	// DefaultMaxRecordDataBytes is the maximum data (payload) size of a record
	DefaultMaxRecordDataBytes = 1024 << 10 // 1MiB
92 changes: 92 additions & 0 deletions sharded/README.md
@@ -0,0 +1,92 @@
# About

## tl;dr

A purpose-built implementation of `memlog.Log` with support for sharding
(partitioning) `Records` by `key`.

# Usage

The `sharded.Log` is built on `memlog.Log` and provides a similar API for
reading and writing data (`Records`), `Offset` handling, etc.

The `Read()` and `Write()` methods accept a sharding `key`, which is used to
select the `Shard` a `Record` is written to and read from, based on a
(configurable) sharding strategy.

Unless specified otherwise, the default sharding strategy uses Go's
[`fnv.New32a`](https://pkg.go.dev/hash/fnv#New32a) to compute a hash of the
`key` and selects the corresponding `Shard` with a *modulo* operation over the
(configurable) number of `Shards` in the `Log`.
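
To make the scheme concrete, below is a minimal sketch of that strategy using
only the standard library. It is not the `Log`'s internal code, and the helper
name `defaultShard` as well as the shard count are made up for this example:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// defaultShard illustrates the described strategy: hash the key with
// fnv.New32a and map the hash onto one of numShards shards via modulo.
// The name and signature are illustrative, not the library's API.
func defaultShard(key []byte, numShards uint) (uint, error) {
	h := fnv.New32a()
	if _, err := h.Write(key); err != nil {
		return 0, err
	}
	return uint(h.Sum32()) % numShards, nil
}

func main() {
	for _, key := range []string{"users", "groups", "orders"} {
		shard, err := defaultShard([]byte(key), 10)
		if err != nil {
			panic(err)
		}
		fmt.Printf("key %q -> shard %d\n", key, shard)
	}
}
```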

💡 Depending on the number of `Shards`, the number of distinct `keys`, and
their *hashes*, multiple `keys` might be stored in the same `Shard`. If strict
`key` separation is required, a custom `Sharder` can be implemented. For
convenience, a `KeySharder` is provided.
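
If the built-in behaviour is not sufficient, a custom `Sharder` can be passed
via `sharded.WithSharder`. The sketch below routes `Records` to a `Shard` based
on the first byte of the `key`; `prefixSharder` is a made-up name, and its
method signature is inferred from how `sharded.Log` invokes its `Sharder`, so
the exact interface may differ:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"os"

	"github.com/embano1/memlog/sharded"
)

// prefixSharder is an illustrative custom Sharder that assigns a Record to a
// Shard based on the first byte of its key.
type prefixSharder struct{}

func (prefixSharder) Shard(key []byte, numShards uint) (uint, error) {
	if len(key) == 0 {
		return 0, errors.New("empty key")
	}
	return uint(key[0]) % numShards, nil
}

func main() {
	ctx := context.Background()

	l, err := sharded.New(ctx,
		sharded.WithNumShards(4),
		sharded.WithSharder(prefixSharder{}),
	)
	if err != nil {
		fmt.Printf("create log: %v", err)
		os.Exit(1)
	}

	if _, err := l.Write(ctx, []byte("users"), []byte("tom")); err != nil {
		fmt.Printf("write: %v", err)
		os.Exit(1)
	}
}
```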

See [pkg.go.dev](https://pkg.go.dev/github.com/embano1/memlog/sharded) for the
API reference and examples.

## Example

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"os"

	"github.com/embano1/memlog"
	"github.com/embano1/memlog/sharded"
)

func main() {
	ctx := context.Background()

	// the default number of shards (1000) is sufficient to assign a shard per key
	// for this example (i.e. no key overlap within a shard)
	l, err := sharded.New(ctx)
	if err != nil {
		fmt.Printf("create log: %v", err)
		os.Exit(1)
	}

	data := map[string][]string{
		"users":  {"tom", "sarah", "ajit"},
		"groups": {"friends", "family", "colleagues"},
	}

	for key, vals := range data {
		for _, val := range vals {
			_, err := l.Write(ctx, []byte(key), []byte(val))
			if err != nil {
				fmt.Printf("write: %v", err)
				os.Exit(1)
			}
		}
	}

	fmt.Println("reading all users...")
	offset := memlog.Offset(0)
	for {
		record, err := l.Read(ctx, []byte("users"), offset)
		if err != nil {
			if errors.Is(err, memlog.ErrFutureOffset) {
				break // end of log
			}
			fmt.Printf("read: %v", err)
			os.Exit(1)
		}

		fmt.Printf("- %s\n", string(record.Data))
		offset++
	}

	// Output: reading all users...
	// - tom
	// - sarah
	// - ajit
}

```
80 changes: 80 additions & 0 deletions sharded/example_sharder_test.go
@@ -0,0 +1,80 @@
package sharded_test

import (
	"context"
	"errors"
	"fmt"
	"os"

	"github.com/embano1/memlog"
	"github.com/embano1/memlog/sharded"
)

func Example_sharder() {
	ctx := context.Background()

	keys := []string{"galaxies", "planets"}
	ks := sharded.NewKeySharder(keys)

	opts := []sharded.Option{
		sharded.WithNumShards(2), // must be >=len(keys)
		sharded.WithSharder(ks),
	}
	l, err := sharded.New(ctx, opts...)
	if err != nil {
		fmt.Printf("create log: %v", err)
		os.Exit(1)
	}

	data := map[string][]string{
		keys[0]: {"Centaurus A", "Andromeda", "Eye of Sauron"},
		keys[1]: {"Mercury", "Venus", "Earth", "Mars", "Jupiter", "Saturn", "Uranus", "Neptune"},
	}

	for key, vals := range data {
		for _, val := range vals {
			_, err := l.Write(ctx, []byte(key), []byte(val))
			if err != nil {
				fmt.Printf("write: %v", err)
				os.Exit(1)
			}
		}
	}

KEYS:
	for _, key := range keys {
		fmt.Printf("reading all %s...\n", key)

		offset := memlog.Offset(0)
		for {
			read, err := l.Read(ctx, []byte(key), offset)
			if err != nil {
				if errors.Is(err, memlog.ErrFutureOffset) {
					fmt.Println()
					continue KEYS
				}
				fmt.Printf("read: %v", err)
				os.Exit(1)
			}

			fmt.Printf("- %s\n", string(read.Data))
			offset++
		}
	}

	// Output: reading all galaxies...
	// - Centaurus A
	// - Andromeda
	// - Eye of Sauron
	//
	// reading all planets...
	// - Mercury
	// - Venus
	// - Earth
	// - Mars
	// - Jupiter
	// - Saturn
	// - Uranus
	// - Neptune
}
59 changes: 59 additions & 0 deletions sharded/example_test.go
@@ -0,0 +1,59 @@
package sharded_test

import (
	"context"
	"errors"
	"fmt"
	"os"

	"github.com/embano1/memlog"
	"github.com/embano1/memlog/sharded"
)

func Example() {
	ctx := context.Background()

	// the default number of shards (1000) is sufficient to assign a shard per key
	// for this example (i.e. no key overlap within a shard)
	l, err := sharded.New(ctx)
	if err != nil {
		fmt.Printf("create log: %v", err)
		os.Exit(1)
	}

	data := map[string][]string{
		"users":  {"tom", "sarah", "ajit"},
		"groups": {"friends", "family", "colleagues"},
	}

	for key, vals := range data {
		for _, val := range vals {
			_, err := l.Write(ctx, []byte(key), []byte(val))
			if err != nil {
				fmt.Printf("write: %v", err)
				os.Exit(1)
			}
		}
	}

	fmt.Println("reading all users...")
	offset := memlog.Offset(0)
	for {
		record, err := l.Read(ctx, []byte("users"), offset)
		if err != nil {
			if errors.Is(err, memlog.ErrFutureOffset) {
				break // end of log
			}
			fmt.Printf("read: %v", err)
			os.Exit(1)
		}

		fmt.Printf("- %s\n", string(record.Data))
		offset++
	}

	// Output: reading all users...
	// - tom
	// - sarah
	// - ajit
}
108 changes: 108 additions & 0 deletions sharded/log.go
@@ -0,0 +1,108 @@
package sharded

import (
	"context"
	"errors"
	"fmt"

	"github.com/benbjohnson/clock"

	"github.com/embano1/memlog"
)

type config struct {
	shards uint

	// memlog.Log settings
	startOffset   memlog.Offset
	segmentSize   int // offsets per segment
	maxRecordSize int // bytes
}

// Log is a sharded log implementation on top of memlog.Log. It uses a
// configurable sharding strategy (see Sharder interface) during reads and
// writes.
type Log struct {
	sharder Sharder
	clock   clock.Clock
	conf    config
	shards  []*memlog.Log
}

// New creates a new sharded log which can be customized with options. If not
// specified, the default sharding strategy uses fnv.New32a for key hashing.
func New(ctx context.Context, options ...Option) (*Log, error) {
	var l Log

	// apply defaults
	for _, opt := range defaultOptions {
		if err := opt(&l); err != nil {
			return nil, fmt.Errorf("configure log default option: %v", err)
		}
	}

	// apply custom settings
	for _, opt := range options {
		if err := opt(&l); err != nil {
			return nil, fmt.Errorf("configure log custom option: %v", err)
		}
	}

	shards := l.conf.shards
	l.shards = make([]*memlog.Log, shards)
	opts := []memlog.Option{
		memlog.WithClock(l.clock),
		memlog.WithMaxRecordDataSize(l.conf.maxRecordSize),
		memlog.WithStartOffset(l.conf.startOffset),
		memlog.WithMaxSegmentSize(l.conf.segmentSize),
	}

	for i := 0; i < int(shards); i++ {
		ml, err := memlog.New(ctx, opts...)
		if err != nil {
			return nil, fmt.Errorf("create shard: %w", err)
		}
		l.shards[i] = ml
	}

	return &l, nil
}

// Write writes data to the log using the specified key for sharding
func (l *Log) Write(ctx context.Context, key []byte, data []byte) (memlog.Offset, error) {
	if key == nil {
		return -1, errors.New("invalid key")
	}

	shard, err := l.sharder.Shard(key, l.conf.shards)
	if err != nil {
		return -1, fmt.Errorf("get shard: %w", err)
	}

	offset, err := l.shards[shard].Write(ctx, data)
	if err != nil {
		return offset, fmt.Errorf("write to shard: %w", err)
	}

	return offset, nil
}

// Read reads a record from the log at offset using the specified key for shard
// lookup
func (l *Log) Read(ctx context.Context, key []byte, offset memlog.Offset) (memlog.Record, error) {
	if key == nil {
		return memlog.Record{}, errors.New("invalid key")
	}

	shard, err := l.sharder.Shard(key, l.conf.shards)
	if err != nil {
		return memlog.Record{}, fmt.Errorf("get shard: %w", err)
	}

	r, err := l.shards[shard].Read(ctx, offset)
	if err != nil {
		return memlog.Record{}, fmt.Errorf("read from shard: %w", err)
	}

	return r, nil
}