This repository contains an ELT pipeline that runs Airflow in Docker containers, with S3 and BigQuery as storage layers:
- Requests the following endpoints to download weather forecast information for Mexico by municipality: https://smn.conagua.gob.mx/tools/GUI/webservices/?method=1 (per day) and https://smn.conagua.gob.mx/tools/GUI/webservices/?method=3 (per hour). A sketch of this step is shown after this list.
- Uploads the data to an S3 bucket.
- Loads the raw data into BigQuery and computes the following aggregates:
  - Generate a sample query.
  - Generate a table with the average temperature and precipitation by municipality over the last two hours. (WIP)
  - Generate a table joining the first generated table with the latest pre-computed data (data_municipios). (WIP)
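Conceptually, the extract step boils down to something like the sketch below. This is only illustrative: the bucket name and key layout are placeholders, it assumes the endpoint returns a gzip-compressed JSON file (as described above), and the real pipeline goes through the extract submodule and the boto3 wrapper classes described further down.

```python
import gzip
import json
from datetime import datetime, timezone

import boto3
import requests

# method=1 -> daily forecast by municipality, method=3 -> hourly forecast
SMN_URL = "https://smn.conagua.gob.mx/tools/GUI/webservices/?method=1"
RAW_BUCKET = "my-raw-weather-bucket"  # placeholder; the real name comes from .env


def extract_to_s3(url: str = SMN_URL, bucket: str = RAW_BUCKET) -> str:
    """Download the gzip-compressed JSON payload and store it untouched in S3."""
    response = requests.get(url, timeout=120)
    response.raise_for_status()

    # Keep the raw gzip bytes as-is for the S3 raw layer.
    key = f"raw/smn/{datetime.now(timezone.utc):%Y%m%dT%H%M%S}.json.gz"
    boto3.client("s3").put_object(Bucket=bucket, Key=key, Body=response.content)

    # Decompress only to sanity-check that the payload is valid JSON.
    records = json.loads(gzip.decompress(response.content))
    print(f"Uploaded {len(records)} records to s3://{bucket}/{key}")
    return key
```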
If you're working on Windows, I encourage you to use WSL to clone the repository if you don't want problems with the file directory structure, since Windows uses `\` instead of `/` as the path separator.
- Clone the repository and check out the tag v1.2.0 (Refactor Hourly Pipeline), which has everything you need to start using it.
- Install Docker Compose on your machine.
- Open a terminal in the root folder of the repository.
- Add the bq_sa.json file. This is the Service Account key for GCP.
- Add a .env file using example.env as a template, filling in your own AWS and GCP credentials.
- Execute:
  - `sudo docker-compose build`
  - `sudo docker-compose up -d`
- Open http://localhost:8080/home in your web browser.
- Log in to the Airflow web UI (user: airflow, password: airflow).
- Everything is set up. A quick health check is sketched below.
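Once the containers are up, one way to confirm the stack is healthy is to hit the webserver's /health endpoint (a standard Airflow endpoint; depending on your Airflow version and configuration it may require authentication):

```python
import requests

# The Airflow webserver exposes /health next to the UI; both the metadatabase
# and the scheduler should report "healthy" once everything is running.
print(requests.get("http://localhost:8080/health", timeout=10).json())
```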
I used Pandas as the data manipulation layer because it offers a complete solution for the required tasks, Airflow as the pipeline orchestrator, AWS S3 for raw data storage (gzipped JSON), and Google BigQuery for structured data. I also built a custom Airflow operator to move the data from AWS S3 to BigQuery.
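The operator is conceptually along the lines of the sketch below. It is not the actual implementation: the class name, arguments, and the use of schema autodetection are simplifications, and it assumes the Amazon provider's S3Hook plus the google-cloud-bigquery client.

```python
import gzip
import json

from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from google.cloud import bigquery


class S3ToBigQueryOperator(BaseOperator):
    """Move a gzipped JSON object from S3 into a BigQuery table (illustrative sketch)."""

    def __init__(self, *, s3_bucket, s3_key, bq_table, aws_conn_id="aws_default", **kwargs):
        super().__init__(**kwargs)
        self.s3_bucket = s3_bucket
        self.s3_key = s3_key
        self.bq_table = bq_table          # e.g. "project.dataset.raw_forecast" (placeholder)
        self.aws_conn_id = aws_conn_id

    def execute(self, context):
        # Read the raw gzipped JSON payload from S3.
        s3 = S3Hook(aws_conn_id=self.aws_conn_id)
        obj = s3.get_key(self.s3_key, bucket_name=self.s3_bucket)
        rows = json.loads(gzip.decompress(obj.get()["Body"].read()))

        # Load the rows into BigQuery (credentials typically come from bq_sa.json
        # via GOOGLE_APPLICATION_CREDENTIALS). Autodetect is used here for brevity;
        # the real operator validates the schema and data types first.
        client = bigquery.Client()
        job = client.load_table_from_json(
            rows,
            self.bq_table,
            job_config=bigquery.LoadJobConfig(autodetect=True),
        )
        job.result()
        self.log.info("Loaded %d rows into %s", len(rows), self.bq_table)
```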
The directory layout follows a standard ETL pipeline structure:
- airflow/ includes:
  - common/: Directory containing all the common modules
    - custom_operators/: Directory containing all the Airflow custom operators
    - aws/: AWS custom classes for the use case (a wrapper around boto3)
    - gcp/: Google Cloud custom classes for the use case (a wrapper around the GCP client library)
    - utils.py: Common helper functions
  - dags/: Directory containing the DAGs for both pipelines
    - daily_pipeline.py: DAG for the daily pipeline
    - hourly_pipeline.py: DAG for the hourly pipeline
  - data/ includes:
    - data_municipios/: Static data about municipios (also stored in BigQuery, but placed here for other users)
  - Airflow config files: airflow.cfg, airflow.db, webserver_config.py
  - queries.toml: File containing the queries to be run in BigQuery (see the loading sketch after this list)
- example.env: Example env file (**you need to replace some env variables, such as the S3 and BigQuery configuration**)
- requirements.txt: Python requirements file
- docker-compose.yml: Docker Compose file
- Dockerfile: For building the custom Docker image
- example_data/: Contains some sample executions
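To give an idea of how queries.toml is meant to be used, the sketch below loads a query by name and runs it in BigQuery. The key name and file path are hypothetical; the real keys are defined in the repository's queries.toml.

```python
import tomllib  # Python 3.11+; older versions can use the third-party `toml` package

from google.cloud import bigquery

with open("airflow/queries.toml", "rb") as f:
    queries = tomllib.load(f)

# "sample_query" is a hypothetical key; use the names defined in queries.toml.
sql = queries["sample_query"]
for row in bigquery.Client().query(sql).result():
    print(dict(row))
```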
I did the following to complete the solution:
- Extract phase: Request the endpoint with the extract submodule, which downloads the raw compressed file and stores it in an S3 bucket in raw format (gzipped JSON).
- Load phase: Move the data from S3 to BigQuery using custom operators, pushing the raw table into BigQuery after schema and data type validations.
- Transform phase: Write aggregated tables into BigQuery from the existing raw table, performing all the computation inside BigQuery (a sketch of this follows below).
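As an example of the transform phase, the two-hour aggregate described earlier reduces to a single statement that BigQuery executes itself. Every project, table, and column name below is a placeholder; the real schema is whatever the raw hourly table was loaded with.

```python
from google.cloud import bigquery

# Placeholder project, dataset, table, and column names for illustration only.
SQL = """
CREATE OR REPLACE TABLE `my-project.weather.avg_last_two_hours` AS
SELECT
  municipality,
  AVG(temperature) AS avg_temperature,
  AVG(precipitation) AS avg_precipitation
FROM `my-project.weather.raw_hourly_forecast`
WHERE forecast_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 2 HOUR)
GROUP BY municipality
"""

bigquery.Client().query(SQL).result()  # all computation happens inside BigQuery
```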
- It uses widely adopted tools like Docker and Airflow, so the pipeline can be executed and orchestrated virtually anywhere.
- Modularized, atomic, and well-documented code.
- Use a cloud-managed Airflow service.
- Include a CI/CD/QA/DQ flow.