Created
July 10, 2018 18:53
-
-
Save mvrhov/08870b23efdad46791c30fb49bb8ef66 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
type ConnPool struct { | |
pgx.ConnPool | |
} | |
type AfterAcquireFunc func(c *pgx.Conn) error | |
// NewConnPool creates a new ConnPool. config.ConnConfig is passed through to | |
// Connect directly. | |
func NewConnPool(config pgx.ConnPoolConfig) (p *ConnPool, err error) { | |
pp, err := pgx.NewConnPool(config) | |
return &ConnPool{ConnPool: *pp}, err | |
} | |
// AcquireEx takes exclusive use of a connection until it is released. | |
//If fn is provided, then it also executes the said function | |
func (p *ConnPool) AcquireEx(fn AfterAcquireFunc) (*pgx.Conn, error) { | |
c, err := p.ConnPool.Acquire() | |
if err == nil && fn != nil { | |
err = fn(c) | |
} | |
return c, err | |
} | |
// Exec acquires a connection, delegates the call to that connection, and releases the connection | |
func (p *ConnPool) Exec(fn AfterAcquireFunc, sql string, arguments ...interface{}) (commandTag pgx.CommandTag, err error) { | |
var c *pgx.Conn | |
if c, err = p.AcquireEx(fn); err != nil { | |
return | |
} | |
defer p.Release(c) | |
return c.Exec(sql, arguments...) | |
} | |
func (p *ConnPool) ExecEx(ctx context.Context, fn AfterAcquireFunc, sql string, options *pgx.QueryExOptions, arguments ...interface{}) (commandTag pgx.CommandTag, err error) { | |
var c *pgx.Conn | |
if c, err = p.AcquireEx(fn); err != nil { | |
return | |
} | |
defer p.Release(c) | |
return c.ExecEx(ctx, sql, options, arguments...) | |
} | |
// Query acquires a connection and delegates the call to that connection. When | |
// *Rows are closed, the connection is released automatically. | |
func (p *ConnPool) Query(fn AfterAcquireFunc, sql string, args ...interface{}) (*pgx.Rows, error) { | |
c, err := p.AcquireEx(fn) | |
if err != nil { | |
// Because checking for errors can be deferred to the *Rows, build one with the error | |
return &pgx.Rows{ /*closed: true, err: err*/ }, err | |
} | |
rows, err := c.Query(sql, args...) | |
if err != nil { | |
p.Release(c) | |
return rows, err | |
} | |
//rows.connPool = p | |
return rows, nil | |
} | |
func (p *ConnPool) QueryEx(ctx context.Context, fn AfterAcquireFunc, sql string, options *pgx.QueryExOptions, args ...interface{}) (*pgx.Rows, error) { | |
c, err := p.AcquireEx(fn) | |
if err != nil { | |
// Because checking for errors can be deferred to the *Rows, build one with the error | |
return &pgx.Rows{ /*closed: true, err: err*/ }, err | |
} | |
rows, err := c.QueryEx(ctx, sql, options, args...) | |
if err != nil { | |
p.Release(c) | |
return rows, err | |
} | |
//rows.connPool = p | |
return rows, nil | |
} | |
// QueryRow acquires a connection and delegates the call to that connection. The | |
// connection is released automatically after Scan is called on the returned | |
// *Row. | |
func (p *ConnPool) QueryRow(fn AfterAcquireFunc, sql string, args ...interface{}) *pgx.Row { | |
rows, _ := p.Query(fn, sql, args...) | |
return (*pgx.Row)(rows) | |
} | |
func (p *ConnPool) QueryRowEx(ctx context.Context, fn AfterAcquireFunc, sql string, options *pgx.QueryExOptions, args ...interface{}) *pgx.Row { | |
rows, _ := p.QueryEx(ctx, fn, sql, options, args...) | |
return (*pgx.Row)(rows) | |
} | |
// Begin acquires a connection and begins a transaction on it. When the | |
// transaction is closed the connection will be automatically released. | |
func (p *ConnPool) Begin(fn AfterAcquireFunc) (*pgx.Tx, error) { | |
return p.BeginEx(context.Background(), fn, nil) | |
} | |
// BeginEx acquires a connection and starts a transaction with txOptions | |
// determining the transaction mode. When the transaction is closed the | |
// connection will be automatically released. | |
func (p *ConnPool) BeginEx(ctx context.Context, fn AfterAcquireFunc, txOptions *pgx.TxOptions) (*pgx.Tx, error) { | |
for { | |
c, err := p.AcquireEx(fn) | |
if err != nil { | |
return nil, err | |
} | |
tx, err := c.BeginEx(ctx, txOptions) | |
if err != nil { | |
alive := c.IsAlive() | |
p.Release(c) | |
// If connection is still alive then the error is not something trying | |
// again on a new connection would fix, so just return the error. But | |
// if the connection is dead try to acquire a new connection and try | |
// again. | |
if alive || ctx.Err() != nil { | |
return nil, err | |
} | |
continue | |
} | |
//tx.connPool = p | |
return tx, nil | |
} | |
} | |
// CopyFrom acquires a connection, delegates the call to that connection, and releases the connection | |
func (p *ConnPool) CopyFrom(fn AfterAcquireFunc, tableName pgx.Identifier, columnNames []string, rowSrc pgx.CopyFromSource) (int, error) { | |
c, err := p.AcquireEx(fn) | |
if err != nil { | |
return 0, err | |
} | |
defer p.Release(c) | |
return c.CopyFrom(tableName, columnNames, rowSrc) | |
} | |
// BeginBatch acquires a connection and begins a batch on that connection. When | |
// *Batch is finished, the connection is released automatically. | |
func (p *ConnPool) BeginBatch(fn AfterAcquireFunc) *pgx.Batch { | |
//c, err := p.AcquireEx(fn) | |
return &pgx.Batch{ /*conn: c, connPool: p, err: err*/ } | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment