Skip to content

Commit

Permalink
Merge pull request #63 from conductor-sdk/add-worker-attribute
Browse files Browse the repository at this point in the history
Added worker attribute
  • Loading branch information
gardusig authored May 8, 2023
2 parents b5ec858 + 7b02900 commit 31ff28c
Show file tree
Hide file tree
Showing 14 changed files with 312 additions and 194 deletions.
Original file line number Diff line number Diff line change
@@ -1,29 +1,29 @@
using Conductor.Api;
using Conductor.Client;
using Conductor.Executor;
using Conductor.Client.Authentication;
using System;
using System.Diagnostics;

namespace Tests.Util
namespace Conductor.Client.Extensions
{
public class ApiUtil
public class ApiExtensions
{
private const string ENV_ROOT_URI = "CONDUCTOR_SERVER_URL";
private const string ENV_KEY_ID = "KEY";
private const string ENV_SECRET = "SECRET";

private static Configuration _configuration = null;

static ApiUtil()
static ApiExtensions()
{
_configuration = new Configuration()
{
Timeout = 7500,
Timeout = 30 * 1000,
BasePath = GetEnvironmentVariable(ENV_ROOT_URI),
AuthenticationSettings = new OrkesAuthenticationSettings(
GetEnvironmentVariable(ENV_KEY_ID),
GetEnvironmentVariable(ENV_SECRET))
GetEnvironmentVariable(ENV_SECRET)
)
};
}

Expand Down
91 changes: 91 additions & 0 deletions Conductor/Client/Extensions/WorkflowExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
using Conductor.Api;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System;
using System.Threading.Tasks;

namespace Conductor.Client.Extensions
{
public class WorkflowExtensions
{
private static int RETRY_ATTEMPT_LIMIT = 3;

public static async Task<ConcurrentBag<string>> StartWorkflows(WorkflowResourceApi workflowClient, Models.StartWorkflowRequest startWorkflowRequest, int maxAllowedInParallel, int total)
{
var workflowIds = new ConcurrentBag<string>();
await StartWorkflowBatch(workflowClient, startWorkflowRequest, total % maxAllowedInParallel, workflowIds);
for (int i = 1; i * maxAllowedInParallel <= total; i += 1)
{
await StartWorkflowBatch(workflowClient, startWorkflowRequest, maxAllowedInParallel, workflowIds);
}
Console.WriteLine($"Started {workflowIds.Count} workflows");
return workflowIds;
}

public static async Task<ConcurrentBag<Models.WorkflowStatus>> GetWorkflowStatusList(WorkflowResourceApi workflowClient, int maxAllowedInParallel, params string[] workflowIds)
{
var workflowStatusList = new ConcurrentBag<Models.WorkflowStatus>();
for (int index = 0; index < workflowIds.Length; index += maxAllowedInParallel)
{
await GetWorkflowStatusBatch(workflowClient, workflowStatusList, index, index + maxAllowedInParallel, workflowIds);
}
Console.WriteLine($"Got ${workflowStatusList.Count} workflow statuses");
return workflowStatusList;
}

private static async Task GetWorkflowStatusBatch(WorkflowResourceApi workflowClient, ConcurrentBag<Models.WorkflowStatus> workflowStatusList, int startIndex, int finishIndex, params string[] workflowIds)
{
var threads = new List<Task>();
for (int index = Math.Max(0, startIndex); index < Math.Min(workflowIds.Length, finishIndex); index += 1)
{
var workflowId = workflowIds[index];
threads.Add(Task.Run(() => GetWorkflowStatus(workflowClient, workflowStatusList, workflowId)));
}
await Task.WhenAll(threads);
}

private static async Task StartWorkflowBatch(WorkflowResourceApi workflowClient, Models.StartWorkflowRequest startWorkflowRequest, int quantity, ConcurrentBag<string> workflowIds)
{
var threads = new List<Task>();
for (int counter = 0; counter < quantity; counter += 1)
{
threads.Add(Task.Run(() => StartWorkflow(workflowClient, startWorkflowRequest, workflowIds)));
}
await Task.WhenAll(threads);
}

private static void GetWorkflowStatus(WorkflowResourceApi workflowClient, ConcurrentBag<Models.WorkflowStatus> workflowStatusList, string workflowId)
{
for (int attempt = 0; attempt < RETRY_ATTEMPT_LIMIT; attempt += 1)
{
try
{
workflowStatusList.Add(workflowClient.GetWorkflowStatusSummary(workflowId));
return;
}
catch (ApiException e)
{
Console.WriteLine($"Failed to get workflow status, reason: {e}");
System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1 << attempt));
}
}
}

private static void StartWorkflow(WorkflowResourceApi workflowClient, Models.StartWorkflowRequest startWorkflowRequest, ConcurrentBag<string> workflowIds)
{
for (int attempt = 0; attempt < RETRY_ATTEMPT_LIMIT; attempt += 1)
{
try
{
workflowIds.Add(workflowClient.StartWorkflow(startWorkflowRequest));
return;
}
catch (ApiException e)
{
Console.WriteLine($"Failed to start workflow, reason: {e}");
System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1 << attempt));
}
}
}
}
}
29 changes: 29 additions & 0 deletions Conductor/Client/Worker/GenericWorker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
using Conductor.Client.Interfaces;
using Conductor.Client.Models;
using System.Reflection;

