-
Notifications
You must be signed in to change notification settings - Fork 57
/
Copy pathoriginal_post_discovery.py
609 lines (509 loc) · 22.8 KB
/
original_post_discovery.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
"""Augments the standard original_post_discovery algorithm with a
reverse lookup that supports posts without a backlink or citation.
Performs a reverse-lookup that scans the activity's author's ``h-feed``
for posts with rel=syndication links. As we find syndicated copies,
save the relationship. If we find the original post for the activity
in question, return the original's URL.
See http://indiewebcamp.com/posse-post-discovery for more detail.
This feature adds costs in terms of HTTP requests and database
lookups in the following primary cases:
* If the author's domain is known to be invalid or blocklisted, there will
  be 0 requests and 0 DB lookups.
* If a syndicated post has been seen previously (regardless of
  whether discovery was successful), there will be 0 requests and 1
  DB lookup.
* The first time a syndicated post is seen:

  * 1 to 2 HTTP requests to get and parse the ``h-feed`` plus 1 additional
    request for *each* post permalink that has not been seen before.
  * 1 DB query for the initial check plus 1 additional DB query for
    *each* post permalink.
"""
import collections
import itertools
import logging
import mf2util
from granary import as1
from granary import microformats2
from oauth_dropins.webutil.appengine_info import DEBUG
import models
from models import SyndicatedPost
import util
logger = logging.getLogger(__name__)

# Caps on how many post permalinks from an author's feed get processed per
# call to _process_author(); beta users get the larger cap.
MAX_PERMALINK_FETCHES = 10
MAX_PERMALINK_FETCHES_BETA = 50

# _find_feed_items() truncates feeds that have more entries than this.
MAX_FEED_ENTRIES = 100

# discover() prunes its original post and mention URL sets down to these sizes.
MAX_ORIGINAL_CANDIDATES = 10
MAX_MENTION_CANDIDATES = 10

# this was 30 in google.appengine.ext.ndb. haven't found it in google.cloud.ndb
# yet, or whether it's even there at all, but we only rarely hit it anyway, so
# let's just keep it as is for now.
MAX_ALLOWABLE_QUERIES = 30

