我们要自定义输出时,首先继承两个抽象类,一个是 OutputFormat,一个是 RecordWriter
。前者是主要是创建RecordWriter,后者就是主要实现 write方法来将kv写入文件。
成都创新互联是一家专业提供抚顺县企业网站建设,专注与成都网站设计、成都网站建设、外贸网站建设、成都h5网站建设、小程序制作等业务。10年已为抚顺县众多企业、政府机构等服务。创新互联专业的建站公司优惠进行中。
1、需求
将reduce输出的KV中,如果key中包含特定字符串,则将其输出到一个文件中,剩下的KV则输出到另外的文件中。
2、源码
源数据
http://cn.bing.com
http://www.baidu.com
http://www.google.com
http://www.itstar.com
http://www.itstar1.com
http://www.itstar2.com
http://www.itstar3.com
http://www.baidu.com
http://www.sin2a.com
http://www.sin2a.comw.google.com
http://www.sin2desa.com
http://www.sin2desa.comw.google.com
http://www.sina.com
http://www.sindsafa.com
http://www.sohu.com
outputFormat
public class MyOutputFormat extends FileOutputFormat<Text, NullWritable> {
@Override
public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
return new MyRecordWriter(taskAttemptContext);
}
}
RecordWriter
public class MyRecordWriter extends RecordWriter<Text, NullWritable> {
private FSDataOutputStream startOut;
private FSDataOutputStream otherOut;
public MyRecordWriter(TaskAttemptContext job) {
try {
FileSystem fs = FileSystem.get(job.getConfiguration());
startOut = fs.create(new Path("G:\\test\\date\\A\\itstarlog\\logdir\\startout.log"));
otherOut = fs.create(new Path("G:\\test\\date\\A\\itstarlog\\logdir\\otherout.log"));
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void write(Text key, NullWritable value) throws IOException, InterruptedException {
String line = key.toString();
//如果key中包含itstar就写入到另外一个文件中
if (line.contains("itstar")) {
this.startOut.writeUTF(line);
} else {
this.otherOut.writeUTF(line);
}
}
@Override
public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
this.startOut.close();
this.otherOut.close();
}
}
mapper
public class MyOutputMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
context.write(value, NullWritable.get());
}
}
reducer
public class MyOutputReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
Text k = new Text();
@Override
protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
String line = key.toString();
line = line + "\r\n";
k.set(line);
context.write(k, NullWritable.get());
}
}
driver
ublic class MyDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
args = new String[]{"G:\\test\\date\\A\\itstarlog\\A\\other.log", "G:\\test\\date\\A\\itstarlog\\logresult\\"};
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(MyDriver.class);
job.setMapperClass(MyOutputMapper.class);
job.setReducerClass(MyOutputReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
//自定义输出的实现子类,也是继承FileOutputFormat
job.setOutputFormatClass(MyOutputFormat.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
//这个路径输出的是job的执行成功successs文件的输出路径
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
}
分享文章:十五、MapReduce--自定义output输出
URL标题:https://www.cdcxhl.com/article24/gisece.html
成都网站建设公司_创新互联,为您提供微信公众号、自适应网站、营销型网站建设、网站收录、企业网站制作、静态网站
声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联