Processing MySQL Slow Query Logs with Logstash + Elasticsearch
I ran into a requirement to query the slow query logs of certain services. It turned out that the slow query logs provided by the DBA platform couldn't cover the actual business scenario (the reported fields were incomplete), so there was nothing for it but to roll up my sleeves and do it myself.
I referred to this article, but made quite a few changes to fit my own needs.
Let's get started.
1. Locate the log file
First confirm that the slow query log is enabled, then find the location of the log file:
> show variables like '%slow%';
+---------------------+--------------------------------------+
| Variable_name       | Value                                |
+---------------------+--------------------------------------+
| log_slow_queries    | ON                                   |
| slow_launch_time    | 2                                    |
| slow_query_log      | ON                                   |
| slow_query_log_file | /data/mysqllog/20000/slow-query.log  |
+---------------------+--------------------------------------+
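If the slow query log turns out to be disabled, it can be switched on at runtime. Below is a minimal sketch using pymysql; the host, port, and credentials are placeholders (not from the original setup), and the account needs the privilege to change global variables.

# Minimal sketch, assuming pymysql is installed; connection details are
# placeholders and the account needs SUPER/system-variable privileges.
import pymysql

conn = pymysql.connect(host="127.0.0.1", port=20000, user="dba_user", password="secret")
with conn.cursor() as cur:
    cur.execute("SHOW VARIABLES LIKE '%slow%'")
    variables = dict(cur.fetchall())
    print(variables)

    if variables.get("slow_query_log") != "ON":
        # Enable the slow log and lower the threshold to 1 second at runtime.
        cur.execute("SET GLOBAL slow_query_log = 'ON'")
        cur.execute("SET GLOBAL long_query_time = 1")
conn.close()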
2. The slow query log format
The format is basically as follows; if your format differs, you will need to make small adjustments accordingly:
# Time: 160524  5:12:29
# User@Host: user_a[xxxx] @  [10.166.140.109]
# Query_time: 1.711086  Lock_time: 0.000040 Rows_sent: 385489  Rows_examined: 385489
use dbname;
SET timestamp=1464037949;
SELECT 1 from dbname;
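To make the format concrete, here is a small sketch (not part of the pipeline below) that parses the "# Query_time:" header line with a plain regular expression; it extracts roughly the same fields the grok pattern later pulls out.

# Quick sketch: parse the "# Query_time:" line with a regex; this mirrors what
# the grok filter below extracts, it is not the original pipeline code.
import re

QUERY_TIME_RE = re.compile(
    r"^# Query_time: (?P<duration>[\d.]+)\s+Lock_time: (?P<lock_wait>[\d.]+)"
    r"\s+Rows_sent: (?P<results>\d+)\s+Rows_examined: (?P<scanned>\d+)"
)

line = "# Query_time: 1.711086  Lock_time: 0.000040 Rows_sent: 385489  Rows_examined: 385489"
m = QUERY_TIME_RE.match(line)
if m:
    print(float(m.group("duration")), int(m.group("scanned")))
    # -> 1.711086 385489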
3. Collect with Logstash
Collection boils down to multi-line parsing with the multiline codec (a standalone sketch of the grouping it performs is shown below).
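For offline testing, the grouping the multiline codec does here can be reproduced in a few lines of Python: every line that does not start with "# User@Host:" is attached to the previous event. This is only a rough stand-in for the codec, not the Logstash implementation.

# Rough sketch of what the multiline codec does with pattern "^# User@Host:",
# negate => true, what => previous: non-matching lines are glued to the
# previous event. For local experimentation only.
def group_slow_log(lines):
    event = []
    for line in lines:
        if line.startswith("# User@Host:") and event:
            yield "".join(event)
            event = []
        event.append(line)
    if event:
        yield "".join(event)

with open("/data/mysqllog/20000/slow-query.log") as f:
    for entry in group_slow_log(f):
        print(entry)
        print("----")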
Two things, however, need to be handled.
First, strip out the useless information.
Second, a slow SQL statement shows up again and again, so the number of executions becomes an important metric. What we need to do is denoise: strip the parameters (quoted strings and numbers), keep only the core SQL, and compute a hash of it, so that queries can later be aggregated on that field. This uses the mutate and checksum filters.
# calculate unique hash
mutate {
    add_field => {"sql_for_hash" => "%{sql}"}
}
mutate {
    gsub => [
        "sql_for_hash", "'.+?'", "",
        "sql_for_hash", "-?\d*\.{0,1}\d+", ""
    ]
}
checksum {
    algorithm => "md5"
    keys => ["sql_for_hash"]
}
The resulting MD5 is stored in the logstash_checksum field.
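What the two gsub substitutions and the checksum achieve is easiest to see outside Logstash. Here is a Python equivalent for illustration only; the digest itself will not match what the checksum filter writes, but the point is the same: statements that differ only in their parameters collapse to one value.

# Python illustration of the fingerprinting idea above: strip quoted strings
# and numbers, then md5 the remaining SQL skeleton. Not part of the pipeline.
import hashlib
import re

def sql_fingerprint(sql):
    skeleton = re.sub(r"'.+?'", "", sql)             # drop quoted literals
    skeleton = re.sub(r"-?\d*\.?\d+", "", skeleton)  # drop numeric literals
    return hashlib.md5(skeleton.encode("utf-8")).hexdigest()

# Two executions that differ only in parameters share one fingerprint.
print(sql_fingerprint("SELECT * FROM t WHERE id = 42"))
print(sql_fingerprint("SELECT * FROM t WHERE id = 97"))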
The complete Logstash configuration file follows (you may need small adjustments for your own log format). Note the custom pattern ALLWORD [\s\S]* used inside; it needs to be defined as a custom grok pattern (for example in a patterns file loaded via grok's patterns_dir option).
input {
    file {
        path => ["/data/mysqllog/20000/slow-query.log"]
        sincedb_path => "/data/LogNew/logstash/sincedb/mysql.sincedb"
        type => "mysql-slow-log"
        add_field => ["env", "PRODUCT"]
        codec => multiline {
            pattern => "^# User@Host:"
            negate => true
            what => previous
        }
    }
}

filter {
    grok {
        # User@Host: logstash[logstash] @ localhost [127.0.0.1]
        # User@Host: logstash[logstash] @ [127.0.0.1]
        match => [ "message", "^# User@Host: %{ALLWORD:user}\[%{ALLWORD}\] @ %{ALLWORD:dbhost}? \[%{IP:ip}\]" ]
    }

    grok {
        # Query_time: 102.413328  Lock_time: 0.000167 Rows_sent: 0  Rows_examined: 1970
        match => [ "message", "^# Query_time: %{NUMBER:duration:float}%{SPACE}Lock_time: %{NUMBER:lock_wait:float}%{SPACE}Rows_sent: %{NUMBER:results:int}%{SPACE}Rows_examined:%{SPACE}%{NUMBER:scanned:int}%{ALLWORD:sql}" ]
    }

    # remove useless data
    mutate {
        gsub => [
            "sql", "\nSET timestamp=\d+?;\n", "",
            "sql", "\nuse [a-zA-Z0-9\-\_]+?;", "",
            "sql", "\n# Time: \d+\s+\d+:\d+:\d+", "",
            "sql", "\n/usr/local/mysql/bin/mysqld.+$", "",
            "sql", "\nTcp port:.+$", "",
            "sql", "\nTime .+$", ""
        ]
    }

    # Capture the time the query happened
    grok {
        match => [ "message", "^SET timestamp=%{NUMBER:timestamp};" ]
    }
    date {
        match => [ "timestamp", "UNIX" ]
    }

    # calculate unique hash
    mutate {
        add_field => {"sql_for_hash" => "%{sql}"}
    }
    mutate {
        gsub => [
            "sql_for_hash", "'.+?'", "",
            "sql_for_hash", "-?\d*\.{0,1}\d+", ""
        ]
    }
    checksum {
        algorithm => "md5"
        keys => ["sql_for_hash"]
    }

    # Drop the captured timestamp field since it has been moved to the time of the event
    mutate {
        remove_field => ["timestamp", "message", "sql_for_hash"]
    }
}

output {
    #stdout {
    #    codec => rubydebug
    #}
    #if ("_grokparsefailure" not in [tags]) {
    #    stdout {
    #        codec => rubydebug
    #    }
    #}
    if ("_grokparsefailure" not in [tags]) {
        elasticsearch {
            hosts => ["192.168.1.1:9200"]
            index => "logstash-slowlog"
        }
    }
}
The ingested content looks like this:
{ "@timestamp" => "2016-05-23T21:12:59.000Z", "@version" => "1", "tags" => [ [0] "multiline" ], "path" => "/Users/ken/tx/elk/logstash/data/slow_sql.log", "host" => "Luna-mac-2.local", "type" => "mysql-slow", "env" => "PRODUCT", "user" => "dba_bak_all_sel", "ip" => "10.166.140.109", "duration" => 28.812601, "lock_wait" => 0.000132, "results" => 749414, "scanned" => 749414, "sql" => "SELECT /*!40001 SQL_NO_CACHE */ * FROM `xxxxx`;", "logstash_checksum" => "3e3ccb89ee792de882a57e2bef6c5371" }
4. Write the query
For the query we need to aggregate on logstash_checksum, show the buckets in descending order of occurrence count, and for each logstash_checksum display one concrete SQL statement.
Elasticsearch's Top hits Aggregation solves this query requirement nicely.
The query body:
body = {
    "from": 0,
    "size": 0,
    "query": {
        "filtered": {
            "query": {
                "match": {
                    "user": "test"
                }
            },
            "filter": {
                "range": {
                    "@timestamp": {
                        "gte": "now-1d",
                        "lte": "now"
                    }
                }
            }
        }
    },
    "aggs": {
        "top_errors": {
            "terms": {
                "field": "logstash_checksum",
                "size": 20
            },
            "aggs": {
                "top_error_hits": {
                    "top_hits": {
                        "sort": [
                            {"@timestamp": {"order": "desc"}}
                        ],
                        "_source": {
                            "include": [
                                "user", "sql", "logstash_checksum", "@timestamp",
                                "duration", "lock_wait", "results", "scanned"
                            ]
                        },
                        "size": 1
                    }
                }
            }
        }
    }
}
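Below is a sketch of running this body with the official elasticsearch Python client and walking the buckets. The host and index come from the Logstash output above; the client calls themselves are my addition, and the exact connection syntax depends on the client version.

# Sketch: execute the aggregation with the elasticsearch Python client and
# iterate the buckets; host/index taken from the Logstash output section above.
from elasticsearch import Elasticsearch

es = Elasticsearch(["192.168.1.1:9200"])
resp = es.search(index="logstash-slowlog", body=body)

for bucket in resp["aggregations"]["top_errors"]["buckets"]:
    hit = bucket["top_error_hits"]["hits"]["hits"][0]["_source"]
    # doc_count is the execution count for this SQL fingerprint.
    print(bucket["doc_count"], bucket["key"], hit["sql"])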
A few references related to this approach: Terms Aggregation / Elasticsearch filter document group by field
5. Render the page
In the Python backend, use the sqlparse package to format the SQL (line breaks / indentation / keyword case) before passing it to the frontend: sqlparse
>>> import sqlparse
>>> sql = 'select * from foo where id in (select id from bar);'
>>> print sqlparse.format(sql, reindent=True, keyword_case='upper')
SELECT *
FROM foo
WHERE id IN
  (SELECT id
   FROM bar);
Then on the page, use JavaScript for syntax highlighting: code-prettify
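Putting the last two steps together, the backend can format each aggregated hit before handing it to the template. A rough sketch (the dict keys follow the _source fields included in the query above; the helper name is my own):

# Rough sketch: format each aggregated hit's SQL for display before passing
# it to the template; field names follow the _source includes above.
import sqlparse

def rows_for_template(buckets):
    rows = []
    for bucket in buckets:
        source = bucket["top_error_hits"]["hits"]["hits"][0]["_source"]
        rows.append({
            "count": bucket["doc_count"],
            "checksum": source["logstash_checksum"],
            "duration": source["duration"],
            "sql": sqlparse.format(source["sql"], reindent=True, keyword_case="upper"),
        })
    return rows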
Source: http://www.wklken.me/posts/2016/05/24/elk-mysql-slolog.html