本篇内容介绍了“如何使用消息队列”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
做网站、成都网站设计的关注点不是能为您做些什么网站,而是怎么做网站,有没有做好网站,给成都创新互联一个展示的机会来证明自己,这并不会花费您太多时间,或许会给您带来新的灵感和惊喜。面向用户友好,注重用户体验,一切以用户为中心。
已发布的消息保存在一组服务器中,称为Kafka集群。集群中的每个服务器都是一个Broker。
通过Topic机制对消息进行分类,可以认为每个Topic就是一个队列。
每个Topic可以有多个分区,主要为了提高并发而设计。相同Topic下不同Partition可以并发接收消息,同时也能供消费者并发拉取消息。有多少Partition就有多少并发量。
在Kafka服务器上,分区是以文件目录的形式存在的。每个分区目录中,Kafka会按配置大小及配置周期将分区拆分成多个段文件(LogSegment),每个段由三部分组成:
- 日志文件:*.log - 位移索引文件:*.index - 时间索引文件:*.timeindex
其中*.log
用于存储消息本身的数据内容,*.index
存储消息在文件中的位置(包括消息的逻辑offset和物理存储offset),*.timeindex
存储消息创建时间和对应逻辑地址的映射关系。
将分区拆分成多个段是为了控制存储文件大小。可以很方便的通过操作系统mmap
机制映射到内存中,提高写入和读取效率。同时还有一个好处就是,当系统要清除过期数据时,可以直接将过期的段文件删除。
如果每个消息都要在index
中保存位置信息,index
文件自身大小也很容易变的很大。所以Kafka将index
设计为稀疏索引来减小index
文件的大小。
消息冗余数量。不能超过集群中Broker的数量。
# 创建Topic # --topic 主题名称 避免使用[_]及[.]号 # --replication-factor 副本数量(不能超过broker节点数) # --partitions 分区数量(并发) ./bin/kafka-topics.sh --create \ --topic UserDataQueue \ --replication-factor 3 \ --partitions 5 \ --bootstrap-server localhost:9092,localhost:9093,localhost:9094 # 查看Topic ./bin/kafka-topics.sh --list \ --bootstrap-server localhost:9092,localhost:9093,localhost:9094 # 修改Topic # 删除Topic
# 发送消息 # --topic 指定目标Topic ./bin/kafka-console-producer.sh \ --topic UserDataQueue \ --bootstrap-server localhost:9092,localhost:9093,localhost:9094 # 拉取消息 # --from-beginning 从头开始(获取现有的全量数据) ./bin/kafka-console-consumer.sh \ --topic UserDataQueue \ --bootstrap-server localhost:9092,localhost:9093,localhost:9094 \ --from-beginning
Kafka集群依赖于Zookeeper。
# 需要修改的参数 # the directory where the snapshot is stored. dataDir=/kafka/zkdata # the port at which the clients will connect clientPort=2182
# 启动 ./bin/zookeeper-server-start.sh -daemon /kafka/zookeeper.properties
# 需修改参数 # The id of the broker. This must be set to a unique integer for each broker. broker.id=1 # 同一集群内ID必须唯一 # The address the socket server listens on. It will get the value returned from # java.net.InetAddress.getCanonicalHostName() if not configured. # FORMAT: # listeners = listener_name://host_name:port # EXAMPLE: # listeners = PLAINTEXT://your.host.name:9092 listeners=PLAINTEXT://localhost:9092 # 同一主机的话,端口号不能相同 # A comma separated list of directories under which to store log files log.dirs=/kafka/data01 # 日志存储目录,需做隔离 # Zookeeper connection string (see zookeeper docs for details). # This is a comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". # You can also append an optional chroot string to the urls to specify the # root directory for all kafka znodes. zookeeper.connect=localhost:2182 # Zookeeper连接地址,参见2.1 zk配置
# Kafka启动 # broker-1 ./bin/kafka-server-start.sh -daemon /kafka/server01.properties # broker-2 ./bin/kafka-server-start.sh -daemon /kafka/server02.properties # broker-3 ./bin/kafka-server-start.sh -daemon /kafka/server03.properties
PrettyZoo 是一个基于 Apache Curator 和 JavaFX 实现的 Zookeeper 图形化管理客户端。
由下图可以看到,集群3个Broker均正常启动。
管理Topic,Topic消息,消费组等的Kafka可视化系统,相关文档:https://akhq.io/
一个简单且高效的监控系统。相关文档:http://www.kafka-eagle.org/index.html
Kafka Eagle 自带监控大屏。
Spring Boot版本:2.4.4。
官方示例:https://github.com/spring-projects/spring-kafka/tree/main/samples
implementation 'org.springframework.kafka:spring-kafka'
spring: kafka: bootstrap-servers: localhost:9092,localhost:9093,localhost:9094 producer: client-id: kfk-demo retries: 3
@RestController public class IndexController { @Autowired KafkaTemplate<Object, Object> kafkaTemplate; @GetMapping public String index() { int rdm = new Random().nextInt(1000); kafkaTemplate.send("UserDataQueue", new UserData("", rdm)); return "hello world"; } @GetMapping("index2") public String index2() { // 发送字符串方式 kafkaTemplate.send("UserDataTopic", new Gson().toJson(new UserData("apple", 23))); return "ok"; } }
@Component @KafkaListener( id = "kfk-demo-userdata", topics = {"UserDataQueue"}, groupId = "kfk-demo-group", clientIdPrefix = "kfk-demo-client") public class KfkListener { @KafkaHandler public void process(@Payload UserData ud, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) { System.out.println(String.format("topic: %s, partition: %d, userData: %s", topic, partition, ud)); } @KafkaHandler(isDefault = true) public void process(Object obj) { System.out.println(obj); } } // 接收字符串方式 @Slf4j @Component @KafkaListener(id = "kfk-demo2", topics = {"UserDataTopic"}) public class KfkUserDataTopicListener { @KafkaHandler public void process(String userDataStr) { UserData userData = new Gson().fromJson(userDataStr, UserData.class); log.info("username: {}, age: {}", userData.getUsername(), userData.getAge()); } }
@Configuration public class KafkaConfig { @Bean public NewTopic userDataTopic() { return new NewTopic("UserDataTopic", 3, (short) 1); } }
“如何使用消息队列”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注创新互联网站,小编将为大家输出更多高质量的实用文章!
本文标题:如何使用消息队列
当前路径:https://www.cdcxhl.com/article34/gseise.html
成都网站建设公司_创新互联,为您提供企业建站、云服务器、建站公司、网站设计、标签优化、定制开发
声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联