Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Correctly enqueue items such that work can be taken by other processes #18

Merged
merged 1 commit into from
Aug 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyaugmecon/process_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def check_timeout(self):
while self.runtime.get() <= self.opts.process_timeout:
if not any(p.is_alive() for p in self.procs): # Check if any process has exited
break
time.sleep(0.5)
time.sleep(1)
else:
self.logger.info("Timed out, gracefully stopping all worker process(es)")
self.queues.empty_job_qs() # Empty the job queues
Expand Down
13 changes: 8 additions & 5 deletions pyaugmecon/queue_handler.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging
import queue
from multiprocessing import Queue, Manager
from multiprocessing import Manager, Queue

import numpy as np

Expand Down Expand Up @@ -120,13 +120,16 @@ def split_work(self):
blocks = [
self.work[i : i + block_size] for i in range(0, len(self.work), block_size)
] # Divide the work into blocks
blocks = np.array_split(np.array(blocks), self.opts.cpu_count) # Divide the blocks into sub-blocks
blocks = [x for x in blocks if x.size > 0] # Remove empty sub-blocks

# Divide the blocks into sub-blocks and remove empty sub-blocks
blocks = [x for x in np.array_split(np.array(blocks), self.opts.cpu_count) if x.size > 0]

manager = Manager()
self.proc_count = len(blocks) # Set the number of processes to be used
self.job_qs = [manager.Queue() for _ in range(self.proc_count)] # Create a job queue for each process
self.logger.info(f"Dividing grid over {self.proc_count} process(es)") # Log the number of processes

for i, block in enumerate(blocks):
items = [tuple(item) for sublist in block.tolist() for item in sublist] # Flatten the sub-blocks
self.job_qs[i].put(items) # Put the flattened items in the job queue for the process
for item in block:
item = [tuple(x) for x in item.tolist()]
self.job_qs[i].put_nowait(item) # Put each item in the job queue for the corresponding process
Loading