上卷 程序设计
中卷 标准库
- bufio 1.18
- bytes 1.18
- io 1.18
- container 1.18
- encoding 1.18
- crypto 1.18
- hash 1.18
- index 1.18
- sort 1.18
- context 1.18
- database 1.18
- connection
- query
- queryrow
- exec
- prepare
- transaction
- scan & null
- context
- tcp
- udp
- http
- server
- handler
- client
- h2、tls
- url
- rpc
- exec
- signal
- embed 1.18
- plugin 1.18
- reflect 1.18
- runtime 1.18
- KeepAlived
- ReadMemStats
- SetFinalizer
- Stack
- sync 1.18
- atomic
- mutex
- rwmutex
- waitgroup
- cond
- once
- map
- pool
- copycheck
- nocopy
- unsafe 1.18
- fmt 1.18
- log 1.18
- math 1.18
- time 1.18
- timer
下卷 运行时
源码剖析
附录
文章来源于网络收集而来,版权归原创者所有,如有侵权请及时联系!
connection
通常不需要显式获取连接,由相关方法自动完成。
- 优先从连接池获取空闲连接。
- 若没取到,检查是否达到上限,以决定是否新建连接。
- 已达上限,请求被阻塞,直到有可用连接被放回池中。
- 方法
Conn
获取连接,须用conn.Close
放回。 - 方法
Stats
返回相关状态 统计信息。
闲置过久会出问题,可设置生命周期。当然,连接池也会自动清理坏掉的连接。
若相关方法检测到 ErrBadConn
,会重新获取连接重试。
SetMaxOpenConns
:inuse
+idle
SetMaxIdleConns
:MaxIdle <= MaxOpen
SetConnMaxIdleTime
SetConnMaxLifetime
SetMaxIdleConns(0)
:释放后,重新设置。
源码剖析
以 Ping
为例,看看具体过程。
// database/sql/sql.go const maxBadConnRetries = 2 func (db *DB) PingContext(ctx context.Context) error { // 如果是坏的,重试。 for i := 0; i < maxBadConnRetries; i++ { dc, err = db.conn(ctx, cachedOrNewConn) isBadConn = errors.Is(err, driver.ErrBadConn) if !isBadConn { break } } // 依旧是坏的,新建。 if isBadConn { dc, err = db.conn(ctx, alwaysNewConn) } if err != nil { return err } // 结束前放回连接。(releaseConn / putConn) return db.pingDC(ctx, dc, dc.releaseConn) }
func (dc *driverConn) releaseConn(err error) { dc.db.putConn(dc, err, true) }
调用 conn
获取闲置连接,或新建。
func (db *DB) conn(ctx context.Context, ...) (*driverConn, error) { db.mu.Lock() // !!! // 尝试获取闲置连接。 last := len(db.freeConn) - 1 if strategy == cachedOrNewConn && last >= 0 { // 列表最后一个,闲置时间最短。 conn := db.freeConn[last] db.freeConn = db.freeConn[:last] // 过期,关掉。 if conn.expired(lifetime) { return nil, driver.ErrBadConn } // 坏的,关掉。 if err := conn.resetSession(ctx); ErrBadConn { conn.Close() return nil, err } return conn, nil } // 超过上限。阻塞,等待! if db.maxOpen > 0 && db.numOpen >= db.maxOpen { // 加入队列,用 chan 接收连接。 req := make(chan connRequest, 1) reqKey := db.nextRequestKeyLocked() // 自增序号! db.connRequests[reqKey] = req db.waitCount++ // 阻塞,等着连接送上门。 select { // case context cancel ... case ret, ok := <-req: // 如果连接过期或坏掉,关闭 ... return ret.conn, ret.err } } db.numOpen++ // 新建。 ci, err := db.connector.Connect(ctx) dc := &driverConn{ db: db, ci: ci, inUse: true, } return dc, nil }
当 Ping
结束时,调用 releaseConn
、 putConn
将连接放回池中。
如果有请求被阻塞,随机挑一个,将连接用通道发给他。
// database/sql/sql.go func (db *DB) putConn(dc *driverConn, err error, resetSession bool) { db.mu.Lock() // 如果连接坏掉,关掉并放弃 ... // 放回。 added := db.putConnDBLocked(dc, nil) db.mu.Unlock() // 放回失败,直接处理掉连接。 if !added { dc.Close() return } }
func (db *DB) putConnDBLocked(dc *driverConn, err error) bool { // 超过上限。 if db.maxOpen > 0 && db.numOpen > db.maxOpen { return false } // 有请求阻塞。 if c := len(db.connRequests); c > 0 { // 随机挑一个。 for reqKey, req = range db.connRequests { break } // 从队列移除。 delete(db.connRequests, reqKey) // 将连接发送给他。 req <- connRequest{ conn: dc, err: err, } return true } else if err == nil && !db.closed { // 未超上限,返回闲置列表。 if db.maxIdleConnsLocked() > len(db.freeConn) { db.freeConn = append(db.freeConn, dc) // 清理过期连接。 db.startCleanerLocked() return true } } return false }
信号
在 OpenDB
里有个特别操作:接收 “信号”,创建新连接并放回池中。
// database/sql/sql.go func OpenDB(c driver.Connector) *DB { go db.connectionOpener(ctx) } func (db *DB) connectionOpener(ctx context.Context) { for { select { case <-db.openerCh: db.openNewConnection(ctx) } } }
// Open one new connection func (db *DB) openNewConnection(ctx context.Context) { // 新建连接。 ci, err := db.connector.Connect(ctx) dc := &driverConn{ db: db, ci: ci, } // 放回连接池,或直接给被阻塞的请求。 if db.putConnDBLocked(dc, err) { db.addDepLocked(dc, dc) } else { ci.Close() } }
问题是,谁在发信号?什么时候发?
// database/sql/sql.go func (db *DB) putConn(dc *driverConn, err error, resetSession bool) { // 如果放回的连接是坏的。 if errors.Is(err, driver.ErrBadConn) { db.maybeOpenNewConnections() // 新建。 dc.Close() // 放弃坏掉的。 return } // ... }
func (db *DB) maybeOpenNewConnections() { // 按被阻塞的请求数,发送信号。 numRequests := len(db.connRequests) if db.maxOpen > 0 { numCanOpen := db.maxOpen - db.numOpen if numRequests > numCanOpen { numRequests = numCanOpen } } for numRequests > 0 { db.numOpen++ // optimistically numRequests-- if db.closed { return } db.openerCh <- struct{}{} } }
其他相关方法,也会发送信号,以期尽快解除请求阻塞。
func (dc *driverConn) finalClose() error { dc.db.maybeOpenNewConnections() }
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论