如何二进制部署Kafka?

Apache-Hadoop

1 基础知识

1.1 事件流

1.1.1 事件流的概念

– 数据流指的是用于传输信息的数字编码信号序列
– 事件流属于数据流,是数据流的一个子集
需要注意的是,
– 事件流与一般的数据流处理差异在于事件流处理通常是异步的
– 事件流源于不同地方、不同类型、不同的接收顺序
– 事件流处理需要使用事件属性、事件发生时间以及可推断因果关系的其他基础元素。

1.1.2 事件流的产生背景

– 传统银行和股票交易领域、互联网监控、无限通讯网络等领域由于持续产生大量数据
– 大量的数据需要接近于实时的方式对更新数据流进行复杂的分析、预测、监控等

1.1.3 事件流的特点

– 事件流中的事件元素在线到达。
– 事件流中系统无法控制将要处理的新到达的时间元素的顺序。
– 事件流模型中查询是相对静止不变的,而数据是时刻变化的
– 事件流的潜在大小是无界的,所以能存储的数据相对事件流就数据大小就非常有限
– 事件流中的某个元素经过处理,要么被丢弃,要么被归档存储(丢弃的元素可能需要再次被访问)

1.2 Apache Kafka

1.2.1 Kafka的介绍

– Kafka是一个事件流处理平台
– Kafka实现端到端的事件流解决方案
– Kafka是一个分布式系统

1.2.2 Kafka的功能

– Kafka支持发布(写入)和订阅(读取)事件流(包括从其他系统持续导入、导出的数据)
– Kafka支持根据需要持久可靠第存储事件流
– Kafka支持在事件发生时或回顾性地处理事件流

1.2.3 Kafka的应用场景

– Kafka可用于实时处理支持和金融交易,例如证券交易所、银行和保险中
– Kafka可用于实时跟踪和监控汽车、卡车、车队和货运,例如在物流和汽车行业
– Kafka可用于持续捕获和分析来自物联网设备或其他设备的传感器数据,例如工厂和电风扇
– Kafka可用于收集并即刻响应客户互动和订单,例如零售、酒店和旅游行业记忆移动应用程序
– Kafka可用于检测住院病人、预测病情变化,确保紧急情况下及时治疗
– Kafka可用于连接、存储和提供公司不同部门产生的数据
– Kafka可用于数据平台、事件驱动框架和微服务的基础

1.2.4 Kafka的特点

– Kafka支持分布式部署、弹性扩展,具有高可用和高度安全性
– Kafka支持裸机硬件、虚拟机、容器部署,同时支持本地和云端部署
– Kafka可跨越多个数据中心或云区域运行

1.2.5 Kafka的事件记录范例

Event key: "Alice"
Event value: "Made a payment of $200 to Bob"
Event timestamp: "Jun. 25, 2020 at 2:06 p.m."

– “Event key”声明事件的键
– “Event value”声明事件的值
– “Event timestamp”声明事件的时间戳

1.2.6 Kafka的架构

– Kafka服务器端,多台服务器组成存储层(池)称为brokers
– Kafka客户端,包括生产者(Producers)和消费者(consumers)
注:服务器与客户端通过高性能的TCP网络协议进行通讯
基本的概念是,
– 包括生产者(Producers),向Kafka发布(写入)事件的客户端程序
– 消费者(consumers),订阅(读取和处理)事件的客户端程序
需要注意的是,
– Kafka生产者和消费者完全解耦且彼此不可知(高可扩展的关键设计)
– Kafka生产者永远不需要等待消费者(由Kafak服务端提供各种保证,例如一次性处理事件的能力)
– Kafka事件被组织并持久地存储在主题(topics)中(主题类似文件夹,事件类似文件夹中的文件)
– Kafka中始终是多生产者和多订阅者(同一主题可有零个以上的生产者和消费者)
– Kafka主题被消费后不会被删除(由主题配置定义事件的保留时间,过期将被丢弃)

1.2.7 Kafka的分区


参阅上图,
– Kafka主题分区(partitioned),使得同一个主题分布存储到Kafka代理上的多分区(上图是4个分区P1-P4)
– Kafka数据的分布式存储是实现伸缩(横向扩展)的基础
– Kafka允许客户端应用程序同时从brokers读取(生产)和写入(消费)数据(上图是两个生产客户端,他们彼此独立)
– Kafka发布新事件到主题时,事件键相同(上图颜色表示)时会被写入同一个分区(实质是事件附加到主题分区之一)
– Kafka始终保证消费者以与写入事件完全相同的顺序读取该分区的事件

1.2.7 Kafka的API

– Kafak以Java和Scala提供五个核心API
– Producer API支持将事件流发布到一个以上的Kafka主题
– Consumer API支持订阅一个以上的Kafak主题
– Kafka Streams API支持实现流处理应用程序和微服务(支持转换、聚合和连接、窗口化、基于事件处理、转换流到新主题等操作)
– Kafka Connect API支持构建和运行可重用的数据IO连接器(连接器支持从外部读写事件流,例捕获PostgreSQL表的更新事件流)

