火花结构化流和Neo4J

发布于 2025-02-11 02:20:57 字数 1386 浏览 1 评论 0原文

我的目标是使用Spark结构化流从MongoDB集合中将转换的数据从MongoDB集合写入Neo4J。根据NEO4J文档的说法,“ neo4j连接器,适用于Apache Spark “版本4.1.2。

到目前为止,批次查询正常。但是,在下面的示例中,我遇到了一条错误消息:

spark-shell --packages org.mongodb.spark:mongo-spark-connector:10.0.2,org.neo4j:neo4j-connector-apache-spark_2.12:4.1.2_for_spark_3
val dfTxn = spark.readStream.format("mongodb")
.option("spark.mongodb.connection.uri", "mongodb://<IP>:<PORT>")
.option("spark.mongodb.database", "test")
.option("spark.mongodb.collection", "txn")
.option("park.mongodb.read.readPreference.name","primaryPreferred")
.option("spark.mongodb.change.stream.publish.full.document.only", "true")
.option("forceDeleteTempCheckpointLocation", "true").load()
val query = dfPaymentTx.writeStream.format("org.neo4j.spark.DataSource")
.option("url", "bolt://<IP>:<PORT>")
.option("save.mode", "Append")
.option("checkpointLocation", "/tmp/checkpoint/myCheckPoint")
.option("labels", "Account")
.option("node.keys", "txn_snd").start()

这给了我以下错误消息:

java.lang.UnsupportedOperationException: Data source org.neo4j.spark.DataSource does not support streamed writing

尽管连接器应以4.x版本开始正式支持流媒体。有人知道我做错了什么吗?

提前致谢!

My goal is to write transformed data from a MongoDB collection into Neo4j using Spark Structured Streaming. According to the Neo4j docs, this should be possible with the "Neo4j Connector for Apache Spark" version 4.1.2.

Batch queries so far work fine. However, with the following example below, I run into an error message:

spark-shell --packages org.mongodb.spark:mongo-spark-connector:10.0.2,org.neo4j:neo4j-connector-apache-spark_2.12:4.1.2_for_spark_3
val dfTxn = spark.readStream.format("mongodb")
.option("spark.mongodb.connection.uri", "mongodb://<IP>:<PORT>")
.option("spark.mongodb.database", "test")
.option("spark.mongodb.collection", "txn")
.option("park.mongodb.read.readPreference.name","primaryPreferred")
.option("spark.mongodb.change.stream.publish.full.document.only", "true")
.option("forceDeleteTempCheckpointLocation", "true").load()
val query = dfPaymentTx.writeStream.format("org.neo4j.spark.DataSource")
.option("url", "bolt://<IP>:<PORT>")
.option("save.mode", "Append")
.option("checkpointLocation", "/tmp/checkpoint/myCheckPoint")
.option("labels", "Account")
.option("node.keys", "txn_snd").start()

This gives me the following error message:

java.lang.UnsupportedOperationException: Data source org.neo4j.spark.DataSource does not support streamed writing

Although the Connector should officially support streaming starting with version 4.x. Does anybody have an idea what I'm doing wrong?

Thanks in advance!

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

爺獨霸怡葒院 2025-02-18 02:20:57

INCASE,如果连接器不支持流媒体写作,则可以尝试以下类似。

您可以从Spark结构化流媒体中利用foreachBatch()功能,并在批处理模式下将数据写入neo4j。

https:https:// spark。 apache.org/docs/latest/sonstrucd-streaming-programing-guide.html#using-foreach-and-foreachbatch

def process_entry(df, id):
    df.write.ToNeo4j(url=url, table="mytopic", mode="append", properties=props)
    
query = df.writeStream.foreachBatch(process_entry).start()

在上述代码中,您可以使用Neo4J Writer逻辑,您可以使用数据库将数据写入数据库中批处理模式。

Incase, if the connector doesnt support streaming writes, you can try something like below.

you can leverage foreachBatch() functionality from spark structured streaming and write the data into Neo4j in batch mode.

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch

def process_entry(df, id):
    df.write.ToNeo4j(url=url, table="mytopic", mode="append", properties=props)
    
query = df.writeStream.foreachBatch(process_entry).start()

In the above code, you can have your Neo4j Writer logic and you can write the data into database using batch mode.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文