这篇文章主要为大家展示了“python如何实现对kafka的基本操作”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“python如何实现对kafka的基本操作”这篇文章吧。
成都创新互联是一家专业的成都网站建设公司,我们专注网站建设、网站制作、网络营销、企业网站建设,卖链接,广告投放为企业客户提供一站式建站解决方案,能带给客户新的互联网理念。从网站结构的规划UI设计到用户体验提高,创新互联力求做到尽善尽美。
from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import time
bootstrap_servers = []
class OperateKafka:
def init(self,bootstrap_servers,topic):
self.bootstrap_servers = bootstrap_servers
self.topic = topic
"""生产者""" def produce(self): producer = KafkaProducer(bootstrap_servers=self.bootstrap_servers) for i in range(4): msg = "msg%d" %i producer.send(self.topic,key=str(i),value=msg) producer.close() """一个消费者消费一个topic""" def consume(self): #consumer = KafkaConsumer(self.topic,auto_offset_reset='earliest',group_id="testgroup",bootstrap_servers=self.bootstrap_servers) consumer = KafkaConsumer(self.topic,bootstrap_servers=self.bootstrap_servers) print consumer.partitions_for_topic(self.topic) #获取test主题的分区信息 print consumer.topics() #获取主题列表 print consumer.subscription() #获取当前消费者订阅的主题 print consumer.assignment() #获取当前消费者topic、分区信息 print consumer.beginning_offsets(consumer.assignment()) #获取当前消费者可消费的偏移量 consumer.seek(TopicPartition(topic=self.topic, partition=0), 1) #重置偏移量,从第1个偏移量消费 for message in consumer: print ("%s:%d:%d: key=%s value=%s" % (message.topic,message.partition,message.offset, message.key,message.value)) """一个消费者订阅多个topic """ def consume2(self): consumer = KafkaConsumer(bootstrap_servers=['192.168.124.201:9092']) consumer.subscribe(topics=('TEST','TEST2')) #订阅要消费的主题 print consumer.topics() print consumer.position(TopicPartition(topic='TEST', partition=0)) #获取当前主题的最新偏移量 for message in consumer: print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value)) """消费者(手动拉取消息)""" def consume3(self): consumer = KafkaConsumer(group_id="mygroup",max_poll_records=3,bootstrap_servers=['192.168.124.201:9092']) consumer.subscribe(topics=('TEST','TEST2')) while True: message = consumer.poll(timeout_ms=5) #从kafka获取消息 if message: print message time.sleep(1)
def main():
bootstrap_servers = ['192.168.124.201:9092']
topic = "TEST"
operateKafka = OperateKafka(bootstrap_servers,topic)
operateKafka.produce()
#operateKafka.consume()
#operateKafka.consume2()
operateKafka.consume3()
main()
以上是“python如何实现对kafka的基本操作”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注创新互联行业资讯频道!
文章题目:python如何实现对kafka的基本操作
标题路径:https://www.cdcxhl.com/article28/jdcgjp.html
成都网站建设公司_创新互联,为您提供用户体验、自适应网站、网站设计、电子商务、建站公司、品牌网站建设
声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联