Created
January 15, 2020 22:46
-
-
Save skylarmb/0cecf20afe16ba1959681c838ce9b3a2 to your computer and use it in GitHub Desktop.
Example implementation of Plaid webhook verification
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import hashlib | |
import hmac | |
import time | |
import requests | |
from jose import jwt | |
# Plaid client credentials. | |
CLIENT_ID = 'PLAID_CLIENT_ID' | |
SECRET = 'PLAID_SECRET' | |
# Endpoint for getting public verification keys. | |
ENDPOINT = 'https://production.plaid.com/webhook_verification_key/get' | |
# Cache for webhook validation keys. | |
KEY_CACHE = {} | |
def verify(body, headers): | |
signed_jwt = headers.get('Plaid-Verification') | |
current_key_id = jwt.get_unverified_header(signed_jwt)['kid'] | |
# If the key is not in the cache, update all non-expired keys. | |
if current_key_id not in KEY_CACHE: | |
keys_ids_to_update = [key_id for key_id, key in KEY_CACHE.items() | |
if key['expired_at'] is None] | |
keys_ids_to_update.append(current_key_id) | |
for key_id in keys_ids_to_update: | |
r = requests.post(ENDPOINT, json={ | |
'client_id': CLIENT_ID, | |
'secret': SECRET, | |
'key_id': key_id | |
}) | |
# If this is the case, the key ID may be invalid. | |
if r.status_code != 200: | |
continue | |
response = r.json() | |
key = response['key'] | |
KEY_CACHE[key_id] = key | |
# If the key ID is not in the cache, the key ID may be invalid. | |
if current_key_id not in KEY_CACHE: | |
return False | |
# Fetch the current key from the cache. | |
key = KEY_CACHE[current_key_id] | |
# Reject expired keys. | |
if key['expired_at'] is not None: | |
return False | |
# Validate the signature and extract the claims. | |
try: | |
claims = jwt.decode(signed_jwt, key, algorithms=['ES256']) | |
except jwt.JWTError: | |
return False | |
# Ensure that the token is not expired. | |
if claims["iat"] < time.time() - 5 * 60: | |
return False | |
# Compute the has of the body. | |
m = hashlib.sha256() | |
m.update(body.encode()) | |
body_hash = m.hexdigest() | |
# Ensure that the hash of the body matches the claim. | |
# Use constant time comparison to prevent timing attacks. | |
return hmac.compare_digest(body_hash, claims['request_body_sha256']) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment