elasticsearch批量數據導入和導出

jopen 8年前發布 | 57K 次閱讀 ElasticSearch 搜索引擎

 之前使用ES的時候建表Type時有個字段的類型搞錯了。以至于用API查詢時出錯。所以就研究一下ES API做了一下ES批量導出和導入重構了Type

1:Java API批量導出

 Settings settings = ImmutableSettings.settingsBuilder().put("cluster.name", "elasticsearch-bigdata").build();
        Client client = new TransportClient(settings)
                .addTransportAddress(new InetSocketTransportAddress("10.58.71.6", 9300));
 SearchResponse response = client.prepareSearch("bigdata").setTypes("student")
               .setQuery(QueryBuilders.matchAllQuery()).setSize(10000).setScroll(new TimeValue(6000                  00))
                .setSearchType(SearchType.SCAN).execute().actionGet();//setSearchType(SearchType.Scan) 告訴ES不需要排序只要結果返回即可 setScroll(new TimeValue(600000)) 設置滾動的時間
        String scrollid = response.getScrollId();
        try {
        //把導出的結果以JSON的格式寫到文件里
            BufferedWriter out = new BufferedWriter(new FileWriter("es", true));
            
            //每次返回數據10000條。一直循環查詢直到所有的數據都查詢出來
            while (true) {
                SearchResponse response2 = client.prepareSearchScroll(scrollid).setScroll(new TimeValue(1000000))
                        .execute().actionGet();
                SearchHits searchHit = response2.getHits();
                //再次查詢不到數據時跳出循環
                if (searchHit.getHits().length == 0) {
                    break;
                }
                System.out.println("查詢數量 :" + searchHit.getHits().length);
                for (int i = 0; i < searchHit.getHits().length; i++) {
                    String json = searchHit.getHits()[i].getSourceAsString();
                    out.write(json);
                    out.write("\r\n");
                }
            }
            System.out.println("查詢結束");
            out.close();
        } catch (FileNotFoundException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

2:Java API 批量導入

Settings settings = ImmutableSettings.settingsBuilder().put("cluster.name", "elasticsearch-bigdata").build();
        Client client = new TransportClient(settings)
                .addTransportAddress(new InetSocketTransportAddress("10.58.71.6", 9300));
        try {
        //讀取剛才導出的ES數據
            BufferedReader br = new BufferedReader(new FileReader("es"));
            String json = null;
            int count = 0;
            //開啟批量插入
            BulkRequestBuilder bulkRequest = client.prepareBulk();
            while ((json = br.readLine()) != null) {
                bulkRequest.add(client.prepareIndex("bigdata", "student").setSource(json));
                //每一千條提交一次
                if (count% 1000==0) {
                    bulkRequest.execute().actionGet();
                    System.out.println("提交了:" + count);
                }
                count++;
            }
            bulkRequest.execute().actionGet();
            System.out.println("插入完畢");
            br.close();
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

 以上就是ES的批量導出和導入了。



來自: http://my.oschina.net/chiyong/blog/552622

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!