diff --git a/src/database/sql/fakedb_test.go b/src/database/sql/fakedb_test.go index 112f280ec5..f1e8f6cb6e 100644 --- a/src/database/sql/fakedb_test.go +++ b/src/database/sql/fakedb_test.go @@ -153,12 +153,32 @@ func TestDrivers(t *testing.T) { } } +// hook to simulate connection failures +var hookOpenErr struct { + sync.Mutex + fn func() error +} + +func setHookOpenErr(fn func() error) { + hookOpenErr.Lock() + defer hookOpenErr.Unlock() + hookOpenErr.fn = fn +} + // Supports dsn forms: // // ; (only currently supported option is `badConn`, // which causes driver.ErrBadConn to be returned on // every other conn.Begin()) func (d *fakeDriver) Open(dsn string) (driver.Conn, error) { + hookOpenErr.Lock() + fn := hookOpenErr.fn + hookOpenErr.Unlock() + if fn != nil { + if err := fn(); err != nil { + return nil, err + } + } parts := strings.Split(dsn, ";") if len(parts) < 1 { return nil, errors.New("fakedb: no database name") diff --git a/src/database/sql/sql.go b/src/database/sql/sql.go index fbb0e594a5..f3fed953ad 100644 --- a/src/database/sql/sql.go +++ b/src/database/sql/sql.go @@ -229,8 +229,7 @@ type DB struct { mu sync.Mutex // protects following fields freeConn []*driverConn connRequests []chan connRequest - numOpen int - pendingOpens int + numOpen int // number of opened and pending open connections // Used to signal the need for new connections // a goroutine running connectionOpener() reads on this chan and // maybeOpenNewConnections sends on the chan (one send per needed connection) @@ -615,15 +614,15 @@ func (db *DB) Stats() DBStats { // If there are connRequests and the connection limit hasn't been reached, // then tell the connectionOpener to open new connections. func (db *DB) maybeOpenNewConnections() { - numRequests := len(db.connRequests) - db.pendingOpens + numRequests := len(db.connRequests) if db.maxOpen > 0 { - numCanOpen := db.maxOpen - (db.numOpen + db.pendingOpens) + numCanOpen := db.maxOpen - db.numOpen if numRequests > numCanOpen { numRequests = numCanOpen } } for numRequests > 0 { - db.pendingOpens++ + db.numOpen++ // optimistically numRequests-- db.openerCh <- struct{}{} } @@ -638,6 +637,9 @@ func (db *DB) connectionOpener() { // Open one new connection func (db *DB) openNewConnection() { + // maybeOpenNewConnctions has already executed db.numOpen++ before it sent + // on db.openerCh. This function must execute db.numOpen-- if the + // connection fails or is closed before returning. ci, err := db.driver.Open(db.dsn) db.mu.Lock() defer db.mu.Unlock() @@ -645,11 +647,13 @@ func (db *DB) openNewConnection() { if err == nil { ci.Close() } + db.numOpen-- return } - db.pendingOpens-- if err != nil { + db.numOpen-- db.putConnDBLocked(nil, err) + db.maybeOpenNewConnections() return } dc := &driverConn{ @@ -658,8 +662,8 @@ func (db *DB) openNewConnection() { } if db.putConnDBLocked(dc, err) { db.addDepLocked(dc, dc) - db.numOpen++ } else { + db.numOpen-- ci.Close() } } @@ -701,7 +705,10 @@ func (db *DB) conn(strategy connReuseStrategy) (*driverConn, error) { req := make(chan connRequest, 1) db.connRequests = append(db.connRequests, req) db.mu.Unlock() - ret := <-req + ret, ok := <-req + if !ok { + return nil, errDBClosed + } return ret.conn, ret.err } @@ -711,6 +718,7 @@ func (db *DB) conn(strategy connReuseStrategy) (*driverConn, error) { if err != nil { db.mu.Lock() db.numOpen-- // correct for earlier optimism + db.maybeOpenNewConnections() db.mu.Unlock() return nil, err } diff --git a/src/database/sql/sql_test.go b/src/database/sql/sql_test.go index e1063bbc6b..d835bc160a 100644 --- a/src/database/sql/sql_test.go +++ b/src/database/sql/sql_test.go @@ -1159,6 +1159,67 @@ func TestMaxOpenConnsOnBusy(t *testing.T) { } } +// Issue 10886: tests that all connection attempts return when more than +// DB.maxOpen connections are in flight and the first DB.maxOpen fail. +func TestPendingConnsAfterErr(t *testing.T) { + const ( + maxOpen = 2 + tryOpen = maxOpen*2 + 2 + ) + + db := newTestDB(t, "people") + defer closeDB(t, db) + defer func() { + for k, v := range db.lastPut { + t.Logf("%p: %v", k, v) + } + }() + + db.SetMaxOpenConns(maxOpen) + db.SetMaxIdleConns(0) + + errOffline := errors.New("db offline") + defer func() { setHookOpenErr(nil) }() + + errs := make(chan error, tryOpen) + + unblock := make(chan struct{}) + setHookOpenErr(func() error { + <-unblock // block until all connections are in flight + return errOffline + }) + + var opening sync.WaitGroup + opening.Add(tryOpen) + for i := 0; i < tryOpen; i++ { + go func() { + opening.Done() // signal one connection is in flight + _, err := db.Exec("INSERT|people|name=Julia,age=19") + errs <- err + }() + } + + opening.Wait() // wait for all workers to begin running + time.Sleep(10 * time.Millisecond) // make extra sure all workers are blocked + close(unblock) // let all workers proceed + + const timeout = 100 * time.Millisecond + to := time.NewTimer(timeout) + defer to.Stop() + + // check that all connections fail without deadlock + for i := 0; i < tryOpen; i++ { + select { + case err := <-errs: + if got, want := err, errOffline; got != want { + t.Errorf("unexpected err: got %v, want %v", got, want) + } + case <-to.C: + t.Fatalf("orphaned connection request(s), still waiting after %v", timeout) + } + } +} + func TestSingleOpenConn(t *testing.T) { db := newTestDB(t, "people") defer closeDB(t, db)