實戰:在Java Web 項目中使用HBase

n342 10年前發布 | 99K 次閱讀 HBase NoSQL數據庫

在此之前我們使用Mysql作為數據源,但發現這數據增長速度太快,并且由于種種原因,因此必須使用HBase,所以我們要把Mysql表里面的數據遷移到HBase中,在這里我就不講解、不爭論為什么要使用HBase,HBase是什么了,喜歡的就認真看下去,總有些地方是有用的

我們要做的3大步驟:


  1. 新建HBase表格。

  2. 把MYSQL數據遷移到HBase中。

  3. 在Java Web項目中讀取HBase的數據。


先介紹一下必要的一些環境:

HBase的版本:0.98.8-hadoop2

所需的依賴包

commons-codec-1.7.jar
commons-collections-3.2.1.jar
commons-configuration-1.6.jar
commons-lang-2.6.jar
commons-logging-1.1.3.jar
guava-12.0.1.jar
hadoop-auth-2.5.0.jar
hadoop-common-2.5.0.jar
hbase-client-0.98.8-hadoop2.jar
hbase-common-0.98.8-hadoop2.jar
hbase-protocol-0.98.8-hadoop2.jar
htrace-core-2.04.jar
jackson-core-asl-1.9.13.jar
jackson-mapper-asl-1.9.13.jar
log4j-1.2.17.jar
mysql-connector-java-5.1.7-bin.jar
netty-3.6.6.Final.jar
protobuf-java-2.5.0.jar
slf4j-api-1.7.5.jar
slf4j-log4j12-1.7.5.jar
zookeeper-3.4.6.jar

如果在你的web項目中有些包已經存在,保留其中一個就好了,免得報奇怪的錯誤就麻煩了。


步驟1:建表

在此之前,我在Mysql中的業務數據表一共有6個,其結構重復性太高了,首先看看我在HBase里面的表結構:

表名 kpi














key fid+tid+date














簇(family) base gpower userate consum time



























描述 基礎信息 發電量相關指標 可利用率 自耗電量 累計運行小時數 檢修小時數 利用小時數








列(qualifier) fid tid date power windspeed unpower theory coup time power num cpower gpower runtime checktime usetime
描述 風場ID 風機號 日期 發電量 風速 棄風電量 理論電量 耦合度 故障時間 故障損失電量 故障臺次 當天自耗電量 當天發電量 當天并網秒數 當天檢修秒數 當天利用秒數

這個表中我們有5個family,其中base Family是對應6個mysql表中的key列, gpower、userate、consum分別對應一個表,time對應3個表。

這個kpi表的rowkey設計是base中的3個qualifier,分別從3個維度查詢數據,這樣的設計已經可以滿足我們的需求了。

具體在HBase中如何建表如何搭建環境自己參考我寫的【手把手教你配置HBase完全分布式環境】這篇文章吧。


步驟2:把MySQL數據遷移到HBase


這時我用gpower對應的mysql表來做演示吧,其他表的道理都一樣。(這里可能有人會說為什么不用第三方插件直接數據庫對數據庫遷移,這里我統一回答一下,我不會,我也不需要。)

okay,首先我們來看看代碼先吧:

import java.io.File;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

public class GpowerTransfer{

    private static final String QUOREM = "192.168.103.50,192.168.103.51,192.168.103.52,192.168.103.53,192.168.103.54,192.168.103.55,192.168.103.56,192.168.103.57,192.168.103.58,192.168.103.59,192.168.103.60";//這里是你HBase的分布式集群結點,用逗號分開。
    private static final String CLIENT_PORT = "2181";//端口
    private static Logger log = Logger.getLogger(GpowerTransfer.class);

