This is a Singer tap that reads data from files located inside a given S3 bucket and produces JSON-formatted data following the Singer spec.
This is a PipelineWise compatible tap connector.
The recommended method of running this tap is to use it from PipelineWise. When running it from PipelineWise you don't need to configure this tap with JSON files and most things are automated. Please check the related documentation at Tap S3 CSV.
If you want to run this Singer Tap independently please read further.
First, make sure Python 3 is installed on your system or follow these installation instructions for Mac or Ubuntu.
It's recommended to use a virtualenv:
```bash
python3 -m venv venv
. venv/bin/activate
pip install pipelinewise-tap-s3-csv
```

or

```bash
python3 -m venv venv
. venv/bin/activate
pip install --upgrade pip
pip install .
```
Here is an example of a basic config that uses the default profile-based authentication:
```json
{
  "start_date": "2000-01-01T00:00:00Z",
  "bucket": "tradesignals-crawler",
  "warning_if_no_files": false,
  "table_suffix": "_extract1",
  "s3_proxies": {"http": "http://mickeymouse.com:3128", "https": "http://mickeymouse.com:3128"},
  "set_empty_values_null": true,
  "tables": [{
    "search_prefix": "feeds",
    "search_pattern": ".csv",
    "table_name": "my_table",
    "key_properties": ["id"],
    "delimiter": ",",
    "string_overrides": ["field_1", "field_2"],
    "remove_character": "\""
  }]
}
```
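With a config like the one above saved as `config.json`, the tap can be invoked like any other Singer tap. The snippet below is a minimal sketch that assumes the console script is named `tap-s3-csv` and that it follows the usual Singer `--config` / `--discover` / `--catalog` conventions (some Singer taps use `--properties` instead); the target shown is only an example, so check `tap-s3-csv --help` for the exact options.

```bash
# Discover the available streams and save the catalog
tap-s3-csv --config config.json --discover > catalog.json

# Sync records, piping them to any Singer target (target-snowflake is just an example)
tap-s3-csv --config config.json --catalog catalog.json | target-snowflake --config target_config.json
```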
Profile-based authentication is used by default, using the default profile. To use another profile, set the `aws_profile` parameter in `config.json` or set the `AWS_PROFILE` environment variable.

For non-profile based authentication, set `aws_access_key_id`, `aws_secret_access_key` and optionally the `aws_session_token` parameter in `config.json`. Alternatively, you can define them outside of `config.json` by setting the `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY` and `AWS_SESSION_TOKEN` environment variables.
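For illustration, a non-profile based authentication setup could look like the sketch below; the key, secret and token values are placeholders, and the remaining keys mirror the basic example above.

```json
{
  "aws_access_key_id": "AKIAXXXXXXXXXXXXXXXX",
  "aws_secret_access_key": "<your-secret-access-key>",
  "aws_session_token": "<optional-session-token>",
  "start_date": "2000-01-01T00:00:00Z",
  "bucket": "tradesignals-crawler",
  "tables": [{
    "search_prefix": "feeds",
    "search_pattern": ".csv",
    "table_name": "my_table",
    "key_properties": ["id"],
    "delimiter": ","
  }]
}
```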
A bit of a rundown on each of the properties:
- aws_profile: AWS profile name for profile-based authentication. If not provided, the `AWS_PROFILE` environment variable will be used.
- aws_access_key_id: AWS access key ID for non-profile based authentication. If not provided, the `AWS_ACCESS_KEY_ID` environment variable will be used.
- aws_secret_access_key: AWS secret access key for non-profile based authentication. If not provided, the `AWS_SECRET_ACCESS_KEY` environment variable will be used.
- aws_session_token: AWS session token for non-profile based authentication. If not provided, the `AWS_SESSION_TOKEN` environment variable will be used.
- aws_endpoint_url: (Optional): The complete URL to use for the constructed client. Normally, botocore will automatically construct the appropriate URL to use when communicating with a service. You can specify a complete URL (including the "http/https" scheme) to override this behavior, for example `https://nyc3.digitaloceanspaces.com`.
- start_date: This is the datetime that the tap will use to look for newly updated or created files, based on the modified timestamp of the file.
- bucket: The name of the bucket to search for files under.
- warning_if_no_files: (Optional): When set to `true`, the tap will log a warning rather than raise an error if no files are found for the search criteria.
- table_suffix: (Optional): If set, appends a suffix to each of the table names to provide some uniqueness, e.g. a date or supplier identifier.
- s3_proxies: (Optional): A dict of proxy settings for use of a proxy server. Set to `{}` to avoid using a proxy server for S3 traffic.
- set_empty_values_null: (Optional): When set to `true`, the tap will emit `null` (the JSON equivalent of None) instead of an empty string.
- tables: JSON array of objects that the tap will use to search for files, and emit records as "tables" from those files.

The `tables` field consists of one or more objects that describe how to find files and emit records. A more detailed (and unescaped) example below:
```json
[
  {
    "search_prefix": "exports",
    "search_pattern": "my_table\\/.*\\.csv",
    "table_name": "my_table",
    "key_properties": ["id"],
    "date_overrides": ["created_at"],
    "delimiter": ",",
    "string_overrides": ["field_1", "field_2"],
    "remove_character": "\""
  },
  ...
]
```
- search_prefix: This is a prefix to apply after the bucket, but before the file search pattern, to allow you to find files in "directories" below the bucket.
- search_pattern: This is an escaped regular expression that the tap will use to find files in the bucket + prefix. It's a bit strange, since this is an escaped string inside of an escaped string, any backslashes in the RegEx will need to be double-escaped.
- table_name: This value is a string of your choosing, and will be used to name the stream that records are emitted under for files matching content.
- key_properties: These are the "primary keys" of the CSV files, to be used by the target for deduplication and primary key definitions downstream in the destination.
- date_overrides: Specifies field names in the files that are supposed to be parsed as a datetime. The tap doesn't attempt to automatically determine if a field is a datetime, so this will make it explicit in the discovered schema.
- datatype_overrides: An object / dictionary of header names in the file and any override datatype other than string you wish to set as the datatype. Example config: `"datatype_overrides": {"administration_number": "integer", "percentage": "number", "grade": "integer"}`.
- delimiter: This allows you to specify a custom delimiter, such as `\t` or `|`, if that applies to your files.
- string_overrides: Deprecated. Specifies field names in the files that should be parsed as a string regardless of what was discovered.
- guess_types: Deprecated. (default `True`) By default, column data types will be determined via scanning the first file in a table_spec. Set this to `False` to disable this and set all columns to `string`.
- remove_character: Specifies a character which will be removed from each line in the file, e.g. `"\""` will remove all double-quotes.
- encoding: The encoding to use to read these files, from codecs -> Standard Encodings.
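Putting several of these per-table options together, a single table specification could look like the sketch below. The file layout, field names and datatypes are assumptions made up for this example; adjust them to your own files.

```json
{
  "search_prefix": "exports",
  "search_pattern": "grades\\/.*\\.tsv",
  "table_name": "grades",
  "key_properties": ["administration_number"],
  "date_overrides": ["created_at"],
  "datatype_overrides": {"administration_number": "integer", "percentage": "number", "grade": "integer"},
  "delimiter": "\t",
  "remove_character": "\"",
  "encoding": "utf-8"
}
```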
A sample configuration is available inside `config.sample.json`.
- Create tests within the `tests/` directory and then run:

```bash
make venv
```

Followed by a run of unit tests:

```bash
make unit_tests
```
- Run through the full suite of tests and linters by running:

```bash
poetry run tox

# Or to run for the current python instance
poetry run tox -e py
```

These must pass in order for PRs to be merged.
- To run integration tests:

Integration tests require a valid S3 bucket, and credentials should be passed as environment variables; this project uses a Minio server.

First, start a Minio server docker container:

```bash
mkdir -p ./minio/data/awesome_bucket
UID=$(id -u) GID=$(id -g) docker-compose up -d
```

Run integration tests:

```bash
make integration_tests
```

or

```bash
poetry run tox
```
- Install python dependencies and run the python linter:

```bash
make venv pylint
```