From fb5376d41afeb23f4762bfdb14529b2922e6e65d Mon Sep 17 00:00:00 2001 From: mkvolkov Date: Thu, 8 Sep 2022 15:37:50 +0300 Subject: [PATCH] etcd watch + test --- examples/complex-etcd/main.go | 57 +++++++++++++++++++++++++++++++++++ providers/etcd/etcd.go | 20 ++++++++++++ 2 files changed, 77 insertions(+) diff --git a/examples/complex-etcd/main.go b/examples/complex-etcd/main.go index 300ca0e..22a5448 100644 --- a/examples/complex-etcd/main.go +++ b/examples/complex-etcd/main.go @@ -6,6 +6,7 @@ import ( "context" "fmt" "log" + "os/exec" "strings" "time" @@ -227,5 +228,61 @@ func main() { } fmt.Printf("Third (combined) request test passed.\n") + + kCheck.Delete("") + + // Watch test + + sKey = "child" + providerCfg = etcd.Config{ + Endpoints: []string{"localhost:2379"}, + DialTimeout: time.Second * 5, + Prefix: true, + Key: "child", + } + + provider = etcd.Provider(providerCfg) + + if err := kCheck.Load(provider, nil); err != nil { + log.Fatalf("error loading config: %v", err) + } + + changedC := make(chan string, 1) + + provider.Watch(func(event interface{}, err error) { + if err != nil { + fmt.Printf("Unexpected error: %v", err) + return + } + + kCheck.Load(provider, nil) + changedC <- kCheck.String(string(event.(*clientv3.Event).Kv.Key)) + }) + + var newVal string = "Brian" + cmd := exec.Command("etcdctl", "put", "child1", newVal) + err = cmd.Run() + if err != nil { + log.Fatal(err) + } + + if strings.Compare(newVal, <-changedC) != 0 { + fmt.Printf("Watch failed: new value comparison FAILED\n") + return + } + + newVal = "Kate" + cmd = exec.Command("etcdctl", "put", "child2", newVal) + err = cmd.Run() + if err != nil { + log.Fatal(err) + } + + if strings.Compare(newVal, <-changedC) != 0 { + fmt.Printf("Watch failed: new value comparison FAILED\n") + return + } + fmt.Printf("Watch test passed.\n") + fmt.Printf("ALL TESTS PASSED\n") } diff --git a/providers/etcd/etcd.go b/providers/etcd/etcd.go index 11a82de..d67e1a6 100644 --- a/providers/etcd/etcd.go +++ b/providers/etcd/etcd.go @@ -90,3 +90,23 @@ func (e *Etcd) Read() (map[string]interface{}, error) { return mp, nil } + +func (e *Etcd) Watch(cb func(event interface{}, err error)) error { + var w clientv3.WatchChan + + go func() { + if e.cfg.Prefix { + w = e.client.Watch(context.Background(), e.cfg.Key, clientv3.WithPrefix()) + } else { + w = e.client.Watch(context.Background(), e.cfg.Key) + } + + for wresp := range w { + for _, ev := range wresp.Events { + cb(ev, nil) + } + } + }() + + return nil +}