| 
 | 
 
 本帖最后由 feel 于 2015-7-1 13:03 编辑  
 
spark-mlib增加几个统计分析和机器学习库.参考官方文档:http://spark.apache.org/docs/latest/mllib-statistics.html 
代码中主要使用了 
Summary statistics我们使用summary statistics   来直接算最大,最小,平均值.不用多说,直接上代码案例. 
 val filePath="hdfs:///test/**" 
   val  pk="**" 
    val daily_sql= "***"  //这里是我们的sql语句,也可以是其他任何的数据源,获取其他的rdd 
    val sparkConf=new SparkConf().setAppName("test") 
    val sc= new SparkContext(sparkConf) 
    val hdfs=FileSystem.get(new Configuration()) 
    if(hdfs.exists(new Path(filePath)))hdfs.delete(new Path(filePath),true) 
    val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) 
    import sqlContext._ 
    val  df=sqlContext.sql(daily_sql) 
// 具体的业务逻辑实现,数据清洗等相关工作 
   val d= df.map(line=>line.mkString(",")).map(line=>line.split(",")).map(f=>f.map(_.toDouble)).map(f=>Vectors.dense(f)) 
   val info=Statistics.colStats(d) 
   val arr=Array(info.max.toArray,info.mean.toArray,info.min.toArray) 
    /**  pretty 
     * 矩阵转置 
     * @param xss 
     * @return 
     */ 
    def transposeDouble(xss: Array[Array[Double]])={ 
      for (i <- Array.range(0, xss(0).length)) yield 
      for (xs <- xss) yield  xs(i) 
    } 
//解析成json 可视化数据 
    def parse(xss: Array[Array[Double]]) = { 
      for (a <- Array.range(0, xss.length)) yield   ("s_dp"+a,Map("max"->xss(a)(0),"mean"->xss(a)(1),"min"->xss(a)(2)) ) 
    } 
    sc.parallelize(parse(transposeDouble(arr)).toList).map(line=>{("pk"->pk)~(line._1->line._2)}).map(f=>(pk,f)).reduceByKey(_ merge _).map(f=>pretty(render(f._2))).coalesce(1, shuffle = true).saveAsTextFile(filePath) 
  } 
 
 
代码简洁明了大方 
 
 |   
 
 
 
 |