自定義Spark Partitioner提升es-hadoop Bulk效率
前言
之前寫過一篇文章, 如何提高ElasticSearch 索引速度 。除了對ES本身的優化以外,我現在大體思路是盡量將邏輯外移到Spark上,Spark的分布式計算能力強,cpu密集型的很適合。這篇文章涉及的調整也是對 SparkES 多維分析引擎設計 中提及的一個重要概念“shard to partition ,partition to shard ” 的實現。不過目前只涉及到構建索引那塊。
問題描述
當你bulk數據到集群,按照 ElasticSearch Bulk 源碼解析 所描述的:
接著通過executeBulk方法進入原來的流程。在該方法中,對bulkRequest.requests 進行了兩次for循環。
第一次判定如果是IndexRequest就調用IndexRequest.process方法,主要是為了解析出timestamp,routing,id,parent 等字段。
第二次是為了對數據進行分揀。大致是為了形成這么一種結構:
第二次就是對提交的數據進行分揀,然后根據route/_id 等值找到每個數據所屬的Shard,最后將數據發送到對應Shard所在的Node節點上。
然而這導致了兩個問題:
- ES Node之間會形成N*N個連接,消耗掉過多的bulk線程
- 出現了很多并不需要的網絡IO
所以我們希望能夠避免這種情況。
Spark Partition to ES Shard
我們希望能夠將分揀的邏輯放到Spark端,保證Spark 的Partition 和ES的Shard 一一對應,并且實現特定的Partitoner 保證數據到達ES都會被對應的Shard所在的節點直接消費,而不會再被轉發到其他節點。經過我的實際測試,做了該調整后,寫入QPS有兩倍以上的提升
理論基礎
這里的理論基礎自然是es-hadoop項目。
類的調用路徑關系為:
EsSpark ->
EsRDDWriter ->
RestService ->
RestRepository ->
RestClient ->
NetworkClient ->
CommonsHttpTransport
簡單介紹下他們的作用:
- EsSpark, 讀取ES和存儲ES的入口。通過隱式轉換,會顯得更Spark.
- EsRDDWriter ,調用RestService創建PartitionWriter,對ES進行數據寫入
- RestService,負責創建 RestRepository,PartitionWriter
- RestRepository,bulk高層抽象,底層利用NetworkClient做真實的http請求,另外也維護Buffer相關的,典型比如積攢了多少條,多少M之后進行flush等。
- NetworkClient 對 CommonsHttpTransport的封裝,主要添加了一些節點校驗功能。
- CommonsHttpTransport 你可以認為是對HttpClient的一個封裝
原來我以為需要對es-hadoop項目的源碼進行修改才能實現前面提到的邏輯。事實上基于es-hadoop很容易實現上面提到的需求。
在RestService類里,構建RestRepository的時候,會判定是多索引還是單索引。對應代碼如下:
RestRepository repository = (iformat.hasPattern() ?
initMultiIndices(settings, currentSplit, resource, log) :
initSingleIndex(settings, currentSplit, resource, log));
這里我們只解析單索引部分代碼,在對應的initSingleIndex方法里有如下代碼:
int bucket = currentInstance % targetShards.size();
Shard chosenShard = orderedShards.get(bucket);
Node targetNode = targetShards.get(chosenShard);
- targetShards 是索引所有的主分片到對應Node節點的映射。
- orderedShards 則是根據shardId 順序排序Shard集合
- currentInstance 是partitionId
這里我們假設partitonId 和 shardId 是對應的。也就是partitionId 0 里的數據,都是屬于shardId 為0,則,
- currentInstance < targetShards.size()
- bucket == currentInstance
- 對應的targetNode持有的Shard正好可以消化掉currentInstance partition 分區的數據
所以我們唯一要做的就是實現一個Partitioner就好。
ESShardPartitioner 實現
涉及到這塊的主要有 es-hadoop 的mr以及 spark模塊。在mr模塊里包含了ES的分片規則實現。 spark 模塊則包含ESShardPartitioner類。
代碼如下:
package org.elasticsearch.spark
import ....
class ESShardPartitioner(settings:String) extends Partitioner {
protected val log = LogFactory.getLog(this.getClass())
protected var _numPartitions = -1
override def numPartitions: Int = {
val newSettings = new PropertiesSettings().load(settings)
val repository = new RestRepository(newSettings)
val targetShards = repository.getWriteTargetPrimaryShards(newSettings.getNodesClientOnly())
repository.close()
_numPartitions = targetShards.size()
_numPartitions
}
override def getPartition(key: Any): Int = {
val shardId = ShardAlg.shard(key.toString(), _numPartitions)
shardId
}
}
public class ShardAlg {
public static int shard(String id, int shardNum) {
int hash = Murmur3HashFunction.hash(id);
return mod(hash, shardNum);
}
public static int mod(int v, int m) {
int r = v % m;
if (r < 0) {
r += m;
}
return r;
}
}</code></pre>
使用方式如下:
......partitionBy(new ESShardPartitioner(settings)).foreachPartition { iter =>
try {
val newSettings = new PropertiesSettings().load(settings)
//創建EsRDDWriter
val writer = EsRDDCreator.createWriter(newSettings.save())
writer.write(TaskContext.get(), iter.map(f => f._2))
}
不過這種方式也是有一定大問題,就是如果ES的Shard數目比較小,導致Spark的partition數也小,寫入并發性會受到影響。
所以這里有第二套方案:
- 修改ESShardPartitioner,可以讓多個分區對應一個Shard
- 每個分區通過EsRDDWriter指定shardId進行寫入。
第二點可能需要修改es-hadoop源碼了,不過修改也很簡單,通過settings傳遞shardId,然后在RestService.initSingleIndex添加如下代碼:
if(settings.getProperty(ConfigurationOptions.ES_BULK_SHARDID) != null){
targetNode = targetShards.get(
Integer.parseInt(
settings.getProperty(ConfigurationOptions.ES_BULK_SHARDID)));
}
在創建EsRDDWriter時拷貝settings的副本并且加入對應的ConfigurationOptions.ES_BULK_SHARDID.
來源:http://www.jianshu.com/p/cccc56e39429?utm_source=tuicool&utm_medium=referral