Created
September 13, 2022 13:53
-
-
Save ldonjibson/368388a1dee1f0b904f00d0185b0da5e to your computer and use it in GitHub Desktop.
Django Anymail email backend for sending mail through the Tribearc API
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
from anymail.backends.base_requests import AnymailRequestsBackend, RequestsPayload | |
from anymail.exceptions import AnymailRequestsAPIError | |
from anymail.message import AnymailRecipientStatus | |
from anymail.utils import get_anymail_setting | |
from django.conf import settings | |
import requests | |
class EmailBackend(AnymailRequestsBackend):
    """
    Tribearc v1 API Email Backend

    Posts each message to the Tribearc endpoint configured in
    ``settings.ANYMAIL`` as ordinary form fields (not a JSON body).
    """

    esp_name = "Tribearc"

    def __init__(self, **kwargs):
        """Init options from Django settings.

        Reads the following keys from ``settings.ANYMAIL``; a missing key
        raises ``KeyError``:

        * ``TRIBEARC_API_KEY`` -- API key sent with every request
        * ``TRIBEARC_URL`` -- full URL of the send endpoint
        * ``REPLY_TO``, ``FROM_NAME`` -- default reply-to address / sender name
        * ``BUSINESS_ADDRESS``, ``BUSINESS_NAME`` -- sent along with each mail
        """
        anym = settings.ANYMAIL
        # NOTE: never print or log this dict -- it contains the API key.
        self.api_key = anym['TRIBEARC_API_KEY']
        self.api_url = anym['TRIBEARC_URL']
        self.reply_to = anym['REPLY_TO']
        self.from_name = anym['FROM_NAME']
        self.business_address = anym['BUSINESS_ADDRESS']
        self.business_name = anym['BUSINESS_NAME']
        super().__init__(self.api_url, **kwargs)

    def build_message_payload(self, message, defaults):
        """Return the TribearcPayload that collects the fields for `message`."""
        return TribearcPayload(message, defaults, self)

    def post_to_esp(self, payload, message):
        """Post payload to ESP send API endpoint, and return the raw response.

        payload is the result of build_message_payload
        message is the original EmailMessage
        return should be a requests.Response
        Can raise AnymailRequestsAPIError for HTTP errors in the post
        """
        # Local import keeps this block self-contained.
        from email.utils import parseaddr

        params = payload.get_request_params(self.api_url)
        params.setdefault('timeout', 20)

        # The payload serializes itself to a JSON string; decode it back into
        # a plain dict so it can be sent as form fields. This also gives us a
        # private copy, so the edits below do not touch payload.data.
        form_data = json.loads(params["data"])

        # The API is given the recipient list under "emails" and the bare
        # sender address under "from_email".
        form_data['emails'] = form_data.get('to', [])
        sender = form_data['from']
        # parseaddr copes with display names that themselves contain "<";
        # fall back to the raw value if nothing could be parsed.
        form_data['from_email'] = parseaddr(sender)[1] or sender

        try:
            # Without an explicit timeout, requests would wait forever on a
            # stalled connection.
            response = requests.post(
                self.api_url, data=form_data, timeout=params['timeout']
            )
        except requests.RequestException as err:
            # raise an exception that is both AnymailRequestsAPIError
            # and the original requests exception type
            exc_class = type(
                'AnymailRequestsAPIError',
                (AnymailRequestsAPIError, type(err)),
                {},
            )
            raise exc_class(
                "Error posting to %s:" % params.get('url', '<missing url>'),
                email_message=message, payload=form_data) from err
        self.raise_for_status(response, form_data, message)
        return response

    def deserialize_json_response(self, response, payload, message):
        """Return `response` unchanged.

        No JSON parsing is performed here: parse_recipient_status inspects
        the raw response text instead. `payload` and `message` are accepted
        only to keep the inherited signature.
        """
        return response

    def parse_recipient_status(self, response, payload, message):
        """Return a dict mapping each "to" address to an AnymailRecipientStatus.

        The send counts as successful only when the response text contains
        "Message sent"; otherwise AnymailRequestsAPIError is raised.
        """
        body = str(response.text)
        if "Message sent" not in body:
            raise AnymailRequestsAPIError(
                "%s API did not confirm the message was sent" % self.esp_name,
                email_message=message, payload=payload, response=response,
                backend=self,
            )
        # If we get here, the send call was successful.
        # No message id is extracted from the response, so each recipient
        # address doubles as its own message_id.
        recipients = payload.data.get('to', [])
        return {
            recipient: AnymailRecipientStatus(message_id=recipient, status="sent")
            for recipient in recipients
        }
