Avro bytes from Event Hub cannot be deserialized with PySpark
We are sending Avro data encoded with azure.schemaregistry.encoder.avroencoder to Event Hub from a standalone Python job, and we can deserialize it with the same encoder in another standalone Python consumer. The schema registry client is also supplied to the Avro encoder in this case.
This is the standalone producer I use:
import os

from azure.eventhub import EventHubProducerClient, EventData
from azure.identity import DefaultAzureCredential
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder

# Service principal credentials picked up by DefaultAzureCredential
os.environ["AZURE_CLIENT_ID"] = ''
os.environ["AZURE_TENANT_ID"] = ''
os.environ["AZURE_CLIENT_SECRET"] = ''
token_credential = DefaultAzureCredential()

fully_qualified_namespace = ""
group_name = "testSchemaReg"
eventhub_connection_str = ""
eventhub_name = ""

# Avro schema for the records being sent
definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number", "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}"""

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name, auto_register=True)

eventhub_producer = EventHubProducerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    eventhub_name=eventhub_name
)

with eventhub_producer, avro_encoder:
    event_data_batch = eventhub_producer.create_batch()
    dict_content = {"name": "Bob", "favorite_number": 7, "favorite_color": "red"}
    # encode() writes the Avro payload into the event body and records the schema ID in its content type
    event_data = avro_encoder.encode(dict_content, schema=definition, message_type=EventData)
    event_data_batch.add(event_data)
    eventhub_producer.send_batch(event_data_batch)
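For reference, the encoded event can be inspected before it is sent. The small sketch below (reusing only the objects defined above) shows that the writer schema is not embedded in the payload; only a schema ID travels with the event, in its content type:

# Sketch: inspect an encoded event before sending (reuses dict_content/definition/avro_encoder above).
sample = avro_encoder.encode(dict_content, schema=definition, message_type=EventData)
print(sample.content_type)   # carries the schema ID used later for decoding (see the answers below)
body = sample.body
payload = body if isinstance(body, bytes) else b"".join(body)
print(payload)               # raw Avro-encoded bytes; the writer schema itself is not in the payload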
I was able to deserialize it with this standalone consumer:
import asyncio

from azure.eventhub.aio import EventHubConsumerClient

# schema_registry_client and avro_encoder are created exactly as in the producer above

async def on_event(partition_context, event):
    print("Received the event: \"{}\" from the partition with ID: \"{}\"".format(
        event.body_as_str(encoding='UTF-8'), partition_context.partition_id))
    print("message type is :")
    print(type(event))                      # azure.eventhub.EventData
    dec = avro_encoder.decode(event)        # decode() accepts the EventData object directly
    print("decoded msg:\n")
    print(dec)
    await partition_context.update_checkpoint(event)

async def main():
    client = EventHubConsumerClient.from_connection_string(
        conn_str="",
        consumer_group="$Default",
        eventhub_name="")
    async with client:
        await client.receive(on_event=on_event, starting_position="-1")

asyncio.run(main())
As a next step, I replaced the standalone Python consumer with a PySpark consumer running in a Synapse notebook. Below are the problems I faced:
- The from_avro function in Spark is not able to deserialize the Avro message encoded with the Azure encoder (a sketch of this attempt follows this list).
- As a workaround, I tried creating a UDF that makes use of the Azure encoder, but the Azure encoder expects the event to be of type EventData, whereas when Spark reads the data through the Event Hubs API we get the body as a byte array.
from pyspark.sql.functions import udf

@udf
def decode(row_msg):
    encoder = AvroEncoder(client=schema_registry_client)
    # fails: decode() expects an EventData / MessageContent object, not raw bytes
    return encoder.decode(bytes(row_msg))
- I don't see any proper documentation on a deserializer that can be used with Spark or any other distributed system.
All the examples are for standalone clients. Is there a connector we can use with Spark/Flink?
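For context, the failing PySpark attempt looks roughly like the sketch below. This is a sketch only: the Event Hubs connector options and the encrypt() helper follow the azure-event-hubs-spark connector's documented usage, the connection string is a placeholder, and definition is the same Avro schema JSON string used in the producer above.

from pyspark.sql.functions import col
from pyspark.sql.avro.functions import from_avro

# azure-event-hubs-spark connector configuration (the connection string must be encrypted)
eh_conf = {
    "eventhubs.connectionString":
        sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt("<event hub connection string>"),
}

df = spark.readStream.format("eventhubs").options(**eh_conf).load()

# 'body' is a plain binary column (a byte array, not an EventData object); per the problem
# described above, from_avro with the schema JSON cannot decode the Azure-encoded payload.
decoded = df.select(from_avro(col("body"), definition).alias("user"))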
2 Answers
Answering my own question: the Azure Event Hubs Schema Registry doesn't support Spark or any other distributed system.
They are working on it and trying to add this support to Spark:
https://github.com/Azure/azure-event-hubs-spark/pull/573
Because the Avro schema is not part of the payload ("the data"), the from_avro function in Spark will not be able to deserialize the message. This should be expected. In order to decode, you also need to pass the associated content_type value on the EventData object into the decode method. This content_type value holds the schema ID that will be used to retrieve the schema used for deserialization. You can set content_type along with content in the MessageContent dict. This sample should be helpful.
We currently don't have a connector to be used with Spark/Flink. However, if this is something you're interested in, please feel free to file a feature-request issue here: https://github.com/Azure/azure-sdk-for-python/issues.
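Putting that together for the Spark case in the question, here is a minimal sketch of a UDF that wraps the raw body bytes and the event's content type into the MessageContent dict that AvroEncoder.decode accepts. Assumptions: the content type for each event is available to the UDF as a column alongside the body, DefaultAzureCredential can authenticate on the executors, and the namespace and group names are placeholders.

import json

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

from azure.identity import DefaultAzureCredential
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder

def _build_encoder():
    # Built on the executor; decoding resolves the schema by the ID carried in content_type.
    client = SchemaRegistryClient("<namespace>.servicebus.windows.net", DefaultAzureCredential())
    return AvroEncoder(client=client, group_name="testSchemaReg")

@udf(returnType=StringType())
def decode_avro(body, content_type):
    encoder = _build_encoder()
    # MessageContent dict: the raw Avro bytes plus the content type that holds the schema ID.
    message = {"content": bytes(body), "content_type": content_type}
    return json.dumps(encoder.decode(message))

# Hypothetical usage, assuming a content-type column is available next to 'body':
# decoded = df.select(decode_avro(df["body"], df["content_type"]).alias("user"))

A real job would avoid re-creating the credential and encoder for every row, for example by constructing them once per partition instead of inside the UDF call.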