fix: Add SIGTERM handling #297
base: main
Conversation
@maciejka It exits correctly.
scripts/data/client.py
Outdated
logger.debug("Producer is waiting for weight to be released.")
weight_lock.wait()  # Wait for the condition to be met
weight_lock.wait()
What will happen when SIGTERM is sent while a thread is waiting on the lock?
Threads waiting on the lock will wait until they are notified (either by a timeout or by another thread) before they can check shutdown_event and exit.
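The waiting pattern described above can be sketched as a condition wait that wakes periodically to recheck the shutdown event. This is a minimal sketch, not the patch itself: `has_capacity` is a hypothetical callable standing in for the real weight check in scripts/data/client.py.

```python
import threading

shutdown_event = threading.Event()
weight_lock = threading.Condition()

def wait_for_capacity(has_capacity, timeout=1.0):
    """Wait on the condition, waking at most every `timeout` seconds
    so a shutdown request is noticed even if nobody calls notify().

    Returns True when capacity is available, False on shutdown.
    """
    with weight_lock:
        while not has_capacity():
            if shutdown_event.is_set():
                return False  # shutting down: stop waiting
            # wait() with a timeout instead of a bare wait(), so the
            # thread rechecks shutdown_event periodically
            weight_lock.wait(timeout=timeout)
        return True
```

A bare `weight_lock.wait()` would block until some other thread notifies the condition; adding the timeout bounds how long a SIGTERM can go unnoticed.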
This means SIGTERM will not terminate the program immediately?
Yes, it will not terminate the program immediately; rather, it initiates a graceful shutdown, which allows active threads to finish their current tasks and lets waiting threads proceed once they acquire the lock.
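The graceful-shutdown approach being described usually boils down to a signal handler that does nothing but set a shared flag; worker threads observe the flag and wind down on their own. A minimal sketch, assuming the shared `shutdown_event` from the patch:

```python
import signal
import threading

shutdown_event = threading.Event()

def handle_sigterm(signum, frame):
    # Do minimal work inside the handler: just record the request.
    # Threads check shutdown_event at their own safe points and exit.
    shutdown_event.set()

# Install the handler; SIGTERM no longer kills the process outright.
signal.signal(signal.SIGTERM, handle_sigterm)
```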
The point of sending SIGTERM is to stop immediately.
…nding jobs, stop all running processes, shut down the thread pool without waiting, and exit
def process_batch(job):
    arguments_file = job.batch_file.as_posix().replace(".json", "-arguments.json")
    if shutdown_event.is_set():
shutdown_event is already checked in job_consumer; there is no need to recheck it here.
scripts/data/client.py
Outdated
error = result.stdout or result.stderr
if result.returncode == -9:
    match = re.search(r"gas_spent=(\d+)", result.stdout)
while process.poll() is None:
Extract the process-running logic into a standalone function (name suggestion: run?) that throws an exception in case of shutdown.
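The suggested helper could look roughly like this: a standalone `run` that only manages the subprocess, polls it so a shutdown can interrupt the wait, and raises on shutdown. This is a hedged sketch of the reviewer's suggestion, not the actual patch; `ShutdownRequested` and the polling interval are assumptions.

```python
import subprocess
import threading

shutdown_event = threading.Event()

class ShutdownRequested(Exception):
    """Raised when a running subprocess is interrupted by shutdown."""

def run(command, poll_interval=0.1):
    """Run `command`, checking shutdown_event between short waits."""
    process = subprocess.Popen(
        command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
    )
    while process.poll() is None:
        if shutdown_event.is_set():
            process.terminate()
            process.wait()
            raise ShutdownRequested(f"terminated while running: {command!r}")
        try:
            # Short bounded wait so the shutdown flag is rechecked often
            process.wait(timeout=poll_interval)
        except subprocess.TimeoutExpired:
            pass
    stdout, stderr = process.communicate()
    return process.returncode, stdout, stderr
```

Callers such as process_batch then only interpret the returned code and output, and catch `ShutdownRequested` to unwind cleanly.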
scripts/data/client.py
Outdated
# If executor exists, shutdown immediately
if executor:
    logger.info("Shutting down thread pool...")
    executor.shutdown(wait=False)
Why is wait False? Don't we want to wait until it is shut down?
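For context on the question: with `wait=True`, `Executor.shutdown` blocks until the already-running tasks finish, so cleanup code after the call can assume no worker is still touching shared state; with `wait=False` the call returns immediately while tasks keep running in the background. A small illustration (the `task` function is invented for the example):

```python
import time
from concurrent.futures import ThreadPoolExecutor

results = []

def task():
    time.sleep(0.2)
    results.append("done")

executor = ThreadPoolExecutor(max_workers=2)
executor.submit(task)

# wait=True blocks here until the submitted task has completed.
# With wait=False, this line would return immediately and `results`
# might still be empty when the next statement runs.
executor.shutdown(wait=True)
```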
scripts/data/client.py
Outdated
shutdown_event.set()

# Clear the job queue
while not job_queue.empty():
A signal handler modifying job_queue does not look like good design. I would just set shutdown_event here. In job_producer, when shutdown is detected, all consumers can be notified with notify_all. Cleaning job_queue does not seem necessary.
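The design being suggested might look like the following sketch: the signal handler only sets the event, and the producer, on noticing it, stops producing and wakes every thread waiting on the condition. Names mirror the patch (`shutdown_event`, `weight_lock`, `job_queue`), but the body is an assumption about the intended shape, not the actual code.

```python
import queue
import threading

shutdown_event = threading.Event()
weight_lock = threading.Condition()
job_queue = queue.Queue()

def job_producer(job_gen):
    for job in job_gen:
        if shutdown_event.is_set():
            break  # stop producing; no need to drain job_queue
        job_queue.put(job)
    # Whether we ran out of jobs or are shutting down, wake every
    # consumer blocked on the condition so they can observe the state.
    with weight_lock:
        weight_lock.notify_all()
```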
def process_batch(job):
    arguments_file = job.batch_file.as_posix().replace(".json", "-arguments.json")
    if shutdown_event.is_set():
scripts/data/client.py
Outdated
af.write(str(format_args(job.batch_file, False, False)))

result = subprocess.run(
def run_process(arguments_file, job):
run_process should be more generic: it should just manage the subprocess and return the error (stdout or stderr). Error interpretation should be done in process_batch. I want to keep a clear separation between the functions that facilitate multiprocessing and those that handle business logic (job_generator, process_batch).
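One way to read this separation, sketched with `process_batch` simplified to take a plain command for illustration (the real function takes a job object; the split shown here is the assumption):

```python
import subprocess

def run_process(command):
    """Generic subprocess plumbing: run `command`, return the raw result.

    No business logic here; this layer neither knows nor cares what
    the command does.
    """
    result = subprocess.run(command, capture_output=True, text=True)
    output = result.stdout or result.stderr
    return result.returncode, output

def process_batch(command):
    # Business-logic layer: decide what a given exit status means.
    returncode, output = run_process(command)
    if returncode != 0:
        return f"error: {output.strip()}"
    return output.strip()
```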
scripts/data/client.py
Outdated
for _ in range(THREAD_POOL_SIZE):
    job_queue.put(None)

if not shutdown_event.is_set():
Why is it necessary? If there are no more jobs we just let consumers finish, no?
If you let this happen during shutdown, you will not need to wait with a timeout in line 224. The information that there is no more work will reach the consumers.
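The sentinel pattern under discussion works like this: the producer pushes one `None` per consumer, and each consumer exits when it pulls a sentinel, so no timeout-based polling is needed. A self-contained sketch of the idea:

```python
import queue
import threading

THREAD_POOL_SIZE = 4
job_queue = queue.Queue()
processed = []

def job_consumer():
    while True:
        job = job_queue.get()  # blocks until a job or sentinel arrives
        if job is None:
            job_queue.task_done()
            break  # sentinel: no more work, exit without any timeout
        processed.append(job)
        job_queue.task_done()

threads = [threading.Thread(target=job_consumer) for _ in range(THREAD_POOL_SIZE)]
for t in threads:
    t.start()

for job in ["a", "b", "c"]:
    job_queue.put(job)
# One sentinel per consumer, so every worker wakes up and exits.
for _ in range(THREAD_POOL_SIZE):
    job_queue.put(None)

for t in threads:
    t.join()
```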
def job_producer(job_gen):
    global current_weight

    try:
        for job, weight in job_gen:
            # Wait until there is enough weight capacity to add the new block
            if shutdown_event.is_set():
Is this check necessary? shutdown_event is checked a few lines below.
for height in height_range:
    if shutdown_event.is_set():
scripts/data/client.py
Outdated
logger = logging.getLogger(__name__)

# Constants
MAX_WEIGHT_LIMIT = 8000  # Total weight limit for all jobs
THREAD_POOL_SIZE = os.cpu_count()  # Number of threads for processing
QUEUE_MAX_SIZE = THREAD_POOL_SIZE * 2  # Maximum size of the job queue

BASE_DIR = Path(".client_cache")
BASE_DIR = Path(__file__).resolve().parent / ".client_cache"  # Use absolute path
Why is this change necessary?
scripts/data/client.py
Outdated
af.write(str(format_args(job.batch_file, False, False)))

result = subprocess.run(
def run_process(arguments_file):
It should take the whole command as an argument.
scripts/data/client.py
Outdated
# Producer function: Generates data and adds jobs to the queue
def job_producer(job_gen):
Why do you remove comments?
scripts/data/client.py
Outdated
(job, weight) = work_to_do

# Process the block
job, weight = work_to_do
try:
Why is the log removed here?
scripts/data/client.py
Outdated
# Mark job as done
job_queue.task_done()
finally:
Why is the except block removed? Exceptions might happen during execution. You probably need to handle the process-termination exception explicitly.
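A consumer that handles the termination exception explicitly, as suggested, might be shaped like this. `ShutdownRequested` is a hypothetical exception assumed to be raised by the process runner on shutdown; the structure is a sketch, not the patch.

```python
import logging
import queue

logger = logging.getLogger(__name__)
job_queue = queue.Queue()

class ShutdownRequested(Exception):
    """Hypothetical: raised by the process runner when shutdown is requested."""

def job_consumer(process_batch):
    while True:
        job = job_queue.get()
        try:
            if job is None:
                break  # sentinel: no more work
            process_batch(job)
        except ShutdownRequested:
            # Explicit handling of the termination case: log and exit.
            logger.info("Job %r interrupted by shutdown", job)
            break
        except Exception:
            # Any other failure: log it but keep consuming, so one bad
            # job does not kill the worker thread.
            logger.exception("Error while processing job %r", job)
        finally:
            job_queue.task_done()  # always mark the job as done
```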
Hey @raizo07, there are many unanswered comments. Please respond to all of them.
@maciejka Okay, I'll respond to them.
scripts/data/client.py
#295 Add SIGTERM handling to scripts/data/client.py