import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor} import org.apache.hadoop.hbase.mapreduce.TableInputFormat import org.apache.spark.rdd.NewHadoopRDD val conf = HBaseConfiguration.create() conf.set(TableInputFormat.INPUT_TABLE, "tmp") var hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result]) hBaseRDD.count() import scala.collection.JavaConverters._ hBaseRDD.map(tuple => tuple._2).map(result => result.getColumn("cf".getBytes(), "val".getBytes())).map(keyValues => { ( keyValues.asScala.reduceLeft { (a, b) => if (a.getTimestamp > b.getTimestamp) a else b }.getRow, keyValues.asScala.reduceLeft { (a, b) => if (a.getTimestamp > b.getTimestamp) a else b }.getValue ) }).take(10) hBaseRDD.map(tuple => tuple._2).map(result => (result.getRow, result.getColumn("cf".getBytes(), "val".getBytes()))).map(row => { ( row._1.map(_.toChar).mkString, row._2.asScala.reduceLeft { (a, b) => if (a.getTimestamp > b.getTimestamp) a else b }.getValue.map(_.toChar).mkString ) }).take(10) conf.set(TableInputFormat.INPUT_TABLE, "test1") //var hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result]) hBaseRDD.map(tuple => tuple._2).map(result => (result.getRow, result.getColumn("lf".getBytes(), "app1".getBytes()))).map(row => if (row._2.size > 0) { ( row._1.map(_.toChar).mkString, row._2.asScala.reduceLeft { (a, b) => if (a.getTimestamp > b.getTimestamp) a else b }.getValue.map(_.toInt).mkString ) }).take(10) import java.nio.ByteBuffer hBaseRDD.map(tuple => tuple._2).map(result => (result.getRow, result.getColumn("lf".getBytes(), "app1".getBytes()))).map(row => if (row._2.size > 0) { ( row._1.map(_.toChar).mkString, ByteBuffer.wrap(row._2.asScala.reduceLeft { (a, b) => if (a.getTimestamp > b.getTimestamp) a else b }.getValue).getLong ) }).take(10) //conf.set(TableInputFormat.SCAN_COLUMN_FAMILY, "lf") conf.set(TableInputFormat.SCAN_COLUMNS, "lf:app1") //var hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result]) import java.nio.ByteBuffer hBaseRDD.map(tuple => tuple._2).map(result => { ( result.getRow.map(_.toChar).mkString, ByteBuffer.wrap(result.value).getLong ) }).take(10) val conf = HBaseConfiguration.create() conf.set(TableInputFormat.INPUT_TABLE, "test1") var hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result]) var rows = hBaseRDD.map(tuple => tuple._2).map(result => result.getRow.map(_.toChar).mkString) rows.map(row => row.split("\\|")).map(r => if (r.length > 1) (r(0), r(1)) else (r(0), "") ).groupByKey.take(10)
当前文章:spark访问hbase
链接分享:https://www.cdcxhl.com/article16/pdgpdg.html
成都网站建设公司_创新互联,为您提供定制开发、网站营销、动态网站、微信公众号、外贸网站建设、电子商务
声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联