如何配置Confluent Kafka connect集群?

Confluent

1 基础知识

1.1 Kafka Connect的介绍

– Kafak Connect是一个框架,支持将数据流导入和导出Apache Kafka
– Kafka Connect是Apache Kafka一个免费、开源组件
– Kafka Connect支持与数据库、健值存储、搜索索引和文件系统之间简单集成

1.2 Kafka Connect的作用

– Kafka Connect用于Apache Kafka与其他系统之间可扩展且可靠传输数据流的工具
– Kafak Connect支持大量的数据流入和流出Kakfa
– Kafka Connect支持收集数据库或应用服务器的事件流到Kafka的主题并进行低延迟流处理
– Kafka Connect支持导出数据流到辅助存储、查询系统或批处理系统进行离线分析

1.3 Kafka Connect的特点

– 以数据为中心的管道(类似Linux链接各个进程间的管道)
– 灵活和可伸缩(单台到多台机器群集都支持)
– 可重用性和可扩展性(流式管道可服用和支持分布式群集)

1.4 Kafka Connect的架构

– 独立模式,所有流处理都在一个进程中执行,建议用于测试
– 分布模式,支持负载平衡和横向扩展

1.5 Kafka Connect的工作模式

– 源连接器(Source connector)从数据库中的表或应用程序服务器中收集数据并以流形式传输到Kafka主题
– 导槽连接器(Sink Connector),支持从Kafka主题传输数据到二级索引(如Elasticsearch或Hadoop等批处理系统)进行离线分析

1.6 Kafka Connect的插件介绍

– Kafka Connect插件是一组JAR文件
– Kafka Connect插件包含一个或多个连接器、转换或转换器实现
– Kafka Connect插件相互隔离互不影响
– Kafka Connect插件需要确保每个插件只安装一个版本

1.7 Confluent平台的连接器

– Confluent平台附带几个内置的连接器
– Confluent支持数据流到常用系统或从常用系统导入
– Confluent支持关系型数据库或HDFS
– Confluent详细插件请参阅如下链接,
https://www.confluent.io/product/connectors/?_ga=2.18072048.852791858.1655884896-259847667.1655884896

2 最佳实践

2.1 Kafka集群环境部署

如何部署Confluent Kafka集群?

2.2 配置Kafka Connect集群

In cfkafka0[2-4],

2.2.1 修改Kafka Connect配置文件

cp /etc/kafka/connect-distributed.properties /etc/kafka/connect-distributed.properties.default
vim /etc/kafka/connect-distributed.properties

修改如下参数,

bootstrap.servers=cfkafka02:9092,cfkafka03:9092,cfkafka04:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
config.storage.topic=connect-configs
config.storage.replication.factor=3
status.storage.topic=connect-status
status.storage.replication.factor=3
offset.flush.interval.ms=10000
plugin.path=/usr/share/java

另外,如下参数是配置“控制中心服务”额外添加的,如果有请保留,

consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor

2.2.2 启动Kafka Connect服务

vim /usr/lib/systemd/system/confluent-kafka-connect.service

配置修改如下,

[Unit]
Description=Apache Kafka Connect - distributed
Documentation=http://docs.confluent.io/
After=network.target confluent-server.target

[Service]
Environment="EXTRA_ARGS=-Duser.dir=/data/kafka-connect"
Type=simple
User=cp-kafka-connect
Group=confluent
ExecStart=/usr/bin/connect-distributed /etc/kafka/connect-distributed.properties
TimeoutStopSec=180
Restart=no

[Install]
WantedBy=multi-user.target

根据配置的需求,我们需要使用如下命令创建用户目录,

mkdir -p /data/kafka-connect
chown cp-kafka-connect:confluent -R /data/kafka-connect
chmod 770 -R /data/kafka-connect

然后,你需要使用如下命令重载服务使配置生效,

systemctl daemon-reload

然后,请使用如下命令启动服务并查看服务状态,

systemctl start confluent-kafka-connect
systemctl status confluent-kafka-connect

另外,如果遇到错误,可使用如下命令手动启动调试,

sudo -u cp-kafka-connect /usr/bin/connect-distributed /etc/kafka/connect-distributed.properties

另外,你需要使用如下命令授予运行用户“cp-kafka-connect”的默认组“confluent”写入权限,

chmod 775 -R /var/log/kafka/

这样,你才能使用如下命令查看到Kafka Connect的日志,详细命令如下,

tail /var/log/kafka/connect.log

然后,可以使用如下命令查看启动的进程号和端口号,

pgrep -u cp-kafka-connect java
netstat -antp | grep `pgrep -u cp-kafka-connect java` | grep :::

最后一条命令显示如下,

