Sparkストリーミングチュートリアル– ApacheSparkを使用した感情分析



このSparkStreamingブログでは、Spark Streaming、その機能とコンポーネントを紹介します。 Twitterを使用した感情分析プロジェクトが含まれています。

Spark Streamingは、コアSpark APIの拡張であり、ライブデータストリームのスケーラブルで高スループットのフォールトトレラントなストリーム処理を可能にします。 Spark Streamingを使用してライブデータをストリーミングでき、処理をリアルタイムで実行できます。 Spark Streamingの増え続けるユーザーベースは、Uber、Netflix、Pinterestなどの世帯名で構成されています。

Javaでのメソッドのオーバーロードとオーバーライド

リアルタイムデータ分析に関しては、Spark Streamingは、高速でライブ処理のためにデータを取り込むための単一のプラットフォームを提供し、 同じであなたのスキルを証明します。このブログを通じて、Spark Streamingのこの新しいエキサイティングなドメインを紹介し、完全なユースケースを紹介します。 Twitterの感情分析 SparkStreamingを使用します。





このブログで取り上げるトピックは次のとおりです。

  1. ストリーミングとは何ですか?
  2. なぜSparkStreamingなのか?
  3. Sparkストリーミングの概要
  4. Sparkストリーミング機能
  5. Sparkストリーミングの基礎
    5.1 ストリーミングコンテキスト
    5.2 DStream
    5.3 キャッシング/永続性
    5.4 アキュムレータ、ブロードキャスト変数、およびチェックポイント
  6. ユースケース–Twitterの感情分析

ストリーミングとは何ですか?

データストリーミングは、データを転送して、安定した連続ストリームとして処理できるようにするための手法です。インターネットの成長に伴い、ストリーミング技術はますます重要になっています。



ストリーミングとは-SparkStreaming-Edureka図: ストリーミングとは何ですか?

なぜSparkStreamingなのか?

Spark Streamingを使用して、Twitter、株式市場、地理システムなどのさまざまなソースからリアルタイムデータをストリーミングし、強力な分析を実行してビジネスを支援できます。

図: なぜSparkStreamingなのか?



Sparkストリーミングの概要

Sparkストリーミング リアルタイムストリーミングデータの処理に使用されます。これは、コアSparkAPIへの便利な追加です。 Spark Streamingは、ライブデータストリームの高スループットでフォールトトレラントなストリーム処理を可能にします。

図: Sparkストリーミングのストリーム

基本的なストリームユニットはDStreamですこれは基本的に、リアルタイムデータを処理するための一連のRDDです。

Sparkストリーミング機能

  1. スケーリング: Spark Streamingは、数百のノードに簡単に拡張できます。
  2. 速度: それは低レイテンシーを実現します。
  3. フォールトトレランス: Sparkにはeを実行する機能があります障害から効率的に回復します。
  4. 統合: Sparkは、バッチおよびリアルタイム処理と統合されています。
  5. ビジネス分析: SparkStreamingはuですビジネス分析で使用できる顧客の行動を追跡するためにsed。

Sparkストリーミングワークフロー

Spark Streamingワークフローには、4つの高レベルの段階があります。 1つは、さまざまなソースからデータをストリーミングすることです。これらのソースは、Akka、Kafka、Flume、AWS、Parquetなどのリアルタイムストリーミング用のストリーミングデータソースです。 2番目のタイプのソースには、静的/バッチストリーミング用のHBase、MySQL、PostgreSQL、Elastic Search、Mongo DB、Cassandraが含まれます。これが発生すると、Sparkを使用してMLlibAPIを介してデータに対して機械学習を実行できます。さらに、Spark SQLは、このデータに対してさらに操作を実行するために使用されます。最後に、ストリーミング出力は、HBase、Cassandra、MemSQL、Kafka、Elastic Search、HDFS、ローカルファイルシステムなどのさまざまなデータストレージシステムに保存できます。

図: Sparkストリーミングの概要

Sparkストリーミングの基礎

  1. ストリーミングコンテキスト
  2. DStream
  3. キャッシング
  4. アキュムレータ、ブロードキャスト変数、およびチェックポイント

ストリーミングコンテキスト

ストリーミングコンテキスト Sparkでデータのストリームを消費します。それは登録します 入力DStream を生成する レシーバー オブジェクト。これは、Spark機能の主要なエントリポイントです。 Sparkは、Twitter、Akka Actor、ZeroMQなど、コンテキストからアクセスできるソースのデフォルトの実装をいくつか提供します。

StreamingContextオブジェクトは、SparkContextオブジェクトから作成できます。 SparkContextは、Sparkクラスターへの接続を表し、そのクラスター上にRDD、アキュムレーター、およびブロードキャスト変数を作成するために使用できます。

import org.apache.spark._ import org.apache.spark.streaming._ var ssc = new StreamingContext(sc、Seconds(1))

DStream

離散化されたストリーム (DStream)は、SparkStreamingによって提供される基本的な抽象化です。これは、データの連続ストリームです。これは、入力ストリームを変換することによって生成されたデータソースまたは処理されたデータストリームから受信されます。

図: 入力DStreamから単語を抽出する

内部的には、DStreamは一連の連続したRDDで表され、各RDDには特定の間隔のデータが含まれています。

入力DStreams: 入力DStream ストリーミングソースから受信した入力データのストリームを表すDStreamです。

図: レシーバーは、各バッチにRDDが含まれる入力DStreamにデータを送信します

すべての入力DStreamは、ソースからデータを受信し、処理のためにSparkのメモリに保存するReceiverオブジェクトに関連付けられています。

DStreamsでの変換:

DStreamに適用される操作はすべて、基になるRDDの操作に変換されます。変換により、入力DStreamからのデータをRDDと同様に変更できます。 DStreamsは、通常のSparkRDDで使用可能な変換の多くをサポートします。

図: DStream変換

以下は、DStreamsで一般的な変換の一部です。

地図( func地図( func )ソースDStreamの各要素を関数に渡すことにより、新しいDStreamを返します func。
flatMap( funcflatMap( func )はmap( func )ただし、各入力項目は0個以上の出力項目にマップでき、各ソース要素を関数に渡すことで新しいDStreamを返します。 func。
フィルタ( funcフィルタ( func )ソースDStreamのレコードのみを選択して、新しいDStreamを返します。 func trueを返します。
reduce( funcreduce( func )関数を使用してソースDStreamの各RDDの要素を集約することにより、単一要素RDDの新しいDStreamを返します func
groupBy( funcgroupBy( func )は、基本的にそのグループのキーと対応するアイテムのリストで構成される新しいRDDを返します。

出力DStreams:

出力操作により、DStreamのデータをデータベースやファイルシステムなどの外部システムにプッシュできます。出力操作は、すべてのDStream変換の実際の実行をトリガーします。

図: DStreamsの出力操作

キャッシング

DStreams 開発者がストリームのデータをメモリにキャッシュ/永続化できるようにします。これは、DStream内のデータが複数回計算される場合に役立ちます。これは、 持続() DStreamのメソッド。

図: 2つのノードへのキャッシュ

ネットワーク経由でデータを受信する入力ストリーム(Kafka、Flume、Socketsなど)の場合、デフォルトの永続性レベルは、フォールトトレランスのためにデータを2つのノードに複製するように設定されています。

アキュムレータ、ブロードキャスト変数、およびチェックポイント

アキュムレータ: アキュムレータ は、結合法則と可換法則によってのみ追加される変数です。これらは、カウンターまたは合計を実装するために使用されます。 UIでアキュムレータを追跡すると、実行中のステージの進行状況を理解するのに役立ちます。 Sparkは、数値アキュムレータをネイティブにサポートしています。名前付きまたは名前なしのアキュムレータを作成できます。

ブロードキャスト変数: ブロードキャスト変数 プログラマーが読み取り専用変数のコピーをタスクと一緒に出荷するのではなく、各マシンにキャッシュしておくことができるようにします。これらを使用して、すべてのノードに大きな入力データセットのコピーを効率的に提供できます。 Sparkはまた、効率的なブロードキャストアルゴリズムを使用してブロードキャスト変数を配布し、通信コストを削減しようとします。

チェックポイント: チェックポイント ゲームのチェックポイントに似ています。 24時間年中無休で実行され、アプリケーションロジックに関係のない障害に対する耐性があります。


図:
チェックポイントの特徴

ユースケース–Twitterの感情分析

Spark Streamingのコアコンセプトを理解したので、SparkStreamingを使用して実際の問題を解決しましょう。

問題文: 危機管理、サービス調整、ターゲットマーケティングのためにリアルタイムの感情を入力するTwitter感情分析システムを設計する。

感情分析のアプリケーション:

  • 映画の成功を予測する
  • 政治キャンペーンの成功を予測する
  • 特定の会社に投資するかどうかを決定する
  • ターゲットを絞った広告
  • 製品とサービスのレビュー

Sparkストリーミングの実装:

以下の擬似コードを見つけてください。

//必要なパッケージをSparkプログラムにインポートしますimportorg.apache.spark.streaming。{Seconds、StreamingContext} import org.apache.spark.SparkContext._ ... import java.io.File object twitterSentiment {def main(args :Array [String]){if(args.length<4) { System.err.println('Usage: TwitterPopularTags ' + ' []') System.exit(1) } StreamingExamples.setStreamingLogLevels() //Passing our Twitter keys and tokens as arguments for authorization val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4) val filters = args.takeRight(args.length - 4) // Set the system properties so that Twitter4j library used by twitter stream // Use them to generate OAuth credentials System.setProperty('twitter4j.oauth.consumerKey', consumerKey) ... System.setProperty('twitter4j.oauth.accessTokenSecret', accessTokenSecret) val sparkConf = new SparkConf().setAppName('twitterSentiment').setMaster('local[2]') val ssc = new Streaming Context val stream = TwitterUtils.createStream(ssc, None, filters) //Input DStream transformation using flatMap val tags = stream.flatMap { status =>ハッシュタグからテキストを取得} // sortByを使用してRDD変換し、関数tags.countByValue()。foreachRDD {rdd => val now =各ツイートの現在時刻を取得rdd.sortBy(_._ 2).map(x => (x、now))//出力を〜/ twitter /ディレクトリに保存.saveAsTextFile(s'〜 / twitter / $ now ')} //フィルターおよびマップ関数を使用したDStream変換valtweets = stream.filter {t => valタグ= t。スペースの分割.filter(_。startsWith( '#'))。小文字に変換tags.exists {x => true}} val data = Tweets.map {status => val sentiment = SentimentAnalysisUtils.detectSentiment(status.getText)val tags = status.getHashtagEntities.map(_。getText.toLowerCase) (status.getText、sentiment.toString、tagss.toString())} data.print()//出力を〜/に保存し、ファイル名はtwitters data.saveAsTextFiles( '〜/ twitters'、 '20000')sscのように開始します。 start()ssc.awaitTermination()}}

結果:

以下は、Twitter SentimentStreamingプログラムの実行中にEclipseIDEに表示される結果です。

図: EclipseIDEでの感情分析出力

スクリーンショットでわかるように、すべてのツイートは、ツイートの内容の感情に応じて、ポジティブ、ニュートラル、ネガティブに分類されます。

ツイートの感情の出力は、作成された時間に応じてフォルダーとファイルに保存されます。この出力は、必要に応じてローカルファイルシステムまたはHDFSに保存できます。出力ディレクトリは次のようになります。

図: 「twitter」プロジェクトフォルダ内の出力フォルダ

ここで、twitterディレクトリ内に、以下に示すように、Twitterユーザーのユーザー名とすべてのツイートのタイムスタンプを見つけることができます。

図: タイムスタンプ付きのTwitterユーザー名を含む出力ファイル

Twitterのユーザー名とタイムスタンプを取得したので、メインディレクトリに保存されているセンチメントとツイートを見てみましょう。ここでは、すべてのツイートの後に感情的な感情が続きます。保存されたこの感情は、企業による膨大な数の洞察を分析するためにさらに使用されます。

図: 感情のあるツイートを含む出力ファイル

コードの調整:

それでは、コードを少し変更して、特定のハッシュタグ(トピック)に対する感情を取得しましょう。現在、米国大統領のドナルド・トランプは、ニュースチャンネルやオンラインソーシャルメディアでトレンドになっています。キーワード「」に関連する感情を見てみましょう。 トランプ ‘。

図: 「トランプ」キーワードを使用したツイートに対する感情分析の実行

先に進む:

感情分析のデモンストレーションからわかるように、「トランプ」の場合と同じように、特定のトピックの感情を抽出できます。同様に、Sentiment Analyticsは、世界中の企業による危機管理、サービス調整、およびターゲットマーケティングで使用できます。

感情分析にSparkStreamingを使用している企業は、同じアプローチを適用して次のことを実現しています。

  1. カスタマーエクスペリエンスの向上
  2. 競争上の優位性を獲得する
  3. ビジネスインテリジェンスの獲得
  4. 負けたブランドの活性化

これで、これで終わりです Sparkストリーミングチュートリアル ブログ。これまでに、SparkStreamingとは何かをしっかりと理解している必要があります。 Twitter Sentiment Analysisのユースケースは、SparkStreamingとApacheSparkで遭遇する将来のプロジェクトに取り組むために必要な自信を与えてくれます。練習はあらゆる主題を習得するための鍵であり、このブログがApacheSparkについてさらに探求するのに十分な関心を生み出したことを願っています。

まず、Edurekaの次のSpark StreamingYouTubeチュートリアルをお勧めします。

Sparkストリーミング| Twitterの感情分析の例|エドゥレカ

Sparkチュートリアルに関するこのビデオシリーズは、コンポーネントの完全な背景と、次のような実際のユースケースを提供します。 Twitterの感情分析NBAゲーム予測分析地震検知システムフライトデータ分析 そして 映画レコメンデーションシステム 。私たちは、コードを実行するすべての人に総合的な専門知識を提供するように、ユースケースを個人的に設計しました。

質問がありますか?コメント欄にご記入ください。早急にご連絡いたします。 Sparkを学び、Sparkのドメインでキャリアを構築し、RDD、Spark Streaming、SparkSQL、MLlib、GraphX、Scalaを実際のユースケースで使用して大規模なデータ処理を実行する専門知識を構築したい場合は、インタラクティブなライブをチェックしてくださいオンライン ここに、 それはあなたの学習期間を通してあなたを導くために24 * 7のサポートが付属しています。