Python讀取Redis數據導出到Elasticsearch

jopen 9年前發布 | 11K 次閱讀 ElasticSearch 搜索引擎

#! usr/bin/python

 -- coding:utf-8 --

import redis import datetime from elasticsearch import Elasticsearch from elasticsearch.helpers import bulk import sys, getopt

def usage(): print 'usage: python cmd.py -e <elasticsearch host> -r <redis host> -p <redis list key prefix>'

if name == 'main':

opts, args = getopt.getopt(sys.argv[1:], "e:r:p:")

elasticsearch_host = ''
redis_host = ''
redis_list_prefix= ''
for op, value in opts:
    if op == "-e":
        elasticsearch_host = value
    elif op == "-r":
        redis_host = value
    elif op == "-p":
        redis_list_prefix = value

if elasticsearch_host == '' or redis_host == '' or redis_list_prefix == '':
    usage()
    sys.exit(1)

client = Elasticsearch([{'host' : elasticsearch_host}])

r = redis.Redis(host = redis_host, port=6379)

doctype = "watchvideocount"

#get all keys ends with day description except today
videocount_keys = r.keys(redis_list_prefix + '_*')

#start query data
for k_index in videocount_keys:
    #get all <k, v> data
    kvalues = r.hgetall(k_index)

    #get day
    p = k_index.index('_')
    day = k_index[p+1:]   

    #make index name  
    index_name = k_index.lower()      
    #create index
    try:
        client.indices.create(index=index_name)
    except Exception:
        print 'index ', index_name, ' exist'
    namapping = {
            doctype: {
                "properties": {
                    "day":{"type": "date"},
                    "EID":{"type": "string"},
                    "CID":{"type": "string"},
                    "VID":{"type": "string"},
                    "VALUE":{"type": "integer"}
                }
            }
        }
    client.indices.put_mapping(index=index_name, doc_type=doctype, body=namapping)

    #data cache
    data_cache = []   
    for key in kvalues:
        fields = key.split('_')
        eid = fields[0]
        cid = fields[1]
        vid = fields[2]
        value = kvalues[key]
        #make primary key
        ID = day + '_' + eid + '_' + cid + '_' + vid      
        #make JSON data
        json_data = {'_index':index_name, '_type':doctype, "_id":ID, "DAY":day, "EID":eid, "CID":cid, "VID":vid, "VALUE":value}    
        #append data cache
        data_cache.append(json_data)

    #index into elasticsearch 
    bulk(client, actions=data_cache, stats_only=True)
    print k_index, 'done'</pre> <p></p>


來自: http://my.oschina.net/u/2242064/blog/553022

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!