|
|
@@ -0,0 +1,93 @@ |
|
|
import boto3 |
|
|
from google.appengine.ext import testbed |
|
|
|
|
|
import json |
|
|
from google.appengine.api import mail |
|
|
from google.appengine.api import urlfetch |
|
|
|
|
|
|
|
|
testbed = testbed.Testbed() |
|
|
testbed.activate() |
|
|
testbed.init_datastore_v3_stub() |
|
|
testbed.init_memcache_stub() |
|
|
testbed.init_urlfetch_stub() |
|
|
|
|
|
def _encode_safely(s): |
|
|
"""Helper to turn a unicode string into 8-bit bytes.""" |
|
|
if isinstance(s, unicode): |
|
|
s = s.encode('utf-8') |
|
|
return s |
|
|
|
|
|
|
|
|
|
|
|
def aws_execute(client, method, par): |
|
|
# Perform an AWS method call by wrapping it in urlfetch.fetch() |
|
|
# Direct boto3.client.method() calls times out as GAE blocks it |
|
|
post = client.generate_presigned_post(method, Params=par, ExpiresIn=100, HttpMethod="GET") |
|
|
hdrs = {"Accept": "application/json", } # request JSON results in body. |
|
|
result = urlfetch.fetch(url, method=urlfetch.POST, deadline=5, headers=hdrs) |
|
|
if result.status_code // 100 != 2: # no 2xx from AWS? |
|
|
logging.info("AWS %s FAIL: %s", method, str(result)) |
|
|
raise RuntimeError("AWS %s call failed with %d" % (method, result.status_code)) |
|
|
if result.content: |
|
|
return json.loads(result.content) |
|
|
return {} |
|
|
|
|
|
my_email = '[email protected]' |
|
|
to = ['[email protected]'] |
|
|
subject = 'test subject' |
|
|
body = "test small body / " * 1000 |
|
|
reply_to = [ '[email protected]' ] |
|
|
cc = ['[email protected]'] |
|
|
patch = None |
|
|
def main(): |
|
|
send_args = {'sender': my_email, |
|
|
'to': [_encode_safely(address) for address in to], |
|
|
'subject': _encode_safely(subject), |
|
|
'body': _encode_safely(body), |
|
|
'reply_to': _encode_safely(reply_to)} |
|
|
if cc: |
|
|
send_args['cc'] = [_encode_safely(address) for address in cc] |
|
|
if patch: |
|
|
send_args['attachments'] = [('issue_%s_patch.diff' % issue.key.id(), |
|
|
patch)] |
|
|
|
|
|
with open("secrets.json", "r") as f: |
|
|
SECRETS = json.loads(f.read()) |
|
|
|
|
|
attempts = 0 |
|
|
while True: |
|
|
try: |
|
|
client = boto3.client('ses', 'us-east-1', aws_access_key_id=SECRETS['AWS_ACCESS_KEY_ID'], aws_secret_access_key=SECRETS['AWS_SECRET_ACCESS_KEY']) |
|
|
|
|
|
par = { |
|
|
'Source':my_email, |
|
|
'Destination': { |
|
|
'ToAddresses': send_args["to"], |
|
|
}, |
|
|
"ReplyToAddresses": send_args["reply_to"], |
|
|
'Message':{"Subject":{"Data":send_args["subject"]}, |
|
|
"Body":{ |
|
|
"Text":{"Data":send_args["body"].decode("utf-8"), "Charset":"UTF-8" }, |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
if send_args.get('cc', None): |
|
|
par["Destination"]["CcAddresses"] = send_args["cc"] |
|
|
|
|
|
r = aws_execute(client, 'send_email', par) |
|
|
break |
|
|
except mail.InvalidSenderError: |
|
|
if django_settings.RIETVELD_INCOMING_MAIL_ADDRESS: |
|
|
previous_sender = send_args['sender'] |
|
|
if previous_sender not in send_args['to']: |
|
|
send_args['to'].append(previous_sender) |
|
|
send_args['sender'] = django_settings.RIETVELD_INCOMING_MAIL_ADDRESS |
|
|
else: |
|
|
raise |
|
|
if attempts: |
|
|
logging.warning("Retried sending email %s times", attempts) |
|
|
|
|
|
|
|
|
main() |