/ / Фільтруйте RDD на основі кількості подій - scala, apache-spark, rdd, apache-spark-mllib

Фільтрувати RDD на основі кількості входів - scala, apache-spark, rdd, apache-spark-mllib

У мене є RDD рейтингів продуктів за допомогою MLlibОб'єкт оцінювання, який є лише кортежем (int userId, int productId, подвійний рейтинг). Я хочу видалити будь-який елемент із RDD, який є оглядом продукту із занадто низькою оцінкою.

Наприклад, RDD може бути таким:

Rating(35, 1, 5.0)
Rating(18, 1, 4.0)
Rating(29, 2, 3.0)
Rating(12, 2, 2.0)
Rating(65, 3, 1.0)

і якщо я фільтрував це, щоб видалити будь-який продуктменше 2 відгуків, це просто відфільтрує останню оцінку і поверне перші чотири. (Я хочу фільтрувати таким чином, що більший мінімальний кількість оглядів, ніж 2, але тільки для прикладу).

В даний час у мене є цей код, який виводить aпослідовність ідентифікаторів продукту відповідно до кількості оцінок, але я не знав способу фільтрації з головного RDD, виходячи з цього, і все одно здається неефективним:

val mostRated = ratings.map(_._2.product)
.countByValue
.toSeq
.sortBy(- _._2)
.map(_._1)

Відповіді:

0 для відповіді № 1

Ви можете згрупувати rdd по ProductId а потім відфільтруйте його, виходячи з того, якщо довжина групи більша за поріг (1 тут). Використовуйте flatMap для отримання результатів із згрупованих rdd:

case class Rating(UserId: Int, ProductId: Int, Rating: Double)

val ratings = sc.parallelize(Seq(Rating(35, 1, 5.0),
Rating(18, 1, 4.0),
Rating(29, 2, 3.0),
Rating(12, 2, 2.0),
Rating(65, 3, 1.0)))

val prodMinCounts = ratings.groupBy(_.ProductId).
filter(_._2.toSeq.length > 1).
flatMap(_._2)
prodMinCounts.collect
// res14: Array[Rating] = Array(Rating(35,1,5.0), Rating(18,1,4.0), Rating(29,2,3.0), Rating(12,2,2.0))