
# Spark to BigQuery Query Pushdown Usage Instructions and Technical Details

Query Pushdown is an advanced optimization technique in which the Spark transformations/actions (hereafter referred to as "Spark operations") performed while reading data into Spark from BigQuery are translated into SQL queries and pushed down to BigQuery by the open-source spark-bigquery-connector (hereafter referred to as the "connector"), improving read performance. With BigQuery as the data source for Spark, the connector can push large and complex Spark SQL queries down to be processed in BigQuery, bringing the computation next to the data and reducing I/O overhead. This capability combines the robust query processing of BigQuery with the computational capabilities of Spark and its ecosystem.

## Usage

spark-bigquery-connector release 0.27.1 contains the full Query Pushdown functionality, released as a Preview feature. Query Pushdown can be used with both the Spark SQL and DataFrame APIs.

Please refer to the documentation for the Connector to Spark Compatibility Matrix and the Connector to Dataproc Image Compatibility Matrix to determine which connector to use for your particular use case.

Note: Query pushdown is not enabled on the Spark session by default. It has to be explicitly enabled.

Note: Query Pushdown is enabled or disabled for the entire Spark session rather than for particular read queries. If you want to run some queries with Query Pushdown enabled and others with it disabled, you will need to explicitly enable/disable the functionality as shown in the next section. To verify whether Query Pushdown was used for a particular query, see the section "Validating/Debugging Query Pushdown" below.

## Enable/Disable Query Pushdown from Python/PySpark/Jupyter

To enable Query Pushdown from Python, you will need to create a Spark session with the correct jar and call BigQueryConnectorUtils.enablePushdownSession on the Spark session.

Python code for enabling Query Pushdown

```python
from pyspark.sql import SparkSession
from pyspark import SparkFiles

spark = SparkSession.builder \
    .appName('Pushdown demo') \
    .config('spark.jars', 'gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.27.1.jar') \
    .config('spark.submit.pyFiles', 'gs://spark-lib/bigquery/spark-bigquery-support-0.27.1.zip') \
    .config('spark.files', 'gs://spark-lib/bigquery/spark-bigquery-support-0.27.1.zip') \
    .getOrCreate()

# Code to enable query pushdown start
# Extract the spark-bigquery-support zip file
import zipfile
with zipfile.ZipFile(SparkFiles.get("spark-bigquery-support-0.27.1.zip")) as zf:
    zf.extractall()

# Declare a namespace package so the extracted module can be imported
try:
    import pkg_resources
    pkg_resources.declare_namespace(__name__)
except ImportError:
    import pkgutil
    __path__ = pkgutil.extend_path(__path__, __name__)

# Enable pushdown
from google.cloud.spark.bigquery import big_query_connector_utils
big_query_connector_utils.enablePushdownSession(spark)
# Code to enable query pushdown end
```

Note: If the above code raises a ModuleNotFoundError, you can enable pushdown for the Spark session directly with `spark.sparkContext._jvm.com.google.cloud.spark.bigquery.BigQueryConnectorUtils.enablePushdownSession(spark._jsparkSession)`.

Note that this will enable Query Pushdown on the entire Spark session. If you want to disable pushdown, you will need to call BigQueryConnectorUtils.disablePushdownSession.

Python code for disabling Query Pushdown

```python
# Disable pushdown
from google.cloud.spark.bigquery import big_query_connector_utils
big_query_connector_utils.disablePushdownSession(spark)
```

Note that the materializationDataset option has to be passed in the spark.read call for queries to be pushed down to BigQuery. This dataset holds the temporary tables generated by the SQL queries that are run in BigQuery; these temporary tables are deleted after 24 hours.
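For example, a minimal sketch of such a read (assuming a dataset named pushdown_temp already exists in your project to hold the temporary tables):

```python
# A minimal sketch; "pushdown_temp" is an assumed dataset name that must
# already exist in your BigQuery project to hold the temporary tables.
df = spark.read \
    .format("bigquery") \
    .option("materializationDataset", "pushdown_temp") \
    .load("bigquery-public-data.samples.shakespeare")
```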

## Enable/Disable Query Pushdown from Java/Scala

To enable Query Pushdown from Java/Scala, you will first need to include the connector as a dependency; the instructions for doing so are here. Then, call com.google.cloud.spark.bigquery.BigQueryConnectorUtils.enablePushdownSession(spark) from your project code to enable Query Pushdown for your Spark session.

If you want to disable Query Pushdown for a session on which it has already been enabled, you can call com.google.cloud.spark.bigquery.BigQueryConnectorUtils.disablePushdownSession(spark).

## Validating/Debugging Query Pushdown

If you experience any issues with Query Pushdown, you can call df.explain() on the DataFrame. If you pass the argument true, you will get much more output, including the pre- and post-optimization phases.

You can be sure that the entire query has been pushed down if Spark(Version)BigQueryPushdownPlan is the only node you see in the physical plan output.
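For example, a minimal PySpark sketch (reusing the assumed pushdown_temp dataset from the earlier sketch; the exact plan node name varies with the Spark version):

```python
# Read a table, apply an aggregation, and inspect the plan. If the whole query
# was pushed down, the physical plan should contain only a
# Spark(Version)BigQueryPushdownPlan node.
df = spark.read \
    .format("bigquery") \
    .option("materializationDataset", "pushdown_temp") \
    .load("bigquery-public-data.samples.shakespeare")

word_counts = df.groupBy("word").sum("word_count")
word_counts.explain(True)  # True also prints the pre- and post-optimization plans
```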

Note: If you are running jobs on Dataproc using the connector, the job logs will also contain information about any errors encountered during Query Pushdown.

## Supported Pushdown Operations

The following Spark functions/operations are supported in Query Pushdown. A combined example that exercises several of these constructs follows the tables below.

### Aggregation Functions

| Spark Logical Plan Construct | Spark SQL function name |
| --- | --- |
| Average | avg |
| Corr | corr |
| CovPopulation | covar_pop |
| CovSample | covar_samp |
| Count | count |
| Max | max |
| Min | min |
| StddevPop | stddev_pop |
| StddevSamp | stddev_samp |
| Sum | sum |
| VariancePop | var_pop |
| VarianceSamp | var_samp |

### Boolean Functions

| Spark Logical Plan Construct | Spark SQL function name |
| --- | --- |
| And | and |
| Between | between |
| Contains | contains |
| EndsWith | endswith |
| EqualNullSafe | <=> |
| EqualTo | == |
| GreaterThan | > |
| GreaterThanOrEqual | >= |
| In | in |
| IsNull | isnull |
| IsNotNull | isnotnull |
| LessThan | < |
| LessThanOrEqual | <= |
| Not | ! |
| Or | or |
| StartsWith | startswith |
| NotEqual | != |
| NotGreaterThan | < |
| NotGreaterThanOrEqual | <= |
| NotLessThan | > |
| NotLessThanOrEqual | >= |

### Mathematical Functions

| Spark Logical Plan Construct | Spark SQL function name |
| --- | --- |
| '+' (addition) | + |
| '-' (subtraction) | - |
| '*' (multiplication) | * |
| '/' (division) | / |
| '-' (unary negation) | - |
| Abs | abs |
| Acos | acos |
| Asin | asin |
| Atan | atan |
| CheckOverflow | |
| Cos | cos |
| Cosh | cosh |
| Exp | exp |
| Floor | floor |
| Greatest | greatest |
| Least | least |
| Log | log |
| Log10 | log10 |
| Pi | pi |
| Pow | pow |
| PromotePrecision | |
| Rand | rand |
| Round | round |
| Sin | sin |
| Sinh | sinh |
| Sqrt | sqrt |
| Tan | tan |
| Tanh | tanh |
| IsNan | isnan |
| SigNum | signum |

### Miscellaneous Operators

| Spark Logical Plan Construct | Spark SQL function name |
| --- | --- |
| Alias (AS expressions) | |
| BitwiseAnd | bit_and, & |
| BitwiseNot | ~ |
| BitwiseOr | bit_or |
| BitwiseXor | bit_xor, ^ |
| CaseWhen | case |
| Cast(child, t, _) | cast |
| Coalesce | coalesce |
| If | if |
| IsNull | ifnull |
| IsNotNull | isnotnull |
| ScalarSubquery | |
| ShiftLeft | shiftleft |
| ShiftRight | shiftright |
| SortOrder | |
| UnscaledValue | |

### String Functions

| Spark Logical Plan Construct | Spark SQL function name |
| --- | --- |
| Ascii | ascii |
| Concat(children) | concat |
| Contains | contains |
| EndsWith | endswith |
| Length | length |
| Like | like |
| Lower | lower |
| StartsWith | startswith |
| StringLPad | lpad |
| StringRPad | rpad |
| StringTranslate | translate |
| StringTrim | trim |
| StringTrimLeft | ltrim |
| StringTrimRight | rtrim |
| Substring | substring, substr |
| Upper | upper |
| StringInstr | instr |
| InitCap | initcap |
| SoundEx | soundex |
| RegExpExtract | regexp_extract |
| RegExpReplace | regexp_replace |
| FormatString | format_string |
| FormatNumber | format_number |
| Base64 | base64 |
| UnBase64 | unbase64 |

### Date Functions

| Spark Logical Plan Construct | Spark SQL function name |
| --- | --- |
| DateAdd | date_add |
| DateSub | date_sub |
| Month | month |
| Quarter | quarter |
| DateTrunc | date_trunc |
| Year | year |

### Window Functions

| Spark Logical Plan Construct | Spark SQL function name |
| --- | --- |
| DenseRank | dense_rank |
| Rank | rank |
| RowNumber | row_number |
| PercentRank | percent_rank |

### Relational Operators

| Spark Logical Plan Construct | Spark SQL clause |
| --- | --- |
| Aggregate functions and group-by clauses | GROUP BY clause |
| Distinct | DISTINCT clause |
| Exists | EXISTS clause |
| Filters | WHERE clause |
| In/InSet | IN clause |
| Having | HAVING clause |
| Joins (Inner, LeftOuter, RightOuter, FullOuter, Cross, LeftSemi, LeftAnti) | JOIN clause |
| Limits | LIMIT clause |
| Projections | SELECT clause |
| Sorts (ORDER BY) | ORDER BY and SORT BY clauses |
| Union and Union All | UNION and UNION ALL clauses |
| Window functions and Windowing clauses | WINDOW clause |
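As a combined illustration, here is a sketch that exercises several of the supported constructs above (WHERE, GROUP BY, ORDER BY and LIMIT); pushdown_temp is an assumed dataset name:

```python
# A sketch combining several pushdown-supported constructs.
df = spark.read \
    .format("bigquery") \
    .option("materializationDataset", "pushdown_temp") \
    .load("bigquery-public-data.samples.shakespeare")
df.createOrReplaceTempView("words")

result = spark.sql("""
    SELECT word, SUM(word_count) AS total
    FROM words
    WHERE word LIKE 'th%'
    GROUP BY word
    ORDER BY total DESC
    LIMIT 10
""")
result.explain()  # expect a single Spark(Version)BigQueryPushdownPlan node
result.show()
```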

## Caveats/Limitations

- Because the translation in Query Pushdown from the Spark LogicalPlan to SQL requires an almost one-to-one translation of Spark SQL operators to BigQuery expressions, not all Spark SQL operators can be pushed down. When pushdown fails, the connector falls back to the original execution plan and the unsupported operations are performed in Spark instead.
- Query Pushdown guarantees ordered results only if there is an outer ORDER BY clause in the Spark SQL query and the total number of results fits in a single partition.
- Spark UDFs cannot be pushed down to BigQuery.
- Query Pushdown functionality is not bug-for-bug compatible with Spark, so the results will be based on BigQuery's behavior (which follows the SQL standard).
- Full outer, right outer and left outer joins return WrappedArray instead of null if the schema type is RECORD.

## Query Pushdown Technical Details

To understand how query pushdown works, let’s take a look at the typical flow of a Spark DataFrame query. The Catalyst optimizer (a general library for representing and performing Spark operations) performs a set of source-agnostic optimizations on the logical plan of a DataFrame. Since DataFrames are executed lazily, Spark can evaluate and optimize relational operators applied to a DataFrame and only execute the DataFrame when an action is invoked.

When an action is invoked, Spark’s Catalyst optimizer first produces an optimized logical plan. The process then moves to the physical planning stage.

Let us consider a simple join query:

```sql
select ss_item_sk, ss_ticket_number, ss_customer_sk
    from store_sales
    join store_returns
    on (sr_item_sk = ss_item_sk and sr_ticket_number = ss_ticket_number)
```

Without Pushdown, the above query is translated into a physical plan as follows:

```
== Physical Plan ==
*(2) Project [ss_item_sk#748L, ss_ticket_number#755L, ss_customer_sk#749L]
+- *(2) BroadcastHashJoin [ss_item_sk#748L, ss_ticket_number#755L], [sr_item_sk#794L, sr_ticket_number#801L], Inner, BuildRight
   :- *(2) Scan com.google.cloud.spark.bigquery.direct.DirectBigQueryRelation@69e8f7a5 [ss_item_sk#748L,ss_customer_sk#749L,ss_ticket_number#755L] PushedFilters: [*IsNotNull(ss_item_sk), *IsNotNull(ss_ticket_number)], ReadSchema: struct<ss_item_sk:bigint,ss_customer_sk:bigint,ss_ticket_number:bigint>
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true], input[1, bigint, true]))
      +- *(1) Scan com.google.cloud.spark.bigquery.direct.DirectBigQueryRelation@738153d8 [sr_item_sk#794L,sr_ticket_number#801L] PushedFilters: [*IsNotNull(sr_item_sk), *IsNotNull(sr_ticket_number)], ReadSchema: struct<sr_item_sk:bigint,sr_ticket_number:bigint>
```

The physical plan needs to scan both tables before performing the join. This approach is typically not ideal for a more capable Spark data source such as BigQuery, since the data needs to be transferred over the network.

With Pushdown enabled, Catalyst inserts a BigQuery plan as follows:

```
== Physical Plan ==
Spark24BigQueryPushdownPlan [SUBQUERY_14_COL_0#748L, SUBQUERY_14_COL_1#755L, SUBQUERY_14_COL_2#749L], PreScala213BigQueryRDD[4] at RDD at PreScala213BigQueryRDD.java:61
```

The generated SQL is pushed down and executed in BigQuery, and the result is sent to the Spark executor nodes. This corresponds to a significant reduction in the data that needs to be transferred to Spark over the network, improving response times.

## Read without Query Pushdown

In the connector, customers can read data into Spark from a BigQuery table by using the DataFrame APIs or Spark SQL. An example using Spark SQL is shown below:

```scala
// The implicit import below makes the bigquery() method available on the DataFrameReader
import com.google.cloud.spark.bigquery._

val wordsDF = spark.read.bigquery("bigquery-public-data.samples.shakespeare").cache()
wordsDF.createOrReplaceTempView("words")

// Perform word count.
val wordCountDF = spark.sql("SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word")
wordCountDF.show()
```

When a read is issued, the connector performs the following steps on the Spark driver.

1. Reads the corresponding table metadata using the BigQuery API.
2. Creates a ReadSession using the BigQuery Storage Read API, which returns multiple streams over the table data that needs to be read.
3. Distributes the streams across Spark executors, which are responsible for actually reading the data and storing it in DataFrames.

## Read with Query Pushdown

Query Pushdown is an optimization of this process in which the connector code in the Spark driver translates the Spark operations into BigQuery Standard SQL and creates a temporary table in BigQuery, which is then read by the Spark executors. Thus, the data that needs to be read by the executors is (potentially) significantly reduced. When pushdown is enabled, the driver performs the following steps; a minimal end-to-end sketch follows the list.

1. Reads the corresponding table metadata using the BigQuery API.
2. Generates SQL by translating the optimized Spark LogicalPlan and creates a temporary table in BigQuery from the generated SQL.
3. Creates a ReadSession over the temporary table created in step 2.
4. Distributes the streams across Spark executors, which are responsible for actually reading the data and storing it in DataFrames.
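Putting these steps together, a minimal end-to-end PySpark sketch (assuming the connector jar and support zip are configured as shown earlier, and that a dataset named pushdown_temp exists to hold the temporary tables):

```python
# A minimal end-to-end sketch: enable pushdown, read through the connector,
# aggregate, and inspect the plan.
spark.sparkContext._jvm.com.google.cloud.spark.bigquery \
    .BigQueryConnectorUtils.enablePushdownSession(spark._jsparkSession)

words = spark.read \
    .format("bigquery") \
    .option("materializationDataset", "pushdown_temp") \
    .load("bigquery-public-data.samples.shakespeare")

word_counts = words.groupBy("word").sum("word_count")
word_counts.explain()  # look for Spark(Version)BigQueryPushdownPlan in the output
word_counts.show()
```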