A few things about Hadoop and the MySQL database
Source: http://www.cnblogs.com/JimLy-BUG/p/5177952.html
In the blink of an eye I have been working with Hadoop for two weeks, going from strong resistance at first to actually rather liking it now. In the beginning, setting up the Hadoop development environment nearly made me give up; today I can write small programs. Growing a little every day feels good, and so does adding one more weapon to my toolkit. Learning comes down to persistence. Keep it up!!!
Chinese New Year is just around the corner. On the last working day before the holiday I worked out how to connect Hadoop to a MySQL database, which was very satisfying. Below I share the source code I wrote; I hope it helps you. If anything can be optimized, please point it out, and I would also welcome study advice from Hadoop veterans; this is the voice of a Hadoop beginner. My first submission was somehow rejected, I don't know why, and it instantly put me in a bad mood, but I will keep writing my blog...
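The original post never shows the table layout, so before the code, here is a guess at the schema it implies: four columns, with id auto-incremented (per the comment in DBReducer below). A minimal JDBC sketch to create the two tables; the table and column names come from the code, the column types are my assumption:

package com.simope.mr.db;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

// Creates the teacher and student tables used by the jobs below.
// Column types are assumptions; only the column names appear in the code.
public class CreateTables {
    public static void main(String[] args) throws SQLException {
        Connection conn = DriverManager.getConnection(
                "jdbc:mysql://10.10.1.1:3306/my_hd", "root", "root");
        try {
            Statement st = conn.createStatement();
            String cols = " (id INT PRIMARY KEY AUTO_INCREMENT,"
                    + " name VARCHAR(64), age INT, departmentID INT)";
            st.executeUpdate("CREATE TABLE IF NOT EXISTS teacher" + cols);
            st.executeUpdate("CREATE TABLE IF NOT EXISTS student" + cols);
        } finally {
            conn.close();
        }
    }
}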
StudentRecord class:
package com.simope.mr.db;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.lib.db.DBWritable;
public class StudentRecord implements Writable, DBWritable {

    int id;
    String name;
    int age;
    int departmentID;

    // Deserialize from the binary form used between map and reduce.
    @Override
    public void readFields(DataInput in) throws IOException {
        this.id = in.readInt();
        this.name = Text.readString(in);
        this.age = in.readInt();
        this.departmentID = in.readInt();
    }

    // Serialize to the binary form; must mirror readFields field for field.
    // Note: writeInt, not write. DataOutput.write(int) emits a single byte,
    // which would not match the readInt calls above.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.id);
        Text.writeString(out, this.name);
        out.writeInt(this.age);
        out.writeInt(this.departmentID);
    }

    // Populate the record from a JDBC result set (used by DBInputFormat).
    @Override
    public void readFields(ResultSet rs) throws SQLException {
        this.id = rs.getInt(1);
        this.name = rs.getString(2);
        this.age = rs.getInt(3);
        this.departmentID = rs.getInt(4);
    }

    // Bind the record to an INSERT statement (used by DBOutputFormat).
    @Override
    public void write(PreparedStatement ps) throws SQLException {
        ps.setInt(1, this.id);
        ps.setString(2, this.name);
        ps.setInt(3, this.age);
        ps.setInt(4, this.departmentID);
    }

    @Override
    public String toString() {
        return this.name + "\t" + this.age + "\t" + this.departmentID;
    }
}
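The symmetry between write(DataOutput) and readFields(DataInput) is easy to break (the original code called out.write, which emits a single byte, instead of out.writeInt). A minimal round-trip check, as a sketch: the class and fields are from above, the test scaffolding and sample values are my own, and it must sit in the same package since the fields are package-private.

package com.simope.mr.db;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Round-trips a StudentRecord through its Writable methods and prints the
// result; if write/readFields ever fall out of sync, the output is garbage.
public class StudentRecordRoundTrip {
    public static void main(String[] args) throws IOException {
        StudentRecord in = new StudentRecord();
        in.id = 1;
        in.name = "Tom"; // hypothetical sample values
        in.age = 25;
        in.departmentID = 3;

        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream dataOut = new DataOutputStream(buf);
        in.write(dataOut);
        dataOut.flush();

        StudentRecord out = new StudentRecord();
        out.readFields(new DataInputStream(
                new ByteArrayInputStream(buf.toByteArray())));

        System.out.println(out); // expected: Tom	25	3
    }
}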
TeacherRecord class:
package com.simope.mr.db;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.lib.db.DBWritable;
public class TeacherRecord implements Writable, DBWritable {

    // Mirrors StudentRecord apart from the class name.
    int id;
    String name;
    int age;
    int departmentID;

    @Override
    public void readFields(DataInput in) throws IOException {
        this.id = in.readInt();
        this.name = Text.readString(in);
        this.age = in.readInt();
        this.departmentID = in.readInt();
    }

    // writeInt, not write: must mirror the readInt calls above.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.id);
        Text.writeString(out, this.name);
        out.writeInt(this.age);
        out.writeInt(this.departmentID);
    }

    @Override
    public void readFields(ResultSet rs) throws SQLException {
        this.id = rs.getInt(1);
        this.name = rs.getString(2);
        this.age = rs.getInt(3);
        this.departmentID = rs.getInt(4);
    }

    @Override
    public void write(PreparedStatement ps) throws SQLException {
        ps.setInt(1, this.id);
        ps.setString(2, this.name);
        ps.setInt(3, this.age);
        ps.setInt(4, this.departmentID);
    }

    @Override
    public String toString() {
        return this.name + "\t" + this.age + "\t" + this.departmentID;
    }
}
DBMapper class:
package com.simope.mr.db;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
public class DBMapper extends MapReduceBase implements
        Mapper<LongWritable, TeacherRecord, LongWritable, Text> {

    // DBInputFormat hands each row to the mapper as a TeacherRecord;
    // emit the row id as the key and the tab-separated fields as the value.
    @Override
    public void map(LongWritable key, TeacherRecord value,
            OutputCollector<LongWritable, Text> collector, Reporter reporter)
            throws IOException {
        collector.collect(new LongWritable(value.id),
                new Text(value.toString()));
    }
}
DBReducer class:
package com.simope.mr.db;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
public class DBReducer extends MapReduceBase implements Reducer<LongWritable, Text, StudentRecord, Text> {

    @Override
    public void reduce(LongWritable key, Iterator<Text> values,
            OutputCollector<StudentRecord, Text> output, Reporter reporter)
            throws IOException {
        // Only the first value of each key is consumed here.
        String[] infoArr = values.next().toString().split("\t");
        StudentRecord s = new StudentRecord();
        // s.id = Integer.parseInt(infoArr[0]); // id is auto-increment
        s.name = infoArr[0];
        s.age = Integer.parseInt(infoArr[1]);
        s.departmentID = Integer.parseInt(infoArr[2]);
        output.collect(s, new Text(s.name));
    }
}
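Note that this reduce consumes only the first value of each key. If several input lines can share a key, a loop variant would insert one row per line. A sketch, with the same parsing as above:

    // Variant sketch: insert one student row per value instead of only the
    // first value of each key.
    public void reduce(LongWritable key, Iterator<Text> values,
            OutputCollector<StudentRecord, Text> output, Reporter reporter)
            throws IOException {
        while (values.hasNext()) {
            String[] infoArr = values.next().toString().split("\t");
            StudentRecord s = new StudentRecord();
            s.name = infoArr[0];
            s.age = Integer.parseInt(infoArr[1]);
            s.departmentID = Integer.parseInt(infoArr[2]);
            output.collect(s, new Text(s.name));
        }
    }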
DBJob class (reads a database table and writes the rows to an HDFS file): database table -> HDFS file
package com.simope.mr.db;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBInputFormat;
/**
 * Reads a database table and writes the rows to an HDFS file.
 * @author JimLy
 * @since 2016-02-02
 */
public class DBJob {

    public static void main(String[] args) throws IOException {
        JobConf jobConf = new JobConf(DBJob.class);

        jobConf.setOutputKeyClass(LongWritable.class);
        jobConf.setOutputValueClass(Text.class);
        jobConf.setInputFormat(DBInputFormat.class);
        FileOutputFormat.setOutputPath(jobConf, new Path("/usr/output/db"));

        DBConfiguration.configureDB(jobConf, "com.mysql.jdbc.Driver",
                "jdbc:mysql://10.10.1.1:3306/my_hd", "root", "root");

        String[] fields = {"id", "name", "age", "departmentID"};
        // Read from the teacher table of the my_hd database, ordered by id.
        DBInputFormat.setInput(jobConf, TeacherRecord.class, "teacher", null, "id", fields);

        jobConf.setMapperClass(DBMapper.class);
        jobConf.setReducerClass(IdentityReducer.class);

        JobClient.runJob(jobConf);
    }
}
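One practical note (my own addition, not from the original post): the MySQL JDBC driver jar must be visible to the tasks, not just to the submitting JVM, or the job fails to load com.mysql.jdbc.Driver. With the old mapred API this is commonly done either by dropping mysql-connector-java.jar into $HADOOP_HOME/lib on every node, or by shipping a copy already uploaded to HDFS through DistributedCache, roughly like this (the jar path and version are placeholders):

import org.apache.hadoop.filecache.DistributedCache;

// Inside main(), before JobClient.runJob(jobConf): add the driver jar on
// HDFS to the task classpath so map tasks can load the JDBC driver.
DistributedCache.addFileToClassPath(
        new Path("/lib/mysql-connector-java-5.1.38.jar"), jobConf); // hypothetical HDFS path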
DB2Job class (same as DBJob, but reads the table through a free-form query and writes the rows to an HDFS file): database table -> HDFS file
package com.simope.mr.db;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBInputFormat;
/**
 * Reads database rows via a free-form query and writes them to an HDFS file.
 * @author JimLy
 * @since 2016-02-02
 */
public class DB2Job {

    public static void main(String[] args) throws IOException {
        JobConf jobConf = new JobConf(DB2Job.class);

        jobConf.setOutputKeyClass(LongWritable.class);
        jobConf.setOutputValueClass(Text.class);
        jobConf.setInputFormat(DBInputFormat.class);
        FileOutputFormat.setOutputPath(jobConf, new Path("/usr/output/db"));

        DBConfiguration.configureDB(jobConf, "com.mysql.jdbc.Driver",
                "jdbc:mysql://10.10.1.1:3306/my_hd", "root", "root");

        String inputQuery = "SELECT * FROM teacher WHERE id != 4";
        String inputCountQuery = "SELECT COUNT(1) FROM teacher WHERE id != 4";
        // Query the teacher table of the my_hd database.
        DBInputFormat.setInput(jobConf, TeacherRecord.class, inputQuery, inputCountQuery);

        jobConf.setMapperClass(DBMapper.class);
        jobConf.setReducerClass(IdentityReducer.class);

        JobClient.runJob(jobConf);
    }
}
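A caveat worth adding (my note, not in the original): with the query form of setInput, DBInputFormat pages through the result set per split, on MySQL typically via LIMIT/OFFSET, so without a stable row order the splits may overlap or miss rows. A hedged variant with an explicit ORDER BY:

// Sketch: give the paging query a deterministic order so each split's
// window sees the same row sequence.
String inputQuery = "SELECT id, name, age, departmentID FROM teacher WHERE id != 4 ORDER BY id";
String inputCountQuery = "SELECT COUNT(1) FROM teacher WHERE id != 4";
DBInputFormat.setInput(jobConf, TeacherRecord.class, inputQuery, inputCountQuery);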
DB3Job class (reads an HDFS file and writes the data into the specified database table): HDFS file -> database table
package com.simope.mr.db;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBOutputFormat;
/**
 * Reads an HDFS file and writes its rows into a database table.
 * @author JimLy
 * @since 2016-02-02
 */
public class DB3Job {

    public static void main(String[] args) throws IOException {
        JobConf jobConf = new JobConf(DB3Job.class);

        jobConf.setInputFormat(TextInputFormat.class);
        jobConf.setOutputFormat(DBOutputFormat.class);
        FileInputFormat.addInputPath(jobConf, new Path("/usr/input/db"));

        DBConfiguration.configureDB(jobConf, "com.mysql.jdbc.Driver",
                "jdbc:mysql://10.10.1.1:3306/my_hd", "root", "root");

        String[] fields = {"id", "name", "age", "departmentID"};
        // Insert into the teacher table of the my_hd database.
        DBOutputFormat.setOutput(jobConf, "teacher", fields);

        jobConf.setMapperClass(IdentityMapper.class);
        jobConf.setReducerClass(DBReducer.class);

        JobClient.runJob(jobConf);
    }
}
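For DB3Job, the input lines under /usr/input/db must match what DBReducer parses: three tab-separated fields, name, age and departmentID, with the id column left to auto-increment. A tiny helper to produce such a file locally before copying it to HDFS; this is my own sketch, with a hypothetical file name and sample rows:

package com.simope.mr.db;

import java.io.FileWriter;
import java.io.IOException;

// Writes a sample input file in the three-column format DBReducer expects.
public class MakeSampleInput {
    public static void main(String[] args) throws IOException {
        FileWriter w = new FileWriter("db_input.txt"); // hypothetical local file
        try {
            w.write("Tom\t25\t3\n");
            w.write("Jerry\t23\t2\n");
        } finally {
            w.close();
        }
    }
}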
DB4Job class (reads rows from one database table and writes them into another table): database table -> table
package com.simope.mr.db;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBInputFormat;
import org.apache.hadoop.mapred.lib.db.DBOutputFormat;
/**
 * Reads rows from one database table and writes them into another.
 * @author JimLy
 * @since 2016-02-02
 */
public class DB4Job {

    public static void main(String[] args) throws IOException {
        JobConf jobConf = new JobConf(DB4Job.class);

        jobConf.setOutputKeyClass(LongWritable.class);
        jobConf.setOutputValueClass(Text.class);
        jobConf.setInputFormat(DBInputFormat.class);
        jobConf.setOutputFormat(DBOutputFormat.class);

        DBConfiguration.configureDB(jobConf, "com.mysql.jdbc.Driver",
                "jdbc:mysql://10.10.1.1:3306/my_hd", "root", "root");

        String inputQuery = "SELECT * FROM teacher";
        String inputCountQuery = "SELECT COUNT(1) FROM teacher";
        // Query the teacher table of the my_hd database...
        DBInputFormat.setInput(jobConf, TeacherRecord.class, inputQuery, inputCountQuery);

        String[] fields = {"id", "name", "age", "departmentID"};
        // ...and insert the rows into the student table.
        DBOutputFormat.setOutput(jobConf, "student", fields);

        jobConf.setMapperClass(DBMapper.class);
        jobConf.setReducerClass(DBReducer.class);

        JobClient.runJob(jobConf);
    }
}
If you think this was well written, please click the recommend button; your recommendation is what keeps me writing this blog...
If you repost this article, please credit the source: http://www.cnblogs.com/JimLy-BUG/