在IDEA中编写spark程序-创新互联

这里以一个scala版本的word count 程序为例:
①创建一个maven项目:
在IDEA中编写spark程序在IDEA中编写spark程序
②填写maven的GAV:
在IDEA中编写spark程序
③填写项目名称:
在IDEA中编写spark程序
④ 创建好 maven 项目后,点击 Enable Auto-Import
在IDEA中编写spark程序
⑤配置pom.xml文件:

网站建设哪家好,找创新互联公司!专注于网页设计、网站建设、微信开发、微信小程序定制开发、集团企业网站建设等服务项目。为回馈新老客户创新互联还提供了湖北免费建站欢迎大家使用!
<properties>
    <project.build.sourceEncoding>UTF8</project.build.sourceEncoding> 
  <maven.compiler.source>1.8</maven.compiler.source> 
  <maven.compiler.target>1.8</maven.compiler.target> 
  <encoding>UTF-8</encoding> 
  <scala.version>2.11.8</scala.version> 
  <spark.version>2.3.1</spark.version> 
  <hadoop.version>2.7.6</hadoop.version> 
  <scala.compat.version>2.11</scala.compat.version> 
 </properties>
 <dependencies>
  <dependency>
  <groupId>org.scala-lang</groupId> 
  <artifactId>scala-library</artifactId> 
  <version>${scala.version}</version> 
  </dependency>
  <dependency>
  <groupId>org.apache.spark</groupId> 
  <artifactId>spark-core_2.11</artifactId> 
  <version>${spark.version}</version> 
  </dependency>
  <dependency>
  <groupId>org.apache.spark</groupId> 
  <artifactId>spark-sql_2.11</artifactId> 
  <version>${spark.version}</version> 
  </dependency>
  <dependency>
  <groupId>org.apache.spark</groupId> 
  <artifactId>spark-streaming_2.11</artifactId> 
  <version>${spark.version}</version> 
  </dependency>
  <dependency>
  <groupId>org.apache.spark</groupId> 
  <artifactId>spark-graphx_2.11</artifactId> 
  <version>${spark.version}</version> 
  </dependency>
  <dependency>
  <groupId>org.apache.spark</groupId> 
  <artifactId>spark-mllib_2.11</artifactId> 
  <version>${spark.version}</version> 
  </dependency>
  <dependency>
  <groupId>org.apache.hadoop</groupId> 
  <artifactId>hadoop-client</artifactId> 
  <version>${hadoop.version}</version> 
  </dependency>
</dependencies>

编写代码:

object WordCount {
    def main(args: Array[String]): Unit = {
        //获取集群入口
        val conf: SparkConf = new SparkConf()
        conf.setAppName("WordCount")
        val sc = new SparkContext(conf)
        //从 HDFS 中读取文件
        val lineRDD: RDD[String] = sc.textFile("hdfs://zzy/data/input/words.txt")
        //做数据处理
        val wordRDD: RDD[String] = lineRDD.flatMap(line=>line.split("\\s+"))
        val wordAndCountRDD: RDD[(String, Int)] = wordRDD.map(word=>(word,1))
        //将结果写入到 HDFS 中
       wordAndCountRDD.reduceByKey(_+_).saveAsTextFile("hdfs://zzy/data/output")
        //关闭编程入口
        sc.stop()
    }
}

打jar包:
在pom.xml中加入相应的插件:

<build>
    <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
      <plugins>
        <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
        <plugin>
          <artifactId>maven-clean-plugin</artifactId>
          <version>3.1.0</version>
        </plugin>
        <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
        <plugin>
          <artifactId>maven-resources-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.8.0</version>
        </plugin>
        <plugin>
          <artifactId>maven-surefire-plugin</artifactId>
          <version>2.22.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-jar-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-install-plugin</artifactId>
          <version>2.5.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-deploy-plugin</artifactId>
          <version>2.8.2</version>
        </plugin>
        <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
        <plugin>
          <artifactId>maven-site-plugin</artifactId>
          <version>3.7.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-project-info-reports-plugin</artifactId>
          <version>3.0.0</version>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>

然后:
在IDEA中编写spark程序
在IDEA中编写spark程序
在IDEA中编写spark程序
在IDEA中编写spark程序

将jar包上传到集群中运行:

spark-submit \
--class com.zy.scala.WordCount \
--master yarn \
--deploy-mode cluster \
--driver-memory 200M \
--executor-memory 200M \
--total-executor-cores 1 \
hdfs://zzy/data/jar/ScalaTest-1.0-SNAPSHOT.jar \

此时在yarn的web就能查看相应的程序的运行进度。
这时候,程序总是异常的结束:
在IDEA中编写spark程序
我就使用:
yarn logs -applicationId application_1522668922644_40211 查看了一下报错信息。
结果:not fount class :scala.WordCount.
然后我就在想是不是jar包出现了问题,我就打开了,之前上传的jar包,果然根本找不到 我打jar的程序,只有一个,META-INF,此时 我就百思不得不得解,然后由重新反复尝试了很多次,还是解决不了。后来吃个饭回来,突然想到是不是maven不能将scala编写的程序打成jar包,后来通过百度,发现了:
maven 默认只编译java 的文件,而不会编译scala 文件。但是maven 提供了 能够编译scala 的类库。
好心的博主:scala 在IDEA打jar包相关问题:https://blog.csdn.net/freecrystal_alex/article/details/78296851
然后 我修改了pom.xml文件:
http://down.51cto.com/data/2457588
按照上述的步骤,重新的向集群提交了一次任务,结果不尽人意,又出错了:
但是这一次错误和上次的不同(说明上一个问题已经解决):
在IDEA中编写spark程序
这才明白,原来是Driver进程分配的内存太小了,最少应该大于450M,之后我又修改了 --driver-memory 512M --executor-memory 512M,重新提交任务。结果运行成功!!!

注意
这里使用的是yarn的任务调用,不是spark自带的standalone,需要加入的参数:
--master yarn
--deploy-mode cluster
这里的--deploy-mode,使用的是cluster集群模式,client是客户端模式。
二者的区别是:client表示,在哪个节点提交,Driver就哪里启动,而cluster模式表示当将Driver放入到集群中启动。

另外有需要云服务器可以了解下创新互联scvps.cn,海内外云服务器15元起步,三天无理由+7*72小时售后在线,公司持有idc许可证,提供“云服务器、裸金属服务器、高防服务器、香港服务器、美国服务器、虚拟主机、免备案服务器”等云主机租用服务以及企业上云的综合解决方案,具有“安全稳定、简单易用、服务可用性高、性价比高”等特点与优势,专为企业上云打造定制,能够满足用户丰富、多元化的应用场景需求。

文章名称:在IDEA中编写spark程序-创新互联
文章链接:https://www.cdcxhl.com/article44/didcee.html

成都网站建设公司_创新互联,为您提供小程序开发品牌网站设计面包屑导航标签优化关键词优化外贸建站

广告

声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联

h5响应式网站建设