如何部署Oracle Linux 10.x Apache Kafka 3.0.0集群?

Apache-Hadoop

1 基础知识

如何配置独立的Kafka集群?

2 最佳实践

2.1 配置Zookeeper集群

In hd0[1-3],

IP Address = 10.168.0.10[1-3]
OS = Oracle Linux 10.x x86_64
Host Name = hd0[1-3].cmdschool.org

详细的角色分布如下,

Apache Hadoop ZooKeeper(zoo) = hd0[1-3].cmdschool.org.cmdschool.org

需要注意的是如果你尚未配置ZooKeeper,请参阅

如何部署Oracle Linux 10.x Apache ZooKeeper 3.4.5集群?

2.2 部署软件包

In hd0[1-3],

2.2.1 下载安装包

cd ~
wget https://archive.apache.org/dist/kafka/3.0.0/kafka_2.13-3.0.0.tgz

另外,如果需要其他版本请从如下链接下载,
https://kafka.apache.org/community/downloads/

2.2.2 解压安装包

cd ~
tar -xf kafka_2.13-3.0.0.tgz

2.2.3 配置运行用户

groupadd kafka
useradd -g kafka -d /var/lib/kafka kafka

2.2.4 部署安装包

cd ~
mv kafka_2.13-3.0.0 /usr/
mkdir -p /etc/kafka
ln -s /usr/kafka_2.13-3.0.0/config/ /etc/kafka/conf
ln -s /usr/kafka_2.13-3.0.0/logs/ /var/log/kafka
chown kafka:kafka -R /usr/kafka_2.13-3.0.0
chmod 755 -R /usr/kafka_2.13-3.0.0

2.2.5 配置主到从节点的kafka用户公钥认证

In hd01,

su - kafka
mkdir ~/.ssh
ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa

以上命令创建公钥证书,以下命令利用root复制证书到各个从节点:
In hd01,

cp /var/lib/kafka/.ssh/id_rsa.pub /var/lib/kafka/.ssh/authorized_keys
chown kafka:kafka -R /var/lib/kafka/.ssh/
chmod 0600 /var/lib/kafka/.ssh/authorized_keys

ssh hd02 mkdir -p /var/lib/kafka/.ssh/
scp /var/lib/kafka/.ssh/id_rsa.pub hd02:/var/lib/kafka/.ssh/authorized_keys
ssh hd02 chown kafka:kafka -R /var/lib/kafka/.ssh/
ssh hd02 chmod 0600 /var/lib/kafka/.ssh/authorized_keys

ssh hd03 mkdir -p /var/lib/kafka/.ssh/
scp /var/lib/kafka/.ssh/id_rsa.pub hd03:/var/lib/kafka/.ssh/authorized_keys
ssh hd03 chown kafka:kafka -R /var/lib/kafka/.ssh/
ssh hd03 chmod 0600 /var/lib/kafka/.ssh/authorized_keys

配置完成后,可使用如下命令测试,
In hd01,

su - kafka
ssh hd01
ssh hd02
ssh hd03

2.2.6 配置从到主节点的kafka用户公钥认证

In hd01,

su - kafka
scp /var/lib/kafka/.ssh/id_rsa hd02:/var/lib/kafka/.ssh/id_rsa
scp /var/lib/kafka/.ssh/id_rsa hd03:/var/lib/kafka/.ssh/id_rsa

配置完成或,务必使用如下命令测试公钥认证,
In hd0[1-3],

su - kafka
ssh hd01

注:以上,如果不用输入密码即可完成登录,则配置完成。

2.2.7 配置软件包的环境变量

vim /etc/profile.d/kafka.sh

加入如下定义,

export JAVA_HOME=/usr/java/jdk1.8.0_121
export KAFKA_HOME=/usr/kafka_2.13-3.0.0
export PATH=${KAFKA_HOME}/bin:$PATH
export KAFKA_MASTER=hd01:/usr/kafka_2.13-3.0.0

– 变量“KAFKA_HOME”声明KAFKA的家目录
– 变量“PATH”声明可执行文件的位置(加入KAFKA执行文件的声明)
创建完成后,你需要使用如下命令导入环境变量,

source /etc/profile.d/kafka.sh

2.3 配置集群

2.3.1 配置节点Broker ID

In hd01,

echo 'export BROKER_ID=1' >> /etc/profile.d/kafka.sh

In hd02,

echo 'export BROKER_ID=2' >> /etc/profile.d/kafka.sh

In hd03,

