Last active
May 27, 2021 10:14
-
-
Save tonywok/76498335b69d7f411a0fb8d06140e6f0 to your computer and use it in GitHub Desktop.
PostgresSQL Async DB Adapter
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
gem "db" | |
gem "db-postgres" | |
gem "thread-local" | |
require "db" | |
require "db/postgres" | |
require "active_record/connection_adapters/postgresql_adapter" | |
module ActiveRecord | |
# NOTE: This is the active record connection entry point. | |
# This method is called internally based on the adapter specified in database.yml | |
module ConnectionHandling # :nodoc: | |
def postgresql_db_connection(config) | |
conn_params = config.symbolize_keys.compact | |
# Map database.yaml config keys to db-postgres config keys | |
conn_params[:user] = conn_params.delete(:username) if conn_params[:username] | |
ConnectionAdapters::PostgreSQLDbAdapter.new( | |
ConnectionAdapters::PostgreSQLDbAdapter.new_client(conn_params), | |
logger, | |
conn_params, | |
config, | |
) | |
end | |
end | |
module ConnectionAdapters | |
# NOTE: We need to ensure that each thread that AR spins up utilizes the same underlying DB client | |
# Without this, | |
module ThreadLocalClients | |
extend Thread::Local | |
def self.local | |
{} | |
end | |
def self.lookup(adapter, credentials) | |
key = [adapter, credentials] | |
self.instance[key] ||= DB::Client.new(adapter.new(**credentials)) | |
end | |
end | |
# NOTE: We have to freedom patch the connection pool because it's currently pooling threads. | |
# We want each fiber to use its own connection | |
# There is some nuance here around transactions requiring sticky connections s.t fibers don't step on one another | |
class ConnectionPool | |
def connection_cache_key(_thread) | |
Fiber.current | |
end | |
end | |
# NOTE: Here we forward database interactions to db | |
# NOTE: Currently inheriting from PostgreSQLAdapter, but we may have to revisit this as it's super coupled to pg gem | |
# This will involve potentially adding necessary features to db and db-postgres, respectively. | |
class PostgreSQLDbAdapter < PostgreSQLAdapter | |
ADAPTER_NAME = "PostgreSQLDb" | |
class << self | |
def new_client(conn_params) | |
thread_local_client = ThreadLocalClients.lookup(DB::Postgres::Adapter, conn_params) | |
DbPostgresShim.new(thread_local_client) | |
end | |
end | |
class DbPostgresShim | |
NotImplemented = Class.new(StandardError) | |
attr_reader :client | |
def initialize(client) | |
@client = client | |
end | |
def query(statement) | |
session = client.session | |
result = nil | |
Sync do | |
result = session.call(statement) | |
ensure | |
session.close | |
end | |
result | |
end | |
def status | |
end | |
def reset | |
end | |
def transaction_status | |
end | |
def close | |
end | |
def socket_io | |
end | |
def server_version | |
query("SELECT VERSION()") | |
end | |
def exec_params(sql, type_casted_binds) | |
end | |
def exec_prepared(stmt_key, type_casted_binds) | |
end | |
def prepare(nextkey, sql) | |
end | |
def get_last_result | |
end | |
def set_client_encoding(encoding) | |
end | |
def type_map_for_queries=(map) | |
end | |
def type_map_for_results=(map) | |
end | |
def type_map_for_results | |
# returns something that responds to add_coder(@timestamp_decoder) | |
end | |
def method_missing(method, *args, &block) | |
raise(NotImplemented, "'#{method}' called with '#{args}'") | |
end | |
end | |
end | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment