forked from GoogleCloudPlatform/DataflowTemplates
/
JdbcToBigQuery.java
135 lines (123 loc) · 6.08 KB
/
JdbcToBigQuery.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
/*
* Copyright (C) 2018 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.templates;
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.templates.common.JdbcConverters;
import com.google.cloud.teleport.util.GCSAwareValueProvider;
import com.google.cloud.teleport.util.KMSEncryptedNestedValueProvider;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A template that copies data from a relational database using JDBC to an existing BigQuery table.
*
* <p>Check out <a
* href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_Jdbc_to_BigQuery.md">README</a>
* for instructions on how to use or modify this template.
*/
@Template(
name = "Jdbc_to_BigQuery",
category = TemplateCategory.BATCH,
displayName = "JDBC to BigQuery",
description =
"A pipeline that reads from a JDBC source and writes to a BigQuery table. JDBC connection string, user name and password can be passed in directly as plaintext or encrypted using the Google Cloud KMS API. If the parameter KMSEncryptionKey is specified, connectionURL, username, and password should be all in encrypted format.",
additionalHelp =
"A sample curl command for the KMS API encrypt endpoint: curl -s -X POST \"https://cloudkms.googleapis.com/v1/projects/your-project/locations/your-path/keyRings/your-keyring/cryptoKeys/your-key:encrypt\" -d \"{\\\"plaintext\\\":\\\"PasteBase64EncodedString\\\"}\" -H \"Authorization: Bearer $(gcloud auth application-default print-access-token)\" -H \"Content-Type: application/json\"",
optionsClass = JdbcConverters.JdbcToBigQueryOptions.class,
documentation =
"https://cloud.google.com/dataflow/docs/guides/templates/provided/jdbc-to-bigquery",
contactInformation = "https://cloud.google.com/support")
public class JdbcToBigQuery {
private static final Logger LOG = LoggerFactory.getLogger(JdbcToBigQuery.class);
private static ValueProvider<String> maybeDecrypt(
ValueProvider<String> unencryptedValue, ValueProvider<String> kmsKey) {
return new KMSEncryptedNestedValueProvider(unencryptedValue, kmsKey);
}
/**
* Main entry point for executing the pipeline. This will run the pipeline asynchronously. If
* blocking execution is required, use the {@link
* JdbcToBigQuery#run(JdbcConverters.JdbcToBigQueryOptions)} method to start the pipeline and
* invoke {@code result.waitUntilFinish()} on the {@link PipelineResult}
*
* @param args The command-line arguments to the pipeline.
*/
public static void main(String[] args) {
// Parse the user options passed from the command-line
JdbcConverters.JdbcToBigQueryOptions options =
PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(JdbcConverters.JdbcToBigQueryOptions.class);
run(options);
}
/**
* Runs the pipeline with the supplied options.
*
* @param options The execution parameters to the pipeline.
* @return The result of the pipeline execution.
*/
private static PipelineResult run(JdbcConverters.JdbcToBigQueryOptions options) {
// Create the pipeline
Pipeline pipeline = Pipeline.create(options);
/*
* Steps: 1) Read records via JDBC and convert to TableRow via RowMapper
* 2) Append TableRow to BigQuery via BigQueryIO
*/
JdbcIO.DataSourceConfiguration dataSourceConfiguration =
JdbcIO.DataSourceConfiguration.create(
options.getDriverClassName(),
maybeDecrypt(options.getConnectionURL(), options.getKMSEncryptionKey()))
.withUsername(maybeDecrypt(options.getUsername(), options.getKMSEncryptionKey()))
.withPassword(maybeDecrypt(options.getPassword(), options.getKMSEncryptionKey()))
.withDriverJars(options.getDriverJars());
if (options.getConnectionProperties() != null) {
dataSourceConfiguration =
dataSourceConfiguration.withConnectionProperties(options.getConnectionProperties());
}
pipeline
/*
* Step 1: Read records via JDBC and convert to TableRow
* via {@link org.apache.beam.sdk.io.jdbc.JdbcIO.RowMapper}
*/
.apply(
"Read from JdbcIO",
JdbcIO.<TableRow>read()
.withDataSourceConfiguration(dataSourceConfiguration)
.withQuery(new GCSAwareValueProvider(options.getQuery()))
.withCoder(TableRowJsonCoder.of())
.withRowMapper(JdbcConverters.getResultSetToTableRow(options.getUseColumnAlias())))
/*
* Step 2: Append TableRow to an existing BigQuery table
*/
.apply(
"Write to BigQuery",
BigQueryIO.writeTableRows()
.withoutValidation()
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withCustomGcsTempLocation(options.getBigQueryLoadingTemporaryDirectory())
.to(options.getOutputTable()));
// Execute the pipeline and return the result.
return pipeline.run();
}
}