You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
import trio
import pytest
from functools import partial
from trio.testing import open_stream_to_socket_listener
# The code being tested:
async def echo_server_handler(stream):
    """Echo everything received on *stream* back to the sender.

    Reads up to 1000 bytes at a time and writes each chunk straight back,
    returning as soon as ``receive_some`` produces an empty result.

    Args:
        stream: An object providing awaitable ``receive_some(max_bytes)``
            and ``send_all(data)`` methods.
    """
    while True:
        data = await stream.receive_some(1000)
        if not data:
            # Empty result: nothing more to echo (presumably end-of-stream).
            break
        await stream.send_all(data)
# The fixture:
@pytest.fixture
async def echo_client(nursery):
    """Start an echo server and yield a client stream connected to it.

    The server runs ``echo_server_handler`` via ``trio.serve_tcp`` inside
    *nursery*; the client is opened against the first listener returned.

    NOTE: this thread reports that when pytest-asyncio is installed
    alongside pytest-trio, pytest-asyncio tries to set up this async
    fixture itself and fails; uninstalling pytest-asyncio resolved it.

    Args:
        nursery: Object whose ``start`` method launches the server task
            (presumably the pytest-trio ``nursery`` fixture -- confirm).

    Yields:
        The client stream returned by ``open_stream_to_socket_listener``.
        The surrounding ``async with`` block is exited once the consumer
        of the fixture is done with it.
    """
    # serve_tcp takes keyword arguments, so it is wrapped with partial()
    # before being handed to nursery.start().
    # NOTE(review): port=0 presumably asks the OS for any free port.
    listeners = await nursery.start(
        partial(trio.serve_tcp, echo_server_handler, port=0)
    )
    echo_client = await open_stream_to_socket_listener(listeners[0])
    async with echo_client:
        yield echo_client
# A test using the fixture:
async def test_final(echo_client):
    """Check that each byte sent through *echo_client* is echoed back.

    Sends ``b"a"``, ``b"b"`` and ``b"c"`` one at a time and asserts that
    reading a single byte returns exactly what was just sent.

    Args:
        echo_client: Stream with awaitable ``send_all`` and
            ``receive_some`` methods (supplied by the ``echo_client``
            fixture).
    """
    for test_byte in [b"a", b"b", b"c"]:
        await echo_client.send_all(test_byte)
        assert await echo_client.receive_some(1) == test_byte
