/ /キー/値ペアのリストを、spark - scala、apache-spark、combinersのキーごとの値リストに変換する

キー/値ペアのリストをspark - scala、apache-spark、combinersのキーごとの値のリストに変換する

次のように、キー/値ペアの大きなリストを効率的に変換する必要があります。

val providedData = List(
(new Key("1"), new Val("one")),
(new Key("1"), new Val("un")),
(new Key("1"), new Val("ein")),
(new Key("2"), new Val("two")),
(new Key("2"), new Val("deux")),
(new Key("2"), new Val("zwei"))
)

キーごとの値のリストに、次のように記述します。

val expectedData = List(
(new Key("1"), List(
new Val("one"),
new Val("un"),
new Val("ein"))),
(new Key("2"), List(
new Val("two"),
new Val("deux"),
new Val("zwei")))
)

キーの値のペアは、大きなキー/値からのものですストア(Accumulo)ので、キーはソートされますが、通常はスパークパーティション境界を越えます。キーごとに数百万の鍵と数百の値が存在する可能性があります。

私はこの仕事のための正しいツールが火花だと思うcombineByKey操作ではなく、汎用の型(Intなど)を持つ簡潔な例しか見つけることができず、上記のようなユーザー定義型に一般化できませんでした。

私は多くの人が同じものを持つと思うので上記のようにcombineByKeyをユーザ定義型で使用するためのスカラ構文の完全指定(冗長)と簡潔な例の両方を提供できることを願っています。

回答:

回答№1は4

私はSparkのエキスパートではありませんが、 この質問私は次のことができると思います:

val rdd = sc.parallelize(providedData)

rdd.combineByKey(
// createCombiner: add first value to a list
(x: Val) => List(x),
// mergeValue: add new value to existing list
(acc: List[Val], x) => x :: acc,
// mergeCominber: combine the 2 lists
(acc1: List[Val], acc2: List[Val]) => acc1 ::: acc2
)

使用 aggregateByKey

rdd.aggregateByKey(List[Val]())(
(acc, x) => x :: acc,
(acc1, acc2) => acc1 ::: acc2
)