Pub/Sub to Cloud Storage with Dataflow is very slow

Posted 2025-02-09 22:44:58


I am using a slightly adjusted version of the following example snippet to write PubSub messages to GCS:

import apache_beam as beam


class WriteToGCS(beam.DoFn):
    def __init__(self, output_path, prefix):
        self.output_path = output_path
        self.prefix = prefix

    def process(self, key_value, window=beam.DoFn.WindowParam):
        """Write messages in a batch to Google Cloud Storage."""

        start_date = window.start.to_utc_datetime().date().isoformat()
        start = window.start.to_utc_datetime().isoformat()
        end = window.end.to_utc_datetime().isoformat()
        shard_id, batch = key_value

        filename = f'{self.output_path}/{start_date}/{start}-{end}/{self.prefix}-{start}-{end}-{shard_id:03d}'

        # Opens a new GCS file and writes the batch one element at a time.
        with beam.io.gcsio.GcsIO().open(filename=filename, mode="w") as f:
            for message_body in batch:
                f.write(f"{message_body},".encode("utf-8"))

However, it's terribly slow. This is how it looks in the graph. Is there a way to speed up this step? The subscription gets 500 elements per second, so 3-10 elements per second is not keeping up.

The pipeline looks like this:

import random

import apache_beam as beam
from apache_beam.transforms import window


class JsonWriter(beam.PTransform):
    def __init__(self, window_size, path, prefix, num_shards=20):
        self.window_size = int(window_size)
        self.path = path
        self.prefix = prefix
        self.num_shards = num_shards

    def expand(self, pcoll):
        return (
            pcoll
            | "Group into fixed windows" >> beam.WindowInto(window.FixedWindows(self.window_size, 0))
            | "Decode windowed elements" >> beam.ParDo(Decode())  # Decode: user-defined DoFn, not shown
            | "Group into batches" >> beam.BatchElements()
            | "Add key" >> beam.WithKeys(lambda _: random.randint(0, int(self.num_shards) - 1))
            | "Write to GCS" >> beam.ParDo(WriteToGCS(self.path, self.prefix))
        )
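(The Decode DoFn referenced above isn't included in the question. A minimal sketch, assuming it simply decodes the raw Pub/Sub payload bytes into UTF-8 strings; the class name comes from the pipeline, but the body is an assumption:

class Decode(beam.DoFn):
    """Assumed implementation: turn raw Pub/Sub payload bytes into strings."""

    def process(self, element):
        # element is assumed to be the raw message payload as bytes.
        yield element.decode("utf-8")

)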

1 Comment

染墨丶若流云 2025-02-16 22:44:58


Solved this with these steps:

            | "Add Path" >> beam.ParDo(AddFilename(self.path, self.prefix, self.num_shards))
            | "Group Paths" >> beam.GroupByKey()
            | "Join and encode" >> beam.ParDo(ToOneString())
            | "Write to GCS" >> beam.ParDo(WriteToGCS())

First I add the path for each element, then I group by path (to prevent simultaneous writes to the same file/shard). Then I concatenate the JSON strings per group (path), newline-delimited, and write the whole string to the file. This reduces the calls to GCS tremendously, since I don't write every element to a file one by one, but instead join them first and then write once. Hope it helps someone.
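
(The AddFilename, ToOneString, and keyless WriteToGCS transforms aren't shown in the answer. A minimal sketch of what they might look like, assuming the elements arriving at AddFilename are already-decoded JSON strings; all bodies below are reconstructions from the answer's description, not the answerer's actual code:

import random

import apache_beam as beam


class AddFilename(beam.DoFn):
    """Assumed reconstruction: key every element by its target GCS path."""

    def __init__(self, path, prefix, num_shards):
        self.path = path
        self.prefix = prefix
        self.num_shards = int(num_shards)

    def process(self, element, window=beam.DoFn.WindowParam):
        start = window.start.to_utc_datetime().isoformat()
        end = window.end.to_utc_datetime().isoformat()
        shard_id = random.randint(0, self.num_shards - 1)
        filename = f'{self.path}/{self.prefix}-{start}-{end}-{shard_id:03d}'
        yield filename, element


class ToOneString(beam.DoFn):
    """Assumed reconstruction: join one group's messages into a single
    newline-delimited string so each file is written in one call."""

    def process(self, key_value):
        filename, messages = key_value
        yield filename, "\n".join(messages)


class WriteToGCS(beam.DoFn):
    """Assumed reconstruction: exactly one GCS write per (path, content) pair."""

    def process(self, key_value):
        filename, content = key_value
        with beam.io.gcsio.GcsIO().open(filename=filename, mode="w") as f:
            f.write(content.encode("utf-8"))

The effect of the GroupByKey is that all messages destined for one path land together, so each file is opened and written exactly once per window and shard rather than once per batch.)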
