diff --git a/docs/advanced/error-handling.md b/docs/advanced/error-handling.md index 867ef2720..e81deeb61 100644 --- a/docs/advanced/error-handling.md +++ b/docs/advanced/error-handling.md @@ -3,13 +3,13 @@ The SDK throws specific exceptions to help you handle different failure scenarios: ``` +Error +└── DurableExecutionError - Internal SDK control-flow/error base type + └── SuspendExecutionException - Internal signal used by the SDK to suspend execution + (for example during `wait()`, `waitForCallback()`, and + `waitForCondition()`). User code should not catch it. + RuntimeException -├── SuspendExecutionException - Internal control-flow exception thrown by the SDK to suspend execution -│ (e.g., during wait(), waitForCallback(), waitForCondition()). -│ The SDK catches this internally — you will never see it unless you have -│ a broad catch(Exception) block around durable operations. If caught -│ accidentally, you MUST re-throw it so the SDK can suspend correctly. -│ └── DurableExecutionException - General durable exception ├── SerDesException - Serialization and deserialization exception. ├── UnrecoverableDurableExecutionException - Execution cannot be recovered. The durable execution will be immediately terminated. @@ -52,7 +52,11 @@ try { ### Handling SuspendExecutionException -If you have a broad `catch (Exception e)` block around durable operations, you must re-throw `SuspendExecutionException` to let the SDK suspend correctly: +`SuspendExecutionException` is an internal SDK control-flow signal. It extends `Error`, not `Exception`, so a +normal `catch (Exception e)` block will not intercept it. + +The real risk is code that catches `Throwable`, or code that explicitly catches `SuspendExecutionException`. In those +cases, you must re-throw it immediately so the SDK can suspend the execution correctly. ```java try { @@ -60,8 +64,24 @@ try { ctx.wait("pause", Duration.ofDays(1)); ctx.step("more-work", String.class, stepCtx -> doMoreWork()); } catch (SuspendExecutionException e) { - throw e; // Always re-throw — lets the SDK suspend the execution -} catch (Exception e) { + throw e; // Always re-throw internal suspension signals +} catch (Throwable t) { + log.error("Unexpected throwable", t); + throw t; +} +``` + +Avoid broad `catch (Throwable)` blocks around durable operations unless you have a strong reason to use them. Prefer +catching specific application exceptions instead: + +```java +try { + ctx.step("work", String.class, stepCtx -> doWork()); + ctx.wait("pause", Duration.ofDays(1)); + ctx.step("more-work", String.class, stepCtx -> doMoreWork()); +} catch (SuspendExecutionException e) { + throw e; // Always re-throw internal suspension signals +} catch (MyBusinessException e) { log.error("Operation failed", e); } -``` \ No newline at end of file +``` diff --git a/examples/src/main/java/software/amazon/lambda/durable/examples/general/LoggingExample.java b/examples/src/main/java/software/amazon/lambda/durable/examples/general/LoggingExample.java index de7db9163..d916bf258 100644 --- a/examples/src/main/java/software/amazon/lambda/durable/examples/general/LoggingExample.java +++ b/examples/src/main/java/software/amazon/lambda/durable/examples/general/LoggingExample.java @@ -2,6 +2,8 @@ // SPDX-License-Identifier: Apache-2.0 package software.amazon.lambda.durable.examples.general; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import software.amazon.lambda.durable.DurableContext; import software.amazon.lambda.durable.DurableHandler; import software.amazon.lambda.durable.examples.types.GreetingRequest; @@ -13,15 +15,16 @@ * in log entries via MDC. By default, logs are suppressed during replay to avoid duplicates. */ public class LoggingExample extends DurableHandler { + Logger logger = LoggerFactory.getLogger(LoggingExample.class); @Override public String handleRequest(GreetingRequest input, DurableContext context) { // Log at execution level (outside any step) - context.getLogger().info("Processing greeting for: {}", input.getName()); + context.getLogger(logger).info("Processing greeting for: {}", input.getName()); // Step 1: Create greeting - logs inside step include operation context var greeting = context.step("create-greeting", String.class, ctx -> { - ctx.getLogger().info("Creating greeting message"); + ctx.getLogger(logger).info("Creating greeting message"); return "Hello, " + input.getName(); }); diff --git a/examples/src/test/java/software/amazon/lambda/durable/examples/CloudBasedIntegrationTest.java b/examples/src/test/java/software/amazon/lambda/durable/examples/CloudBasedIntegrationTest.java index fa4f0eb44..22346df0e 100644 --- a/examples/src/test/java/software/amazon/lambda/durable/examples/CloudBasedIntegrationTest.java +++ b/examples/src/test/java/software/amazon/lambda/durable/examples/CloudBasedIntegrationTest.java @@ -253,6 +253,14 @@ void testWaitAtLeastInProcessExample() { assertTrue(asyncOp.getStepResult(String.class).contains("Processed: TestUser")); } + @Test + void testLoggingExample() { + var runner = CloudDurableTestRunner.create( + arn("logging-example"), GreetingRequest.class, String.class, lambdaClient); + var result = runner.run(new GreetingRequest("TestUser")); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + } + @Test void testGenericTypesExample() { var runner = CloudDurableTestRunner.create( diff --git a/sdk/src/main/java/software/amazon/lambda/durable/DurableContext.java b/sdk/src/main/java/software/amazon/lambda/durable/DurableContext.java index 38065b749..4b4a04081 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/DurableContext.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/DurableContext.java @@ -22,6 +22,10 @@ import software.amazon.lambda.durable.model.WaitForConditionResult; public interface DurableContext extends BaseContext { + static DurableContext getCurrentContext() { + return (DurableContext) BaseContext.getCurrentContext(); + } + /** * Executes a durable step with the given name and blocks until it completes. * diff --git a/sdk/src/main/java/software/amazon/lambda/durable/ParallelDurableFuture.java b/sdk/src/main/java/software/amazon/lambda/durable/ParallelDurableFuture.java index b71198d7d..5f3067eda 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/ParallelDurableFuture.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/ParallelDurableFuture.java @@ -5,9 +5,10 @@ import java.util.function.Function; import software.amazon.lambda.durable.config.ParallelBranchConfig; import software.amazon.lambda.durable.model.ParallelResult; +import software.amazon.lambda.durable.model.SafeCloseable; /** User-facing context for managing parallel branch execution within a durable function. */ -public interface ParallelDurableFuture extends AutoCloseable, DurableFuture { +public interface ParallelDurableFuture extends SafeCloseable, DurableFuture { /** * Registers and immediately starts a branch (respects maxConcurrency). @@ -68,7 +69,4 @@ default DurableFuture branch( */ DurableFuture branch( String name, TypeToken resultType, Function func, ParallelBranchConfig config); - - /** Calls {@link #get()} if not already called. Guarantees that the context is closed. */ - void close(); } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/StepContext.java b/sdk/src/main/java/software/amazon/lambda/durable/StepContext.java index 35e24b6fa..092897d10 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/StepContext.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/StepContext.java @@ -7,4 +7,8 @@ public interface StepContext extends BaseContext { /** Returns the current retry attempt number (0-based). */ int getAttempt(); + + static StepContext getCurrentContext() { + return (StepContext) BaseContext.getCurrentContext(); + } } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/context/BaseContext.java b/sdk/src/main/java/software/amazon/lambda/durable/context/BaseContext.java index 274b31a79..c6c2cbd71 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/context/BaseContext.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/context/BaseContext.java @@ -3,10 +3,21 @@ package software.amazon.lambda.durable.context; import com.amazonaws.services.lambda.runtime.Context; +import org.slf4j.Logger; import software.amazon.lambda.durable.DurableConfig; import software.amazon.lambda.durable.logging.DurableLogger; -public interface BaseContext extends AutoCloseable { +public interface BaseContext { + ThreadLocal CONTEXT = new ThreadLocal<>(); + + /** + * Gets the current context (DurableContext or StepContext) for this thread. + * + * @return the current context or null if not set + */ + static BaseContext getCurrentContext() { + return CONTEXT.get(); + } /** * Gets a logger with additional information of the current execution context. * @@ -14,6 +25,14 @@ public interface BaseContext extends AutoCloseable { */ DurableLogger getLogger(); + /** + * Gets a logger with additional information of the current execution context. + * + * @param delegate the logger to wrap + * @return a DurableLogger instance + */ + DurableLogger getLogger(Logger delegate); + /** * Returns the AWS Lambda runtime context. * @@ -46,7 +65,4 @@ public interface BaseContext extends AutoCloseable { /** Returns whether this context is currently in replay mode. */ boolean isReplaying(); - - /** Closes this context. */ - void close(); } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/context/BaseContextImpl.java b/sdk/src/main/java/software/amazon/lambda/durable/context/BaseContextImpl.java index 9920366f4..50a0337a6 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/context/BaseContextImpl.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/context/BaseContextImpl.java @@ -3,11 +3,13 @@ package software.amazon.lambda.durable.context; import com.amazonaws.services.lambda.runtime.Context; +import org.slf4j.Logger; import software.amazon.lambda.durable.DurableConfig; import software.amazon.lambda.durable.execution.ExecutionManager; import software.amazon.lambda.durable.execution.ThreadType; +import software.amazon.lambda.durable.logging.DurableLogger; -public abstract class BaseContextImpl implements AutoCloseable, BaseContext { +public abstract class BaseContextImpl implements BaseContext { private final ExecutionManager executionManager; private final DurableConfig durableConfig; private final Context lambdaContext; @@ -109,4 +111,18 @@ public boolean isReplaying() { public void setExecutionMode() { this.isReplaying = false; } + + /** Returns a durable logger for this context. */ + public DurableLogger getLogger() { + return DurableLogger.INSTANCE; + } + + /** Returns a durable logger for this context. */ + public DurableLogger getLogger(Logger delegate) { + return new DurableLogger(delegate); + } + + public static void setCurrentContext(BaseContext context) { + CONTEXT.set(context); + } } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/context/DurableContextImpl.java b/sdk/src/main/java/software/amazon/lambda/durable/context/DurableContextImpl.java index c08b6317b..801ad7c4c 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/context/DurableContextImpl.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/context/DurableContextImpl.java @@ -10,7 +10,6 @@ import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.Function; -import org.slf4j.LoggerFactory; import software.amazon.lambda.durable.DurableCallbackFuture; import software.amazon.lambda.durable.DurableConfig; import software.amazon.lambda.durable.DurableContext; @@ -32,7 +31,6 @@ import software.amazon.lambda.durable.execution.OperationIdGenerator; import software.amazon.lambda.durable.execution.SuspendExecutionException; import software.amazon.lambda.durable.execution.ThreadType; -import software.amazon.lambda.durable.logging.DurableLogger; import software.amazon.lambda.durable.model.MapResult; import software.amazon.lambda.durable.model.OperationIdentifier; import software.amazon.lambda.durable.model.OperationSubType; @@ -62,7 +60,6 @@ public class DurableContextImpl extends BaseContextImpl implements DurableContex private final OperationIdGenerator operationIdGenerator; private final DurableContextImpl parentContext; private final boolean isVirtual; - private volatile DurableLogger logger; /** Shared initialization — sets all fields. */ private DurableContextImpl( @@ -430,30 +427,6 @@ private static T executeRetryLoop( } // =============== accessors ================ - @Override - public DurableLogger getLogger() { - // lazy initialize logger - if (logger == null) { - synchronized (this) { - if (logger == null) { - logger = new DurableLogger(LoggerFactory.getLogger(DurableContext.class), this); - } - } - } - return logger; - } - - /** - * Clears the logger's thread properties. Called during context destruction to prevent memory leaks and ensure clean - * state for subsequent executions. - */ - @Override - public void close() { - if (logger != null) { - logger.close(); - } - } - /** * Get the next operationId. Returns a globally unique operation ID by hashing a sequential operation counter. For * root contexts, the counter value is hashed directly (e.g. "1", "2", "3"). For child contexts, the values are diff --git a/sdk/src/main/java/software/amazon/lambda/durable/context/StepContextImpl.java b/sdk/src/main/java/software/amazon/lambda/durable/context/StepContextImpl.java index dcf5af66b..a988d40aa 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/context/StepContextImpl.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/context/StepContextImpl.java @@ -3,12 +3,10 @@ package software.amazon.lambda.durable.context; import com.amazonaws.services.lambda.runtime.Context; -import org.slf4j.LoggerFactory; import software.amazon.lambda.durable.DurableConfig; import software.amazon.lambda.durable.StepContext; import software.amazon.lambda.durable.execution.ExecutionManager; import software.amazon.lambda.durable.execution.ThreadType; -import software.amazon.lambda.durable.logging.DurableLogger; /** * Context available inside a step operation's user function. @@ -17,7 +15,6 @@ * {@link BaseContext} for thread lifecycle management. */ public class StepContextImpl extends BaseContextImpl implements StepContext { - private volatile DurableLogger logger; private final int attempt; /** @@ -46,25 +43,4 @@ protected StepContextImpl( public int getAttempt() { return attempt; } - - @Override - public DurableLogger getLogger() { - // lazy initialize logger - if (logger == null) { - synchronized (this) { - if (logger == null) { - logger = new DurableLogger(LoggerFactory.getLogger(StepContext.class), this); - } - } - } - return logger; - } - - /** Closes the logger for this context. */ - @Override - public void close() { - if (logger != null) { - logger.close(); - } - } } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/execution/DurableExecutor.java b/sdk/src/main/java/software/amazon/lambda/durable/execution/DurableExecutor.java index ded7ac0fa..a8a90f6fb 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/execution/DurableExecutor.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/execution/DurableExecutor.java @@ -22,6 +22,7 @@ import software.amazon.lambda.durable.exception.DurableOperationException; import software.amazon.lambda.durable.exception.IllegalDurableOperationException; import software.amazon.lambda.durable.exception.UnrecoverableDurableExecutionException; +import software.amazon.lambda.durable.logging.DurableLogger; import software.amazon.lambda.durable.model.DurableExecutionInput; import software.amazon.lambda.durable.model.DurableExecutionOutput; import software.amazon.lambda.durable.plugin.InvocationEndInfo; @@ -69,9 +70,10 @@ public static DurableExecutionOutput execute( var userInput = extractUserInput( executionManager.getExecutionOperation(), config.getSerDes(), inputType); - // use try-with-resources to clear logger properties - try (var context = - DurableContextImpl.createRootContext(executionManager, config, lambdaContext)) { + var context = DurableContextImpl.createRootContext(executionManager, config, lambdaContext); + DurableContextImpl.setCurrentContext(context); + // use a try-with-resources to clear logger properties + try (var ignored = DurableLogger.attachContext()) { return handler.apply(userInput, context); } }, diff --git a/sdk/src/main/java/software/amazon/lambda/durable/execution/ExecutionManager.java b/sdk/src/main/java/software/amazon/lambda/durable/execution/ExecutionManager.java index e528301b0..ef5c4e456 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/execution/ExecutionManager.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/execution/ExecutionManager.java @@ -20,10 +20,12 @@ import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.lambda.model.Operation; import software.amazon.awssdk.services.lambda.model.OperationStatus; +import software.amazon.awssdk.services.lambda.model.OperationType; import software.amazon.awssdk.services.lambda.model.OperationUpdate; import software.amazon.lambda.durable.DurableConfig; import software.amazon.lambda.durable.exception.UnrecoverableDurableExecutionException; import software.amazon.lambda.durable.model.DurableExecutionInput; +import software.amazon.lambda.durable.model.SafeCloseable; import software.amazon.lambda.durable.operation.BaseDurableOperation; /** @@ -47,7 +49,7 @@ * * @see InternalExecutor */ -public class ExecutionManager implements AutoCloseable { +public class ExecutionManager implements SafeCloseable { private static final Logger logger = LoggerFactory.getLogger(ExecutionManager.class); @@ -192,7 +194,8 @@ public Operation getExecutionOperation() { * @return true if at least one operation exists with the given parentId */ public boolean hasOperationsForContext(String parentId) { - return operationStorage.values().stream().anyMatch(op -> Objects.equals(op.parentId(), parentId)); + return operationStorage.values().stream() + .anyMatch(op -> op.type() != OperationType.EXECUTION && Objects.equals(op.parentId(), parentId)); } // ===== Thread Coordination ===== diff --git a/sdk/src/main/java/software/amazon/lambda/durable/logging/DurableLogger.java b/sdk/src/main/java/software/amazon/lambda/durable/logging/DurableLogger.java index 735e42867..e9fc22622 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/logging/DurableLogger.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/logging/DurableLogger.java @@ -3,17 +3,20 @@ package software.amazon.lambda.durable.logging; import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.slf4j.MDC; import software.amazon.lambda.durable.DurableContext; import software.amazon.lambda.durable.StepContext; -import software.amazon.lambda.durable.context.BaseContextImpl; +import software.amazon.lambda.durable.context.BaseContext; +import software.amazon.lambda.durable.model.SafeCloseable; /** * Logger wrapper that adds durable execution context to log entries via MDC and optionally suppresses logs during * replay. */ public class DurableLogger { - static final String MDC_EXECUTION_ARN = "durableExecutionArn"; + static final String MDC_DURABLE_EXECUTION_ARN = "durableExecutionArn"; + static final String MDC_EXECUTION_ARN = "executionArn"; static final String MDC_REQUEST_ID = "requestId"; static final String MDC_OPERATION_ID = "operationId"; static final String MDC_CONTEXT_ID = "contextId"; @@ -21,21 +24,44 @@ public class DurableLogger { static final String MDC_CONTEXT_NAME = "contextName"; static final String MDC_ATTEMPT = "attempt"; + public static final DurableLogger INSTANCE = new DurableLogger(LoggerFactory.getLogger(DurableLogger.class)); + private static final SafeCloseable AUTO_CLOSER = DurableLogger::detachContext; + private final Logger delegate; - private final BaseContextImpl context; /** * Creates a DurableLogger wrapping the given SLF4J logger with execution context MDC entries. * * @param delegate the SLF4J logger to wrap - * @param context the durable execution context providing MDC values */ - public DurableLogger(Logger delegate, BaseContextImpl context) { + public DurableLogger(Logger delegate) { this.delegate = delegate; - this.context = context; + } + + public static SafeCloseable attachContext() { + var context = BaseContext.getCurrentContext(); + if (context != null) { + injectMdcProperties(context); + } + return AUTO_CLOSER; + } + + public static void detachContext() { + var context = BaseContext.getCurrentContext(); + if (context != null) { + MDC.clear(); + } + } + + private static void injectMdcProperties(BaseContext context) { + var config = context.getDurableConfig().getLoggerConfig(); // execution arn - MDC.put(MDC_EXECUTION_ARN, context.getExecutionArn()); + if (config.oldKeyNames()) { + MDC.put(MDC_DURABLE_EXECUTION_ARN, context.getExecutionArn()); + } else { + MDC.put(MDC_EXECUTION_ARN, context.getExecutionArn()); + } // lambda request id var requestId = @@ -47,10 +73,18 @@ public DurableLogger(Logger delegate, BaseContextImpl context) { if (context instanceof DurableContext) { // context thread - context id and name if (context.getContextId() != null) { - MDC.put(MDC_CONTEXT_ID, context.getContextId()); + if (config.oldKeyNames()) { + MDC.put(MDC_CONTEXT_ID, context.getContextId()); + } else { + MDC.put(MDC_OPERATION_ID, context.getContextId()); + } } if (context.getContextName() != null) { - MDC.put(MDC_CONTEXT_NAME, context.getContextName()); + if (config.oldKeyNames()) { + MDC.put(MDC_CONTEXT_NAME, context.getContextName()); + } else { + MDC.put(MDC_OPERATION_NAME, context.getContextName()); + } } } else if (context instanceof StepContext stepContext) { // In step context, context id is the operation id, context name is the operation name @@ -63,11 +97,6 @@ public DurableLogger(Logger delegate, BaseContextImpl context) { } } - /** Clears all MDC entries. User set MDC entries will also be removed as the thread will not be used anymore. */ - public void close() { - MDC.clear(); - } - public void trace(String format, Object... args) { log(() -> delegate.trace(format, args)); } @@ -92,13 +121,13 @@ public void error(String message, Throwable t) { log(() -> delegate.error(message, t)); } - private boolean shouldSuppress() { - return context.getDurableConfig().getLoggerConfig().suppressReplayLogs() - && context.getExecutionManager().isReplaying(); + private boolean shouldSuppress(BaseContext context) { + return context.getDurableConfig().getLoggerConfig().suppressReplayLogs() && context.isReplaying(); } private void log(Runnable logAction) { - if (!shouldSuppress()) { + var threadLocalContext = BaseContext.getCurrentContext(); + if (threadLocalContext == null || !shouldSuppress(threadLocalContext)) { logAction.run(); } } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/logging/LoggerConfig.java b/sdk/src/main/java/software/amazon/lambda/durable/logging/LoggerConfig.java index 66e16045c..017913341 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/logging/LoggerConfig.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/logging/LoggerConfig.java @@ -3,15 +3,15 @@ package software.amazon.lambda.durable.logging; /** Configuration for DurableLogger behavior. */ -public record LoggerConfig(boolean suppressReplayLogs) { +public record LoggerConfig(boolean suppressReplayLogs, boolean oldKeyNames) { /** Default configuration: suppress logs during replay. */ public static LoggerConfig defaults() { - return new LoggerConfig(true); + return new LoggerConfig(true, false); } /** Configuration that allows logs during replay. */ public static LoggerConfig withReplayLogging() { - return new LoggerConfig(false); + return new LoggerConfig(false, false); } } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/model/SafeCloseable.java b/sdk/src/main/java/software/amazon/lambda/durable/model/SafeCloseable.java new file mode 100644 index 000000000..c4559b6bf --- /dev/null +++ b/sdk/src/main/java/software/amazon/lambda/durable/model/SafeCloseable.java @@ -0,0 +1,8 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.model; + +/** A resource that can be safely closed, similar to {@link java.io.Closeable} but without throwing IOException. */ +public interface SafeCloseable extends AutoCloseable { + void close(); +} diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java index 7b47228be..acdda283b 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java @@ -32,6 +32,7 @@ import software.amazon.lambda.durable.exception.UnrecoverableDurableExecutionException; import software.amazon.lambda.durable.execution.SuspendExecutionException; import software.amazon.lambda.durable.execution.ThreadType; +import software.amazon.lambda.durable.logging.DurableLogger; import software.amazon.lambda.durable.model.DeserializedOperationResult; import software.amazon.lambda.durable.model.OperationIdentifier; import software.amazon.lambda.durable.util.ExceptionHelper; @@ -130,7 +131,9 @@ private void executeChildContext() { // When this child is part of a ConcurrencyOperation (parentOperation != null), // we notify the parent BEFORE closing the child context. This ensures the parent // can trigger the next queued branch while the current child context is still valid. - try (var childContext = getContext().createChildContext(contextId, getName(), isVirtual)) { + var childContext = getContext().createChildContext(contextId, getName(), isVirtual); + DurableContextImpl.setCurrentContext(childContext); + try (var ignored = DurableLogger.attachContext()) { try { T result = function.apply(childContext); diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/SerializableDurableOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/SerializableDurableOperation.java index 0b582a9bc..6ccf24e0f 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/SerializableDurableOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/SerializableDurableOperation.java @@ -8,9 +8,7 @@ import software.amazon.lambda.durable.DurableFuture; import software.amazon.lambda.durable.TypeToken; import software.amazon.lambda.durable.context.DurableContextImpl; -import software.amazon.lambda.durable.exception.IllegalDurableOperationException; import software.amazon.lambda.durable.exception.SerDesException; -import software.amazon.lambda.durable.execution.ThreadType; import software.amazon.lambda.durable.model.OperationIdentifier; import software.amazon.lambda.durable.serde.SerDes; import software.amazon.lambda.durable.util.ExceptionHelper; @@ -77,22 +75,6 @@ protected SerializableDurableOperation( this.resultSerDes = resultSerDes; } - /** - * Checks if it's called from a Step. - * - * @throws IllegalDurableOperationException if it's in a step - */ - private void validateCurrentThreadType() { - ThreadType current = getCurrentThreadContext().threadType(); - if (current == ThreadType.STEP) { - var message = String.format( - "Nested %s operation is not supported on %s from within a %s execution.", - getType(), getName(), current); - // terminate execution and throw the exception - throw terminateExecutionWithIllegalDurableOperationException(message); - } - } - /** * Deserializes a result string into the operation's result type. * diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/StepOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/StepOperation.java index 7ac2d4340..ef43c2acf 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/StepOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/StepOperation.java @@ -15,6 +15,7 @@ import software.amazon.lambda.durable.TypeToken; import software.amazon.lambda.durable.config.StepConfig; import software.amazon.lambda.durable.config.StepSemantics; +import software.amazon.lambda.durable.context.BaseContextImpl; import software.amazon.lambda.durable.context.DurableContextImpl; import software.amazon.lambda.durable.exception.DurableOperationException; import software.amazon.lambda.durable.exception.StepFailedException; @@ -22,6 +23,7 @@ import software.amazon.lambda.durable.exception.UnrecoverableDurableExecutionException; import software.amazon.lambda.durable.execution.SuspendExecutionException; import software.amazon.lambda.durable.execution.ThreadType; +import software.amazon.lambda.durable.logging.DurableLogger; import software.amazon.lambda.durable.model.OperationIdentifier; import software.amazon.lambda.durable.util.ExceptionHelper; @@ -104,7 +106,10 @@ private void executeStepLogic(int attempt) { // use a try-with-resources to // - add thread id/type to thread local when the step starts // - clear logger properties when the step finishes - try (StepContext stepContext = getContext().createStepContext(getOperationId(), getName(), attempt)) { + StepContext stepContext = getContext().createStepContext(getOperationId(), getName(), attempt); + BaseContextImpl.setCurrentContext(stepContext); + + try (var ignored = DurableLogger.attachContext()) { try { checkpointStarted(); diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/WaitForConditionOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/WaitForConditionOperation.java index a3471179c..8404e1d68 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/WaitForConditionOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/WaitForConditionOperation.java @@ -13,12 +13,14 @@ import software.amazon.lambda.durable.StepContext; import software.amazon.lambda.durable.TypeToken; import software.amazon.lambda.durable.config.WaitForConditionConfig; +import software.amazon.lambda.durable.context.BaseContextImpl; import software.amazon.lambda.durable.context.DurableContextImpl; import software.amazon.lambda.durable.exception.DurableOperationException; import software.amazon.lambda.durable.exception.UnrecoverableDurableExecutionException; import software.amazon.lambda.durable.exception.WaitForConditionFailedException; import software.amazon.lambda.durable.execution.SuspendExecutionException; import software.amazon.lambda.durable.execution.ThreadType; +import software.amazon.lambda.durable.logging.DurableLogger; import software.amazon.lambda.durable.model.OperationIdentifier; import software.amazon.lambda.durable.model.WaitForConditionResult; import software.amazon.lambda.durable.util.ExceptionHelper; @@ -112,7 +114,9 @@ private CompletableFuture pollReadyAndResumeCheckLoop(Operation existing) private void executeCheckLogic(T currentState, int attempt) { Runnable userHandler = () -> { - try (var stepContext = getContext().createStepContext(getOperationId(), getName(), attempt)) { + var stepContext = getContext().createStepContext(getOperationId(), getName(), attempt); + BaseContextImpl.setCurrentContext(stepContext); + try (var ignored = DurableLogger.attachContext()) { try { // Checkpoint START if not already started var existing = getOperation(); diff --git a/sdk/src/test/java/software/amazon/lambda/durable/logging/DurableLoggerTest.java b/sdk/src/test/java/software/amazon/lambda/durable/logging/DurableLoggerTest.java index ad13ac825..6377ccacc 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/logging/DurableLoggerTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/logging/DurableLoggerTest.java @@ -2,17 +2,29 @@ // SPDX-License-Identifier: Apache-2.0 package software.amazon.lambda.durable.logging; -import static org.mockito.Mockito.*; +import static org.junit.jupiter.api.Assertions.*; +import java.lang.reflect.Proxy; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.mockito.MockedStatic; import org.slf4j.Logger; import org.slf4j.MDC; +import org.slf4j.helpers.BasicMDCAdapter; +import org.slf4j.spi.MDCAdapter; +import software.amazon.awssdk.services.lambda.model.CheckpointDurableExecutionResponse; +import software.amazon.awssdk.services.lambda.model.GetDurableExecutionStateResponse; +import software.amazon.awssdk.services.lambda.model.OperationUpdate; import software.amazon.lambda.durable.DurableConfig; +import software.amazon.lambda.durable.DurableContext; +import software.amazon.lambda.durable.StepContext; import software.amazon.lambda.durable.TestContext; -import software.amazon.lambda.durable.context.DurableContextImpl; -import software.amazon.lambda.durable.execution.ExecutionManager; +import software.amazon.lambda.durable.client.DurableExecutionClient; +import software.amazon.lambda.durable.context.BaseContext; +import software.amazon.lambda.durable.context.BaseContextImpl; class DurableLoggerTest { private static final String EXECUTION_NAME = "exec-123"; @@ -20,157 +32,301 @@ class DurableLoggerTest { private static final String EXECUTION_ARN = "arn:aws:lambda:us-east-1:123456789012:function:test/durable-execution/" + EXECUTION_NAME + "/" + EXECUTION_OP_ID; private static final String REQUEST_ID = "req-456"; - - private enum Mode { - REPLAYING, - EXECUTING - } - - private enum Suppression { - ENABLED, - DISABLED - } - - private Logger mockLogger; - private ExecutionManager mockExecutionManager; + private MDCAdapter originalMdcAdapter; @BeforeEach - void setUp() { - mockLogger = mock(Logger.class); - mockExecutionManager = mock(ExecutionManager.class); - when(mockExecutionManager.getDurableExecutionArn()).thenReturn(EXECUTION_ARN); + void setUp() throws ReflectiveOperationException { + originalMdcAdapter = MDC.getMDCAdapter(); + setMdcAdapter(new BasicMDCAdapter()); } - private DurableLogger createLogger(Mode mode, Suppression suppression) { - when(mockExecutionManager.isReplaying()).thenReturn(mode == Mode.REPLAYING); - return new DurableLogger(mockLogger, createDurableContext(REQUEST_ID, suppression)); - } - - private DurableContextImpl createDurableContext(String requestId, Suppression suppression) { - return DurableContextImpl.createRootContext( - mockExecutionManager, - DurableConfig.builder() - .withLoggerConfig(new LoggerConfig(suppression == Suppression.ENABLED)) - .build(), - new TestContext(requestId)); + @AfterEach + void tearDown() throws ReflectiveOperationException { + MDC.clear(); + BaseContextImpl.setCurrentContext(null); + DurableLogger.detachContext(); + setMdcAdapter(originalMdcAdapter); } @Test void logsWhenNotReplaying() { - var logger = createLogger(Mode.EXECUTING, Suppression.ENABLED); + var recordingLogger = new RecordingLogger(); + var logger = new DurableLogger(recordingLogger.delegate()); + var replaying = new AtomicBoolean(false); - logger.info("test message"); + withContext( + createDurableContext(replaying, LoggerConfig.defaults(), REQUEST_ID), + () -> logger.info("test message")); - verify(mockLogger).info(eq("test message"), any(Object[].class)); + assertEquals(1, recordingLogger.calls().size()); + assertEquals("info", recordingLogger.calls().get(0).methodName()); + assertEquals("test message", recordingLogger.calls().get(0).message()); } @Test void suppressesLogsWhenReplayingAndSuppressionEnabled() { - var logger = createLogger(Mode.REPLAYING, Suppression.ENABLED); - - logger.trace("suppressed"); - logger.info("should be suppressed"); - logger.debug("also suppressed"); - logger.warn("suppressed too"); - logger.error("even errors suppressed"); - - verify(mockLogger, never()).trace(anyString(), any(Object[].class)); - verify(mockLogger, never()).info(anyString(), any(Object[].class)); - verify(mockLogger, never()).debug(anyString(), any(Object[].class)); - verify(mockLogger, never()).warn(anyString(), any(Object[].class)); - verify(mockLogger, never()).error(anyString(), any(Object[].class)); + var recordingLogger = new RecordingLogger(); + var logger = new DurableLogger(recordingLogger.delegate()); + var replaying = new AtomicBoolean(true); + + withContext(createDurableContext(replaying, LoggerConfig.defaults(), REQUEST_ID), () -> { + logger.trace("suppressed"); + logger.info("should be suppressed"); + logger.debug("also suppressed"); + logger.warn("suppressed too"); + logger.error("even errors suppressed"); + }); + + assertTrue(recordingLogger.calls().isEmpty()); } @Test void logsWhenReplayingButSuppressionDisabled() { - var logger = createLogger(Mode.REPLAYING, Suppression.DISABLED); + var recordingLogger = new RecordingLogger(); + var logger = new DurableLogger(recordingLogger.delegate()); + var replaying = new AtomicBoolean(true); - logger.info("should log during replay"); + withContext( + createDurableContext(replaying, LoggerConfig.withReplayLogging(), REQUEST_ID), + () -> logger.info("should log during replay")); - verify(mockLogger).info(eq("should log during replay"), any(Object[].class)); + assertEquals(1, recordingLogger.calls().size()); + assertEquals("should log during replay", recordingLogger.calls().get(0).message()); } @Test - void setsExecutionMdcInConstructor() { - try (MockedStatic mdcMock = mockStatic(MDC.class)) { - createLogger(Mode.EXECUTING, Suppression.ENABLED); - - mdcMock.verify(() -> MDC.put(DurableLogger.MDC_EXECUTION_ARN, EXECUTION_ARN)); - mdcMock.verify(() -> MDC.put(DurableLogger.MDC_REQUEST_ID, REQUEST_ID)); + void setsExecutionMdcOnFirstLog() { + var logger = new DurableLogger(new RecordingLogger().delegate()); + var replaying = new AtomicBoolean(false); + + BaseContextImpl.setCurrentContext(createDurableContext(replaying, LoggerConfig.defaults(), REQUEST_ID)); + DurableLogger.attachContext(); + try { + logger.info("test"); + + assertEquals(EXECUTION_ARN, MDC.get(DurableLogger.MDC_EXECUTION_ARN)); + assertEquals(REQUEST_ID, MDC.get(DurableLogger.MDC_REQUEST_ID)); + } finally { + DurableLogger.detachContext(); } } @Test void setStepThreadPropertiesSetsMdc() { - try (MockedStatic mdcMock = mockStatic(MDC.class)) { - mdcMock.clearInvocations(); - when(mockExecutionManager.isReplaying()).thenReturn(false); - var logger = new DurableLogger( - mockLogger, - createDurableContext(REQUEST_ID, Suppression.ENABLED) - .createStepContext("op-1", "validateOrder", 2)); - - mdcMock.verify(() -> MDC.put(DurableLogger.MDC_OPERATION_ID, "op-1")); - mdcMock.verify(() -> MDC.put(DurableLogger.MDC_OPERATION_NAME, "validateOrder")); - mdcMock.verify(() -> MDC.put(DurableLogger.MDC_ATTEMPT, "2")); + var logger = new DurableLogger(new RecordingLogger().delegate()); + var replaying = new AtomicBoolean(false); + + BaseContextImpl.setCurrentContext( + createStepContext(replaying, LoggerConfig.defaults(), REQUEST_ID, "op-1", "validateOrder", 2)); + DurableLogger.attachContext(); + try { + logger.info("step log"); + + assertEquals("op-1", MDC.get(DurableLogger.MDC_OPERATION_ID)); + assertEquals("validateOrder", MDC.get(DurableLogger.MDC_OPERATION_NAME)); + assertEquals("2", MDC.get(DurableLogger.MDC_ATTEMPT)); + } finally { + DurableLogger.detachContext(); } } @Test void clearThreadPropertiesRemovesMdc() { - try (MockedStatic mdcMock = mockStatic(MDC.class)) { - var logger = createLogger(Mode.EXECUTING, Suppression.ENABLED); - mdcMock.clearInvocations(); + var logger = new DurableLogger(new RecordingLogger().delegate()); + var replaying = new AtomicBoolean(false); - logger.close(); + BaseContextImpl.setCurrentContext(createDurableContext(replaying, LoggerConfig.defaults(), REQUEST_ID)); + DurableLogger.attachContext(); + logger.info("test"); - mdcMock.verify(() -> MDC.clear()); - } + DurableLogger.detachContext(); + + assertNull(MDC.get(DurableLogger.MDC_EXECUTION_ARN)); + assertNull(MDC.get(DurableLogger.MDC_REQUEST_ID)); } @Test void replayModeTransitionAllowsSubsequentLogs() { - when(mockExecutionManager.isReplaying()).thenReturn(true, false); - var logger = new DurableLogger(mockLogger, createDurableContext(REQUEST_ID, Suppression.ENABLED)); + var recordingLogger = new RecordingLogger(); + var logger = new DurableLogger(recordingLogger.delegate()); + var replaying = new AtomicBoolean(true); + var context = createDurableContext(replaying, LoggerConfig.defaults(), REQUEST_ID); - // During replay - suppressed - logger.info("suppressed"); - verify(mockLogger, never()).info(anyString(), any(Object[].class)); + withContext(context, () -> logger.info("suppressed")); + assertTrue(recordingLogger.calls().isEmpty()); - // After transition to execution mode - logged - logger.info("logged after transition"); - verify(mockLogger).info(eq("logged after transition"), any(Object[].class)); + replaying.set(false); + withContext(context, () -> logger.info("logged after transition")); + + assertEquals(1, recordingLogger.calls().size()); + assertEquals("logged after transition", recordingLogger.calls().get(0).message()); } @Test void allLogLevelsDelegateCorrectly() { - var logger = createLogger(Mode.EXECUTING, Suppression.ENABLED); - - logger.trace("trace msg"); - logger.debug("debug msg"); - logger.info("info msg"); - logger.warn("warn msg"); - logger.error("error msg"); - + var recordingLogger = new RecordingLogger(); + var logger = new DurableLogger(recordingLogger.delegate()); + var replaying = new AtomicBoolean(false); var exception = new RuntimeException("test"); - logger.error("error with exception", exception); - verify(mockLogger).trace(eq("trace msg"), any(Object[].class)); - verify(mockLogger).debug(eq("debug msg"), any(Object[].class)); - verify(mockLogger).info(eq("info msg"), any(Object[].class)); - verify(mockLogger).warn(eq("warn msg"), any(Object[].class)); - verify(mockLogger).error(eq("error msg"), any(Object[].class)); - verify(mockLogger).error("error with exception", exception); + withContext(createDurableContext(replaying, LoggerConfig.defaults(), REQUEST_ID), () -> { + logger.trace("trace msg"); + logger.debug("debug msg"); + logger.info("info msg"); + logger.warn("warn msg"); + logger.error("error msg"); + logger.error("error with exception", exception); + }); + + assertEquals( + List.of("trace", "debug", "info", "warn", "error", "error"), + recordingLogger.calls().stream().map(LogCall::methodName).toList()); + var lastCall = recordingLogger.calls().get(recordingLogger.calls().size() - 1); + assertEquals("error with exception", lastCall.message()); + assertSame(exception, lastCall.throwable()); } @Test void handlesNullRequestId() { - try (MockedStatic mdcMock = mockStatic(MDC.class)) { - when(mockExecutionManager.isReplaying()).thenReturn(false); - new DurableLogger(mockLogger, createDurableContext(null, Suppression.DISABLED)); + var logger = new DurableLogger(new RecordingLogger().delegate()); + var replaying = new AtomicBoolean(false); + + BaseContextImpl.setCurrentContext(createDurableContext(replaying, LoggerConfig.withReplayLogging(), null)); + DurableLogger.attachContext(); + try { + logger.info("test"); + + assertEquals(EXECUTION_ARN, MDC.get(DurableLogger.MDC_EXECUTION_ARN)); + assertNull(MDC.get(DurableLogger.MDC_REQUEST_ID)); + } finally { + DurableLogger.detachContext(); + } + } + + private static DurableContext createDurableContext( + AtomicBoolean replaying, LoggerConfig loggerConfig, String requestId) { + return (DurableContext) Proxy.newProxyInstance( + DurableContext.class.getClassLoader(), + new Class[] {DurableContext.class}, + (proxy, method, args) -> switch (method.getName()) { + case "getExecutionArn" -> EXECUTION_ARN; + case "getLambdaContext" -> requestId == null ? null : new TestContext(requestId); + case "getDurableConfig" -> createDurableConfig(loggerConfig); + case "getContextId" -> null; + case "getContextName" -> null; + case "isReplaying" -> replaying.get(); + case "toString" -> "TestDurableContext"; + case "hashCode" -> System.identityHashCode(proxy); + case "equals" -> proxy == args[0]; + default -> throw new UnsupportedOperationException(method.getName()); + }); + } + + private static StepContext createStepContext( + AtomicBoolean replaying, + LoggerConfig loggerConfig, + String requestId, + String operationId, + String operationName, + int attempt) { + return (StepContext) Proxy.newProxyInstance( + StepContext.class.getClassLoader(), + new Class[] {StepContext.class}, + (proxy, method, args) -> switch (method.getName()) { + case "getExecutionArn" -> EXECUTION_ARN; + case "getLambdaContext" -> requestId == null ? null : new TestContext(requestId); + case "getDurableConfig" -> createDurableConfig(loggerConfig); + case "getContextId" -> operationId; + case "getContextName" -> operationName; + case "getAttempt" -> attempt; + case "isReplaying" -> replaying.get(); + case "toString" -> "TestStepContext"; + case "hashCode" -> System.identityHashCode(proxy); + case "equals" -> proxy == args[0]; + default -> throw new UnsupportedOperationException(method.getName()); + }); + } + + private static void withContext(BaseContext context, Runnable action) { + BaseContextImpl.setCurrentContext(context); + DurableLogger.attachContext(); + try { + action.run(); + } finally { + DurableLogger.detachContext(); + BaseContextImpl.setCurrentContext(null); + } + } + + private static DurableConfig createDurableConfig(LoggerConfig loggerConfig) { + return DurableConfig.builder() + .withLoggerConfig(loggerConfig) + .withDurableExecutionClient(new NoOpDurableExecutionClient()) + .build(); + } + + private static void setMdcAdapter(MDCAdapter adapter) throws ReflectiveOperationException { + var field = MDC.class.getDeclaredField("MDC_ADAPTER"); + field.setAccessible(true); + field.set(null, adapter); + } + + private record LogCall(String methodName, String message, Throwable throwable) {} + + private static final class RecordingLogger { + private final List calls = new ArrayList<>(); + private final Logger delegate = (Logger) Proxy.newProxyInstance( + Logger.class.getClassLoader(), new Class[] {Logger.class}, (proxy, method, args) -> { + switch (method.getName()) { + case "trace", "debug", "info", "warn", "error" -> { + var message = args != null && args.length > 0 ? (String) args[0] : null; + var throwable = + args != null && args.length > 1 && args[1] instanceof Throwable t ? t : null; + calls.add(new LogCall(method.getName(), message, throwable)); + return null; + } + case "getName" -> { + return "recording-logger"; + } + case "isTraceEnabled", "isDebugEnabled", "isInfoEnabled", "isWarnEnabled", "isErrorEnabled" -> { + return true; + } + case "toString" -> { + return "RecordingLogger"; + } + case "hashCode" -> { + return System.identityHashCode(proxy); + } + case "equals" -> { + return proxy == args[0]; + } + default -> { + if (method.getReturnType() == boolean.class) { + return false; + } + return null; + } + } + }); + + private Logger delegate() { + return delegate; + } + + private List calls() { + return calls; + } + } + + private static final class NoOpDurableExecutionClient implements DurableExecutionClient { + @Override + public CheckpointDurableExecutionResponse checkpoint(String arn, String token, List updates) { + throw new UnsupportedOperationException("Not used in DurableLoggerTest"); + } - mdcMock.verify(() -> MDC.put(DurableLogger.MDC_EXECUTION_ARN, EXECUTION_ARN)); - mdcMock.verify(() -> MDC.put(eq(DurableLogger.MDC_REQUEST_ID), anyString()), never()); + @Override + public GetDurableExecutionStateResponse getExecutionState(String arn, String checkpointToken, String marker) { + throw new UnsupportedOperationException("Not used in DurableLoggerTest"); } } }