echo 'export BROKER_ID=3' >> /etc/profile.d/kafka.sh

2.3.2 修改主节点配置

In hd01,

cp /etc/kafka/conf/server.properties /etc/kafka/conf/server.properties.default
vim /etc/kafka/conf/server.properties

配置修改如下,

listeners=PLAINTEXT://0.0.0.0:9092
log.dirs=/data/kafka
num.partitions=3
log.retention.hours=168
zookeeper.connect=hd01.cmdschool.org:2181,hd02.cmdschool.org:2181,hd03.cmdschool.org:2181
default.replication.factor=3
message.max.bytes=16777216
min.insync.replicas=2

根据配置的需求,我们需要创建如下文件夹,
In hd0[1-3],

mkdir -p /data/kafka
chown kafka:kafka -R /data/kafka
chmod 755 -R /data/kafka

2.3.3 手动同步主节点配置

In hd0[1-3],

su - kafka -c 'rsync -a -e ssh --delete $KAFKA_MASTER/ "$KAFKA_HOME"'

2.3.4 手动启动服务

In hd0[1-3],

su - kafka -c '. /etc/profile;/usr/kafka_2.13-3.0.0/bin/kafka-server-start.sh /etc/kafka/conf/server.properties --override broker.id=${BROKER_ID} --override advertised.listeners=PLAINTEXT://${HOSTNAME}:9092'

如果你见到如下下警告提示,

egrep: warning: egrep is obsolescent; using grep -E
egrep: warning: egrep is obsolescent; using grep -E
#...

你可以尝试执行如下命令优化,

cp /usr/kafka_2.13-3.0.0/bin/kafka-run-class.sh /usr/kafka_2.13-3.0.0/bin/kafka-run-class.sh.default
sed -i 's/egrep/grep -E/g' /usr/kafka_2.13-3.0.0/bin/kafka-run-class.sh

如果见到如下提示信息则启动正常,

#...
[2026-03-18 13:04:48,886] INFO [KafkaServer id=1] started (kafka.server.KafkaServer)
#...

服务启动后,你可以使用如下命令查看启动的进程,

pgrep -u kafka java -a

可见如下显示,

