| 
 | 
 
 
自从Amazone公布了协同过滤算法后,在推荐系统领域,它就占据了很重要的地位。不像传统的内容推荐,协同过滤不需要考虑物品的属性问题,用户的行为,行业问题等,只需要建立用户与物品的关联关系即可,可以物品之间更多的内在关系,类似于经典的啤酒与尿不湿的营销案例。所以,讲到推荐必须要首先分享协同过滤。下面代码实战基于sparkMLlib ASL 算法实战 
- package com.gizwits.mllib
 
  
- import org.apache.log4j._
 
 - import org.apache.spark.mllib.recommendation._
 
 - import org.apache.spark.rdd.RDD
 
 - import org.apache.spark.{SparkConf, SparkContext}
 
  
- /**
 
 -   * Created by feel 
 
 -   *
 
 -   * moivelens 电影推荐  协同过滤算法实现电影推荐.目前spark 实现的算法有(交替最小二乘法(ALS))
 
 -   * 数据下载:http://grouplens.org/datasets/movielens/
 
 -   *
 
 -   */
 
 - object MoiveRecommenderALS {
 
  
-   /**
 
 -     *
 
 -     * @param input            电影评分数据
 
 -     * @param numIterations    迭代的次数
 
 -     * @param lambda           ALS的正则化参数。
 
 -     * @param rank             模型中隐语义因子的个数。
 
 -     * @param numUserBlocks    用于并行化计算的分块个数 (设置为-1为自动配置)。
 
 -     * @param numProductBlocks 用于并行化计算的分块个数 (设置为-1为自动配置)。
 
 -     * @param implicitPrefs    决定了是用显性反馈ALS的版本还是用适用隐性反馈数据集的版本
 
 -     * @param userDataInput    用户数据输入
 
 -     */
 
 -   case class Params(
 
 -                      input: String = null,
 
 -                      numIterations: Int = 20,
 
 -                      lambda: Double = 1.0,
 
 -                      rank: Int = 10,
 
 -                      numUserBlocks: Int = -1,
 
 -                      numProductBlocks: Int = -1,
 
 -                      implicitPrefs: Boolean = false,
 
 -                      userDataInput: String = null)
 
  
-   val numRecommender = 10
 
  
-   def main(args: Array[String]) {
 
 -     //  设置日志级别
 
 -     val rootLogger = Logger.getRootLogger()
 
  
-     Logger.getLogger("com.gizwits").setLevel(Level.ERROR)
 
  
-     rootLogger.setLevel(Level.ERROR)
 
 -     val conf = new SparkConf()
 
 -       .setAppName("MoiveRecommenderALS")
 
 -     conf.setMaster("local[4]")
 
 -     val context = new SparkContext(conf)
 
  
-     val inputDataPath = "file:///Users/feel/githome/idea/spark-exercise/src/main/resources/u.data"
 
 -     val userInputPath = "file:///Users/feel/githome/idea/spark-exercise/src/main/resources/u.user"
 
  
-     //可以调整这些参数,不断优化结果,使均方差变小。比如iterations越多,lambda较小,均方差会较小,推荐结果较优
 
 -     val defaultParams = Params(
 
 -       inputDataPath, 20, 0.01, 10, -1, -1, false, userInputPath
 
 -     )
 
 -     //加载数据
 
 -     val data = context.textFile(inputDataPath)
 
  
-     /**
 
 -       * *MovieLens ratings are on a scale of 1-5:
 
 -       * 5: Must see
 
 -       * 4: Will enjoy
 
 -       * 3: It's okay
 
 -       * 2: Fairly bad
 
 -       * 1: Awful
 
 -       */
 
 -     val ratings = data.map(_.split("\t") match {
 
 -       case Array(user, item, rate, time) => Rating(user.toInt, item.toInt, rate.toDouble)
 
 -     })
 
  
 
 
-     //使用ALS建立推荐模型
 
 -     //也可以使用简单模式    val model = ALS.train(ratings, ranking, numIterations)
 
  
 
-     val model = new ALS()
 
 -       .setRank(defaultParams.rank)
 
 -       .setIterations(defaultParams.numIterations)
 
 -       .setLambda(defaultParams.lambda)
 
 -       .setImplicitPrefs(defaultParams.implicitPrefs)
 
 -       .setUserBlocks(defaultParams.numUserBlocks)
 
 -       .setProductBlocks(defaultParams.numProductBlocks)
 
 -       .run(ratings)
 
  
-     //预测
 
 -     predictMoive(defaultParams, context, model)
 
  
 
-     //模型评估
 
 -     evaluateMode(ratings, model)
 
  
-     //clean up
 
 -     context.stop()
 
 -     //end  main
 
  
-   }
 
  
-   /**
 
 -     * 模型评估
 
 -     */
 
 -   private def evaluateMode(ratings: RDD[Rating], model: MatrixFactorizationModel) {
 
  
-     //使用训练数据训练模型
 
 -     val usersProducets = ratings.map(r => r match {
 
 -       case Rating(user, product, rate) => (user, product)
 
 -     })
 
  
-     //预测数据
 
 -     val predictions = model.predict(usersProducets).map(u => u match {
 
 -       case Rating(user, product, rate) => ((user, product), rate)
 
 -     })
 
  
-     //将真实分数与预测分数进行合并
 
 -     val ratesAndPreds = ratings.map(r => r match {
 
 -       case Rating(user, product, rate) =>
 
 -         ((user, product), rate)
 
 -     }).join(predictions)
 
  
-     //计算均方差
 
 -     val MSE = ratesAndPreds.map(r => r match {
 
 -       case ((user, product), (r1, r2)) =>
 
 -         val err = (r1 - r2)
 
 -         err * err
 
 -     }).mean()
 
  
-     //打印出均方差值
 
 -     println("Mean Squared Error = " + MSE)
 
 -   }
 
  
-   /**
 
 -     * 预测数据并保存到HBase中或其他存储引擎
 
 -     */
 
 -   private def predictMoive(params: Params, context: SparkContext, model: MatrixFactorizationModel) {
 
  
 
-     val recommenders = new scala.collection.mutable.ArrayBuffer[scala.collection.mutable.HashMap[String, String]]();
 
  
-     //读取需要进行电影推荐的用户数据
 
 -     val userData = context.textFile(params.userDataInput)
 
  
-     userData.map(_.split("\\|") match {
 
 -       case Array(id, age, sex, job, x) => (id)
 
 -     }).collect().foreach(id => {
 
 -       //为用户推荐电影
 
 -       val rs = model.recommendProducts(id.toInt, numRecommender)
 
 -       var value = ""
 
 -       var key = 0
 
  
-       //保存推荐数据到hbase中
 
 -       rs.foreach(r => {
 
 -         key = r.user
 
 -         value = value + r.product + ":" + r.rating + ","
 
 -       })
 
  
-       //成功,则封装put对象,等待插入到Hbase中
 
 -       if (!value.equals("")) {
 
 -         val put = new scala.collection.mutable.HashMap[String, String]
 
 -         put += ("rowKey" -> key.toString)
 
 -         put += ("t:info" -> value)
 
 -         recommenders.+=(put)
 
  
-       }
 
 -     })
 
  
-     recommenders.foreach(println _)
 
  
-   }
 
 - }
 
 
  复制代码 
 
 
 |   
 
 
 
 |