    /**
     * @param args
     */
    public static void main(String[] args) {
        BasicConfigurator.configure();
        log.setLevel(Level.DEBUG);
        String tableName = "kpi";//HBase表名稱
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", QUOREM);   
        conf.set("hbase.zookeeper.property.clientPort", CLIENT_PORT);
        try { File workaround = new File(".");
            System.getProperties().put("hadoop.home.dir", workaround.getAbsolutePath());
            new File("./bin").mkdirs();
            new File("./bin/winutils.exe").createNewFile();//這幾段奇怪的代碼在windows跑的時候不加有時候分報錯,在web項目中可以不要,但單獨的java程序還是加上去吧,知道什么原因的小伙伴可以告訴我一下,不勝感激。
            HBaseAdmin admin = new HBaseAdmin(conf);
            if(admin.tableExists(tableName)){
                Class.forName("com.mysql.jdbc.Driver");//首先將mysql中的數據讀取出來,然后再插入到HBase中
                String url = "jdbc:mysql://192.168.***.***:3306/midb?useUnicode=true&characterEncoding=utf-8";
                String username = "********";
                String password = "********";
                Connection con = DriverManager.getConnection(url, username, password);
                PreparedStatement pstmt = con.prepareStatement("select * from kpi_gpower");
                ResultSet rs = pstmt.executeQuery();
                HTable table = new HTable(conf, tableName);
                log.debug(tableName + ":start copying data to hbase...");
                List<Put> list = new ArrayList<Put>();
                SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");
                String base = "base";//family名稱
                String gpower = "gpower";//family名稱
                String[] qbase = {"fid","tid","date"};//qualifier名稱
                String[] qgpower = {"power","windspeed","unpower","theory","coup"};//qualifier名稱
                while(rs.next()){
                    String rowKey = rs.getString("farmid") + ":" + (rs.getInt("turbineid")<10?("0"+rs.getInt("turbineid")):rs.getInt("turbineid")) + ":" + sdf.format(rs.getDate("vtime"));//拼接rowkey
                    Put put = new Put(Bytes.toBytes(rowKey));//新建一條記錄,然后下面對相應的列進行賦值
                    put.add(base.getBytes(), qbase[0].getBytes(), Bytes.toBytes(rs.getString("farmid")));//base:fid
                    put.add(base.getBytes(), qbase[1].getBytes(), Bytes.toBytes(rs.getInt("turbineid")+""));//base:tid
                    put.add(base.getBytes(), qbase[2].getBytes(), Bytes.toBytes(rs.getDate("vtime")+""));//base:date
                    put.add(gpower.getBytes(), qgpower[0].getBytes(), Bytes.toBytes(rs.getFloat("value")+""));//gpower:power
                    put.add(gpower.getBytes(), qgpower[1].getBytes(), Bytes.toBytes(rs.getFloat("windspeed")+""));//gpower:windspeed
                    put.add(gpower.getBytes(), qgpower[2].getBytes(), Bytes.toBytes(rs.getFloat("unvalue")+""));//gpower:unvalue
                    put.add(gpower.getBytes(), qgpower[3].getBytes(), Bytes.toBytes(rs.getFloat("theory")+""));//gpower:theory
                    put.add(gpower.getBytes(), qgpower[4].getBytes(), Bytes.toBytes(rs.getFloat("coup")+""));//gpower:coup
                    list.add(put);
                }
                table.put(list);//這里真正對表進行插入操作
                log.debug(tableName + ":completed data copy!");
                table.close();//這里要非常注意一點,如果你頻繁地對表進行打開跟關閉,性能將會直線下降,可能跟集群有關系。
            }else{
                admin.close();
                log.error("table '" + tableName + "' not exisit!");
                throw new IllegalArgumentException("table '" + tableName + "' not exisit!");
            }
            admin.close();
        } catch (Exception e) {
            e.printStackTrace();
        } 
    }
}



在put語句進行add的時候要特別注意:對于int、float、Date等等非String類型的數據,要記得將其轉換成String類型,這里我直接用+""解決了,否則在你讀取數據的時候就會遇到麻煩了。

步驟3:Java Web項目讀取HBase里面的數據

ok,我們成功地把數據遷移到HBase,我們剩下的任務就是在Web應用中讀取數據了。

首先我們要確保Web項目中已經把必要的Jar包添加到ClassPath了,下面我對一些HBase的連接做了小封裝:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HBaseAdmin;

/**
 * @author a01513
 *
 */
public class HBaseConnector {

    private static final String QUOREM = "192.168.103.50,192.168.103.51,192.168.103.52,192.168.103.53,192.168.103.54,192.168.103.55,192.168.103.56,192.168.103.57,192.168.103.58,192.168.103.59,192.168.103.60";
    private static final String CLIENT_PORT = "2181";
    private HBaseAdmin admin;
    private Configuration conf;