tcp6       0      0 :::8083                 :::*                    LISTEN      17536/java
tcp6       0      0 :::40347                :::*                    LISTEN      17536/java

另外,我们还需要使用如下命令配置服务自动启动,

systemctl enable confluent-kafka-connect

如果遇到运行中的错误,你可以使用如下命令输出运行日志,

journalctl -u confluent-kafka-connect.service --since "1 hour ago"

详细的进程信息,你可以参考如下命令,

jinfo `pgrep -u cp-kafka-connect java`
pgrep -u cp-kafka-connect java -a

2.3 配置控制中心管理Kafka Connect

In cfkafka01,

2.3.1 修改配置文件

vim /etc/confluent-control-center/control-center-production.properties

加入如下参数,

confluent.controlcenter.connect.connect-cluster.cluster=http://cfkafka02:8083,http://cfkafka03:8083,http://cfkafka04:8083

2.3.2 重启服务使配置生效

systemctl restart confluent-control-center

2.3.3 验证配置

http://cfkafka01.cmdschool.org:9021/
登录“控制中心”界面
单击【Connect Clusters】->【Connect】
界面显示如下,

2.4 安装Kafka Connect插件

In cfkafka0[2-4],

2.4.1 下载插件安装包

wget https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-ftps/versions/1.0.3-preview/confluentinc-kafka-connect-ftps-1.0.3-preview.zip

其他版本请从如下链接下载,
https://www.confluent.io/hub/confluentinc/kafka-connect-ftps

2.4.2 安装插件安装包

confluent-hub install confluentinc-kafka-connect-ftps-1.0.3-preview.zip

详细安装向导如下,

confluent-hub install confluentinc-kafka-connect-ftps-1.0.3-preview.zip
The component can be installed in any of the following Confluent Platform installations:
  1. / (installed rpm/deb package)
  2. / (where this tool is installed)
Choose one of these to continue the installation (1-2): 1
Do you want to install this into /usr/share/confluent-hub-components? (yN) n

Specify installation directory: /usr/share/java

Component's license:
Confluent Software Evaluation License
https://www.confluent.io/software-evaluation-license
I agree to the software license agreement (yN) y

Installing a component Kafka Connect Ftps Connector 1.0.3-preview, provided by Confluent, Inc. from the local file: confluentinc-kafka-connect-ftps-1.0.3-preview.zip into directory: /usr/share/java
Detected Worker's configs:
  1. Standard: /etc/kafka/connect-distributed.properties
  2. Standard: /etc/kafka/connect-standalone.properties
  3. Standard: /etc/schema-registry/connect-avro-distributed.properties
  4. Standard: /etc/schema-registry/connect-avro-standalone.properties
  5. Used by Connect process with PID : /etc/kafka/connect-distributed.properties
Do you want to update all detected configs? (yN) y

Adding installation directory to plugin path in the following files:
  /etc/kafka/connect-distributed.properties
  /etc/kafka/connect-standalone.properties
  /etc/schema-registry/connect-avro-distributed.properties
  /etc/schema-registry/connect-avro-standalone.properties
  /etc/kafka/connect-distributed.properties

Completed

http://cfkafka01.cmdschool.org:9021/
登录“控制中心”界面
单击【Connect Clusters】->【Connect】
界面显示如下,

如上图所示,
单击单击【Connect Clusters】->【Add connector】即可看到需要配置的连接器,

注:具体配置请自行摸索,本章不再详述

参阅文档
====================
连接器的概念
—————–
https://docs.confluent.io/platform/current/connect/concepts.html
https://docs.confluent.io/platform/current/connect/index.html

连接器插件安装
——————
https://docs.confluent.io/platform/current/connect/userguide.html#connect-installing-plugins

https://docs.confluent.io/home/connect/self-managed/userguide.html
https://kafka.apache.org/documentation/#connectconfigs

Apache Kafka 连接器
——————–
https://kafka.apache.org/documentation/#connect

MQTT Connector fails with MqttException(0)
————————
https://support.confluent.io/hc/en-us/articles/8241836881172-MQTT-Connector-fails-with-MqttException-0-

没有评论

发表回复

Confluent
如何更新Kafka topic的Replication Factor?

1 前言 一个问题,一篇文章,一出故事。 笔者部署完Confluent Kafka集群后发现Kafk …

Confluent
如何部署Confluent ksqlDB集群?

1 基础知识 1.1 KSQL 1.1.1 KSQL的概念 – KSQL与SQL数据库完 …

Confluent
如何修复Kafka主题connect-configs报错?

1 前言 一个问题,一篇文章,一出故事。 笔者安装完成Kafka connect后发现Kafka疯狂 …