Lesson level testing #47

Draft
wants to merge 10 commits into
base: main
28 changes: 28 additions & 0 deletions .github/workflows/ruff.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
name: Dagster University Ruff

on:
  push:
    branches:
      - "main"
    paths:
      - dagster_university/**
      - dagster_university_tests/**
  pull_request:
    branches:
      - "main"
    paths:
      - dagster_university/**
      - dagster_university_tests/**

jobs:
  build:
    name: ruff
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v4

      - name: ruff
        uses: astral-sh/ruff-action@v3
        with:
          args: "format --check"
50 changes: 50 additions & 0 deletions .github/workflows/test.yml
@@ -0,0 +1,50 @@
name: Dagster University Lesson Tests

on:
  schedule:
    - cron: "0 0 * * 0"

  push:
    branches:
      - "main"
    paths:
      - dagster_university/**
      - dagster_university_tests/**
      - setup.py

  pull_request:
    branches:
      - "main"
    paths:
      - dagster_university/**
      - dagster_university_tests/**
      - setup.py

jobs:
  build:
    name: tests
    runs-on: ubuntu-latest
    strategy:
      matrix:
        python-version:
          # TODO: Re-enable when testing done
          # - "3.9"
          # - "3.10"
          # - "3.11"
          - "3.12"

    steps:
      - uses: actions/checkout@v4

      - name: Install uv and set the python version
        uses: astral-sh/setup-uv@v5
        with:
          enable-cache: true
          cache-dependency-glob: "setup.py"
          python-version: ${{ matrix.python-version }}

      - name: Install the project
        run: uv pip install -r setup.py --extra dev

      - name: Run tests
        run: pytest dagster_university_tests -v
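The `schedule` trigger above uses the cron expression `0 0 * * 0`. In standard five-field cron this reads as minute 0, hour 0, any day of the month, any month, day-of-week 0 (Sunday), and GitHub Actions evaluates schedules in UTC. The sketch below is only an illustration of that reading: `next_weekly_run` is a hypothetical helper, not something in this PR, and it does not model any delay GitHub may add before a scheduled run starts.

```python
from datetime import datetime, timedelta, timezone


def next_weekly_run(now: datetime) -> datetime:
    """Return the first Sunday 00:00 UTC strictly after `now`.

    Hypothetical helper: `now` must be timezone-aware.
    """
    now_utc = now.astimezone(timezone.utc)
    # datetime.weekday(): Monday is 0 and Sunday is 6.
    days_until_sunday = (6 - now_utc.weekday()) % 7
    candidate = (now_utc + timedelta(days=days_until_sunday)).replace(
        hour=0, minute=0, second=0, microsecond=0
    )
    # If `now` is already on or past this Sunday's midnight, move to next week.
    if candidate <= now_utc:
        candidate += timedelta(days=7)
    return candidate


if __name__ == "__main__":
    wednesday_noon = datetime(2023, 3, 1, 12, 0, tzinfo=timezone.utc)
    print(next_weekly_run(wednesday_noon).isoformat())
```

Run on its own, this prints `2023-03-05T00:00:00+00:00`, the Sunday following Wednesday 2023-03-01.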
5 changes: 3 additions & 2 deletions .gitignore
@@ -173,10 +173,11 @@ tmp*/

*.duckdb
*.geojson
*.csv
*.parquet

data/outputs/*
data/raw/*
data/requests/*.json
data/staging/*
data/staging/*

.ruff_cache
13 changes: 12 additions & 1 deletion README.md
@@ -13,7 +13,7 @@ This is a [Dagster](https://dagster.io/) project made to accompany Dagster Unive
First, install your Dagster code location as a Python package. By using the --editable flag, pip will install your Python package in ["editable mode"](https://pip.pypa.io/en/latest/topics/local-project-installs/#editable-installs) so that as you develop, local code changes will automatically apply.

```bash
pip install -e ".[dev]"
uv pip install -e ".[dev]"
```

Duplicate the `.env.example` file and rename it to `.env`. Then, fill in the values for the environment variables in the file.
@@ -26,6 +26,11 @@ dagster dev

Open http://localhost:3000 with your browser to see the project.

> [!NOTE]
> Running `dagster dev` loads the finished Dagster University project (its state at the end of lesson 9). To load the project as it stands at the end of another lesson, run:
> `dagster dev -m dagster_university.lesson_{LESSON NUMBER}.definitions`
>
> Lessons 3 and 4 do not use a `Definitions` object in the course content.
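
As an illustration of the module naming pattern in the note above, the sketch below builds the `dagster dev -m` command for a given lesson number. The helper names `lesson_module` and `dagster_dev_command` are hypothetical and not part of the project; only the `dagster_university.lesson_{N}.definitions` pattern comes from the README. The sketch does not check whether a given lesson package actually exists.

```python
import shlex


def lesson_module(lesson_number: int) -> str:
    """Return the dotted path of a lesson's definitions module (hypothetical helper)."""
    if lesson_number < 1:
        raise ValueError("lesson_number must be a positive integer")
    return f"dagster_university.lesson_{lesson_number}.definitions"


def dagster_dev_command(lesson_number: int) -> list[str]:
    """Build the argument list for `dagster dev -m <module>`."""
    return ["dagster", "dev", "-m", lesson_module(lesson_number)]


if __name__ == "__main__":
    # Print the command for lesson 4 so it can be pasted into a shell.
    print(shlex.join(dagster_dev_command(4)))
```

For lesson 4 this prints `dagster dev -m dagster_university.lesson_4.definitions`.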

## Development

@@ -42,6 +47,12 @@ Tests are in the `dagster_university_tests` directory and you can run tests usin
pytest dagster_university_tests
```

### Formatting / Linting

```bash
ruff check --select I --fix .
```

### Schedules and sensors

If you want to enable Dagster [Schedules](https://docs.dagster.io/concepts/partitions-schedules-sensors/schedules) or [Sensors](https://docs.dagster.io/concepts/partitions-schedules-sensors/sensors) for your jobs, the [Dagster Daemon](https://docs.dagster.io/deployment/dagster-daemon) process must be running. This is done automatically when you run `dagster dev`.
31 changes: 0 additions & 31 deletions dagster_university/__init__.py
@@ -1,31 +0,0 @@
from dagster import Definitions, load_assets_from_modules

from .assets import trips, metrics, requests
from .resources import database_resource
from .jobs import trip_update_job, weekly_update_job, adhoc_request_job
from .schedules import trip_update_schedule, weekly_update_schedule
from .sensors import adhoc_request_sensor

trip_assets = load_assets_from_modules([trips])
metric_assets = load_assets_from_modules(
    modules=[metrics],
    group_name="metrics",
)
requests_assets = load_assets_from_modules(
    modules=[requests],
    group_name="requests",
)

all_jobs = [trip_update_job, weekly_update_job, adhoc_request_job]
all_schedules = [trip_update_schedule, weekly_update_schedule]
all_sensors = [adhoc_request_sensor]

defs = Definitions(
    assets=trip_assets + metric_assets + requests_assets,
    resources={
        "database": database_resource,
    },
    jobs=all_jobs,
    schedules=all_schedules,
    sensors=all_sensors,
)
Empty file.
4 changes: 4 additions & 0 deletions dagster_university/lesson_3/assets/constants.py
@@ -0,0 +1,4 @@
import os

TAXI_ZONES_FILE_PATH = os.path.join("data", "raw", "taxi_zones.csv")
TAXI_TRIPS_TEMPLATE_FILE_PATH = os.path.join("data", "raw", "taxi_trips_{}.parquet")
34 changes: 34 additions & 0 deletions dagster_university/lesson_3/assets/trips.py
@@ -0,0 +1,34 @@
import requests
from dagster import asset

from . import constants


@asset
def taxi_trips_file() -> None:
    """
    The raw parquet files for the taxi trips dataset. Sourced from the NYC Open Data portal.
    """
    month_to_fetch = "2023-03"
    raw_trips = requests.get(
        f"https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_{month_to_fetch}.parquet"
    )

    with open(
        constants.TAXI_TRIPS_TEMPLATE_FILE_PATH.format(month_to_fetch), "wb"
    ) as output_file:
        output_file.write(raw_trips.content)


@asset
def taxi_zones_file() -> None:
    """
    The raw CSV file for the taxi zones dataset. Sourced from the NYC Open Data portal.
    """
    # TODO: Fix when self host CSV
    # raw_taxi_zones = requests.get(
    #     "https://data.cityofnewyork.us/api/views/755u-8jsi/rows.csv?accessType=DOWNLOAD"
    # )

    # with open(constants.TAXI_ZONES_FILE_PATH, "wb") as output_file:
    #     output_file.write(raw_taxi_zones.content)
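The `taxi_trips_file` asset fills the same `YYYY-MM` string into both the download URL and the output path template. The sketch below shows that substitution in isolation, without any network access. The helper names (`normalize_month`, `trip_source_url`, `trip_output_path`) and the constant `TRIP_DATA_URL_TEMPLATE` are hypothetical; the two template strings are copied from the diff, and the month normalization is an addition that the asset itself does not perform.

```python
import os
from datetime import datetime

# Path template copied from lesson_3/assets/constants.py in the diff.
TAXI_TRIPS_TEMPLATE_FILE_PATH = os.path.join("data", "raw", "taxi_trips_{}.parquet")
# Hypothetical constant: in the asset, this URL is written inline as an f-string.
TRIP_DATA_URL_TEMPLATE = (
    "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_{}.parquet"
)


def normalize_month(month: str) -> str:
    """Parse a year-month string and return it zero-padded as YYYY-MM.

    Raises ValueError for input that is not a year-month pair.
    """
    return datetime.strptime(month, "%Y-%m").strftime("%Y-%m")


def trip_source_url(month: str) -> str:
    """URL of the parquet file for the given month."""
    return TRIP_DATA_URL_TEMPLATE.format(normalize_month(month))


def trip_output_path(month: str) -> str:
    """Local path the asset would write the month's file to."""
    return TAXI_TRIPS_TEMPLATE_FILE_PATH.format(normalize_month(month))


if __name__ == "__main__":
    print(trip_source_url("2023-03"))
    print(trip_output_path("2023-03"))
```

Keeping the URL and path derivation in small pure functions like these is one way to unit test the naming logic separately from the download, if that turns out to be useful for lesson-level tests.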
9 changes: 9 additions & 0 deletions dagster_university/lesson_3/definitions.py
@@ -0,0 +1,9 @@
from dagster import Definitions, load_assets_from_modules

from .assets import trips

trip_assets = load_assets_from_modules([trips])

defs = Definitions(
    assets=trip_assets,
)
Empty file.
Empty file.
13 changes: 13 additions & 0 deletions dagster_university/lesson_4/assets/constants.py
@@ -0,0 +1,13 @@
import os

# TODO: Fix when self host CSV
# TAXI_ZONES_FILE_PATH = os.path.join("data", "raw", "taxi_zones.csv")
TAXI_ZONES_FILE_PATH = os.path.join("data", "source", "taxi_zones.csv")
TAXI_TRIPS_TEMPLATE_FILE_PATH = os.path.join("data", "raw", "taxi_trips_{}.parquet")

TRIPS_BY_AIRPORT_FILE_PATH = os.path.join("data", "outputs", "trips_by_airport.csv")
TRIPS_BY_WEEK_FILE_PATH = os.path.join("data", "outputs", "trips_by_week.csv")
MANHATTAN_STATS_FILE_PATH = os.path.join("data", "staging", "manhattan_stats.geojson")
MANHATTAN_MAP_FILE_PATH = os.path.join("data", "outputs", "manhattan_map.png")

DATE_FORMAT = "%Y-%m-%d"
118 changes: 118 additions & 0 deletions dagster_university/lesson_4/assets/metrics.py
@@ -0,0 +1,118 @@
import os
from datetime import datetime, timedelta

import duckdb
import geopandas as gpd
import pandas as pd
import plotly.express as px
import plotly.io as pio
from dagster import asset
from dagster._utils.backoff import backoff

from . import constants


@asset(deps=["taxi_trips"])
def trips_by_week() -> None:
    conn = backoff(
        fn=duckdb.connect,
        retry_on=(RuntimeError, duckdb.IOException),
        kwargs={
            "database": os.getenv("DUCKDB_DATABASE"),
        },
        max_retries=10,
    )

    current_date = datetime.strptime("2023-03-01", constants.DATE_FORMAT)
    end_date = datetime.strptime("2023-04-01", constants.DATE_FORMAT)

    result = pd.DataFrame()

    while current_date < end_date:
        current_date_str = current_date.strftime(constants.DATE_FORMAT)
        query = f"""
            select
                vendor_id, total_amount, trip_distance, passenger_count
            from trips
            where date_trunc('week', pickup_datetime) = date_trunc('week', '{current_date_str}'::date)
        """

        data_for_week = conn.execute(query).fetch_df()

        aggregate = (
            data_for_week.agg(
                {
                    "vendor_id": "count",
                    "total_amount": "sum",
                    "trip_distance": "sum",
                    "passenger_count": "sum",
                }
            )
            .rename({"vendor_id": "num_trips"})
            .to_frame()
            .T
        )  # type: ignore

        aggregate["period"] = current_date

        result = pd.concat([result, aggregate])

        current_date += timedelta(days=7)

    # clean up the formatting of the dataframe
    result["num_trips"] = result["num_trips"].astype(int)
    result["passenger_count"] = result["passenger_count"].astype(int)
    result["total_amount"] = result["total_amount"].round(2).astype(float)
    result["trip_distance"] = result["trip_distance"].round(2).astype(float)
    result = result[
        ["period", "num_trips", "total_amount", "trip_distance", "passenger_count"]
    ]
    result = result.sort_values(by="period")

    result.to_csv(constants.TRIPS_BY_WEEK_FILE_PATH, index=False)


@asset(deps=["taxi_trips", "taxi_zones"])
def manhattan_stats() -> None:
    query = """
        select
            zones.zone,
            zones.borough,
            zones.geometry,
            count(1) as num_trips,
        from trips
        left join zones on trips.pickup_zone_id = zones.zone_id
        where borough = 'Manhattan' and geometry is not null
        group by zone, borough, geometry
    """

    conn = duckdb.connect(os.getenv("DUCKDB_DATABASE"))
    trips_by_zone = conn.execute(query).fetch_df()

    trips_by_zone["geometry"] = gpd.GeoSeries.from_wkt(trips_by_zone["geometry"])
    trips_by_zone = gpd.GeoDataFrame(trips_by_zone)

    with open(constants.MANHATTAN_STATS_FILE_PATH, "w") as output_file:
        output_file.write(trips_by_zone.to_json())


@asset(
    deps=["manhattan_stats"],
)
def manhattan_map() -> None:
    trips_by_zone = gpd.read_file(constants.MANHATTAN_STATS_FILE_PATH)

    fig = px.choropleth_mapbox(
        trips_by_zone,
        geojson=trips_by_zone.geometry.__geo_interface__,
        locations=trips_by_zone.index,
        color="num_trips",
        color_continuous_scale="Plasma",
        mapbox_style="carto-positron",
        center={"lat": 40.758, "lon": -73.985},
        zoom=11,
        opacity=0.7,
        labels={"num_trips": "Number of Trips"},
    )

    pio.write_image(fig, constants.MANHATTAN_MAP_FILE_PATH)
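The `trips_by_week` asset steps from 2023-03-01 to 2023-04-01 in 7-day increments and, for each step, selects trips whose `date_trunc('week', pickup_datetime)` matches that of the loop date. The sketch below reproduces only the date arithmetic, with no database involved. `weekly_periods` and `week_start` are hypothetical helpers, and `week_start` assumes that `date_trunc('week', ...)` truncates to the Monday of the week; that assumption is worth checking against the DuckDB documentation before relying on it.

```python
from datetime import datetime, timedelta

DATE_FORMAT = "%Y-%m-%d"  # same value as constants.DATE_FORMAT in the diff


def weekly_periods(start: str, end: str) -> list[datetime]:
    """Dates visited by a `while current_date < end_date` loop with a 7-day step."""
    current = datetime.strptime(start, DATE_FORMAT)
    end_date = datetime.strptime(end, DATE_FORMAT)
    periods = []
    while current < end_date:
        periods.append(current)
        current += timedelta(days=7)
    return periods


def week_start(day: datetime) -> datetime:
    """Monday of the week containing `day`.

    Assumption: this mirrors what date_trunc('week', ...) returns in the query.
    """
    return day - timedelta(days=day.weekday())


if __name__ == "__main__":
    for period in weekly_periods("2023-03-01", "2023-04-01"):
        print(period.strftime(DATE_FORMAT), "->", week_start(period).strftime(DATE_FORMAT))
```

Under that assumption, the loop visits five dates (2023-03-01, 03-08, 03-15, 03-22, 03-29), each a Wednesday, while the matching weeks begin on 2023-02-27, 03-06, 03-13, 03-20, and 03-27. The `period` column therefore records the loop date rather than the first day of the week being aggregated.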