feel 发表于 2015-7-1 12:26:48

5分钟玩转统计分析

本帖最后由 feel 于 2015-7-1 13:03 编辑

spark-mlib增加几个统计分析和机器学习库.参考官方文档:http://spark.apache.org/docs/latest/mllib-statistics.html
代码中主要使用了
Summary statistics我们使用summary statistics   来直接算最大,最小,平均值.不用多说,直接上代码案例.
val filePath="hdfs:///test/**"
   valpk="**"
    val daily_sql= "***"//这里是我们的sql语句,也可以是其他任何的数据源,获取其他的rdd
    val sparkConf=new SparkConf().setAppName("test")
    val sc= new SparkContext(sparkConf)
    val hdfs=FileSystem.get(new Configuration())
    if(hdfs.exists(new Path(filePath)))hdfs.delete(new Path(filePath),true)
    val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
    import sqlContext._
    valdf=sqlContext.sql(daily_sql)
// 具体的业务逻辑实现,数据清洗等相关工作
   val d= df.map(line=>line.mkString(",")).map(line=>line.split(",")).map(f=>f.map(_.toDouble)).map(f=>Vectors.dense(f))
   val info=Statistics.colStats(d)
   val arr=Array(info.max.toArray,info.mean.toArray,info.min.toArray)
    /**pretty
   * 矩阵转置
   * @param xss
   * @return
   */
    def transposeDouble(xss: Array])={
      for (i <- Array.range(0, xss(0).length)) yield
      for (xs <- xss) yieldxs(i)
    }
//解析成json 可视化数据
    def parse(xss: Array]) = {
      for (a <- Array.range(0, xss.length)) yield   ("s_dp"+a,Map("max"->xss(a)(0),"mean"->xss(a)(1),"min"->xss(a)(2)) )
    }
    sc.parallelize(parse(transposeDouble(arr)).toList).map(line=>{("pk"->pk)~(line._1->line._2)}).map(f=>(pk,f)).reduceByKey(_ merge _).map(f=>pretty(render(f._2))).coalesce(1, shuffle = true).saveAsTextFile(filePath)
}


代码简洁明了大方

Snail 发表于 2015-7-1 13:24:31

干货,学习!!!
页: [1]
查看完整版本: 5分钟玩转统计分析