Context not propagated to snowflakeFileTransferAgent on PUT command; cancellation is not supported #1028

Open
joellubi opened this issue Jan 11, 2024 · 3 comments
Labels: bug (Erroneous or unexpected behaviour), status-pr_pending_merge (A PR is made and is under review), status-triage_done (Initial triage done, will be further handled by the driver team)

Comments

@joellubi

Context cancellation works for other queries executed using conn.ExecContext(), but is ignored for PUT/GET commands because it is not propagated any further than the snowflakeConn.processFileTransfer() method.

  1. What version of GO driver are you using?
    github.com/snowflakedb/gosnowflake v1.7.1

  2. What operating system and processor architecture are you using?
    macOS 14.1.1 x86_64

  3. What version of GO are you using?
    go version go1.21.5 darwin/amd64

  4. Server version:
    8.1.0

  5. What did you do?

package main

import (
	"context"
	"database/sql"
	"log"
	"time"

	"github.com/snowflakedb/gosnowflake"
)

func main() {
	cfg := &gosnowflake.Config{
		// Redacted
	}

	dsn, err := gosnowflake.DSN(cfg)
	if err != nil {
		log.Fatal(err)
	}

	db, err := sql.Open("snowflake", dsn)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	conn, err := db.Conn(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	stageName := "my_temp_stage"
	createStageQuery := "CREATE OR REPLACE STAGE " + stageName
	putQuery := "PUT file:///path/to/test.parquet @" + stageName + " OVERWRITE=true"

	_, err = conn.ExecContext(ctx, createStageQuery)
	if err != nil {
		log.Fatal(err)
	}

	go func() {
		time.Sleep(10 * time.Second)
		log.Println("Canceled")
		cancel()
	}()

	log.Println("Starting PUT upload")
	_, err = conn.ExecContext(ctx, putQuery)
	if err != nil {
		log.Fatal(err)
	}
	log.Println("Completed PUT upload")
}
  6. What did you expect to see?

Expected Logs

% go run .
2024/01/10 21:11:22 Starting PUT upload
2024/01/10 21:11:32 Canceled
2024/01/10 21:11:32 context canceled
exit status 1

Actual Logs

% go run .
2024/01/10 21:11:22 Starting PUT upload
2024/01/10 21:11:32 Canceled
2024/01/10 21:12:36 Completed PUT upload
  7. Can you set logging to DEBUG and collect the logs?

    https://community.snowflake.com/s/article/How-to-generate-log-file-on-Snowflake-connectors

Yes, if it will help. But it's clear in the code that ctx is not passed all the way down to the upload methods.

  8. What is your Snowflake account identifier, if any? (Optional)
@joellubi added the bug (Erroneous or unexpected behaviour) label on Jan 11, 2024
@sfc-gh-dszmolka self-assigned this on Jan 11, 2024
@sfc-gh-dszmolka added the status-triage (Issue is under initial triage) label on Jan 11, 2024
@sfc-gh-dszmolka
Contributor

Hi, and thank you for raising this issue, especially with such a nice self-contained reproduction! It does reproduce: replacing putQuery with something else, such as a SELECT..., works as expected, but PUT/GET does not.
We'll take a look.

@sfc-gh-dszmolka added the status-in_progress (Issue is worked on by the driver team) label and removed the status-triage (Issue is under initial triage) label on Jan 11, 2024
zeroshade pushed a commit to apache/arrow-adbc that referenced this issue Jan 26, 2024
# What

- Replace Snowflake bulk ingestion with Parquet-based approach with
higher throughput and better type support
- Previously: INSERT bind parameters were uploaded to a CSV-based stage,
once per record batch
- Now: Parquet files written concurrently to stage independently of
record batch size. Parquet logical types are used to infer schema on
COPY.
- Tests to validate type support and consistency through Arrow ->
Parquet -> Snowflake -> Arrow roundtrip
- Improved type mapping between Arrow <-> Snowflake timestamps.
[TIMESTAMP_LTZ](https://docs.snowflake.com/en/sql-reference/data-types-datetime#timestamp-ltz-timestamp-ntz-timestamp-tz)
is more consistent with Arrow timestamp semantics than TIMESTAMP_TZ,
which can lead to lossy roundtrips.
- Minor bugfix where Snowflake local timestamps with timezone set to UTC
were being interpreted as non-local.

# Why

- Implements apache/arrow-adbc#1327, which comes from improvement request apache/arrow-adbc#1322
- BindStream ingestion is significantly faster
- Arrow type support is improved

# Methodology

The general approach for ingestion is most clearly demonstrated by the
path taken when `stmt.Bind()` for a single record is used:
### IngestRecord
```mermaid
flowchart LR
    A(Record) --> B(Write Parquet)
    B --> C(Upload File)
    C --> D(Execute COPY)
    D --> E(Check Row Count)
```
The Arrow record is written to a Parquet file due to its logical type
support, compressibility, and native Snowflake support. The file is then
uploaded to a temporary Snowflake stage via PUT query, and then loaded
into the target table via COPY query. Once the COPY has finished, one
more query to check the resulting row count is dispatched to accurately
return the number of rows affected. This is used instead of counting the
Arrow rows written in case there are any undetected losses when
importing the uploaded file into Snowflake.

A similar approach is taken when ingesting an arbitrarily large stream
of records via `stmt.BindStream()`, but makes use of several
opportunities to parallelize the work involved at different stages:

### IngestStream
```mermaid
flowchart LR

    A(Read Records) --> B(Write Parquet)

    A --> C(Write Parquet)
    A --> D(Write Parquet)
    A --> E(Write Parquet)

    B --> J(Buffer Pool)
    C --> J
    D --> J
    E --> J

    J --> K(Upload File)
    J --> L(Upload File)

    K --> M(Finalize COPY)
    L --> M

    M --> N(Check Row Count)


    O(File Ready) --> P(Execute COPY)
    P --> O
```
The same steps are used, but the stream of records is now distributed
among a pool of Parquet writers. This step is inherently CPU-bound, so
it is desirable for it to scale independently with the availability of
logical cores for writing/compression. These Parquet files are written
to a buffer pool in memory to help decouple the upload stage from
writing, and so that a writer can start working on the next file _while_
the last file it wrote is being uploaded. Uploads from the buffer pool
also benefit from parallelism, but more so to maximize network
utilization by limiting idle time between uploads and amortizing
potential slowdown in any one upload.

Technically, only a single COPY command is required after the last file
is uploaded in order to load the Parquet files into the Snowflake table.
However, on many warehouses this operation takes as long as, or even longer
than, the upload itself, though it can be made faster by paying for a larger
warehouse. Given the batched approach taken and that the COPY command is
idempotent, we can execute COPY repeatedly as files are uploaded to load
them into the table on an ongoing basis. These COPY queries are executed
asynchronously and listen for an upload-completed callback to ensure at
least one file will be loaded by the query (otherwise it will no-op so
this just prevents spamming Snowflake with a bunch of no-op COPYs).

Empirically, ingestion works reasonably well on an XS warehouse. COPY
speed is no longer a bottleneck with an S warehouse with high-speed home
internet, or on an M warehouse with same-region data center networking.

# Performance

Running on GCP e2-medium (shared-core 1 vCPU, 4GB RAM)
Snowflake warehouse size M, same GCP region as Snowflake account
Default ingestion settings

Benchmarking TPC-H Lineitem @ SF1 (6M Rows):
- Current: 11m50s
- New: 14s

Benchmarking TPC-H Lineitem @ SF10 (60M Rows):
- Current: Didn't attempt
- New: 1m16s

_This configuration is CPU bound, so I did another attempt with more
cores available..._
Now with GCP e2-standard-4 (4 vCPU, 16GB RAM)

Benchmarking TPC-H Lineitem @ SF1 (6M Rows):
- Current: 11m17s
- New: 9.5s

Benchmarking TPC-H Lineitem @ SF10 (60M Rows):
- Current: 1h47m
- New: 45s
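
For a sense of scale, the speedups implied by the timings above can be
computed directly. The helper below is only a convenience sketch (`speedup`
is a hypothetical name); the durations are copied from the lists above.

```go
package main

import (
	"fmt"
	"time"
)

// speedup parses two durations such as "11m50s" and "14s" and returns how
// many times faster the second one is than the first.
func speedup(current, updated string) (float64, error) {
	c, err := time.ParseDuration(current)
	if err != nil {
		return 0, err
	}
	u, err := time.ParseDuration(updated)
	if err != nil {
		return 0, err
	}
	return c.Seconds() / u.Seconds(), nil
}

func main() {
	runs := []struct{ label, current, updated string }{
		{"e2-medium, SF1", "11m50s", "14s"},
		{"e2-standard-4, SF1", "11m17s", "9.5s"},
		{"e2-standard-4, SF10", "1h47m", "45s"},
	}
	for _, r := range runs {
		s, err := speedup(r.current, r.updated)
		if err != nil {
			fmt.Println(r.label, "error:", err)
			continue
		}
		fmt.Printf("%s: %.0fx faster\n", r.label, s)
	}
}
```

This works out to roughly 51x, 71x, and 143x respectively.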

# Considerations

- Snowflake
[guides](https://community.snowflake.com/s/article/How-to-Load-Terabytes-Into-Snowflake-Speeds-Feeds-and-Techniques)
indicate that ingestion via CSV is the fastest. Experimentally, it does
appear to be true that a COPY query on staged CSV files executes much
faster than for Parquet files. However, by distributing the COPY
workloads _in parallel to_ the batched file uploads, overall performance
is better with Parquet since it can be compressed _much_ more
efficiently allowing the upload to complete in less time and with fewer
bytes transferred than with CSV. Type support is also much better.
- Single-Record ingestion performance is slightly worse than the
previous INSERT-bind approach. As a rough idea, a record that previously
ingested in about 1.7s now ingests in about 2.5s. However, the new
approach does come with expanded type support and better consistency
with the streaming approach.
- An ingestion run that fails part-way through may leave the table with
partial results. Transaction semantics may be added in the future by
overriding the CopyConcurrency parameter to be 0, in which case only the
final COPY will execute.

# Additional Work

### Blocking
- ~Timestamps will roundtrip properly after Arrow
[GH-39466](apache/arrow#39466) is closed. A
test is included but skipped for now.~
- ~Date64 will roundtrip properly after Arrow
[GH-39456](apache/arrow#39456) is closed. A
test is included but skipped for now.~

### Non-Blocking
- Compression codec and level are included in `ingestOptions` but are
not configurable using `stmt.SetOption()`. It is trivial to add this,
but it would be nice to be able to use the currently internal
[CompressionCodecFromString](https://github.com/apache/arrow/blob/e6323646558ee01234ce58af273c5a834745f298/go/parquet/internal/gen-go/parquet/parquet.go#L387-L399)
method to automatically pick up support for any other codecs added in
the future. Captured in apache/arrow-adbc#1473.
- List and Map types have some issues on ingestion. Snowflake returns
`SQL execution internal error` whenever repetition level is greater than
0. Still some more investigation to do here. This is non-blocking
because neither type was previously supported for ingestion.
- Context cancelation is supported for all goroutines and queries
executed as part of ingestion, _except_ for the PUT query (i.e. file
uploads). This issue is being tracked in gosnowflake
[1028](snowflakedb/gosnowflake#1028). In
practice, it likely takes just a few seconds for in-progress uploads to
complete and properly conclude cancelation. Once this issue is fixed,
the queries would be canceled in Snowflake, allowing the process to exit
faster and reduce unnecessary work.
- ~The code previously meant to map Snowflake types to Go types is no
longer used. It may still be useful for binding an Arrow record to an
arbitrary Update query, but `stmt.Prepare` should be implemented first
to follow ADBC spec for binding parameters.~
soumyadsanyal pushed a commit to soumyadsanyal/arrow-adbc that referenced this issue Jan 31, 2024
…1456)

@sfc-gh-dszmolka added the status-triage_done (Initial triage done, will be further handled by the driver team) label and removed the status-in_progress (Issue is worked on by the driver team) label on Feb 11, 2024
@sfc-gh-dszmolka added the status-in_progress (Issue is worked on by the driver team) label on Mar 28, 2024
@sfc-gh-dszmolka
Contributor

Short update: indeed, processFileTransfer does not seem to contain WithCancel. We're working on a fix and I'll keep this thread posted.

@sfc-gh-dszmolka
Contributor

We have a draft PR at #1108 for this; I will keep this thread updated.

@sfc-gh-dszmolka added the status-pr_pending_merge (A PR is made and is under review) label and removed the status-in_progress (Issue is worked on by the driver team) label on May 23, 2024