返回介绍

上卷 程序设计

中卷 标准库

下卷 运行时

源码剖析

附录

connection

发布于 2024-10-12 19:15:51 字数 5132 浏览 0 评论 0 收藏 0

通常不需要显式获取连接,由相关方法自动完成。

  • 优先从连接池获取空闲连接。
  • 若没取到,检查是否达到上限,以决定是否新建连接。
  • 已达上限,请求被阻塞,直到有可用连接被放回池中。
  • 方法 Conn 获取连接,须用 conn.Close 放回。
  • 方法 Stats 返回相关状态 统计信息。

闲置过久会出问题,可设置生命周期。当然,连接池也会自动清理坏掉的连接。
若相关方法检测到 ErrBadConn ,会重新获取连接重试。

  • SetMaxOpenConns : inuse + idle
  • SetMaxIdleConns : MaxIdle <= MaxOpen
  • SetConnMaxIdleTime
  • SetConnMaxLifetime
  • SetMaxIdleConns(0) :释放后,重新设置。

源码剖析

Ping 为例,看看具体过程。

// database/sql/sql.go

const maxBadConnRetries = 2

func (db *DB) PingContext(ctx context.Context) error {
    
    // 如果是坏的,重试。
	for i := 0; i < maxBadConnRetries; i++ {
		dc, err = db.conn(ctx, cachedOrNewConn)
        
		isBadConn = errors.Is(err, driver.ErrBadConn)
		if !isBadConn { break }
	}
    
    // 依旧是坏的,新建。
	if isBadConn {
		dc, err = db.conn(ctx, alwaysNewConn)
	}
	if err != nil {
		return err
	}

    // 结束前放回连接。(releaseConn / putConn)
	return db.pingDC(ctx, dc, dc.releaseConn)
}
func (dc *driverConn) releaseConn(err error) {
	dc.db.putConn(dc, err, true)
}

调用 conn 获取闲置连接,或新建。

func (db *DB) conn(ctx context.Context, ...) (*driverConn, error) {
    
    db.mu.Lock() // !!!

	// 尝试获取闲置连接。
	last := len(db.freeConn) - 1
	if strategy == cachedOrNewConn && last >= 0 {
        
        // 列表最后一个,闲置时间最短。
		conn := db.freeConn[last]
		db.freeConn = db.freeConn[:last]
        
        // 过期,关掉。
		if conn.expired(lifetime) {
			return nil, driver.ErrBadConn
		}
        
		// 坏的,关掉。
		if err := conn.resetSession(ctx); ErrBadConn {
			conn.Close()
			return nil, err
		}

		return conn, nil
	}

    // 超过上限。阻塞,等待!
	if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
        
        // 加入队列,用 chan 接收连接。
		req := make(chan connRequest, 1)
		reqKey := db.nextRequestKeyLocked() // 自增序号!
		db.connRequests[reqKey] = req  
        
		db.waitCount++

        // 阻塞,等着连接送上门。
		select {
            // case context cancel ...
		case ret, ok := <-req:
            // 如果连接过期或坏掉,关闭 ...            
			return ret.conn, ret.err
		}
	}

	db.numOpen++
    
    // 新建。
	ci, err := db.connector.Connect(ctx)
	dc := &driverConn{
		db:    db,
		ci:    ci,
		inUse: true,
	}
    
	return dc, nil
}

Ping 结束时,调用 releaseConnputConn 将连接放回池中。
如果有请求被阻塞,随机挑一个,将连接用通道发给他。

// database/sql/sql.go

func (db *DB) putConn(dc *driverConn, err error, resetSession bool) {
	db.mu.Lock()
    
    // 如果连接坏掉,关掉并放弃 ...
    
    // 放回。
	added := db.putConnDBLocked(dc, nil)
	db.mu.Unlock()

    // 放回失败,直接处理掉连接。
	if !added {
		dc.Close()
		return
	}
}
func (db *DB) putConnDBLocked(dc *driverConn, err error) bool {
    
    // 超过上限。
	if db.maxOpen > 0 && db.numOpen > db.maxOpen {
		return false
	}
    
    // 有请求阻塞。
	if c := len(db.connRequests); c > 0 {
        
        // 随机挑一个。
		for reqKey, req = range db.connRequests {
			break
		}
        
        // 从队列移除。
		delete(db.connRequests, reqKey) 
        
        // 将连接发送给他。
		req <- connRequest{
			conn: dc,
			err:  err,
		}
        
		return true
	} else if err == nil && !db.closed {
        
        // 未超上限,返回闲置列表。
		if db.maxIdleConnsLocked() > len(db.freeConn) {
			db.freeConn = append(db.freeConn, dc)
            
            // 清理过期连接。
			db.startCleanerLocked()
            
			return true
		}
	}
	return false
}

信号

OpenDB 里有个特别操作:接收 “信号”,创建新连接并放回池中。

// database/sql/sql.go

func OpenDB(c driver.Connector) *DB {
	go db.connectionOpener(ctx)
}

func (db *DB) connectionOpener(ctx context.Context) {
	for {
		select {
		case <-db.openerCh:
			db.openNewConnection(ctx)
		}
	}
}
// Open one new connection
func (db *DB) openNewConnection(ctx context.Context) {
    
    // 新建连接。
	ci, err := db.connector.Connect(ctx)    
	dc := &driverConn{
		db: db,
		ci: ci,
	}
    
    // 放回连接池,或直接给被阻塞的请求。
	if db.putConnDBLocked(dc, err) {
		db.addDepLocked(dc, dc)
	} else {
		ci.Close()
	}
}

问题是,谁在发信号?什么时候发?

// database/sql/sql.go

func (db *DB) putConn(dc *driverConn, err error, resetSession bool) {

    // 如果放回的连接是坏的。
	if errors.Is(err, driver.ErrBadConn) {
		db.maybeOpenNewConnections()  // 新建。
		dc.Close()                    // 放弃坏掉的。
		return
	}

    // ...
}
func (db *DB) maybeOpenNewConnections() {
    
    // 按被阻塞的请求数,发送信号。
    
	numRequests := len(db.connRequests)
	if db.maxOpen > 0 {
		numCanOpen := db.maxOpen - db.numOpen
		if numRequests > numCanOpen {
			numRequests = numCanOpen
		}
	}
    
	for numRequests > 0 {
		db.numOpen++ // optimistically
		numRequests--
		if db.closed { return }
        
		db.openerCh <- struct{}{}
	}
}

其他相关方法,也会发送信号,以期尽快解除请求阻塞。

func (dc *driverConn) finalClose() error {
	dc.db.maybeOpenNewConnections()
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文