Skip to content

Commit

Permalink
Merge branch 'main' into add-waiting-env
Browse files Browse the repository at this point in the history
Signed-off-by: Will Sackfield <[email protected]>
  • Loading branch information
8W9aG authored Oct 28, 2024
2 parents 187eeea + 5e2218f commit 8ca482f
Show file tree
Hide file tree
Showing 25 changed files with 422 additions and 408 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ dependencies = [
"requests>=2,<3",
"structlog>=20,<25",
'typing-compat; python_version < "3.8"',
"typing_extensions>=4.4.0",
"typing_extensions>=4.6.0",
"uvicorn[standard]>=0.12,<1",
]

Expand Down
91 changes: 0 additions & 91 deletions python/cog/server/connection.py

This file was deleted.

6 changes: 0 additions & 6 deletions python/cog/server/eventtypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,6 @@

# From worker parent process
#
@define
class Cancel:
# TODO: identify which prediction!
pass


@define
class PredictionInput:
payload: Dict[str, Any]
Expand Down
101 changes: 1 addition & 100 deletions python/cog/server/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import threading
import uuid
from types import TracebackType
from typing import Any, BinaryIO, Callable, Dict, List, Sequence, TextIO, Union
from typing import Any, Callable, Dict, List, Sequence, TextIO, Union

import pydantic
from typing_extensions import Self
Expand All @@ -19,45 +19,6 @@
from .errors import CogRuntimeError, CogTimeoutError


class _SimpleStreamWrapper(io.TextIOWrapper):
"""
_SimpleStreamWrapper wraps a binary I/O buffer and provides a TextIOWrapper
interface (primarily write and flush methods) which call a provided
callback function instead of (or, if `tee` is True, in addition to) writing
to the underlying buffer.
"""

def __init__(
self,
buffer: BinaryIO,
callback: Callable[[str, str], None],
tee: bool = False,
) -> None:
super().__init__(buffer, line_buffering=True)

self._callback = callback
self._tee = tee
self._buffer = []

def write(self, s: str) -> int:
length = len(s)
self._buffer.append(s)
if self._tee:
super().write(s)
else:
# If we're not teeing, we have to handle automatic flush on
# newline. When `tee` is true, this is handled by the write method.
if "\n" in s or "\r" in s:
self.flush()
return length

def flush(self) -> None:
self._callback(self.name, "".join(self._buffer))
self._buffer.clear()
if self._tee:
super().flush()


class _StreamWrapper:
def __init__(self, name: str, stream: TextIO) -> None:
self.name = name
Expand Down Expand Up @@ -125,66 +86,6 @@ def original(self) -> TextIO:
return self._original_fp


if sys.version_info < (3, 9):

class _AsyncStreamRedirectorBase(contextlib.AbstractContextManager):
pass
else:

class _AsyncStreamRedirectorBase(
contextlib.AbstractContextManager["AsyncStreamRedirector"]
):
pass


class AsyncStreamRedirector(_AsyncStreamRedirectorBase):
"""
AsyncStreamRedirector is a context manager that redirects I/O streams to a
callback function. If `tee` is True, it also writes output to the original
streams.
Unlike StreamRedirector, the underlying stream file descriptors are not
modified, which means that only stream writes from Python code will be
captured. Writes from native code will not be captured.
Unlike StreamRedirector, the streams redirected cannot be configured. The
context manager is only able to redirect STDOUT and STDERR.
"""

def __init__(
self,
callback: Callable[[str, str], None],
tee: bool = False,
) -> None:
self._callback = callback
self._tee = tee

stdout_wrapper = _SimpleStreamWrapper(sys.stdout.buffer, callback, tee)
stderr_wrapper = _SimpleStreamWrapper(sys.stderr.buffer, callback, tee)
self._stdout_ctx = contextlib.redirect_stdout(stdout_wrapper)
self._stderr_ctx = contextlib.redirect_stderr(stderr_wrapper)

def __enter__(self) -> Self:
self._stdout_ctx.__enter__()
self._stderr_ctx.__enter__()
return self

def __exit__(
self,
exc_type: type[BaseException] | None,
exc_value: BaseException | None,
traceback: TracebackType | None,
) -> None:
self._stdout_ctx.__exit__(exc_type, exc_value, traceback)
self._stderr_ctx.__exit__(exc_type, exc_value, traceback)

def drain(self, timeout: float = 0.0) -> None:
# Draining isn't complicated for AsyncStreamRedirector, since we're not
# moving data between threads. We just need to flush the streams.
sys.stdout.flush()
sys.stderr.flush()


if sys.version_info < (3, 9):

class _StreamRedirectorBase(contextlib.AbstractContextManager):
Expand Down
1 change: 0 additions & 1 deletion python/cog/server/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,6 @@ def result(self) -> T:
class SetupTask(Task[SetupResult]):
def __init__(self, _clock: Optional[Callable[[], datetime]] = None) -> None:
log.info("starting setup")

self._clock = _clock
if self._clock is None:
self._clock = lambda: datetime.now(timezone.utc)
Expand Down
Loading

0 comments on commit 8ca482f

Please sign in to comment.