#!/usr/bin/env python
import base64
import os
import sqlite3
from datetime import datetime, timezone

import feedparser
import tomli
from bs4 import BeautifulSoup
from dateutil import tz
from dateutil.parser import parse

# Load settings (output path, post limits, etc.) from config.toml.
with open("config.toml", mode="rb") as fp:
    config = tomli.load(fp)

db_file = 'sqlite.db'

# Map common US timezone abbreviations for dateutil, which cannot
# resolve them on its own when parsing feed dates.
tzinfos = {
    'PDT': tz.gettz('America/Los_Angeles'),
    'PST': tz.gettz('America/Los_Angeles'),
    'EST': tz.gettz('America/New_York'),
    'EDT': tz.gettz('America/New_York'),
}
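
# The script expects a config.toml next to it. That file is not included
# here, but based on the keys referenced in this script it would look
# something like the following (the values are illustrative assumptions,
# not taken from the original repository):
#
#   html_output = "index.html"     # path of the generated page
#   max_posts = 100                # how many articles build_page() renders
#   max_summary_size = 500         # summary length before truncation
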
def cleanup(files):
    '''Remove intermediate database files.'''
    if os.path.isfile(db_file):
        os.remove(db_file)
    if files == "all":
        if os.path.isfile(config["html_output"]):
            os.remove(config["html_output"])


def truncate_html(html, length):
    '''Truncate strings while still maintaining (and terminating) html.'''
    return str(BeautifulSoup(html[:length], "html.parser"))
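
# For illustration (not part of the original script): slicing raw HTML can
# cut a tag in half, and re-parsing the slice with BeautifulSoup is expected
# to close any dangling tags on serialization, e.g.:
#
#   str(BeautifulSoup("<p><b>Hello wo", "html.parser"))
#   # -> "<p><b>Hello wo</b></p>"
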
def process_rss(url):
    '''Gather posts from an RSS feed and put them into sqlite.'''
    feed = feedparser.parse(url)
    for post in feed.entries:
        title = post.title
        title_encoded = base64.b64encode(title.encode("utf-8")).decode()
        link = post.link
        summary = post.summary
        summary_encoded = base64.b64encode(summary.encode("utf-8")).decode()
        parsed_date = parse(post.published, tzinfos=tzinfos)
        # exclude duplicates by title; parameterized queries avoid any
        # quoting issues in titles or links
        cursor.execute("select * from articles where title = ?", (title_encoded,))
        row = cursor.fetchall()
        if not row:
            cursor.execute(
                '''insert into articles (summary, title, link, published)
                   values (?, ?, ?, ?)''',
                (summary_encoded, title_encoded, link, str(parsed_date)),
            )
    conn.commit()

def read_feeds():
    '''Process all the feeds in feeds.txt.'''
    with open("feeds.txt", "r") as file:
        for url in file:
            process_rss(url.strip())
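
# feeds.txt is a plain text file with one RSS/Atom feed URL per line, e.g.
# (example URLs, not taken from the original repository):
#
#   https://example.com/rss.xml
#   https://example.org/feed/atom.xml
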
def build_page():
    '''Generate the aggregated page.'''
    # read the header template
    with open('header.tmpl', 'r') as file:
        base_head = file.read()
    all_content = base_head
    dt_now = datetime.now(timezone.utc).strftime("%Y/%m/%d, %H:%M:%S %Z")
    all_content += f"Last updated: {dt_now} (updates every 4 hours)<br><hr>"
    # select the most recent posts (up to max_posts) and write them to the body
    query = f'SELECT title, link, summary, published FROM articles ORDER BY datetime(published) DESC LIMIT {config["max_posts"]}'
    cursor.execute(query)
    rows = cursor.fetchall()
    base_body = ""
    for r in rows:
        title = base64.b64decode(r[0]).decode()
        link = r[1]
        summary = base64.b64decode(r[2]).decode()
        if len(summary) > config["max_summary_size"]:
            summary = truncate_html(summary, config["max_summary_size"])
            summary = f"{summary} [...]"
        published = r[3]
        print_string = f"""<h3><a href="{link}" target="_blank">{title.strip()}</a></h3>
<b>Published:</b> {published}<p>
<b>Summary:</b><br>
{summary.strip()}<br>
<hr><p>"""
        base_body += print_string.strip()
    all_content += base_body
    # read the footer template
    with open('footer.tmpl', 'r') as file:
        base_foot = file.read()
    all_content += base_foot
    return all_content
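
# header.tmpl and footer.tmpl are not shown here; build_page() simply
# concatenates them around the article list, so presumably they are plain
# HTML fragments (e.g. header.tmpl opens <html>/<body> and footer.tmpl
# closes them).
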
if __name__ == '__main__':
    print("Starting feed generation...")
    # start from a clean slate: remove any previous database and page
    cleanup("all")
    conn = sqlite3.connect(db_file)
    cursor = conn.cursor()
    sql = '''CREATE TABLE IF NOT EXISTS articles (
        id INTEGER PRIMARY KEY,
        summary TEXT NOT NULL,
        title TEXT NOT NULL,
        link TEXT NOT NULL,
        published TEXT NOT NULL
    )'''
    cursor.execute(sql)
    read_feeds()
    contents = build_page()
    with open(config["html_output"], "w") as output_file:
        output_file.write(contents)
    cursor.close()
    conn.close()
    cleanup("db")
    print("Finished feed generation.")