OutputFormat在java中怎么实现自定义

本篇文章给大家分享的是有关OutputFormat在java 中怎么实现自定义,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。

昌图网站制作公司哪家好,找成都创新互联!从网页设计、网站建设、微信开发、APP开发、成都响应式网站建设公司等网站项目制作,到程序开发,运营维护。成都创新互联自2013年创立以来到现在10年的时间,我们拥有了丰富的建站经验和运维经验,来保证我们的工作的顺利进行。专注于网站建设就选成都创新互联

java 中 自定义OutputFormat

实例代码:

package com.ccse.hadoop.outputformat; 
 
import java.io.IOException; 
import java.net.URI; 
import java.net.URISyntaxException; 
import java.util.StringTokenizer; 
 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.FSDataOutputStream; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.JobContext; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.OutputCommitter; 
import org.apache.hadoop.mapreduce.OutputFormat; 
import org.apache.hadoop.mapreduce.RecordWriter; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.TaskAttemptContext; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; 
 
 
public class MySelfOutputFormatApp { 
   
  public final static String INPUT_PATH = "hdfs://chaoren1:9000/mapinput"; 
  public final static String OUTPUT_PATH = "hdfs://chaoren1:9000/mapoutput"; 
  public final static String OUTPUT_FILENAME = "/abc"; 
   
  public static void main(String[] args) throws IOException, URISyntaxException,  
    ClassNotFoundException, InterruptedException { 
    Configuration conf = new Configuration(); 
    FileSystem fileSystem = FileSystem.get(new URI(OUTPUT_PATH), conf); 
    fileSystem.delete(new Path(OUTPUT_PATH), true); 
     
    Job job = new Job(conf, MySelfOutputFormatApp.class.getSimpleName()); 
    job.setJarByClass(MySelfOutputFormatApp.class); 
     
    FileInputFormat.setInputPaths(job, new Path(INPUT_PATH)); 
    job.setMapperClass(MyMapper.class); 
    job.setMapOutputKeyClass(Text.class); 
    job.setMapOutputValueClass(LongWritable.class); 
     
    job.setReducerClass(MyReducer.class); 
    job.setOutputKeyClass(Text.class); 
    job.setOutputValueClass(LongWritable.class); 
    job.setOutputFormatClass(MyselfOutputFormat.class); 
     
    job.waitForCompletion(true); 
  } 
   
  public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> { 
 
    private Text word = new Text(); 
    private LongWritable writable = new LongWritable(1); 
     
    @Override 
    protected void map(LongWritable key, Text value, 
        Mapper<LongWritable, Text, Text, LongWritable>.Context context) 
        throws IOException, InterruptedException { 
      if (value != null) { 
        String line = value.toString(); 
        StringTokenizer tokenizer = new StringTokenizer(line); 
        while (tokenizer.hasMoreTokens()) { 
          word.set(tokenizer.nextToken()); 
          context.write(word, writable); 
        } 
      } 
    } 
     
  } 
   
  public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> { 
 
    @Override 
    protected void reduce(Text key, Iterable<LongWritable> values, 
        Reducer<Text, LongWritable, Text, LongWritable>.Context context) 
        throws IOException, InterruptedException { 
      long sum = 0;  
      for (LongWritable value : values) { 
        sum += value.get(); 
      } 
      context.write(key, new LongWritable(sum)); 
    } 
  } 
 
  public static class MyselfOutputFormat extends OutputFormat<Text, LongWritable> { 
 
    private FSDataOutputStream outputStream = null; 
     
    @Override 
    public RecordWriter<Text, LongWritable> getRecordWriter( 
        TaskAttemptContext context) throws IOException, 
        InterruptedException { 
      try { 
        FileSystem fileSystem = FileSystem.get(new URI(MySelfOutputFormatApp.OUTPUT_PATH), context.getConfiguration()); 
        //指定文件的输出路径 
        final Path path = new Path(MySelfOutputFormatApp.OUTPUT_PATH  
                     + MySelfOutputFormatApp.OUTPUT_FILENAME); 
        this.outputStream = fileSystem.create(path, false); 
      } catch (URISyntaxException e) { 
        e.printStackTrace(); 
      } 
      return new MySelfRecordWriter(outputStream); 
    } 
 
    @Override 
    public void checkOutputSpecs(JobContext context) throws IOException, 
        InterruptedException { 
    } 
 
    @Override 
    public OutputCommitter getOutputCommitter(TaskAttemptContext context) 
        throws IOException, InterruptedException { 
      return new FileOutputCommitter(new Path(MySelfOutputFormatApp.OUTPUT_PATH), context); 
    } 
     
  } 
   
  public static class MySelfRecordWriter extends RecordWriter<Text, LongWritable> { 
 
    private FSDataOutputStream outputStream = null; 
     
    public MySelfRecordWriter(FSDataOutputStream outputStream) { 
      this.outputStream = outputStream; 
    } 
     
    @Override 
    public void write(Text key, LongWritable value) throws IOException, 
        InterruptedException { 
      this.outputStream.writeBytes(key.toString()); 
      this.outputStream.writeBytes("\t"); 
      this.outputStream.writeLong(value.get()); 
    } 
 
    @Override 
    public void close(TaskAttemptContext context) throws IOException, 
        InterruptedException { 
      this.outputStream.close(); 
    } 
     
  } 
   
} 

 2.OutputFormat是用于处理各种输出目的地的。

2.1 OutputFormat需要写出去的键值对,是来自于Reducer类,是通过RecordWriter获得的。

2.2 RecordWriter中的write(...)方法只有k和v,写到哪里去哪?这要通过单独传入OutputStream来处理。write就是把k和v写入到OutputStream中的。

2.3 RecordWriter类位于OutputFormat中的。因此,我们自定义的OutputFromat必须继承OutputFormat类型。那么,流对象必须在getRecordWriter(...)方法中获得。

以上就是OutputFormat在java 中怎么实现自定义,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注创新互联行业资讯频道。

网页题目:OutputFormat在java中怎么实现自定义
分享链接:https://www.cdcxhl.com/article42/jhdhhc.html

成都网站建设公司_创新互联,为您提供电子商务企业建站网站收录品牌网站建设手机网站建设品牌网站制作

广告

声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联

网站优化排名