-
Notifications
You must be signed in to change notification settings - Fork 21
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added Counter Interceptor Sample #77
Merged
Merged
Changes from all commits
Commits
Show all changes
20 commits
Select commit
Hold shift + click to select a range
d678366
Added CounterInterceptor Sample
rross f428229
Fixed spelling error. Added tests
rross 26fdbee
Fixed whitespace format issue
rross c68e75d
Changes from PR suggestions
rross ec9da55
Addressing PR comments; Fixed Query bug
rross 31c0743
Refactored to use properties
rross ca90698
Made task queue a guid. Moved test to be in alphabetical order
rross 7ac67a9
Updated readme. Consistent case for comments
rross 34c83af
Addressing PR comments
rross 7682937
Addressing PR comments
rross 37692a7
Removed/Added CounterInterceptor project to avoid conflict with head
rross d548089
Resolving merge conflict
rross d113bd3
Merge branch 'main' into CounterInterceptor
rross c0131fd
Updated tests to use base class
rross 4557ffe
Addressing PR comments
rross c7b6e31
Added logic for detecting replay; fixed test
rross f3a82cc
Changed Count to a class; changed activities to use interpolation
rross 078bbe6
Added testing instructions to README
rross fc4fada
Changed output from worker to unique workflows
rross c6f5e55
Updated README to be consistent with others
rross File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
namespace TemporalioSamples.CounterInterceptor; | ||
public class Counts | ||
{ | ||
private uint clientExecutions; | ||
private uint clientQueries; | ||
private uint clientSignals; | ||
private uint workflowReplays; | ||
private uint workflowSignals; | ||
private uint workflowQueries; | ||
private uint workflowChildExecutions; | ||
private uint workflowActivityExecutions; | ||
|
||
public ref uint ClientExecutions => ref clientExecutions; | ||
|
||
public ref uint ClientSignals => ref clientSignals; | ||
|
||
public ref uint ClientQueries => ref clientQueries; | ||
|
||
public string ClientInfo() => | ||
$"\n\tTotal Number of Workflow Exec: {ClientExecutions}\n\t" + | ||
$"Total Number of Signals: {ClientSignals}\n\t" + | ||
$"Total Number of Queries: {ClientQueries}"; | ||
|
||
public ref uint WorkflowReplays => ref workflowReplays; | ||
|
||
public ref uint WorkflowSignals => ref workflowSignals; | ||
|
||
public ref uint WorkflowQueries => ref workflowQueries; | ||
|
||
public ref uint WorkflowChildExecutions => ref workflowChildExecutions; | ||
|
||
public ref uint WorkflowActivityExecutions => ref workflowActivityExecutions; | ||
|
||
public string WorkflowInfo() => | ||
$"\n\tTotal Number of Workflow Replays: {WorkflowReplays}\n\t" + | ||
$"Total Number of Child Workflow Exec: {WorkflowChildExecutions}\n\t" + | ||
$"Total Number of Activity Exec: {WorkflowActivityExecutions}\n\t" + | ||
$"Total Number of Signals: {WorkflowSignals}\n\t" + | ||
$"Total Number of Queries: {WorkflowQueries}"; | ||
|
||
public override string ToString() => | ||
ClientInfo() + WorkflowInfo(); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
namespace TemporalioSamples.CounterInterceptor; | ||
|
||
using System.Diagnostics; | ||
using Temporalio.Activities; | ||
|
||
public class MyActivities | ||
{ | ||
[Activity] | ||
public string SayHello(string name, string title) => | ||
$"Hello {title} {name}"; | ||
|
||
[Activity] | ||
public string SayGoodBye(string name, string title) => | ||
$"Goodby {title} {name}"; | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
namespace TemporalioSamples.CounterInterceptor; | ||
|
||
using Temporalio.Workflows; | ||
|
||
[Workflow] | ||
public class MyChildWorkflow | ||
{ | ||
private readonly ActivityOptions activityOptions = new() | ||
{ | ||
StartToCloseTimeout = TimeSpan.FromSeconds(10), | ||
}; | ||
|
||
[WorkflowRun] | ||
public async Task<string> RunAsync(string name, string title) => | ||
await Workflow.ExecuteActivityAsync((MyActivities act) => act.SayHello(name, title), activityOptions) + | ||
await Workflow.ExecuteActivityAsync((MyActivities act) => act.SayGoodBye(name, title), activityOptions); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,144 @@ | ||
namespace TemporalioSamples.CounterInterceptor; | ||
|
||
using System.Collections.Concurrent; | ||
using Temporalio.Activities; | ||
using Temporalio.Client; | ||
using Temporalio.Client.Interceptors; | ||
using Temporalio.Worker.Interceptors; | ||
using Temporalio.Workflows; | ||
|
||
public class MyCounterInterceptor : IClientInterceptor, IWorkerInterceptor | ||
{ | ||
public ConcurrentDictionary<string, Counts> Counts { get; } = new(); | ||
|
||
public string WorkerInfo() => | ||
string.Join( | ||
"\n", | ||
Counts.Select(kvp => $"** Workflow ID: {kvp.Key} {kvp.Value.WorkflowInfo()}")); | ||
|
||
public string ClientInfo() => | ||
string.Join( | ||
"\n", | ||
Counts.Select(kvp => $"** Workflow ID: {kvp.Key} {kvp.Value.ClientInfo()}")); | ||
|
||
public ClientOutboundInterceptor InterceptClient(ClientOutboundInterceptor nextInterceptor) => | ||
new ClientOutbound(this, nextInterceptor); | ||
|
||
public WorkflowInboundInterceptor InterceptWorkflow(WorkflowInboundInterceptor nextInterceptor) => | ||
new WorkflowInbound(this, nextInterceptor); | ||
|
||
public ActivityInboundInterceptor InterceptActivity(ActivityInboundInterceptor nextInterceptor) => | ||
new ActivityInbound(this, nextInterceptor); | ||
|
||
private void Increment(string id, Action<Counts> increment) => | ||
increment(Counts.GetOrAdd(id, _ => new())); | ||
|
||
private sealed class ClientOutbound : ClientOutboundInterceptor | ||
{ | ||
private MyCounterInterceptor root; | ||
|
||
public ClientOutbound(MyCounterInterceptor root, ClientOutboundInterceptor next) | ||
: base(next) => this.root = root; | ||
|
||
public override Task<WorkflowHandle<TWorkflow, TResult>> StartWorkflowAsync<TWorkflow, TResult>( | ||
StartWorkflowInput input) | ||
{ | ||
var id = input.Options.Id ?? "None"; | ||
root.Increment(id, c => Interlocked.Increment(ref root.Counts[id].ClientExecutions)); | ||
return base.StartWorkflowAsync<TWorkflow, TResult>(input); | ||
} | ||
|
||
public override Task SignalWorkflowAsync(SignalWorkflowInput input) | ||
{ | ||
var id = input.Id; | ||
root.Increment(id, c => Interlocked.Increment(ref root.Counts[id].ClientSignals)); | ||
return base.SignalWorkflowAsync(input); | ||
} | ||
|
||
public override Task<TResult> QueryWorkflowAsync<TResult>(QueryWorkflowInput input) | ||
{ | ||
var id = input.Id; | ||
root.Increment(id, c => Interlocked.Increment(ref root.Counts[id].ClientQueries)); | ||
return base.QueryWorkflowAsync<TResult>(input); | ||
} | ||
} | ||
|
||
private sealed class WorkflowInbound : WorkflowInboundInterceptor | ||
{ | ||
private readonly MyCounterInterceptor root; | ||
|
||
internal WorkflowInbound(MyCounterInterceptor root, WorkflowInboundInterceptor next) | ||
: base(next) => this.root = root; | ||
|
||
public override void Init(WorkflowOutboundInterceptor outbound) => | ||
base.Init(new WorkflowOutbound(root, outbound)); | ||
|
||
public override Task<object?> ExecuteWorkflowAsync(ExecuteWorkflowInput input) | ||
{ | ||
// Count only if we're not replaying | ||
if (!Workflow.Unsafe.IsReplaying) | ||
{ | ||
var id = Workflow.Info.WorkflowId; | ||
root.Increment(id, c => Interlocked.Increment(ref root.Counts[id].WorkflowReplays)); | ||
} | ||
return base.ExecuteWorkflowAsync(input); | ||
} | ||
|
||
public override Task HandleSignalAsync(HandleSignalInput input) | ||
{ | ||
// Count only if we're not replaying | ||
if (!Workflow.Unsafe.IsReplaying) | ||
{ | ||
var id = Workflow.Info.WorkflowId; | ||
root.Increment(id, c => Interlocked.Increment(ref root.Counts[id].WorkflowSignals)); | ||
} | ||
return base.HandleSignalAsync(input); | ||
} | ||
|
||
public override object? HandleQuery(HandleQueryInput input) | ||
{ | ||
// Count only if we're not replaying | ||
if (!Workflow.Unsafe.IsReplaying) | ||
{ | ||
var id = Workflow.Info.WorkflowId; | ||
root.Increment(id, c => Interlocked.Increment(ref root.Counts[id].WorkflowQueries)); | ||
} | ||
return base.HandleQuery(input); | ||
} | ||
} | ||
|
||
private sealed class WorkflowOutbound : WorkflowOutboundInterceptor | ||
{ | ||
private readonly MyCounterInterceptor root; | ||
|
||
internal WorkflowOutbound(MyCounterInterceptor root, WorkflowOutboundInterceptor next) | ||
: base(next) => this.root = root; | ||
|
||
public override Task<ChildWorkflowHandle<TWorkflow, TResult>> StartChildWorkflowAsync<TWorkflow, TResult>( | ||
StartChildWorkflowInput input) | ||
{ | ||
// Count only if we're not replaying | ||
if (!Workflow.Unsafe.IsReplaying) | ||
{ | ||
var id = Workflow.Info.WorkflowId; | ||
root.Increment(id, c => Interlocked.Increment(ref root.Counts[id].WorkflowChildExecutions)); | ||
} | ||
return base.StartChildWorkflowAsync<TWorkflow, TResult>(input); | ||
} | ||
} | ||
|
||
private sealed class ActivityInbound : ActivityInboundInterceptor | ||
{ | ||
private readonly MyCounterInterceptor root; | ||
|
||
internal ActivityInbound(MyCounterInterceptor root, ActivityInboundInterceptor next) | ||
: base(next) => this.root = root; | ||
|
||
public override Task<object?> ExecuteActivityAsync(ExecuteActivityInput input) | ||
{ | ||
var id = ActivityExecutionContext.Current.Info.WorkflowId; | ||
root.Increment(id, c => Interlocked.Increment(ref root.Counts[id].WorkflowActivityExecutions)); | ||
return base.ExecuteActivityAsync(input); | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
namespace TemporalioSamples.CounterInterceptor; | ||
|
||
using Temporalio.Workflows; | ||
|
||
[Workflow] | ||
public class MyWorkflow | ||
{ | ||
private bool exit; // Automatically defaults to false | ||
|
||
[WorkflowRun] | ||
public async Task<string> RunAsync() | ||
{ | ||
// Wait for greeting info | ||
await Workflow.WaitConditionAsync(() => | ||
!string.IsNullOrEmpty(Name) && !string.IsNullOrEmpty(Title)); | ||
|
||
// Execute Child Workflow | ||
var result = await Workflow.ExecuteChildWorkflowAsync( | ||
(MyChildWorkflow wf) => wf.RunAsync(Name, Title), | ||
new() { Id = "counter-interceptor-child" }); | ||
|
||
// Wait for exit signal | ||
await Workflow.WaitConditionAsync(() => exit); | ||
|
||
return result; | ||
} | ||
|
||
[WorkflowSignal] | ||
public async Task SignalNameAndTitleAsync(string name, string title) | ||
{ | ||
Name = name; | ||
Title = title; | ||
} | ||
|
||
[WorkflowQuery] | ||
public string Name { get; private set; } = string.Empty; | ||
|
||
[WorkflowQuery] | ||
public string Title { get; private set; } = string.Empty; | ||
|
||
[WorkflowSignal] | ||
public async Task ExitAsync() => exit = true; | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
namespace TemporalioSamples.CounterInterceptor; | ||
|
||
using Temporalio.Client; | ||
using Temporalio.Worker; | ||
|
||
internal class Program | ||
{ | ||
private static async Task Main(string[] args) | ||
{ | ||
var counterInterceptor = new MyCounterInterceptor(); | ||
var client = await TemporalClient.ConnectAsync( | ||
options: new("localhost:7233") | ||
{ | ||
Interceptors = new[] | ||
{ | ||
counterInterceptor, | ||
}, | ||
}); | ||
|
||
var activities = new MyActivities(); | ||
|
||
var taskQueue = "CounterInterceptorTaskQueue"; | ||
|
||
var workerOptions = new TemporalWorkerOptions(taskQueue). | ||
AddAllActivities(activities). | ||
AddWorkflow<MyWorkflow>(). | ||
AddWorkflow<MyChildWorkflow>(); | ||
|
||
// workerOptions.Interceptors = new[] { counterInterceptor }; | ||
using var worker = new TemporalWorker( | ||
client, | ||
workerOptions); | ||
|
||
// Run worker until cancelled | ||
Console.WriteLine("Running worker..."); | ||
|
||
// Start the workers | ||
await worker.ExecuteAsync(async () => | ||
{ | ||
// Start the workflow | ||
var handle = await client.StartWorkflowAsync( | ||
(MyWorkflow wf) => wf.RunAsync(), | ||
new(id: Guid.NewGuid().ToString(), taskQueue: taskQueue)); | ||
|
||
Console.WriteLine("Sending name and title to workflow"); | ||
await handle.SignalAsync(wf => wf.SignalNameAndTitleAsync("John", "Customer")); | ||
|
||
var name = await handle.QueryAsync(wf => wf.Name); | ||
var title = await handle.QueryAsync(wf => wf.Title); | ||
|
||
// Send exit signal to workflow | ||
await handle.SignalAsync(wf => wf.ExitAsync()); | ||
|
||
var result = await handle.GetResultAsync(); | ||
|
||
Console.WriteLine($"Workflow result is {result}"); | ||
|
||
Console.WriteLine("Query results: "); | ||
Console.WriteLine($"\tName: {name}"); | ||
Console.WriteLine($"\tTitle: {title}"); | ||
|
||
// Print worker counter info | ||
Console.WriteLine("\nCollected Worker Counter Info:\n"); | ||
Console.WriteLine(counterInterceptor.WorkerInfo()); | ||
Console.WriteLine($"Number of unique workflows: {counterInterceptor.Counts.Count}"); | ||
|
||
// Print client counter info | ||
Console.WriteLine(); | ||
Console.WriteLine("Collected Client Counter Info:\n"); | ||
Console.WriteLine(counterInterceptor.ClientInfo()); | ||
}); | ||
} | ||
} |
rross marked this conversation as resolved.
Show resolved
Hide resolved
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
# dotnet-counter-interceptor | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not really the best title, but that's ok |
||
The sample demonstrates: | ||
- the use of a Worker Workflow Interceptor that counts the number of Workflow Executions, Child Workflow Executions, and Activity Executions and the number of Signals and Queries. It is based | ||
off of the [Java sample](https://github.com/temporalio/samples-java/tree/main) located [here](https://github.com/temporalio/samples-java/tree/main/core/src/main/java/io/temporal/samples/countinterceptor) | ||
- the use of a Client Workflow Interceptor that counts the number of Workflow Executions and the number of Signals and Queries. | ||
|
||
To run, first see [README.md](https://github.com/temporalio/samples-dotnet/blob/main/README.md) for prerequisites | ||
|
||
## Run Worker and Client | ||
```bash | ||
# make sure you have temporal server running | ||
dotnet run | ||
``` |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are inconsistent with whether you add spacing between namespace and type declaration