Processing MySQL Slow Query Logs with Logstash + ElasticSearch


I ran into a requirement: I needed to query the slow query logs of certain business services. It turned out the slow query logs provided by the DBA platform couldn't cover the actual business scenario (the reported fields were incomplete), so there was nothing for it but to roll up my sleeves and do it myself.

I referred to this article, but made quite a few changes to fit my own needs.

Let's get started.

1. Locate the log file

First confirm that slow query logging is enabled, then find the location of the log file:

> show variables like '%slow%';
+---------------------+-------------------------------------+
| Variable_name       | Value                               |
+---------------------+-------------------------------------+
| log_slow_queries    | ON                                  |
| slow_launch_time    | 2                                   |
| slow_query_log      | ON                                  |
| slow_query_log_file | /data/mysqllog/20000/slow-query.log |
+---------------------+-------------------------------------+
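
If it isn't enabled, it can usually be switched on at runtime without a restart (MySQL 5.1+; the 1-second threshold below is just an example value, tune it to taste):

> SET GLOBAL slow_query_log = 'ON';
> SET GLOBAL long_query_time = 1;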

2. The slow query log

The format is basically as follows; if your format differs, the patterns below will need small adjustments accordingly:

# Time: 160524  5:12:29
# User@Host: user_a[xxxx] @  [10.166.140.109]
# Query_time: 1.711086  Lock_time: 0.000040 Rows_sent: 385489  Rows_examined: 385489
use dbname;
SET timestamp=1464037949;
SELECT 1 from dbname;

3. Collecting with Logstash

Collection itself is simply a matter of using the multiline codec to parse the multi-line entries.

However, two things need handling.

The first is stripping out the useless information.

The second: the same slow SQL statement shows up again and again, so its execution count becomes an important metric. What we need is noise reduction: filter out the parameter-like information (quoted strings and numbers), leave only the core SQL, and compute a hash over it, so that queries can later aggregate on this field. This uses the mutate and checksum filters:

# calculate unique hash
  mutate {
    add_field => {"sql_for_hash" => "%{sql}"}
  }
  mutate {
    gsub => [
        "sql_for_hash", "'.+?'", "",
        "sql_for_hash", "-?\d*\.{0,1}\d+", ""
    ]
  }
  checksum {
    algorithm => "md5"
    keys => ["sql_for_hash"]
  }

The resulting MD5 goes into the logstash_checksum field.
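
To see what this normalization buys us, here is a small Python sketch (an illustration only, not part of the pipeline) that applies the same two substitutions and hashes the result; two runs of the same statement with different parameters produce the same fingerprint:

import hashlib
import re

def sql_fingerprint(sql):
    # the same two substitutions as the gsub above
    s = re.sub(r"'.+?'", "", sql)        # strip quoted string literals
    s = re.sub(r"-?\d*\.?\d+", "", s)    # strip numeric literals
    return hashlib.md5(s.encode('utf-8')).hexdigest()

a = sql_fingerprint("SELECT * FROM t WHERE id = 42 AND name = 'foo'")
b = sql_fingerprint("SELECT * FROM t WHERE id = 7 AND name = 'bar'")
assert a == b  # identical core SQL, identical hash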

Here is the complete Logstash config file (you may need some small adjustments for your own log format). Note the custom pattern it relies on: ALLWORD [\s\S]*
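
ALLWORD is not a stock grok pattern, so it has to be supplied as a custom one. One way to do that (the path below is an assumption, adjust it to your layout) is a one-line patterns file:

# e.g. /data/LogNew/logstash/patterns/extra
ALLWORD [\s\S]*

and then point each grok block at that directory with patterns_dir => ["/data/LogNew/logstash/patterns"].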

input {
  file {
    path => ["/data/mysqllog/20000/slow-query.log"]
    sincedb_path => "/data/LogNew/logstash/sincedb/mysql.sincedb"
    type => "mysql-slow-log"
    add_field => ["env", "PRODUCT"]
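    # negate + previous: every line that does NOT start with "# User@Host:"
    # is folded into the previous event, so one event == one slow query entry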
    codec => multiline {
      pattern => "^# User@Host:"
      negate => true
      what => previous
    }
  }
}
filter {
  grok {
    # User@Host: logstash[logstash] @ localhost [127.0.0.1]
    # User@Host: logstash[logstash] @  [127.0.0.1]
    match => [ "message", "^# User@Host: %{ALLWORD:user}\[%{ALLWAORD}\] @ %{ALLWORD:dbhost}? \[%{IP:ip}\]" ]
  }
  grok {
    # Query_time: 102.413328  Lock_time: 0.000167 Rows_sent: 0  Rows_examined: 1970
    match => [ "message", "^# Query_time: %{NUMBER:duration:float}%{SPACE}Lock_time: %{NUMBER:lock_wait:float}%{SPACE}Rows_sent: %{NUMBER:results:int}%{SPACE}Rows_examined:%{SPACE}%{NUMBER:scanned:int}%{ALLWORD:sql}"]
  }

  # remove useless data
  mutate {
    gsub => [
        "sql", "\nSET timestamp=\d+?;\n", "",
        "sql", "\nuse [a-zA-Z0-9\-\_]+?;", "",
        "sql", "\n# Time: \d+\s+\d+:\d+:\d+", "",
        "sql", "\n/usr/local/mysql/bin/mysqld.+$", "",
        "sql", "\nTcp port:.+$", "",
        "sql", "\nTime .+$", ""
    ]
  }

  # Capture the time the query happened
  grok {
    match => [ "message", "^SET timestamp=%{NUMBER:timestamp};" ]
  }
  date {
    match => [ "timestamp", "UNIX" ]
  }


  # calculate unique hash
  mutate {
    add_field => {"sql_for_hash" => "%{sql}"}
  }
  mutate {
    gsub => [
        "sql_for_hash", "'.+?'", "",
        "sql_for_hash", "-?\d*\.{0,1}\d+", ""
    ]
  }
  checksum {
    algorithm => "md5"
    keys => ["sql_for_hash"]
  }

  # Drop the captured timestamp field since it has been moved to the time of the event
  mutate {
    # TODO: remove the message field
    remove_field => ["timestamp", "message", "sql_for_hash"]
  }
}
output {
    #stdout{
    #    codec => rubydebug
    #}
    #if ("_grokparsefailure" not in [tags]) {
    #    stdout{
    #        codec => rubydebug
    #    }
    #}
    if ("_grokparsefailure" not in [tags]) {
        elasticsearch {
          hosts => ["192.168.1.1:9200"]
          index => "logstash-slowlog"
        }
    }
}
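
Assuming the config is saved as mysql-slow.conf (the file name is arbitrary), collection starts with:

bin/logstash -f mysql-slow.conf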

What gets collected looks like this:

{
           "@timestamp" => "2016-05-23T21:12:59.000Z",
             "@version" => "1",
                 "tags" => [
        [0] "multiline"
    ],
                 "path" => "/Users/ken/tx/elk/logstash/data/slow_sql.log",
                 "host" => "Luna-mac-2.local",
                 "type" => "mysql-slow",
                  "env" => "PRODUCT",
                 "user" => "dba_bak_all_sel",
                   "ip" => "10.166.140.109",
             "duration" => 28.812601,
            "lock_wait" => 0.000132,
              "results" => 749414,
              "scanned" => 749414,
                  "sql" => "SELECT /*!40001 SQL_NO_CACHE */ * FROM `xxxxx`;",
    "logstash_checksum" => "3e3ccb89ee792de882a57e2bef6c5371"
}

4. Writing the query

For the query, we need to aggregate by logstash_checksum and present the results in descending order of occurrence count; at the same time, each logstash_checksum needs one concrete SQL statement for display.

Elasticsearch's Top hits Aggregation solves this query requirement perfectly.

The query body:

body = {
    "from": 0,
    "size": 0,
    "query": {
        "filtered": {
            "query": {
                "match": {
                    "user": "test"
                }
            },
            "filter": {
                "range": {
                    "@timestamp": {
                        "gte": "now-1d",
                        "lte": "now"
                    }
                }
            }
        }
    },
    "aggs": {
        "top_errors": {
            "terms": {
                "field": "logstash_checksum",
                "size": 20
            },
            "aggs": {
                "top_error_hits": {
                    "top_hits": {
                        "sort": [
                            {
                                "@timestamp":{
                                    "order": "desc"
                                }
                            }
                        ],
                        "_source": {
                            "include": [
                               "user" , "sql", "logstash_checksum", "@timestamp", "duration", "lock_wait", "results", "scanned"
                            ]
                        },
                        "size" : 1
                    }
                }
            }
        }
    }
}
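
For completeness, running this body through the elasticsearch-py client could look like the sketch below (host and index taken from the Logstash output section above):

from elasticsearch import Elasticsearch

es = Elasticsearch(["192.168.1.1:9200"])
resp = es.search(index="logstash-slowlog", body=body)

for bucket in resp["aggregations"]["top_errors"]["buckets"]:
    top = bucket["top_error_hits"]["hits"]["hits"][0]["_source"]
    print bucket["doc_count"], top["sql"]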

A few reference links related to this approach: Terms Aggregation / Elasticsearch filter document group by field

5. Rendering the page

In the Python backend, the sqlparse package is used to format the SQL (line breaks / indentation / keyword casing) before sending it to the frontend: sqlparse

>>> import sqlparse
>>> sql = 'select * from foo where id in (select id from bar);'
>>> print sqlparse.format(sql, reindent=True, keyword_case='upper')
SELECT *
FROM foo
WHERE id IN
  (SELECT id
   FROM bar);

Then, on the page, use JS for syntax highlighting: code-prettify

Source: http://www.wklken.me/posts/2016/05/24/elk-mysql-slolog.html

 
