如何配置Kafka connect集群?
- By : Will
- Category : Apache-Hadoop
1 基础知识
1.1 Kafka Connect的介绍
– Kafak Connect是一个框架,支持将数据流导入和导出Apache Kafka
– Kafka Connect是Apache Kafka一个免费、开源组件
– Kafka Connect支持与数据库、健值存储、搜索索引和文件系统之间简单集成
1.2 Kafka Connect的作用
– Kafka Connect用于Apache Kafka与其他系统之间可扩展且可靠传输数据流的工具
– Kafak Connect支持大量的数据流入和流出Kakfa
– Kafka Connect支持收集数据库或应用服务器的事件流到Kafka的主题并进行低延迟流处理
– Kafka Connect支持导出数据流到辅助存储、查询系统或批处理系统进行离线分析
1.3 Kafka Connect的特点
– 以数据为中心的管道(类似Linux链接各个进程间的管道)
– 灵活和可伸缩(单台到多台机器群集都支持)
– 可重用性和可扩展性(流式管道可服用和支持分布式群集)
1.4 Kafka Connect的架构
– 独立模式,所有流处理都在一个进程中执行,建议用于测试
– 分布模式,支持负载平衡和横向扩展
1.5 Kafka Connect的工作模式
– 源连接器(Source connector)从数据库中的表或应用程序服务器中收集数据并以流形式传输到Kafka主题
– 导槽连接器(Sink Connector),支持从Kafka主题传输数据到二级索引(如Elasticsearch或Hadoop等批处理系统)进行离线分析
1.6 Kafka Connect的插件介绍
– Kafka Connect插件是一组JAR文件
– Kafka Connect插件包含一个或多个连接器、转换或转换器实现
– Kafka Connect插件相互隔离互不影响
– Kafka Connect插件需要确保每个插件只安装一个版本
2 最佳实践
2.1 Kafka集群环境部署
2.2 配置Kafka Connect集群
In hd[19-21],
2.2.1 修改Kafka Connect配置文件
useradd -g kafka -d /var/lib/kafka-connect kafka-connect
2.2.2 修改Kafka Connect配置文件
cp /etc/kafka/conf/connect-distributed.properties /etc/kafka/conf/connect-distributed.properties.default vim /etc/kafka/conf/connect-distributed.properties
修改如下参数,
bootstrap.servers=hd19.cmdschool.org:9092,hd20.cmdschool.org:9092,hd21.cmdschool.org:9092 group.id=connect-cluster key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=true value.converter.schemas.enable=true offset.storage.topic=connect-offsets offset.storage.replication.factor=3 config.storage.topic=connect-configs config.storage.replication.factor=3 status.storage.topic=connect-status status.storage.replication.factor=3 listeners=HTTP://:8083 offset.flush.interval.ms=10000 plugin.path=/usr/share/java
根据以上配置,我们需要创建如下文件夹,
mkdir -p /usr/share/java
然后,我们可以使用如下命令测试服务启动,
su - kafka /usr/kafka_2.13-3.0.0/bin/connect-distributed.sh /etc/kafka/conf/connect-distributed.properties
以上按下【Ctrl+c】终止程序运行,程序启动后,可使用如下命令查看程序倾听的端口,
netstat -antp | grep `pgrep -u kafka-connect java` | grep :::
可见如下信息,
tcp6 0 0 :::8083 :::* LISTEN 3661/java tcp6 0 0 :::39672 :::* LISTEN 3661/java
2.2.3 配置启动脚本
vim /usr/lib/systemd/system/kafka-connect.service
加入如下配置,
[Unit] Description=Apache Kafka connect manager Wants=network.target kafka.service After=network.target kafka.service Documentation=https://kafka.apache.org/documentation/#connect [Service] Type=simple User=kafka-connect WorkingDirectory=/var/lib/kafka-connect ExecStart=/bin/sh -c '. /etc/profile;/usr/kafka_2.13-3.0.0/bin/connect-distributed.sh /etc/kafka/conf/connect-distributed.properties' ExecStop=/bin/sh -c '. /etc/profile;/usr/kafka_2.13-3.0.0/bin/connect-distributed.sh /etc/kafka/conf/connect-distributed.properties' [Install] WantedBy=multi-user.target
脚本创建后,需要使用如下命令重载使配置生效,
systemctl daemon-reload
然后可以使用如下命令服务控制,
systemctl start kafka-connect.service systemctl restart kafka-connect.service systemctl stop kafka-connect.service systemctl status kafka-connect.service
然后可以使用如下命令设置服务自动启动,
systemctl enable kafka-connect.service
2.2.4 开放应用所需的端口
firewall-cmd --permanent --add-port 8083/tcp firewall-cmd --reload firewall-cmd --list-all
2.2.5 测试服务访问
yum install -y yajl
以上安装Json格式化工具,然后我们使用如下命令测试,
curl -s http://hd19.cmdschool.org:8083 | json_reformat curl -s http://hd20.cmdschool.org:8083 | json_reformat curl -s http://hd21.cmdschool.org:8083 | json_reformat
可见如下返回信息,
{ "version": "3.0.0", "commit": "8cb0a5e9d3441962", "kafka_cluster_id": "N__UTzE3RXq1GK_9W1nVwA" }
2.3 安装kafka-connect插件
参阅文档
=================
Apache Kafka 连接器
——————–
https://kafka.apache.org/documentation/#connect
没有评论