Skip to content

Commit 7b2a44a

Browse files
authored
chore: Update add_batch_of_requests for MemoryRequestQueueClient and FileSystemRequestQueueClient (#1388)
### Description - Updates the deduplication logic used in `add_batch_of_requests` to improve overall Queues performance. ### Issues - Closes: #1382
1 parent f8a7ff4 commit 7b2a44a

File tree

2 files changed

+73
-81
lines changed

2 files changed

+73
-81
lines changed

src/crawlee/storage_clients/_file_system/_request_queue_client.py

Lines changed: 56 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -312,28 +312,43 @@ async def add_batch_of_requests(
312312
unprocessed_requests = list[UnprocessedRequest]()
313313
state = self._state.current_value
314314

315-
# Prepare a dictionary to track existing requests by their unique keys.
316-
existing_unique_keys: dict[str, Path] = {}
317-
existing_request_files = await self._get_request_files(self.path_to_rq)
315+
all_requests = state.forefront_requests | state.regular_requests
318316

319-
for request_file in existing_request_files:
320-
existing_request = await self._parse_request_file(request_file)
321-
if existing_request is not None:
322-
existing_unique_keys[existing_request.unique_key] = request_file
317+
requests_to_enqueue = {}
323318

324-
# Process each request in the batch.
319+
# Determine which requests can be added or are modified.
325320
for request in requests:
326-
existing_request_file = existing_unique_keys.get(request.unique_key)
327-
existing_request = None
328-
329-
# Only load the full request from disk if we found a duplicate
330-
if existing_request_file is not None:
331-
existing_request = await self._parse_request_file(existing_request_file)
321+
# Check if the request has already been handled.
322+
if request.unique_key in state.handled_requests:
323+
processed_requests.append(
324+
ProcessedRequest(
325+
unique_key=request.unique_key,
326+
was_already_present=True,
327+
was_already_handled=True,
328+
)
329+
)
330+
# Check if the request is already in progress.
331+
# Or if the request is already in the queue and the `forefront` flag is not used, we do not change the
332+
# position of the request.
333+
elif (request.unique_key in state.in_progress_requests) or (
334+
request.unique_key in all_requests and not forefront
335+
):
336+
processed_requests.append(
337+
ProcessedRequest(
338+
unique_key=request.unique_key,
339+
was_already_present=True,
340+
was_already_handled=False,
341+
)
342+
)
343+
# These requests must either be added or have their position updated.
344+
else:
345+
requests_to_enqueue[request.unique_key] = request
332346

333-
# If there is no existing request with the same unique key, add the new request.
334-
if existing_request is None:
347+
# Process each request in the batch.
348+
for request in requests_to_enqueue.values():
349+
# If the request is not already in the RQ, this is a new request.
350+
if request.unique_key not in all_requests:
335351
request_path = self._get_request_path(request.unique_key)
336-
337352
# Add sequence number to ensure FIFO ordering using state.
338353
if forefront:
339354
sequence_number = state.forefront_sequence_counter
@@ -352,9 +367,6 @@ async def add_batch_of_requests(
352367
new_total_request_count += 1
353368
new_pending_request_count += 1
354369

355-
# Add to our index for subsequent requests in this batch
356-
existing_unique_keys[request.unique_key] = self._get_request_path(request.unique_key)
357-
358370
processed_requests.append(
359371
ProcessedRequest(
360372
unique_key=request.unique_key,
@@ -363,57 +375,33 @@ async def add_batch_of_requests(
363375
)
364376
)
365377

366-
# If the request already exists in the RQ, just update it if needed.
367-
else:
368-
# Set the processed request flags.
369-
was_already_present = existing_request is not None
370-
was_already_handled = existing_request.unique_key in state.handled_requests
371-
372-
# If the request is already in the RQ and handled, just continue with the next one.
373-
if was_already_present and was_already_handled:
374-
processed_requests.append(
375-
ProcessedRequest(
376-
unique_key=request.unique_key,
377-
was_already_present=True,
378-
was_already_handled=True,
379-
)
380-
)
378+
# If the request already exists in the RQ and the `forefront` flag is used, update its position.
379+
elif forefront:
380+
# If the request is among `regular`, remove it from its current position.
381+
if request.unique_key in state.regular_requests:
382+
state.regular_requests.pop(request.unique_key)
381383

382-
# If the request is already in the RQ but not handled yet, update it.
383-
elif was_already_present and not was_already_handled:
384-
# Update request type (forefront vs regular) in state
385-
if forefront:
386-
# Move from regular to forefront if needed
387-
if existing_request.unique_key in state.regular_requests:
388-
state.regular_requests.pop(existing_request.unique_key)
389-
if existing_request.unique_key not in state.forefront_requests:
390-
state.forefront_requests[existing_request.unique_key] = state.forefront_sequence_counter
391-
state.forefront_sequence_counter += 1
392-
elif (
393-
existing_request.unique_key not in state.forefront_requests
394-
and existing_request.unique_key not in state.regular_requests
395-
):
396-
# Keep as regular if not already forefront
397-
state.regular_requests[existing_request.unique_key] = state.sequence_counter
398-
state.sequence_counter += 1
399-
400-
processed_requests.append(
401-
ProcessedRequest(
402-
unique_key=request.unique_key,
403-
was_already_present=True,
404-
was_already_handled=False,
405-
)
384+
# If the request is already in `forefront`, we just need to update its position.
385+
state.forefront_requests[request.unique_key] = state.forefront_sequence_counter
386+
state.forefront_sequence_counter += 1
387+
388+
processed_requests.append(
389+
ProcessedRequest(
390+
unique_key=request.unique_key,
391+
was_already_present=True,
392+
was_already_handled=False,
406393
)
394+
)
407395

408-
else:
409-
logger.warning(f'Request with unique key "{request.unique_key}" could not be processed.')
410-
unprocessed_requests.append(
411-
UnprocessedRequest(
412-
unique_key=request.unique_key,
413-
url=request.url,
414-
method=request.method,
415-
)
396+
else:
397+
logger.warning(f'Request with unique key "{request.unique_key}" could not be processed.')
398+
unprocessed_requests.append(
399+
UnprocessedRequest(
400+
unique_key=request.unique_key,
401+
url=request.url,
402+
method=request.method,
416403
)
404+
)
417405

418406
await self._update_metadata(
419407
update_modified_at=True,
@@ -752,10 +740,7 @@ async def _get_request_files(cls, path_to_rq: Path) -> list[Path]:
752740
files = await asyncio.to_thread(list, path_to_rq.glob('*.json'))
753741

754742
# Filter out metadata file and non-file entries.
755-
filtered = filter(
756-
lambda request_file: request_file.is_file() and request_file.name != METADATA_FILENAME,
757-
files,
758-
)
743+
filtered = filter(lambda request_file: request_file.is_file() and request_file.name != METADATA_FILENAME, files)
759744

760745
return list(filtered)
761746

src/crawlee/storage_clients/_memory/_request_queue_client.py

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -163,19 +163,27 @@ async def add_batch_of_requests(
163163

164164
# If the request is already in the queue but not handled, update it.
165165
if was_already_present and existing_request:
166-
# Update the existing request with any new data and
167-
# remove old request from pending queue if it's there.
168-
with suppress(ValueError):
169-
self._pending_requests.remove(existing_request)
170-
171166
# Update indexes.
172167
self._requests_by_unique_key[request.unique_key] = request
173168

174-
# Add updated request back to queue.
169+
# We only update the position of `forefront` requests, shifting them to the left end of the queue.
175170
if forefront:
171+
# Update the existing request with any new data and
172+
# remove old request from pending queue if it's there.
173+
with suppress(ValueError):
174+
self._pending_requests.remove(existing_request)
175+
176+
# Add updated request back to queue.
176177
self._pending_requests.appendleft(request)
177-
else:
178-
self._pending_requests.append(request)
178+
179+
processed_requests.append(
180+
ProcessedRequest(
181+
unique_key=request.unique_key,
182+
was_already_present=True,
183+
was_already_handled=False,
184+
)
185+
)
186+
179187
# Add the new request to the queue.
180188
else:
181189
if forefront:
@@ -217,8 +225,7 @@ async def fetch_next_request(self) -> Request | None:
217225

218226
# Skip if already in progress (shouldn't happen, but safety check).
219227
if request.unique_key in self._in_progress_requests:
220-
self._pending_requests.appendleft(request)
221-
break
228+
continue
222229

223230
# Mark as in progress.
224231
self._in_progress_requests[request.unique_key] = request

0 commit comments

Comments
 (0)