hadoop中mapreduce的常用类(一)-创新互联

云智慧(北京)科技有限公司陈鑫

创新互联公司主要从事成都网站建设、网站制作、网页设计、企业做网站、公司建网站等业务。立足成都服务湛河,十载网站建设经验,价格优惠、服务专业,欢迎来电咨询建站服务:028-86922220

写这个文章的时候才意识到新旧API是同时存在于1.1.2的hadoop中的。以前还一直纳闷儿为什么有时候是jobClient提交任务,有时是Job...不管API是否更新,下面这些类也还是存在于API中的,经过自己跟踪源码,发现原理还是这些。只不过进行了重新组织,进行了一些封装,使得扩展性更好。所以还是把这些东西从记事本贴进来吧。

关于这些类的介绍以及使用,有的是在自己debug中看到的,多数为纯翻译API的注释,但是翻译的过程受益良多。

GenericOptionsParser

parseGeneralOptions(Optionsopts, Configuration conf, String[] args)解析命令行参数
   GenericOptionsParser是为hadoop框架解析命令行参数的工具类。它能够辨认标准的命令行参数,使app能够轻松指定namenode,jobtracker,以及额外的配置资源或信息等。它支持的功能有:
    -conf 指定配置文件;
    -D 指定配置信息;
    -fs    指定namenode
    -jt   指定jobtracker
    -files  指定需要copy到MR集群的文件,以逗号分隔
    -libjars指定需要copy到MR集群的classpath的jar包,以逗号分隔
    -archives指定需要copy到MR集群的压缩文件,以逗号分隔,会自动解压缩

1.String[] otherArgs = new GenericOptionsParser(job, args)

2.     .getRemainingArgs();

3.if (otherArgs.length != 2) {

4.   System.err.println("Usage: wordcount <in> <out>");

5.   System.exit(2);

6.}

ToolRunner

用来跑实现Tool接口的工具。它与GenericOptionsParser合作来解析命令行参数,只在此次运行中更改configuration的参数。
Tool

处理命令行参数的接口。Tool是MR的任何tool/app的标准。这些实现应该代理对标准命令行参数的处理。下面是典型实现:

1.public class MyApp extends Configured implements Tool {

2.

3.   public int run(String[] args) throws Exception {

4.     // 即将被ToolRunner执行的Configuration

5.     Configuration conf = getConf();

6.

7.     // 使用conf建立JobConf

8.     JobConf job = new JobConf(conf, MyApp.class);

9.

10.     // 执行客户端参数

11.     Path in = new Path(args[1]);

12.     Path out = new Path(args[2]);

13.

14.     // 指定job相关的参数

15.     job.setJobName("my-app");

16.     job.setInputPath(in);

17.     job.setOutputPath(out);

18.     job.setMapperClass(MyApp.MyMapper.class);

19.     job.setReducerClass(MyApp.MyReducer.class);

20.*

21.     // 提交job,然后监视进度直到job完成

22.     JobClient.runJob(job);

23.   }

24.

25.   public static void main(String[] args) throws Exception {

26.     // 让ToolRunner 处理命令行参数

27.     int res = ToolRunner.run(new Configuration(), new Sort(), args);  //这里封装了GenericOptionsParser解析args

28.

29.     System.exit(res);

30.   }

31. }

MultipleOutputFormat
自定义输出文件名称或者说名称格式。在jobconf中setOutputFormat(MultipleOutputFormat的子类)就行了。而不是那种part-r-00000啥的了。。。并且可以分配结果到多个文件中。
     MultipleOutputFormat继承了FileOutputFormat, 允许将输出数据写进不同的输出文件中。有三种应用场景:

a. 最少有一个reducer的mapreduce任务。这个reducer想要根据实际的key将输出写进不同的文件中。假设一个key编码了实际的key和为实际的key指定的位置

b. 只有map的任务。这个任务想要把输入文件或者输入内容的部分名称设为输出文件名。

c. 只有map的任务。这个任务为输出命名时,需要依赖keys和输入文件名。

1.//这里是根据key生成多个文件的地方,可以看到还有value,name等参数

2.@Override

3.protected String generateFileNameForKeyValue(Text key,

4.     IntWritable value, String name) {

5.   char c = key.toString().toLowerCase().charAt(0);

6.   if (c >= 'a' && c <= 'z') {

7.     return c + ".txt";

8.   }

9.   return "result.txt";

10.}

DistributedCache

在集群中快速分发大的只读文件。DistributedCache是MR用来缓存app需要的诸如text,archive,jar等的文件的。app通过jobconf中的url来指定需要缓存的文件。它会假定指定的这个文件已经在url指定的对应位置上了。在job在node上执行之前,DistributedCache会copy必要的文件到这个slave node。它的功效就是为每个job只copy一次,而且copy到指定位置,能够自动解压缩。

DistributedCache可以用来分发简单的只读文件,或者一些复杂的例如archive,jar文件等。archive文件会自动解压缩,而jar文件会被自动放置到任务的classpath中(lib)。分发压缩archive时,可以指定解压名称如:dict.zip#dict。这样就会解压到dict中,否则默认是dict.zip中。

文件是有执行权限的。用户可以选择在任务的工作目录下建立指向DistributedCache的软链接。

1.DistributedCache.createSymlink(conf);

2.    DistributedCache.addCacheFile(new Path("hdfs://host:port/absolute-path#link-name").toUri(), conf);

DistributedCache.createSymlink(Configuration)方法让DistributedCache在当前工作目录下创建到缓存文件的符号链接。则在task的当前工作目录会有link-name的链接,相当于快捷方法,链接到expr.txt文件,在setup方法使用的情况则要简单许多。或者通过设置配置文件属性mapred.create.symlink为yes。分布式缓存会截取URI的片段作为链接的名字。例如,URI是hdfs://namenode:port/lib.so.1#lib.so,则在task当前工作目录会有名为lib.so的链接,它会链接分布式缓存中的lib.so.1

DistributedCache会跟踪修改缓存文件的timestamp。

下面是使用的例子, 为应用app设置缓存

1. 将需要的文件copy到FileSystem中:

1.  $ bin/hadoop fs -copyFromLocal lookup.dat /myapp/lookup.dat

2.  $ bin/hadoop fs -copyFromLocal map.zip /myapp/map.zip

3.  $ bin/hadoop fs -copyFromLocal mylib.jar /myapp/mylib.jar

4.  $ bin/hadoop fs -copyFromLocal mytar.tar /myapp/mytar.tar

5.  $ bin/hadoop fs -copyFromLocal mytgz.tgz /myapp/mytgz.tgz

6.  $ bin/hadoop fs -copyFromLocal mytargz.tar.gz /myapp/mytargz.tar.gz

2. 设置app的jobConf:

7.  JobConf job = new JobConf();

8.  DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"),

9.                                job);

10.  DistributedCache.addCacheArchive(new URI("/myapp/map.zip", job);

11.  DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);

12.  DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar", job);

13.  DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz", job);

14.  DistributedCache.addCacheArchive(new URI("/myapp/mytargz.tar.gz", job);

3. 在mapper或者reducer中使用缓存文件:

15.  public static class MapClass extends MapReduceBase

16.  implements Mapper<K, V, K, V> {

17.

18.    private Path[] localArchives;

19.    private Path[] localFiles;

20.

21.    public void configure(JobConf job) {

22.      // 得到刚刚缓存的文件

23.      localArchives = DistributedCache.getLocalCacheArchives(job);

24.      localFiles = DistributedCache.getLocalCacheFiles(job);

25.    }

26.

27.    public void map(K key, V value,

28.                    OutputCollector<K, V>; output, Reporter reporter)

29.    throws IOException {

30.      // 使用缓存文件

31.      // ...

32.      // ...

33.      output.collect(k, v);

34.    }

35.  }

 它跟GenericOptionsParser的部分功能有异曲同工之妙。

PathFilter+ 通配符。accept(Path path)筛选path是否通过。

另外有需要云服务器可以了解下创新互联scvps.cn,海内外云服务器15元起步,三天无理由+7*72小时售后在线,公司持有idc许可证,提供“云服务器、裸金属服务器、高防服务器、香港服务器、美国服务器、虚拟主机、免备案服务器”等云主机租用服务以及企业上云的综合解决方案,具有“安全稳定、简单易用、服务可用性高、性价比高”等特点与优势,专为企业上云打造定制,能够满足用户丰富、多元化的应用场景需求。

标题名称:hadoop中mapreduce的常用类(一)-创新互联
文章链接:https://www.cdcxhl.com/article34/cooepe.html

成都网站建设公司_创新互联,为您提供建站公司搜索引擎优化网站导航网站建设网站收录网站改版

广告

声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联

商城网站建设