-
Notifications
You must be signed in to change notification settings - Fork 24
/
influxdb.go
135 lines (115 loc) · 2.78 KB
/
influxdb.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package main
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"net/url"
"time"
log "github.com/Sirupsen/logrus"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/influxdata/influxdb-client-go/v2/api"
)
const (
DefaultTick = 1
PingTimeout = 500 * time.Millisecond
)
type InfluxDBConf struct {
Hostname string
Port int
Url string
Db string
Token string
Tick int
UDP bool
Debug string
TagsAttributes []string
TopicMap []string // maps the end of the mqtt topic to tags `weather/{loc}/{sensor}`
NoTopicTag bool // does not forward the topic as tag
Series string // override the series name instead of topic mapping
CaCerts []string
Scheme string
Insecure bool // skips certificate validation
Bucket string
Org string
}
type InfluxDBClient struct {
Client influxdb2.Client
Config InfluxDBConf
Coder *MqttSeriesEncoder
ifChan chan Message
write api.WriteAPI
}
func LoadCertPool(conf InfluxDBConf) *x509.CertPool {
certPool, err := x509.SystemCertPool()
if err != nil {
log.Errorf("Error while loading system cert pool")
log.Error(err)
}
for _, path := range conf.CaCerts {
path = ExpandPath(path)
log.Debugf("Loading certificate %s", path)
raw, err := ioutil.ReadFile(path)
if err != nil {
log.Errorf("Error while loading certificate %s", path)
log.Fatal(err)
}
certPool.AppendCertsFromPEM(raw)
}
return certPool
}
func NewInfluxDBClient(conf InfluxDBConf, ifChan chan Message) (*InfluxDBClient, error) {
host := conf.Url
if len(host) == 0 {
scheme := conf.Scheme
if scheme == "" {
scheme = "http"
}
host = fmt.Sprintf("%s://%s:%d", conf.Scheme, conf.Hostname, conf.Port)
}
log.Infof("influxdb host: %s", host)
_, err := url.Parse(host)
if err != nil {
return nil, err
}
certPool := LoadCertPool(conf)
// Make client
client := influxdb2.NewClientWithOptions(host, conf.Token,
influxdb2.DefaultOptions().
SetTLSConfig(&tls.Config{
RootCAs: certPool,
InsecureSkipVerify: conf.Insecure,
}))
// Check connectivity
ctx, cancel := context.WithTimeout(context.Background(), PingTimeout)
defer cancel()
_, err = client.Ping(ctx)
if err != nil {
return nil, err
}
log.Infof("influxdb connected.")
tick := conf.Tick
if tick == 0 {
tick = DefaultTick
}
ifc := InfluxDBClient{
Client: client,
Coder: NewMqttSeriesEncoder(&conf),
Config: conf,
ifChan: ifChan,
write: client.WriteAPI(conf.Org, conf.Bucket),
}
return &ifc, nil
}
// Start start sending
func (ifc *InfluxDBClient) Start() error {
for {
msg := <-ifc.ifChan
point := ifc.Coder.Encode(msg)
if point == nil {
continue
}
ifc.write.WritePoint(point)
}
}