/ / Przyspiesz wspólne filtrowanie dużych zbiorów danych w Spark MLLib - scala, apache-spark, apache-spark-mllib, wspólne filtrowanie

Przyspiesz wspólne filtrowanie dużego zestawu danych w Spark MLLib - scala, apache-spark, apache-spark-mllib, wspólne filtrowanie

Korzystam z faktoryzacji macierzy MLlibpolecaj produkty użytkownikom. Mam na myśli dużą niejawną matrycę interakcji M = 20 milionów użytkowników i N = 50 000 elementów. Po szkoleniu modelu chcę uzyskać krótką listę (np. 200) zaleceń dla każdego użytkownika. próbowałem recommendProductsForUsers w MatrixFactorizationModel ale jest bardzo bardzo powolna (trwała 9 godzin, ale wciąż daleko od końca.) Testuję z 50 executorami, każdy z pamięcią 8g. Można się tego spodziewać, ponieważ recommendProductsForUsers trzeba obliczyć wszystkie M*N interakcje użytkownika z przedmiotami i uzyskiwanie najwyższego poziomu dla każdego użytkownika.

Spróbuję użyć większej liczby executorów, ale z tego, co widziałem w szczegółach aplikacji w Spark UI, wątpię, że może to skończyć się w ciągu kilku godzin lub nawet, gdy mam 1000 wykonawców (po 9 godzinach nadal jest to flatmap tutaj https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala#L279-L289, 10000 zadań ogółem i tylko ~ 200 zakończonych) Czy są jeszcze inne rzeczy, które mogę dostroić, aby przyspieszyć proces rekomendacji, zwiększając liczbę wykonawców?

Oto przykładowy kod:

val data = input.map(r => Rating(r.getString(0).toInt, r.getString(1).toInt, r.getLong(2))).cache
val rank = 20
val alpha = 40
val maxIter = 10
val lambda = 0.05
val checkpointIterval = 5
val als = new ALS()
.setImplicitPrefs(true)
.setCheckpointInterval(checkpointIterval)
.setRank(rank)
.setAlpha(alpha)
.setIterations(maxIter)
.setLambda(lambda)
val model = als.run(ratings)
val recommendations = model.recommendProductsForUsers(200)
recommendations.saveAsTextFile(outdir)

Odpowiedzi:

1 dla odpowiedzi № 1

@ Jack Lei: Czy znalazłeś odpowiedź na to pytanie? Ja sam próbowałem kilku rzeczy, ale tylko trochę pomogłem.

Na przykład: próbowałem

javaSparkContext.setCheckpointDir("checkpoint/");

Pomaga to uniknąć wielokrotnego obliczania pomiędzy.

Próbowano także dodawać więcej pamięci na pamięć wykonawczą i napowietrzną

--conf spark.driver.maxResultSize=5g --conf spark.yarn.executor.memoryOverhead=4000