diff --git a/Tests/Util/ApiUtil.cs b/Conductor/Client/Extensions/ApiExtensions.cs similarity index 87% rename from Tests/Util/ApiUtil.cs rename to Conductor/Client/Extensions/ApiExtensions.cs index dd8c8139..5e162d6b 100644 --- a/Tests/Util/ApiUtil.cs +++ b/Conductor/Client/Extensions/ApiExtensions.cs @@ -1,13 +1,12 @@ using Conductor.Api; -using Conductor.Client; using Conductor.Executor; using Conductor.Client.Authentication; using System; using System.Diagnostics; -namespace Tests.Util +namespace Conductor.Client.Extensions { - public class ApiUtil + public class ApiExtensions { private const string ENV_ROOT_URI = "CONDUCTOR_SERVER_URL"; private const string ENV_KEY_ID = "KEY"; @@ -15,15 +14,16 @@ public class ApiUtil private static Configuration _configuration = null; - static ApiUtil() + static ApiExtensions() { _configuration = new Configuration() { - Timeout = 7500, + Timeout = 30 * 1000, BasePath = GetEnvironmentVariable(ENV_ROOT_URI), AuthenticationSettings = new OrkesAuthenticationSettings( GetEnvironmentVariable(ENV_KEY_ID), - GetEnvironmentVariable(ENV_SECRET)) + GetEnvironmentVariable(ENV_SECRET) + ) }; } diff --git a/Conductor/Client/Extensions/WorkflowExtensions.cs b/Conductor/Client/Extensions/WorkflowExtensions.cs new file mode 100644 index 00000000..e9f3b708 --- /dev/null +++ b/Conductor/Client/Extensions/WorkflowExtensions.cs @@ -0,0 +1,91 @@ +using Conductor.Api; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System; +using System.Threading.Tasks; + +namespace Conductor.Client.Extensions +{ + public class WorkflowExtensions + { + private static int RETRY_ATTEMPT_LIMIT = 3; + + public static async Task> StartWorkflows(WorkflowResourceApi workflowClient, Models.StartWorkflowRequest startWorkflowRequest, int maxAllowedInParallel, int total) + { + var workflowIds = new ConcurrentBag(); + await StartWorkflowBatch(workflowClient, startWorkflowRequest, total % maxAllowedInParallel, workflowIds); + for (int i = 1; i * maxAllowedInParallel <= total; i += 1) + { + await StartWorkflowBatch(workflowClient, startWorkflowRequest, maxAllowedInParallel, workflowIds); + } + Console.WriteLine($"Started {workflowIds.Count} workflows"); + return workflowIds; + } + + public static async Task> GetWorkflowStatusList(WorkflowResourceApi workflowClient, int maxAllowedInParallel, params string[] workflowIds) + { + var workflowStatusList = new ConcurrentBag(); + for (int index = 0; index < workflowIds.Length; index += maxAllowedInParallel) + { + await GetWorkflowStatusBatch(workflowClient, workflowStatusList, index, index + maxAllowedInParallel, workflowIds); + } + Console.WriteLine($"Got ${workflowStatusList.Count} workflow statuses"); + return workflowStatusList; + } + + private static async Task GetWorkflowStatusBatch(WorkflowResourceApi workflowClient, ConcurrentBag workflowStatusList, int startIndex, int finishIndex, params string[] workflowIds) + { + var threads = new List(); + for (int index = Math.Max(0, startIndex); index < Math.Min(workflowIds.Length, finishIndex); index += 1) + { + var workflowId = workflowIds[index]; + threads.Add(Task.Run(() => GetWorkflowStatus(workflowClient, workflowStatusList, workflowId))); + } + await Task.WhenAll(threads); + } + + private static async Task StartWorkflowBatch(WorkflowResourceApi workflowClient, Models.StartWorkflowRequest startWorkflowRequest, int quantity, ConcurrentBag workflowIds) + { + var threads = new List(); + for (int counter = 0; counter < quantity; counter += 1) + { + threads.Add(Task.Run(() => StartWorkflow(workflowClient, startWorkflowRequest, workflowIds))); + } + await Task.WhenAll(threads); + } + + private static void GetWorkflowStatus(WorkflowResourceApi workflowClient, ConcurrentBag workflowStatusList, string workflowId) + { + for (int attempt = 0; attempt < RETRY_ATTEMPT_LIMIT; attempt += 1) + { + try + { + workflowStatusList.Add(workflowClient.GetWorkflowStatusSummary(workflowId)); + return; + } + catch (ApiException e) + { + Console.WriteLine($"Failed to get workflow status, reason: {e}"); + System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1 << attempt)); + } + } + } + + private static void StartWorkflow(WorkflowResourceApi workflowClient, Models.StartWorkflowRequest startWorkflowRequest, ConcurrentBag workflowIds) + { + for (int attempt = 0; attempt < RETRY_ATTEMPT_LIMIT; attempt += 1) + { + try + { + workflowIds.Add(workflowClient.StartWorkflow(startWorkflowRequest)); + return; + } + catch (ApiException e) + { + Console.WriteLine($"Failed to start workflow, reason: {e}"); + System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1 << attempt)); + } + } + } + } +} \ No newline at end of file diff --git a/Conductor/Client/Worker/GenericWorker.cs b/Conductor/Client/Worker/GenericWorker.cs new file mode 100644 index 00000000..a5e11074 --- /dev/null +++ b/Conductor/Client/Worker/GenericWorker.cs @@ -0,0 +1,29 @@ +using Conductor.Client.Interfaces; +using Conductor.Client.Models; +using System.Reflection; + +namespace Conductor.Client.Worker +{ + public class GenericWorker : IWorkflowTask + { + public string TaskType { get; } + public WorkflowTaskExecutorConfiguration WorkerSettings { get; } + + private readonly object _workerInstance; + private readonly MethodInfo _executeTaskMethod; + + public GenericWorker(string taskType, WorkflowTaskExecutorConfiguration workerSettings, MethodInfo executeTaskMethod, object workerInstance = null) + { + TaskType = taskType; + WorkerSettings = workerSettings; + _executeTaskMethod = executeTaskMethod; + _workerInstance = workerInstance; + } + + public TaskResult Execute(Task task) + { + var taskResult = _executeTaskMethod.Invoke(_workerInstance, new object[] { task }); + return (TaskResult)taskResult; + } + } +} diff --git a/Conductor/Client/Worker/WorkerTask.cs b/Conductor/Client/Worker/WorkerTask.cs new file mode 100644 index 00000000..4f0a6040 --- /dev/null +++ b/Conductor/Client/Worker/WorkerTask.cs @@ -0,0 +1,28 @@ +using System; + +namespace Conductor.Client.Worker +{ + [AttributeUsage(AttributeTargets.Class | AttributeTargets.Method)] + public class WorkerTask : Attribute + { + public string TaskType { get; } + public WorkflowTaskExecutorConfiguration WorkerSettings { get; set; } + + public WorkerTask() + { + WorkerSettings = new WorkflowTaskExecutorConfiguration(); + } + + public WorkerTask(string taskType, int batchSize, string domain, int pollIntervalMs, string workerId) + { + TaskType = taskType; + WorkerSettings = new WorkflowTaskExecutorConfiguration + { + BatchSize = batchSize, + Domain = domain, + PollInterval = TimeSpan.FromMilliseconds(pollIntervalMs), + WorkerId = workerId, + }; + } + } +} diff --git a/Conductor/Client/Worker/WorkflowTaskCoordinator.cs b/Conductor/Client/Worker/WorkflowTaskCoordinator.cs index e31d0c08..93500684 100644 --- a/Conductor/Client/Worker/WorkflowTaskCoordinator.cs +++ b/Conductor/Client/Worker/WorkflowTaskCoordinator.cs @@ -1,6 +1,8 @@ using Conductor.Client.Interfaces; using Microsoft.Extensions.Logging; +using System; using System.Collections.Generic; +using System.Reflection; using System.Threading.Tasks; namespace Conductor.Client.Worker @@ -8,7 +10,6 @@ namespace Conductor.Client.Worker internal class WorkflowTaskCoordinator : IWorkflowTaskCoordinator { private readonly ILogger _logger; - private readonly ILogger _loggerWorkflowTaskExecutor; private readonly ILogger _loggerWorkflowTaskMonitor; private readonly HashSet _workers; @@ -26,6 +27,7 @@ public WorkflowTaskCoordinator(IWorkflowTaskClient client, ILogger(); foreach (var worker in _workers) { @@ -43,10 +45,43 @@ public void RegisterWorker(IWorkflowTask worker) _loggerWorkflowTaskExecutor, _client, worker, - worker.WorkerSettings, workflowTaskMonitor ); _workers.Add(workflowTaskExecutor); } + + private void DiscoverWorkers() + { + foreach (var assembly in AppDomain.CurrentDomain.GetAssemblies()) + { + foreach (var type in assembly.GetTypes()) + { + if (type.GetCustomAttribute() == null) + { + continue; + } + foreach (var method in type.GetMethods()) + { + var workerTask = method.GetCustomAttribute(); + if (workerTask == null) + { + continue; + } + object workerInstance = null; + if (!method.IsStatic) + { + workerInstance = Activator.CreateInstance(type); + } + var worker = new GenericWorker( + workerTask.TaskType, + workerTask.WorkerSettings, + method, + workerInstance + ); + RegisterWorker(worker); + } + } + } + } } } diff --git a/Conductor/Client/Worker/WorkflowTaskExecutor.cs b/Conductor/Client/Worker/WorkflowTaskExecutor.cs index 6b70df83..a51dc4d8 100644 --- a/Conductor/Client/Worker/WorkflowTaskExecutor.cs +++ b/Conductor/Client/Worker/WorkflowTaskExecutor.cs @@ -17,6 +17,19 @@ internal class WorkflowTaskExecutor : IWorkflowTaskExecutor private readonly WorkflowTaskExecutorConfiguration _workerSettings; private readonly WorkflowTaskMonitor _workflowTaskMonitor; + public WorkflowTaskExecutor( + ILogger logger, + IWorkflowTaskClient client, + IWorkflowTask worker, + WorkflowTaskMonitor workflowTaskMonitor) + { + _logger = logger; + _taskClient = client; + _worker = worker; + _workerSettings = worker.WorkerSettings; + _workflowTaskMonitor = workflowTaskMonitor; + } + public WorkflowTaskExecutor( ILogger logger, IWorkflowTaskClient client, @@ -27,7 +40,6 @@ public WorkflowTaskExecutor( _logger = logger; _taskClient = client; _worker = worker; - _workerSettings = workflowTaskConfiguration; _workflowTaskMonitor = workflowTaskMonitor; } @@ -130,18 +142,28 @@ private void ProcessTask(Models.Task task) var taskResult = _worker.Execute(task); taskResult.WorkerId = _workerSettings.WorkerId; UpdateTask(taskResult); + _logger.LogTrace( + $"[{_workerSettings.WorkerId}] Done processing task for worker" + + $", taskType: {_worker.TaskType}" + + $", domain: {_workerSettings.Domain}" + + $", taskId: {task.TaskId}" + + $", workflowId: {task.WorkflowInstanceId}" + ); + } + catch (Exception e) + { + _logger.LogDebug( + $"[{_workerSettings.WorkerId}] Failed to process task for worker, reason: {e.Message}" + + $", taskType: {_worker.TaskType}" + + $", domain: {_workerSettings.Domain}" + + $", taskId: {task.TaskId}" + + $", workflowId: {task.WorkflowInstanceId}" + ); } finally { _workflowTaskMonitor.RunningWorkerDone(); } - _logger.LogTrace( - $"[{_workerSettings.WorkerId}] Done processing task for worker" - + $", taskType: {_worker.TaskType}" - + $", domain: {_workerSettings.Domain}" - + $", taskId: {task.TaskId}" - + $", workflowId: {task.WorkflowInstanceId}" - ); } private void UpdateTask(Models.TaskResult taskResult) diff --git a/Conductor/Client/Worker/WorkflowTaskHost.cs b/Conductor/Client/Worker/WorkflowTaskHost.cs index 8c230cc3..6fdb393a 100644 --- a/Conductor/Client/Worker/WorkflowTaskHost.cs +++ b/Conductor/Client/Worker/WorkflowTaskHost.cs @@ -1,13 +1,48 @@ -using Conductor.Client.Extensions; using Conductor.Client.Interfaces; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; +using System.Collections.Generic; -namespace Conductor.Client.Worker +namespace Conductor.Client.Extensions { - public static class WorkflowTaskHost + public class WorkflowTaskHost { - public static IHost CreateWorkerHost(Configuration configuration, params T[] workers) where T : IWorkflowTask + private static Dictionary _hostByLogLevel; + + static WorkflowTaskHost() + { + _hostByLogLevel = new Dictionary(); + } + + public static IHost GetWorkerHost(LogLevel logLevel = LogLevel.Information) + { + if (!_hostByLogLevel.ContainsKey(logLevel)) + { + var host = CreateWorkerHost(ApiExtensions.GetConfiguration(), logLevel); + _hostByLogLevel[logLevel] = host; + } + return _hostByLogLevel[logLevel]; + } + + public static IHost CreateWorkerHost(Configuration configuration, LogLevel logLevel = LogLevel.Information) + { + return new HostBuilder() + .ConfigureServices( + (ctx, services) => + { + services.AddConductorWorker(configuration); + services.WithHostedService(); + } + ).ConfigureLogging( + logging => + { + logging.SetMinimumLevel(logLevel); + logging.AddConsole(); + } + ).Build(); + } + + public static IHost CreateWorkerHost(Configuration configuration, LogLevel logLevel = LogLevel.Information, params IWorkflowTask[] workers) { return new HostBuilder() .ConfigureServices( @@ -23,10 +58,10 @@ public static IHost CreateWorkerHost(Configuration configuration, params T[] ).ConfigureLogging( logging => { - logging.SetMinimumLevel(LogLevel.Debug); + logging.SetMinimumLevel(logLevel); logging.AddConsole(); } ).Build(); } } -} \ No newline at end of file +} diff --git a/Tests/Client/OrkesApiClientTest.cs b/Tests/Client/OrkesApiClientTest.cs index abcd3981..2aee3e63 100644 --- a/Tests/Client/OrkesApiClientTest.cs +++ b/Tests/Client/OrkesApiClientTest.cs @@ -1,6 +1,6 @@ using Conductor.Api; using Conductor.Client; -using Tests.Util; +using Conductor.Client.Extensions; using Xunit; namespace Tests.Client @@ -10,7 +10,7 @@ public class OrkesApiClientTest [Fact] public void TestOrkesApiClient() { - var configuration = ApiUtil.GetConfiguration(); + var configuration = ApiExtensions.GetConfiguration(); var orkesApiClient = new OrkesApiClient(configuration, null); var workflowClient = orkesApiClient.GetClient(); var expectedWorkflowClient = configuration.GetClient(); diff --git a/Tests/Definition/WorkflowDefinitionTests.cs b/Tests/Definition/WorkflowDefinitionTests.cs index 53e0c35d..4feaec69 100644 --- a/Tests/Definition/WorkflowDefinitionTests.cs +++ b/Tests/Definition/WorkflowDefinitionTests.cs @@ -1,9 +1,9 @@ +using Conductor.Client.Extensions; using Conductor.Client.Models; using Conductor.Definition; using Conductor.Definition.TaskType; using Conductor.Executor; using System; -using Tests.Util; using Xunit; namespace Tests.Definition @@ -21,7 +21,7 @@ public class WorkflowDefTests public WorkflowDefTests() { - _workflowExecutor = ApiUtil.GetWorkflowExecutor(); + _workflowExecutor = ApiExtensions.GetWorkflowExecutor(); } [Fact] diff --git a/Tests/Util/WorkerUtil.cs b/Tests/Util/WorkerUtil.cs deleted file mode 100644 index 547a18bc..00000000 --- a/Tests/Util/WorkerUtil.cs +++ /dev/null @@ -1,24 +0,0 @@ -using Conductor.Client.Worker; -using Microsoft.Extensions.Hosting; -using Tests.Worker; - -namespace Tests.Util -{ - public class WorkerUtil - { - private static IHost _host = null; - - static WorkerUtil() - { - _host = WorkflowTaskHost.CreateWorkerHost( - ApiUtil.GetConfiguration(), - new SimpleWorker() - ); - } - - public static IHost GetWorkerHost() - { - return _host; - } - } -} \ No newline at end of file diff --git a/Tests/Util/WorkflowUtil.cs b/Tests/Util/WorkflowUtil.cs deleted file mode 100644 index 3aa1bc2d..00000000 --- a/Tests/Util/WorkflowUtil.cs +++ /dev/null @@ -1,97 +0,0 @@ -using Conductor.Api; -using Conductor.Client; -using Conductor.Client.Models; -using Conductor.Definition; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System; - -namespace Tests.Util -{ - public class WorkflowUtil - { - private static int RETRY_ATTEMPT_LIMIT = 5; - public static async System.Threading.Tasks.Task> StartWorkflows(WorkflowResourceApi workflowClient, ConductorWorkflow workflow, int maxAllowedInParallel, int total) - { - var workflowIds = new ConcurrentBag(); - await StartWorkflowBatch(workflowClient, workflow, total % maxAllowedInParallel, workflowIds); - for (int i = 1; i * maxAllowedInParallel <= total; i += 1) - { - await StartWorkflowBatch(workflowClient, workflow, maxAllowedInParallel, workflowIds); - } - Console.WriteLine($"Started {workflowIds.Count} workflows"); - return workflowIds; - } - - public static async System.Threading.Tasks.Task> GetWorkflowStatusList(WorkflowResourceApi workflowClient, int maxAllowedInParallel, params string[] workflowIds) - { - var workflowStatusList = new ConcurrentBag(); - for (int index = 0; index < workflowIds.Length; index += maxAllowedInParallel) - { - await GetWorkflowStatusBatch(workflowClient, workflowStatusList, workflowIds, index, index + maxAllowedInParallel); - } - Console.WriteLine($"Got ${workflowStatusList.Count} workflow statuses"); - return workflowStatusList; - } - - private static async System.Threading.Tasks.Task GetWorkflowStatusBatch(WorkflowResourceApi workflowClient, ConcurrentBag workflowStatusList, string[] workflowIds, int startIndex, int finishIndex) - { - var threads = new List(); - for (int i = Math.Max(0, startIndex); i < Math.Min(workflowIds.Length, finishIndex); i += 1) - { - int copy = i; - threads.Add( - System.Threading.Tasks.Task.Run( - () => GetWorkflowStatus(workflowClient, workflowStatusList, workflowIds, copy))); - } - await System.Threading.Tasks.Task.WhenAll(threads); - } - - private static async System.Threading.Tasks.Task StartWorkflowBatch(WorkflowResourceApi workflowClient, ConductorWorkflow workflow, int quantity, ConcurrentBag workflowIds) - { - List threads = new List(); - var startWorkflowRequest = workflow.GetStartWorkflowRequest(); - for (int counter = 0; counter < quantity; counter += 1) - { - threads.Add( - System.Threading.Tasks.Task.Run( - () => StartWorkflow(workflowClient, startWorkflowRequest, workflowIds))); - } - await System.Threading.Tasks.Task.WhenAll(threads); - } - - private static void GetWorkflowStatus(WorkflowResourceApi workflowClient, ConcurrentBag workflowStatusList, string[] workflowIds, int index) - { - for (int attempt = 0; attempt < RETRY_ATTEMPT_LIMIT; attempt += 1) - { - try - { - workflowStatusList.Add(workflowClient.GetWorkflowStatusSummary(workflowIds[index])); - return; - } - catch (ApiException e) - { - Console.WriteLine($"Failed to get workflow status, reason: {e}"); - System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1)); - } - } - } - - private static void StartWorkflow(WorkflowResourceApi workflowClient, StartWorkflowRequest startWorkflowRequest, ConcurrentBag workflowIds) - { - for (int attempt = 0; attempt < RETRY_ATTEMPT_LIMIT; attempt += 1) - { - try - { - workflowIds.Add(workflowClient.StartWorkflow(startWorkflowRequest)); - return; - } - catch (ApiException e) - { - Console.WriteLine($"Failed to start workflow, reason: {e}"); - System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1)); - } - } - } - } -} \ No newline at end of file diff --git a/Tests/Worker/SimpleWorker.cs b/Tests/Worker/SimpleWorker.cs deleted file mode 100644 index 3b9b719b..00000000 --- a/Tests/Worker/SimpleWorker.cs +++ /dev/null @@ -1,26 +0,0 @@ -using Conductor.Client.Interfaces; -using Conductor.Client.Extensions; -using Conductor.Client.Models; -using Conductor.Client.Worker; -using System; - -namespace Tests.Worker -{ - public class SimpleWorker : IWorkflowTask - { - public string TaskType { get; } - public WorkflowTaskExecutorConfiguration WorkerSettings { get; } - - public SimpleWorker(string taskType = "test-sdk-csharp-task") - { - TaskType = taskType; - WorkerSettings = new WorkflowTaskExecutorConfiguration(); - WorkerSettings.BatchSize = Math.Max(15, Environment.ProcessorCount * 2); - } - - public TaskResult Execute(Task task) - { - return task.Completed(); - } - } -} diff --git a/Tests/Worker/WorkerTests.cs b/Tests/Worker/WorkerTests.cs index bff1a056..6427412c 100644 --- a/Tests/Worker/WorkerTests.cs +++ b/Tests/Worker/WorkerTests.cs @@ -1,12 +1,12 @@ using Conductor.Api; +using Conductor.Client.Extensions; using Conductor.Client.Models; using Conductor.Definition; using Conductor.Definition.TaskType; -using Conductor.Executor; using System; using System.Collections.Concurrent; +using System.Collections.Generic; using System.Threading; -using Tests.Util; using Xunit; namespace Tests.Worker @@ -15,26 +15,23 @@ public class WorkerTests { private const string WORKFLOW_NAME = "test-sdk-csharp-worker"; private const int WORKFLOW_VERSION = 1; - private const string TASK_NAME = "test-sdk-csharp-task"; - - private readonly WorkflowExecutor _workflowExecutor; + private const string TASK_DOMAIN = "taskDomain"; private readonly WorkflowResourceApi _workflowClient; public WorkerTests() { - _workflowExecutor = ApiUtil.GetWorkflowExecutor(); - _workflowClient = ApiUtil.GetClient(); + _workflowClient = ApiExtensions.GetClient(); } [Fact] public async System.Threading.Tasks.Task TestWorkflowAsyncExecution() { - ConductorWorkflow workflow = GetConductorWorkflow(); - _workflowExecutor.RegisterWorkflow(workflow, true); - var workflowIdList = await StartWorkflows(workflow, quantity: 64); - await ExecuteWorkflowTasks(TimeSpan.FromSeconds(16)); + var workflow = GetConductorWorkflow(); + ApiExtensions.GetWorkflowExecutor().RegisterWorkflow(workflow, true); + var workflowIdList = await StartWorkflows(workflow, quantity: 32); + await ExecuteWorkflowTasks(workflowCompletionTimeout: TimeSpan.FromSeconds(10)); await ValidateWorkflowCompletion(workflowIdList.ToArray()); } @@ -48,17 +45,20 @@ private ConductorWorkflow GetConductorWorkflow() private async System.Threading.Tasks.Task> StartWorkflows(ConductorWorkflow workflow, int quantity) { - var startedWorkflows = await WorkflowUtil.StartWorkflows( + var startWorkflowRequest = workflow.GetStartWorkflowRequest(); + startWorkflowRequest.TaskToDomain = new Dictionary { { TASK_NAME, TASK_DOMAIN } }; + var startedWorkflows = await WorkflowExtensions.StartWorkflows( _workflowClient, - workflow, - Math.Max(15, Environment.ProcessorCount << 1), - quantity); + startWorkflowRequest, + maxAllowedInParallel: 10, + total: quantity + ); return startedWorkflows; } private async System.Threading.Tasks.Task ExecuteWorkflowTasks(TimeSpan workflowCompletionTimeout) { - var host = WorkerUtil.GetWorkerHost(); + var host = WorkflowTaskHost.GetWorkerHost(Microsoft.Extensions.Logging.LogLevel.Debug); await host.StartAsync(); Thread.Sleep(workflowCompletionTimeout); await host.StopAsync(); @@ -66,11 +66,12 @@ private async System.Threading.Tasks.Task ExecuteWorkflowTasks(TimeSpan workflow private async System.Threading.Tasks.Task ValidateWorkflowCompletion(params string[] workflowIdList) { - var workflowStatusList = await WorkflowUtil.GetWorkflowStatusList( + var workflowStatusList = await WorkflowExtensions.GetWorkflowStatusList( _workflowClient, - Math.Max(15, Environment.ProcessorCount << 1), - workflowIdList); - int incompleteWorkflowCounter = 0; + maxAllowedInParallel: 10, + workflowIdList + ); + var incompleteWorkflowCounter = 0; foreach (var workflowStatus in workflowStatusList) { if (workflowStatus.Status.Value != WorkflowStatus.StatusEnum.COMPLETED) @@ -82,4 +83,4 @@ private async System.Threading.Tasks.Task ValidateWorkflowCompletion(params stri Assert.Equal(0, incompleteWorkflowCounter); } } -} \ No newline at end of file +} diff --git a/Tests/Worker/Workers.cs b/Tests/Worker/Workers.cs new file mode 100644 index 00000000..cbc8b61d --- /dev/null +++ b/Tests/Worker/Workers.cs @@ -0,0 +1,24 @@ +using Conductor.Client.Extensions; +using Conductor.Client.Models; +using Conductor.Client.Worker; + +namespace Tests.Worker +{ + [WorkerTask] + public class Workers + { + // Polls for 1 task every 35ms + [WorkerTask("test-sdk-csharp-task", 1, "taskDomain", 35, "workerId")] + public static TaskResult SimpleWorkerStatic(Task task) + { + return task.Completed(); + } + + // Polls for 12 tasks every 420ms + [WorkerTask("test-sdk-csharp-task", 12, "taskDomain", 420, "workerId")] + public TaskResult SimpleWorker(Task task) + { + return task.Completed(); + } + } +}