Spark GraphXチュートリアル– ApacheSparkでのグラフ分析



このGraphXチュートリアルブログでは、Apache Spark GraphX、その機能、およびフライトデータ分析プロジェクトを含むコンポーネントを紹介します。

GraphX は、グラフおよびグラフ並列計算用のApacheSparkのAPIです。 GraphXは、ETL(Extract、Transform&Load)プロセス、探索的分析、および反復グラフ計算を単一のシステム内に統合します。グラフの使用法は、Facebookの友達、LinkedInの接続、インターネットのルーター、天体物理学の銀河と星の関係、およびGoogleの地図で見ることができます。グラフ計算の概念は非常に単純に見えますが、グラフのアプリケーションは文字通り無限であり、ほんの数例を挙げると、災害検出、銀行、株式市場、銀行、地理システムでの使用例があります。このAPIの使用法を学ぶことは、 このブログでは、Spark GraphXの概念、その機能とコンポーネントを例を通して学び、GraphXを使用したFlight DataAnalyticsの完全なユースケースについて説明します。

このSparkGraphXブログでは、次のトピックについて説明します。





  1. グラフとは何ですか?
  2. グラフ計算のユースケース
  3. Spark GraphXとは何ですか?
  4. SparkGraphXの機能
  5. 例を使用してGraphXを理解する
  6. ユースケース–GraphXを使用したフライトデータ分析

グラフとは何ですか?

グラフは、オブジェクトのいくつかのペアが何らかの意味で関連しているオブジェクトのセットに相当する数学的構造です。これらの関係は、グラフを形成するエッジと頂点を使用して表すことができます。頂点はオブジェクトを表し、エッジはそれらのオブジェクト間のさまざまな関係を示します。

グラフの概念-SparkGraphXチュートリアル-Edureka図: Spark GraphXチュートリアル–グラフの頂点、エッジ、トリプレット



コンピュータサイエンスでは、グラフは、数学、特にグラフ理論の分野からの無向グラフと有向グラフの概念を実装することを目的とした抽象データ型です。グラフデータ構造は、各エッジに関連付けられる場合もあります。 エッジ値 、シンボリックラベルや数値属性(コスト、容量、長さ、等。)。

グラフ計算のユースケース

次のユースケースは、グラフの計算の見通しと、グラフを使用して他のソリューションを実装するためのさらなる範囲を示しています。

  1. 災害検知システム

    グラフを使用して、ハリケーン、地震、津波、山火事、火山などの災害を検出し、人々に警告することができます。



  2. ページランク ページランクは、紙の引用ネットワークやソーシャルメディアネットワークなどのネットワークで影響力のある人物を見つけるために使用できます。
  3. 金融詐欺の検出

    グラフ分析を使用して、金融取引を監視し、金融詐欺やマネーロンダリングに関与している人々を検出できます。

  4. ビジネス分析

    グラフを機械学習と併用すると、顧客の購入傾向を理解するのに役立ちます。例えば。ユーバー、マクドナルドなど

  5. 地理情報システム

    グラフは、流域の描写や天気予報などの地理情報システムの機能を開発するために集中的に使用されます。

  6. グーグルプレゲル

    Pregelは、任意のグラフを表現するのに十分な柔軟性を備えたAPIを備えたGoogleのスケーラブルでフォールトトレラントなプラットフォームです。アルゴリズム。

Spark GraphXとは何ですか?

GraphX グラフおよびグラフ並列計算用のSparkAPIです。これには、グラフ分析タスクを簡素化するためのグラフアルゴリズムとビルダーのコレクションが増えています。


GraphXは、復元力のある分散プロパティグラフを使用してSparkRDDを拡張します。
プロパティグラフは、複数のエッジを並列に持つことができる有向マルチグラフです。すべてのエッジと頂点には、ユーザー定義のプロパティが関連付けられています。平行なエッジにより、複数の同じ頂点間の関係。

SparkGraphXの機能

SparkGraphXの機能は次のとおりです。

  1. 柔軟性
    Spark GraphXは、グラフと計算の両方で機能します。 GraphXは、ETL(抽出、変換、読み込み)、探索的分析、反復グラフ計算を1つのシステムに統合します。グラフとコレクションの両方と同じデータを表示し、グラフをRDDで効率的に変換および結合し、PregelAPIを使用してカスタムの反復グラフアルゴリズムを作成できます。
  2. 速度
    Spark GraphXは、最速の特殊なグラフ処理システムに匹敵するパフォーマンスを提供します。 Sparkの柔軟性、フォールトトレランス、使いやすさを維持しながら、最速のグラフシステムに匹敵します。
  3. 成長するアルゴリズムライブラリ
    SparkGraphXが提供しなければならないグラフアルゴリズムの増え続けるライブラリから選択できます。一般的なアルゴリズムには、ページランク、連結成分、ラベル伝播、SVD ++、強連結成分、および三角形の数。

