Skip to content

Commit

Permalink
HelloKotlinWorkflow works
Browse files Browse the repository at this point in the history
  • Loading branch information
mfateev committed Jun 20, 2023
1 parent a40e569 commit 4ed8723
Show file tree
Hide file tree
Showing 26 changed files with 280 additions and 141 deletions.
Expand Up @@ -17,6 +17,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.internal.async

import io.temporal.kotlin.interceptors.WorkflowInboundCallsInterceptor
Expand Down
Expand Up @@ -17,6 +17,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.internal.async

import io.temporal.api.common.v1.Payloads
Expand All @@ -39,7 +40,7 @@ internal class DynamicKotlinWorkflowDefinition(
private var workflowInvoker: WorkflowInboundCallsInterceptor? = null

override suspend fun initialize() {
val workflowContext: KotlinWorkflowContext = KotlinWorkflowInternal.rootWorkflowContext
val workflowContext: KotlinWorkflowContext = KotlinWorkflowInternal.getRootWorkflowContext()
workflowInvoker = RootWorkflowInboundCallsInterceptor(workflowContext)
for (workerInterceptor in workerInterceptors) {
workflowInvoker = workerInterceptor.interceptWorkflow(workflowInvoker!!)
Expand Down
Expand Up @@ -17,6 +17,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.internal.async

import java.lang.reflect.Type
Expand Down
Expand Up @@ -17,6 +17,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.internal.async

import io.temporal.activity.ActivityOptions
Expand Down
@@ -1,3 +1,23 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.internal.async

import io.temporal.api.common.v1.Payloads
Expand Down Expand Up @@ -101,7 +121,7 @@ class KotlinWorkflow(
return false
}
dispatcher.eventLoop(defaultDeadlockDetectionTimeout)
return dispatcher.isDone() || executionHandler!!.isDone // Do not wait for all other threads.
return /*dispatcher.isDone() ||*/ executionHandler!!.isDone // Do not wait for all other threads.
}

override fun getOutput(): Optional<Payloads> {
Expand Down
@@ -1,3 +1,23 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.internal.async

import com.uber.m3.tally.Scope
Expand Down Expand Up @@ -57,6 +77,8 @@ class KotlinWorkflowContext(
private var activityOptionsMap: Map<String, ActivityOptions>? = null
private var localActivityOptionsMap: Map<String, LocalActivityOptions>? = null

private var replayContext: ReplayWorkflowContext? = null

init {
if (workflowImplementationOptions != null) {
defaultActivityOptions = workflowImplementationOptions!!.defaultActivityOptions
Expand Down Expand Up @@ -88,7 +110,7 @@ class KotlinWorkflowContext(
replayContext = context
}

override fun getReplayContext(): ReplayWorkflowContext {
override fun getReplayContext(): ReplayWorkflowContext? {
return replayContext
}

Expand Down Expand Up @@ -142,7 +164,7 @@ class KotlinWorkflowContext(
override fun <R : Any?> getLastCompletionResult(resultClass: Class<R>, resultType: Type): R? {
return dataConverter.fromPayloads(
0,
Optional.ofNullable(replayContext.lastCompletionResult),
Optional.ofNullable(replayContext!!.lastCompletionResult),
resultClass,
resultType
)
Expand All @@ -157,7 +179,7 @@ class KotlinWorkflowContext(
return HashMap()
}

val headerData: Map<String, Payload> = HashMap(replayContext.header)
val headerData: Map<String, Payload> = HashMap(replayContext!!.header)
val contextData: MutableMap<String, Any> = HashMap()
for (propagator in contextPropagators) {
contextData[propagator.name] = propagator.deserializeContext(headerData)
Expand All @@ -168,13 +190,13 @@ class KotlinWorkflowContext(

override suspend fun <R> executeActivity(input: WorkflowOutboundCallsInterceptor.ActivityInput<R>): WorkflowOutboundCallsInterceptor.ActivityOutput<R?> {
val serializationContext = ActivitySerializationContext(
replayContext.namespace,
replayContext.workflowId,
replayContext.workflowType.name,
replayContext!!.namespace,
replayContext!!.workflowId,
replayContext!!.workflowType.name,
input.activityName,
// input.getOptions().getTaskQueue() may be not specified, workflow task queue is used
// by the Server in this case
if (input.options.taskQueue != null) input.options.taskQueue else replayContext.taskQueue,
if (input.options.taskQueue != null) input.options.taskQueue else replayContext!!.taskQueue,
false
)
val dataConverterWithActivityContext = dataConverter.withContext(serializationContext)
Expand Down Expand Up @@ -205,7 +227,7 @@ class KotlinWorkflowContext(
TODO("Not yet implemented")
}

override fun newRandom(): Random = replayContext.newRandom()
override fun newRandom(): Random = replayContext!!.newRandom()

override suspend fun signalExternalWorkflow(input: WorkflowOutboundCallsInterceptor.SignalExternalInput): WorkflowOutboundCallsInterceptor.SignalExternalOutput {
TODO("Not yet implemented")
Expand Down Expand Up @@ -290,7 +312,7 @@ class KotlinWorkflowContext(
}

val metricScope: Scope
get() = replayContext.metricsScope
get() = replayContext!!.metricsScope

@OptIn(ExperimentalCoroutinesApi::class)
private suspend fun executeActivityOnce(
Expand All @@ -303,7 +325,7 @@ class KotlinWorkflowContext(

return suspendCancellableCoroutine { continuation ->
var activityId: String? = null
val activityOutput = replayContext.scheduleActivityTask(
val activityOutput = replayContext!!.scheduleActivityTask(
params
) { output: Optional<Payloads>, failure: Failure? ->
if (failure == null) {
Expand All @@ -329,7 +351,7 @@ class KotlinWorkflowContext(
): ExecuteActivityParameters {
var taskQueue = options.taskQueue
if (taskQueue == null) {
taskQueue = replayContext.taskQueue
taskQueue = replayContext!!.taskQueue
}
val attributes = ScheduleActivityTaskCommandAttributes.newBuilder()
.setActivityType(ActivityType.newBuilder().setName(name))
Expand All @@ -345,7 +367,7 @@ class KotlinWorkflowContext(
)
.setHeartbeatTimeout(ProtobufTimeUtils.toProtoDuration(options.heartbeatTimeout))
.setRequestEagerExecution(
!options.isEagerExecutionDisabled && (taskQueue == replayContext.taskQueue)
!options.isEagerExecutionDisabled && (taskQueue == replayContext!!.taskQueue)
)
input.ifPresent { value: Payloads? ->
attributes.input = value
Expand All @@ -366,7 +388,7 @@ class KotlinWorkflowContext(
if (options.versioningIntent != null) {
attributes.useCompatibleVersion = options
.versioningIntent
.determineUseCompatibleFlag(replayContext.taskQueue == options.taskQueue)
.determineUseCompatibleFlag(replayContext!!.taskQueue == options.taskQueue)
}
return ExecuteActivityParameters(attributes, options.cancellationType)
}
Expand Down Expand Up @@ -396,7 +418,7 @@ class KotlinWorkflowContext(
* thread and should be replaced with another specific implementation during initialization stage
* `workflow.initialize()` performed inside the workflow root thread.
*
* @see SyncWorkflow.start
* @see KotlinWorkflow.start
*/
private class InitialWorkflowInboundCallsInterceptor(workflowContext: KotlinWorkflowContext) :
BaseRootKotlinWorkflowInboundCallsInterceptor(workflowContext) {
Expand Down
@@ -1,3 +1,23 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.internal.async

import io.temporal.api.common.v1.Payloads
Expand Down
Expand Up @@ -17,6 +17,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.internal.async

import io.temporal.api.common.v1.Payloads
Expand Down Expand Up @@ -106,7 +107,7 @@ internal class KotlinWorkflowExecutionHandler(

private fun throwAndFailWorkflowExecution(exception: Throwable) {
val replayWorkflowContext = context.getReplayContext()
val fullReplayDirectQueryName = replayWorkflowContext.fullReplayDirectQueryName
val fullReplayDirectQueryName = replayWorkflowContext!!.fullReplayDirectQueryName
val info = Workflow.getInfo()
if (fullReplayDirectQueryName != null) {
if (log.isDebugEnabled &&
Expand Down
Expand Up @@ -17,10 +17,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.internal.async

import com.google.common.base.Preconditions
import com.google.common.collect.ImmutableSet
import io.temporal.api.common.v1.Payloads
import io.temporal.api.common.v1.WorkflowExecution
import io.temporal.api.common.v1.WorkflowType
Expand All @@ -32,7 +32,6 @@ import io.temporal.common.metadata.POJOWorkflowImplMetadata
import io.temporal.common.metadata.POJOWorkflowInterfaceMetadata
import io.temporal.common.metadata.WorkflowMethodType
import io.temporal.failure.CanceledFailure
import io.temporal.internal.common.env.ReflectionUtils
import io.temporal.internal.replay.ReplayWorkflow
import io.temporal.internal.replay.ReplayWorkflowFactory
import io.temporal.internal.sync.WorkflowInternal
Expand All @@ -54,6 +53,8 @@ import org.slf4j.LoggerFactory
import java.lang.reflect.InvocationTargetException
import java.lang.reflect.Method
import java.util.*
import kotlin.reflect.full.callSuspend
import kotlin.reflect.jvm.kotlinFunction

class KotlinWorkflowImplementationFactory(
clientOptions: WorkflowClientOptions,
Expand Down Expand Up @@ -281,9 +282,11 @@ class KotlinWorkflowImplementationFactory(
// don't pass it down to other classes, it's a "cached" instance for internal usage only
private val dataConverterWithWorkflowContext: DataConverter
) : KotlinWorkflowDefinition {

private var workflowInvoker: WorkflowInboundCallsInterceptor? = null

override suspend fun initialize() {
val workflowContext = KotlinWorkflowInternal.rootWorkflowContext
val workflowContext = KotlinWorkflowInternal.getRootWorkflowContext()
workflowInvoker = RootWorkflowInboundCallsInterceptor(workflowContext)
for (workerInterceptor in workerInterceptors) {
workflowInvoker = workerInterceptor.interceptWorkflow(workflowInvoker!!)
Expand All @@ -296,8 +299,9 @@ class KotlinWorkflowImplementationFactory(
override suspend fun execute(header: Header?, input: Payloads?): Payloads? {
val args = dataConverterWithWorkflowContext.fromPayloads(
Optional.ofNullable(input),
workflowMethod.parameterTypes,
workflowMethod.genericParameterTypes
// TODO(maxim): Validate that the last element is coroutine continuation
workflowMethod.parameterTypes.dropLast(1).toTypedArray(),
workflowMethod.genericParameterTypes.dropLast(1).toTypedArray()
)
Preconditions.checkNotNull(workflowInvoker, "initialize not called")
val result = workflowInvoker!!.execute(WorkflowInboundCallsInterceptor.WorkflowInput(header, args))
Expand All @@ -322,7 +326,8 @@ class KotlinWorkflowImplementationFactory(

override suspend fun execute(input: WorkflowInboundCallsInterceptor.WorkflowInput): WorkflowInboundCallsInterceptor.WorkflowOutput {
return try {
val result = workflowMethod.invoke(workflow, *input.arguments)
val kMethod = workflowMethod.kotlinFunction
val result = kMethod!!.callSuspend(workflow, *input.arguments)
WorkflowInboundCallsInterceptor.WorkflowOutput(result)
} catch (e: IllegalAccessException) {
throw CheckedExceptionWrapper.wrap(e)
Expand Down Expand Up @@ -381,23 +386,25 @@ class KotlinWorkflowImplementationFactory(

companion object {
private val log = LoggerFactory.getLogger(KotlinWorkflowImplementationFactory::class.java)
val WORKFLOW_HANDLER_STACKTRACE_CUTOFF = ImmutableSet.builder<String>() // POJO
.add(
ReflectionUtils.getMethodNameForStackTraceCutoff(
KotlinWorkflowImplementation::class.java,
"execute",
Header::class.java,
Optional::class.java
)
) // Dynamic
.add(
ReflectionUtils.getMethodNameForStackTraceCutoff(
DynamicKotlinWorkflowDefinition::class.java,
"execute",
Header::class.java,
Optional::class.java
)
)
.build()

// TODO(maxim): See if this is needed for Kotlin
val WORKFLOW_HANDLER_STACKTRACE_CUTOFF = 0 // ImmutableSet.builder<String>() // POJO
// .add(
// ReflectionUtils.getMethodNameForStackTraceCutoff(
// KotlinWorkflowImplementation::class.java,
// "execute",
// Header::class.java,
// Payloads::class.java
// )
// ) // Dynamic
// .add(
// ReflectionUtils.getMethodNameForStackTraceCutoff(
// DynamicKotlinWorkflowDefinition::class.java,
// "execute",
// Header::class.java,
// Payloads::class.java
// )
// )
// .build()
}
}
Expand Up @@ -17,6 +17,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.internal.async

import io.temporal.api.common.v1.SearchAttributes
Expand Down

0 comments on commit 4ed8723

Please sign in to comment.