# MIME type that marks a rel=alternate link as an mf2 HTML feed candidate.
MF2_HTML_MIME_TYPE = 'text/mf2+html'
def discover(source, activity, fetch_hfeed=True, include_redirect_sources=True,
             already_fetched_hfeeds=None):
    r"""Finds original post and mention URLs for a silo activity.

    Runs the standard original post discovery algorithm on the activity, then
    filters, caps and resolves the links it found. When the source has author
    URLs and the activity has a syndication URL, it also runs the
    posse-post-discovery reverse lookup, which supports posts without a
    backlink or citation.

    If ``fetch_hfeed`` is False, then we will check the db for previously found
    :class:`models.SyndicatedPost`\s but will not do posse-post-discovery to
    find new ones.

    Args:
      source (models.Source): subclass. Changes to property values (e.g.
        ``domains``, ``domain_urls``, ``last_syndication_url``) are stored in
        ``source.updates``\; they should be updated transactionally later.
      activity (dict)
      fetch_hfeed (bool)
      include_redirect_sources (bool): whether to include URLs that redirect
        as well as their final destination URLs
      already_fetched_hfeeds (set of str): URLs that we have already fetched
        and run posse-post-discovery on, so we can avoid running it multiple
        times

    Returns:
      (set of str, set of str) tuple: (original post URLs, mention URLs)
    """
    label = activity.get('url') or activity.get('id')
    logger.debug(f'discovering original posts for: {label}')

    if not source.updates:
        source.updates = {}
    if already_fetched_hfeeds is None:
        already_fetched_hfeeds = set()

    originals, mentions = as1.original_post_discovery(
        activity,
        domains=source.domains,
        include_redirect_sources=include_redirect_sources,
        include_reserved_hosts=DEBUG,
        max_redirect_fetches=MAX_ORIGINAL_CANDIDATES,
        headers=util.request_headers(source=source))

    # only include mentions of the author themselves.
    # (mostly just for Mastodon; other silos' domains are all in the blocklist,
    # so their mention URLs get dropped later anyway.)
    # (these are originally added in Source._inject_user_urls() and in poll
    # step 2.)
    obj = activity.get('object', {})
    other_user_mentions = {
        tag.get('url') for tag in obj.get('tags', [])
        if tag.get('objectType') == 'person'
        and tag.get('url') not in source.domain_urls
    }
    originals -= other_user_mentions
    mentions -= other_user_mentions

    # original posts are only from the author themselves
    owner = activity.get('actor') or obj.get('author') or {}
    owner_ids = util.trim_nulls([owner.get('id'), owner.get('username')])
    source_ids = util.trim_nulls([source.key.id(), source.user_tag_id()])
    if source.USERNAME_KEY_ID:
        # compare ids case-insensitively for these sources
        owner_ids = [owner_id.lower() for owner_id in owner_ids]
        source_ids = [source_id.lower() for source_id in source_ids]
    if owner_ids and set(owner_ids).isdisjoint(source_ids):
        logger.info(f"Demoting original post links because user ids {source_ids} don't match author ids {owner_ids}")
        # this is someone else's post, so all links must be mentions
        mentions.update(originals)
        originals = set()

    # look for original URL of attachments (e.g. quote tweets)
    for att in obj.get('attachments', []):
        if att.get('objectType') not in ('note', 'article'):
            continue
        if att.get('author', {}).get('id') != source.user_tag_id():
            continue
        logger.debug(f"running original post discovery on attachment: {att.get('id')}")
        att_origs, _ = discover(
            source, att, include_redirect_sources=include_redirect_sources)
        logger.debug(f'original post discovery found originals for attachment, {att_origs}')
        # originals of an attached post are only mentions for this activity
        mentions.update(att_origs)

    # cap the candidates; sorting makes the pruning deterministic
    if len(originals) > MAX_ORIGINAL_CANDIDATES:
        logger.info(f'{len(originals)} originals, pruning down to {MAX_ORIGINAL_CANDIDATES}')
        originals = sorted(originals)[:MAX_ORIGINAL_CANDIDATES]
    if len(mentions) > MAX_MENTION_CANDIDATES:
        logger.info(f'{len(mentions)} mentions, pruning down to {MAX_MENTION_CANDIDATES}')
        mentions = sorted(mentions)[:MAX_MENTION_CANDIDATES]

    def resolve(urls):
        """Keeps sendable webmention targets that aren't on the silo's domain."""
        resolved = set()
        for url in urls:
            final, domain, send = util.get_webmention_target(url)
            if not send or domain == source.gr_source.DOMAIN:
                continue
            resolved.add(final)
            if include_redirect_sources:
                resolved.add(url)
        return resolved

    originals = resolve(originals)
    mentions = resolve(mentions)

    if not source.get_author_urls():
        logger.debug('no author url(s), cannot find h-feed')
        if source.BACKFEED_REQUIRES_SYNDICATION_LINK:
            return set(), set()
        return originals, mentions

    # TODO possible optimization: if we've discovered a backlink to a post on
    # the author's domain (i.e., it included a link or citation), then skip the
    # rest of this.
    syndicated = []
    syndication_url = obj.get('url') or activity.get('url')
    if syndication_url:
        # use the canonical syndication url on both sides, so that we have
        # the best chance of finding a match. Some silos allow several
        # different permalink formats to point to the same place.
        syndication_url = source.canonicalize_url(syndication_url)

    if syndication_url:
        syndicated = _posse_post_discovery(source, activity, syndication_url,
                                           fetch_hfeed, already_fetched_hfeeds)
        originals.update(syndicated)
        originals = set(util.dedupe_urls(originals))
    else:
        logger.debug(f'no {source.SHORT_NAME} syndication url, cannot process h-entries')

    if source.BACKFEED_REQUIRES_SYNDICATION_LINK:
        # only originals backed by a syndication link count for these sources
        return set(syndicated), set()
    return originals, mentions
def refetch(source):
    r"""Re-processes the author's URLs to pick up new syndication links.

    Looks for new or updated syndication links that might not have been there
    the first time we looked, by running ``_process_author`` with
    ``refetch=True`` on each of the source's author URLs.

    Args:
      source (models.Source): Changes to property values (e.g. ``domains``,
        ``domain_urls``, ``last_syndication_url``) are stored in
        ``source.updates``; they should be updated transactionally later.

    Returns:
      dict: mapping syndicated_url to a list of new
      :class:`models.SyndicatedPost`\s
    """
    logger.debug(f'attempting to refetch h-feed for {source.label()}')
    if not source.updates:
        source.updates = {}

    new_relationships = {}
    for author_url in _get_author_urls(source):
        found = _process_author(source, author_url, refetch=True)
        # a later author URL overwrites an earlier one's list for the same key
        new_relationships.update(found)
    return new_relationships
def targets_for_response(resp, originals, mentions):
    """Returns the URLs that we should send webmentions to for a response.

    Posts are sent only to mentioned URLs, comments are sent to both original
    post URLs and mentioned URLs, and every other response type is sent only
    to original post URLs.

    Args:
      resp (dict): ActivityStreams response object
      originals, mentions (set of str): URLs; they are merged with ``|=``, so
        they need to be sets

    Returns:
      set of str: URLs
    """
    kind = models.Response.get_type(resp)
    targets = set()
    if kind == 'post':
        targets |= mentions
    else:
        targets |= originals
        if kind == 'comment':
            targets |= mentions
    return targets
def _posse_post_discovery(source, activity, syndication_url, fetch_hfeed,
                          already_fetched_hfeeds):
    """Looks up, and if needed discovers, originals for a syndicated copy.

    First queries stored :class:`models.SyndicatedPost` entities for this
    syndication URL. If none exist and ``fetch_hfeed`` is set, processes the
    author's URLs to look for a match, and stores a blank entry when nothing
    is found so the same syndicated post isn't reprocessed every time.

    Args:
      source (models.Source)
      activity (dict): not read by this function
      syndication_url (str): url of the syndicated copy for which we are
        trying to find an original
      fetch_hfeed (bool): whether or not to fetch and parse the
        author's feed if we don't have a previously stored
        relationship
      already_fetched_hfeeds (set of str): URLs we've already fetched in a
        previous iteration. Author URLs processed here are added to it.

    Returns:
      list of str: original post urls, possibly empty
    """
    logger.info(f'starting posse post discovery with syndicated {syndication_url}')

    relationships = SyndicatedPost.query(
        SyndicatedPost.syndication == syndication_url,
        ancestor=source.key).fetch()

    if source.IGNORE_SYNDICATION_LINK_FRAGMENTS:
        # prefix search to find any instances of this synd link with a fragment
        with_fragment = SyndicatedPost.query(
            SyndicatedPost.syndication > f'{syndication_url}#',
            SyndicatedPost.syndication < f'{syndication_url}#\ufffd',
            ancestor=source.key).fetch()
        relationships += with_fragment

    if fetch_hfeed and not relationships:
        # a syndicated post we haven't seen before! fetch the author's URLs to
        # see if we can find it.
        #
        # TODO: Consider using the actor's url, with get_author_urls() as the
        # fallback in the future to support content from non-Bridgy users.
        results = {}
        for url in _get_author_urls(source):
            if url in already_fetched_hfeeds:
                logger.debug(f'skipping {url}, already fetched this round')
                continue
            results.update(_process_author(source, url))
            already_fetched_hfeeds.add(url)

        relationships = results.get(syndication_url, [])

    if not relationships:
        # No relationships were found. Remember that we've seen this
        # syndicated post to avoid reprocessing it every time
        logger.debug(f'posse post discovery found no relationship for {syndication_url}')
        if fetch_hfeed:
            SyndicatedPost.insert_syndication_blank(source, syndication_url)

    # blank entries have no original, so they drop out here
    originals = [r.original for r in relationships if r.original]
    if originals:
        logger.debug(f'posse post discovery found relationship(s) {syndication_url} -> {originals}')
    return originals
def _process_author(source, author_url, refetch=False, store_blanks=True):
    r"""Fetch the author's domain URL, and look for syndicated posts.

    Collects feed items from the author's page and from any rel=feed or mf2
    HTML rel=alternate feeds it links to, then runs :func:`process_entry` on
    each ``h-entry`` permalink, up to a per-user cap.

    Args:
      source (models.Source)
      author_url (str): the author's homepage URL
      refetch (bool): whether to refetch and process entries we've seen before
      store_blanks (bool): whether we should store blank
        :class:`models.SyndicatedPost`\s when we don't find a relationship

    Returns:
      dict: maps syndicated_url to a list of new :class:`models.SyndicatedPost`\s
    """
    # for now use whether the url is a valid webmention target
    # as a proxy for whether it's worth searching it.
    author_url, _, ok = util.get_webmention_target(author_url)
    if not ok:
        return {}

    logger.debug(f'fetching author url {author_url}')
    try:
        author_mf2 = util.fetch_mf2(author_url)
    except AssertionError:
        raise  # for unit tests
    except BaseException:
        # NOTE(review): BaseException also swallows KeyboardInterrupt and
        # SystemExit; presumably a deliberate best-effort catch - confirm.
        # TODO limit allowed failures, cache the author's h-feed url
        # or the # of times we've failed to fetch it
        logger.info(f'Could not fetch author url {author_url}', exc_info=True)
        return {}

    if not author_mf2:
        logger.debug('nothing found')
        return {}

    feeditems = _find_feed_items(author_mf2)

    # try rel=feeds and rel=alternates
    feed_urls = set()
    candidates = (author_mf2['rels'].get('feed', []) +
                  [a.get('url') for a in author_mf2.get('alternates', [])
                   if a.get('type') == MF2_HTML_MIME_TYPE])
    for feed_url in candidates:
        # check that it's html, not too big, etc
        feed_url, _, feed_ok = util.get_webmention_target(feed_url)
        if feed_url == author_url:
            logger.debug('author url is the feed url, ignoring')
        elif not feed_ok:
            logger.debug("skipping feed since it's not HTML or otherwise bad")
        else:
            feed_urls.add(feed_url)

    for feed_url in feed_urls:
        try:
            logger.debug(f"fetching author's rel-feed {feed_url}")
            feed_mf2 = util.fetch_mf2(feed_url)
            if not feed_mf2:
                logger.debug('nothing found')
                continue
            feeditems = _merge_hfeeds(feeditems, _find_feed_items(feed_mf2))
            domain = util.domain_from_link(feed_url)
            if source.updates is not None and domain not in source.domains:
                # setdefault stores source.domains itself, so the append below
                # extends that same list object
                domains = source.updates.setdefault('domains', source.domains)
                if domain not in domains:
                    logger.info(f'rel-feed found new domain {domain}! adding to source')
                    domains.append(domain)
        except AssertionError:
            raise  # reraise assertions for unit tests
        except BaseException:
            logger.info(f'Could not fetch h-feed url {feed_url}.', exc_info=True)

    # sort by dt-updated/dt-published
    def updated_or_published(item):
        props = microformats2.first_props(item.get('properties'))
        return props.get('updated') or props.get('published') or ''

    feeditems.sort(key=updated_or_published, reverse=True)

    # The cap doesn't change while we iterate, so compute it once up front.
    permalink_cap = (MAX_PERMALINK_FETCHES_BETA if source.is_beta_user()
                     else MAX_PERMALINK_FETCHES)

    permalink_to_entry = collections.OrderedDict()
    for child in feeditems:
        if 'h-entry' in child['type']:
            permalinks = child['properties'].get('url', [])
            if not permalinks:
                logger.debug('ignoring h-entry with no u-url!')
            for permalink in permalinks:
                if isinstance(permalink, str):
                    permalink_to_entry[permalink] = child
                else:
                    logger.warning(f'unexpected non-string "url" property: {permalink}')

        # checked once per feed item, so an entry with several u-urls can push
        # the total slightly past the cap
        if len(permalink_to_entry) >= permalink_cap:
            logger.info(f'Hit cap of {permalink_cap} permalinks. Stopping.')
            break

    # query all preexisting permalinks at once, instead of once per link
    permalinks_list = list(permalink_to_entry.keys())
    # fetch the maximum allowed entries (currently 30) at a time
    preexisting_list = itertools.chain.from_iterable(
        SyndicatedPost.query(
            SyndicatedPost.original.IN(permalinks_list[i:i + MAX_ALLOWABLE_QUERIES]),
            ancestor=source.key)
        for i in range(0, len(permalinks_list), MAX_ALLOWABLE_QUERIES))
    preexisting = {}
    for r in preexisting_list:
        preexisting.setdefault(r.original, []).append(r)

    results = {}
    for permalink, entry in permalink_to_entry.items():
        logger.debug(f'processing permalink: {permalink}')
        new_results = process_entry(
            source, permalink, entry, refetch, preexisting.get(permalink, []),
            store_blanks=store_blanks)
        for key, value in new_results.items():
            results.setdefault(key, []).extend(value)

    if source.updates is not None and results:
        # keep track of the last time we've seen rel=syndication urls for
        # this author. this helps us decide whether to refetch periodically
        # and look for updates.
        # Source will be saved at the end of each round of polling
        source.updates['last_syndication_url'] = util.now()

    return results
def _merge_hfeeds(feed1, feed2):
r"""Merge items from two ``h-feeds`` into a composite feed.
Skips items in ``feed2`` that are already represented in ``feed1``\, based on
the ``url`` property.
Args:
feed1 (list of dict)
feed2 (list of dict)
Returns:
list of dict:
"""
seen = set()
for item in feed1:
for url in item.get('properties', {}).get('url', []):
if isinstance(url, str):
seen.add(url)
return feed1 + [item for item in feed2 if all(
(url not in seen) for url in item.get('properties', {}).get('url', []) if isinstance(url, str))]
def _find_feed_items(mf2):
    """Extract feed items from given microformats2 data.

    If any ``h-feed`` items are found, returns their children, concatenated
    in order. Otherwise, returns the top-level items. Either way, the result
    is truncated to at most ``MAX_FEED_ENTRIES`` items.

    Args:
      mf2 (dict): parsed mf2 data

    Returns:
      list of dict: each one representing an mf2 ``h-*`` item
    """
    top_level_items = mf2['items']
    hfeeds = mf2util.find_all_entries(mf2, ('h-feed',))

    if hfeeds:
        feeditems = []
        for hfeed in hfeeds:
            feeditems.extend(hfeed.get('children', []))
    else:
        logger.debug('No h-feed found, fallback to top-level h-entrys.')
        feeditems = top_level_items

    if len(feeditems) > MAX_FEED_ENTRIES:
        logger.info(f'Feed has {len(feeditems)} entries! only processing the first {MAX_FEED_ENTRIES}.')
        feeditems = feeditems[:MAX_FEED_ENTRIES]

    return feeditems
def process_entry(source, permalink, feed_entry, refetch, preexisting,
                  store_blanks=True):
    r"""Fetch and process an h-entry and save a new :class:`models.SyndicatedPost`.

    Looks for syndication links first on the feed's copy of the entry and
    then, if that finds nothing and a fetch looks worthwhile, on the permalink
    page itself. Stored relationships that no longer appear are deleted, and
    are also removed from the ``preexisting`` list in place.

    Args:
      source (models.Source)
      permalink (str): url of the unprocessed post
      feed_entry (dict): the ``h-feed`` version of the ``h-entry``\, often
        contains a partial version of the ``h-entry`` at the permalink
      refetch (bool): whether to refetch and process entries we've seen before
      preexisting (list): of previously discovered
        :class:`models.SyndicatedPost`\s for this permalink. Modified in place
        when stale relationships are deleted.
      store_blanks (bool): whether we should store blank
        :class:`models.SyndicatedPost`\s when we don't find a relationship

    Returns:
      dict: maps syndicated url to a list of new :class:`models.SyndicatedPost`\s
    """
    # if the post has already been processed, do not add to the results
    # since this method only returns *newly* discovered relationships.
    if preexisting:
        # if we're refetching and this one is blank, do not return.
        # if there is a blank entry, it should be the one and only entry,
        # but go ahead and check 'all' of them to be safe.
        if not refetch:
            return {}
        synds = [s.syndication for s in preexisting if s.syndication]
        if synds:
            logger.debug(f'previously found relationship(s) for original {permalink}: {synds}')

    # first try with the h-entry from the h-feed. if we find the syndication url
    # we're looking for, we don't have to fetch the permalink
    permalink, _, type_ok = util.get_webmention_target(permalink)
    usynd = feed_entry.get('properties', {}).get('syndication', [])
    usynd_urls = {url for url in usynd if isinstance(url, str)}
    if usynd_urls:
        logger.debug(f'u-syndication links on the h-feed h-entry: {usynd_urls}')
    results = _process_syndication_urls(source, permalink, usynd_urls, preexisting)
    success = True

    if results:
        source.updates['last_feed_syndication_url'] = util.now()
    elif not source.last_feed_syndication_url or not feed_entry:
        # fetch the full permalink page if we think it might have more details
        mf2 = None
        try:
            if type_ok:
                logger.debug(f'fetching post permalink {permalink}')
                mf2 = util.fetch_mf2(permalink)
        except AssertionError:
            raise  # for unit tests
        except BaseException:
            # TODO limit the number of allowed failures
            logger.info(f'Could not fetch permalink {permalink}', exc_info=True)
            # a failed fetch tells us nothing, so skip the deletion pass below
            success = False

        if mf2:
            syndication_urls = set()
            relsynd = mf2['rels'].get('syndication', [])
            if relsynd:
                logger.debug(f'rel-syndication links: {relsynd}')
            syndication_urls.update(url for url in relsynd
                                    if isinstance(url, str))
            # there should only be one h-entry on a permalink page, but
            # we'll check all of them just in case.
            for hentry in (item for item in mf2['items']
                           if 'h-entry' in item['type']):
                usynd = hentry.get('properties', {}).get('syndication', [])
                if usynd:
                    logger.debug(f'u-syndication links: {usynd}')
                syndication_urls.update(url for url in usynd
                                        if isinstance(url, str))
            results = _process_syndication_urls(
                source, permalink, syndication_urls, preexisting)

    # detect and delete SyndicatedPosts that were removed from the site
    if success:
        result_syndposts = list(itertools.chain.from_iterable(results.values()))
        # Iterate over a snapshot: removing from the list we're looping over
        # would skip the element after each removal and leave stale
        # relationships behind.
        for syndpost in list(preexisting):
            if syndpost.syndication and syndpost not in result_syndposts:
                logger.info(f'deleting relationship that disappeared: {syndpost}')
                syndpost.key.delete()
                preexisting.remove(syndpost)

    if not results:
        logger.debug(f'no syndication links from {permalink} to current source {source.label()}.')
        results = {}
        if store_blanks and not preexisting:
            # remember that this post doesn't have syndication links for this
            # particular source
            logger.debug(f'saving empty relationship so that {permalink} will not be searched again')
            SyndicatedPost.insert_original_blank(source, permalink)

    # only return results that are not in the preexisting list
    new_results = {}
    for syndurl, syndposts_for_url in results.items():
        for syndpost in syndposts_for_url:
            if syndpost not in preexisting:
                new_results.setdefault(syndurl, []).append(syndpost)

    if new_results:
        logger.debug(f'discovered relationships {new_results}')
    return new_results