例を使用してGraphXを理解する

ここで、例を使用してSparkGraphXの概念を理解します。下の画像に示すような単純なグラフを考えてみましょう。

図: Spark GraphXチュートリアル–グラフの例

グラフを見ると、人(頂点)と人との関係(辺)に関​​する情報を抽出できます。ここのグラフは、TwitterユーザーとそのユーザーがTwitterでフォローしているユーザーを表しています。例:ボブはツイッターでダビデとアリスをフォローしています。

ApacheSparkを使用して同じことを実装しましょう。まず、GraphXに必要なクラスをインポートします。

//必要なクラスをインポートするimportorg.apache.spark._ import org.apache.spark.rdd.RDD import org.apache.spark.util.IntParam import org.apache.spark.graphx._ import org.apache.spark .graphx.util.GraphGenerators

頂点の表示さらに、ユーザー(頂点)のすべての名前と年齢を表示します。

valvertexRDD:RDD [(Long、(String、Int))] = sc.parallelize(vertexArray)val edgeRDD:RDD [Edge [Int]] = sc.parallelize(edgeArray)val graph:Graph [(String、Int)、 Int] = Graph(vertexRDD、edgeRDD)graph.vertices.filter {case(id、(name、age))=> age> 30} .collect.foreach {case(id、(name、age))=> println( s '$ name is $ age')}

上記のコードの出力は次のとおりです。

デビッドです42 フランです50 エドです55 チャーリーです65

エッジの表示 :Twitterで誰が誰を好きか見てみましょう。

for(トリプレット<- graph.triplets.collect) { println(s'${triplet.srcAttr._1} likes ${triplet.dstAttr._1}') } 

上記のコードの出力は次のとおりです。

ボブ好きアリス ボブ好きデビッド チャーリー好きボブ チャーリー好きフラン デビッド好きアリス エド好きボブ エド好きチャーリー エド好きフラン

GraphXの基本を理解したので、もう少し深く掘り下げて、同じ上でいくつかの高度な計算を実行しましょう。

フォロワー数 :グラフ内のすべてのユーザーには、異なる数のフォロワーがいます。すべてのユーザーのすべてのフォロワーを見てみましょう。

例を含むSQLサーバーの部分文字列
//ユーザープロパティをより明確にモデル化するためのクラスの定義caseclass User(name:String、age:Int、inDeg:Int、outDeg:Int)//ユーザーグラフの作成val initialUserGraph:Graph [User、Int] = graph。 mapVertices {case(id、(name、age))=> User(name、age、0、0)} //次数情報の入力valuserGraph = initialUserGraph.outerJoinVertices(initialUserGraph.inDegrees){case(id、u、 inDegOpt)=> User(u.name、u.age、inDegOpt.getOrElse(0)、u.outDeg)} .outerJoinVertices(initialUserGraph.outDegrees){case(id、u、outDegOpt)=> User(u.name、 u.age、u.inDeg、outDegOpt.getOrElse(0))} for((id、property)<- userGraph.vertices.collect) { println(s'User $id is called ${property.name} and is liked by ${property.inDeg} people.') } 

上記のコードの出力は次のとおりです。

ユーザー 1と呼ばれるアリスに好かれています2 ユーザー 2と呼ばれるボブに好かれています2 ユーザー 3と呼ばれるチャーリーに好かれています1 ユーザー 4と呼ばれるデビッドに好かれています1 ユーザー 5と呼ばれるエドに好かれています0 ユーザー 6と呼ばれるフランに好かれています2

最古のフォロワー :フォロワーを特性別に並べ替えることもできます。各ユーザーの最も古いフォロワーを年齢別に見つけましょう。

//各ユーザーの最も古いフォロワーを見つけるvaloldestFollower:VertexRDD [(String、Int)] = userGraph.mapReduceTriplets [(String、Int)](//各エッジについて、ソースの属性を使用して宛先頂点にメッセージを送信します頂点エッジ=> Iterator((edge.dstId、(edge.srcAttr.name、edge.srcAttr.age)))、//メッセージを結合するには、古いフォロワー(a、b)のメッセージを取得します=> if(a。 _2> b._2)a else b)

上記のコードの出力は次のとおりです。

デビッドの最古のフォロワーですアリス チャーリーの最古のフォロワーですボブ エドの最古のフォロワーですチャーリー ボブの最古のフォロワーですデビッド エドフォロワーがいない チャーリーの最古のフォロワーですフラン 

ユースケース:SparkGraphXを使用したフライトデータ分析

Spark GraphXのコアコンセプトを理解したので、GraphXを使用して実際の問題を解決しましょう。これにより、将来のSparkプロジェクトに自信を持って取り組むことができます。

問題文Spark GraphXを使用してリアルタイムフライトデータを分析するには、ほぼリアルタイムの計算結果を提供し、Google DataStudioを使用して結果を視覚化します。

