本篇文章为大家展示了在Spark Streaming job中如何读取Kafka messages及其offsetRange,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。
创新互联是一家集网站建设,红安企业网站建设,红安品牌网站建设,网站定制,红安网站建设报价,网络营销,网络优化,红安网站推广为一体的创新建站企业,帮助传统企业提升企业形象加强企业竞争力。可充分满足这一群体相比中小企业更为丰富、高端、多元的互联网需求。同时我们时刻保持专业、时尚、前沿,时刻以成就客户成长自我,坚持不断学习、思考、沉淀、净化自己,让我们为更多的企业打造出实用型网站。
在Spark Streaming job中读取Kafka topic(s)中的messages时,有时我们会需要同步记录下每次读取的messages的offsetRange。要达到这一目的,下面这两段代码(代码1和代码2)都是正确的,而且是等价的。
代码1(正确):
-----------------------
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
jssc,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topicsSet
);
messages.foreachRDD(
new Function<JavaPairRDD<String, String>, Void>() {
@Override
public Void call(JavaPairRDD<String, String> rdd) throws Exception {
OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
JavaRDD<String> valueRDD = rdd.values();
long msgNum = processEachRDD(valueRDD, outputFolderPath, definedDuration);
if (msgNum > 0 && zkPathRoot!= null) {
writeOffsetToZookeeper(zkClient, zkPathRoot, offsets);
}
return null;
}
});
代码2(正确):
-----------------------
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
jssc,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topicsSet
);
final AtomicReference<OffsetRange[]> offsetRanges=new AtomicReference();
lines = messages.transformToPair(new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {
@Override
public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) throws Exception {
OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
offsetRanges.set(offsets);
return rdd;
}
}).map(new Function<Tuple2<String, String>, String>() {
@Override
public String call(Tuple2<String, String> tuple2) {
return tuple2._2();
}
});
lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
@Override
public Void call(JavaRDD<String> rdd) throws Exception {
long msgNum = processEachRDD(rdd, outputFolderPath, definedDuration);
if (msgNum > 0 && zkPathRoot!= null) {
OffsetRange[] offsets = offsetRanges.get();
writeOffsetToZookeeper(zkClient, zkPathRoot, offsets);
}
return null;
}
});
但是要注意,下面这两段代码(代码3和代码4)是错误的,它们都会抛出一个exception:java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD cannot be cast to org.apache.spark.streaming.kafka.HasOffsetRanges
代码3(错误):
-----------------------
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
jssc,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topicsSet
);
messages.transform(new Function<JavaPairRDD<String, String>, JavaRDD<String>>() {
@Override
public JavaRDD<String> call(JavaPairRDD<String, String> rdd) throws Exception {
return rdd.values();
}
}).foreachRDD(new Function<JavaRDD<String>, Void>() {
@Override
public Void call(JavaRDD<String> rdd) throws Exception {
long msgNum = processEachRDD(rdd, outputFolderPath, definedDuration);
if (msgNum > 0 && zkPathRoot!= null) {
OffsetRange[] offsets = offsetRanges.get();
writeOffsetToZookeeper(zkClient, zkPathRoot, offsets);
}
return null;
}
});
代码4(错误):
-----------------------
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
jssc,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topicsSet
);
messages.map(new Function<Tuple2<String, String>, String>() {
@Override
public String call(Tuple2<String, String> tuple2) {
return tuple2._2();
}
}).foreachRDD(new Function<JavaRDD<String>, Void>() {
@Override
public Void call(JavaRDD<String> rdd) throws Exception {
long msgNum = processEachRDD(rdd, outputFolderPath, definedDuration);
if (msgNum > 0 && zkPathRoot!= null) {
OffsetRange[] offsets = offsetRanges.get();
writeOffsetToZookeeper(zkClient, zkPathRoot, offsets);
}
return null;
}
});
上述内容就是在Spark Streaming job中如何读取Kafka messages及其offsetRange,你们学到知识或技能了吗?如果还想学到更多技能或者丰富自己的知识储备,欢迎关注创新互联行业资讯频道。
分享题目:在SparkStreamingjob中如何读取Kafkamessages及其offsetRange
浏览地址:https://www.cdcxhl.com/article40/gejdho.html
成都网站建设公司_创新互联,为您提供微信公众号、网站建设、自适应网站、建站公司、域名注册、网站维护
声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联