このブログでは、Hadoopテクノロジーの最も重要なコンポーネントの1つであるMapReduceの機能と可能性について説明します。
今日、企業は、大規模なデータを効果的に処理する機能があるため、データストレージの最初の選択肢としてHadoopフレームワークを採用しています。しかし、データは用途が広く、さまざまな構造や形式で存在することもわかっています。このような多種多様なデータとそのさまざまな形式を制御するには、すべての種類に対応しながら、効果的で一貫した結果を生成するメカニズムが必要です。
Hadoopフレームワークの最も強力なコンポーネントはMapReduceであり、他の対応するコンポーネントよりも優れたデータとその構造の制御を提供できます。学習曲線のオーバーヘッドとプログラミングの複雑さが必要ですが、これらの複雑さを処理できれば、Hadoopを使用してあらゆる種類のデータを確実に処理できます。
MapReduceフレームワークは、すべての処理タスクを基本的に2つのフェーズ(MapとReduce)に分割します。
これらのフェーズの生データを準備するには、いくつかの基本的なクラスとインターフェイスを理解する必要があります。これらの再処理のスーパークラスは InputFormat。
Javaで数値を累乗する方法
ザ・ InputFormat クラスは、Hadoop MapReduceAPIのコアクラスの1つです。このクラスは、2つの主要なものを定義する責任があります。
- データ分割
- レコードリーダー
データ分割 は、Hadoop MapReduceフレームワークの基本的な概念であり、個々のマップタスクのサイズとその潜在的な実行サーバーの両方を定義します。ザ・ レコードリーダー 入力ファイルからレコードを実際に読み取り、それらを(キーと値のペアとして)マッパーに送信する責任があります。
マッパーの数は、分割の数に基づいて決定されます。分割を作成するのはInputFormatの仕事です。ほとんどの場合、分割サイズはブロックサイズと同等ですが、HDFSブロックサイズに基づいて分割が作成されるとは限りません。これは、InputFormatのgetSplits()メソッドがどのようにオーバーライドされたかに完全に依存します。
MRスプリットとHDFSブロックには根本的な違いがあります。ブロックはデータの物理チャンクであり、スプリットはマッパーが読み取る論理チャンクです。分割には入力データは含まれず、データの参照またはアドレスを保持するだけです。分割には基本的に2つのものがあります。バイト単位の長さと、単なる文字列である一連の格納場所です。
これをよりよく理解するために、1つの例を見てみましょう。MRを使用してMySQLに保存されたデータを処理します。この場合、ブロックの概念がないため、「スプリットは常にHDFSブロックに基づいて作成されます」という理論があります。失敗する。 1つの可能性は、MySQLテーブルの行の範囲に基づいて分割を作成することです(これは、リレーショナルデータベースからデータを読み取るための入力形式であるDBInputFormatが行うことです)。 n行で構成されるk個の分割がある場合があります。
分割が作成されるのは、FileInputFormat(ファイルに格納されたデータを処理するためのInputFormat)に基づくInputFormatの場合のみであり、入力ファイルの合計サイズ(バイト単位)に基づいて分割が作成されます。ただし、入力ファイルのFileSystemブロックサイズは、入力分割の上限として扱われます。 HDFSブロックサイズよりも小さいファイルがある場合、そのファイルに対して取得できるマッパーは1つだけです。別の動作が必要な場合は、mapred.min.split.sizeを使用できます。ただし、これもInputFormatのgetSplits()のみに依存します。
パッケージorg.apache.hadoop.mapreduce.lib.inputには、非常に多くの既存の入力形式があります。
CombineFileRecordReaderWrapper.html
CombineSequenceFileInputFormat.html
SequenceFileAsBinaryInputFormat.html
SequenceFileAsTextInputFormat.html
SequenceFileAsTextRecordReader.html
デフォルトはTextInputFormatです。
同様に、レデューサーからデータを読み取り、HDFSに保存する出力形式は非常に多くあります。
PartialFileOutputCommitter.html
SequenceFileAsBinaryOutputFormat.html
デフォルトはTextOutputFormatです。
このブログを読み終える頃には、次のことを学んでいたでしょう。
- マップリデュースプログラムの書き方
- Mapreduceで利用可能なさまざまなタイプのInputFormatsについて
- InputFormatsの必要性は何ですか
- カスタムInputFormatsの書き方
- SQLデータベースからHDFSにデータを転送する方法
- SQL(ここではMySQL)データベースからNoSQLデータベース(ここではHbase)にデータを転送する方法
- あるSQLデータベースからSQLデータベース内の他のテーブルにデータを転送する方法(同じSQLデータベースでこれを行う場合、おそらくこれはそれほど重要ではないかもしれません。しかし、同じ知識を持っていても問題はありません。あなたは決して知りませんどのように使用できるか)
前提条件:
- Hadoopがプリインストールされています
- SQLがプリインストールされています
- Hbaseがプリインストールされています
- Javaの基本的な理解
- MapReduceの知識
- Hadoopフレームワークの基本的な知識
ここで解決しようとしている問題の説明を理解しましょう。
MySQLDBのリレーショナルデータベースEdurekaにemployeeテーブルがあります。ここで、ビジネス要件に従って、リレーショナルDBで利用可能なすべてのデータをHadoopファイルシステム(HDFS、Hbaseとして知られるNoSQL DB)にシフトする必要があります。
このタスクを実行するための多くのオプションがあります。
- Sqoop
- Flume
- MapReduce
ここで、この操作のために他のツールをインストールして構成する必要はありません。 Hadoopの処理フレームワークMapReduceという1つのオプションしか残されていません。 MapReduceフレームワークを使用すると、転送中にデータを完全に制御できます。列を操作して、2つのターゲット位置のいずれかに直接配置できます。
javaプログラムを終了する方法
注意:
- MySQLテーブルからテーブルをフェッチするには、MySQLコネクタをダウンロードしてHadoopのクラスパスに配置する必要があります。これを行うには、コネクタcom.mysql.jdbc_5.1.5.jarをダウンロードし、Hadoop_home / share / Hadoop / MaPreduce / libディレクトリに保存します。
cp Downloads / com.mysql.jdbc_5.1.5.jar $ HADOOP_HOME / share / hadoop / mapreduce / lib /
- また、MRプログラムがHbaseにアクセスできるように、すべてのHbasejarをHadoopクラスパスの下に配置します。これを行うには、次のコマンドを実行します :
cp $ HBASE_HOME / lib / * $ HADOOP_HOME / share / hadoop / mapreduce / lib /
このタスクの実行に使用したソフトウェアのバージョンは次のとおりです。
- Hadooop-2.3.0
- HBase 0.98.9-Hadoop2
- 日食ムーン
互換性の問題でプログラムを回避するために、私は読者に同様の環境でコマンドを実行するように規定しています。
カスタムDBInputWritable:
パッケージcom.inputFormat.copyimport java.io.DataInput import java.io.DataOutput import java.io.IOException import java.sql.ResultSet import java.sql.PreparedStatement import java.sql.SQLException import org.apache.hadoop.io .Writable import org.apache.hadoop.mapreduce.lib.db.DBWritable public class DBInputWritableimplements Writable、DBWritable {private int id private String name、dept public void readFields(DataInput in)throws IOException {} public void readFields(ResultSet rs) throws SQLException // ResultsetオブジェクトはSQLステートメントから返されたデータを表します{id = rs.getInt(1)name = rs.getString(2)dept = rs.getString(3)} public void write(DataOutput out)throws IOException { } public void write(PreparedStatement ps)throws SQLException {ps.setInt(1、id)ps.setString(2、name)ps.setString(3、dept)} public int getId(){return id} public String getName() {return name} public String getDept(){return dept}}
カスタムDBOutputWritable:
パッケージcom.inputFormat.copyimport java.io.DataInput import java.io.DataOutput import java.io.IOException import java.sql.ResultSet import java.sql.PreparedStatement import java.sql.SQLException import org.apache.hadoop.io .Writable import org.apache.hadoop.mapreduce.lib.db.DBWritable public class DBOutputWritableimplements Writable、DBWritable {private String name private int id private String dept public DBOutputWritable(String name、int id、String dept){this.name = name this.id = id this.dept = dept} public void readFields(DataInput in)throws IOException {} public void readFields(ResultSet rs)throws SQLException {} public void write(DataOutput out)throws IOException {} public void write(PreparedStatement ps)はSQLExceptionをスローします{ps.setString(1、name)ps.setInt(2、id)ps.setString(3、dept)}}
入力テーブル:
データベースedurekaを作成する
テーブルemp(empid int not null、name varchar(30)、dept varchar(20)、primary key(empid))を作成します
emp値に挿入(1、 'abhay'、 'development')、(2、 'brundesh'、 'test')
empから*を選択
ケース1:MySQLからHDFSへの転送
パッケージcom.inputFormat.copyimport java.net.URI import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce .Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib.db.DBInputFormat import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat import org.apache.hadoop .io.Text import org.apache.hadoop.io.IntWritable public class MainDbtohdfs {public static void main(String [] args)throws Exception {Configuration conf = new Configuration()DBConfiguration.configureDB(conf、 'com.mysql.jdbc .Driver '、//ドライバークラス' jdbc:mysql:// localhost:3306 / edureka '、// db url' root '、//ユーザー名' root ')//パスワードJob job = new Job(conf)job .setJarByClass(MainDbtohdfs.class)job.setMapperClass(Map.class)job.setMapOutputKeyClass(Text.class)job.setMapOutputValueClass(IntWritable.class)job.setInputFormatClass(DBInputFormat.class)FileOutputFormat.setOutputPath(job、 new Path(args [0]))DBInputFormat.setInput(job、DBInputWritable.class、 'emp'、//入力テーブル名null、null、new String [] {'empid'、 'name'、 'dept'} / /テーブル列)Path p = new Path(args [0])FileSystem fs = FileSystem.get(new URI(args [0])、conf)fs.delete(p)System.exit(job.waitForCompletion(true)? 0:1)}}
このコードを使用すると、ソースSQL DBにアクセスするためのinputformatを準備または構成できます。パラメーターにはドライバークラスが含まれ、URLにはSQLデータベースのアドレス、そのユーザー名、およびパスワードが含まれます。
DBConfiguration.configureDB(conf、 'com.mysql.jdbc.Driver'、//ドライバークラス 'jdbc:mysql:// localhost:3306 / edureka'、// db url'root '、//ユーザー名' root ') //パスワード
このコードを使用すると、データベース内のテーブルの詳細を渡して、ジョブオブジェクトに設定できます。パラメータには、もちろん、ジョブインスタンス、DBWritableインターフェイスを実装する必要のあるカスタム書き込み可能クラス、ソーステーブル名、nullの場合は条件、nullの場合はソートパラメータ、テーブル列のリストが含まれます。
DBInputFormat.setInput(job、DBInputWritable.class、 'emp'、//入力テーブル名null、null、new String [] {'empid'、 'name'、 'dept'} //テーブル列)
マッパー
パッケージcom.inputFormat.copyimport java.io.IOException import org.apache.hadoop.mapreduce.Mapper import org.apache.hadoop.io.LongWritable import org.apache.hadoop.io.Text import org.apache.hadoop.io .IntWritableパブリッククラスMapはMapperを拡張します{
protected void map(LongWritable key、DBInputWritable value、Context ctx){try {String name = value.getName()IntWritable id = new IntWritable(value.getId())String dept = value.getDept()
ctx.write(new Text(name + '' + id + '' + dept)、id)
} catch(IOException e){e.printStackTrace()} catch(InterruptedException e){e.printStackTrace()}}}
レデューサー:使用されるIDリデューサー
実行するコマンド:
hadoop jar dbhdfs.jar com.inputFormat.copy.MainDbtohdfs / dbtohdfs
出力:MySQLテーブルがHDFSに転送されました
hadoop dfs -ls / dbtohdfs / *
ケース2:MySQLの1つのテーブルからMySQLの別のテーブルへの転送
MySQLで出力テーブルを作成する
テーブルemployee1(name varchar(20)、id int、dept varchar(20))を作成します
パッケージcom.inputFormat.copyimport org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib .db.DBInputFormat import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable import org.apache.hadoop.io.NullWritable public class Mainonetable_to_other_table {public static void main(String [] args)throws Exception {Configuration conf = new Configuration()DBConfiguration.configureDB(conf、 'com.mysql.jdbc.Driver'、//ドライバークラス 'jdbc:mysql:// localhost :3306 / edureka '、// db url' root '、//ユーザー名' root ')//パスワードJob job = new Job(conf)job.setJarByClass(Mainonetable_to_other_table.class)job.setMapperClass(Map.class)job .setReducerClass(Reduce.class)job.setMapOutputKeyClass(Text.class)job.setMapOutputValueClass(IntWritable.class)job.setOutputKeyClass(DBOutputWritable.class)job.setOutputValueClass(Nul lWritable.class)job.setInputFormatClass(DBInputFormat.class)job.setOutputFormatClass(DBOutputFormat.class)DBInputFormat.setInput(job、DBInputWritable.class、 'emp'、//入力テーブル名null、null、new String [] {'empid '、' name '、' dept '} //テーブル列)DBOutputFormat.setOutput(job、' employee1 '、//出力テーブル名new String [] {' name '、' id '、' dept '} // table列)System.exit(job.waitForCompletion(true)? 0:1)}}
このコードを使用すると、SQL DBで出力テーブル名を構成できます。パラメーターは、それぞれジョブインスタンス、出力テーブル名、および出力列名です。
DBOutputFormat.setOutput(job、 'employee1'、//出力テーブル名new String [] {'name'、 'id'、 'dept'} //テーブル列)
マッパー:ケース1と同じ
レデューサー:
パッケージcom.inputFormat.copyimport java.io.IOException import org.apache.hadoop.mapreduce.Reducer import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable import org.apache.hadoop.io .NullWritable public class Reduce extends Reducer {protected void reduce(Text key、Iterable values、Context ctx){int sum = 0 String line [] = key.toString()。split( '')try {ctx.write(new DBOutputWritable (line [0] .toString()、Integer.parseInt(line [1] .toString())、line [2] .toString())、NullWritable.get())} catch(IOException e){e.printStackTrace ()} catch(InterruptedException e){e.printStackTrace()}}}
実行するコマンド:
hadoop jar dbhdfs.jar com.inputFormat.copy.Mainonetable_to_other_table
出力:MySQLのEMPテーブルからMySQLの別のテーブルEmployee1に転送されたデータ
ケース3:MySQLのテーブルからNoSQL(Hbase)テーブルへの転送
SQLテーブルからの出力に対応するHbaseテーブルの作成:
'employee'、 'official_info'を作成します
ドライバークラス:
パッケージDbtohbaseimport org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib.db.DBInputFormat import org.apache.hadoop.hbase.mapreduce.TableOutputFormat import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.client.HTable import org.apache.hadoop.hbase.client.HTableInterface import org.apache .hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil import org.apache.hadoop.io.Text public class MainDbToHbase {public static void main(String [] args)throws Exception {Configuration conf = HBaseConfiguration.create()HTableInterface mytable = new HTable(conf、 'emp')DBConfiguration.configureDB(conf、 'com.mysql.jdbc.Driver'、//ドライバークラス 'jdbc:mysql:// localhost:3306 / edureka' 、// db url'root '、//ユーザー名' root ')//パスワードJob job = new Job(conf、' dbtohbase ')job.setJarByClass(MainDbToHbase.class)job.s etMapperClass(Map.class)job.setMapOutputKeyClass(ImmutableBytesWritable.class)job.setMapOutputValueClass(Text.class)TableMapReduceUtil.initTableReducerJob( 'employee'、Reduce.class、job)job.setInputFormatClass(DBInputFormat.class)job.setOutputFormatClass(TableOutputFormat。 class)DBInputFormat.setInput(job、DBInputWritable.class、 'emp'、//入力テーブル名null、null、new String [] {'empid'、 'name'、 'dept'} //テーブル列)System.exit (job.waitForCompletion(true)? 0:1)}}
このコードを使用すると、hbaseの場合はImmutableBytesWritableである出力キークラスを構成できます。
job.setMapOutputKeyClass(ImmutableBytesWritable.class)job.setMapOutputValueClass(Text.class)
ここでは、hbaseテーブル名とレデューサーを渡してテーブルを操作します。
TableMapReduceUtil.initTableReducerJob( 'employee'、Reduce.class、job)
マッパー:
パッケージDbtohbaseimport java.io.IOException import org.apache.hadoop.mapreduce.Mapper import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.io .LongWritable import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable public class Map extends Mapper {private IntWritable one = new IntWritable(1)protected void map(LongWritable id、DBInputWritable value、Context context) {try {String line = value.getName()String cd = value.getId()+ '' String dept = value.getDept()context.write(new ImmutableBytesWritable(Bytes.toBytes(cd))、new Text(line + ' '+ dept))} catch(IOException e){e.printStackTrace()} catch(InterruptedException e){e.printStackTrace()}}}
このコードでは、DBinputwritableクラスのゲッターから値を取得し、それらを渡します。
ImmutableBytesWritableは、Hbaseが理解できるbytewriatble形式でレデューサーに到達するようにします。
String line = value.getName()String cd = value.getId()+ '' String dept = value.getDept()context.write(new ImmutableBytesWritable(Bytes.toBytes(cd))、new Text(line + '' + dept ))
レデューサー:
パッケージDbtohbaseimport java.io.IOException import org.apache.hadoop.hbase.client.Put import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableReducer import org.apache.hadoop .hbase.util.Bytes import org.apache.hadoop.io.Text public class Reduce extends TableReducer {public void reduce(ImmutableBytesWritable key、Iterable values、Context context)throws IOException、InterruptedException {String [] cause = null //ループ値for(Text val:values){cause = val.toString()。split( '')} // HBaseに配置Putput = new Put(key.get())put.add(Bytes.toBytes( 'official_info' )、Bytes.toBytes( 'name')、Bytes.toBytes(cause [0]))put.add(Bytes.toBytes( 'official_info')、Bytes.toBytes( 'department')、Bytes.toBytes(cause [1 ]))context.write(key、put)}}
このコードにより、レデューサーからの値を格納する正確な行と列を決定できます。ここでは、一意の行キーとしてempidを作成したため、各empidを別々の行に格納しています。各行には、列ファミリー「official_info」の下の列「name」と「department」の下に、従業員の公式情報がそれぞれ格納されています。
Put put = new Put(key.get())put.add(Bytes.toBytes( 'official_info')、Bytes.toBytes( 'name')、Bytes.toBytes(cause [0]))put.add(Bytes。 toBytes( 'official_info')、Bytes.toBytes( 'department')、Bytes.toBytes(cause [1]))context.write(key、put)
Hbaseで転送されたデータ:
従業員をスキャンする
ご覧のとおり、ビジネスデータをリレーショナルSQLDBからNoSQLDBに正常に移行するタスクを完了することができました。
次のブログでは、他の入力および出力形式のコードを記述して実行する方法を学習します。
コメント、質問、またはフィードバックを投稿し続けてください。ご連絡をお待ちしております。
質問がありますか?コメント欄にご記入ください。折り返しご連絡いたします。
関連記事: