Skip to content

Commit

Permalink
Introduce SegmentQueueSynchronizer abstraction and add `ReadWriteMu…
Browse files Browse the repository at this point in the history
…tex`
  • Loading branch information
ndkoval committed Oct 23, 2020
1 parent 9587590 commit 8c364c9
Show file tree
Hide file tree
Showing 9 changed files with 2,347 additions and 179 deletions.
74 changes: 74 additions & 0 deletions benchmarks/scripts/generate_plots_semaphore_jvm.py
@@ -0,0 +1,74 @@
# To run this script run the command 'python3 scripts/generate_plots_semaphore_jvm.py' in the /benchmarks folder


import pandas as pd
import sys
import locale
import matplotlib.pyplot as plt
from matplotlib.ticker import FormatStrFormatter

input_file = "build/reports/jmh/results.csv"
output_file = "out/semaphore_jvm.svg"
# Please change the value of this variable according to the FlowFlattenMergeBenchmarkKt.ELEMENTS
operations = 1000000
csv_columns = ["Score", "Param: parallelism", "Param: maxPermits", "Param: algo"]
rename_columns = {"Score": "score", "Param: parallelism" : "threads", "Param: maxPermits" : "permits", "Param: algo": "algo"}

markers = ['v', 'P', 'x', '8', 'd', '1', '2', '8', 'p']
# markers = ['.', 'v', 'P', 'x', '8', 'd', '1', '2', '8', 'p']
colours = ["darkorange", "seagreen", "red", "blueviolet", "sienna"]
# colours = ["royalblue", "darkorange", "seagreen", "red", "blueviolet", "sienna"]

def next_colour():
i = 0
while True:
yield colours[i % len(colours)]
i += 1

def next_marker():
i = 0
while True:
yield markers[i % len(markers)]
i += 1

def draw(data, plt):
plt.xscale('log', basex=2)
plt.gca().xaxis.set_major_formatter(FormatStrFormatter('%0.f'))
plt.grid(linewidth='0.5', color='lightgray')
plt.ylabel("us / op")
plt.xlabel('threads')
plt.xticks(data.threads.unique())

colour_gen = next_colour()
marker_gen = next_marker()
for algo in data.algo.unique():
gen_colour = next(colour_gen)
gen_marker = next(marker_gen)
res = data[(data.algo == algo)]
plt.plot(res.threads, res.score * 1000 / operations, label="{}".format(algo), color=gen_colour, marker=gen_marker)
# plt.errorbar(x=res.concurrency, y=res.score*elements/1000, yerr=res.score_error*elements/1000, solid_capstyle='projecting',
# label="flows={}".format(flows), capsize=4, color=gen_colour, linewidth=2.2)

langlocale = locale.getdefaultlocale()[0]
locale.setlocale(locale.LC_ALL, langlocale)
dp = locale.localeconv()['decimal_point']
if dp == ",":
csv_columns.append("Score Error (99,9%)")
rename_columns["Score Error (99,9%)"] = "score_error"
elif dp == ".":
csv_columns.append("Score Error (99.9%)")
rename_columns["Score Error (99.9%)"] = "score_error"
else:
print("Unexpected locale delimeter: " + dp)
sys.exit(1)
data = pd.read_csv(input_file, sep=",", decimal=dp)
data = data[csv_columns].rename(columns=rename_columns)
data = data[(data.permits==8)]
data = data[(data.algo!="Java ReentrantLock")]
data = data[(data.algo!="Java Semaphore")]
plt.rcParams.update({'font.size': 15})
plt.figure(figsize=(12, 12))
draw(data, plt)
plt.legend(loc='lower center', borderpad=0, bbox_to_anchor=(0.5, 1.3), ncol=2, frameon=False, borderaxespad=2, prop={'size': 15})
plt.tight_layout(pad=12, w_pad=2, h_pad=1)
plt.savefig(output_file, bbox_inches='tight')
2 changes: 1 addition & 1 deletion gradle.properties
Expand Up @@ -12,7 +12,7 @@ junit_version=4.12
atomicfu_version=0.14.4
knit_version=0.2.2
html_version=0.6.8
lincheck_version=2.7.1
lincheck_version=2.9-SNAPSHOT
dokka_version=0.9.16-rdev-2-mpp-hacks
byte_buddy_version=1.10.9
reactor_version=3.2.5.RELEASE
Expand Down
4 changes: 2 additions & 2 deletions kotlinx-coroutines-core/build.gradle
Expand Up @@ -201,8 +201,8 @@ task jvmStressTest(type: Test, dependsOn: compileTestKotlinJvm) {
enableAssertions = true
testLogging.showStandardStreams = true
systemProperty 'kotlinx.coroutines.scheduler.keep.alive.sec', '100000' // any unpark problem hangs test
systemProperty 'kotlinx.coroutines.semaphore.segmentSize', '2'
systemProperty 'kotlinx.coroutines.semaphore.maxSpinCycles', '10'
systemProperty 'kotlinx.coroutines.sqs.segmentSize', '2'
systemProperty 'kotlinx.coroutines.sqs.maxSpinCycles', '10'
}

task jdk16Test(type: Test, dependsOn: [compileTestKotlinJvm, checkJdk16]) {
Expand Down
9 changes: 9 additions & 0 deletions kotlinx-coroutines-core/common/src/Annotations.kt
Expand Up @@ -64,3 +64,12 @@ public annotation class ObsoleteCoroutinesApi
"so stable API could be provided instead"
)
public annotation class InternalCoroutinesApi

@MustBeDocumented
@Retention(value = AnnotationRetention.BINARY)
@RequiresOptIn(
level = RequiresOptIn.Level.ERROR,
message = "TODO"
)
@Target(AnnotationTarget.FUNCTION, AnnotationTarget.PROPERTY)
public annotation class HazardousConcurrentApi

Large diffs are not rendered by default.

59 changes: 59 additions & 0 deletions kotlinx-coroutines-core/common/src/sync/ReadWriteMutex.kt
@@ -0,0 +1,59 @@
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.sync

import kotlinx.coroutines.*

public interface ReadWriteMutex {
/**
*
*/
@HazardousConcurrentApi
public suspend fun readLock()

/**
*
*/
@HazardousConcurrentApi
public fun readUnlock()

/**
*
*/
@HazardousConcurrentApi
public suspend fun writeLock()

/**
*
*/
@HazardousConcurrentApi
public fun writeUnlock()
}

/**
*
*/
@OptIn(HazardousConcurrentApi::class)
public suspend inline fun <T> ReadWriteMutex.withReadLock(action: () -> T): T {
readLock()
try {
return action()
} finally {
readUnlock()
}
}

/**
*
*/
@OptIn(HazardousConcurrentApi::class)
public suspend inline fun <T> ReadWriteMutex.withWriteLock(action: () -> T): T {
writeLock()
try {
return action()
} finally {
writeUnlock()
}
}

0 comments on commit 8c364c9

Please sign in to comment.