elk 日志系统搭建

简介

一般提到 elk 指的是 elasticsearch + logstash + kibana的全家桶组合。其中 es 可以对日志进行存储和索引,提供搜索和分析的能力,logstash 用来对日志内容做过滤解析等操作,kibana 则是一个为 ElasticSearch 提供可视化能力的平台,可以搜索、查看并和存储在Elasticsearch索引中的数据进行交互。不过 logstash 占用内存较大,如果在每台需要采集日志的机器上都部署的话,会造成资源的浪费。

因此采用更轻量级的 filebeat 做日志收集是一个更好的选择。同时在新版本的 filebeat中,可以直接写数据至 es 中,如果需要对日志进行处理,设置对应的 pipeline 即可。

FileBeat配置

下载 filebeat 压缩包,解压后编辑 filebeat.yml 文件,配置如下,这里我们收集的是某个 service 下的log

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
filebeat.inputs:

- type: log

enabled: true

# 需要收集的日志路径,支持正则匹配
paths:
- /your/logpath*
fields:
type: "service_name"

setup.template.name: "ignore-this"
setup.template.pattern: "ignore-this"

output.elasticsearch:
# Array of hosts to connect to.
hosts: ["localhost:8200"]

indices:
# 索引名
- index: "filebeat-service-name-%{+yyy.MM.dd}"
when.equals:
fields.type: "service_name"
pipelines:
# 管道名
- pipeline: "service_name"
when.equals:
fields.type: "service_name"

ES 和 Kibana 配置

es 和 kibana 基本用默认配置就好,我对 es 配置修改如下:

1
2
3
4
5
6
# http
http.cors.enabled: true #开启跨域访问支持 默认为false

http.cors.allow-origin: "*" #跨域访问允许的域名地址,支持正则

JAVA_HOME: /.../elk/elasticsearch-7.6.2/jdk

如果是通过代理访问 kibana, 则需要配置 kibana.yml 中的 server.basePath 字段。

pipeline 配置

ES 的 ingest node 可以对文档做预处理工作。在 ingest node 内部定义一个pipeline,pipeline中定义多个processor,这些processor定义了文档的预处理操作。这些预处理可以包括解析字符串、增加字段、删除字段等等。

例如 service 日志的格式如下,这是一行非结构化的文本:

1
NOTICE: 2020-04-08 14:52:07 xxx/weblogware.go:44 logid[3159816465] err[0] api[plan_getPlan] status[200] appkey[] req_app[] appname[job] cost[20.071] errno[-1] errmsg[] protocol[http] idc[test] client_ip[111.111.111.111] local_ip[111.111.111.111] pid[47663] method[get] uri[/plan/getPlan?plan_id=100000] host[xxx.baidu.com:8099] refer[] optime[1586328727780] cookie[BAIDUID=xxxxxxxxxxxxxxxxxx:FG=1]

记录至 es 前需要对其进行结构化处理,这一步在 pipeline 中完成。首先我们要定义结构化的格式,可以借助 grok pattern 这个网页工具完成,如果无法访问或调试,可以访问其国内镜像。经调试,得到对应的 grok pattern 如下

1
%{LOGLEVEL:level}: %{TIMESTAMP_ISO8601:date} %{GREEDYDATA:file_path}:%{INT:line_num} logid[%{INT:log_id}] err[%{GREEDYDATA:err}] api[%{GREEDYDATA:api}] status[%{INT:status}] appkey[%{GREEDYDATA:appkey}] req_app[%{GREEDYDATA:appkey}] appname[%{WORD:appname}] cost[%{BASE10NUM:cost}] errno[%{BASE10NUM:errno}] errmsg[%{GREEDYDATA:errmsg}] protocol[%{WORD:protocol}] idc[%{WORD:idc}] client_ip[%{IP:client_ip}] local_ip[%{IP:local_ip}] pid[%{INT:pid}] method[%{WORD:method}] uri[%{GREEDYDATA:uri}] host[%{GREEDYDATA:host}] refer[%{GREEDYDATA:refer}] optime[%{INT:optime}] cookie[%{GREEDYDATA:cookie}

现在我们进入 kibana 的控制台,通过 es 的 restful 接口创建对应的 pipeline:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
PUT _ingest/pipeline/service_name
{
"description" : "service log pipeline",
"processors" : [
{
"grok" : {
"field" : "message",
"patterns" : [
"%{LOGLEVEL:level}: %{TIMESTAMP_ISO8601:date} %{GREEDYDATA:file_path}:%{INT:line_num} logid\\[%{INT:log_id}\\] err\\[%{GREEDYDATA:err}\\] api\\[%{GREEDYDATA:api}\\] status\\[%{INT:status}\\] appkey\\[%{GREEDYDATA:appkey}\\] req_app\\[%{GREEDYDATA:appkey}\\] appname\\[%{WORD:appname}\\] cost\\[%{BASE10NUM:cost}\\] errno\\[%{BASE10NUM:errno}\\] errmsg\\[%{GREEDYDATA:errmsg}\\] protocol\\[%{WORD:protocol}\\] idc\\[%{WORD:idc}\\] client_ip\\[%{IP:client_ip}\\] local_ip\\[%{IP:local_ip}\\] pid\\[%{INT:pid}\\] method\\[%{WORD:method}\\] uri\\[%{GREEDYDATA:uri}\\] host\\[%{GREEDYDATA:host}\\] refer\\[%{GREEDYDATA:refer}\\] optime\\[%{INT:optime}\\] cookie\\[%{GREEDYDATA:cookie}\\]"
]
}
},
{
"rename" : {
"field" : "@timestamp",
"target_field" : "event.created"
}
},
{
"date" : {
"field" : "date",
"target_field" : "@timestamp",
"timezone" : "Asia/Shanghai",
"formats" : [
"yyyy-MM-dd H:m:s"
],
"on_failure" : [
{
"append" : {
"field" : "error.message",
"value" : "{{ _ingest.on_failure_message }}"
}
}
]
}
}
]
}

这里有两点需要注意:
一是 grok pattern 里的特殊符号前需要加两层转义 \
二是日志在导入 es 的过程中,会根据导入时间创建 timestamp 这个字段,这样的话我们根据 timestamp 排序得到的实际上是日志被插入到 es 中的时间,并非日志的产生时间。而日志的产生时间实际上记录在 date 字段中。这里我的处理是首先将 timestamp 字段重命名为 event.created,然后将 date 重命名为stamp,同时对 date 进行时间格式化,并设定正确的时区。

tips

经过上面的配置,依次启动 es、kibana 和各台机器上的 filebeat,理论上已经可以正确收集到服务日志。
但初次部署启动时,es 可能会启动失败,最常见的是系统或进程文件描述符达到上限或者内存不够等问题。可以阅读 elasticsearch性能调优 这篇文章来解决这些问题,以及对 es 性能进行优化。
另外还有一点需要提醒的是,测试时要删除filebeat目录下data/registry,这里面记录的是读取log的位置信息。