Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Work around crazy emails with non-base64 encoded attachments #283

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 68 additions & 8 deletions src/gmv/imap_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,23 +36,76 @@
import gmv.gmvault_utils as gmvault_utils
import gmv.mod_imap as mimap

import email.parser

LOG = log_utils.LoggerFactory.get_logger('imap_utils')


re_valid_base64 = re.compile('^[A-Za-z0-9+/=\r\n]*$')
def is_valid_base64_email_payload(payload):
"""Check if a base64-encoded email payload is valid

A payload is a part of a multipart message for instance.

NOTE: There is no base64 alphabet validating function
in the standard library so we use our own.
We only permit the standard base64 alphabet and CRLF.
"""
re_match = re_valid_base64.match(payload)
return re_match is not None


def check_email_payload_encodings(email_body):
"""
Check whether emails-to-push have correctly specified encodings.
If there is binary data where base64 is expected, gmail will break
and interpret the ("random") binary data as part of the SMTP or
IMAP protocol metadata or something.

See: https://github.com/gaubert/gmvault/pull/283
"""
parser = email.parser.Parser()
message = parser.parsestr(email_body)

for submsg in message.walk():
# We can only check non-multipart, parts, for validity.
if submsg.is_multipart():
continue

cte = submsg.get('content-transfer-encoding', '').lower()

# Only "falsely" base64-encoded emails have had problems with gmail servers so far,
# so we don't bother trying to validate the rest.
if cte != 'base64':
continue

if not is_valid_base64_email_payload(submsg._payload):
raise PushEmailError(
"Bad data encoding: base64-encoded data contains non-base64 characters.",
permanent = True,
)


class PushEmailError(Exception):
"""
PushEmail Error
"""
def __init__(self, a_msg, quarantined = False):
def __init__(self, a_msg, quarantined = False, permanent = False):
"""
Constructor
"""
super(PushEmailError, self).__init__(a_msg)
self._in_quarantine = quarantined
self._permanent = permanent

def quarantined(self):
""" Get email to quarantine """
return self._in_quarantine

def is_permanent(self):
""" Is the error permanent? Should we attempt to retry? """
return self._permanent

class LabelError(Exception):
"""
LabelError. Exception send when there is an error when adding labels to message
Expand Down Expand Up @@ -134,12 +187,17 @@ def wrapper(*args, **kwargs): #pylint:disable=C0111,R0912

LOG.debug("error message = %s. traceback:%s" % (p_err, gmvault_utils.get_exception_traceback()))

if nb_tries[0] < a_nb_tries:
LOG.critical("Cannot reach the Gmail server. Wait %s second(s) and retrying." % (m_sleep_time[0]))
if p_err.is_permanent():
LOG.critical('Permanent error in email message: {}'.format(p_err))
break

else:
LOG.critical("Stop retrying, tried too many times ...")

reconnect(args[0], nb_tries, a_nb_tries, p_err, m_sleep_time)
if nb_tries[0] < a_nb_tries:
LOG.critical("Cannot reach the Gmail server. Wait %s second(s) and retrying." % (m_sleep_time[0]))
else:
LOG.critical("Stop retrying, tried too many times ...")

reconnect(args[0], nb_tries, a_nb_tries, p_err, m_sleep_time)

except imaplib.IMAP4.abort, err: #abort is recoverable and error is not

Expand Down Expand Up @@ -796,10 +854,12 @@ def push_data(self, a_folder, a_body, a_flags, a_internal_time):
#msg = "a_folder = %s" % (a_folder.encode('utf-8'))
#msg = msg.encode('utf-8')
#print(msg)
check_email_payload_encodings(a_body)

res = None
try:
#a_body = self._clean_email_body(a_body)
res = self.server.append(a_folder, a_body, a_flags, a_internal_time)
res = self.server.append(a_folder, a_body, a_flags, a_internal_time)

except imaplib.IMAP4.abort, err:
# handle issue when there are invalid characters (This is do to the presence of null characters)
if str(err).find("APPEND => Invalid character in literal") >= 0:
Expand Down