Skip to content

Commit

Permalink
Download full pageviews file, HTTP streaming was not working
Browse files Browse the repository at this point in the history
  • Loading branch information
audiodude committed Aug 17, 2024
1 parent 0ed33f9 commit 8b1221a
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 85 deletions.
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ services:
- /data/wp1bot/credentials.py:/usr/src/app/wp1/credentials.py
- /data/wp1bot/db/yoyo.ini:/usr/src/app/db/production/yoyo.ini
- /srv/log/wp1bot/:/var/log/wp1bot/
- /srv/data/wp1bot/:/var/data/wp1bot/
links:
- redis
logging:
Expand Down
2 changes: 1 addition & 1 deletion docker/dev-db/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ The dev database will need to be migrated in the following circumstances:
To migrate, cd to the `db/dev` directory and run the following command:

```bash
PYTHONPATH=$PYTHONPATH:../.. yoyo apply
PYTHONPATH=$PYTHONPATH:../.. pipenv run yoyo apply
```

The `PYTHONPATH` environment variable is necessary because some of the migrations
Expand Down
4 changes: 4 additions & 0 deletions wp1/credentials.py.dev.e2e
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ CREDENTIALS = {
'secret': '',
'bucket': 'org-kiwix-dev-wp1',
},
'FILE_PATH': {
# Path where pageviews.bz2 file (~3GB) will be downloaded.
'pageviews': '/tmp/pageviews',
}
},
Environment.TEST: {},
Environment.PRODUCTION: {}
Expand Down
10 changes: 10 additions & 0 deletions wp1/credentials.py.example
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,11 @@ CREDENTIALS = {
# server, to ensure requests to the webhook endpoint are valid.
'hook_token': '', # EDIT this line
},

'FILE_PATH': {
# Path where pageviews.bz2 file (~3GB) will be downloaded.
'pageviews': '/tmp/pageviews',
}
},

# Environment for python nosetests. In this environment, only the MySQL database
Expand Down Expand Up @@ -253,4 +258,9 @@ CREDENTIALS = {
# # server, to ensure requests to the webhook endpoint are valid.
# 'hook_token': '', # EDIT this line
# },

# 'FILE_PATH': {
# # Path where pageviews.bz2 file (~3GB) will be downloaded.
# 'pageviews': '/var/data/wp1bot/pageviews',
# }
}
100 changes: 79 additions & 21 deletions wp1/scores.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from bz2 import BZ2Decompressor
from collections import namedtuple
from contextlib import contextmanager
import logging
import os.path

import csv
from datetime import datetime, timedelta
Expand All @@ -14,11 +16,22 @@
PageviewRecord = namedtuple('PageviewRecord',
['lang', 'name', 'page_id', 'views'])

logger = logging.getLogger(__name__)

try:
from wp1.credentials import ENV, CREDENTIALS
except ImportError:
logger.exception('The file credentials.py must be populated manually in '
'order to download pageviews')
CREDENTIALS = None
ENV = None


def wiki_languages():
r = requests.get(
'https://wikistats.wmcloud.org/api.php?action=dump&table=wikipedias&format=csv',
headers={'User-Agent': WP1_USER_AGENT})
headers={'User-Agent': WP1_USER_AGENT},
)
try:
r.raise_for_status()
except requests.exceptions.HTTPError as e:
Expand All @@ -31,44 +44,75 @@ def wiki_languages():
yield row[2]


def get_pageview_url():
def get_pageview_url(prev=False):
weeks = 4
if prev:
weeks = 8

now = get_current_datetime()
dt = datetime(now.year, now.month, 1) - timedelta(weeks=4)
dt = datetime(now.year, now.month, 1) - timedelta(weeks=weeks)
return dt.strftime(
'https://dumps.wikimedia.org/other/pageview_complete/monthly/'
'%Y/%Y-%m/pageviews-%Y%m-user.bz2')


