Commit
add: 1. mllib vector learning
Kyofin committed Sep 9, 2019
1 parent 84da069 commit f2e2ddb
Showing 5 changed files with 163 additions and 15 deletions.
@@ -33,9 +33,12 @@ object DoubanRecommendMovie {
// prepare the data
preparation(rawUserMoviesData, rawHotMoviesData)
println("data preparation finished")
model(sc, rawUserMoviesData, rawHotMoviesData)

evaluate(sc,rawUserMoviesData, rawHotMoviesData)
// model(sc, rawUserMoviesData, rawHotMoviesData)

// evaluate(sc,rawUserMoviesData, rawHotMoviesData)

recommend(sc, rawUserMoviesData, rawHotMoviesData, base)
}

/**
@@ -116,11 +119,20 @@ object DoubanRecommendMovie {
println("输出第一个userFeature")
println(model.userFeatures.mapValues(_.mkString(", ")).first())
for (userID <- Array(100, 1001, 10001, 100001, 110000)) {
checkRecommenderResult(userID, rawUserMoviesData, bMoviesAndName, reverseUserIDMapping, model)
checkRecommendResult(userID, rawUserMoviesData, bMoviesAndName, reverseUserIDMapping, model)
}
// unpersist(model)
}

/**
* Evaluate the model.
* We can measure how good the model is by computing the Mean Squared Error (MSE).
* In statistics, the MSE is the expected value of the squared difference between
* an estimate and the true value: MSE = (1/n) * sum_i (rating_i - prediction_i)^2.
* It is a convenient measure of the "average error" and reflects how far the
* predictions scatter around the data: the smaller the MSE, the more accurately
* the model describes the experimental data.
*
* We can tune the rank, numIterations, lambda and alpha parameters to keep
* reducing the MSE and improving the result.
* For example, more iterations and a smaller lambda tend to give a smaller MSE
* and better recommendations.
*/
def evaluate(sc: SparkContext,
rawUserMoviesData: RDD[String],
rawHotMoviesData: RDD[String]): Unit = {
@@ -141,9 +153,8 @@
}.cache()

val numIterations = 10

for (rank <- Array(10, 50);
lambda <- Array(1.0, 0.01,0.0001)) {
// evaluate the parameter settings for explicit feedback
for (rank <- Array(10, 50); lambda <- Array(1.0, 0.01, 0.0001)) {
val model = ALS.train(ratings, rank, numIterations, lambda)

// Evaluate the model on rating data
@@ -165,9 +176,8 @@
println(s"(rank:$rank, lambda: $lambda, Explicit ) Mean Squared Error = " + MSE)
}
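// The collapsed lines above compute the MSE. A typical MLlib pattern for this
// (a sketch; the exact hidden lines of this commit may differ) is:
//   val usersProducts = ratings.map { case Rating(user, product, _) => (user, product) }
//   val predictions = model.predict(usersProducts)
//     .map { case Rating(user, product, rate) => ((user, product), rate) }
//   val ratesAndPreds = ratings
//     .map { case Rating(user, product, rate) => ((user, product), rate) }
//     .join(predictions)
//   val MSE = ratesAndPreds.map { case (_, (r, p)) => (r - p) * (r - p) }.mean()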

for (rank <- Array(10, 50);
lambda <- Array(1.0, 0.01,0.0001);
alpha <- Array(1.0, 40.0)) {
// evaluate the parameter settings for implicit feedback
for (rank <- Array(10, 50); lambda <- Array(1.0, 0.01, 0.0001); alpha <- Array(1.0, 40.0)) {
val model = ALS.trainImplicit(ratings, rank, numIterations, lambda, alpha)

// Evaluate the model on rating data
@@ -190,12 +200,14 @@
}
}

// inspect the recommendations made for a given user
def checkRecommenderResult(userID: Int,
rawUserMoviesData: RDD[String],
bMoviesAndName: Broadcast[scala.collection.Map[Int, String]],
reverseUserIDMapping: RDD[(Long, String)],
model: MatrixFactorizationModel): Unit = {
/**
* Pick a few users: show the movies each of them has watched and the movies this model recommends to them
*/
def checkRecommendResult(userID: Int,
rawUserMoviesData: RDD[String],
bMoviesAndName: Broadcast[scala.collection.Map[Int, String]],
reverseUserIDMapping: RDD[(Long, String)],
model: MatrixFactorizationModel): Unit = {

val userName = reverseUserIDMapping.lookup(userID).head

@@ -220,4 +232,62 @@
}


def recommend(sc: SparkContext,
rawUserMoviesData: RDD[String],
rawHotMoviesData: RDD[String],
base: String): Unit = {
val moviesAndName = buildMovies(rawHotMoviesData)
val bMoviesAndName = sc.broadcast(moviesAndName)

val data = buildUserMovieRatings(rawUserMoviesData)

val userIdToInt: RDD[(String, Long)] =
data.map(_.userID).distinct().zipWithUniqueId()
val reverseUserIDMapping: RDD[(Long, String)] =
userIdToInt map { case (l, r) => (r, l) }

val userIDMap: scala.collection.Map[String, Int] =
userIdToInt.collectAsMap().map { case (n, l) => (n, l.toInt) }

val bUserIDMap = sc.broadcast(userIDMap)
val bReverseUserIDMap = sc.broadcast(reverseUserIDMapping.collectAsMap())

val ratings: RDD[Rating] = data.map { r =>
Rating(bUserIDMap.value(r.userID), r.movieID, r.rating)
}.cache()
// build the model with the ALS collaborative filtering algorithm
//val model = ALS.trainImplicit(ratings, 10, 10, 0.01, 1.0)
// explicit feedback: rank = 50, numIterations = 10, lambda = 0.0001
val model = ALS.train(ratings, 50, 10, 0.0001)
ratings.unpersist()

model.save(sc, base + "model")
//val sameModel = MatrixFactorizationModel.load(sc, base + "model")
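// a reloaded model can also serve single queries, e.g. the top five movies
// for one user (4242 is a hypothetical internal user id):
//println(sameModel.recommendProducts(4242, 5).mkString("\n"))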

val allRecommendations = model.recommendProductsForUsers(5) map {
case (userID, recommendations) =>
// build the "movieID:movieName" pairs joined by commas
val recommendationStr = recommendations
.map(r => r.product + ":" + bMoviesAndName.value.getOrElse(r.product, ""))
.mkString(",")
(bReverseUserIDMap.value(userID), recommendationStr)
}

// write the recommendations to a file:
// the first field is the user name, followed by five recommended movies (movieID:movieName)
//allRecommendations.saveAsTextFile(base + "result.csv")
allRecommendations.coalesce(1).sortByKey().saveAsTextFile(base + "result.csv")

unpersist(model)
}

def unpersist(model: MatrixFactorizationModel): Unit = {
// At the moment, it's necessary to manually unpersist the RDDs inside the model
// when done with it in order to make sure they are promptly uncached
model.userFeatures.unpersist()
model.productFeatures.unpersist()
}
}
@@ -0,0 +1,25 @@
package com.wugui.sparkstarter.ml

import breeze.linalg.DenseMatrix
import org.apache.spark.mllib.linalg.Matrices
import org.apache.spark.mllib.stat.Statistics

object MatrixExample {

def main(args: Array[String]): Unit = {
// mllib dense matrix: the values array is laid out in column-major order
val dm = Matrices.dense(3, 2, Array(1.0, 2.0, 3.0, 4.0, 5.0, 6.0))
println(dm)
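// prints the 3x2 matrix, filled column by column:
// 1.0  4.0
// 2.0  5.0
// 3.0  6.0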

// Breeze dense matrix: each Array becomes one row
val d1 = DenseMatrix(Array(1, 2), Array(3, 4), Array(5, 6))
println(d1)
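// prints the rows as given:
// 1  2
// 3  4
// 5  6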

// matrix transpose (3x2 becomes 2x3)
println(d1.t)

// Pearson's chi-squared independence test on a 2x2 contingency table
// (column-major, so the table rows are [127.0, 147.0] and [19.0, 10.0])
val chiSqResult = Statistics.chiSqTest(Matrices.dense(2, 2, Array(127.0, 19.0, 147.0, 10.0)))
println(chiSqResult)
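// the printed summary includes the method (pearson), the degrees of freedom,
// the test statistic and the p-value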
}
}
@@ -0,0 +1,30 @@
package com.wugui.sparkstarter.ml

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.{SparkConf, SparkContext}

object StatisticsExample {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName("StatisticsApp")
val sc = new SparkContext(conf)
val classpath = this.getClass.getResource("/").getPath

val txt = sc.textFile(classpath + "北京降雨量.txt")
// turn the data into an RDD of Vectors: each value becomes its own
// one-dimensional vector, i.e. a single column
val data = txt.flatMap(line => line.split(",")).map { s =>
Vectors.dense(s.toDouble)
}

val summary = Statistics.colStats(data)
// mean of each column
println(summary.mean)
// variance of each column
println(summary.variance)
// number of non-zero entries in each column
println(summary.numNonzeros)
// maximum of each column
println(summary.max)
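
// release resources once the statistics have been printed
sc.stop()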
}

}
@@ -0,0 +1,22 @@
package com.wugui.sparkstarter.ml
import breeze.linalg.DenseVector
import org.apache.spark.mllib.linalg.Vectors

object VectorsExample {
def main(args: Array[String]): Unit = {
// mllib dense vector
val dv = Vectors.dense(1, 2, 3)

// Breeze dense vectors
val denseVector1 = DenseVector(1, 2, 3)
val denseVector2 = DenseVector(1, 2, 3)

// vector addition
val sum = denseVector1 + denseVector2
println(sum)

println()

// vector multiplication: v1 * v2 between two column vectors does not
// type-check in Breeze, so use :* for the element-wise product
println(denseVector1 :* denseVector2)
// outer product: column vector times row vector (the transpose) gives a 3x3 matrix
println(denseVector1 * denseVector2.t)
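// for comparison, the inner (dot) product returns a scalar:
println(denseVector1 dot denseVector2) // 1*1 + 2*2 + 3*3 = 14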
}

}
1 change: 1 addition & 0 deletions spark-starter/src/main/resources/北京降雨量.txt
@@ -0,0 +1 @@
0.4806,0.4839,0.318,0.4107,0.4835,0.4445,0.3704,0.3389,0.3711,0.2669,0.7317,0.4309,0.7009,0.5725,0.8132,0.5067,0.5415,0.7479,0.6973,0.4422,0.6733,0.6839,0.6653,0.721,0.4888,0.4899,0.5444,0.3932,0.3807,0.7184,0.6648,0.779,0.684,0.3928,0.4747,0.6982,0.3742,0.5112,0.597,0.9132,0.3867,0.5934,0.5279,0.2618,0.8177,0.7756,0.3669,0.5998,0.5271,1.406,0.6919,0.4868,1.1157,0.9332,0.9614,0.6577,0.5573,0.4816,0.9109,0.921
