Skip to content

Commit

Permalink
Flow.flattenMerge benchmark (#1464)
Browse files Browse the repository at this point in the history
  • Loading branch information
fmixing authored and qwwdfsad committed Dec 26, 2019
1 parent 4f24a7a commit fe15b6d
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 14 deletions.
2 changes: 2 additions & 0 deletions benchmarks/build.gradle
Expand Up @@ -74,4 +74,6 @@ dependencies {
compile "org.openjdk.jmh:jmh-core:1.21"
compile 'com.typesafe.akka:akka-actor_2.12:2.5.0'
compile project(':kotlinx-coroutines-core')
// add jmh dependency on main
jmh sourceSets.main.runtimeClasspath
}
75 changes: 75 additions & 0 deletions benchmarks/scripts/generate_plots_flow_flatten_merge.py
@@ -0,0 +1,75 @@
# To run this script run the command 'python3 scripts/generate_plots_flow_flatten_merge.py' in the /benchmarks folder


import pandas as pd
import sys
import locale
import matplotlib.pyplot as plt
from matplotlib.ticker import FormatStrFormatter

input_file = "build/reports/jmh/results.csv"
output_file = "out/flow-flatten-merge.svg"
# Please change the value of this variable according to the FlowFlattenMergeBenchmarkKt.ELEMENTS
elements = 100000
benchmark_name = "benchmarks.flow.FlowFlattenMergeBenchmark.flattenMerge"
csv_columns = ["Benchmark", "Score", "Unit", "Param: concurrency", "Param: flowsNumberStrategy"]
rename_columns = {"Benchmark": "benchmark", "Score" : "score", "Unit" : "unit",
"Param: concurrency" : "concurrency", "Param: flowsNumberStrategy" : "flows"}

markers = ['.', 'v', '^', '1', '2', '8', 'p', 'P', 'x', 'D', 'd', 's']
colours = ['red', 'gold', 'sienna', 'olivedrab', 'lightseagreen', 'navy', 'blue', 'm', 'crimson', 'yellow', 'orangered', 'slateblue', 'aqua', 'black', 'silver']

def next_colour():
i = 0
while True:
yield colours[i % len(colours)]
i += 1

def next_marker():
i = 0
while True:
yield markers[i % len(markers)]
i += 1

def draw(data, plt):
plt.xscale('log', basex=2)
plt.gca().xaxis.set_major_formatter(FormatStrFormatter('%0.f'))
plt.grid(linewidth='0.5', color='lightgray')
if data.unit.unique()[0] != "ops/s":
print("Unexpected time unit: " + data.unit.unique()[0])
sys.exit(1)
plt.ylabel("elements / ms")
plt.xlabel('concurrency')
plt.xticks(data.concurrency.unique())

colour_gen = next_colour()
marker_gen = next_marker()
for flows in data.flows.unique():
gen_colour = next(colour_gen)
gen_marker = next(marker_gen)
res = data[(data.flows == flows)]
# plt.plot(res.concurrency, res.score*elements/1000, label="flows={}".format(flows), color=gen_colour, marker=gen_marker)
plt.errorbar(x=res.concurrency, y=res.score*elements/1000, yerr=res.score_error*elements/1000, solid_capstyle='projecting',
label="flows={}".format(flows), capsize=4, color=gen_colour, linewidth=2.2)

langlocale = locale.getdefaultlocale()[0]
locale.setlocale(locale.LC_ALL, langlocale)
dp = locale.localeconv()['decimal_point']
if dp == ",":
csv_columns.append("Score Error (99,9%)")
rename_columns["Score Error (99,9%)"] = "score_error"
elif dp == ".":
csv_columns.append("Score Error (99.9%)")
rename_columns["Score Error (99.9%)"] = "score_error"
else:
print("Unexpected locale delimeter: " + dp)
sys.exit(1)
data = pd.read_csv(input_file, sep=",", decimal=dp)
data = data[csv_columns].rename(columns=rename_columns)
data = data[(data.benchmark == benchmark_name)]
plt.rcParams.update({'font.size': 15})
plt.figure(figsize=(12.5, 10))
draw(data, plt)
plt.legend(loc='upper center', borderpad=0, bbox_to_anchor=(0.5, 1.3), ncol=2, frameon=False, borderaxespad=2, prop={'size': 15})
plt.tight_layout(pad=12, w_pad=2, h_pad=1)
plt.savefig(output_file, bbox_inches='tight')
19 changes: 5 additions & 14 deletions benchmarks/src/jmh/kotlin/benchmarks/SemaphoreBenchmark.kt
@@ -1,13 +1,13 @@
package benchmarks

import benchmarks.common.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.scheduling.ExperimentalCoroutineDispatcher
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
import org.openjdk.jmh.annotations.*
import java.util.concurrent.ForkJoinPool
import java.util.concurrent.ThreadLocalRandom
import java.util.concurrent.TimeUnit

@Warmup(iterations = 3, time = 500, timeUnit = TimeUnit.MICROSECONDS)
Expand Down Expand Up @@ -50,9 +50,9 @@ open class SemaphoreBenchmark {
jobs += GlobalScope.launch {
repeat(n) {
semaphore.withPermit {
doWork(WORK_INSIDE)
doGeomDistrWork(WORK_INSIDE)
}
doWork(WORK_OUTSIDE)
doGeomDistrWork(WORK_OUTSIDE)
}
}
}
Expand All @@ -68,9 +68,9 @@ open class SemaphoreBenchmark {
jobs += GlobalScope.launch {
repeat(n) {
semaphore.send(Unit) // acquire
doWork(WORK_INSIDE)
doGeomDistrWork(WORK_INSIDE)
semaphore.receive() // release
doWork(WORK_OUTSIDE)
doGeomDistrWork(WORK_OUTSIDE)
}
}
}
Expand All @@ -83,15 +83,6 @@ enum class SemaphoreBenchDispatcherCreator(val create: (parallelism: Int) -> Cor
EXPERIMENTAL({ parallelism -> ExperimentalCoroutineDispatcher(corePoolSize = parallelism, maxPoolSize = parallelism) })
}

private fun doWork(work: Int) {
// We use geometric distribution here
val p = 1.0 / work
val r = ThreadLocalRandom.current()
while (true) {
if (r.nextDouble() < p) break
}
}

private const val WORK_INSIDE = 80
private const val WORK_OUTSIDE = 40
private const val BATCH_SIZE = 1000000
@@ -0,0 +1,63 @@
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package benchmarks.flow

import benchmarks.common.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import org.openjdk.jmh.annotations.*
import java.util.concurrent.*

/**
* Benchmark to measure performance of [kotlinx.coroutines.flow.FlowKt.flattenMerge].
* In addition to that, it can be considered as a macro benchmark for the [kotlinx.coroutines.sync.Semaphore]
*/
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 5, time = 1)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
@State(Scope.Benchmark)
@Fork(1)
open class FlowFlattenMergeBenchmark {
@Param
private var flowsNumberStrategy: FlowsNumberStrategy = FlowsNumberStrategy.`10xConcurrency flows`

@Param("1", "2", "4", "8")
private var concurrency: Int = 0

private lateinit var flow: Flow<Flow<Int>>

@Setup
fun setup() {
val n = flowsNumberStrategy.get(concurrency)
val flowElementsToProcess = ELEMENTS / n

flow = (1..n).asFlow().map {
flow {
repeat(flowElementsToProcess) {
doGeomDistrWork(WORK)
emit(it)
}
}
}
}

@Benchmark
fun flattenMerge() = runBlocking(Dispatchers.Default) {
flow.flattenMerge(concurrency = concurrency).collect()
}
}

enum class FlowsNumberStrategy(val get: (concurrency: Int) -> Int) {
`10xConcurrency flows`({ concurrency -> concurrency * 10 }),
`1xConcurrency flows`({ it }),
`100 flows`({ 100 }),
`500 flows`({ 500 })
}

// If you change this variable please be sure that you change variable elements in the generate_plots_flow_flatten_merge.py
// python script as well
private const val ELEMENTS = 100_000
private const val WORK = 100
17 changes: 17 additions & 0 deletions benchmarks/src/main/kotlin/benchmarks/common/BenchmarkUtils.kt
@@ -0,0 +1,17 @@
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package benchmarks.common

import java.util.concurrent.*

fun doGeomDistrWork(work: Int) {
// We use geometric distribution here. We also checked on macbook pro 13" (2017) that the resulting work times
// are distributed geometrically, see https://github.com/Kotlin/kotlinx.coroutines/pull/1464#discussion_r355705325
val p = 1.0 / work
val r = ThreadLocalRandom.current()
while (true) {
if (r.nextDouble() < p) break
}
}

0 comments on commit fe15b6d

Please sign in to comment.