@@ -32,8 +32,12 @@ def _log_all_threads(logger: logging.Logger, context: str = ""):
3232 active_threads = threading .enumerate ()
3333 thread_info = []
3434 for t in active_threads :
35- thread_info .append (f"name='{ t .name } ', id={ t .ident } , daemon={ t .daemon } , alive={ t .is_alive ()} " )
36- logger .debug (f"[THREAD_TRACE] { context } Active threads ({ len (active_threads )} ): { ', ' .join (thread_info )} " )
35+ thread_info .append (
36+ f"name='{ t .name } ', id={ t .ident } , daemon={ t .daemon } , alive={ t .is_alive ()} "
37+ )
38+ logger .debug (
39+ f"[THREAD_TRACE] { context } Active threads ({ len (active_threads )} ): { ', ' .join (thread_info )} "
40+ )
3741
3842
3943class ConcurrencyOptions :
@@ -429,13 +433,13 @@ def stream_reader():
429433 stream = self ._response_stream
430434 if stream is None :
431435 return
432-
436+
433437 # Use next() to allow shutdown check between items
434438 # This matches Go's pattern: check ctx.Err() after each stream.Recv()
435439 while True :
436440 if self ._shutdown .is_set ():
437441 break
438-
442+
439443 try :
440444 # NOTE: next(stream) blocks until gRPC returns the next work item or cancels the stream.
441445 # There is no way to interrupt this blocking call in Python gRPC. When shutdown is
@@ -451,28 +455,39 @@ def stream_reader():
451455 break
452456 except grpc .RpcError as rpc_error :
453457 # Check if this is due to shutdown/cancellation
454- if self ._shutdown .is_set () or rpc_error .code () == grpc .StatusCode .CANCELLED :
455- self ._logger .debug (f"Stream reader: stream cancelled during shutdown (code={ rpc_error .code ()} )" )
458+ if (
459+ self ._shutdown .is_set ()
460+ or rpc_error .code () == grpc .StatusCode .CANCELLED
461+ ):
462+ self ._logger .debug (
463+ f"Stream reader: stream cancelled during shutdown (code={ rpc_error .code ()} )"
464+ )
456465 break
457466 # Other RPC errors - put in queue for async loop to handle
458- self ._logger .warning (f"Stream reader: RPC error (code={ rpc_error .code ()} ): { rpc_error } " )
467+ self ._logger .warning (
468+ f"Stream reader: RPC error (code={ rpc_error .code ()} ): { rpc_error } "
469+ )
459470 break
460471 except Exception as stream_error :
461472 # Check if this is due to shutdown
462473 if self ._shutdown .is_set ():
463- self ._logger .debug (f"Stream reader: exception during shutdown: { type (stream_error ).__name__ } : { stream_error } " )
474+ self ._logger .debug (
475+ f"Stream reader: exception during shutdown: { type (stream_error ).__name__ } : { stream_error } "
476+ )
464477 # Other stream errors - put in queue for async loop to handle
465- self ._logger .warning (f"Stream reader: unexpected error: { stream_error } " )
478+ self ._logger .warning (
479+ f"Stream reader: unexpected error: { stream_error } "
480+ )
466481 raise
467-
482+
468483 except Exception as e :
469484 if not self ._shutdown .is_set ():
470485 work_item_queue .put (e )
471486 finally :
472487 # signal that the stream reader is done (ie matching Go's context cancellation)
473488 try :
474489 work_item_queue .put (SHUTDOWN_SENTINEL )
475- except Exception as e :
490+ except Exception :
476491 # queue might be closed so ignore this
477492 pass
478493
@@ -482,24 +497,27 @@ def stream_reader():
482497 # Daemon threads exit immediately when the main program exits, which prevents
483498 # cleanup of gRPC channel resources and OTel interceptors. Non-daemon threads
484499 # block shutdown until they complete, ensuring all resources are properly closed.
485- current_reader_thread = threading .Thread (target = stream_reader , daemon = False , name = "StreamReader" )
500+ current_reader_thread = threading .Thread (
501+ target = stream_reader , daemon = False , name = "StreamReader"
502+ )
486503 current_reader_thread .start ()
487504 loop = asyncio .get_running_loop ()
488505
489506 # NOTE: This is a blocking call that will wait for a work item to become available or the shutdown sentinel
490507 while not self ._shutdown .is_set ():
491508 try :
492-
493509 # Use timeout to allow shutdown check (mimicing Go's select with ctx.Done())
494510 work_item = await loop .run_in_executor (
495- None ,
496- lambda : work_item_queue . get ( timeout = 0.1 ) )
511+ None , lambda : work_item_queue . get ( timeout = 0.1 )
512+ )
497513 # Essentially check for ctx.Done() in Go
498514 if work_item == SHUTDOWN_SENTINEL :
499515 break
500-
516+
501517 if self ._async_worker_manager ._shutdown or loop .is_closed ():
502- self ._logger .debug ("Async worker manager shut down or loop closed, exiting work item processing" )
518+ self ._logger .debug (
519+ "Async worker manager shut down or loop closed, exiting work item processing"
520+ )
503521 break
504522 if isinstance (work_item , Exception ):
505523 raise work_item
@@ -533,7 +551,7 @@ def stream_reader():
533551 invalidate_connection ()
534552 raise e
535553 current_reader_thread .join (timeout = 1 )
536-
554+
537555 if self ._shutdown .is_set ():
538556 self ._logger .info (f"Disconnected from { self ._host_address } " )
539557 else :
@@ -581,13 +599,17 @@ def stream_reader():
581599 # RuntimeError often indicates asyncio loop issues (e.g., "cannot schedule new futures after shutdown")
582600 # Check shutdown state first
583601 if self ._shutdown .is_set ():
584- self ._logger .debug (f"Shutdown detected during RuntimeError handling, exiting: { ex } " )
602+ self ._logger .debug (
603+ f"Shutdown detected during RuntimeError handling, exiting: { ex } "
604+ )
585605 break
586606 # Check if async worker manager is shut down or loop is closed
587607 try :
588608 loop = asyncio .get_running_loop ()
589609 if self ._async_worker_manager ._shutdown or loop .is_closed ():
590- self ._logger .debug (f"Async worker manager shut down or loop closed, exiting: { ex } " )
610+ self ._logger .debug (
611+ f"Async worker manager shut down or loop closed, exiting: { ex } "
612+ )
591613 break
592614 except RuntimeError :
593615 # No event loop running, treat as shutdown
@@ -598,13 +620,17 @@ def stream_reader():
598620 break
599621 except Exception as ex :
600622 if self ._shutdown .is_set ():
601- self ._logger .debug (f"Shutdown detected during exception handling, exiting: { ex } " )
623+ self ._logger .debug (
624+ f"Shutdown detected during exception handling, exiting: { ex } "
625+ )
602626 break
603627 # Check if async worker manager is shut down or loop is closed
604628 try :
605629 loop = asyncio .get_running_loop ()
606630 if self ._async_worker_manager ._shutdown or loop .is_closed ():
607- self ._logger .debug (f"Async worker manager shut down or loop closed, exiting: { ex } " )
631+ self ._logger .debug (
632+ f"Async worker manager shut down or loop closed, exiting: { ex } "
633+ )
608634 break
609635 except RuntimeError :
610636 # No event loop running, treat as shutdown
@@ -614,7 +640,7 @@ def stream_reader():
614640 self ._logger .warning (f"Unexpected error: { ex } " )
615641 invalidate_connection ()
616642 self ._logger .info ("No longer listening for work items" )
617-
643+
618644 # Cancel worker_task to ensure shutdown completes even if tasks are still running
619645 worker_task .cancel ()
620646 try :
@@ -644,7 +670,7 @@ def stop(self):
644670 self ._logger .exception (f"Error closing gRPC channel: { e } " )
645671 finally :
646672 self ._current_channel = None
647-
673+
648674 if self ._runLoop is not None :
649675 self ._runLoop .join (timeout = self ._stop_timeout )
650676 if self ._runLoop .is_alive ():
@@ -654,7 +680,7 @@ def stop(self):
654680 )
655681 else :
656682 self ._logger .debug ("Worker thread completed successfully" )
657-
683+
658684 self ._async_worker_manager .shutdown ()
659685 self ._logger .info ("Worker shutdown completed" )
660686 self ._is_running = False
@@ -1683,7 +1709,7 @@ async def _consume_queue(self, queue: asyncio.Queue, semaphore: asyncio.Semaphor
16831709 # Check for cancellation during timeout and exit while loop if shutting down
16841710 if self ._shutdown :
16851711 break
1686- continue # otherwise wait for work item to become available and loop again
1712+ continue # otherwise wait for work item to become available and loop again
16871713 except asyncio .CancelledError :
16881714 # Propagate cancellation
16891715 raise
0 commit comments