The test blows up with the following traceback:
============================= test session starts ==============================
platform linux -- Python 3.8.0, pytest-5.2.2, py-1.8.0, pluggy-0.13.0
rootdir: /home/work/source/git/service, inifile: pytest.ini
plugins: hypothesis-4.50.2, trio-0.5.2, asyncio-0.10.0
collected 1 item
service_test/service/a_echo_test.py F
=================================== FAILURES ===================================
__________________________________ test_final __________________________________
async_fn = functools.partial(<function _trio_test_runner_factory.<locals>._bootstrap_fixtures_and_run_test at 0x7f2c0d8f2af0>, echo_client=<pytest_trio.plugin.TrioFixture object at 0x7f2c0c562be0>)
clock = SystemClock(offset=46288.026812318036), instruments = []
restrict_keyboard_interrupt_to_checkpoints = False, args = ()
def run(
async_fn,
*args,
clock=None,
instruments=(),
restrict_keyboard_interrupt_to_checkpoints=False
):
"""Run a Trio-flavored async function, and return the result.
Calling::
run(async_fn, *args)
is the equivalent of::
await async_fn(*args)
except that :func:`run` can (and must) be called from a synchronous
context.
This is Trio's main entry point. Almost every other function in Trio
requires that you be inside a call to :func:`run`.
Args:
async_fn: An async function.
args: Positional arguments to be passed to *async_fn*. If you need to
pass keyword arguments, then use :func:`functools.partial`.
clock: ``None`` to use the default system-specific monotonic clock;
otherwise, an object implementing the :class:`trio.abc.Clock`
interface, like (for example) a :class:`trio.testing.MockClock`
instance.
instruments (list of :class:`trio.abc.Instrument` objects): Any
instrumentation you want to apply to this run. This can also be
modified during the run; see :ref:`instrumentation`.
restrict_keyboard_interrupt_to_checkpoints (bool): What happens if the
user hits control-C while :func:`run` is running? If this argument
is False (the default), then you get the standard Python behavior: a
:exc:`KeyboardInterrupt` exception will immediately interrupt
whatever task is running (or if no task is running, then Trio will
wake up a task to be interrupted). Alternatively, if you set this
argument to True, then :exc:`KeyboardInterrupt` delivery will be
delayed: it will be *only* be raised at :ref:`checkpoints
<checkpoints>`, like a :exc:`Cancelled` exception.
The default behavior is nice because it means that even if you
accidentally write an infinite loop that never executes any
checkpoints, then you can still break out of it using control-C.
The alternative behavior is nice if you're paranoid about a
:exc:`KeyboardInterrupt` at just the wrong place leaving your
program in an inconsistent state, because it means that you only
have to worry about :exc:`KeyboardInterrupt` at the exact same
places where you already have to worry about :exc:`Cancelled`.
This setting has no effect if your program has registered a custom
SIGINT handler, or if :func:`run` is called from anywhere but the
main thread (this is a Python limitation), or if you use
:func:`open_signal_receiver` to catch SIGINT.
Returns:
Whatever ``async_fn`` returns.
Raises:
TrioInternalError: if an unexpected error is encountered inside Trio's
internal machinery. This is a bug and you should `let us know
<https://github.com/python-trio/trio/issues>`__.
Anything else: if ``async_fn`` raises an exception, then :func:`run`
propagates it.
"""
__tracebackhide__ = True
# Do error-checking up front, before we enter the TrioInternalError
# try/catch
#
# It wouldn't be *hard* to support nested calls to run(), but I can't
# think of a single good reason for it, so let's be conservative for
# now:
if hasattr(GLOBAL_RUN_CONTEXT, "runner"):
raise RuntimeError("Attempted to call run() from inside a run()")
if clock is None:
clock = SystemClock()
instruments = list(instruments)
io_manager = TheIOManager()
system_context = copy_context()
system_context.run(current_async_library_cvar.set, "trio")
runner = Runner(
clock=clock,
instruments=instruments,
io_manager=io_manager,
system_context=system_context,
)
GLOBAL_RUN_CONTEXT.runner = runner
locals()[LOCALS_KEY_KI_PROTECTION_ENABLED] = True
# KI handling goes outside the core try/except/finally to avoid a window
# where KeyboardInterrupt would be allowed and converted into an
# TrioInternalError:
try:
with ki_manager(
runner.deliver_ki, restrict_keyboard_interrupt_to_checkpoints
):
try:
with closing(runner):
with runner.entry_queue.wakeup.wakeup_on_signals():
# The main reason this is split off into its own
# function is just to get rid of this extra
# indentation.
> run_impl(runner, async_fn, args)
/usr/lib/python3.8/site-packages/trio/_core/_run.py:1790:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
runner = Runner(clock=SystemClock(offset=46288.026812318036), instruments=[], io_manager=EpollIOManager(_epoll=<select.epoll ob...d.RLock object owner=0 count=0 at 0x7f2c0d8d76f0>), trio_token=None, ki_pending=False, waiting_for_idle=SortedDict({}))
async_fn = functools.partial(<function _trio_test_runner_factory.<locals>._bootstrap_fixtures_and_run_test at 0x7f2c0d8f2af0>, echo_client=<pytest_trio.plugin.TrioFixture object at 0x7f2c0c562be0>)
args = ()
def run_impl(runner, async_fn, args):
__tracebackhide__ = True
if runner.instruments:
runner.instrument("before_run")
runner.clock.start_clock()
runner.init_task = runner.spawn_impl(
runner.init,
(async_fn, args),
None,
"<init>",
system_task=True,
)
# You know how people talk about "event loops"? This 'while' loop right
# here is our event loop:
while runner.tasks:
if runner.runq:
timeout = 0
elif runner.deadlines:
deadline, _ = runner.deadlines.keys()[0]
timeout = runner.clock.deadline_to_sleep_time(deadline)
else:
timeout = _MAX_TIMEOUT
timeout = min(max(0, timeout), _MAX_TIMEOUT)
idle_primed = False
if runner.waiting_for_idle:
cushion, tiebreaker, _ = runner.waiting_for_idle.keys()[0]
if cushion < timeout:
timeout = cushion
idle_primed = True
if runner.instruments:
runner.instrument("before_io_wait", timeout)
runner.io_manager.handle_io(timeout)
if runner.instruments:
runner.instrument("after_io_wait", timeout)
# Process cancellations due to deadline expiry
now = runner.clock.current_time()
while runner.deadlines:
(deadline, _), cancel_scope = runner.deadlines.peekitem(0)
if deadline <= now:
# This removes the given scope from runner.deadlines:
cancel_scope.cancel()
idle_primed = False
else:
break
if not runner.runq and idle_primed:
while runner.waiting_for_idle:
key, task = runner.waiting_for_idle.peekitem(0)
if key[:2] == (cushion, tiebreaker):
del runner.waiting_for_idle[key]
runner.reschedule(task)
else:
break
# Process all runnable tasks, but only the ones that are already
# runnable now. Anything that becomes runnable during this cycle needs
# to wait until the next pass. This avoids various starvation issues
# by ensuring that there's never an unbounded delay between successive
# checks for I/O.
#
# Also, we randomize the order of each batch to avoid assumptions
# about scheduling order sneaking in. In the long run, I suspect we'll
# either (a) use strict FIFO ordering and document that for
# predictability/determinism, or (b) implement a more sophisticated
# scheduler (e.g. some variant of fair queueing), for better behavior
# under load. For now, this is the worst of both worlds - but it keeps
# our options open. (If we do decide to go all in on deterministic
# scheduling, then there are other things that will probably need to
# change too, like the deadlines tie-breaker and the non-deterministic
# ordering of task._notify_queues.)
batch = list(runner.runq)
if _ALLOW_DETERMINISTIC_SCHEDULING:
# We're running under Hypothesis, and pytest-trio has patched this
# in to make the scheduler deterministic and avoid flaky tests.
# It's not worth the (small) performance cost in normal operation,
# since we'll shuffle the list and _r is only seeded for tests.
batch.sort(key=lambda t: t._counter)
runner.runq.clear()
_r.shuffle(batch)
while batch:
task = batch.pop()
GLOBAL_RUN_CONTEXT.task = task
if runner.instruments:
runner.instrument("before_task_step", task)
next_send_fn = task._next_send_fn
next_send = task._next_send
task._next_send_fn = task._next_send = None
final_outcome = None
try:
# We used to unwrap the Outcome object here and send/throw its
# contents in directly, but it turns out that .throw() is
# buggy, at least on CPython 3.6 and earlier:
# https://bugs.python.org/issue29587
# https://bugs.python.org/issue29590
# So now we send in the Outcome object and unwrap it on the
# other side.
msg = task.context.run(next_send_fn, next_send)
except StopIteration as stop_iteration:
final_outcome = Value(stop_iteration.value)
except BaseException as task_exc:
# Store for later, removing uninteresting top frames: 1 frame
# we always remove, because it's this function catching it,
# and then in addition we remove however many more Context.run
# adds.
tb = task_exc.__traceback__.tb_next
for _ in range(CONTEXT_RUN_TB_FRAMES):
tb = tb.tb_next
final_outcome = Error(task_exc.with_traceback(tb))
if final_outcome is not None:
# We can't call this directly inside the except: blocks above,
# because then the exceptions end up attaching themselves to
# other exceptions as __context__ in unwanted ways.
> runner.task_exited(task, final_outcome)
/usr/lib/python3.8/site-packages/trio/_core/_run.py:1940:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = Runner(clock=SystemClock(offset=46288.026812318036), instruments=[], io_manager=EpollIOManager(_epoll=<select.epoll ob...d.RLock object owner=0 count=0 at 0x7f2c0d8d76f0>), trio_token=None, ki_pending=False, waiting_for_idle=SortedDict({}))
task = <Task "<fixture 'echo_client'>" at 0x7f2c0d8f7280>
outcome = Error(RuntimeError('cannot reuse already awaited coroutine'))
def task_exited(self, task, outcome):
if (
task._cancel_status is not None
and task._cancel_status.abandoned_by_misnesting
and task._cancel_status.parent is None
):
# The cancel scope surrounding this task's nursery was closed
# before the task exited. Force the task to exit with an error,
# since the error might not have been caught elsewhere. See the
# comments in CancelStatus.close().
try:
# Raise this, rather than just constructing it, to get a
# traceback frame included
raise RuntimeError(
"Cancel scope stack corrupted: cancel scope surrounding "
"{!r} was closed before the task exited\n{}".format(
task, MISNESTING_ADVICE
)
)
except RuntimeError as new_exc:
if isinstance(outcome, Error):
new_exc.__context__ = outcome.error
outcome = Error(new_exc)
task._activate_cancel_status(None)
> self.tasks.remove(task)
E KeyError: <Task "<fixture 'echo_client'>" at 0x7f2c0d8f7280>
/usr/lib/python3.8/site-packages/trio/_core/_run.py:1410: KeyError
The above exception was the direct cause of the following exception:
async_fn = functools.partial(<function _trio_test_runner_factory.<locals>._bootstrap_fixtures_and_run_test at 0x7f2c0d8f2af0>, echo_client=<pytest_trio.plugin.TrioFixture object at 0x7f2c0c562be0>)
clock = SystemClock(offset=46288.026812318036), instruments = []
restrict_keyboard_interrupt_to_checkpoints = False, args = ()
def run(
async_fn,
*args,
clock=None,
instruments=(),
restrict_keyboard_interrupt_to_checkpoints=False
):
"""Run a Trio-flavored async function, and return the result.
Calling::
run(async_fn, *args)
is the equivalent of::
await async_fn(*args)
except that :func:`run` can (and must) be called from a synchronous
context.
This is Trio's main entry point. Almost every other function in Trio
requires that you be inside a call to :func:`run`.
Args:
async_fn: An async function.
args: Positional arguments to be passed to *async_fn*. If you need to
pass keyword arguments, then use :func:`functools.partial`.
clock: ``None`` to use the default system-specific monotonic clock;
otherwise, an object implementing the :class:`trio.abc.Clock`
interface, like (for example) a :class:`trio.testing.MockClock`
instance.
instruments (list of :class:`trio.abc.Instrument` objects): Any
instrumentation you want to apply to this run. This can also be
modified during the run; see :ref:`instrumentation`.
restrict_keyboard_interrupt_to_checkpoints (bool): What happens if the
user hits control-C while :func:`run` is running? If this argument
is False (the default), then you get the standard Python behavior: a
:exc:`KeyboardInterrupt` exception will immediately interrupt
whatever task is running (or if no task is running, then Trio will
wake up a task to be interrupted). Alternatively, if you set this
argument to True, then :exc:`KeyboardInterrupt` delivery will be
delayed: it will be *only* be raised at :ref:`checkpoints
<checkpoints>`, like a :exc:`Cancelled` exception.
The default behavior is nice because it means that even if you
accidentally write an infinite loop that never executes any
checkpoints, then you can still break out of it using control-C.
The alternative behavior is nice if you're paranoid about a
:exc:`KeyboardInterrupt` at just the wrong place leaving your
program in an inconsistent state, because it means that you only
have to worry about :exc:`KeyboardInterrupt` at the exact same
places where you already have to worry about :exc:`Cancelled`.
This setting has no effect if your program has registered a custom
SIGINT handler, or if :func:`run` is called from anywhere but the
main thread (this is a Python limitation), or if you use
:func:`open_signal_receiver` to catch SIGINT.
Returns:
Whatever ``async_fn`` returns.
Raises:
TrioInternalError: if an unexpected error is encountered inside Trio's
internal machinery. This is a bug and you should `let us know
<https://github.com/python-trio/trio/issues>`__.
Anything else: if ``async_fn`` raises an exception, then :func:`run`
propagates it.
"""
__tracebackhide__ = True
# Do error-checking up front, before we enter the TrioInternalError
# try/catch
#
# It wouldn't be *hard* to support nested calls to run(), but I can't
# think of a single good reason for it, so let's be conservative for
# now:
if hasattr(GLOBAL_RUN_CONTEXT, "runner"):
raise RuntimeError("Attempted to call run() from inside a run()")
if clock is None:
clock = SystemClock()
instruments = list(instruments)
io_manager = TheIOManager()
system_context = copy_context()
system_context.run(current_async_library_cvar.set, "trio")
runner = Runner(
clock=clock,
instruments=instruments,
io_manager=io_manager,
system_context=system_context,
)
GLOBAL_RUN_CONTEXT.runner = runner
locals()[LOCALS_KEY_KI_PROTECTION_ENABLED] = True
# KI handling goes outside the core try/except/finally to avoid a window
# where KeyboardInterrupt would be allowed and converted into an
# TrioInternalError:
try:
with ki_manager(
runner.deliver_ki, restrict_keyboard_interrupt_to_checkpoints
):
try:
with closing(runner):
with runner.entry_queue.wakeup.wakeup_on_signals():
# The main reason this is split off into its own
# function is just to get rid of this extra
# indentation.
run_impl(runner, async_fn, args)
except TrioInternalError:
raise
except BaseException as exc:
> raise TrioInternalError(
"internal error in Trio - please file a bug!"
) from exc
E trio.TrioInternalError: internal error in Trio - please file a bug!
/usr/lib/python3.8/site-packages/trio/_core/_run.py:1794: TrioInternalError
============================== 1 failed in 0.13s ===============================
Exception ignored in: <function Nursery.__del__ at 0x7f2c0c94e1f0>
Traceback (most recent call last):
File "/usr/lib/python3.8/site-packages/trio/_core/_run.py", line 968, in __del__
AssertionError:
Exception ignored in: <function Nursery.__del__ at 0x7f2c0c94e1f0>
Traceback (most recent call last):
File "/usr/lib/python3.8/site-packages/trio/_core/_run.py", line 968, in __del__
AssertionError:
The text was updated successfully, but these errors were encountered:
I ran into this one too. It is an incompatibility between pytest-trio and pytest-asyncio.
If you have both installed, pytest-asyncio will try to run the fixture setup itself and fail to do so.
After uninstalling pytest-asyncio, the test code works as expected.
The following test code for echo_server_handler, taken from https://pytest-trio.readthedocs.io/en/latest/quickstart.html, blows up with the traceback shown above.
The text was updated successfully, but these errors were encountered: