diff --git a/README.md b/README.md
index d76caf2..e98b664 100644
--- a/README.md
+++ b/README.md
@@ -12,7 +12,7 @@ This code uses Python 3. It is tested on Python 3.12, but will probably work on
 
 To install the project dependencies, first install pipenv globally with `pip install pipenv`. Then create a virtual env/install dependencies with `pipenv install --dev`.
 
-To run code in the pipenv virtual environment, prefix your command with `pipenv run` (ex. `pipenv run dev` runs the development script).
+To run code in the pipenv virtual environment, prefix your command with `pipenv run` (ex. `pipenv run python` runs the python interpreter in the pipenv environment).
 
 ### Using the pipenv environment in VSCode
 
@@ -30,18 +30,21 @@ For development purposes, you can simply run the dev script:
 ```
 pipenv run dev
 ```
 
-This is currently just an alias to run the main script using `pipenv run python -m mediabridge.main`, but this may change in the future, so using `pipenv run dev` will be ensure the correct script is always run.
+Be sure to specify options such as -v and -l *before* any subcommands (process, load, etc.).
 
 **NOTE:** *If you encounter a ModuleNotFoundError, make sure you are in the root directory of the project, as the `mediabridge` directory is the module Pipenv is trying to reference.*
 
+This is currently just an alias to run the main script using `pipenv run python -m mediabridge.main`, but this may change in the future, so using `pipenv run dev` will ensure the correct script is always run.
+
 ## Testing
 
-To run unit tests,
+To run unittests:
 
-1. Ensure `pipenv` is installed
-2. Run `pipenv run test`
+```
+pipenv run test
+```
 
-There is a GitHub actions "check" for passing tests, which must pass for you to be able to merge your PR.
+These tests are also evaluated via a GitHub action when opening or updating a PR and must pass before merging.
 
 ## Code formatting
diff --git a/mediabridge/data_processing/wiki_to_netflix.py b/mediabridge/data_processing/wiki_to_netflix.py
index 929e385..d2abbc5 100644
--- a/mediabridge/data_processing/wiki_to_netflix.py
+++ b/mediabridge/data_processing/wiki_to_netflix.py
@@ -1,42 +1,45 @@
 import csv
+import dataclasses
 import logging
-import sys
 import time
-from dataclasses import dataclass
-from typing import List, Optional
+from contextlib import nullcontext
+from pathlib import Path
+from typing import Iterator
 
 import requests
+import typer
 from tqdm import tqdm
+from tqdm.contrib.logging import logging_redirect_tqdm
 
 from mediabridge.definitions import DATA_DIR, OUTPUT_DIR
+from mediabridge.schemas.movies import EnrichedMovieData, MovieData
+
+USER_AGENT = "Noisebridge MovieBot 0.0.1/Audiodude "
+DEFAULT_TEST_ROWS = 100
 
 
 class WikidataServiceTimeoutException(Exception):
     pass
 
 
-@dataclass
-class MovieData:
-    movie_id: Optional[str]
-    genre: List[str]
-    director: Optional[str]
-
-
+app = typer.Typer()
 log = logging.getLogger(__name__)
 
-# need Genres, Directors, Title, year?
-
-USER_AGENT = "Noisebridge MovieBot 0.0.1/Audiodude "
-DEFAULT_TEST_ROWS = 100
-
 
-def read_netflix_txt(txt_file, num_rows=None):
+def read_netflix_txt(
+    txt_file: Path, num_rows: int | None = None
+) -> Iterator[list[str]]:
     """
-    Reads and processes a Netflix text file.
+    Reads rows from the Netflix dataset file.
 
     Parameters:
-        txt_file (str): Path to the Netflix text file
-        num_rows (int): Number of rows to read from the file, defaults to all
+        txt_file (Path): Path to the Netflix text file.
+
+        num_rows (int | None): Number of rows to read from the file, if None,
+        all rows are read.
+
+    Yields:
+        List of strings representing the values of the next row in the file.
     """
     with open(txt_file, "r", encoding="ISO-8859-1") as netflix_data:
         for i, line in enumerate(netflix_data):
@@ -45,30 +48,43 @@ def read_netflix_txt(txt_file, num_rows=None):
             yield line.rstrip().split(",", 2)
 
 
-def create_netflix_csv(csv_path, data_list):
+def create_netflix_csv(csv_path: Path, data_list: list[MovieData]):
     """
-    Writes data to a Netflix CSV file.
+    Writes list of MovieData objects to a CSV file, either with enriched or
+    plain/missing data.
 
     Parameters:
-        csv_name (str): Name of CSV file to be created
-        data_list (list): Row of data to be written to CSV file
+        csv_name (Path): Path to CSV file to be written to.
+
+        data_list (list[MovieData]): List of MovieData objects to be written.
     """
-    with open(csv_path, "w") as netflix_csv:
-        csv.writer(netflix_csv).writerows(data_list)
+    with open(csv_path, "w") as csv_file:
+        if data_list:
+            # Write header based on type of first item in data_list
+            writer = csv.DictWriter(
+                csv_file,
+                fieldnames=(f.name for f in dataclasses.fields(data_list[0])),
+            )
+            writer.writeheader()
+            writer.writerows((movie.flatten_values() for movie in data_list))
 
 
-def wiki_feature_info(data, key):
+def wiki_feature_info(data: dict, key: str) -> str | list | None:
     """
     Extracts movie information from a Wikidata query result.
 
     Parameters:
-        data (dict): JSON response from a SPARQL query, see example in get_example_json_sparql_response().
-        key (str): The key for the information to extract (e.g., 'item', 'genreLabel', 'directorLabel').
+        data (dict): JSON response from a SPARQL query, see example in
+        get_example_json_sparql_response().
+
+        key (str): The key for the information to extract (e.g., 'item',
+        'genreLabel', 'directorLabel').
 
     Returns:
-        None: If the key is not present or no results are available.
-        list: If the key is 'genreLabel', returns a list of unique genre labels.
-        String: If the Key is present, return the movie ID of the first binding, in other words the first row in query result
+        The formatted movie information, or None if the key is not present or no
+        results are available. If the Key is present, return a list of unique
+        genre labels if the key is 'genreLabel', otherwise return the movie ID
+        of the first binding (in other words, the first row in query result).
     """
     if (
         len(data["results"]["bindings"]) < 1
@@ -86,16 +102,9 @@ def wiki_feature_info(data, key):
     return data["results"]["bindings"][0][key]["value"].split("/")[-1]
 
 
-def format_sparql_query(title, year):
+def format_sparql_query(title: str, year: int) -> str:
     """
-    Formats SPARQL query for Wiki data
-
-    Parameters:
-        title (str): name of content to query
-        year (int): release year of the movie
-
-    Returns:
-        SPARQL Query (str): formatted string with movie title and year
+    Formats a SPARQL query for Wiki data using the given title and year.
     """
 
     QUERY = """
@@ -146,77 +155,81 @@ def format_sparql_query(title, year):
     return QUERY % {"Title": title, "Year": year}
 
 
-def wiki_query(data_csv, user_agent):
+def wiki_query(
+    movie: MovieData, user_agent: str = USER_AGENT
+) -> EnrichedMovieData | None:
     """
-    Formats SPARQL query for Wiki data
+    Queries Wikidata for information about a movie.
 
     Parameters:
-        data_csv (list of lists): Rows of movie data with [movie ID, release year, title].
-        user_agent (str): used to identify our script when sending requests to Wikidata SPARQL API.
+        movie (MovieData): A MovieData object to use in the sparql query.
 
-    Returns:
-        list of WikiMovieData: A list of movieData instances with movie IDs, genres, and directors.
-    """
-    wiki_data_list = []
+        user_agent (str): Used to identify our script when sending requests to
+        Wikidata SPARQL API.
 
-    for row in tqdm(data_csv):
-        id, year, title = row
-        if year is None:
-            continue
+    Returns:
+        An EnrichedMovieData object containing information about the movie, or
+        None if no results are found.
 
-        SPARQL = format_sparql_query(title, int(year))
-        # logging.debug(SPARQL)
-
-        tries = 0
-        while True:
-            try:
-                log.info(f"Requesting id {id} (try {tries})")
-                response = requests.post(
-                    "https://query.wikidata.org/sparql",
-                    headers={"User-Agent": user_agent},
-                    data={"query": SPARQL, "format": "json"},
-                    timeout=20,
-                )
-                break
-            except requests.exceptions.Timeout:
-                wait_time = 2**tries * 5
-                time.sleep(wait_time)
-                tries += 1
-                if tries > 5:
-                    raise WikidataServiceTimeoutException(
-                        f"Tried {tries} time, could not reach Wikidata "
-                        f"(movie: {title} {year})"
-                    )
-
-        response.raise_for_status()
-        data = response.json()
-        log.debug(data)
-
-        if not data["results"]["bindings"]:
-            wiki_data_list.append(None)
-            log.warning(f"Could not find movie id {id} ({repr(title)}, {repr(year)})")
-        else:
-            wiki_data_list.append(
-                MovieData(
-                    movie_id=wiki_feature_info(data, "item"),
-                    genre=wiki_feature_info(data, "genreLabel"),
-                    director=wiki_feature_info(data, "directorLabel"),
-                )
-            )
-            log.info(
-                f"Found movie id {id} (' {title} ', {year}, {wiki_data_list[-1]}) "
+    Raises:
+        WikidataServiceTimeoutException: If the Wikidata service times out.
+    """
+    SPARQL = format_sparql_query(movie.title, movie.year)
+
+    tries = 0
+    while True:
+        try:
+            log.info(f"Requesting id {movie.netflix_id} (try {tries})")
+            response = requests.post(
+                "https://query.wikidata.org/sparql",
+                headers={"User-Agent": user_agent},
+                data={"query": SPARQL, "format": "json"},
+                timeout=20,
             )
+            break
+        except requests.exceptions.Timeout:
+            wait_time = 2**tries * 5
+            time.sleep(wait_time)
+            tries += 1
+            if tries > 5:
+                raise WikidataServiceTimeoutException(
+                    f"Tried {tries} time, could not reach Wikidata "
+                    f'(movie: "{movie.title}" {movie.year})'
+                )
+
+    response.raise_for_status()
+    data = response.json()
+    log.debug(data)
+
+    if data["results"]["bindings"]:
+        log.info(f'Found movie id {movie.netflix_id}: ("{movie.title}", {movie.year})')
+        return EnrichedMovieData(
+            **vars(movie),
+            wikidata_id=wiki_feature_info(data, "item"),
+            genres=wiki_feature_info(data, "genreLabel"),
+            director=wiki_feature_info(data, "directorLabel"),
+        )
 
-    return wiki_data_list
+    log.warning(
+        f'Could not find movie id {movie.netflix_id}: ("{movie.title}", {movie.year})'
+    )
 
 
-def process_data(num_rows=None, output_missing_csv_path=None):
+def process_data(num_rows: int | None = None, output_missing_csv_path: Path = None):
     """
-    Processes Netflix movie data by enriching it with information from Wikidata and writes the results to a CSV file.
-    Netflix data was conveted from a generator to a list to avoid exaustion. was running into an issue where nothing would print to CSV file
+    Processes Netflix movie data by enriching it with information from Wikidata
+    and writes the results to a CSV file.
+
+    Parameters:
+        num_rows (int): Number of rows to process. If None, all rows are
+        processed.
 
-    num_rows (int): Number of rows to process. If None, all rows are processed.
-    output_missing_csv_path (str): If provided, movies that could not be matched will be written to a CSV at this path.
+        output_missing_csv_path (Path): If provided, movies that could not be
+        matched will be written to a CSV at this path.
+
+    Raises:
+        FileNotFoundError: If the data directory or the movie data file does not
+        exist.
     """
 
     if not DATA_DIR.exists():
@@ -225,7 +238,7 @@
             "https://archive.org/details/nf_prize_dataset.tar"
         )
 
-    movie_data_path = DATA_DIR.joinpath("movie_titles.txt")
+    movie_data_path = DATA_DIR / "movie_titles.txt"
 
     if not movie_data_path.exists():
         raise FileNotFoundError(
@@ -233,68 +246,86 @@
             "https://archive.org/details/nf_prize_dataset.tar"
        )
 
+    total_count = 0
     missing_count = 0
-    processed_data = []
+    processed = []
     missing = []
 
-    netflix_data = list(read_netflix_txt(movie_data_path, num_rows))
-
-    netflix_csv = OUTPUT_DIR.joinpath("movie_titles.csv")
-
-    enriched_movies = wiki_query(netflix_data, USER_AGENT)
+    print(f"Processing {num_rows or 'all'} rows...")
 
-    num_rows = len(enriched_movies)
+    netflix_data = read_netflix_txt(movie_data_path, num_rows)
+    for row in tqdm(netflix_data, total=num_rows):
+        total_count += 1
 
-    for index, row in enumerate(netflix_data):
-        netflix_id, year, title = row
-        movie_data = enriched_movies[index]
+        id, year, title = row
+        if year == "NULL":
+            log.warning(f"Skipping movie id {id}: (' {title} ', {year})")
+            continue
 
-        if movie_data is None:
-            missing_count += 1
-            movie = [
-                netflix_id,
-                "null",
-                title,
-                year,
-                "null",
-                "null",
-            ]
-            missing.append(movie)
+        netflix_data = MovieData(int(id), title, int(year))
+        if wiki_data := wiki_query(netflix_data):
+            # wiki_query finds match, add to processed data
+            processed.append(wiki_data)
         else:
-            if movie_data.genre:
-                genres = "; ".join(movie_data.genre)
-            else:
-                genres = ""
-            if movie_data.director:
-                director = movie_data.director
-            else:
-                director = ""
-            movie = [
-                netflix_id,
-                movie_data.movie_id,
-                title,
-                year,
-                genres,
-                director,
-            ]
-            processed_data.append(movie)
-
-    netflix_csv = OUTPUT_DIR.joinpath("movie_titles.csv")
-    create_netflix_csv(netflix_csv, processed_data)
+            # Otherwise, is missing a match
+            missing_count += 1
+            if output_missing_csv_path:
+                missing.append(netflix_data)
+
+    output_csv = OUTPUT_DIR / "matches.csv"
+    create_netflix_csv(output_csv, processed)
 
     if output_missing_csv_path:
-        missing_csv = OUTPUT_DIR.joinpath(output_missing_csv_path)
+        missing_csv = OUTPUT_DIR / output_missing_csv_path
         create_netflix_csv(missing_csv, missing)
 
     print(
-        f"missing: {missing_count} ({missing_count / num_rows * 100:.2f}%)\n"
-        f"found: {num_rows - missing_count} ({(num_rows - missing_count) / num_rows * 100:.2f}%)\n"
-        f"total: {num_rows}\n",
+        f"missing: {missing_count} ({missing_count / total_count * 100:.2f}%)\n"
+        f"found: {total_count - missing_count} ({(total_count - missing_count) / total_count * 100:.2f}%)\n"
+        f"total: {total_count}\n",
     )
 
 
+@app.command()
+def process(
+    ctx: typer.Context,
+    full: bool = typer.Option(
+        False,
+        "--full",
+        "-f",
+        help="Run processing on full dataset. Overrides --num_rows.",
+    ),
+    num_rows: int = typer.Option(
+        DEFAULT_TEST_ROWS,
+        "--num-rows",
+        "-n",
+        help="Number of rows to process. If --full is True, all rows are processed",
+    ),
+    missing_out_path: str = typer.Option(
+        None,
+        "--missing-out-path",
+        "-m",
+        help=(
+            "If provided, movies that could not be matched will be written to a "
+            "CSV at this path, relative to the output directory."
+        ),
+    ),
+):
+    """Enrich Netflix data with Wikidata matches and write matches to CSV."""
+    log.debug(ctx.obj)
+    log_to_file = ctx.obj and ctx.obj.log_to_file
+    # We redirect logs to stdout through tqdm to avoid breaking progress bar.
+    # But when logging to file, we use nullcontext or tqdm will redirect logs
+    # back to stdout.
+    with nullcontext() if log_to_file else logging_redirect_tqdm():
+        num_rows = None if full else num_rows
+        try:
+            process_data(num_rows, output_missing_csv_path=missing_out_path)
+        except Exception as e:
+            # include fatal exceptions with traceback in logs
+            if log_to_file:
+                logging.exception("Uncaught exception", exc_info=True)
+            raise e
+
+
 if __name__ == "__main__":
-    # Test is true if no argument is passed or if the first argument is not '--prod'.
-    test = len(sys.argv) < 2 or sys.argv[1] != "--prod"
-    process_data(
-        num_rows=DEFAULT_TEST_ROWS if test else None,
-    )
+    app()
diff --git a/mediabridge/data_processing/wiki_to_netflix_test.py b/mediabridge/data_processing/wiki_to_netflix_test.py
index b7ea413..eb6e259 100644
--- a/mediabridge/data_processing/wiki_to_netflix_test.py
+++ b/mediabridge/data_processing/wiki_to_netflix_test.py
@@ -1,27 +1,25 @@
-from wiki_to_netflix import format_sparql_query
-from wiki_to_netflix_test_data import EXPECTED_SPARQL_QUERY
+import mediabridge.data_processing.wiki_to_netflix as w2n
+from mediabridge.data_processing.wiki_to_netflix_test_data import EXPECTED_SPARQL_QUERY
+from mediabridge.schemas.movies import EnrichedMovieData, MovieData
 
 
 def test_format_sparql_query():
-    QUERY = format_sparql_query("The Room", 2003)
+    QUERY = w2n.format_sparql_query("The Room", 2003)
     assert QUERY == EXPECTED_SPARQL_QUERY
 
 
-def get_example_json_sparql_response():
-    """
-    Returns an example response structure for testing.
-    """
-    return {
-        "results": {
-            "bindings": [
-                {
-                    "item": {
-                        "type": "uri",
-                        "value": "http://www.wikidata.org/entity/Q12345",
-                    },
-                    "genreLabel": {"type": "literal", "value": "Science Fiction"},
-                    "directorLabel": {"type": "literal", "value": "John Doe"},
-                }
-            ]
-        }
-    }
+def test_wiki_query():
+    movie = MovieData("0", "The Room", 2003)
+    result = w2n.wiki_query(movie)
+
+    # Order of genres is not guaranteed, so we sort before checking for equality
+    result.genres = sorted(result.genres)
+
+    assert result == EnrichedMovieData(
+        netflix_id="0",
+        title="The Room",
+        year=2003,
+        wikidata_id="Q533383",
+        genres=["drama film", "independent film", "romance film"],
+        director="Tommy Wiseau",
+    )
diff --git a/mediabridge/db/load.py b/mediabridge/db/load.py
new file mode 100644
index 0000000..fe74c3e
--- /dev/null
+++ b/mediabridge/db/load.py
@@ -0,0 +1,36 @@
+import csv
+import dataclasses
+import logging
+
+from typer import Typer
+
+from mediabridge.definitions import OUTPUT_DIR
+from mediabridge.schemas.movies import EnrichedMovieData
+
+log = logging.getLogger(__name__)
+app = Typer()
+
+
+@app.command()
+def load():
+    """
+    Load a csv of movie data into the mongo database.
+    """
+    with open(OUTPUT_DIR / "matches.csv", "r") as f:
+        reader = csv.reader(f)
+
+        header = next(reader)
+        if header != [f.name for f in dataclasses.fields(EnrichedMovieData)]:
+            raise ValueError(
+                "Header does not match expected dataclass fields (EnrichedMovieData), "
+                f"expected {dataclasses.fields(EnrichedMovieData)}, got {header}"
+            )
+
+        for row in reader:
+            movie = EnrichedMovieData(*row)
+            log.info(f"Inserting {movie} into MongoDB")
+            # TODO: Needs implementation, bulk inserts for performance
+
+
+if __name__ == "__main__":
+    app()
diff --git a/mediabridge/db/queries.py b/mediabridge/db/queries.py
index 171e49a..6e38e5d 100644
--- a/mediabridge/db/queries.py
+++ b/mediabridge/db/queries.py
@@ -5,9 +5,9 @@ def insert_into_mongo(movie):
     db = connect_to_mongo()
     collection = db["movies"]
     collection.update_one(
-        {"wikidata_id": movie[1]},
+        {"_id": movie[1]},
         {
-            "set": {
+            "$set": {
                 "netflix_id": movie[0],
                 "wikidata_id": movie[1],
                 "title": movie[2],
@@ -18,3 +18,9 @@
         },
         upsert=True,
     )
+
+
+def bulk_insert(operations):
+    db = connect_to_mongo()
+    collection = db["movies"]
+    collection.bulk_write(operations)
diff --git a/mediabridge/definitions.py b/mediabridge/definitions.py
index cbf4025..70f9907 100644
--- a/mediabridge/definitions.py
+++ b/mediabridge/definitions.py
@@ -7,8 +7,8 @@
 MODULE_DIR = Path(__file__).parent
 PROJECT_DIR = MODULE_DIR.parent
 
-DATA_DIR = PROJECT_DIR.joinpath("data")
-OUTPUT_DIR = PROJECT_DIR.joinpath("out")
+DATA_DIR = PROJECT_DIR / "data"
+OUTPUT_DIR = PROJECT_DIR / "out"
 
 if __name__ == "__main__":
     print(MODULE_DIR)
diff --git a/mediabridge/main.py b/mediabridge/main.py
index 4f8f904..1dac267 100644
--- a/mediabridge/main.py
+++ b/mediabridge/main.py
@@ -1,42 +1,32 @@
 import logging
-from contextlib import nullcontext
+from dataclasses import dataclass
 from datetime import datetime
 
 import typer as typer
-from tqdm.contrib.logging import logging_redirect_tqdm
 
 from mediabridge.data_processing import wiki_to_netflix
+from mediabridge.db import load
 from mediabridge.definitions import OUTPUT_DIR
 
+app = typer.Typer(no_args_is_help=True, add_completion=False)
+app.add_typer(wiki_to_netflix.app)
+app.add_typer(load.app)
+
 
+@dataclass
+class AppContext:
+    log_to_file: bool = False
+
+
+@app.callback()
 def main(
+    ctx: typer.Context,
     verbose: bool = typer.Option(
         False, "--verbose", "-v", help="Enable verbose logging."
    ),
     log: bool = typer.Option(
         False, "--log", "-l", help="Enable all logging message levels and log to file."
     ),
-    full: bool = typer.Option(
-        False,
-        "--full",
-        "-f",
-        help="Run processing on full dataset. Overrides --num_rows.",
-    ),
-    num_rows: int = typer.Option(
-        100,
-        "--num_rows",
-        "-n",
-        help="Number of rows to process. If --full is True, all rows are processed",
-    ),
-    missing_out_path: str = typer.Option(
-        None,
-        "--missing_out_path",
-        "-m",
-        help=(
-            f"If provided, movies that could not be matched will be written to a "
-            f"CSV at this path, relative to the {OUTPUT_DIR} directory."
-        ),
-    ),
 ):
     if not OUTPUT_DIR.exists():
         print(
@@ -48,9 +38,7 @@ def main(
         # log all messages to new file
         logging.basicConfig(
             level=logging.DEBUG,
-            filename=OUTPUT_DIR.joinpath(
-                f"mb_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
-            ),
+            filename=OUTPUT_DIR / f"mb_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log",
             filemode="x",
             format="%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s",
             datefmt="%H:%M:%S",
@@ -61,22 +49,8 @@ def main(
     else:
         level = logging.WARNING
         logging.basicConfig(level=level, format="[%(levelname)s] %(message)s")
-
-    # We redirect logs to stdout through tqdm to avoid breaking progress bar.
-    # But when logging to file, we use nullcontext or tqdm will redirect logs
-    # back to stdout.
-    with logging_redirect_tqdm() if not log else nullcontext():
-        num_rows = None if full else num_rows
-        try:
-            wiki_to_netflix.process_data(
-                num_rows, output_missing_csv_path=missing_out_path
-            )
-        except Exception as e:
-            # include fatal exceptions with traceback in logs
-            if log:
-                logging.exception("Uncaught exception")
-            raise e
+    ctx.obj = AppContext(log_to_file=log)
 
 
 if __name__ == "__main__":
-    typer.run(main)
+    app()
diff --git a/mediabridge/models/predict.py b/mediabridge/recommender/predict.py
similarity index 100%
rename from mediabridge/models/predict.py
rename to mediabridge/recommender/predict.py
diff --git a/mediabridge/models/train_model.py b/mediabridge/recommender/train_model.py
similarity index 100%
rename from mediabridge/models/train_model.py
rename to mediabridge/recommender/train_model.py
diff --git a/mediabridge/models/utils.py b/mediabridge/recommender/utils.py
similarity index 100%
rename from mediabridge/models/utils.py
rename to mediabridge/recommender/utils.py
diff --git a/mediabridge/schemas/movies.py b/mediabridge/schemas/movies.py
new file mode 100644
index 0000000..d119a88
--- /dev/null
+++ b/mediabridge/schemas/movies.py
@@ -0,0 +1,28 @@
+from dataclasses import dataclass
+from typing import Optional
+
+
+@dataclass(order=True)
+class MovieData:
+    """Dataclass for known data from the Netflix dataset"""
+
+    netflix_id: int
+    title: str
+    year: int
+
+    def flatten_values(self):
+        """Format all dataclass fields into a mapping of strings by joining
+        lists with semicolons"""
+        return {
+            k: (";".join(v) if isinstance(v, list) else str(v))
+            for (k, v) in vars(self).items()
+        }
+
+
+@dataclass(order=True)
+class EnrichedMovieData(MovieData):
+    """Dataclass for enriched data from a Wikidata match"""
+
+    wikidata_id: str
+    genres: Optional[list[str]]
+    director: Optional[str]
diff --git a/mediabridge/schemas/movies_test.py b/mediabridge/schemas/movies_test.py
new file mode 100644
index 0000000..d1bceac
--- /dev/null
+++ b/mediabridge/schemas/movies_test.py
@@ -0,0 +1,15 @@
+from mediabridge.schemas.movies import EnrichedMovieData
+
+
+def test_flatten_values():
+    vals = EnrichedMovieData(
+        1, "The Matrix", 1999, "Q11424", ["Action", "Drama"], "Lana Wachowski"
+    ).flatten_values()
+    assert vals == {
+        "netflix_id": "1",
+        "wikidata_id": "Q11424",
+        "title": "The Matrix",
+        "year": "1999",
+        "genres": "Action;Drama",
+        "director": "Lana Wachowski",
+    }
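For reference, this change moves the old `--full`/`--num_rows`/`--missing_out_path` flags from `main` onto a `process` subcommand and adds a `load` subcommand. A usage sketch based only on the options defined above; the `pipenv run dev` alias and exact flag spellings should be confirmed with `--help`:

```
# global options (-v, -l) go before the subcommand
pipenv run dev -v process --num-rows 100 --missing-out-path missing.csv
pipenv run dev process --full
pipenv run dev load
```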
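The `load` command leaves bulk insertion as a TODO, while `queries.py` gains a `bulk_insert(operations)` helper that calls `collection.bulk_write`. A minimal sketch of how the two could meet, assuming the project uses pymongo (so `UpdateOne` is available) and upserts keyed on `_id` the same way `insert_into_mongo` now does; this is illustrative, not the PR's implementation:

```python
# Sketch only: batching the per-row inserts from load() into one bulk_write call.
# Assumes pymongo's UpdateOne; field names come from the EnrichedMovieData dataclass.
import dataclasses

from pymongo import UpdateOne

from mediabridge.db.queries import bulk_insert
from mediabridge.schemas.movies import EnrichedMovieData


def build_operations(movies: list[EnrichedMovieData]) -> list[UpdateOne]:
    # One idempotent upsert per movie, keyed on wikidata_id like insert_into_mongo
    return [
        UpdateOne(
            {"_id": movie.wikidata_id},
            {"$set": dataclasses.asdict(movie)},
            upsert=True,
        )
        for movie in movies
    ]


# bulk_insert(build_operations(movies))  # one round trip instead of N update_one calls
```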