/ / Filtrujte RDD na základe počtu výskytov - scala, apache-spark, rdd, apache-spark-mllib

Filtrovanie RDD na základe počtu výskytov - scala, apache-spark, rdd, apache-spark-mllib

Mám RDD hodnotení výrobkov pomocou MLlibObjekt hodnotenia, ktorý predstavuje iba n-ticu (int userId, int productId, dvojité hodnotenie). Chcem z RDD odstrániť akýkoľvek prvok, ktorý predstavuje recenziu produktu s príliš nízkym počtom hodnotení.

Napríklad RDD môže byť toto:

Rating(35, 1, 5.0)
Rating(18, 1, 4.0)
Rating(29, 2, 3.0)
Rating(12, 2, 2.0)
Rating(65, 3, 1.0)

a ak som to prefiltroval, aby som odstránil akýkoľvek produkt pomocoumenej ako 2 recenzie, odfiltrovalo by to len posledné hodnotenie a vrátilo by prvé štyri. (Chcem filtrovať s oveľa vyšším minimálnym počtom recenzií ako 2, ale napríklad).

Momentálne mám tento kód, z ktorého vychádza apostupnosť ID produktov v poradí podľa počtu hodnotení, ale nebol som si istý, ako na základe toho odfiltrovať z hlavného RDD a zdá sa mi to aj tak neúčinné:

val mostRated = ratings.map(_._2.product)
.countByValue
.toSeq
.sortBy(- _._2)
.map(_._1)

odpovede:

0 pre odpoveď č. 1

Môžete zoskupiť DDD podľa Identifikačné číslo produktu a potom ho prefiltrujte podľa toho, či je dĺžka skupiny väčšia ako prahová hodnota (1 tu). Použite flatMap extrahovať výsledky zo zoskupenia DDD:

case class Rating(UserId: Int, ProductId: Int, Rating: Double)

val ratings = sc.parallelize(Seq(Rating(35, 1, 5.0),
Rating(18, 1, 4.0),
Rating(29, 2, 3.0),
Rating(12, 2, 2.0),
Rating(65, 3, 1.0)))

val prodMinCounts = ratings.groupBy(_.ProductId).
filter(_._2.toSeq.length > 1).
flatMap(_._2)
prodMinCounts.collect
// res14: Array[Rating] = Array(Rating(35,1,5.0), Rating(18,1,4.0), Rating(29,2,3.0), Rating(12,2,2.0))