Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Apply kotlinx-atomicfu compiler plugin #4018

Merged
merged 3 commits into from Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions gradle.properties
Expand Up @@ -24,8 +24,8 @@ kotlin.incremental.multiplatform=true
kotlin.mpp.applyDefaultHierarchyTemplate=false
kotlin.native.ignoreIncorrectDependencies=true
kotlin.native.binary.memoryModel=experimental
#kotlinx.atomicfu.enableJvmIrTransformation=true
#kotlinx.atomicfu.enableJsIrTransformation=true
kotlinx.atomicfu.enableJvmIrTransformation=true
kotlinx.atomicfu.enableNativeIrTransformation=true

kotlin.daemon.jvmargs=-Xmx6g -XX:+HeapDumpOnOutOfMemoryError
kotlin.daemon.useFallbackStrategy=false
Expand Down
Expand Up @@ -55,7 +55,7 @@ internal fun onBodyChunkReceived(
return -1
}
if (written > 0) {
wrapper.bytesWritten += written
wrapper.bytesWritten.addAndGet(written)
}
if (wrapper.bytesWritten.value == chunkSize) {
wrapper.bytesWritten.value = 0
Expand Down
Expand Up @@ -14,11 +14,11 @@ import kotlinx.atomicfu.*
import kotlin.test.*

class EventsTest : ClientLoader() {
val created = atomic(0)
val ready = atomic(0)
val received = atomic(0)
val counter = atomic(0)
val cause: AtomicRef<Throwable?> = atomic(null)
private val created = atomic(0)
private val ready = atomic(0)
private val received = atomic(0)
private val counter = atomic(0)
private val cause: AtomicRef<Throwable?> = atomic(null)

@Test
fun testBasicEvents() = clientTests {
Expand Down
6 changes: 3 additions & 3 deletions ktor-io/common/src/io/ktor/utils/io/ByteChannelSequential.kt
Expand Up @@ -840,9 +840,9 @@ public abstract class ByteChannelSequentialBase(
private fun addBytesRead(count: Int) {
require(count >= 0) { "Can't read negative amount of bytes: $count" }

channelSize.minusAssign(count)
channelSize.addAndGet(-count)
_totalBytesRead.addAndGet(count.toLong())
_availableForRead.minusAssign(count)
_availableForRead.addAndGet(-count)

check(channelSize.value >= 0) { "Readable bytes count is negative: $availableForRead, $count in $this" }
check(availableForRead >= 0) { "Readable bytes count is negative: $availableForRead, $count in $this" }
Expand All @@ -851,7 +851,7 @@ public abstract class ByteChannelSequentialBase(
private fun addBytesWritten(count: Int) {
require(count >= 0) { "Can't write negative amount of bytes: $count" }

channelSize.plusAssign(count)
channelSize.addAndGet(count)
_totalBytesWritten.addAndGet(count.toLong())

check(channelSize.value >= 0) { "Readable bytes count is negative: ${channelSize.value}, $count in $this" }
Expand Down
17 changes: 7 additions & 10 deletions ktor-io/jvm/src/io/ktor/utils/io/ByteBufferChannel.kt
Expand Up @@ -2228,7 +2228,7 @@ internal open class ByteBufferChannel(
continuation.resume(flush && hasEnoughBytes)
return COROUTINE_SUSPENDED
}
} while (!setContinuation({ readOp }, _readOp, continuation, { closed == null && readSuspendPredicate(size) }))
} while (!_readOp.setContinuation(continuation, { closed == null && readSuspendPredicate(size) }))

return COROUTINE_SUSPENDED
}
Expand Down Expand Up @@ -2283,7 +2283,7 @@ internal open class ByteBufferChannel(
ucont.resume(Unit)
break
}
} while (!setContinuation({ writeOp }, _writeOp, ucont.intercepted(), { writeSuspendPredicate(size) }))
} while (!_writeOp.setContinuation(ucont.intercepted(), { writeSuspendPredicate(size) }))

flushImpl(minWriteSize = size)

Expand Down Expand Up @@ -2329,7 +2329,7 @@ internal open class ByteBufferChannel(
c.resume(Unit)
break
}
} while (!setContinuation({ writeOp }, _writeOp, c, { writeSuspendPredicate(size) }))
} while (!_writeOp.setContinuation(c, { writeSuspendPredicate(size) }))

