使用dbt时如何手动创建表或使用postgres分区?

发布于 2025-01-16 06:42:46 字数 1633 浏览 2 评论 0原文

我想使用 dbt 将数据插入到分区表中,但发现 postgres 不支持 dbt 分区。

通过另一种方式,我在 pre_hook 中创建表和分区,但在 dbt 运行时收到错误“关系 'download_counts_p' 已存在”

有什么建议吗?这是我的 SQL 和 pre_hook 配置

{{ config(
    materialized = 'table',
    indexes = [ ],
    pre_hook=[
        'CREATE TABLE IF NOT EXISTS "download_counts_p" (
                                              "channel_id" int8 NOT NULL,
                                              "product_id" int8 NOT NULL,
                                              "country_code" text NOT NULL,
                                              "year" int2  NULL,
                                              "month" int2 NOT NULL,
                                              "count" int8 NOT NULL,
                                              "count" int8 NOT NULL,
                                              "months" int8 NOT NULL
                                         ) partition by list(country_code)',
        "DO $$
    Declare unique_country_code varchar;
    BEGIN
        FOR unique_country_code IN
            SELECT country_code as unique_country_code FROM download_counts group by country_code

            LOOP
                EXECUTE  format('create table IF NOT EXISTS download_counts_p_%s partition of download_counts_p for values in (''%s'')', upper(unique_country_code), unique_country_code);
            END LOOP;
    END; $$;"]
)}}


select 1

输入图片此处描述

I want to insert data into a partitioned table using dbt, but found no support for dbt postgres partition.

By another way, i create table and partition in pre_hook, but got the error "relation 'download_counts_p' already exists" when dbt run

Are there any suggestions ? Here is my SQL and pre_hook config

{{ config(
    materialized = 'table',
    indexes = [ ],
    pre_hook=[
        'CREATE TABLE IF NOT EXISTS "download_counts_p" (
                                              "channel_id" int8 NOT NULL,
                                              "product_id" int8 NOT NULL,
                                              "country_code" text NOT NULL,
                                              "year" int2  NULL,
                                              "month" int2 NOT NULL,
                                              "count" int8 NOT NULL,
                                              "count" int8 NOT NULL,
                                              "months" int8 NOT NULL
                                         ) partition by list(country_code)',
        "DO $
    Declare unique_country_code varchar;
    BEGIN
        FOR unique_country_code IN
            SELECT country_code as unique_country_code FROM download_counts group by country_code

            LOOP
                EXECUTE  format('create table IF NOT EXISTS download_counts_p_%s partition of download_counts_p for values in (''%s'')', upper(unique_country_code), unique_country_code);
            END LOOP;
    END; $;"]
)}}


select 1

enter image description here

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

神也荒唐 2025-01-23 06:42:46

这里发生了一些不同的事情。

  1. 您是正确的,dbt 的 postgres 适配器当前不支持配置分区表。这里的解决方案是创建一个插入额外 DDL 的新物化你需要。您不想在 dbt 中手动编写 DDL——这被认为是一个很大的反模式。您可能还想在 dbt-core(postgres 适配器代码所在的位置)中打开一个问题来发出功能请求
  2. 您的 FOR ... LOOP ... END LOOP 挂钩实际上应该编写为它的自己的模型,使用 jinja 循环。您可以使用 run_query 将数据返回到 jinja 上下文 宏。该页面底部有一个示例,它将查询结果提取到 jinja 上下文中,然后使用 {% for payment_method in results_list %} 循环它们

There are a few different things going on here.

  1. You are correct that dbt's postgres adapter does not currently support a config for partitioned tables. The solution here is to create a new materialization that inserts the extra DDL you need. You don't want to be manually writing DDL in dbt -- this is considered a big antipattern. You may also want to open an issue in dbt-core (where the postgres adapter code lives) to make a feature request
  2. Your FOR ... LOOP ... END LOOP hook should really be written as its own model, using a jinja loop. You can return data to the jinja context using the run_query macro. There is an example toward the bottom of that page that fetches the query results into the jinja context and then loops over them with {% for payment_method in results_list %}
恏ㄋ傷疤忘ㄋ疼 2025-01-23 06:42:46

