Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Calculate pageviews for all articles across all wikipedias #755

Merged
merged 13 commits into from
Sep 8, 2024
22 changes: 22 additions & 0 deletions db/migrations/20240429_01_qtSms-add-scores-table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
"""
Add scores table
"""

from yoyo import step

__depends__ = {'20230528_02_pL6ka-add-b-selection-zim-version-to-builders'}

steps = [
step(
'''CREATE TABLE page_scores (
ps_lang VARBINARY(255),
ps_page_id INTEGER NOT NULL,
ps_article VARBINARY(1024),
ps_views INTEGER DEFAULT 0,
ps_links INTEGER DEFAULT 0,
ps_lang_links INTEGER DEFAULT 0,
ps_score INTEGER DEFAULT 0,
PRIMARY KEY (`ps_lang`, `ps_page_id`),
KEY `lang_article` (`ps_lang`, `ps_article`)
)''', 'DROP TABLE page_scores')
]
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ services:
- /data/wp1bot/credentials.py:/usr/src/app/wp1/credentials.py
- /data/wp1bot/db/yoyo.ini:/usr/src/app/db/production/yoyo.ini
- /srv/log/wp1bot/:/var/log/wp1bot/
- /srv/data/wp1bot/:/var/data/wp1bot/
links:
- redis
logging:
Expand Down
2 changes: 1 addition & 1 deletion docker/dev-db/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ The dev database will need to be migrated in the following circumstances:
To migrate, cd to the `db/dev` directory and run the following command:

```bash
PYTHONPATH=$PYTHONPATH:../.. yoyo apply
PYTHONPATH=$PYTHONPATH:../.. pipenv run yoyo apply
```

The `PYTHONPATH` environment variable is necessary because some of the migrations
Expand Down
4 changes: 4 additions & 0 deletions wp1/credentials.py.dev.e2e
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ CREDENTIALS = {
'secret': '',
'bucket': 'org-kiwix-dev-wp1',
},
'FILE_PATH': {
# Path where pageviews.bz2 file (~3GB) will be downloaded.
'pageviews': '/tmp/pageviews',
}
},
Environment.TEST: {},
Environment.PRODUCTION: {}
Expand Down
4 changes: 4 additions & 0 deletions wp1/credentials.py.e2e
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ CREDENTIALS = {
'user': 'farmuser',
'password': 'farmpass',
'hook_token': 'hook-token-abc',
},
'FILE_PATH': {
# Path where pageviews.bz2 file (~3GB) will be downloaded.
'pageviews': '/tmp/pageviews',
}
},
Environment.PRODUCTION: {},
Expand Down
15 changes: 15 additions & 0 deletions wp1/credentials.py.example
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,11 @@ CREDENTIALS = {
# server, to ensure requests to the webhook endpoint are valid.
'hook_token': '', # EDIT this line
},

'FILE_PATH': {
# Path where pageviews.bz2 file (~3GB) will be downloaded.
'pageviews': '/tmp/pageviews',
}
},

# Environment for python nosetests. In this environment, only the MySQL database
Expand Down Expand Up @@ -173,6 +178,11 @@ CREDENTIALS = {
'password': 'farmpass',
'hook_token': 'hook-token-abc',
}

'FILE_PATH': {
# Path where pageviews.bz2 file (~3GB) will be downloaded.
'pageviews': '/tmp/pageviews',
}
},

# EDIT: Remove the next line after you've provided actual production credentials.
Expand Down Expand Up @@ -253,4 +263,9 @@ CREDENTIALS = {
# # server, to ensure requests to the webhook endpoint are valid.
# 'hook_token': '', # EDIT this line
# },

# 'FILE_PATH': {
# # Path where pageviews.bz2 file (~3GB) will be downloaded.
# 'pageviews': '/var/data/wp1bot/pageviews',
# }
}
3 changes: 3 additions & 0 deletions wp1/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,6 @@ class ObjectNotFoundError(Wp1Error):

class UserNotAuthorizedError(Wp1Error):
  """Raised when a user is not authorized for an action — see call sites."""
  pass

class Wp1ScoreProcessingError(Wp1Error):
  """Raised when downloading or processing pageview score data fails."""
  pass
211 changes: 211 additions & 0 deletions wp1/scores.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
from bz2 import BZ2Decompressor
from collections import namedtuple
from contextlib import contextmanager
import logging
import os.path

import csv
from datetime import datetime, timedelta
import requests

from wp1.constants import WP1_USER_AGENT
from wp1.exceptions import Wp1ScoreProcessingError
from wp1.time import get_current_datetime
from wp1.wp10_db import connect as wp10_connect

