How do you deploy an Apache Kafka 3.0.0 cluster on Oracle Linux 10.x?
- By : Will
- Category : Apache-Hadoop
1 Fundamentals
2 Best Practices
2.1 Configure the ZooKeeper Cluster
In hd0[1-3],
IP Address = 10.168.0.10[1-3]
OS = Oracle Linux 10.x x86_64
Host Name = hd0[1-3].cmdschool.org
The detailed role assignment is as follows,
Apache Hadoop ZooKeeper (zoo) = hd0[1-3].cmdschool.org
Note that if you have not yet configured ZooKeeper, please refer to the separate ZooKeeper deployment guide first.
2.2 Deploy the Software Package
In hd0[1-3],
2.2.1 Download the installation package
cd ~
wget https://archive.apache.org/dist/kafka/3.0.0/kafka_2.13-3.0.0.tgz
If you need a different version, you can download it from the following link,
https://kafka.apache.org/community/downloads/
2.2.2 Extract the installation package
cd ~
tar -xf kafka_2.13-3.0.0.tgz
2.2.3 Create the service user
groupadd kafka
useradd -g kafka -d /var/lib/kafka kafka
2.2.4 Install the package
cd ~
mv kafka_2.13-3.0.0 /usr/
mkdir -p /etc/kafka
ln -s /usr/kafka_2.13-3.0.0/config/ /etc/kafka/conf
ln -s /usr/kafka_2.13-3.0.0/logs/ /var/log/kafka
chown kafka:kafka -R /usr/kafka_2.13-3.0.0
chmod 755 -R /usr/kafka_2.13-3.0.0
2.2.5 Configure kafka public key authentication from the master to the slave nodes
In hd01,
su - kafka
mkdir ~/.ssh
ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa
The commands above generate the key pair; the commands below, run as root, copy the public key to each slave node:
In hd01,
cp /var/lib/kafka/.ssh/id_rsa.pub /var/lib/kafka/.ssh/authorized_keys
chown kafka:kafka -R /var/lib/kafka/.ssh/
chmod 0600 /var/lib/kafka/.ssh/authorized_keys
ssh hd02 mkdir -p /var/lib/kafka/.ssh/
scp /var/lib/kafka/.ssh/id_rsa.pub hd02:/var/lib/kafka/.ssh/authorized_keys
ssh hd02 chown kafka:kafka -R /var/lib/kafka/.ssh/
ssh hd02 chmod 0600 /var/lib/kafka/.ssh/authorized_keys
ssh hd03 mkdir -p /var/lib/kafka/.ssh/
scp /var/lib/kafka/.ssh/id_rsa.pub hd03:/var/lib/kafka/.ssh/authorized_keys
ssh hd03 chown kafka:kafka -R /var/lib/kafka/.ssh/
ssh hd03 chmod 0600 /var/lib/kafka/.ssh/authorized_keys
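The repeated per-node commands above can also be written as a loop. A sketch (same host names and paths as above; it defaults to a dry run that only prints each command, which is an illustration aid not in the original post):

```shell
# Dry run by default: RUN=echo prefixes every command so it is
# printed instead of executed. Set RUN= (empty) to run for real.
RUN=${RUN:-echo}
for h in hd02 hd03; do
    $RUN ssh "$h" mkdir -p /var/lib/kafka/.ssh/
    $RUN scp /var/lib/kafka/.ssh/id_rsa.pub "$h:/var/lib/kafka/.ssh/authorized_keys"
    $RUN ssh "$h" chown kafka:kafka -R /var/lib/kafka/.ssh/
    $RUN ssh "$h" chmod 0600 /var/lib/kafka/.ssh/authorized_keys
done
```

Review the printed commands first, then rerun with `RUN=` to execute them.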
After configuration, you can test it with the following commands,
In hd01,
su - kafka
ssh hd01
ssh hd02
ssh hd03
2.2.6 Configure kafka public key authentication from the slave nodes to the master
In hd01,
su - kafka
scp /var/lib/kafka/.ssh/id_rsa hd02:/var/lib/kafka/.ssh/id_rsa
scp /var/lib/kafka/.ssh/id_rsa hd03:/var/lib/kafka/.ssh/id_rsa
After configuration, be sure to test public key authentication with the following commands,
In hd0[1-3],
su - kafka
ssh hd01
Note: if the logins above complete without prompting for a password, the configuration is done.
2.2.7 Configure the package's environment variables
vim /etc/profile.d/kafka.sh
Add the following definitions,
export JAVA_HOME=/usr/java/jdk1.8.0_121
export KAFKA_HOME=/usr/kafka_2.13-3.0.0
export PATH=${KAFKA_HOME}/bin:$PATH
export KAFKA_MASTER=hd01:/usr/kafka_2.13-3.0.0
– The variable "JAVA_HOME" declares the home directory of the JDK
– The variable "KAFKA_HOME" declares the home directory of Kafka
– The variable "PATH" declares where executables are found (here extended with the Kafka binaries)
– The variable "KAFKA_MASTER" declares the rsync source used later to sync the installation from the master node
After creating the file, load the environment variables with the following command,
source /etc/profile.d/kafka.sh
2.3 Configure the Cluster
2.3.1 Configure each node's broker ID
In hd01,
echo 'export BROKER_ID=1' >> /etc/profile.d/kafka.sh
In hd02,
echo 'export BROKER_ID=2' >> /etc/profile.d/kafka.sh
In hd03,
echo 'export BROKER_ID=3' >> /etc/profile.d/kafka.sh
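As a sketch of an alternative (not from the original post): since the host names already encode the node number (hd01, hd02, hd03), the broker ID could be derived instead of hard-coded on each node.

```shell
# Derive the numeric suffix of the host name, e.g. hd02 -> 2.
# The host name is hard-coded here for illustration; on a real
# node you would use "$HOSTNAME" instead.
host=hd02.cmdschool.org
id=$(echo "$host" | sed -E 's/^hd0*([0-9]+)\..*/\1/')
echo "export BROKER_ID=$id"
```

The echoed line could then be appended to /etc/profile.d/kafka.sh exactly as in the per-node commands above.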
2.3.2 Modify the master node configuration
In hd01,
cp /etc/kafka/conf/server.properties /etc/kafka/conf/server.properties.default
vim /etc/kafka/conf/server.properties
Modify the configuration as follows,
listeners=PLAINTEXT://0.0.0.0:9092
log.dirs=/data/kafka
num.partitions=3
log.retention.hours=168
zookeeper.connect=hd01.cmdschool.org:2181,hd02.cmdschool.org:2181,hd03.cmdschool.org:2181
default.replication.factor=3
message.max.bytes=16777216
min.insync.replicas=2
To match the log.dirs setting above, we need to create the following directory,
In hd0[1-3],
mkdir -p /data/kafka
chown kafka:kafka -R /data/kafka
chmod 755 -R /data/kafka
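Two of the settings above are worth a sanity check: log.retention.hours=168 is exactly 7 days, and default.replication.factor=3 combined with min.insync.replicas=2 lets the cluster keep accepting fully-acknowledged writes with one broker down. A self-contained sketch that reads the retention back from a properties-style file (written to a temp path here, so no real config is touched):

```shell
# Write a minimal server.properties-style file to a temp path,
# then parse log.retention.hours and convert it to days.
cfg=$(mktemp)
cat > "$cfg" <<'EOF'
log.retention.hours=168
default.replication.factor=3
min.insync.replicas=2
EOF
hours=$(awk -F= '$1 == "log.retention.hours" { print $2 }' "$cfg")
echo "retention: $hours hours = $((hours / 24)) days"
rm -f "$cfg"
```

The same awk one-liner works against /etc/kafka/conf/server.properties if you want to verify the live file.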
2.3.3 Manually sync the master node configuration
In hd0[1-3],
su - kafka -c 'rsync -a -e ssh --delete $KAFKA_MASTER/ "$KAFKA_HOME"'
2.3.4 Manually start the service
In hd0[1-3],
su - kafka -c '. /etc/profile;/usr/kafka_2.13-3.0.0/bin/kafka-server-start.sh /etc/kafka/conf/server.properties --override broker.id=${BROKER_ID} --override advertised.listeners=PLAINTEXT://${HOSTNAME}:9092'
If you see warnings like the following,
egrep: warning: egrep is obsolescent; using grep -E
egrep: warning: egrep is obsolescent; using grep -E
#...
you can try the following commands to silence them,
cp /usr/kafka_2.13-3.0.0/bin/kafka-run-class.sh /usr/kafka_2.13-3.0.0/bin/kafka-run-class.sh.default
sed -i 's/egrep/grep -E/g' /usr/kafka_2.13-3.0.0/bin/kafka-run-class.sh
If you see the following message, the broker started normally,
#...
[2026-03-18 13:04:48,886] INFO [KafkaServer id=1] started (kafka.server.KafkaServer)
#...
Once the service is up, you can check the running process with the following command,
pgrep -u kafka java -a
You should see output like the following,
163189 /usr/java/jdk1.8.0_121/bin/java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xloggc:/usr/kafka_2.13-3.0.0/bin/../logs/kafkaServer-gc.log -Dkafka.logs.dir=/usr/kafka_2.13-3.0.0/bin/../logs -Dlog4j.configuration=file:/usr/kafka_2.13-3.0.0/bin/../config/log4j.properties -cp ...(full Kafka classpath omitted for brevity)... kafka.Kafka /etc/kafka/conf/server.properties --override broker.id=1 --override advertised.listeners=PLAINTEXT://hd01.cmdschool.org:9092
Then you can check which ports the service is listening on with the following command,
ss -antp | grep -f <(pgrep -u kafka java)
You should see output like the following,
LISTEN 0 50 *:9092 *:* users:(("java",pid=163189,fd=130))
LISTEN 0 50 *:37325 *:* users:(("java",pid=163189,fd=111))
ESTAB 0 0 [::ffff:10.168.0.101]:37592 [::ffff:10.168.0.101]:2181 users:(("java",pid=163189,fd=122))
ESTAB 0 0 [::ffff:10.168.0.101]:9092 [::ffff:10.168.0.101]:54968 users:(("java",pid=163189,fd=150))
ESTAB 0 0 [::ffff:10.168.0.101]:54968 [::ffff:10.168.0.101]:9092 users:(("java",pid=163189,fd=149))
Based on the listening ports above, you may need to open the firewall port with the following commands,
firewall-cmd --permanent --add-port 9092/tcp
firewall-cmd --reload
firewall-cmd --list-all
If startup fails, we recommend checking the following log,
tail -f /var/log/kafka/server.log
2.3.5 Configure the startup script
In hd0[1-3],
vim /usr/lib/systemd/system/kafka.service
Add the following configuration,
[Unit]
Description=Apache Kafka manager
Wants=network.target
After=network.target network-online.target
Requires=network-online.target
Documentation=https://kafka.apache.org/documentation/#gettingStarted
[Service]
Type=simple
User=kafka
Group=kafka
Environment="JAVA_HOME=/usr/java/jdk1.8.0_121"
Environment="KAFKA_HOME=/usr/kafka_2.13-3.0.0"
Environment="KAFKA_MASTER=hd01:/usr/kafka_2.13-3.0.0"
Environment="BROKER_ID=1"
WorkingDirectory=/var/lib/kafka
ExecStartPre=rsync -a -e ssh --delete $KAFKA_MASTER/ "$KAFKA_HOME"
ExecStart=/usr/kafka_2.13-3.0.0/bin/kafka-server-start.sh /etc/kafka/conf/server.properties --override broker.id=${BROKER_ID} --override advertised.listeners=PLAINTEXT://%H:9092
ExecStop=/usr/bin/kill -SIGINT $MAINPID
Restart=on-failure
RestartSec=5s
LimitNOFILE=16384
LimitNPROC=16384
[Install]
WantedBy=multi-user.target
Note that you must set the BROKER_ID value appropriately for each server. After creating the unit file, reload systemd so the configuration takes effect,
systemctl daemon-reload
Then you can start the service and enable it at boot with the following commands,
systemctl start kafka.service
systemctl status kafka.service
systemctl enable kafka.service
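Once all three brokers are running, one way to confirm that they have all joined the cluster is to list the broker IDs registered in ZooKeeper, using the zookeeper-shell.sh tool that ships with Kafka (host name as above; brokers register themselves under the /brokers/ids znode):

```shell
# List the broker IDs currently registered in ZooKeeper;
# a healthy three-node cluster should show all of 1, 2 and 3.
zookeeper-shell.sh hd01.cmdschool.org:2181 ls /brokers/ids
```

This requires a running cluster, so treat it as an operational check rather than a standalone script.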
2.4 Test Kafka
In hd01,
kafka-topics.sh --create --partitions 1 --replication-factor 3 --topic quickstart-events --bootstrap-server hd01.cmdschool.org:9092
On success you will see the following message,
Created topic quickstart-events.
The command above creates a topic; you can then query it with the following command,
kafka-topics.sh --describe --topic quickstart-events --bootstrap-server hd02.cmdschool.org:9092
You should see output like the following,
Topic: quickstart-events TopicId: D6ASlFuoTl2iNhPzed985w PartitionCount: 1 ReplicationFactor: 3 Configs: min.insync.replicas=2,max.message.bytes=16777216
Topic: quickstart-events Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Write some events into the topic,
kafka-console-producer.sh --topic quickstart-events --bootstrap-server hd03.cmdschool.org:9092
>This is my first event
>This is my second event
Press Ctrl+C to exit, then you can read the events back with the following command,
kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server hd01.cmdschool.org:9092
You should see output like the following,
This is my first event
This is my second event
Press Ctrl+C to exit.
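Beyond the quickstart topic, two further read-only checks may be useful (same bootstrap server as above):

```shell
# List every topic in the cluster.
kafka-topics.sh --list --bootstrap-server hd01.cmdschool.org:9092

# List consumer groups; the console consumer above creates a
# transient console-consumer-* group that may appear here.
kafka-consumer-groups.sh --list --bootstrap-server hd01.cmdschool.org:9092
```

Both commands only read cluster metadata, so they are safe to run at any time; they do require a live cluster.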