flushImpl(minWriteSize = size)

Expand All @@ -2338,22 +2338,19 @@ internal open class ByteBufferChannel(
}
}

private inline fun <T, C : Continuation<T>> setContinuation(
getter: () -> C?,
updater: AtomicRef<C?>,
private inline fun <T, C : Continuation<T>> AtomicRef<C?>.setContinuation(
continuation: C,
predicate: () -> Boolean
): Boolean {
while (true) {
val current = getter()
loop { current ->
check(current == null) { "Operation is already in progress" }

if (!predicate()) {
return false
}

if (updater.compareAndSet(null, continuation)) {
return (predicate() || !updater.compareAndSet(continuation, null))
if (this.compareAndSet(null, continuation)) {
return (predicate() || !this.compareAndSet(continuation, null))
}
}
}
Expand Down
Expand Up @@ -112,12 +112,12 @@ class UDPSocketTest {
.bind()

socket.outgoing.invokeOnClose {
done += 1
done.addAndGet(1)
}

assertFailsWith<IllegalStateException> {
socket.outgoing.invokeOnClose {
done += 2
done.addAndGet(2)
}
}

Expand All @@ -136,7 +136,7 @@ class UDPSocketTest {
.bind()

socket.outgoing.invokeOnClose {
done += 1
done.addAndGet(1)
assertTrue(it is AssertionError)
}

Expand All @@ -154,7 +154,7 @@ class UDPSocketTest {
.bind()

socket.outgoing.invokeOnClose {
done += 1
done.addAndGet(1)
}

socket.close()
Expand All @@ -174,7 +174,7 @@ class UDPSocketTest {
socket.outgoing.close(AssertionError())

socket.outgoing.invokeOnClose {
done += 1
done.addAndGet(1)
assertTrue(it is AssertionError)
}

Expand Down
Expand Up @@ -16,6 +16,7 @@ import io.ktor.util.pipeline.*
import io.ktor.utils.io.*
import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlin.concurrent.Volatile

/**
* Engine that based on CIO backend
Expand Down Expand Up @@ -51,7 +52,8 @@ public class CIOApplicationEngine(
private val startupJob: CompletableDeferred<Unit> = CompletableDeferred()
private val stopRequest: CompletableJob = Job()

private var serverJob: Job by atomic(Job())
// See KT-67440
@Volatile private var serverJob: Job = Job()

init {
serverJob = initServerJob()
Expand Down
Expand Up @@ -43,7 +43,9 @@ public class DefaultHeadersConfig {
public fun now(): Long
}

internal val cachedDateText: AtomicRef<String> = atomic("")
private val _cachedDateText: AtomicRef<String> = atomic("")

internal var cachedDateText: String by _cachedDateText
}

/**
Expand Down Expand Up @@ -78,9 +80,9 @@ public val DefaultHeaders: RouteScopedPlugin<DefaultHeadersConfig> = createRoute
val currentTimeStamp = pluginConfig.clock.now()
if (captureCached + DATE_CACHE_TIMEOUT_MILLISECONDS <= currentTimeStamp) {
cachedDateTimeStamp = currentTimeStamp
pluginConfig.cachedDateText.value = GMTDate(currentTimeStamp).toHttpDate()
pluginConfig.cachedDateText = GMTDate(currentTimeStamp).toHttpDate()
}
return pluginConfig.cachedDateText.value
return pluginConfig.cachedDateText
}

val serverHeader = "Ktor/$ktorVersion"
Expand Down
Expand Up @@ -60,7 +60,7 @@ public class TestApplication internal constructor(
Created, Starting, Started, Stopped
}

internal val state = atomic(State.Created)
private val state = atomic(State.Created)

internal val externalApplications by lazy { builder.externalServices.externalApplications }
internal val server by lazy { builder.embeddedServer }
Expand Down
Expand Up @@ -20,8 +20,10 @@ public actual abstract class NetworkAddress constructor(
public val port: Int,
explicitAddress: Any? = null
) {
private val _explicitAddress: AtomicRef<Any?> = atomic(explicitAddress)

@InternalAPI
public var explicitAddress: AtomicRef<Any?> = atomic(explicitAddress)
public var explicitAddress: Any? by _explicitAddress

/**
* Resolve current socket address.
Expand Down