Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: delta plugin support write #284

Draft
wants to merge 8 commits into
base: master
Choose a base branch
from

Conversation

milicevica23
Copy link
Contributor

No description provided.

Copy link
Collaborator

@jwills jwills left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry it's taken me all week to get to this, been feeling under the weather

@@ -0,0 +1,9 @@
Something what i don't understand?

- For example we want to push something with a sql plugin to a database. Why we first write it down to disk and then read in memory with pandas and then push it further?
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Primarily an artifact of the historical architecture here-- we built the plugins on top of the existing external materialization type. There isn't a reason we could not also support plugins on non-external materializations (tables/views/etc.), it's just work that hasn't come up yet.


Future:

- If we make an external materialization we want that upstream models can refer to it? How to do it we have to register an df and create an view to it?
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current external materialization does this already; dbt-duckdb creates a VIEW in the DuckDB database that points at the externally materialized file so that it can be used/queried like normal by any other models that we run.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit tricky because the underlying delta table can't be just created but it has to be registered first as df. We do that currently when the delta table is defined in the source but we have to invoke this process here too. Let me try some stuff here if not we can make it a limitation and provide as the last layer that can't be referenced

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As of my understanding, if you use SQL/excel write plugin, you cant reference it afterward?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you can reference any external materialization after it gets created in other, downstream models (sorry for the lag I've been traveling in Asia and will be here for the next few weeks)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just noting that I would support a custom materialization type for delta tables if that is what we really need to make the write-side work well here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think that we can also put it in external but i have to refactor it a bit, but i am still not so far away, currently i just play around and try to make it work anyway

dbt/adapters/duckdb/environments/local.py Show resolved Hide resolved
@@ -43,6 +44,110 @@ def load(self, source_config: SourceConfig):
def default_materialization(self):
return "view"

def store(self, target_config: TargetConfig, df=None):
Copy link

@ZergRocks ZergRocks Jan 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this part will be updated as integration with current profiles.yml's filesystem option after this PR finished delta-io/delta-rs#570 right?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delta-rs's filesystem seems to will use pyarrow's fs and that can be incorporated into this method with if-else or match clause

def initialize_plugins(cls, creds: DuckDBCredentials) -> Dict[str, BasePlugin]:

also we can instantiate pyarrow.fs in current existing pattern. but I'm not familar with this repository's convention

just leaving a comment to want to be of little help..

I've never contributed to open source, is there any way I can apply if I want to contribute?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @ZergRocks,
I am not sure if i follow here. As i understand, you ask if we support custom fs which can be used in the delta-rs package
https://delta-io.github.io/delta-rs/usage/loading-table/#custom-storage-backends?
So it is currently not implemented but maybe it can be at least for begining in the read part which is done by my side, so if you want to do it i would be happy to support you. You can write me in the slack dbt chanell and we can comment on that together

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The pyarrow.fs won't be implemented, it was a placeholder for a possible implementation but we are not continuing with that anymore.

@nfoerster
Copy link

hi all, we are really into that feature. Is there a rough estimation if and when it could be merged to main branch? Thanks

@milicevica23
Copy link
Contributor Author

milicevica23 commented Jan 16, 2024

Hi @nfoerster,
Currently it is very hard to define some end data, but i am working on it when i have free time. The problem goes beyond just adding a plugin because to make it work in the current context with external materialization, there are things to refactor with other parts, and this branch can also break another functionality.
What I am doing now is trying to understand the memory management which is happening in the background in order to propose a change in the current implementation.

I also, from time to time, rebase on the main branch, and if you want, you can use this branch to try stuff out and provide feedback/use cases which are missing, but be aware that there are possibly other things broken.
Here is the example dbt project which i use while develop so there you can find some models which are writing data to Azure/local

  • Append and overwrite works
  • overwrite_partitions is in progress
  • and merge was very slow in that time when i tried it out but can be maybe faster now

I hope I will write in next few days a small proposal to refactor the current implementation so we can implement this plugin and test it.

def table_exists(table_path, storage_options):
# this is bad, i have to find the way to see if there is table behind path
try:
DeltaTable(table_path, storage_options=storage_options)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To do this operation a bit cheaper, set also without_files=True.

check_relations_equal,
run_dbt,
)
from deltalake.writer import write_deltalake

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can import it directly from deltalake


## TODO
# add partition writing
# add optimization, vacumm options to automatically run before each run ?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The plan is to make these things configurable on the delta table itself which then should handle at which interval to vacuum or optimize, similar to spark-delta.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am courious about that one is there some docu?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not yet :) Was just sharing what I was planning to add

f"Overwriting delta table under: {table_path} \nwith partition expr: {partition_expr}"
)
write_deltalake(
table_path, data, partition_filters=partition_expr, mode="overwrite"

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Partition_filters are going to be a thing of the past. We are moving to the rust engine slowly and soon there will be a predicate overwrite (in the next python 0.15.2) that is more flexible than the partition_filter overwrite that used by the pyarrow writer.

)
create_insert_partition(table_path, df, partition_dict, storage_options)
elif mode == "merge":
# very slow -> https://github.com/delta-io/delta-rs/issues/1846

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not the case anymore :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @ion-elgreco, thank you very much for all the comments; I am happy that there is somebody from the delta-rs side with some insights. I very much appreciate your help.
I will recheck this when I finish general plugin refactoring on the dbt side and start again to test with the deltalake integration.

@milicevica23
Copy link
Contributor Author

Hi @jwills,
To make some progress on this PR, I will write my current status so that you know what I think and you can provide me with some extra information/thoughts.

The main problem is that when we use the plugin export, we have an extra step where we export data to the parquet file and then load it again into memory with pandas.

I think we have to provide a direct reference to the arrow format (ideally dataset) to the underlying export library (pandas, datafusion in delta case) if this is somehow possible.

TL;DR;
Current implementation:
So what do we do now?

image

step 3.
write to storage

step 5.
sqlaclemy write
excel write

New implementation:

image

What is the problem right now?

  1. Looking into the arrow <-> duckdb connection i found out that .arrow() makes a copy, which is clear to me but the question is how to do it without a copy?
    So following https://duckdb.org/2021/12/03/duck-arrow.html i started to question the memory in the background and would like to profile the whole dbt-duckdb process
    This is my repo with some examples https://github.com/milicevica23/wtf-memory
    If you know somebody who can help me to understand how to transfer data from duckdb to arrow format with zero copy i would appreciate that a lot

Regarding other plugins i took a look into pandas <-> arrow combination https://arrow.apache.org/docs/python/pandas.html
which is not ideal but still can be better than parquet export

  1. I have a hard time imagining how we can make it so that such an export is again referencable in the downstream models. What we do right now is export to the file and provide a view over this chunk. But this is more of a workaround than a solution.
    If we imagine using the SQLalchemy plugin and looking from another perspective, it is a last-run chunk of data that is pushed and not a source from the database.
    This can be interesting if we export locally and to parquet, but pushing to some remote storage is harder to implement.
    e.g Issues when trying to connect to deltalake l-mds/demo-dbt-duckdb-delta-plugin#1

In my opinion i think we should ask around if somebody reference to something what is exported with a plugin and try to understand the use case and try to find a solution for this. I know that especially for delta plugin this feature would be nice to have it so i would think about this

Nevertheless, I think that some good architecture should not reference something that you export and if you want to use it in following steps it should be again written in the source definition

image

Next steps:

  1. I would like that we comment on the above and then implement what we agree on. If we go with arrow integration, this should be then implemented also in the excel and sqlalchemy plugin which have export part
  2. We have to think what this change means for breaking changes if there are some and prepare for that
  3. Optional: I would like to make as less data copy as possible so I would stay on track with duckdb <-> arrow <-> pandas integration, so i appreciate here any help

@jwills
Copy link
Collaborator

jwills commented Jan 31, 2024

Hey @milicevica23 apologies for the lag here, I missed this go by in my email. My limited understanding of the issue is that it sounds like the external approach-- where we write a parquet file out and then treat any subsequent references to that file as a view in DuckDB-- isn't the right thing to do for delta (and presumably Iceberg) b/c both of those tools expect to operate on something like an Arrow Table/RecordBatch instead of a set of parquet files-- is that correct, or am I missing something?

If that's the case, then I think that we shouldn't keep trying to jam the iceberg and delta stuff into the external materialization; we should just create delta and iceberg materialization strategies that are designed to work in the way that those tools need them to work. It's more work, but I don't think that trying to push the external concept beyond it's design (especially if doing so would require us to break backwards compatibility) is a good idea, and I fully expect that the Iceberg/Delta table design patterns will be common enough to deserve first-class support in dbt-duckdb.

@milicevica23
Copy link
Contributor Author

Hi @jwills,
So, this problem is not on the level of a delta or iceberg but on the general plugin level.
We have two use cases

  1. export files parquet, csv from duckdb connection
  2. use a plugin to export data to Excel, delta, and Iceberg from the duckdb connection

My understanding is that the first developed feature was 1. but with time, the 2. case was built on top of the first one. This introduced the wrong process with one unnecessary step, which was that we first export the file into the parquet and then reread it in the memory instead of giving the data directly from the duckdb over the arrow format to the exporter.
So this is conceptually wrong, but one nice feature which was luckily born from that is that you can reference exported models but with limitations e.g you can't do incremental models with plugins.
This feature should be solved with the data frame registration as we do in the read part of the plugin

I am still unsure how and what the best approach is, but we have to rethink and simplify the whole export part and combine the above 1. and 2. use cases into a unified one-plugin approach. It is not easy, but I think it is a needed step for the feature.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants