Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
*/

import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.function;
import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.tasks;
import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.tryCatch;
import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;

Expand All @@ -43,6 +44,8 @@
import io.serverlessworkflow.impl.WorkflowException;
import io.serverlessworkflow.impl.WorkflowModel;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -59,6 +62,8 @@ public class FuncTryCatchTest {
private static final String ORDER_002 = "ORDER#002";
private static final String ORDER_003 = "ORDER#003";

private static final String TRANSIENT_ERROR = "ERR_TRANSIENT";

@Test
void booking_compensation_dsl() {

Expand Down Expand Up @@ -447,6 +452,126 @@ void testCatchAll_WithAnyError() {
}
}

@Test
void testRetryWithoutBackoff() {
AtomicInteger attempts = new AtomicInteger();

Workflow workflow =
FuncWorkflowBuilder.workflow()
.tasks(
tryCatch(
"tryTask",
t ->
t.tryCatch(
tasks(
function(
"riskyTask",
(String input) -> {
if (attempts.incrementAndGet() <= 2) {
throw new WorkflowException(
WorkflowError.error(TRANSIENT_ERROR, 503).build());
}
return "success";
},
String.class)))
.catchHandler(
handler ->
handler
.errorsWith(err -> err.type(TRANSIENT_ERROR))
.retry(
retry ->
retry
.delay(d -> d.milliseconds(10))
.limit(
limit -> limit.attempt(a -> a.count(3)))))))
.build();

try (WorkflowApplication application = WorkflowApplication.builder().build()) {
WorkflowDefinition definition = application.workflowDefinition(workflow);
WorkflowModel result = definition.instance("input").start().join();
Assertions.assertThat(result.asText()).hasValue("success");
Assertions.assertThat(attempts.get()).isEqualTo(3);
}
}

@Test
void testRetryWithoutLimit() {

Workflow workflow =
FuncWorkflowBuilder.workflow()
.tasks(
tryCatch(
"tryTask",
t ->
t.tryCatch(
tasks(
function(
"riskyTask",
(String input) -> {
throw new WorkflowException(
WorkflowError.error(TRANSIENT_ERROR, 503).build());
},
String.class)))
.catchHandler(
handler ->
handler
.errorsWith(err -> err.type(TRANSIENT_ERROR))
.retry(
retry ->
retry
.delay(d -> d.milliseconds(10))
.backoff(b -> b.constant("c", "10"))))))
.build();

try (WorkflowApplication application = WorkflowApplication.builder().build()) {
WorkflowDefinition definition = application.workflowDefinition(workflow);
Assertions.assertThatThrownBy(() -> definition.instance("input").start().join())
.hasCauseInstanceOf(WorkflowException.class);
}
}

@Test
void testRetryWithoutDelay() {
AtomicInteger attempts = new AtomicInteger();

Workflow workflow =
FuncWorkflowBuilder.workflow()
.tasks(
tryCatch(
"tryTask",
t ->
t.tryCatch(
tasks(
function(
"riskyTask",
(String input) -> {
if (attempts.incrementAndGet() <= 2) {
throw new WorkflowException(
WorkflowError.error(TRANSIENT_ERROR, 503).build());
}
return "success";
},
String.class)))
.catchHandler(
handler ->
handler
.errorsWith(err -> err.type(TRANSIENT_ERROR))
.retry(
retry ->
retry
.backoff(b -> b.constant("c", "10"))
.limit(
limit -> limit.attempt(a -> a.count(3)))))))
.build();

try (WorkflowApplication application = WorkflowApplication.builder().build()) {
WorkflowDefinition definition = application.workflowDefinition(workflow);
WorkflowModel result = definition.instance("input").start().join();
Assertions.assertThat(result.asText()).hasValue("success");
Assertions.assertThat(attempts.get()).isEqualTo(3);
}
}

public String reserveStock(String order) {
log.info("Reserving stock for order: {}", order);
if (order.equals(ORDER_001)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.serverlessworkflow.api.types.ErrorFilter;
import io.serverlessworkflow.api.types.Retry;
import io.serverlessworkflow.api.types.RetryBackoff;
import io.serverlessworkflow.api.types.RetryLimit;
import io.serverlessworkflow.api.types.RetryPolicy;
import io.serverlessworkflow.api.types.TaskItem;
import io.serverlessworkflow.api.types.TryTask;
Expand Down Expand Up @@ -107,15 +108,22 @@ private Optional<RetryExecutor> buildRetryInterval(Retry retry) {

protected RetryExecutor buildRetryExecutor(RetryPolicy retryPolicy) {
return new DefaultRetryExecutor(
retryPolicy.getLimit().getAttempt().getCount(),
resolveMaxAttempts(retryPolicy.getLimit()),
buildIntervalFunction(retryPolicy),
WorkflowUtils.optionalPredicate(application, retryPolicy.getWhen()),
WorkflowUtils.optionalPredicate(application, retryPolicy.getExceptWhen()));
}

private static int resolveMaxAttempts(RetryLimit limit) {
if (limit == null || limit.getAttempt() == null) {
return 0;
}
return limit.getAttempt().getCount();
}

private RetryIntervalFunction buildIntervalFunction(RetryPolicy retryPolicy) {
RetryBackoff backoff = retryPolicy.getBackoff();
if (backoff.getConstantBackoff() != null) {
if (backoff == null || backoff.getConstantBackoff() != null) {
return new ConstantRetryIntervalFunction(
application, retryPolicy.getDelay(), retryPolicy.getJitter());
} else if (backoff.getLinearBackoff() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ public abstract class AbstractRetryIntervalFunction implements RetryIntervalFunc
private final Optional<WorkflowValueResolver<Duration>> maxJitteringResolver;
private final WorkflowValueResolver<Duration> delayResolver;

private static final WorkflowValueResolver<Duration> ZERO_DURATION_RESOLVER =
(w, t, m) -> Duration.ZERO;

public AbstractRetryIntervalFunction(
WorkflowApplication appl, TimeoutAfter delay, RetryPolicyJitter jitter) {
if (jitter != null) {
Expand All @@ -41,7 +44,8 @@ public AbstractRetryIntervalFunction(
minJitteringResolver = Optional.empty();
maxJitteringResolver = Optional.empty();
}
delayResolver = WorkflowUtils.fromTimeoutAfter(appl, delay);
delayResolver =
delay != null ? WorkflowUtils.fromTimeoutAfter(appl, delay) : ZERO_DURATION_RESOLVER;
}

@Override
Expand Down
Loading