163189 /usr/java/jdk1.8.0_121/bin/java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xloggc:/usr/kafka_2.13-3.0.0/bin/../logs/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/usr/kafka_2.13-3.0.0/bin/../logs -Dlog4j.configuration=file:/usr/kafka_2.13-3.0.0/bin/../config/log4j.properties -cp .:/usr/java/jdk1.8.0_121/lib:/usr/java/jdk1.8.0_121/jre/lib:/usr/kafka_2.13-3.0.0/bin/../libs/activation-1.1.1.jar:/usr/kafka_2.13-3.0.0/bin/../libs/aopalliance-repackaged-2.6.1.jar:/usr/kafka_2.13-3.0.0/bin/../libs/argparse4j-0.7.0.jar:/usr/kafka_2.13-3.0.0/bin/../libs/audience-annotations-0.5.0.jar:/usr/kafka_2.13-3.0.0/bin/../libs/commons-cli-1.4.jar:/usr/kafka_2.13-3.0.0/bin/../libs/commons-lang3-3.8.1.jar:/usr/kafka_2.13-3.0.0/bin/../libs/connect-api-3.0.0.jar:/usr/kafka_2.13-3.0.0/bin/../libs/connect-basic-auth-extension-3.0.0.jar:/usr/kafka_2.13-3.0.0/bin/../libs/connect-file-3.0.0.jar:/usr/kafka_2.13-3.0.0/bin/../libs/connect-json-3.0.0.jar:/usr/kafka_2.13-3.0.0/bin/../libs/connect-mirror-3.0.0.jar:/usr/kafka_2.13-3.0.0/bin/../libs/connect-mirror-client-3.0.0.jar:/usr/kafka_2.13-3.0.0/bin/../libs/connect-runtime-3.0.0.jar:/usr/kafka_2.13-3.0.0/bin/../libs/connect-transforms-3.0.0.jar:/usr/kafka_2.13-3.0.0/bin/../libs/hk2-api-2.6.1.jar:/usr/kafka_2.13-3.0.0/bin/../libs/hk2-locator-2.6.1.jar:/usr/kafka_2.13-3.0.0/bin/../libs/hk2-utils-2.6.1.jar:/usr/kafka_2.13-3.0.0/bin/../libs/jackson-annotations-2.12.3.jar:/usr/kafka_2.13-3.0.0/bin/../libs/jackson-core-2.12.3.jar:/usr/kafka_2.13-3.0.0/bin/../libs/jackson-databind-2.12.3.jar:/usr/kafka_2.13-3.0.0/bin/../libs/jackson-dataformat-csv-2.12.3.jar:/usr/kafka_2.13-3.0.0/bin/../libs/jackson-datatype-jdk8-2.12.3.jar:/usr/kafka_2.13-3.0.0/bin/../libs/jackson-jaxrs-base-2.12.3.jar:/usr/kafka_2.13-3.0.0/bin/../libs/jackson-jaxrs-json-provider-2.12.3.jar:/usr/kafka_2.13-3.0.0/bin/../libs/jackson-module-jaxb-annotations-2.12.3.jar:/usr/kafka_2.13-3.0.0/bin/../libs/jackson-module-scala_2.13-2.12.3.jar:/usr/kafka_2.13-3.0.0/bin/../libs/jakarta.activation-api-1.2.1.jar:/usr/kafka_2.13-3.0.0/bin/../libs/jakarta.annotation-api-1.3.5.jar:/usr/kafka_2.13-3.0.0/bin/../libs/jakarta.inject-2.6.1.jar:/usr/kafka_2.13-3.0.0/bin/../libs/jakarta.validation-api-2.0.2.jar:/usr/kafka_2.13-3.0.0/bin/../libs/jakarta.ws.rs-api-2.1.6.jar:/usr/kafka_2.13-3.0.0/bin/../libs/jakarta.xml.bind-api-2.3.2.jar:/usr/kafka_2.13-3.0.0/bin/../libs/javassist-3.27.0-GA.jar:/usr/kafka_2.13-3.0.0/bin/../libs/javax.servlet-api-3.1.0.jar:/usr/kafka_2.13-3.0.0/bin/../libs/javax.ws.rs-api-2.1.1.jar:/usr/kafka_2.13-3.0.0/bin/../libs/jaxb-api-2.3.0.jar:/usr/kafka_2.13-3.0.0/bin/../libs/jersey-client-2.34.jar:/usr/kafka_2.13-3.0.0/bin/../libs/jersey-common-2.34.jar:/usr/kafka_2.13-3.0.0/bin/../libs/jersey-container-servlet-2.34.jar:/usr/kafka_2.13-3.0.0/bin/../libs/jersey-container-servlet-core-2.34.jar:/usr/kafka_2.13-3.0.0/bin/../libs/jersey-hk2-2.34.jar:/usr/kafka_2.13-3.0.0/bin/../libs/jersey-server-2.34.jar:/usr/kafka_2.13-3.0.0/bin/../libs/jetty-client-9.4.43.v20210629.jar:/usr/kafka_2.13-3.0.0/bin/../libs/jetty-continuation-9.4.43.v20210629.jar:/usr/kafka_2.13-3.0.0/bin/../libs/jetty-http-9.4.43.v20210629.jar:/usr/kafka_2.13-3.0.0/bin/../libs/jetty-io-9.4.43.v20210629.jar:/usr/kafka_2.13-3.0.0/bin/../libs/jetty-security-9.4.43.v20210629.jar:/usr/kafka_2.13-3.0.0/bin/../libs/jetty-server-9.4.43.v20210629.jar:/usr/kafka_2.13-3.0.0/bin/../libs/jetty-servlet-9.4.43.v20210629.jar:/usr/kafka_2.13-3.0.0/bin/../libs/jetty-servlets-9.4.43.v20210629.jar:/usr/kafka_2.13-3.0.0/bin/../libs/jetty-util-9.4.43.v20210629.jar:/usr/kafka_2.13-3.0.0/bin/../libs/jetty-util-ajax-9.4.43.v20210629.jar:/usr/kafka_2.13-3.0.0/bin/../libs/jline-3.12.1.jar:/usr/kafka_2.13-3.0.0/bin/../libs/jopt-simple-5.0.4.jar:/usr/kafka_2.13-3.0.0/bin/../libs/kafka_2.13-3.0.0.jar:/usr/kafka_2.13-3.0.0/bin/../libs/kafka-clients-3.0.0.jar:/usr/kafka_2.13-3.0.0/bin/../libs/kafka-log4j-appender-3.0.0.jar:/usr/kafka_2.13-3.0.0/bin/../libs/kafka-metadata-3.0.0.jar:/usr/kafka_2.13-3.0.0/bin/../libs/kafka-raft-3.0.0.jar:/usr/kafka_2.13-3.0.0/bin/../libs/kafka-server-common-3.0.0.jar:/usr/kafka_2.13-3.0.0/bin/../libs/kafka-shell-3.0.0.jar:/usr/kafka_2.13-3.0.0/bin/../libs/kafka-storage-3.0.0.jar:/usr/kafka_2.13-3.0.0/bin/../libs/kafka-storage-api-3.0.0.jar:/usr/kafka_2.13-3.0.0/bin/../libs/kafka-streams-3.0.0.jar:/usr/kafka_2.13-3.0.0/bin/../libs/kafka-streams-examples-3.0.0.jar:/usr/kafka_2.13-3.0.0/bin/../libs/kafka-streams-scala_2.13-3.0.0.jar:/usr/kafka_2.13-3.0.0/bin/../libs/kafka-streams-test-utils-3.0.0.jar:/usr/kafka_2.13-3.0.0/bin/../libs/kafka-tools-3.0.0.jar:/usr/kafka_2.13-3.0.0/bin/../libs/log4j-1.2.17.jar:/usr/kafka_2.13-3.0.0/bin/../libs/lz4-java-1.7.1.jar:/usr/kafka_2.13-3.0.0/bin/../libs/maven-artifact-3.8.1.jar:/usr/kafka_2.13-3.0.0/bin/../libs/metrics-core-2.2.0.jar:/usr/kafka_2.13-3.0.0/bin/../libs/metrics-core-4.1.12.1.jar:/usr/kafka_2.13-3.0.0/bin/../libs/netty-buffer-4.1.62.Final.jar:/usr/kafka_2.13-3.0.0/bin/../libs/netty-codec-4.1.62.Final.jar:/usr/kafka_2.13-3.0.0/bin/../libs/netty-common-4.1.62.Final.jar:/usr/kafka_2.13-3.0.0/bin/../libs/netty-handler-4.1.62.Final.jar:/usr/kafka_2.13-3.0.0/bin/../libs/netty-resolver-4.1.62.Final.jar:/usr/kafka_2.13-3.0.0/bin/../libs/netty-transport-4.1.62.Final.jar:/usr/kafka_2.13-3.0.0/bin/../libs/netty-transport-native-epoll-4.1.62.Final.jar:/usr/kafka_2.13-3.0.0/bin/../libs/netty-transport-native-unix-common-4.1.62.Final.jar:/usr/kafka_2.13-3.0.0/bin/../libs/osgi-resource-locator-1.0.3.jar:/usr/kafka_2.13-3.0.0/bin/../libs/paranamer-2.8.jar:/usr/kafka_2.13-3.0.0/bin/../libs/plexus-utils-3.2.1.jar:/usr/kafka_2.13-3.0.0/bin/../libs/reflections-0.9.12.jar:/usr/kafka_2.13-3.0.0/bin/../libs/rocksdbjni-6.19.3.jar:/usr/kafka_2.13-3.0.0/bin/../libs/scala-collection-compat_2.13-2.4.4.jar:/usr/kafka_2.13-3.0.0/bin/../libs/scala-java8-compat_2.13-1.0.0.jar:/usr/kafka_2.13-3.0.0/bin/../libs/scala-library-2.13.6.jar:/usr/kafka_2.13-3.0.0/bin/../libs/scala-logging_2.13-3.9.3.jar:/usr/kafka_2.13-3.0.0/bin/../libs/scala-reflect-2.13.6.jar:/usr/kafka_2.13-3.0.0/bin/../libs/slf4j-api-1.7.30.jar:/usr/kafka_2.13-3.0.0/bin/../libs/slf4j-log4j12-1.7.30.jar:/usr/kafka_2.13-3.0.0/bin/../libs/snappy-java-1.1.8.1.jar:/usr/kafka_2.13-3.0.0/bin/../libs/trogdor-3.0.0.jar:/usr/kafka_2.13-3.0.0/bin/../libs/zookeeper-3.6.3.jar:/usr/kafka_2.13-3.0.0/bin/../libs/zookeeper-jute-3.6.3.jar:/usr/kafka_2.13-3.0.0/bin/../libs/zstd-jni-1.5.0-2.jar kafka.Kafka /etc/kafka/conf/server.properties --override broker.id=1 --override advertised.listeners=PLAINTEXT://hd01.cmdschool.org:9092