PageviewRecord = namedtuple('PageviewRecord',
['lang', 'name', 'page_id', 'views'])

logger = logging.getLogger(__name__)

# The credentials module is hand-maintained and may be absent in some
# environments; degrade gracefully so the rest of this module still imports.
try:
  from wp1.credentials import ENV, CREDENTIALS
except ImportError:
  logger.exception('The file credentials.py must be populated manually in '
                   'order to download pageviews')
  ENV = None
  CREDENTIALS = None


def wiki_languages():
  """Yield the language code of each wiki listed by the wikistats service.

  Raises Wp1ScoreProcessingError if the wiki list cannot be fetched.
  """
  resp = requests.get(
      'https://wikistats.wmcloud.org/api.php?action=dump&table=wikipedias&format=csv',
      headers={'User-Agent': WP1_USER_AGENT},
      timeout=60,
  )
  try:
    resp.raise_for_status()
  except requests.exceptions.HTTPError as e:
    raise Wp1ScoreProcessingError('Could not retrieve wiki list') from e

  rows = csv.reader(resp.text.splitlines())
  next(rows, None)  # Skip the header row.
  for row in rows:
    # Yield the third CSV column, which holds the language code.
    yield row[2]


def get_pageview_url(prev=False):
  """Return the URL of a monthly 'user' pageview dump on dumps.wikimedia.org.

  prev: if True, return the URL for the month before the one normally
    selected (used to clean up last month's download).
  """
  offset_weeks = 8 if prev else 4
  now = get_current_datetime()
  month_start = datetime(now.year, now.month, 1)
  target = month_start - timedelta(weeks=offset_weeks)
  return target.strftime(
      'https://dumps.wikimedia.org/other/pageview_complete/monthly/'
      '%Y/%Y-%m/pageviews-%Y%m-user.bz2')


def get_pageview_file_path(filename):
  """Return the full local path for filename in the configured pageviews dir.

  Creates the download directory if it does not already exist.
  """
  download_dir = CREDENTIALS[ENV]['FILE_PATH']['pageviews']
  os.makedirs(download_dir, exist_ok=True)
  return os.path.join(download_dir, filename)


def get_prev_file_path():
  """Return the local file path for the previous month's pageview dump."""
  url = get_pageview_url(prev=True)
  return get_pageview_file_path(url.rsplit('/', 1)[-1])


def get_cur_file_path():
  """Return the local file path for the current month's pageview dump."""
  url = get_pageview_url()
  return get_pageview_file_path(url.rsplit('/', 1)[-1])


def download_pageviews():
  """Download the current month's pageview dump, if not already on disk.

  The previous month's dump is deleted first to reclaim disk space. On a
  download error, any partial file is removed so a later retry starts clean,
  and Wp1ScoreProcessingError is raised.
  """
  # Clean up file from last month.
  prev_filepath = get_prev_file_path()
  if os.path.exists(prev_filepath):
    os.remove(prev_filepath)

  cur_filepath = get_cur_file_path()
  if os.path.exists(cur_filepath):
    # File already downloaded.
    return

  with requests.get(get_pageview_url(), stream=True, timeout=60) as r:
    r.raise_for_status()
    try:
      with open(cur_filepath, 'wb') as f:
        # Read data in 8 MB chunks.
        for chunk in r.iter_content(chunk_size=8 * 1024 * 1024):
          f.write(chunk)
    except Exception as e:
      logger.exception('Error downloading pageviews')
      # Don't leave a partial file behind. Guard the remove: if open()
      # itself failed, the file never existed and an unconditional remove
      # would raise FileNotFoundError, masking the original error.
      if os.path.exists(cur_filepath):
        os.remove(cur_filepath)
      raise Wp1ScoreProcessingError('Error downloading pageviews') from e


def raw_pageviews(decode=False, path=None):
  """Stream the lines of the downloaded pageview dump, decompressing lazily.

  decode: if True, yield str lines decoded as UTF-8; otherwise bytes.
  path: optional path to a .bz2 dump file; defaults to the current month's
    downloaded dump (see get_cur_file_path). Exposed mainly for testing.

  The dump is decompressed incrementally in 1 MB chunks so the multi-GB
  file is never held in memory at once. Blank lines are skipped.

  NOTE(review): assumes the dump is a single bz2 stream; a multi-stream
  file would raise EOFError from the decompressor — confirm dump format.
  """
  if path is None:
    path = get_cur_file_path()

  def as_bytes():
    decompressor = BZ2Decompressor()
    trailing = b''
    with open(path, 'rb') as f:
      while True:
        # Read compressed data in 1 MB chunks.
        chunk = f.read(1024 * 1024)
        if not chunk:
          break
        # Prepend the partial line left over from the previous chunk,
        # then split. The final piece is the new partial line (or b'' if
        # the data ended exactly on a newline); hold it for the next
        # chunk instead of yielding it. The previous implementation
        # filtered out empty pieces *before* choosing the leftover, which
        # corrupted lines whenever a chunk ended on a newline boundary
        # and duplicated data when a chunk contained no newline at all.
        data = trailing + decompressor.decompress(chunk)
        pieces = data.split(b'\n')
        trailing = pieces.pop()
        for line in pieces:
          if line:
            yield line

    # Flush the final line, if the file didn't end with a newline.
    if trailing:
      yield trailing

  if decode:
    for line in as_bytes():
      yield line.decode('utf-8')
  else:
    yield from as_bytes()


