
Commit f56d243

feat(scraper): support streaming
1 parent 6ac5027 commit f56d243

3 files changed: +82 -55 lines changed

npiai/tools/web/scraper/__test__/bardeen.py (+7 -3)

@@ -1,16 +1,18 @@
 import asyncio
+import json
+
 from npiai.tools.web.scraper import Scraper
 from npiai.utils.test_utils import DebugContext


 async def main():
     async with Scraper(headless=False, batch_size=10) as scraper:
-        await scraper.summarize(
+        stream = scraper.summarize_stream(
             ctx=DebugContext(),
             url="https://www.bardeen.ai/playbooks",
             ancestor_selector=".playbook_list",
             items_selector=".playbook_list .playbook_item-link",
-            output_file=".cache/bardeen.csv",
+            limit=42,
             output_columns=[
                 {
                     "name": "Apps Involved",
@@ -29,9 +31,11 @@ async def main():
                     "description": "The URL of the playbook",
                 },
             ],
-            limit=42,
         )

+        async for items in stream:
+            print("Chunk:", json.dumps(items, indent=2))
+

 if __name__ == "__main__":
     asyncio.run(main())
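Note: summarize_stream is an async generator, so the call in the test is not awaited; it returns an async iterator, and no scraping happens until iteration begins. A minimal consumer sketch under that assumption (collect_rows and the flat-list result shape are illustrative, not part of this commit):

import asyncio

from npiai.tools.web.scraper import Scraper
from npiai.utils.test_utils import DebugContext


async def collect_rows(columns, limit=20):
    # Illustrative helper: drain the stream into one flat list of rows.
    rows = []
    async with Scraper(headless=False, batch_size=10) as scraper:
        stream = scraper.summarize_stream(
            ctx=DebugContext(),
            url="https://www.bardeen.ai/playbooks",
            ancestor_selector=".playbook_list",
            items_selector=".playbook_list .playbook_item-link",
            output_columns=columns,
            limit=limit,
        )
        async for chunk in stream:
            rows.extend(chunk)  # each chunk is one summarized batch
    return rows

Breaking out of the async for early also works: the generator simply stops being driven, so no further batches are fetched.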

npiai/tools/web/scraper/__test__/column_inference.py (+4 -2)

@@ -20,16 +20,18 @@ async def main():

         print("Inferred columns:", json.dumps(columns, indent=2))

-        await scraper.summarize(
+        stream = scraper.summarize_stream(
             ctx=DebugContext(),
             url=url,
             ancestor_selector=ancestor_selector,
             items_selector=items_selector,
             output_columns=columns,
-            output_file=".cache/bardeen.csv",
             limit=10,
         )

+        async for items in stream:
+            print("Chunk:", json.dumps(items, indent=2))
+

 if __name__ == "__main__":
     asyncio.run(main())
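Both tests drop output_file because file output now lives only in summarize(). A streaming caller that still wants a CSV can mirror what summarize() does internally; a sketch, assuming each streamed item is a dict keyed by the output column names (stream_to_csv is a hypothetical helper):

import csv


async def stream_to_csv(stream, path, fieldnames):
    # Hypothetical helper: persist chunks incrementally as they arrive.
    with open(path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        async for chunk in stream:
            writer.writerows(chunk)
            f.flush()  # partial results survive an interrupted scrape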

npiai/tools/web/scraper/app.py (+71 -50)

@@ -81,6 +81,65 @@ def from_context(cls, ctx: Context) -> "Scraper":
         )
         return cls()

+    async def summarize_stream(
+        self,
+        ctx: Context,
+        url: str,
+        output_columns: List[Column],
+        ancestor_selector: str | None = None,
+        items_selector: str | None = None,
+        pagination_button_selector: str | None = None,
+        limit: int = -1,
+    ):
+        if limit == 0:
+            return
+
+        await self.playwright.page.goto(url)
+
+        if not ancestor_selector:
+            ancestor_selector = "body"
+
+        count = 0
+
+        while True:
+            remaining = min(self._batch_size, limit - count) if limit != -1 else -1
+
+            md = await self._get_md(
+                ctx=ctx,
+                ancestor_selector=ancestor_selector,
+                items_selector=items_selector,
+                limit=remaining,
+            )
+
+            if not md:
+                break
+
+            items = await self._llm_summarize(ctx, md, output_columns)
+
+            await ctx.send_debug_message(f"[{self.name}] Summarized {len(items)} items")
+
+            if not items:
+                break
+
+            items_slice = items[:remaining] if limit != -1 else items
+            count += len(items_slice)
+
+            yield items_slice
+
+            await ctx.send_debug_message(
+                f"[{self.name}] Summarized {count} items in total"
+            )
+
+            if limit != -1 and count >= limit:
+                break
+
+            await self._load_more(
+                ctx,
+                ancestor_selector,
+                items_selector,
+                pagination_button_selector,
+            )
+
     @function
     async def summarize(
         self,
@@ -106,17 +165,6 @@ async def summarize(
             output_file: The file path to save the output. If None, the output is saved to 'scraper_output.json'.
             limit: The maximum number of items to summarize. If -1, all items are summarized.
         """
-        if limit == 0:
-            return "No items to summarize"
-
-        await self.playwright.page.goto(url)
-
-        if not ancestor_selector:
-            ancestor_selector = "body"
-
-        if not output_file:
-            output_file = "scraper_output.csv"
-
         os.makedirs(os.path.dirname(output_file), exist_ok=True)

         with open(output_file, "w", newline="") as f:
@@ -127,48 +175,21 @@ async def summarize(

             count = 0

-            while True:
-                remaining = min(self._batch_size, limit - count) if limit != -1 else -1
-
-                md = await self._get_md(
-                    ctx=ctx,
-                    ancestor_selector=ancestor_selector,
-                    items_selector=items_selector,
-                    limit=remaining,
-                )
-
-                if not md:
-                    break
-
-                items = await self._llm_summarize(ctx, md, output_columns)
-
-                await ctx.send_debug_message(
-                    f"[{self.name}] Summarized {len(items)} items"
-                )
-
-                if not items:
-                    break
+            stream = self.summarize_stream(
+                ctx,
+                url,
+                output_columns,
+                ancestor_selector,
+                items_selector,
+                pagination_button_selector,
+                limit,
+            )

-                items_slice = items[:remaining] if limit != -1 else items
-                writer.writerows(items_slice)
+            async for items in stream:
+                writer.writerows(items)
+                count += len(items)
                 f.flush()

-                count += len(items_slice)
-
-                await ctx.send_debug_message(
-                    f"[{self.name}] Summarized {count} items in total"
-                )
-
-                if limit != -1 and count >= limit:
-                    break
-
-                await self._load_more(
-                    ctx,
-                    ancestor_selector,
-                    items_selector,
-                    pagination_button_selector,
-                )
-
         return f"Saved {count} items to {output_file}"

     @function
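The limit bookkeeping in summarize_stream caps each batch at whatever is still owed: with batch_size=10 and limit=42, as in the bardeen.py test, remaining evaluates to 10, 10, 10, 10, 2, and the final batch is sliced before being yielded. A standalone restatement of that arithmetic with illustrative values (plan_batches is not part of the commit):

def plan_batches(batch_size: int, limit: int) -> list[int]:
    # Mirrors `remaining = min(self._batch_size, limit - count)` for limit != -1.
    sizes, count = [], 0
    while count < limit:
        remaining = min(batch_size, limit - count)
        sizes.append(remaining)
        count += remaining
    return sizes


assert plan_batches(batch_size=10, limit=42) == [10, 10, 10, 10, 2]

When limit is -1, remaining stays -1, the _get_md fetch is uncapped, and the loop ends only when the page stops producing markdown or _llm_summarize returns no items.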