class TribearcPayload(RequestsPayload):
    """Collects the fields of one email message into ``self.data``.

    Fixed fields come from the backend's configuration; the ``set_*`` hooks
    add the per-message fields.
    """

    def __init__(self, message, defaults, backend, *args, **kwargs):
        """Force the multipart Content-Type header, then defer to the base class."""
        request_headers = kwargs.pop("headers", {})
        # request_headers["X-Server-API-Key"] / "Accept" are deliberately not set.
        request_headers["Content-Type"] = "multipart/form-data"
        # Stored before the base initializer runs; init_payload reads it.
        self.backend = backend
        super().__init__(
            message, defaults, backend, *args, headers=request_headers, **kwargs
        )

    def get_api_endpoint(self):
        """Return the backend's configured API URL."""
        return self.backend.api_url

    def init_payload(self):
        """Seed ``self.data`` with the fields that are the same for every message."""
        cfg = self.backend
        self.data = dict(
            api_key=cfg.api_key,
            json='1',
            from_name=cfg.from_name,
            reply_to=cfg.reply_to,
            track_opens='1',
            track_clicks='1',
            send_campaign='1',
            business_address=cfg.business_address,
            business_name=cfg.business_name,
        )

    def serialize_data(self):
        """Return ``self.data`` serialized via the base class's JSON helper."""
        serialized = self.serialize_json(self.data)
        return serialized

    def set_from_email(self, email):
        self.data["from"] = str(email)

    def set_subject(self, subject):
        self.data["subject"] = subject

    def set_to(self, emails):
        self.data["to"] = list(map(str, emails))

    def set_cc(self, emails):
        self.data["cc"] = list(map(str, emails))

    def set_bcc(self, emails):
        self.data["bcc"] = list(map(str, emails))

    def set_reply_to(self, emails):
        """Store the first reply-to address; more than one is flagged unsupported."""
        count = len(emails)
        if count > 1:
            self.unsupported_feature("multiple reply_to addresses")
        if count > 0:
            first = emails[0]
            self.data["reply_to"] = str(first)

    def set_extra_headers(self, headers):
        self.data["headers"] = headers

    def set_text_body(self, body):
        # The body is stored under both keys.
        for field in ("plain_body", "html_text"):
            self.data[field] = body

    def set_html_body(self, body):
        """Store the HTML body; a second HTML part is flagged unsupported."""
        already_set = "html_body" in self.data
        if already_set:
            self.unsupported_feature("multiple html parts")
        # The body is stored under both keys, replacing any earlier html_text.
        for field in ("html_body", "html_text"):
            self.data[field] = body

    def make_attachment(self, attachment):
        """Return a dict with the attachment's name, base64 data and content type."""
        entry = dict(
            name=attachment.name or "",
            data=attachment.b64content,
            content_type=attachment.mimetype,
        )
        # Inline attachments are flagged as unsupported
        # (see https://github.com/postalhq/postal/issues/731).
        if attachment.inline:
            self.unsupported_feature('inline attachments')
        return entry

    def set_attachments(self, attachments):
        if not attachments:
            return
        self.data["attachments"] = list(map(self.make_attachment, attachments))

    def set_envelope_sender(self, email):
        self.data["sender"] = str(email)

    def set_tags(self, tags):
        """Store the first tag; more than one is flagged unsupported."""
        count = len(tags)
        if count > 1:
            self.unsupported_feature("multiple tags")
        if count > 0:
            self.data["tag"] = tags[0]

    def set_esp_extra(self, extra):
        # Caller-supplied extras are merged last-wins into the payload.
        self.data.update(extra)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment