testing(bigquery/storage/managedwriter): extend table validation functionality (#4517)

Soon we'll start tackling null values and complex schemas, so this PR augments the existing table validator with one that can be passed multiple validation constraints for evaluation (row count, null count, cardinality, etc.). It updates the existing integration tests to use the new validator and tightens validation to ensure that the tests propagate the appended values as expected.

Towards: #4366
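
For illustration, here is a minimal standalone sketch of how the new validator composes constraints: each option registers a named SQL projection plus the int64 value it should produce, and validateTableConstraints evaluates all of them with a single stats query against the table. The types and options are lifted from testutils_test.go below; the main function, table name, and expected counts are placeholders for illustration only.

package main

import (
	"bytes"
	"fmt"
)

// constraint pairs a SQL projection with the int64 value it is expected to produce.
type constraint struct {
	projection    string
	expectedValue int64
	allowedError  int64
}

// validationInfo collects constraints keyed by result column name.
type validationInfo struct {
	constraints map[string]*constraint
}

type constraintOption func(*validationInfo)

// withExactRowCount asserts the exact total row count of the table.
func withExactRowCount(totalRows int64) constraintOption {
	return func(vi *validationInfo) {
		vi.constraints["total_rows"] = &constraint{
			projection:    "COUNT(1) AS total_rows",
			expectedValue: totalRows,
		}
	}
}

// withDistinctValues asserts the exact cardinality of a column.
func withDistinctValues(colname string, distinctVals int64) constraintOption {
	return func(vi *validationInfo) {
		resultCol := fmt.Sprintf("distinct_count_%s", colname)
		vi.constraints[resultCol] = &constraint{
			projection:    fmt.Sprintf("COUNT(DISTINCT %s) AS %s", colname, resultCol),
			expectedValue: distinctVals,
		}
	}
}

func main() {
	// Apply the options, then assemble the single query the validator would issue.
	vi := &validationInfo{constraints: make(map[string]*constraint)}
	for _, o := range []constraintOption{withExactRowCount(10), withDistinctValues("name", 5)} {
		o(vi)
	}
	sql := new(bytes.Buffer)
	sql.WriteString("SELECT\n")
	i := 0
	for _, c := range vi.constraints {
		if i > 0 {
			sql.WriteString(",")
		}
		sql.WriteString(c.projection)
		i++
	}
	// `myproject`.mydataset.mytable is a placeholder; the tests interpolate the real table.
	sql.WriteString("\nFROM `myproject`.mydataset.mytable")
	fmt.Println(sql.String())
}

The approximate variant (withApproxDistinctValues) follows the same pattern but also sets allowedError, so the check passes whenever the result is within that bound of the expected value.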
shollyman committed Jul 30, 2021
1 parent 73b6f5e commit 14e9829
Showing 2 changed files with 210 additions and 47 deletions.
86 changes: 39 additions & 47 deletions bigquery/storage/managedwriter/integration_test.go
@@ -47,6 +47,7 @@ var testSimpleSchema = bigquery.Schema{
{Name: "value", Type: bigquery.IntegerFieldType, Required: true},
}

// our test data has cardinality 5 for names, 3 for values
var testSimpleData = []*testdata.SimpleMessage{
{Name: "one", Value: 1},
{Name: "two", Value: 2},
@@ -80,32 +81,6 @@ func getTestClients(ctx context.Context, t *testing.T, opts ...option.ClientOpti
return client, bqClient
}

// validateRowCount confirms the number of rows in a table visible to the query engine.
func validateRowCount(ctx context.Context, t *testing.T, client *bigquery.Client, tbl *bigquery.Table, expectedRows int64, description string) {

// Verify data is present in the table with a count query.
sql := fmt.Sprintf("SELECT COUNT(1) FROM `%s`.%s.%s", tbl.ProjectID, tbl.DatasetID, tbl.TableID)
q := client.Query(sql)
it, err := q.Read(ctx)
if err != nil {
t.Errorf("failed to issue validation query %q: %v", description, err)
return
}
var rowdata []bigquery.Value
err = it.Next(&rowdata)
if err != nil {
t.Errorf("error fetching validation results %q: %v", description, err)
return
}
count, ok := rowdata[0].(int64)
if !ok {
t.Errorf("got unexpected data from validation query %q: %v", description, rowdata[0])
}
if count != expectedRows {
t.Errorf("rows mismatch from %q, expected rows: got %d want %d", description, count, expectedRows)
}
}

// setupTestDataset generates a unique dataset for testing, and a cleanup that can be deferred.
func setupTestDataset(ctx context.Context, t *testing.T, bqc *bigquery.Client) (ds *bigquery.Dataset, cleanup func(), err error) {
dataset := bqc.Dataset(datasetIDs.New())
@@ -156,7 +131,7 @@ func TestIntegration_ManagedWriter(t *testing.T) {
t.Parallel()
testDefaultStream(ctx, t, mwClient, bqClient, dataset)
})
t.Run("DefaultStream_DynamicJSON", func(t *testing.T) {
t.Run("DefaultStreamDynamicJSON", func(t *testing.T) {
t.Parallel()
testDefaultStreamDynamicJSON(ctx, t, mwClient, bqClient, dataset)
})
@@ -199,7 +174,8 @@ func testDefaultStream(ctx context.Context, t *testing.T, mwClient *Client, bqCl
if err != nil {
t.Fatalf("NewManagedStream: %v", err)
}
validateRowCount(ctx, t, bqClient, testTable, 0, "before send")
validateTableConstraints(ctx, t, bqClient, testTable, "before send",
withExactRowCount(0))

// First, send the test rows individually.
var results []*AppendResult
@@ -216,8 +192,9 @@ func testDefaultStream(ctx context.Context, t *testing.T, mwClient *Client, bqCl
}
// wait for the result to indicate ready, then validate.
results[0].Ready()
wantRows := int64(len(testSimpleData))
validateRowCount(ctx, t, bqClient, testTable, wantRows, "after first send")
validateTableConstraints(ctx, t, bqClient, testTable, "after first send round",
withExactRowCount(int64(len(testSimpleData))),
withDistinctValues("name", int64(len(testSimpleData))))

// Now, send the test rows grouped into a single append.
var data [][]byte
@@ -232,10 +209,14 @@ func testDefaultStream(ctx context.Context, t *testing.T, mwClient *Client, bqCl
t.Errorf("grouped-row append failed: %v", err)
}
}
// wait for the result to indicate ready, then validate again.
// wait for the result to indicate ready, then validate again. Our total rows have increased, but
// cardinality should not have.
results[0].Ready()
wantRows = wantRows * 2
validateRowCount(ctx, t, bqClient, testTable, wantRows, "after second send")
validateTableConstraints(ctx, t, bqClient, testTable, "after second send round",
withExactRowCount(int64(2*len(testSimpleData))),
withDistinctValues("name", int64(len(testSimpleData))),
withDistinctValues("value", int64(3)),
)
}

func testDefaultStreamDynamicJSON(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
@@ -254,9 +235,10 @@ func testDefaultStreamDynamicJSON(ctx context.Context, t *testing.T, mwClient *C
if err != nil {
t.Fatalf("NewManagedStream: %v", err)
}
validateRowCount(ctx, t, bqClient, testTable, 0, "before send")
validateTableConstraints(ctx, t, bqClient, testTable, "before send",
withExactRowCount(0))

sampleData := [][]byte{
sampleJSONData := [][]byte{
[]byte(`{"name": "one", "value": 1}`),
[]byte(`{"name": "two", "value": 2}`),
[]byte(`{"name": "three", "value": 3}`),
@@ -265,7 +247,7 @@ func testDefaultStreamDynamicJSON(ctx context.Context, t *testing.T, mwClient *C
}

var results []*AppendResult
for k, v := range sampleData {
for k, v := range sampleJSONData {
message := dynamicpb.NewMessage(md)

// First, json->proto message
@@ -286,8 +268,10 @@ func testDefaultStreamDynamicJSON(ctx context.Context, t *testing.T, mwClient *C

// wait for the result to indicate ready, then validate.
results[0].Ready()
wantRows := int64(len(sampleData))
validateRowCount(ctx, t, bqClient, testTable, wantRows, "after send")
validateTableConstraints(ctx, t, bqClient, testTable, "after send",
withExactRowCount(int64(len(sampleJSONData))),
withDistinctValues("name", int64(len(sampleJSONData))),
withDistinctValues("value", int64(len(sampleJSONData))))
}

func testBufferedStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
@@ -315,8 +299,8 @@ func testBufferedStream(ctx context.Context, t *testing.T, mwClient *Client, bqC
if info.GetType().String() != string(ms.StreamType()) {
t.Errorf("mismatch on stream type, got %s want %s", info.GetType(), ms.StreamType())
}

validateRowCount(ctx, t, bqClient, testTable, 0, "before send")
validateTableConstraints(ctx, t, bqClient, testTable, "before send",
withExactRowCount(0))

var expectedRows int64
for k, mesg := range testSimpleData {
@@ -334,14 +318,19 @@ func testBufferedStream(ctx context.Context, t *testing.T, mwClient *Client, bqC
if err != nil {
t.Errorf("got error from pending result %d: %v", k, err)
}
validateRowCount(ctx, t, bqClient, testTable, expectedRows, fmt.Sprintf("before flush %d", k))
validateTableConstraints(ctx, t, bqClient, testTable, fmt.Sprintf("before flush %d", k),
withExactRowCount(expectedRows),
withDistinctValues("name", expectedRows))

// move offset and re-validate.
flushOffset, err := ms.FlushRows(ctx, offset)
if err != nil {
t.Errorf("failed to flush offset to %d: %v", offset, err)
}
expectedRows = flushOffset + 1
validateRowCount(ctx, t, bqClient, testTable, expectedRows, fmt.Sprintf("after flush %d", k))
validateTableConstraints(ctx, t, bqClient, testTable, fmt.Sprintf("after flush %d", k),
withExactRowCount(expectedRows),
withDistinctValues("name", expectedRows))
}
}

@@ -363,7 +352,8 @@ func testCommittedStream(ctx context.Context, t *testing.T, mwClient *Client, bq
if err != nil {
t.Fatalf("NewManagedStream: %v", err)
}
validateRowCount(ctx, t, bqClient, testTable, 0, "before send")
validateTableConstraints(ctx, t, bqClient, testTable, "before send",
withExactRowCount(0))

var results []*AppendResult
for k, mesg := range testSimpleData {
@@ -379,8 +369,8 @@ func testCommittedStream(ctx context.Context, t *testing.T, mwClient *Client, bq
}
// wait for the result to indicate ready, then validate.
results[0].Ready()
wantRows := int64(len(testSimpleData))
validateRowCount(ctx, t, bqClient, testTable, wantRows, "after send")
validateTableConstraints(ctx, t, bqClient, testTable, "after send",
withExactRowCount(int64(len(testSimpleData))))
}

func testPendingStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
@@ -400,7 +390,8 @@ func testPendingStream(ctx context.Context, t *testing.T, mwClient *Client, bqCl
if err != nil {
t.Fatalf("NewManagedStream: %v", err)
}
validateRowCount(ctx, t, bqClient, testTable, 0, "before send")
validateTableConstraints(ctx, t, bqClient, testTable, "before send",
withExactRowCount(0))

// Send data.
var results []*AppendResult
@@ -436,7 +427,8 @@ func testPendingStream(ctx context.Context, t *testing.T, mwClient *Client, bqCl
if len(resp.StreamErrors) > 0 {
t.Errorf("stream errors present: %v", resp.StreamErrors)
}
validateRowCount(ctx, t, bqClient, testTable, wantRows, "after send")
validateTableConstraints(ctx, t, bqClient, testTable, "after send",
withExactRowCount(int64(len(testSimpleData))))
}

func testInstrumentation(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
171 changes: 171 additions & 0 deletions bigquery/storage/managedwriter/testutils_test.go
@@ -0,0 +1,171 @@
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package managedwriter

import (
"bytes"
"context"
"fmt"
"testing"

"cloud.google.com/go/bigquery"
)

// validateTableConstraints is used to validate properties of a table by computing stats using the query engine.
func validateTableConstraints(ctx context.Context, t *testing.T, client *bigquery.Client, table *bigquery.Table, description string, opts ...constraintOption) {
vi := &validationInfo{
constraints: make(map[string]*constraint),
}

for _, o := range opts {
o(vi)
}

if len(vi.constraints) == 0 {
t.Errorf("%q: no constraints were specified", description)
return
}

sql := new(bytes.Buffer)
sql.WriteString("SELECT\n")
var i int
for _, c := range vi.constraints {
if i > 0 {
sql.WriteString(",")
}
sql.WriteString(c.projection)
i++
}
sql.WriteString(fmt.Sprintf("\nFROM `%s`.%s.%s", table.ProjectID, table.DatasetID, table.TableID))
q := client.Query(sql.String())
it, err := q.Read(ctx)
if err != nil {
t.Errorf("%q: failed to issue validation query: %v", description, err)
return
}
var resultrow []bigquery.Value
err = it.Next(&resultrow)
if err != nil {
t.Errorf("%q: failed to get result row: %v", description, err)
return
}

for colname, con := range vi.constraints {
off := -1
for k, v := range it.Schema {
if v.Name == colname {
off = k
break
}
}
if off == -1 {
t.Errorf("%q: missing constraint %q from results", description, colname)
continue
}
val, ok := resultrow[off].(int64)
if !ok {
t.Errorf("%q: constraint %q type mismatch", description, colname)
}
if con.allowedError == 0 {
if val != con.expectedValue {
t.Errorf("%q: constraint %q mismatch, got %d want %d", description, colname, val, con.expectedValue)
}
continue
}
res := val - con.expectedValue
if res < 0 {
res = -res
}
if res > con.allowedError {
t.Errorf("%q: constraint %q outside error bound %d, got %d want %d", description, colname, con.allowedError, val, con.expectedValue)
}
}
}

// constraint is a specific table constraint.
type constraint struct {
// sql fragment that projects a result value
projection string

// all validation constraints must eval as int64.
expectedValue int64

// if nonzero, the constraint value must be within allowedError distance of expectedValue.
allowedError int64
}

// validationInfo is keyed by the result column name.
type validationInfo struct {
constraints map[string]*constraint
}

// constraintOption is for building validation rules.
type constraintOption func(*validationInfo)

// withExactRowCount asserts the exact total row count of the table.
func withExactRowCount(totalRows int64) constraintOption {
return func(vi *validationInfo) {
resultCol := "total_rows"
vi.constraints[resultCol] = &constraint{
projection: fmt.Sprintf("COUNT(1) AS %s", resultCol),
expectedValue: totalRows,
}
}
}

// withNullCount asserts the number of null values in a column.
func withNullCount(colname string, nullcount int64) constraintOption {
return func(vi *validationInfo) {
resultCol := fmt.Sprintf("nullcol_count_%s", colname)
vi.constraints[resultCol] = &constraint{
projection: fmt.Sprintf("COUNTIF(ISNULL(%s)) AS %s", colname, resultCol),
expectedValue: nullcount,
}
}
}

// withNonNullCount asserts the number of non-null values in a column.
func withNonNullCount(colname string, nullcount int64) constraintOption {
return func(vi *validationInfo) {
resultCol := fmt.Sprintf("nonnullcol_count_%s", colname)
vi.constraints[resultCol] = &constraint{
projection: fmt.Sprintf("COUNTIF(NOT ISNULL(%s)) AS %s", colname, resultCol),
expectedValue: nullcount,
}
}
}

// withDistinctValues validates the exact cardinality of a column.
func withDistinctValues(colname string, distinctVals int64) constraintOption {
return func(vi *validationInfo) {
resultCol := fmt.Sprintf("distinct_count_%s", colname)
vi.constraints[resultCol] = &constraint{
projection: fmt.Sprintf("COUNT(DISTINCT %s) AS %s", colname, resultCol),
expectedValue: distinctVals,
}
}
}

// withApproxDistinctValues validates the approximate cardinality of a column with an error bound.
func withApproxDistinctValues(colname string, approxValues int64, errorBound int64) constraintOption {
return func(vi *validationInfo) {
resultCol := fmt.Sprintf("distinct_count_%s", colname)
vi.constraints[resultCol] = &constraint{
projection: fmt.Sprintf("APPROX_COUNT_DISTINCT(%s) AS %s", colname, resultCol),
expectedValue: approxValues,
allowedError: errorBound,
}
}
}
