Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bigquery): add remote function options to routine metadata #6702

Merged
merged 4 commits into from Sep 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 11 additions & 0 deletions bigquery/integration_test.go
Expand Up @@ -29,6 +29,7 @@ import (
"testing"
"time"

connection "cloud.google.com/go/bigquery/connection/apiv1"
"cloud.google.com/go/civil"
datacatalog "cloud.google.com/go/datacatalog/apiv1"
"cloud.google.com/go/httpreplay"
Expand All @@ -54,6 +55,7 @@ var record = flag.Bool("record", false, "record RPCs")
var (
client *Client
storageClient *storage.Client
connectionsClient *connection.Client
policyTagManagerClient *datacatalog.PolicyTagManagerClient
dataset *Dataset
otherDataset *Dataset
Expand Down Expand Up @@ -123,6 +125,10 @@ func initIntegrationTest() func() {
if err != nil {
log.Fatal(err)
}
connectionsClient, err = connection.NewClient(ctx, option.WithHTTPClient(hc))
if err != nil {
log.Fatal(err)
}
policyTagManagerClient, err = datacatalog.NewPolicyTagManagerClient(ctx)
if err != nil {
log.Fatal(err)
Expand All @@ -140,6 +146,7 @@ func initIntegrationTest() func() {
}
client = nil
storageClient = nil
connectionsClient = nil
return func() {}

default: // Run integration tests against a real backend.
Expand Down Expand Up @@ -203,6 +210,10 @@ func initIntegrationTest() func() {
if err != nil {
log.Fatalf("datacatalog.NewPolicyTagManagerClient: %v", err)
}
connectionsClient, err = connection.NewClient(ctx, sOpts...)
if err != nil {
log.Fatalf("connection.NewService: %v", err)
}
c := initTestState(client, now)
return func() { c(); cleanup() }
}
Expand Down
87 changes: 86 additions & 1 deletion bigquery/routine.go
Expand Up @@ -163,6 +163,15 @@ const (
NotDeterministic RoutineDeterminism = "NOT_DETERMINISTIC"
)

const (
// ScalarFunctionRoutine scalar function routine type
ScalarFunctionRoutine = "SCALAR_FUNCTION"
shollyman marked this conversation as resolved.
Show resolved Hide resolved
// ProcedureRoutine procedure routine type
ProcedureRoutine = "PROCEDURE"
// TableValuedFunctionRoutine routine type for table valued functions
TableValuedFunctionRoutine = "TABLE_VALUED_FUNCTION"
)

// RoutineMetadata represents details of a given BigQuery Routine.
type RoutineMetadata struct {
ETag string
Expand All @@ -177,7 +186,11 @@ type RoutineMetadata struct {
// Language of the routine, such as SQL or JAVASCRIPT.
Language string
// The list of arguments for the the routine.
Arguments []*RoutineArgument
Arguments []*RoutineArgument

// Information for a remote user-defined function.
RemoteFunctionOptions *RemoteFunctionOptions

ReturnType *StandardSQLDataType

// Set only if the routine type is TABLE_VALUED_FUNCTION.
Expand All @@ -195,6 +208,66 @@ type RoutineMetadata struct {
Body string
}

// RemoteFunctionOptions contains information for a remote user-defined function.
type RemoteFunctionOptions struct {

// Fully qualified name of the user-provided connection object which holds
// the authentication information to send requests to the remote service.
// Format:
// projects/{projectId}/locations/{locationId}/connections/{connectionId}
Connection string

// Endpoint of the user-provided remote service (e.g. a function url in
// Google Cloud Function or Cloud Run )
Endpoint string

// Max number of rows in each batch sent to the remote service.
// If absent or if 0, it means no limit.
MaxBatchingRows int64

// User-defined context as a set of key/value pairs,
// which will be sent as function invocation context together with
// batched arguments in the requests to the remote service. The total
// number of bytes of keys and values must be less than 8KB.
UserDefinedContext map[string]string
}

func bqToRemoteFunctionOptions(in *bq.RemoteFunctionOptions) (*RemoteFunctionOptions, error) {
if in == nil {
return nil, nil
}
rfo := &RemoteFunctionOptions{
Connection: in.Connection,
Endpoint: in.Endpoint,
MaxBatchingRows: in.MaxBatchingRows,
}
if in.UserDefinedContext != nil {
rfo.UserDefinedContext = make(map[string]string)
for k, v := range in.UserDefinedContext {
rfo.UserDefinedContext[k] = v
}
}
return rfo, nil
}

func (rfo *RemoteFunctionOptions) toBQ() (*bq.RemoteFunctionOptions, error) {
if rfo == nil {
return nil, nil
}
r := &bq.RemoteFunctionOptions{
Connection: rfo.Connection,
Endpoint: rfo.Endpoint,
MaxBatchingRows: rfo.MaxBatchingRows,
}
if rfo.UserDefinedContext != nil {
r.UserDefinedContext = make(map[string]string)
for k, v := range rfo.UserDefinedContext {
r.UserDefinedContext[k] = v
}
}
return r, nil
}

func (rm *RoutineMetadata) toBQ() (*bq.Routine, error) {
r := &bq.Routine{}
if rm == nil {
Expand Down Expand Up @@ -227,6 +300,13 @@ func (rm *RoutineMetadata) toBQ() (*bq.Routine, error) {
}
r.Arguments = args
r.ImportedLibraries = rm.ImportedLibraries
if rm.RemoteFunctionOptions != nil {
rfo, err := rm.RemoteFunctionOptions.toBQ()
if err != nil {
return nil, err
}
r.RemoteFunctionOptions = rfo
}
if !rm.CreationTime.IsZero() {
return nil, errors.New("cannot set CreationTime on create")
}
Expand Down Expand Up @@ -436,6 +516,11 @@ func bqToRoutineMetadata(r *bq.Routine) (*RoutineMetadata, error) {
return nil, err
}
meta.ReturnType = ret
rfo, err := bqToRemoteFunctionOptions(r.RemoteFunctionOptions)
if err != nil {
return nil, err
}
meta.RemoteFunctionOptions = rfo
tt, err := bqToStandardSQLTableType(r.ReturnTableType)
if err != nil {
return nil, err
Expand Down
80 changes: 80 additions & 0 deletions bigquery/routine_integration_test.go
Expand Up @@ -21,6 +21,7 @@ import (

"cloud.google.com/go/internal/testutil"
"google.golang.org/api/iterator"
"google.golang.org/genproto/googleapis/cloud/bigquery/connection/v1"
)

func TestIntegration_RoutineScalarUDF(t *testing.T) {
Expand Down Expand Up @@ -88,6 +89,85 @@ func TestIntegration_RoutineJSUDF(t *testing.T) {
}
}

func TestIntegration_RoutineRemoteUDF(t *testing.T) {
if client == nil {
t.Skip("Integration tests skipped")
}
ctx := context.Background()

routineID := routineIDs.New()
routine := dataset.Routine(routineID)
uri := "https://aaabbbccc-uc.a.run.app"

connectionLocation := fmt.Sprintf("projects/%s/locations/%s", dataset.ProjectID, "us")
connectionName := fmt.Sprintf("udf_conn%s", routineID)
cleanupConnection, connectionID, err := createConnection(ctx, t, connectionLocation, connectionName)
if err != nil {
t.Fatal(err)
}
defer cleanupConnection()

remoteOpts := &RemoteFunctionOptions{
Endpoint: uri,
Connection: connectionID,
MaxBatchingRows: 50,
UserDefinedContext: map[string]string{"foo": "bar"},
}
meta := &RoutineMetadata{
RemoteFunctionOptions: remoteOpts,
Description: "defines a remote function",
Type: ScalarFunctionRoutine,
ReturnType: &StandardSQLDataType{
TypeKind: "STRING",
},
}
if err := routine.Create(ctx, meta); err != nil {
t.Fatalf("routine.Create: %v", err)
}

gotMeta, err := routine.Metadata(ctx)
if err != nil {
t.Fatalf("routine.Metadata: %v", err)
}

if diff := testutil.Diff(gotMeta.RemoteFunctionOptions, remoteOpts); diff != "" {
t.Fatalf("RemoteFunctionOptions: -got, +want:\n%s", diff)
}
}

func createConnection(ctx context.Context, t *testing.T, parent, name string) (cleanup func(), connectionID string, err error) {
fullname := fmt.Sprintf("%s/connections/%s", parent, name)
conn, err := connectionsClient.CreateConnection(ctx, &connection.CreateConnectionRequest{
Parent: parent,
ConnectionId: name,
Connection: &connection.Connection{
FriendlyName: name,
Properties: &connection.Connection_CloudResource{
CloudResource: &connection.CloudResourceProperties{},
},
},
})
if err != nil {
return
}
conn, err = connectionsClient.GetConnection(ctx, &connection.GetConnectionRequest{
Name: fullname,
})
if err != nil {
return
}
cleanup = func() {
err := connectionsClient.DeleteConnection(ctx, &connection.DeleteConnectionRequest{
Name: fullname,
})
if err != nil {
t.Logf("could not delete connection: %s", fullname)
}
}
connectionID = conn.Name
return
}

func TestIntegration_RoutineComplexTypes(t *testing.T) {
if client == nil {
t.Skip("Integration tests skipped")
Expand Down