|
2 | 2 | import os
|
3 | 3 | import shutil
|
4 | 4 | import signal
|
5 |
| -import time |
6 | 5 | from abc import ABC, abstractmethod
|
7 | 6 | from typing import Callable, Coroutine, List, Optional
|
8 | 7 |
|
9 | 8 | import aiohttp
|
10 |
| -import psutil |
11 | 9 | from opal_client.config import EngineLogFormat, opal_client_config
|
12 | 10 | from opal_client.engine.logger import log_engine_output_opa, log_engine_output_simple
|
13 | 11 | from opal_client.engine.options import CedarServerOptions, OpaServerOptions
|
|
17 | 15 | AsyncCallback = Callable[[], Coroutine]
|
18 | 16 |
|
19 | 17 |
|
20 |
| -async def wait_until_process_is_up( |
21 |
| - process_pid: int, |
22 |
| - callback: Optional[AsyncCallback], |
23 |
| - wait_interval: float = 0.1, |
24 |
| - timeout: Optional[float] = None, |
25 |
| -): |
26 |
| - """Waits until the pid of the process exists, then optionally runs a |
27 |
| - callback. |
28 |
| -
|
29 |
| - optionally receives a timeout to give up. |
30 |
| - """ |
31 |
| - start_time = time.time() |
32 |
| - while not psutil.pid_exists(process_pid): |
33 |
| - if timeout is not None and start_time - time.time() > timeout: |
34 |
| - break |
35 |
| - await asyncio.sleep(wait_interval) |
36 |
| - if callback is not None: |
37 |
| - await callback() |
38 |
| - |
39 |
| - |
40 | 18 | class PolicyEngineRunner(ABC):
|
41 | 19 | """Runs the policy engine in a supervised subprocess.
|
42 | 20 |
|
@@ -210,22 +188,19 @@ async def _run_process_until_terminated(self) -> int:
|
210 | 188 | start_new_session=True,
|
211 | 189 | )
|
212 | 190 |
|
213 |
| - # waits until the process is up, then runs a callback |
214 |
| - asyncio.create_task( |
215 |
| - wait_until_process_is_up( |
216 |
| - self._process.pid, callback=self._run_start_callbacks |
217 |
| - ) |
218 |
| - ) |
219 |
| - |
220 |
| - # After the process is up (PID exists) we also want to make sure the |
| 191 | + # After the process is up, we also want to make sure the |
221 | 192 | # engine reports as healthy before we continue. We run the health
|
222 |
| - # check in the background and set an event so __aenter__ can await it. |
| 193 | + # check in the background and set an event, so __aenter__ can await it. |
223 | 194 | async def _set_ready_when_healthy():
|
224 | 195 | try:
|
225 | 196 | await self._wait_for_engine_health()
|
226 |
| - self._engine_ready.set() |
227 | 197 | except Exception as e:
|
228 | 198 | logger.error("Engine failed health check: {err}", err=e)
|
| 199 | + else: |
| 200 | + self._engine_ready.set() |
| 201 | + # Now that the engine is confirmed healthy, run the |
| 202 | + # lifecycle callbacks (initial start or rehydration). |
| 203 | + await self._run_start_callbacks() |
229 | 204 |
|
230 | 205 | asyncio.create_task(_set_ready_when_healthy())
|
231 | 206 |
|
|
0 commit comments