Skip to content

Instantly share code, notes, and snippets.

@nimaxin
Created April 26, 2025 10:23
Show Gist options
  • Save nimaxin/1ff6fe93293ea2194112401e9e4e6de9 to your computer and use it in GitHub Desktop.
Save nimaxin/1ff6fe93293ea2194112401e9e4e6de9 to your computer and use it in GitHub Desktop.
A custom SQLAlchemy type that stores timezone-aware datetimes in PostgreSQL (via asyncpg) and automatically converts them to a local timezone (e.g., Europe/Berlin) when querying. It handles timezone conversion at the column-type level, so it integrates cleanly with the ORM.
import asyncio
import os
from datetime import datetime, timezone

import pytz
import sqlalchemy as sa
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy.types import TypeDecorator
class DatetimeWithTimezone(TypeDecorator):
    """Timezone-aware datetime column type that localizes values on load.

    The underlying column type is ``sa.DateTime(timezone=True)``. Every
    non-empty value handed to ``process_result_value`` is converted to the
    timezone named by ``tz`` before it is returned.
    """

    impl = sa.DateTime(timezone=True)
    cache_ok = True

    def __init__(self, tz: str = "UTC", *args, **kwargs) -> None:
        """Set up the type.

        Args:
            tz: Timezone name passed to ``pytz.timezone``. Defaults to UTC.
            *args: Forwarded unchanged to ``TypeDecorator.__init__``.
            **kwargs: Forwarded unchanged to ``TypeDecorator.__init__``.
        """
        super().__init__(*args, **kwargs)
        self.tz = pytz.timezone(tz)

    def process_result_value(self, value: datetime, dialect) -> datetime | None:
        """Convert a loaded value to the configured timezone.

        Args:
            value: The datetime read from the result row; may be empty
                (for example ``None``).
            dialect: Unused here.

        Returns:
            ``None`` when ``value`` is falsy, otherwise
            ``value.astimezone(self.tz)``.
        """
        if not value:
            return None
        localized = value.astimezone(self.tz)
        return localized
# ------------- USAGE DEMO AND TEST -------------
class Base(DeclarativeBase):
    """Declarative base class that the demo's ORM models inherit from."""
class Product(Base):
    """Demo model whose ``created_at`` column uses ``DatetimeWithTimezone``."""

    __tablename__ = "product"

    id: Mapped[int] = mapped_column(primary_key=True)
    # Configured with tz="Europe/Berlin"; the server-side default is ``now()``.
    created_at: Mapped[datetime] = mapped_column(
        DatetimeWithTimezone(tz="Europe/Berlin"), server_default=func.now()
    )
# The connection URL is read from the DATABASE_URL environment variable so
# credentials do not have to live in the source.
# NOTE(review): the fallback's password and host are placeholders. The
# literal previously embedded here was unreadable ("[email protected]" in
# place of the password/host part); confirm the intended values.
DATABASE_URL = os.environ.get(
    "DATABASE_URL",
    "postgresql+asyncpg://postgres:postgres@localhost:5000/testdb",
)
engine = create_async_engine(DATABASE_URL)

# Session factory bound to the engine above; attribute expiry on commit and
# autoflush are both switched off.
Session = async_sessionmaker(engine, expire_on_commit=False, autoflush=False)
async def create_tables():
    """Drop every table in ``Base.metadata`` and then create them again."""
    metadata = Base.metadata
    async with engine.begin() as connection:
        # Order matters: drop first, then create.
        for ddl_step in (metadata.drop_all, metadata.create_all):
            await connection.run_sync(ddl_step)
async def created_test_products():
    """Insert one ``Product`` per sample ``created_at`` value.

    The samples are the current time in Europe/Berlin, the current time in
    UTC, and ``None`` (presumably to exercise the column's server-side
    default; confirm).
    """
    berlin = pytz.timezone("Europe/Berlin")
    test_datetimes = [
        datetime.now(tz=berlin),
        datetime.now(timezone.utc),
        None,
    ]
    # ``session.begin()`` commits when the block exits without an error, so
    # no explicit ``commit()`` call is needed inside it.
    async with Session() as session, session.begin():
        session.add_all([Product(created_at=dt) for dt in test_datetimes])
async def read_products():
    """Run ``select(Product)`` and return all resulting ``Product`` objects."""
    query = select(Product)
    async with Session() as session, session.begin():
        rows = await session.execute(query)
        products = rows.scalars().all()
    return products
async def main():
    """Recreate the tables, insert the samples, and print each ``created_at``."""
    await create_tables()
    await created_test_products()
    for item in await read_products():
        print(item.created_at)
# Run the demo only when this file is executed as a script, so that importing
# the module (e.g. to reuse DatetimeWithTimezone) does not drop and recreate
# tables as a side effect.
if __name__ == "__main__":
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment