forked from Charcoal-SE/SmokeDetector
-
Notifications
You must be signed in to change notification settings - Fork 0
/
spamhandling.py
232 lines (213 loc) · 12.5 KB
/
spamhandling.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
# coding=utf-8
import sys
from threading import Thread
from findspam import FindSpam
import datahandling
from globalvars import GlobalVars
from datetime import datetime
import parsing
import metasmoke
import deletionwatcher
import excepthook
# noinspection PyCompatibility
import regex
import time
from classes import Post, PostParseError
from helpers import log
# noinspection PyMissingTypeHints
def should_whitelist_prevent_alert(user_url, reasons):
    """
    Decide whether the author's whitelist status should suppress an alert.

    :param user_url: URL of the author's profile; resolved with ``parsing.get_user_from_url``
    :param reasons: iterable of reason strings produced for the post
    :return: True only when the user is whitelisted and no reason lacks the
             substring "username"; False in every other case
    """
    author = parsing.get_user_from_url(user_url)
    if datahandling.is_whitelisted_user(author):
        # A single reason that does not mention "username" keeps the alert alive.
        return not any("username" not in entry for entry in set(reasons))
    return False
# noinspection PyMissingTypeHints
def should_reasons_prevent_tavern_posting(reasons):
    """
    Tell whether every given reason is listed in ``GlobalVars.non_tavern_reasons``.

    :param reasons: iterable of reason strings produced for the post
    :return: True when no reason falls outside ``GlobalVars.non_tavern_reasons``
             (including when ``reasons`` is empty), False otherwise
    """
    excluded = GlobalVars.non_tavern_reasons
    return all(entry in excluded for entry in set(reasons))
# noinspection PyMissingTypeHints
def check_if_spam(post):
    """
    Test a post for spam and decide whether it should be reported.

    The reasons and explanation come from ``FindSpam.test_post``; a
    "blacklisted user" reason (plus an explanation line, when blacklist details
    are available) is added if the author is blacklisted.

    :param post: the post to test; its ``user_url``, ``post_site``, ``post_id``
                 and ``title`` attributes are read here
    :return: ``(True, reasons, why)`` when at least one reason applies and the
             post is not suppressed; ``(False, None, "")`` otherwise
    """
    reasons, why = FindSpam.test_post(post)

    if datahandling.is_blacklisted_user(parsing.get_user_from_url(post.user_url)):
        reasons.append("blacklisted user")
        user_data = datahandling.get_blacklisted_user_data(parsing.get_user_from_url(post.user_url))
        if len(user_data) > 1:
            # Index 1 says who blacklisted the user, index 2 holds the URL of the post involved.
            blacklisted_by = "the metasmoke API" if user_data[1] == "metasmoke" else "http:" + user_data[1]
            blacklisted_post_url = user_data[2]
            if not blacklisted_post_url:
                why += u"\n" + u"Blacklisted user - blacklisted by {}".format(blacklisted_by)
            else:
                rel_url = blacklisted_post_url.replace("http:", "", 1)
                why += u"\nBlacklisted user - blacklisted for {} (" \
                       u"https://m.erwaysoftware.com/posts/by-url?url={}) by {}".format(blacklisted_post_url, rel_url,
                                                                                        blacklisted_by)

    if not reasons:
        return False, None, ""

    post_key = (post.post_id, post.post_site)
    suppressed = (
        datahandling.has_already_been_posted(post.post_site, post.post_id, post.title)
        or datahandling.is_false_positive(post_key)
        or should_whitelist_prevent_alert(post.user_url, reasons)
        or datahandling.is_ignored_post(post_key)
        or datahandling.is_auto_ignored_post(post_key)
    )
    if suppressed:
        return False, None, ""  # Don't repost. Reddit will hate you.
    return True, reasons, why
# noinspection PyMissingTypeHints
def check_if_spam_json(json_data):
    """
    Build a ``Post`` from raw JSON data and run ``check_if_spam`` on it.

    :param json_data: data handed to ``Post(json_data=...)``
    :return: the ``(is_spam, reason, why)`` triple from ``check_if_spam``, or
             ``(False, '', '')`` when the data cannot be parsed (the parse
             error is logged at 'error' level)
    """
    try:
        post = Post(json_data=json_data)
    except PostParseError as err:
        message = 'Parse error {0} when parsing json_data {1!r}'.format(err, json_data)
        log('error', message)
        return False, '', ''
    return check_if_spam(post)
# noinspection PyBroadException,PyProtectedMember
def _select_room_message(prefix, prefix_ms, s, pings):
    """
    Pick the richest variant of a report that fits in 500 characters.

    Preference order: metasmoke prefix with pings, plain prefix with pings,
    then the plain prefix without pings cut to 500 characters.

    :param prefix: plain report prefix
    :param prefix_ms: report prefix that also carries the metasmoke link
    :param s: report text that follows the prefix
    :param pings: value passed to ``datahandling.append_pings`` together with ``s``
    :return: the message text to send
    """
    pinged = datahandling.append_pings(s, pings)
    for candidate in (prefix_ms + pinged, prefix + pinged):
        if len(candidate) <= 500:
            return candidate
    # Neither pinged variant fits: drop the pings and hard-truncate.
    return (prefix + s)[0:500]


