kafka干货(五):kakfka的python客户端----Confluent-kafka
该kafka客户端由Confluent公司维护。此外,在产品系列中还包括c/c++语言版本、Java版本以及Go语言版本等。
该软件具備了企業級别的支持保障。
该Python模块名为couflent-kafka,并基于librdkafka实现了轻量化的封装。值得注意的是(librdkf) kafka库本身也是基于C/C++开发的第三方库。相比Kafka Python API而言,在易用性上有显著提升
参考:kafka干货(四):kafka-python和confluent-kafka比较
#安装
##第一步 安装librdkafka
通过git命令克隆仓库:git clone https://github.com/edenhill/librdkafka.git
切换到克隆后的librdkafka目录:cd librdkafka/
执行配置脚本:./configure
运行编译指令:make
执行安装命令:sudo make install
####安装confluent-kafka
pip install confluent-kafka
#测试
##Producer
from confluent_kafka import Producer
##producer配置,dict格式
p = Producer({'bootstrap.servers': '192.168.56.101,192.168.56.103,192.168.56.102'})
##回调函数
def delivery_report(err, msg):
if err is not None:
print('Message delivery failed: {}'.format(err))
else:
print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
##发送
for data in ['hello','word’]:
p.produce('mytopic', data.encode('utf-8'), callback=delivery_report)
p.poll(10) ##等待返回结果最大时常,单位秒
p.flush()
###参数详解
Producer()
指定生产者配置,参数类型:dict。
len(int)
等待发送的消息数,参数类型:int。
flush(timeout)
发送调用poll(),发送消息,直到len()为0。参数类型:timeout
poll(timeout) 按预设的时间间隔持续向目标发送数据包,并根据返回结果执行相应的操作。参数为预定的时间间隔。
produce()
发送消息,支持回调函数。参数包含:
- topic
- value
- key
- partition
- on_delivery(err,msg)
- timestamp
- dict|list
##Consumer
from confluent_kafka import Consumer, KafkaError
c = Consumer({
'bootstrap.servers': '192.168.56.101',
'group.id': 'mygroup',
'default.topic.config': {
'auto.offset.reset': 'smallest'
}
})
c.subscribe(['mytopic'])
while True:
msg = c.poll()
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
else:
print(msg.error())
break
print('Received message: {}'.format(msg.value().decode('utf-8')))
c.close()
###参数详解
Consumer(config)
指定消费者配置,参数类型:dict。bootstrap.servers and group.id是必须配置。
on_commit(err, partitions)
回调函数
close()
关闭消费者,主要有三步:
- 停止消费。
- 提交偏移量。前提是enable.auto.commit=true。
- 离开消费者组。
该函数用于接收消息或偏移量进行提交操作。若无输入参数,则默认会将当前分区的偏移量进行提交处理。但需注意的是enable.auto.commit必须设置为true方能正常运行。其中asynchronous=True表示执行异步操作,“不回会塞”则表明该操作完成后不会阻塞当前执行流程
poll(timeout)
消费消息,参数为等待超时时间。
该函数用于实现批量处理消息的消耗操作,在调用时可指定每批次处理的消息数量以及超时等待时间等参数设置。每次调用该函数后将返回一个列表类型的结果;若操作中出现异常情况则会触发并返回相应的错误信息
- RuntimeError - 如果消费者处于关闭状态
- KafkaError - 如果kafka内部服务发生底层服务崩溃
- ValueError - 如果消息数量超过百万
pause(partitions)
暂停。
resume(partitions)
恢复。
position(partitions[, timeout=None])
返回分区偏移量。
get(partition) 指定偏移量使用。参数:TopicPartition(主题, [分段], [偏移])。
Store Offsets()负责提交偏移量。参数包括message或offset。前提是 enable.auto.offset.store = false。
注册消费**subscribe(topics[, listener=None])**中指定消费的主题。参数:list。可使用正则匹配器包含两个回调函数:
- on_assign(consumer, partitions)
- on_revoke(consumer, partitions)
unassign()
删除当前分区分配
unsubscribe()
删除订阅
下班临近时,请注意:关于序列化相关的topic操作、配置方面的修改以及返回报错处理等内容将会在近期进行更新
更多文章关注公众号
