HBase性能優化方法總結(4):讀表操作
本文主要是從HBase應用程序設計與開發的角度,總結幾種常用的性能優化方法。有關HBase系統配置級別的優化,可參考:淘寶Ken Wu同學的博客。
下面是本文總結的第三部分內容:讀表操作相關的優化方法。
3. 讀表操作
3.1 多HTable并發讀
創建多個HTable客戶端用于讀操作,提高讀數據的吞吐量,一個例子:
static final Configuration conf = HBaseConfiguration.create(); static final String table_log_name = “user_log”; rTableLog = new HTable[tableN]; for (int i = 0; i < tableN; i++) { rTableLog[i] = new HTable(conf, table_log_name); rTableLog[i].setScannerCaching(50); }
3.2 HTable參數設置
3.2.1 Scanner Caching
通過調用HTable.setScannerCaching(int scannerCaching)可以設置HBase scanner一次從服務端抓取的數據條數,默認情況下一次一條。通過將此值設置成一個合理的值,可以減少scan過程中next()的時間開銷,代價是scanner需要通過客戶端的內存來維持這些被cache的行記錄。
3.2.2 Scan Attribute Selection
scan時指定需要的Column Family,可以減少網絡傳輸數據量,否則默認scan操作會返回整行所有Column Family的數據。
3.2.3 Close ResultScanner
通過scan取完數據后,記得要關閉ResultScanner,否則RegionServer可能會出現問題(對應的Server資源無法釋放)。
3.3 批量讀
通過調用HTable.get(Get)方法可以根據一個指定的row key獲取一行記錄,同樣HBase提供了另一個方法:通過調用HTable.get(List<Get>)方法可以根據一個指定的row key列表,批量獲取多行記錄,這樣做的好處是批量執行,只需要一次網絡I/O開銷,這對于對數據實時性要求高而且網絡傳輸RTT高的情景下可能帶來明顯的性能提升。
3.4 多線程并發讀
在客戶端開啟多個HTable讀線程,每個讀線程負責通過HTable對象進行get操作。下面是一個多線程并發讀取HBase,獲取店鋪一天內各分鐘PV值的例子:
public class DataReaderServer { //獲取店鋪一天內各分鐘PV值的入口函數 public static ConcurrentHashMap<String, String> getUnitMinutePV(long uid, long startStamp, long endStamp){ long min = startStamp; int count = (int)((endStamp - startStamp) / (601000)); List<String> lst = new ArrayList<String>(); for (int i = 0; i <= count; i++) { min = startStamp + i 60 * 1000; lst.add(uid + "_" + min); } return parallelBatchMinutePV(lst); } //多線程并發查詢,獲取分鐘PV值 private static ConcurrentHashMap<String, String> parallelBatchMinutePV(List<String> lstKeys){ ConcurrentHashMap<String, String> hashRet = new ConcurrentHashMap<String, String>(); int parallel = 3; List<List<String>> lstBatchKeys = null; if (lstKeys.size() < parallel ){ lstBatchKeys = new ArrayList<List<String>>(1); lstBatchKeys.add(lstKeys); } else{ lstBatchKeys = new ArrayList<List<String>>(parallel); for(int i = 0; i < parallel; i++ ){ List<String> lst = new ArrayList<String>(); lstBatchKeys.add(lst); }for(int i = 0 ; i < lstKeys.size() ; i ++ ){ lstBatchKeys.get(i%parallel).add(lstKeys.get(i)); } } List<Future< ConcurrentHashMap<String, String> >> futures = new ArrayList<Future< ConcurrentHashMap<String, String> >>(5); ThreadFactoryBuilder builder = new ThreadFactoryBuilder(); builder.setNameFormat("ParallelBatchQuery"); ThreadFactory factory = builder.build(); ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(lstBatchKeys.size(), factory); for(List<String> keys : lstBatchKeys){ Callable< ConcurrentHashMap<String, String> > callable = new BatchMinutePVCallable(keys); FutureTask< ConcurrentHashMap<String, String> > future = (FutureTask< ConcurrentHashMap<String, String> >) executor.submit(callable); futures.add(future); } executor.shutdown(); // Wait for all the tasks to finish try { boolean stillRunning = !executor.awaitTermination( 5000000, TimeUnit.MILLISECONDS); if (stillRunning) { try { executor.shutdownNow(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } } catch (InterruptedException e) { try { Thread.currentThread().interrupt(); } catch (Exception e1) { // TODO Auto-generated catch block e1.printStackTrace(); } } // Look for any exception for (Future f : futures) { try { if(f.get() != null) { hashRet.putAll((ConcurrentHashMap<String, String>)f.get()); } } catch (InterruptedException e) { try { Thread.currentThread().interrupt(); } catch (Exception e1) { // TODO Auto-generated catch block e1.printStackTrace(); } } catch (ExecutionException e) { e.printStackTrace(); } } return hashRet; } //一個線程批量查詢,獲取分鐘PV值 protected static ConcurrentHashMap<String, String> getBatchMinutePV(List<String> lstKeys){ ConcurrentHashMap<String, String> hashRet = null; List<Get> lstGet = new ArrayList<Get>(); String[] splitValue = null; for (String s : lstKeys) { splitValue = s.split("_"); long uid = Long.parseLong(splitValue[0]); long min = Long.parseLong(splitValue[1]); byte[] key = new byte[16]; Bytes.putLong(key, 0, uid); Bytes.putLong(key, 8, min); Get g = new Get(key); g.addFamily(fp); lstGet.add(g); } Result[] res = null; try { res = tableMinutePV[rand.nextInt(tableN)].get(lstGet); } catch (IOException e1) { logger.error("tableMinutePV exception, e=" + e1.getStackTrace()); } if (res != null && res.length > 0) { hashRet = new ConcurrentHashMap<String, String>(res.length); for (Result re : res) { if (re != null && !re.isEmpty()) { try { byte[] key = re.getRow(); byte[] value = re.getValue(fp, cp); if (key != null && value != null) { hashRet.put(String.valueOf(Bytes.toLong(key, Bytes.SIZEOF_LONG)), String.valueOf(Bytes .toLong(value))); } } catch (Exception e2) { logger.error(e2.getStackTrace()); } } } } return hashRet; }
} //調用接口類,實現Callable接口 class BatchMinutePVCallable implements Callable<ConcurrentHashMap<String, String>>{ private List<String> keys;
public BatchMinutePVCallable(List<String> lstKeys ) { this.keys = lstKeys; } public ConcurrentHashMap<String, String> call() throws Exception { return DataReadServer.getBatchMinutePV(keys); }
}</pre>
3.5 緩存查詢結果
對于頻繁查詢HBase的應用場景,可以考慮在應用程序中做緩存,當有新的查詢請求時,首先在緩存中查找,如果存在則直接返回,不再查詢HBase;否則對HBase發起讀請求查詢,然后在應用程序中將查詢結果緩存起來。至于緩存的替換策略,可以考慮LRU等常用的策略。
3.6 Blockcache
HBase上Regionserver的內存分為兩個部分,一部分作為Memstore,主要用來寫;另外一部分作為BlockCache,主要用于讀。
寫請求會先寫入Memstore,Regionserver會給每個region提供一個Memstore,當Memstore滿64MB以后,會啟動 flush刷新到磁盤。當Memstore的總大小超過限制時(heapsize * hbase.regionserver.global.memstore.upperLimit * 0.9),會強行啟動flush進程,從最大的Memstore開始flush直到低于限制。
讀請求先到Memstore中查數據,查不到就到BlockCache中查,再查不到就會到磁盤上讀,并把讀的結果放入BlockCache。由于BlockCache采用的是LRU策略,因此BlockCache達到上限(heapsize * hfile.block.cache.size * 0.85)后,會啟動淘汰機制,淘汰掉最老的一批數據。
一個Regionserver上有一個BlockCache和N個Memstore,它們的大小之和不能大于等于heapsize * 0.8,否則HBase不能啟動。默認BlockCache為0.2,而Memstore為0.4。對于注重讀響應時間的系統,可以將 BlockCache設大些,比如設置BlockCache=0.4,Memstore=0.39,以加大緩存的命中率。
有關BlockCache機制,請參考這里:HBase的Block cache,HBase的blockcache機制,hbase中的緩存的計算與使用。
轉載自:http://www.cnblogs.com/panfeng412/archive/2012/03/08/hbase-performance-tuning-section3.html