Flink: can a Table be converted to a DataStream?
I've managed to use the PyFlink Table API to connect to Kinesis and process a stream of data. I'm now trying to convert this table to a DataStream, as I need more low-level processing.
I've tried following the example here https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/data_stream_api/#examples-for-todatastream but I'm getting the error:
AttributeError: 'TableEnvironment' object has no attribute 'to_data_stream'
I know it seems obvious, but comparing my code to the example, I can't see what I'm missing.
The code I've used is:
from pyflink.table import (EnvironmentSettings, TableEnvironment, StreamTableEnvironment, TableDescriptor, Schema,
DataTypes, FormatDescriptor, AggregateFunction)
from pyflink.datastream import StreamExecutionEnvironment
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.get_config().set("parallelism.default", "1")
t_env.get_config().set("pipeline.jars", "file:///home/ubuntu/connectors/flink-sql-connector-kinesis-1.15.0.jar")
# Create stream env
env = StreamExecutionEnvironment.get_execution_environment()
# Create sources
source_ddl = """
CREATE TABLE source(
decoded_timestamp TIMESTAMP(3),
lon DOUBLE,
lat DOUBLE,
WATERMARK FOR decoded_timestamp AS decoded_timestamp
) WITH (
'connector' = 'kinesis',
'stream' = 'flink_formatted',
'scan.startup.mode' = 'latest-offset',
'format' = 'json',
'aws.region' = 'eu-west-1'
)
"""
# Register the source table (the DDL alone does not launch a job)
t_env.execute_sql(source_ddl)
tab = t_env.from_path('source')
ds = t_env.to_data_stream(tab)
Thanks!
1 Answer
Try reorganizing the initialization like this: