ApacheSparkストリーミングでの累積ステートフル変換



このブログ投稿では、SparkStreamingのステートフルトランスフォーメーションについて説明しています。 HadoopSparkキャリアの累積追跡とアップスキルについてすべて学びます。

PrithvirajBoseによる寄稿

以前のブログでは、Apache SparkStreamingのウィンドウ処理の概念を使用したステートフル変換について説明しました。あなたはそれを読むことができます ここに





この投稿では、Apache SparkStreamingでの累積的なステートフル操作について説明します。 Spark Streamingを初めて使用する場合は、ウィンドウ処理の仕組みを理解するために、以前のブログを読むことを強くお勧めします。

Sparkストリーミングにおけるステートフルトランスフォーメーションのタイプ(続き…)

>累積追跡

私たちは使用していました reduceByKeyAndWindow(…) キーの状態を追跡するためのAPI。ただし、ウィンドウ処理は特定のユースケースに制限をもたらします。キーの状態を時間枠に制限するのではなく、全体を通して蓄積したい場合はどうなりますか?その場合、使用する必要があります updateStateByKey(…) 火。



このAPIはSpark1.3.0で導入され、非常に人気があります。ただし、このAPIにはパフォーマンスのオーバーヘッドがあり、状態のサイズが時間の経過とともに大きくなると、パフォーマンスが低下します。このAPIの使用法を示すサンプルを作成しました。あなたはコードを見つけることができます ここに

Spark1.6.0は新しいAPIを導入しました mapWithState(…) これにより、パフォーマンスのオーバーヘッドが発生します。 updateStateByKey(…) 。このブログでは、私が作成したサンプルプログラムを使用して、この特定のAPIについて説明します。あなたはコードを見つけることができます ここに

コードのウォークスルーに飛び込む前に、チェックポイントについて少し説明しましょう。ステートフルトランスフォーメーションでは、チェックポイントが必須です。チェックポインティングは、ドライバプログラムが失敗した場合にキーの状態を復元するメカニズムです。ドライバが再起動すると、キーの状態がチェックポイントファイルから復元されます。チェックポイントの場所は通常、HDFSまたはAmazon S3、あるいは信頼できるストレージです。コードをテストしている間、ローカルファイルシステムに保存することもできます。



サンプルプログラムでは、host = localhostおよびport = 9999でソケットテキストストリームをリッスンします。これは、着信ストリームを(単語、出現回数)にトークン化し、1.6.0APIを使用して単語数を追跡します。 mapWithState(…) 。さらに、更新されていないキーは、を使用して削除されます StateSpec.timeout API。 HDFSでチェックポイントを設定しており、チェックポイントの頻度は20秒ごとです。

Javaでxmlファイルを解析します

まず、Sparkストリーミングセッションを作成しましょう。

Spark-streaming-session

作成します checkpointDir HDFSで、オブジェクトメソッドを呼び出します getOrCreate(…) 。ザ・ getOrCreate APIは checkpointDir 復元する以前の状態があるかどうかを確認するために、それが存在する場合は、Spark Streamingセッションを再作成し、ファイルに保存されているデータからキーの状態を更新してから、新しいデータに進みます。それ以外の場合は、新しいSparkStreamingセッションが作成されます。

ザ・ getOrCreate チェックポイントディレクトリ名と関数(名前を付けました)を受け取ります createFunc その署名は ()=> StreamingContext

内部のコードを調べてみましょう createFunc

行#2:「TestMapWithStateJob」のジョブ名とバッチ間隔= 5秒のストリーミングコンテキストを作成します。

5行目:チェックポイントディレクトリを設定します。

8行目:クラスを使用して状態仕様を設定します org.apache.streaming.StateSpec オブジェクト。最初に状態を追跡する関数を設定し、次に、後続の変換中に生成される結果のDStreamのパーティション数を設定します。最後に、タイムアウト(30秒)を設定します。30秒以内にキーの更新が受信されない場合、キーの状態は削除されます。

12行目:ソケットストリームを設定し、受信バッチデータをフラット化し、キーと値のペアを作成し、呼び出します mapWithState 、チェックポイント間隔を20秒に設定し、最後に結果を出力します。

Sparkフレームワークはthを呼び出します e createFunc 以前の値と現在の状態を持つすべてのキーに対して。合計を計算し、累積合計で状態を更新し、最後にキーの合計を返します。

SQLで部分文字列を使用する方法

Githubソース-> TestMapStateWithKey.scalaTestUpdateStateByKey.scala

質問がありますか?コメント欄にご記入ください。折り返しご連絡いたします。

関連記事:

Apache Spark&Scalaを使い始める

Sparkストリーミングでのウィンドウ処理によるステートフル変換