Support for Apache Doris storage format #737

Open
javaht opened this issue Aug 24, 2023 · 1 comment
javaht commented Aug 24, 2023

    proDs.write().format("doris")
            .option("doris.table.identifier", "test.tablename")
            .option("doris.fenodes", "10.67.xx.xx:8030")
            .option("user", "root")
            .option("password", "xxx")
            .option("dbtable","test.tablename")
            .mode(SaveMode.Overwrite)
            .save();
23/08/24 13:53:59 ERROR SplineAgent: Unexpected error occurred during lineage processing for application: heihei #local-1692856419191
java.lang.RuntimeException: Write extraction failed for: class org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand
	at za.co.absa.spline.harvester.LineageHarvester.tryExtractWriteCommand(LineageHarvester.scala:153)
	at za.co.absa.spline.harvester.LineageHarvester.harvest(LineageHarvester.scala:61)
	at za.co.absa.spline.agent.SplineAgent$$anon$1.$anonfun$handle$1(SplineAgent.scala:91)
	at za.co.absa.spline.agent.SplineAgent$$anon$1.withErrorHandling(SplineAgent.scala:100)
	at za.co.absa.spline.agent.SplineAgent$$anon$1.handle(SplineAgent.scala:72)
	at za.co.absa.spline.harvester.listener.QueryExecutionListenerDelegate.onFailure(QueryExecutionListenerDelegate.scala:32)
	at za.co.absa.spline.harvester.SparkLineageInitializer$$anon$2.za$co$absa$spline$harvester$listener$QueryExecutionListenerDecorators$FatalFailureOmitting$$super$onFailure(SparkLineageInitializer.scala:169)
	at za.co.absa.spline.harvester.listener.QueryExecutionListenerDecorators$FatalFailureOmitting.onFailure(QueryExecutionListenerDecorators.scala:36)
	at za.co.absa.spline.harvester.listener.QueryExecutionListenerDecorators$FatalFailureOmitting.onFailure$(QueryExecutionListenerDecorators.scala:33)
	at za.co.absa.spline.harvester.SparkLineageInitializer$$anon$2.onFailure(SparkLineageInitializer.scala:169)
	at org.apache.spark.sql.util.ExecutionListenerBus.doPostEvent(QueryExecutionListener.scala:163)
	at org.apache.spark.sql.util.ExecutionListenerBus.doPostEvent(QueryExecutionListener.scala:135)
	at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
	at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
	at org.apache.spark.sql.util.ExecutionListenerBus.postToAll(QueryExecutionListener.scala:135)
	at org.apache.spark.sql.util.ExecutionListenerBus.onOtherEvent(QueryExecutionListener.scala:147)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:100)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
	at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
	at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105)
	at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105)
	at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96)
	at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1446)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96)
Caused by: java.lang.RuntimeException: Cannot extract source URI from the options: doris.fenodes,dbtable,doris.table.identifier,user,password
	at scala.sys.package$.error(package.scala:30)
	at za.co.absa.spline.harvester.plugin.composite.SaveIntoDataSourceCommandPlugin$$anonfun$writeNodeProcessor$1.$anonfun$applyOrElse$2(SaveIntoDataSourceCommandPlugin.scala:55)
	at scala.Option.getOrElse(Option.scala:189)
	at za.co.absa.spline.harvester.plugin.composite.SaveIntoDataSourceCommandPlugin$$anonfun$writeNodeProcessor$1.applyOrElse(SaveIntoDataSourceCommandPlugin.scala:55)
	at za.co.absa.spline.harvester.plugin.composite.SaveIntoDataSourceCommandPlugin$$anonfun$writeNodeProcessor$1.applyOrElse(SaveIntoDataSourceCommandPlugin.scala:45)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
	at scala.PartialFunction$Lifted.apply(PartialFunction.scala:228)
	at scala.PartialFunction$Lifted.apply(PartialFunction.scala:224)
	at za.co.absa.spline.harvester.builder.write.PluggableWriteCommandExtractor.asWriteCommand(PluggableWriteCommandExtractor.scala:45)
	at za.co.absa.spline.harvester.LineageHarvester.$anonfun$tryExtractWriteCommand$1(LineageHarvester.scala:145)
	at scala.util.Try$.apply(Try.scala:213)
	at za.co.absa.spline.harvester.LineageHarvester.tryExtractWriteCommand(LineageHarvester.scala:145)
	... 29 more
@wajda wajda changed the title [BUG] spark writing data to doris prompts that doris parameters are not recognized Support for Apache Doris storage format Aug 24, 2023
@wajda wajda added the feature label Aug 24, 2023

wajda commented Aug 24, 2023

Hi @javaht,
thank you for reporting it.
Currently, the Spline agent doesn't support the Doris format, but it should be possible to add support for it by implementing a Spline agent plugin.
Could you possibly try to do that and open a pull request?

Here is some documentation about Spline agent plugins: https://github.com/AbsaOSS/spline-spark-agent#plugin-api
And here is an example project for building Spline agent extensions (filters, dispatchers, or plugins): https://github.com/AbsaOSS/spline-getting-started/tree/main/spark-agent-extension-example

You can start with a simple solution by implementing the DataSourceFormatNameResolving trait (see AvroPlugin for an example: https://github.com/AbsaOSS/spline-spark-agent/blob/develop/core/src/main/scala/za/co/absa/spline/harvester/plugin/embedded/AvroPlugin.scala).
If that's not enough, you may need to implement the RelationProviderProcessing trait instead (see KafkaPlugin, MongoPlugin, CassandraPlugin, etc.).
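For orientation, a custom plugin along those lines could look roughly like the sketch below. This is an illustration only, not tested code: the trait's package path, the method name and signature, the Doris provider class name, and the plugin's own package are all assumptions that should be verified against the Plugin API docs linked above and against the AvroPlugin source.

```scala
// Illustrative sketch only; verify the trait name, its package, and the
// method signature against the spline-spark-agent Plugin API before use.
package com.example.spline // hypothetical package

import za.co.absa.spline.harvester.plugin.Plugin
// ASSUMPTION: the trait lives next to Plugin; check the actual package path.
import za.co.absa.spline.harvester.plugin.DataSourceFormatNameResolving

class DorisPlugin extends Plugin with DataSourceFormatNameResolving {

  // ASSUMPTION: the trait asks for a partial function mapping a relation
  // provider (an instance or its class name) to a canonical format name,
  // mirroring what the embedded AvroPlugin does for "avro".
  override def formatNameResolver: PartialFunction[AnyRef, String] = {
    // The provider class name below is hypothetical; use whichever class
    // the Doris Spark connector actually registers for .format("doris").
    case "doris" | "org.apache.doris.spark.DorisSourceProvider" => "doris"
  }
}
```

Once such a plugin is on the classpath, the agent should be able to resolve the "doris" format name instead of failing in SaveIntoDataSourceCommandPlugin as in the stack trace above.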

We would highly appreciate your contribution.
