Skip to content

Commit

Permalink
Merge pull request #968 from mihaitodor/gcp-bigquery-infer-projectid
Browse files: browse the repository at this point in the history
Infer the Project ID when using the GCP BigQuery output
  • Loading branch information
Jeffail committed Nov 25, 2021
2 parents 5983fd7 + 755f6f8 commit 1231c4f
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 9 deletions.
15 changes: 9 additions & 6 deletions internal/impl/gcp/output_bigquery.go
Expand Up @@ -66,6 +66,9 @@ func gcpBigQueryOutputConfigFromParsed(conf *service.ParsedConfig) (gconf gcpBig
if gconf.ProjectID, err = conf.FieldString("project"); err != nil {
return
}
if gconf.ProjectID == "" {
gconf.ProjectID = bigquery.DetectProjectID
}
if gconf.DatasetID, err = conf.FieldString("dataset"); err != nil {
return
}
Expand Down Expand Up @@ -146,7 +149,7 @@ The same is true for the CSV format.
### CSV
For the CSV format when the field `+"`csv.header`"+` is specified a header row will be inserted as the first line of each message batch. If this field is not provided then the first message of each message batch must include a header line.`)).
Field(service.NewStringField("project").Description("The project ID of the dataset to insert data to.")).
Field(service.NewStringField("project").Description("The project ID of the dataset to insert data to. If not set, it will be inferred from the credentials or read from the GOOGLE_CLOUD_PROJECT environment variable.").Default("")).
Field(service.NewStringField("dataset").Description("The BigQuery Dataset ID.")).
Field(service.NewStringField("table").Description("The table to insert messages to.")).
Field(service.NewStringEnumField("format", string(bigquery.JSON), string(bigquery.CSV)).
Expand Down Expand Up @@ -304,10 +307,10 @@ func (g *gcpBigQueryOutput) Connect(ctx context.Context) (err error) {
}
}()

dataset := client.DatasetInProject(g.conf.ProjectID, g.conf.DatasetID)
dataset := client.DatasetInProject(client.Project(), g.conf.DatasetID)
if _, err = dataset.Metadata(ctx); err != nil {
if hasStatusCode(err, http.StatusNotFound) {
err = fmt.Errorf("dataset does not exists: %v", g.conf.DatasetID)
err = fmt.Errorf("dataset does not exist: %v", g.conf.DatasetID)
} else {
err = fmt.Errorf("error checking dataset existence: %w", err)
}
Expand All @@ -318,7 +321,7 @@ func (g *gcpBigQueryOutput) Connect(ctx context.Context) (err error) {
table := dataset.Table(g.conf.TableID)
if _, err = table.Metadata(ctx); err != nil {
if hasStatusCode(err, http.StatusNotFound) {
err = fmt.Errorf("table does not exists: %v", g.conf.TableID)
err = fmt.Errorf("table does not exist: %v", g.conf.TableID)
} else {
err = fmt.Errorf("error checking table existence: %w", err)
}
Expand All @@ -327,7 +330,7 @@ func (g *gcpBigQueryOutput) Connect(ctx context.Context) (err error) {
}

g.client = client
g.log.Infof("Inserting messages as objects to GCP BigQuery: %v:%v:%v\n", g.conf.ProjectID, g.conf.DatasetID, g.conf.TableID)
g.log.Infof("Inserting messages as objects to GCP BigQuery: %v:%v:%v\n", client.Project(), g.conf.DatasetID, g.conf.TableID)
return nil
}

Expand Down Expand Up @@ -380,7 +383,7 @@ func (g *gcpBigQueryOutput) WriteBatch(ctx context.Context, batch service.Messag
}

func (g *gcpBigQueryOutput) createTableLoader(data *[]byte) *bigquery.Loader {
table := g.client.DatasetInProject(g.conf.ProjectID, g.conf.DatasetID).Table(g.conf.TableID)
table := g.client.DatasetInProject(g.client.Project(), g.conf.DatasetID).Table(g.conf.TableID)

source := bigquery.NewReaderSource(bytes.NewReader(*data))
source.SourceFormat = bigquery.DataFormat(g.conf.Format)
Expand Down
16 changes: 14 additions & 2 deletions internal/impl/gcp/output_bigquery_test.go
Expand Up @@ -175,6 +175,13 @@ func TestGCPBigQueryOutputConvertToIsoError(t *testing.T) {
}

func TestGCPBigQueryOutputCreateTableLoaderOk(t *testing.T) {
server := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(`{"id" : "dataset_meow"}`))
}),
)
defer server.Close()

// Setting non-default values
outputConfig := gcpBigQueryConfFromYAML(t, `
project: project_meow
Expand All @@ -197,6 +204,11 @@ csv:
output, err := newGCPBigQueryOutput(outputConfig, nil)
require.NoError(t, err)

output.clientURL = gcpBQClientURL(server.URL)
err = output.Connect(context.Background())
defer output.Close(context.Background())
require.NoError(t, err)

var data = []byte("1,2,3")
loader := output.createTableLoader(&data)

Expand Down Expand Up @@ -246,7 +258,7 @@ table: table_meow
err = output.Connect(context.Background())
defer output.Close(context.Background())

require.EqualError(t, err, "dataset does not exists: dataset_meow")
require.EqualError(t, err, "dataset does not exist: dataset_meow")
}

func TestGCPBigQueryOutputDatasetDoNotExistsUnknownError(t *testing.T) {
Expand Down Expand Up @@ -305,7 +317,7 @@ create_disposition: CREATE_NEVER
err = output.Connect(context.Background())
defer output.Close(context.Background())

require.EqualError(t, err, "table does not exists: table_meow")
require.EqualError(t, err, "table does not exist: table_meow")
}

func TestGCPBigQueryOutputTableDoNotExistsUnknownError(t *testing.T) {
Expand Down
3 changes: 2 additions & 1 deletion website/docs/components/outputs/gcp_bigquery.md
Expand Up @@ -135,10 +135,11 @@ Batches can be formed at both the input and output level. You can find out more

### `project`

The project ID of the dataset to insert data to.
The project ID of the dataset to insert data into. If not set, it will be inferred from the credentials or read from the `GOOGLE_CLOUD_PROJECT` environment variable.


Type: `string`
Default: `""`

### `dataset`

Expand Down

0 comments on commit 1231c4f

Please sign in to comment.