Skip to content

Commit

Permalink
Fix nil access in natskv
Browse files Browse the repository at this point in the history
  • Loading branch information
cavus700 committed Apr 19, 2024
1 parent 3ea50a0 commit 3c525a8
Showing 1 changed file with 22 additions and 18 deletions.
40 changes: 22 additions & 18 deletions backend/natskv/natskv.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@ package natskv

import (
"context"
"strings"
"time"

"github.com/nats-io/nats.go"
"github.com/sagikazarmark/crypt/backend"
"strings"
)

type Client struct {
Expand Down Expand Up @@ -35,7 +33,6 @@ func (c *Client) Get(key string) ([]byte, error) {
}

func (c *Client) GetWithContext(ctx context.Context, key string) ([]byte, error) {
defer c.conn.Close()
kv, err := c.js.KeyValue(c.bucket)
if err != nil {
return nil, err
Expand All @@ -54,7 +51,6 @@ func (c *Client) List(bucket string) (backend.KVPairs, error) {
}

func (c *Client) ListWithContext(ctx context.Context, bucket string) (backend.KVPairs, error) {
defer c.conn.Close()
res := backend.KVPairs{}
kv, err := c.js.KeyValue(c.bucket)
if err != nil {
Expand Down Expand Up @@ -87,7 +83,6 @@ func (c *Client) Set(key string, value []byte) error {
}

func (c *Client) SetWithContext(ctx context.Context, key string, value []byte) error {
defer c.conn.Close()
kv, err := c.js.KeyValue(c.bucket)
if err != nil {
return err
Expand All @@ -105,8 +100,7 @@ func (c *Client) Watch(key string, stop chan bool) <-chan *backend.Response {
return c.WatchWithContext(context.TODO(), key, stop)
}

func (c *Client) WatchWithContext(ctx context.Context, key string, stop chan bool) <-chan *backend.Response {
defer c.conn.Close()
func (c *Client) WatchWithContext(ctx context.Context, key string, stop <-chan bool) <-chan *backend.Response {
ch := make(chan *backend.Response, 0)

kv, err := c.js.KeyValue(c.bucket)
Expand All @@ -120,18 +114,28 @@ func (c *Client) WatchWithContext(ctx context.Context, key string, stop chan boo
}

go func() {
go func() {
defer c.conn.Close()
<-stop
watch.Stop()
}()

defer close(ch)
initialValuesReceived := false
for {
k := <-watch.Updates()
ch <- &backend.Response{
Value: k.Value(),
select {
case k := <-watch.Updates():
if !initialValuesReceived {
if k == nil {
initialValuesReceived = true
}
continue
}

ch <- &backend.Response{
Value: k.Value(),
}
case <-ctx.Done():
_ = watch.Stop()
return
case <-stop:
_ = watch.Stop()
return
}
time.Sleep(time.Second * 5)
}

}()
Expand Down

0 comments on commit 3c525a8

Please sign in to comment.