forked from knadh/koanf
/
consul.go
156 lines (127 loc) · 3.46 KB
/
consul.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package consul
import (
"errors"
"fmt"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/api/watch"
)
// Config holds the settings used to build and drive the Consul provider.
type Config struct {
	// Key is the Consul KV path to read. With Recurse enabled it is used
	// as a prefix and every pair stored beneath it is read.
	Key string

	// Recurse switches the lookup from a single literal key to a prefix
	// listing, comparable to: consul kv get -recurse key
	// See https://www.consul.io/api-docs/kv#read-key
	Recurse bool

	// Detailed makes the provider expose each pair as a nested map that
	// carries the pair's metadata (CreateIndex, Flags, LockIndex,
	// ModifyIndex, Session) next to its Value, comparable to:
	// consul kv get -detailed key
	Detailed bool

	// Cfg is the Consul API client configuration. It is handed to
	// api.NewClient, and its Address is the agent the watcher connects to.
	Cfg *api.Config
}
// Consul is the provider that serves configuration out of the Consul
// key/value store.
type Consul struct {
	// client is the Consul API client created from cfg.Cfg.
	client *api.Client
	// cfg is the configuration the provider was created with.
	cfg Config
}
// Provider builds a Consul provider from cfg.
//
// The API client is created from cfg.Cfg. If the client cannot be created
// the error is not reported and nil is returned, so callers should check the
// result before using it.
func Provider(cfg Config) *Consul {
	client, err := api.NewClient(cfg.Cfg)
	if err == nil {
		return &Consul{client: client, cfg: cfg}
	}
	return nil
}
// ReadBytes always fails: the Consul provider only exposes configuration
// through Read, so no raw bytes are available.
func (c *Consul) ReadBytes() ([]byte, error) {
	err := errors.New("consul provider does not support this method")
	return nil, err
}
// Read reads configuration from the Consul provider.
//
// With Recurse set, every pair stored under the Key prefix is listed;
// otherwise only the pair stored at Key is fetched. The returned map is keyed
// by the Consul key of each pair. Each value is the pair's value converted to
// a string or, with Detailed set, a nested map produced by kvPairDetails.
//
// A key that does not exist yields an empty map and a nil error, which is
// also what a recursive read of an empty prefix returns. Errors from the
// Consul API are returned unchanged.
func (c *Consul) Read() (map[string]interface{}, error) {
	var (
		mp = make(map[string]interface{})
		kv = c.client.KV()
	)

	var pairs []*api.KVPair
	if c.cfg.Recurse {
		list, _, err := kv.List(c.cfg.Key, nil)
		if err != nil {
			return nil, err
		}
		pairs = list
	} else {
		pair, _, err := kv.Get(c.cfg.Key, nil)
		if err != nil {
			return nil, err
		}
		// Get reports a missing key as a nil pair with a nil error;
		// dereferencing it would panic, so treat it as "no data".
		if pair != nil {
			pairs = append(pairs, pair)
		}
	}

	for _, pair := range pairs {
		if c.cfg.Detailed {
			mp[pair.Key] = kvPairDetails(pair)
		} else {
			mp[pair.Key] = string(pair.Value)
		}
	}

	return mp, nil
}

// kvPairDetails converts a KV pair into the nested map exposed when Detailed
// is set. The entries can be addressed through koanf's flattened delimited
// keys, for example:
//
//	"parent1.CreateIndex"
//	"parent1.Flags"
//	"parent1.LockIndex"
//	"parent1.ModifyIndex"
//	"parent1.Session"
//	"parent1.Value"
//
// The numeric metadata is rendered as decimal strings, and an empty session
// is reported as "-".
func kvPairDetails(pair *api.KVPair) map[string]interface{} {
	session := pair.Session
	if session == "" {
		session = "-"
	}

	return map[string]interface{}{
		"CreateIndex": fmt.Sprintf("%d", pair.CreateIndex),
		"Flags":       fmt.Sprintf("%d", pair.Flags),
		"LockIndex":   fmt.Sprintf("%d", pair.LockIndex),
		"ModifyIndex": fmt.Sprintf("%d", pair.ModifyIndex),
		"Session":     session,
		"Value":       string(pair.Value),
	}
}
// Watch watches for changes in the Consul API and triggers a callback.
//
// With Recurse set, a "keyprefix" watch is registered on Key; otherwise a
// "key" watch is registered on the literal Key. Every update delivered by the
// watch plan is forwarded as cb(val, nil). If the plan stops with an error,
// that error is forwarded as cb(nil, err) instead of being dropped.
//
// The returned error only covers building the watch plan. The plan itself
// runs in a background goroutine against the address in Cfg.Address, and this
// method offers no way to stop it.
func (c *Consul) Watch(cb func(event interface{}, err error)) error {
	params := make(map[string]interface{})
	if c.cfg.Recurse {
		params["type"] = "keyprefix"
		params["prefix"] = c.cfg.Key
	} else {
		params["type"] = "key"
		params["key"] = c.cfg.Key
	}

	plan, err := watch.Parse(params)
	if err != nil {
		return err
	}

	plan.Handler = func(_ uint64, val interface{}) {
		cb(val, nil)
	}

	go func() {
		// Run blocks for as long as the watch is active; report a failure
		// to the caller through the callback rather than ignoring it.
		if err := plan.Run(c.cfg.Cfg.Address); err != nil {
			cb(nil, err)
		}
	}()

	return nil
}