diff --git a/src/database/sql/driver/driver.go b/src/database/sql/driver/driver.go index 6113af79c5..f5a2e7c16c 100644 --- a/src/database/sql/driver/driver.go +++ b/src/database/sql/driver/driver.go @@ -222,18 +222,6 @@ type ConnBeginTx interface { BeginTx(ctx context.Context, opts TxOptions) (Tx, error) } -// ResetSessioner may be implemented by Conn to allow drivers to reset the -// session state associated with the connection and to signal a bad connection. -type ResetSessioner interface { - // ResetSession is called while a connection is in the connection - // pool. No queries will run on this connection until this method returns. - // - // If the connection is bad this should return driver.ErrBadConn to prevent - // the connection from being returned to the connection pool. Any other - // error will be discarded. - ResetSession(ctx context.Context) error -} - // Result is the result of a query execution. type Result interface { // LastInsertId returns the database's auto-generated ID diff --git a/src/database/sql/fakedb_test.go b/src/database/sql/fakedb_test.go index 070b783453..4dcd096ca4 100644 --- a/src/database/sql/fakedb_test.go +++ b/src/database/sql/fakedb_test.go @@ -55,22 +55,6 @@ type fakeDriver struct { dbs map[string]*fakeDB } -type fakeConnector struct { - name string - - waiter func(context.Context) -} - -func (c *fakeConnector) Connect(context.Context) (driver.Conn, error) { - conn, err := fdriver.Open(c.name) - conn.(*fakeConn).waiter = c.waiter - return conn, err -} - -func (c *fakeConnector) Driver() driver.Driver { - return fdriver -} - type fakeDB struct { name string @@ -123,16 +107,6 @@ type fakeConn struct { // bad connection tests; see isBad() bad bool stickyBad bool - - skipDirtySession bool // tests that use Conn should set this to true. - - // dirtySession tests ResetSession, true if a query has executed - // until ResetSession is called. - dirtySession bool - - // The waiter is called before each query. May be used in place of the "WAIT" - // directive. - waiter func(context.Context) } func (c *fakeConn) touchMem() { @@ -324,9 +298,6 @@ func (c *fakeConn) isBad() bool { if c.stickyBad { return true } else if c.bad { - if c.db == nil { - return false - } // alternate between bad conn and not bad conn c.db.badConn = !c.db.badConn return c.db.badConn @@ -335,21 +306,6 @@ func (c *fakeConn) isBad() bool { } } -func (c *fakeConn) isDirtyAndMark() bool { - if c.skipDirtySession { - return false - } - if c.currTx != nil { - c.dirtySession = true - return false - } - if c.dirtySession { - return true - } - c.dirtySession = true - return false -} - func (c *fakeConn) Begin() (driver.Tx, error) { if c.isBad() { return nil, driver.ErrBadConn @@ -381,14 +337,6 @@ func setStrictFakeConnClose(t *testing.T) { testStrictClose = t } -func (c *fakeConn) ResetSession(ctx context.Context) error { - c.dirtySession = false - if c.isBad() { - return driver.ErrBadConn - } - return nil -} - func (c *fakeConn) Close() (err error) { drv := fdriver.(*fakeDriver) defer func() { @@ -624,10 +572,6 @@ func (c *fakeConn) PrepareContext(ctx context.Context, query string) (driver.Stm stmt.cmd = cmd parts = parts[1:] - if c.waiter != nil { - c.waiter(ctx) - } - if stmt.wait > 0 { wait := time.NewTimer(stmt.wait) select { @@ -718,9 +662,6 @@ func (s *fakeStmt) ExecContext(ctx context.Context, args []driver.NamedValue) (d if s.c.stickyBad || (hookExecBadConn != nil && hookExecBadConn()) { return nil, driver.ErrBadConn } - if s.c.isDirtyAndMark() { - return nil, errors.New("session is dirty") - } err := checkSubsetTypes(s.c.db.allowAny, args) if err != nil { @@ -833,9 +774,6 @@ func (s *fakeStmt) QueryContext(ctx context.Context, args []driver.NamedValue) ( if s.c.stickyBad || (hookQueryBadConn != nil && hookQueryBadConn()) { return nil, driver.ErrBadConn } - if s.c.isDirtyAndMark() { - return nil, errors.New("session is dirty") - } err := checkSubsetTypes(s.c.db.allowAny, args) if err != nil { diff --git a/src/database/sql/sql.go b/src/database/sql/sql.go index c17b2b543b..7c35710688 100644 --- a/src/database/sql/sql.go +++ b/src/database/sql/sql.go @@ -334,7 +334,6 @@ type DB struct { // It is closed during db.Close(). The close tells the connectionOpener // goroutine to exit. openerCh chan struct{} - resetterCh chan *driverConn closed bool dep map[finalCloser]depSet lastPut map[*driverConn]string // stacktrace of last conn's put; debug only @@ -342,8 +341,6 @@ type DB struct { maxOpen int // <= 0 means unlimited maxLifetime time.Duration // maximum amount of time a connection may be reused cleanerCh chan struct{} - - stop func() // stop cancels the connection opener and the session resetter. } // connReuseStrategy determines how (*DB).conn returns database connections. @@ -371,7 +368,6 @@ type driverConn struct { closed bool finalClosed bool // ci.Close has been called openStmt map[*driverStmt]bool - lastErr error // lastError captures the result of the session resetter. // guarded by db.mu inUse bool @@ -380,7 +376,7 @@ type driverConn struct { } func (dc *driverConn) releaseConn(err error) { - dc.db.putConn(dc, err, true) + dc.db.putConn(dc, err) } func (dc *driverConn) removeOpenStmt(ds *driverStmt) { @@ -421,19 +417,6 @@ func (dc *driverConn) prepareLocked(ctx context.Context, cg stmtConnGrabber, que return ds, nil } -// resetSession resets the connection session and sets the lastErr -// that is checked before returning the connection to another query. -// -// resetSession assumes that the embedded mutex is locked when the connection -// was returned to the pool. This unlocks the mutex. -func (dc *driverConn) resetSession(ctx context.Context) { - defer dc.Unlock() // In case of panic. - if dc.closed { // Check if the database has been closed. - return - } - dc.lastErr = dc.ci.(driver.ResetSessioner).ResetSession(ctx) -} - // the dc.db's Mutex is held. func (dc *driverConn) closeDBLocked() func() error { dc.Lock() @@ -621,18 +604,14 @@ func (t dsnConnector) Driver() driver.Driver { // function should be called just once. It is rarely necessary to // close a DB. func OpenDB(c driver.Connector) *DB { - ctx, cancel := context.WithCancel(context.Background()) db := &DB{ connector: c, openerCh: make(chan struct{}, connectionRequestQueueSize), - resetterCh: make(chan *driverConn, 50), lastPut: make(map[*driverConn]string), connRequests: make(map[uint64]chan connRequest), - stop: cancel, } - go db.connectionOpener(ctx) - go db.connectionResetter(ctx) + go db.connectionOpener() return db } @@ -714,6 +693,7 @@ func (db *DB) Close() error { db.mu.Unlock() return nil } + close(db.openerCh) if db.cleanerCh != nil { close(db.cleanerCh) } @@ -734,7 +714,6 @@ func (db *DB) Close() error { err = err1 } } - db.stop() return err } @@ -922,39 +901,18 @@ func (db *DB) maybeOpenNewConnections() { } // Runs in a separate goroutine, opens new connections when requested. -func (db *DB) connectionOpener(ctx context.Context) { - for { - select { - case <-ctx.Done(): - return - case <-db.openerCh: - db.openNewConnection(ctx) - } - } -} - -// connectionResetter runs in a separate goroutine to reset connections async -// to exported API. -func (db *DB) connectionResetter(ctx context.Context) { - for { - select { - case <-ctx.Done(): - for dc := range db.resetterCh { - dc.Unlock() - } - return - case dc := <-db.resetterCh: - dc.resetSession(ctx) - } +func (db *DB) connectionOpener() { + for range db.openerCh { + db.openNewConnection() } } // Open one new connection -func (db *DB) openNewConnection(ctx context.Context) { +func (db *DB) openNewConnection() { // maybeOpenNewConnctions has already executed db.numOpen++ before it sent // on db.openerCh. This function must execute db.numOpen-- if the // connection fails or is closed before returning. - ci, err := db.connector.Connect(ctx) + ci, err := db.connector.Connect(context.Background()) db.mu.Lock() defer db.mu.Unlock() if db.closed { @@ -1029,14 +987,6 @@ func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn conn.Close() return nil, driver.ErrBadConn } - // Lock around reading lastErr to ensure the session resetter finished. - conn.Lock() - err := conn.lastErr - conn.Unlock() - if err == driver.ErrBadConn { - conn.Close() - return nil, driver.ErrBadConn - } return conn, nil } @@ -1062,7 +1012,7 @@ func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn default: case ret, ok := <-req: if ok { - db.putConn(ret.conn, ret.err, false) + db.putConn(ret.conn, ret.err) } } return nil, ctx.Err() @@ -1074,17 +1024,6 @@ func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn ret.conn.Close() return nil, driver.ErrBadConn } - if ret.conn == nil { - return nil, ret.err - } - // Lock around reading lastErr to ensure the session resetter finished. - ret.conn.Lock() - err := ret.conn.lastErr - ret.conn.Unlock() - if err == driver.ErrBadConn { - ret.conn.Close() - return nil, driver.ErrBadConn - } return ret.conn, ret.err } } @@ -1140,7 +1079,7 @@ const debugGetPut = false // putConn adds a connection to the db's free pool. // err is optionally the last error that occurred on this connection. -func (db *DB) putConn(dc *driverConn, err error, resetSession bool) { +func (db *DB) putConn(dc *driverConn, err error) { db.mu.Lock() if !dc.inUse { if debugGetPut { @@ -1171,35 +1110,11 @@ func (db *DB) putConn(dc *driverConn, err error, resetSession bool) { if putConnHook != nil { putConnHook(db, dc) } - if resetSession { - if _, resetSession = dc.ci.(driver.ResetSessioner); resetSession { - // Lock the driverConn here so it isn't released until - // the connection is reset. - // The lock must be taken before the connection is put into - // the pool to prevent it from being taken out before it is reset. - dc.Lock() - } - } added := db.putConnDBLocked(dc, nil) db.mu.Unlock() if !added { - if resetSession { - dc.Unlock() - } dc.Close() - return - } - if !resetSession { - return - } - select { - default: - // If the resetterCh is blocking then mark the connection - // as bad and continue on. - dc.lastErr = driver.ErrBadConn - dc.Unlock() - case db.resetterCh <- dc: } } diff --git a/src/database/sql/sql_test.go b/src/database/sql/sql_test.go index 7100d000c7..3551366369 100644 --- a/src/database/sql/sql_test.go +++ b/src/database/sql/sql_test.go @@ -60,12 +60,10 @@ const fakeDBName = "foo" var chrisBirthday = time.Unix(123456789, 0) func newTestDB(t testing.TB, name string) *DB { - return newTestDBConnector(t, &fakeConnector{name: fakeDBName}, name) -} - -func newTestDBConnector(t testing.TB, fc *fakeConnector, name string) *DB { - fc.name = fakeDBName - db := OpenDB(fc) + db, err := Open("test", fakeDBName) + if err != nil { + t.Fatalf("Open: %v", err) + } if _, err := db.Exec("WIPE"); err != nil { t.Fatalf("exec wipe: %v", err) } @@ -587,46 +585,24 @@ func TestPoolExhaustOnCancel(t *testing.T) { if testing.Short() { t.Skip("long test") } + db := newTestDB(t, "people") + defer closeDB(t, db) max := 3 - var saturate, saturateDone sync.WaitGroup - saturate.Add(max) - saturateDone.Add(max) - - donePing := make(chan bool) - state := 0 - - // waiter will be called for all queries, including - // initial setup queries. The state is only assigned when no - // no queries are made. - // - // Only allow the first batch of queries to finish once the - // second batch of Ping queries have finished. - waiter := func(ctx context.Context) { - switch state { - case 0: - // Nothing. Initial database setup. - case 1: - saturate.Done() - select { - case <-ctx.Done(): - case <-donePing: - } - case 2: - } - } - db := newTestDBConnector(t, &fakeConnector{waiter: waiter}, "people") - defer closeDB(t, db) db.SetMaxOpenConns(max) // First saturate the connection pool. // Then start new requests for a connection that is cancelled after it is requested. - state = 1 + var saturate, saturateDone sync.WaitGroup + saturate.Add(max) + saturateDone.Add(max) + for i := 0; i < max; i++ { go func() { - rows, err := db.Query("SELECT|people|name,photo|") + saturate.Done() + rows, err := db.Query("WAIT|500ms|SELECT|people|name,photo|") if err != nil { t.Fatalf("Query: %v", err) } @@ -636,7 +612,6 @@ func TestPoolExhaustOnCancel(t *testing.T) { } saturate.Wait() - state = 2 // Now cancel the request while it is waiting. ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) @@ -653,7 +628,7 @@ func TestPoolExhaustOnCancel(t *testing.T) { t.Fatalf("PingContext (Exhaust): %v", err) } } - close(donePing) + saturateDone.Wait() // Now try to open a normal connection. @@ -1357,7 +1332,6 @@ func TestConnQuery(t *testing.T) { if err != nil { t.Fatal(err) } - conn.dc.ci.(*fakeConn).skipDirtySession = true defer conn.Close() var name string @@ -1385,7 +1359,6 @@ func TestConnTx(t *testing.T) { if err != nil { t.Fatal(err) } - conn.dc.ci.(*fakeConn).skipDirtySession = true defer conn.Close() tx, err := conn.BeginTx(ctx, nil) @@ -2411,9 +2384,7 @@ func TestManyErrBadConn(t *testing.T) { t.Fatalf("unexpected len(db.freeConn) %d (was expecting %d)", len(db.freeConn), nconn) } for _, conn := range db.freeConn { - conn.Lock() conn.ci.(*fakeConn).stickyBad = true - conn.Unlock() } return db } @@ -2503,7 +2474,6 @@ func TestManyErrBadConn(t *testing.T) { if err != nil { t.Fatal(err) } - conn.dc.ci.(*fakeConn).skipDirtySession = true err = conn.Close() if err != nil { t.Fatal(err) @@ -3268,8 +3238,9 @@ func TestIssue18719(t *testing.T) { // This call will grab the connection and cancel the context // after it has done so. Code after must deal with the canceled state. - _, err = tx.QueryContext(ctx, "SELECT|people|name|") + rows, err := tx.QueryContext(ctx, "SELECT|people|name|") if err != nil { + rows.Close() t.Fatalf("expected error %v but got %v", nil, err) } @@ -3292,7 +3263,6 @@ func TestIssue20647(t *testing.T) { if err != nil { t.Fatal(err) } - conn.dc.ci.(*fakeConn).skipDirtySession = true defer conn.Close() stmt, err := conn.PrepareContext(ctx, "SELECT|people|name|")