New add_load_date_suffix import option. #681

Merged: 3 commits, Aug 18, 2021
42 changes: 30 additions & 12 deletions tools/asset-inventory/asset_inventory/import_pipeline.py
@@ -32,8 +32,8 @@
from datetime import datetime
import json
import logging
import random
import pprint
import random

import apache_beam as beam
from apache_beam.io import ReadFromText
@@ -294,10 +294,17 @@ def finish_bundle(self):
class BigQueryDoFn(beam.DoFn):
"""Superclass for a DoFn that requires BigQuery dataset information."""

def __init__(self, dataset):
def __init__(self, dataset, add_load_date_suffix, load_time):
if isinstance(dataset, string_types):
dataset = StaticValueProvider(str, dataset)
self.dataset = dataset
if isinstance(add_load_date_suffix, string_types):
add_load_date_suffix = StaticValueProvider(
str, add_load_date_suffix)
self.add_load_date_suffix = add_load_date_suffix
if isinstance(load_time, string_types):
load_time = StaticValueProvider(str, load_time)
self.load_time = load_time
self.bigquery_client = None
self.dataset_location = None
self.load_jobs = {}
@@ -316,7 +323,12 @@ def get_dataset_location(self):
return None

def asset_type_to_table_name(self, asset_type):
return asset_type.replace('.', '_').replace('/', '_')
suffix = ''
add_load_date_suffix = self.add_load_date_suffix.get()
if (add_load_date_suffix and
add_load_date_suffix.lower() in ('yes', 'true', 't', '1')):
suffix = '_' + self.load_time.get()[0:10].replace('-', '')
return asset_type.replace('.', '_').replace('/', '_') + suffix

def start_bundle(self):
if not self.bigquery_client:
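
For reference, the added table-naming behaviour reduces to the following standalone sketch; the asset type and timestamp values below are illustrative, not taken from the diff:

def table_name_with_suffix(asset_type, add_load_date_suffix, load_time):
    # Dots and slashes in the asset type become underscores, as before this change.
    suffix = ''
    # Only 'yes', 'true', 't' or '1' (case-insensitive) enable the suffix.
    if add_load_date_suffix and add_load_date_suffix.lower() in ('yes', 'true', 't', '1'):
        # The first 10 characters of an ISO timestamp are YYYY-MM-DD; dashes are dropped.
        suffix = '_' + load_time[0:10].replace('-', '')
    return asset_type.replace('.', '_').replace('/', '_') + suffix

# table_name_with_suffix('compute.googleapis.com/Instance', 'true', '2021-08-18T12:00:00')
#   -> 'compute_googleapis_com_Instance_20210818'
# table_name_with_suffix('compute.googleapis.com/Instance', 'False', '2021-08-18T12:00:00')
#   -> 'compute_googleapis_com_Instance'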
@@ -332,11 +344,12 @@ class DeleteDataSetTables(BigQueryDoFn):
dataset before loading so that no old asset types remain.
"""

def __init__(self, dataset, write_disposition):
def __init__(self, dataset, add_load_date_suffix, load_time,
write_disposition):
# Can't use super().
# https://issues.apache.org/jira/browse/BEAM-6158?focusedCommentId=16919945
# super(DeleteDataSetTables, self).__init__(dataset)
BigQueryDoFn.__init__(self, dataset)
BigQueryDoFn.__init__(self, dataset, add_load_date_suffix, load_time)
if isinstance(write_disposition, string_types):
write_disposition = StaticValueProvider(str, write_disposition)
self.write_disposition = write_disposition
@@ -362,14 +375,11 @@ class LoadToBigQuery(BigQueryDoFn):
this must be done within the workers.
"""

def __init__(self, dataset, load_time):
def __init__(self, dataset, add_load_date_suffix, load_time):
# Can't use super().
# https://issues.apache.org/jira/browse/BEAM-6158?focusedCommentId=16919945
# super(LoadToBigQuery, self).__init__(dataset)
BigQueryDoFn.__init__(self, dataset)
if isinstance(load_time, string_types):
load_time = StaticValueProvider(str, load_time)
self.load_time = load_time
BigQueryDoFn.__init__(self, dataset, add_load_date_suffix, load_time)

def to_bigquery_schema(self, fields):
"""Convert list of dicts into `bigquery.SchemaFields`."""
@@ -463,6 +473,11 @@ def _add_argparse_args(cls, parser):
default=datetime.now().isoformat(),
help='Load time of the data (YYYY-MM-DD[HH:MM:SS]).')

parser.add_value_provider_argument(
'--add_load_date_suffix',
default='False',
help='If the load date [YYYYMMDD] is added as a table suffix.')

parser.add_value_provider_argument(
'--dataset', help='BigQuery dataset to load to.')
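
The new option arrives as a runtime ValueProvider like the others, so a plain string (including the 'False' default) is wrapped before the DoFns read it. A minimal sketch of that handoff, assuming a runtime value of 'true':

from apache_beam.options.value_provider import StaticValueProvider

# What BigQueryDoFn.__init__ builds when handed a plain string instead of a ValueProvider.
add_load_date_suffix = StaticValueProvider(str, 'true')
# The DoFn treats only 'yes', 'true', 't' and '1' (case-insensitive) as enabling the suffix.
enabled = add_load_date_suffix.get().lower() in ('yes', 'true', 't', '1')
print(enabled)  # True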

@@ -508,9 +523,12 @@ def run(argv=None):
WriteToGCS(options.stage, options.load_time))
| 'group_written_objects_by_key' >> beam.GroupByKey()
| 'delete_tables' >> beam.ParDo(
DeleteDataSetTables(options.dataset, options.write_disposition))
DeleteDataSetTables(options.dataset, options.add_load_date_suffix,
options.load_time,
options.write_disposition))
| 'load_to_bigquery' >> beam.ParDo(
LoadToBigQuery(options.dataset, options.load_time),
LoadToBigQuery(options.dataset, options.add_load_date_suffix,
options.load_time),
beam.pvalue.AsDict(schemas)))

return p.run()
@@ -31,6 +31,13 @@
"help_text": "Either 'WRITE_APPEND' or WRITE_EMPTY.",
"regexes": ["^(WRITE_APPEND|WRITE_EMPTY)$"]
},
{
"name": "add_load_date_suffix",
"label": "If the load date [YYYYMMDD] is added as a table suffix.",
"help_text": "Either 'true' or 'false'.",
"isOptional": true,
"regexes": ["^(true|false)$"]
},
{
"name": "dataset",
"label": "BigQuery dataset to load to.",
10 changes: 8 additions & 2 deletions tools/asset-inventory/asset_inventory/pipeline_runner.py
@@ -67,7 +67,8 @@ def wait_on_pipeline_job(df_service, pipeline_job):

def run_pipeline_template(dataflow_project, template_region, template_location,
input_location, group_by, write_disposition, dataset,
stage, load_time, num_shards, runtime_environment):
stage, load_time, num_shards, add_load_date_suffix,
runtime_environment):
"""Invoke the suplied pipeline template.

Args:
@@ -81,6 +82,7 @@ def run_pipeline_template(dataflow_project, template_region, template_location,
stage: GCS path to write BigQuery load files.
load_time: Timestamp or date to load data with.
num_shards: Shards for each asset type.
add_load_date_suffix: If the load date is added as a table suffix.
runtime_environment: Dict supplying other runtime overrides.
Returns:
End state of the pipeline and job object.
@@ -100,6 +102,7 @@ def run_pipeline_template(dataflow_project, template_region, template_location,
'group_by': group_by,
'write_disposition': write_disposition,
'num_shards': num_shards,
'add_load_date_suffix': add_load_date_suffix,
'dataset': dataset,
},
'environment': runtime_environment
@@ -118,7 +121,8 @@ def run_pipeline_template(dataflow_project, template_region, template_location,

def run_pipeline_beam_runner(pipeline_runner, dataflow_project, input_location,
group_by, write_disposition, dataset, stage,
load_time, num_shards, pipeline_arguments):
load_time, num_shards, add_load_date_suffix,
pipeline_arguments):
"""Invokes the pipeline with a beam runner.

Only tested with the dataflow and direct runners.
@@ -133,6 +137,7 @@ def run_pipeline_beam_runner(pipeline_runner, dataflow_project, input_location,
stage: GCS path to write BigQuery load files.
load_time: Timestamp to add to data during BigQuery load.
num_shards: Shards for each asset type.
add_load_date_suffix: If the load date is added as a table suffix.
pipeline_arguments: List of additional runner arguments.
Returns:
The end state of the pipeline run (a string), and PipelineResult.
@@ -155,6 +160,7 @@ def run_pipeline_beam_runner(pipeline_runner, dataflow_project, input_location,
'--group_by': group_by,
'--write_disposition': write_disposition,
'--num_shards': num_shards,
'--add_load_date_suffix': add_load_date_suffix,
'--dataset': dataset,
'--stage': stage,
'--runner': pipeline_runner
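
Callers of the runner helpers now pass the flag as an extra positional argument. A hedged usage sketch with placeholder project, bucket and dataset names (only the argument order comes from this diff):

from asset_inventory import pipeline_runner

state, result = pipeline_runner.run_pipeline_beam_runner(
    'DirectRunner',                   # pipeline_runner
    'my-project',                     # dataflow_project (placeholder)
    'gs://my-bucket/export/*.json',   # input_location (placeholder)
    'ASSET_TYPE',                     # group_by (placeholder value)
    'WRITE_APPEND',                   # write_disposition
    'my_dataset',                     # dataset (placeholder)
    'gs://my-bucket/stage',           # stage (placeholder)
    '2021-08-18T00:00:00',            # load_time
    '*=1',                            # num_shards
    'true',                           # add_load_date_suffix (the new argument)
    [])                               # pipeline_arguments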
3 changes: 3 additions & 0 deletions tools/asset-inventory/gae/config.yaml
@@ -83,5 +83,8 @@ import_write_disposition: WRITE_APPEND
# *=1,resource=100,google.cloud.bigquery.Table=100
import_num_shards: "*=1"

# If the load date [YYYYMMDD] is added as a table suffix.
import_add_load_date_suffix: False

# If we are running on App Engine and only want to be invoked by cron tasks for security reasons.
restrict_to_cron_tasks: True
9 changes: 6 additions & 3 deletions tools/asset-inventory/gae/main.py
@@ -73,6 +73,7 @@ def get_import_arguments():
CONFIG.import_template_region, CONFIG.import_template_location,
'{}/*.json'.format(CONFIG.gcs_destination), CONFIG.import_group_by,
CONFIG.import_write_disposition, CONFIG.import_dataset,
CONFIG.import_add_load_date_suffix,
CONFIG.import_stage,
datetime.datetime.now().isoformat(),
CONFIG.import_num_shards,
@@ -87,7 +88,8 @@ def run_import():
import_arguments = get_import_arguments()
logging.info('running import %s', import_arguments)
(runner, dataflow_project, template_region, template_location,
input_location, group_by, write_disposition, dataset, stage, load_time,
input_location, group_by, write_disposition, dataset,
add_load_date_suffix, stage, load_time,
num_shards, pipeline_arguments,
pipeline_runtime_environment) = import_arguments

@@ -96,12 +98,13 @@
dataflow_project, template_region,
template_location, input_location,
group_by, write_disposition, dataset, stage,
load_time, num_shards, pipeline_runtime_environment)
load_time, num_shards, add_load_date_suffix,
pipeline_runtime_environment)
else:
return pipeline_runner.run_pipeline_beam_runner(
runner, dataflow_project, input_location,
group_by, write_disposition, dataset, stage, load_time,
num_shards, pipeline_arguments)
num_shards, add_load_date_suffix, pipeline_arguments)


@app.route('/export_import')
Expand Down