机器学习之推荐算法实战
自从Amazone公布了协同过滤算法后,在推荐系统领域,它就占据了很重要的地位。不像传统的内容推荐,协同过滤不需要考虑物品的属性问题,用户的行为,行业问题等,只需要建立用户与物品的关联关系即可,可以物品之间更多的内在关系,类似于经典的啤酒与尿不湿的营销案例。所以,讲到推荐必须要首先分享协同过滤。下面代码实战基于sparkMLlib ASL 算法实战
package com.gizwits.mllib
import org.apache.log4j._
import org.apache.spark.mllib.recommendation._
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* Created by feel
*
* moivelens 电影推荐协同过滤算法实现电影推荐.目前spark 实现的算法有(交替最小二乘法(ALS))
* 数据下载:http://grouplens.org/datasets/movielens/
*
*/
object MoiveRecommenderALS {
/**
*
* @param input 电影评分数据
* @param numIterations 迭代的次数
* @param lambda ALS的正则化参数。
* @param rank 模型中隐语义因子的个数。
* @param numUserBlocks 用于并行化计算的分块个数 (设置为-1为自动配置)。
* @param numProductBlocks 用于并行化计算的分块个数 (设置为-1为自动配置)。
* @param implicitPrefs 决定了是用显性反馈ALS的版本还是用适用隐性反馈数据集的版本
* @param userDataInput 用户数据输入
*/
case class Params(
input: String = null,
numIterations: Int = 20,
lambda: Double = 1.0,
rank: Int = 10,
numUserBlocks: Int = -1,
numProductBlocks: Int = -1,
implicitPrefs: Boolean = false,
userDataInput: String = null)
val numRecommender = 10
def main(args: Array) {
//设置日志级别
val rootLogger = Logger.getRootLogger()
Logger.getLogger("com.gizwits").setLevel(Level.ERROR)
rootLogger.setLevel(Level.ERROR)
val conf = new SparkConf()
.setAppName("MoiveRecommenderALS")
conf.setMaster("local")
val context = new SparkContext(conf)
val inputDataPath = "file:///Users/feel/githome/idea/spark-exercise/src/main/resources/u.data"
val userInputPath = "file:///Users/feel/githome/idea/spark-exercise/src/main/resources/u.user"
//可以调整这些参数,不断优化结果,使均方差变小。比如iterations越多,lambda较小,均方差会较小,推荐结果较优
val defaultParams = Params(
inputDataPath, 20, 0.01, 10, -1, -1, false, userInputPath
)
//加载数据
val data = context.textFile(inputDataPath)
/**
* *MovieLens ratings are on a scale of 1-5:
* 5: Must see
* 4: Will enjoy
* 3: It's okay
* 2: Fairly bad
* 1: Awful
*/
val ratings = data.map(_.split("\t") match {
case Array(user, item, rate, time) => Rating(user.toInt, item.toInt, rate.toDouble)
})
//使用ALS建立推荐模型
//也可以使用简单模式 val model = ALS.train(ratings, ranking, numIterations)
val model = new ALS()
.setRank(defaultParams.rank)
.setIterations(defaultParams.numIterations)
.setLambda(defaultParams.lambda)
.setImplicitPrefs(defaultParams.implicitPrefs)
.setUserBlocks(defaultParams.numUserBlocks)
.setProductBlocks(defaultParams.numProductBlocks)
.run(ratings)
//预测
predictMoive(defaultParams, context, model)
//模型评估
evaluateMode(ratings, model)
//clean up
context.stop()
//endmain
}
/**
* 模型评估
*/
private def evaluateMode(ratings: RDD, model: MatrixFactorizationModel) {
//使用训练数据训练模型
val usersProducets = ratings.map(r => r match {
case Rating(user, product, rate) => (user, product)
})
//预测数据
val predictions = model.predict(usersProducets).map(u => u match {
case Rating(user, product, rate) => ((user, product), rate)
})
//将真实分数与预测分数进行合并
val ratesAndPreds = ratings.map(r => r match {
case Rating(user, product, rate) =>
((user, product), rate)
}).join(predictions)
//计算均方差
val MSE = ratesAndPreds.map(r => r match {
case ((user, product), (r1, r2)) =>
val err = (r1 - r2)
err * err
}).mean()
//打印出均方差值
println("Mean Squared Error = " + MSE)
}
/**
* 预测数据并保存到HBase中或其他存储引擎
*/
private def predictMoive(params: Params, context: SparkContext, model: MatrixFactorizationModel) {
val recommenders = new scala.collection.mutable.ArrayBuffer]();
//读取需要进行电影推荐的用户数据
val userData = context.textFile(params.userDataInput)
userData.map(_.split("\\|") match {
case Array(id, age, sex, job, x) => (id)
}).collect().foreach(id => {
//为用户推荐电影
val rs = model.recommendProducts(id.toInt, numRecommender)
var value = ""
var key = 0
//保存推荐数据到hbase中
rs.foreach(r => {
key = r.user
value = value + r.product + ":" + r.rating + ","
})
//成功,则封装put对象,等待插入到Hbase中
if (!value.equals("")) {
val put = new scala.collection.mutable.HashMap
put += ("rowKey" -> key.toString)
put += ("t:info" -> value)
recommenders.+=(put)
}
})
recommenders.foreach(println _)
}
}
页:
[1]