def pageview_components(line_iter=None):
  """Yield (lang, article name, page id, views) tuples from the dump.

  Consecutive dump lines for the same article (views from different
  interfaces, e.g. mobile vs desktop) are merged into a single tally.
  Lines without a page id or title, or with a non-integer view count,
  are skipped.

  line_iter: optional iterable of raw dump lines (bytes); defaults to
    raw_pageviews(). Exposed mainly for testing.
  """
  if line_iter is None:
    line_iter = raw_pageviews()

  # Running tally for the article currently being accumulated, as a
  # (lang, name, page_id, views) tuple, or None before the first article.
  cur = None
  for line in line_iter:
    parts = line.split(b' ')
    if len(parts) != 6 or parts[2] == b'null':
      # Skip pages that don't have a pageid.
      continue

    if parts[1] == b'' or parts[1] == b'-':
      # Skip pages that don't have a title.
      continue

    lang = parts[0].split(b'.')[0]
    name = parts[1]
    page_id = parts[2]
    try:
      views = int(parts[4])
    except ValueError:
      logger.warning('Views field wasn\'t int in pageview dump: %r', line)
      continue

    if cur is not None and cur[:3] == (lang, name, page_id):
      # This is a view on the same page from a different interface
      # (mobile v desktop etc); accumulate the views.
      cur = (lang, name, page_id, cur[3] + views)
    else:
      if cur is not None:
        yield cur
      cur = (lang, name, page_id, views)

  # Flush the last tally. Guard against an empty or fully-filtered dump,
  # where the unconditional final yield previously raised AttributeError
  # on a None tally.
  if cur is not None:
    yield cur


def update_db_pageviews(wp10db, lang, article, page_id, views):
  """Insert or update the pageview count for a single article.

  wp10db: open WP10 database connection. This function does not commit;
    the caller batches commits.
  lang, article, page_id, views: values for the page_scores row.
  """
  params = {
      'lang': lang,
      'page_id': page_id,
      'article': article,
      'views': views,
  }
  with wp10db.cursor() as cursor:
    cursor.execute(
        '''INSERT INTO page_scores (ps_lang, ps_page_id, ps_article, ps_views)
           VALUES (%(lang)s, %(page_id)s, %(article)s, %(views)s)
           ON DUPLICATE KEY UPDATE ps_views = %(views)s
        ''', params)


def update_pageviews(filter_lang=None, commit_after=50000):
  """Download the latest pageview dump and upsert views into page_scores.

  filter_lang: if given (str or bytes), only rows for that language are
    updated; None updates every language.
  commit_after: number of row updates between database commits.
  """
  download_pageviews()

  # Convert filter lang to bytes if necessary, since dump fields are bytes.
  if filter_lang is not None and isinstance(filter_lang, str):
    filter_lang = filter_lang.encode('utf-8')

  if filter_lang is None:
    logger.info('Updating all pageviews')
  else:
    logger.info('Updating pageviews for %s', filter_lang.decode('utf-8'))

  wp10db = wp10_connect()
  try:
    n = 0
    for lang, article, page_id, views in pageview_components():
      if filter_lang is None or lang == filter_lang:
        update_db_pageviews(wp10db, lang, article, page_id, views)

        n += 1
        if n >= commit_after:
          logger.debug('Committing')
          wp10db.commit()
          n = 0
    wp10db.commit()
    logger.info('Done')
  finally:
    # Always release the connection, even if processing fails midway;
    # previously the connection was leaked.
    wp10db.close()


if __name__ == '__main__':
  # Allow running this module directly to refresh all pageview scores.
  logging.basicConfig(level=logging.INFO,
                      format='%(levelname)s %(asctime)s: %(message)s')
  update_pageviews()

Check warning on line 211 in wp1/scores.py

View check run for this annotation

Codecov / codecov/patch

wp1/scores.py#L211

Added line #L211 was not covered by tests
Loading