
From PENDING to FINISHED, join raises an InvalidStateError #143

Open
balmasea opened this issue Nov 8, 2024 · 1 comment

balmasea commented Nov 8, 2024

So I'm trying to use a ProcessPool to run a batch of asynchronous tasks, each with a maximum running time. This is roughly my code:

```python
from pebble import ProcessPool


def do_transfer(self, hostvars: dict) -> None:
    """Transfer."""
    results = []
    with ProcessPool(max_workers=self.parallel_limit) as pool:
        processes = []
        for host in self.hosts:
            self.logger.debug(
                f"{self.log_prefix}|Starting transfer of {self.filename} for {host}."
            )
            processes.append(
                {
                    "host": host,
                    "process": pool.schedule(
                        self.transfer, args=(hostvars[host],)
                    ),
                }
            )
        try:
            self.logger.info(
                f"{self.log_prefix}|Maximum time to be running: {self.total_time_for_running_file_transfers} seconds."
            )
            pool.close()
            pool.join(timeout=self.total_time_for_running_file_transfers)
        except TimeoutError:
            self.logger.info(
                f"{self.log_prefix}|Maximum time to run has expired. Finishing current and planned tasks."
            )
            for process in processes:
                self.logger.debug(
                    f"{self.log_prefix}|Terminating process for {process['host']}"
                )
                if not process["process"].done():
                    # Mark the unfinished task as timed out.
                    process["process"].set_result(
                        {
                            "error_message": "Timeout reached. Transfer was not completed.",
                        }
                    )
            pool.stop()
            pool.join()
        results = [process["process"].result() for process in processes]
```

If all the processes are done or already running, everything works. However, if some of them have not started yet, calling pool.join() after pool.stop() raises an InvalidStateError.

Here is the backtrace:

```
Exception in thread Thread-2:
Traceback (most recent call last):
  File "/home/bananas/venv/lib/python3.8/site-packages/pebble/pool/process.py", line 218, in schedule
    self.worker_manager.dispatch(task)
  File "/home/bananas/venv/lib/python3.8/site-packages/pebble/pool/process.py", line 353, in dispatch
    raise error
  File "/home/bananas/venv//lib/python3.8/site-packages/pebble/pool/process.py", line 351, in dispatch
    self.pool_channel.send(WorkerTask(task.id, task.payload))
  File "/home/bananas/venv//lib/python3.8/site-packages/pebble/pool/channel.py", line 70, in send
    return self.writer.send(obj)
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 405, in _send_bytes
    self._send(buf)
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
TypeError: an integer is required (got type NoneType)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/home/bananas/venv/lib/python3.8/site-packages/pebble/pool/process.py", line 169, in task_scheduler_loop
    pool_manager.schedule(task)
  File "/home/bananas/venv/lib/python3.8/site-packages/pebble/pool/process.py", line 220, in schedule
    self.task_manager.task_problem(task.id, error)
  File "/home/bananas/venv/lib/python3.8/site-packages/pebble/pool/process.py", line 316, in task_problem
    self.task_done(task_id, Result(TASK_ERROR, error))
  File "/home/bananas/venv/lib/python3.8/site-packages/pebble/pool/process.py", line 309, in task_done
    task.future.set_exception(result.value)
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 547, in set_exception
    raise InvalidStateError('{}: {!r}'.format(self._state, self))
concurrent.futures._base.InvalidStateError: FINISHED: <ProcessFuture at 0x7fc4a2aa45e0 state=finished returned dict>
```

Is that expected? Do I need to use a different method to finish the processes?
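The last frames of the traceback show pebble's scheduler thread calling `set_exception` on a `ProcessFuture` that is already `FINISHED` with a dict, presumably the dict set by `set_result` in the timeout handler. A minimal sketch of that sequence with a plain `concurrent.futures.Future` (no pebble involved; `reproduce_double_completion` is a hypothetical name) raises the same error, because a future can only be completed once:

```python
from concurrent.futures import Future, InvalidStateError


def reproduce_double_completion():
    """Complete one Future from two "owners", mimicking the order seen in the traceback."""
    future = Future()  # direct instantiation is fine for a demonstration

    # 1. Caller side: the TimeoutError handler marks the still-pending future as done.
    future.set_result({"error_message": "Timeout reached. Transfer was not completed."})

    # 2. Pool side: the scheduler fails to dispatch the task and reports the error
    #    on the same future, which is already FINISHED.
    try:
        future.set_exception(TypeError("an integer is required (got type NoneType)"))
    except InvalidStateError as error:
        # The first result survives; only the second completion attempt fails.
        return type(error).__name__, future.result()
    return "no error", future.result()


print(reproduce_double_completion())
```

Whichever completion call comes second fails, so any code path where both the caller and the pool try to complete the same future can hit this error.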

noxdafox (Owner) commented

Hello,

The context manager already closes and joins the pool for you, so you don't have to: https://pebble.readthedocs.io/en/latest/#pebble.ProcessPool

It might be that calling join sequentially too quickly causes a race condition in the logic, but it's difficult to troubleshoot since most of the above code cannot be reproduced locally.
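One way to avoid completing a pool-owned future from the caller side, sketched here as an assumption rather than a confirmed fix for pebble: leave the futures alone, record the timeout message only in the caller's own results, and let the `with` block do the close/join mentioned above. The sketch uses the standard library's `ThreadPoolExecutor` as a stand-in for pebble's `ProcessPool` so it runs without pebble; unlike pebble, threads cannot be terminated, so a task that is already running is simply left to finish. `collect_results`, `fake_transfer`, and `TIMEOUT_RESULT` are hypothetical names. pebble's futures subclass `concurrent.futures.Future`, so `wait` should accept them too, though that has not been tested here.

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor, wait
from typing import Dict

# Hypothetical placeholder kept in the caller's results, never inside a pool-owned future.
TIMEOUT_RESULT = {"error_message": "Timeout reached. Transfer was not completed."}


def collect_results(futures: Dict[str, Future], deadline: float) -> Dict[str, dict]:
    """Wait up to `deadline` seconds in total, then build per-host results.

    Unfinished futures are cancelled (which only succeeds for tasks that have
    not started) and mapped to TIMEOUT_RESULT; set_result is never called.
    """
    done, _not_done = wait(futures.values(), timeout=deadline)
    results = {}
    for host, future in futures.items():
        if future in done:
            results[host] = future.result()  # re-raises if the task itself failed
        else:
            future.cancel()  # returns False and does nothing if the task is running
            results[host] = TIMEOUT_RESULT
    return results


def fake_transfer(seconds: float) -> dict:
    """Hypothetical stand-in for self.transfer: sleeps, then reports success."""
    time.sleep(seconds)
    return {"status": "ok"}


if __name__ == "__main__":
    # One worker: "a" finishes at once, "b" is still running at the deadline,
    # and "c" is still queued, so it gets cancelled.
    with ThreadPoolExecutor(max_workers=1) as pool:
        futures = {
            host: pool.submit(fake_transfer, seconds)
            for host, seconds in [("a", 0.0), ("b", 0.5), ("c", 0.0)]
        }
        print(collect_results(futures, deadline=0.2))
```

`cancel()` is the standard way for a caller to give up on a pending future, since executors are expected to check for it through `set_running_or_notify_cancel()` before running a task; whether pebble does so on every code path after `stop()` has not been verified here.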
