Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Counter Interceptor Sample #77

Merged
merged 20 commits into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ Prerequisites:
* [AspNet](src/AspNet) - Demonstration of a generic host worker and an ASP.NET workflow starter.
* [ClientMtls](src/ClientMtls) - How to use client certificate authentication, e.g. for Temporal Cloud.
* [ContextPropagation](src/ContextPropagation) - Context propagation via interceptors.
* [CounterInterceptor](src/CounterInterceptor/) - Simple Workflow and Client Interceptors example.
* [DependencyInjection](src/DependencyInjection) - How to inject dependencies in activities and use generic hosts for workers
* [Encryption](src/Encryption) - End-to-end encryption with Temporal payload codecs.
* [Mutex](src/Mutex) - How to implement a mutex as a workflow. Demonstrates how to avoid race conditions or parallel mutually exclusive operations on the same resource.
Expand Down
7 changes: 7 additions & 0 deletions TemporalioSamples.sln
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.WorkflowU
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.ContextPropagation", "src\ContextPropagation\TemporalioSamples.ContextPropagation.csproj", "{7B797D20-485F-441D-8E71-AF7E315FA9CF}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.CounterInterceptor", "src\CounterInterceptor\TemporalioSamples.CounterInterceptor.csproj", "{EB05B946-4DD3-4992-B84B-4438B5846ED5}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -149,6 +151,10 @@ Global
{7B797D20-485F-441D-8E71-AF7E315FA9CF}.Debug|Any CPU.Build.0 = Debug|Any CPU
{7B797D20-485F-441D-8E71-AF7E315FA9CF}.Release|Any CPU.ActiveCfg = Release|Any CPU
{7B797D20-485F-441D-8E71-AF7E315FA9CF}.Release|Any CPU.Build.0 = Release|Any CPU
{EB05B946-4DD3-4992-B84B-4438B5846ED5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{EB05B946-4DD3-4992-B84B-4438B5846ED5}.Debug|Any CPU.Build.0 = Debug|Any CPU
{EB05B946-4DD3-4992-B84B-4438B5846ED5}.Release|Any CPU.ActiveCfg = Release|Any CPU
{EB05B946-4DD3-4992-B84B-4438B5846ED5}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -178,5 +184,6 @@ Global
{B79F07F7-3429-4C58-84C3-08587F748B2D} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC}
{B3DB7B8C-7BD3-4A53-A809-AB6279B1A630} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC}
{7B797D20-485F-441D-8E71-AF7E315FA9CF} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC}
{EB05B946-4DD3-4992-B84B-4438B5846ED5} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC}
EndGlobalSection
EndGlobal
19 changes: 19 additions & 0 deletions src/CounterInterceptor/MyActivities.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
namespace TemporalioSamples.CounterInterceptor;

using System.Diagnostics;
using Temporalio.Activities;

