-
Notifications
You must be signed in to change notification settings - Fork 759
/
output_bigquery.go
418 lines (361 loc) · 13.8 KB
/
output_bigquery.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
package gcp
import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"net/http"
	"strings"
	"sync"

	"cloud.google.com/go/bigquery"
	"golang.org/x/text/encoding/charmap"
	"google.golang.org/api/googleapi"
	"google.golang.org/api/option"

	"github.com/Jeffail/benthos/v3/internal/component/output"
	"github.com/Jeffail/benthos/v3/public/service"
)
// gcpBigQueryCSVConfig holds the CSV-specific options parsed from the
// output's `csv` configuration namespace.
type gcpBigQueryCSVConfig struct {
Header []string // optional header values prepended as the first row of each batch
FieldDelimiter string // separator between fields, e.g. ","
AllowJaggedRows bool // tolerate missing trailing optional columns when reading CSV
AllowQuotedNewlines bool // allow newlines inside quoted data sections
Encoding string // character encoding of the data (UTF-8 or ISO-8859-1)
SkipLeadingRows int // number of leading rows BigQuery skips when loading
}
// gcpBigQueryCSVConfigFromParsed extracts the CSV options from a parsed
// `csv` config namespace, returning the first field-parse error hit.
func gcpBigQueryCSVConfigFromParsed(conf *service.ParsedConfig) (gcpBigQueryCSVConfig, error) {
	var c gcpBigQueryCSVConfig
	var err error
	if c.Header, err = conf.FieldStringList("header"); err != nil {
		return c, err
	}
	if c.FieldDelimiter, err = conf.FieldString("field_delimiter"); err != nil {
		return c, err
	}
	if c.AllowJaggedRows, err = conf.FieldBool("allow_jagged_rows"); err != nil {
		return c, err
	}
	if c.AllowQuotedNewlines, err = conf.FieldBool("allow_quoted_newlines"); err != nil {
		return c, err
	}
	if c.Encoding, err = conf.FieldString("encoding"); err != nil {
		return c, err
	}
	if c.SkipLeadingRows, err = conf.FieldInt("skip_leading_rows"); err != nil {
		return c, err
	}
	return c, nil
}
// gcpBigQueryOutputConfig is the fully parsed configuration for the
// `gcp_bigquery` output plugin.
type gcpBigQueryOutputConfig struct {
ProjectID string // GCP project ID; may be bigquery.DetectProjectID when left empty
DatasetID string // target BigQuery dataset
TableID string // target table within the dataset
Format string // incoming message format: JSON or CSV
WriteDisposition string // how existing table data is treated (append/empty/truncate)
CreateDisposition string // whether the table may be auto-created
AutoDetect bool // let BigQuery infer schema/options
IgnoreUnknownValues bool // tolerate values not matching the schema
MaxBadRecords int // number of bad records tolerated before the load fails
// CSV options
CSVOptions gcpBigQueryCSVConfig
}
// gcpBigQueryOutputConfigFromParsed builds the output configuration from the
// parsed plugin config. An empty `project` falls back to automatic project
// detection via bigquery.DetectProjectID.
func gcpBigQueryOutputConfigFromParsed(conf *service.ParsedConfig) (gcpBigQueryOutputConfig, error) {
	var c gcpBigQueryOutputConfig
	var err error
	if c.ProjectID, err = conf.FieldString("project"); err != nil {
		return c, err
	}
	if c.ProjectID == "" {
		// Defer project resolution to the client's credential-based detection.
		c.ProjectID = bigquery.DetectProjectID
	}
	if c.DatasetID, err = conf.FieldString("dataset"); err != nil {
		return c, err
	}
	if c.TableID, err = conf.FieldString("table"); err != nil {
		return c, err
	}
	if c.Format, err = conf.FieldString("format"); err != nil {
		return c, err
	}
	if c.WriteDisposition, err = conf.FieldString("write_disposition"); err != nil {
		return c, err
	}
	if c.CreateDisposition, err = conf.FieldString("create_disposition"); err != nil {
		return c, err
	}
	if c.IgnoreUnknownValues, err = conf.FieldBool("ignore_unknown_values"); err != nil {
		return c, err
	}
	if c.MaxBadRecords, err = conf.FieldInt("max_bad_records"); err != nil {
		return c, err
	}
	if c.AutoDetect, err = conf.FieldBool("auto_detect"); err != nil {
		return c, err
	}
	if c.CSVOptions, err = gcpBigQueryCSVConfigFromParsed(conf.Namespace("csv")); err != nil {
		return c, err
	}
	return c, nil
}
// gcpBQClientURL optionally overrides the BigQuery service endpoint (useful
// for pointing at an emulator in tests). The zero value means "use the real
// service with default authentication".
type gcpBQClientURL string

