Sparkを使用したRDD:ApacheSparkのビルディングブロック



Sparkを使用したRDDに関するこのブログでは、Sparkの基本単位であるRDDの詳細で包括的な知識とその有用性について説明します。

この言葉自体は、すべてのHadoopエンジニアの心に火花を散らすのに十分です。 nメモリ内 処理ツール これは、クラスターコンピューティングでは非常に高速です。 MapReduceと比較すると、メモリ内のデータ共有によりRDDが作成されます 10-100x もっと早く ネットワークやディスクの共有よりも、RDD(復元力のある分散データセット)により、これがすべて可能になります。 Sparkを使用したこのRDDの記事で今日焦点を当てている重要なポイントは次のとおりです。

RDDが必要ですか?

なぜRDDが必要なのですか?-Sparkを使用したRDD





世界は進化しています そして データサイエンス の進歩のため アルゴリズム に基づく 回帰 そして で実行されます 分散 反復計算 ation 複数のコンピューティングユニット間でのデータの再利用と共有を含む方法。

伝統的な 技術には、次のような安定した中間および分散ストレージが必要でした HDFS データの複製とデータのシリアル化を伴う反復計算で構成されているため、プロセスが大幅に遅くなりました。解決策を見つけることは決して簡単ではありませんでした。



ここが RDD (復元力のある分散データセット)が全体像になります。

RDD は、データがデータソースからインポートされ、RDDにドロップされるため、使いやすく、簡単に作成できます。さらに、操作はそれらを処理するために適用されます。彼らは メモリの分散コレクション としての許可を持つ 読み取り専用 そして最も重要なのは、 耐障害性



もしあれば データパーティション RDDは 失われた 、同じものを適用することで再生できます 変換 で失われたパーティションの操作 系統 、すべてのデータを最初から処理するのではなく。リアルタイムシナリオでのこの種のアプローチは、データ損失の状況やシステムがダウンしたときに奇跡を起こす可能性があります。

RDDとは何ですか?

RDD または( 復元力のある分散データセット )は基本です データ構造 Sparkで。用語 弾力性 データまたはデータを自動的に生成する機能を定義します ロールバック 元の状態 データ損失の可能性がある予期しない災害が発生した場合。

RDDに書き込まれるデータは パーティション化 に保存されます 複数の実行可能ノード 実行ノードの場合 失敗する 実行時に、それは即座にからバックアップを取得します 次の実行可能ノード 。これが、RDDが他の従来のデータ構造と比較した場合に高度なタイプのデータ構造と見なされる理由です。 RDDは、構造化データ、非構造化データ、および半構造化データを格納できます。

Sparkブログを使用してRDDを進め、他のタイプのデータ構造よりも優れているRDDの独自の機能について学びましょう。

plSQLでの例外処理

RDDの機能

  • インメモリ (羊) 計算 :インメモリ計算の概念は、データ処理をより高速で効率的な段階に導き、全体として パフォーマンス システムの アップグレードされました。
  • L 彼の評価 :遅延評価という用語は、 変換 RDDのデータに適用されますが、出力は生成されません。代わりに、適用される変換は ログに記録されます。
  • 永続性 :結果のRDDは常に 再利用可能。
  • 粗視化された操作 :ユーザーは、データセット内のすべての要素に変換を適用できます。 地図、 フィルタ または グループ化 オペレーション。
  • 耐障害性 :データが失われた場合、システムは次のことができます。 ロールバック そのに 元の状態 ログに記録されたものを使用して 変換
  • 不変性 :定義、取得、または作成されたデータは かわった システムにログインしたら。既存のRDDにアクセスして変更する必要がある場合は、次のセットを適用して新しいRDDを作成する必要があります。 変換 現在または先行するRDDに機能します
  • パーティショニング :それは 重要なユニット Sparkの並列処理 RDD。 デフォルトでは、作成されるパーティションの数はデータソースに基づいています。 を使用して作成するパーティションの数を決定することもできます カスタムパーティション 機能。

Sparkを使用したRDDの作成

RDDはで作成できます 3つの方法:

  1. からのデータの読み取り 並列化されたコレクション
val PCRDD = spark.sparkContext.parallelize(Array( 'Mon'、 'Tue'、 'Wed'、 'Thu'、 'Fri'、 'Sat')、2)val resultRDD = PCRDD.collect()resultRDD.collect( ).foreach(println)
  1. 申請中 変換 以前のRDDで
val words = spark.sparkContext.parallelize(Seq( 'Spark'、 'is'、 'a'、 'very'、 'powerful'、 'language'))val wordpair = words.map(w =(w.charAt( 0)、w))wordpair.collect()。foreach(println)
  1. からのデータの読み取り 外部記憶装置 またはのようなファイルパス HDFS または HBase
val Sparkfile = spark.read.textFile( '/ user / edureka_566977 / spark / spark.txt。')Sparkfile.collect()

RDDで実行される操作:

RDDで実行される操作には、主に2つのタイプがあります。

  • 変換
  • 行動

変換 ザ・ オペレーション RDDに適用する フィルタ、アクセス そして 変更する 親RDDのデータを生成して 連続RDD と呼ばれる 変換 。新しいRDDは、前のRDDへのポインターを返し、それらの間の依存関係を保証します。

変換は 遅延評価、 つまり、作業中のRDDに適用された操作はログに記録されますが、ログには記録されません。 実行されました。 システムは、トリガーした後に結果または例外をスローします アクション

変換は次のように2つのタイプに分けることができます。

  • 狭い変換
  • 幅広い変換

狭い変換 に狭い変換を適用します 単一パーティション RDDの処理に必要なデータがの単一のパーティションで利用可能であるため、新しいRDDを生成するための親RDDの 親ASD 。狭い変換の例は次のとおりです。

  • 地図()
  • フィルタ()
  • flatMap()
  • パーティション()
  • mapPartitions()

幅広い変換: 幅広い変換を適用します 複数のパーティション 新しいRDDを生成します。 RDDの処理に必要なデータは、の複数のパーティションで利用できます。 親ASD 。幅広い変換の例は次のとおりです。

  • reduceBy()
  • 連合()

行動 :アクションはApacheSparkに適用するように指示します 計算 結果または例外をドライバーRDDに返します。アクションのいくつかは次のとおりです。

  • collect()
  • カウント()
  • 取る()
  • 最初()

実際にRDDに操作を適用してみましょう。

IPL(インディアンプレミアリーグ) 最高レベルのヒップを備えたクリケットトーナメントです。それでは、今日はIPLデータセットを手に入れ、Sparkを使用してRDDを実行しましょう。

  • まず、 IPLのCSV一致データをダウンロードしましょう。ダウンロードすると、行と列のあるEXCELファイルのように見え始めます。

次のステップでは、sparkを起動し、その場所からmatches.csvファイルをロードします。私の場合はcsvファイルの場所は 「/user/edureka_566977/test/matches.csv」

それでは、 変換 最初の部分:

  • 地図():

を使用しております マップ変換 RDDのすべての要素に特定の変換操作を適用します。ここでは、CKfileという名前のRDDを作成します。csvファイル。 Statesと呼ばれる別のRDDを作成して 都市の詳細を保存する

spark2-shell val CKfile = sc.textFile( '/ user / edureka_566977 / test / matches.csv')CKfile.collect.foreach(println)val状態= CKfile.map(_。split( '、')(2)) States.collect()。foreach(println)

  • フィルタ():

フィルタ変換。名前自体がその使用法を説明しています。この変換操作を使用して、指定されたデータのコレクションから選択データを除外します。適用します フィルター操作 今年のIPLの試合の記録を取得するにはここに 2017年 filRDDに保存します。

val fil = CKfile.filter(line => line.contains( '2017'))fil.collect()。foreach(println)

  • flatMap():

flatMapは、RDDの各要素に変換操作を適用して、newRDDを作成します。これは、マップ変換に似ています。ここで適用しますフラットマップハイデラバード市の試合を吐き出す データをに保存しますfilRDDRDD。

val filRDD = fil.flatMap(line => line.split( 'Hyderabad'))。collect()

  • パーティション():

RDDに書き込むすべてのデータは、特定の数のパーティションに分割されます。この変換を使用して、 パーティションの数 データは実際にはに分割されます。

val fil = CKfile.filter(line => line.contains( '2017'))fil.partitions.size

  • mapPartitions():

MapPatitionsをMap()の代替と見なし、foreach() 一緒。ここでmapPartitionsを使用して、 行の数 filRDDにあります。

val fil = CKfile.filter(line => line.contains( '2016'))fil.mapPartitions(idx => Array(idx.size).iterator).collect

  • reduceBy():

を使用しておりますReduceBy() オン キーと値のペア 。この変換をcsvでプレーヤーを見つけるためのファイル 試合の最高の男

val ManOfTheMatch = CKfile.map(_。split( '、')(13))val MOTMcount = ManOfTheMatch.map(WINcount =>(WINcount、1))val ManOTH = MOTMcount.reduceByKey((x、y)=> x + y).map(tup =>(tup._2、tup._1))sortByKey(false)ManOTH.take(10).foreach(println)

  • 連合():

名前はそれをすべて説明します、私たちはユニオン変換を使用します 2つのRDDを一緒にクラブする 。ここでは、filとfil2という2つのRDDを作成しています。 fil RDDには2017 IPL一致のレコードが含まれ、fil2RDDには2016IPL一致レコードが含まれます。

val fil = CKfile.filter(line => line.contains( '2017'))val fil2 = CKfile.filter(line => line.contains( '2016'))val uninRDD = fil.union(fil2)

から始めましょう アクション 実際の出力を示す部分:

  • collect():

収集は、私たちが使用するアクションです 内容を表示する RDDで。

is-Javaの関係
val CKfile = sc.textFile( '/ user / edureka_566977 / test / matches.csv')CKfile.collect.foreach(println)

  • カウント():

カウントカウントするために使用するアクションです レコード数 RDDに存在。ここにこの操作を使用して、matches.csvファイル内のレコードの総数をカウントしています。

