
[pyspark] SparkXGBClassifier failed to train with early_stopping_rounds and validation_indicator_col #8221

Closed
faaany opened this issue Sep 2, 2022 · 53 comments · Fixed by #8231 or #8245


faaany commented Sep 2, 2022

Following the official SparkXGBClassifier example shown here, I tried to run XGBoost training on my own dataset and got the following error:

22/09/02 17:05:15 ERROR PythonRunner: Python worker exited unexpectedly (crashed)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/spark-3.3.0-bin-hadoop3/python/lib/pyspark.zip/pyspark/worker.py", line 666, in main
    eval_type = read_int(infile)
  File "/opt/spark-3.3.0-bin-hadoop3/python/lib/pyspark.zip/pyspark/serializers.py", line 595, in read_int
    raise EOFError
EOFError
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)
        at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:765)
        at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:747)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
        at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
        at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
        at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
        at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
        at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
        at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
        at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
        at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
        at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
        at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
        at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1021)
        at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2268)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:136)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/xgboost/spark/core.py", line 763, in _train_booster
    **train_call_kwargs_params,
  File "/usr/local/lib/python3.7/site-packages/xgboost/spark/data.py", line 245, in create_dmatrix_from_partitions
    cache_partitions(iterator, append_fn)
  File "/usr/local/lib/python3.7/site-packages/xgboost/spark/data.py", line 62, in cache_partitions
    make_blob(valid, True)
  File "/usr/local/lib/python3.7/site-packages/xgboost/spark/data.py", line 40, in make_blob
    append(part, alias.data, is_valid)
  File "/usr/local/lib/python3.7/site-packages/xgboost/spark/data.py", line 179, in append_m
    array = stack_series(array)
  File "/usr/local/lib/python3.7/site-packages/xgboost/spark/data.py", line 16, in stack_series
    array = np.stack(array)
  File "<__array_function__ internals>", line 6, in stack
  File "/usr/local/lib/python3.7/site-packages/numpy/core/shape_base.py", line 422, in stack
    raise ValueError('need at least one array to stack')
ValueError: need at least one array to stack

Here is my code:

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from pyspark.ml.feature import VectorAssembler
from xgboost.spark import SparkXGBClassifier

spark = SparkSession.builder.master('local[*]')\
        .appName("xgboost_train")\
        .config("spark.driver.memory", '300g')\
        .config("spark.local.dir", "/mnt/spark")\
        .getOrCreate()
train = spark.read.parquet(f'{train_data_path}/*').withColumn('isVal', lit(False))
valid = spark.read.parquet(f'{valid_data_path}/*').withColumn('isVal', lit(True))
data = train.union(valid)
data = data.withColumnRenamed(name, 'label')  # `name` holds the original label column name
feature_list = [...]  # a list with over 140 feature names
vector_assembler = VectorAssembler()\
                            .setInputCols(feature_list)\
                            .setOutputCol("features")
data_trans = vector_assembler.setHandleInvalid("keep").transform(data)
xgb_classifier = SparkXGBClassifier(max_depth=5, missing=0.0, eval_metric='logloss', early_stopping_rounds=1, validation_indicator_col='isVal')
xgb_clf_model = xgb_classifier.fit(data_trans)

The code runs through when I don't specify the validation_indicator_col in SparkXGBClassifier. With the same configuration, the official example code works fine. I also checked the data types of both datasets: my isVal and features columns have the same data types as the example data frame.

Here is my environment setup:

  • pyspark=3.3.0
  • xgboost=2.0.0-dev

I built xgboost from source and installed pyspark using a standard pip install pyspark.

If more info is needed from my side, please let me know. Thanks!!


faaany commented Sep 2, 2022

@WeichenXu123 Could you please help review this issue? I'd be really grateful for your help!

trivialfis (Member) commented:

line 422, in stack raise ValueError('need at least one array to stack') ValueError: need at least one array to stack

@WeichenXu123 Is it possible to have zero data returned from the iterator?

WeichenXu123 (Contributor) commented:

@trivialfis

Is the error caused by an empty partition input? I haven't taken a deep look yet and I am a bit confused.

trivialfis (Member) commented:

Just my guess based on the error message: need at least one array to stack.


faaany commented Sep 5, 2022

I have a train folder and a valid folder. Both contain over 100 parquet files. For testing, I randomly selected 2 parquet files from each folder to run the code. This time, I changed the parquet files used for testing and the Python version. I got a new error as follows:

[15:01:31] task 0 got new rank 0                                    (0 + 1) / 1]
22/09/05 15:01:35 ERROR PythonRunner: Python worker exited unexpectedly (crashed)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/spark-3.3.0-bin-hadoop3/python/lib/pyspark.zip/pyspark/worker.py", line 666, in main
    eval_type = read_int(infile)
  File "/opt/spark-3.3.0-bin-hadoop3/python/lib/pyspark.zip/pyspark/serializers.py", line 595, in read_int
    raise EOFError
EOFError
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)
        at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:765)
        at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:747)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
        at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
        at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
        at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
        at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
        at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
        at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
        at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
        at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
        at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
        at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
        at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1021)
        at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2268)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:136)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/xgboost/spark/core.py", line 609, in _train_booster
    booster = worker_train(
  File "/usr/local/lib/python3.8/site-packages/xgboost/spark/data.py", line 156, in _convert_partition_data_to_dmatrix
    train_val_data = _prepare_train_val_data(
  File "/usr/local/lib/python3.8/site-packages/xgboost/spark/data.py", line 19, in _prepare_train_val_data
    return _process_data_iter(
  File "/usr/local/lib/python3.8/site-packages/xgboost/spark/data.py", line 136, in _process_data_iter
    return _row_tuple_list_to_feature_matrix_y_w(
  File "/usr/local/lib/python3.8/site-packages/xgboost/spark/data.py", line 73, in _row_tuple_list_to_feature_matrix_y_w
    num_feature_dims = len(pdf["values"].values[0])
IndexError: index 0 is out of bounds for axis 0 with size 0


faaany commented Sep 5, 2022

Do I need to have a specific version of Python or PySpark or Spark to run distributed xgboost training on spark using the new pyspark API?

WeichenXu123 (Contributor) commented:

I will take a look; maybe the dataset repartition has some issue.
@faaany Could you try manually repartitioning the dataset into num_workers partitions? Like:

train_df = spark.read.load(...).repartition(num_workers)


faaany commented Sep 6, 2022

@WeichenXu123 Awesome! thanks!

I am not sure whether num_workers has an effect since I am using Spark local mode, but I tried it anyway with DATAFRAME.repartition(160) and the error remains. Another observation: df_train.rdd.getNumPartitions() returns 160 with the example dataset and only 70 with my own dataset.

Now I will try my program on a 2-node setup and will update you if I observe any new findings.


faaany commented Sep 6, 2022

Now I am running my code on a 2-node standalone spark cluster. With the example code, df_train.rdd.getNumPartitions() returns 2, while it still returns 70 with my own dataset. Even after repartitioning with data_trans = data_trans.repartition(2), the error persists. Both my data_trans and df_train have the data type pyspark.sql.dataframe.DataFrame.

Might it be related to the way I read my data, because I am using spark.read.parquet(f'{train_data_path}/*') instead of directly defining a dataframe with spark.createDataFrame?

WeichenXu123 (Contributor) commented:

@faaany
Do you mean that on your dataset df_train.repartition(2).rdd.getNumPartitions() returns 70?
That's weird.


faaany commented Sep 7, 2022

@WeichenXu123 no, 70 is the default; I tried repartitioning to 2, but the error doesn't disappear.

WeichenXu123 (Contributor) commented:

@faaany Understood. The auto repartition logic in xgboost spark has some issue.

WeichenXu123 (Contributor) commented:

@faaany Could you provide your Spark version?


faaany commented Sep 7, 2022

@WeichenXu123 spark-3.3.0 is my version.

WeichenXu123 (Contributor) commented:

@faaany
Could you try setting the force_repartition=True argument on the xgboost spark estimator?
Does it also fail?

If so, this should be a Spark dataframe repartition imbalance issue: in some cases, after repartitioning, some partitions become empty.


faaany commented Sep 7, 2022

@WeichenXu123 Yes, it failed with the same error and I got this message:

You set force_repartition to true when there is no need for a repartition.Therefore, that parameter will be ignored.

I also tried loading my data into HDFS and letting pyspark read the data directly from HDFS, but that doesn't fix the error either.


faaany commented Sep 7, 2022

When I look into my Spark web UI, the Spark job fails at this stage:

[javaToPython at NativeMethodAccessorImpl.java:0]
org.apache.spark.sql.Dataset.javaToPython(Dataset.scala:3679)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
py4j.Gateway.invoke(Gateway.java:282)
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
py4j.commands.CallCommand.execute(CallCommand.java:79)
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
py4j.ClientServerConnection.run(ClientServerConnection.java:106)
java.lang.Thread.run(Thread.java:748)

WeichenXu123 (Contributor) commented:

Got it, I will file a PR soon.


WeichenXu123 commented Sep 8, 2022

@faaany I filed a fixing PR; could you help test it? #8231

You need to test with:

an input dataset without repartitioning, with num_workers set both > 1 and == 1


faaany commented Sep 8, 2022

@WeichenXu123 no, the error persists. But I noticed that the stage
javaToPython at NativeMethodAccessorImpl.java:0 succeeds and it fails in the next stage, right before it starts to train (I suppose). Here is the log:

Traceback (most recent call last):
  File "train_stage1_spark.py", line 145, in <module>
    xgb_clf_model = xgb_classifier.fit(data_trans)
  File "/opt/spark-3.3.0-bin-hadoop3/python/lib/pyspark.zip/pyspark/ml/base.py", line 205, in fit
  File "/usr/local/lib/python3.8/site-packages/xgboost/spark/core.py", line 801, in _fit
    (config, booster) = _run_job()
  File "/usr/local/lib/python3.8/site-packages/xgboost/spark/core.py", line 792, in _run_job
    dataset.mapInPandas(
  File "/opt/spark-3.3.0-bin-hadoop3/python/lib/pyspark.zip/pyspark/rdd.py", line 1197, in collect
  File "/opt/spark-3.3.0-bin-hadoop3/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
  File "/opt/spark-3.3.0-bin-hadoop3/python/lib/pyspark.zip/pyspark/sql/utils.py", line 190, in deco
  File "/opt/spark-3.3.0-bin-hadoop3/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Could not recover from a failed barrier ResultStage. Most recent failure reason: Stage failed because barrier task ResultTask(16, 0) finished unsuccessfully.
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/xgboost/spark/core.py", line 773, in _train_booster
    booster = worker_train(
  File "/usr/local/lib/python3.8/site-packages/xgboost/spark/data.py", line 243, in create_dmatrix_from_partitions
    cache_partitions(iterator, append_fn)
  File "/usr/local/lib/python3.8/site-packages/xgboost/spark/data.py", line 62, in cache_partitions
    make_blob(valid, True)
  File "/usr/local/lib/python3.8/site-packages/xgboost/spark/data.py", line 40, in make_blob
    append(part, alias.data, is_valid)
  File "/usr/local/lib/python3.8/site-packages/xgboost/spark/data.py", line 179, in append_m
    array = stack_series(array)
  File "/usr/local/lib/python3.8/site-packages/xgboost/spark/data.py", line 16, in stack_series
    array = np.stack(array)
  File "<__array_function__ internals>", line 180, in stack
  File "/usr/local/lib/python3.8/site-packages/numpy/core/shape_base.py", line 422, in stack
    raise ValueError('need at least one array to stack')
ValueError: need at least one array to stack
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)
        at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:101)
        at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:50)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at org.apache.spark.ContextAwareIterator.hasNext(ContextAwareIterator.scala:39)
        at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:86)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:80)
        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:307)
        at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:732)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:438)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2066)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:272)
        at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:2111)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2857)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2249)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2268)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2293)
        at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1021)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:1020)
        at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:180)
        at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.lang.Thread.run(Thread.java:748)

WeichenXu123 (Contributor) commented:

@faaany Strange... What num_workers did you set? And could you provide the record count of every partition in your input dataframe?


faaany commented Sep 8, 2022

@WeichenXu123 these are my Spark configurations:

spark = SparkSession.builder.master(f'spark://{master_name}:7077')\
        .appName("xgboost_train")\
        .config("spark.driver.memory", '30g')\
        .config("spark.local.dir", "/mnt/tmp/spark")\
        .config("spark.executor.memory", "30g")\
        .config("spark.executor.memoryOverhead", "20g")\
        .config("spark.executor.cores", "8")\
        .config("spark.executor.instances","16")\
        .config("spark.task.cpus","8")\
        .getOrCreate()

How can I count the records in each partition in Spark? This is how I read my data:

def read_data(data_path):

    train = spark.read.parquet(data_path+'train/').withColumn('isVal', lit(False))
    print(train.count(), len(train.columns))
    
    valid = spark.read.parquet(data_path+'valid/').withColumn('isVal', lit(True))
    print(valid.count(), len(valid.columns))
    
    data = train.union(valid) 

    print(data.count(), len(data.columns))

    return data
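For the per-partition record counts, one minimal sketch I could run (assuming `data` is the dataframe returned by read_data) would be:

from pyspark.sql.functions import spark_partition_id

# count the records that land in each partition
data.withColumn("pid", spark_partition_id()).groupBy("pid").count().show()

# alternatively, via the RDD API
print(data.rdd.glom().map(len).collect())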

Then I just pass the data to the vector assembler and start training. I tried to set num_workers to 1 and to 16:

vector_assembler = VectorAssembler()\
                            .setInputCols(feature_list[i])\
                            .setOutputCol("features")

data_trans = vector_assembler.setHandleInvalid("skip").transform(data)

xgb_classifier = SparkXGBClassifier(num_workers=1, max_depth=5, missing=0.0, label_col=name, validation_indicator_col='isVal', early_stopping_rounds=1, eval_metric='logloss')
        
xgb_clf_model = xgb_classifier.fit(data_trans)

BTW, the objective in my case is "binary: logistic", right? I somehow can't set the objective in the estimator.


wbo4958 commented Sep 9, 2022

@faaany Would you have a minimal dataset to repro it?


faaany commented Sep 9, 2022

@wbo4958 I will try a public dataset, see whether I could reproduce it, and post my code here.


wbo4958 commented Sep 9, 2022

@faaany thx

WeichenXu123 (Contributor) commented:

@faaany

I updated my PR #8231 to add debugging logs.
Could you run your failure case on my latest PR and then copy the output log to me?
The related logs I added start with the "debug-repartition" prefix.


WeichenXu123 commented Sep 10, 2022

Emm, seemingly I can reproduce it; I am still investigating the root cause.

df1 = spark.createDataFrame(sc.parallelize([(i,) for i in range(100)], 50))

def mapper1(iter):
  yield [r._1 for r in iter]


# The result shows that it generates 2 empty partitions.
df1.repartition(4).rdd.mapPartitions(mapper1).collect() 

@trivialfis Does DMatrix allow empty partition input (for distributed training)?


trivialfis commented Sep 10, 2022

Yes, you can see tests around empty DMatrix in the dask tests. It takes some care to make sure that everything is correctly synchronized, like the device ID and the number of columns.
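For illustration, a minimal sketch of such an empty DMatrix (zero rows but a known number of columns); this is only an assumption of what those tests exercise, not a quote from them:

import numpy as np
import xgboost as xgb

# an "empty" DMatrix: 0 rows, 3 columns
empty = xgb.DMatrix(np.empty((0, 3)), label=np.empty((0,)))
print(empty.num_row(), empty.num_col())  # expected: 0 3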


faaany commented Sep 10, 2022

@WeichenXu123 My machines are somehow not reachable over the weekend... I will give you updates as soon as I can log in to them.


wbo4958 commented Sep 10, 2022

Right,

df1 = spark.createDataFrame(sc.parallelize([(i,) for i in range(100)], 50))

def mapper1(iter):
  yield [r._1 for r in iter]

# The result shows that it generates 2 empty partitions.
df1.repartition(4).rdd.mapPartitions(mapper1).collect() 

It indeed generates empty partitions after repartition. I just debugged it and found that RoundRobin only ensures that records are distributed evenly within the same source partition; it does not guarantee balance across partitions.

In this case, there are 50 source partitions and each partition holds only 2 elements, but when choosing the random position (to begin the round robin), every partition happens to start with position=2.

      case RoundRobinPartitioning(numPartitions) =>
        // Distributes elements evenly across output partitions, starting from a random partition.
        // In this case, the position will always be 2, when the total partition is 50,
        var position = new Random(TaskContext.get().partitionId()).nextInt(numPartitions)  
        (row: InternalRow) => {
          // The HashPartitioner will handle the `mod` by the number of partitions
          position += 1
          position
        }
(1 to 200).foreach(i => print(new Random(i).nextInt(4) + " "))   // all to print is 2

Let's go back to this case:

consider partition 0, whose elements are [0, 1]. When the shuffle writes, element 0 gets key (position + 1) = 2 + 1 = 3, and 3 % 4 = 3; element 1 gets key (position + 1) = 3 + 1 = 4, and 4 % 4 = 0.

The same calculation applies to all the other partitions, since the start position is always 2 in this case.
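A minimal pure-Python simulation of that assignment (assuming, as observed above, that every source partition starts at position 2):

from collections import Counter

num_source_partitions = 50   # 100 rows parallelized into 50 partitions, 2 rows each
num_target_partitions = 4
start_position = 2           # observed value of new Random(partitionId).nextInt(4)

target_counts = Counter()
for _ in range(num_source_partitions):
    position = start_position
    for _ in range(2):        # each source partition holds 2 rows
        position += 1
        target_counts[position % num_target_partitions] += 1

print(dict(target_counts))   # {3: 50, 0: 50} -> target partitions 1 and 2 stay empty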

@WeichenXu123 Do you think we need to file a PR for Spark to improve this? I'd love to file the Spark PR.

WeichenXu123 (Contributor) commented:

@wbo4958 I'd appreciate it if you can file the PR in Spark. :)

WeichenXu123 (Contributor) commented:

@wbo4958 As a workaround fix, shall we repartition on a rand() column in xgboost spark? i.e. what I did in this PR: #8231
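Roughly, the workaround would look like the following sketch (illustrative only, not the exact code in #8231; the _rand column name and num_workers value are placeholders):

from pyspark.sql import functions as F

num_workers = 4
balanced_df = (
    data_trans
    .withColumn("_rand", F.rand(seed=42))   # fixed seed keeps the shuffle deterministic
    .repartition(num_workers, "_rand")      # hash-partition on the random column
    .drop("_rand")
)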


wbo4958 commented Sep 13, 2022

Yeah, rand() can fix it in this case, but Rand seems to be a nondeterministic expression, which means the trained model may be different each time. I really don't like this since it's hard to debug when an issue happens, and Rand seems to introduce some extra XORShiftRandom computation; I don't know whether it will hurt performance.

For these special cases, could we instead suggest that users adjust their input partition numbers via configuration, like the max partition size and so on?
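For example, the partition count produced when reading parquet could be nudged with something like this sketch (illustrative value; not a confirmed fix for this issue):

spark = (SparkSession.builder
         # smaller value -> more, smaller input partitions when reading files
         .config("spark.sql.files.maxPartitionBytes", "64MB")
         .getOrCreate())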


wbo4958 commented Sep 13, 2022

Hi @faaany,

I re-read this issue,

The code runs through when I don't specify the validation_indicator_col in SparkXGBClassifier.

Does that mean that without setting validation_indicator_col your test can pass, and that it fails if you specify validation_indicator_col? If so, the issue may not be related to "repartition".


wbo4958 commented Sep 13, 2022

Hi @faaany, it would be better if you could provide some fake data; that would really help in debugging this issue.

WeichenXu123 (Contributor) commented:

@wbo4958

Yeah, rand() can fix it in this case but Seems Rand is a Nondeterministic expression

Not true; we can set a seed to make it deterministic, see my PR #8231.

Does that mean without setting validation_indicator_col in your case, the test can pass? And it will fail if you specified validation_indicator_col? If that so, the issue may be not related with "repartition"

No. It is still the same repartition issue. If validation_indicator_col is set, then within each Spark dataframe partition we filter by the validation_indicator_col value and generate 2 DMatrix objects (train and validation). This changes the example count of each DMatrix partition.
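A rough sketch of that per-partition split (illustrative only, not the actual xgboost.spark code; pdf stands for one pandas batch of a partition):

import pandas as pd

def split_partition(pdf: pd.DataFrame, val_col: str = "isVal"):
    train_part = pdf[~pdf[val_col]]
    valid_part = pdf[pdf[val_col]]
    # either slice can be empty for a given partition, which is what later
    # triggers the "need at least one array to stack" error
    return train_part, valid_part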


wbo4958 commented Sep 13, 2022

@WeichenXu123, that's good. BTW, does it cause any data skew? It would be better to add a parameter to control this.

WeichenXu123 (Contributor) commented:

does it cause any data skew

Random shuffle should not cause data skew.

It's better to add a parameter to control this

Sounds good. Shall we make random shuffle the default?


faaany commented Sep 13, 2022

@wbo4958 Yes, you are right. Setting validation_indicator_col for my dataset causes the error. And sure, since my machines are back now, I will provide a mini-example today so that you guys can better debug the issue.


wbo4958 commented Sep 13, 2022

@faaany, I can repro this issue locally and am debugging it.


faaany commented Sep 13, 2022

@wbo4958 awesome!! thanks!!


wbo4958 commented Sep 13, 2022

After debugging, I found there may be 2 issues.

issue 1

When specifying the validation col, it is possible that a partition has no training data or no validation data. In that case, it causes the "need at least one array to stack" error when stacking the array, since we didn't check whether the pd.DataFrame (pd.Series) has any values. This issue can be reproduced with the code below:

from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession
from xgboost.spark import SparkXGBClassifier

spark = (SparkSession.builder.master('local[2]')
         .appName("xgboost_train")
         .config("spark.driver.memory", '10g')
         .config("spark.local.dir", "/tmp/spark")
         .getOrCreate()
         )
df_train = spark.createDataFrame([
    (Vectors.dense(10.1, 11.2, 11.3), 0, False, 1.0),
    (Vectors.dense(1, 1.2, 1.3), 1, False, 2.0),
    (Vectors.dense(14.0, 15.0, 16.0), 0, False, 1.0),
    (Vectors.dense(1.1, 1.2, 1.3), 1, True, 2.0),   # the only validation row
], ["features", "label", "val_col", "weight_col"])
print(f"num ---- {df_train.rdd.getNumPartitions()}")
xgb_classifier = SparkXGBClassifier(
    n_estimators=100,
    num_workers=2,
    min_child_weight=0.0, reg_alpha=0, reg_lambda=0,
    validation_indicator_col='val_col',
)
xgb_clf_model = xgb_classifier.fit(df_train)
xgb_clf_model.get_booster().save_model("/tmp/xyzb.json")
xgb_clf_model.transform(df_train).show()

issue 2

A whole partition can have no data after df.repartition(); see #8221 (comment), this is true. But this is really a Spark issue; I filed the JIRA https://issues.apache.org/jira/browse/SPARK-40407 and the PR apache/spark#37855. However, XGBoost itself supports an empty DMatrix on some workers, so we can also fix this on the XGBoost side and make XGBoost more robust.

The PR is only ready for constructing the normal DMatrix; I haven't tested it for qdm and sparse matrices yet: #8245. @faaany Could you have a chance to test it on your real case?

WeichenXu123 (Contributor) commented:

Thanks @wbo4958

I propose the following fix:

(1) Repartition on a rand column (but we can fix the rand seed)
(2) Allow empty DMatrix partition input
(3) When the validation col is specified, force repartitioning (because the user might union the train and validation datasets; without repartitioning, the train/validation data is unbalanced across the spark dataframe partitions)


wbo4958 commented Sep 13, 2022

@WeichenXu123, good suggestion. My PR just fixed (2); can you add (3) in your PR?


WeichenXu123 commented Sep 14, 2022

@wbo4958 My PR already fixed (3): #8231


faaany commented Oct 13, 2022

@wbo4958 @WeichenXu123
Hi guys, sorry for my late response to this thread. I finally had the chance to try the new code and it works well!!

I just had one problem when trying to get the training loss using this:

results = {}
xgb_classifier = SparkXGBClassifier(**xgb_parms, validation_indicator_col='isVal', evals_result=results, verbose_eval=25, num_workers=16, n_estimators=250, early_stopping_rounds=25, label_col=name)

Then I got this error:

TypeError: train() got multiple values for keyword argument 'evals_result'

It seems that evals_result is already set within SparkXGBClassifier, but then how can I retrieve the results? I couldn't find anything in the documentation, so I tried some options myself, but none of them work. Thanks!


wbo4958 commented Oct 18, 2022

Thx @faaany, it's definitely a bug; I will fix it. Thank you for reporting it.


faaany commented Oct 18, 2022

Awesome! Thanks! @wbo4958


wbo4958 commented Oct 18, 2022

@faaany, I have put up a PR to throw an exception when unsupported parameters are detected.

But to be honest, the real evals_result feature is not yet supported by xgboost pyspark. Considering the xgboost 1.7 release is coming, I don't intend to add it at this time; I hope you understand. But I will add the feature after 1.7 is released.

trivialfis (Member) commented:

The evals_result is not a pyspark estimator parameter (not supposed to be). We haven't implemented the summary method for the spark estimator and will complete its support in the next release.


faaany commented Oct 19, 2022

@wbo4958 @trivialfis sure, the next release is good! thank you, guys!


sajadn commented Mar 1, 2024

I can see the same error on SparkXGBClassifier with device='cuda'. It works for device='cpu' though. Any suggestions?
Also is the fix above part of the released 2.0.3 version?


wbo4958 commented Mar 5, 2024

Hi @sajadn, Could you file a new issue for it with the repro code? Thx