public class MyActivities
{
[Activity]
public string SayHello(string name, string title)
{
return "Hello " + title + " " + name;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public string SayHello(string name, string title)
{
return "Hello " + title + " " + name;
}
public string SayHello(string name, string title) => $"Hello {title} {name}";

Use string interpolation and single-statement syntax (will stop commenting on this, but it applies project wide)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bump, this was not addressed


[Activity]
public string SayGoodBye(string name, string title)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are inconsistent with your single-line methods, this should be => like the one just above it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

{
return "Goodbye " + title + " " + name;
}
}
17 changes: 17 additions & 0 deletions src/CounterInterceptor/MyChildWorkflow.workflow.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
namespace TemporalioSamples.CounterInterceptor;

using Temporalio.Workflows;

[Workflow]
public class MyChildWorkflow
{
private readonly ActivityOptions activityOptions = new()
{
StartToCloseTimeout = TimeSpan.FromSeconds(10),
};

[WorkflowRun]
public async Task<string> RunAsync(string name, string title) =>
await Workflow.ExecuteActivityAsync((MyActivities act) => act.SayHello(name, title), activityOptions) +
await Workflow.ExecuteActivityAsync((MyActivities act) => act.SayGoodBye(name, title), activityOptions);
}
42 changes: 42 additions & 0 deletions src/CounterInterceptor/MyWorkflow.workflow.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
namespace TemporalioSamples.CounterInterceptor;

using Temporalio.Workflows;

[Workflow]
public class MyWorkflow
{
private bool exit; // Automatically defaults to false

[WorkflowRun]
public async Task<string> RunAsync()
{
// Wait for greeting info
await Workflow.WaitConditionAsync(() => Name != null && Title != null);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These properties can never be null, so this statement has no value, it will always pass immediately

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bump, this was never addressed

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bump again, this is still not addressed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.


// Execute Child Workflow
var result = await Workflow.ExecuteChildWorkflowAsync(
(MyChildWorkflow wf) => wf.RunAsync(Name, Title),
new() { Id = "counter-interceptor-child" });

// Wait for exit signal
await Workflow.WaitConditionAsync(() => exit);

return result;
}

[WorkflowSignal]
public async Task SignalNameAndTitleAsync(string name, string title)
{
Name = name;
Title = title;
}

[WorkflowQuery]
public string Name { get; private set; } = string.Empty;

[WorkflowQuery]
public string Title { get; private set; } = string.Empty;

[WorkflowSignal]
public async Task ExitAsync() => exit = true;
}
86 changes: 86 additions & 0 deletions src/CounterInterceptor/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
namespace TemporalioSamples.CounterInterceptor;

using Temporalio.Client;
using Temporalio.Worker;

internal class Program
{
private static async Task Main(string[] args)
{
var clientInterceptor = new SimpleClientCallsInterceptor();
var client = await TemporalClient.ConnectAsync(
options: new("localhost:7233")
{
Interceptors = new[]
{
clientInterceptor,
},
});

using var tokenSource = new CancellationTokenSource();
Console.CancelKeyPress += (_, eventArgs) =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This program is a run then complete, you don't need ctrl+c support as if it was a long running worker. I would recommend just removing cancel token and shutting down the worker manually if you must have the entire program self-contained (i.e. combining client and worker).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bump, this was never addressed

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bump again, this is still not addressed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

{
tokenSource.Cancel();
eventArgs.Cancel = true;
};

var activities = new MyActivities();

var taskQueue = "CounterInterceptorTaskQueue";

var workerOptions = new TemporalWorkerOptions(taskQueue).
AddAllActivities(activities).
AddWorkflow<MyWorkflow>().
AddWorkflow<MyChildWorkflow>();

var workerInterceptor = new SimpleCounterWorkerInterceptor();
workerOptions.Interceptors = new[] { workerInterceptor };

using var worker = new TemporalWorker(
client,
workerOptions);

// Run worker until cancelled
Console.WriteLine("Running worker...");
try
{
// Start the workers
var workerResult = worker.ExecuteAsync(tokenSource.Token);

// Start the workflow
var handle = await client.StartWorkflowAsync(
(MyWorkflow wf) => wf.RunAsync(),
new(id: Guid.NewGuid().ToString(), taskQueue: taskQueue));

Console.WriteLine("Sending name and title to workflow");
await handle.SignalAsync(wf => wf.SignalNameAndTitleAsync("John", "Customer"));

var name = await handle.QueryAsync(wf => wf.Name);
var title = await handle.QueryAsync(wf => wf.Title);

// Send exit signal to workflow
await handle.SignalAsync(wf => wf.ExitAsync());

var result = await handle.GetResultAsync();

Console.WriteLine($"Workflow result is {result}", result);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inconsistent with whether you use string concatenation or string interpolation

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Console.WriteLine($"Workflow result is {result}", result);
Console.WriteLine($"Workflow result is {result}");

or

Suggested change
Console.WriteLine($"Workflow result is {result}", result);
Console.WriteLine($"Workflow result is {0}", result);

But not both. This mistake is made in other lines below. Probably the former approach is best.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bump this was never addressed. If you are using interpolation, you don't also pass as a param.


Console.WriteLine("Query results: ");
Console.WriteLine($"\tName: {name}", name);
Console.WriteLine($"\tTitle: {title}", title);

// Print worker counter info
Console.WriteLine("Collected Worker Counter Info: ");
Console.WriteLine(workerInterceptor.Info());

// Print client counter info
Console.WriteLine();
Console.WriteLine("Collected Client Counter Info:");
Console.WriteLine(clientInterceptor.Info());
}
catch (OperationCanceledException)
{
Console.WriteLine("Worker cancelled");
}
}
}
28 changes: 28 additions & 0 deletions src/CounterInterceptor/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# dotnet-counter-interceptor
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really the best title, but that's ok

The sample demonstrates:
- the use of a Worker Workflow Interceptor that counts the number of Workflow Executions, Child Workflow Executions, and Activity Executions and the number of Signals and Queries. It is based
off of the [Java sample](https://github.com/temporalio/samples-java/tree/main) located [here](https://github.com/temporalio/samples-java/tree/main/core/src/main/java/io/temporal/samples/countinterceptor)
- the use of a Client Workflow Interceptor that counts the number of Workflow Executions and the number of Signals and Queries.

## Start local Temporal Server
```bash
# run only once
temporal server start-dev
```
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
## Start local Temporal Server
```bash
# run only once
temporal server start-dev
```
To run, first see [README.md](https://github.com/temporalio/samples-dotnet/blob/main/README.md) for prerequisites

We like to keep the starting of the local server in one place since it can change and there are multiple options. This can be seen in other samples. (in fact, we will make these run easy on cloud too soon, so we don't want to have to come back and update these)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bump, this was never addressed

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bump again, this is still not addressed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Finally addressed :)


## Run Worker Locally
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is more than the worker, it's the worker and the client calls mixed into one. Many samples split these, but don't have to here, but it's not just "worker" as the title suggests.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bump, this was never addressed

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bump again, this is still not addressed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

```bash
# make sure you have temporal server running (see section above)
dotnet run
```

## Run Worker using Temporal Cloud
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this section applies anymore

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bump, this was never addressed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deleted.

```bash
# set up environment variables
export TEMPORAL_NAMESPACE=<namespace>.<accountId>
export TEMPORAL_ADDRESS=<namespace>.<accountId>.tmprl.cloud:7233
export TEMPORAL_TLS_CERT=/path/to/cert
export TEMPORAL_TLS_KEY=/path/to/key
# run the worker
dotnet run
```
97 changes: 97 additions & 0 deletions src/CounterInterceptor/SimpleClientCallsInterceptor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
namespace TemporalioSamples.CounterInterceptor;

using Temporalio.Client;
using Temporalio.Client.Interceptors;

public record ClientCounts
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should only have top-level types named the same as the file in A non-Program.cs C# file.

{
public uint Executions { get; internal set; }

public uint Signals { get; internal set; }

public uint Queries { get; internal set; }

public override string ToString() =>
$"\n\tTotal Number of Workflow Exec: {Executions}\n\t" +
$"Total Number of Signals: {Signals}\n\t" +
$"Total Number of Queries: {Queries}";
}

public class SimpleClientCallsInterceptor : IClientInterceptor
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO for simplicity you should make one interceptor that implements both IClientInterceptor and IWorkerInterceptor (and you only then need to configure the client not the worker interceptor separately). You could also make just one counts object.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

{
private const string NumberOfWorkflowExecutions = "numOfWorkflowExec";
private const string NumberOfSignals = "numOfSignals";
private const string NumberOfQueries = "numOfQueries";
private static Dictionary<string, ClientCounts> clientDictionary = new();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should not be static and it should be readonly and you do not have to suffix "Dictionary", just call it counts or something

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


public ClientOutboundInterceptor InterceptClient(ClientOutboundInterceptor nextInterceptor) =>
new ClientOutbound(this, nextInterceptor);

public string Info() =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May be easier if you just let the caller access the dictionary and do what they want with it instead of all of these separate methods to read/write it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

string.Join(
"\n",
clientDictionary.Select(kvp => $"** Workflow ID: {kvp.Key} {kvp.Value}"));

public uint NumOfWorkflowExecutions(string workflowId) =>
clientDictionary[workflowId].Executions;

public uint NumOfSignals(string workflowId) =>
clientDictionary[workflowId].Signals;

public uint NumOfQueries(string workflowId) =>
clientDictionary[workflowId].Queries;

private void Add(string workflowId, string type)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not thread safe. What if multiple threads for the same workflow ID come in at the same time, there is a case where one dictionary value may get overwritten with another. Consider a ConcurrentDictionary.

{
if (!clientDictionary.TryGetValue(workflowId, out ClientCounts? value))
{
value = new ClientCounts();
clientDictionary.Add(workflowId, value);
}

switch (type)
{
case NumberOfWorkflowExecutions:
value.Executions++;
break;
case NumberOfQueries:
value.Queries++;
break;
case NumberOfSignals:
value.Signals++;
break;
default:
throw new NotImplementedException("Unknown type: " + type);
}
}

private class ClientOutbound : ClientOutboundInterceptor
{
private SimpleClientCallsInterceptor root;

public ClientOutbound(SimpleClientCallsInterceptor root, ClientOutboundInterceptor next)
: base(next) => this.root = root;

public override Task<WorkflowHandle<TWorkflow, TResult>> StartWorkflowAsync<TWorkflow, TResult>(
StartWorkflowInput input)
{
var id = input.Options.Id ?? "None";
root.Add(id, NumberOfWorkflowExecutions);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for these string constants, can change this to something like:

Suggested change
root.Add(id, NumberOfWorkflowExecutions);
Interlocked.Increment(ref root.ClientCounts(id).Executions);

Or

Suggested change
root.Add(id, NumberOfWorkflowExecutions);
root.UpdateClientCounts(id, counts => counts.Executions++);

Depending on how you want to do the thread safety

return base.StartWorkflowAsync<TWorkflow, TResult>(input);
}

public override Task SignalWorkflowAsync(SignalWorkflowInput input)
{
var id = input.Id ?? "None";
root.Add(id, NumberOfSignals);
return base.SignalWorkflowAsync(input);
}

public override Task<TResult> QueryWorkflowAsync<TResult>(QueryWorkflowInput input)
{
var id = input.Id ?? "None";
root.Add(id, NumberOfQueries);
return base.QueryWorkflowAsync<TResult>(input);
}
}
}
Loading