val CKfile = sc.textFile( '/ user / edureka_566977 / test / matches.csv')CKfile.count()

  • 取る():

Takeはcollectと同様のアクション操作ですが、唯一の違いは、 選択した行数 ユーザーの要求に応じて。ここでは、次のコードを適用して、 トップ10の主要なレポート。

val statecountm = Scount.reduceByKey((x、y)=> x + y).map(tup =>(tup._2、tup._1))sortByKey(false)statecountm.collect()。foreach(println)statecountm。 take(10).foreach(println)

  • 最初():

First()は、collect()およびtake()と同様のアクション操作です。それ最上位のレポートを出力するために使用されますここでは、first()操作を使用して 特定の都市でプレイされた試合の最大数 そして、出力としてムンバイを取得します。

val CKfile = sc.textFile( '/ user / edureka_566977 / test / matches.csv')val States = CKfile.map(_。split( '、')(2))val Scount = States.map(Scount =>( Scount、1))scala&gt val statecount = Scount.reduceByKey((x、y)=> x + y).collect.foreach(println)Scount.reduceByKey((x、y)=> x + y).collect.foreach (println)val statecountm = Scount.reduceByKey((x、y)=> x + y).map(tup =>(tup._2、tup._1))sortByKey(false)statecountm.first()

Sparkを使用してRDDを学習するプロセスをさらに興味深いものにするために、興味深いユースケースを考え出しました。

Sparkを使用したRDD:ポケモンのユースケース

  • まず、 Matches.csvファイルと同じように、Pokemon.csvファイルをダウンロードしてspark-shellにロードしましょう。
val PokemonDataRDD1 = sc.textFile( '/ user / edureka_566977 / PokemonFile / PokemonData.csv')PokemonDataRDD1.collect()。foreach(println)

ポケモンは実際には多種多様です。いくつかの種類を見つけましょう。

  • Pokemon.csvファイルからスキーマを削除する

私たちは必要ないかもしれません スキーマ Pokemon.csvファイルの。したがって、それを削除します。

val Head = PokemonDataRDD1.first()val NoHeader = PokemonDataRDD1.filter(line =>!line.equals(Head))

  • の数を見つける パーティション 私たちのpokemon.csvはに配布されています。
println( 'No.ofpartitions =' + NoHeader.partitions.size)

  • ウォーターポケモン

を見つける ウォーターポケモンの数

val WaterRDD = PokemonDataRDD1.filter(line => line.contains( 'Water'))WaterRDD.collect()。foreach(println)

  • 火のポケモン

を見つける 火のポケモンの数

val FireRDD = PokemonDataRDD1.filter(line => line.contains( 'Fire'))FireRDD.collect()。foreach(println)

  • 検出することもできます 人口 カウント機能を使用した別の種類のポケモンの
WaterRDD.count()FireRDD.count()

  • 私はのゲームが好きなので 防御戦略 でポケモンを見つけましょう 最大の防御。
val defenceList = NoHeader.map {x => x.split( '、')}。map {x =>(x(6).toDouble)} println( 'Highest_Defence:' + defenceList.max())

  • 私たちは最大値を知っています 防御力の値 でも、どのポケモンなのかわかりません。だから、それがどれであるかを見つけましょう ポケットモンスター。
val defWithPokemonName = NoHeader.map {x => x.split( '、')}。map {x =>(x(6).toDouble、x(1))} val MaxDefencePokemon = defWithPokemonName.groupByKey.takeOrdered(1) (Ordering [Double] .reverse.on(_._ 1))MaxDefencePokemon.foreach(println)

  • さあ、ポケモンを整理しましょう 最小の防御
val minDefencePokemon = defenceList.distinct.sortBy(x => x.toDouble、true、1)minDefencePokemon.take(5).foreach(println)

  • さて、ポケモンを 防御力の低い戦略。
val PokemonDataRDD2 = sc.textFile( '/ user / edureka_566977 / PokemonFile / PokemonData.csv')val Head2 = PokemonDataRDD2.first()val NoHeader2 = PokemonDataRDD2.filter(line =>!line.equals(Head))val defWithPokemonName .map {x => x.split( '、')}。map {x =>(x(6).toDouble、x(1))} val MinDefencePokemon2 = defWithPokemonName2.groupByKey.takeOrdered(1)(Ordering [Double ] .on(_._ 1))MinDefencePokemon2.foreach(println)

これで、Sparkを使用したこのRDDの記事は終わりです。 RDD、その機能、およびRDDで実行できるさまざまなタイプの操作についての知識に少し光を当てることを願っています。

この記事は ClouderaHadoopおよびSparkDeveloper認定試験(CCA175)の準備をするように設計されています。 Apache Sparkと、Spark RDD、Spark SQL、Spark MLlib、SparkStreamingを含むSparkエコシステムに関する深い知識を得ることができます。 Scalaプログラミング言語、HDFS、Sqoop、Flume、Spark GraphX、Kafkaなどのメッセージングシステムに関する包括的な知識が得られます。