Skip to content

Commit d001858

Browse files
committed
PR feedback and a few small tweaks
1 parent e19644f commit d001858

File tree

6 files changed

+39
-13
lines changed

6 files changed

+39
-13
lines changed

airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/default_paginator.py

+4-1
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,10 @@ def get_request_body_json(
154154
return self._get_request_options(RequestOptionType.body_json)
155155

156156
def reset(self, reset_value: Optional[Any] = None) -> None:
157-
self.pagination_strategy.reset(reset_value=reset_value)
157+
if reset_value:
158+
self.pagination_strategy.reset(reset_value=reset_value)
159+
else:
160+
self.pagination_strategy.reset()
158161
self._token = self.pagination_strategy.initial_token
159162

160163
def _get_request_options(self, option_type: RequestOptionType) -> MutableMapping[str, Any]:

airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/offset_increment.py

+2-4
Original file line numberDiff line numberDiff line change
@@ -66,10 +66,8 @@ def next_page_token(self, response: requests.Response, last_page_size: int, last
6666
self._offset += last_page_size
6767
return self._offset
6868

69-
def reset(self, reset_value: Optional[Any] = None) -> None:
70-
if reset_value is None:
71-
self._offset = 0
72-
elif not isinstance(reset_value, int):
69+
def reset(self, reset_value: Optional[Any] = 0) -> None:
70+
if not isinstance(reset_value, int):
7371
raise ValueError(f"Reset value {reset_value} for OffsetIncrement pagination strategy was not an integer")
7472
else:
7573
self._offset = reset_value

airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py

+5-2
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,9 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
7171
self._last_record: Optional[Record] = None
7272
self._parameters = parameters
7373
self._name = InterpolatedString(self._name, parameters=parameters) if isinstance(self._name, str) else self._name
74+
75+
# This mapping is used during a resumable full refresh syncs to indicate whether a partition has started syncing
76+
# records. Partitions serve as the key and map to True if they already began processing records
7477
self._synced_partitions: MutableMapping[Any, bool] = dict()
7578

7679
@property # type: ignore
@@ -357,8 +360,8 @@ def read_records(
357360
return
358361
cursor_value = stream_state.get("next_page_token")
359362

360-
# The first attempt to read a page for the current partition should have the paginator reset to the current
361-
# cursor state which is initially set to the incoming state from the platform
363+
# The first attempt to read a page for the current partition should reset the paginator to the current
364+
# cursor state which is initially assigned to the incoming state from the platform
362365
partition_key = self._to_partition_key(_slice.partition)
363366
if partition_key not in self._synced_partitions:
364367
self._synced_partitions[partition_key] = True

airbyte-cdk/python/airbyte_cdk/sources/streams/checkpoint/checkpoint_reader.py

+22-5
Original file line numberDiff line numberDiff line change
@@ -79,26 +79,43 @@ class CursorBasedCheckpointReader(CheckpointReader):
7979
"""
8080
CursorBasedCheckpointReader is used by streams that implement a Cursor in order to manage state. This allows the checkpoint
8181
reader to delegate the complexity of fetching state to the cursor and focus on the iteration over a stream's partitions.
82-
Right now only low-code connectors provide cursor implementations, but the logic is extensible to any stream that adheres
83-
to the Cursor interface.
82+
83+
This reader supports the Cursor interface used by Python and low-code sources. Not to be confused with Cursor interface
84+
that belongs to the Concurrent CDK.
8485
"""
8586

8687
def __init__(self, cursor: Cursor, stream_slices: Iterable[Optional[Mapping[str, Any]]], read_state_from_cursor: bool = False):
87-
# The first attempt of an RFR stream has an empty {} incoming state, but should still make a first attempt to read records
88-
# from the first page in next().
8988
self._cursor = cursor
9089
self._stream_slices = iter(stream_slices)
90+
# read_state_from_cursor is used to delineate that partitions should determine when to stop syncing dynamically according
91+
# to the value of the state at runtime. This currently only applies to streams that use resumable full refresh.
9192
self._read_state_from_cursor = read_state_from_cursor
9293
self._current_slice: Optional[StreamSlice] = None
9394
self._finished_sync = False
9495

9596
def next(self) -> Optional[Mapping[str, Any]]:
97+
"""
98+
The next() method returns the next slice of data should be synced for the current stream according to its cursor.
99+
This function support iterating over a stream's slices across two dimensions. The first dimension is the stream's
100+
partitions like parent records for a substream. The inner dimension is iterating over the cursor value like a
101+
date range for incremental streams or a pagination checkpoint for resumable full refresh.
102+
103+
basic algorithm for iterating through a stream's slices is:
104+
1. The first time next() is invoked we get the first partition and return it
105+
2. For streams whose cursor value is determined dynamically using stream state
106+
1. Get the current state for the current partition
107+
2. If the current partition's state is complete, get the next partition
108+
3. If the current partition's state is still in progress, emit the next cursor value
109+
3. If a stream has processed all partitions, the iterator will raise a StopIteration exception signaling there are no more
110+
slices left for extracting more records.
111+
"""
112+
96113
try:
97114
if self._current_slice is None:
98115
self._current_slice = self._get_next_slice()
99116
return self._current_slice
100117
if self._read_state_from_cursor:
101-
state_for_slice = self._cursor.select_state(self._current_slice.get("partition"))
118+
state_for_slice = self._cursor.select_state(self._current_slice)
102119
if state_for_slice == {"__ab_full_refresh_sync_complete": True}:
103120
self._current_slice = self._get_next_slice()
104121
else:

airbyte-cdk/python/airbyte_cdk/sources/streams/core.py

+2
Original file line numberDiff line numberDiff line change
@@ -412,6 +412,8 @@ def _get_checkpoint_reader(
412412
stream_slices=slices, cursor=cursor, read_state_from_cursor=checkpoint_mode == CheckpointMode.RESUMABLE_FULL_REFRESH
413413
)
414414
if checkpoint_mode == CheckpointMode.RESUMABLE_FULL_REFRESH:
415+
# Resumable full refresh readers rely on the stream state dynamically being updated during pagination and does
416+
# not iterate over a static set of slices.
415417
return ResumableFullRefreshCheckpointReader(stream_state=stream_state)
416418
else:
417419
slices = self.stream_slices(

airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_offset_increment.py

+4-1
Original file line numberDiff line numberDiff line change
@@ -73,5 +73,8 @@ def test_offset_increment_reset(reset_value, expected_initial_token, expected_er
7373
with pytest.raises(expected_error):
7474
paginator_strategy.reset(reset_value=reset_value)
7575
else:
76-
paginator_strategy.reset(reset_value=reset_value)
76+
if reset_value is None:
77+
paginator_strategy.reset()
78+
else:
79+
paginator_strategy.reset(reset_value=reset_value)
7780
assert paginator_strategy.initial_token == expected_initial_token

0 commit comments

Comments
 (0)