Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Counter Interceptor Sample #77

Merged
merged 20 commits into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ Prerequisites:
* [AspNet](src/AspNet) - Demonstration of a generic host worker and an ASP.NET workflow starter.
* [ClientMtls](src/ClientMtls) - How to use client certificate authentication, e.g. for Temporal Cloud.
* [ContextPropagation](src/ContextPropagation) - Context propagation via interceptors.
* [CounterInterceptor](src/CounterInterceptor/) - Simple Workflow and Client Interceptors example.
* [DependencyInjection](src/DependencyInjection) - How to inject dependencies in activities and use generic hosts for workers
* [Encryption](src/Encryption) - End-to-end encryption with Temporal payload codecs.
* [Mutex](src/Mutex) - How to implement a mutex as a workflow. Demonstrates how to avoid race conditions or parallel mutually exclusive operations on the same resource.
Expand Down
7 changes: 7 additions & 0 deletions TemporalioSamples.sln
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.ContextPr
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.SafeMessageHandlers", "src\SafeMessageHandlers\TemporalioSamples.SafeMessageHandlers.csproj", "{FAF5984A-5B1C-4686-B056-A4F2AC0E8EB7}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.CounterInterceptor", "src\CounterInterceptor\TemporalioSamples.CounterInterceptor.csproj", "{F9C44936-8BF9-4919-BB66-8F1888E22AEB}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -155,6 +157,10 @@ Global
{FAF5984A-5B1C-4686-B056-A4F2AC0E8EB7}.Debug|Any CPU.Build.0 = Debug|Any CPU
{FAF5984A-5B1C-4686-B056-A4F2AC0E8EB7}.Release|Any CPU.ActiveCfg = Release|Any CPU
{FAF5984A-5B1C-4686-B056-A4F2AC0E8EB7}.Release|Any CPU.Build.0 = Release|Any CPU
{F9C44936-8BF9-4919-BB66-8F1888E22AEB}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{F9C44936-8BF9-4919-BB66-8F1888E22AEB}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F9C44936-8BF9-4919-BB66-8F1888E22AEB}.Release|Any CPU.ActiveCfg = Release|Any CPU
{F9C44936-8BF9-4919-BB66-8F1888E22AEB}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -185,5 +191,6 @@ Global
{B3DB7B8C-7BD3-4A53-A809-AB6279B1A630} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC}
{7B797D20-485F-441D-8E71-AF7E315FA9CF} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC}
{FAF5984A-5B1C-4686-B056-A4F2AC0E8EB7} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC}
{F9C44936-8BF9-4919-BB66-8F1888E22AEB} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC}
EndGlobalSection
EndGlobal
54 changes: 54 additions & 0 deletions src/CounterInterceptor/Counts.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
public record Counts
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No benefit to making a record, might as well make a class. Also, make sure you put the namespace on this file.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

