diff --git a/batch_daily/README.md b/batch_daily/README.md
new file mode 100644
index 00000000..2c75dbb6
--- /dev/null
+++ b/batch_daily/README.md
@@ -0,0 +1,36 @@
+# Batch sample
+
+This is an example workflow that solves the following use-case.
+
+You have a series of records that are divided into daily batches (think a day's
+worth of telemetry coming from an application).
+Every day you would like to run a batch to process a day's worth of records, but
+you would also like to have the ability to backfill the records from a previous
+window of time.
+
+Backfilling might be run as a schedule or it might be run as a directly
+triggered workflow.
+
+Please make sure your Python version is 3.11 or above (the workflows use `asyncio.TaskGroup`). For this sample, run:
+
+```
+poetry install --with batch_daily
+```
+
+To run, first see [README.md](../README.md) for prerequisites. Then, run the following from this directory to start the worker:
+
+```bash
+poetry run python run_worker.py
+```
+
+This will start the worker. Then, in another terminal, run the following to start the workflow:
+
+```bash
+poetry run python starter.py
+```
+
+Optionally, you can schedule the workflow with:
+
+```bash
+poetry run python create_schedule.py
+```
diff --git a/batch_daily/__init__.py b/batch_daily/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/batch_daily/activities.py b/batch_daily/activities.py
new file mode 100644
index 00000000..3ca9a3a4
--- /dev/null
+++ b/batch_daily/activities.py
@@ -0,0 +1,61 @@
+"""Activities for the daily batch sample."""
+
+import asyncio
+import random
+import time
+from dataclasses import dataclass
+from typing import Any, Dict, List
+
+from temporalio import activity
+
+
+@dataclass
+class ListRecordActivityInput:
+    """Input for ``list_records``."""
+
+    # Filter expression; ``list_records`` only prints it.
+    record_filter: str
+    # Day to list records for; ``list_records`` only prints it.
+    day: str
+
+
+@dataclass
+class ProcessRecordActivityInput:
+    """Input for ``process_record``."""
+
+    # URI of the record to process.
+    uri: str
+
+
+async def random_sleep() -> None:
+    """Simulate a slow operation by sleeping for a random 0.01-1 second interval."""
+    # randint(1, 100) is inclusive, so the delay ranges from 1/100 s to 1 s.
+    sleep_s = 1 / random.randint(1, 100)
+    await asyncio.sleep(sleep_s)
+
+
+@activity.defn
+async def list_records(activity_input: ListRecordActivityInput) -> List[str]:
+    """Return the URIs of the records for one day.
+
+    This sample does no real lookup: after a simulated delay it always returns
+    the same ten placeholder URIs, regardless of the day or filter.
+    """
+    print(
+        f"filtering records on {activity_input.day} based on filter: {activity_input.record_filter}"
+    )
+    await random_sleep()
+    return [f"uri://record-id{idx}" for idx in range(10)]
+
+
+@activity.defn
+async def process_record(activity_input: ProcessRecordActivityInput) -> Dict[str, Any]:
+    """Pretend to process one record and report how long it took.
+
+    Returns a dict with a single key, ``"runtime"``: the elapsed seconds
+    measured with a monotonic clock.
+    """
+    started_at = time.monotonic()
+    print(f"this record is yummy: {activity_input.uri}")
+    await random_sleep()
+    return {"runtime": time.monotonic() - started_at}
diff --git a/batch_daily/create_schedule.py b/batch_daily/create_schedule.py
new file mode 100644
index 00000000..72b1d28f
--- /dev/null
+++ b/batch_daily/create_schedule.py
@@ -0,0 +1,58 @@
+"""Create the schedule that runs ``RecordBatchProcessor`` once a day."""
+
+import asyncio
+import traceback
+from datetime import datetime, timedelta
+
+from temporalio.client import (
+    Client,
+    Schedule,
+    ScheduleActionStartWorkflow,
+    ScheduleAlreadyRunningError,
+    ScheduleIntervalSpec,
+    ScheduleSpec,
+    WorkflowFailureError,
+)
+
+from batch_daily.workflows import (
+    RecordBatchProcessor,
+    RecordBatchProcessorWorkflowInput,
+    TASK_QUEUE_NAME,
+)
+
+SCHEDULE_ID = "daily-batch-wf-schedule"
+
+
+async def main() -> None:
+    """Connect to the local Temporal server and create the daily schedule."""
+    client = await Client.connect("localhost:7233")
+
+    # No ``day`` is given: the workflow derives it from the scheduled start time.
+    wf_input = RecordBatchProcessorWorkflowInput(record_filter="taste=yummy")
+    try:
+        await client.create_schedule(
+            SCHEDULE_ID,
+            Schedule(
+                action=ScheduleActionStartWorkflow(
+                    RecordBatchProcessor.run,
+                    wf_input,
+                    id=f"record-filter-{wf_input.record_filter}",
+                    task_queue=TASK_QUEUE_NAME,
+                ),
+                # One run per day. Each run processes the whole day of its
+                # scheduled start time, so firing more often than daily would
+                # process the same day repeatedly.
+                spec=ScheduleSpec(
+                    intervals=[ScheduleIntervalSpec(every=timedelta(days=1))]
+                ),
+            ),
+        )
+    except ScheduleAlreadyRunningError:
+        # Raised when a schedule with this ID already exists, e.g. on a re-run.
+        print(f"Schedule {SCHEDULE_ID!r} already exists; nothing to do.")
+    except WorkflowFailureError:
+        print("Got exception: ", traceback.format_exc())
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
diff --git a/batch_daily/run_worker.py b/batch_daily/run_worker.py
new file mode 100644
index 00000000..356962d9
--- /dev/null
+++ b/batch_daily/run_worker.py
@@ -0,0 +1,33 @@
+"""Run a worker that hosts the daily batch workflows and activities."""
+
+import asyncio
+from concurrent.futures import ThreadPoolExecutor
+
+from temporalio.client import Client
+from temporalio.worker import Worker
+
+from batch_daily.activities import (
+    list_records,
+    process_record,
+)
+from batch_daily.workflows import DailyBatch, RecordBatchProcessor, TASK_QUEUE_NAME
+
+
+async def main() -> None:
+    """Connect to the local Temporal server and run the worker until stopped."""
+    client = await Client.connect("localhost:7233")
+
+    # The executor is shut down on exit so its threads do not outlive the worker.
+    with ThreadPoolExecutor(100) as activity_executor:
+        worker: Worker = Worker(
+            client,
+            task_queue=TASK_QUEUE_NAME,
+            workflows=[DailyBatch, RecordBatchProcessor],
+            activities=[list_records, process_record],
+            activity_executor=activity_executor,
+        )
+        await worker.run()
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
diff --git a/batch_daily/starter.py b/batch_daily/starter.py
new file mode 100644
index 00000000..4f834665
--- /dev/null
+++ b/batch_daily/starter.py
@@ -0,0 +1,30 @@
+"""Start a ``DailyBatch`` backfill over a fixed date range and print its result."""
+
+import asyncio
+
+from temporalio.client import Client
+
+from batch_daily.workflows import DailyBatchWorkflowInput, TASK_QUEUE_NAME, DailyBatch
+
+
+async def main() -> None:
+    """Run ``DailyBatch`` on the local Temporal server and wait for the result."""
+    client = await Client.connect(
+        "localhost:7233",
+    )
+
+    result = await client.execute_workflow(
+        DailyBatch.run,
+        DailyBatchWorkflowInput(
+            start_day="2024-01-01",
+            end_day="2024-03-01",
+            record_filter="taste=yummy",
+        ),
+        id="daily_batch-workflow-id",
+        task_queue=TASK_QUEUE_NAME,
+    )
+    print(f"Workflow result: {result}")
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
diff --git a/batch_daily/workflows.py b/batch_daily/workflows.py
new file mode 100644
index 00000000..e9a1c037
--- /dev/null
+++ b/batch_daily/workflows.py
@@ -0,0 +1,135 @@
+"""Workflows for the daily batch sample: per-day processing and multi-day backfill."""
+
+import asyncio
+from datetime import datetime, timedelta
+
+from dataclasses import dataclass
+import time
+from typing import Any, Dict, Optional
+
+from temporalio import workflow
+from temporalio.common import RetryPolicy
+from temporalio.exceptions import ActivityError, ApplicationError
+from temporalio.common import SearchAttributeKey
+
+with workflow.unsafe.imports_passed_through():
+    from batch_daily.activities import (
+        ListRecordActivityInput,
+        list_records,
+        ProcessRecordActivityInput,
+        process_record,
+    )
+
+TASK_QUEUE_NAME = "MY_TASK_QUEUE"
+
+# Day strings are parsed and formatted with this pattern in both workflows.
+DAY_FORMAT = "%Y-%m-%d"
+
+
+@dataclass
+class RecordBatchProcessorWorkflowInput:
+    """Input for ``RecordBatchProcessor``."""
+
+    record_filter: str
+    # Day to process; when None the workflow uses its scheduled start time.
+    day: Optional[str] = None
+
+
+@workflow.defn
+class RecordBatchProcessor:
+    """Process every record of a single day."""
+
+    @workflow.run
+    async def run(
+        self, workflow_input: RecordBatchProcessorWorkflowInput
+    ) -> Dict[str, Any]:
+        """List the day's records, process them concurrently, and sum the runtimes.
+
+        Returns a dict whose ``"runtime"`` entry is the sum of the runtimes
+        reported by the ``process_record`` activities.
+
+        Raises:
+            ApplicationError: if no day was given and the workflow carries no
+                ``TemporalScheduledStartTime`` search attribute.
+        """
+        if workflow_input.day is None:
+            schedule_time = workflow.info().typed_search_attributes.get(
+                SearchAttributeKey.for_datetime("TemporalScheduledStartTime")
+            )
+            if schedule_time is None:
+                # An AssertionError would only fail the workflow task, which is
+                # retried; a non-retryable ApplicationError fails the workflow
+                # itself so the caller sees the bad input.
+                raise ApplicationError(
+                    "when not scheduled, day must be provided", non_retryable=True
+                )
+            workflow_input.day = schedule_time.strftime(DAY_FORMAT)
+
+        print(f"starting RecordProcessor with {workflow_input}")
+
+        record_uri_list = await workflow.execute_activity(
+            list_records,
+            ListRecordActivityInput(
+                record_filter=workflow_input.record_filter, day=workflow_input.day
+            ),
+            start_to_close_timeout=timedelta(minutes=5),
+        )
+
+        # Fan out one activity per record; the task group waits for all of them.
+        async with asyncio.TaskGroup() as tg:
+            task_list = [
+                tg.create_task(
+                    workflow.execute_activity(
+                        process_record,
+                        ProcessRecordActivityInput(uri=uri),
+                        start_to_close_timeout=timedelta(minutes=1),
+                    )
+                )
+                for uri in record_uri_list
+            ]
+        total_runtime = sum(task.result()["runtime"] for task in task_list)
+        return {"runtime": total_runtime}
+
+
+@dataclass
+class DailyBatchWorkflowInput:
+    """Input for ``DailyBatch``: a day range plus the record filter."""
+
+    # First day to process, formatted as "%Y-%m-%d".
+    start_day: str
+    # Day after the last day to process (exclusive), formatted as "%Y-%m-%d".
+    end_day: str
+    record_filter: str
+
+
+@workflow.defn
+class DailyBatch:
+    """Backfill workflow: run one ``RecordBatchProcessor`` child per day."""
+
+    @workflow.run
+    async def run(self, workflow_input: DailyBatchWorkflowInput) -> Dict[str, Any]:
+        """Process every day in ``[start_day, end_day)`` and sum the runtimes.
+
+        Returns a dict whose ``"runtime"`` entry is the sum of the runtimes
+        reported by the child workflows (0 when the range is empty).
+        """
+        print(f"starting DailyBatch with {workflow_input}")
+
+        start = datetime.strptime(workflow_input.start_day, DAY_FORMAT)
+        end = datetime.strptime(workflow_input.end_day, DAY_FORMAT)
+        # Start one child workflow per day; the task group waits for all of them.
+        async with asyncio.TaskGroup() as tg:
+            task_list = [
+                tg.create_task(
+                    workflow.execute_child_workflow(
+                        RecordBatchProcessor.run,
+                        RecordBatchProcessorWorkflowInput(
+                            day=(start + timedelta(days=offset)).strftime(DAY_FORMAT),
+                            record_filter=workflow_input.record_filter,
+                        ),
+                    )
+                )
+                for offset in range((end - start).days)
+            ]
+        total_runtime = sum(task.result()["runtime"] for task in task_list)
+        return {"runtime": total_runtime}