def _process_syndication_urls(source, permalink, syndication_urls,
                              preexisting):
    r"""Maps syndication URLs for the current source to stored relationships.

    Each URL is canonicalized by the source; URLs it rejects are skipped. A
    matching :class:`models.SyndicatedPost` from ``preexisting`` is reused
    when there is one, otherwise a new one is stored in the db.

    Args:
      source (models.Source)
      permalink (str): the current ``h-entry`` permalink
      syndication_urls (sequence of str): the unfiltered list of syndication
        urls
      preexisting (list of models.SyndicatedPost): previously discovered
        relationships

    Returns:
      dict: maps str syndication url to list of :class:`models.SyndicatedPost`\s
    """
    by_syndication_url = {}

    # save the results (or lack thereof) to the db, and put them in a
    # map for immediate use
    for url in syndication_urls:
        # source-specific logic to standardize the URL
        url = source.canonicalize_url(url)
        if not url:
            continue

        # TODO: save future lookups by saving results for other sources too
        # (note: query the appropriate source subclass by author.domains,
        # rather than author.domain_urls)
        #
        # we may have already seen this relationship, save a DB lookup by
        # finding it in the preexisting list
        relationship = None
        for known in preexisting:
            if known.syndication == url and known.original == permalink:
                relationship = known
                break

        if not relationship:
            logger.debug(f'saving discovered relationship {url} -> {permalink}')
            relationship = SyndicatedPost.insert(source, syndication=url, original=permalink)

        by_syndication_url.setdefault(url, []).append(relationship)

    return by_syndication_url
def _get_author_urls(source):
    """Returns the source's author URLs, capped at ``models.MAX_AUTHOR_URLS``.

    Logs a warning when URLs are dropped because of the cap.

    Args:
      source (models.Source)

    Returns:
      the value of ``source.get_author_urls()``, sliced down to the first
      ``models.MAX_AUTHOR_URLS`` entries if it has more than that
    """
    # named ``limit`` rather than ``max`` to avoid shadowing the builtin
    limit = models.MAX_AUTHOR_URLS
    urls = source.get_author_urls()
    if len(urls) > limit:
        logger.warning(f'user has over {limit} URLs! only running PPD on {urls[:limit]}. skipping {urls[limit:]}.')
        urls = urls[:limit]
    return urls