大数据项目实战 - Hbase Kafka Flume 集成安装

发布于 2023-05-23 19:37:12 字数 13564 浏览 46 评论 0

hbase

下载安装 hbase

$ wget http://archive.apache.org/dist/hbase/hbase-0.98.6/hbase-0.98.6-hadoop2-bin.tar.gz
$ tar xvf hbase-0.98.6-hadoop2-bin.tar.gz -C /opt/modules/  # 解压到指定目录

编辑 hbase-env.sh 文件

export JAVA_HOME=/opt/modules/jdk1.7.0_67
export HBASE_MANAGES_ZK=false

编辑 hbase-site.xml 文件

  <property>
    <name>hbase.rootdir</name>
    <value>hdfs://header:9000/hbase</value>
  </property>
  <property>
    <name>hbase.cluster.distributed</name>
    <value>true</value>
  </property>
  <property>
    <name>hbase.zookeeper.quorum</name>
    <value>header,worker-1,worker-2</value>
  </property>

编辑 regionservers 文件

header
worker-1
worker-2

新建 backup-masters 文件,添加需要用来作为备份的主机

worker-1

启动 hbase

$ bin/hbase-daemon.sh start master  # header
$ bin/hbase-daemon.sh start regionserver # header
$ bin/hbase-daemon.sh start regionserver # worker-1
$ bin/hbase-daemon.sh start regionserver # worker-2

输入http://header:60010/ 访问 hbase 管理界面

$ hbase shell

HBase 常用命令

表操作命令

  • create
  • describe
  • is_enabled
  • drop
  • enable
  • is_disabled
  • disable
  • list
  • count
  • delete
  • get
  • put
  • scan
  • truncate
hbase> help  # 查看命令
hbase> create 'test', 'info'
hbase> put 'test','0001','info:userName','laocao'
hbase> put 'test','0001','info:age','20'
hbase> put 'test','0001','info:tel','13500000000'
hbase> put 'test','0002','info:userName','小明'
hbase> put 'test','0002','info:age','22'
hbase> put 'test','0002','info:tel','13500000002'
hbase> scan 'test'
hbase> count 'test'
hbase> disable 'test'
hbase> drop 'test'

Kafka

下载安装 kafka

$ wget https://archive.apache.org/dist/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz
$ tar xvf kafka_2.11-0.9.0.0.tgz -C /opt/modules
$ mkdir kafka-logs  # kafka主目录下创建文件夹

在 header 上编辑 config/server.properties 文件

broker.id=0
host.name=header
log.dirs=/opt/modules/kafka_2.11-0.8.2.1/kafka-logs
zookeeper.connect=header:2181,worker-1:2181,worker-2:2181

分发服务到 worker-1, worker-2,修改 server.properties 文件的 broker.id 分别为1,2 和 host.name 与主机名对应。

启动测试 kafka

$ bin/kafka-server-start.sh config/server.properties  # 启动kafka服务
$ bin/kafka-topics.sh --create --zookeeper header:2181,worker-1:2181,worker-2:2181 --replication-factor 1 --partitions 1 --topic protest  # 创建好topic
  • --replication-factor 为副本数
  • paritions 分区数:后面性能调优可以使用

查看创建的 topics

$ bin/zkCli.sh  # 到zookeeper上去看创建的目录
[zk: localhost:2181(CONNECTED) 4] ls /brokers/topics
[protest]

启动生产者和消费者

$ bin/kafka-console-producer.sh --broker-list header:9092,worker-1:9092,worker-2:9092 --topic protest # 启动生产者
$ bin/kafka-console-consumer.sh --zookeeper header:2181,worker-1:2181,worker-2:2181 --from-beginning  --topic protest  # 启动消费者

在生产者的命令行里输入,消费者实时显示消费。

ps:可以启动三个kafka服务,可把在 producer.properties 文件中添加3个服务器 metadata.broker.list=header:9092,worker-1:9092,worker-2:9092

Flume

下载安装

$ wget http://archive.apache.org/dist/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz
$ tar xvf apache-flume-1.7.0-bin.tar.gz -C /opt/modules/

如上,下载 flume 到 worker-1。worker-2, header 上。

配置 worker-1 与 worker-2

编辑 flume-env.sh 添加环境变量

export JAVA_HOME=/opt/modules/jdk1.7.0_67

编辑 flume-conf.properties 文件

agent.sources = s1
agent.channels = c1
agent.sinks = k1

agent.sources.s1.type = exec
agent.sources.s1.command = tail -F /opt/datas/weblog-flume.log
agent.sources.s1.channels = c1

agent.channels.c1.type = memory
agent.channels.c1.capacity = 10000
agent.channels.c1.transactionCapacity = 10000
agent.channels.c1.keep-alive=5

