Sidekiq job idempotency when batch-creating records

Published 2025-01-15 17:49:59

How do we create records in batches in an idempotent fashion?

In the example below, if everything runs as expected, then 100,500 tickets should be created. However, suppose at least one of the jobs is run twice for some unknown reason.

  1. How can we guarantee that the jobs only create the exact number of tickets requested, and no more?
  2. Can we do this without any risk of race conditions?

Context

I'm trying to batch-create 100k+ records quickly, and Sidekiq best practices recommend that jobs should be idempotent, i.e. they should be able to run several times and the end result should be the same.

In my case, I am doing the following:

  • I'm using insert_all (Rails 6+) to be able to do this bulk-creation very quickly (it skips Rails validations).
  • If any of the batch-create jobs fail to create all of the records for their batch, that attempt rolls back in an atomic fashion and the job fails (and later retries).

Example

We have a raffles table:

id number_of_tickets_requested

Upon creating a new raffle record, we want to batch-create tickets for the raffle in a tickets table:

id code raffle_id

Suppose we've just created a new raffle with number_of_tickets_requested: 100500.

(Disclaimer: I've hard-coded things in the example to try to make it easier to understand.)

My attempt so far

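In the Raffle model:

  MAX_TICKETS_PER_JOB = 1000

  # Enqueue only after the INSERT has committed, so the jobs can load the raffle.
  after_create_commit :queue_jobs_to_batch_create_tickets

  def queue_jobs_to_batch_create_tickets
    # Hard-coded for number_of_tickets_requested: 100500
    100.times { BatchCreateTicketsJob.perform_later(self, MAX_TICKETS_PER_JOB) }
    BatchCreateTicketsJob.perform_later(self, 500)
  end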
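
The hard-coded `100.times` plus a final 500 could instead be derived from `number_of_tickets_requested`. A minimal sketch in plain Ruby, where `batch_sizes` is a hypothetical helper name that does not appear in the original code:

```ruby
# Hypothetical helper: split a requested total into per-job batch sizes.
MAX_TICKETS_PER_JOB = 1000

def batch_sizes(total, max_per_job = MAX_TICKETS_PER_JOB)
  full_batches, remainder = total.divmod(max_per_job)
  sizes = Array.new(full_batches, max_per_job)
  sizes << remainder if remainder.positive?
  sizes
end

sizes = batch_sizes(100_500)
puts sizes.length  # 101 jobs
puts sizes.sum     # 100500 tickets in total
```

In the model, each size would then be passed to `BatchCreateTicketsJob.perform_later(self, size)`.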

In BatchCreateTicketsJob:

  def perform(raffle, number_of_tickets_to_create)
    BatchCreateTicketsService.call(raffle, number_of_tickets_to_create)
  end

In BatchCreateTicketsService:

  def call
    Raffle.transaction do
      # insert_all creates all tickets in one query. It skips Rails
      # validations and callbacks, which makes it fast, and it silently
      # skips rows that violate a unique index.
      result = Ticket.insert_all(tickets)

      # NOTE: ActiveRecord::Rollback undoes the insert but is swallowed by
      # the transaction block, so the job itself will not fail or retry.
      unless result.count == number_of_tickets_to_create
        raise ActiveRecord::Rollback
      end
    end
  end

  private

  def tickets
    Array.new(number_of_tickets_to_create) { new_ticket }
  end

  def new_ticket
    {
      code: SecureRandom.hex(6).upcase,
      raffle_id: raffle.id
    }
  end
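
Because `insert_all` skips rows that conflict with a unique index, two identical random codes in the same batch would make `result.count` fall short if `code` carries such an index (an assumption; the question does not show the schema). A small sketch, in plain Ruby, of generating a batch of distinct codes up front; `unique_codes` is a hypothetical helper:

```ruby
require "securerandom"
require "set"

# Hypothetical helper: build `count` distinct 12-character uppercase hex codes.
# This only prevents duplicates inside one batch; collisions with rows that
# already exist in the table still have to be caught by the database.
def unique_codes(count)
  codes = Set.new
  codes << SecureRandom.hex(6).upcase while codes.size < count
  codes.to_a
end

codes = unique_codes(1000)
puts codes.length       # 1000
puts codes.uniq.length  # 1000
```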

Comments (1)

物价感观 2025-01-22 17:49:59

For reference, I ended up going with:

  • with_lock to prevent race conditions;
  • a transaction to ensure atomicity;
  • a new tickets_count counter column on the raffles table to ensure idempotency.

  class BatchCreateTicketsService < ApplicationService
    attr_reader :raffle, :num_tickets

    def initialize(raffle, num_tickets)
      @raffle = raffle
      @num_tickets = num_tickets
    end

    def call
      # with_lock reloads the raffle with a row lock inside a transaction;
      # the inner transaction block joins that same transaction.
      raffle.with_lock do
        Raffle.transaction do
          create_tickets
        end
      end
    end

    private

    def create_tickets
      result = Ticket.insert_all(tickets)

      # A regular exception rolls back the transaction and fails the job,
      # so Sidekiq retries it.
      raise StandardError unless result.count == num_tickets

      raffle.tickets_count += result.count
      raffle.save! # raise (and roll back) if the counter cannot be saved
    end

    def tickets
      Array.new(num_tickets) { new_ticket }
    end

    def new_ticket
      {
        code: SecureRandom.hex(6).upcase,
        raffle_id: raffle.id
      }
    end
  end
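
The code above increments `tickets_count` but does not show how the counter stops a duplicate run from over-creating. One way it could be used, sketched in plain Ruby with an in-memory stand-in: each run creates at most the number of tickets still missing. `FakeRaffle` and `create_batch` are hypothetical names, and the `Mutex` only plays the role of the row lock that `with_lock` takes.

```ruby
# In-memory stand-in for a raffles row; this is not ActiveRecord.
class FakeRaffle
  attr_reader :number_of_tickets_requested, :tickets_count

  def initialize(number_of_tickets_requested)
    @number_of_tickets_requested = number_of_tickets_requested
    @tickets_count = 0
    @lock = Mutex.new # plays the role of the database row lock
  end

  # Creates up to `batch_size` tickets, never exceeding the requested total.
  # Returns how many tickets this call actually created.
  def create_batch(batch_size)
    @lock.synchronize do
      remaining = number_of_tickets_requested - tickets_count
      to_create = [batch_size, remaining].min
      @tickets_count += to_create
      to_create
    end
  end
end

raffle = FakeRaffle.new(2500)
# Three expected jobs plus one accidental duplicate of the first job.
created = [1000, 1000, 500, 1000].map { |size| raffle.create_batch(size) }
p created                  # [1000, 1000, 500, 0]
puts raffle.tickets_count  # 2500
```

In the real service, the equivalent check would presumably sit inside `raffle.with_lock`, before the call to `insert_all`, so that the comparison and the insert happen under the same lock.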