@@ -1,7 +1,9 @@
import asyncio
from datetime import timedelta

from crawlee import service_locator
from crawlee.configuration import Configuration
from crawlee.storage_clients import MemoryStorageClient
from crawlee.storages import Dataset


@@ -11,10 +13,16 @@ async def main() -> None:
headless=False,
persist_state_interval=timedelta(seconds=30),
)
# Set the custom configuration as the global default configuration.
service_locator.set_configuration(configuration)

# Pass the configuration to the dataset (or other storage) when opening it.
dataset = await Dataset.open(
configuration=configuration,
# Use the global defaults when creating the dataset (or other storage).
dataset_1 = await Dataset.open()

# Or explicitly pass a specific storage client if
# you do not want to rely on the global defaults.
dataset_2 = await Dataset.open(
storage_client=MemoryStorageClient(configuration=configuration)
)
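
For reference, the updated example above assembles into a minimal runnable script roughly as follows. The main() scaffolding, the __main__ guard, and the push_data calls are added here only for illustration; they are not part of the diff.

import asyncio
from datetime import timedelta

from crawlee import service_locator
from crawlee.configuration import Configuration
from crawlee.storage_clients import MemoryStorageClient
from crawlee.storages import Dataset


async def main() -> None:
    configuration = Configuration(
        headless=False,
        persist_state_interval=timedelta(seconds=30),
    )
    # Set the custom configuration as the global default configuration.
    service_locator.set_configuration(configuration)

    # Use the global defaults when creating the dataset (or other storage).
    dataset_1 = await Dataset.open()

    # Or explicitly pass a specific storage client if you do not want
    # to rely on the global defaults.
    dataset_2 = await Dataset.open(
        storage_client=MemoryStorageClient(configuration=configuration),
    )

    # Illustrative only: exercise both datasets.
    await dataset_1.push_data({'source': 'global defaults'})
    await dataset_2.push_data({'source': 'explicit memory storage client'})


if __name__ == '__main__':
    asyncio.run(main())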


2 changes: 1 addition & 1 deletion src/crawlee/_autoscaling/snapshotter.py
@@ -113,7 +113,7 @@ def from_config(cls, config: Configuration | None = None) -> Snapshotter:
Args:
config: The `Configuration` instance. Uses the global (default) one if not provided.
"""
config = service_locator.get_configuration()
config = config or service_locator.get_configuration()

# Compute the maximum memory size based on the provided configuration. If `memory_mbytes` is provided,
# it uses that value. Otherwise, it calculates the `max_memory_size` as a proportion of the system's
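The one-line change above makes Snapshotter.from_config() honour an explicitly passed Configuration instead of always falling back to the global one. A minimal caller-side sketch, using the memory_mbytes field mentioned in the comment above (the value is a placeholder):

from crawlee._autoscaling import Snapshotter
from crawlee.configuration import Configuration

# Before the fix, the argument was silently ignored in favour of the global
# configuration; now the custom value takes effect.
snapshotter = Snapshotter.from_config(Configuration(memory_mbytes=4096))
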
71 changes: 46 additions & 25 deletions src/crawlee/_service_locator.py
@@ -11,6 +11,10 @@
if TYPE_CHECKING:
from crawlee.storages._storage_instance_manager import StorageInstanceManager

from logging import getLogger

logger = getLogger(__name__)


@docs_group('Configuration')
class ServiceLocator:
@@ -19,23 +23,25 @@ class ServiceLocator:
All services are initialized to their default values lazily.
"""

def __init__(self) -> None:
self._configuration: Configuration | None = None
self._event_manager: EventManager | None = None
self._storage_client: StorageClient | None = None
self._storage_instance_manager: StorageInstanceManager | None = None
global_storage_instance_manager: StorageInstanceManager | None = None

# Flags to check if the services were already set.
self._configuration_was_retrieved = False
self._event_manager_was_retrieved = False
self._storage_client_was_retrieved = False
def __init__(
self,
configuration: Configuration | None = None,
event_manager: EventManager | None = None,
storage_client: StorageClient | None = None,
) -> None:
self._configuration = configuration
self._event_manager = event_manager
self._storage_client = storage_client
self._storage_instance_manager: StorageInstanceManager | None = None

def get_configuration(self) -> Configuration:
"""Get the configuration."""
if self._configuration is None:
logger.warning('No configuration set, implicitly creating and using default Configuration.')
self._configuration = Configuration()

self._configuration_was_retrieved = True
return self._configuration

def set_configuration(self, configuration: Configuration) -> None:
@@ -47,21 +53,25 @@ def set_configuration(self, configuration: Configuration) -> None:
Raises:
ServiceConflictError: If the configuration has already been retrieved before.
"""
if self._configuration_was_retrieved:
if self._configuration is configuration:
# Same instance, no need to do anything.
return
if self._configuration:
raise ServiceConflictError(Configuration, configuration, self._configuration)

self._configuration = configuration

def get_event_manager(self) -> EventManager:
"""Get the event manager."""
if self._event_manager is None:
self._event_manager = (
LocalEventManager().from_config(config=self._configuration)
if self._configuration
else LocalEventManager.from_config()
)
logger.warning('No event manager set, implicitly creating and using default LocalEventManager.')
if self._configuration is None:
logger.warning(
'Implicit creation of the event manager will also set the configuration as a side effect. '
'It is advised to set the configuration explicitly first.'
)
self._event_manager = LocalEventManager().from_config(config=self._configuration)

self._event_manager_was_retrieved = True
return self._event_manager

def set_event_manager(self, event_manager: EventManager) -> None:
@@ -73,17 +83,25 @@ def set_event_manager(self, event_manager: EventManager) -> None:
Raises:
ServiceConflictError: If the event manager has already been retrieved before.
"""
if self._event_manager_was_retrieved:
if self._event_manager is event_manager:
# Same instance, no need to do anything.
return
if self._event_manager:
raise ServiceConflictError(EventManager, event_manager, self._event_manager)

self._event_manager = event_manager

def get_storage_client(self) -> StorageClient:
"""Get the storage client."""
if self._storage_client is None:
self._storage_client = FileSystemStorageClient()
logger.warning('No storage client set, implicitly creating and using default FileSystemStorageClient.')
if self._configuration is None:
logger.warning(
'Implicit creation of the storage client will also set the configuration as a side effect. '
'It is advised to set the configuration explicitly first.'
)
self._storage_client = FileSystemStorageClient(configuration=self._configuration)

self._storage_client_was_retrieved = True
return self._storage_client

def set_storage_client(self, storage_client: StorageClient) -> None:
@@ -95,21 +113,24 @@ def set_storage_client(self, storage_client: StorageClient) -> None:
Raises:
ServiceConflictError: If the storage client has already been retrieved before.
"""
if self._storage_client_was_retrieved:
if self._storage_client is storage_client:
# Same instance, no need to do anything.
return
if self._storage_client:
raise ServiceConflictError(StorageClient, storage_client, self._storage_client)

self._storage_client = storage_client

@property
def storage_instance_manager(self) -> StorageInstanceManager:
"""Get the storage instance manager."""
if self._storage_instance_manager is None:
"""Get the storage instance manager. It is global manager shared by all instances of ServiceLocator."""
if self.__class__.global_storage_instance_manager is None:
# Import here to avoid circular imports.
from crawlee.storages._storage_instance_manager import StorageInstanceManager # noqa: PLC0415

self._storage_instance_manager = StorageInstanceManager()
self.__class__.global_storage_instance_manager = StorageInstanceManager()

return self._storage_instance_manager
return self.__class__.global_storage_instance_manager


service_locator = ServiceLocator()
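
Taken together, the rewritten ServiceLocator above drops the *_was_retrieved flags in favour of conflict checks on the stored instances, allows services to be preset via the constructor, and shares a single StorageInstanceManager across all instances. A rough usage sketch under those semantics; the locator and values below are illustrative, not part of the PR:

from crawlee import service_locator
from crawlee._service_locator import ServiceLocator
from crawlee.configuration import Configuration
from crawlee.storage_clients import MemoryStorageClient

config = Configuration()

# A locator pre-populated with its own services, e.g. for a single crawler.
local_locator = ServiceLocator(
    configuration=config,
    storage_client=MemoryStorageClient(configuration=config),
)

# Setting the very same instance again is a no-op ...
local_locator.set_configuration(config)

# ... while setting a different instance would raise ServiceConflictError,
# because the slot is already occupied:
# local_locator.set_configuration(Configuration())

# The storage instance manager is global and shared by every locator.
assert local_locator.storage_instance_manager is service_locator.storage_instance_manager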
2 changes: 1 addition & 1 deletion src/crawlee/configuration.py
@@ -28,7 +28,7 @@ class Configuration(BaseSettings):
Settings can also be configured via environment variables, prefixed with `CRAWLEE_`.
"""

model_config = SettingsConfigDict(populate_by_name=True)
model_config = SettingsConfigDict(validate_by_name=True, validate_by_alias=True)

internal_timeout: Annotated[timedelta | None, Field(alias='crawlee_internal_timeout')] = None
"""Timeout for the internal asynchronous operations."""
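The model_config change in configuration.py replaces populate_by_name with the newer validate_by_name / validate_by_alias pair, so fields keep accepting both their Python name and their crawlee_-prefixed alias. A quick illustrative check, assuming a pydantic version that supports these flags:

from datetime import timedelta

from crawlee.configuration import Configuration

# Populate the field by its Python name ...
config_a = Configuration(internal_timeout=timedelta(minutes=5))

# ... or by its alias, which matches the CRAWLEE_-prefixed environment variable.
config_b = Configuration(crawlee_internal_timeout=timedelta(minutes=5))

assert config_a.internal_timeout == config_b.internal_timeout
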
39 changes: 27 additions & 12 deletions src/crawlee/crawlers/_basic/_basic_crawler.py
@@ -27,6 +27,7 @@
from crawlee._autoscaling import AutoscaledPool, Snapshotter, SystemStatus
from crawlee._log_config import configure_logger, get_configured_log_level, string_to_log_level
from crawlee._request import Request, RequestOptions, RequestState
from crawlee._service_locator import ServiceLocator
from crawlee._types import (
BasicCrawlingContext,
EnqueueLinksKwargs,
@@ -346,14 +347,28 @@ def __init__(
_logger: A logger instance, typically provided by a subclass, for consistent logging labels.
Intended for use by subclasses rather than direct instantiation of `BasicCrawler`.
"""
if configuration:
service_locator.set_configuration(configuration)
if storage_client:
service_locator.set_storage_client(storage_client)
if event_manager:
service_locator.set_event_manager(event_manager)
global_configuration: Configuration | None = None

config = service_locator.get_configuration()
if not configuration:
global_configuration = service_locator.get_configuration()
configuration = global_configuration

if not storage_client:
if global_configuration:
# If the global configuration was used, reuse the global storage client too.
storage_client = service_locator.get_storage_client()
else:
# If a crawler-specific configuration was used, derive a storage client based on that configuration.
storage_client = service_locator.get_storage_client().create_client(configuration)

if not event_manager:
event_manager = service_locator.get_event_manager()

self._service_locator = ServiceLocator(
configuration=configuration, storage_client=storage_client, event_manager=event_manager
)

config = self._service_locator.get_configuration()

# Core components
self._request_manager = request_manager
@@ -548,7 +563,7 @@ async def _get_proxy_info(self, request: Request, session: Session | None) -> Pr
async def get_request_manager(self) -> RequestManager:
"""Return the configured request manager. If none is configured, open and return the default request queue."""
if not self._request_manager:
self._request_manager = await RequestQueue.open()
self._request_manager = await RequestQueue.open(storage_client=self._service_locator.get_storage_client())

return self._request_manager

Expand All @@ -559,7 +574,7 @@ async def get_dataset(
name: str | None = None,
) -> Dataset:
"""Return the `Dataset` with the given ID or name. If none is provided, return the default one."""
return await Dataset.open(id=id, name=name)
return await Dataset.open(id=id, name=name, storage_client=self._service_locator.get_storage_client())

async def get_key_value_store(
self,
@@ -568,7 +583,7 @@ async def get_key_value_store(
name: str | None = None,
) -> KeyValueStore:
"""Return the `KeyValueStore` with the given ID or name. If none is provided, return the default KVS."""
return await KeyValueStore.open(id=id, name=name)
return await KeyValueStore.open(id=id, name=name, storage_client=self._service_locator.get_storage_client())

def error_handler(
self, handler: ErrorHandler[TCrawlingContext | BasicCrawlingContext]
@@ -684,7 +699,7 @@ def sigint_handler() -> None:
return final_statistics

async def _run_crawler(self) -> None:
event_manager = service_locator.get_event_manager()
event_manager = self._service_locator.get_event_manager()

self._crawler_state_rec_task.start()

@@ -1520,7 +1535,7 @@ def _log_status_message(self, message: str, level: LogLevel = 'DEBUG') -> None:

async def _crawler_state_task(self) -> None:
"""Emit a persist state event with the given migration status."""
event_manager = service_locator.get_event_manager()
event_manager = self._service_locator.get_event_manager()

current_state = self.statistics.state

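With the constructor changes above, every crawler resolves its configuration, storage client and event manager once and keeps them in a private ServiceLocator, so its storages no longer reach for the global locator at call time. A rough sketch of the resulting behaviour; HttpCrawler stands in for any BasicCrawler subclass, and the configuration values are placeholders:

import asyncio
from datetime import timedelta

from crawlee.configuration import Configuration
from crawlee.crawlers import HttpCrawler


async def main() -> None:
    # Falls back to the global configuration, storage client and event manager.
    default_crawler = HttpCrawler()

    # Gets its own configuration; per the constructor above, the global storage
    # client is re-derived for that configuration via create_client().
    isolated_crawler = HttpCrawler(
        configuration=Configuration(persist_state_interval=timedelta(seconds=30)),
    )

    # Storages opened through each crawler are bound to that crawler's
    # storage client rather than resolved from the global service locator.
    default_dataset = await default_crawler.get_dataset()
    isolated_dataset = await isolated_crawler.get_dataset()
    await default_dataset.push_data({'crawler': 'default'})
    await isolated_dataset.push_data({'crawler': 'isolated'})


if __name__ == '__main__':
    asyncio.run(main())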
7 changes: 4 additions & 3 deletions src/crawlee/storage_clients/_base/_storage_client.py
@@ -34,7 +34,6 @@ async def create_dataset_client(
*,
id: str | None = None,
name: str | None = None,
configuration: Configuration | None = None,
) -> DatasetClient:
"""Create a dataset client."""

@@ -44,7 +43,6 @@ async def create_kvs_client(
*,
id: str | None = None,
name: str | None = None,
configuration: Configuration | None = None,
) -> KeyValueStoreClient:
"""Create a key-value store client."""

@@ -54,10 +52,13 @@ async def create_rq_client(
*,
id: str | None = None,
name: str | None = None,
configuration: Configuration | None = None,
) -> RequestQueueClient:
"""Create a request queue client."""

@abstractmethod
def create_client(self, configuration: Configuration) -> StorageClient:
"""Create a storage client from an existing storage."""

def get_rate_limit_errors(self) -> dict[int, int]:
"""Return statistics about rate limit errors encountered by the HTTP client in storage client."""
return {}
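The new abstract create_client() hook means every StorageClient implementation must be able to clone itself for a different configuration, which is what the BasicCrawler constructor relies on above. A hypothetical third-party client might satisfy the contract roughly like this; the class name and constructor are illustrative only, and the three create_*_client methods are elided:

from __future__ import annotations

from crawlee.configuration import Configuration
from crawlee.storage_clients import StorageClient


class MyStorageClient(StorageClient):
    """Hypothetical storage client, shown only to illustrate create_client()."""

    def __init__(self, configuration: Configuration | None = None) -> None:
        self._configuration = configuration or Configuration.get_global_configuration()

    def create_client(self, configuration: Configuration) -> MyStorageClient:
        # Return a sibling client that differs only in its configuration.
        return MyStorageClient(configuration=configuration)

    # create_dataset_client, create_kvs_client and create_rq_client omitted for
    # brevity; without them this class remains abstract and cannot be instantiated.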
32 changes: 20 additions & 12 deletions src/crawlee/storage_clients/_file_system/_storage_client.py
@@ -29,17 +29,24 @@ class FileSystemStorageClient(StorageClient):
Use it only when running a single crawler process at a time.
"""

def __init__(self, configuration: Configuration | None = None) -> None:
"""Initialize the file system storage client.

Args:
configuration: Optional configuration instance to use with the storage client.
If not provided, the global configuration will be used.
"""
self._configuration = configuration or Configuration.get_global_configuration()

@override
async def create_dataset_client(
self,
*,
id: str | None = None,
name: str | None = None,
configuration: Configuration | None = None,
) -> FileSystemDatasetClient:
configuration = configuration or Configuration.get_global_configuration()
client = await FileSystemDatasetClient.open(id=id, name=name, configuration=configuration)
await self._purge_if_needed(client, configuration)
client = await FileSystemDatasetClient.open(id=id, name=name, configuration=self._configuration)
await self._purge_if_needed(client, self._configuration)
return client

@override
@@ -48,11 +55,9 @@ async def create_kvs_client(
*,
id: str | None = None,
name: str | None = None,
configuration: Configuration | None = None,
) -> FileSystemKeyValueStoreClient:
configuration = configuration or Configuration.get_global_configuration()
client = await FileSystemKeyValueStoreClient.open(id=id, name=name, configuration=configuration)
await self._purge_if_needed(client, configuration)
client = await FileSystemKeyValueStoreClient.open(id=id, name=name, configuration=self._configuration)
await self._purge_if_needed(client, self._configuration)
return client

@override
@@ -61,9 +66,12 @@ async def create_rq_client(
*,
id: str | None = None,
name: str | None = None,
configuration: Configuration | None = None,
) -> FileSystemRequestQueueClient:
configuration = configuration or Configuration.get_global_configuration()
client = await FileSystemRequestQueueClient.open(id=id, name=name, configuration=configuration)
await self._purge_if_needed(client, configuration)
client = await FileSystemRequestQueueClient.open(id=id, name=name, configuration=self._configuration)
await self._purge_if_needed(client, self._configuration)
return client

@override
def create_client(self, configuration: Configuration) -> FileSystemStorageClient:
"""Create a storage client from an existing storage client potentially just replacing the configuration."""
return FileSystemStorageClient(configuration)