{
public Counts()
{
clientExecutions = 0;
clientQueries = 0;
clientSignals = 0;
workflowExecutions = 0;
workflowSignals = 0;
workflowQueries = 0;
workflowChildExecutions = 0;
workflowActivityExecutions = 0;
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public Counts()
{
clientExecutions = 0;
clientQueries = 0;
clientSignals = 0;
workflowExecutions = 0;
workflowSignals = 0;
workflowQueries = 0;
workflowChildExecutions = 0;
workflowActivityExecutions = 0;
}

Not needed, this is the default value already of every field here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

private uint clientExecutions;
private uint clientQueries;
private uint clientSignals;
private uint workflowExecutions;
private uint workflowSignals;
private uint workflowQueries;
private uint workflowChildExecutions;
private uint workflowActivityExecutions;

public ref uint ClientExecutions => ref clientExecutions;

public ref uint ClientSignals => ref clientSignals;

public ref uint ClientQueries => ref clientQueries;

public string ClientInfo() =>
$"\n\tTotal Number of Workflow Exec: {ClientExecutions}\n\t" +
$"Total Number of Signals: {ClientSignals}\n\t" +
$"Total Number of Queries: {ClientQueries}";

public ref uint WorkflowExecutions => ref workflowExecutions;

public ref uint WorkflowSignals => ref workflowSignals;

public ref uint WorkflowQueries => ref workflowQueries;

public ref uint WorkflowChildExecutions => ref workflowChildExecutions;

public ref uint WorkflowActivityExecutions => ref workflowActivityExecutions;

public string WorkflowInfo() =>
$"\n\tTotal Number of Workflow Exec: {WorkflowExecutions}\n\t" +
$"Total Number of Child Workflow Exec: {WorkflowChildExecutions}\n\t" +
$"Total Number of Activity Exec: {WorkflowActivityExecutions}\n\t" +
$"Total Number of Signals: {WorkflowSignals}\n\t" +
$"Total Number of Queries: {WorkflowQueries}";

public override string ToString() =>
ClientInfo() + WorkflowInfo();
}
17 changes: 17 additions & 0 deletions src/CounterInterceptor/MyActivities.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
namespace TemporalioSamples.CounterInterceptor;

using System.Diagnostics;
using Temporalio.Activities;

public class MyActivities
{
[Activity]
public string SayHello(string name, string title) =>
"Hello " + title + " " + name;
Copy link
Member

@cretz cretz Aug 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"Hello " + title + " " + name;
$"Hello {title} {name}";

I think interpolation is a bit clearer here and in next activity

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed


[Activity]
public string SayGoodBye(string name, string title)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are inconsistent with your single-line methods, this should be => like the one just above it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

{
return "Goodbye " + title + " " + name;
}
}
17 changes: 17 additions & 0 deletions src/CounterInterceptor/MyChildWorkflow.workflow.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
namespace TemporalioSamples.CounterInterceptor;

using Temporalio.Workflows;

[Workflow]
public class MyChildWorkflow
{
private readonly ActivityOptions activityOptions = new()
{
StartToCloseTimeout = TimeSpan.FromSeconds(10),
};

[WorkflowRun]
public async Task<string> RunAsync(string name, string title) =>
await Workflow.ExecuteActivityAsync((MyActivities act) => act.SayHello(name, title), activityOptions) +
await Workflow.ExecuteActivityAsync((MyActivities act) => act.SayGoodBye(name, title), activityOptions);
}
137 changes: 137 additions & 0 deletions src/CounterInterceptor/MyCounterInterceptor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
namespace TemporalioSamples.CounterInterceptor;

using System.Collections.Concurrent;
using Temporalio.Activities;
using Temporalio.Client;
using Temporalio.Client.Interceptors;
using Temporalio.Worker.Interceptors;
using Temporalio.Workflows;

public class MyCounterInterceptor : IClientInterceptor, IWorkerInterceptor
{
private ConcurrentDictionary<string, Counts> counts = new();

public ConcurrentDictionary<string, Counts> Counts => counts;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private ConcurrentDictionary<string, Counts> counts = new();
public ConcurrentDictionary<string, Counts> Counts => counts;
public ConcurrentDictionary<string, Counts> Counts { get; } = new();

Use auto properties

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed


public string WorkerInfo() =>
string.Join(
"\n",
counts.Select(kvp => $"** Workflow ID: {kvp.Key} {kvp.Value.WorkflowInfo()}"));

public string ClientInfo() =>
string.Join(
"\n",
counts.Select(kvp => $"** Workflow ID: {kvp.Key} {kvp.Value.ClientInfo()}"));

public ClientOutboundInterceptor InterceptClient(ClientOutboundInterceptor nextInterceptor) =>
new ClientOutbound(this, nextInterceptor);

public WorkflowInboundInterceptor InterceptWorkflow(WorkflowInboundInterceptor nextInterceptor) =>
new WorkflowInbound(this, nextInterceptor);

public ActivityInboundInterceptor InterceptActivity(ActivityInboundInterceptor nextInterceptor) =>
new ActivityInbound(this, nextInterceptor);

private sealed class ClientOutbound : ClientOutboundInterceptor
{
private MyCounterInterceptor root;

public ClientOutbound(MyCounterInterceptor root, ClientOutboundInterceptor next)
: base(next) => this.root = root;

public override Task<WorkflowHandle<TWorkflow, TResult>> StartWorkflowAsync<TWorkflow, TResult>(
StartWorkflowInput input)
{
var id = input.Options.Id ?? "None";
// Need to add an empty record if none exists
// we don't care if it doesn't add it as we will
// still increment the current value.
root.counts.TryAdd(id, new Counts());
Copy link
Member

@cretz cretz Aug 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should happen on every call, not just start/execute workflow. The root.counts[id].ClientSignals call below will throw an exception when someone uses this to signal a workflow that wasn't started here. Same for other situations. Consider just having a helper on the root like:

private void Increment(string id, Action<Counts> increment) =>
    increment(counts.GetOrAdd(id, _ => new()));

Or something like that (untested). Then you can just change this to root.Increment(id ?? "None", c => Interlocked.Increment(ref c.ClientExecutions)); I think.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I debated on whether it needed to be done on every call or not.

I was able to use your example as you had it. Worked the first time.

Thanks!

Interlocked.Increment(ref root.counts[id].ClientExecutions);
return base.StartWorkflowAsync<TWorkflow, TResult>(input);
}

public override Task SignalWorkflowAsync(SignalWorkflowInput input)
{
var id = input.Id ?? "None";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

input.Id can never be null on this call (and some others), so ?? should be removed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

Interlocked.Increment(ref root.counts[id].ClientSignals);
return base.SignalWorkflowAsync(input);
}

public override Task<TResult> QueryWorkflowAsync<TResult>(QueryWorkflowInput input)
{
var id = input.Id ?? "None";
Interlocked.Increment(ref root.counts[id].ClientQueries);
return base.QueryWorkflowAsync<TResult>(input);
}
}

private sealed class WorkflowInbound : WorkflowInboundInterceptor
{
private readonly MyCounterInterceptor root;

internal WorkflowInbound(MyCounterInterceptor root, WorkflowInboundInterceptor next)
: base(next) => this.root = root;

public override void Init(WorkflowOutboundInterceptor outbound)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pedantic, but can also be single-method syntax using =>

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

{
base.Init(new WorkflowOutbound(root, outbound));
}

public override Task<object?> ExecuteWorkflowAsync(ExecuteWorkflowInput input)
{
var id = Workflow.Info.WorkflowId;
// Need to add an empty record if none exists
// we don't care if it doesn't add it as we will
// still increment the current value.
root.counts.TryAdd(id, new Counts());
Interlocked.Increment(ref root.counts[id].WorkflowExecutions);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note, this is not workflow executions, but workflow replays. You could get 1000 of these for the same workflow. Same for other counts in the workflow inbound/outbound. These will be double counted on every replay. You can consider not doing this on replay, or you can clarify in docs of the fields that they include replay attempts (you probably want the former).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh wow. That's important to call out.

I see that Workflow.Info has an Attempt field, but not sure it makes sense to use that to determine replay, as I think that is used for the number of retry attempts.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is Workflow.Unsafe.IsReplaying you can make sure is false before doing the increment (this is what we do in tracing interceptor for instance). It should be noted though that on task failure, this will run over and over in non-replaying situation, but besides task failure it will work.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

return base.ExecuteWorkflowAsync(input);
}

public override Task HandleSignalAsync(HandleSignalInput input)
{
var id = Workflow.Info.WorkflowId;
Interlocked.Increment(ref root.counts[id].WorkflowSignals);
return base.HandleSignalAsync(input);
}

public override object? HandleQuery(HandleQueryInput input)
{
var id = Workflow.Info.WorkflowId;
Interlocked.Increment(ref root.counts[id].WorkflowQueries);
return base.HandleQuery(input);
}
}

private sealed class WorkflowOutbound : WorkflowOutboundInterceptor
{
private readonly MyCounterInterceptor root;

internal WorkflowOutbound(MyCounterInterceptor root, WorkflowOutboundInterceptor next)
: base(next) => this.root = root;

public override Task<ChildWorkflowHandle<TWorkflow, TResult>> StartChildWorkflowAsync<TWorkflow, TResult>(
StartChildWorkflowInput input)
{
var id = Workflow.Info.WorkflowId;
Interlocked.Increment(ref root.counts[id].WorkflowChildExecutions);
return base.StartChildWorkflowAsync<TWorkflow, TResult>(input);
}
}

private sealed class ActivityInbound : ActivityInboundInterceptor
{
private readonly MyCounterInterceptor root;

internal ActivityInbound(MyCounterInterceptor root, ActivityInboundInterceptor next)
: base(next) => this.root = root;

public override Task<object?> ExecuteActivityAsync(ExecuteActivityInput input)
{
var id = ActivityExecutionContext.Current.Info.WorkflowId;
Interlocked.Increment(ref root.counts[id].WorkflowActivityExecutions);
return base.ExecuteActivityAsync(input);
}
}
}
42 changes: 42 additions & 0 deletions src/CounterInterceptor/MyWorkflow.workflow.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
namespace TemporalioSamples.CounterInterceptor;

using Temporalio.Workflows;

[Workflow]
public class MyWorkflow
{
private bool exit; // Automatically defaults to false

[WorkflowRun]
public async Task<string> RunAsync()
{
// Wait for greeting info
await Workflow.WaitConditionAsync(() => Name != null && Title != null);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These properties can never be null, so this statement has no value, it will always pass immediately

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bump, this was never addressed

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bump again, this is still not addressed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.


// Execute Child Workflow
var result = await Workflow.ExecuteChildWorkflowAsync(
(MyChildWorkflow wf) => wf.RunAsync(Name, Title),
new() { Id = "counter-interceptor-child" });

// Wait for exit signal
await Workflow.WaitConditionAsync(() => exit);

return result;
}

[WorkflowSignal]
public async Task SignalNameAndTitleAsync(string name, string title)
{
Name = name;
Title = title;
}

[WorkflowQuery]
public string Name { get; private set; } = string.Empty;

[WorkflowQuery]
public string Title { get; private set; } = string.Empty;

[WorkflowSignal]
public async Task ExitAsync() => exit = true;
}
85 changes: 85 additions & 0 deletions src/CounterInterceptor/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
namespace TemporalioSamples.CounterInterceptor;

using Temporalio.Client;
using Temporalio.Worker;

internal class Program
{
private static async Task Main(string[] args)
{
var counterInterceptor = new MyCounterInterceptor();
var client = await TemporalClient.ConnectAsync(
options: new("localhost:7233")
{
Interceptors = new[]
{
counterInterceptor,
},
});

using var tokenSource = new CancellationTokenSource();
Console.CancelKeyPress += (_, eventArgs) =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This program is a run then complete, you don't need ctrl+c support as if it was a long running worker. I would recommend just removing cancel token and shutting down the worker manually if you must have the entire program self-contained (i.e. combining client and worker).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bump, this was never addressed

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bump again, this is still not addressed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

{
tokenSource.Cancel();
eventArgs.Cancel = true;
};

var activities = new MyActivities();

var taskQueue = "CounterInterceptorTaskQueue";

var workerOptions = new TemporalWorkerOptions(taskQueue).
AddAllActivities(activities).
AddWorkflow<MyWorkflow>().
AddWorkflow<MyChildWorkflow>();

// workerOptions.Interceptors = new[] { counterInterceptor };
using var worker = new TemporalWorker(
client,
workerOptions);

// Run worker until cancelled
Console.WriteLine("Running worker...");
try
{
// Start the workers
var workerResult = worker.ExecuteAsync(tokenSource.Token);

// Start the workflow
var handle = await client.StartWorkflowAsync(
(MyWorkflow wf) => wf.RunAsync(),
new(id: Guid.NewGuid().ToString(), taskQueue: taskQueue));

Console.WriteLine("Sending name and title to workflow");
await handle.SignalAsync(wf => wf.SignalNameAndTitleAsync("John", "Customer"));

var name = await handle.QueryAsync(wf => wf.Name);
var title = await handle.QueryAsync(wf => wf.Title);

// Send exit signal to workflow
await handle.SignalAsync(wf => wf.ExitAsync());

var result = await handle.GetResultAsync();

Console.WriteLine($"Workflow result is {result}");

Console.WriteLine("Query results: ");
Console.WriteLine($"\tName: {name}");
Console.WriteLine($"\tTitle: {title}");

// Print worker counter info
Console.WriteLine("\nCollected Worker Counter Info:\n");
Console.WriteLine(counterInterceptor.WorkerInfo());
Console.WriteLine($"Number of workers: {counterInterceptor.Counts.Count}");

// Print client counter info
Console.WriteLine();
Console.WriteLine("Collected Client Counter Info:\n");
Console.WriteLine(counterInterceptor.ClientInfo());
}
catch (OperationCanceledException)
{
Console.WriteLine("Worker cancelled");
}
}
}
Loading