Flink - converting a Table to a DataStream

I've managed to use the PyFlink Table API to connect to Kinesis and process a stream of data. I'm now trying to convert this table to a DataStream, as I need to do more low-level processing.

I've tried following the example here https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/data_stream_api/#examples-for-todatastream, but I'm getting the error:

AttributeError: 'TableEnvironment' object has no attribute 'to_data_stream'

I know it seems obvious, but comparing my code to the example I can't see what I'm missing.

The code I've used is:

from pyflink.table import (EnvironmentSettings, TableEnvironment, StreamTableEnvironment, TableDescriptor, Schema,
                           DataTypes, FormatDescriptor, AggregateFunction)
from pyflink.datastream import StreamExecutionEnvironment


t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.get_config().set("parallelism.default", "1")
t_env.get_config().set("pipeline.jars", "file:///home/ubuntu/connectors/flink-sql-connector-kinesis-1.15.0.jar")

#Create stream env
env = StreamExecutionEnvironment.get_execution_environment()

# Create sources
source_ddl = """
        CREATE TABLE source(
            decoded_timestamp TIMESTAMP(3),
            lon DOUBLE,
            lat DOUBLE,
            WATERMARK FOR decoded_timestamp AS decoded_timestamp
        ) WITH (
          'connector' = 'kinesis',
          'stream' = 'flink_formatted',
          'scan.startup.mode' = 'latest-offset',
          'format' = 'json',
          'aws.region' = 'eu-west-1'
        )
        """


# Register the source table (executing the DDL does not itself launch the job)
t_env.execute_sql(source_ddl)
tab = t_env.from_path('source')

ds = t_env.to_data_stream(tab)

Thanks!


1 comment

给我一枪 2025-02-19 18:38:23

to_data_stream is defined on StreamTableEnvironment, not on the base TableEnvironment, which is why you get the AttributeError. Create the stream execution environment first and build a StreamTableEnvironment on top of it. Try reorganizing the initialization like this:

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)

# StreamTableEnvironment (rather than TableEnvironment) is the entry point
# that can convert between Tables and DataStreams
t_env = StreamTableEnvironment.create(stream_execution_environment=env)
t_env.get_config().set("pipeline.jars", "file:///home/ubuntu/connectors/flink-sql-connector-kinesis-1.15.0.jar")
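
Putting it together with the code from the question, here is a minimal sketch of the full job under that setup. The DDL is copied from the question; the map step and the job name are just placeholders for whatever low-level processing you actually need:

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)

t_env = StreamTableEnvironment.create(stream_execution_environment=env)
t_env.get_config().set("pipeline.jars", "file:///home/ubuntu/connectors/flink-sql-connector-kinesis-1.15.0.jar")

# Same source table as in the question
source_ddl = """
        CREATE TABLE source(
            decoded_timestamp TIMESTAMP(3),
            lon DOUBLE,
            lat DOUBLE,
            WATERMARK FOR decoded_timestamp AS decoded_timestamp
        ) WITH (
          'connector' = 'kinesis',
          'stream' = 'flink_formatted',
          'scan.startup.mode' = 'latest-offset',
          'format' = 'json',
          'aws.region' = 'eu-west-1'
        )
        """
t_env.execute_sql(source_ddl)
tab = t_env.from_path('source')

# to_data_stream is defined on StreamTableEnvironment and yields a
# DataStream of Row objects with the table's columns as named fields
ds = t_env.to_data_stream(tab)

# Placeholder low-level transformation: format each row as "lat,lon"
ds.map(lambda row: f"{row.lat},{row.lon}", output_type=Types.STRING()).print()

# The pipeline now ends on the DataStream side, so submit via the stream env
env.execute("table_to_datastream")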