Skip to content

Commit

Permalink
Lazy file writers for crawler data outputs
Browse files Browse the repository at this point in the history
Fix #951
  • Loading branch information
Yomguithereal committed Sep 5, 2024
1 parent 665d3e8 commit 29f17e4
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 11 deletions.
2 changes: 1 addition & 1 deletion ftest/crawlers/echojs.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def process(self, job: CrawlJob, response: Response) -> SpiderResult:
next_links = response.soup().scrape("#newslist article > h2 > a[href]", "href")
next_targets = [CrawlTarget(url=link) for link in next_links]

return job.group, next_targets
return None, next_targets


class ArticleSpider(Spider):
Expand Down
40 changes: 30 additions & 10 deletions minet/cli/crawl/crawl.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,19 +67,31 @@ def __init__(
self.format = format
self.dir = base_dir

if self.format not in ["csv", "jsonl", "ndjson"]:
raise NotImplementedError('unknown format "%s"' % self.format)

if self.singular:
self.__add_file(None, "data", crawler.get_spider())
self.__register_file(None, "data", crawler.get_spider())
else:
for name, spider in crawler.spiders():
self.__add_file(name, join("data", name), spider)
self.__register_file(name, join("data", name), spider)

def __add_file(self, name: Optional[str], path: str, spider):
def __register_file(self, name: Optional[str], path: str, spider):
path += "." + self.format

path = join(self.dir, path)
directory = dirname(path)
os.makedirs(directory, exist_ok=True)

# NOTE: the file is opened lazily, i.e. only when the first row is written
self.handles[name] = {
"file": None,
"writer": None,
"spider": spider,
"path": path,
}

def __open(self, path: str):
f = (
casanova.BasicResumer(path, encoding="utf-8")
if self.resume
Expand All @@ -92,9 +104,9 @@ def __add_file(self, name: Optional[str], path: str, spider):
elif self.format == "jsonl" or self.format == "ndjson":
w = ndjson.writer(f)
else:
raise NotImplementedError('unknown format "%s"' % self.format)
raise NotImplementedError

self.handles[name] = {"file": f, "writer": w, "spider": spider}
return f, w

def __unpack_result(self, result: SuccessfulCrawlResult, data):
job_id = result.job.id
Expand All @@ -107,22 +119,30 @@ def __unpack_result(self, result: SuccessfulCrawlResult, data):
def write(self, result: SuccessfulCrawlResult) -> None:
handle = self.handles[result.spider]
spider = handle["spider"]
writer = handle["writer"]
f = handle["file"]
w = handle["writer"]

for item in spider.tabulate(result):
writer.writerow(*self.__unpack_result(result, item))
if f is None:
f, w = self.__open(handle["path"])
handle["file"] = f
handle["writer"] = w

w.writerow(*self.__unpack_result(result, item))

# NOTE: flushing after each write to keep writes as atomic as possible
f.flush()
if f is not None:
f.flush()

def flush(self) -> None:
for h in self.handles.values():
h["file"].flush()
if h["file"] is not None:
h["file"].flush()

def close(self) -> None:
for h in self.handles.values():
h["file"].close()
if h["file"] is not None:
h["file"].close()


@with_defer()
Expand Down

0 comments on commit 29f17e4

Please sign in to comment.