Skip to content

Commit

Permalink
feat(bigquery): add reference file schema option for federated formats (
Browse files Browse the repository at this point in the history
#6693)

Resolves internal b/246809553
  • Loading branch information
alvarowolfx committed Sep 20, 2022
1 parent 17cceeb commit 3d26091
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 0 deletions.
6 changes: 6 additions & 0 deletions bigquery/external.go
Expand Up @@ -109,6 +109,10 @@ type ExternalDataConfig struct {
// Connections are managed through the BigQuery Connection API:
// https://pkg.go.dev/cloud.google.com/go/bigquery/connection/apiv1
ConnectionID string

// ReferenceFileSchemaURI is the URI of a reference file with the table schema
// that the user can provide when creating an external table.
// This is enabled for the following formats: AVRO, PARQUET, ORC.
ReferenceFileSchemaURI string
}

func (e *ExternalDataConfig) toBQ() bq.ExternalDataConfiguration {
Expand All @@ -121,6 +125,7 @@ func (e *ExternalDataConfig) toBQ() bq.ExternalDataConfiguration {
MaxBadRecords: e.MaxBadRecords,
HivePartitioningOptions: e.HivePartitioningOptions.toBQ(),
ConnectionId: e.ConnectionID,
ReferenceFileSchemaUri: e.ReferenceFileSchemaURI,
}
if e.Schema != nil {
q.Schema = e.Schema.toBQ()
Expand All @@ -145,6 +150,7 @@ func bqToExternalDataConfig(q *bq.ExternalDataConfiguration) (*ExternalDataConfi
Schema: bqToSchema(q.Schema),
HivePartitioningOptions: bqToHivePartitioningOptions(q.HivePartitioningOptions),
ConnectionID: q.ConnectionId,
ReferenceFileSchemaURI: q.ReferenceFileSchemaUri,
}
for _, v := range q.DecimalTargetTypes {
e.DecimalTargetTypes = append(e.DecimalTargetTypes, DecimalTargetType(v))
Expand Down
94 changes: 94 additions & 0 deletions bigquery/integration_test.go
Expand Up @@ -1393,6 +1393,100 @@ func TestIntegration_Load(t *testing.T) {

}

// TestIntegration_LoadWithReferenceSchemaFile runs a load job for each
// supported sample format with LoadConfig.ReferenceFileSchemaURI set and
// checks that the resulting table schema matches the expected schema.
//
// Each format runs as its own subtest so that a failure in one format does
// not prevent the other from running, and so that the per-table cleanup
// fires at the end of that format's run rather than at the end of the whole
// test.
func TestIntegration_LoadWithReferenceSchemaFile(t *testing.T) {
	if client == nil {
		t.Skip("Integration tests skipped")
	}

	// Location of the public sample files; the file extension selects the format.
	const samplePrefix = "gs://cloud-samples-data/bigquery/federated-formats-reference-file-schema/"

	expectedSchema := Schema{
		{Name: "username", Type: StringFieldType, Required: false},
		{Name: "tweet", Type: StringFieldType, Required: false},
		{Name: "timestamp", Type: StringFieldType, Required: false},
		{Name: "likes", Type: IntegerFieldType, Required: false},
	}

	for _, format := range []DataFormat{Avro, Parquet} {
		t.Run(string(format), func(t *testing.T) {
			ctx := context.Background()
			table := dataset.Table(tableIDs.New())
			defer func() {
				// Best-effort cleanup: report, but do not fail the test on, a delete error.
				if err := table.Delete(ctx); err != nil {
					t.Logf("table.Delete: %v", err)
				}
			}()

			ext := strings.ToLower(string(format))
			sourceURIs := []string{
				samplePrefix + "a-twitter." + ext,
				samplePrefix + "b-twitter." + ext,
				samplePrefix + "c-twitter." + ext,
			}
			referenceURI := samplePrefix + "a-twitter." + ext

			source := NewGCSReference(sourceURIs...)
			source.SourceFormat = format
			loader := table.LoaderFrom(source)
			loader.ReferenceFileSchemaURI = referenceURI

			job, err := loader.Run(ctx)
			if err != nil {
				t.Fatalf("loader.Run: %v", err)
			}
			if err := wait(ctx, job); err != nil {
				t.Fatalf("wait: %v", err)
			}
			metadata, err := table.Metadata(ctx)
			if err != nil {
				t.Fatalf("table.Metadata: %v", err)
			}
			// The expected schema is the first argument, so it is the "-" side of the diff.
			if diff := testutil.Diff(expectedSchema, metadata.Schema); diff != "" {
				t.Errorf("want=-, got=+:\n%s", diff)
			}
		})
	}
}

// TestIntegration_ExternalTableWithReferenceSchemaFile creates an external
// table for each supported sample format with
// ExternalDataConfig.ReferenceFileSchemaURI set and checks that the table's
// reported schema matches the expected schema.
//
// Each format runs as its own subtest so that a failure in one format does
// not prevent the other from running, and so that the per-table cleanup
// fires at the end of that format's run rather than at the end of the whole
// test.
func TestIntegration_ExternalTableWithReferenceSchemaFile(t *testing.T) {
	if client == nil {
		t.Skip("Integration tests skipped")
	}

	// Location of the public sample files; the file extension selects the format.
	const samplePrefix = "gs://cloud-samples-data/bigquery/federated-formats-reference-file-schema/"

	expectedSchema := Schema{
		{Name: "username", Type: StringFieldType, Required: false},
		{Name: "tweet", Type: StringFieldType, Required: false},
		{Name: "timestamp", Type: StringFieldType, Required: false},
		{Name: "likes", Type: IntegerFieldType, Required: false},
	}

	for _, format := range []DataFormat{Avro, Parquet} {
		t.Run(string(format), func(t *testing.T) {
			ctx := context.Background()
			externalTable := dataset.Table(tableIDs.New())
			defer func() {
				// Best-effort cleanup: report, but do not fail the test on, a delete error.
				if err := externalTable.Delete(ctx); err != nil {
					t.Logf("table.Delete: %v", err)
				}
			}()

			ext := strings.ToLower(string(format))
			sourceURIs := []string{
				samplePrefix + "a-twitter." + ext,
				samplePrefix + "b-twitter." + ext,
				samplePrefix + "c-twitter." + ext,
			}
			referenceURI := samplePrefix + "a-twitter." + ext

			err := externalTable.Create(ctx, &TableMetadata{
				ExternalDataConfig: &ExternalDataConfig{
					SourceFormat:           format,
					SourceURIs:             sourceURIs,
					ReferenceFileSchemaURI: referenceURI,
				},
			})
			if err != nil {
				t.Fatalf("table.Create: %v", err)
			}

			metadata, err := externalTable.Metadata(ctx)
			if err != nil {
				t.Fatalf("table.Metadata: %v", err)
			}
			// The expected schema is the first argument, so it is the "-" side of the diff.
			if diff := testutil.Diff(expectedSchema, metadata.Schema); diff != "" {
				t.Errorf("want=-, got=+:\n%s", diff)
			}
		})
	}
}

func TestIntegration_DML(t *testing.T) {
if client == nil {
t.Skip("Integration tests skipped")
Expand Down
6 changes: 6 additions & 0 deletions bigquery/load.go
Expand Up @@ -88,6 +88,10 @@ type LoadConfig struct {
// Experimental: this option is experimental and may be modified or removed in future versions,
// regardless of any other documented package stability guarantees.
JobTimeout time.Duration

// ReferenceFileSchemaURI is the URI of a reference file with the table schema
// that the user can provide when loading a table with external data.
// This is enabled for the following formats: AVRO, PARQUET, ORC.
ReferenceFileSchemaURI string
}

func (l *LoadConfig) toBQ() (*bq.JobConfiguration, io.Reader) {
Expand All @@ -105,6 +109,7 @@ func (l *LoadConfig) toBQ() (*bq.JobConfiguration, io.Reader) {
UseAvroLogicalTypes: l.UseAvroLogicalTypes,
ProjectionFields: l.ProjectionFields,
HivePartitioningOptions: l.HivePartitioningOptions.toBQ(),
ReferenceFileSchemaUri: l.ReferenceFileSchemaURI,
},
JobTimeoutMs: l.JobTimeout.Milliseconds(),
}
Expand All @@ -129,6 +134,7 @@ func bqToLoadConfig(q *bq.JobConfiguration, c *Client) *LoadConfig {
UseAvroLogicalTypes: q.Load.UseAvroLogicalTypes,
ProjectionFields: q.Load.ProjectionFields,
HivePartitioningOptions: bqToHivePartitioningOptions(q.Load.HivePartitioningOptions),
ReferenceFileSchemaURI: q.Load.ReferenceFileSchemaUri,
}
if q.JobTimeoutMs > 0 {
lc.JobTimeout = time.Duration(q.JobTimeoutMs) * time.Millisecond
Expand Down
17 changes: 17 additions & 0 deletions bigquery/load_test.go
Expand Up @@ -388,6 +388,23 @@ func TestLoad(t *testing.T) {
return j
}(),
},
{
dst: c.Dataset("dataset-id").Table("table-id"),
src: func() *GCSReference {
g := NewGCSReference("uri")
g.SourceFormat = Parquet
return g
}(),
config: LoadConfig{
ReferenceFileSchemaURI: "schema.parquet",
},
want: func() *bq.Job {
j := defaultLoadJob()
j.Configuration.Load.SourceFormat = "PARQUET"
j.Configuration.Load.ReferenceFileSchemaUri = "schema.parquet"
return j
}(),
},
}

for i, tc := range testCases {
Expand Down

0 comments on commit 3d26091

Please sign in to comment.