def handle_spam(post, reasons, why):
    """
    Record a detected post and announce it to metasmoke and the chat rooms.

    Stores bookkeeping data through ``datahandling``, builds a one-line report,
    starts a thread that sends the post to metasmoke, and posts the report to
    every room whose entry in ``GlobalVars.blockedTime`` has expired. Anything
    raised while building or sending the report is passed to
    ``excepthook.uncaught_exception`` instead of propagating.

    :param post: the detected post; ``post._title`` is overwritten with the
                 escaped title as a side effect
    :param reasons: list of reason strings (it is sliced and joined here)
    :param why: explanation text; stored with ``datahandling.add_why`` unless
                it is None or empty
    :return: None
    """
    post_url = parsing.to_protocol_relative(parsing.url_to_shortlink(post.post_url))
    poster_url = parsing.to_protocol_relative(parsing.user_url_to_shortlink(post.user_url))

    # Only the first five reasons are spelled out; the rest are summarised as a count.
    reason = ", ".join(reasons[:5])
    if len(reasons) > 5:
        reason += ", +{} more".format(len(reasons) - 5)
    reason = reason[:1].upper() + reason[1:]  # reason is capitalised, unlike the entries of reasons list
    shortened_site = post.post_site.replace("stackexchange.com", "SE")  # site.stackexchange.com -> site.SE

    datahandling.append_to_latest_questions(post.post_site, post.post_id, post.title if not post.is_answer else "")

    # A post caught by exactly one of these reasons is added to the auto-ignore list.
    auto_ignored_when_alone = (
        "all-caps title",
        "repeating characters in title",
        "repeating characters in body",
        "repeating characters in answer",
        "repeating words in title",
        "repeating words in body",
        "repeating words in answer",
    )
    if len(reasons) == 1 and any(entry in reasons for entry in auto_ignored_when_alone):
        datahandling.add_auto_ignored_post((post.post_id, post.post_site, datetime.now()))

    if why is not None and why != "":
        datahandling.add_why(post.post_site, post.post_id, why)

    # Compare by value: an identity test against a string literal is implementation-dependent.
    if post.is_answer and post.post_id is not None and post.post_id != "":
        datahandling.add_post_site_id_link((post.post_id, post.post_site, "answer"), post.parent.post_id)

    try:
        post._title = parsing.escape_special_chars_in_title(post.title)
        if post.is_answer:
            # If the post is an answer type post, the 'title' is going to be blank, so when posting the
            # message contents we need to set the post title to the *parent* title, so the message in the
            # chat is properly constructed with parent title instead. This will make things 'print'
            # in a proper way in chat messages.
            sanitized_title = regex.sub('(https?://|\n)', '', post.parent.title)
        else:
            sanitized_title = regex.sub('(https?://|\n)', '', post.title)

        # Backslash-escape ']', '*' and '`'. The group reference has to be spelled \1: "$1" is not a
        # backreference in Python and would be inserted literally in place of the matched character.
        # Newlines were already removed by the substitution above, so the replace() has nothing left
        # to match; it is kept as in the original.
        sanitized_title = regex.sub(r'([\]*`])', r'\\\1', sanitized_title).replace('\n', u'\u23CE')

        prefix = u"[ [SmokeDetector](//goo.gl/eLDYqh) ]"
        if GlobalVars.metasmoke_key:
            prefix_ms = u"[ [SmokeDetector](//goo.gl/eLDYqh) | [MS](//m.erwaysoftware.com/posts/by-url?url=" + \
                        post_url + ") ]"
        else:
            prefix_ms = prefix

        if not post.user_name.strip() or (not poster_url or poster_url.strip() == ""):
            s = u" {}: [{}]({}) by a deleted user on `{}`".format(reason, sanitized_title.strip(), post_url,
                                                                  shortened_site)
            username = ""
        else:
            s = u" {}: [{}]({}) by [{}]({}) on `{}`".format(reason, sanitized_title.strip(), post_url,
                                                            post.user_name.strip(), poster_url, shortened_site)
            username = post.user_name.strip()

        # The metasmoke submission runs on its own thread.
        t_metasmoke = Thread(name="metasmoke send post",
                             target=metasmoke.Metasmoke.send_stats_on_post,
                             args=(post.title_ignore_type, post_url, reasons, post.body, username,
                                   post.user_link, why, post.owner_rep, post.post_score,
                                   post.up_vote_count, post.down_vote_count))
        t_metasmoke.start()

        log('debug', GlobalVars.parser.unescape(s).encode('ascii', errors='replace'))

        if time.time() >= GlobalVars.blockedTime["all"]:
            datahandling.append_to_latest_questions(post.post_site, post.post_id, post.title)

            if time.time() >= GlobalVars.blockedTime[GlobalVars.charcoal_room_id]:
                chq_pings = datahandling.get_user_names_on_notification_list(
                    "stackexchange.com",
                    GlobalVars.charcoal_room_id,
                    post.post_site,
                    GlobalVars.wrap)
                msg_to_send = _select_room_message(prefix, prefix_ms, s, chq_pings)
                try:
                    GlobalVars.charcoal_hq.send_message(msg_to_send)
                except AttributeError:  # In our Test Suite
                    pass

            # If it's all experimental rules, we are done.
            # If not, see which other rooms this should perhaps be posted to.
            if set(reasons).intersection(GlobalVars.experimental_reasons) != set(reasons):
                if not should_reasons_prevent_tavern_posting(reasons) \
                        and post.post_site not in GlobalVars.non_tavern_sites \
                        and time.time() >= GlobalVars.blockedTime[GlobalVars.meta_tavern_room_id]:
                    tavern_pings = datahandling.get_user_names_on_notification_list(
                        "meta.stackexchange.com",
                        GlobalVars.meta_tavern_room_id,
                        post.post_site, GlobalVars.wrapm)
                    msg_to_send = _select_room_message(prefix, prefix_ms, s, tavern_pings)
                    # This room's message is handed to the deletion watcher rather than sent directly.
                    t_check_websocket = Thread(
                        name="deletionwatcher post message if not deleted",
                        target=deletionwatcher.DeletionWatcher.post_message_if_not_deleted,
                        args=((post.post_id, post.post_site,
                               "answer" if post.is_answer else "question"),
                              post_url, msg_to_send, GlobalVars.tavern_on_the_meta))
                    t_check_websocket.daemon = True
                    t_check_websocket.start()

                # NOTE(review): `reason` is the joined, capitalised summary, not a single entry of
                # `reasons` -- confirm that the exclusion lists below are meant to be matched against it.
                if post.post_site == "stackoverflow.com" and reason not in GlobalVars.non_socvr_reasons \
                        and time.time() >= GlobalVars.blockedTime[GlobalVars.socvr_room_id]:
                    socvr_pings = datahandling.get_user_names_on_notification_list(
                        "stackoverflow.com",
                        GlobalVars.socvr_room_id,
                        post.post_site,
                        GlobalVars.wrapso)
                    msg_to_send = _select_room_message(prefix, prefix_ms, s, socvr_pings)
                    try:
                        GlobalVars.socvr.send_message(msg_to_send)
                    except AttributeError:  # In test Suite
                        pass

                for specialroom in GlobalVars.specialrooms:
                    sites = specialroom["sites"]
                    if post.post_site in sites and reason not in specialroom["unwantedReasons"]:
                        room = specialroom["room"]
                        # A room without a blockedTime entry is treated as not blocked.
                        if room.id not in GlobalVars.blockedTime or time.time() >= GlobalVars.blockedTime[room.id]:
                            room_site = room._client.host
                            room_id = int(room.id)
                            room_pings = datahandling.get_user_names_on_notification_list(room_site, room_id,
                                                                                          post.post_site, room._client)
                            msg_to_send = _select_room_message(prefix, prefix_ms, s, room_pings)
                            room.send_message(msg_to_send)
    except:
        # Deliberately broad: every failure in the reporting path goes to the project's exception hook.
        exc_type, exc_obj, exc_tb = sys.exc_info()
        excepthook.uncaught_exception(exc_type, exc_obj, exc_tb)
def handle_user_with_all_spam(user, why):
    """
    Announce a user whose posts are all spam.

    Logs the report, stores ``why`` with ``datahandling.add_why_allspam``, and
    sends the report to Charcoal HQ and to each special room that covers the
    user's site, skipping rooms whose ``GlobalVars.blockedTime`` entry has not
    expired yet.

    :param user: indexable whose item 0 is the user id and item 1 the site
    :param why: explanation passed to ``datahandling.add_why_allspam``
    :return: None
    """
    user_id, site = user[0], user[1]
    if site == "stackexchange.com":
        tab = "activity"
    else:
        tab = "topactivity"
    s = "[ [SmokeDetector](//git.io/vgx7b) ] All of this user's posts are spam: [user {} on {}](//{}/users/{}?tab={})" \
        .format(user_id, site, site, user_id, tab)

    log('debug', GlobalVars.parser.unescape(s).encode('ascii', errors='replace'))
    datahandling.add_why_allspam(user, why)

    if time.time() >= GlobalVars.blockedTime[GlobalVars.charcoal_room_id]:
        GlobalVars.charcoal_hq.send_message(s)

    for specialroom in GlobalVars.specialrooms:
        room = specialroom["room"]
        if site not in specialroom["sites"]:
            continue
        # Rooms without a blockedTime entry are never considered blocked.
        if room.id in GlobalVars.blockedTime and time.time() < GlobalVars.blockedTime[room.id]:
            continue
        room.send_message(s)