kafka使用个人总结整理,持续更新
命令后追加
一、Kafka后台操作命令
1. 启动Kafka服务:./bin/kafka-server-start.sh config/server.properties
启动主题:执行./bin/kafka-topics.sh命令并运行参数为--create --zookeeper localhost:2181 --replication-factor 1 --partitions 1以及--topic topic_name
3. 查看所有主题:./bin/kafka-topics.sh --list --zookeeper localhost:2181
获取关于特定主题的详细信息:运行./bin/kafka-topics.sh并输入相关参数
启动消息发送任务:指定Kafka控制台生产者脚本文件./bin/kafka-console-producer.sh,并设置Kafka经纪节点列表为localhost:9092以及主题名称为topic_name
接收消息信息:./bin/kafka-console-consumer.sh –启动服务器:localhost:9092 –主题名称:topic_name –自始至终
7. 查看消费者组信息:在二进制目录中运行 Kafka 消费者组管理脚本 ./bin/kafka-consumer-groups.sh,并指定 zookeeper 服务器地址为 localhost:9092 并列出已注册的消费者组
查询指定消费者组的详细配置信息:./bin/kafka-consumer-groups.sh -b localhost:9092 -d -g group_name
二、Kafka集群搭建步骤
1. 安装Zookeeper和Kafka
2. 修改Kafka配置文件config/server.properties中的以下配置:
broker node identifier = 0 # 确保每个节点的 broker node identifier 唯一
Listen on PLAINTEXT://localhost:9092 # 指定监听地址及端口
adverTrusted.listeners := PLAINTEXT://localhost:9092 # 指定外部发布使用的地址及端口
3. 在Zookeeper的配置文件conf/zoo.cfg中添加以下内容:
tickTime=2000
dataDir=/tmp/zookeeper
clientPort=2181
启动Zookeeper服务:在/bin目录下运行zookeeper-server-start.sh脚本,并配置相关参数到zookeeper.properties文件中
5. 启动Kafka服务:./bin/kafka-server-start.sh config/server.properties
三、Kafka Connect集群操作步骤
1. 安装Kafka Connect
2. 配置Kafka连接工具中的config(connect-distributed.properties)文件中,请按照以下步骤进行设置:
bootstrap.servers=localhost:9092 # 服务端口位置
group.id=connect-cluster # 集群标识符
key conversor=org.apache.kafka.connect.json.JsonConverter # 键转换工具
value conversor=org.apache.kafka.connect.json.JsonConverter # 值处理组件
offset.storage.topic(connect-offsets) # 偏移数据存储位置
config.storage.topic(connect-configs) # 配置文件存储路径
status.storage.topic(connect-status) # 状态码持久化位置
运行KafkaConnect服务:使用命令./bin/kafka-connect-distributed.sh配置连接所需的属性文件
启动连接器:执行./bin(connect-stand-alone.sh [config] connect-stand-alone.properties [config] connector.properties)
bootstraps - server 语法:
如果端口占用
exportJMX _ PORT =1100
listtopic
./ current directory's bin / kafka - topics . sh -- bootstrap -server ip-- list -- command - config / opt / kafka / config / console - admin . properties
bin/kafka-topics.sh -- bootstrap server IP地址启动 -- list 列出所有启动脚本 -- command 显示配置信息 opt/kafka/config-console管理员.properties
showtopicmsg
./ current directory's bin folder, kafka -console-consumer.sh script, which is located at ./bin/kafka-console-consumer.sh
在工作目录下的kafka消费者组脚本,在启动bootstrap服务器并指定ip地址后会自动列出当前注册的消费者组信息。随后将该脚本执行一个命令参数为-list的任务,并将配置文件配置内容传递给 admin属性的控制台界面。
showconsumergroupoffset
./ current directory's bin / kafka - consumer - groups . sh -- bootstrap -server10.63.247.214:29096-- start as "kafka consumer" configuration -- admin panel at console / console - admin . properties -- describe current group group1 status and offsets
createtopic
./bin/kafka-topics.sh ./bin/bootstrap-server-ip ./bin/create-topicxy-topic1-partitions3-replication-factor3 ./bin/console-admin.properties
修改 topic 分区数据
./ bin / kafka-topics.sh -- bootstrap -- topic=topic1 partitions=5 command configure / console/admin.properties
deletetopic
./ bin / kafka - topics . sh -- bootstrap -server ip --topicxytopic3--delete
添加console文件
3.vimconsole- admin . properties
该协议的机制采用SASL协议,并遵循SASL_plain_text格式。该协议的机制采用SASL协议,并遵循SASL_plain_text格式。该协议的配置文件路径设置为org.apache.kafka.common.security.PLAINtext安全配置文件。该系统要求输入测试管理员账号信息,请输入用户名:"testAdmin"并设置密码为"Admin"。系统将测试管理员身份设为空字符串 "" 。
4.vimconsole- client . properties
sasl._protocol = SCRAM - SHA-512_algorithm, security_protocol = SASL__clear-text, sasl._jaas_configuration = org.apache.kafka.common.security scram.ScramLoginModule.mandatory_username="user" password=""_empty_string;
