次のように、キー/値ペアの大きなリストを効率的に変換する必要があります。
val providedData = List(
(new Key("1"), new Val("one")),
(new Key("1"), new Val("un")),
(new Key("1"), new Val("ein")),
(new Key("2"), new Val("two")),
(new Key("2"), new Val("deux")),
(new Key("2"), new Val("zwei"))
)
キーごとの値のリストに、次のように記述します。
val expectedData = List(
(new Key("1"), List(
new Val("one"),
new Val("un"),
new Val("ein"))),
(new Key("2"), List(
new Val("two"),
new Val("deux"),
new Val("zwei")))
)
キーの値のペアは、大きなキー/値からのものですストア(Accumulo)ので、キーはソートされますが、通常はスパークパーティション境界を越えます。キーごとに数百万の鍵と数百の値が存在する可能性があります。
私はこの仕事のための正しいツールが火花だと思うcombineByKey操作ではなく、汎用の型(Intなど)を持つ簡潔な例しか見つけることができず、上記のようなユーザー定義型に一般化できませんでした。
私は多くの人が同じものを持つと思うので上記のようにcombineByKeyをユーザ定義型で使用するためのスカラ構文の完全指定(冗長)と簡潔な例の両方を提供できることを願っています。
回答:
回答№1は4私はSparkのエキスパートではありませんが、 この質問私は次のことができると思います:
val rdd = sc.parallelize(providedData)
rdd.combineByKey(
// createCombiner: add first value to a list
(x: Val) => List(x),
// mergeValue: add new value to existing list
(acc: List[Val], x) => x :: acc,
// mergeCominber: combine the 2 lists
(acc1: List[Val], acc2: List[Val]) => acc1 ::: acc2
)
使用 aggregateByKey
:
rdd.aggregateByKey(List[Val]())(
(acc, x) => x :: acc,
(acc1, acc2) => acc1 ::: acc2
)