diff --git a/src/database/sql/sql.go b/src/database/sql/sql.go index 2b84cea374..09d61f1287 100644 --- a/src/database/sql/sql.go +++ b/src/database/sql/sql.go @@ -583,6 +583,17 @@ func Open(driverName, dataSourceName string) (*DB, error) { return db, nil } +func (db *DB) pingDC(ctx context.Context, dc *driverConn, release func(error)) error { + var err error + if pinger, ok := dc.ci.(driver.Pinger); ok { + withLock(dc, func() { + err = pinger.Ping(ctx) + }) + } + release(err) + return err +} + // PingContext verifies a connection to the database is still alive, // establishing a connection if necessary. func (db *DB) PingContext(ctx context.Context) error { @@ -602,11 +613,7 @@ func (db *DB) PingContext(ctx context.Context) error { return err } - if pinger, ok := dc.ci.(driver.Pinger); ok { - err = pinger.Ping(ctx) - } - dc.releaseConn(err) - return err + return db.pingDC(ctx, dc, dc.releaseConn) } // Ping verifies a connection to the database is still alive, @@ -1404,6 +1411,189 @@ func (db *DB) Driver() driver.Driver { return db.driver } +// ErrConnDone is returned by any operation that is performed on a connection +// that has already been committed or rolled back. +var ErrConnDone = errors.New("database/sql: connection is already closed") + +// Conn returns a single connection by either opening a new connection +// or returning an existing connection from the connection pool. Conn will +// block until either a connection is returned or ctx is canceled. +// Queries run on the same Conn will be run in the same database session. +// +// Every Conn must be returned to the database pool after use by +// calling Conn.Close. +func (db *DB) Conn(ctx context.Context) (*Conn, error) { + var dc *driverConn + var err error + for i := 0; i < maxBadConnRetries; i++ { + dc, err = db.conn(ctx, cachedOrNewConn) + if err != driver.ErrBadConn { + break + } + } + if err == driver.ErrBadConn { + dc, err = db.conn(ctx, cachedOrNewConn) + } + if err != nil { + return nil, err + } + + conn := &Conn{ + db: db, + dc: dc, + } + return conn, nil +} + +// Conn represents a single database session rather a pool of database +// sessions. Prefer running queries from DB unless there is a specific +// need for a continuous single database session. +// +// A Conn must call Close to return the connection to the database pool +// and may do so concurrently with a running query. +// +// After a call to Close, all operations on the +// connection fail with ErrConnDone. +type Conn struct { + db *DB + + // closemu prevents the connection from closing while there + // is an active query. It is held for read during queries + // and exclusively during close. + closemu sync.RWMutex + + // dc is owned until close, at which point + // it's returned to the connection pool. + dc *driverConn + + // done transitions from 0 to 1 exactly once, on close. + // Once done, all operations fail with ErrConnDone. + // Use atomic operations on value when checking value. + done int32 +} + +func (c *Conn) grabConn() (*driverConn, error) { + if atomic.LoadInt32(&c.done) != 0 { + return nil, ErrConnDone + } + return c.dc, nil +} + +// PingContext verifies the connection to the database is still alive. +func (c *Conn) PingContext(ctx context.Context) error { + dc, err := c.grabConn() + if err != nil { + return err + } + + c.closemu.RLock() + return c.db.pingDC(ctx, dc, c.closemuRUnlockCondReleaseConn) +} + +// ExecContext executes a query without returning any rows. +// The args are for any placeholder parameters in the query. +func (c *Conn) ExecContext(ctx context.Context, query string, args ...interface{}) (Result, error) { + dc, err := c.grabConn() + if err != nil { + return nil, err + } + + c.closemu.RLock() + return c.db.execDC(ctx, dc, c.closemuRUnlockCondReleaseConn, query, args) +} + +// QueryContext executes a query that returns rows, typically a SELECT. +// The args are for any placeholder parameters in the query. +func (c *Conn) QueryContext(ctx context.Context, query string, args ...interface{}) (*Rows, error) { + dc, err := c.grabConn() + if err != nil { + return nil, err + } + + c.closemu.RLock() + return c.db.queryDC(ctx, dc, c.closemuRUnlockCondReleaseConn, query, args) +} + +// QueryRowContext executes a query that is expected to return at most one row. +// QueryRowContext always returns a non-nil value. Errors are deferred until +// Row's Scan method is called. +func (c *Conn) QueryRowContext(ctx context.Context, query string, args ...interface{}) *Row { + rows, err := c.QueryContext(ctx, query, args...) + return &Row{rows: rows, err: err} +} + +// PrepareContext creates a prepared statement for later queries or executions. +// Multiple queries or executions may be run concurrently from the +// returned statement. +// The caller must call the statement's Close method +// when the statement is no longer needed. +// +// The provided context is used for the preparation of the statement, not for the +// execution of the statement. +func (c *Conn) PrepareContext(ctx context.Context, query string) (*Stmt, error) { + dc, err := c.grabConn() + if err != nil { + return nil, err + } + + c.closemu.RLock() + return c.db.prepareDC(ctx, dc, c.closemuRUnlockCondReleaseConn, query) +} + +// BeginTx starts a transaction. +// +// The provided context is used until the transaction is committed or rolled back. +// If the context is canceled, the sql package will roll back +// the transaction. Tx.Commit will return an error if the context provided to +// BeginTx is canceled. +// +// The provided TxOptions is optional and may be nil if defaults should be used. +// If a non-default isolation level is used that the driver doesn't support, +// an error will be returned. +func (c *Conn) BeginTx(ctx context.Context, opts *TxOptions) (*Tx, error) { + dc, err := c.grabConn() + if err != nil { + return nil, err + } + + c.closemu.RLock() + return c.db.beginDC(ctx, dc, c.closemuRUnlockCondReleaseConn, opts) +} + +// closemuRUnlockCondReleaseConn read unlocks closemu +// as the sql operation is done with the dc. +func (c *Conn) closemuRUnlockCondReleaseConn(err error) { + c.closemu.RUnlock() + if err == driver.ErrBadConn { + c.close(err) + } +} + +func (c *Conn) close(err error) error { + if !atomic.CompareAndSwapInt32(&c.done, 0, 1) { + return ErrConnDone + } + + // Lock around releasing the driver connection + // to ensure all queries have been stopped before doing so. + c.closemu.Lock() + defer c.closemu.Unlock() + + c.dc.releaseConn(err) + c.dc = nil + c.db = nil + return err +} + +// Close returns the connection to the connection pool. +// All operations after a Close will return with ErrConnDone. +// Close is safe to call concurrently with other operations and will +// block until all other operations finish. It may be useful to first +// cancel any used context and then call close directly after. +func (c *Conn) Close() error { + return c.close(nil) +} + // Tx is an in-progress database transaction. // // A transaction must end with a call to Commit or Rollback. diff --git a/src/database/sql/sql_test.go b/src/database/sql/sql_test.go index b5a1f850bd..5ea965fb28 100644 --- a/src/database/sql/sql_test.go +++ b/src/database/sql/sql_test.go @@ -139,6 +139,7 @@ func closeDB(t testing.TB, db *DB) { t.Errorf("Error closing fakeConn: %v", err) } }) + db.mu.Lock() for i, dc := range db.freeConn { if n := len(dc.openStmt); n > 0 { // Just a sanity check. This is legal in @@ -149,6 +150,8 @@ func closeDB(t testing.TB, db *DB) { t.Errorf("while closing db, freeConn %d/%d had %d open stmts; want 0", i, len(db.freeConn), n) } } + db.mu.Unlock() + err := db.Close() if err != nil { t.Fatalf("error closing DB: %v", err) @@ -1298,6 +1301,69 @@ func TestTxErrBadConn(t *testing.T) { } } +func TestConnQuery(t *testing.T) { + db := newTestDB(t, "people") + defer closeDB(t, db) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + conn, err := db.Conn(ctx) + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + var name string + err = conn.QueryRowContext(ctx, "SELECT|people|name|age=?", 3).Scan(&name) + if err != nil { + t.Fatal(err) + } + if name != "Chris" { + t.Fatalf("unexpected result, got %q want Chris", name) + } + + err = conn.PingContext(ctx) + if err != nil { + t.Fatal(err) + } +} + +func TestConnTx(t *testing.T) { + db := newTestDB(t, "people") + defer closeDB(t, db) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + conn, err := db.Conn(ctx) + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + tx, err := conn.BeginTx(ctx, nil) + if err != nil { + t.Fatal(err) + } + insertName, insertAge := "Nancy", 33 + _, err = tx.ExecContext(ctx, "INSERT|people|name=?,age=?,photo=APHOTO", insertName, insertAge) + if err != nil { + t.Fatal(err) + } + err = tx.Commit() + if err != nil { + t.Fatal(err) + } + + var selectName string + err = conn.QueryRowContext(ctx, "SELECT|people|name|age=?", insertAge).Scan(&selectName) + if err != nil { + t.Fatal(err) + } + if selectName != insertName { + t.Fatalf("got %q want %q", selectName, insertName) + } +} + // Tests fix for issue 2542, that we release a lock when querying on // a closed connection. func TestIssue2542Deadlock(t *testing.T) { @@ -2338,6 +2404,28 @@ func TestManyErrBadConn(t *testing.T) { if err = stmt.Close(); err != nil { t.Fatal(err) } + + // Conn + db = manyErrBadConnSetup() + defer closeDB(t, db) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + conn, err := db.Conn(ctx) + if err != nil { + t.Fatal(err) + } + err = conn.Close() + if err != nil { + t.Fatal(err) + } + + // Ping + db = manyErrBadConnSetup() + defer closeDB(t, db) + err = db.PingContext(ctx) + if err != nil { + t.Fatal(err) + } } // golang.org/issue/5718