Skip to content

Commit

Permalink
Introduce typesafe actors abstraction
Browse files Browse the repository at this point in the history
Fixes #87 and #169
  • Loading branch information
qwwdfsad committed Aug 9, 2018
1 parent fb6f240 commit cf9db6c
Show file tree
Hide file tree
Showing 15 changed files with 1,224 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,39 @@ public final class kotlinx/coroutines/experimental/YieldKt {
public static final fun yield (Lkotlin/coroutines/experimental/Continuation;)Ljava/lang/Object;
}

public abstract class kotlinx/coroutines/experimental/actors/Actor {
public fun <init> ()V
public fun <init> (Lkotlin/coroutines/experimental/CoroutineContext;Lkotlinx/coroutines/experimental/Job;Lkotlinx/coroutines/experimental/CoroutineStart;I)V
public synthetic fun <init> (Lkotlin/coroutines/experimental/CoroutineContext;Lkotlinx/coroutines/experimental/Job;Lkotlinx/coroutines/experimental/CoroutineStart;IILkotlin/jvm/internal/DefaultConstructorMarker;)V
protected final fun act (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/experimental/Continuation;)Ljava/lang/Object;
}

public abstract class kotlinx/coroutines/experimental/actors/ActorTraits {
public fun <init> ()V
public abstract fun cancel ()V
public abstract fun close ()V
public abstract fun getJob ()Lkotlinx/coroutines/experimental/Job;
public final fun join (Lkotlin/coroutines/experimental/Continuation;)Ljava/lang/Object;
protected fun onClose ()V
protected fun onStart (Lkotlin/coroutines/experimental/Continuation;)Ljava/lang/Object;
}