@contextmanager
def get_pageview_response():
url = get_pageview_url()
with requests.get(url, stream=True,
headers={'User-Agent': WP1_USER_AGENT}) as r:
try:
r.raise_for_status()
except requests.exceptions.HTTPError as e:
raise Wp1ScoreProcessingError('Could not retrieve pageview data') from e
def get_pageview_file_path(filename):
    """Return the full local path for a pageview dump named `filename`.

    The base directory is read from the 'pageviews' entry of the 'FILE_PATH'
    section of the credentials for the current environment. The directory is
    created if it does not exist yet.
    """
    pageviews_dir = CREDENTIALS[ENV]['FILE_PATH']['pageviews']
    # exist_ok=True makes this a no-op when the directory is already present.
    os.makedirs(pageviews_dir, exist_ok=True)
    full_path = os.path.join(pageviews_dir, filename)
    return full_path


def get_prev_file_path():
    """Return the local path of the pageview dump for the previous period.

    The file name is the last '/'-separated segment of the URL returned by
    get_pageview_url(prev=True).
    """
    prev_url = get_pageview_url(prev=True)
    # Everything after the final slash is the dump's file name.
    return get_pageview_file_path(prev_url.rsplit('/', 1)[-1])


def get_cur_file_path():
    """Return the local path of the pageview dump for the current period.

    The file name is the last '/'-separated segment of the URL returned by
    get_pageview_url().
    """
    cur_url = get_pageview_url()
    # Everything after the final slash is the dump's file name.
    return get_pageview_file_path(cur_url.rsplit('/', 1)[-1])


yield r
def download_pageviews():
    """Download the current pageview dump to local disk, if not yet present.

    The dump for the previous period is deleted first. If the current file
    already exists, nothing is downloaded. Otherwise the file is streamed from
    the URL given by get_pageview_url() and written to get_cur_file_path().

    Raises:
      requests.exceptions.HTTPError: if the server returns an error status.
    """
    # Clean up file from last month
    prev_filepath = get_prev_file_path()
    if os.path.exists(prev_filepath):
        os.remove(prev_filepath)

    cur_filepath = get_cur_file_path()
    if os.path.exists(cur_filepath):
        # File already downloaded
        return

    # Send the same User-Agent header as the other HTTP requests in this module.
    with requests.get(get_pageview_url(),
                      stream=True,
                      headers={'User-Agent': WP1_USER_AGENT}) as r:
        r.raise_for_status()
        try:
            # Write to the path that the "already downloaded" check above
            # inspects, so the download is not repeated on the next run.
            with open(cur_filepath, 'wb') as f:
                # Read data in 8 KB chunks
                for chunk in r.iter_content(chunk_size=8 * 1024):
                    f.write(chunk)
        except BaseException:
            # A partially written file would be mistaken for a complete
            # download on the next run, so remove it before propagating.
            if os.path.exists(cur_filepath):
                os.remove(cur_filepath)
            raise


def raw_pageviews(decode=False):

def as_bytes():
with get_pageview_response() as r:
decompressor = BZ2Decompressor()
trailing = b''
# Read data in 32 MB chunks
for http_chunk in r.iter_content(chunk_size=32 * 1024 * 1024):
data = decompressor.decompress(http_chunk)
decompressor = BZ2Decompressor()
trailing = b''
with open(get_cur_file_path(), 'rb') as f:
while True:
# Read data in 1 MB chunks
chunk = f.read(1024 * 1024)
if not chunk:
break
data = decompressor.decompress(chunk)
lines = [line for line in data.split(b'\n') if line]
if not lines:
continue

# Reunite incomplete lines
yield trailing + lines[0]
yield from lines[1:-1]
trailing = lines[-1]

# Nothing left, yield the last line
yield trailing

if decode:
Expand Down Expand Up @@ -96,7 +140,6 @@ def pageview_components():
try:
views = int(parts[4])
except ValueError:
# Views field wasn't int
log.warning('Views field wasn\'t int in pageview dump: %r', line)
continue

Expand Down Expand Up @@ -130,18 +173,33 @@ def update_db_pageviews(wp10db, lang, article, page_id, views):


