借hbase-rdd二次开发谈如何在SparkCore之上扩建自己的模块

我是学院讲师张敏,在学院 “4.20 IT充电节”(4月19~20日) 到来之际,和大家分享一下Spark Core之上扩建自己的模块的经验。正文来啦~~~

创新互联专注于企业全网整合营销推广、网站重做改版、阿瓦提网站定制设计、自适应品牌网站建设、H5建站商城网站开发、集团公司官网建设、外贸网站制作、高端网站制作、响应式网页设计等建站业务,价格优惠性价比高,为阿瓦提等各大城市提供网站开发制作服务。

hbase-rdd是一个构建在SparkContext基础之上的用于对Hbase进行增删改查的第三方开源模块,目前***版本为0.7.1。目前该rdd在操作hbase时,默认调用隐式方法。

 
 
 
 
  1. implicitdef stringToBytes(s: String): Array[Byte] = { 
  2. Bytes.toBytes(s) 
  3. }

将RDD的key转换成字节b,然后调用Hbase的put(b)方法保存rowkey,之后将RDD的每一行存入hbase。

在轨迹图绘制项目数据计算中,我们考虑到hbase的rowkey的设计——尽量减少rowkey存储的开销。虽然hbase-rdd最终的rowkey默认都是采用字节数组,但这个地方我们希望按自己的方式组装rowkey。使用MD5(imei)+dateTime组成的字节数组作为rowkey。因此默认的hbase-rdd提供的方法是不满足我们存储需求的,需要对源代码进行修改。在toHbase方法中,有一个convert方法,该方法将对RDD中的每一行数据进行转化,使用RDD中的key生成Put(Bytes.toBytes(key))对象,该对象为之后存储Hbase提供rowkey。

在convert函数中,对其实现进行了改造,hbase-rdd默认使用stringToBytes隐式函数将RDD的String类型的key转换成字节数组,这里我们需要改造,不使stringToBytes隐式方法,而是直接生成字节数据。

 
 
 
 
  1. protected def convert(id: String, values: Map[String, Map[String, A]], put: PutAdder[A]) = { 
  2. val strs = id.split(",") 
  3. val imei = strs {0} 
  4. val dateTime = strs {1} 
  5. val b1 = MD5Utils.computeMD5Hash(imei.getBytes()) 
  6. val b2 = Bytes.toBytes(dateTime.toLong) 
  7. val key = b1.++(b2) 
  8. val p = new Put(key)//改造 
  9. var empty = true 
  10. for { 
  11. (family, content) <- values 
  12. (key, value) <- content 
  13. } { 
  14. empty = false 
  15. if (StrUtils.isNotEmpty(family) &&StrUtils.isNotEmpty(key)) { 
  16. put(p, family, key, value) 
  17. if (empty) None else Some(new ImmutableBytesWritable, p) 
  18. }

这样就实现了使用自己的方式构建rowkey,当然基于此思想我们可以使用任意的方式构建rowkey。

在使用hbase-rdd插件的过程中,我在思考,默认的RDD上是没有toHbase方法的,那为什么引入hbase-rdd包之后,RDD之上就有toHbase方法了?经过查看源码,发现hbase-rdd包中提供了两个隐式方法:

 
 
 
 
  1. implicitdef toHBaseRDDSimple[A](rdd: RDD[(String, Map[String, A])])(implicit writer: Writes[A]): HBaseWriteRDDSimple[A] =new HBaseWriteRDDSimple(rdd, pa[A])
  2. implicit def toHBaseRDDSimpleTS[A](rdd: RDD[(String, Map[String, (A, Long)])])(implicit writer: Writes[A]): HBaseWriteRDDSimple[(A, Long)] =new HBaseWriteRDDSimple(rdd, pa[A])

这两个方法在发现RDD上没有toHbase方法时会自动尝试调用,从隐式定义中尝试找到解决方案,尝试之后发现有定义toHBaseRDDSimple隐式方法,于是调用该隐式方法新建HBaseWriteRDDSimple类,返回hBaseWriteRDDSimple,而在hBaseWriteRDDSimple对象中是有toHbase方法的,因此在引入hbase-rdd之后,可以发现原本没有toHbase方法的RDD上有toHbase方法了。这一切都要归功于Scala强大的隐式转换功能。

那明白了原理,是否我们可以基于RDD写自己的模块,说干就干!

***步:新建Trait

 
 
 
 
  1. traitHaha{
  2. implicitdef gaga[A](rdd: RDD[String]): Hehe=
  3. newHehe(rdd)
  4. }

第二步:新建Hehe类

 
 
 
 
  1. final  class Hehe(rdd:RDD[String]) {
  2. def wow(tableName:String,family:String): Unit ={
  3. println("---------------------------------------------")
  4. println("tableName:"+tableName+" - family:"+family)
  5. println("size:"+rdd.count())
  6. rdd.collect().foreach(data=>println(data))
  7. println("---------------------------------------------")
  8.    }
  9. }

第三步:新建包对象

 
 
 
 
  1. package object test extends Haha

第四步:新建test类

 
 
 
 
  1. object Test{
  2. def main(args: Array[String]) {
  3. valsparkConf = new SparkConf().setAppName("Test")
  4. valsc = new SparkContext(sparkConf)
  5. sc.makeRDD(Seq("one","two","three","four")).wow("taskDataPre","T")
  6.   }
  7. }

项目结构图:

运行效果图:

希望对大家以后的开发有帮助,同时借鉴本案例,在Spark Core之上构建自己的小模块。

学院 4.20 IT充电节

(19-20号两天,100门视频课程免单抢,更有视频课程会员享6折,非会员享7折,套餐折上8折,微职位立减2000元钜惠)

活动链接:http://edu./activity/lists/id-47.html?wenzhang

相关视频教程:

【大数据 Spark2.x 流数据处理】精通Spark流数据处理(持续完毕)

网站名称:借hbase-rdd二次开发谈如何在SparkCore之上扩建自己的模块
链接地址:http://www.36103.cn/qtweb/news37/22487.html

网站建设、网络推广公司-创新互联,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等

广告

声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联