A few things about Hadoop and the MySQL database
From: http://www.cnblogs.com/JimLy-BUG/p/5177952.html
In the blink of an eye I have been working with Hadoop for two weeks, and I have gone from strongly resisting it to rather liking it. At first, setting up the Hadoop development environment nearly made me give up; now I can write small programs. Growing a little every day feels good, and adding one more weapon to my arsenal is worth the effort. Learning is all about persistence, so keep at it!
Chinese New Year is almost here, and on the last working day before the holiday I worked out how to connect Hadoop to a MySQL database, which felt very satisfying. Below I share the source code I wrote, hoping it can help you. If anything can be optimized, please point it out, and I would also welcome study advice from Hadoop experts; that is one Hadoop beginner's heartfelt request. My first submission was actually rejected, and I do not know why; it soured my mood for a moment, but I will keep writing my blog anyway...
The StudentRecord class:
package com.simope.mr.db;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.lib.db.DBWritable;

public class StudentRecord implements Writable, DBWritable {

    int id;
    String name;
    int age;
    int departmentID;

    @Override
    public void readFields(DataInput in) throws IOException {
        this.id = in.readInt();
        this.name = Text.readString(in);
        this.age = in.readInt();
        this.departmentID = in.readInt();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.id); // writeInt, not write: write(int) emits only a single byte
        Text.writeString(out, this.name);
        out.writeInt(this.age);
        out.writeInt(this.departmentID);
    }

    public void readFields(ResultSet rs) throws SQLException {
        this.id = rs.getInt(1);
        this.name = rs.getString(2);
        this.age = rs.getInt(3);
        this.departmentID = rs.getInt(4);
    }

    public void write(PreparedStatement ps) throws SQLException {
        ps.setInt(1, this.id);
        ps.setString(2, this.name);
        ps.setInt(3, this.age);
        ps.setInt(4, this.departmentID);
    }

    @Override
    public String toString() {
        return this.name + "\t" + this.age + "\t" + this.departmentID;
    }
}
The TeacherRecord class:
package com.simope.mr.db;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.lib.db.DBWritable;

public class TeacherRecord implements Writable, DBWritable {

    int id;
    String name;
    int age;
    int departmentID;

    @Override
    public void readFields(DataInput in) throws IOException {
        this.id = in.readInt();
        this.name = Text.readString(in);
        this.age = in.readInt();
        this.departmentID = in.readInt();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.id); // writeInt, not write: write(int) emits only a single byte
        Text.writeString(out, this.name);
        out.writeInt(this.age);
        out.writeInt(this.departmentID);
    }

    public void readFields(ResultSet rs) throws SQLException {
        this.id = rs.getInt(1);
        this.name = rs.getString(2);
        this.age = rs.getInt(3);
        this.departmentID = rs.getInt(4);
    }

    public void write(PreparedStatement ps) throws SQLException {
        ps.setInt(1, this.id);
        ps.setString(2, this.name);
        ps.setInt(3, this.age);
        ps.setInt(4, this.departmentID);
    }

    @Override
    public String toString() {
        return this.name + "\t" + this.age + "\t" + this.departmentID;
    }
}
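Both record classes map the same four columns. For reference, here is a minimal sketch of the schema I assume they target, created over plain JDBC; the column types and sizes are my guesses, since the original post never shows the DDL (the auto-increment id is implied by the comment in DBReducer below):

package com.simope.mr.db;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

/** Hypothetical one-off setup for the tables the jobs below read and write. */
public class CreateTables {

    public static void main(String[] args) throws SQLException {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://10.10.1.1:3306/my_hd", "root", "root");
             Statement stmt = conn.createStatement()) {
            // Assumed layout: four columns matching StudentRecord/TeacherRecord
            stmt.executeUpdate("CREATE TABLE IF NOT EXISTS teacher ("
                    + "id INT AUTO_INCREMENT PRIMARY KEY, "
                    + "name VARCHAR(64), age INT, departmentID INT)");
            stmt.executeUpdate("CREATE TABLE IF NOT EXISTS student ("
                    + "id INT AUTO_INCREMENT PRIMARY KEY, "
                    + "name VARCHAR(64), age INT, departmentID INT)");
        }
    }
}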
The DBMapper class:
package com.simope.mr.db;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class DBMapper extends MapReduceBase implements
        Mapper<LongWritable, TeacherRecord, LongWritable, Text> {

    public void map(LongWritable key, TeacherRecord value,
            OutputCollector<LongWritable, Text> collector, Reporter reporter)
            throws IOException {
        // Key: the row's id; value: the tab-separated record produced by toString()
        collector.collect(new LongWritable(value.id), new Text(value.toString()));
    }
}
The DBReducer class:
package com.simope.mr.db;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class DBReducer extends MapReduceBase implements
        Reducer<LongWritable, Text, StudentRecord, Text> {

    @Override
    public void reduce(LongWritable key, Iterator<Text> values,
            OutputCollector<StudentRecord, Text> output, Reporter reporter)
            throws IOException {
        // Each key is a unique row id, so only the first value is read
        String[] infoArr = values.next().toString().split("\t");
        StudentRecord s = new StudentRecord();
        // s.id is deliberately left unset: the id column is auto-increment
        s.name = infoArr[0];
        s.age = Integer.parseInt(infoArr[1]);
        s.departmentID = Integer.parseInt(infoArr[2]);
        output.collect(s, new Text(s.name));
    }
}
The DBJob class (reads a database table and writes the rows to an HDFS file): database table -> HDFS file
package com.simope.mr.db;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBInputFormat;

/**
 * Reads rows from a database table and writes them to an HDFS file.
 *
 * @author JimLy
 * @since 2016-02-02
 */
public class DBJob {

    public static void main(String[] args) throws IOException {
        JobConf jobConf = new JobConf(DBJob.class);

        jobConf.setOutputKeyClass(LongWritable.class);
        jobConf.setOutputValueClass(Text.class);
        jobConf.setInputFormat(DBInputFormat.class);

        FileOutputFormat.setOutputPath(jobConf, new Path("/usr/output/db"));

        DBConfiguration.configureDB(jobConf, "com.mysql.jdbc.Driver",
                "jdbc:mysql://10.10.1.1:3306/my_hd", "root", "root");

        String[] fields = {"id", "name", "age", "departmentID"};

        // Query the teacher table in the my_hd database
        DBInputFormat.setInput(jobConf, TeacherRecord.class, "teacher", null, "id", fields);

        jobConf.setMapperClass(DBMapper.class);
        jobConf.setReducerClass(IdentityReducer.class);

        JobClient.runJob(jobConf);
    }
}
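One pitfall worth noting: DBConfiguration only names the driver class, so the MySQL connector jar itself must be visible to every task JVM, not just the client. One common approach with the old mapred API is to push the jar to HDFS and add it to the task classpath before runJob; the snippet below is a sketch you could drop into any of these jobs' main methods, and the HDFS path and jar version are assumptions of mine:

// Requires: import org.apache.hadoop.filecache.DistributedCache;
// Hypothetical location -- upload the jar first, e.g.:
//   hadoop fs -put mysql-connector-java-5.1.38.jar /lib/
DistributedCache.addFileToClassPath(
        new Path("/lib/mysql-connector-java-5.1.38.jar"), jobConf);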
The DB2Job class (reads a database table with an explicit query and writes the rows to an HDFS file): database table -> HDFS file
package com.simope.mr.db;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBInputFormat;

/**
 * Reads rows from the database with an explicit query and writes them to an HDFS file.
 *
 * @author JimLy
 * @since 2016-02-02
 */
public class DB2Job {

    public static void main(String[] args) throws IOException {
        JobConf jobConf = new JobConf(DB2Job.class);

        jobConf.setOutputKeyClass(LongWritable.class);
        jobConf.setOutputValueClass(Text.class);
        jobConf.setInputFormat(DBInputFormat.class);

        FileOutputFormat.setOutputPath(jobConf, new Path("/usr/output/db"));

        DBConfiguration.configureDB(jobConf, "com.mysql.jdbc.Driver",
                "jdbc:mysql://10.10.1.1:3306/my_hd", "root", "root");

        String inputQuery = "SELECT * FROM teacher where id != 4";
        String inputCountQuery = "SELECT COUNT(1) FROM teacher where id != 4";

        // Query the teacher table in the my_hd database
        DBInputFormat.setInput(jobConf, TeacherRecord.class, inputQuery, inputCountQuery);

        jobConf.setMapperClass(DBMapper.class);
        jobConf.setReducerClass(IdentityReducer.class);

        JobClient.runJob(jobConf);
    }
}
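A note on this query form: DBInputFormat uses inputCountQuery to size the input splits, and, as far as I can tell from the old mapred implementation, each split then runs the main query with LIMIT and OFFSET appended, roughly "SELECT * FROM teacher where id != 4 LIMIT <length> OFFSET <start>". If rows must be partitioned consistently across splits, the query should therefore have a stable ordering, e.g. an ORDER BY id.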
The DB3Job class (reads an HDFS file and writes the rows into a database table): HDFS file -> database table
package com.simope.mr.db;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBOutputFormat;

/**
 * Reads an HDFS file and writes the rows into a database table.
 *
 * @author JimLy
 * @since 2016-02-02
 */
public class DB3Job {

    public static void main(String[] args) throws IOException {
        JobConf jobConf = new JobConf(DB3Job.class);

        jobConf.setInputFormat(TextInputFormat.class);
        jobConf.setOutputFormat(DBOutputFormat.class);

        FileInputFormat.addInputPath(jobConf, new Path("/usr/input/db"));

        DBConfiguration.configureDB(jobConf, "com.mysql.jdbc.Driver",
                "jdbc:mysql://10.10.1.1:3306/my_hd", "root", "root");

        String[] fields = {"id", "name", "age", "departmentID"};
        DBOutputFormat.setOutput(jobConf, "teacher", fields);

        jobConf.setMapperClass(IdentityMapper.class);
        jobConf.setReducerClass(DBReducer.class);

        JobClient.runJob(jobConf);
    }
}
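For this job, DBReducer splits each line on tabs and expects name, age, and departmentID, i.e. the same layout TeacherRecord.toString() produces. So a file under /usr/input/db should contain tab-separated lines like these (the rows themselves are made-up examples):

Tom	23	1
Jerry	31	2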
The DB4Job class (reads one database table and writes the rows into another table): database table -> database table
package com.simope.mr.db;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBInputFormat;
import org.apache.hadoop.mapred.lib.db.DBOutputFormat;

/**
 * Reads one database table and writes the rows into another table.
 *
 * @author JimLy
 * @since 2016-02-02
 */
public class DB4Job {

    public static void main(String[] args) throws IOException {
        JobConf jobConf = new JobConf(DB4Job.class);

        jobConf.setOutputKeyClass(LongWritable.class);
        jobConf.setOutputValueClass(Text.class);
        jobConf.setInputFormat(DBInputFormat.class);
        jobConf.setOutputFormat(DBOutputFormat.class);

        DBConfiguration.configureDB(jobConf, "com.mysql.jdbc.Driver",
                "jdbc:mysql://10.10.1.1:3306/my_hd", "root", "root");

        String inputQuery = "SELECT * FROM teacher";
        String inputCountQuery = "SELECT COUNT(1) FROM teacher";

        // Query the teacher table in the my_hd database
        DBInputFormat.setInput(jobConf, TeacherRecord.class, inputQuery, inputCountQuery);

        String[] fields = {"id", "name", "age", "departmentID"};
        DBOutputFormat.setOutput(jobConf, "student", fields);

        jobConf.setMapperClass(DBMapper.class);
        jobConf.setReducerClass(DBReducer.class);

        JobClient.runJob(jobConf);
    }
}
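Finally, a note on running these: assuming everything is packaged into a jar (the jar name below is made up), each job is launched like any other old-API MapReduce job:

hadoop jar hadoop-mysql-demo.jar com.simope.mr.db.DB4Job

In DB4Job the mapper and reducer chain the two halves together: DBMapper turns each teacher row into tab-separated text, which DBReducer parses back into a StudentRecord for DBOutputFormat to insert.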
If you found this post useful, please click "recommend"; your recommendation is what keeps me writing this blog...
If you reprint this, please credit the source: http://www.cnblogs.com/JimLy-BUG/