收藏官网首页
查看: 11234|回复: 1

5分钟玩转统计分析

36

主题

69

帖子

265

积分

中级会员

Rank: 3Rank: 3

积分
265
跳转到指定楼层
楼主
发表于 2015-7-1 12:26:48 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
校园创客福利
本帖最后由 feel 于 2015-7-1 13:03 编辑

spark-mlib增加几个统计分析和机器学习库.参考官方文档:http://spark.apache.org/docs/latest/mllib-statistics.html
代码中主要使用了
Summary statistics我们使用summary statistics   来直接算最大,最小,平均值.不用多说,直接上代码案例.
val filePath="hdfs:///test/**"
   val  pk="**"
    val daily_sql= "***"  //这里是我们的sql语句,也可以是其他任何的数据源,获取其他的rdd
    val sparkConf=new SparkConf().setAppName("test")
    val sc= new SparkContext(sparkConf)
    val hdfs=FileSystem.get(new Configuration())
    if(hdfs.exists(new Path(filePath)))hdfs.delete(new Path(filePath),true)
    val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
    import sqlContext._
    val  df=sqlContext.sql(daily_sql)
// 具体的业务逻辑实现,数据清洗等相关工作
   val d= df.map(line=>line.mkString(",")).map(line=>line.split(",")).map(f=>f.map(_.toDouble)).map(f=>Vectors.dense(f))
   val info=Statistics.colStats(d)
   val arr=Array(info.max.toArray,info.mean.toArray,info.min.toArray)
    /**  pretty
     * 矩阵转置
     * @param xss
     * @return
     */
    def transposeDouble(xss: Array[Array[Double]])={
      for (i <- Array.range(0, xss(0).length)) yield
      for (xs <- xss) yield  xs(i)
    }
//解析成json 可视化数据
    def parse(xss: Array[Array[Double]]) = {
      for (a <- Array.range(0, xss.length)) yield   ("s_dp"+a,Map("max"->xss(a)(0),"mean"->xss(a)(1),"min"->xss(a)(2)) )
    }
    sc.parallelize(parse(transposeDouble(arr)).toList).map(line=>{("pk"->pk)~(line._1->line._2)}).map(f=>(pk,f)).reduceByKey(_ merge _).map(f=>pretty(render(f._2))).coalesce(1, shuffle = true).saveAsTextFile(filePath)
  }


代码简洁明了大方

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

加入Q群 返回顶部

版权与免责声明 © 2006-2024 Gizwits IoT Technology Co., Ltd. ( 粤ICP备11090211号 )

快速回复 返回顶部 返回列表