mirror of
https://github.com/golang/go
synced 2024-11-12 01:00:22 -07:00
Revert "database/sql: add driver.ResetSessioner and add pool support"
This reverts commit 2620ac3aea
.
Reason for revert: broke all the builds.
Change-Id: I26fc09a13f5f80fa708de66c843442ff9d934694
Reviewed-on: https://go-review.googlesource.com/73050
Reviewed-by: Russ Cox <rsc@golang.org>
This commit is contained in:
parent
8c58900aeb
commit
3b9d947b2f
@ -222,18 +222,6 @@ type ConnBeginTx interface {
|
||||
BeginTx(ctx context.Context, opts TxOptions) (Tx, error)
|
||||
}
|
||||
|
||||
// ResetSessioner may be implemented by Conn to allow drivers to reset the
|
||||
// session state associated with the connection and to signal a bad connection.
|
||||
type ResetSessioner interface {
|
||||
// ResetSession is called while a connection is in the connection
|
||||
// pool. No queries will run on this connection until this method returns.
|
||||
//
|
||||
// If the connection is bad this should return driver.ErrBadConn to prevent
|
||||
// the connection from being returned to the connection pool. Any other
|
||||
// error will be discarded.
|
||||
ResetSession(ctx context.Context) error
|
||||
}
|
||||
|
||||
// Result is the result of a query execution.
|
||||
type Result interface {
|
||||
// LastInsertId returns the database's auto-generated ID
|
||||
|
@ -55,22 +55,6 @@ type fakeDriver struct {
|
||||
dbs map[string]*fakeDB
|
||||
}
|
||||
|
||||
type fakeConnector struct {
|
||||
name string
|
||||
|
||||
waiter func(context.Context)
|
||||
}
|
||||
|
||||
func (c *fakeConnector) Connect(context.Context) (driver.Conn, error) {
|
||||
conn, err := fdriver.Open(c.name)
|
||||
conn.(*fakeConn).waiter = c.waiter
|
||||
return conn, err
|
||||
}
|
||||
|
||||
func (c *fakeConnector) Driver() driver.Driver {
|
||||
return fdriver
|
||||
}
|
||||
|
||||
type fakeDB struct {
|
||||
name string
|
||||
|
||||
@ -123,16 +107,6 @@ type fakeConn struct {
|
||||
// bad connection tests; see isBad()
|
||||
bad bool
|
||||
stickyBad bool
|
||||
|
||||
skipDirtySession bool // tests that use Conn should set this to true.
|
||||
|
||||
// dirtySession tests ResetSession, true if a query has executed
|
||||
// until ResetSession is called.
|
||||
dirtySession bool
|
||||
|
||||
// The waiter is called before each query. May be used in place of the "WAIT"
|
||||
// directive.
|
||||
waiter func(context.Context)
|
||||
}
|
||||
|
||||
func (c *fakeConn) touchMem() {
|
||||
@ -324,9 +298,6 @@ func (c *fakeConn) isBad() bool {
|
||||
if c.stickyBad {
|
||||
return true
|
||||
} else if c.bad {
|
||||
if c.db == nil {
|
||||
return false
|
||||
}
|
||||
// alternate between bad conn and not bad conn
|
||||
c.db.badConn = !c.db.badConn
|
||||
return c.db.badConn
|
||||
@ -335,21 +306,6 @@ func (c *fakeConn) isBad() bool {
|
||||
}
|
||||
}
|
||||
|
||||
func (c *fakeConn) isDirtyAndMark() bool {
|
||||
if c.skipDirtySession {
|
||||
return false
|
||||
}
|
||||
if c.currTx != nil {
|
||||
c.dirtySession = true
|
||||
return false
|
||||
}
|
||||
if c.dirtySession {
|
||||
return true
|
||||
}
|
||||
c.dirtySession = true
|
||||
return false
|
||||
}
|
||||
|
||||
func (c *fakeConn) Begin() (driver.Tx, error) {
|
||||
if c.isBad() {
|
||||
return nil, driver.ErrBadConn
|
||||
@ -381,14 +337,6 @@ func setStrictFakeConnClose(t *testing.T) {
|
||||
testStrictClose = t
|
||||
}
|
||||
|
||||
func (c *fakeConn) ResetSession(ctx context.Context) error {
|
||||
c.dirtySession = false
|
||||
if c.isBad() {
|
||||
return driver.ErrBadConn
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *fakeConn) Close() (err error) {
|
||||
drv := fdriver.(*fakeDriver)
|
||||
defer func() {
|
||||
@ -624,10 +572,6 @@ func (c *fakeConn) PrepareContext(ctx context.Context, query string) (driver.Stm
|
||||
stmt.cmd = cmd
|
||||
parts = parts[1:]
|
||||
|
||||
if c.waiter != nil {
|
||||
c.waiter(ctx)
|
||||
}
|
||||
|
||||
if stmt.wait > 0 {
|
||||
wait := time.NewTimer(stmt.wait)
|
||||
select {
|
||||
@ -718,9 +662,6 @@ func (s *fakeStmt) ExecContext(ctx context.Context, args []driver.NamedValue) (d
|
||||
if s.c.stickyBad || (hookExecBadConn != nil && hookExecBadConn()) {
|
||||
return nil, driver.ErrBadConn
|
||||
}
|
||||
if s.c.isDirtyAndMark() {
|
||||
return nil, errors.New("session is dirty")
|
||||
}
|
||||
|
||||
err := checkSubsetTypes(s.c.db.allowAny, args)
|
||||
if err != nil {
|
||||
@ -833,9 +774,6 @@ func (s *fakeStmt) QueryContext(ctx context.Context, args []driver.NamedValue) (
|
||||
if s.c.stickyBad || (hookQueryBadConn != nil && hookQueryBadConn()) {
|
||||
return nil, driver.ErrBadConn
|
||||
}
|
||||
if s.c.isDirtyAndMark() {
|
||||
return nil, errors.New("session is dirty")
|
||||
}
|
||||
|
||||
err := checkSubsetTypes(s.c.db.allowAny, args)
|
||||
if err != nil {
|
||||
|
@ -334,7 +334,6 @@ type DB struct {
|
||||
// It is closed during db.Close(). The close tells the connectionOpener
|
||||
// goroutine to exit.
|
||||
openerCh chan struct{}
|
||||
resetterCh chan *driverConn
|
||||
closed bool
|
||||
dep map[finalCloser]depSet
|
||||
lastPut map[*driverConn]string // stacktrace of last conn's put; debug only
|
||||
@ -342,8 +341,6 @@ type DB struct {
|
||||
maxOpen int // <= 0 means unlimited
|
||||
maxLifetime time.Duration // maximum amount of time a connection may be reused
|
||||
cleanerCh chan struct{}
|
||||
|
||||
stop func() // stop cancels the connection opener and the session resetter.
|
||||
}
|
||||
|
||||
// connReuseStrategy determines how (*DB).conn returns database connections.
|
||||
@ -371,7 +368,6 @@ type driverConn struct {
|
||||
closed bool
|
||||
finalClosed bool // ci.Close has been called
|
||||
openStmt map[*driverStmt]bool
|
||||
lastErr error // lastError captures the result of the session resetter.
|
||||
|
||||
// guarded by db.mu
|
||||
inUse bool
|
||||
@ -380,7 +376,7 @@ type driverConn struct {
|
||||
}
|
||||
|
||||
func (dc *driverConn) releaseConn(err error) {
|
||||
dc.db.putConn(dc, err, true)
|
||||
dc.db.putConn(dc, err)
|
||||
}
|
||||
|
||||
func (dc *driverConn) removeOpenStmt(ds *driverStmt) {
|
||||
@ -421,19 +417,6 @@ func (dc *driverConn) prepareLocked(ctx context.Context, cg stmtConnGrabber, que
|
||||
return ds, nil
|
||||
}
|
||||
|
||||
// resetSession resets the connection session and sets the lastErr
|
||||
// that is checked before returning the connection to another query.
|
||||
//
|
||||
// resetSession assumes that the embedded mutex is locked when the connection
|
||||
// was returned to the pool. This unlocks the mutex.
|
||||
func (dc *driverConn) resetSession(ctx context.Context) {
|
||||
defer dc.Unlock() // In case of panic.
|
||||
if dc.closed { // Check if the database has been closed.
|
||||
return
|
||||
}
|
||||
dc.lastErr = dc.ci.(driver.ResetSessioner).ResetSession(ctx)
|
||||
}
|
||||
|
||||
// the dc.db's Mutex is held.
|
||||
func (dc *driverConn) closeDBLocked() func() error {
|
||||
dc.Lock()
|
||||
@ -621,18 +604,14 @@ func (t dsnConnector) Driver() driver.Driver {
|
||||
// function should be called just once. It is rarely necessary to
|
||||
// close a DB.
|
||||
func OpenDB(c driver.Connector) *DB {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
db := &DB{
|
||||
connector: c,
|
||||
openerCh: make(chan struct{}, connectionRequestQueueSize),
|
||||
resetterCh: make(chan *driverConn, 50),
|
||||
lastPut: make(map[*driverConn]string),
|
||||
connRequests: make(map[uint64]chan connRequest),
|
||||
stop: cancel,
|
||||
}
|
||||
|
||||
go db.connectionOpener(ctx)
|
||||
go db.connectionResetter(ctx)
|
||||
go db.connectionOpener()
|
||||
|
||||
return db
|
||||
}
|
||||
@ -714,6 +693,7 @@ func (db *DB) Close() error {
|
||||
db.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
close(db.openerCh)
|
||||
if db.cleanerCh != nil {
|
||||
close(db.cleanerCh)
|
||||
}
|
||||
@ -734,7 +714,6 @@ func (db *DB) Close() error {
|
||||
err = err1
|
||||
}
|
||||
}
|
||||
db.stop()
|
||||
return err
|
||||
}
|
||||
|
||||
@ -922,39 +901,18 @@ func (db *DB) maybeOpenNewConnections() {
|
||||
}
|
||||
|
||||
// Runs in a separate goroutine, opens new connections when requested.
|
||||
func (db *DB) connectionOpener(ctx context.Context) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-db.openerCh:
|
||||
db.openNewConnection(ctx)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// connectionResetter runs in a separate goroutine to reset connections async
|
||||
// to exported API.
|
||||
func (db *DB) connectionResetter(ctx context.Context) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
for dc := range db.resetterCh {
|
||||
dc.Unlock()
|
||||
}
|
||||
return
|
||||
case dc := <-db.resetterCh:
|
||||
dc.resetSession(ctx)
|
||||
}
|
||||
func (db *DB) connectionOpener() {
|
||||
for range db.openerCh {
|
||||
db.openNewConnection()
|
||||
}
|
||||
}
|
||||
|
||||
// Open one new connection
|
||||
func (db *DB) openNewConnection(ctx context.Context) {
|
||||
func (db *DB) openNewConnection() {
|
||||
// maybeOpenNewConnctions has already executed db.numOpen++ before it sent
|
||||
// on db.openerCh. This function must execute db.numOpen-- if the
|
||||
// connection fails or is closed before returning.
|
||||
ci, err := db.connector.Connect(ctx)
|
||||
ci, err := db.connector.Connect(context.Background())
|
||||
db.mu.Lock()
|
||||
defer db.mu.Unlock()
|
||||
if db.closed {
|
||||
@ -1029,14 +987,6 @@ func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn
|
||||
conn.Close()
|
||||
return nil, driver.ErrBadConn
|
||||
}
|
||||
// Lock around reading lastErr to ensure the session resetter finished.
|
||||
conn.Lock()
|
||||
err := conn.lastErr
|
||||
conn.Unlock()
|
||||
if err == driver.ErrBadConn {
|
||||
conn.Close()
|
||||
return nil, driver.ErrBadConn
|
||||
}
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
@ -1062,7 +1012,7 @@ func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn
|
||||
default:
|
||||
case ret, ok := <-req:
|
||||
if ok {
|
||||
db.putConn(ret.conn, ret.err, false)
|
||||
db.putConn(ret.conn, ret.err)
|
||||
}
|
||||
}
|
||||
return nil, ctx.Err()
|
||||
@ -1074,17 +1024,6 @@ func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn
|
||||
ret.conn.Close()
|
||||
return nil, driver.ErrBadConn
|
||||
}
|
||||
if ret.conn == nil {
|
||||
return nil, ret.err
|
||||
}
|
||||
// Lock around reading lastErr to ensure the session resetter finished.
|
||||
ret.conn.Lock()
|
||||
err := ret.conn.lastErr
|
||||
ret.conn.Unlock()
|
||||
if err == driver.ErrBadConn {
|
||||
ret.conn.Close()
|
||||
return nil, driver.ErrBadConn
|
||||
}
|
||||
return ret.conn, ret.err
|
||||
}
|
||||
}
|
||||
@ -1140,7 +1079,7 @@ const debugGetPut = false
|
||||
|
||||
// putConn adds a connection to the db's free pool.
|
||||
// err is optionally the last error that occurred on this connection.
|
||||
func (db *DB) putConn(dc *driverConn, err error, resetSession bool) {
|
||||
func (db *DB) putConn(dc *driverConn, err error) {
|
||||
db.mu.Lock()
|
||||
if !dc.inUse {
|
||||
if debugGetPut {
|
||||
@ -1171,35 +1110,11 @@ func (db *DB) putConn(dc *driverConn, err error, resetSession bool) {
|
||||
if putConnHook != nil {
|
||||
putConnHook(db, dc)
|
||||
}
|
||||
if resetSession {
|
||||
if _, resetSession = dc.ci.(driver.ResetSessioner); resetSession {
|
||||
// Lock the driverConn here so it isn't released until
|
||||
// the connection is reset.
|
||||
// The lock must be taken before the connection is put into
|
||||
// the pool to prevent it from being taken out before it is reset.
|
||||
dc.Lock()
|
||||
}
|
||||
}
|
||||
added := db.putConnDBLocked(dc, nil)
|
||||
db.mu.Unlock()
|
||||
|
||||
if !added {
|
||||
if resetSession {
|
||||
dc.Unlock()
|
||||
}
|
||||
dc.Close()
|
||||
return
|
||||
}
|
||||
if !resetSession {
|
||||
return
|
||||
}
|
||||
select {
|
||||
default:
|
||||
// If the resetterCh is blocking then mark the connection
|
||||
// as bad and continue on.
|
||||
dc.lastErr = driver.ErrBadConn
|
||||
dc.Unlock()
|
||||
case db.resetterCh <- dc:
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -60,12 +60,10 @@ const fakeDBName = "foo"
|
||||
var chrisBirthday = time.Unix(123456789, 0)
|
||||
|
||||
func newTestDB(t testing.TB, name string) *DB {
|
||||
return newTestDBConnector(t, &fakeConnector{name: fakeDBName}, name)
|
||||
}
|
||||
|
||||
func newTestDBConnector(t testing.TB, fc *fakeConnector, name string) *DB {
|
||||
fc.name = fakeDBName
|
||||
db := OpenDB(fc)
|
||||
db, err := Open("test", fakeDBName)
|
||||
if err != nil {
|
||||
t.Fatalf("Open: %v", err)
|
||||
}
|
||||
if _, err := db.Exec("WIPE"); err != nil {
|
||||
t.Fatalf("exec wipe: %v", err)
|
||||
}
|
||||
@ -587,46 +585,24 @@ func TestPoolExhaustOnCancel(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("long test")
|
||||
}
|
||||
db := newTestDB(t, "people")
|
||||
defer closeDB(t, db)
|
||||
|
||||
max := 3
|
||||
var saturate, saturateDone sync.WaitGroup
|
||||
saturate.Add(max)
|
||||
saturateDone.Add(max)
|
||||
|
||||
donePing := make(chan bool)
|
||||
state := 0
|
||||
|
||||
// waiter will be called for all queries, including
|
||||
// initial setup queries. The state is only assigned when no
|
||||
// no queries are made.
|
||||
//
|
||||
// Only allow the first batch of queries to finish once the
|
||||
// second batch of Ping queries have finished.
|
||||
waiter := func(ctx context.Context) {
|
||||
switch state {
|
||||
case 0:
|
||||
// Nothing. Initial database setup.
|
||||
case 1:
|
||||
saturate.Done()
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case <-donePing:
|
||||
}
|
||||
case 2:
|
||||
}
|
||||
}
|
||||
db := newTestDBConnector(t, &fakeConnector{waiter: waiter}, "people")
|
||||
defer closeDB(t, db)
|
||||
|
||||
db.SetMaxOpenConns(max)
|
||||
|
||||
// First saturate the connection pool.
|
||||
// Then start new requests for a connection that is cancelled after it is requested.
|
||||
|
||||
state = 1
|
||||
var saturate, saturateDone sync.WaitGroup
|
||||
saturate.Add(max)
|
||||
saturateDone.Add(max)
|
||||
|
||||
for i := 0; i < max; i++ {
|
||||
go func() {
|
||||
rows, err := db.Query("SELECT|people|name,photo|")
|
||||
saturate.Done()
|
||||
rows, err := db.Query("WAIT|500ms|SELECT|people|name,photo|")
|
||||
if err != nil {
|
||||
t.Fatalf("Query: %v", err)
|
||||
}
|
||||
@ -636,7 +612,6 @@ func TestPoolExhaustOnCancel(t *testing.T) {
|
||||
}
|
||||
|
||||
saturate.Wait()
|
||||
state = 2
|
||||
|
||||
// Now cancel the request while it is waiting.
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
@ -653,7 +628,7 @@ func TestPoolExhaustOnCancel(t *testing.T) {
|
||||
t.Fatalf("PingContext (Exhaust): %v", err)
|
||||
}
|
||||
}
|
||||
close(donePing)
|
||||
|
||||
saturateDone.Wait()
|
||||
|
||||
// Now try to open a normal connection.
|
||||
@ -1357,7 +1332,6 @@ func TestConnQuery(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
conn.dc.ci.(*fakeConn).skipDirtySession = true
|
||||
defer conn.Close()
|
||||
|
||||
var name string
|
||||
@ -1385,7 +1359,6 @@ func TestConnTx(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
conn.dc.ci.(*fakeConn).skipDirtySession = true
|
||||
defer conn.Close()
|
||||
|
||||
tx, err := conn.BeginTx(ctx, nil)
|
||||
@ -2411,9 +2384,7 @@ func TestManyErrBadConn(t *testing.T) {
|
||||
t.Fatalf("unexpected len(db.freeConn) %d (was expecting %d)", len(db.freeConn), nconn)
|
||||
}
|
||||
for _, conn := range db.freeConn {
|
||||
conn.Lock()
|
||||
conn.ci.(*fakeConn).stickyBad = true
|
||||
conn.Unlock()
|
||||
}
|
||||
return db
|
||||
}
|
||||
@ -2503,7 +2474,6 @@ func TestManyErrBadConn(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
conn.dc.ci.(*fakeConn).skipDirtySession = true
|
||||
err = conn.Close()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@ -3268,8 +3238,9 @@ func TestIssue18719(t *testing.T) {
|
||||
|
||||
// This call will grab the connection and cancel the context
|
||||
// after it has done so. Code after must deal with the canceled state.
|
||||
_, err = tx.QueryContext(ctx, "SELECT|people|name|")
|
||||
rows, err := tx.QueryContext(ctx, "SELECT|people|name|")
|
||||
if err != nil {
|
||||
rows.Close()
|
||||
t.Fatalf("expected error %v but got %v", nil, err)
|
||||
}
|
||||
|
||||
@ -3292,7 +3263,6 @@ func TestIssue20647(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
conn.dc.ci.(*fakeConn).skipDirtySession = true
|
||||
defer conn.Close()
|
||||
|
||||
stmt, err := conn.PrepareContext(ctx, "SELECT|people|name|")
|
||||
|
Loading…
Reference in New Issue
Block a user