
[pyspark] Cleanup data processing. #8088

Merged
merged 3 commits into from Jul 26, 2022

Conversation

trivialfis
Member

@trivialfis trivialfis commented Jul 17, 2022

This PR does some cleanup of the data processing procedures for the newly added pyspark interface.

  • Use numpy stack for handling list of arrays.
  • Reuse concat function from dask.
  • Remove unused code.
  • Use iterator for prediction to avoid initializing xgboost model (pickle is not cheap).
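For the first bullet, here is a minimal illustration of how `numpy.stack` turns a list of equal-length per-row arrays into a single 2-D feature matrix (illustrative only, not the PR's exact code):

```python
import numpy as np

# Each element mimics one row's feature array coming out of a pandas UDF.
rows = [np.array([1.0, 2.0]), np.array([3.0, 4.0]), np.array([5.0, 6.0])]

# np.stack joins them along a new leading axis, yielding a 2-D matrix
# suitable for DMatrix construction, instead of a Python list of arrays.
features = np.stack(rows)
print(features.shape)  # (3, 2)
```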

To-dos:

  • Test for different types of inputs.

I'm not entirely sure how to work with sparse data, since it hasn't been supported yet in the current codebase.

@WeichenXu123

@trivialfis
Member Author

@wbo4958 Please upstream your work on the support of cuDF when you are available.

@wbo4958
Contributor

wbo4958 commented Jul 18, 2022

It seems this PR has changed the original way of building the DMatrix. Could you merge it first? Then I will add the whole GPU pipeline support. @trivialfis

@WeichenXu123
Contributor

@trivialfis

I'm not entirely sure how to work with sparse data since it hasn't been supported yet in current codebase.

Let me do this.
It requires a new Spark function API, unwrap_udt (to be added in Spark 3.4), which supports the pyspark.ml.linalg.SparseVector type features column.
The unwrap_udt API unwraps a pyspark.ml.linalg.SparseVector column into a struct containing the sparse vector's active-value index array, active-value array, etc., so that we can convert the struct data into a DMatrix in a Spark Python UDF.
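A stdlib-only sketch of the conversion described above: each row carries a (size, indices, values) struct (these field names are an assumption for what unwrapping a SparseVector would yield), and the rows are folded into CSR-style arrays that a DMatrix could then be built from. This is illustrative, not the actual Spark or XGBoost code:

```python
def rows_to_csr(rows):
    """Fold per-row sparse structs into CSR-style arrays.

    Each row is a (size, indices, values) tuple, mimicking the struct
    that unwrapping a pyspark.ml.linalg.SparseVector would produce.
    Returns (indptr, indices, data) as plain Python lists.
    """
    indptr, indices, data = [0], [], []
    for _size, idx, vals in rows:
        indices.extend(idx)          # column positions of active values
        data.extend(vals)            # the active values themselves
        indptr.append(len(indices))  # row boundary marker
    return indptr, indices, data

rows = [
    (4, [0, 3], [1.0, 2.0]),  # 4 features, 2 active entries
    (4, [1], [5.0]),          # 4 features, 1 active entry
]
indptr, indices, data = rows_to_csr(rows)
print(indptr)  # [0, 2, 3]
```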

@trivialfis
Member Author

Thank you for the quick reviews. ;-) This branch is still a work in progress; I'm sharing it as a sketch to discuss how we might proceed with the data processing procedures along with support for GPU. I will update the branch and remove the WIP tag once it's ready.

@WeichenXu123
Contributor

@trivialfis
It would be great if you could prioritize this PR,
so that it can unblock follow-up work, including data iterator input, external memory, sparse vector support, etc.

@trivialfis
Member Author

Apologies for the delay. I was trying to get #8050 ready and mitigate the memory usage surge with it. Will focus on this PR now.

@trivialfis trivialfis changed the title [WIP] [pyspark] Cleanup data processing. [pyspark] Cleanup data processing. Jul 21, 2022
else:
    train_data[name].append(array)

cache_partitions(iterator, append)
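The quoted fragment drains the partition iterator through an append callback. A hedged, stdlib-only sketch of the overall pattern (the helper's structure and the batch format here are assumptions, not the PR's exact code):

```python
from collections import defaultdict

def cache_partitions(iterator, append):
    """Drain a one-shot partition iterator, routing every named array
    to the append callback so all batches end up cached in memory."""
    for batch in iterator:            # e.g. pandas batches from a Spark UDF
        for name, array in batch.items():
            append(name, array)

# A minimal append that mirrors the quoted snippet: grow a per-name list.
train_data = defaultdict(list)

def append(name, array):
    train_data[name].append(array)

cache_partitions(iter([{"features": [1, 2]}, {"features": [3, 4]}]), append)
print(train_data["features"])  # [[1, 2], [3, 4]]
```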
Contributor

DeviceQuantileDMatrix supports an iterator as input, so we don't need to load the whole partition's data into memory, right?

Member Author

Writing it to disk would be quite slow, and we need to iterate through it twice to finish construction. If memory usage is prioritized over efficiency, one can choose external memory. (For GPU we still concatenate the data internally to avoid disk IO, but users can use sampling plus external memory to reduce the size for GPU hist.)

Contributor

The current code (for GPU) is:

  1. load the whole partition's data into in-memory arrays
  2. construct an iterator from the in-memory arrays from step (1)
  3. construct the DMatrix from the iterator in step (2)

What I propose is not to write data to disk, but to change steps (1) and (2) into: construct an iterator directly from the Spark Python UDF data iterator. This avoids loading the whole partition's data into memory in step (1).

We can do this improvement in follow-up PR.

Member Author

@WeichenXu123 I think the data is still loaded into memory. We need to iterate through the data twice (notice the reset function). Unless the Spark iterator can be reset and started again, we need to cache the data somewhere.

Contributor

Ah, got it. The Spark UDF data iterator does not support reset. That's bad.

Contributor

Previously I thought DMatrix would accept a one-shot iterator: it exhausts the iterator, caches all data in DMatrix format in memory, and then xgboost training can use the DMatrix data in multiple passes. Why is the DMatrix iteration API not designed this way?

Member Author

@WeichenXu123 Apologies for the confusion here. Here's a little bit of clarification on the differences between the DMatrix constructors:

  • Normal DMatrix: DMatrix(np.array(...))
    It accepts one single batch of data and constructs an internal CSR representation that can be used by all algorithms. It has two issues with distributed training: firstly, it needs to concatenate all partitions, which doubles the memory usage; secondly, it's a CSR, which can triple the memory usage when the input is dense.
  • QuantileDMatrix: This is designed to be as "inplace" as possible and can only be used by GPU hist. It omits the CSR representation and constructs the internal histogram index directly from external data (including quantilization). The iterator concept you see there is designed for a distributed system that handles data in the form of partitions; the sole purpose of accepting an iterator instead of a whole blob is to avoid concatenating the input data, which doubles the memory (concat_or_none in this PR). As a result, we cannot have an internal representation (CSR) and need to iterate through the input partitions multiple times before we can start training, since we are trying not to make any copies.
  • External memory: DMatrix(iterator). Yes, using CPU hist/approx, DMatrix caches the batches on disk and iterates through them. But it's quite slow, since we need to iterate through the data 3-4 times for each layer of the tree; you can estimate the time usage based on the throughput of your hard disk.
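The reset/iterate behavior discussed above can be illustrated in plain Python: because Spark's UDF iterator is one-shot, the batches must be cached so a QuantileDMatrix-style consumer can make multiple passes. This mirrors the shape of XGBoost's data-iterator callbacks, but it is a stdlib-only sketch, not the real DataIter API:

```python
class CachedBatchIter:
    """Resettable iterator over cached in-memory batches, mimicking the
    next/reset protocol a QuantileDMatrix-style consumer relies on."""

    def __init__(self, batches):
        self._batches = list(batches)  # cache: a Spark UDF iterator is one-shot
        self._pos = 0

    def next(self, input_data):
        """Push the next batch into `input_data`; return 0 when exhausted."""
        if self._pos >= len(self._batches):
            return 0
        input_data(self._batches[self._pos])
        self._pos += 1
        return 1

    def reset(self):
        """Rewind so the consumer can make another full pass."""
        self._pos = 0

it = CachedBatchIter([[1.0, 2.0], [3.0, 4.0]])
seen = []
while it.next(seen.append):
    pass
it.reset()  # a second pass is possible only because the batches were cached
while it.next(seen.append):
    pass
print(len(seen))  # 4
```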

Contributor

Got it.

So QuantileDMatrix does not support sparse data input?

Contributor

I would like to assign all the data-iterator-related work to you, because you have a deeper understanding of it. :)

Member Author

So QuantileDMatrix does not support sparse data input?

It will support CSR once the CPU impl is merged.

I would like to assign all the data-iterator-related work to you, because you have a deeper understanding of it. :)

Got it.

@trivialfis
Member Author

@wbo4958 @WeichenXu123 Could you please take another look?

@WeichenXu123
Contributor

Overall good. @trivialfis Have you tested whether it causes a performance regression?

)
return training_dmatrix

is_dmatrix = feature_cols is None
Contributor

Why use DeviceQuantileDMatrix only when feature_cols is not None?

Contributor

Whether to use DeviceQuantileDMatrix should be controlled by the use_gpu param, if I understand correctly?

Member Author

So far QuantileDMatrix is GPU-only until #8050 is merged. feature_cols will also be a GPU-only thing for the foreseeable future. (Actually, I'm not entirely sure about the feature_cols parameter that's available in the JVM packages; I don't know how it works with the Spark ML pipeline. I will leave these questions to @wbo4958.)

Contributor

We don't need to support feature_cols (multiple feature columns) in the xgboost pyspark estimator. The transformers in a Spark pipeline can assemble multiple feature columns into one vector-type feature column.

Member Author

I think @wbo4958 wanted to avoid the vector assembler, since xgboost needs to undo it (the stack_series). He wanted the input data to be just like other Python libraries', with one column per feature. I think it's a reasonable optimization looking solely at XGBoost, but it might not play well with Spark ML pipelines.

Member Author

Got it, I will remove it. cc @wbo4958 .

Contributor

@WeichenXu123, yeah, it indeed does not play well with some Spark ML pipelines, but we can give users a warning about that. Some users may not use the Spark ML pipeline or meta-estimators at all; they may just want to train an XGBoost model. I think we should provide this.

Contributor

@wbo4958
OK. But I doubt that "1 column per feature" in a Spark dataframe, passing many columns to a Python UDF, would have good performance when there are many features. We'd better benchmark it.

Contributor

@WeichenXu123, yes, we will do some benchmark testing. BTW, may I ask why passing many columns to a Python UDF might not have good performance when there are many features? Is the penalty happening on the final data size of the ArrowRecordBatch?

Contributor

is the penalty happening on the final data size of ArrowRecordBatch?

Yes, that's my concern. But I am not an expert on this; maybe I am wrong. Let's benchmark.

Contributor

@WeichenXu123 WeichenXu123 left a comment

LGTM

Contributor

@wbo4958 wbo4958 left a comment

LGTM

else:
    cache_partitions(iterator, append_dqm)
    it = PartIter(train_data, True)
    dtrain = DeviceQuantileDMatrix(it, **kwargs)
Contributor

It seems DeviceQuantileDMatrix needs the extra "max_bin", while kwargs does not contain it. But it's OK to file a follow-up for this.

Contributor

Let's mark a TODO there, to prevent forgetting it.

Contributor

@wbo4958 @trivialfis Does DMatrix need the max_bin param?
And could you help me check whether DMatrix needs any other param settings? I might have missed some params in my PR.

Member Author

It doesn't. However, #8087 needs a fix.

- Use numpy stack for handling list of arrays.
- Reuse concat function from dask.
- Prepare for `QuantileDMatrix`.
- Remove unused code.
- Use iterator for prediction to avoid initializing xgboost model
@WeichenXu123
Contributor

If feature_cols is used, we should check that this only supports GPU (e.g., the use_gpu param must be True). @wbo4958 You should add this logic in a follow-up PR.

@WeichenXu123
Contributor

@trivialfis Have you tested whether it causes a performance regression? The code has big changes.

@trivialfis
Member Author

Have you tested whether it causes a performance regression? The code has big changes.

There's no performance change. Actually, I was hoping numpy could deliver something faster than a Python list. It seems not much can be done there.

I implemented a very primitive pyspark interface for xgboost for self-education purposes before. I handled the features column by expanding it into multiple columns first before passing it to the local fit function, but it handles only vector and array. Wondering if it's a possible future change.

@WeichenXu123
Contributor

I handled the features column by expanding it into multiple columns first before passing it to the local fit function

I guess it might not increase performance, but it has a risk of degrading performance.

@wbo4958
Contributor

wbo4958 commented Jul 26, 2022

If feature_cols is used, we should check that this only supports GPU (e.g., the use_gpu param must be True). @wbo4958 You should add this logic in a follow-up PR.

Sure. Actually, I already have the follow-up PR up.

@trivialfis
Member Author

I guess it might not increase performance, but it has a risk of degrading performance.

I thought Spark might handle a large number of samples faster than Python. A Python list is usually really slow compared to optimized procedures.
