From 755f6f86ca6b4b5686be1376e65043bf2210f9c1 Mon Sep 17 00:00:00 2001 From: Mihai Todor Date: Thu, 25 Nov 2021 19:00:06 +0000 Subject: [PATCH] Infer the Project ID when using the GCP BigQuery output Also fix a bunch of typos --- internal/impl/gcp/output_bigquery.go | 15 +++++++++------ internal/impl/gcp/output_bigquery_test.go | 16 ++++++++++++++-- website/docs/components/outputs/gcp_bigquery.md | 3 ++- 3 files changed, 25 insertions(+), 9 deletions(-) diff --git a/internal/impl/gcp/output_bigquery.go b/internal/impl/gcp/output_bigquery.go index 792adf03a9..a671abd26e 100644 --- a/internal/impl/gcp/output_bigquery.go +++ b/internal/impl/gcp/output_bigquery.go @@ -66,6 +66,9 @@ func gcpBigQueryOutputConfigFromParsed(conf *service.ParsedConfig) (gconf gcpBig if gconf.ProjectID, err = conf.FieldString("project"); err != nil { return } + if gconf.ProjectID == "" { + gconf.ProjectID = bigquery.DetectProjectID + } if gconf.DatasetID, err = conf.FieldString("dataset"); err != nil { return } @@ -146,7 +149,7 @@ The same is true for the CSV format. ### CSV For the CSV format when the field `+"`csv.header`"+` is specified a header row will be inserted as the first line of each message batch. If this field is not provided then the first message of each message batch must include a header line.`)). - Field(service.NewStringField("project").Description("The project ID of the dataset to insert data to.")). + Field(service.NewStringField("project").Description("The project ID of the dataset to insert data to. If not set, it will be inferred from the credentials or read from the GOOGLE_CLOUD_PROJECT environment variable.").Default("")). Field(service.NewStringField("dataset").Description("The BigQuery Dataset ID.")). Field(service.NewStringField("table").Description("The table to insert messages to.")). Field(service.NewStringEnumField("format", string(bigquery.JSON), string(bigquery.CSV)). @@ -304,10 +307,10 @@ func (g *gcpBigQueryOutput) Connect(ctx context.Context) (err error) { } }() - dataset := client.DatasetInProject(g.conf.ProjectID, g.conf.DatasetID) + dataset := client.DatasetInProject(client.Project(), g.conf.DatasetID) if _, err = dataset.Metadata(ctx); err != nil { if hasStatusCode(err, http.StatusNotFound) { - err = fmt.Errorf("dataset does not exists: %v", g.conf.DatasetID) + err = fmt.Errorf("dataset does not exist: %v", g.conf.DatasetID) } else { err = fmt.Errorf("error checking dataset existence: %w", err) } @@ -318,7 +321,7 @@ func (g *gcpBigQueryOutput) Connect(ctx context.Context) (err error) { table := dataset.Table(g.conf.TableID) if _, err = table.Metadata(ctx); err != nil { if hasStatusCode(err, http.StatusNotFound) { - err = fmt.Errorf("table does not exists: %v", g.conf.TableID) + err = fmt.Errorf("table does not exist: %v", g.conf.TableID) } else { err = fmt.Errorf("error checking table existence: %w", err) } @@ -327,7 +330,7 @@ func (g *gcpBigQueryOutput) Connect(ctx context.Context) (err error) { } g.client = client - g.log.Infof("Inserting messages as objects to GCP BigQuery: %v:%v:%v\n", g.conf.ProjectID, g.conf.DatasetID, g.conf.TableID) + g.log.Infof("Inserting messages as objects to GCP BigQuery: %v:%v:%v\n", client.Project(), g.conf.DatasetID, g.conf.TableID) return nil } @@ -380,7 +383,7 @@ func (g *gcpBigQueryOutput) WriteBatch(ctx context.Context, batch service.Messag } func (g *gcpBigQueryOutput) createTableLoader(data *[]byte) *bigquery.Loader { - table := g.client.DatasetInProject(g.conf.ProjectID, g.conf.DatasetID).Table(g.conf.TableID) + table := g.client.DatasetInProject(g.client.Project(), g.conf.DatasetID).Table(g.conf.TableID) source := bigquery.NewReaderSource(bytes.NewReader(*data)) source.SourceFormat = bigquery.DataFormat(g.conf.Format) diff --git a/internal/impl/gcp/output_bigquery_test.go b/internal/impl/gcp/output_bigquery_test.go index 5e5b1813e0..b23319e217 100644 --- a/internal/impl/gcp/output_bigquery_test.go +++ b/internal/impl/gcp/output_bigquery_test.go @@ -175,6 +175,13 @@ func TestGCPBigQueryOutputConvertToIsoError(t *testing.T) { } func TestGCPBigQueryOutputCreateTableLoaderOk(t *testing.T) { + server := httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte(`{"id" : "dataset_meow"}`)) + }), + ) + defer server.Close() + // Setting non-default values outputConfig := gcpBigQueryConfFromYAML(t, ` project: project_meow @@ -197,6 +204,11 @@ csv: output, err := newGCPBigQueryOutput(outputConfig, nil) require.NoError(t, err) + output.clientURL = gcpBQClientURL(server.URL) + err = output.Connect(context.Background()) + defer output.Close(context.Background()) + require.NoError(t, err) + var data = []byte("1,2,3") loader := output.createTableLoader(&data) @@ -246,7 +258,7 @@ table: table_meow err = output.Connect(context.Background()) defer output.Close(context.Background()) - require.EqualError(t, err, "dataset does not exists: dataset_meow") + require.EqualError(t, err, "dataset does not exist: dataset_meow") } func TestGCPBigQueryOutputDatasetDoNotExistsUnknownError(t *testing.T) { @@ -305,7 +317,7 @@ create_disposition: CREATE_NEVER err = output.Connect(context.Background()) defer output.Close(context.Background()) - require.EqualError(t, err, "table does not exists: table_meow") + require.EqualError(t, err, "table does not exist: table_meow") } func TestGCPBigQueryOutputTableDoNotExistsUnknownError(t *testing.T) { diff --git a/website/docs/components/outputs/gcp_bigquery.md b/website/docs/components/outputs/gcp_bigquery.md index 9a3307815f..7373e70577 100644 --- a/website/docs/components/outputs/gcp_bigquery.md +++ b/website/docs/components/outputs/gcp_bigquery.md @@ -135,10 +135,11 @@ Batches can be formed at both the input and output level. You can find out more ### `project` -The project ID of the dataset to insert data to. +The project ID of the dataset to insert data to. If not set, it will be inferred from the credentials or read from the GOOGLE_CLOUD_PROJECT environment variable. Type: `string` +Default: `""` ### `dataset`