agent.sinks.k1.type = avro
agent.sinks.k1.channel = c1
agent.sinks.k1.hostname = header
agent.sinks.k1.port = 55555
  • capacity: 容量大小
  • transactionCapacity: 事务最大容量
  • keep-alive: 数据调优,source 插入到 channel 时等待(channel 空间满了),slink 从 channel 读取数据为空时等待。

配置 header

编辑 flume-env.sh 添加环境变量

export JAVA_HOME=/opt/modules/jdk1.7.0_67
export HADOOP_HOME=/opt/modules/hadoop-2.5.0
export HBASE_HOME=/opt/modules/hbase-0.98.6-hadoop2

编辑 flume-conf.properties 文件

agent.sources = r1
agent.channels = kafkaC hbaseC
agent.sinks = kafkaSink  hbaseSink

agent.sources.r1.type = avro
agent.sources.r1.channels = hbaseC kafkaC
agent.sources.r1.bind = header
agent.sources.r1.port = 55555
agent.sources.r1.threads = 5

#************** flume + hbase *************
agent.channels.hbaseC.type = memory
agent.channels.hbaseC.capacity = 10000
agent.channels.hbaseC.transactionCapacity = 10000
agent.channels.hbaseC.keep-alive = 20

agent.sinks.hbaseSink.type = asynchbase
agent.sinks.hbaseSink.table = weblogs
agent.sinks.hbaseSink.columnFamily = info
agent.sinks.hbaseSink.serializer = org.apache.flume.sink.hbase.KfkAsyncHbaseEventSerializer
agent.sinks.hbaseSink.channel = hbaseC
agent.sinks.hbaseSink.serializer.payloadColumn=datetime,userid,searchname,retorder,cliorder,cliurl

#************** flume + kafak **************
agent.channels.kafkaC.type = memory
agent.channels.kafkaC.capacity = 10000
agent.channels.kafkaC.transactionCapacity = 10000
agent.channels.kafkaC.keep-alive = 20

agent.sinks.kafkaSink.channel = kafkaC
agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafkaSink.brokerList = header:9092,worker-1:9092,worker-2:9092
agent.sinks.kafkaSink.topic = weblogs
agent.sinks.kafkaSink.zookeeperConnect = header:2181,worker-1:2181,worker-2:2181
agent.sinks.kafkaSink.requiredAcks = 1
agent.sinks.kafkaSink.batchSize=1
agent.sinks.kafkaSink.serializer.class = kafka.serializer.StringEncoder

相关参数的优化:

  • agent1.sinks.kafkaSink.requiredAcks:消息的确认
  • agent1.sinks.kafkaSink.batchSize: 给 kafka 推送批量的 msg 数
  • agent1.sinks.kafkaSink.serializer.class: 解析类

修改 flume 源码

下载

$ wget http://archive.apache.org/dist/flume/1.7.0/apache-flume-1.7.0-src.tar.gz

修改源码

SimpleRowKeyGenerator 类新增函数

