|
本帖最后由 feel 于 2015-7-1 13:03 编辑
spark-mlib增加几个统计分析和机器学习库.参考官方文档:http://spark.apache.org/docs/latest/mllib-statistics.html
代码中主要使用了
Summary statistics我们使用summary statistics 来直接算最大,最小,平均值.不用多说,直接上代码案例.
val filePath="hdfs:///test/**"
val pk="**"
val daily_sql= "***" //这里是我们的sql语句,也可以是其他任何的数据源,获取其他的rdd
val sparkConf=new SparkConf().setAppName("test")
val sc= new SparkContext(sparkConf)
val hdfs=FileSystem.get(new Configuration())
if(hdfs.exists(new Path(filePath)))hdfs.delete(new Path(filePath),true)
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
import sqlContext._
val df=sqlContext.sql(daily_sql)
// 具体的业务逻辑实现,数据清洗等相关工作
val d= df.map(line=>line.mkString(",")).map(line=>line.split(",")).map(f=>f.map(_.toDouble)).map(f=>Vectors.dense(f))
val info=Statistics.colStats(d)
val arr=Array(info.max.toArray,info.mean.toArray,info.min.toArray)
/** pretty
* 矩阵转置
* @param xss
* @return
*/
def transposeDouble(xss: Array[Array[Double]])={
for (i <- Array.range(0, xss(0).length)) yield
for (xs <- xss) yield xs(i)
}
//解析成json 可视化数据
def parse(xss: Array[Array[Double]]) = {
for (a <- Array.range(0, xss.length)) yield ("s_dp"+a,Map("max"->xss(a)(0),"mean"->xss(a)(1),"min"->xss(a)(2)) )
}
sc.parallelize(parse(transposeDouble(arr)).toList).map(line=>{("pk"->pk)~(line._1->line._2)}).map(f=>(pk,f)).reduceByKey(_ merge _).map(f=>pretty(render(f._2))).coalesce(1, shuffle = true).saveAsTextFile(filePath)
}
代码简洁明了大方
|
|