如何配置独立的Kafka集群?

Apache-Hadoop

1 基础知识

1.1 事件流

1.1.1 事件流的概念

– 数据流指的是用于传输信息的数字编码信号序列
– 事件流属于数据流,是数据流的一个子集
需要注意的是,
– 事件流与一般的数据流处理差异在于事件流处理通常是异步的
– 事件流源于不同地方、不同类型、不同的接收顺序
– 事件流处理需要使用事件属性、时间发生时间以及可推断因果关系的其他基础元素。

1.1.2 事件流的产生背景

– 传统银行和股票交易领域、互联网监控、无限通讯网络等领域由于持续产生大量数据的数据
– 大量的数据需要接近于实时的方式对更新数据流进行复杂的分析、预测、监控等

1.1.3 事件流的特点

– 事件流中的事件元素在线到达。
– 事件流中系统无法控制将要处理的新到达的时间元素的顺序。
– 事件流模型中查询是相对静止不变的,而数据是时刻变化的
– 事件流相的潜在大小是无界的,所以能存储的数据相对事件流就数据大小就非常有限
– 事件流中的某个元素经过处理,要么被丢弃,要么被归档存储(丢弃的元素可能需要再次被访问)

1.2 Apache Kafka

1.2.1 Kafka的介绍

– Kafka是一个事件流处理平台
– Kafka实现端到端的事件流解决方案
– Kafka是一个分布式系统

1.2.2 Kafka的功能

– Kafka支持发布(写入)和订阅(读取)事件流(包括从其他系统持续导入、导出的数据)
– Kafka支持根据需要持久可靠第存储事件流
– Kafka支持在事件发生时或回顾性地处理事件流

1.2.3 Kafka的应用场景

– Kafka可用于实时处理支持和金融交易,例如证券交易所、银行和保险中
– Kafka可用于实时跟踪和监控汽车、卡车、车队和货运,例如在物流和汽车行业
– Kafka可用于持续捕获和分析来自物联网设备或其他设备的传感器数据,例如工厂和电风扇
– Kafka可用于收集并即刻响应客户互动和订单,例如零售、酒店和旅游行业记忆移动应用程序
– Kafka可用于检测住院病人、预测病情变化,确保紧急情况下及时治疗
– Kafka可用于连接、存储和提供公司不同部门产生的数据
– Kafka可用于数据平台、事件驱动框架和微服务的基础

1.2.4 Kafka的特点

– Kafka支持分布式部署、弹性扩展,具有高可用和高度安全性
– Kafka支持裸机硬件、虚拟机、容器部署,同时支持本地和云端部署
– Kafka可跨越多个数据中心或云区域运行

1.2.5 Kafka的事件记录范例

Event key: "Alice"
Event value: "Made a payment of $200 to Bob"
Event timestamp: "Jun. 25, 2020 at 2:06 p.m."

– “Event key”声明事件的键
– “Event value”声明事件的值
– “Event timestamp”声明事件的时间戳

1.2.6 Kafka的架构

– Kafka服务器端,多台服务器组成存储层(池)称为brokers
– Kafka客户端,包括生产者(Producers)和消费者(consumers)
注:服务器与客户端通过高性能的TCP网络协议进行通讯
基本的概念是,
– 包括生产者(Producers),向Kafka发布(写入)事件的客户端程序
– 消费者(consumers),订阅(读取和处理)事件的客户端程序
需要注意的是,
– Kafka生产者和消费者完全解耦且彼此不可知(高可扩展的关键设计)
– Kafka生产者永远不需要等待消费者(由Kafak服务端提供各种保证,例如一次性处理事件的能力)
– Kafka事件被组织并持久地存储在主题(topics)中(主题类似文件夹,事件类似文件夹中的文件)
– Kafka中始终是多生产者和多订阅者(同一主题可有零个以上的生产者和消费者)
– Kafka主题被消费后不会被删除(由主题配置定义事件的保留时间,过期将被丢弃)

1.2.7 Kafka的分区


参阅上图,
– Kafka主题分区(partitioned),使得同一个主题分布存储到Kafka代理上的多分区(上图是4个分区P1-P4)
– Kafka数据的分布式存储是实现伸缩(横向扩展)的基础
– Kafka允许客户端应用程序同时从brokers读取(生产)和写入(消费)数据(上图是两个生产客户端,他们彼此独立)
– Kafka发布新事件到主题时,事件键相同(上图颜色表示)时会被写入同一个分区(实质是事件附加到主题分区之一)
– Kafka始终保证消费者以与写入事件完全相同的顺序读取该分区的事件

1.2.7 Kafka的API

– Kafak以Java和Scala提供五个核心API
– Producer API支持将事件流发布到一个以上的Kafka主题
– Consumer API支持订阅一个以上的Kafak主题
– Kafka Streams API支持实现流处理应用程序和微服务(支持转换、聚合和连接、窗口化、基于事件处理、转换流到新主题等操作)
– Kafka Connect API支持构建和运行可重用的数据IO连接器(连接器支持从外部读写事件流,例捕获PostgreSQL表的更新事件流)

2 最佳实践