public final class kotlinx/coroutines/experimental/actors/ActorsKt {
public static final fun actor (Lkotlin/coroutines/experimental/CoroutineContext;Lkotlinx/coroutines/experimental/Job;Lkotlinx/coroutines/experimental/CoroutineStart;ILkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/experimental/actors/TypedActor;
public static final fun actor (Lkotlin/coroutines/experimental/CoroutineContext;Lkotlinx/coroutines/experimental/actors/ActorTraits;Lkotlinx/coroutines/experimental/CoroutineStart;ILkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/experimental/actors/TypedActor;
public static synthetic fun actor$default (Lkotlin/coroutines/experimental/CoroutineContext;Lkotlinx/coroutines/experimental/Job;Lkotlinx/coroutines/experimental/CoroutineStart;ILkotlin/jvm/functions/Function3;ILjava/lang/Object;)Lkotlinx/coroutines/experimental/actors/TypedActor;
public static synthetic fun actor$default (Lkotlin/coroutines/experimental/CoroutineContext;Lkotlinx/coroutines/experimental/actors/ActorTraits;Lkotlinx/coroutines/experimental/CoroutineStart;ILkotlin/jvm/functions/Function3;ILjava/lang/Object;)Lkotlinx/coroutines/experimental/actors/TypedActor;
}

public abstract class kotlinx/coroutines/experimental/actors/TypedActor {
public fun <init> ()V
public fun <init> (Lkotlin/coroutines/experimental/CoroutineContext;Lkotlinx/coroutines/experimental/Job;Lkotlinx/coroutines/experimental/CoroutineStart;I)V
public synthetic fun <init> (Lkotlin/coroutines/experimental/CoroutineContext;Lkotlinx/coroutines/experimental/Job;Lkotlinx/coroutines/experimental/CoroutineStart;IILkotlin/jvm/internal/DefaultConstructorMarker;)V
public final fun offer (Ljava/lang/Object;)Z
protected abstract fun receive (Ljava/lang/Object;Lkotlin/coroutines/experimental/Continuation;)Ljava/lang/Object;
public final fun send (Ljava/lang/Object;Lkotlin/coroutines/experimental/Continuation;)Ljava/lang/Object;
}

public abstract class kotlinx/coroutines/experimental/channels/AbstractChannel : kotlinx/coroutines/experimental/channels/AbstractSendChannel, kotlinx/coroutines/experimental/channels/Channel {
public fun <init> ()V
public fun cancel (Ljava/lang/Throwable;)Z
Expand Down
76 changes: 76 additions & 0 deletions core/kotlinx-coroutines-core/src/actors/AbstractActor.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.experimental.actors

import kotlinx.atomicfu.*
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.*
import kotlin.coroutines.experimental.*

/**
* Base class for actors implementation, which provides implementation for [ActorTraits]
* This class is not designed to be extended outside of kotlinx.coroutines, so it's internal
*
* @param T type of messages which are stored in the mailbox
*/
internal abstract class AbstractActor<T>(
context: CoroutineContext = DefaultDispatcher,
parent: Job? = null,
start: CoroutineStart = CoroutineStart.LAZY,
channelCapacity: Int = 16
) : ActorTraits() {

internal val mailbox = Channel<T>(channelCapacity)
public final override val job: Job = launch(context, start, parent) { actorLoop() }

/*
* Guard for onClose.
* It's necessary to invoke onClose in the end of actor body even when we have job completion:
* if actor decides to decompose its work, then onClose should be called *before* actor's body end,
* otherwise delegated work will never be closed, because job completion will await all created children
* to complete
*/
private val onCloseInvoked = atomic(0)

// Save an allocation
private inner class OnCloseNode : JobNode<Job>(job) {
override fun invoke(cause: Throwable?) {
if (onCloseInvoked.compareAndSet(0, 1)) {
onClose()
}
}
}

init {
job.invokeOnCompletion(OnCloseNode())
}

public override fun close() {
mailbox.close()
}

public override fun cancel() {
job.cancel()
mailbox.cancel()
}

private suspend fun actorLoop() {
try {
onStart()
for (message in mailbox) {
onMessage(message)
}
} catch (e: Throwable) {
handleCoroutineException(coroutineContext, e)
} finally {
mailbox.close()
if (onCloseInvoked.compareAndSet(0, 1)) {
onClose()
}
}
}

internal abstract suspend fun onMessage(message: T)
}
65 changes: 65 additions & 0 deletions core/kotlinx-coroutines-core/src/actors/Actor.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.experimental.actors

import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.*
import kotlin.coroutines.experimental.*

/**
* [Actor] is the base for all stateful actors, who have to process more than one type of messages.
* [Actor] has well-defined lifecycle described in [ActorTraits].
*
* To declare message handler, actor should have methods declared using [act],
* which are used to send message "Send message which handler invokes `act` body"
*
* Example, where the actor asynchronously processes two types of messages:
* ```
* class ExampleActor : Actor() {
*
* suspend fun sendInt(number: Int) = act {
* println("Received $number")
* }
*
* suspend fun sendString(string: String) = act {
* println("Received $string")
* }
* }
*
*
* // Sender
* exampleActor.sendInt(42)
* ```
*
* @param context context in which actor's job will be launched
* @param parent optional parent of actor's job
* @param start start mode of actor's job
* @param channelCapacity capacity of actor's mailbox aka maximum count of pending messages
*/
@Suppress("EXPOSED_SUPER_CLASS")
abstract class Actor(
context: CoroutineContext = DefaultDispatcher,
parent: Job? = null,
start: CoroutineStart = CoroutineStart.LAZY,
channelCapacity: Int = 16
) : AbstractActor<suspend () -> Unit>(context, parent, start, channelCapacity) {

/**
* Schedules [block] as a message to the actor mailbox.
* All messages sent via [act] will be processed sequentially in the actor context.
* Act semantics is equivalent to sending lambda to channel with receiver, which invokes
* all sent lambdas.
*
* @throws ClosedSendChannelException if actor is [closed][close]
*/
protected suspend fun act(block: suspend () -> Unit) {
job.start()
mailbox.send(block)
}

internal override suspend fun onMessage(message: suspend () -> Unit) {
message()
}
}
82 changes: 82 additions & 0 deletions core/kotlinx-coroutines-core/src/actors/ActorTraits.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.experimental.actors

import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.*
import kotlin.coroutines.experimental.*

/**
* Actor traits, common for [Actor] and [TypedActor].
* Simply speaking, actor is a high-level abstraction for [channel][ReceiveChannel] and coroutine, which
* sequentially processes messages from the channel.
*
* Actors are inspired by the Actor Model: <a href="http://en.wikipedia.org/wiki/Actor_model">http://en.wikipedia.org/wiki/Actor_model</a>,
* but have slightly different semantics to expose type-safety over address transparency.
*
* Every actor has a [Job] associated with it, which lifecycle is tightly bound with actor lifecycle.
*
* Any actor has well-defined lifecycle:
* -- Not started. Note that by default actors are started [lazily][CoroutineStart.LAZY]
* -- Active. Actor is running and processing incoming messages
* -- Closing. Actor's channel is closed for new messages, but actor is processing all pending messages,
* then invokes [onClose]. Can be triggered by [close] call
* -- Closed. Actor and all its children (both actors and launched jobs) are completed, [job] is completed.
* -- Cancelled. Actor's channel is closed for new messages, its job is cancelled, pending messages are not processed and
* hang in the channel, [onClose] is invoked. Can be triggered by [cancel] call
*
* Note:
* [ActorTraits] doesn't have any variations of `send` method, because different implementations
* have different ways to expose mailbox to provide static typing.
*/
abstract class ActorTraits {

/**
* Job identifying current actor and available from its [coroutineContext]
*
* Lifecycle:
* If job is cancelled, actor is effectively killed
* If actor is closed, job is completed as soon as all messages are processed and all launched children are completed
* If actor is cancelled, job is cancelled immediately
*/
public abstract val job: Job

/**
* Close the actor and its channel.
* Before closing, the actor processes all pending messages and calls [onClose]
*/
public abstract fun close()

/**
* Cancel the actor and its channel without letting the actor to process pending messages.
* This is a last ditch way to stop the actor which shouldn't be used normally.
* It's guaranteed that [onClose] will be called.
*/
public abstract fun cancel()

/**
* Handler which is invoked when actor is started.
* Actor is started according to its [start mode][CoroutineStart].
* This method will not be invoked is actor is started lazily and is cancelled before receiving any messages.
* If [onStart] throws an exception, actor is immediately [cancelled][cancel].
*/
protected open suspend fun onStart() {}

/**
* Handler which is invoked when actor is being closed or killed.
* It's guaranteed that on the moment of invocation no more messages will be processed by the actor
* and no more messages can be sent.
* This handler is invoked even if actor wasn't started to properly cleanup resources owned by the actor.
*
* Handler is invoked before associated [job] is completed or cancelled to allow graceful shutdown
* and ability to shutdown child tasks.
*/
protected open fun onClose() {}

/**
* Waits until the actor is completed or cancelled
*/
public suspend fun join(): Unit = job.join()
}
49 changes: 49 additions & 0 deletions core/kotlinx-coroutines-core/src/actors/Actors.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.experimental.actors

import kotlinx.coroutines.experimental.*
import kotlin.coroutines.experimental.*


/**
* Creates a new [TypedActor] with given [block] as [message handler][TypedActor.receive]
*
* @param context context in which actor's job will be launched
* @param parent optional parent of actor's job
* @param start start mode of actor's job
* @param channelCapacity capacity of actor's mailbox aka maximum count of pending messages
* @param block actor's message handler
*/
public fun <T> actor(
context: CoroutineContext = DefaultDispatcher,
parent: ActorTraits,
start: CoroutineStart = CoroutineStart.LAZY,
channelCapacity: Int = 16, block: suspend TypedActor<T>.(T) -> Unit
): TypedActor<T> {
return actor(context, parent.job, start, channelCapacity, block)
}

/**
* Creates a new [TypedActor] with given [block] as [message handler][TypedActor.receive]
*
* @param context context in which actor's job will be launched
* @param parent optional parent of actor's job
* @param start start mode of actor's job
* @param channelCapacity capacity of actor's mailbox aka maximum count of pending messages
* @param block actor's message handler
*/
public fun <T> actor(
context: CoroutineContext = DefaultDispatcher,
parent: Job? = null,
start: CoroutineStart = CoroutineStart.LAZY,
channelCapacity: Int = 16, block: suspend TypedActor<T>.(T) -> Unit
): TypedActor<T> {
return object : TypedActor<T>(context, parent, start, channelCapacity) {
override suspend fun receive(message: T) {
block(message)
}
}
}
80 changes: 80 additions & 0 deletions core/kotlinx-coroutines-core/src/actors/TypedActor.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.experimental.actors

import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.*
import kotlin.coroutines.experimental.*

/**
* [TypedActor] is the base for all stateful actors, which can process only one [type][T] of messages.
* [TypedActor] has well-defined lifecycle described in [ActorTraits].
* [TypedActor.receive] method is used to declare a message handler, which is parametrized by [T]
* to provide better compile-type safety.
*
* Example:
* ```
* class ExampleActor : TypedActor<String>() {
*
* override suspend fun receive(string: String) = act {
* println("Received $string")
* }
* }
*
* // Sender
* exampleActor.send("foo")
* ```
*
* @param T type of the message this actor can handle
* @param context context in which actor's job will be launched
* @param parent optional parent of actor's job
* @param start start mode of actor's job
* @param channelCapacity capacity of actor's mailbox aka maximum count of pending messages
*/
@Suppress("EXPOSED_SUPER_CLASS")
abstract class TypedActor<T>(
context: CoroutineContext = DefaultDispatcher,
parent: Job? = null,
start: CoroutineStart = CoroutineStart.LAZY,
channelCapacity: Int = 16
) : AbstractActor<T>(context, parent, start, channelCapacity) {


/**
* Sends the message to the actor, which later will be sequentially processed by [receive].
* Sender is suspended, if actor's channel capacity is reached. This suspension is cancellable
* and has semantics similar to [SendChannel.send]
*
* @throws ClosedSendChannelException if actor is [closed][close]
*/
suspend fun send(message: T) {
job.start()
mailbox.send(message)
}

/**
* Attempts to send message to the actor, which later will be sequentially processed by [receive].
* Attempt is successful if actor's channel capacity restriction is not violated.
* This method is intended to be used from synchronous callbacks with [Channel.UNLIMITED]
*
* @throws ClosedSendChannelException if actor is [closed][close]
* @return `true` if offer was successful, false otherwise
*/
fun offer(message: T): Boolean {
job.start()
return mailbox.offer(message)
}

/**
* Handler, which handles all received messages.
*
* @throws ClassCastException if actor was casted to raw type and [send] was invoked with wrong type of the argument
*/
protected abstract suspend fun receive(message: T)

internal override suspend fun onMessage(message: T) {
receive(message)
}
}

0 comments on commit cf9db6c

Please sign in to comment.