Skip to content

Commit

Permalink
Fixed root workflow thread names and removed excessive error logging (#…
Browse files Browse the repository at this point in the history
…136)

* remove excessive options check

* Removed excessive check

* fixed gradle syntax

* Fixed root thread name and removed duplicated error loggin

* forgotten log statement removed
  • Loading branch information
mfateev authored Jun 27, 2020
1 parent 6105d49 commit 15e9801
Show file tree
Hide file tree
Showing 10 changed files with 52 additions and 57 deletions.
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -300,8 +300,8 @@ publishing {
nexusPublishing {
repositories {
sonatype {
username = hasProperty('ossrhUsername') ? ossrhUsername : ''
password = hasProperty('ossrhPassword') ? ossrhPassword : ''
username = project.hasProperty('ossrhUsername') ? project.property('ossrhUsername') : ''
password = project.hasProperty('ossrhPassword') ? project.property('ossrhPassword') : ''
nexusUrl = uri("https://oss.sonatype.org/service/local/staging/deploy/maven2/")
snapshotRepositoryUrl = uri("https://oss.sonatype.org/content/repositories/snapshots/")
}
Expand Down
6 changes: 0 additions & 6 deletions src/main/java/io/temporal/activity/ActivityOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -176,12 +176,6 @@ public ActivityOptions build() {
}

public ActivityOptions validateAndBuildWithDefaults() {
if (scheduleToCloseTimeout == null
&& (scheduleToStartTimeout == null || startToCloseTimeout == null)) {
throw new IllegalStateException(
"Either ScheduleToClose or both ScheduleToStart and StartToClose "
+ "timeouts are required: ");
}
return new ActivityOptions(
heartbeatTimeout,
scheduleToCloseTimeout,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,11 @@ static DeterministicRunner newRunner(
ExecutorService threadPool,
SyncDecisionContext decisionContext,
Supplier<Long> clock,
String rootThreadName,
Runnable root,
DeciderCache cache) {
return new DeterministicRunnerImpl(threadPool, decisionContext, clock, root, cache);
return new DeterministicRunnerImpl(
threadPool, decisionContext, clock, rootThreadName, root, cache);
}

/**
Expand All @@ -72,8 +74,10 @@ static DeterministicRunner newRunner(
ExecutorService threadPool,
SyncDecisionContext decisionContext,
Supplier<Long> clock,
String rootThreadName,
Runnable root) {
return new DeterministicRunnerImpl(threadPool, decisionContext, clock, root, null);
return new DeterministicRunnerImpl(
threadPool, decisionContext, clock, rootThreadName, root, null);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ private NamedRunnable(String name, Runnable runnable) {
}

private static final Logger log = LoggerFactory.getLogger(DeterministicRunnerImpl.class);
static final String WORKFLOW_ROOT_THREAD_NAME = "workflow-root";
static final String WORKFLOW_ROOT_THREAD_NAME = "workflow-method";
private static final ThreadLocal<WorkflowThread> currentThreadThreadLocal = new ThreadLocal<>();

private final Lock lock = new ReentrantLock();
Expand Down Expand Up @@ -158,7 +158,13 @@ static void setCurrentThreadInternal(WorkflowThread coroutine) {
}

DeterministicRunnerImpl(Supplier<Long> clock, Runnable root) {
this(getDefaultThreadPool(), newDummySyncDecisionContext(), clock, root, null);
this(
getDefaultThreadPool(),
newDummySyncDecisionContext(),
clock,
WORKFLOW_ROOT_THREAD_NAME,
root,
null);
}

private static ThreadPoolExecutor getDefaultThreadPool() {
Expand All @@ -173,13 +179,14 @@ private static ThreadPoolExecutor getDefaultThreadPool() {
SyncDecisionContext decisionContext,
Supplier<Long> clock,
Runnable root) {
this(threadPool, decisionContext, clock, root, null);
this(threadPool, decisionContext, clock, WORKFLOW_ROOT_THREAD_NAME, root, null);
}

DeterministicRunnerImpl(
ExecutorService threadPool,
SyncDecisionContext decisionContext,
Supplier<Long> clock,
String rootName,
Runnable root,
DeciderCache cache) {
this.threadPool = threadPool;
Expand Down Expand Up @@ -386,8 +393,7 @@ public void close() {
throw new Error("unreachable");
} catch (RuntimeException e) {
log.warn(
"Promise that was completedExceptionally was never accessed. "
+ "The ignored exception:",
"Promise completed with exception and was never accessed. The ignored exception:",
CheckedExceptionWrapper.unwrap(e));
}
}
Expand Down
7 changes: 6 additions & 1 deletion src/main/java/io/temporal/internal/sync/SyncWorkflow.java
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,14 @@ public void start(HistoryEvent event, DecisionContext context) {
threadPool,
syncContext,
context::currentTimeMillis,
"interceptor-init",
() -> {
workflow.initialize();
WorkflowInternal.newThread(false, "root", () -> workflowProc.run()).start();
WorkflowInternal.newThread(
false,
DeterministicRunnerImpl.WORKFLOW_ROOT_THREAD_NAME,
() -> workflowProc.run())
.start();
},
cache);
runner.setInterceptorHead(syncContext.getWorkflowInterceptor());
Expand Down
19 changes: 0 additions & 19 deletions src/main/java/io/temporal/internal/sync/WorkflowThreadImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,15 +108,6 @@ public void run() {
threadContext.setUnhandledException(e);
}
} catch (Error e) {
// Error aborts decision, not fails the workflow.
if (log.isErrorEnabled() && !root) {
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw, true);
e.printStackTrace(pw);
String stackTrace = sw.getBuffer().toString();
log.error(
String.format("Workflow thread \"%s\" run failed with Error:\n%s", name, stackTrace));
}
threadContext.setUnhandledException(e);
} catch (CanceledFailure e) {
if (!isCancelRequested()) {
Expand All @@ -126,16 +117,6 @@ public void run() {
log.debug(String.format("Workflow thread \"%s\" run cancelled", name));
}
} catch (Throwable e) {
if (log.isWarnEnabled() && !root) {
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw, true);
e.printStackTrace(pw);
String stackTrace = sw.getBuffer().toString();
log.warn(
String.format(
"Workflow thread \"%s\" run failed with unhandled exception:\n%s",
name, stackTrace));
}
threadContext.setUnhandledException(e);
} finally {
DeterministicRunnerImpl.setCurrentThreadInternal(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,7 @@ public void workflowThreadsWillEvictCacheWhenMaxThreadCountIsHit() throws Throwa
new SyncDecisionContext(
decisionContext, DataConverter.getDefaultInstance(), null, null),
() -> 0L, // clock override
"test-thread",
() -> {
Promise<Void> thread =
Async.procedure(
Expand All @@ -759,6 +760,7 @@ public void workflowThreadsWillEvictCacheWhenMaxThreadCountIsHit() throws Throwa
new SyncDecisionContext(
decisionContext, DataConverter.getDefaultInstance(), null, null),
() -> 0L, // clock override
"test-thread",
() -> {
Promise<Void> thread =
Async.procedure(
Expand Down Expand Up @@ -796,6 +798,7 @@ public void workflowThreadsWillNotEvictCacheWhenMaxThreadCountIsHit() throws Thr
threadPool,
null,
() -> 0L, // clock override
"test-thread",
() -> {
Promise<Void> thread =
Async.procedure(
Expand All @@ -821,6 +824,7 @@ public void workflowThreadsWillNotEvictCacheWhenMaxThreadCountIsHit() throws Thr
threadPool,
null,
() -> 0L, // clock override
"test-thread",
() -> {
Promise<Void> thread =
Async.procedure(
Expand Down
1 change: 1 addition & 0 deletions src/test/java/io/temporal/internal/sync/PromiseTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ public void testGetTimeout() throws Throwable {
threadPool,
null,
() -> currentTime,
"test-thread",
() -> {
CompletablePromise<String> f = Workflow.newPromise();
trace.add("root begin");
Expand Down
2 changes: 1 addition & 1 deletion src/test/java/io/temporal/worker/StickyWorkerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ public void whenCacheIsEvictedTheWorkerCanRecover() throws Exception {
// Act
WorkflowClient.start(workflow::getGreeting);

Thread.sleep(200); // Wait for workflow to start
Thread.sleep(1000); // Wait for workflow to start

DeciderCache cache = factory.getCache();
assertNotNull(cache);
Expand Down
42 changes: 21 additions & 21 deletions src/test/java/io/temporal/workflow/WorkflowTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ public void testSync() {
assertEquals("activity10", result);
tracer.setExpected(
"interceptExecuteWorkflow " + UUID_REGEXP,
"newThread root",
"newThread workflow-method",
"newThread null",
"sleep PT2S",
"executeActivity ActivityWithDelay",
Expand Down Expand Up @@ -1499,19 +1499,19 @@ public void testContinueAsNew() {
assertEquals(111, result);
tracer.setExpected(
"interceptExecuteWorkflow " + UUID_REGEXP,
"newThread root",
"newThread workflow-method",
"continueAsNew",
"interceptExecuteWorkflow " + UUID_REGEXP,
"newThread root",
"newThread workflow-method",
"continueAsNew",
"interceptExecuteWorkflow " + UUID_REGEXP,
"newThread root",
"newThread workflow-method",
"continueAsNew",
"interceptExecuteWorkflow " + UUID_REGEXP,
"newThread root",
"newThread workflow-method",
"continueAsNew",
"interceptExecuteWorkflow " + UUID_REGEXP,
"newThread root");
"newThread workflow-method");
}

@WorkflowInterface
Expand Down Expand Up @@ -1546,10 +1546,10 @@ public void testContinueAsNewNoArgs() {
assertEquals("done", result);
tracer.setExpected(
"interceptExecuteWorkflow " + UUID_REGEXP,
"newThread root",
"newThread workflow-method",
"continueAsNew",
"interceptExecuteWorkflow " + UUID_REGEXP,
"newThread root");
"newThread workflow-method");
}

public static class TestAsyncActivityWorkflowImpl implements TestWorkflow1 {
Expand Down Expand Up @@ -2369,15 +2369,15 @@ public void testTimer() {
tracer.setExpected(
"interceptExecuteWorkflow " + UUID_REGEXP,
"registerQuery getTrace",
"newThread root",
"newThread workflow-method",
"newTimer PT0.7S",
"newTimer PT1.3S",
"newTimer PT10S");
} else {
tracer.setExpected(
"interceptExecuteWorkflow " + UUID_REGEXP,
"registerQuery getTrace",
"newThread root",
"newThread workflow-method",
"newTimer PT11M40S",
"newTimer PT21M40S",
"newTimer PT10H");
Expand Down Expand Up @@ -3482,10 +3482,10 @@ public void testSignalExternalWorkflow() {
tracer.setExpected(
"interceptExecuteWorkflow " + stub.getExecution().getWorkflowId(),
"registerSignal testSignal",
"newThread root",
"newThread workflow-method",
"executeChildWorkflow SignalingChild",
"interceptExecuteWorkflow " + UUID_REGEXP, // child
"newThread root",
"newThread workflow-method",
"signalExternalWorkflow " + UUID_REGEXP + " testSignal");
}

Expand Down Expand Up @@ -4573,7 +4573,7 @@ public void testSideEffect() {
assertEquals("activity1", result);
tracer.setExpected(
"interceptExecuteWorkflow " + UUID_REGEXP,
"newThread root",
"newThread workflow-method",
"sideEffect",
"sleep PT1S",
"executeActivity customActivity1");
Expand Down Expand Up @@ -4671,7 +4671,7 @@ public void testGetVersion() {
assertEquals("activity22activity1activity1activity1", result);
tracer.setExpected(
"interceptExecuteWorkflow " + UUID_REGEXP,
"newThread root",
"newThread workflow-method",
"getVersion",
"executeActivity Activity2",
"getVersion",
Expand Down Expand Up @@ -4984,7 +4984,7 @@ public void testGetVersionRemovedInReplay() {
assertEquals("activity22activity", result);
tracer.setExpected(
"interceptExecuteWorkflow " + UUID_REGEXP,
"newThread root",
"newThread workflow-method",
"getVersion",
"executeActivity Activity2",
"executeActivity Activity");
Expand Down Expand Up @@ -5022,7 +5022,7 @@ public void testGetVersionRemovedBefore() {
assertEquals("activity", result);
tracer.setExpected(
"interceptExecuteWorkflow " + UUID_REGEXP,
"newThread root",
"newThread workflow-method",
"getVersion",
"getVersion",
"getVersion",
Expand Down Expand Up @@ -5225,7 +5225,7 @@ public void testUUIDAndRandom() {
assertEquals("foo10", result);
tracer.setExpected(
"interceptExecuteWorkflow " + UUID_REGEXP,
"newThread root",
"newThread workflow-method",
"sideEffect",
"sideEffect",
"executeActivity Activity2");
Expand Down Expand Up @@ -5934,15 +5934,15 @@ public void testSaga() {
sagaWorkflow.execute(taskList, false);
tracer.setExpected(
"interceptExecuteWorkflow " + UUID_REGEXP,
"newThread root",
"newThread workflow-method",
"executeActivity customActivity1",
"executeChildWorkflow TestMultiargsWorkflowsFunc",
"interceptExecuteWorkflow " + UUID_REGEXP,
"newThread root",
"newThread workflow-method",
"executeActivity ThrowIO",
"executeChildWorkflow TestCompensationWorkflow",
"interceptExecuteWorkflow " + UUID_REGEXP,
"newThread root",
"newThread workflow-method",
"executeActivity Activity2");
}

Expand Down Expand Up @@ -6063,7 +6063,7 @@ public void testUpsertSearchAttributes() {
assertEquals("done", result);
tracer.setExpected(
"interceptExecuteWorkflow " + UUID_REGEXP,
"newThread root",
"newThread workflow-method",
"upsertSearchAttributes",
"executeActivity Activity");
}
Expand Down

0 comments on commit 15e9801

Please sign in to comment.