Logstash + Elasticsearch for Processing MySQL Slow Query Logs
I ran into a requirement to query the slow query logs of certain business services. It turned out the slow query logs provided by the DBA platform couldn't cover the actual business scenario (the reported fields were incomplete), so I had no choice but to roll up my sleeves and do it myself.
I referenced this article, but made quite a few changes of my own to fit the requirements.
Let's get started.
1. Locate the slow query log
First confirm that the slow query log is enabled, then find the log file's location:
> show variables like '%slow%';
+---------------------+-------------------------------------+
| Variable_name       | Value                               |
+---------------------+-------------------------------------+
| log_slow_queries    | ON                                  |
| slow_launch_time    | 2                                   |
| slow_query_log      | ON                                  |
| slow_query_log_file | /data/mysqllog/20000/slow-query.log |
+---------------------+-------------------------------------+
2. The slow query log format
The format is basically as follows; if your format differs, adjust the patterns below accordingly.
# Time: 160524  5:12:29
# User@Host: user_a[xxxx] @ [10.166.140.109]
# Query_time: 1.711086  Lock_time: 0.000040  Rows_sent: 385489  Rows_examined: 385489
use dbname;
SET timestamp=1464037949;
SELECT 1 from dbname;
3. Collecting with Logstash
Collection is simply a matter of using the multiline codec to parse the multi-line entries.
There are two things to take care of, though.
First, strip out the useless information.
Second, the same slow SQL will show up repeatedly, so the number of executions becomes an important metric. What we do is denoise the statement (strip out the parameters, i.e. quoted values and numbers), keep only the core SQL, and then compute a hash from it; queries can then aggregate on that field. This uses the mutate and checksum filters:
    # calculate unique hash
    mutate {
        add_field => {"sql_for_hash" => "%{sql}"}
    }
    mutate {
        gsub => [
            "sql_for_hash", "'.+?'", "",
            "sql_for_hash", "-?\d*\.{0,1}\d+", ""
        ]
    }
    checksum {
        algorithm => "md5"
        keys => ["sql_for_hash"]
    }
The resulting MD5 is stored in the logstash_checksum field.
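As a sanity check, here is a minimal Python sketch (the function name is just for illustration) that applies the same two substitutions and hashes the result. The exact digest stored by the checksum filter may not match this byte for byte; the point is to show how statements that differ only in parameters collapse to the same fingerprint:

# Minimal sketch: mimic the gsub + md5 steps outside Logstash.
import hashlib
import re

def sql_fingerprint(sql):
    normalized = re.sub(r"'.+?'", "", sql)                    # strip quoted literals
    normalized = re.sub(r"-?\d*\.{0,1}\d+", "", normalized)   # strip numbers
    return hashlib.md5(normalized.encode("utf-8")).hexdigest()

# both statements yield the same fingerprint once parameters are removed
print(sql_fingerprint("SELECT 1 from dbname;"))
print(sql_fingerprint("SELECT 999 from dbname;"))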
The complete Logstash config file is below (you may need small adjustments for your own log format). Note the custom grok pattern it relies on: ALLWORD [\s\S]*
input {
    file {
        path => ["/data/mysqllog/20000/slow-query.log"]
        sincedb_path => "/data/LogNew/logstash/sincedb/mysql.sincedb"
        type => "mysql-slow-log"
        add_field => ["env", "PRODUCT"]
        codec => multiline {
            pattern => "^# User@Host:"
            negate => true
            what => previous
        }
    }
}
filter {
    grok {
        # User@Host: logstash[logstash] @ localhost [127.0.0.1]
        # User@Host: logstash[logstash] @ [127.0.0.1]
        # ALLWORD is the custom pattern noted above: ALLWORD [\s\S]*
        match => [ "message", "^# User@Host: %{ALLWORD:user}\[%{ALLWORD}\] @ %{ALLWORD:dbhost}? \[%{IP:ip}\]" ]
    }
    grok {
        # Query_time: 102.413328 Lock_time: 0.000167 Rows_sent: 0 Rows_examined: 1970
        match => [ "message", "^# Query_time: %{NUMBER:duration:float}%{SPACE}Lock_time: %{NUMBER:lock_wait:float}%{SPACE}Rows_sent: %{NUMBER:results:int}%{SPACE}Rows_examined:%{SPACE}%{NUMBER:scanned:int}%{ALLWORD:sql}"]
    }
    # remove useless data
    mutate {
        gsub => [
            "sql", "\nSET timestamp=\d+?;\n", "",
            "sql", "\nuse [a-zA-Z0-9\-\_]+?;", "",
            "sql", "\n# Time: \d+\s+\d+:\d+:\d+", "",
            "sql", "\n/usr/local/mysql/bin/mysqld.+$", "",
            "sql", "\nTcp port:.+$", "",
            "sql", "\nTime .+$", ""
        ]
    }
    # Capture the time the query happened
    grok {
        match => [ "message", "^SET timestamp=%{NUMBER:timestamp};" ]
    }
    date {
        match => [ "timestamp", "UNIX" ]
    }
    # calculate unique hash
    mutate {
        add_field => {"sql_for_hash" => "%{sql}"}
    }
    mutate {
        gsub => [
            "sql_for_hash", "'.+?'", "",
            "sql_for_hash", "-?\d*\.{0,1}\d+", ""
        ]
    }
    checksum {
        algorithm => "md5"
        keys => ["sql_for_hash"]
    }
    # Drop the captured timestamp field since it has been moved to the time of the event
    mutate {
        # TODO: remove the message field
        remove_field => ["timestamp", "message", "sql_for_hash"]
    }
}
output {
    #stdout {
    #    codec => rubydebug
    #}
    #if ("_grokparsefailure" not in [tags]) {
    #    stdout {
    #        codec => rubydebug
    #    }
    #}
    if ("_grokparsefailure" not in [tags]) {
        elasticsearch {
            hosts => ["192.168.1.1:9200"]
            index => "logstash-slowlog"
        }
    }
}
Here is what an ingested document looks like:
{
    "@timestamp" => "2016-05-23T21:12:59.000Z",
    "@version" => "1",
    "tags" => [
        [0] "multiline"
    ],
    "path" => "/Users/ken/tx/elk/logstash/data/slow_sql.log",
    "host" => "Luna-mac-2.local",
    "type" => "mysql-slow",
    "env" => "PRODUCT",
    "user" => "dba_bak_all_sel",
    "ip" => "10.166.140.109",
    "duration" => 28.812601,
    "lock_wait" => 0.000132,
    "results" => 749414,
    "scanned" => 749414,
    "sql" => "SELECT /*!40001 SQL_NO_CACHE */ * FROM `xxxxx`;",
    "logstash_checksum" => "3e3ccb89ee792de882a57e2bef6c5371"
}
4. Writing the query
For the query, we need to aggregate by logstash_checksum, display the groups in descending order of count, and show one concrete SQL statement for each logstash_checksum.
Elasticsearch's Top hits Aggregation handles this query requirement perfectly.
The query body:
body = {
    "from": 0,
    "size": 0,
    "query": {
        "filtered": {
            "query": {
                "match": {
                    "user": "test"
                }
            },
            "filter": {
                "range": {
                    "@timestamp": {
                        "gte": "now-1d",
                        "lte": "now"
                    }
                }
            }
        }
    },
    "aggs": {
        "top_errors": {
            "terms": {
                "field": "logstash_checksum",
                "size": 20
            },
            "aggs": {
                "top_error_hits": {
                    "top_hits": {
                        "sort": [
                            {
                                "@timestamp": {
                                    "order": "desc"
                                }
                            }
                        ],
                        "_source": {
                            "include": [
                                "user", "sql", "logstash_checksum", "@timestamp", "duration", "lock_wait", "results", "scanned"
                            ]
                        },
                        "size": 1
                    }
                }
            }
        }
    }
}
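To wire this into the backend, here is a minimal sketch (using the elasticsearch Python client of that era; the host and index name are taken from the Logstash output above, the variable names are illustrative) that runs the body and pulls out, for each checksum, the execution count plus one representative document:

# Minimal sketch: run the aggregation and collect one representative slow
# query per logstash_checksum, together with its execution count.
from elasticsearch import Elasticsearch

es = Elasticsearch(["192.168.1.1:9200"])
resp = es.search(index="logstash-slowlog", body=body)  # `body` is the dict above

rows = []
for bucket in resp["aggregations"]["top_errors"]["buckets"]:
    hit = bucket["top_error_hits"]["hits"]["hits"][0]["_source"]
    rows.append({
        "logstash_checksum": bucket["key"],
        "count": bucket["doc_count"],  # how many times this SQL shape was logged
        "sql": hit["sql"],
        "duration": hit["duration"],
    })

# terms buckets come back ordered by doc_count descending, which is exactly
# the "most executions first" ordering we want to display
for row in rows:
    print(row["count"], row["logstash_checksum"], row["sql"][:80])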
A few reference links related to this approach: Terms Aggregation / Elasticsearch filter document group by field
5. Rendering the page
In the Python backend, use the sqlparse package to format the SQL (line breaks / indentation / keyword case) before passing it to the front end: sqlparse
>>> sql = 'select * from foo where id in (select id from bar);'
>>> print sqlparse.format(sql, reindent=True, keyword_case='upper')
SELECT *
FROM foo
WHERE id IN
    (SELECT id
     FROM bar);
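Tying this back to the aggregation results, a minimal sketch (assuming the rows list from the query sketch above) that formats each bucket's representative SQL before handing it to the template:

# Minimal sketch: pretty-print each representative SQL before sending it
# to the front end.
import sqlparse

for row in rows:
    row["sql_pretty"] = sqlparse.format(
        row["sql"],
        reindent=True,         # break clauses onto separate lines
        keyword_case="upper",  # normalize keyword casing
    )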
Then, on the page, use JS for syntax highlighting: code-prettify
Source: http://www.wklken.me/posts/2016/05/24/elk-mysql-slolog.html