在 DBT DDL 中,表的创建是自动完成的,运行 pre-hook 或 post-hook 不会有任何帮助,因为 PostgreSQL 不允许更改表以通过子句进行分区。

而不是 pre_hook 块 - 这也需要为每个模块完成 -。可以修改 DBT postgres 中的创建表语句来处理表分区创建。该解决方案支持范围和列表类型。

作为 PostgreSQL DDL 语句的示例,

            CREATE TABLE measurement (
                city_id         int not null,
                logdate         date not null,
                peaktemp        int,
                unitsales       int
            ) PARTITION BY RANGE (logdate); 

将此 DDL 修改为具有 PARTITION BY 子句后,可以创建并附加分区

应用适配

1- 在项目中找到 postgres dbt 适配器,通常路径如下:

venv/lib/python3.10/site-packages/dbt/include/postgres/macros/adapters.sql

2- 修改 < 的内容code>postgres__create_table_as 宏如下:

{% macro postgres__create_table_as(temporary, relation, sql) -%}
{%- set unlogged = config.get(‘unlogged’, default=false) -%}
{%- set sql_header = config.get(‘sql_header’, none) -%}

{%- set fields_string = config.get(‘fields_string’, none) -%}

{%- set partition_type = config.get(‘partition_type’, none) -%}
{%- set partition_column = config.get(‘partition_column’, none) -%}

{%- set partition_range_start_end = config.get(‘partition_range_start_end’, none) -%}
{%- set partition_list_values = config.get(‘partition_list_values’, none) -%}

{%- set is_partition = partition_type is not none -%}

{{ sql_header if sql_header is not none }}

create {% if temporary -%}
temporary
{%- elif unlogged -%}
unlogged
{%- endif %} table {{ relation }}
as (
{{ sql }}
{% if is_partition and not temporary %}
limit 0
{% endif %}
);

{% if is_partition and not temporary %}

{%- set relation_str=relation -%}
{%- set relation_str -%}
{{ relation_str|string|replace(‘“‘, ‘’) }}
{%- endset -%}

{%- set relation_str2-%}{{relation_str}}_tmp4part{%- endset -%}
{%- set relation_str2-%}”{{relation_str2|replace(‘.’,’”.”’)}}”{%- endset -%}

CREATE TABLE {{relation_str2}} ( like {{ relation }} including all) PARTITION BY {{ partition_type }} ({{ partition_column }});
DROP TABLE {{ relation }};
CREATE TABLE {{ relation }} ( like {{relation_str2}} including all ) PARTITION BY {{ partition_type }} ({{ partition_column }});
DROP TABLE {{relation_str2}};

{% if partition_type|lower == ‘range’ %}
{% for rng in partition_range_start_end %}
{% set start = rng[0] %}
{% set end = rng[1] %}
{%- set table4part-%}{{relation_str}}_{{loop.index}} {%- endset -%}
{%- set table4part-%}”{{table4part|replace(‘.’,’”.”’)}}” {%- endset -%}
CREATE TABLE {{table4part}} PARTITION OF {{ relation }} FOR VALUES FROM (‘{{start}}’) TO (‘{{end}}’);
{% endfor %}
{% elif partition_type|lower == ‘list’ %}
{% for val in partition_list_values.split(‘,’) %}
{%- set table4part-%}{{relation_str}}_{{loop.index}} {%- endset -%}
{%- set table4part-%}”{{table4part|replace(‘.’,’”.”’)}}” {%- endset -%}
CREATE TABLE {{table4part}} PARTITION OF {{ relation }} FOR VALUES IN ({{val}});
{% endfor %}
{%- endif %}

