diff --git a/docker-compose.yml b/docker-compose.yml index 91b3c548..6de643bc 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -14,6 +14,7 @@ services: - /data/wp1bot/credentials.py:/usr/src/app/wp1/credentials.py - /data/wp1bot/db/yoyo.ini:/usr/src/app/db/production/yoyo.ini - /srv/log/wp1bot/:/var/log/wp1bot/ + - /srv/data/wp1bot/:/var/data/wp1bot/ links: - redis logging: diff --git a/docker/dev-db/README.md b/docker/dev-db/README.md index 2d27255c..8a50cb80 100644 --- a/docker/dev-db/README.md +++ b/docker/dev-db/README.md @@ -14,7 +14,7 @@ The dev database will need to be migrated in the following circumstances: To migrate, cd to the `db/dev` directory and run the following command: ```bash -PYTHONPATH=$PYTHONPATH:../.. yoyo apply +PYTHONPATH=$PYTHONPATH:../.. pipenv run yoyo apply ``` The `PYTHONPATH` environment variable is necessary because some of the migrations diff --git a/wp1/credentials.py.dev.e2e b/wp1/credentials.py.dev.e2e index 9855819b..f0e02680 100644 --- a/wp1/credentials.py.dev.e2e +++ b/wp1/credentials.py.dev.e2e @@ -39,6 +39,10 @@ CREDENTIALS = { 'secret': '', 'bucket': 'org-kiwix-dev-wp1', }, + 'FILE_PATH': { + # Path where pageviews.bz2 file (~3GB) will be downloaded. + 'pageviews': '/tmp/pageviews', + } }, Environment.TEST: {}, Environment.PRODUCTION: {} diff --git a/wp1/credentials.py.example b/wp1/credentials.py.example index aecf512f..2241d668 100644 --- a/wp1/credentials.py.example +++ b/wp1/credentials.py.example @@ -122,6 +122,11 @@ CREDENTIALS = { # server, to ensure requests to the webhook endpoint are valid. 'hook_token': '', # EDIT this line }, + + 'FILE_PATH': { + # Path where pageviews.bz2 file (~3GB) will be downloaded. + 'pageviews': '/tmp/pageviews', + } }, # Environment for python nosetests. In this environment, only the MySQL database @@ -253,4 +258,9 @@ CREDENTIALS = { # # server, to ensure requests to the webhook endpoint are valid. 
# 'hook_token': '', # EDIT this line # }, + + # 'FILE_PATH': { + # # Path where pageviews.bz2 file (~3GB) will be downloaded. + # 'pageviews': '/var/data/wp1bot/pageviews', + # } } diff --git a/wp1/scores.py b/wp1/scores.py index 09cd1d08..ffc1c397 100644 --- a/wp1/scores.py +++ b/wp1/scores.py @@ -1,6 +1,8 @@ from bz2 import BZ2Decompressor from collections import namedtuple from contextlib import contextmanager +import logging +import os.path import csv from datetime import datetime, timedelta @@ -14,11 +16,22 @@ PageviewRecord = namedtuple('PageviewRecord', ['lang', 'name', 'page_id', 'views']) +logger = logging.getLogger(__name__) + +try: + from wp1.credentials import ENV, CREDENTIALS +except ImportError: + logger.exception('The file credentials.py must be populated manually in ' + 'order to download pageviews') + CREDENTIALS = None + ENV = None + def wiki_languages(): r = requests.get( 'https://wikistats.wmcloud.org/api.php?action=dump&table=wikipedias&format=csv', - headers={'User-Agent': WP1_USER_AGENT}) + headers={'User-Agent': WP1_USER_AGENT}, + ) try: r.raise_for_status() except requests.exceptions.HTTPError as e: @@ -31,44 +44,75 @@ def wiki_languages(): yield row[2] -def get_pageview_url(): +def get_pageview_url(prev=False): + weeks = 4 + if prev: + weeks = 8 + now = get_current_datetime() - dt = datetime(now.year, now.month, 1) - timedelta(weeks=4) + dt = datetime(now.year, now.month, 1) - timedelta(weeks=weeks) return dt.strftime( 'https://dumps.wikimedia.org/other/pageview_complete/monthly/' '%Y/%Y-%m/pageviews-%Y%m-user.bz2') -@contextmanager -def get_pageview_response(): - url = get_pageview_url() - with requests.get(url, stream=True, - headers={'User-Agent': WP1_USER_AGENT}) as r: - try: - r.raise_for_status() - except requests.exceptions.HTTPError as e: - raise Wp1ScoreProcessingError('Could not retrieve pageview data') from e +def get_pageview_file_path(filename): + path = CREDENTIALS[ENV]['FILE_PATH']['pageviews'] + os.makedirs(path, 
exist_ok=True) + return os.path.join(path, filename) + + +def get_prev_file_path(): + prev_filename = get_pageview_url(prev=True).split('/')[-1] + return get_pageview_file_path(prev_filename) + + +def get_cur_file_path(): + cur_filename = get_pageview_url().split('/')[-1] + return get_pageview_file_path(cur_filename) + - yield r +def download_pageviews(): + # Clean up file from last month + prev_filepath = get_prev_file_path() + if os.path.exists(prev_filepath): + os.remove(prev_filepath) + + cur_filepath = get_cur_file_path() + if os.path.exists(cur_filepath): + # File already downloaded + return + + with requests.get(get_pageview_url(), stream=True, headers={'User-Agent': WP1_USER_AGENT}) as r: + r.raise_for_status() + with open(cur_filepath, 'wb') as f: + # Read data in 8 KB chunks + for chunk in r.iter_content(chunk_size=8 * 1024): + f.write(chunk) def raw_pageviews(decode=False): def as_bytes(): - with get_pageview_response() as r: - decompressor = BZ2Decompressor() - trailing = b'' - # Read data in 32 MB chunks - for http_chunk in r.iter_content(chunk_size=32 * 1024 * 1024): - data = decompressor.decompress(http_chunk) + decompressor = BZ2Decompressor() + trailing = b'' + with open(get_cur_file_path(), 'rb') as f: + while True: + # Read data in 1 MB chunks + chunk = f.read(1024 * 1024) + if not chunk: + break + data = decompressor.decompress(chunk) lines = [line for line in data.split(b'\n') if line] if not lines: continue + # Reunite incomplete lines yield trailing + lines[0] yield from lines[1:-1] trailing = lines[-1] + # Nothing left, yield the last line yield trailing if decode: @@ -96,7 +140,6 @@ def pageview_components(): try: views = int(parts[4]) except ValueError: - # Views field wasn't int log.warning('Views field wasn\'t int in pageview dump: %r', line) continue @@ -130,10 +173,17 @@ def update_db_pageviews(wp10db, lang, article, page_id, views): def update_pageviews(filter_lang=None): + download_pageviews() + # Convert filter lang to bytes if necessary if filter_lang is not None 
and isinstance(filter_lang, str): filter_lang = filter_lang.encode('utf-8') + if filter_lang is None: + logger.info('Updating all pageviews') + else: + logger.info('Updating pageviews for %s', filter_lang.decode('utf-8')) + wp10db = wp10_connect() n = 0 for lang, article, page_id, views in pageview_components(): @@ -141,7 +191,15 @@ def update_pageviews(filter_lang=None): update_db_pageviews(wp10db, lang, article, page_id, views) n += 1 - if n >= 10000: + if n >= 50000: + logger.debug('Committing') wp10db.commit() n = 0 wp10db.commit() + logger.info('Done') + + +if __name__ == '__main__': + logging.basicConfig(level=logging.INFO, + format='%(levelname)s %(asctime)s: %(message)s') + update_pageviews() diff --git a/wp1/scores_test.py b/wp1/scores_test.py index 984abf7e..19ea58f9 100644 --- a/wp1/scores_test.py +++ b/wp1/scores_test.py @@ -1,7 +1,7 @@ import bz2 from datetime import datetime import unittest -from unittest.mock import patch, MagicMock +from unittest.mock import patch, MagicMock, mock_open import requests @@ -10,9 +10,7 @@ from wp1.exceptions import Wp1ScoreProcessingError from wp1 import scores - -class ScoresTest(BaseWpOneDbTest): - pageview_text = b'''af.wikipedia 1701 1402 desktop 4 F1 +pageview_text = b'''af.wikipedia 1701 1402 desktop 4 F1 af.wikipedia 1701 1402 mobile-web 3 O2T1 af.wikipedia 1702 1404 mobile-web 3 L1O2 af.wikipedia 1702 1404 desktop 1 P1 @@ -37,9 +35,10 @@ class ScoresTest(BaseWpOneDbTest): af.wikipedia 1712 753 mobile-web 2 O2 af.wikipedia 1712 753 desktop 20 E12J7U1''' - @property - def pageview_bz2(self): - return bz2.compress(self.pageview_text) +pageview_bz2 = bz2.compress(pageview_text) + + +class ScoresTest(BaseWpOneDbTest): @patch('wp1.scores.requests') def test_wiki_languages(self, mock_requests): @@ -85,73 +84,43 @@ def test_get_pageview_url(self, mock_datetime): 'https://dumps.wikimedia.org/other/pageview_complete/monthly/' '2024/2024-04/pageviews-202404-user.bz2', actual) - @patch('wp1.scores.requests.get') - def 
test_get_pageview_response(self, mock_get): - context = MagicMock() - expected = MagicMock() - context.__enter__.return_value = expected - mock_get.return_value = context - with scores.get_pageview_response() as actual: - self.assertEqual(expected, actual) - - @patch('wp1.scores.requests.get') - def test_get_pageview_response_non_success(self, mock_get): - context = MagicMock() - resp = MagicMock() - resp.raise_for_status.side_effect = requests.exceptions.HTTPError - context.__enter__.return_value = resp - mock_get.return_value = context - with self.assertRaises( - Wp1ScoreProcessingError), scores.get_pageview_response() as actual: - pass + @patch('wp1.scores.get_current_datetime', return_value=datetime(2024, 5, 25)) + def test_get_pageview_url_prev(self, mock_datetime): + actual = scores.get_pageview_url(prev=True) + self.assertEqual( + 'https://dumps.wikimedia.org/other/pageview_complete/monthly/' + '2024/2024-03/pageviews-202403-user.bz2', actual) @patch('wp1.scores.get_current_datetime', return_value=datetime(2024, 5, 25)) - @patch('wp1.scores.get_pageview_response') - def test_raw_pageviews(self, mock_get_response, mock_datetime): - context = MagicMock() - resp = MagicMock() - resp.iter_content.return_value = (self.pageview_bz2,) - context.__enter__.return_value = resp - mock_get_response.return_value = context + def test_get_prev_file_path(self, mock_datetime): + actual = scores.get_prev_file_path() + self.assertEqual('/tmp/pageviews/pageviews-202403-user.bz2', actual) - actual = b'\n'.join(scores.raw_pageviews()) + @patch('wp1.scores.get_current_datetime', return_value=datetime(2024, 5, 25)) + def test_get_cur_file_path(self, mock_datetime): + actual = scores.get_cur_file_path() + self.assertEqual('/tmp/pageviews/pageviews-202404-user.bz2', actual) - self.assertEqual(self.pageview_text, actual) + def test_get_pageview_file_path(self): + actual = scores.get_pageview_file_path('pageviews-202404-user.bz2') + 
self.assertEqual('/tmp/pageviews/pageviews-202404-user.bz2', actual) @patch('wp1.scores.get_current_datetime', return_value=datetime(2024, 5, 25)) - @patch('wp1.scores.get_pageview_response') - def test_raw_pageviews(self, mock_get_response, mock_datetime): - context = MagicMock() - resp = MagicMock() - resp.iter_content.return_value = (self.pageview_bz2,) - context.__enter__.return_value = resp - mock_get_response.return_value = context - + @patch("builtins.open", new_callable=mock_open, read_data=pageview_bz2) + def test_raw_pageviews(self, mock_file_open, mock_datetime): actual = b'\n'.join(scores.raw_pageviews()) - self.assertEqual(self.pageview_text, actual) + self.assertEqual(pageview_text, actual) @patch('wp1.scores.get_current_datetime', return_value=datetime(2024, 5, 25)) - @patch('wp1.scores.get_pageview_response') - def test_raw_pageviews_decode(self, mock_get_response, mock_datetime): - context = MagicMock() - resp = MagicMock() - resp.iter_content.return_value = (self.pageview_bz2,) - context.__enter__.return_value = resp - mock_get_response.return_value = context - + @patch("builtins.open", new_callable=mock_open, read_data=pageview_bz2) + def test_raw_pageviews_decode(self, mock_file_open, mock_datetime): actual = '\n'.join(scores.raw_pageviews(decode=True)) - self.assertEqual(self.pageview_text.decode('utf-8'), actual) - - @patch('wp1.scores.get_pageview_response') - def test_pageview_components(self, mock_get_response): - context = MagicMock() - resp = MagicMock() - resp.iter_content.return_value = (self.pageview_bz2,) - context.__enter__.return_value = resp - mock_get_response.return_value = context + self.assertEqual(pageview_text.decode('utf-8'), actual) + @patch("builtins.open", new_callable=mock_open, read_data=pageview_bz2) + def test_pageview_components(self, mock_file_open): expected = [ (b'af', b'1701', b'1402', 7), (b'af', b'1702', b'1404', 4), @@ -187,8 +156,8 @@ def test_update_db_pageviews(self): def 
test_update_db_pageviews_existing(self): with self.wp10db.cursor() as cursor: cursor.execute( - 'INSERT INTO page_scores VALUES ("en", "Statue_of_Liberty", 1234, 100' - ) + 'INSERT INTO page_scores (ps_lang, ps_article, ps_page_id, ps_views) ' + 'VALUES ("en", "Statue_of_Liberty", 1234, 100)') scores.update_db_pageviews(self.wp10db, 'en', 'Statue_of_Liberty', 1234, 200)