feat(scraper): support streaming
idiotWu committed Oct 16, 2024
1 parent 6ac5027 commit e92757d
Showing 3 changed files with 81 additions and 54 deletions.
8 changes: 6 additions & 2 deletions npiai/tools/web/scraper/__test__/bardeen.py
@@ -1,16 +1,17 @@
 import asyncio
+import json
 
 from npiai.tools.web.scraper import Scraper
 from npiai.utils.test_utils import DebugContext
 
+
 async def main():
     async with Scraper(headless=False, batch_size=10) as scraper:
-        await scraper.summarize(
+        stream = scraper.summarize_stream(
             ctx=DebugContext(),
             url="https://www.bardeen.ai/playbooks",
             ancestor_selector=".playbook_list",
             items_selector=".playbook_list .playbook_item-link",
-            output_file=".cache/bardeen.csv",
             output_columns=[
                 {
                     "name": "Apps Involved",
@@ -32,6 +33,9 @@ async def main():
             limit=42,
         )
 
+        async for items in stream:
+            print("Chunk:", json.dumps(items, indent=2))
+
 
 if __name__ == "__main__":
     asyncio.run(main())
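Because `summarize_stream` is an async generator, a consumer can also stop early instead of draining every page. A minimal sketch under the same API as the test above (the single-column schema and the 20-row stopping rule are illustrative assumptions, not part of this commit):

```python
import asyncio
import json

from npiai.tools.web.scraper import Scraper
from npiai.utils.test_utils import DebugContext


async def main():
    async with Scraper(headless=False, batch_size=10) as scraper:
        stream = scraper.summarize_stream(
            ctx=DebugContext(),
            url="https://www.bardeen.ai/playbooks",
            ancestor_selector=".playbook_list",
            items_selector=".playbook_list .playbook_item-link",
            output_columns=[{"name": "Apps Involved"}],  # assumed minimal column shape
            limit=42,
        )

        collected = []
        async for items in stream:
            print("Chunk:", json.dumps(items, indent=2))
            collected.extend(items)
            if len(collected) >= 20:  # hypothetical early-exit condition
                await stream.aclose()  # explicitly close the generator so pagination stops
                break


if __name__ == "__main__":
    asyncio.run(main())
```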
6 changes: 4 additions & 2 deletions npiai/tools/web/scraper/__test__/column_inference.py
@@ -20,16 +20,18 @@ async def main():

         print("Inferred columns:", json.dumps(columns, indent=2))
 
-        await scraper.summarize(
+        stream = scraper.summarize_stream(
             ctx=DebugContext(),
             url=url,
             ancestor_selector=ancestor_selector,
             items_selector=items_selector,
             output_columns=columns,
-            output_file=".cache/bardeen.csv",
             limit=10,
         )
 
+        async for items in stream:
+            print("Chunk:", json.dumps(items, indent=2))
+
 
 if __name__ == "__main__":
     asyncio.run(main())
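When the whole result set is wanted at once, the chunks can simply be drained into a flat list. A small helper sketch over the same generator API (not part of this commit):

```python
from typing import Any, AsyncIterator


async def collect(stream: AsyncIterator[list[dict[str, Any]]]) -> list[dict[str, Any]]:
    """Drain a chunked async generator such as summarize_stream() into one flat list."""
    rows: list[dict[str, Any]] = []
    async for chunk in stream:
        rows.extend(chunk)
    return rows
```

With this helper, the `async for` loop above collapses to `items = await collect(stream)`.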
121 changes: 71 additions & 50 deletions npiai/tools/web/scraper/app.py
@@ -81,6 +81,65 @@ def from_context(cls, ctx: Context) -> "Scraper":
         )
         return cls()
 
+    async def summarize_stream(
+        self,
+        ctx: Context,
+        url: str,
+        output_columns: List[Column],
+        ancestor_selector: str | None = None,
+        items_selector: str | None = None,
+        pagination_button_selector: str | None = None,
+        limit: int = -1,
+    ):
+        if limit == 0:
+            return
+
+        await self.playwright.page.goto(url)
+
+        if not ancestor_selector:
+            ancestor_selector = "body"
+
+        count = 0
+
+        while True:
+            remaining = min(self._batch_size, limit - count) if limit != -1 else -1
+
+            md = await self._get_md(
+                ctx=ctx,
+                ancestor_selector=ancestor_selector,
+                items_selector=items_selector,
+                limit=remaining,
+            )
+
+            if not md:
+                break
+
+            items = await self._llm_summarize(ctx, md, output_columns)
+
+            await ctx.send_debug_message(f"[{self.name}] Summarized {len(items)} items")
+
+            if not items:
+                break
+
+            items_slice = items[:remaining] if limit != -1 else items
+            count += len(items_slice)
+
+            yield items_slice
+
+            await ctx.send_debug_message(
+                f"[{self.name}] Summarized {count} items in total"
+            )
+
+            if limit != -1 and count >= limit:
+                break
+
+            await self._load_more(
+                ctx,
+                ancestor_selector,
+                items_selector,
+                pagination_button_selector,
+            )
+
     @function
     async def summarize(
         self,
@@ -106,17 +165,6 @@ async def summarize(
             output_file: The file path to save the output. If None, the output is saved to 'scraper_output.csv'.
             limit: The maximum number of items to summarize. If -1, all items are summarized.
         """
-        if limit == 0:
-            return "No items to summarize"
-
-        await self.playwright.page.goto(url)
-
-        if not ancestor_selector:
-            ancestor_selector = "body"
-
-        if not output_file:
-            output_file = "scraper_output.csv"
-
         os.makedirs(os.path.dirname(output_file), exist_ok=True)
 
         with open(output_file, "w", newline="") as f:
@@ -127,48 +175,21 @@

         count = 0
 
-        while True:
-            remaining = min(self._batch_size, limit - count) if limit != -1 else -1
-
-            md = await self._get_md(
-                ctx=ctx,
-                ancestor_selector=ancestor_selector,
-                items_selector=items_selector,
-                limit=remaining,
-            )
-
-            if not md:
-                break
-
-            items = await self._llm_summarize(ctx, md, output_columns)
-
-            await ctx.send_debug_message(
-                f"[{self.name}] Summarized {len(items)} items"
-            )
-
-            if not items:
-                break
+        stream = self.summarize_stream(
+            ctx,
+            url,
+            output_columns,
+            ancestor_selector,
+            items_selector,
+            pagination_button_selector,
+            limit,
+        )
 
-            items_slice = items[:remaining] if limit != -1 else items
-            writer.writerows(items_slice)
+        async for items in stream:
+            writer.writerows(items)
+            count += len(items)
             f.flush()
-
-            count += len(items_slice)
-
-            await ctx.send_debug_message(
-                f"[{self.name}] Summarized {count} items in total"
-            )
-
-            if limit != -1 and count >= limit:
-                break
-
-            await self._load_more(
-                ctx,
-                ancestor_selector,
-                items_selector,
-                pagination_button_selector,
-            )
 
         return f"Saved {count} items to {output_file}"
 
     @function
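The accounting in `summarize_stream` is the subtle part: `remaining` caps each fetch at `batch_size` without overshooting `limit`, `-1` means unbounded, and the post-`yield` check stops pagination once the quota is met. A self-contained toy that mirrors the same arithmetic (`fetch` is a hypothetical stand-in for `_get_md` plus `_llm_summarize`):

```python
import asyncio

TOTAL = 25  # size of the fake data source


async def fetch(start: int, n: int) -> list[int]:
    """Hypothetical page fetch: return up to n items, or all remaining if n == -1."""
    end = TOTAL if n == -1 else min(start + n, TOTAL)
    return list(range(start, end))


async def stream_items(limit: int = -1, batch_size: int = 10):
    count = 0
    while True:
        # Same rule as summarize_stream: cap at batch_size, never past the limit.
        remaining = min(batch_size, limit - count) if limit != -1 else -1
        items = await fetch(count, remaining)
        if not items:
            break
        items_slice = items[:remaining] if limit != -1 else items
        count += len(items_slice)
        yield items_slice
        if limit != -1 and count >= limit:
            break


async def main():
    async for chunk in stream_items(limit=23, batch_size=10):
        print(len(chunk))  # prints 10, 10, 3


asyncio.run(main())
```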