// NewClient returns a BigQuery client for the given project. With an empty
// URL the default endpoint and credentials are used; otherwise the client
// targets the custom endpoint without authentication.
func (g gcpBQClientURL) NewClient(ctx context.Context, projectID string) (*bigquery.Client, error) {
	var opts []option.ClientOption
	if g != "" {
		opts = append(opts, option.WithoutAuthentication(), option.WithEndpoint(string(g)))
	}
	return bigquery.NewClient(ctx, projectID, opts...)
}
// gcpBigQueryConfig returns the config spec for the `gcp_bigquery` output,
// including user-facing documentation and defaults for every field. All
// string literals here are rendered to users verbatim.
func gcpBigQueryConfig() *service.ConfigSpec {
return service.NewConfigSpec().
// Stable(). TODO
Categories("GCP", "Services").
Version("3.55.0").
Summary(`Sends messages as new rows to a Google Cloud BigQuery table.`).
Description(output.Description(true, true, `
## Credentials
By default Benthos will use a shared credentials file when connecting to GCP services. You can find out more [in this document](/docs/guides/cloud/gcp).
## Format
This output currently supports only CSV and NEWLINE_DELIMITED_JSON formats. Learn more about how to use GCP BigQuery with them here:
- `+"[`NEWLINE_DELIMITED_JSON`](https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-json)"+`
- `+"[`CSV`](https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-csv)"+`
Each message may contain multiple elements separated by newlines. For example a single message containing:
`+"```json"+`
{"key": "1"}
{"key": "2"}
`+"```"+`
Is equivalent to two separate messages:
`+"```json"+`
{"key": "1"}
`+"```"+`
And:
`+"```json"+`
{"key": "2"}
`+"```"+`
The same is true for the CSV format.
### CSV
For the CSV format when the field `+"`csv.header`"+` is specified a header row will be inserted as the first line of each message batch. If this field is not provided then the first message of each message batch must include a header line.`)).
// Core connection/target fields.
Field(service.NewStringField("project").Description("The project ID of the dataset to insert data to. If not set, it will be inferred from the credentials or read from the GOOGLE_CLOUD_PROJECT environment variable.").Default("")).
Field(service.NewStringField("dataset").Description("The BigQuery Dataset ID.")).
Field(service.NewStringField("table").Description("The table to insert messages to.")).
Field(service.NewStringEnumField("format", string(bigquery.JSON), string(bigquery.CSV)).
Description("The format of each incoming message.").
Default(string(bigquery.JSON))).
Field(service.NewIntField("max_in_flight").
Description("The maximum number of messages to have in flight at a given time. Increase this to improve throughput.").
Default(64)). // TODO: Tune this default
// Advanced load-job behavior fields.
Field(service.NewStringEnumField("write_disposition",
string(bigquery.WriteAppend), string(bigquery.WriteEmpty), string(bigquery.WriteTruncate)).
Description("Specifies how existing data in a destination table is treated.").
Advanced().
Default(string(bigquery.WriteAppend))).
Field(service.NewStringEnumField("create_disposition", string(bigquery.CreateIfNeeded), string(bigquery.CreateNever)).
Description("Specifies the circumstances under which destination table will be created. If CREATE_IF_NEEDED is used the GCP BigQuery will create the table if it does not already exist and tables are created atomically on successful completion of a job. The CREATE_NEVER option ensures the table must already exist and will not be automatically created.").
Advanced().
Default(string(bigquery.CreateIfNeeded))).
Field(service.NewBoolField("ignore_unknown_values").
Description("Causes values not matching the schema to be tolerated. Unknown values are ignored. For CSV this ignores extra values at the end of a line. For JSON this ignores named values that do not match any column name. If this field is set to false (the default value), records containing unknown values are treated as bad records. The max_bad_records field can be used to customize how bad records are handled.").
Advanced().
Default(false)).
Field(service.NewIntField("max_bad_records").
Description("The maximum number of bad records that will be ignored when reading data.").
Advanced().
Default(0)).
Field(service.NewBoolField("auto_detect").
Description("Indicates if we should automatically infer the options and schema for CSV and JSON sources. If the table doesn't exist and this field is set to `false` the output may not be able to insert data and will throw insertion error. Be careful using this field since it delegates to the GCP BigQuery service the schema detection and values like `\"no\"` may be treated as booleans for the CSV format.").
Advanced().
Default(false)).
// CSV-specific options, nested under the `csv` namespace.
Field(service.NewObjectField("csv",
service.NewStringListField("header").
Description("A list of values to use as header for each batch of messages. If not specified the first line of each message will be used as header.").
Default([]interface{}{}),
service.NewStringField("field_delimiter").
Description("The separator for fields in a CSV file, used when reading or exporting data.").
Default(","),
service.NewBoolField("allow_jagged_rows").
Description("Causes missing trailing optional columns to be tolerated when reading CSV data. Missing values are treated as nulls.").
Advanced().
Default(false),
service.NewBoolField("allow_quoted_newlines").
Description("Sets whether quoted data sections containing newlines are allowed when reading CSV data.").
Advanced().
Default(false),
service.NewStringEnumField("encoding", string(bigquery.UTF_8), string(bigquery.ISO_8859_1)).
Description("Encoding is the character encoding of data to be read.").
Advanced().
Default(string(bigquery.UTF_8)),
service.NewIntField("skip_leading_rows").
Description("The number of rows at the top of a CSV file that BigQuery will skip when reading data. The default value is 1 since Benthos will add the specified header in the first line of each batch sent to BigQuery.").
Advanced().
Default(1),
).Description("Specify how CSV data should be interpretted.")).
Field(service.NewBatchPolicyField("batching"))
}
// init registers the `gcp_bigquery` batch output with the service registry.
// Registration failure is a programming error, hence the panic.
func init() {
	constructor := func(conf *service.ParsedConfig, mgr *service.Resources) (out service.BatchOutput, batchPol service.BatchPolicy, maxInFlight int, err error) {
		if batchPol, err = conf.FieldBatchPolicy("batching"); err != nil {
			return
		}
		if maxInFlight, err = conf.FieldInt("max_in_flight"); err != nil {
			return
		}
		var gconf gcpBigQueryOutputConfig
		if gconf, err = gcpBigQueryOutputConfigFromParsed(conf); err != nil {
			return
		}
		out, err = newGCPBigQueryOutput(gconf, mgr.Logger())
		return
	}
	if err := service.RegisterBatchOutput("gcp_bigquery", gcpBigQueryConfig(), constructor); err != nil {
		panic(err)
	}
}
// gcpBigQueryOutput writes message batches to a BigQuery table via load jobs.
// The pre-computed byte slices below avoid re-encoding the CSV header,
// delimiter and newline separator on every batch.
type gcpBigQueryOutput struct {
conf gcpBigQueryOutputConfig
clientURL gcpBQClientURL // optional endpoint override; empty in production
client *bigquery.Client // nil until Connect succeeds; guarded by connMut
connMut sync.RWMutex
fieldDelimiterBytes []byte // CSV delimiter, possibly ISO-8859-1 encoded
csvHeaderBytes []byte // optional pre-rendered CSV header row
newLineBytes []byte // record separator, possibly ISO-8859-1 encoded
log *service.Logger
}
// newGCPBigQueryOutput builds a BigQuery output from the parsed config. For
// the CSV format it pre-computes the header, delimiter and newline byte
// sequences, re-encoding them to ISO-8859-1 when that encoding is selected.
func newGCPBigQueryOutput(
	conf gcpBigQueryOutputConfig,
	log *service.Logger,
) (*gcpBigQueryOutput, error) {
	g := &gcpBigQueryOutput{
		conf:         conf,
		log:          log,
		newLineBytes: []byte("\n"),
	}
	if conf.Format != string(bigquery.CSV) {
		// JSON needs no delimiter/header preparation.
		return g, nil
	}

	g.fieldDelimiterBytes = []byte(conf.CSVOptions.FieldDelimiter)
	if len(conf.CSVOptions.Header) > 0 {
		// Quote each configured header value and join with the delimiter,
		// producing e.g. "a","b","c".
		quoted := `"` + strings.Join(conf.CSVOptions.Header, `"`+conf.CSVOptions.FieldDelimiter+`"`) + `"`
		g.csvHeaderBytes = []byte(quoted)
	}

	if conf.CSVOptions.Encoding == string(bigquery.UTF_8) {
		return g, nil
	}

	// ISO-8859-1: re-encode the pre-computed byte sequences once up front.
	var err error
	if g.fieldDelimiterBytes, err = convertToIso(g.fieldDelimiterBytes); err != nil {
		return nil, fmt.Errorf("error parsing csv.field_delimiter field: %w", err)
	}
	if g.newLineBytes, err = convertToIso([]byte("\n")); err != nil {
		return nil, fmt.Errorf("error creating newline bytes: %w", err)
	}
	if len(g.csvHeaderBytes) > 0 {
		if g.csvHeaderBytes, err = convertToIso(g.csvHeaderBytes); err != nil {
			return nil, fmt.Errorf("error parsing csv.header field: %w", err)
		}
	}
	return g, nil
}
// convertToIso converts a UTF-8 encoded byte slice to its ISO-8859-1
// (Latin-1) encoding. It returns an error if the input contains runes that
// cannot be represented in ISO-8859-1.
//
// Fix: dropped the unused named result parameters — the original declared
// (result []byte, err error) but never referenced them.
func convertToIso(value []byte) ([]byte, error) {
	return charmap.ISO8859_1.NewEncoder().Bytes(value)
}
// Connect creates the BigQuery client and verifies that the target dataset
// exists (and the table too, when table auto-creation is disabled). The
// client is stored on g only once every check has passed; on any failure the
// freshly created client is closed before returning.
func (g *gcpBigQueryOutput) Connect(ctx context.Context) error {
	g.connMut.Lock()
	defer g.connMut.Unlock()

	client, err := g.clientURL.NewClient(ctx, g.conf.ProjectID)
	if err != nil {
		return fmt.Errorf("error creating big query client: %w", err)
	}
	ok := false
	defer func() {
		// Release the client if any of the existence checks below failed.
		if !ok {
			client.Close()
		}
	}()

	dataset := client.DatasetInProject(client.Project(), g.conf.DatasetID)
	if _, err := dataset.Metadata(ctx); err != nil {
		if hasStatusCode(err, http.StatusNotFound) {
			return fmt.Errorf("dataset does not exist: %v", g.conf.DatasetID)
		}
		return fmt.Errorf("error checking dataset existence: %w", err)
	}

	if g.conf.CreateDisposition == string(bigquery.CreateNever) {
		// The table must pre-exist when we are not allowed to create it.
		if _, err := dataset.Table(g.conf.TableID).Metadata(ctx); err != nil {
			if hasStatusCode(err, http.StatusNotFound) {
				return fmt.Errorf("table does not exist: %v", g.conf.TableID)
			}
			return fmt.Errorf("error checking table existence: %w", err)
		}
	}

	g.client = client
	ok = true
	g.log.Infof("Inserting messages as objects to GCP BigQuery: %v:%v:%v\n", client.Project(), g.conf.DatasetID, g.conf.TableID)
	return nil
}
// hasStatusCode reports whether err is (or wraps) a googleapi.Error carrying
// the given HTTP status code.
//
// Fix: the original used a bare type assertion, which fails to detect a
// googleapi.Error wrapped via fmt.Errorf("%w", ...). errors.As walks the
// full error chain.
func hasStatusCode(err error, code int) bool {
	var apiErr *googleapi.Error
	return errors.As(err, &apiErr) && apiErr.Code == code
}
// WriteBatch concatenates the batch's messages (newline separated, with the
// optional pre-rendered CSV header first) into a single payload and submits
// it to BigQuery as one load job, blocking until the job completes.
func (g *gcpBigQueryOutput) WriteBatch(ctx context.Context, batch service.MessageBatch) error {
	g.connMut.RLock()
	client := g.client
	g.connMut.RUnlock()
	if client == nil {
		return service.ErrNotConnected
	}

	var buf bytes.Buffer
	if g.csvHeaderBytes != nil {
		buf.Write(g.csvHeaderBytes)
	}
	for _, part := range batch {
		raw, err := part.AsBytes()
		if err != nil {
			return err
		}
		// Separate from the header / previous record; the very first write
		// into an empty buffer needs no separator.
		if buf.Len() > 0 {
			buf.Write(g.newLineBytes)
		}
		buf.Write(raw)
	}

	payload := buf.Bytes()
	job, err := g.createTableLoader(&payload).Run(ctx)
	if err != nil {
		return err
	}
	status, err := job.Wait(ctx)
	if err == nil {
		err = status.Err()
	}
	if err != nil {
		return fmt.Errorf("error inserting data in bigquery: %w", err)
	}
	return nil
}
// createTableLoader builds a bigquery.Loader that streams the supplied
// payload into the configured table, applying the configured format,
// dispositions and (for CSV) the CSV reader options.
func (g *gcpBigQueryOutput) createTableLoader(data *[]byte) *bigquery.Loader {
	source := bigquery.NewReaderSource(bytes.NewReader(*data))
	source.SourceFormat = bigquery.DataFormat(g.conf.Format)
	source.AutoDetect = g.conf.AutoDetect
	source.IgnoreUnknownValues = g.conf.IgnoreUnknownValues
	source.MaxBadRecords = int64(g.conf.MaxBadRecords)
	if g.conf.Format == string(bigquery.CSV) {
		csvOpts := g.conf.CSVOptions
		source.FieldDelimiter = csvOpts.FieldDelimiter
		source.AllowJaggedRows = csvOpts.AllowJaggedRows
		source.AllowQuotedNewlines = csvOpts.AllowQuotedNewlines
		source.Encoding = bigquery.Encoding(csvOpts.Encoding)
		source.SkipLeadingRows = int64(csvOpts.SkipLeadingRows)
	}

	table := g.client.DatasetInProject(g.client.Project(), g.conf.DatasetID).Table(g.conf.TableID)
	loader := table.LoaderFrom(source)
	loader.CreateDisposition = bigquery.TableCreateDisposition(g.conf.CreateDisposition)
	loader.WriteDisposition = bigquery.TableWriteDisposition(g.conf.WriteDisposition)
	return loader
}
// Close releases the BigQuery client, if any, and marks the output as
// disconnected. Safe to call multiple times.
func (g *gcpBigQueryOutput) Close(ctx context.Context) error {
	g.connMut.Lock()
	defer g.connMut.Unlock()
	if g.client == nil {
		return nil
	}
	g.client.Close()
	g.client = nil
	return nil
}