Sorted Groups

Spark can group rows by an arbitrary key and provide an iterator over the rows of each group. This makes it possible to iterate over groups that are too large to fit into memory:

import org.apache.spark.sql.Dataset

import spark.implicits._

case class Val(id: Int, seq: Int, value: Double)

val ds: Dataset[Val] = Seq(
  Val(1, 1, 1.1),
  Val(1, 2, 1.2),
  Val(1, 3, 1.3),

  Val(2, 1, 2.1),
  Val(2, 2, 2.2),
  Val(2, 3, 2.3),

  Val(3, 1, 3.1)
).reverse.toDS().repartition(3).cache()

// order of iterator IS NOT guaranteed
ds.groupByKey(v => v.id)
  .flatMapGroups((key, it) => it.zipWithIndex.map(v => (key, v._2, v._1.seq, v._1.value)))
  .toDF("key", "index", "seq", "value")
  .show(false)

+---+-----+---+-----+
|key|index|seq|value|
+---+-----+---+-----+
|1  |0    |3  |1.3  |
|1  |1    |2  |1.2  |
|1  |2    |1  |1.1  |
|2  |0    |1  |2.1  |
|2  |1    |3  |2.3  |
|2  |2    |2  |2.2  |
|3  |0    |1  |3.1  |
+---+-----+---+-----+

However, we have no control over the order in which each group iterator yields its rows. If we want each iterator to be ordered by seq, we can do the following:

import uk.co.gresearch.spark._

// the group key $"id" needs an ordering
implicit val ordering: Ordering.Int.type = Ordering.Int

// order of iterator IS guaranteed
ds.groupBySorted($"id")($"seq")
  .flatMapSortedGroups((key, it) => it.zipWithIndex.map(v => (key, v._2, v._1.seq, v._1.value)))
  .toDF("key", "index", "seq", "value")
  .show(false)

+---+-----+---+-----+
|key|index|seq|value|
+---+-----+---+-----+
|1  |0    |1  |1.1  |
|1  |1    |2  |1.2  |
|1  |2    |3  |1.3  |
|2  |0    |1  |2.1  |
|2  |1    |2  |2.2  |
|2  |2    |3  |2.3  |
|3  |0    |1  |3.1  |
+---+-----+---+-----+

Now the iterators are ordered by seq, as shown by the index column, which is generated by it.zipWithIndex.
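
For comparison, here is a rough sketch of how a similar result could be obtained with plain Spark by sorting each group inside flatMapGroups. It assumes that every group fits into memory, so it gives up the benefit of iterating over arbitrarily large groups. The object name InMemorySortedGroups, the helper sortedInMemory and the local SparkSession settings are illustrative only and not part of uk.co.gresearch.spark.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Illustrative names only; nothing here is part of uk.co.gresearch.spark.
object InMemorySortedGroups {
  case class Val(id: Int, seq: Int, value: Double)

  // Materialises each group as a Vector and sorts it by seq before indexing.
  // Assumption: every single group is small enough to be held in memory.
  def sortedInMemory(spark: SparkSession, ds: Dataset[Val]): Dataset[(Int, Int, Int, Double)] = {
    import spark.implicits._
    ds.groupByKey(v => v.id)
      .flatMapGroups { (key, it) =>
        it.toVector
          .sortBy(_.seq)
          .zipWithIndex
          .map { case (v, index) => (key, index, v.seq, v.value) }
      }
  }

  def main(args: Array[String]): Unit = {
    // Local session for demonstration purposes only.
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("in-memory-sorted-groups")
      .getOrCreate()
    import spark.implicits._

    val ds = Seq(
      Val(1, 1, 1.1), Val(1, 2, 1.2), Val(1, 3, 1.3),
      Val(2, 1, 2.1), Val(2, 2, 2.2), Val(2, 3, 2.3),
      Val(3, 1, 3.1)
    ).reverse.toDS().repartition(3)

    sortedInMemory(spark, ds)
      .toDF("key", "index", "seq", "value")
      .show(false)

    spark.stop()
  }
}
```

Within each key, index follows seq here as well, but only because the whole group has been collected and sorted on the executor first. This is why a sorted group iterator is preferable for large groups.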

Instead of column expressions, we can also use lambdas to define the group key and the group order:

ds.groupByKeySorted(v => v.id)(v => v.seq)
  .flatMapSortedGroups((key, it) => it.zipWithIndex.map(v => (key, v._2, v._1.seq, v._1.value)))
  .toDF("key", "index", "seq", "value")
  .show(false)

Note: Using lambdas here hides from Spark which columns we use for grouping and sorting. Query optimization cannot improve partitioning and sorting in this case. Use column expressions when possible.
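
One way to observe this difference is to print query plans with explain(). The sketch below uses only stock Spark grouping (groupByKey with a lambda versus groupBy with a column), not the sorted-group methods above, so it illustrates the general effect rather than the plans those methods produce. The object name CompareGroupingPlans and the session settings are assumptions made for the example, and the exact plan text depends on the Spark version.

```scala
import org.apache.spark.sql.SparkSession

// Illustrative name only; this example uses stock Spark APIs.
object CompareGroupingPlans {
  case class Val(id: Int, seq: Int, value: Double)

  def main(args: Array[String]): Unit = {
    // Local session for demonstration purposes only.
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("compare-grouping-plans")
      .getOrCreate()
    import spark.implicits._

    val ds = Seq(Val(1, 1, 1.1), Val(1, 2, 1.2), Val(2, 1, 2.1)).toDS()

    // Lambda key: Spark applies an opaque function to every row to
    // compute the key, so the plan cannot name the column being used.
    ds.groupByKey(v => v.id).count().explain()

    // Column key: the plan refers to the column id directly,
    // which the optimizer can reason about.
    ds.groupBy($"id").count().explain()

    spark.stop()
  }
}
```

In the first plan the key typically appears as the result of a function applied to a deserialized object, while the second plan groups by id itself.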