Text Extraction Improvements #2038
Replies: 6 comments 15 replies
-
What about just providing some guidance in the documentation pages on using the replace function? I'm not sure that substitution will be correct in all cases.
I have not yet found a case where it fails: can you provide one?
Help would be welcome in finding a reliable rule for it.
|
-
@MartinThoma
-
Sounds good, but we also have to keep in mind to remove the duplicates we have now, to get better visibility on the work to be done. For 5), check my latest PR; for 3) and 4), it may already be quite tough 🤔
-
This isn't anywhere close to being universal enough for implementation, but I thought I'd share an approach that's working for me in a narrow use case, in case it spurs some ideas. PDFs generated by Epic (the biggest EMR in the US) have an oddball internal structure. The OOTB pypdf extract_text() function was returning the text more or less "raw": elements were distributed vertically according to the order in which the Text Show operators appeared, with virtually no spacing between horizontally distributed elements. Fidelity to the rendered layout (especially for tabular data) was critical to my use case, so I created the routines below to address the issue.

Details of First Implementation

Caveats:
In any case, the basic idea is to collect the full set of text render operations with corresponding "effective transform" matrices before "putting pen to paper", as it were. Data is collected on a per-BT basis, and the effective transform data is used to sort and horizontally distribute the collected text once all the facts are in. A simple sort by the effective y and x coordinates then yields the rendered order. All of this may be useless given the narrow scope, but some of the concepts may be helpful, e.g.:
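The collect-then-sort idea can be illustrated with a toy, pypdf-free sketch: fragments carry the x/y translation taken from their effective transforms, are sorted top-to-bottom then left-to-right, and are grouped into lines by y coordinate. The coordinates and strings below are made up for illustration only:

```python
from itertools import groupby

# hypothetical (x, y, text) fragments, one per Tj operator, with x/y taken
# from the e/f entries of each operator's effective transform matrix
frags = [
    (200.0, 700.0, "Dose"),
    (50.0, 700.0, "Drug"),
    (50.0, 686.0, "aspirin"),
    (200.0, 686.0, "81 mg"),
]

# sort top-to-bottom (descending y), then left-to-right (ascending x)
frags.sort(key=lambda f: (-f[1], f[0]))

# group fragments sharing a y coordinate into rendered lines
lines = [
    " ".join(text for _x, _y, text in grp)
    for _y, grp in groupby(frags, key=lambda f: f[1])
]
```

A real implementation must also tolerate y values that differ by less than a line height, which is exactly what the merging logic further down handles.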
Let me know if this seems a worthy pursuit. I'd be happy to create a branch and extend the existing pypdf text extraction logic with this new approach.

"""new pdf text extraction algorithm
Usage:
import io
from pathlib import Path
from pypdf import PdfReader
fname = "FB01219A86F94518818875AB0828B31D_pg1.PDF"
byt = Path(fname).read_bytes()
tpdf = PdfReader(io.BytesIO(byt), False)
Path(f"{fname}.txt").write_text("\n".join(extract_structured_text(pg) for pg in tpdf.pages))
"""
# pylint: disable=invalid-name
import json
import math
from collections import ChainMap, Counter
from collections.abc import Iterator
from copy import copy
from itertools import groupby, pairwise
from pathlib import Path
from typing import Any, NamedTuple, cast
from pypdf import PageObject, _cmap
from pypdf import _text_extraction as tex
from pypdf.constants import PageAttributes as PG
from pypdf.generic import ContentStream, DictionaryObject, NameObject
class _Font(NamedTuple):
    space_width: int | float
    encoding: str | dict[int, str]
    char_map: dict
class _XfrmStack:
    """cm/tm transformation matrix manager"""

    def __init__(self) -> None:
        self.xfrm_stack = ChainMap(self.new_xform())
        self.q_queue: Counter[int] = Counter()
        self.q_depth = [0]

    @staticmethod
    def raw_xform(_a=1.0, _b=0.0, _c=0.0, _d=1.0, _e=0.0, _f=0.0):
        """only a/b/c/d/e/f matrix params"""
        return dict(zip(range(6), map(float, (_a, _b, _c, _d, _e, _f))))

    @staticmethod
    def new_xform(_a=1.0, _b=0.0, _c=0.0, _d=1.0, _e=0.0, _f=0.0, /, is_text=False):
        """a/b/c/d/e/f matrix params + 'is_text' key"""
        return _XfrmStack.raw_xform(_a, _b, _c, _d, _e, _f) | {"is_text": is_text}

    def reset_tm(self) -> ChainMap[int | str, float | bool]:
        """clear all xforms from chainmap having is_text==True"""
        while self.xfrm_stack.maps[0]["is_text"]:
            self.xfrm_stack = self.xfrm_stack.parents
        return self.xfrm_stack

    def remove_q(self):
        """rewind to the stack's prior state after closing a 'q' with internal 'cm' ops"""
        self.xfrm_stack = self.reset_tm()
        self.xfrm_stack.maps = self.xfrm_stack.maps[self.q_queue.pop(self.q_depth.pop(), 0) :]
        return self.xfrm_stack

    def add_q(self):
        """add another level to q_queue"""
        self.q_depth.append(len(self.q_depth))

    def add_cm(self, *args):
        """concatenate an additional transform matrix"""
        self.xfrm_stack = self.reset_tm()
        self.q_queue.update(self.q_depth[-1:])
        self.xfrm_stack = self.xfrm_stack.new_child(self.new_xform(*args))
        return self.xfrm_stack

    def add_tm(self, operands: list[float | int]):
        """append a text transform matrix"""
        if len(operands) == 2:
            operands = [1.0, 0.0, 0.0, 1.0, *operands]
        self.xfrm_stack = self.xfrm_stack.new_child(
            self.new_xform(*operands, is_text=True)  # type: ignore # mypy issue??
        )
        return self.xfrm_stack

    @property
    def effective_xform(self) -> list[float]:
        """the current effective transform accounting for both cm and text xforms"""
        eff_xform = [*self.xfrm_stack.maps[0].values()]
        for xform in self.xfrm_stack.maps[1:]:
            eff_xform = tex.mult(eff_xform, xform)  # type: ignore
        return eff_xform

    @property
    def scale_factors(self) -> tuple[float, float]:
        """x/y scale factors"""
        _a, _b, _c, _d, *_ = self.effective_xform
        return math.sqrt(_a**2 + _c**2), math.sqrt(_b**2 + _d**2)

    @property
    def xmaps(self):
        """internal ChainMap 'maps' property"""
        return self.xfrm_stack.maps
def _decode_tj(font: _Font, _b: bytes, xform_stack: _XfrmStack, debug=False) -> dict:
    try:
        if isinstance(font.encoding, str):
            _text = _b.decode(font.encoding, "surrogatepass")
        else:
            _text = "".join(
                font.encoding[x] if x in font.encoding else bytes((x,)).decode() for x in _b
            )
    except (UnicodeEncodeError, UnicodeDecodeError):
        _text = _b.decode("utf-8", "replace")
    _text = "".join(font.char_map[x] if x in font.char_map else x for x in _text)
    return {
        "text": _text,
        "effective_xform": xform_stack.effective_xform,
        "xform_stack": copy(xform_stack.xmaps) if debug else None,
    }
def _recurs_to_target_op(
    ops: Iterator[tuple[list, bytes]],
    xform_stack: _XfrmStack,
    end_target: bytes,
    fonts: dict[str, _Font],
    debug=False,
) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
    """recurse operators between BT/ET and/or q/Q operators managing the xform
    stack and capturing text positioning and rendering data.

    Args:
        ops (Iterator[tuple[list, bytes]]): iterator of operators in content stream
        xform_stack (_XfrmStack): stack of cm/tm transformations to be applied
        end_target (bytes): Allowed values are b"Q" and b"ET"
        fonts (dict[str, _Font]): font dictionary
        debug (bool, optional): Captures all text operator data. Defaults to False.
            NOTE: performance penalty when debug=True.

    Raises:
        ValueError: if multiple fonts appear in a single BT

    Returns:
        tuple[list[dict[str, Any]], list[dict[str, Any]]]: list of dicts of text
        rendered by each BT operator + list of dicts of text rendered by individual
        Tj/TJ operators.
    """
    # 1 dict entry per BT operator. keys: tx, ty, space_width, font_height, font_width, text
    bt_groups: list[dict[str, Any]] = []
    # 1 dict entry per Tj operator. keys: text, effective_xform
    # extra key "xform_stack" added when debug=True
    tj_ops: list[dict[str, Any]] = []
    bt_grp: dict[str, Any] = {}  # current bt_groups dict entry
    if end_target == b"Q":
        # add new q level. cm's added at this level will be popped at next b'Q'
        xform_stack.add_q()
    while True:
        try:
            opands, op = next(ops)
        except StopIteration:
            return bt_groups, tj_ops
        if op == end_target:
            if op == b"Q":
                xform_stack.remove_q()
            if op == b"ET":
                if "tx" not in bt_grp:  # no Td or Tm operators in this BT, use base tx/ty
                    *_, tx, ty = xform_stack.effective_xform
                    bt_grp["tx"] = tx
                    bt_grp["ty"] = ty
                _text = ""
                last_tx = bt_grp["tx"]
                last_len = 0
                font_width = bt_grp["font_width"]
                for _tj in tj_ops:  # ... build text from new Tj operators
                    tx = _tj["effective_xform"][4]
                    distance = tx - last_tx
                    excess = distance - (last_len * font_width)
                    new_text = f'{" " * int(excess // font_width)}{_tj["text"]}'
                    _text = f"{_text}{new_text}"
                    last_tx = tx
                    last_len = len(new_text)
                bt_grp["text"] = _text
                bt_groups.append(bt_grp)
                xform_stack.reset_tm()
            return bt_groups, tj_ops
        if op == b"q":
            bts, tjs = _recurs_to_target_op(ops, xform_stack, b"Q", fonts, debug)
            bt_groups.extend(bts)
            tj_ops.extend(tjs)
        if op == b"cm":
            xform_stack.add_cm(*opands)
        if op == b"BT":
            bts, tjs = _recurs_to_target_op(ops, xform_stack, b"ET", fonts, debug)
            bt_groups.extend(bts)
            tj_ops.extend(tjs)
        if op == b"Tj":
            tj_ops.append(_decode_tj(fonts[bt_grp["font_name"]], opands[0], xform_stack, debug))
        if op == b"TJ":
            _ft = fonts[bt_grp["font_name"]]
            for tj_op in opands[0]:
                if isinstance(tj_op, bytes):
                    tj_ops.append(_decode_tj(_ft, tj_op, xform_stack, debug))
                elif str(tj_op).isnumeric():
                    xform_stack.add_tm([tj_op, 0])
        if op in (b"Td", b"Tm"):
            if op == b"Tm":
                xform_stack.reset_tm()
            xform_stack.add_tm(opands)
            *_, tx, ty = xform_stack.effective_xform
            bt_grp["tx"] = bt_grp.get("tx", tx)
            bt_grp["ty"] = bt_grp.get("ty", ty)
        if op == b"Tf":
            if "space_width" in bt_grp:
                raise ValueError("multiple fonts")
            xform_stack.reset_tm()
            x_scale, y_scale = xform_stack.scale_factors
            bt_grp["space_width"] = fonts[opands[0]].space_width * x_scale
            bt_grp["font_height"] = opands[1] * y_scale
            bt_grp["font_width"] = opands[1] * x_scale
            bt_grp["font_name"] = opands[0]
def extract_structured_text(pg: PageObject, space_expansion_factor=0.1, debug_file=None) -> str:
    """Get text from pypdf page preserving fidelity to rendered position

    Args:
        pg (PageObject): a pypdf PdfReader page
        space_expansion_factor (float, optional): higher values result in more spacing
            between text rendered in different BT operators. Defaults to 0.1.
        debug_file (str, optional): full path + filename prefix for debug output.
            Defaults to None. NOTE: significantly higher memory and processor usage.

    Returns:
        str: multiline string containing page text structured as it appeared in the
        source pdf.
    """
    # Font retrieval logic adapted from pypdf.PageObject._extract_text()
    objr = pg
    while NameObject(PG.RESOURCES) not in objr:
        objr = objr["/Parent"].get_object()  # type: ignore
    resources_dict = cast(DictionaryObject, objr[PG.RESOURCES])
    fonts: dict[str, _Font] = {}
    if "/Font" in resources_dict:
        for font_name in resources_dict["/Font"]:  # type: ignore
            cmap = _cmap.build_char_map(font_name, 200.0, pg)
            fonts[font_name] = _Font(*cmap[1:-1])
    if debug_file:
        Path(f"{debug_file}.fonts.json").write_text(
            json.dumps(fonts, indent=2, default=lambda x: getattr(x, "_asdict", str)(x)), "utf-8"
        )
    x_stack = _XfrmStack()  # transformation stack manager
    ops = iter(ContentStream(pg["/Contents"].get_object(), pg.pdf, "bytes").operations)
    bt_groups: list[dict] = []  # BT operator dict
    # keys: tx, ty, space_width, font_height, text
    tj_debug: list[dict] = []  # Tj/TJ operator data (debug only)
    try:
        debug = bool(debug_file)
        while True:
            _, op = next(ops)
            if op in (b"BT", b"q"):
                end_op = b"ET" if op == b"BT" else b"Q"
                bts, tjs = _recurs_to_target_op(ops, x_stack, end_op, fonts, debug)
                bt_groups.extend(bts)
                if debug:
                    tj_debug.extend(tjs)
    except StopIteration:
        pass
    # left align the data, i.e. decrement all tx values by min(tx)
    min_x = min(x["tx"] for x in bt_groups)
    meets_len = any(len(x["text"]) > 10 for x in bt_groups)
    bt_groups = [
        ogrp | {"tx": ogrp["tx"] - min_x} | ({"rawtx": ogrp["tx"]} if debug_file else {})
        for ogrp in sorted(bt_groups, key=lambda x: (x["ty"], -x["tx"]), reverse=True)
    ]
    if debug_file:
        Path(f"{debug_file}.bt.json").write_text(
            json.dumps(bt_groups, indent=2, default=str), "utf-8"
        )
        Path(f"{debug_file}.tj.json").write_text(
            json.dumps(tj_debug, indent=2, default=str), "utf-8"
        )
    # group the text operations by rendered y coordinate, i.e. the line number
    ty_groups = {
        ty: sorted(grp, key=lambda x: x["tx"])
        for ty, grp in groupby(bt_groups, key=lambda bt_grp: int(bt_grp["ty"]))
    }
    # combine groups whose y coordinates differ by less than the effective font height
    # (accounts for mixed fonts and other minor oddities)
    last_ty = list(ty_groups)[0]
    for ty in list(ty_groups)[1:]:
        text_groups = ty_groups[ty]
        this_fsz = text_groups[0]["font_height"]
        fsz = min((ty_groups[last_ty][0]["font_height"], this_fsz))
        txs = set(int(_t["tx"]) for _t in text_groups if _t["text"].strip())
        last_txs = set(int(_t["tx"]) for _t in ty_groups[last_ty] if _t["text"].strip())
        # prevent merge if both groups are rendering in the same x position.
        no_text_overlap = not any(chk in last_txs for chk in txs)
        offset_less_than_font_height = abs(ty - last_ty) < fsz
        if no_text_overlap and offset_less_than_font_height:
            ty_groups[last_ty] = sorted(
                ty_groups.pop(ty) + ty_groups[last_ty], key=lambda x: x["tx"]
            )
        else:
            last_ty = ty
    if debug_file:
        Path(f"{debug_file}.bt_line_groups.json").write_text(
            json.dumps(ty_groups, indent=2, default=str), "utf-8"
        )
    sp_width = min(
        (
            (p2["tx"] - p1["tx"]) / (len(p1["text"]) + space_expansion_factor)
            for tdictlist in ty_groups.values()
            for p1, p2 in pairwise(tdictlist)
            if p1["text"].strip()
            and (len(p1["text"].strip()) > 10 or not meets_len)
            and int(p2["tx"] - p1["tx"]) > 0
        ),
        default=0.1,
    )
    lines: list[str] = []
    for line_data in ty_groups.values():
        line = ""
        for bt_op in line_data:
            offset = int(bt_op["tx"] // sp_width)
            spaces = offset - len(line)
            line = f"{line}{' ' * spaces}{bt_op['text']}"
        lines.append(line)
    return "\n".join(ln.rstrip() for ln in lines if ln.strip())

Example Epic PDF page stripped of PHI:

NOTE: pypdf splitting operations also perform poorly on documents generated by the Epic engine. Epic stores ALL image data in a single shared Resources dictionary, and pypdf does NOT provide support for the removal of unrendered images in such a resource. I've hacked together another routine that replaces all unrendered images with an empty bytes object, yielding an enormous reduction in required storage for large, multi-split pdfs, but if you examine the internal structure of this sample, you'll notice that all of the /img# named image references (along with their corresponding indirect object references) are still present with empty streams. Should I create a separate discussion thread for this?

Edit: I tried out a few non-Epic PDFs to see what'd happen and found a few bugs. Also made some small performance and text encoding improvements. That's it for now. My codebase is working for me, so stopping there until it's not... ;)
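The horizontal distribution step used between Tj fragments above can also be sketched in isolation. This is a simplification invented for illustration (a fixed font_width and a hypothetical join_fragments helper), not the actual routine: the gap between fragments is padded with spaces whenever it exceeds the width already covered by the previous fragment.

```python
def join_fragments(frags: list[tuple[float, str]], font_width: float) -> str:
    """Pad space between text fragments based on their x positions.

    frags: (tx, text) pairs, already sorted by tx; font_width: assumed
    uniform glyph width in the same units as tx (a simplification).
    """
    out = ""
    last_tx, last_len = frags[0][0], 0
    for tx, text in frags:
        # excess horizontal distance not covered by the previous fragment
        excess = (tx - last_tx) - last_len * font_width
        out += " " * max(int(excess // font_width), 0) + text
        last_tx, last_len = tx, len(text)
    return out
```

With 6-unit glyphs, fragments at x=0 and x=60 end up separated by eight spaces: `join_fragments([(0.0, "ab"), (60.0, "cd")], 6.0)` covers 12 units with "ab" and pads the remaining 48 units.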
-
I have what I believe to be a fully functional "layout mode" implementation based on the concepts above, with none of the caveats. As implemented, structural fidelity is on par with what I consider to be SOTA in open source tools.

pip install pypdf
pip install -i https://test.pypi.org/simple/ pypdftotext

Once installed, usage in python is as follows:

from pathlib import Path
import pypdftotext
pdf = Path("some_pdf.pdf")  # can be bytes, Path, PdfReader, or io.BytesIO; used Path for convenience
pdf_text = pypdftotext.pdf_text(pdf)
print(pdf_text)

Top level functions

Performance Comparisons

Example 1

Source file: Claim Maker Alerts Guide_pg2.PDF

pypdf PageObject.extract_text() output:
pypdftotext output:
Example 2

Source file:

pypdf PageObject.extract_text() output:
pypdftotext output:
If there's any interest in trying to pull this into pypdf itself, I'd be happy to work toward that goal. As implemented, pypdftotext requires python 3.10+, but I don't think it'd be that difficult to adapt for earlier python3 versions. The biggest obstacles are likely to be 3.10+ typing features and usage of the walrus (:=) operator.
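As a minimal illustration of the typing obstacle, 3.10-style union annotations such as int | float can be written with typing.Union so they evaluate on older interpreters (SpaceWidth and scale are hypothetical names invented for this sketch, not part of pypdftotext):

```python
from typing import Union

# equivalent to "int | float" on Python 3.10+, but valid on earlier 3.x
SpaceWidth = Union[int, float]

def scale(width: SpaceWidth, factor: float) -> float:
    """Scale a font space width by a transform-derived factor."""
    return float(width) * factor
```

Alternatively, `from __future__ import annotations` defers annotation evaluation, which covers annotations but not unions used at runtime (e.g. in cast() or isinstance-style checks).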
-
Was this code merged into pypdf?
-
In the context of the LaTeX / math mode improvements which @pubpub-zz recently made, I had a look at what we could do to further improve text extraction: #2016 (comment)
I mainly don't want to lose this list. But maybe somebody has some ideas on how to implement them :-)
The ligature replacement seems to be rather straightforward.
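One possible sketch of the ligature replacement, assuming Unicode NFKC normalization is acceptable: compatibility normalization expands ligature code points such as U+FB01 (fi) and U+FB02 (fl) into their component letters. Note that NFKC also rewrites other compatibility characters (superscripts, fullwidth forms), so a targeted translation table may be safer if only ligatures should change.

```python
import unicodedata

def replace_ligatures(text: str) -> str:
    """Expand Unicode ligatures (ff, fi, fl, ffi, ffl, ...) via NFKC."""
    return unicodedata.normalize("NFKC", text)
```

For a narrower rule, `str.translate` with an explicit map of the Alphabetic Presentation Forms block (U+FB00 through U+FB06) would avoid NFKC's other rewrites.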