Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Nexus error rehydration #2365

Draft
wants to merge 12 commits into
base: master
Choose a base branch
from
3 changes: 2 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ plugins {

allprojects {
repositories {
mavenLocal()
mavenCentral()
}
}
Expand All @@ -31,7 +32,7 @@ ext {
// Platforms
grpcVersion = '1.54.1' // [1.38.0,) Needed for io.grpc.protobuf.services.HealthStatusManager
jacksonVersion = '2.14.2' // [2.9.0,)
nexusVersion = '0.3.0-alpha'
nexusVersion = '0.5.0-SNAPSHOT'
// we don't upgrade to 1.10.x because it requires kotlin 1.6. Users may use 1.10.x in their environments though.
micrometerVersion = project.hasProperty("edgeDepsTest") ? '1.13.6' : '1.9.9' // [1.0.0,)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

package io.temporal.opentracing.internal;

import io.nexusrpc.OperationUnsuccessfulException;
import io.nexusrpc.OperationException;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.SpanContext;
Expand Down Expand Up @@ -49,8 +49,7 @@ public OpenTracingNexusOperationInboundCallsInterceptor(
}

@Override
public StartOperationOutput startOperation(StartOperationInput input)
throws OperationUnsuccessfulException {
public StartOperationOutput startOperation(StartOperationInput input) throws OperationException {
SpanContext rootSpanContext =
contextAccessor.readSpanContextFromHeader(input.getOperationContext().getHeaders(), tracer);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import io.temporal.api.failure.v1.Failure;
import io.temporal.api.failure.v1.ResetWorkflowFailureInfo;
import io.temporal.api.failure.v1.TimeoutFailureInfo;
import io.temporal.failure.TemporalFailure;
import io.temporal.payload.codec.ChainCodec;
import io.temporal.payload.codec.PayloadCodec;
import io.temporal.payload.context.SerializationContext;
Expand Down Expand Up @@ -199,7 +198,7 @@ public Failure exceptionToFailure(@Nonnull Throwable throwable) {

@Override
@Nonnull
public TemporalFailure failureToException(@Nonnull Failure failure) {
public RuntimeException failureToException(@Nonnull Failure failure) {
Preconditions.checkNotNull(failure, "failure");
return ConverterUtils.withContext(dataConverter, serializationContext)
.failureToException(this.decodeFailure(failure.toBuilder()).build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import io.temporal.api.failure.v1.Failure;
import io.temporal.common.Experimental;
import io.temporal.failure.DefaultFailureConverter;
import io.temporal.failure.TemporalFailure;
import io.temporal.payload.codec.PayloadCodec;
import io.temporal.payload.context.SerializationContext;
import java.lang.reflect.Type;
Expand Down Expand Up @@ -176,7 +175,7 @@ default Object[] fromPayloads(
* @throws NullPointerException if failure is null
*/
@Nonnull
default TemporalFailure failureToException(@Nonnull Failure failure) {
default RuntimeException failureToException(@Nonnull Failure failure) {
Preconditions.checkNotNull(failure, "failure");
return new DefaultFailureConverter().failureToException(failure, this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import io.temporal.api.failure.v1.Failure;
import io.temporal.failure.DefaultFailureConverter;
import io.temporal.failure.TemporalFailure;
import io.temporal.payload.context.SerializationContext;
import javax.annotation.Nonnull;

Expand All @@ -49,7 +48,7 @@ public interface FailureConverter {
* @throws NullPointerException if either failure or dataConverter is null
*/
@Nonnull
TemporalFailure failureToException(
RuntimeException failureToException(
@Nonnull Failure failure, @Nonnull DataConverter dataConverter);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import io.temporal.api.common.v1.Payloads;
import io.temporal.api.failure.v1.Failure;
import io.temporal.failure.DefaultFailureConverter;
import io.temporal.failure.TemporalFailure;
import io.temporal.payload.context.SerializationContext;
import java.lang.reflect.Type;
import java.util.*;
Expand Down Expand Up @@ -135,7 +134,7 @@ public <T> T fromPayloads(

@Override
@Nonnull
public TemporalFailure failureToException(@Nonnull Failure failure) {
public RuntimeException failureToException(@Nonnull Failure failure) {
Preconditions.checkNotNull(failure, "failure");
return (serializationContext != null
? failureConverter.withContext(serializationContext)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,16 @@

package io.temporal.common.interceptors;

import io.nexusrpc.OperationUnsuccessfulException;
import io.nexusrpc.OperationException;
import io.nexusrpc.handler.*;
import io.temporal.common.Experimental;

/**
* Intercepts inbound calls to a Nexus operation on the worker side.
*
* <p>An instance should be created in {@link
* WorkerInterceptor#interceptNexusOperation(NexusOperationInboundCallsInterceptor)}.
* WorkerInterceptor#interceptNexusOperation(OperationContext,
* NexusOperationInboundCallsInterceptor)}.
*
* <p>Prefer extending {@link NexusOperationInboundCallsInterceptorBase} and overriding only the
* methods you need instead of implementing this interface directly. {@link
Expand Down Expand Up @@ -102,10 +103,9 @@ final class CancelOperationOutput {}
*
* @param input input to the operation start.
* @return result of the operation start.
* @throws OperationUnsuccessfulException if the operation start failed.
* @throws io.nexusrpc.OperationException if the operation start failed.
*/
StartOperationOutput startOperation(StartOperationInput input)
throws OperationUnsuccessfulException;
StartOperationOutput startOperation(StartOperationInput input) throws OperationException;

/**
* Intercepts a call to cancel a Nexus operation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

package io.temporal.common.interceptors;

import io.nexusrpc.OperationUnsuccessfulException;
import io.nexusrpc.OperationException;
import io.temporal.common.Experimental;

/** Convenience base class for {@link NexusOperationInboundCallsInterceptor} implementations. */
Expand All @@ -39,8 +39,7 @@ public void init(NexusOperationOutboundCallsInterceptor outboundCalls) {
}

@Override
public StartOperationOutput startOperation(StartOperationInput input)
throws OperationUnsuccessfulException {
public StartOperationOutput startOperation(StartOperationInput input) throws OperationException {
return next.startOperation(input);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
import io.nexusrpc.handler.HandlerException;
import io.temporal.api.common.v1.ActivityType;
import io.temporal.api.common.v1.Payloads;
import io.temporal.api.common.v1.WorkflowType;
Expand Down Expand Up @@ -72,21 +73,23 @@ public final class DefaultFailureConverter implements FailureConverter {

@Override
@Nonnull
public TemporalFailure failureToException(
public RuntimeException failureToException(
@Nonnull Failure failure, @Nonnull DataConverter dataConverter) {
Preconditions.checkNotNull(failure, "failure");
Preconditions.checkNotNull(dataConverter, "dataConverter");
TemporalFailure result = failureToExceptionImpl(failure, dataConverter);
result.setFailure(failure);
RuntimeException result = failureToExceptionImpl(failure, dataConverter);
if (result instanceof TemporalFailure) {
((TemporalFailure) result).setFailure(failure);
}
if (failure.getSource().equals(JAVA_SDK) && !failure.getStackTrace().isEmpty()) {
StackTraceElement[] stackTrace = parseStackTrace(failure.getStackTrace());
result.setStackTrace(stackTrace);
}
return result;
}

private TemporalFailure failureToExceptionImpl(Failure failure, DataConverter dataConverter) {
TemporalFailure cause =
private RuntimeException failureToExceptionImpl(Failure failure, DataConverter dataConverter) {
Exception cause =
failure.hasCause() ? failureToException(failure.getCause(), dataConverter) : null;
switch (failure.getFailureInfoCase()) {
case APPLICATION_FAILURE_INFO:
Expand Down Expand Up @@ -187,6 +190,11 @@ private TemporalFailure failureToExceptionImpl(Failure failure, DataConverter da
info.getOperationId(),
cause);
}
case NEXUS_HANDLER_FAILURE_INFO:
{
NexusHandlerFailureInfo info = failure.getNexusHandlerFailureInfo();
return new HandlerException(HandlerException.ErrorType.valueOf(info.getType()), cause);
}
case FAILUREINFO_NOT_SET:
default:
// All unknown types are considered to be retryable ApplicationError.
Expand Down Expand Up @@ -302,14 +310,19 @@ private Failure exceptionToFailure(Throwable throwable) {
failure.setCanceledFailureInfo(info);
} else if (throwable instanceof NexusOperationFailure) {
NexusOperationFailure no = (NexusOperationFailure) throwable;
NexusOperationFailureInfo.Builder info =
NexusOperationFailureInfo.Builder op =
NexusOperationFailureInfo.newBuilder()
.setScheduledEventId(no.getScheduledEventId())
.setEndpoint(no.getEndpoint())
.setService(no.getService())
.setOperation(no.getOperation())
.setOperationId(no.getOperationId());
failure.setNexusOperationExecutionFailureInfo(info);
failure.setNexusOperationExecutionFailureInfo(op);
} else if (throwable instanceof HandlerException) {
HandlerException oe = (HandlerException) throwable;
NexusHandlerFailureInfo.Builder info =
NexusHandlerFailureInfo.newBuilder().setType(oe.getErrorType().toString());
failure.setNexusHandlerFailureInfo(info);
} else {
ApplicationFailureInfo.Builder info =
ApplicationFailureInfo.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,26 @@

package io.temporal.internal.common;

import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import io.nexusrpc.Link;
import io.temporal.api.nexus.v1.Failure;
import io.temporal.common.converter.DataConverter;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;

public class NexusUtil {
private static final JsonFormat.Printer JSON_PRINTER =
JsonFormat.printer().omittingInsignificantWhitespace();
private static final String TEMPORAL_FAILURE_TYPE_STRING =
io.temporal.api.failure.v1.Failure.getDescriptor().getFullName();
private static final Map<String, String> NEXUS_FAILURE_METADATA =
Collections.singletonMap("type", TEMPORAL_FAILURE_TYPE_STRING);

public static Duration parseRequestTimeout(String timeout) {
try {
if (timeout.endsWith("m")) {
Expand Down Expand Up @@ -53,5 +67,23 @@ public static Link nexusProtoLinkToLink(io.temporal.api.nexus.v1.Link nexusLink)
.build();
}

public static Failure exceptionToNexusFailure(Throwable exception, DataConverter dataConverter) {
io.temporal.api.failure.v1.Failure failure = dataConverter.exceptionToFailure(exception);
String details;
try {
details = JSON_PRINTER.print(failure.toBuilder().setMessage("").build());
} catch (InvalidProtocolBufferException e) {
return Failure.newBuilder()
.setMessage("Failed to serialize failure details")
.setDetails(ByteString.copyFromUtf8(e.getMessage()))
.build();
}
return Failure.newBuilder()
.setMessage(failure.getMessage())
.setDetails(ByteString.copyFromUtf8(details))
.putAllMetadata(NEXUS_FAILURE_METADATA)
.build();
}

private NexusUtil() {}
}
Loading
Loading