2 最佳实践

2.1 系统的基本配置

2.1.1 配置NTP服务

In hd[19-21],

yum install -y chrony

确认以下时间服务器的配置符合环境需求,

grep ^server /etc/chrony.conf

可见如下服务配置,

server 0.centos.pool.ntp.org iburst
server 1.centos.pool.ntp.org iburst
server 2.centos.pool.ntp.org iburst
server 3.centos.pool.ntp.org iburst

如果不符合请更换为内网的NTP服务器地址,可使用如下命令启动服务并配置自启动

systemctl start chronyd.service
systemctl enable chronyd.service

另外还建议你根据实际配置时区,

timedatectl set-timezone 'Asia/Shanghai'

2.1.2 安装JDK

In hd[19-21],
请参阅以下方法安装jdk-8u121-linux-x64,
https://www.cmdschool.org/archives/397
安装完成后,请使用如下命令检查JDK的安装,

java -version

命令显示如下,

java version "1.8.0_121"
Java(TM) SE Runtime Environment (build 1.8.0_121-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.121-b13, mixed mode)

2.2 软件环境配置

2.2.1 下载安装包

cd ~
wget https://dlcdn.apache.org/kafka/3.0.0/kafka_2.13-3.0.0.tgz  --no-check-certificate

另外,如果需要其他版本请从如下链接下载,
https://dlcdn.apache.org/kafka/

2.2.2 解压安装包

cd ~
tar -xf kafka_2.13-3.0.0.tgz

2.3 部署软件包

2.3.1 配置运行用户

groupadd kafka
useradd -g kafka -d /var/lib/kafka kafka

2.2.3 部署安装包

cd ~
mv kafka_2.13-3.0.0 /usr/

部署完成后,建议创建以下目录便于管理,

mkdir -p /etc/kafka
ln -s /usr/kafka_2.13-3.0.0/config/ /etc/kafka/conf
ln -s /usr/kafka_2.13-3.0.0/logs /var/log/kafka

另外,还需配置目录权限

chown kafka:kafka -R /usr/kafka_2.13-3.0.0
chmod 755 -R /usr/kafka_2.13-3.0.0

2.4 配置软件包

2.4.1 配置软件包的环境变量

vim /etc/profile.d/kafka.sh

加入如下定义,

export KAFKA_HOME=/usr/kafka_2.13-3.0.0
export PATH=${KAFKA_HOME}/bin:$PATH

– 变量“KAFKA_HOME”声明KAFKA的家目录
– 变量“PATH”声明可执行文件的位置(加入KAFKA执行文件的声明)
创建完成后,你需要使用如下命令导入环境变量,

source /etc/profile.d/kafka.sh

2.4.2 修改zookeeper的定义

vim /etc/kafka/conf/zookeeper.properties

参数修改如下,

dataDir=/var/lib/zookeeper
clientPort=2181
maxClientCnxns=0
admin.enableServer=false

根据配置,我们需要创建Zookeeper的数据存储目录,

mkdir -p /var/lib/zookeeper
chown kafka:kafka /var/lib/zookeeper
chmod 775 /var/lib/zookeeper

2.4.3 测试zookeeper启动

su - kafka
/usr/kafka_2.13-3.0.0/bin/zookeeper-server-start.sh /etc/kafka/conf/zookeeper.properties

以上按下【Ctrl+c】终止程序运行,程序启动后,可使用如下命令查看程序倾听的端口,

netstat -anp | grep java

可见如下信息,

tcp6       0      0 :::39775                :::*                    LISTEN      13907/java
tcp6       0      0 :::2181                 :::*                    LISTEN      13907/java
unix  2      [ ]         STREAM     CONNECTED     129728   13907/java
unix  2      [ ]         STREAM     CONNECTED     129731   13907/java

2.4.4 修改Kafka的定义

vim /etc/kafka/conf/server.properties

参数修改如下,

broker.id=0
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0

根据配置的需求,我们需要创建如下文件夹,

mkdir -p /data/kafka
chown kafka:kafka -R /data/kafka
chmod 755 -R /data/kafka

2.4.5 测试Kafka启动

su - kafka
/usr/kafka_2.13-3.0.0/bin/kafka-server-start.sh /etc/kafka/conf/server.properties

以上按下【Ctrl+c】终止程序运行,程序启动后,可使用如下命令查看程序倾听的端口,

netstat -anp | grep java

可见如下信息,

