Sparkアキュムレータの説明:Apache Spark



このApacheSparkブログでは、Sparkアキュムレータについて詳しく説明しています。例を使用してSparkアキュムレータの使用法を学びます。 Sparkアキュムレータは、HadoopMapreduceカウンターのようなものです。

PrithvirajBoseによる寄稿

これは、Sparkアキュムレータについて知っておく必要のあることに関するブログです。 ほとんどのIT採用担当者が求める重要なスキルであるため、業界でのその成長と需要は、創業以来指数関数的でした。





アキュムレータとは何ですか?

アキュムレータは、エグゼキュータ全体で情報を集約するために使用される変数です。たとえば、この情報は、破損したレコードの数や特定のライブラリAPIが呼び出された回数などのデータまたはAPI診断に関連する場合があります。

アキュムレータが必要な理由を理解するために、小さな例を見てみましょう。



これは、中央コルカタ地域周辺のチェーン店の取引の架空のログです。

logs-Spark-accumulators

4つのフィールドがあります、

フィールド1->都市



フィールド2->地域

フィールド3->販売されたアイテムのカテゴリ

フィールド4->販売されたアイテムの価値

ただし、ログが破損する可能性があります。たとえば、2行目は空白行、4行目はネットワークの問題を報告し、最後の行は売上値がゼロであることを示しています(これは起こり得ません!)。

アキュムレータを使用してトランザクションログを分析し、空白のログ(空白行)の数、ネットワークに障害が発生した回数、カテゴリがない製品、または売上ゼロが記録された回数を確認できます。完全なサンプルログは見つけることができます ここに
アキュムレータは、次のようなすべての操作に適用できます。
1.可換-> f(x、y)= f(y、x) 、および
2.連想-> f(f(x、y)、z)= f(f(x、z)、y)= f(f(y、z)、x)
例えば、 そして 最大 関数は上記の条件を満たすのに対し、関数は上記の条件を満たす。 平均 ではない。

なぜSparkアキュムレータを使用するのですか?

では、なぜアキュムレータが必要なのか、そして以下のコードに示すように変数を使用しないのはなぜですか。

テーブル内のhtmlテーブル

上記のコードの問題は、ドライバーが変数を出力するときに blankLines その値はゼロになります。これは、Sparkがこのコードをすべてのエグゼキューターに出荷すると、変数がそのエグゼキューターに対してローカルになり、更新された値がドライバーに中継されないためです。この問題を回避するには、次のことを行う必要があります blankLines すべてのエグゼキュータのこの変数に対するすべての更新がドライバに中継されるようなアキュムレータ。したがって、上記のコードは次のように記述する必要があります。

これにより、アキュムレータが blankLines はすべてのエグゼキュータで更新され、更新はドライバに中継されます。

ネットワークエラーやゼロセールスバリューなどのために他のカウンターを実装することができます。他のカウンターの実装と一緒に完全なソースコードを見つけることができます ここに

Hadoop Map-Reduceに精通している人は、SparkのアキュムレータがHadoopのMap-Reduceカウンターに似ていることに気付くでしょう。

警告

アキュムレータを使用する場合、プログラマーとして注意する必要のあるいくつかの注意事項があります。

  1. 内部の計算 変換 怠惰に評価されるので、 アクション RDDで発生します 変換 実行されません。この結果、次のような関数内で使用されるアキュムレータ 地図() または フィルタ() いくつかがなければ実行されません アクション RDDで発生します。
  2. Sparkはアキュムレータを更新することを保証します 内部 行動 1回だけ 。そのため、タスクが再開されて系統が再計算された場合でも、アキュムレータは1回だけ更新されます。
  3. Sparkはこれを保証しません 変換 。そのため、タスクを再開して系統を再計算すると、アキュムレータが複数回更新されるときに望ましくない副作用が発生する可能性があります。

安全のため、常にアクション内でのみアキュムレータを使用してください。
コード ここに は、これを実現する方法についてのシンプルで効果的な例を示しています。
アキュムレータの詳細については、 この

質問がありますか?コメントセクションでそれらに言及してください。折り返しご連絡いたします。

関連記事:

ApacheSparkのcombineByKeyの説明