ユースケース–実行する計算

  1. 飛行ルートの総数を計算する
  2. 最長飛行ルートを計算して並べ替える
  3. 頂点が最も高い空港を表示します
  4. PageRankに従って最も重要な空港を一覧表示します
  5. フライトコストが最も低いルートを一覧表示します

上記の計算にはSparkGraphXを使用し、Google DataStudioを使用して結果を視覚化します。

ユースケース–データセット

図: ユースケース–米国のフライトデータセット

ユースケース–フロー図

次の図は、フライトデータ分析に含まれるすべてのステップを明確に説明しています。

図: ユースケース– SparkGraphXを使用したフライトデータ分析のフロー図

ユースケース–Sparkの実装

次に、Eclipse IDE forSparkを使用してプロジェクトを実装しましょう。

以下の擬似コードを見つけてください。

//必要なクラスをインポートするimportorg.apache.spark._ ... import java.io.File object Airport {def main(args:Array [String]){//ケースクラスの作成FlightケースクラスFlight(dofM: String、dofW:String、...、dist:Int)// Flightクラスへの入力を解析するためのParseString関数の定義defparseFlight(str:String):Flight = {val line = str.split( '、')Flight (line(0)、line(1)、...、line(16).toInt)} val conf = new SparkConf()。setAppName( 'airport')。setMaster( 'local [2]')val sc = new SparkContext(conf)//データをRDDにロードval textRDD = sc.textFile( '/ home / edureka / usecases / airport / airportdataset.csv')// CSV行のRDDをフライトクラスvalのRDDに解析しますflightRDD = ParseFlightをテキストRDDにマップ// IDと名前を使用して空港RDDを作成valairports = FlightOriginIDとOriginairports.take(1)をマップ// nowhereというデフォルトの頂点を定義し、printlnsの空港IDをマッピングval nowhere = 'nowhere' val AirportMap =マップ関数を使用.collect.toList.toMap //ルートRDDを作成sourceID、destinationID、distancevalルート= flightsRDDを使用します。マップ関数を使用します。distinctroutes.take(2)// sourceID、destinationID、distancevalでエッジRDDを作成します。edges= routes.map {(Map OriginID and DestinationID)=> Edge(org_id.toLong、dest_id.toLong、distance)} Edges.take(1)//グラフを定義し、いくつかの頂点とエッジを表示しますval graph = Graph(Airports、Edges and Nowhere)graph.vertices.take(2)graph.edges.take(2)//クエリ1-検索空港の総数valnumairports =頂点数//クエリ2-ルートの総数を計算しますか? val numroutes =エッジの数//クエリ3-距離が1000マイルを超えるルートを計算しますgraph.edges.filter {エッジの距離を取得)=>距離> 1000} .take(3)//同様にScalaコードを記述しますクエリの下//クエリ4-最長ルートを並べ替えて印刷します//クエリ5-空港の発着便の最高次の頂点を表示します//クエリ6-ID10397および12478の空港名を取得します//クエリ7-検索到着便が最も多い空港//クエリ8-出発便が最も多い空港を検索する//クエリ9-PageRankに従って最も重要な空港を検索する//クエリ10-ランク付けして空港を並べ替える//クエリ11-最も多く表示する重要な空港//クエリ12-フライトコストが最も低いルートを見つける//クエリ13-空港とそのフライトコストが最も低いルートを見つける//クエリ14-空港コードと並べ替えられた最低フライトコストを表示する

ユースケース–結果の視覚化

Google DataStudioを使用して分析を視覚化します。 Google Data Studioは、Google Analytics 360Suiteの製品です。ジオマップサービスを使用して、米国の地図上のそれぞれの場所に空港をマッピングし、メトリックの量を表示します。

  1. 空港ごとのフライトの総数を表示します
  2. すべての空港からの目的地ルートのメトリック合計を表示します
  3. 空港ごとのすべてのフライトの合計遅延を表示します

これで、SparkGraphXブログは終了です。あなたがそれを読んで楽しんで、それが有益であるとわかったことを願っています。 ApacheSparkシリーズの次のブログをチェックしてください。 ApacheSparkで市場に対応できるようになります。

以下をお勧めします ApacheSparkトレーニング|フライトデータ分析ビデオ そもそもエドゥレカから:

ApacheSparkトレーニング| SparkGraphXフライトデータ分析|エドゥレカ

質問がありますか?コメント欄にご記入ください。早急にご連絡いたします。

Sparkを学び、Sparkのドメインでキャリアを構築し、RDD、Spark Streaming、SparkSQL、MLlib、GraphX、Scalaを実際のユースケースで使用して大規模なデータ処理を実行する専門知識を構築したい場合は、インタラクティブなライブをチェックしてください-オンライン ここに、 それはあなたの学習期間を通してあなたを導くために24 * 7のサポートが付属しています。