Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MutationScan seems to be doing the wrong thing #755

Merged
merged 17 commits into from May 18, 2024
Merged

Conversation

nikhilsimha
Copy link
Contributor

Summary

when joinSource left is events & right is entities, mutationScan is being set to false, but we still enter temporalEntities case as we should. This PR disables mutation scan all together.

Test Plan

  • Added Unit Tests
  • Covered by existing CI
  • Integration tested

Checklist

  • Documentation update

Reviewers

@ezvz @better365

@@ -515,8 +515,8 @@ object GroupBy {
// Generate mutation Df if required, align the columns with inputDf so no additional schema is needed by aggregator.
val mutationSources = groupByConf.sources.toScala.filter { _.isSetEntities }
val mutationsColumnOrder = inputDf.columns ++ Constants.MutationFields.map(_.name)
val mutationDf =
if (mutationScan && groupByConf.inferredAccuracy == api.Accuracy.TEMPORAL && mutationSources.nonEmpty) {
def mutationDfFn(): DataFrame = {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes mutationDf eval lazy.

So that it is only touched when temporal entities code is run

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice!

@@ -325,4 +329,12 @@ object Extensions {
}
}
}

implicit class TupleToJMapOps[K, V](tuples: Iterator[(K, V)]) {
def toJMap: util.Map[K, V] = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will it be more performant than the Scala Map?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Scala map somehow is not serializable :-/

// events | entities | snapshot => right part tables are not aligned - so scan by leftTimeRange
// events | entities | temporal => right part tables are aligned - so scan by leftRange
// entities | entities | snapshot => right part tables are aligned - so scan by leftRange
val rightRange = if (joinConf.left.dataModel == Events && joinPart.groupBy.inferredAccuracy == Accuracy.SNAPSHOT) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for the comments above

@@ -90,7 +91,8 @@ object SparkSessionBuilder {
}
val spark = builder.getOrCreate()
// disable log spam
spark.sparkContext.setLogLevel("ERROR")
// spark.sparkContext.setLogLevel("ERROR")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you want to keep this line?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there is an issue with logs right now - will follow up PR to fix the logs issue

@@ -117,7 +119,7 @@ object SparkSessionBuilder {
}
val spark = builder.getOrCreate()
// disable log spam
spark.sparkContext.setLogLevel("ERROR")
// spark.sparkContext.setLogLevel("ERROR")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you want to keep this line?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is something wrong with our logs - they just disappear in the tests


val DefaultWarehouseDir = new File("/tmp/chronon/spark-warehouse")
private val warehouseId = java.util.UUID.randomUUID().toString.takeRight(6)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it solve the potential racing condition in the CI test?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it does!

Copy link
Contributor

@better365 better365 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for cleaning up and removing unused mutationScan

@nikhilsimha nikhilsimha merged commit 5e03bc1 into main May 18, 2024
7 checks passed
@nikhilsimha nikhilsimha deleted the chaining_mutations branch May 18, 2024 18:45
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants