返回介绍

上卷 程序设计

中卷 标准库

下卷 运行时

源码剖析

附录

transaction

发布于 2024-10-12 19:15:52 字数 4527 浏览 0 评论 0 收藏 0

事务 是指单个逻辑单元内的一系列操作,要么全部执行,要么全部放弃。确保所有操作全部成功后才更新数据。还为多个并发操作进行隔离,避免相互干扰。

基本特性(ACID)

  • 原子性:作为整体被执行。
  • 一致性:确保从一致状态转变为另一个一致状态。
  • 隔离性:多事务并发执行,彼此互不影响。
  • 持久性:已提交修改应永久保存。

因为连接池的缘故,多个 DB.Exec 调用无法保证在同一连接上完成。所以不能直接 DB.Exec("BEGIN") 启动或提交事务。而应该通过 Begin 创建事务,它确保后续操作在同一连接上进行。

在数据库服务器端,事务必须在单个连接上完成。

  • 事务内所有操作是串行,可用上下文控制进度。
  • 可从外部 Stmt 进行参数复制,创建事务版。
  • 事务内 Prepare 同样须 Stmt.Close ,且必须在提交和回滚前。
  • 提交( Commit )和回滚( Rollback )都会判断完成状态。
package main

import (
	"database/sql"
	"log"

	_ "github.com/mattn/go-sqlite3"
)

func main() {
	log.SetFlags(log.Lshortfile)

	db, err := sql.Open("sqlite3", "./test.db")
	if err != nil { log.Fatalln(err) }
	defer db.Close()

	// -------------------------------------------

	tx, err := db.Begin()
	if err != nil { log.Fatalln(err) }

	// -------------------------------------------
    
    // exec table
	execs := []func() error {
		// insert
		func() (err error) {
			q := `INSERT INTO user (id, name) VALUES (?, ?)`
			_, err = tx.Exec(q, 102, "u102")
			return
		},

		// select
		func() (err error) {
			var name string
			
			q := `SELECT (name) FROM user WHERE id = ?`
			err = tx.QueryRow(q, 102).Scan(&name)
			if err != nil { return }

			println(name)
			return
		},
	}
    
	// -------------------------------------------

    // exec, rollback
	for _, exec := range execs {
		if err := exec(); err != nil {
			if e := tx.Rollback(); e != nil { log.Fatalln(e) }
			log.Fatalln(err)
		}
	}

    // commit
	if e := tx.Commit(); e != nil && e != sql.ErrTxDone {
		log.Fatalln(e)
	}
}

因事务单连接特征,下面这样的操作有可能引发错误。具体与系统和驱动有关。

rows = tx.Query("SELECT id FROM user")

for rows.Next() {
    rows.Scan(&id)
    tx.QueryRow("SELECT name FROM user WHERE id = ?", id).Scan(&name)
}

源码剖析

方法 Begin 持有一个专门连接,后续所有操作都使用该连接。

// database/sql/sql.go

func (db *DB) begin(ctx, opts, strategy) (tx *Tx, err error) {
	dc, err := db.conn(ctx, strategy)
	return db.beginDC(ctx, dc, dc.releaseConn, opts)
}
func (db *DB) beginDC(ctx, dc, release, opts) (tx *Tx, err error) {
    
	withLock(dc, func() {
		txi, err = ctxDriverBegin(ctx, opts, dc.ci)
	})
    
	tx = &Tx{
		db:                 db,
		dc:                 dc,
		releaseConn:        release,
		txi:                txi,
		cancel:             cancel,
		keepConnOnRollback: keepConnOnRollback,
		ctx:                ctx,
	}
    
	return tx, nil
}

除通过 grabConn 获取事务连接外,相关操作并没有什么需要特别说明的。

func (tx *Tx) ExecContext(ctx, query, args ...any) (Result, error) {
	dc, release, err := tx.grabConn(ctx)
	return tx.db.execDC(ctx, dc, release, query, args)
}
func (tx *Tx) grabConn(ctx) (*driverConn, releaseConn, error) {
	if tx.isDone() {
		return nil, nil, ErrTxDone
	}
    
	return tx.dc, tx.closemuRUnlockRelease, nil
}

func (tx *Tx) isDone() bool {
	return atomic.LoadInt32(&tx.done) != 0
}

事务提交或回滚,重要的还有释放连接。

func (tx *Tx) Commit() error {
    
    // 设置结束标记。如果失败,返回特定错误。
	if !atomic.CompareAndSwapInt32(&tx.done, 0, 1) {
		return ErrTxDone
	}

    // 提交。
	withLock(tx.dc, func() {
		err = tx.txi.Commit()
	})
    
	tx.close(err)
	return err
}
func (tx *Tx) rollback(discardConn bool) error {
    
    // 设置结束标记。如果失败,返回特定错误。
	if !atomic.CompareAndSwapInt32(&tx.done, 0, 1) {
		return ErrTxDone
	}

    // 回滚。
	withLock(tx.dc, func() {
		err = tx.txi.Rollback()
	})
    
	tx.close(err)
	return err
}
func (tx *Tx) close(err error) {
	tx.releaseConn(err)
	tx.dc = nil
	tx.txi = nil
}

执行 Prepare ,直接使用事务连接,且释放函数执行解锁而非归还连接。

func (tx *Tx) PrepareContext(ctx, query string) (*Stmt, error) {
	dc, release, err := tx.grabConn(ctx)
	stmt, err := tx.db.prepareDC(ctx, dc, release, tx, query)
}
func (s *Stmt) connStmt(ctx, strategy) (...) {

	// In a transaction or connection, we always use the connection 
    // that the stmt was created on.
    
    // s.cg -> tx
	if s.cg != nil {  
		dc, releaseConn, err = s.cg.grabConn(ctx)
		return dc, releaseConn, s.cgds, nil
	}
}
func (tx *Tx) grabConn(ctx) (*driverConn, releaseConn, error) {
	return tx.dc, tx.closemuRUnlockRelease, nil
}

func (tx *Tx) closemuRUnlockRelease(error) {
	tx.closemu.RUnlock()
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文