Skip to content

Commit

Permalink
Merge pull request #441 from lovisek/topic/movie-lens-validation
Browse files Browse the repository at this point in the history
Add validation to movie-lens benchmark
  • Loading branch information
lbulej committed Apr 29, 2024
2 parents 3e55f28 + afb777d commit 65d596e
Showing 1 changed file with 99 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import org.renaissance.Benchmark
import org.renaissance.Benchmark._
import org.renaissance.BenchmarkContext
import org.renaissance.BenchmarkResult
import org.renaissance.BenchmarkResult.Validators
import org.renaissance.BenchmarkResult.Assert
import org.renaissance.BenchmarkResult.ValidationException
import org.renaissance.License
import org.renaissance.apache.spark.ResourceUtil.linesFromUrl
import org.renaissance.apache.spark.ResourceUtil.writeResourceToFile
Expand All @@ -32,24 +33,42 @@ import scala.collection.Map
@Parameter(
name = "als_configs",
defaultValue =
"rmse,rank,lambda,iterations;" +
"3.622, 8,5.00,20;" +
"2.134,10,2.00,20;" +
"1.311,12,1.00,20;" +
"0.992, 8,0.05,20;" +
"1.207,10,0.01,10;" +
"1.115, 8,0.02,10;" +
"0.923,12,0.10,10;" +
"0.898, 8,0.20,10",
summary = "A table of ALS configuration parameters and expected RMSE values."
"rank,lambda,iterations;" +
" 8,5.00,20;" +
"10,2.00,20;" +
"12,1.00,20;" +
" 8,0.05,20;" +
"10,0.01,10;" +
" 8,0.02,10;" +
"12,0.10,10;" +
" 8,0.20,10",
summary = "A table of ALS configuration parameters to try."
)
@Parameter(
name = "top_recommended_movie_count",
defaultValue = "5",
summary = "Number of top recommended movies to check for expected movies during validation."
)
@Parameter(
name = "expected_movie_ids",
defaultValue = "67504,83318,83359,83411,8530",
summary = "Movie identifiers that must (all) be found among the top recommended movies."
)
@Parameter(
name = "expected_best_validation_rmse",
defaultValue = "0.898",
summary = "The expected RMSE achieved by the best model on the validation subset."
)
@Configuration(
name = "test",
settings = Array(
"input_file = /ratings-small.csv",
"als_configs = " +
"rmse,rank,lambda,iterations;" +
"1.086,8,0.20,10"
"rank,lambda,iterations;" +
"8,0.20,10",
"top_recommended_movie_count = 2",
"expected_movie_ids = 1254",
"expected_best_validation_rmse = 1.086"
)
)
@Configuration(name = "jmh")
Expand All @@ -70,14 +89,21 @@ final class MovieLens extends Benchmark with SparkUtil {

private val helper = new MovieLensHelper

/** Holds ALS parameters and expected RMSE on validation data. */
case class AlsConfig(rank: Int, lambda: Double, iterations: Int, rmse: Double)
private var topRecommendedMovieCount: Int = _

private var expectedMovieIds: Seq[Int] = _

private var expectedBestValidationRmse: Double = _

/**
 * A single ALS training configuration to be tried during model search.
 *
 * @param rank       rank passed to the ALS trainer (presumably the number of
 *                   latent factors — confirm against the MLlib ALS API)
 * @param lambda     regularization parameter passed to the ALS trainer
 * @param iterations number of training iterations to run
 */
case class AlsConfig(
  rank: Int,
  lambda: Double,
  iterations: Int
)

class MovieLensHelper {
var movies: Map[Int, String] = _
var ratings: RDD[(Long, Rating)] = _
var personalRatings: Seq[Rating] = _
var personalRatingsRDD: RDD[Rating] = _
var personalRatingsUserId: Int = _
var training: RDD[Rating] = _
var validation: RDD[Rating] = _
var test: RDD[Rating] = _
Expand Down Expand Up @@ -108,15 +134,15 @@ final class MovieLens extends Benchmark with SparkUtil {
// Get only entries with positive rating.
val lines = sparkContext.parallelize(linesFromUrl(url))
val ratings = parseRatingsCsvLines(lines).values.filter { _.rating > 0.0 }
assume(!ratings.isEmpty(), "collection of personal ratings is not empty!")

if (ratings.isEmpty()) {
// TODO Fail the benchmark here.
sys.error("No ratings provided.")
} else {
personalRatings = ratings.collect().toSeq
}
val positiveRatings = ratings.collect().toSeq
val userIds = positiveRatings.map(_.user).distinct
assume(userIds.length == 1, "personal ratings come from a single user!")

personalRatings = positiveRatings
personalRatingsRDD = ensureCached(ratings)
personalRatingsUserId = userIds.head
}

def loadRatings(file: Path) = {
Expand All @@ -141,6 +167,7 @@ final class MovieLens extends Benchmark with SparkUtil {
}

def splitRatings(trainingThreshold: Int, validationThreshold: Int) = {
// Merge personal ratings into training data set and cache them.
training = ensureCached(
ratings
.filter(x => x._1 < trainingThreshold)
Expand All @@ -150,7 +177,9 @@ final class MovieLens extends Benchmark with SparkUtil {
numTraining = training.count()

validation = ensureCached(
ratings.filter(x => x._1 >= trainingThreshold && x._1 < validationThreshold).values
ratings
.filter(x => x._1 >= trainingThreshold && x._1 < validationThreshold)
.values
)
numValidation = validation.count()

Expand All @@ -159,10 +188,7 @@ final class MovieLens extends Benchmark with SparkUtil {
)
numTest = test.count()

println(
"Training: " + numTraining + ", validation: " + numValidation + ", test: "
+ numTest
)
println(s"Training: $numTraining, validation: $numValidation, test: $numTest")
}

def trainModels(configs: Iterable[AlsConfig]) = {
Expand Down Expand Up @@ -211,28 +237,16 @@ final class MovieLens extends Benchmark with SparkUtil {
)

val improvement = (baselineRmse - testRmse) / baselineRmse * 100
println("The best model improves the baseline by " + "%1.2f".format(improvement) + "%.")

// Make personalized recommendations.
println(f"The best model improves the baseline by $improvement%.2f%%.")

val myRatedMovieIds = personalRatings.map(_.product).toSet
val candidates =
sparkContext.parallelize(movies.keys.filter(!myRatedMovieIds.contains(_)).toSeq)
// Make personalized recommendations for movies not rated by the user.

val recommendations = bestModel.get
.predict(candidates.map((0, _)))
.collect()
.sortBy(-_.rating)
.take(50)

var i = 1
println("Movies recommended for you:")
recommendations.foreach { r =>
println("%2d".format(i) + ": " + movies(r.product))
i += 1
}
val ratedMovieIds = personalRatings.map(_.product).toSet
val candidates = sparkContext.parallelize(
movies.keys.filter(!ratedMovieIds.contains(_)).toSeq.map((personalRatingsUserId, _))
)

recommendations
bestModel.get.predict(candidates).collect()
}

/** Compute RMSE (Root Mean Squared Error). */
Expand All @@ -249,6 +263,11 @@ final class MovieLens extends Benchmark with SparkUtil {
override def setUpBeforeAll(bc: BenchmarkContext): Unit = {
import scala.jdk.CollectionConverters._

// Validation parameters.
topRecommendedMovieCount = bc.parameter("top_recommended_movie_count").toInteger
expectedMovieIds = bc.parameter("expected_movie_ids").toList(_.toInt).asScala.toSeq
expectedBestValidationRmse = bc.parameter("expected_best_validation_rmse").toDouble

//
// Without a checkpoint directory set, JMH runs of this
// benchmark in Travis CI tend to crash with stack overflow.
Expand All @@ -265,16 +284,15 @@ final class MovieLens extends Benchmark with SparkUtil {
AlsConfig(
m.get("rank").toInt,
m.get("lambda").toDouble,
m.get("iterations").toInt,
m.get("rmse").toDouble
m.get("iterations").toInt
)
)
.asScala

loadData(bc.scratchDirectory())

// Split ratings into train (60%), validation (20%), and test (20%) based on the
// last digit of the timestamp, add myRatings to train, and cache them.
// Split ratings into training (~60%), validation (~20%), and test (~20%)
// data sets based on the last digit of a rating's timestamp.
helper.splitRatings(6, 8)
}

Expand All @@ -291,10 +309,39 @@ final class MovieLens extends Benchmark with SparkUtil {

override def run(bc: BenchmarkContext): BenchmarkResult = {
  // Train one model per configured ALS parameter set; the helper keeps
  // track of the best model found so far.
  helper.trainModels(alsConfigurations)

  // Rank recommendations by descending rating, breaking ties by movie id
  // so the ordering is deterministic, then keep only the configured top N.
  val recommendations = helper.recommendMovies()
  val topRecommended = recommendations
    .sortBy(r => (-r.rating, r.product))
    .take(topRecommendedMovieCount)

  // Print the shortlist for inspection in benchmark logs.
  println(s"Top recommended movies for user id ${helper.personalRatingsUserId}:")
  for ((r, i) <- topRecommended.zipWithIndex) {
    println(
      f"${i + 1}%2d: ${helper.movies(r.product)}%s (rating: ${r.rating}%.3f, id: ${r.product}%d)"
    )
  }

  // Defer validation: the returned BenchmarkResult runs it on demand.
  () => validate(topRecommended)
}

/**
 * Checks the benchmark outcome: every expected movie id must appear among
 * the top recommendations, and the best model's validation RMSE must match
 * the expected value within a small tolerance.
 *
 * @throws ValidationException if an expected movie id is missing
 */
private def validate(recommendedMovies: Array[Rating]): Unit = {
  // Set membership makes the per-id check independent of recommendation order.
  val recommendedIds = recommendedMovies.map(_.product).toSet
  for (expectedId <- expectedMovieIds if !recommendedIds.contains(expectedId)) {
    throw new ValidationException(
      s"Expected ${recommendedMovies.length} top-rated movies to contain movie with id $expectedId"
    )
  }

  // RMSE is compared with an absolute tolerance of 0.005 to absorb
  // platform-dependent floating-point differences.
  Assert.assertEquals(
    expectedBestValidationRmse,
    helper.bestValidationRmse,
    0.005,
    "Best model RMSE on the validation set"
  )
}

override def tearDownAfterAll(bc: BenchmarkContext): Unit = {
Expand Down

0 comments on commit 65d596e

Please sign in to comment.