Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Infer the Project ID when using the GCP BigQuery output #968

Merged
Merged 1 commit on Nov 25, 2021 (author and branch names were lost in extraction)
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 9 additions & 6 deletions internal/impl/gcp/output_bigquery.go
Expand Up @@ -66,6 +66,9 @@ func gcpBigQueryOutputConfigFromParsed(conf *service.ParsedConfig) (gconf gcpBig
if gconf.ProjectID, err = conf.FieldString("project"); err != nil {
return
}
if gconf.ProjectID == "" {
gconf.ProjectID = bigquery.DetectProjectID
}
if gconf.DatasetID, err = conf.FieldString("dataset"); err != nil {
return
}
Expand Down Expand Up @@ -146,7 +149,7 @@ The same is true for the CSV format.
### CSV

For the CSV format when the field `+"`csv.header`"+` is specified a header row will be inserted as the first line of each message batch. If this field is not provided then the first message of each message batch must include a header line.`)).
Field(service.NewStringField("project").Description("The project ID of the dataset to insert data to.")).
Field(service.NewStringField("project").Description("The project ID of the dataset to insert data to. If not set, it will be inferred from the credentials or read from the GOOGLE_CLOUD_PROJECT environment variable.").Default("")).
Field(service.NewStringField("dataset").Description("The BigQuery Dataset ID.")).
Field(service.NewStringField("table").Description("The table to insert messages to.")).
Field(service.NewStringEnumField("format", string(bigquery.JSON), string(bigquery.CSV)).
Expand Down Expand Up @@ -304,10 +307,10 @@ func (g *gcpBigQueryOutput) Connect(ctx context.Context) (err error) {
}
}()

dataset := client.DatasetInProject(g.conf.ProjectID, g.conf.DatasetID)
dataset := client.DatasetInProject(client.Project(), g.conf.DatasetID)
if _, err = dataset.Metadata(ctx); err != nil {
if hasStatusCode(err, http.StatusNotFound) {
err = fmt.Errorf("dataset does not exists: %v", g.conf.DatasetID)
err = fmt.Errorf("dataset does not exist: %v", g.conf.DatasetID)
} else {
err = fmt.Errorf("error checking dataset existence: %w", err)
}
Expand All @@ -318,7 +321,7 @@ func (g *gcpBigQueryOutput) Connect(ctx context.Context) (err error) {
table := dataset.Table(g.conf.TableID)
if _, err = table.Metadata(ctx); err != nil {
if hasStatusCode(err, http.StatusNotFound) {
err = fmt.Errorf("table does not exists: %v", g.conf.TableID)
err = fmt.Errorf("table does not exist: %v", g.conf.TableID)
} else {
err = fmt.Errorf("error checking table existence: %w", err)
}
Expand All @@ -327,7 +330,7 @@ func (g *gcpBigQueryOutput) Connect(ctx context.Context) (err error) {
}

g.client = client
g.log.Infof("Inserting messages as objects to GCP BigQuery: %v:%v:%v\n", g.conf.ProjectID, g.conf.DatasetID, g.conf.TableID)
g.log.Infof("Inserting messages as objects to GCP BigQuery: %v:%v:%v\n", client.Project(), g.conf.DatasetID, g.conf.TableID)
return nil
}

Expand Down Expand Up @@ -380,7 +383,7 @@ func (g *gcpBigQueryOutput) WriteBatch(ctx context.Context, batch service.Messag
}

func (g *gcpBigQueryOutput) createTableLoader(data *[]byte) *bigquery.Loader {
table := g.client.DatasetInProject(g.conf.ProjectID, g.conf.DatasetID).Table(g.conf.TableID)
table := g.client.DatasetInProject(g.client.Project(), g.conf.DatasetID).Table(g.conf.TableID)

source := bigquery.NewReaderSource(bytes.NewReader(*data))
source.SourceFormat = bigquery.DataFormat(g.conf.Format)
Expand Down
16 changes: 14 additions & 2 deletions internal/impl/gcp/output_bigquery_test.go
Expand Up @@ -175,6 +175,13 @@ func TestGCPBigQueryOutputConvertToIsoError(t *testing.T) {
}

func TestGCPBigQueryOutputCreateTableLoaderOk(t *testing.T) {
server := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(`{"id" : "dataset_meow"}`))
}),
)
defer server.Close()

// Setting non-default values
outputConfig := gcpBigQueryConfFromYAML(t, `
project: project_meow
Expand All @@ -197,6 +204,11 @@ csv:
output, err := newGCPBigQueryOutput(outputConfig, nil)
require.NoError(t, err)

output.clientURL = gcpBQClientURL(server.URL)
err = output.Connect(context.Background())
defer output.Close(context.Background())
require.NoError(t, err)

var data = []byte("1,2,3")
loader := output.createTableLoader(&data)

Expand Down Expand Up @@ -246,7 +258,7 @@ table: table_meow
err = output.Connect(context.Background())
defer output.Close(context.Background())

require.EqualError(t, err, "dataset does not exists: dataset_meow")
require.EqualError(t, err, "dataset does not exist: dataset_meow")
}

func TestGCPBigQueryOutputDatasetDoNotExistsUnknownError(t *testing.T) {
Expand Down Expand Up @@ -305,7 +317,7 @@ create_disposition: CREATE_NEVER
err = output.Connect(context.Background())
defer output.Close(context.Background())

require.EqualError(t, err, "table does not exists: table_meow")
require.EqualError(t, err, "table does not exist: table_meow")
}

func TestGCPBigQueryOutputTableDoNotExistsUnknownError(t *testing.T) {
Expand Down
3 changes: 2 additions & 1 deletion website/docs/components/outputs/gcp_bigquery.md
Expand Up @@ -135,10 +135,11 @@ Batches can be formed at both the input and output level. You can find out more

### `project`

The project ID of the dataset to insert data to.
The project ID of the dataset to insert data to. If not set, it will be inferred from the credentials or read from the GOOGLE_CLOUD_PROJECT environment variable.


Type: `string`
Default: `""`

### `dataset`

Expand Down