Receiving Kafka messages with Spark Streaming and deleting Phoenix/HBase data
In my project, I have the following workflow:
Kafka message => Spark Streaming/processing => Insert/Update to HBase and/or Phoenix
Both the insert and update operations work with HBase directly or through Phoenix (I tested both cases).
Now I would like to delete data in HBase/Phoenix if I receive a specific message on Kafka. I did not find any clues/documentation on how I could do that while streaming.
I have found a way to delete data in "static"/"batch" mode with both HBase and Phoenix, but the same code does not work when streaming (there is no error, though; the data is simply not deleted).
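For context, the batch-mode delete through Phoenix is essentially a plain JDBC DELETE run over the keys to remove; a minimal sketch of that pattern (the quorum, table and key column names below are placeholders, not the real ones):
// Batch-mode Phoenix delete sketch: one parameterized DELETE per key pair,
// followed by an explicit commit (Phoenix connections do not auto-commit by default).
import java.sql.DriverManager
def batchDelete(keys: Seq[(String, String)]): Unit = {
  val connection = DriverManager.getConnection("jdbc:phoenix:phoenix_quorum")
  try {
    val ps = connection.prepareStatement("DELETE FROM TABLE_NAME WHERE key_1 = ? AND key_2 = ?")
    keys.foreach { case (k1, k2) =>
      ps.setString(1, k1)
      ps.setString(2, k2)
      ps.executeUpdate()
    }
    connection.commit()
  } finally {
    connection.close()
  }
}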
Here is how we tried the delete part (we first write a parquet file and then "readStream" it to start a "fake" stream):
Main Object:
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.{col, concat_ws}
import org.apache.spark.sql.streaming.DataStreamWriter
object AppMain {
  def main(args: Array[String]): Unit = {
val spark : SparkSession = SparkSession.builder().getOrCreate()
import spark.implicits._
//algo_name, partner_code, site_code, indicator
val df = Seq(("FOO","FII"),
("FOO","FUU")
).toDF("col_1","col_2")
df.write.mode("overwrite").parquet("testParquetStream")
df.printSchema()
df.show(false)
val dfStreaming = spark.readStream.schema(df.schema).parquet("testParquetStream")
dfStreaming.printSchema()
val dfWithConcat = dfStreaming.withColumn("row", concat_ws("\u0000" , col("col_1"),col("col_2"))).select("row")
// using delete class
val withDeleteClass : WithDeleteClass = new WithDeleteClass(spark, dfWithConcat)
withDeleteClass.delete_data()
// using JDBC/Phoenix
//val withJDBCSink : WithJDBCSink = new WithJDBCSink(spark, dfStreaming)
//withJDBCSink.delete_data()
  }
}
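Before wiring any sink, one way we check that this fake stream actually produces micro-batches is a plain console query; a sanity-check sketch, reusing dfWithConcat from the main object above:
// Print the micro-batches of the fake stream and block, so the driver does not exit
// before any batch has been processed.
val debugQuery = dfWithConcat.writeStream
  .outputMode("append")
  .format("console")
  .start()
debugQuery.awaitTermination()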
WithJDBCSink class
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.streaming.DataStreamWriter
class WithJDBCSink (spark : SparkSession,dfToDelete : DataFrame){
val df_writer = dfToDelete.writeStream
def delete_data():Unit = {
val writer = new JDBCSink()
df_writer.foreach(writer).outputMode("append").start()
spark.streams.awaitAnyTermination()
}
}
JDBCSink class
import java.sql.{DriverManager, PreparedStatement, Statement}
class JDBCSink() extends org.apache.spark.sql.ForeachWriter[org.apache.spark.sql.Row] {
val quorum = "phoenix_quorum"
var connection: java.sql.Connection = null
var statement: Statement = null
var ps : PreparedStatement= null
def open(partitionId: Long, version: Long): Boolean = {
connection = DriverManager.getConnection(s"jdbc:phoenix:$quorum")
statement = connection.createStatement()
true
}
def process(row: org.apache.spark.sql.Row): Unit = {
//-----------------------Method 1
connection = DriverManager.getConnection(s"jdbc:phoenix:$quorum")
val query = s"DELETE from TABLE_NAME WHERE key_1 = 'val1' and key_2 = 'val2'"
statement = connection.createStatement()
statement.executeUpdate(query)
connection.commit()
//-----------------------Method 2
//val query2 = s"DELETE from TABLE_NAME WHERE key_1 = ? and key_2 = ?"
//connection = DriverManager.getConnection(s"jdbc:phoenix:$quorum")
//ps = connection.prepareStatement(query2)
//ps.setString(1, "val1")
//ps.setString(2, "val2")
//ps.executeUpdate()
//connection.commit()
}
def close(errorOrNull: Throwable): Unit = {
connection.commit()
connection.close
}
}
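For reference, a variant of process() that takes the key values from the incoming row instead of hardcoding them might look like this (a sketch only, assuming the streamed rows carry the two key columns as strings and reusing the connection opened in open()):
// Delete the row identified by the key columns carried in the streamed row.
def process(row: org.apache.spark.sql.Row): Unit = {
  val ps = connection.prepareStatement("DELETE FROM TABLE_NAME WHERE key_1 = ? AND key_2 = ?")
  ps.setString(1, row.getString(0))
  ps.setString(2, row.getString(1))
  ps.executeUpdate()
  connection.commit()   // Phoenix does not auto-commit by default
}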
WithDeleteClass
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Delete, HTable, RetriesExhaustedWithDetailsException, Row, Table}
import org.apache.hadoop.hbase.ipc.CallTimeoutException
import org.apache.hadoop.hbase.util.Bytes
import org.apache.log4j.Logger
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import java.util
import org.apache.spark.sql.streaming.DataStreamWriter
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.storage.StorageLevel
import java.sql.Connection
import java.sql.DriverManager
import java.sql.ResultSet
import java.sql.SQLException
import java.sql.Statement
class WithDeleteClass (spark : SparkSession, dfToDelete : DataFrame){
val finalData = dfToDelete
var df_writer = dfToDelete.writeStream
var dfWriter = finalData.writeStream.outputMode("append").format("console").start()
def delete_data(): Unit = {
deleteDataObj.open()
df_writer.foreachBatch((output : DataFrame, batchId : Long) =>
deleteDataObj.process(output)
)
df_writer.start()
}
object deleteDataObj{
var quorum = "hbase_quorum"
var zkPort = "portNumber"
var hbConf = HBaseConfiguration.create()
hbConf.set("hbase.zookeeper.quorum", quorum)
hbConf.set("hbase.zookeeper.property.clientPort", zkPort)
var tableName: TableName = TableName.valueOf("TABLE_NAME")
var conn = ConnectionFactory.createConnection(hbConf)
var table: Table = conn.getTable(tableName)
var hTable: HTable = table.asInstanceOf[HTable]
def open() : Unit = {
}
def process(df : DataFrame) : Unit = {
val rdd : RDD[Array[Byte]] = df.rdd.map(row => Bytes.toBytes(row(0).toString))
val deletions : util.ArrayList[Delete] = new util.ArrayList()
//List all rows to delete
rdd.foreach(row => {
val delete: Delete = new Delete(row)
delete.addColumns(Bytes.toBytes("0"), Bytes.toBytes("DATA"))
deletions.add(delete)
})
hTable.delete(deletions)
}
def close(): Unit = {}
}
}
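For completeness, a per-batch variant for the HBase path could build and submit the Delete objects on the executors inside foreachBatch instead of collecting them into a driver-side list; this is only a sketch (quorum, port, table name and column family are placeholders), not something we have confirmed:
import java.util
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Delete}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.DataFrame
// Per-batch delete sketch: each partition opens its own HBase connection on the executor,
// builds its Delete objects locally and submits them there.
def deleteBatch(batch: DataFrame, batchId: Long): Unit = {
  batch.rdd.foreachPartition { rows =>
    val hbConf = HBaseConfiguration.create()
    hbConf.set("hbase.zookeeper.quorum", "hbase_quorum")
    hbConf.set("hbase.zookeeper.property.clientPort", "portNumber")
    val conn = ConnectionFactory.createConnection(hbConf)
    val table = conn.getTable(TableName.valueOf("TABLE_NAME"))
    try {
      val deletions = new util.ArrayList[Delete]()
      rows.foreach { row =>
        val delete = new Delete(Bytes.toBytes(row.getString(0)))
        delete.addColumns(Bytes.toBytes("0"), Bytes.toBytes("DATA"))
        deletions.add(delete)
      }
      if (!deletions.isEmpty) table.delete(deletions)
    } finally {
      table.close()
      conn.close()
    }
  }
}
// wiring, e.g.: dfWithConcat.writeStream.foreachBatch(deleteBatch _).start().awaitTermination()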
Any help/pointers would be greatly appreciated