Hadoop对关系数据库一般为两种操作:

从关系数据库到HDFS(DBInputFormat,将关系数据库的一条记录作为向Mapper输入的value)
从HDFS到关系数据库(DBOutputFormat,将Reducer输出的key值存储到数据库)

两个例子

从关系数据库读取到HDFS

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.lib.db.*;
import java.sql.*;
import java.io.*;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.fs.Path;

public class SDBConnInput {
	public static class CustomerRecord implements Writable,DBWritable{
		String customerID;
		String customerName;
		String phoneNumber;
		public void readFields(ResultSet resultSet)  throws SQLException{
			customerID=resultSet.getString(1);
			customerName=resultSet.getString(2);
			phoneNumber=resultSet.getString(3);
			}
		public void write(PreparedStatement statement)  throws SQLException{
			statement.setString(1, customerID);
			statement.setString(2, customerName);
			statement.setString(3,phoneNumber);
		}
			 
		 public void readFields(DataInput in) throws IOException{
			 customerID=in.readUTF();
			 customerName=in.readUTF();
			 phoneNumber=in.readUTF();
		 }
		 public void write(DataOutput out) throws IOException{
			 out.writeUTF(customerID);
			 out.writeUTF(customerName);
			 out.writeUTF(phoneNumber);
		 }
		 public void setCustomerID(String customerID){
			 this.customerID=customerID;
		 }
		 public void setCustomerName(String customerName){
			 this.customerName=customerName;			 
		 }
		 public void setPhoneNumber(String phoneNumber){
			 this.phoneNumber=phoneNumber;
		 }
		 public String toString(){
			 return this.customerID+","+this.customerName+","+this.phoneNumber; 
		 }
	}
	public static class MapperClass extends MapReduceBase implements Mapper<LongWritable,CustomerRecord,LongWritable,Text>{
		Text result= new Text();
		 public void map(LongWritable key, CustomerRecord value,OutputCollector<LongWritable, Text> collector, Reporter reporter) throws IOException{
			result.set(value.toString());
			collector.collect(key, result);
		}
	}
	public static class ReducerClass extends MapReduceBase implements Reducer<LongWritable, Text,NullWritable,Text>{
		public void reduce(LongWritable key, Iterator<Text> values, OutputCollector<NullWritable,Text> output, Reporter reporter) throws IOException{
			 String str="";
			 while(values.hasNext()){
				  str+=values.next().toString();
			 }
			 output.collect(null, new Text(str));	
	  }
	}
	public static void main(String [] args) throws Exception{
		/**
		 * 从关系数据库读取数据到HDFS
		 */
		JobConf job = new JobConf();
		job.setJarByClass(SDBConnInput.class);
		job.setOutputKeyClass(LongWritable.class);
		job.setOutputValueClass(Text.class);
		job.setInputFormat(DBInputFormat.class);
		FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9000/user/xuyizhen/out"));
		DBConfiguration.configureDB(job, "com.mysql.jdbc.Driver",
				"jdbc:mysql://192.168.0.25:3306/hadoop","root","1117");
		String fieldNames []={"customerID","customerName","phoneNumber"};
		DBInputFormat.setInput(job, CustomerRecord.class,"customers",null,"customerID", fieldNames);
		job.setMapperClass(MapperClass.class);
		job.setReducerClass(ReducerClass.class);
		JobClient.runJob(job);
	}
}

从HDFS输出到关系数据库

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.lib.db.*;
import java.sql.*;
import java.io.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.filecache.*;

public class SDBConnOutput {
	public static class CustomerRecord implements Writable,DBWritable{
		String customerID;
		String customerName;
		String phoneNumber;
		public void readFields(ResultSet resultSet)  throws SQLException{
			customerID=resultSet.getString(1);
			customerName=resultSet.getString(2);
			phoneNumber=resultSet.getString(3);
			}
		public void write(PreparedStatement statement)  throws SQLException{
			statement.setString(1, customerID);
			statement.setString(2, customerName);
			statement.setString(3,phoneNumber);
		}
			 
		 public void readFields(DataInput in) throws IOException{
			 customerID=in.readUTF();
			 customerName=in.readUTF();
			 phoneNumber=in.readUTF();
		 }
		 public void write(DataOutput out) throws IOException{
			 out.writeUTF(customerID);
			 out.writeUTF(customerName);
			 out.writeUTF(phoneNumber);
		 }
		 public void setCustomerID(String customerID){
			 this.customerID=customerID;
		 }
		 public void setCustomerName(String customerName){
			 this.customerName=customerName;			 
		 }
		 public void setPhoneNumber(String phoneNumber){
			 this.phoneNumber=phoneNumber;
		 }
		 public String toString(){
			 return this.customerID+","+this.customerName+","+this.phoneNumber; 
		 }
	}
	public static class MapperClass extends MapReduceBase implements Mapper<LongWritable,Text,CustomerRecord,Text>{
		CustomerRecord customer=new CustomerRecord();
		 public void map(LongWritable key, Text value,OutputCollector<CustomerRecord,Text> collector, Reporter reporter)  throws IOException{
			 String [] strs=value.toString().split(",");
			customer.setCustomerID(strs[0]);
			customer.setCustomerName(strs[1]);
			customer.setPhoneNumber(strs[2]);
			collector.collect( customer,value);
		}
		
	}
	/**
	*将HDFS中的文件输出到数据库
	*/
	public static void main(String [] args) throws Exception{
	
		
		/**
		 * 从关系数据库读取数据到HDFS
		 */
		
		JobConf job = new JobConf(SDBConnInput.class);
		//DBOutputFormat类只会将MapReduce框架输出结果的K值输出到关系数据库中
	    job.setOutputFormat(DBOutputFormat.class);
	    FileInputFormat.addInputPath(job, new Path("hdfs://master:9000/user/xuyizhen/in/customer.txt"));
	    DBConfiguration.configureDB(job, "com.mysql.jdbc.Driver",
				"jdbc:mysql://192.168.0.25:3306/hadoop","root","1117");
	    String fieldNames []={"customerID","customerName","phoneNumber"};
	    DBOutputFormat.setOutput(job, "customers", fieldNames);
	    job.setMapperClass(MapperClass.class);
	    job.setNumReduceTasks(0);
	    JobClient.runJob(job);
	}
注意:运行MapReduce时候报错:
java.io.IOException: com.mysql.jdbc.Driver
一般是由于程序找不到mysql驱动包。解决方法是让每个tasktracker运行MapReduce程序时都可以找到该驱动包。

添加包的两种方式

1.在每个节点下的${HADOOP_HOME}/lib下添加该包,然后重启集群,这是比较原始的方法。
2.把包传到集群上:hadoop fs -put mysql驱动jar包名称 /lib,并且在提交job前,添加语句 DistributedCache.addFileToClassPath(new Path("/lib/mysql驱动jar包名称"), conf);
以上方法适用于所有需要额外jar包的MapReduce代码。
⤧  Next post collection maven repository ⤧  Previous post Select2个人使用总结