Trying Out Apache Sqoop

Trying out Sqoop

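Sqoop is a tool for pulling data out of structured data stores such as relational databases and moving it into Hadoop. It is an Apache top-level project. I tried it hands-on, so I am writing down my experience here as a memo.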

    http://sqoop.apache.org/

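To avoid building an environment from scratch, I used the Hortonworks Sandbox. The version is shown below. A warning appears because the $ACCUMULO_HOME environment variable is not set, but I will ignore it.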

[root@sandbox ~]# sqoop version
Warning: /usr/lib/sqoop/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
14/08/14 22:46:05 INFO sqoop.Sqoop: Running Sqoop version: 1.4.4.2.1.1.0-385
Sqoop 1.4.4.2.1.1.0-385
git commit id d3c37763356e55bbf152053f6db24b1bfe582972
Compiled by jenkins on Wed Apr 16 16:12:40 PDT 2014
    http://hortonworks.com/products/hortonworks-sandbox/

Let's try it

Importing from MySQL into HDFS.

Creating the database

For now, create a sqoopsample database and grant broad privileges on it.

> mysql -u root -p
> CREATE DATABASE sqoopsample;
> GRANT ALL PRIVILEGES ON sqoopsample.* TO '%'@'localhost';
> GRANT ALL PRIVILEGES ON sqoopsample.* TO ''@'localhost';
> quit;

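Run the following script to load the data.

Save the statements below as sql.cmd and feed the file to the mysql client against the sqoopsample database (for example, mysql sqoopsample < sql.cmd).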

CREATE TABLE employees(id INT NOT NULL PRIMARY KEY AUTO_INCREMENT,
                       name VARCHAR(64) NOT NULL,
                       age  INT,
                       birtday DATE,
                       description VARCHAR(100));
INSERT INTO employees  VALUES(NULL, 'TANAKA Ichiro', 30, '1984-10-10', 'hoge');
INSERT INTO employees  VALUES(NULL, 'TANAKA Jiro'  , 29, '1985-12-22', 'fuge');
INSERT INTO employees  VALUES(NULL, 'TANAKA Saburo', 28, '1986-08-01', NULL);

If the data has been loaded as follows, everything is in order.

mysql> select * from employees;
+----+---------------+------+------------+-------------+
| id | name          | age  | birtday    | description |
+----+---------------+------+------------+-------------+
|  1 | TANAKA Ichiro |   30 | 1984-10-10 | hoge        |
|  2 | TANAKA Jiro   |   29 | 1985-12-22 | fuge        |
|  3 | TANAKA Saburo |   28 | 1986-08-01 | NULL        |
+----+---------------+------+------------+-------------+
3 rows in set (0.00 sec)

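Running Sqoop

Run a Sqoop import to transfer the table to HDFS. The transfer itself runs as a MapReduce job, so the number of mappers can be set with the -m option. The result is shown below.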

[root@sandbox ~]# sqoop import --connect jdbc:mysql://localhost/sqoopsample --table employees -m 1

...
...

[root@sandbox ~]# hadoop fs -ls employees
Found 2 items
-rw-r--r--   1 root root          0 2014-08-14 22:43 employees/_SUCCESS
-rw-r--r--   1 root root        103 2014-08-14 22:43 employees/part-m-00000
[root@sandbox ~]# hadoop fs -cat employees/part-m-00000
1,TANAKA Ichiro,30,1984-10-10,hoge
2,TANAKA Jiro,29,1985-12-22,fuge
3,TANAKA Saburo,28,1986-08-01,null
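
As a side note, each line of part-m-00000 is one row with comma-separated fields, and the SQL NULL in description shows up as the literal text null. Here is a minimal sketch of reading such a line back in plain Java. The class EmployeeLineSketch and its helper methods are hypothetical names of my own, not part of Sqoop, and the naive split on commas assumes that no field contains a comma or a line break.

```java
import java.sql.Date;

// Hypothetical helper (not part of Sqoop): one parsed row of employees/part-m-00000.
class EmployeeLineSketch {
    final Integer id;
    final String name;
    final Integer age;
    final Date birtday;        // column name kept exactly as in the table definition
    final String description;

    EmployeeLineSketch(Integer id, String name, Integer age, Date birtday, String description) {
        this.id = id;
        this.name = name;
        this.age = age;
        this.birtday = birtday;
        this.description = description;
    }

    // Naive parser: assumes no field contains ',' or a line break.
    static EmployeeLineSketch parse(String line) {
        String[] f = line.split(",", -1);   // -1 keeps trailing empty fields
        if (f.length != 5) {
            throw new IllegalArgumentException("Expected 5 fields but got " + f.length + ": " + line);
        }
        return new EmployeeLineSketch(
                toInteger(f[0]), toText(f[1]), toInteger(f[2]), toDate(f[3]), toText(f[4]));
    }

    // Numeric columns: "null" or an empty string is treated as SQL NULL.
    static Integer toInteger(String s) {
        return (s.equals("null") || s.isEmpty()) ? null : Integer.valueOf(s);
    }

    // Date columns: same rule, then parse the yyyy-mm-dd text.
    static Date toDate(String s) {
        return (s.equals("null") || s.isEmpty()) ? null : Date.valueOf(s);
    }

    // String columns: only the literal "null" is treated as SQL NULL.
    static String toText(String s) {
        return s.equals("null") ? null : s;
    }

    public static void main(String[] args) {
        EmployeeLineSketch row = parse("3,TANAKA Saburo,28,1986-08-01,null");
        System.out.println(row.name + " / " + row.birtday + " / " + row.description);
    }
}
```

One consequence of this text format is that a description whose real value is the four letters null cannot be told apart from SQL NULL when the file is read back this way.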

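So the transfer is complete. The three rows arrived in HDFS as comma-separated text, with the NULL description written as the literal string null.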
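
I only used a single mapper here. As far as I understand the Sqoop user guide, with more mappers Sqoop looks up the minimum and maximum of a splitting column (by default the primary key, id in this table) and gives each mapper an evenly sized slice of that range. The following is a simplified sketch of that idea, not Sqoop's actual splitter code. The names SplitSketch, Range, and split are hypothetical, and it only handles integer keys.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration (not Sqoop code): cut [min, max] into per-mapper key ranges.
class SplitSketch {

    // Half-open range [lo, hi) of key values handled by one mapper.
    static final class Range {
        final long lo;
        final long hi;

        Range(long lo, long hi) {
            this.lo = lo;
            this.hi = hi;
        }

        // The kind of WHERE condition one mapper would use; "id" is this table's key column.
        @Override
        public String toString() {
            return "id >= " + lo + " AND id < " + hi;
        }
    }

    static List<Range> split(long min, long max, int numMappers) {
        if (numMappers < 1) {
            throw new IllegalArgumentException("numMappers must be at least 1");
        }
        if (max < min) {
            throw new IllegalArgumentException("max must not be smaller than min");
        }
        List<Range> ranges = new ArrayList<>();
        long step = Math.max(1, (max - min) / numMappers);
        long lo = min;
        for (int i = 0; i < numMappers && lo <= max; i++) {
            // The last slice is extended to max + 1 so that the row with the largest key is included.
            boolean last = (i == numMappers - 1) || (lo + step > max);
            long hi = last ? max + 1 : lo + step;
            ranges.add(new Range(lo, hi));
            lo = hi;
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Keys 0..1000 divided among 4 mappers.
        for (Range r : split(0, 1000, 4)) {
            System.out.println(r);
        }
    }
}
```

With only three rows, as in this sample table, asking for more mappers than there are key values simply produces fewer slices in this sketch, which is one reason -m 1 is a sensible choice for such a small table.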

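Addendum

Running the command also leaves an auto-generated Java source file for the MapReduce job in the current directory. It is a bit long, but I am pasting it below.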

// ORM class for table 'employees'
// WARNING: This class is AUTO-GENERATED. Modify at your own risk.
//
// Debug information:
// Generated date: Thu Aug 14 22:43:02 PDT 2014
// For connector: org.apache.sqoop.manager.MySQLManager
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.lib.db.DBWritable;
import com.cloudera.sqoop.lib.JdbcWritableBridge;
import com.cloudera.sqoop.lib.DelimiterSet;
import com.cloudera.sqoop.lib.FieldFormatter;
import com.cloudera.sqoop.lib.RecordParser;
import com.cloudera.sqoop.lib.BooleanParser;
import com.cloudera.sqoop.lib.BlobRef;
import com.cloudera.sqoop.lib.ClobRef;
import com.cloudera.sqoop.lib.LargeObjectLoader;
import com.cloudera.sqoop.lib.SqoopRecord;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class employees extends SqoopRecord  implements DBWritable, Writable {
  private final int PROTOCOL_VERSION = 3;
  public int getClassFormatVersion() { return PROTOCOL_VERSION; }
  protected ResultSet __cur_result_set;
  private Integer id;
  public Integer get_id() {
    return id;
  }
  public void set_id(Integer id) {
    this.id = id;
  }
  public employees with_id(Integer id) {
    this.id = id;
    return this;
  }
  private String name;
  public String get_name() {
    return name;
  }
  public void set_name(String name) {
    this.name = name;
  }
  public employees with_name(String name) {
    this.name = name;
    return this;
  }
  private Integer age;
  public Integer get_age() {
    return age;
  }
  public void set_age(Integer age) {
    this.age = age;
  }
  public employees with_age(Integer age) {
    this.age = age;
    return this;
  }
  private java.sql.Date birtday;
  public java.sql.Date get_birtday() {
    return birtday;
  }
  public void set_birtday(java.sql.Date birtday) {
    this.birtday = birtday;
  }
  public employees with_birtday(java.sql.Date birtday) {
    this.birtday = birtday;
    return this;
  }
  private String description;
  public String get_description() {
    return description;
  }
  public void set_description(String description) {
    this.description = description;
  }
  public employees with_description(String description) {
    this.description = description;
    return this;
  }
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof employees)) {
      return false;
    }
    employees that = (employees) o;
    boolean equal = true;
    equal = equal && (this.id == null ? that.id == null : this.id.equals(that.id));
    equal = equal && (this.name == null ? that.name == null : this.name.equals(that.name));
    equal = equal && (this.age == null ? that.age == null : this.age.equals(that.age));
    equal = equal && (this.birtday == null ? that.birtday == null : this.birtday.equals(that.birtday));
    equal = equal && (this.description == null ? that.description == null : this.description.equals(that.description));
    return equal;
  }
  public void readFields(ResultSet __dbResults) throws SQLException {
    this.__cur_result_set = __dbResults;
    this.id = JdbcWritableBridge.readInteger(1, __dbResults);
    this.name = JdbcWritableBridge.readString(2, __dbResults);
    this.age = JdbcWritableBridge.readInteger(3, __dbResults);
    this.birtday = JdbcWritableBridge.readDate(4, __dbResults);
    this.description = JdbcWritableBridge.readString(5, __dbResults);
  }
  public void loadLargeObjects(LargeObjectLoader __loader)
      throws SQLException, IOException, InterruptedException {
  }
  public void write(PreparedStatement __dbStmt) throws SQLException {
    write(__dbStmt, 0);
  }

  public int write(PreparedStatement __dbStmt, int __off) throws SQLException {
    JdbcWritableBridge.writeInteger(id, 1 + __off, 4, __dbStmt);
    JdbcWritableBridge.writeString(name, 2 + __off, 12, __dbStmt);
    JdbcWritableBridge.writeInteger(age, 3 + __off, 4, __dbStmt);
    JdbcWritableBridge.writeDate(birtday, 4 + __off, 91, __dbStmt);
    JdbcWritableBridge.writeString(description, 5 + __off, 12, __dbStmt);
    return 5;
  }
  public void readFields(DataInput __dataIn) throws IOException {
    if (__dataIn.readBoolean()) { 
        this.id = null;
    } else {
    this.id = Integer.valueOf(__dataIn.readInt());
    }
    if (__dataIn.readBoolean()) { 
        this.name = null;
    } else {
    this.name = Text.readString(__dataIn);
    }
    if (__dataIn.readBoolean()) { 
        this.age = null;
    } else {
    this.age = Integer.valueOf(__dataIn.readInt());
    }
    if (__dataIn.readBoolean()) { 
        this.birtday = null;
    } else {
    this.birtday = new Date(__dataIn.readLong());
    }
    if (__dataIn.readBoolean()) { 
        this.description = null;
    } else {
    this.description = Text.readString(__dataIn);
    }
  }
  public void write(DataOutput __dataOut) throws IOException {
    if (null == this.id) { 
        __dataOut.writeBoolean(true);
    } else {
        __dataOut.writeBoolean(false);
    __dataOut.writeInt(this.id);
    }
    if (null == this.name) { 
        __dataOut.writeBoolean(true);
    } else {
        __dataOut.writeBoolean(false);
    Text.writeString(__dataOut, name);
    }
    if (null == this.age) { 
        __dataOut.writeBoolean(true);
    } else {
        __dataOut.writeBoolean(false);
    __dataOut.writeInt(this.age);
    }
    if (null == this.birtday) { 
        __dataOut.writeBoolean(true);
    } else {
        __dataOut.writeBoolean(false);
    __dataOut.writeLong(this.birtday.getTime());
    }
    if (null == this.description) { 
        __dataOut.writeBoolean(true);
    } else {
        __dataOut.writeBoolean(false);
    Text.writeString(__dataOut, description);
    }
  }
  private static final DelimiterSet __outputDelimiters = new DelimiterSet((char) 44, (char) 10, (char) 0, (char) 0, false);
  public String toString() {
    return toString(__outputDelimiters, true);
  }
  public String toString(DelimiterSet delimiters) {
    return toString(delimiters, true);
  }
  public String toString(boolean useRecordDelim) {
    return toString(__outputDelimiters, useRecordDelim);
  }
  public String toString(DelimiterSet delimiters, boolean useRecordDelim) {
    StringBuilder __sb = new StringBuilder();
    char fieldDelim = delimiters.getFieldsTerminatedBy();
    __sb.append(FieldFormatter.escapeAndEnclose(id==null?"null":"" + id, delimiters));
    __sb.append(fieldDelim);
    __sb.append(FieldFormatter.escapeAndEnclose(name==null?"null":name, delimiters));
    __sb.append(fieldDelim);
    __sb.append(FieldFormatter.escapeAndEnclose(age==null?"null":"" + age, delimiters));
    __sb.append(fieldDelim);
    __sb.append(FieldFormatter.escapeAndEnclose(birtday==null?"null":"" + birtday, delimiters));
    __sb.append(fieldDelim);
    __sb.append(FieldFormatter.escapeAndEnclose(description==null?"null":description, delimiters));
    if (useRecordDelim) {
      __sb.append(delimiters.getLinesTerminatedBy());
    }
    return __sb.toString();
  }
  private static final DelimiterSet __inputDelimiters = new DelimiterSet((char) 44, (char) 10, (char) 0, (char) 0, false);
  private RecordParser __parser;
  public void parse(Text __record) throws RecordParser.ParseError {
    if (null == this.__parser) {
      this.__parser = new RecordParser(__inputDelimiters);
    }
    List<String> __fields = this.__parser.parseRecord(__record);
    __loadFromFields(__fields);
  }

  public void parse(CharSequence __record) throws RecordParser.ParseError {
    if (null == this.__parser) {
      this.__parser = new RecordParser(__inputDelimiters);
    }
    List<String> __fields = this.__parser.parseRecord(__record);
    __loadFromFields(__fields);
  }

  public void parse(byte [] __record) throws RecordParser.ParseError {
    if (null == this.__parser) {
      this.__parser = new RecordParser(__inputDelimiters);
    }
    List<String> __fields = this.__parser.parseRecord(__record);
    __loadFromFields(__fields);
  }

  public void parse(char [] __record) throws RecordParser.ParseError {
    if (null == this.__parser) {
      this.__parser = new RecordParser(__inputDelimiters);
    }
    List<String> __fields = this.__parser.parseRecord(__record);
    __loadFromFields(__fields);
  }

  public void parse(ByteBuffer __record) throws RecordParser.ParseError {
    if (null == this.__parser) {
      this.__parser = new RecordParser(__inputDelimiters);
    }
    List<String> __fields = this.__parser.parseRecord(__record);
    __loadFromFields(__fields);
  }

  public void parse(CharBuffer __record) throws RecordParser.ParseError {
    if (null == this.__parser) {
      this.__parser = new RecordParser(__inputDelimiters);
    }
    List<String> __fields = this.__parser.parseRecord(__record);
    __loadFromFields(__fields);
  }

  private void __loadFromFields(List<String> fields) {
    Iterator<String> __it = fields.listIterator();
    String __cur_str = null;
    try {
    __cur_str = __it.next();
    if (__cur_str.equals("null") || __cur_str.length() == 0) { this.id = null; } else {
      this.id = Integer.valueOf(__cur_str);
    }

    __cur_str = __it.next();
    if (__cur_str.equals("null")) { this.name = null; } else {
      this.name = __cur_str;
    }

    __cur_str = __it.next();
    if (__cur_str.equals("null") || __cur_str.length() == 0) { this.age = null; } else {
      this.age = Integer.valueOf(__cur_str);
    }

    __cur_str = __it.next();
    if (__cur_str.equals("null") || __cur_str.length() == 0) { this.birtday = null; } else {
      this.birtday = java.sql.Date.valueOf(__cur_str);
    }

    __cur_str = __it.next();
    if (__cur_str.equals("null")) { this.description = null; } else {
      this.description = __cur_str;
    }

    } catch (RuntimeException e) {    throw new RuntimeException("Can't parse input data: '" + __cur_str + "'", e);    }  }

  public Object clone() throws CloneNotSupportedException {
    employees o = (employees) super.clone();
    o.birtday = (o.birtday != null) ? (java.sql.Date) o.birtday.clone() : null;
    return o;
  }

  public Map<String, Object> getFieldMap() {
    Map<String, Object> __sqoop$field_map = new TreeMap<String, Object>();
    __sqoop$field_map.put("id", this.id);
    __sqoop$field_map.put("name", this.name);
    __sqoop$field_map.put("age", this.age);
    __sqoop$field_map.put("birtday", this.birtday);
    __sqoop$field_map.put("description", this.description);
    return __sqoop$field_map;
  }

  public void setField(String __fieldName, Object __fieldVal) {
    if ("id".equals(__fieldName)) {
      this.id = (Integer) __fieldVal;
    }
    else    if ("name".equals(__fieldName)) {
      this.name = (String) __fieldVal;
    }
    else    if ("age".equals(__fieldName)) {
      this.age = (Integer) __fieldVal;
    }
    else    if ("birtday".equals(__fieldName)) {
      this.birtday = (java.sql.Date) __fieldVal;
    }
    else    if ("description".equals(__fieldName)) {
      this.description = (String) __fieldVal;
    }
    else {
      throw new RuntimeException("No such field: " + __fieldName);
    }
  }
}
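
One detail worth noting in the generated class is how write(DataOutput) and readFields(DataInput) handle nullable columns: each field is preceded by a boolean flag, true meaning the value is null and nothing else follows. The following is a minimal standalone sketch of that pattern using only java.io. NullFlagSketch and its methods are hypothetical names of my own, and it covers just the two Integer columns (id and age), leaving out the Hadoop Text calls used for the string columns.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical illustration of the null-flag encoding seen in the generated class.
class NullFlagSketch {

    // Writes 'true' for null, or 'false' followed by the 4-byte int value.
    static void writeNullableInt(DataOutput out, Integer value) throws IOException {
        if (value == null) {
            out.writeBoolean(true);
        } else {
            out.writeBoolean(false);
            out.writeInt(value);
        }
    }

    // Reads the flag first and only reads the int when the flag says a value is present.
    static Integer readNullableInt(DataInput in) throws IOException {
        return in.readBoolean() ? null : Integer.valueOf(in.readInt());
    }

    // Serializes the (id, age) pair in the same field order as the generated class.
    static byte[] encode(Integer id, Integer age) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            writeNullableInt(out, id);
            writeNullableInt(out, age);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads the pair back; element 0 is id, element 1 is age.
    static Integer[] decode(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            Integer id = readNullableInt(in);
            Integer age = readNullableInt(in);
            return new Integer[] { id, age };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] data = encode(3, null);
        Integer[] back = decode(data);
        System.out.println(data.length + " bytes, id=" + back[0] + ", age=" + back[1]);
    }
}
```

A null column therefore costs a single byte, while a present Integer costs five (one flag byte plus four bytes of value), and the reader must consume the fields in exactly the order they were written.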