From e92757dc92ed8c5a7d3d51de10a61973eebf0a29 Mon Sep 17 00:00:00 2001
From: Daofeng Wu
Date: Wed, 16 Oct 2024 14:12:31 +0900
Subject: [PATCH] feat(scraper): support streaming

---
 npiai/tools/web/scraper/__test__/bardeen.py  |   8 +-
 .../web/scraper/__test__/column_inference.py |   6 +-
 npiai/tools/web/scraper/app.py               | 120 ++++++++++++-------
 3 files changed, 82 insertions(+), 52 deletions(-)

diff --git a/npiai/tools/web/scraper/__test__/bardeen.py b/npiai/tools/web/scraper/__test__/bardeen.py
index 2bfdd879..1602786a 100644
--- a/npiai/tools/web/scraper/__test__/bardeen.py
+++ b/npiai/tools/web/scraper/__test__/bardeen.py
@@ -1,16 +1,17 @@
 import asyncio
+import json
+
 from npiai.tools.web.scraper import Scraper
 from npiai.utils.test_utils import DebugContext
 
 
 async def main():
     async with Scraper(headless=False, batch_size=10) as scraper:
-        await scraper.summarize(
+        stream = scraper.summarize_stream(
             ctx=DebugContext(),
             url="https://www.bardeen.ai/playbooks",
             ancestor_selector=".playbook_list",
             items_selector=".playbook_list .playbook_item-link",
-            output_file=".cache/bardeen.csv",
             output_columns=[
                 {
                     "name": "Apps Involved",
@@ -32,6 +33,9 @@ async def main():
             limit=42,
         )
 
+        async for items in stream:
+            print("Chunk:", json.dumps(items, indent=2))
+
 
 if __name__ == "__main__":
     asyncio.run(main())
diff --git a/npiai/tools/web/scraper/__test__/column_inference.py b/npiai/tools/web/scraper/__test__/column_inference.py
index 161a566f..11e9626d 100644
--- a/npiai/tools/web/scraper/__test__/column_inference.py
+++ b/npiai/tools/web/scraper/__test__/column_inference.py
@@ -20,16 +20,18 @@ async def main():
 
         print("Inferred columns:", json.dumps(columns, indent=2))
 
-        await scraper.summarize(
+        stream = scraper.summarize_stream(
             ctx=DebugContext(),
             url=url,
             ancestor_selector=ancestor_selector,
             items_selector=items_selector,
             output_columns=columns,
-            output_file=".cache/bardeen.csv",
             limit=10,
         )
 
+        async for items in stream:
+            print("Chunk:", json.dumps(items, indent=2))
+
 
 if __name__ == "__main__":
     asyncio.run(main())
diff --git a/npiai/tools/web/scraper/app.py b/npiai/tools/web/scraper/app.py
index f180254c..f956ca38 100644
--- a/npiai/tools/web/scraper/app.py
+++ b/npiai/tools/web/scraper/app.py
@@ -81,6 +81,65 @@ def from_context(cls, ctx: Context) -> "Scraper":
         )
         return cls()
 
+    async def summarize_stream(
+        self,
+        ctx: Context,
+        url: str,
+        output_columns: List[Column],
+        ancestor_selector: str | None = None,
+        items_selector: str | None = None,
+        pagination_button_selector: str | None = None,
+        limit: int = -1,
+    ):
+        if limit == 0:
+            return
+
+        await self.playwright.page.goto(url)
+
+        if not ancestor_selector:
+            ancestor_selector = "body"
+
+        count = 0
+
+        while True:
+            remaining = min(self._batch_size, limit - count) if limit != -1 else -1
+
+            md = await self._get_md(
+                ctx=ctx,
+                ancestor_selector=ancestor_selector,
+                items_selector=items_selector,
+                limit=remaining,
+            )
+
+            if not md:
+                break
+
+            items = await self._llm_summarize(ctx, md, output_columns)
+
+            await ctx.send_debug_message(f"[{self.name}] Summarized {len(items)} items")
+
+            if not items:
+                break
+
+            items_slice = items[:remaining] if limit != -1 else items
+            count += len(items_slice)
+
+            yield items_slice
+
+            await ctx.send_debug_message(
+                f"[{self.name}] Summarized {count} items in total"
+            )
+
+            if limit != -1 and count >= limit:
+                break
+
+            await self._load_more(
+                ctx,
+                ancestor_selector,
+                items_selector,
+                pagination_button_selector,
+            )
+
     @function
     async def summarize(
         self,
@@ -106,17 +165,9 @@ async def summarize(
-            output_file: The file path to save the output. If None, the output is saved to 'scraper_output.csv'.
+            output_file: The file path to save the output. If None, the output is saved to 'scraper_output.csv'.
             limit: The maximum number of items to summarize. If -1, all items are summarized.
         """
-        if limit == 0:
-            return "No items to summarize"
-
-        await self.playwright.page.goto(url)
-
-        if not ancestor_selector:
-            ancestor_selector = "body"
-
         if not output_file:
             output_file = "scraper_output.csv"
 
         os.makedirs(os.path.dirname(output_file), exist_ok=True)
 
         with open(output_file, "w", newline="") as f:
@@ -127,48 +178,21 @@ async def summarize(
 
             count = 0
 
-            while True:
-                remaining = min(self._batch_size, limit - count) if limit != -1 else -1
-
-                md = await self._get_md(
-                    ctx=ctx,
-                    ancestor_selector=ancestor_selector,
-                    items_selector=items_selector,
-                    limit=remaining,
-                )
-
-                if not md:
-                    break
-
-                items = await self._llm_summarize(ctx, md, output_columns)
-
-                await ctx.send_debug_message(
-                    f"[{self.name}] Summarized {len(items)} items"
-                )
-
-                if not items:
-                    break
+            stream = self.summarize_stream(
+                ctx,
+                url,
+                output_columns,
+                ancestor_selector,
+                items_selector,
+                pagination_button_selector,
+                limit,
+            )
 
-                items_slice = items[:remaining] if limit != -1 else items
-                writer.writerows(items_slice)
+            async for items in stream:
+                writer.writerows(items)
+                count += len(items)
                 f.flush()
 
-                count += len(items_slice)
-
-                await ctx.send_debug_message(
-                    f"[{self.name}] Summarized {count} items in total"
-                )
-
-                if limit != -1 and count >= limit:
-                    break
-
-                await self._load_more(
-                    ctx,
-                    ancestor_selector,
-                    items_selector,
-                    pagination_button_selector,
-                )
-
         return f"Saved {count} items to {output_file}"
 
     @function
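
Note: summarize_stream() is an async generator: each summarized batch is
yielded as a list of row dicts as soon as it is scraped, and summarize()
is now a thin wrapper that drains the stream into a CSV file. Below is a
minimal sketch of a standalone consumer (illustrative, not part of the
patch: the NDJSON sink and the abbreviated column definition are
assumptions that mirror the updated bardeen.py test):

    import asyncio
    import json

    from npiai.tools.web.scraper import Scraper
    from npiai.utils.test_utils import DebugContext


    async def main():
        async with Scraper(headless=False, batch_size=10) as scraper:
            stream = scraper.summarize_stream(
                ctx=DebugContext(),
                url="https://www.bardeen.ai/playbooks",
                ancestor_selector=".playbook_list",
                items_selector=".playbook_list .playbook_item-link",
                # Column shape abbreviated here; see the bardeen.py test
                # for a complete definition.
                output_columns=[{"name": "Apps Involved"}],
                limit=20,
            )

            # Each chunk is sliced against the remaining limit (at most
            # batch_size rows), so results can be persisted incrementally
            # instead of waiting for the whole run to finish.
            with open("playbooks.ndjson", "w") as f:  # hypothetical sink
                async for items in stream:
                    for item in items:
                        f.write(json.dumps(item) + "\n")


    if __name__ == "__main__":
        asyncio.run(main())

Because rows leave the generator as soon as each batch is summarized, an
interrupted scrape still keeps everything collected up to that point.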