
[pyspark] SparkXGBRanker does not work on dataframe with multiple partitions #8491

Closed · tracykyle93 opened this issue Nov 28, 2022 · 10 comments · Fixed by #8497

@tracykyle93 (Author) commented Nov 28, 2022

I have run into the same issue described in this discussion forum thread: https://discuss.xgboost.ai/t/sparkxgbranker-does-not-work-on-parallel-workers/2986. Since I could not find a corresponding issue on GitHub, I am opening one here.

"SparkXGBRanker from 1.7.0 release requires data to be sorted by qid. It works fine if we have one worker and sorted dataframe. However with multiple workers data comes to them unordered and raises exception:

org.apache.spark.api.python.PythonException: 'xgboost.core.XGBoostError: [17:46:40] …/src/data/data.cc:486: Check failed: non_dec: qid must be sorted in non-decreasing order along with data.

...

Is there any ways to prepare df in sorted order for workers? Or sorting should be done on each worker?"

One trivial workaround: given a df with ['qid', 'label', 'features'],

```python
df = df.repartition(1)
df = df.sort(df.qid.asc())
```

Repartitioning to a single partition and sorting lets SparkXGBRanker run without any error as far as I have explored, but such an expensive operation really slows down the total processing time. Could you add support for this case? Thanks in advance!

@WeichenXu123 (Contributor) commented Nov 29, 2022

@tracykyle93
Does spark_df.sortWithinPartitions work? You shouldn't need to call repartition(1).

@trivialfis
For the error qid must be sorted in non-decreasing order along with data: does it only require sorting within the local DMatrix data, or does it require sorting globally across all distributed workers' DMatrix data?
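(For reference, a minimal sketch of the suggestion above, assuming a dataframe df with a qid column: sortWithinPartitions orders rows inside each existing partition and avoids the full shuffle that repartition(1) forces.)

```python
# Sort by qid inside each partition; no global shuffle, unlike
# df.repartition(1).sort(df.qid.asc()), which funnels all rows through one partition.
df_sorted = df.sortWithinPartitions("qid", ascending=True)
```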

@trivialfis (Member)

Local is sufficient.
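To illustrate what "local" means here: with the core (non-Spark) API, each DMatrix only needs its own rows ordered by qid; there is no constraint across DMatrix instances. A minimal sketch with made-up data:

```python
import numpy as np
import xgboost as xgb

rng = np.random.default_rng(0)
X = rng.normal(size=(9, 3))                   # toy feature matrix
y = rng.integers(0, 3, size=9)                # toy relevance labels
qid = np.array([2, 0, 1, 0, 2, 1, 0, 1, 2])   # unsorted query ids

# Sort rows so qid is non-decreasing within this (local) DMatrix.
order = np.argsort(qid, kind="stable")
dtrain = xgb.DMatrix(X[order], label=y[order], qid=qid[order])

booster = xgb.train({"objective": "rank:pairwise"}, dtrain, num_boost_round=5)
```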

@tracykyle93 (Author) commented Nov 29, 2022

Hi @WeichenXu123 @trivialfis, thanks for the quick response!

However, spark_df.sortWithinPartitions does not work. Code to reproduce the issue:

```python
from xgboost.spark import SparkXGBRanker
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession

sparkSession = (
    SparkSession.builder
    .config("spark.dynamicAllocation.enabled", "false")
    .appName("clean-up")
    .enableHiveSupport()
    .getOrCreate()
)

df_train = sparkSession.createDataFrame(
    [
        (Vectors.dense(1.0, 2.0, 3.0), 0, 0),
        (Vectors.dense(4.0, 5.0, 6.0), 1, 0),
        (Vectors.dense(9.0, 4.0, 8.0), 2, 0),
        (Vectors.sparse(3, {1: 1.0, 2: 5.5}), 0, 1),
        (Vectors.sparse(3, {1: 6.0, 2: 7.5}), 1, 1),
        (Vectors.sparse(3, {1: 8.0, 2: 9.5}), 2, 1),
        (Vectors.dense(1.0, 2.0, 3.0), 0, 2),
        (Vectors.dense(4.0, 5.0, 6.0), 1, 2),
        (Vectors.dense(9.0, 4.0, 8.0), 2, 2),
        (Vectors.sparse(3, {1: 1.0, 2: 5.5}), 0, 3),
        (Vectors.sparse(3, {1: 6.0, 2: 7.5}), 1, 3),
        (Vectors.sparse(3, {1: 8.0, 2: 9.5}), 2, 3),
        (Vectors.dense(1.0, 2.0, 3.0), 0, 4),
        (Vectors.dense(4.0, 5.0, 6.0), 1, 4),
        (Vectors.dense(9.0, 4.0, 8.0), 2, 4),
        (Vectors.sparse(3, {1: 1.0, 2: 5.5}), 0, 5),
        (Vectors.sparse(3, {1: 6.0, 2: 7.5}), 1, 5),
        (Vectors.sparse(3, {1: 8.0, 2: 9.5}), 2, 5),
        (Vectors.dense(1.0, 2.0, 3.0), 0, 6),
        (Vectors.dense(4.0, 5.0, 6.0), 1, 6),
        (Vectors.dense(9.0, 4.0, 8.0), 2, 6),
        (Vectors.sparse(3, {1: 1.0, 2: 5.5}), 0, 7),
        (Vectors.sparse(3, {1: 6.0, 2: 7.5}), 1, 7),
        (Vectors.sparse(3, {1: 8.0, 2: 9.5}), 2, 7),
        (Vectors.dense(1.0, 2.0, 3.0), 0, 8),
        (Vectors.dense(4.0, 5.0, 6.0), 1, 8),
        (Vectors.dense(9.0, 4.0, 8.0), 2, 8),
        (Vectors.sparse(3, {1: 1.0, 2: 5.5}), 0, 9),
        (Vectors.sparse(3, {1: 6.0, 2: 7.5}), 1, 9),
        (Vectors.sparse(3, {1: 8.0, 2: 9.5}), 2, 9),
    ],
    ["features", "label", "qid"],
)

print("partition number of df_train -- {}".format(df_train.rdd.getNumPartitions()))

df_train = df_train.sortWithinPartitions("qid", ascending=True)

ranker = SparkXGBRanker(qid_col="qid")
model = ranker.fit(df_train)
model.transform(df_train).show()
```

Error log:

    partition number of df_train -- 2
    xgboost.core.XGBoostError: [20:33:28] ../src/data/data.cc:486: Check failed: non_dec: qid must be sorted in non-decreasing order along with data.

Sorting only the local DMatrix data (within partitions) would definitely be more efficient; looking forward to a fix for this issue.

@wbo4958 (Contributor) commented Nov 30, 2022

Yeah, I can reproduce it following #8491 (comment).

I will provide a fix for it.

@WeichenXu123 (Contributor)

@wbo4958 Thanks!

@WeichenXu123 (Contributor)

@trivialfis
Another question: does it require that each group's data exist in only one worker's DMatrix?

@trivialfis (Member) commented Nov 30, 2022

No. We calculate the gradient based on the query group, and gradient calculation is not distributed; only the final histogram bins are synchronized. Is it possible that a better-organized group with a larger amount of data could bring better accuracy via better gradients? Maybe, but it's not required.
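As a greatly simplified illustration of that point (a toy sketch, not XGBoost's actual implementation): in pairwise ranking objectives the gradient of each row depends only on other rows sharing the same qid, so every group can be processed independently wherever its rows happen to live.

```python
import numpy as np

def toy_pairwise_gradients(scores, labels, qid):
    """Toy RankNet-style gradients: pairs are formed only within a qid group,
    so no information is needed from rows belonging to other groups."""
    grad = np.zeros_like(scores, dtype=float)
    for q in np.unique(qid):
        idx = np.where(qid == q)[0]
        for i in idx:
            for j in idx:
                if labels[i] > labels[j]:
                    # Logistic pairwise loss on the score difference s_i - s_j.
                    p = 1.0 / (1.0 + np.exp(scores[i] - scores[j]))
                    grad[i] -= p
                    grad[j] += p
    return grad
```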

@wbo4958 (Contributor) commented Nov 30, 2022

@tracykyle93 Could you help verify #8497? BTW, `df_train = df_train.sortWithinPartitions("qid", ascending=True)` is no longer needed.
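Conceptually (a sketch of the idea only, not necessarily the exact code in #8497), the training task can sort the rows it received by qid before building its local DMatrix, which removes the need for a user-side sortWithinPartitions. The helper below is hypothetical and assumes a pandas partition with dense feature arrays:

```python
import numpy as np
import pandas as pd
import xgboost as xgb

def build_local_dmatrix(pdf: pd.DataFrame) -> xgb.DMatrix:
    """Sort one partition's rows by qid so the local DMatrix sees a
    non-decreasing qid column, regardless of the order Spark delivered them."""
    pdf = pdf.sort_values("qid", kind="stable")
    features = np.stack(pdf["features"].to_numpy())  # assumes dense feature arrays
    return xgb.DMatrix(features, label=pdf["label"].to_numpy(), qid=pdf["qid"].to_numpy())
```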

@tracykyle93 (Author)

Thanks for the hotfix @wbo4958. I wanted to test your change, but I cannot build from source on the hotfix branch; I have posted the question in the discussion forum: https://discuss.xgboost.ai/t/fail-to-build-from-source-on-hotfix-branch/3015

@wbo4958 (Contributor) commented Dec 1, 2022

@tracykyle93 I can reproduce it. Could you file a separate issue for xgboost? BTW, as a workaround, please use the newest pyspark, e.g. `pip install pyspark`.
