Receiving a Kafka message through Spark Streaming and deleting Phoenix/HBase data

In my project, I have the following workflow:
Kafka message => Spark Streaming/processing => Insert/Update to HBase and/or Phoenix

Both the insert and update operations work with HBase directly or through Phoenix (I tested both cases).
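
For context, the Phoenix write path can be sketched roughly as below. This is only a schematic of the shape of that path, not our exact code; the table name, column names and ZooKeeper quorum are placeholders.

import java.sql.DriverManager
import org.apache.spark.sql.DataFrame

def upsertToPhoenix(batch: DataFrame, batchId: Long): Unit = {
  batch.rdd.foreachPartition { rows =>
    // one JDBC connection per partition, not per row
    val conn = DriverManager.getConnection("jdbc:phoenix:phoenix_quorum")
    val ps = conn.prepareStatement("UPSERT INTO TABLE_NAME (key_1, key_2) VALUES (?, ?)")
    rows.foreach { r =>
      ps.setString(1, r.getString(0))
      ps.setString(2, r.getString(1))
      ps.executeUpdate()
    }
    conn.commit() // Phoenix connections do not auto-commit by default
    ps.close()
    conn.close()
  }
}

// wired in as e.g.: streamingDf.writeStream.foreachBatch(upsertToPhoenix _).start()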

Now I would like to delete data in HBase/Phoenix if I receive a specific message on Kafka. I did not find any clues/documentation on how I could do that while streaming.
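
Schematically, what we have in mind on the Kafka side is to tag each message and branch on that tag. The JSON layout and the "action" field below are purely illustrative assumptions, not our real message format; spark is an existing SparkSession.

import org.apache.spark.sql.functions.{col, get_json_object}

val kafkaDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092") // placeholder
  .option("subscribe", "topic_name")                // placeholder
  .load()
  .selectExpr("CAST(value AS STRING) AS value")

val parsed = kafkaDf
  .withColumn("action", get_json_object(col("value"), "$.action"))
  .withColumn("key_1",  get_json_object(col("value"), "$.key_1"))
  .withColumn("key_2",  get_json_object(col("value"), "$.key_2"))

// upserts keep going through the existing insert/update path;
// the rows flagged as deletes are the ones this question is about
val upserts = parsed.filter(col("action") =!= "DELETE")
val deletes = parsed.filter(col("action") === "DELETE")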

I have found a way to delete data in "static"/"batch" mode with both HBase and Phoenix, but the same code does not work when streaming (there is no error though, the data is simply not deleted).
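
For reference, the batch-mode Phoenix delete looks roughly like the sketch below (not the exact code, and with the same placeholder table/quorum names as in the code further down):

import java.sql.DriverManager

val conn = DriverManager.getConnection("jdbc:phoenix:phoenix_quorum")
val ps = conn.prepareStatement("DELETE FROM TABLE_NAME WHERE key_1 = ? AND key_2 = ?")
ps.setString(1, "val1")
ps.setString(2, "val2")
ps.executeUpdate()
conn.commit() // required: the Phoenix connection is not auto-commit
conn.close()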

Here is how we tried the delete part (we first create a Parquet file, then open a readStream on it to start a "fake" stream):

Main Object:

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.{col, concat_ws}
import org.apache.spark.sql.streaming.DataStreamWriter

object AppMain{
  def main(args: Array[String]): Unit = {
    val spark : SparkSession = SparkSession.builder().getOrCreate()


    import spark.implicits._
    //algo_name, partner_code, site_code, indicator
    val df = Seq(("FOO","FII"),
      ("FOO","FUU")
    ).toDF("col_1","col_2")

    df.write.mode("overwrite").parquet("testParquetStream")
    df.printSchema()
    df.show(false)

    val dfStreaming = spark.readStream.schema(df.schema).parquet("testParquetStream")
    dfStreaming.printSchema()
    val dfWithConcat = dfStreaming.withColumn("row", concat_ws("\u0000" , col("col_1"),col("col_2"))).select("row")

    // using delete class
    val withDeleteClass : WithDeleteClass = new WithDeleteClass(spark, dfWithConcat)
    withDeleteClass.delete_data()
    // using JDBC/Phoenix
    //val withJDBCSink : WithJDBCSink = new WithJDBCSink(spark, dfStreaming)
    //withJDBCSink.delete_data()
  }
}

WithJDBCSink class

import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.streaming.DataStreamWriter

class WithJDBCSink (spark : SparkSession,dfToDelete : DataFrame){

  val df_writer = dfToDelete.writeStream

  def delete_data():Unit = {
    val writer = new JDBCSink()
    df_writer.foreach(writer).outputMode("append").start()
    spark.streams.awaitAnyTermination()
  }

}

JDBCSink class

import java.sql.{DriverManager, PreparedStatement, Statement}

class JDBCSink() extends org.apache.spark.sql.ForeachWriter[org.apache.spark.sql.Row] {

  val quorum = "phoenix_quorum"
  var connection: java.sql.Connection = null
  var statement: Statement = null
  var ps : PreparedStatement= null
  def open(partitionId: Long, version: Long): Boolean = {
    connection = DriverManager.getConnection(s"jdbc:phoenix:$quorum")
    statement = connection.createStatement()
    true
  }

  def process(row: org.apache.spark.sql.Row): Unit = {
    //----------------------- Method 1: new connection and plain Statement per row, keys hard-coded as placeholders
    connection = DriverManager.getConnection(s"jdbc:phoenix:$quorum")
    val query = s"DELETE from TABLE_NAME WHERE key_1 = 'val1' and key_2 = 'val2'"
    statement = connection.createStatement()
    statement.executeUpdate(query)
    connection.commit()

    //----------------------- Method 2: PreparedStatement with bound parameters (kept commented out while testing Method 1)
    //val query2 = s"DELETE from TABLE_NAME WHERE key_1 = ? and key_2 = ?"
    //connection = DriverManager.getConnection(s"jdbc:phoenix:$quorum")
    //ps = connection.prepareStatement(query2)
    //ps.setString(1, "val1")
    //ps.setString(2, "val2")
    //ps.executeUpdate()
    //connection.commit()
  }

  def close(errorOrNull: Throwable): Unit = {
    connection.commit()
    connection.close
  }
}
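
For completeness, a variant of this ForeachWriter could keep a single connection and PreparedStatement per open() call and bind the actual row columns instead of the hard-coded test values. This is only a sketch; the column positions are assumptions based on dfStreaming (col_1, col_2).

import java.sql.{Connection, DriverManager, PreparedStatement}

class JDBCSinkPerRow extends org.apache.spark.sql.ForeachWriter[org.apache.spark.sql.Row] {

  val quorum = "phoenix_quorum"
  var connection: Connection = _
  var ps: PreparedStatement = _

  def open(partitionId: Long, version: Long): Boolean = {
    connection = DriverManager.getConnection(s"jdbc:phoenix:$quorum")
    ps = connection.prepareStatement("DELETE FROM TABLE_NAME WHERE key_1 = ? AND key_2 = ?")
    true
  }

  def process(row: org.apache.spark.sql.Row): Unit = {
    ps.setString(1, row.getString(0)) // assumes col_1 is the first column
    ps.setString(2, row.getString(1)) // assumes col_2 is the second column
    ps.executeUpdate()
  }

  def close(errorOrNull: Throwable): Unit = {
    if (connection != null) {
      connection.commit() // flush the deletes for this partition/epoch
      connection.close()
    }
  }
}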

WithDeleteClass

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Delete, HTable, RetriesExhaustedWithDetailsException, Row, Table}
import org.apache.hadoop.hbase.ipc.CallTimeoutException
import org.apache.hadoop.hbase.util.Bytes
import org.apache.log4j.Logger
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import java.util
import org.apache.spark.sql.streaming.DataStreamWriter
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.storage.StorageLevel

import java.sql.Connection
import java.sql.DriverManager
import java.sql.ResultSet
import java.sql.SQLException
import java.sql.Statement

class WithDeleteClass (spark : SparkSession, dfToDelete : DataFrame){
  val finalData = dfToDelete
  var df_writer = dfToDelete.writeStream
  // Console sink started immediately, just to print the incoming rows
  var dfWriter = finalData.writeStream.outputMode("append").format("console").start()

  def delete_data(): Unit = {
    deleteDataObj.open()
    df_writer.foreachBatch((output : DataFrame, batchId : Long) =>
      deleteDataObj.process(output)
    )
    df_writer.start()

  }

  object deleteDataObj{
    var quorum = "hbase_quorum"
    var zkPort = "portNumber"
    var hbConf = HBaseConfiguration.create()
    hbConf.set("hbase.zookeeper.quorum", quorum)
    hbConf.set("hbase.zookeeper.property.clientPort", zkPort)
    var tableName: TableName = TableName.valueOf("TABLE_NAME")
    var conn = ConnectionFactory.createConnection(hbConf)
    var table: Table = conn.getTable(tableName)
    var hTable: HTable = table.asInstanceOf[HTable]
    def open() : Unit = {
    }
    def process(df : DataFrame) : Unit = {
      val rdd : RDD[Array[Byte]] = df.rdd.map(row => Bytes.toBytes(row(0).toString))
      val deletions : util.ArrayList[Delete] = new util.ArrayList()
      //List all rows to delete
      //(note: this closure runs on the executors, so the driver-side ArrayList is not filled there)
      rdd.foreach(row => {
        val delete: Delete = new Delete(row)
        delete.addColumns(Bytes.toBytes("0"), Bytes.toBytes("DATA"))
        deletions.add(delete)
      })
      hTable.delete(deletions)
    }
    def close(): Unit = {}
  }
}
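
One alternative shape for the HBase path (only a sketch, not something we have validated) would be to do everything inside foreachBatch on the executors, so the Delete objects are built and submitted where the data lives. Same placeholder names as above (quorum, port, table, column family "0" / qualifier "DATA"), and it assumes the single "row" column produced in the main object.

import java.util
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Delete}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.DataFrame

def deleteBatchFromHBase(batch: DataFrame, batchId: Long): Unit = {
  batch.rdd.foreachPartition { rows =>
    // the connection is created on the executor that owns this partition
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "hbase_quorum")
    conf.set("hbase.zookeeper.property.clientPort", "portNumber")
    val conn = ConnectionFactory.createConnection(conf)
    val table = conn.getTable(TableName.valueOf("TABLE_NAME"))
    val deletions = new util.ArrayList[Delete]()
    rows.foreach { r =>
      val d = new Delete(Bytes.toBytes(r.getString(0)))
      d.addColumns(Bytes.toBytes("0"), Bytes.toBytes("DATA"))
      deletions.add(d)
    }
    if (!deletions.isEmpty) table.delete(deletions)
    table.close()
    conn.close()
  }
}

// wired in as e.g.: dfWithConcat.writeStream.foreachBatch(deleteBatchFromHBase _).start()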

Any help/pointers would be greatly appreciated
