本篇内容主要讲解“Kafka Consumer使用要注意什么”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Kafka Consumer使用要注意什么”吧!
创新互联公司服务项目包括凤县网站建设、凤县网站制作、凤县网页制作以及凤县网络营销策划等。多年来,我们专注于互联网行业,利用自身积累的技术优势、行业经验、深度合作伙伴关系等,向广大中小型企业、政府机构等提供互联网行业的解决方案,凤县网站推广取得了明显的社会效益与经济效益。目前,我们服务的客户以成都为中心已经辐射到凤县省份的部分城市,未来相信会继续扩大服务区域并继续获得客户的支持与信任!
一、特点:
不用关心offset, 会自动的读zookeeper中该Consumer group的last offset
二、注意事项
1. 如果consumer比partition多,是浪费,因为kafka的设计是在一个partition上是不允许并发的,
所以consumer数不要大于partition数
2. 如果consumer比partition少,一个consumer会对应于多个partitions,
这里主要合理分配consumer数和partition数,否则会导致partition里面的数据被取的不均匀
最好partiton数目是consumer数目的整数倍,所以partition数目很重要,
比如取24,就很容易设定consumer数目
3. 如果consumer从多个partition读到数据,不保证数据间的顺序性,
kafka只保证在一个partition上数据是有序的,但多个partition,根据你读的顺序会有不同
4. 增减consumer,broker,partition会导致rebalance,
所以rebalance后consumer对应的partition会发生变化
5. High-level接口中获取不到数据的时候是会block的
三、代码如下:
package kafkatest.kakfademo;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
public class ConsumerDemo1 {
public static void main(String[] args) {
ConsumerDemo1 demo = new ConsumerDemo1();
demo.test();
}
@SuppressWarnings("rawtypes")
public void test() {
String topicName = "test";
int numThreads = 1;
Properties properties = new Properties();
properties.put("zookeeper.connect", "hadoop0:2181");// 声明zk
properties.put("group.id", "group--demo");// 必须要使用别的组名称,
// 如果生产者和消费者都在同一组,则不能访问同一组内的topic数据
ConsumerConnector consumer = Consumer
.createJavaConsumerConnector(new ConsumerConfig(properties));
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topicName, numThreads); // 一次从主题中获取一个数据
Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer
.createMessageStreams(topicCountMap);
// 获取每次接收到的这个数据
List<KafkaStream<byte[], byte[]>> streams = messageStreams
.get(topicName);
// now launch all the threads
ExecutorService executor = Executors.newFixedThreadPool(numThreads);
// now create an object to consume the messages
//
int threadNumber = 0;
for (final KafkaStream stream : streams) {
executor.execute(new ConsumerMsgTask(stream, threadNumber));
threadNumber++;
}
}
class ConsumerMsgTask implements Runnable {
private KafkaStream m_stream;
private int m_threadNumber;
public ConsumerMsgTask(KafkaStream stream, int threadNumber) {
m_threadNumber = threadNumber;
m_stream = stream;
}
public void run() {
ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
long offset = 0;
try {
while (it.hasNext())
offset = it.next().offset();
byte[] bytes = it.next().message();
String msg = new String(bytes, "UTF-8");
System.out.print("offset: " + offset + ",msg:" + msg);
System.out.println("Shutting down Thread: " + m_threadNumber);
} catch (UnsupportedEncodingException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
四、实验验证
到此,相信大家对“Kafka Consumer使用要注意什么”有了更深的了解,不妨来实际操作一番吧!这里是创新互联网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
当前题目:KafkaConsumer使用要注意什么
本文路径:https://www.cdcxhl.com/article28/jodcjp.html
成都网站建设公司_创新互联,为您提供静态网站、品牌网站设计、App设计、网站收录、网站制作、响应式网站
声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联