主要是要让它来格式化:GroupMetadataManager.OffsetsMessageFormatter
成都创新互联公司是一家专注于成都做网站、网站制作与策划设计,皮山网站建设哪家好?成都创新互联公司做网站,专注于网站建设十余年,网设计领域的专业建站公司;建站业务涵盖:皮山等地区。皮山做网站价格咨询:18980820575
最后用看了它的源码,把这部分挑选出来,自己解析了得到的byte[]。核心代码如下:
// com.sina.mis.app.ConsumerInnerTopic ConsumerRecords<byte[], byte[]> records = consumer.poll(512); for (ConsumerRecord<byte[], byte[]> record : records) { Object offsetKey = GroupMetadataManager.readMessageKey(ByteBuffer.wrap(record.key())); if (offsetKey instanceof OffsetKey) { GroupTopicPartition groupTopicPartition = ((OffsetKey) offsetKey).key(); OffsetAndMetadata value = GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(record.value())); LOG.info(groupTopicPartition.toString() + "---:---" + value); } else { LOG.info("############:{}", offsetKey); } }
#Create consumer configecho "exclude.internal.topics=false" > /tmp/consumer.config#Only consume the latest consumer offsets./kafka-console-consumer.sh --consumer.config /tmp/consumer.config \ --formatter "kafka.server.OffsetManager\$OffsetsMessageFormatter" \ --zookeeper localhost:2181 --topic __consumer_offsets
#Create consumer configecho "exclude.internal.topics=false" > /tmp/consumer.config#Only consume the latest consumer offsets./kafka-console-consumer.sh --consumer.config /tmp/consumer.config \ --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" \ --zookeeper 10.39.40.98:2181/kafka10 --topic __consumer_offsets
Committing and fetching consumer offsets in Kafka : 使用Java API获取Consumer Offset。
Since version 0.8.2, kafka has had the ability to store consumer offsets in an internal compacted topic called __consumer_offsets
Kafka 用 Yammer Metrics来存储Server、Client的数据。可以使用插件式的方式获取这些数据,写入到CSV文件。
Kafka 实现了 KafkaCSVMetricsReporter.scala
,可以 将metrics写入到CSV文件。
由于没有实现写入ganglia的实现类,所以无法直接从Kafka将metrics写入到ganglia。
document
为什么某个topic的HDFS的数据多余Kafka自己统计的流量40%左右。
sina的KafkaProxy都使用了snappy压缩后入kafka。
猜想 30%-40%
需要测试一下:找一批HDFS的文件,写入Kafka,消费出来,写成文件,看看大小差别。
High level Consumer的API,默认以Range的方式分配,还有另外一个是RoundRobin。
这是有DefaultPartitioner
决定的。
If a partition is specified in the record, use it.
If no partition is specified but a key is present choose a partition based on a hash of the key
If no partition or key is present choose a partition in a round-robin fashion
中文:
有key就hash
没key就Round-robin
0.8.0 版本在没key的时候,是Random的方式。
90%的broker GC暂停时间为21ms左右。每秒进行的young GC小于1次
传统网络IO流程,一次传送过程:
从Disk把数据读到内核区的Read Buffer。
把数据从内核区到用户区Buffer。
再把数据写入到内核区的Socket Buffer上。
把数据从Socket Buffer复制到网卡的NIC Buffer上。
Kafka少了中间两步,这就是sendfile技术:
依赖OS文件系统的页缓存 (当上层有写操作时,操作系统只是将数据写入PageCache,同时标记Page属性为Dirty。当读操作发生时,先从PageCache中查找,如果发生缺页才进行磁盘调度,最终返回需要的数据。实际上PageCache是把尽可能多的空闲内存都当做了磁盘缓存来使用。同时如果有其他进程申请内存,回收PageCache的代价又很小。
总结:依赖OS的页缓存能大量减少IO,高效利用内存来作为缓存)新航道雅思
不使用JVM缓存数据, 内存利用率高
顺序IO以及O(1)常量时间get、put消息
sendfile技术(零拷贝)
每次发送数据时,Producer都是send()
之后就认为已经发送出去了,但其实大多数情况下消息还在内存的MessageSet当中,尚未发送到网络,这时候如果Producer挂掉,那就会出现丢数据的情况。
解决办法: ack机制,一般设置为acks=1,消息只需要被Leader接受并确认即可,这样同时保证了可靠性和效率。
MessageSet手段批量顺序写入
数据支持压缩
异步发送
push模式的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据consumer的消费能力以适当的速率消费消息。
log:字节流,sendfile、zero copy技术
index:稀疏索引,mmap的数据结构-本质是个类,二分查找寻找到offset。
一种非常常用的选举leader的方式是“Majority Vote”(“少数服从多数”)。
刚创建的topic一般"preferred replica"是leader。在ZooKeeper中动态维护了一个ISR(in-sync replicas),这个ISR里的所有Replica都跟上了leader,只有ISR里的成员才有被选为Leader的可能。
所有Partition的Leader选举都由controller决定。controller会将Leader的改变直接通过RPC的方式,通知需要做此修改的Broker。
那么Controller是如何选举leader的?
如果当前ISR中有至少一个Replica还幸存,则选择其中一个作为新Leader。
如果replica都不在ISR列表里面,选择该Partition中任意一个幸存的Replica作为新的Leader以及ISR(该场景下可能会有潜在的数据丢失)。
如果该Partition的所有Replica都宕机了,则将新的Leader设置为-1。
Producer在发布消息到某个Partition时,先通过ZooKeeper找到该Partition的Leader,然后写入数据。
Consumer(0.8)通过zk找到leader,读取数据。
Consumer(0.10)通过Coordinator找到Leader,读取数据。
不会。扩容的时候,新的leader需要从旧有的broker复制数据,跟上以后,会切换成leader。
这个时间期间,Producer、Consumer会向旧有的leader通信。
如果请求过来的topic是__consumer_offsets,那就启动OffsetManager的异步读
这个异步读会一直读取__consumer_offsets并把消息解码成消费进度放入缓存
I/O线程可以处理请求的队列大小,若实际请求数超过此大小,网络线程将停止接收新的请求。
单机partition数的最大值:100 * broker * replica
(if you care about latency, it’s probably a good idea to limit the number of partitions per broker to 100 x b x r, where b is the number of brokers in a Kafka cluster and r is the replication factor.)
分享文章:Kafka0.10问题点滴
文章起源:https://www.cdcxhl.com/article30/gcdsso.html
成都网站建设公司_创新互联,为您提供云服务器、网页设计公司、软件开发、建站公司、微信公众号、外贸建站
声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联