namespace Conductor.Client.Worker
{
public class GenericWorker : IWorkflowTask
{
public string TaskType { get; }
public WorkflowTaskExecutorConfiguration WorkerSettings { get; }

private readonly object _workerInstance;
private readonly MethodInfo _executeTaskMethod;

public GenericWorker(string taskType, WorkflowTaskExecutorConfiguration workerSettings, MethodInfo executeTaskMethod, object workerInstance = null)
{
TaskType = taskType;
WorkerSettings = workerSettings;
_executeTaskMethod = executeTaskMethod;
_workerInstance = workerInstance;
}

public TaskResult Execute(Task task)
{
var taskResult = _executeTaskMethod.Invoke(_workerInstance, new object[] { task });
return (TaskResult)taskResult;
}
}
}
28 changes: 28 additions & 0 deletions Conductor/Client/Worker/WorkerTask.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
using System;

namespace Conductor.Client.Worker
{
[AttributeUsage(AttributeTargets.Class | AttributeTargets.Method)]
public class WorkerTask : Attribute
{
public string TaskType { get; }
public WorkflowTaskExecutorConfiguration WorkerSettings { get; set; }

public WorkerTask()
{
WorkerSettings = new WorkflowTaskExecutorConfiguration();
}

public WorkerTask(string taskType, int batchSize, string domain, int pollIntervalMs, string workerId)
{
TaskType = taskType;
WorkerSettings = new WorkflowTaskExecutorConfiguration
{
BatchSize = batchSize,
Domain = domain,
PollInterval = TimeSpan.FromMilliseconds(pollIntervalMs),
WorkerId = workerId,
};
}
}
}
39 changes: 37 additions & 2 deletions Conductor/Client/Worker/WorkflowTaskCoordinator.cs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
using Conductor.Client.Interfaces;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Reflection;
using System.Threading.Tasks;

