feat(source-google-sheets): migrate low code #50843

Open
wants to merge 34 commits into master
Commits (34)
c9faf8b
source-google-sheets: adding dynamic streams and dynamic schemas for …
aldogonzalez8 Dec 12, 2024
b4f8deb
source-google-sheets: add custom retriever with custom slicer for ranges
aldogonzalez8 Dec 14, 2024
86f4cc9
source-google-sheets: fix nested records in value field with custom e…
aldogonzalez8 Dec 16, 2024
777daa5
source-google-sheets:" -m " - Add transformations to record selector…
aldogonzalez8 Dec 18, 2024
9435c86
source-google-sheets: remove test file
aldogonzalez8 Dec 18, 2024
624d671
source-google-sheets:
aldogonzalez8 Dec 18, 2024
4d87723
source-google-sheets: initial changes for integration tests passing.
aldogonzalez8 Dec 19, 2024
5b61aed
source-google-sheets: all check operations (except service auth fl…
aldogonzalez8 Dec 21, 2024
e1f6a75
source-google-sheets: all discovery operations passing
aldogonzalez8 Dec 23, 2024
30b6673
source-google-sheets: all original read integration tests passing
aldogonzalez8 Dec 24, 2024
510e074
source-google-sheets: test for read with conversion ready and also ch…
aldogonzalez8 Dec 24, 2024
76def28
source-google-sheets: add support for empty cells
aldogonzalez8 Dec 24, 2024
f9f658b
source-google-sheets: add tests for empty columns
aldogonzalez8 Dec 24, 2024
4b6636b
source-google-sheets: complete all read protocol tests
aldogonzalez8 Dec 31, 2024
6f24fac
source-google-sheets: merge from master
aldogonzalez8 Jan 1, 2025
6c53e9e
source-google-sheets: reduce unnecessary response filters
aldogonzalez8 Jan 1, 2025
041280a
source-google-sheets: fix styles
aldogonzalez8 Jan 1, 2025
00edfb4
source-google-sheets: split tests in different files per type
aldogonzalez8 Jan 1, 2025
5dd67d0
source-google-sheets: ruff-format
aldogonzalez8 Jan 1, 2025
6021253
source-google-sheets: update release information
aldogonzalez8 Jan 2, 2025
5876dc9
chore: auto-fix lint and format issues
octavia-squidington-iii Jan 2, 2025
874c191
source-google-sheets: add tests for components and urls
aldogonzalez8 Jan 3, 2025
c0e9e6f
source-google-sheets: ruff fix format
aldogonzalez8 Jan 3, 2025
ba3620b
source-google-sheets: add tests for parsing spreadsheet_id in config …
aldogonzalez8 Jan 3, 2025
a35f07f
source-google-sheets: ruff-format
aldogonzalez8 Jan 3, 2025
7ed534a
source-google-sheets: add manager to increase batch size when 429…
aldogonzalez8 Jan 6, 2025
6ff3244
source-google-sheets: ruff-format
aldogonzalez8 Jan 6, 2025
e625be6
source-google-sheets: minor changes to batch size manager to handle t…
aldogonzalez8 Jan 6, 2025
6b487ea
source-google-sheets: ruff-format
aldogonzalez8 Jan 6, 2025
fc59b0d
source-google-sheets: add sheet id to range logging and remove old files
aldogonzalez8 Jan 6, 2025
1803ba8
source-google-sheets: ruff-format
aldogonzalez8 Jan 6, 2025
1dc50a5
source-google-sheets: Update lock file with latest CDK
aldogonzalez8 Jan 9, 2025
2b2f970
source-google-sheets: initial changes for jwt_requester
aldogonzalez8 Jan 10, 2025
0eef551
source-google-sheets: ruff format
aldogonzalez8 Jan 10, 2025
@@ -10,7 +10,7 @@ data:
connectorSubtype: file
connectorType: source
definitionId: 71607ba1-c0ac-4799-8049-7f4b90dd50f7
-dockerImageTag: 0.8.4
+dockerImageTag: 0.9.4
dockerRepository: airbyte/source-google-sheets
documentationUrl: https://docs.airbyte.com/integrations/sources/google-sheets
githubIssueLabel: source-google-sheets
1,850 changes: 1,210 additions & 640 deletions airbyte-integrations/connectors/source-google-sheets/poetry.lock

Large diffs are not rendered by default.

@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "0.8.4"
version = "0.9.4"
name = "source-google-sheets"
description = "Source implementation for Google Sheets."
authors = [ "Airbyte <[email protected]>",]
@@ -16,8 +16,8 @@ repository = "https://github.com/airbytehq/airbyte"
include = "source_google_sheets"

[tool.poetry.dependencies]
python = "^3.10"
airbyte-cdk = "^4"
python = ">=3.10,<3.13"
airbyte-cdk = "^6"
google-auth-httplib2 = "==0.2.0"
Unidecode = "==1.3.8"
google-api-python-client = "==2.114.0"
@@ -0,0 +1,35 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

import logging
from typing import Optional, Union

import requests
from requests.status_codes import codes as status_codes


logger = logging.getLogger("airbyte")


class BatchSizeManager:
_instance = None
RATE_LIMIT_INCREASE = 100

def __new__(cls, initial_batch_size=200):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance.row_batch_size = initial_batch_size
return cls._instance

def get_batch_size(self) -> int:
return self._instance.row_batch_size

    def increase_row_batch_size(self, response_or_exception: Optional[Union[requests.Response, Exception]]) -> None:
        # Only HTTP responses carry a status code; exceptions (or None) are ignored here.
        is_rate_limited = isinstance(response_or_exception, requests.Response) and response_or_exception.status_code == status_codes.TOO_MANY_REQUESTS
        if is_rate_limited and self._instance.row_batch_size < 1000:
            self._instance.row_batch_size += BatchSizeManager.RATE_LIMIT_INCREASE
            logger.info(f"Increasing the number of records fetched per request due to rate limits. Current value: {self._instance.row_batch_size}")

@classmethod
def reset(cls):
cls._instance = None
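
For reference, a minimal usage sketch of the singleton above (not part of the diff; it assumes the connector package is importable locally and builds a fake 429 response by hand for illustration):

import requests

from source_google_sheets.batch_size_manager import BatchSizeManager

manager = BatchSizeManager(initial_batch_size=200)
assert BatchSizeManager() is manager  # later constructions return the same shared instance

too_many_requests = requests.Response()  # hand-built response, illustration only
too_many_requests.status_code = 429

manager.increase_row_batch_size(too_many_requests)
print(manager.get_batch_size())  # 300: bumped by RATE_LIMIT_INCREASE, visible to every holder of the singleton

BatchSizeManager.reset()  # tests can clear the shared state between runs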

This file was deleted.

@@ -0,0 +1,6 @@
from source_google_sheets.components.extractors import DpathSchemaMatchingExtractor, DpathSchemaExtractor
from source_google_sheets.components.partition_routers import RangePartitionRouter
from source_google_sheets.components.error_handlers import SheetDataErrorHandler
from source_google_sheets.components.retrievers import SheetsDataRetriever

__all__ = ["DpathSchemaMatchingExtractor", "RangePartitionRouter", "DpathSchemaExtractor", "SheetDataErrorHandler", "SheetsDataRetriever"]
@@ -0,0 +1,21 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

from dataclasses import dataclass
from typing import Optional, Union

import requests

from airbyte_cdk.sources.declarative.requesters.error_handlers.default_error_handler import DefaultErrorHandler
from airbyte_cdk.sources.streams.http.error_handlers.response_models import (
ErrorResolution,
)
from source_google_sheets.batch_size_manager import BatchSizeManager


@dataclass
class SheetDataErrorHandler(DefaultErrorHandler):
def interpret_response(self, response_or_exception: Optional[Union[requests.Response, Exception]]) -> ErrorResolution:
BatchSizeManager().increase_row_batch_size(response_or_exception)
return super().interpret_response(response_or_exception)
@@ -0,0 +1,206 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Union

import dpath
import requests

from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder
from airbyte_cdk.sources.declarative.extractors.dpath_extractor import DpathExtractor
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.types import Config
from source_google_sheets.utils import name_conversion, safe_name_conversion


class RawSchemaParser:
config: Config

def _extract_data(
self,
body: Mapping[str, Any],
extraction_path: Optional[List[Union[InterpolatedString, str]]] = None,
default: Any = None,
) -> Any:
"""
Extracts data from the body based on the provided extraction path.
"""

if not extraction_path:
return body

path = [node.eval(self.config) if not isinstance(node, str) else node for node in extraction_path]

return dpath.get(body, path, default=default) # type: ignore # extracted

def _set_data(
self, value: Any, body: MutableMapping[str, Any], extraction_path: Optional[List[Union[InterpolatedString, str]]] = None
) -> Any:
"""
Sets data in the body based on the provided extraction path.
"""
        if not extraction_path:
            body = value
            return  # nothing else to do when no extraction path is provided

path = [node.eval(self.config) if not isinstance(node, str) else node for node in extraction_path]

dpath.set(body, path, value=value)

def parse_raw_schema_values(
self,
raw_schema_data: MutableMapping[Any, Any],
schema_pointer: List[Union[InterpolatedString, str]],
key_pointer: List[Union[InterpolatedString, str]],
names_conversion: bool,
):
"""
1. Parses sheet headers from the provided raw schema. This method assumes that data is contiguous
i.e: every cell contains a value and the first cell which does not contain a value denotes the end
of the headers.
2. Makes name conversion if required.
3. Removes duplicated fields from the schema.
Return a list of tuples with correct property index (by found in array), value and raw_schema
"""
raw_schema_properties = self._extract_data(raw_schema_data, schema_pointer)
duplicate_fields = set()
parsed_schema_values = []
seen_values = set()
for property_index, raw_schema_property in enumerate(raw_schema_properties):
raw_schema_property_value = self._extract_data(raw_schema_property, key_pointer)
if not raw_schema_property_value:
break
if names_conversion:
raw_schema_property_value = safe_name_conversion(raw_schema_property_value)

if raw_schema_property_value in seen_values:
duplicate_fields.add(raw_schema_property_value)
seen_values.add(raw_schema_property_value)
parsed_schema_values.append((property_index, raw_schema_property_value, raw_schema_property))

if duplicate_fields:
parsed_schema_values = [
parsed_schema_value for parsed_schema_value in parsed_schema_values if parsed_schema_value[1] not in duplicate_fields
]

return parsed_schema_values

def parse(self, schema_type_identifier, records: Iterable[MutableMapping[Any, Any]]):
"""Removes duplicated fields and makes names conversion"""
names_conversion = self.config.get("names_conversion", False)
schema_pointer = schema_type_identifier.get("schema_pointer")
key_pointer = schema_type_identifier["key_pointer"]
parsed_properties = []
for raw_schema_data in records:
for _, parsed_value, raw_schema_property in self.parse_raw_schema_values(
raw_schema_data, schema_pointer, key_pointer, names_conversion
):
self._set_data(parsed_value, raw_schema_property, key_pointer)
parsed_properties.append(raw_schema_property)
self._set_data(parsed_properties, raw_schema_data, schema_pointer)
yield raw_schema_data


@dataclass
class DpathSchemaMatchingExtractor(DpathExtractor, RawSchemaParser):
"""
Current DpathExtractor has problems for this type of data in response:
[
{
"values": [
[
"name1",
"22"
],
[
"name2",
"24"
],
[
"name3",
"25"
]
]
}
]

This is because "values" field is a list of lists instead of objects that we could extract with "*".
In order to do so we need the ordered properties from the schema that we can match with each list of values.
Then, if we get a properties object like {0: 'name', 1: 'age'} we end up with:

{"type":"RECORD","record":{"stream":"a_stream_name","data":{"name":"name1","age":"22"},"emitted_at":1734371904128}}
{"type":"RECORD","record":{"stream":"a_stream_name","data":{"name":"name2","age":"24"},"emitted_at":1734371904134}}
{"type":"RECORD","record":{"stream":"a_stream_name","data":{"name":"name3","age":"25"},"emitted_at":1734371904134}}
"""

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
super().__post_init__(parameters)
self.decoder = JsonDecoder(parameters={})
self._values_to_match_key = parameters["values_to_match_key"]
schema_type_identifier = parameters["schema_type_identifier"]
names_conversion = self.config.get("names_conversion", False)
self._indexed_properties_to_match = self.extract_properties_to_match(
parameters["properties_to_match"], schema_type_identifier, names_conversion=names_conversion
)

def extract_properties_to_match(self, properties_to_match, schema_type_identifier, names_conversion):
schema_pointer = schema_type_identifier.get("schema_pointer")
key_pointer = schema_type_identifier["key_pointer"]
indexed_properties = {}
for property_index, property_parsed_value, _ in self.parse_raw_schema_values(
properties_to_match, schema_pointer, key_pointer, names_conversion
):
indexed_properties[property_index] = property_parsed_value
return indexed_properties

@staticmethod
def match_properties_with_values(unmatched_values: List[str], indexed_properties: Dict[int, str]):
data = {}
for relevant_index in sorted(indexed_properties.keys()):
if relevant_index >= len(unmatched_values):
break

unmatch_value = unmatched_values[relevant_index]
if unmatch_value.strip() != "":
data[indexed_properties[relevant_index]] = unmatch_value
yield data

@staticmethod
def is_row_empty(cell_values: List[str]) -> bool:
for cell in cell_values:
if cell.strip() != "":
return False
return True

@staticmethod
def row_contains_relevant_data(cell_values: List[str], relevant_indices: Iterable[int]) -> bool:
for idx in relevant_indices:
if len(cell_values) > idx and cell_values[idx].strip() != "":
return True
return False

def extract_records(self, response: requests.Response) -> Iterable[MutableMapping[Any, Any]]:
raw_records_extracted = super().extract_records(response=response)
for raw_record in raw_records_extracted:
unmatched_values_collection = raw_record[self._values_to_match_key]
for unmatched_values in unmatched_values_collection:
if not DpathSchemaMatchingExtractor.is_row_empty(
unmatched_values
) and DpathSchemaMatchingExtractor.row_contains_relevant_data(unmatched_values, self._indexed_properties_to_match.keys()):
yield from DpathSchemaMatchingExtractor.match_properties_with_values(
unmatched_values, self._indexed_properties_to_match
)


class DpathSchemaExtractor(DpathExtractor, RawSchemaParser):
"""
Makes names conversion and parses sheet headers from the provided row.
"""

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
super().__post_init__(parameters)
self.schema_type_identifier = parameters["schema_type_identifier"]

def extract_records(self, response: requests.Response) -> Iterable[MutableMapping[Any, Any]]:
extracted_records = super().extract_records(response=response)
yield from self.parse(schema_type_identifier=self.schema_type_identifier, records=extracted_records)
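
A small sketch (not part of the diff) of the header/value matching described in the DpathSchemaMatchingExtractor docstring, calling the static helper directly; it assumes the connector package is importable and mirrors the docstring's example rows:

from source_google_sheets.components import DpathSchemaMatchingExtractor

indexed_properties = {0: "name", 1: "age"}  # header row parsed into {column index: property name}
rows = [["name1", "22"], ["name2", "24"], ["name3", "25"]]  # the "values" lists from the API response

for row in rows:
    for record in DpathSchemaMatchingExtractor.match_properties_with_values(row, indexed_properties):
        print(record)
# {'name': 'name1', 'age': '22'}
# {'name': 'name2', 'age': '24'}
# {'name': 'name3', 'age': '25'}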
@@ -0,0 +1,35 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#
import logging
from typing import Any, Iterable, Mapping

from airbyte_cdk.sources.declarative.partition_routers.single_partition_router import SinglePartitionRouter
from airbyte_cdk.sources.types import StreamSlice
from source_google_sheets.batch_size_manager import BatchSizeManager


logger = logging.getLogger("airbyte")


class RangePartitionRouter(SinglePartitionRouter):
"""
Create ranges to request rows data to google sheets api.
"""

parameters: Mapping[str, Any]

def __init__(self, parameters: Mapping[str, Any]) -> None:
super().__init__(parameters)
self.parameters = parameters
self.sheet_row_count = parameters.get("row_count", 0)
self._sheet_id = parameters.get("sheet_id")
self.batch_size_manager = BatchSizeManager(parameters.get("batch_size"))

def stream_slices(self) -> Iterable[StreamSlice]:
start_range = 2
while start_range <= self.sheet_row_count:
end_range = start_range + self.batch_size_manager.get_batch_size()
logger.info(f"Fetching range {self._sheet_id}!{start_range}:{end_range}")
yield StreamSlice(partition={"start_range": start_range, "end_range": end_range}, cursor_slice={})
start_range = end_range + 1
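
A minimal sketch (not part of the diff) of the row ranges this router yields for a 500-row sheet with the default batch size; the sheet id is made up, and StreamSlice.partition is assumed to expose the partition mapping:

from source_google_sheets.batch_size_manager import BatchSizeManager
from source_google_sheets.components import RangePartitionRouter

BatchSizeManager.reset()  # start the shared batch size from the value passed below
router = RangePartitionRouter(parameters={"row_count": 500, "sheet_id": "Sheet1", "batch_size": 200})

for stream_slice in router.stream_slices():
    print(stream_slice.partition)
# {'start_range': 2, 'end_range': 202}
# {'start_range': 203, 'end_range': 403}
# {'start_range': 404, 'end_range': 604}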