2.1 环境配置

2.1.1 Kafka主机信息

hostname = hd[19-21].cmdschool.org
ipaddress = 10.168.0.1[19-21] OS = centOS 7.x x86_64

2.1.2 配置名称解析

In hd[19-21],

echo "10.168.0.119 hd19 hd19.cmdschool.org" >> /etc/hosts
echo "10.168.0.120 hd20 hd20.cmdschool.org" >> /etc/hosts
echo "10.168.0.121 hd21 hd21.cmdschool.org" >> /etc/hosts

注:以上仅用于测试,生产环境请使用DNS代替

2.1.3 公钥认证

In hd19,

ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa
ssh-copy-id -i ~/.ssh/id_rsa.pub root@hd19
ssh-copy-id -i ~/.ssh/id_rsa.pub root@hd20
ssh-copy-id -i ~/.ssh/id_rsa.pub root@hd21

如果你不理解什么是公钥认证,请参阅如下章节,

如何部署公钥认证?

2.1.4 配置NTP服务

In hd[19-21],

yum install -y chrony

确认以下时间服务器的配置符合环境需求,

grep ^server /etc/chrony.conf

可见如下服务配置,

server 0.centos.pool.ntp.org iburst
server 1.centos.pool.ntp.org iburst
server 2.centos.pool.ntp.org iburst
server 3.centos.pool.ntp.org iburst

如果不符合请更换为内网的NTP服务器地址,可使用如下命令启动服务并配置自启动

systemctl start chronyd.service
systemctl enable chronyd.service

另外还建议你根据实际配置时区,

timedatectl set-timezone 'Asia/Shanghai'

2.1.5 安装JDK

In hd[19-21],
请参阅以下方法安装jdk-8u121-linux-x64,
https://www.cmdschool.org/archives/397
安装完成后,请使用如下命令检查JDK的安装,

java -version

命令显示如下,

java version "1.8.0_121"
Java(TM) SE Runtime Environment (build 1.8.0_121-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.121-b13, mixed mode)

2.2 配置Zookeeper集群

In hd[19-21],
请参阅如下链接配置Zookeeper集群环境,

如何部署Apache ZooKeeper 3.6.x集群?

2.3 部署软件包

In hd[19-21],

2.3.1 下载安装包

cd ~
wget https://dlcdn.apache.org/kafka/3.0.0/kafka_2.13-3.0.0.tgz  --no-check-certificate

另外,如果需要其他版本请从如下链接下载,
https://dlcdn.apache.org/kafka/

2.3.2 解压安装包

cd ~
tar -xf kafka_2.13-3.0.0.tgz

2.3.3 配置运行用户

groupadd kafka
useradd -g kafka -d /var/lib/kafka kafka

2.3.4 部署安装包

cd ~
mv kafka_2.13-3.0.0 /usr/

部署完成后,建议创建以下目录便于管理,

mkdir -p /etc/kafka
ln -s /usr/kafka_2.13-3.0.0/config/ /etc/kafka/conf
ln -s /usr/kafka_2.13-3.0.0/logs/ /var/log/kafka

另外,还需配置目录权限

chown kafka:kafka -R /usr/kafka_2.13-3.0.0
chmod 755 -R /usr/kafka_2.13-3.0.0

2.3.5 配置软件包的环境变量

vim /etc/profile.d/kafka.sh

加入如下定义,

export KAFKA_HOME=/usr/kafka_2.13-3.0.0
export PATH=${KAFKA_HOME}/bin:$PATH

– 变量“KAFKA_HOME”声明KAFKA的家目录
– 变量“PATH”声明可执行文件的位置(加入KAFKA执行文件的声明)
创建完成后,你需要使用如下命令导入环境变量,

source /etc/profile.d/kafka.sh

2.3.5 配置启动脚本

vim /usr/lib/systemd/system/kafka.service

加入如下配置,

[Unit]
Description=Apache Kafka manager
Wants=network.target zoo.service
After=network.target zoo.service
Documentation=https://kafka.apache.org/documentation/#gettingStarted

[Service]
Type=simple
User=kafka
WorkingDirectory=/var/lib/kafka
ExecStart=/bin/sh -c '. /etc/profile;/usr/kafka_2.13-3.0.0/bin/kafka-server-start.sh /etc/kafka/conf/server.properties'
ExecStop=/bin/sh -c '. /etc/profile;/usr/kafka_2.13-3.0.0/bin/kafka-server-stop.sh /etc/kafka/conf/server.properties'

[Install]
WantedBy=multi-user.target

以上启动依赖zookeeper请根据实际需要配置,脚本创建后,需要使用如下命令重载使配置生效,

systemctl daemon-reload

然后可以使用如下命令服务控制,

systemctl start kafka.service
systemctl restart kafka.service
systemctl stop kafka.service
systemctl status kafka.service

如果服务启动后,你可以使用如下命令查看端口的启动情况,

netstat -antp | grep `pgrep -u kafka java` | grep :::

可见如下显示,

