Skip to content

Commit

Permalink
Add file-like pager: click.get_pager_file()
Browse files Browse the repository at this point in the history
  • Loading branch information
craigds authored and AndreasBackx committed Nov 9, 2024
1 parent 244d562 commit b544c40
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 32 deletions.
4 changes: 4 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ Unreleased
allows the user to search for future output of the generator when
using less and then aborting the program using ctrl-c.

- Add ``click.get_pager_file`` for file-like access to an output
pager. :pr:`1572`


Version 8.1.8
-------------

Expand Down
2 changes: 2 additions & 0 deletions docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ Utilities

.. autofunction:: echo_via_pager

.. autofunction:: get_pager_file

.. autofunction:: prompt

.. autofunction:: confirm
Expand Down
12 changes: 12 additions & 0 deletions docs/utils.rst
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,18 @@ If you want to use the pager for a lot of text, especially if generating everyth
click.echo_via_pager(_generate_output())


For more complex programs, which can't easily use a simple generator, you
can get access to a writable file-like object for the pager, and write to
that instead:

.. click:example::
@click.command()
def less():
with click.get_pager_file() as pager:
for idx in range(50000):
print(idx, file=pager)


Screen Clearing
---------------

Expand Down
1 change: 1 addition & 0 deletions src/click/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
from .termui import confirm as confirm
from .termui import echo_via_pager as echo_via_pager
from .termui import edit as edit
from .termui import get_pager_file as get_pager_file
from .termui import getchar as getchar
from .termui import launch as launch
from .termui import pause as pause
Expand Down
4 changes: 4 additions & 0 deletions src/click/_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,10 @@ def should_strip_ansi(
if color is None:
if stream is None:
stream = sys.stdin
elif hasattr(stream, "color"):
# ._termui_impl.MaybeStripAnsi handles stripping ansi itself,
# so we don't need to strip it here
return False
return not isatty(stream) and not _is_jupyter_kernel_output(stream)
return not color

Expand Down
86 changes: 58 additions & 28 deletions src/click/_termui_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import collections.abc as cabc
import contextlib
import io
import math
import os
import sys
Expand Down Expand Up @@ -363,7 +364,20 @@ def generator(self) -> cabc.Iterator[V]:
self.render_progress()


def _pager_contextmanager(color: bool | None = None):
class MaybeStripAnsi(io.TextIOWrapper):
def __init__(self, stream: t.IO[bytes], *, color: bool, **kwargs: t.Any):
super().__init__(stream, **kwargs)
self.color = color

def write(self, text: str) -> int:
if not self.color:
text = strip_ansi(text)
return super().write(text)


def _pager_contextmanager(
color: bool | None = None,
) -> t.ContextManager[t.Tuple[t.BinaryIO, str, bool]]:
"""Decide what method to use for paging through text."""
stdout = _default_text_stdout()

Expand Down Expand Up @@ -398,17 +412,26 @@ def _pager_contextmanager(color: bool | None = None):
os.unlink(filename)


def pager(generator: cabc.Iterable[str], color: bool | None = None):
"""Given an iterable of text, write it all to an output pager."""
with _pager_contextmanager(color=color) as (pager_file, encoding, color):
for text in generator:
if not color:
text = strip_ansi(text)
pager_file.write(text.encode(encoding, "replace"))
@contextlib.contextmanager
def get_pager_file(color: bool | None = None) -> t.Generator[t.IO, None, None]:
"""Context manager.
Yields a writable file-like object which can be used as an output pager.
.. versionadded:: 8.2
:param color: controls if the pager supports ANSI colors or not. The
default is autodetection.
"""
with _pager_contextmanager(color=color) as (stream, encoding, color):
if not getattr(stream, "encoding", None):
# wrap in a text stream
stream = MaybeStripAnsi(stream, color=color, encoding=encoding)
yield stream
stream.flush()


@contextlib.contextmanager
def _pipepager(cmd: str, color: bool | None = None):
def _pipepager(
cmd: str, color: bool | None
) -> t.Iterator[t.Tuple[t.BinaryIO, str, bool]]:
"""Page through text by feeding it to another program. Invoking a
pager through this might support colors.
"""
Expand All @@ -427,6 +450,9 @@ def _pipepager(cmd: str, color: bool | None = None):
elif "r" in less_flags or "R" in less_flags:
color = True

if color is None:
color = False

c = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE, env=env)
stdin = t.cast(t.BinaryIO, c.stdin)
encoding = get_best_encoding(stdin)
Expand Down Expand Up @@ -469,7 +495,9 @@ def _pipepager(cmd: str, color: bool | None = None):


@contextlib.contextmanager
def _tempfilepager(cmd: str, color: bool | None = None):
def _tempfilepager(
cmd: str, color: bool | None = None
) -> t.Iterator[t.Tuple[t.BinaryIO, str, bool]]:
"""Page through text by invoking a program on a temporary file."""
import tempfile

Expand All @@ -481,10 +509,12 @@ def _tempfilepager(cmd: str, color: bool | None = None):


@contextlib.contextmanager
def _nullpager(stream: t.TextIO, color: bool | None = None):
def _nullpager(
stream: t.TextIO, color: bool | None = None
) -> t.Iterator[t.Tuple[t.BinaryIO, str, bool]]:
"""Simply print unformatted text. This is the ultimate fallback."""
encoding = get_best_encoding(stream)
return stream, encoding, color
yield stream, encoding, color


class Editor:
Expand Down Expand Up @@ -629,23 +659,23 @@ def _unquote_file(url: str) -> str:
wait_str = "-w" if wait else ""
args = f'cygstart {wait_str} "{url}"'
return os.system(args)

try:
if locate:
url = os.path.dirname(_unquote_file(url)) or "."
else:
url = _unquote_file(url)
c = subprocess.Popen(["xdg-open", url])
if wait:
return c.wait()
return 0
except OSError:
if url.startswith(("http://", "https://")) and not locate and not wait:
import webbrowser

webbrowser.open(url)
else:
try:
if locate:
url = os.path.dirname(_unquote_file(url)) or "."
else:
url = _unquote_file(url)
c = subprocess.Popen(["xdg-open", url])
if wait:
return c.wait()
return 0
return 1
except OSError:
if url.startswith(("http://", "https://")) and not locate and not wait:
import webbrowser

webbrowser.open(url)
return 0
return 1


def _translate_ch_to_exc(ch: str) -> None:
Expand Down
24 changes: 20 additions & 4 deletions src/click/termui.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,23 @@ def confirm(
return rv


def get_pager_file(color=None):
"""Context manager.
Yields a writable file-like object which can be used as an output pager.
.. versionadded:: 8.2
:param color: controls if the pager supports ANSI colors or not. The
default is autodetection.
"""
from ._termui_impl import get_pager_file

color = resolve_color_default(color)

return get_pager_file(color=color)


def echo_via_pager(
text_or_generator: cabc.Iterable[str] | t.Callable[[], cabc.Iterable[str]] | str,
color: bool | None = None,
Expand All @@ -267,7 +284,6 @@ def echo_via_pager(
:param color: controls if the pager supports ANSI colors or not. The
default is autodetection.
"""
color = resolve_color_default(color)

if inspect.isgeneratorfunction(text_or_generator):
i = t.cast("t.Callable[[], cabc.Iterable[str]]", text_or_generator)()
Expand All @@ -279,9 +295,9 @@ def echo_via_pager(
# convert every element of i to a text type if necessary
text_generator = (el if isinstance(el, str) else str(el) for el in i)

from ._termui_impl import pager

return pager(itertools.chain(text_generator, "\n"), color)
with get_pager_file(color=color) as pager:
for text in itertools.chain(text_generator, "\n"):
pager.write(text)


@t.overload
Expand Down

0 comments on commit b544c40

Please sign in to comment.