自定義Spark Partitioner提升es-hadoop Bulk效率

前言

之前寫過一篇文章, 如何提高ElasticSearch 索引速度 。除了對ES本身的優化以外,我現在大體思路是盡量將邏輯外移到Spark上,Spark的分布式計算能力強,cpu密集型的很適合。這篇文章涉及的調整也是對 SparkES 多維分析引擎設計 中提及的一個重要概念“shard to partition ,partition to shard ” 的實現。不過目前只涉及到構建索引那塊。

問題描述

當你bulk數據到集群,按照 ElasticSearch Bulk 源碼解析 所描述的:

接著通過executeBulk方法進入原來的流程。在該方法中,對bulkRequest.requests 進行了兩次for循環。

第一次判定如果是IndexRequest就調用IndexRequest.process方法,主要是為了解析出timestamp,routing,id,parent 等字段。

第二次是為了對數據進行分揀。大致是為了形成這么一種結構:

第二次就是對提交的數據進行分揀,然后根據route/_id 等值找到每個數據所屬的Shard,最后將數據發送到對應Shard所在的Node節點上。

然而這導致了兩個問題:

  1. ES Node之間會形成N*N個連接,消耗掉過多的bulk線程
  2. 出現了很多并不需要的網絡IO

所以我們希望能夠避免這種情況。

Spark Partition to ES Shard

我們希望能夠將分揀的邏輯放到Spark端,保證Spark 的Partition 和ES的Shard 一一對應,并且實現特定的Partitoner 保證數據到達ES都會被對應的Shard所在的節點直接消費,而不會再被轉發到其他節點。經過我的實際測試,做了該調整后,寫入QPS有兩倍以上的提升

理論基礎

這里的理論基礎自然是es-hadoop項目。

類的調用路徑關系為:

EsSpark -> 
     EsRDDWriter -> 
           RestService -> 
                  RestRepository -> 
                            RestClient ->
                                NetworkClient -> 
                                        CommonsHttpTransport

簡單介紹下他們的作用:

  • EsSpark, 讀取ES和存儲ES的入口。通過隱式轉換,會顯得更Spark.
  • EsRDDWriter ,調用RestService創建PartitionWriter,對ES進行數據寫入
  • RestService,負責創建 RestRepository,PartitionWriter
  • RestRepository,bulk高層抽象,底層利用NetworkClient做真實的http請求,另外也維護Buffer相關的,典型比如積攢了多少條,多少M之后進行flush等。
  • NetworkClient 對 CommonsHttpTransport的封裝,主要添加了一些節點校驗功能。
  • CommonsHttpTransport 你可以認為是對HttpClient的一個封裝

原來我以為需要對es-hadoop項目的源碼進行修改才能實現前面提到的邏輯。事實上基于es-hadoop很容易實現上面提到的需求。

在RestService類里,構建RestRepository的時候,會判定是多索引還是單索引。對應代碼如下:

RestRepository repository = (iformat.hasPattern() ?
 initMultiIndices(settings, currentSplit, resource, log) : 
initSingleIndex(settings, currentSplit, resource, log));

這里我們只解析單索引部分代碼,在對應的initSingleIndex方法里有如下代碼:

int bucket = currentInstance % targetShards.size();
Shard chosenShard = orderedShards.get(bucket);
Node targetNode = targetShards.get(chosenShard);
  • targetShards 是索引所有的主分片到對應Node節點的映射。
  • orderedShards 則是根據shardId 順序排序Shard集合
  • currentInstance 是partitionId

這里我們假設partitonId 和 shardId 是對應的。也就是partitionId 0 里的數據,都是屬于shardId 為0,則,

  • currentInstance < targetShards.size()
  • bucket == currentInstance
  • 對應的targetNode持有的Shard正好可以消化掉currentInstance partition 分區的數據

所以我們唯一要做的就是實現一個Partitioner就好。

ESShardPartitioner 實現

涉及到這塊的主要有 es-hadoop 的mr以及 spark模塊。在mr模塊里包含了ES的分片規則實現。 spark 模塊則包含ESShardPartitioner類。

代碼如下:

package org.elasticsearch.spark
import ....
class ESShardPartitioner(settings:String) extends Partitioner {
      protected val log = LogFactory.getLog(this.getClass())

  protected var _numPartitions = -1 

  override def numPartitions: Int = {   
    val newSettings = new PropertiesSettings().load(settings)
    val repository = new RestRepository(newSettings)
    val targetShards = repository.getWriteTargetPrimaryShards(newSettings.getNodesClientOnly())
    repository.close()
    _numPartitions = targetShards.size()
    _numPartitions
  }

  override def getPartition(key: Any): Int = {
    val shardId = ShardAlg.shard(key.toString(), _numPartitions)
    shardId
  }

}

public class ShardAlg { public static int shard(String id, int shardNum) { int hash = Murmur3HashFunction.hash(id); return mod(hash, shardNum); }

public static int mod(int v, int m) {
    int r = v % m;
    if (r < 0) {
        r += m;
    }
    return r;
}

}</code></pre>

使用方式如下:

......partitionBy(new ESShardPartitioner(settings)).foreachPartition { iter =>
      try {
        val newSettings = new PropertiesSettings().load(settings)
        //創建EsRDDWriter
        val writer = EsRDDCreator.createWriter(newSettings.save())
        writer.write(TaskContext.get(), iter.map(f => f._2))        
      }

不過這種方式也是有一定大問題,就是如果ES的Shard數目比較小,導致Spark的partition數也小,寫入并發性會受到影響。

所以這里有第二套方案:

  1. 修改ESShardPartitioner,可以讓多個分區對應一個Shard
  2. 每個分區通過EsRDDWriter指定shardId進行寫入。

第二點可能需要修改es-hadoop源碼了,不過修改也很簡單,通過settings傳遞shardId,然后在RestService.initSingleIndex添加如下代碼:

if(settings.getProperty(ConfigurationOptions.ES_BULK_SHARDID) != null){   
targetNode = targetShards.get(
Integer.parseInt(
settings.getProperty(ConfigurationOptions.ES_BULK_SHARDID)));
}

在創建EsRDDWriter時拷貝settings的副本并且加入對應的ConfigurationOptions.ES_BULK_SHARDID.

來源:http://www.jianshu.com/p/cccc56e39429?utm_source=tuicool&utm_medium=referral

 本文由用戶 ukon7587 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!