{# INSERT DATA #}
insert into {{ relation }} (
{{ sql }}
);
{% endif %}

{%- endmacro %}

3- 在要创建的模块中选择分区列表,如下所示:

            {%- set partition_type = ‘LIST’ -%}
            {%- set partition_column = ‘level’ -%}
            {%- set partition_list_values=’1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51'-%}

            {{
            config(
            materialized=’incremental’,
            partition_type=partition_type,
            partition_column=partition_column,
            partition_list_values=partition_list_values,
            unique_key=[‘col1’,’col2’]
            )
            }}

参考和更多详细信息 https://medium.com/@fmohammad_91999/postgresql-table-partitioning- in-dbt-6d6ed82e90ca

In DBT DDL table creation is automatically done and running pre-hook or post-hook will not be help since PostgreSQL do not allow alter the table to have partition by clause.

Instead of pre_hook blocks -which also need to be done for every module-. The create table statement from DBT postgres can be modified to handle table partition creation. Both range and list types are supported by this solution.

as an example of PostgreSQL DDL statement

            CREATE TABLE measurement (
                city_id         int not null,
                logdate         date not null,
                peaktemp        int,
                unitsales       int
            ) PARTITION BY RANGE (logdate); 

after this DDL is modified to have the PARTITION BY clause then partitions can be created and attached

Apply the adaptation

1- find the postgres dbt adapter in your project, normally path like:

venv/lib/python3.10/site-packages/dbt/include/postgres/macros/adapters.sql

2- modify the content of postgres__create_table_as macro to be as below:

{% macro postgres__create_table_as(temporary, relation, sql) -%}
{%- set unlogged = config.get(‘unlogged’, default=false) -%}
{%- set sql_header = config.get(‘sql_header’, none) -%}

{%- set fields_string = config.get(‘fields_string’, none) -%}

{%- set partition_type = config.get(‘partition_type’, none) -%}
{%- set partition_column = config.get(‘partition_column’, none) -%}

{%- set partition_range_start_end = config.get(‘partition_range_start_end’, none) -%}
{%- set partition_list_values = config.get(‘partition_list_values’, none) -%}

{%- set is_partition = partition_type is not none -%}

{{ sql_header if sql_header is not none }}

create {% if temporary -%}
temporary
{%- elif unlogged -%}
unlogged
{%- endif %} table {{ relation }}
as (
{{ sql }}
{% if is_partition and not temporary %}
limit 0
{% endif %}
);

{% if is_partition and not temporary %}

{%- set relation_str=relation -%}
{%- set relation_str -%}
{{ relation_str|string|replace(‘“‘, ‘’) }}
{%- endset -%}

{%- set relation_str2-%}{{relation_str}}_tmp4part{%- endset -%}
{%- set relation_str2-%}”{{relation_str2|replace(‘.’,’”.”’)}}”{%- endset -%}

CREATE TABLE {{relation_str2}} ( like {{ relation }} including all) PARTITION BY {{ partition_type }} ({{ partition_column }});
DROP TABLE {{ relation }};
CREATE TABLE {{ relation }} ( like {{relation_str2}} including all ) PARTITION BY {{ partition_type }} ({{ partition_column }});
DROP TABLE {{relation_str2}};

{% if partition_type|lower == ‘range’ %}
{% for rng in partition_range_start_end %}
{% set start = rng[0] %}
{% set end = rng[1] %}
{%- set table4part-%}{{relation_str}}_{{loop.index}} {%- endset -%}
{%- set table4part-%}”{{table4part|replace(‘.’,’”.”’)}}” {%- endset -%}
CREATE TABLE {{table4part}} PARTITION OF {{ relation }} FOR VALUES FROM (‘{{start}}’) TO (‘{{end}}’);
{% endfor %}
{% elif partition_type|lower == ‘list’ %}
{% for val in partition_list_values.split(‘,’) %}
{%- set table4part-%}{{relation_str}}_{{loop.index}} {%- endset -%}
{%- set table4part-%}”{{table4part|replace(‘.’,’”.”’)}}” {%- endset -%}
CREATE TABLE {{table4part}} PARTITION OF {{ relation }} FOR VALUES IN ({{val}});
{% endfor %}
{%- endif %}

{# INSERT DATA #}
insert into {{ relation }} (
{{ sql }}
);
{% endif %}

{%- endmacro %}

3- Choose the partitioning list in the module to be created like below for example:

            {%- set partition_type = ‘LIST’ -%}
            {%- set partition_column = ‘level’ -%}
            {%- set partition_list_values=’1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51'-%}

            {{
            config(
            materialized=’incremental’,
            partition_type=partition_type,
            partition_column=partition_column,
            partition_list_values=partition_list_values,
            unique_key=[‘col1’,’col2’]
            )
            }}

Reference and More details https://medium.com/@fmohammad_91999/postgresql-table-partitioning-in-dbt-6d6ed82e90ca

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文