Skip to content

Commit

Permalink
Copy retryOptions when continue as new
Browse files Browse the repository at this point in the history
  • Loading branch information
mfateev committed Dec 29, 2023
1 parent f767030 commit fc3df01
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 7 deletions.
Expand Up @@ -177,11 +177,11 @@ private <R> Tracer.SpanBuilder createChildWorkflowStartSpanBuilder(ChildWorkflow
}

private Tracer.SpanBuilder createContinueAsNewWorkflowStartSpanBuilder(ContinueAsNewInput input) {
WorkflowInfo parentWorkflowInfo = Workflow.getInfo();
WorkflowInfo continuedWorkflowInfo = Workflow.getInfo();
return spanFactory.createContinueAsNewWorkflowStartSpan(
tracer,
MoreObjects.firstNonNull(input.getWorkflowType(), parentWorkflowInfo.getWorkflowType()),
parentWorkflowInfo.getWorkflowId(),
parentWorkflowInfo.getRunId());
MoreObjects.firstNonNull(input.getWorkflowType(), continuedWorkflowInfo.getWorkflowType()),
continuedWorkflowInfo.getWorkflowId(),
continuedWorkflowInfo.getRunId());
}
}
Expand Up @@ -150,6 +150,7 @@ public Failure getPreviousRunFailure() {
return previousRunFailure;
}

@Nullable
public RetryOptions getRetryOptions() {
if (!startedAttributes.hasRetryPolicy()) {
return null;
Expand Down
Expand Up @@ -34,7 +34,13 @@
import io.temporal.api.command.v1.ScheduleActivityTaskCommandAttributes;
import io.temporal.api.command.v1.SignalExternalWorkflowExecutionCommandAttributes;
import io.temporal.api.command.v1.StartChildWorkflowExecutionCommandAttributes;
import io.temporal.api.common.v1.*;
import io.temporal.api.common.v1.ActivityType;
import io.temporal.api.common.v1.Memo;
import io.temporal.api.common.v1.Payload;
import io.temporal.api.common.v1.Payloads;
import io.temporal.api.common.v1.SearchAttributes;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.common.v1.WorkflowType;
import io.temporal.api.enums.v1.ParentClosePolicy;
import io.temporal.api.failure.v1.Failure;
import io.temporal.api.history.v1.HistoryEvent;
Expand Down Expand Up @@ -1104,6 +1110,8 @@ public void continueAsNew(ContinueAsNewInput input) {
}
if (options.getRetryOptions() != null) {
attributes.setRetryPolicy(toRetryPolicy(options.getRetryOptions()));
} else if (replayContext.getRetryOptions() != null) {
attributes.setRetryPolicy(toRetryPolicy(replayContext.getRetryOptions()));
}
Map<String, Object> searchAttributes = options.getSearchAttributes();
if (searchAttributes != null && !searchAttributes.isEmpty()) {
Expand Down
Expand Up @@ -86,7 +86,7 @@ public static class TestContinueAsNewImpl implements TestContinueAsNew {
@Override
public int execute(int count, String continueAsNewTaskQueue) {
String taskQueue = Workflow.getInfo().getTaskQueue();
if (count == INITIAL_COUNT) {
if (count >= INITIAL_COUNT - 2) {
assertEquals(10, Workflow.getInfo().getRetryOptions().getMaximumAttempts());
} else {
assertEquals(5, Workflow.getInfo().getRetryOptions().getMaximumAttempts());
Expand All @@ -97,7 +97,12 @@ public int execute(int count, String continueAsNewTaskQueue) {
}
Map<String, Object> memo = new HashMap<>();
memo.put("myKey", "MyValue");
RetryOptions retryOptions = RetryOptions.newBuilder().setMaximumAttempts(5).build();
RetryOptions retryOptions = null;
// don't specify retryOptions on the first continue-as-new to test that they are copied from
// the previous run.
if (count < INITIAL_COUNT - 1) {
retryOptions = RetryOptions.newBuilder().setMaximumAttempts(5).build();
}
SearchAttributes searchAttributes =
SearchAttributes.newBuilder()
.set(SearchAttributeKey.forKeyword("CustomKeywordField"), "foo1")
Expand Down

0 comments on commit fc3df01

Please sign in to comment.