
Commit

change: 1. Update the ES-related examples
add: 1. Douban movie recommendation
Kyofin committed Sep 6, 2019
1 parent 130b0e1 commit 84da069
Showing 5 changed files with 326 additions and 46 deletions.
7 changes: 7 additions & 0 deletions spark-starter/pom.xml
@@ -157,6 +157,13 @@
<version>2.11.12</version>
<!--<scope>provided</scope>-->
</dependency>
<!-- https://mvnrepository.com/artifact/com.github.fommil.netlib/all -->
<dependency>
<groupId>com.github.fommil.netlib</groupId>
<artifactId>all</artifactId>
<version>1.1.2</version>
<type>pom</type>
</dependency>


</dependencies>
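The new com.github.fommil.netlib:all dependency pulls in netlib-java's native BLAS/LAPACK back ends, which Spark MLlib (used by the new ALS code below) can pick up instead of the pure-Java F2J fallback. A quick illustrative check of which back end actually loaded (a sketch, not part of this commit):

object NetlibCheck {
  def main(args: Array[String]): Unit = {
    // Prints the concrete implementation class, e.g. NativeSystemBLAS, NativeRefBLAS or F2jBLAS
    println(com.github.fommil.netlib.BLAS.getInstance().getClass.getName)
    println(com.github.fommil.netlib.LAPACK.getInstance().getClass.getName)
  }
}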
EsSparkTest.java
@@ -17,16 +17,6 @@
public class EsSparkTest {

public static void main(String[] args) {
// new EsSparkTest().writeEs();
// new EsSparkTest().readEs();
new EsSparkTest().writeBeanEs();
}

/**
* Save data to Elasticsearch as a Map
*/
public void writeEs() {
String elasticIndex = "spark/docs";
SparkConf sparkConf = new SparkConf()
.setAppName("writeEs")
.setMaster("local[*]")
@@ -36,6 +26,21 @@ public void writeEs() {
.set("es.nodes.wan.only", "true");

SparkSession sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();

// new EsSparkTest().writeEs(sparkSession);
// new EsSparkTest().writeBeanEs(sparkSession);
// new EsSparkTest().writeJsonEs(sparkSession);

// new EsSparkTest().readEs(sparkSession);
new EsSparkTest().readEs2(sparkSession);
}

/**
* Save data to Elasticsearch as a Map
*/
public void writeEs(SparkSession sparkSession) {
String elasticIndex = "spark/docs";

JavaSparkContext jsc = new JavaSparkContext(sparkSession.sparkContext());//adapter
Map<String, ?> numbers = ImmutableMap.of("one", 1, "two", 2);
Map<String, ?> airports = ImmutableMap.of("city", "广州", "airportName", "广州白云机场");
@@ -46,15 +51,8 @@ public void writeEs() {
/**
* Save data to Elasticsearch as a Java bean
*/
public void writeBeanEs() {
SparkConf sparkConf = new SparkConf()
.setAppName("writeEs")
.setMaster("local[*]")
.set("es.index.auto.create", "true")
.set("es.nodes", "192.168.1.25")
.set("es.port", "9200")
.set("es.nodes.wan.only", "true");
SparkSession sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();
public void writeBeanEs(SparkSession sparkSession) {

JavaSparkContext jsc = new JavaSparkContext(sparkSession.sparkContext());//adapter


@@ -66,16 +64,47 @@ public void writeBeanEs() {
JavaEsSpark.saveToEs(javaRDD, "spark/docs");
}

public void readEs() {
SparkConf sparkConf = new SparkConf()
.setAppName("writeEs")
.setMaster("local[*]")
.set("es.index.auto.create", "true")
.set("es.nodes", "192.168.1.25")
.set("es.port", "9200")
.set("es.nodes.wan.only", "true");
/**
* Save data to Elasticsearch as raw JSON strings
* @param sparkSession
*/
public void writeJsonEs(SparkSession sparkSession) {
String json1 = "{\"reason\" : \"business\",\"airport\" : \"SFO\"}";
String json2 = "{\"participants\" : 5,\"airport\" : \"OTP\"}";
String json3 = "{" +
" \"flavor\": {" +
" \"name\": \"IMS_CMREPO\"," +
" \"links\": [" +
" {" +
" \"href\": \"http://192.168.49.25:8774/v2/29ec86a6f17942f49fdc0bcc0748087b/flavors/00061ec1-4405-40fe-87a5-d06191f6826d\"," +
" \"rel\": \"self\"" +
" }," +
" {" +
" \"href\": \"http://192.168.49.25:8774/29ec86a6f17942f49fdc0bcc0748087b/flavors/00061ec1-4405-40fe-87a5-d06191f6826d\"," +
" \"rel\": \"bookmark\"" +
" }" +
" ]," +
" \"ram\": 16384," +
" \"OS-FLV-DISABLED:disabled\": false," +
" \"vcpus\": 8," +
" \"swap\": \"\"," +
" \"os-flavor-access:is_public\": true," +
" \"rxtx_factor\": 1," +
" \"OS-FLV-EXT-DATA:ephemeral\": 0," +
" \"disk\": 38," +
" \"id\": \"00061ec1-4405-40fe-87a5-d06191f6826d\"" +
" }" +
'}';

JavaSparkContext jsc = new JavaSparkContext(sparkSession.sparkContext());
JavaRDD<String> stringRDD = jsc.parallelize(ImmutableList.of(json1, json2,json3));
JavaEsSpark.saveJsonToEs(stringRDD, "spark/json-trips");
}



public void readEs(SparkSession sparkSession) {

JavaSparkContext jsc = new JavaSparkContext(sparkSession.sparkContext());//adapter
JavaRDD<Map<String, Object>> searchRdd = JavaEsSpark.esRDD(jsc, "spark/docs", "?q=广州").values();
for (Map<String, Object> item : searchRdd.collect()) {
@@ -85,5 +114,9 @@ public void readEs() {
}


public void readEs2(SparkSession sparkSession) {
sparkSession.sqlContext().read().format("es").load("spark/docs").show();
}


}
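For reference, elasticsearch-hadoop also exposes a native Scala API covering the same operations as EsSparkTest. The following is a rough sketch (not part of this commit) that assumes the same cluster settings used above and the elasticsearch-spark dependency on the classpath:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark._ // adds saveToEs on RDDs and esRDD on SparkContext

object EsSparkScalaSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("esScalaSketch").setMaster("local[*]")
      .set("es.index.auto.create", "true")
      .set("es.nodes", "192.168.1.25")
      .set("es.port", "9200")
      .set("es.nodes.wan.only", "true")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    val sc = spark.sparkContext

    // write a Map document, like writeEs()
    sc.makeRDD(Seq(Map("city" -> "广州", "airportName" -> "广州白云机场"))).saveToEs("spark/docs")

    // read it back as an RDD (readEs) and as a DataFrame (readEs2)
    sc.esRDD("spark/docs", "?q=广州").values.collect().foreach(println)
    spark.read.format("es").load("spark/docs").show()
  }
}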
@@ -0,0 +1,223 @@
package com.wugui.sparkstarter.ml

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}



object DoubanRecommendMovie {

/**
* Sample class for one movie rating.
*
* @param userID  user id
* @param movieID movie id
* @param rating  rating value
*/
case class MovieRating(userID: String, movieID: Int, rating: Double) extends scala.Serializable


def main(args: Array[String]): Unit = {
val sc = new SparkContext(new SparkConf().setAppName("DoubanRecommender").setMaster("local"))
// Base directory of the input data
val base = if (args.length > 0) args(0) else "/Users/huzekang/study/bigdata-starter/spark-starter/src/main/resources/data/"

// Load the raw RDDs
// format: <userID>,<movieID>,<rating>
val rawUserMoviesData = sc.textFile(base + "user_movies.csv")
// format: <movieID>,<rating>,<movieName>
val rawHotMoviesData = sc.textFile(base + "hot_movies.csv")

// Prepare the data
preparation(rawUserMoviesData, rawHotMoviesData)
println("Data preparation done")
model(sc, rawUserMoviesData, rawHotMoviesData)

evaluate(sc,rawUserMoviesData, rawHotMoviesData)
}

/**
* Prepare the data: basic id statistics and the movie id -> name map
*/
def preparation(rawUserMoviesData: RDD[String],
rawHotMoviesData: RDD[String]) = {
// Basic statistics of the user ids
val userIDStats = rawUserMoviesData
.map(_.split(',')(0).trim)
.distinct()
.zipWithUniqueId()
.map(_._2.toDouble)
.stats()
// Basic statistics of the movie ids
val itemIDStats = rawUserMoviesData
.map(_.split(',')(1).trim.toDouble)
.distinct().stats()
println("User id statistics: " + userIDStats)
println("Movie id statistics: " + itemIDStats)
// Build the movie id -> movie name map
val movieIdAndName = buildMovies(rawHotMoviesData)
val (movieID, movieName) = movieIdAndName.head
println(movieID + " -> " + movieName)
}

/**
* Build the movie id -> movie name map.
*
* Each line of rawHotMoviesData is a string like "20645098,8.2,小王子",
* so we split on ',' and keep the first field (the movie id) and the
* third field (the movie name).
*/
def buildMovies(rawHotMoviesData: RDD[String]): scala.collection.Map[Int, String] = {
rawHotMoviesData.flatMap(line => {
val tokens = line.split(',')
if (tokens(0).isEmpty) {
None
} else {
Some((tokens(0).toInt, tokens(2)))
}
}).collectAsMap()
}

/**
* Wrap each raw line in a MovieRating object
*/
def buildUserMovieRatings(rawUserMoviesData: RDD[String]): RDD[MovieRating] = {
rawUserMoviesData.map { line =>
val Array(userID, moviesID, countStr) = line.split(',').map(_.trim)
var count = countStr.toInt
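// map the placeholder value -1 to a neutral rating of 3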
count = if (count == -1) 3 else count
MovieRating(userID, moviesID.toInt, count)
}
}

def model(sc: SparkContext,
rawUserMoviesData: RDD[String],
rawHotMoviesData: RDD[String]): Unit = {
val moviesAndName = buildMovies(rawHotMoviesData)
val bMoviesAndName = sc.broadcast(moviesAndName)
val data = buildUserMovieRatings(rawUserMoviesData)
// Assign every user id a unique numeric index, e.g. (50249730, 0)
val userIdToInt: RDD[(String, Long)] = data.map(_.userID).distinct().zipWithUniqueId()
// (0,50249730)
val reverseUserIDMapping: RDD[(Long, String)] = userIdToInt.map({ case (l, r) => (r, l) })

// println(data.collect().mkString(" "))
// println(userIdToInt.collect().mkString(" "))
// println(reverseUserIDMapping.collect().mkString(" "))
val userIDMap = userIdToInt.collectAsMap().map { case (n, l) => (n, l.toInt) }
val bUserIDMap = sc.broadcast(userIDMap)
val ratings: RDD[Rating] = data.map { r => Rating(bUserIDMap.value(r.userID), r.movieID, r.rating) }.cache()
// Build the recommendation model with collaborative filtering (ALS)
//val model = ALS.trainImplicit(ratings, 10, 10, 0.01, 1.0)
val model = ALS.train(ratings, 50, 10, 0.0001)
ratings.unpersist()
println("First userFeature:")
println(model.userFeatures.mapValues(_.mkString(", ")).first())
for (userID <- Array(100, 1001, 10001, 100001, 110000)) {
checkRecommenderResult(userID, rawUserMoviesData, bMoviesAndName, reverseUserIDMapping, model)
}
// unpersist(model)
}
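For reference, ALS.train(ratings, rank, iterations, lambda) fits rank-dimensional user factors x_u and movie factors y_i by alternating least squares on the usual regularized squared-error objective (stated informally here; MLlib's exact regularization scaling may differ):

    min_{X,Y} \sum_{(u,i) \in R} (r_{ui} - x_u^T y_i)^2 + \lambda ( \sum_u ||x_u||^2 + \sum_i ||y_i||^2 )

In the call above, rank = 50 is the latent dimension, 10 is the number of alternating iterations, and lambda = 0.0001 is the regularization weight.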

def evaluate( sc: SparkContext,
rawUserMoviesData: RDD[String],
rawHotMoviesData: RDD[String]): Unit = {
val moviesAndName = buildMovies(rawHotMoviesData)
val data = buildUserMovieRatings(rawUserMoviesData)

val userIdToInt: RDD[(String, Long)] =
data.map(_.userID).distinct().zipWithUniqueId()


val userIDMap: scala.collection.Map[String, Int] =
userIdToInt.collectAsMap().map { case (n, l) => (n, l.toInt) }

val bUserIDMap = sc.broadcast(userIDMap)

val ratings: RDD[Rating] = data.map { r =>
Rating(bUserIDMap.value(r.userID), r.movieID, r.rating)
}.cache()

val numIterations = 10

for (rank <- Array(10, 50);
lambda <- Array(1.0, 0.01,0.0001)) {
val model = ALS.train(ratings, rank, numIterations, lambda)

// Evaluate the model on rating data
val usersMovies = ratings.map { case Rating(user, movie, rate) =>
(user, movie)
}
val predictions =
model.predict(usersMovies).map { case Rating(user, movie, rate) =>
((user, movie), rate)
}
val ratesAndPreds = ratings.map { case Rating(user, movie, rate) =>
((user, movie), rate)
}.join(predictions)

val MSE = ratesAndPreds.map { case ((user, movie), (r1, r2)) =>
val err = (r1 - r2)
err * err
}.mean()
println(s"(rank:$rank, lambda: $lambda, Explicit ) Mean Squared Error = " + MSE)
}

for (rank <- Array(10, 50);
lambda <- Array(1.0, 0.01,0.0001);
alpha <- Array(1.0, 40.0)) {
val model = ALS.trainImplicit(ratings, rank, numIterations, lambda, alpha)

// Evaluate the model on rating data
val usersMovies = ratings.map { case Rating(user, movie, rate) =>
(user, movie)
}
val predictions =
model.predict(usersMovies).map { case Rating(user, movie, rate) =>
((user, movie), rate)
}
val ratesAndPreds = ratings.map { case Rating(user, movie, rate) =>
((user, movie), rate)
}.join(predictions)

val MSE = ratesAndPreds.map { case ((user, movie), (r1, r2)) =>
val err = (r1 - r2)
err * err
}.mean()
println(s"(rank:$rank, lambda: $lambda,alpha:$alpha ,implicit ) Mean Squared Error = " + MSE)
}
}
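Both sweeps above score each model with the mean squared error over the observed ratings,

    MSE = (1 / |R|) * \sum_{(u,i) \in R} (r_{ui} - \hat{r}_{ui})^2

which is what ratesAndPreds.map { ... }.mean() computes. Note that the predictions are compared against the same ratings the model was trained on, so this is a training (reconstruction) error rather than an evaluation on a held-out set.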

// Inspect the recommendations produced for a given user
def checkRecommenderResult(userID: Int,
rawUserMoviesData: RDD[String],
bMoviesAndName: Broadcast[scala.collection.Map[Int, String]],
reverseUserIDMapping: RDD[(Long, String)],
model: MatrixFactorizationModel): Unit = {

val userName = reverseUserIDMapping.lookup(userID).head

val recommendations = model.recommendProducts(userID, 5)
// set of movie ids recommended to this user
val recommendedMovieIDs = recommendations.map(_.product).toSet

// set of movie ids the user has already watched
val rawMoviesForUser = rawUserMoviesData.map(_.split(',')).
filter { case Array(user, _, _) => user.trim == userName }
val existingUserMovieIDs = rawMoviesForUser.map { case Array(_, movieID, _) => movieID.toInt }.
collect().toSet


println("============= Movies watched by user " + userName + " =============")
// names of the movies the user has watched
bMoviesAndName.value.filter { case (id, name) => existingUserMovieIDs.contains(id) }.values.foreach(println)

println("============= Movies recommended to user " + userName + " =============")
// names of the recommended movies
bMoviesAndName.value.filter { case (id, name) => recommendedMovieIDs.contains(id) }.values.foreach(println)
}


}
@@ -0,0 +1,35 @@
package com.wugui.sparkstarter.ml

import org.apache.spark.sql.{Encoders, SparkSession}



object LearnALS {

case class Rating(userId: Int, movieId: Int, rating: Float, timestamp: Long)

def main(args: Array[String]): Unit = {

val spark = SparkSession.builder().appName("ml").master("local").getOrCreate()

val ratings = spark.read.textFile("/Users/huzekang/study/bigdata-starter/spark-starter/src/main/resources/sample_movielens_ratings.txt")
.map(parseRating)(Encoders.product)
.toDF()
ratings.show()

// Split the DataFrame into training and test sets by ratio
val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2))



}

/**
* Parse one line of input and return a Rating object
*/
def parseRating(str: String): Rating = {
val fields = str.split("::")
assert(fields.size == 4)
Rating(fields(0).toInt, fields(1).toInt, fields(2).toFloat, fields(3).toLong)
}
}
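main() currently stops right after the random split. A sketch of how the standard spark.ml ALS workflow could be continued from there (the hyperparameter values are illustrative, not taken from this commit):

// Sketch: continue after the randomSplit in main()
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.recommendation.ALS

val als = new ALS()
  .setMaxIter(10)
  .setRegParam(0.01)
  .setUserCol("userId")
  .setItemCol("movieId")
  .setRatingCol("rating")
val model = als.fit(training)

// Drop NaN predictions for users/items unseen during training, then compute RMSE on the test set.
model.setColdStartStrategy("drop")
val predictions = model.transform(test)
val evaluator = new RegressionEvaluator()
  .setMetricName("rmse")
  .setLabelCol("rating")
  .setPredictionCol("prediction")
println(s"Root-mean-square error = ${evaluator.evaluate(predictions)}")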
