feat(Low-Code Concurrent CDK): Add ConcurrentPerPartitionCursor #111
base: main
Conversation
📝 Walkthrough

The changes in this pull request introduce enhancements to the Airbyte CDK, particularly focusing on concurrent processing of streams. Key updates include the addition of new cursor types, modifications to existing classes to support these cursors, and refinements in request handling.
Would you like to consider any additional reviewers or related PRs? Wdyt?
Actionable comments posted: 9
🧹 Outside diff range and nitpick comments (5)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1)
59-66: Consider simplifying the constructor parameters

The `__init__` method of `ConcurrentPerPartitionCursor` has many parameters, which can make it harder to maintain and understand. Would it be beneficial to encapsulate related parameters into data classes or reduce the number of parameters if possible? Wdyt?

airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2)

23-25: Unused import statement

We import `PerPartitionWithGlobalCursor`, but it's not used elsewhere in the code. Should we remove this import to keep the code clean? Wdyt?
309-361: Refactor duplicated code in the `_group_streams` method

There seems to be duplicated code in the conditional blocks handling `DatetimeBasedCursorModel` streams. Could we refactor these blocks into a helper function to reduce redundancy and improve readability? Wdyt?

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
915-970: Consider reducing code duplication

The methods `create_concurrent_cursor_from_datetime_based_cursor` and `create_concurrent_cursor_from_perpartition_cursor` share similar logic. Could we extract the common parts into a shared helper function to simplify maintenance? Wdyt?

airbyte_cdk/sources/streams/concurrent/cursor.py (1)
243-251: Consider extracting common logic between close methods?

The new `close_partition_without_emit` shares a lot of logic with `close_partition`. What do you think about extracting the common logic into a private method to reduce duplication? Something like this, wdyt?

```diff
+    def _close_partition_internal(self, partition: Partition) -> bool:
+        slice_count_before = len(self.state.get("slices", []))
+        self._add_slice_to_state(partition)
+        should_merge = slice_count_before < len(self.state["slices"])
+        if should_merge:
+            self._merge_partitions()
+        self._has_closed_at_least_one_slice = True
+        return should_merge
+
     def close_partition_without_emit(self, partition: Partition) -> None:
-        slice_count_before = len(self.state.get("slices", []))
-        self._add_slice_to_state(partition)
-        if slice_count_before < len(
-            self.state["slices"]
-        ):  # only emit if at least one slice has been processed
-            self._merge_partitions()
-            self._has_closed_at_least_one_slice = True
+        self._close_partition_internal(partition)

     def close_partition(self, partition: Partition) -> None:
-        slice_count_before = len(self.state.get("slices", []))
-        self._add_slice_to_state(partition)
-        if slice_count_before < len(
-            self.state["slices"]
-        ):  # only emit if at least one slice has been processed
-            self._merge_partitions()
+        if self._close_partition_internal(partition):
             self._emit_state_message()
-        self._has_closed_at_least_one_slice = True
```
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📒 Files selected for processing (7)

- airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2 hunks)
- airbyte_cdk/sources/declarative/incremental/__init__.py (2 hunks)
- airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1 hunks)
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (3 hunks)
- airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (1 hunks)
- airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (1 hunks)
- airbyte_cdk/sources/streams/concurrent/cursor.py (1 hunks)
🔇 Additional comments (5)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (2)
25-29: Question about returning only the first cursor

In the `ConcurrentCursorFactory`, the `create` method returns only the first cursor from `_create_function`. Since `_create_function` returns a tuple of cursors, should we ensure that we're not discarding any necessary cursors? Perhaps we should handle all cursors returned. Wdyt?
221-224: Ensure partition key serialization handles edge cases

In `_to_partition_key` and `_to_dict`, we use `self._partition_serializer`. Do we need to ensure that partition keys are properly sanitized or encoded to handle special characters and prevent serialization issues? Wdyt?
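For illustration, a minimal sketch of one deterministic, escape-safe approach, assuming a JSON-based serializer; the class and method names here are hypothetical, not the CDK's actual API:

```python
import json
from typing import Any, Mapping


class JsonPartitionSerializer:
    """Hypothetical serializer sketch: sort_keys makes the key deterministic
    regardless of dict ordering, and json.dumps escapes special characters."""

    def to_partition_key(self, partition: Mapping[str, Any]) -> str:
        return json.dumps(partition, sort_keys=True, ensure_ascii=False)

    def to_partition(self, partition_key: str) -> Mapping[str, Any]:
        return json.loads(partition_key)
```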
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)

317-318: Type checking with isinstance

We use `isinstance(declarative_stream.retriever.stream_slicer, PerPartitionWithGlobalCursor)` to check the type. Should we consider using duck typing or interface checks to allow for subclasses or alternative implementations that fulfill the same protocol? Wdyt?
airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (1)

41-66: Clarify the purpose of `DeclarativePartitionFactory1`

The new class `DeclarativePartitionFactory1` appears to be similar to `DeclarativePartitionFactory` but with a different approach to handling the retriever. Is this intended to replace the original factory or serve a different purpose? Providing more context or renaming the class for clarity might help. Wdyt?
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (1)

181-181: LGTM!

The change to use `request_option_provider` instead of `stream_slicer` improves modularity by separating concerns.
```python
    def state(self) -> MutableMapping[str, Any]:
        states = []
        for partition_tuple, cursor in self._cursor_per_partition.items():
            cursor_state = cursor._connector_state_converter.convert_to_state_message(
                cursor._cursor_field, cursor.state
            )
            if cursor_state:
                states.append(
                    {
                        "partition": self._to_dict(partition_tuple),
                        "cursor": copy.deepcopy(cursor_state),
                    }
                )
        state: dict[str, Any] = {"states": states}
        return state
```
🛠️ Refactor suggestion

Potential performance issue when generating state

The `state` property method iterates over `self._cursor_per_partition.items()` and performs deep copies of cursor states. This could become a performance bottleneck with a large number of partitions. Should we consider optimizing this by avoiding deep copies or processing states incrementally? Wdyt?
Is there a reason we're doing a deepcopy here? If so, I think we should document it
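If the copy is there to snapshot mutable cursor state, one way to address this is an inline note at the call site. A sketch of the existing lines with such documentation added; the stated rationale is an assumption for the author to confirm:

```python
if cursor_state:
    states.append(
        {
            "partition": self._to_dict(partition_tuple),
            # Assumed rationale: cursor_state is a nested mutable mapping that
            # the underlying cursor keeps mutating as records are observed, so
            # we snapshot it here to keep the emitted state immutable.
            "cursor": copy.deepcopy(cursor_state),
        }
    )
```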
```python
    def _ensure_partition_limit(self) -> None:
        """
        Ensure the maximum number of partitions is not exceeded. If so, the oldest added partition will be dropped.
        """
        while len(self._cursor_per_partition) > self.DEFAULT_MAX_PARTITIONS_NUMBER - 1:
            self._over_limit += 1
            oldest_partition = self._cursor_per_partition.popitem(last=False)[
                0
            ]  # Remove the oldest partition
            logger.warning(
                f"The maximum number of partitions has been reached. Dropping the oldest partition: {oldest_partition}. Over limit: {self._over_limit}."
            )
```
Clarification on partition limit logic

In `_ensure_partition_limit`, we increment `_over_limit` every time we remove a partition when the limit is reached. However, in `limit_reached`, we check if `_over_limit > DEFAULT_MAX_PARTITIONS_NUMBER`. Is this the intended behavior? Should the condition be adjusted to properly reflect when the limit is truly exceeded? Wdyt?
```python
    def limit_reached(self) -> bool:
        return self._over_limit > self.DEFAULT_MAX_PARTITIONS_NUMBER
```
Possible off-by-one error in the `limit_reached` method

The method `limit_reached` returns True when `_over_limit > DEFAULT_MAX_PARTITIONS_NUMBER`. Given how `_over_limit` is incremented, could this condition lead to an off-by-one error? Should we revisit this logic to ensure accurate limit checks? Wdyt?
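If the intent is to report a breach as soon as at least one partition has been evicted, a drop-in sketch of an adjusted check could look like this; the semantics are an assumption, and the author may intend a different threshold:

```python
def limit_reached(self) -> bool:
    # Trip as soon as any partition has been dropped, instead of waiting for
    # the eviction count itself to exceed DEFAULT_MAX_PARTITIONS_NUMBER.
    return self._over_limit > 0
```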
```python
            stream_state = state_manager.get_stream_state(
                stream_name=declarative_stream.name, namespace=declarative_stream.namespace
            )
            partition_router = declarative_stream.retriever.stream_slicer._partition_router

            cursor = self._constructor.create_concurrent_cursor_from_perpartition_cursor(
                state_manager=state_manager,
                model_type=DatetimeBasedCursorModel,
                component_definition=incremental_sync_component_definition,
                stream_name=declarative_stream.name,
                stream_namespace=declarative_stream.namespace,
                config=config or {},
                stream_state=stream_state,
                partition_router=partition_router,
            )
```
Handling potential None in `stream_state`

When retrieving `stream_state`, do we need to handle the case where it might be None? Ensuring that `stream_state` is properly initialized could prevent unexpected errors during cursor creation. Wdyt?
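A minimal sketch of the defensive default being suggested, assuming an empty mapping is a safe initial state for cursor creation:

```python
stream_state = (
    state_manager.get_stream_state(
        stream_name=declarative_stream.name, namespace=declarative_stream.namespace
    )
    or {}  # fall back to an empty state so cursor creation never receives None
)
```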
```python
            partition_generator = StreamSlicerPartitionGenerator(
                DeclarativePartitionFactory(
                    declarative_stream.name,
                    declarative_stream.get_json_schema(),
                    self._retriever_factory(
                        name_to_stream_mapping[declarative_stream.name],
                        config,
                        stream_state,
                    ),
                    self.message_repository,
                ),
                cursor,
            )
```
🛠️ Refactor suggestion

Error handling for partition generator creation

In constructing the `partition_generator`, are there potential edge cases where dependencies might not be properly initialized? Should we add error handling or input validation to make the code more robust against such scenarios? Wdyt?
```python
    ConcurrentCursorFactory,
    ConcurrentPerPartitionCursor,
```
Missing comma in import statement

In the import statement for `ConcurrentCursorFactory` and `ConcurrentPerPartitionCursor`, there's a missing comma between them. Should we add a comma to prevent syntax errors? Wdyt?
"ConcurrentCursorFactory" | ||
"ConcurrentPerPartitionCursor", |
Comma missing in `__all__` list

In the `__all__` list, there's a missing comma after `"ConcurrentCursorFactory"`. Python implicitly concatenates the two adjacent string literals into one bogus entry, which could lead to import errors. Should we add the comma to fix this issue? Wdyt?
```python
    The DeclarativePartitionFactory takes a retriever_factory and not a retriever directly. The reason is that our components are not
    thread safe and classes like `DefaultPaginator` may not work because multiple threads can access and modify a shared field across each other.
    In order to avoid these problems, we will create one retriever per thread which should make the processing thread-safe.
    """
```
Update docstring to match the new constructor

The docstring for `DeclarativePartitionFactory1` mentions that it takes a `retriever_factory`, but the constructor now accepts a `retriever` instance directly. Should we update the docstring to reflect this change and prevent confusion? Wdyt?
Actionable comments posted: 3
🧹 Outside diff range and nitpick comments (5)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1)

152-156: Could we simplify the partition limit condition in the `_ensure_partition_limit` method?

Currently, the condition is `len(self._cursor_per_partition) > self.DEFAULT_MAX_PARTITIONS_NUMBER - 1`. Would it improve readability to use `len(self._cursor_per_partition) >= self.DEFAULT_MAX_PARTITIONS_NUMBER` instead? This change might make the maximum partition limit clearer. Wdyt?

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
910-965: Consider extracting common validation logic and fixing indentation

A few suggestions to improve this new method:

- The component type validation logic (lines 922-926) is duplicated from `create_concurrent_cursor_from_datetime_based_cursor`. Should we extract it into a helper method to follow DRY?
- The indentation in the return statement (lines 955-964) seems inconsistent with the rest of the file. Should we align it with the standard indentation?
- Instead of using `type: ignore` on line 961, we could properly type `message_repository`. Wdyt?

```diff
 def create_concurrent_cursor_from_perpartition_cursor(
     self,
     state_manager: ConnectorStateManager,
     model_type: Type[BaseModel],
     component_definition: ComponentDefinition,
     stream_name: str,
     stream_namespace: Optional[str],
     config: Config,
     stream_state: MutableMapping[str, Any],
     partition_router,
     **kwargs: Any,
 ) -> ConcurrentPerPartitionCursor:
-    component_type = component_definition.get("type")
-    if component_definition.get("type") != model_type.__name__:
-        raise ValueError(
-            f"Expected manifest component of type {model_type.__name__}, but received {component_type} instead"
-        )
+    self._validate_component_type(component_definition, model_type)

     # ... rest of the method ...

     return ConcurrentPerPartitionCursor(
-            cursor_factory=cursor_factory,
-            partition_router=partition_router,
-            stream_name=stream_name,
-            stream_namespace=stream_namespace,
-            stream_state=stream_state,
-            message_repository=self._message_repository,  # type: ignore
-            connector_state_manager=state_manager,
-            cursor_field=cursor_field,
-        )
+        cursor_factory=cursor_factory,
+        partition_router=partition_router,
+        stream_name=stream_name,
+        stream_namespace=stream_namespace,
+        stream_state=stream_state,
+        message_repository=self._message_repository,
+        connector_state_manager=state_manager,
+        cursor_field=cursor_field,
+    )
```

Helper method to add:

```python
def _validate_component_type(self, component_definition: ComponentDefinition, expected_type: Type[BaseModel]) -> None:
    component_type = component_definition.get("type")
    if component_definition.get("type") != expected_type.__name__:
        raise ValueError(
            f"Expected manifest component of type {expected_type.__name__}, but received {component_type} instead"
        )
```

unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (3)
276-276: Should the test case name match the function name for consistency?

In the parameterization at line 276, the test name is "test_incremental_parent_state", but the test function is named `test_incremental_parent_state_no_incremental_dependency`. Aligning the test case name with the function name could enhance clarity. Wdyt?

483-495: Could we improve the docstring formatting for better readability?

The docstring contains a detailed explanation of the test, but it's indented inconsistently. Adjusting the indentation could enhance readability and maintain consistency with PEP 257 guidelines. Wdyt?

519-523: Is comparing only the last state sufficient to validate the final state?

In the assertion at lines 519-523, we compare `final_state[-1]` with `expected_state`. Should we consider comparing all elements of `final_state` to ensure that all state messages throughout the sync match the expectations? Wdyt?
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📒 Files selected for processing (7)

- airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2 hunks)
- airbyte_cdk/sources/declarative/extractors/record_filter.py (2 hunks)
- airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1 hunks)
- airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py (1 hunks)
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4 hunks)
- airbyte_cdk/sources/streams/concurrent/cursor.py (1 hunks)
- unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (1 hunks)

🚧 Files skipped from review as they are similar to previous changes (1)

- airbyte_cdk/sources/declarative/concurrent_declarative_source.py
🔇 Additional comments (2)
airbyte_cdk/sources/declarative/extractors/record_filter.py (1)

78-81: Is it safe to use an empty `stream_name` when creating `Record` instances?

In the `filter_records` method, we're creating `Record` instances with an empty `stream_name`. Could this lead to issues if other parts of the codebase expect `stream_name` to be non-empty? Should we consider passing the actual stream name to prevent potential problems? Wdyt?
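A sketch of the suggested direction, assuming the filter receives the real stream name at construction time; the `self._stream_name` attribute, the import path, and the exact `Record` keyword arguments are assumptions for illustration:

```python
from airbyte_cdk.sources.types import Record, StreamSlice

def filter_records(self, records, stream_slice: StreamSlice):
    return (
        record
        for record in records
        if self._cursor.should_be_synced(
            # Pass the actual stream name (assumed to be injected into the
            # filter by the component factory) instead of an empty string.
            Record(data=record, stream_name=self._stream_name, associated_slice=stream_slice)
        )
    )
```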
airbyte_cdk/sources/streams/concurrent/cursor.py (1)
243-251: Could we clarify the behavior of the `close_partition_without_emit` method?

In the `close_partition_without_emit` method, we update `_has_closed_at_least_one_slice` to True, but we don't emit a state message. Is this intended? Should we ensure that the state is eventually emitted elsewhere to keep the state consistent? Wdyt?
```python
        if self._to_partition_key(record.associated_slice.partition) not in self._cursor_per_partition:
            partition_state = (
                self._state_to_migrate_from
                if self._state_to_migrate_from
                else self._NO_CURSOR_STATE
            )
            cursor = self._create_cursor(partition_state)
```
Should we add a check for `record.associated_slice` being None in `should_be_synced`?

In the `should_be_synced` method, we access `record.associated_slice.partition` without verifying that `record.associated_slice` is not None. This could raise an `AttributeError` if `record.associated_slice` is None. Should we add a check to ensure `record.associated_slice` is not None before proceeding? Wdyt?
```python
        source_config=manifest, config=config, catalog=catalog, state=state
    )
    messages = list(
        source.read(logger=source.logger, config=config, catalog=catalog, state=[])
    )
```
Should we pass the `initial_state` to the `read` method?

I noticed that in the `_run_read` function, we initialize `ConcurrentDeclarativeSource` with the `state` parameter, but when calling `source.read`, we're passing `state=[]` instead of `state=state`. Passing the `initial_state` to `read` might be necessary to ensure the state is correctly utilized during the read operation. Wdyt?
/autofix
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (1)
unit_tests/sources/declarative/extractors/test_record_filter.py (1)

Line range hint 341-432: Comprehensive test coverage for cursor types! 👍

The parameterized tests cover all important cursor scenarios:

- Basic datetime cursor
- Global substream cursor (with and without state)
- Per-partition cursor with various state configurations

One thought: would it be valuable to add a test case for error handling when an invalid cursor type is passed? Wdyt?
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📒 Files selected for processing (1)

- unit_tests/sources/declarative/extractors/test_record_filter.py (2 hunks)
🔇 Additional comments (1)
unit_tests/sources/declarative/extractors/test_record_filter.py (1)

293-293: Nice API simplification! 🎉

I see you've consolidated the cursor parameters (`date_time_based_cursor` and `substream_cursor`) into a single `cursor` parameter. This makes the API cleaner and more flexible. The tests demonstrate it works well with all cursor types (DatetimeBasedCursor, GlobalSubstreamCursor, and PerPartitionWithGlobalCursor).

Also applies to: 431-431
/autofix
Actionable comments posted: 1
🧹 Outside diff range and nitpick comments (1)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)

305-316: Consider extracting the condition into a descriptive method

The nested conditions make the code harder to read and maintain. Would you consider extracting these conditions into a separate method with a descriptive name? Wdyt?

Something like:

```diff
+def _supports_concurrent_perpartition_cursor(
+    self,
+    declarative_stream: DeclarativeStream,
+    incremental_sync_component_definition: Mapping[str, Any],
+) -> bool:
+    return (
+        incremental_sync_component_definition
+        and incremental_sync_component_definition.get("type", "")
+        == DatetimeBasedCursorModel.__name__
+        and self._stream_supports_concurrent_partition_processing(
+            declarative_stream=declarative_stream
+        )
+        and hasattr(declarative_stream.retriever, "stream_slicer")
+        and isinstance(
+            declarative_stream.retriever.stream_slicer, PerPartitionWithGlobalCursor
+        )
+    )
```
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

⛔ Files ignored due to path filters (1)

- poetry.lock is excluded by !**/*.lock

📒 Files selected for processing (5)

- airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2 hunks)
- airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1 hunks)
- airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py (1 hunks)
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4 hunks)
- unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (1 hunks)

🚧 Files skipped from review as they are similar to previous changes (2)

- unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py
- airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py
🔇 Additional comments (4)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)

84-85: Add missing comma between imports

There's a missing comma between `ConcurrentCursorFactory` and `ConcurrentPerPartitionCursor` in the import statement.

Apply this diff:

```diff
-    ConcurrentCursorFactory
-    ConcurrentPerPartitionCursor,
+    ConcurrentCursorFactory,
+    ConcurrentPerPartitionCursor,
```
airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py (2)

Line range hint 320-358: LGTM! Well-structured comparison logic

The implementation includes proper null checks and validation for partition matching. The error messages are clear and descriptive.

306-319: Add null check for `record.associated_slice`

The code accesses `record.associated_slice.partition` without verifying that `record.associated_slice` is not None. This could raise an `AttributeError`.

Consider adding a null check like this:

```diff
 def should_be_synced(self, record: Record) -> bool:
+    if not record.associated_slice:
+        raise ValueError("Record must have an associated slice")
     if (
         self._to_partition_key(record.associated_slice.partition)
         not in self._cursor_per_partition
     ):
```
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)

317-331: Consider handling potential None `stream_state`

When retrieving `stream_state`, we might want to handle the case where it could be None to prevent unexpected errors during cursor creation. Wdyt?

Let's verify the current handling of stream_state:

```bash
#!/bin/bash
# Check how stream_state is handled in other parts of the codebase
rg -A 3 "get_stream_state" | grep -v "test"
```
```python
    def create_concurrent_cursor_from_perpartition_cursor(
        self,
        state_manager: ConnectorStateManager,
        model_type: Type[BaseModel],
        component_definition: ComponentDefinition,
        stream_name: str,
        stream_namespace: Optional[str],
        config: Config,
        stream_state: MutableMapping[str, Any],
        partition_router,
        **kwargs: Any,
    ) -> ConcurrentPerPartitionCursor:
        component_type = component_definition.get("type")
        if component_definition.get("type") != model_type.__name__:
            raise ValueError(
                f"Expected manifest component of type {model_type.__name__}, but received {component_type} instead"
            )

        datetime_based_cursor_model = model_type.parse_obj(component_definition)

        if not isinstance(datetime_based_cursor_model, DatetimeBasedCursorModel):
            raise ValueError(
                f"Expected {model_type.__name__} component, but received {datetime_based_cursor_model.__class__.__name__}"
            )

        interpolated_cursor_field = InterpolatedString.create(
            datetime_based_cursor_model.cursor_field,
            parameters=datetime_based_cursor_model.parameters or {},
        )
        cursor_field = CursorField(interpolated_cursor_field.eval(config=config))

        # Create the cursor factory
        cursor_factory = ConcurrentCursorFactory(
            partial(
                self.create_concurrent_cursor_from_datetime_based_cursor,
                state_manager=state_manager,
                model_type=model_type,
                component_definition=component_definition,
                stream_name=stream_name,
                stream_namespace=stream_namespace,
                config=config,
            )
        )

        # Return the concurrent cursor and state converter
        return ConcurrentPerPartitionCursor(
            cursor_factory=cursor_factory,
            partition_router=partition_router,
            stream_name=stream_name,
            stream_namespace=stream_namespace,
            stream_state=stream_state,
            message_repository=self._message_repository,  # type: ignore
            connector_state_manager=state_manager,
            cursor_field=cursor_field,
        )
```
🛠️ Refactor suggestion

Consider adding error handling for cursor creation

The cursor creation logic could benefit from additional error handling. For instance, what happens if the cursor factory creation fails? Wdyt?

Here's a suggested enhancement:

```diff
 def create_concurrent_cursor_from_perpartition_cursor(
     self,
     ...
 ) -> ConcurrentPerPartitionCursor:
+    try:
         component_type = component_definition.get("type")
         if component_definition.get("type") != model_type.__name__:
             raise ValueError(
                 f"Expected manifest component of type {model_type.__name__}, but received {component_type} instead"
             )
         # Rest of the implementation...
         return ConcurrentPerPartitionCursor(
             cursor_factory=cursor_factory,
             partition_router=partition_router,
             stream_name=stream_name,
             stream_namespace=stream_namespace,
             stream_state=stream_state,
             message_repository=self._message_repository,
             connector_state_manager=state_manager,
             cursor_field=cursor_field,
         )
+    except Exception as e:
+        raise ValueError(
+            f"Failed to create concurrent cursor for stream '{stream_name}': {str(e)}"
+        ) from e
```
I'm very happy with the progress on this! I've added a couple more comments on code maintenance and structure, but the functional part seemed fine. I want to check the tests eventually (tomorrow, hopefully), but I can still leave a couple of comments here.
```diff
@@ -299,6 +302,60 @@ def _group_streams(
                         cursor=final_state_cursor,
                     )
                 )
+            elif (
```
Should we filter only on list partition router until we support the global cursor part?
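A sketch of what that narrower gate could look like, assuming the goal is to take the concurrent path only for `ListPartitionRouter`-backed streams until the global-cursor case is supported; this is an illustration, not the PR's final condition:

```python
from airbyte_cdk.sources.declarative.partition_routers import ListPartitionRouter

stream_slicer = declarative_stream.retriever.stream_slicer
if isinstance(stream_slicer, PerPartitionWithGlobalCursor) and isinstance(
    stream_slicer._partition_router, ListPartitionRouter
):
    # Only list-based partition routers take the concurrent per-partition
    # path for now; substream routers keep the synchronous path.
    ...
```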
```diff
@@ -77,7 +75,7 @@ def filter_records(
         records = (
             record
             for record in records
-            if (self._substream_cursor or self._date_time_based_cursor).should_be_synced(
+            if self._cursor.should_be_synced(
```
This change is so beautiful that I want to cry ❤️
```python
class ConcurrentCursorFactory:
    def __init__(self, create_function: Callable[..., Cursor]):
```
Should the input type for the `Callable` be `StreamState`?
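A sketch of the more precise signature being suggested, assuming `StreamState` is the type of the per-partition state handed to the factory; import paths are indicative:

```python
from typing import Callable

from airbyte_cdk.sources.streams.concurrent.cursor import Cursor
from airbyte_cdk.sources.types import StreamState


class ConcurrentCursorFactory:
    def __init__(self, create_function: Callable[[StreamState], Cursor]) -> None:
        self._create_function = create_function

    def create(self, stream_state: StreamState) -> Cursor:
        # The signature now states the contract: per-partition state in, cursor out.
        return self._create_function(stream_state)
```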
```python
        self._set_initial_state(stream_state)

    @property
    def cursor_field(self) -> CursorField:
```
Should we add this to the `Cursor` interface? If we put it there, it seems like the return type will need to be `Optional` because of the `FinalStateCursor`.
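A sketch of lifting the property into the interface with the `Optional` return type the comment anticipates; the assumption is that `FinalStateCursor` would simply return None:

```python
from abc import ABC, abstractmethod
from typing import Optional


class Cursor(ABC):
    @property
    @abstractmethod
    def cursor_field(self) -> Optional["CursorField"]:
        """Field used to order records, or None for cursors without one."""


class FinalStateCursor(Cursor):
    @property
    def cursor_field(self) -> Optional["CursorField"]:
        # No cursor field: this cursor only emits a final state message.
        return None
```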
```python
    def state(self) -> MutableMapping[str, Any]:
        states = []
        for partition_tuple, cursor in self._cursor_per_partition.items():
            cursor_state = cursor._connector_state_converter.convert_to_state_message(
```
I have two confusions regarding this method:

1. Why is `Cursor.state` public? I know it was already like that before your changes, but I'm trying to understand where it is used and why it is public. Before your changes, it didn't seem like we needed it to be public.
2. Is there a way we can access the information we need without accessing private parameters? It feels like what we need to make public here is the representation of the data that is being set as an AirbyteMessage of type state (in other words, the one in `ConcurrentCursor._emit_state_message`), because when `PerPartitionCursor` wants to emit the state, it does not care about the implementation of the cursor but cares about setting it in a state message.

Based on the two points above, can we have the `state` method be more explicit (something like `as_state_message`) with a comment saying it is used for `PerPartitionCursor` and is NOT used by other classes that are not cursors to emit states?
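A minimal sketch of the more explicit accessor being proposed; the name `as_state_message` comes from the comment itself, and the body mirrors the conversion currently done by reaching into the cursor's private converter:

```python
def as_state_message(self) -> MutableMapping[str, Any]:
    """State in the shape used for an AirbyteMessage of type STATE.

    Used by PerPartitionCursor to aggregate per-partition states; NOT meant
    for non-cursor callers that emit state themselves.
    """
    return self._connector_state_converter.convert_to_state_message(
        self._cursor_field, self.state
    )
```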
```diff
@@ -240,6 +240,15 @@ def observe(self, record: Record) -> None:
     def _extract_cursor_value(self, record: Record) -> Any:
         return self._connector_state_converter.parse_value(self._cursor_field.extract_value(record))

+    def close_partition_without_emit(self, partition: Partition) -> None:
```
It feels like if we add a public method here, it should also be in the public interface. Based on that, I'm wondering if it would be easier to have the `ConcurrentCursorFactory` set the `message_repository` as a `NoopMessageRepository` to prevent the cursors from emitting anything. This solution would work with any type of cursor as well (let's say we want to support custom cursors soon).
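A sketch of that alternative: build the per-partition cursors with the CDK's `NoopMessageRepository` so they cannot emit state, leaving the parent cursor as the only emitter. The `message_repository` keyword on the factory call is an assumption about how it could be threaded through:

```python
from functools import partial

from airbyte_cdk.sources.message import NoopMessageRepository

cursor_factory = ConcurrentCursorFactory(
    partial(
        self.create_concurrent_cursor_from_datetime_based_cursor,
        state_manager=state_manager,
        model_type=model_type,
        component_definition=component_definition,
        stream_name=stream_name,
        stream_namespace=stream_namespace,
        config=config,
        # Assumed keyword: per-partition cursors get a no-op repository so
        # only ConcurrentPerPartitionCursor emits state messages.
        message_repository=NoopMessageRepository(),
    )
)
```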
```diff
@@ -178,7 +178,7 @@ def _request_headers(
             stream_slice,
             next_page_token,
             self._paginator.get_request_headers,
-            self.stream_slicer.get_request_headers,
+            self.request_option_provider.get_request_headers,
```
@brianjlai was there a reason we were still using the `stream_slicer` here instead of the `request_option_provider`?
Force-pushed from a6b5daa to 79ffb77.
/autofix
Actionable comments posted: 2
♻️ Duplicate comments (2)
airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py (1)

306-319: ⚠️ Potential issue: Should we add a null check for `record.associated_slice`?

The code accesses `record.associated_slice.partition` without verifying that `record.associated_slice` is not None. This could raise an `AttributeError`. Consider adding a null check, wdyt?

```diff
 def should_be_synced(self, record: Record) -> bool:
+    if not record.associated_slice:
+        raise ValueError("Record must have an associated slice")
     if (
         self._to_partition_key(record.associated_slice.partition)
         not in self._cursor_per_partition
     ):
```

unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (1)
244-268: ⚠️ Potential issue: Should we pass the state parameter to the read method?

The `state` parameter is passed to `ConcurrentDeclarativeSource` initialization but not to the `read` method (an empty list is passed instead). This might affect state handling. Consider using the state parameter in both places, wdyt?

```diff
-    messages = list(source.read(logger=source.logger, config=config, catalog=catalog, state=[]))
+    messages = list(source.read(logger=source.logger, config=config, catalog=catalog, state=state))
```
🧹 Nitpick comments (1)

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)

962-1018: Consider adding error handling for cursor creation

The cursor creation logic could benefit from additional error handling similar to `create_concurrent_cursor_from_datetime_based_cursor`. For instance, what happens if the cursor factory creation fails? Wdyt?

```diff
 def create_concurrent_cursor_from_perpartition_cursor(
     self,
     state_manager: ConnectorStateManager,
     model_type: Type[BaseModel],
     component_definition: ComponentDefinition,
     stream_name: str,
     stream_namespace: Optional[str],
     config: Config,
     stream_state: MutableMapping[str, Any],
     partition_router,
     **kwargs: Any,
 ) -> ConcurrentPerPartitionCursor:
+    try:
         component_type = component_definition.get("type")
         if component_definition.get("type") != model_type.__name__:
             raise ValueError(
                 f"Expected manifest component of type {model_type.__name__}, but received {component_type} instead"
             )
         # Rest of the implementation...
         return ConcurrentPerPartitionCursor(
             cursor_factory=cursor_factory,
             partition_router=partition_router,
             stream_name=stream_name,
             stream_namespace=stream_namespace,
             stream_state=stream_state,
             message_repository=self._message_repository,
             connector_state_manager=state_manager,
             cursor_field=cursor_field,
         )
+    except Exception as e:
+        raise ValueError(
+            f"Failed to create concurrent cursor for stream '{stream_name}': {str(e)}"
+        ) from e
```
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📒 Files selected for processing (6)

- airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2 hunks)
- airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1 hunks)
- airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py (1 hunks)
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (7 hunks)
- airbyte_cdk/sources/streams/concurrent/cursor.py (1 hunks)
- unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (1 hunks)

🚧 Files skipped from review as they are similar to previous changes (2)

- airbyte_cdk/sources/streams/concurrent/cursor.py
- airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (3)
482-528
: LGTM! Well-structured test with clear documentation.
The test case is comprehensive and effectively validates the parent state behavior when incremental dependency is disabled.
530-630
: LGTM! Excellent test helper implementation.
The helper function is well-documented and implements thorough validation steps for testing incremental parent state behavior.
1124-1556
: LGTM! Comprehensive test coverage.
The test functions thoroughly cover different scenarios including state migration and empty record sets. The test cases are well-structured and documented.
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)
Line range hint 814-961
: LGTM! Well-structured implementation with proper error handling.
The implementation of create_concurrent_cursor_from_datetime_based_cursor
is thorough and handles all edge cases appropriately.
84-85
:
Add missing comma between imports
There seems to be a missing comma between ConcurrentCursorFactory
and ConcurrentPerPartitionCursor
. Wdyt?
- ConcurrentCursorFactory
- ConcurrentPerPartitionCursor,
+ ConcurrentCursorFactory,
+ ConcurrentPerPartitionCursor,
Likely invalid or redundant comment.
```python
            elif (
                incremental_sync_component_definition
                and incremental_sync_component_definition.get("type", "")
                == DatetimeBasedCursorModel.__name__
                and self._stream_supports_concurrent_partition_processing(
                    declarative_stream=declarative_stream
                )
                and hasattr(declarative_stream.retriever, "stream_slicer")
                and isinstance(
                    declarative_stream.retriever.stream_slicer, PerPartitionWithGlobalCursor
                )
            ):
                stream_state = state_manager.get_stream_state(
                    stream_name=declarative_stream.name, namespace=declarative_stream.namespace
                )
                partition_router = declarative_stream.retriever.stream_slicer._partition_router

                cursor = self._constructor.create_concurrent_cursor_from_perpartition_cursor(
                    state_manager=state_manager,
                    model_type=DatetimeBasedCursorModel,
                    component_definition=incremental_sync_component_definition,
                    stream_name=declarative_stream.name,
                    stream_namespace=declarative_stream.namespace,
                    config=config or {},
                    stream_state=stream_state,
                    partition_router=partition_router,
                )

                partition_generator = StreamSlicerPartitionGenerator(
                    DeclarativePartitionFactory(
                        declarative_stream.name,
                        declarative_stream.get_json_schema(),
                        self._retriever_factory(
                            name_to_stream_mapping[declarative_stream.name],
                            config,
                            stream_state,
                        ),
                        self.message_repository,
                    ),
                    cursor,
                )

                concurrent_streams.append(
                    DefaultStream(
                        partition_generator=partition_generator,
                        name=declarative_stream.name,
                        json_schema=declarative_stream.get_json_schema(),
                        availability_strategy=AlwaysAvailableAvailabilityStrategy(),
                        primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
                        cursor_field=cursor.cursor_field.cursor_field_key,
                        logger=self.logger,
                        cursor=cursor,
                    )
                )
```
Should we add error handling for the partition router access?

The code directly accesses `declarative_stream.retriever.stream_slicer._partition_router` without verifying that all the intermediate attributes exist. Consider adding error handling to make the code more robust, wdyt?

```diff
-                partition_router = declarative_stream.retriever.stream_slicer._partition_router
+                try:
+                    partition_router = declarative_stream.retriever.stream_slicer._partition_router
+                except AttributeError as e:
+                    raise ValueError(
+                        f"Failed to access partition router for stream {declarative_stream.name}: {str(e)}"
+                    ) from e
```
```python
        ) and not isinstance(stream_slicer, PerPartitionWithGlobalCursor):
            # Many of the custom component implementations of DatetimeBasedCursor override get_request_params() (or other methods).
            # Because we're decoupling RequestOptionsProvider from the Cursor, custom components will eventually need to reimplement
```
🛠️ Refactor suggestion

Simplify the type checking condition

The current condition seems overly complex and might cause issues with valid subclasses:

```python
not isinstance(stream_slicer, DatetimeBasedCursor) or type(stream_slicer) is not DatetimeBasedCursor
```

Should we simplify it to just use `isinstance`? The current combination of `isinstance` and `type` checks with OR could unexpectedly reject valid subclasses of `DatetimeBasedCursor`. Wdyt?

```diff
-        if (
-            not isinstance(stream_slicer, DatetimeBasedCursor)
-            or type(stream_slicer) is not DatetimeBasedCursor
-        ) and not isinstance(stream_slicer, PerPartitionWithGlobalCursor):
+        if not isinstance(stream_slicer, (DatetimeBasedCursor, PerPartitionWithGlobalCursor)):
```
Summary by CodeRabbit

Release Notes

New Features

- Simplified cursor handling in `ClientSideIncrementalRecordFilterDecorator`.
- Added the ability to close a partition without emitting state to the `ConcurrentCursor` class.

Bug Fixes

- Updated request handling in the `SimpleRetriever` class for consistency.

Documentation

Tests

- Added tests for `ConcurrentDeclarativeSource` and updated tests for record filtering to match the new cursor parameterization.