Skip to content

Commit

Permalink
feat: add nats provider (#213)
Browse files Browse the repository at this point in the history
* feat(providers): add nats provider
adds the nats provider which reads the Nats JetStream Key-Value storage similarly to ETCD

Co-authored-by: Kailash Nadh <kailash@nadh.in>
  • Loading branch information
TECHNOFAB11 and knadh committed Apr 13, 2023
1 parent 56f8eca commit 317371e
Show file tree
Hide file tree
Showing 7 changed files with 325 additions and 1 deletion.
12 changes: 11 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,14 @@ require (
github.com/mitchellh/mapstructure v1.5.0
)

require github.com/mitchellh/reflectwalk v1.0.2 // indirect
require (
github.com/nats-io/nkeys v0.4.4 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
golang.org/x/crypto v0.6.0 // indirect
golang.org/x/sys v0.5.0 // indirect
)

require (
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/nats-io/nats.go v1.25.0
)
10 changes: 10 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,13 @@ github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyua
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ=
github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw=
github.com/nats-io/nats.go v1.25.0 h1:t5/wCPGciR7X3Mu8QOi4jiJaXaWM8qtkLu4lzGZvYHE=
github.com/nats-io/nats.go v1.25.0/go.mod h1:D2WALIhz7V8M0pH8Scx8JZXlg6Oqz5VG+nQkK8nJdvg=
github.com/nats-io/nkeys v0.4.4 h1:xvBJ8d69TznjcQl9t6//Q5xXuVhyYiSos6RPtvQNTwA=
github.com/nats-io/nkeys v0.4.4/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
golang.org/x/crypto v0.6.0 h1:qfktjS5LUO+fFKeJXZ+ikTRijMmljikvG68fpMMruSc=
golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58=
golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
30 changes: 30 additions & 0 deletions providers/nats/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
module github.com/knadh/koanf/providers/nats

go 1.18

require (
github.com/knadh/koanf/v2 v2.0.0
github.com/nats-io/nats-server/v2 v2.9.15
github.com/nats-io/nats.go v1.25.0
github.com/stretchr/testify v1.8.2
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/klauspost/compress v1.16.0 // indirect
github.com/knadh/koanf/maps v0.1.1 // indirect
github.com/minio/highwayhash v1.0.2 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/nats-io/jwt/v2 v2.3.0 // indirect
github.com/nats-io/nkeys v0.4.4 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
golang.org/x/crypto v0.6.0 // indirect
golang.org/x/sys v0.5.0 // indirect
golang.org/x/time v0.3.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
64 changes: 64 additions & 0 deletions providers/nats/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/klauspost/compress v1.16.0 h1:iULayQNOReoYUe+1qtKOqw9CwJv3aNQu8ivo7lw1HU4=
github.com/klauspost/compress v1.16.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/knadh/koanf/maps v0.1.1 h1:G5TjmUh2D7G2YWf5SQQqSiHRJEjaicvU0KpypqB3NIs=
github.com/knadh/koanf/maps v0.1.1/go.mod h1:npD/QZY3V6ghQDdcQzl1W4ICNVTkohC8E73eI2xW4yI=
github.com/knadh/koanf/v2 v2.0.0 h1:XPQ5ilNnwnNaHrfQ1YpTVhUAjcGHnEKA+lRpipQv02Y=
github.com/knadh/koanf/v2 v2.0.0/go.mod h1:ZeiIlIDXTE7w1lMT6UVcNiRAS2/rCeLn/GdLNvY1Dus=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw=
github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ=
github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw=
github.com/nats-io/jwt/v2 v2.3.0 h1:z2mA1a7tIf5ShggOFlR1oBPgd6hGqcDYsISxZByUzdI=
github.com/nats-io/jwt/v2 v2.3.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
github.com/nats-io/nats-server/v2 v2.9.15 h1:MuwEJheIwpvFgqvbs20W8Ish2azcygjf4Z0liVu2I4c=
github.com/nats-io/nats-server/v2 v2.9.15/go.mod h1:QlCTy115fqpx4KSOPFIxSV7DdI6OxtZsGOL1JLdeRlE=
github.com/nats-io/nats.go v1.25.0 h1:t5/wCPGciR7X3Mu8QOi4jiJaXaWM8qtkLu4lzGZvYHE=
github.com/nats-io/nats.go v1.25.0/go.mod h1:D2WALIhz7V8M0pH8Scx8JZXlg6Oqz5VG+nQkK8nJdvg=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
github.com/nats-io/nkeys v0.4.4 h1:xvBJ8d69TznjcQl9t6//Q5xXuVhyYiSos6RPtvQNTwA=
github.com/nats-io/nkeys v0.4.4/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.6.0 h1:qfktjS5LUO+fFKeJXZ+ikTRijMmljikvG68fpMMruSc=
golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng=
google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
99 changes: 99 additions & 0 deletions providers/nats/nats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package nats

import (
"errors"
"fmt"
"strings"
"time"

"github.com/nats-io/nats.go"
)

type Config struct {
// nats endpoint (comma separated urls are possible, eg "nats://one, nats://two").
URL string

// Optional NATS options: nats.Connect(url, ...options)
Options []nats.Option

// Optional JetStream options: nc.JetStream(...options)
JetStreamOptions []nats.JSOpt

// Bucket is the Nats KV bucket.
Bucket string

// Prefix (optional).
Prefix string
}

// Nats implements the nats config provider.
type Nats struct {
kv nats.KeyValue
cfg Config
}

// Provider returns a provider that takes nats config.
func Provider(cfg Config) (*Nats, error) {
nc, err := nats.Connect(cfg.URL, cfg.Options...)
if err != nil {
return nil, err
}

js, err := nc.JetStream(cfg.JetStreamOptions...)
if err != nil {
return nil, err
}

kv, err := js.KeyValue(cfg.Bucket)
if err != nil {
return nil, err
}

return &Nats{kv: kv, cfg: cfg}, nil
}

// ReadBytes is not supported by nats provider.
func (n *Nats) ReadBytes() ([]byte, error) {
return nil, errors.New("nats provider does not support this method")
}

// Read returns a nested config map.
func (n *Nats) Read() (map[string]interface{}, error) {
keys, err := n.kv.Keys()
if err != nil {
return nil, err
}

mp := make(map[string]interface{})
for _, key := range keys {
if !strings.HasPrefix(key, n.cfg.Prefix) {
continue
}
res, err := n.kv.Get(key)
if err != nil {
return nil, err
}
mp[res.Key()] = string(res.Value())
}

return mp, nil
}

func (n *Nats) Watch(cb func(event interface{}, err error)) error {
w, err := n.kv.Watch(fmt.Sprintf("%s.*", n.cfg.Prefix))
if err != nil {
return err
}

start := time.Now()
go func(watcher nats.KeyWatcher) {
for update := range watcher.Updates() {
// ignore nil events and only callback when the event is new (nats always sends one "old" event)
if update != nil && update.Created().After(start) {
cb(update, nil)
}
}
}(w)

return nil
}
74 changes: 74 additions & 0 deletions providers/nats/nats_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package nats

import (
"testing"
"time"

"github.com/knadh/koanf/v2"
"github.com/nats-io/nats.go"
"github.com/stretchr/testify/assert"
)

func TestNats(t *testing.T) {
k := koanf.NewWithConf(koanf.Conf{})

nc, err := nats.Connect(testNatsURL)
if err != nil {
t.Fatal(err)
}
defer nc.Drain()

js, err := nc.JetStream()
if err != nil {
t.Fatal(err)
}
kv, err := js.CreateKeyValue(&nats.KeyValueConfig{
Bucket: "test",
})
_, err = kv.Put("some.test.color", []byte("blue"))
if err != nil {
t.Fatal(err)
}

provider, err := Provider(Config{
URL: testNatsURL,
Bucket: "test",
Prefix: "some.test",
})
if err != nil {
t.Fatal(err)
}
err = k.Load(provider, nil)
if err != nil {
t.Fatal(err)
}

assert.Equal(t, k.Keys(), []string{"some.test.color"})
assert.Equal(t, k.Get("some.test.color"), "blue")

err = provider.Watch(func(event interface{}, err error) {
if err != nil {
t.Fatal(err)
}

err = k.Load(provider, nil)
if err != nil {
t.Fatal(err)
}
})
if err != nil {
t.Fatal(err)
}

go func() {
_, err := kv.Put("some.test.color", []byte("yellow"))
if err != nil {
t.Error(err)
return
}
}()

time.Sleep(100 * time.Millisecond)

assert.Equal(t, k.Get("some.test.color"), "yellow")
}
37 changes: 37 additions & 0 deletions providers/nats/testrunner_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package nats

import (
"log"
"os"
"testing"
"time"

"github.com/nats-io/nats-server/v2/logger"
"github.com/nats-io/nats-server/v2/server"
)

var testNatsURL string

func TestMain(m *testing.M) {
gnatsd, err := server.NewServer(&server.Options{
Port: server.RANDOM_PORT,
JetStream: true,
})
if err != nil {
log.Fatal("failed to create gnatsd server")
}
gnatsd.SetLogger(
logger.NewStdLogger(false, false, false, false, false),
false,
false,
)
go gnatsd.Start()
defer gnatsd.Shutdown()

if !gnatsd.ReadyForConnections(time.Second) {
log.Fatal("failed to start the gnatsd server")
}
testNatsURL = "nats://" + gnatsd.Addr().String()

os.Exit(m.Run())
}

0 comments on commit 317371e

Please sign in to comment.