然后,你可以使用如下命令查看服务倾听的端口,

ss -antp | grep -f <(pgrep -u kafka java)

可见如下显示,

LISTEN     0      50                          *:9092                       *:*     users:(("java",pid=163189,fd=130))                                       
LISTEN     0      50                          *:37325                      *:*     users:(("java",pid=163189,fd=111))                                       
ESTAB      0      0      [::ffff:10.168.0.101]:37592 [::ffff:10.168.0.101]:2181  users:(("java",pid=163189,fd=122))                                       
ESTAB      0      0      [::ffff:10.168.0.101]:9092  [::ffff:10.168.0.101]:54968 users:(("java",pid=163189,fd=150))                                       
ESTAB      0      0      [::ffff:10.168.0.101]:54968 [::ffff:10.168.0.101]:9092  users:(("java",pid=163189,fd=149))   

根据以上端口倾听的规则,你可能需要使用如下命令开放防火墙端口,

firewall-cmd --permanent --add-port 9092/tcp
firewall-cmd --reload
firewall-cmd --list-all

如果启动异常,我们建议你检查以下日志,

tail -f /var/log/kafka/server.log

2.3.5 配置启动脚本

In hd0[1-3],

vim /usr/lib/systemd/system/kafka.service

加入如下配置,

[Unit]
Description=Apache Kafka manager
Wants=network.target
After=network.target network-online.target
Requires=network-online.target
Documentation=https://kafka.apache.org/documentation/#gettingStarted