namespace Conductor.Client.Worker
{
internal class WorkflowTaskCoordinator : IWorkflowTaskCoordinator
{
private readonly ILogger<WorkflowTaskCoordinator> _logger;

private readonly ILogger<WorkflowTaskExecutor> _loggerWorkflowTaskExecutor;
private readonly ILogger<WorkflowTaskMonitor> _loggerWorkflowTaskMonitor;
private readonly HashSet<IWorkflowTaskExecutor> _workers;
Expand All @@ -26,6 +27,7 @@ public WorkflowTaskCoordinator(IWorkflowTaskClient client, ILogger<WorkflowTaskC
public async Task Start()
{
_logger.LogDebug("Starting workers...");
DiscoverWorkers();
var runningWorkers = new List<Task>();
foreach (var worker in _workers)
{
Expand All @@ -43,10 +45,43 @@ public void RegisterWorker(IWorkflowTask worker)
_loggerWorkflowTaskExecutor,
_client,
worker,
worker.WorkerSettings,
workflowTaskMonitor
);
_workers.Add(workflowTaskExecutor);
}

private void DiscoverWorkers()
{
foreach (var assembly in AppDomain.CurrentDomain.GetAssemblies())
{
foreach (var type in assembly.GetTypes())
{
if (type.GetCustomAttribute<WorkerTask>() == null)
{
continue;
}
foreach (var method in type.GetMethods())
{
var workerTask = method.GetCustomAttribute<WorkerTask>();
if (workerTask == null)
{
continue;
}
object workerInstance = null;
if (!method.IsStatic)
{
workerInstance = Activator.CreateInstance(type);
}
var worker = new GenericWorker(
workerTask.TaskType,
workerTask.WorkerSettings,
method,
workerInstance
);
RegisterWorker(worker);
}
}
}
}
}
}
38 changes: 30 additions & 8 deletions Conductor/Client/Worker/WorkflowTaskExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,19 @@ internal class WorkflowTaskExecutor : IWorkflowTaskExecutor
private readonly WorkflowTaskExecutorConfiguration _workerSettings;
private readonly WorkflowTaskMonitor _workflowTaskMonitor;

public WorkflowTaskExecutor(
ILogger<WorkflowTaskExecutor> logger,
IWorkflowTaskClient client,
IWorkflowTask worker,
WorkflowTaskMonitor workflowTaskMonitor)
{
_logger = logger;
_taskClient = client;
_worker = worker;
_workerSettings = worker.WorkerSettings;
_workflowTaskMonitor = workflowTaskMonitor;
}

public WorkflowTaskExecutor(
ILogger<WorkflowTaskExecutor> logger,
IWorkflowTaskClient client,
Expand All @@ -27,7 +40,6 @@ public WorkflowTaskExecutor(
_logger = logger;
_taskClient = client;
_worker = worker;
_workerSettings = workflowTaskConfiguration;
_workflowTaskMonitor = workflowTaskMonitor;
}

Expand Down Expand Up @@ -130,18 +142,28 @@ private void ProcessTask(Models.Task task)
var taskResult = _worker.Execute(task);
taskResult.WorkerId = _workerSettings.WorkerId;
UpdateTask(taskResult);
_logger.LogTrace(
$"[{_workerSettings.WorkerId}] Done processing task for worker"
+ $", taskType: {_worker.TaskType}"
+ $", domain: {_workerSettings.Domain}"
+ $", taskId: {task.TaskId}"
+ $", workflowId: {task.WorkflowInstanceId}"
);
}
catch (Exception e)
{
_logger.LogDebug(
$"[{_workerSettings.WorkerId}] Failed to process task for worker, reason: {e.Message}"
+ $", taskType: {_worker.TaskType}"
+ $", domain: {_workerSettings.Domain}"
+ $", taskId: {task.TaskId}"
+ $", workflowId: {task.WorkflowInstanceId}"
);
}
finally
{
_workflowTaskMonitor.RunningWorkerDone();
}
_logger.LogTrace(
$"[{_workerSettings.WorkerId}] Done processing task for worker"
+ $", taskType: {_worker.TaskType}"
+ $", domain: {_workerSettings.Domain}"
+ $", taskId: {task.TaskId}"
+ $", workflowId: {task.WorkflowInstanceId}"
);
}

private void UpdateTask(Models.TaskResult taskResult)
Expand Down
47 changes: 41 additions & 6 deletions Conductor/Client/Worker/WorkflowTaskHost.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,48 @@
using Conductor.Client.Extensions;
using Conductor.Client.Interfaces;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System.Collections.Generic;

namespace Conductor.Client.Worker
namespace Conductor.Client.Extensions
{
public static class WorkflowTaskHost
public class WorkflowTaskHost
{
public static IHost CreateWorkerHost<T>(Configuration configuration, params T[] workers) where T : IWorkflowTask
private static Dictionary<LogLevel, IHost> _hostByLogLevel;

static WorkflowTaskHost()
{
_hostByLogLevel = new Dictionary<LogLevel, IHost>();
}

public static IHost GetWorkerHost(LogLevel logLevel = LogLevel.Information)
{
if (!_hostByLogLevel.ContainsKey(logLevel))
{
var host = CreateWorkerHost(ApiExtensions.GetConfiguration(), logLevel);
_hostByLogLevel[logLevel] = host;
}
return _hostByLogLevel[logLevel];
}

public static IHost CreateWorkerHost(Configuration configuration, LogLevel logLevel = LogLevel.Information)
{
return new HostBuilder()
.ConfigureServices(
(ctx, services) =>
{
services.AddConductorWorker(configuration);
services.WithHostedService();
}
).ConfigureLogging(
logging =>
{
logging.SetMinimumLevel(logLevel);
logging.AddConsole();
}
).Build();
}

public static IHost CreateWorkerHost(Configuration configuration, LogLevel logLevel = LogLevel.Information, params IWorkflowTask[] workers)
{
return new HostBuilder()
.ConfigureServices(
Expand All @@ -23,10 +58,10 @@ public static IHost CreateWorkerHost<T>(Configuration configuration, params T[]
).ConfigureLogging(
logging =>
{
logging.SetMinimumLevel(LogLevel.Debug);
logging.SetMinimumLevel(logLevel);
logging.AddConsole();
}
).Build();
}
}
}
}
Loading

0 comments on commit 31ff28c

Please sign in to comment.