def update_pageviews(filter_lang=None):
download_pageviews()

# Convert filter lang to bytes if necessary
if filter_lang is not None and isinstance(filter_lang, str):
filter_lang = filter_lang.encode('utf-8')

if filter_lang is None:
logger.info('Updating all pageviews')
else:
logger.info('Updating pageviews for %s', filter_lang.decode('utf-8'))

wp10db = wp10_connect()
n = 0
for lang, article, page_id, views in pageview_components():
if filter_lang is None or lang == filter_lang:
update_db_pageviews(wp10db, lang, article, page_id, views)

n += 1
if n >= 10000:
if n >= 50000:
logger.debug('Committing')
wp10db.commit()
n = 0
wp10db.commit()
logger.info('Done')


if __name__ == '__main__':
logging.basicConfig(level=logging.INFO,
format='%(levelname)s %(asctime)s: %(message)s')
update_pageviews()
95 changes: 32 additions & 63 deletions wp1/scores_test.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import bz2
from datetime import datetime
import unittest
from unittest.mock import patch, MagicMock
from unittest.mock import patch, MagicMock, mock_open

import requests

Expand All @@ -10,9 +10,7 @@
from wp1.exceptions import Wp1ScoreProcessingError
from wp1 import scores


class ScoresTest(BaseWpOneDbTest):
pageview_text = b'''af.wikipedia 1701 1402 desktop 4 F1
pageview_text = b'''af.wikipedia 1701 1402 desktop 4 F1
af.wikipedia 1701 1402 mobile-web 3 O2T1
af.wikipedia 1702 1404 mobile-web 3 L1O2
af.wikipedia 1702 1404 desktop 1 P1
Expand All @@ -37,9 +35,10 @@ class ScoresTest(BaseWpOneDbTest):
af.wikipedia 1712 753 mobile-web 2 O2
af.wikipedia 1712 753 desktop 20 E12J7U1'''

@property
def pageview_bz2(self):
return bz2.compress(self.pageview_text)
pageview_bz2 = bz2.compress(pageview_text)


class ScoresTest(BaseWpOneDbTest):

@patch('wp1.scores.requests')
def test_wiki_languages(self, mock_requests):
Expand Down Expand Up @@ -85,73 +84,43 @@ def test_get_pageview_url(self, mock_datetime):
'https://dumps.wikimedia.org/other/pageview_complete/monthly/'
'2024/2024-04/pageviews-202404-user.bz2', actual)

@patch('wp1.scores.requests.get')
def test_get_pageview_response(self, mock_get):
context = MagicMock()
expected = MagicMock()
context.__enter__.return_value = expected
mock_get.return_value = context
with scores.get_pageview_response() as actual:
self.assertEqual(expected, actual)

@patch('wp1.scores.requests.get')
def test_get_pageview_response_non_success(self, mock_get):
context = MagicMock()
resp = MagicMock()
resp.raise_for_status.side_effect = requests.exceptions.HTTPError
context.__enter__.return_value = resp
mock_get.return_value = context
with self.assertRaises(
Wp1ScoreProcessingError), scores.get_pageview_response() as actual:
pass
@patch('wp1.scores.get_current_datetime', return_value=datetime(2024, 5, 25))
def test_get_pageview_url_prev(self, mock_datetime):
actual = scores.get_pageview_url(prev=True)
self.assertEqual(
'https://dumps.wikimedia.org/other/pageview_complete/monthly/'
'2024/2024-03/pageviews-202403-user.bz2', actual)

@patch('wp1.scores.get_current_datetime', return_value=datetime(2024, 5, 25))
@patch('wp1.scores.get_pageview_response')
def test_raw_pageviews(self, mock_get_response, mock_datetime):
context = MagicMock()
resp = MagicMock()
resp.iter_content.return_value = (self.pageview_bz2,)
context.__enter__.return_value = resp
mock_get_response.return_value = context
def test_get_prev_file_path(self, mock_datetime):
actual = scores.get_prev_file_path()
self.assertEqual('/tmp/pageviews/pageviews-202403-user.bz2', actual)

Check warning on line 97 in wp1/scores_test.py

View check run for this annotation

codefactor.io / CodeFactor

wp1/scores_test.py#L97

Probable insecure usage of temp file/directory. (B108)

actual = b'\n'.join(scores.raw_pageviews())
@patch('wp1.scores.get_current_datetime', return_value=datetime(2024, 5, 25))
def test_get_cur_file_path(self, mock_datetime):
actual = scores.get_cur_file_path()
self.assertEqual('/tmp/pageviews/pageviews-202404-user.bz2', actual)

Check warning on line 102 in wp1/scores_test.py

View check run for this annotation

codefactor.io / CodeFactor

wp1/scores_test.py#L102

Probable insecure usage of temp file/directory. (B108)

self.assertEqual(self.pageview_text, actual)
def test_get_pageview_file_path(self):
actual = scores.get_pageview_file_path('pageviews-202404-user.bz2')
self.assertEqual('/tmp/pageviews/pageviews-202404-user.bz2', actual)

Check warning on line 106 in wp1/scores_test.py

View check run for this annotation

codefactor.io / CodeFactor

wp1/scores_test.py#L106

Probable insecure usage of temp file/directory. (B108)

@patch('wp1.scores.get_current_datetime', return_value=datetime(2024, 5, 25))
@patch('wp1.scores.get_pageview_response')
def test_raw_pageviews(self, mock_get_response, mock_datetime):
context = MagicMock()
resp = MagicMock()
resp.iter_content.return_value = (self.pageview_bz2,)
context.__enter__.return_value = resp
mock_get_response.return_value = context

@patch("builtins.open", new_callable=mock_open, read_data=pageview_bz2)
def test_raw_pageviews(self, mock_file_open, mock_datetime):
actual = b'\n'.join(scores.raw_pageviews())

self.assertEqual(self.pageview_text, actual)
self.assertEqual(pageview_text, actual)

@patch('wp1.scores.get_current_datetime', return_value=datetime(2024, 5, 25))
@patch('wp1.scores.get_pageview_response')
def test_raw_pageviews_decode(self, mock_get_response, mock_datetime):
context = MagicMock()
resp = MagicMock()
resp.iter_content.return_value = (self.pageview_bz2,)
context.__enter__.return_value = resp
mock_get_response.return_value = context

@patch("builtins.open", new_callable=mock_open, read_data=pageview_bz2)
def test_raw_pageviews_decode(self, mock_file_open, mock_datetime):
actual = '\n'.join(scores.raw_pageviews(decode=True))

self.assertEqual(self.pageview_text.decode('utf-8'), actual)

@patch('wp1.scores.get_pageview_response')
def test_pageview_components(self, mock_get_response):
context = MagicMock()
resp = MagicMock()
resp.iter_content.return_value = (self.pageview_bz2,)
context.__enter__.return_value = resp
mock_get_response.return_value = context
self.assertEqual(pageview_text.decode('utf-8'), actual)

@patch("builtins.open", new_callable=mock_open, read_data=pageview_bz2)
def test_pageview_components(self, mock_file_open):
expected = [
(b'af', b'1701', b'1402', 7),
(b'af', b'1702', b'1404', 4),
Expand Down Expand Up @@ -187,8 +156,8 @@ def test_update_db_pageviews(self):
def test_update_db_pageviews_existing(self):
with self.wp10db.cursor() as cursor:
cursor.execute(
'INSERT INTO page_scores VALUES ("en", "Statue_of_Liberty", 1234, 100'
)
'INSERT INTO page_scores (ps_lang, ps_article, ps_page_id, ps_views) '
'VALUES ("en", "Statue_of_Liberty", 1234, 100)')

scores.update_db_pageviews(self.wp10db, 'en', 'Statue_of_Liberty', 1234,
200)
Expand Down

0 comments on commit 8b1221a

Please sign in to comment.