tcp6       0      0 :::41811                :::*                    LISTEN      4568/java
tcp6       0      0 :::36918                :::*                    LISTEN      4568/java
tcp6       0      0 :::9092                 :::*                    LISTEN      4568/java
tcp6       0      0 :::9988                 :::*                    LISTEN      4568/java

然后可以使用如下命令设置服务自动启动,

systemctl enable kafka.service

2.3.6 开放应用所需的端口

In hd19,

firewall-cmd --permanent --add-port 9092/tcp
firewall-cmd --reload
firewall-cmd --list-all

2.4 配置Kafka集群

2.4.1 修改19号节点配置

In hd19,

cp /etc/kafka/conf/server.properties /etc/kafka/conf/server.properties.default
vim /etc/kafka/conf/server.properties

配置修改如下,

broker.id=19
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://hd19.cmdschool.org:9092
log.dirs=/data/kafka
num.partitions=3
log.retention.hours=168
zookeeper.connect=hd19.cmdschool.org:2181,hd20.cmdschool.org:2181,hd21.cmdschool.org:2181
default.replication.factor=3
message.max.bytes=16777216
min.insync.replicas=2

根据配置的需求,我们需要创建如下文件夹,

mkdir -p /data/kafka
chown kafka:kafka -R /data/kafka
chmod 755 -R /data/kafka

2.4.2 修改20号节点配置

In hd20,

cp /etc/kafka/conf/server.properties /etc/kafka/conf/server.properties.default
vim /etc/kafka/conf/server.properties

配置修改如下,

broker.id=20
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://hd20.cmdschool.org:9092
log.dirs=/data/kafka
num.partitions=3
log.retention.hours=168
zookeeper.connect=hd19.cmdschool.org:2181,hd20.cmdschool.org:2181,hd21.cmdschool.org:2181
default.replication.factor=3
message.max.bytes=16777216
min.insync.replicas=2

根据配置的需求,我们需要创建如下文件夹,

mkdir -p /data/kafka
chown kafka:kafka -R /data/kafka
chmod 755 -R /data/kafka

2.4.3 修改21号节点配置

In hd21,

cp /etc/kafka/conf/server.properties /etc/kafka/conf/server.properties.default
vim /etc/kafka/conf/server.properties

配置修改如下,

broker.id=21
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://hd21.cmdschool.org:9092
log.dirs=/data/kafka
num.partitions=3
log.retention.hours=168
zookeeper.connect=hd19.cmdschool.org:2181,hd20.cmdschool.org:2181,hd21.cmdschool.org:2181
default.replication.factor=3
message.max.bytes=16777216
min.insync.replicas=2

根据配置的需求,我们需要创建如下文件夹,

mkdir -p /data/kafka
chown kafka:kafka -R /data/kafka
chmod 755 -R /data/kafka

2.4.3 重启服务使配置生效

In hd19,

systemctl restart kafka.service
ssh hd20 systemctl restart kafka.service
ssh hd21 systemctl restart kafka.service

然后,我们可以使用如下命令查看服务状态,

systemctl status kafka.service
ssh hd20 systemctl status kafka.service
ssh hd21 systemctl status kafka.service

2.4.4 测试Kafka

In hd19,

kafka-topics.sh --create --partitions 1 --replication-factor 3 --topic quickstart-events --bootstrap-server hd19.cmdschool.org:9092

创建成功会收到如下提示,

Created topic quickstart-events.

以上创建主题,然后你可以使用如下命令查询主题,

kafka-topics.sh --describe --topic quickstart-events --bootstrap-server hd20.cmdschool.org:9092

可见如下显示,

Topic: quickstart-events        TopicId: 7m1AW3MkQ4us6togB60exw PartitionCount: 1       ReplicationFactor: 3    Configs: min.insync.replicas=2,max.message.bytes=16777216
        Topic: quickstart-events        Partition: 0    Leader: 19      Replicas: 19,21,20      Isr: 19,21,20

把事件写入主题,

kafka-console-producer.sh --topic quickstart-events --bootstrap-server hd21.cmdschool.org:9092
>This is my first event
>This is my second event

以上按下【Ctrl+c】即可结束,然后你可以使用如下命令查询事件,

kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server hd19.cmdschool.org:9092

可见如下显示,

This is my first event
This is my second event

以上按下【Ctrl+c】即可结束

参阅文档
===========

官方首页
————
https://kafka.apache.org/

安装文档
————
https://kafka.apache.org/quickstart
https://kafka.apache.org/documentation/#gettingStarted
https://github.com/apache/kafka

事件流的概念
————-
https://baike.baidu.com/item/%E4%BA%8B%E4%BB%B6%E6%B5%81%E5%A4%84%E7%90%86/18748665?fr=aladdin

没有评论

发表回复

Apache-Hadoop
如何安装Kafka connect mqtt?

1 前言 一个问题,一篇文章,一出故事。 我们配置好Kafka connect集群后,我们来尝试安装 …

Apache-Hadoop
如何配置Kafka connect集群?

1 基础知识 1.1 Kafka Connect的介绍 – Kafak Connect是 …

Apache-Hadoop
如何二进制部署CMAK?

1 基础知识 1.1 CMAK 1.1.1 CMAK的介绍 – CMAK原称Kafka …