[Service]
Type=simple
User=kafka
Group=kafka

Environment="JAVA_HOME=/usr/java/jdk1.8.0_121"
Environment="KAFKA_HOME=/usr/kafka_2.13-3.0.0"
Environment="KAFKA_MASTER=hd01:/usr/kafka_2.13-3.0.0"
Environment="BROKER_ID=1"

WorkingDirectory=/var/lib/kafka
ExecStartPre=rsync -a -e ssh --delete $KAFKA_MASTER/ "$KAFKA_HOME"
ExecStart=/usr/kafka_2.13-3.0.0/bin/kafka-server-start.sh /etc/kafka/conf/server.properties --override broker.id=${BROKER_ID} --override advertised.listeners=PLAINTEXT://%H:9092
ExecStop=/usr/bin/kill -SIGINT $MAINPID

Restart=on-failure
RestartSec=5s
LimitNOFILE=16384
LimitNPROC=16384

[Install]
WantedBy=multi-user.target

请注意,你需要根据实际情况定义每台服务器的BROKER_ID值,脚本创建后,需要使用如下命令重载使配置生效,

systemctl daemon-reload

然后可以使用如下命令启动服务并设置服务自启动,

systemctl start kafka.service
systemctl status kafka.service
systemctl enable kafka.service

2.4 测试Kafka

In hd01,

kafka-topics.sh --create --partitions 1 --replication-factor 3 --topic quickstart-events --bootstrap-server hd01.cmdschool.org:9092

创建成功会收到如下提示,

Created topic quickstart-events.

以上创建主题,然后你可以使用如下命令查询主题,

kafka-topics.sh --describe --topic quickstart-events --bootstrap-server hd02.cmdschool.org:9092

可见如下显示,

Topic: quickstart-events        TopicId: D6ASlFuoTl2iNhPzed985w PartitionCount: 1       ReplicationFactor: 3    Configs: min.insync.replicas=2,max.message.bytes=16777216
        Topic: quickstart-events        Partition: 0    Leader: 1       Replicas: 1,2,3 Isr: 1,2,3

把事件写入主题,

kafka-console-producer.sh --topic quickstart-events --bootstrap-server hd03.cmdschool.org:9092
>This is my first event
>This is my second event

以上按下【Ctrl+c】即可结束,然后你可以使用如下命令查询事件,

kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server hd01.cmdschool.org:9092

可见如下显示,

This is my first event
This is my second event

以上按下【Ctrl+c】即可结束

没有评论

发表回复

Apache-Hadoop
如何解决Apache Kafka启动异常问题?

1 前言 一个问题,一篇文章,一出故事。 今天安装Apache Kafka,不慎给了已经使用的bro …

Apache-Hadoop
如何部署Oracle Linux 10.x Apache HBase 1.2.0集群?

1 基础知识 如何部署HBase的集群? 2 最佳实践 2.1 环境信息 2.1.1 角色Hadoo …

Apache-Hadoop
如何部署Oracle Linux 10.x Apache ZooKeeper 3.4.5集群?

1 基础知识 如何部署Apache ZooKeeper 3.4.x集群? 2 最佳实践 2.1 环境 …