public static byte[] getKfkRowKey(String userid, String datetime) throws UnsupportedEncodingException {
return (userid + datetime + String.valueOf(System.currentTimeMillis()).getBytes("UTF8");
}

新建 KfkAsyncHbaseEventSerializer 类(拷贝自 SimpleAsyncHbaseEventSerializer),并修改函数 getActions

@Override
public List<PutRequest> getActions() {
    List<PutRequest> actions = new ArrayList<PutRequest>();
    if (payloadColumn != null) {
        byte[] rowKey;
        try {
            String[] columns = new String(this.payloadColumn).split(",");
            String[] values = new String(this.payload).split(",");

            for(int i=0; i<columns.length; i++){
                byte[] colColumn = columns[i].getBytes();
                byte[] colValue = values[i].getBytes();

                if(columns.length != values.length) break;
                String datetime = String.valueOf(values[0]);
                String userid = String.valueOf(values[1]);
                rowKey = SimpleRowKeyGenerator.getKfkRowKey(userid, datetime);
                PutRequest putRequest = new PutRequest(table, rowKey, cf, colColumn, colValue);
                actions.add(putRequest);
            }

        } catch (Exception e) {
            throw new FlumeException("Could not get row key!", e);
        }
    }
    return actions;
}

AsyncHBaseSink 类中的修改 SimpleAsyncHbaseEventSerializer 为 KfkAsyncHbaseEventSerializer

eventSerializerType =
          "org.apache.flume.sink.hbase.KfkAsyncHbaseEventSerializer";

编译打包 jar 包,上传到 header 的 flume 的 lib 目录。

Flume+HBase+Kafka 集成

下载用户查询日志

访问 搜狗实验室用户查询数据,先选用精简版,在 worker-1 和 worker-2 上下载部署查询日志。

$ wget http://download.labs.sogou.com/dl/sogoulabdown/SogouQ/SogouQ.mini.tar.gz
$ tar -xvf SogouQ.mini.tar.gz -C /opt/datas
$ cat SogouQ.sample | tr "\t" "," > weblog2.log  # 将文件中的tab更换成逗号
$ cat weblog2.log | tr " " "," > weblogs.log

编写启动脚本

编写运行模型程序的 shell 脚本

worker-1 和 worker-2 /opt/datas/weblog-flume.sh

#/bin/bash
echo "start log ...."
java -jar /opt/jar/weblogs.jar  /opt/datas/weblogs.log weblog-flume.log

编写 flume 集群服务启动的脚本

flume 主目录下创建文件 flume-kfk-start.sh [worker-1]

#/bin/bash
echo "flume-1 start ....."
bin/flume-ng agent --conf conf -f conf/flume-conf.properties -n agent -Dflume.root.logger=DEBUG,console

flume 主目录下创建文件 flume-kfk-start.sh [worker-2]

#/bin/bash
echo "flume-2 start ....."
bin/flume-ng agent --conf conf -f conf/flume-conf.properties -n agent -Dflume.root.logger=INFO,console

flume 主目录下创建文件 flume-kfk-start.sh [header]

#/bin/bash
echo "flume-3 start ....."
bin/flume-ng agent --conf conf -f conf/flume-conf.properties -n agent -Dflume.root.logger=INFO,console

编写 kafka 消费脚本

在 header、worker-1、worker-2 的 kafka 主目录下都存放 kfk-test-consumer.sh 文件

#/bin/bash
echo "kfk-kafka-comsumer.sh start ....."
bin/kafka-console-consumer.sh -zookeeper header:2181,worker-1:2181,worker-2:2181 -from-beginning -topic weblogs

启动数据采集所有服务

启动 HDFS 服务

$ sbin/hadoop-daemon.sh start namenode  # header
$ sbin/hadoop-daemon.sh start datanode  # header
$ sbin/hadoop-daemon.sh start datanode  # worker-1
$ sbin/hadoop-daemon.sh start datanode  # worker-2

启动 Zookeeper 服务

$ bin/zkServer.sh start  # header
$ bin/zkServer.sh start  # worker-1
$ bin/zkServer.sh start  # worker-2
$ bin/zkServer.sh status  # 查看状态

启动 Hbase 服务

$ bin/hbase-daemon.sh start master  # header
$ bin/hbase-daemon.sh start regionserver # header
$ bin/hbase-daemon.sh start regionserver # worker-1
$ bin/hbase-daemon.sh start regionserver # worker-2

创建业务表

hbase > create 'weblogs', 'info'

启动 Kafka 服务

$ bin/kafka-server-start.sh config/server.properties  # header
$ bin/kafka-server-start.sh config/server.properties  # worker-1
$ bin/kafka-server-start.sh config/server.properties  # worker-2
$ bin/zkCli.sh  # 进入zookeeper客户端

删除之前的 topic

zk> ls /brokers/topics
[protest]
zk> rmr /brokers/topics/protest

创建新的 topic

$ bin/kafka-topics.sh --create --zookeeper header:2181,worker-1:2181,worker-2:2181 --replication-factor 3 --partitions 1 --topic weblogs

启动 Flume-1 服务(worker-1、worker-2)

$ ./flume-kfk-start.sh  # worker-1的flume主目录
$ ./weblog-flume.sh  # worker-1的/opt/datas/中写入实时日志
$ ./flume-kfk-start.sh  # worker-2的flume主目录
$ ./weblog-flume.sh  # worker-2的/opt/datas/中写入实时日志

启动 Flume-3 服务(header)

$ ./flume-kfk-start.sh  # header的flume主目录
$ ./kfk-test-consumer.sh  # worker-1里的kafka消费程序

问题

关闭 hdfs 的安全模式

$ bin/hadoop dfsadmin -safemode leave

flume+kafka 报错

问题:

2018-07-22 22:20:33,733 (kafka-producer-network-thread | producer-1) [ERROR - org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:130)] Uncaught error in kafka producer I/O thread:
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'throttle_time_ms': java.nio.BufferUnderflowException
    at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71)
    at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:439)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
    at java.lang.Thread.run(Thread.java:745)

解决方案:

升级 kafka 版本为 0.9.x

引用

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据

关于作者

疧_╮線

暂无简介

文章
评论
26 人气
更多

推荐作者

櫻之舞

文章 0 评论 0

弥枳

文章 0 评论 0

m2429

文章 0 评论 0

野却迷人

文章 0 评论 0

我怀念的。

文章 0 评论 0

    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文