This document outlines the design and implementation of a Snowflake-native system for processing and generating vector embeddings using Python UDFs. It replaced a distributed EC2-based solution, reduced operational overhead, and enabled scalable, SQL-driven vector workflows within our analytics platform.
Our team needed to perform operations on embedding vectors (e.g., similarity scoring, normalization, distance calculations) using billions of records stored in our Snowflake database.
However, the existing workflow—relying on distributed Python scripts running on EC2 and ingesting CSVs from S3—was operationally brittle, inefficient, and difficult to maintain.
I therefore proposed implementing Snowflake user-defined functions (UDFs) in Python, which let us improve the process significantly, reducing cost, processing time, and complexity.
- Enable vector operations (e.g., normalization, cosine similarity) via SQL queries in Snowflake
- Allow on-demand embedding generation using our own sentence-transformer models
- Maintain performance suitable for batch processing and near-real-time scoring
- Use native Snowflake capabilities (UDFs) to avoid infrastructure sprawl
- Use our own embedding models via the Sentence Transformers library
- Avoid the overhead of managing Python environments with hundreds of dependencies
- Build a full-featured vector database
- Use Snowflake's native vector functions
- The Python UDFs were developed in a local project that used uv for dependency management and were pushed to a private GitHub repo. Snowflake fetched the Python modules from that repo via its external Git integration and deployed the functions, giving us a full CI/CD pipeline (a sketch of the Git integration follows the first function definition below).
- Each UDF handles a specific vector operation, such as:
  - `parse_address_with_libpostal(address VARCHAR)`
  - `embed_address(address VARCHAR)`
  - `embed_organization(company_name VARCHAR)`
- Snowflake's Anaconda integration was used to load dependencies like `postal`, `pandas`, `numpy`, `pymilvus`, and `sentence-transformers`.
- Snowflake's AWS S3 integration was used to load external assets, like our custom embedding models:
CREATE OR REPLACE FUNCTION PUBLIC.SENTENCE_TRANSFORMERS_ENCODE_ADDRESS_128("ADDRESS" VARCHAR)
RETURNS ARRAY(FLOAT)
LANGUAGE PYTHON
RUNTIME_VERSION = 3.11
PACKAGES = ('pandas','sentence-transformers')
IMPORTS = (
'@GITHUB.SNOWFLAKE_PYTHON/branches/main/user_defined_functions/sentence_transformers/encode_address_128.py',
'@MODELS/address-v1-128dim.zip')
HANDLER = 'encode_address_128.main';
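For reference, the @GITHUB.SNOWFLAKE_PYTHON/... path in the IMPORTS clause points at a Snowflake Git repository stage. A minimal sketch of how such an integration can be set up, with the API integration name, credentials secret, and repository URL shown as placeholders rather than our actual configuration:

-- Placeholder names and URL; shown only to illustrate the Git integration pattern.
CREATE OR REPLACE API INTEGRATION GITHUB_API_INTEGRATION
    API_PROVIDER = GIT_HTTPS_API
    API_ALLOWED_PREFIXES = ('https://github.com/our-org')
    ALLOWED_AUTHENTICATION_SECRETS = (GITHUB.TOKEN_SECRET)
    ENABLED = TRUE;

CREATE OR REPLACE GIT REPOSITORY GITHUB.SNOWFLAKE_PYTHON
    API_INTEGRATION = GITHUB_API_INTEGRATION
    GIT_CREDENTIALS = GITHUB.TOKEN_SECRET
    ORIGIN = 'https://github.com/our-org/snowflake-python.git';

-- Pull the latest commits so redeployed UDFs pick up new module versions.
ALTER GIT REPOSITORY GITHUB.SNOWFLAKE_PYTHON FETCH;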
The Python handler referenced by the IMPORTS and HANDLER clauses:

import fcntl
import os
import sys
import threading
import zipfile

from pandas import DataFrame
from sentence_transformers import SentenceTransformer
from _snowflake import vectorized


# Vectorized handler: Snowflake passes each batch of rows as a pandas
# DataFrame; column 0 holds the input addresses.
@vectorized(input=DataFrame)
def main(df: DataFrame):
    embeddings = model.encode(df[0])
    return embeddings


# File lock class for synchronizing write access to /tmp.
class FileLock:
    def __enter__(self):
        self._lock = threading.Lock()
        self._lock.acquire()
        self._fd = open('/tmp/lockfile.LOCK', 'w+')
        fcntl.lockf(self._fd, fcntl.LOCK_EX)

    def __exit__(self, type, value, traceback):
        self._fd.close()
        self._lock.release()


# Get the location of the import directory. Snowflake sets the import
# directory location so code can retrieve the location via sys._xoptions.
IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]

# Get the path to the ZIP file and set the location to extract to.
zip_file_path = import_dir + "address-v1-128dim.zip"
extracted = '/tmp/models'

# Extract the contents of the ZIP. This is done under the file lock
# to ensure that only one worker process unzips the contents.
with FileLock():
    if not os.path.isdir(extracted + '/address-v1-128dim'):
        with zipfile.ZipFile(zip_file_path, 'r') as myzip:
            myzip.extractall(extracted)

# Load the model once per Python process and reuse it across batches.
model = SentenceTransformer(extracted + '/address-v1-128dim')
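Once deployed, the UDF is called like any other SQL function; for example (the table and column names in the second query are hypothetical):

-- Embed a single address into a 128-dimension vector.
SELECT PUBLIC.SENTENCE_TRANSFORMERS_ENCODE_ADDRESS_128('5203 north red maple street, irons, mi 49644');

-- Batch usage over a table (table and column names are examples).
SELECT address,
       PUBLIC.SENTENCE_TRANSFORMERS_ENCODE_ADDRESS_128(address) AS embedding
FROM PARSED_ADDRESSES;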
The previous EC2-based workflow:

sequenceDiagram
Snowflake ->> AWS S3: export CSV with addresses for embedding
AWS S3 ->> AWS EC2 producer: list all CSV files
AWS EC2 producer ->> AWS SQS: write to queue
AWS SQS ->> AWS EC2 consumer: read from queue
Note right of AWS EC2 consumer: stream CSV file from S3<br/>parse address with libpostal<br/>embed parsed address with<br/>sentence-transformers.
AWS EC2 consumer ->> AWS S3: export the libpostal JSON and vectors JSON in CSV files
AWS S3 ->> Snowflake: import the CSV files into new tables
The new workflow, with everything running in Snowflake (an end-to-end SQL sketch follows the parsing example further below):
flowchart TD
A[ORIGINAL ADDRESSES] --> B(LIBPOSTAL_PARSE_ADDRESS)
B --> C[PARSED ADDRESSES]
C --> D(SENTENCE_TRANSFORMERS_ENCODE_ADDRESS_128)
D --> E[EMBEDDED ADDRESSES]
The libpostal handler:

from pandas import DataFrame
from postal.parser import parse_address
from _snowflake import vectorized


# Vectorized handler: parse_address yields (value, label) tuples for each
# address; build a label -> value dict, which Snowflake returns as an OBJECT.
@vectorized(input=DataFrame)
def main(df: DataFrame):
    return df.iloc[:, 0].apply(lambda address: {k: v for (v, k) in parse_address(address)})
The corresponding function definition:

CREATE OR REPLACE FUNCTION PUBLIC.LIBPOSTAL_PARSE_ADDRESS("ADDRESS" VARCHAR)
RETURNS OBJECT
LANGUAGE PYTHON
RUNTIME_VERSION = 3.11
PACKAGES = ('pandas','postal')
IMPORTS = ('@GITHUB.SNOWFLAKE_PYTHON/branches/main/user_defined_functions/libpostal/parse_address.py')
HANDLER = 'parse_address.main';
Query:
SELECT PUBLIC.LIBPOSTAL_PARSE_ADDRESS('5203 NORTH RED MAPLE STREET, IRONS, MI 49644');
Output:
{
"city": "irons",
"house_number": "5203",
"postcode": "49644",
"road": "north red maple street",
"state": "mi"
}
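Putting the two UDFs together, the flowchart above becomes a plain SQL pipeline. A sketch follows; the table names, column names, and the way the parsed OBJECT is flattened back into the string fed to the encoder are assumptions, not our exact implementation:

-- Hypothetical end-to-end batch run: parse raw addresses, then embed them.
CREATE OR REPLACE TABLE PARSED_ADDRESSES AS
SELECT address,
       PUBLIC.LIBPOSTAL_PARSE_ADDRESS(address) AS parsed
FROM ORIGINAL_ADDRESSES;

CREATE OR REPLACE TABLE EMBEDDED_ADDRESSES AS
SELECT address,
       parsed,
       -- Reassembling the parsed components into one string is just one
       -- plausible choice; the real normalization step may differ.
       PUBLIC.SENTENCE_TRANSFORMERS_ENCODE_ADDRESS_128(
           ARRAY_TO_STRING(ARRAY_CONSTRUCT_COMPACT(
               parsed:house_number, parsed:road, parsed:city,
               parsed:state, parsed:postcode), ' ')) AS embedding
FROM PARSED_ADDRESSES;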
- Considered pre-processing vectors externally and storing normalized versions in the DB, but this added maintenance burden and decreased flexibility.
- Considered using Snowpark for Python but opted for simple UDFs due to lower overhead and easier integration into SQL pipelines.
- UDFs run in Snowflake’s secure sandboxed Python environment.
- Strict control over secrets and external access using Snowflake's `EXTERNAL_ACCESS_INTEGRATIONS` and `SECRETS`:
CREATE OR REPLACE FUNCTION ZILLIZ_CLOUD.DELETE_ENTITIES_BY_PRIMARY_KEY("COLLECTION_NAME" VARCHAR, "ID" VARIANT, "PARTITION_NAME" VARCHAR DEFAULT '')
RETURNS OBJECT
LANGUAGE PYTHON
RUNTIME_VERSION = 3.11
PACKAGES = ('pandas','pymilvus')
IMPORTS = ('@GITHUB.SNOWFLAKE_PYTHON/branches/main/user_defined_functions/zilliz/delete_entities_by_primary_key.py')
HANDLER = 'delete_entities_by_primary_key.main'
EXTERNAL_ACCESS_INTEGRATIONS = (ZILLIZ_CLOUD_API_ACCESS_INTEGRATION)
SECRETS = (
'api_token' = ZILLIZ_CLOUD.API_TOKEN_SECRET,
'api_endpoint' = ZILLIZ_CLOUD.DEDICATED_CLUSTER_ENDPOINT_SECRET,
'db_name' = ZILLIZ_CLOUD.DEDICATED_CLUSTER_ENTITY_NAMES_DB_SECRET);
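The integration and secrets referenced above are created separately. A sketch with placeholder endpoint and token values (only the object names come from the definition above; the endpoint and database-name secrets are created the same way):

-- Placeholder values; the network rule name is also an assumption.
CREATE OR REPLACE NETWORK RULE ZILLIZ_CLOUD.API_NETWORK_RULE
    MODE = EGRESS
    TYPE = HOST_PORT
    VALUE_LIST = ('example-cluster.api.zillizcloud.com:443');

CREATE OR REPLACE SECRET ZILLIZ_CLOUD.API_TOKEN_SECRET
    TYPE = GENERIC_STRING
    SECRET_STRING = '<zilliz-api-token>';

CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION ZILLIZ_CLOUD_API_ACCESS_INTEGRATION
    ALLOWED_NETWORK_RULES = (ZILLIZ_CLOUD.API_NETWORK_RULE)
    ALLOWED_AUTHENTICATION_SECRETS = (ZILLIZ_CLOUD.API_TOKEN_SECRET)
    ENABLED = TRUE;

-- Example call (collection name and key are illustrative).
SELECT ZILLIZ_CLOUD.DELETE_ENTITIES_BY_PRIMARY_KEY('addresses', TO_VARIANT(12345));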
- Performance acceptable for datasets with hundreds of millions of rows in batch mode.
- Easily scale up by using larger Snowflake warehouses without the overhead of managing hundreds of AWS EC2 instances.
- Vectorized functions batch thousands of rows into pandas DataFrames per call instead of invoking the function once per row.
- Created integration tests using a small Snowflake warehouse with data samples in a staging environment.
- Deployed using versioned UDFs (e.g., `LIBPOSTAL_PARSE_ADDRESS_V1`, `_V2`, etc.) to allow safe upgrades.
- Deployed using GitHub Actions, with pipelines for testing and for Snowflake operations.
- Used Snowflake's query history and profiling tools to observe UDF execution times (see the example query after this list).
- Monitored for UDF failures or memory issues (e.g., vector too large, eval errors).
- Logged vector dimensions and runtimes to Snowflake audit logs for later analysis.
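As an example of the kind of check we run, recent invocations of an embedding UDF and their runtimes can be pulled from the query history. This is a sketch and assumes access to the SNOWFLAKE.ACCOUNT_USAGE share:

-- Recent queries that called the embedding UDF, with elapsed time in seconds.
SELECT query_id,
       start_time,
       warehouse_name,
       total_elapsed_time / 1000 AS elapsed_seconds
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY
WHERE query_text ILIKE '%SENTENCE_TRANSFORMERS_ENCODE_ADDRESS_128%'
  AND start_time >= DATEADD(day, -7, CURRENT_TIMESTAMP())
ORDER BY start_time DESC;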
- Python UDFs currently have limited GPU access.
- The Snowflake Anaconda channel lags behind the latest releases of several packages and offers a limited selection compared to PyPI.