aboutsummaryrefslogtreecommitdiff
path: root/src/database/sql/sql.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/database/sql/sql.go')
-rw-r--r--src/database/sql/sql.go200
1 files changed, 110 insertions, 90 deletions
diff --git a/src/database/sql/sql.go b/src/database/sql/sql.go
index 5c5b7dc7e9..25e241859c 100644
--- a/src/database/sql/sql.go
+++ b/src/database/sql/sql.go
@@ -421,7 +421,6 @@ type DB struct {
// It is closed during db.Close(). The close tells the connectionOpener
// goroutine to exit.
openerCh chan struct{}
- resetterCh chan *driverConn
closed bool
dep map[finalCloser]depSet
lastPut map[*driverConn]string // stacktrace of last conn's put; debug only
@@ -458,10 +457,10 @@ type driverConn struct {
sync.Mutex // guards following
ci driver.Conn
+ needReset bool // The connection session should be reset before use if true.
closed bool
finalClosed bool // ci.Close has been called
openStmt map[*driverStmt]bool
- lastErr error // lastError captures the result of the session resetter.
// guarded by db.mu
inUse bool
@@ -486,6 +485,41 @@ func (dc *driverConn) expired(timeout time.Duration) bool {
return dc.createdAt.Add(timeout).Before(nowFunc())
}
+// resetSession checks if the driver connection needs the
+// session to be reset and if required, resets it.
+func (dc *driverConn) resetSession(ctx context.Context) error {
+ dc.Lock()
+ defer dc.Unlock()
+
+ if !dc.needReset {
+ return nil
+ }
+ if cr, ok := dc.ci.(driver.SessionResetter); ok {
+ return cr.ResetSession(ctx)
+ }
+ return nil
+}
+
+// validator was introduced for Go1.15, but backported to Go1.14.
+type validator interface {
+ IsValid() bool
+}
+
+// validateConnection checks if the connection is valid and can
+// still be used. It also marks the session for reset if required.
+func (dc *driverConn) validateConnection(needsReset bool) bool {
+ dc.Lock()
+ defer dc.Unlock()
+
+ if needsReset {
+ dc.needReset = true
+ }
+ if cv, ok := dc.ci.(validator); ok {
+ return cv.IsValid()
+ }
+ return true
+}
+
// prepareLocked prepares the query on dc. When cg == nil the dc must keep track of
// the prepared statements in a pool.
func (dc *driverConn) prepareLocked(ctx context.Context, cg stmtConnGrabber, query string) (*driverStmt, error) {
@@ -511,19 +545,6 @@ func (dc *driverConn) prepareLocked(ctx context.Context, cg stmtConnGrabber, que
return ds, nil
}
-// resetSession resets the connection session and sets the lastErr
-// that is checked before returning the connection to another query.
-//
-// resetSession assumes that the embedded mutex is locked when the connection
-// was returned to the pool. This unlocks the mutex.
-func (dc *driverConn) resetSession(ctx context.Context) {
- defer dc.Unlock() // In case of panic.
- if dc.closed { // Check if the database has been closed.
- return
- }
- dc.lastErr = dc.ci.(driver.SessionResetter).ResetSession(ctx)
-}
-
// the dc.db's Mutex is held.
func (dc *driverConn) closeDBLocked() func() error {
dc.Lock()
@@ -713,14 +734,12 @@ func OpenDB(c driver.Connector) *DB {
db := &DB{
connector: c,
openerCh: make(chan struct{}, connectionRequestQueueSize),
- resetterCh: make(chan *driverConn, 50),
lastPut: make(map[*driverConn]string),
connRequests: make(map[uint64]chan connRequest),
stop: cancel,
}
go db.connectionOpener(ctx)
- go db.connectionResetter(ctx)
return db
}
@@ -1058,23 +1077,6 @@ func (db *DB) connectionOpener(ctx context.Context) {
}
}
-// connectionResetter runs in a separate goroutine to reset connections async
-// to exported API.
-func (db *DB) connectionResetter(ctx context.Context) {
- for {
- select {
- case <-ctx.Done():
- close(db.resetterCh)
- for dc := range db.resetterCh {
- dc.Unlock()
- }
- return
- case dc := <-db.resetterCh:
- dc.resetSession(ctx)
- }
- }
-}
-
// Open one new connection
func (db *DB) openNewConnection(ctx context.Context) {
// maybeOpenNewConnctions has already executed db.numOpen++ before it sent
@@ -1155,14 +1157,13 @@ func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn
conn.Close()
return nil, driver.ErrBadConn
}
- // Lock around reading lastErr to ensure the session resetter finished.
- conn.Lock()
- err := conn.lastErr
- conn.Unlock()
- if err == driver.ErrBadConn {
+
+ // Reset the session if required.
+ if err := conn.resetSession(ctx); err == driver.ErrBadConn {
conn.Close()
return nil, driver.ErrBadConn
}
+
return conn, nil
}
@@ -1204,18 +1205,22 @@ func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn
if !ok {
return nil, errDBClosed
}
- if ret.err == nil && ret.conn.expired(lifetime) {
+ // Only check if the connection is expired if the strategy is cachedOrNewConns.
+ // If we require a new connection, just re-use the connection without looking
+ // at the expiry time. If it is expired, it will be checked when it is placed
+ // back into the connection pool.
+ // This prioritizes giving a valid connection to a client over the exact connection
+ // lifetime, which could expire exactly after this point anyway.
+ if strategy == cachedOrNewConn && ret.err == nil && ret.conn.expired(lifetime) {
ret.conn.Close()
return nil, driver.ErrBadConn
}
if ret.conn == nil {
return nil, ret.err
}
- // Lock around reading lastErr to ensure the session resetter finished.
- ret.conn.Lock()
- err := ret.conn.lastErr
- ret.conn.Unlock()
- if err == driver.ErrBadConn {
+
+ // Reset the session if required.
+ if err := ret.conn.resetSession(ctx); err == driver.ErrBadConn {
ret.conn.Close()
return nil, driver.ErrBadConn
}
@@ -1275,13 +1280,23 @@ const debugGetPut = false
// putConn adds a connection to the db's free pool.
// err is optionally the last error that occurred on this connection.
func (db *DB) putConn(dc *driverConn, err error, resetSession bool) {
+ if err != driver.ErrBadConn {
+ if !dc.validateConnection(resetSession) {
+ err = driver.ErrBadConn
+ }
+ }
db.mu.Lock()
if !dc.inUse {
+ db.mu.Unlock()
if debugGetPut {
fmt.Printf("putConn(%v) DUPLICATE was: %s\n\nPREVIOUS was: %s", dc, stack(), db.lastPut[dc])
}
panic("sql: connection returned that was never out")
}
+
+ if err != driver.ErrBadConn && dc.expired(db.maxLifetime) {
+ err = driver.ErrBadConn
+ }
if debugGetPut {
db.lastPut[dc] = stack()
}
@@ -1305,41 +1320,13 @@ func (db *DB) putConn(dc *driverConn, err error, resetSession bool) {
if putConnHook != nil {
putConnHook(db, dc)
}
- if db.closed {
- // Connections do not need to be reset if they will be closed.
- // Prevents writing to resetterCh after the DB has closed.
- resetSession = false
- }
- if resetSession {
- if _, resetSession = dc.ci.(driver.SessionResetter); resetSession {
- // Lock the driverConn here so it isn't released until
- // the connection is reset.
- // The lock must be taken before the connection is put into
- // the pool to prevent it from being taken out before it is reset.
- dc.Lock()
- }
- }
added := db.putConnDBLocked(dc, nil)
db.mu.Unlock()
if !added {
- if resetSession {
- dc.Unlock()
- }
dc.Close()
return
}
- if !resetSession {
- return
- }
- select {
- default:
- // If the resetterCh is blocking then mark the connection
- // as bad and continue on.
- dc.lastErr = driver.ErrBadConn
- dc.Unlock()
- case db.resetterCh <- dc:
- }
}
// Satisfy a connRequest or put the driverConn in the idle pool and return true
@@ -1701,7 +1688,11 @@ func (db *DB) begin(ctx context.Context, opts *TxOptions, strategy connReuseStra
// beginDC starts a transaction. The provided dc must be valid and ready to use.
func (db *DB) beginDC(ctx context.Context, dc *driverConn, release func(error), opts *TxOptions) (tx *Tx, err error) {
var txi driver.Tx
+ keepConnOnRollback := false
withLock(dc, func() {
+ _, hasSessionResetter := dc.ci.(driver.SessionResetter)
+ _, hasConnectionValidator := dc.ci.(validator)
+ keepConnOnRollback = hasSessionResetter && hasConnectionValidator
txi, err = ctxDriverBegin(ctx, opts, dc.ci)
})
if err != nil {
@@ -1713,12 +1704,13 @@ func (db *DB) beginDC(ctx context.Context, dc *driverConn, release func(error),
// The cancel function in Tx will be called after done is set to true.
ctx, cancel := context.WithCancel(ctx)
tx = &Tx{
- db: db,
- dc: dc,
- releaseConn: release,
- txi: txi,
- cancel: cancel,
- ctx: ctx,
+ db: db,
+ dc: dc,
+ releaseConn: release,
+ txi: txi,
+ cancel: cancel,
+ keepConnOnRollback: keepConnOnRollback,
+ ctx: ctx,
}
go tx.awaitDone()
return tx, nil
@@ -1980,6 +1972,11 @@ type Tx struct {
// Use atomic operations on value when checking value.
done int32
+ // keepConnOnRollback is true if the driver knows
+ // how to reset the connection's session and if need be discard
+ // the connection.
+ keepConnOnRollback bool
+
// All Stmts prepared for this transaction. These will be closed after the
// transaction has been committed or rolled back.
stmts struct {
@@ -2005,7 +2002,10 @@ func (tx *Tx) awaitDone() {
// transaction is closed and the resources are released. This
// rollback does nothing if the transaction has already been
// committed or rolled back.
- tx.rollback(true)
+ // Do not discard the connection if the connection knows
+ // how to reset the session.
+ discardConnection := !tx.keepConnOnRollback
+ tx.rollback(discardConnection)
}
func (tx *Tx) isDone() bool {
@@ -2016,14 +2016,10 @@ func (tx *Tx) isDone() bool {
// that has already been committed or rolled back.
var ErrTxDone = errors.New("sql: transaction has already been committed or rolled back")
-// close returns the connection to the pool and
-// must only be called by Tx.rollback or Tx.Commit.
-func (tx *Tx) close(err error) {
- tx.cancel()
-
- tx.closemu.Lock()
- defer tx.closemu.Unlock()
-
+// closeLocked returns the connection to the pool and
+// must only be called by Tx.rollback or Tx.Commit while
+// closemu is Locked and tx already canceled.
+func (tx *Tx) closeLocked(err error) {
tx.releaseConn(err)
tx.dc = nil
tx.txi = nil
@@ -2090,6 +2086,15 @@ func (tx *Tx) Commit() error {
if !atomic.CompareAndSwapInt32(&tx.done, 0, 1) {
return ErrTxDone
}
+
+ // Cancel the Tx to release any active R-closemu locks.
+ // This is safe to do because tx.done has already transitioned
+ // from 0 to 1. Hold the W-closemu lock prior to rollback
+ // to ensure no other connection has an active query.
+ tx.cancel()
+ tx.closemu.Lock()
+ defer tx.closemu.Unlock()
+
var err error
withLock(tx.dc, func() {
err = tx.txi.Commit()
@@ -2097,16 +2102,31 @@ func (tx *Tx) Commit() error {
if err != driver.ErrBadConn {
tx.closePrepared()
}
- tx.close(err)
+ tx.closeLocked(err)
return err
}
+var rollbackHook func()
+
// rollback aborts the transaction and optionally forces the pool to discard
// the connection.
func (tx *Tx) rollback(discardConn bool) error {
if !atomic.CompareAndSwapInt32(&tx.done, 0, 1) {
return ErrTxDone
}
+
+ if rollbackHook != nil {
+ rollbackHook()
+ }
+
+ // Cancel the Tx to release any active R-closemu locks.
+ // This is safe to do because tx.done has already transitioned
+ // from 0 to 1. Hold the W-closemu lock prior to rollback
+ // to ensure no other connection has an active query.
+ tx.cancel()
+ tx.closemu.Lock()
+ defer tx.closemu.Unlock()
+
var err error
withLock(tx.dc, func() {
err = tx.txi.Rollback()
@@ -2117,7 +2137,7 @@ func (tx *Tx) rollback(discardConn bool) error {
if discardConn {
err = driver.ErrBadConn
}
- tx.close(err)
+ tx.closeLocked(err)
return err
}