tcp6       0      0 :::43066                :::*                    LISTEN      15564/java
tcp6       0      0 :::34145                :::*                    LISTEN      15978/java
tcp6       0      0 :::9092                 :::*                    LISTEN      15978/java
tcp6       0      0 :::2181                 :::*                    LISTEN      15564/java
tcp6       0      0 127.0.0.1:2181          127.0.0.1:43684         ESTABLISHED 15564/java
tcp6       0      0 127.0.0.1:59590         127.0.0.1:9092          ESTABLISHED 15978/java
tcp6       0      0 127.0.0.1:9092          127.0.0.1:59590         ESTABLISHED 15978/java
tcp6       0      0 127.0.0.1:43684         127.0.0.1:2181          ESTABLISHED 15978/java
unix  2      [ ]         STREAM     CONNECTED     133093   15564/java
unix  2      [ ]         STREAM     CONNECTED     134408   15978/java
unix  2      [ ]         STREAM     CONNECTED     134405   15978/java
unix  2      [ ]         STREAM     CONNECTED     133090   15564/java

2.4.6 配置启动脚本

vim /usr/lib/systemd/system/zookeeper.service

加入如下配置,

[Unit]
Description=Apache Zookeeper manager
Wants=network.target
Before=network.target
After=network-pre.target
Documentation=https://zookeeper.apache.org/doc/current/index.html

[Service]
Type=simple
User=kafka
WorkingDirectory=/var/lib/kafka
ExecStart=/bin/sh -c '. /etc/profile;/usr/kafka_2.13-3.0.0/bin/zookeeper-server-start.sh /etc/kafka/conf/zookeeper.properties'
ExecStop=/bin/sh -c '. /etc/profile;/usr/kafka_2.13-3.0.0/bin/zookeeper-server-stop.sh /etc/kafka/conf/zookeeper.properties'

[Install]
WantedBy=multi-user.target

以上是zookeeper的启动控制脚本,以下创建kafka的启动控制脚本,

vim /usr/lib/systemd/system/kafka.service

加入如下配置,

[Unit]
Description=Apache Kafka manager
Wants=network.target zookeeper.service
After=network.target zookeeper.service
Documentation=https://kafka.apache.org/documentation/#gettingStarted

[Service]
Type=simple
User=kafka
WorkingDirectory=/var/lib/kafka
ExecStart=/bin/sh -c '. /etc/profile;/usr/kafka_2.13-3.0.0/bin/kafka-server-start.sh /etc/kafka/conf/server.properties'
ExecStop=/bin/sh -c '. /etc/profile;/usr/kafka_2.13-3.0.0/bin/kafka-server-stop.sh /etc/kafka/conf/server.properties'

[Install]
WantedBy=multi-user.target

脚本创建后,需要使用如下命令重载使配置生效,

systemctl daemon-reload

然后可以使用如下命令服务控制,

systemctl start zookeeper.service
systemctl restart zookeeper.service
systemctl stop zookeeper.service
systemctl status zookeeper.service

systemctl start kafka.service
systemctl restart kafka.service
systemctl stop kafka.service
systemctl status kafka.service

然后可以使用如下命令设置服务自动启动,

systemctl enable zookeeper.service
systemctl enable kafka.service

2.4.7 测试Kafka

kafka-topics.sh --create --partitions 1 --replication-factor 1 --topic quickstart-events --bootstrap-server localhost:9092

创建成功会收到如下提示,

Created topic quickstart-events.

以上创建主题,然后你可以使用如下命令查询主题,

kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092

可见如下显示,

Topic: quickstart-events        TopicId: EHZjDqn2S2CnnKlxeYVWkA PartitionCount: 1       ReplicationFactor: 1    Configs: segment.bytes=1073741824
        Topic: quickstart-events        Partition: 0    Leader: 0       Replicas: 0     Isr: 0

把事件写入主题,

kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
>This is my first event
>This is my second event

以上按下【Ctrl+c】即可结束,然后你可以使用如下命令查询事件,

kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092

可见如下显示,

This is my first event
This is my second event

以上按下【Ctrl+c】即可结束

参阅文档
===========

官方首页
————
https://kafka.apache.org/

安装文档
————
https://kafka.apache.org/quickstart
https://kafka.apache.org/documentation/#gettingStarted
https://github.com/apache/kafka

事件流的概念
————-
https://baike.baidu.com/item/%E4%BA%8B%E4%BB%B6%E6%B5%81%E5%A4%84%E7%90%86/18748665?fr=aladdin

没有评论

发表回复

Apache-Hadoop
如何安装Kafka connect mqtt?

1 前言 一个问题,一篇文章,一出故事。 我们配置好Kafka connect集群后,我们来尝试安装 …

Apache-Hadoop
如何配置Kafka connect集群?

1 基础知识 1.1 Kafka Connect的介绍 – Kafak Connect是 …

Apache-Hadoop
如何二进制部署CMAK?

1 基础知识 1.1 CMAK 1.1.1 CMAK的介绍 – CMAK原称Kafka …