    public HBaseAdmin getHBaseAdmin(){
        getConfiguration();
        try {
            admin = new HBaseAdmin(conf);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return admin; 
    }

    public Configuration getConfiguration(){
        if(conf == null){
            conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", QUOREM);   
            conf.set("hbase.zookeeper.property.clientPort", CLIENT_PORT);   
        }
        return conf;
    }



這里的代碼基本上跟遷移的那部分代碼一樣,由于我在其他地方都要重用這些代碼,就裝在一個地方免得重復寫了。

我在Service層做了一下測試,下面看看具體的讀取過程:

private final String tableName = "kpi";
@Override
    public List<GenPowerEntity> getGenPower(String farmid,int ltb,int htb,String start,String end) {
        List<GenPowerEntity> list = new ArrayList<GenPowerEntity>();
        HBaseConnector hbaseConn = new HBaseConnector();
        HBaseAdmin admin = hbaseConn.getHBaseAdmin();
        try {
            if(admin.tableExists(tableName)){
                HTable table = new HTable(hbaseConn.getConfiguration(), tableName);
                Scan scan = new Scan();
                scan.addFamily(Bytes.toBytes("base"));
                scan.addFamily(Bytes.toBytes("gpower"));
                scan.addFamily(Bytes.toBytes("userate"));
                String startRowKey = new String();
                String stopRowKey = new String();
                if("".equals(start) && !"".equals(end)){
                    stopRowKey = farmid + ":" + Tools.addZero(htb) + ":" + end;
                    scan.setStopRow(Bytes.toBytes(stopRowKey));
                }else if(!"".equals(start) && "".equals(end)){
                    startRowKey = farmid + ":" + Tools.addZero(ltb) + ":" + start;
                    scan.setStartRow(Bytes.toBytes(startRowKey));
                }else if(!"".equals(start) && !"".equals(end)){
                    startRowKey = farmid + ":" + Tools.addZero(ltb) + ":" + start;
                    stopRowKey = farmid + ":" + Tools.addZero(htb) + ":" + end;
                    scan.setStartRow(Bytes.toBytes(startRowKey));
                    scan.setStopRow(Bytes.toBytes(stopRowKey));
                }else{
                    table.close();
                    admin.close();
                    return null;
                }
                ResultScanner rsc = table.getScanner(scan);
                Iterator<Result> it = rsc.iterator();
                List<GenPowerEntity> slist = new ArrayList<GenPowerEntity>();
                List<UseRateEntity> ulist = new ArrayList<UseRateEntity>();
                String tempRowKey = "";//這個臨時rowkey是用來判斷一行數據是否已經讀取完了的。
                GenPowerEntity gpower = new GenPowerEntity();
                UseRateEntity userate = new UseRateEntity();
                while(it.hasNext()){
                    for(Cell cell: it.next().rawCells()){
                        String rowKey = new String(cell.getRowArray(),cell.getRowOffset(),cell.getRowLength(),"UTF-8");
                        String family = new String(cell.getFamilyArray(),cell.getFamilyOffset(),cell.getFamilyLength(),"UTF-8");
                        String qualifier = new String(cell.getQualifierArray(),cell.getQualifierOffset(),cell.getQualifierLength(),"UTF-8");
                        String value = new String(cell.getValueArray(),cell.getValueOffset(),cell.getValueLength(),"UTF-8");//假如我們當時插入HBase的時候沒有把int、float等類型的數據轉換成String,這里就會亂碼了,并且用Bytes.toInt()這個方法還原也沒有用,哈哈
                        System.out.println("RowKey=>"+rowKey+"->"+family+":"+qualifier+"="+value);
                        if("".equals(tempRowKey))
                            tempRowKey = rowKey;
                        if(!rowKey.equals(tempRowKey)){
                            slist.add(gpower);
                            ulist.add(userate);
                            gpower = null;
                            userate = null;
                            gpower = new GenPowerEntity();
                            userate = new UseRateEntity();
                            tempRowKey = rowKey;
                        }
                        switch(family){
                        case "base":
                            switch(qualifier){
                            case "fid":
                                gpower.setFarmid(value);
                                userate.setFarmid(value);
                                break;
                            case "tid":
                                gpower.setTurbineid(Integer.parseInt(value));
                                userate.setTurbineid(Integer.parseInt(value));
                                break;
                            case "date":
                                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
                                Date date = null;
                                try {
                                    date = sdf.parse(value);
                                } catch (ParseException e) {
                                    e.printStackTrace();
                                }
                                gpower.setVtime(date);
                                userate.setVtime(date);
                                break;
                            }
                            break;
                        case "gpower":
                            switch(qualifier){
                            case "power":
                                gpower.setValue(Float.parseFloat(value));
                                break;
                            case "windspeed":
                                gpower.setWindspeed(Float.parseFloat(value));
                                break;
                            case "unpower":
                                gpower.setUnvalue(Float.parseFloat(value));
                                break;
                            case "theory":
                                gpower.setTvalue(Float.parseFloat(value));
                                break;
                            case "coup":
                                gpower.setCoup(Float.parseFloat(value));
                                break;
                            }
                            break;
                        case "userate":
                            switch(qualifier){
                            case "num":
                                userate.setFnum(Integer.parseInt(value));
                                break;
                            case "power":
                                userate.setFpower(Float.parseFloat(value));
                                break;
                            case "time":
                                userate.setFvalue(Float.parseFloat(value));
                                break;
                            }
                            break;
                        }

                    }
                }
                rsc.close();
                table.close();
                admin.close();
                ......
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return list;
    }



這是我在Service層中用作測試的一個方法,業務邏輯代碼可以直接無視(已經用.....代替了,哈哈),至此我們的所有工作完成,對于更深入的應用,還要靠自己去認真挖掘學習了。

來自:http://my.oschina.net/lanzp/blog/398644

 本文由用戶 n342 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!