Skip to content

Instantly share code, notes, and snippets.

@alpacas9
Last active June 30, 2023 09:27
Show Gist options
  • Select an option

  • Save alpacas9/7ac39ddfb4c4c7e8e6880e007ad500ce to your computer and use it in GitHub Desktop.

Select an option

Save alpacas9/7ac39ddfb4c4c7e8e6880e007ad500ce to your computer and use it in GitHub Desktop.
import datetime
import logging
from typing import Dict, List, Optional, Tuple
import requests
# strftime pattern for timestamps placed in email template models.
# The "UTC" suffix is literal text, so the datetime being formatted must
# itself be expressed in UTC.
DATE_PAT = "%Y-%m-%d %H:%M UTC"
# Module logger, registered under the fixed name "mailer".
log = logging.getLogger("mailer")
# Maximum number of messages placed in a single batch request.
# NOTE(review): presumably chosen to stay below Postmark's per-request
# batch limit - confirm against the Postmark API documentation.
MAX_BATCH = 300
class MessageStream:
    """Namespace of message stream identifiers, grouped by stream type.

    The values are plain strings; ``postmark_send`` places the chosen one
    in the "MessageStream" field of each outgoing message.
    """

    class Transactional:
        """Stream identifiers used for transactional emails."""

        USERS_NEW = 'new-users'
        USERS_ACTIVE = 'outbound'

    class Broadcast:
        """Stream identifiers used for broadcast emails."""

        # NOTE(review): same value as Transactional.USERS_ACTIVE - confirm
        # that sharing the 'outbound' stream is intended.
        USERS_ACTIVE = 'outbound'
        USERS_INACTIVE = 'users-inactive'
        USERS_NO_SUBSCRIPTION = 'users-no-subscription'
def postmark_send(
    template: Dict, emails: List[Tuple[str, Dict]], message_stream: Optional[str] = None
) -> None:
    """Send one templated message per recipient, in batches of ``MAX_BATCH``.

    Args:
        template: Template description; only its "template_alias" key is read.
        emails: ``(recipient_address, template_model)`` pairs, one per message.
        message_stream: Optional message stream identifier. When falsy, the
            "MessageStream" field is left out of every message.

    Raises:
        KeyError: If ``template`` lacks "template_alias" and there is at
            least one recipient.
        Exception: Anything raised by ``postmark_send_batch`` propagates;
            batches after a failing one are not sent.
    """
    recipients = list(emails)
    # Log addresses only: template models may carry secrets (temporary
    # passwords, reset links) that must not end up in log files.
    log.info(
        "PostmarkSend: emails=%r => template=%r",
        [address for address, _ in recipients],
        template,
    )
    # Slice the recipient list directly instead of relying on an external
    # chunking helper; an empty list yields no batches and no request.
    for start in range(0, len(recipients), MAX_BATCH):
        data = []
        for email, template_params in recipients[start:start + MAX_BATCH]:
            body = {
                "TemplateAlias": template["template_alias"],
                "TemplateModel": template_params,
                "To": email,
                "From": "FromWho <from>",
            }
            if message_stream:
                body["MessageStream"] = message_stream
            data.append(body)
        postmark_send_batch(data)
def postmark_send_batch(data: List) -> None:
    """POST one batch of templated messages to Postmark.

    Args:
        data: Message dictionaries, sent as the "Messages" array of the
            JSON request body.

    Raises:
        requests.HTTPError: If the response has a 4xx/5xx status.
        requests.Timeout: If connecting or reading exceeds the timeout.
    """
    resp = requests.post(
        "https://api.postmarkapp.com/email/batchWithTemplates",
        json={"Messages": data},
        headers={"X-Postmark-Server-Token": "TOKEN", "Accept": "application/json"},
        # requests waits forever by default; bound (connect, read) seconds
        # so a stalled connection cannot hang the caller.
        timeout=(10, 30),
    )
    # NOTE(review): only the HTTP status is checked here; per-message
    # results in the response body are not inspected - confirm whether
    # the batch endpoint can report individual failures with a 200 status.
    resp.raise_for_status()
def send_activation(email: str, link: str) -> None:
    """Send the "template-name" template to ``email`` on the new-users stream.

    ``link`` is passed to the template as its ``activation_link`` value.
    """
    template = {"template_alias": "template-name"}
    recipient = (email, {"activation_link": link})
    postmark_send(template, [recipient], MessageStream.Transactional.USERS_NEW)
def send_account_confirmation(email: str) -> None:
    """Send the "template-name" template to ``email`` on the new-users stream.

    The template receives an empty model.
    """
    template = {"template_alias": "template-name"}
    recipient = (email, {})
    postmark_send(template, [recipient], MessageStream.Transactional.USERS_NEW)
def send_password_reset(email: str, reset_link: str, token: str) -> None:
    """Send the password-reset template to ``email`` on the active-users stream.

    Args:
        email: Recipient address.
        reset_link: Passed to the template as ``reset_link``.
        token: Currently unused; kept so existing callers keep working.
    """
    # DATE_PAT appends a literal "UTC", so take the current time in UTC
    # rather than naive local time, which would be mislabelled.
    date_str = datetime.datetime.now(datetime.timezone.utc).strftime(DATE_PAT)
    postmark_send(
        {"template_alias": "template-name"},
        [(email, {"reset_link": reset_link, "date_time": date_str})],
        MessageStream.Transactional.USERS_ACTIVE,
    )
def send_temp_password(email: str, password: str) -> None:
    """Send the temporary-password template to ``email`` on the active-users stream.

    Args:
        email: Recipient address.
        password: Passed to the template as ``temp_pass``.
    """
    # DATE_PAT appends a literal "UTC", so take the current time in UTC
    # rather than naive local time, which would be mislabelled.
    date_str = datetime.datetime.now(datetime.timezone.utc).strftime(DATE_PAT)
    postmark_send(
        {"template_alias": "template-name"},
        [(email, {"temp_pass": password, "date_time": date_str})],
        MessageStream.Transactional.USERS_ACTIVE,
    )
def send_password_changed_notification(email: str) -> None:
    """Send the "template-name" template to ``email`` on the active-users stream.

    The template receives an empty model.
    """
    template = {"template_alias": "template-name"}
    recipient = (email, {})
    postmark_send(template, [recipient], MessageStream.Transactional.USERS_ACTIVE)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment