利用ELK分析日志,以及通过zabbix实现告警

记录一下自己折腾大半年的elk+zabbix,里面有logstash通过udp/tcp/logfile收集windows/linux/mysql等日志;elasticsearch的写入,读取,统计,聚合以及优化;kibana的绘图;logstash/elasticsearch接zabbix触发各类告警。

日志数据的采集

日志采集方式

tcp/udp

logstash的input中tcp/udp插件支持接收设备通过tcp或udp的方式传来的日志。

input {
	tcp {
		port => "50014"
		type => rsyslog
	}
	tcp {
		port => "5001"
		type => windows
	}
	udp {
		port => "5003"
		type => syslog
	}
}

logfile

logstash的input中file插件支持直接读取日志文件。

file {
	path => "/var/log/error.log"
	start_position => "beginning/end"
	type => logfile
}

start_position为设置读取文件的开始位置,beginning是从文件第一行开始读取(可以读取旧数据),end是从最后开始读取(只读取新数据)。

日志源

windows

收集windows日志需要在目标主机上安装nxlog,用于把syslog通过tcp方式发送到logstash所在的服务器上。 在$NXLOG_HOME/conf/nxlog.conf中修改添加如下代码

<Output out>
	Module      om_tcp
	Host        10.0.0.101
	Port        5001
	Exec        to_syslog_snare();
</Output>

在windows服务中打开nxlog服务,nxlog采用tcp的方式传输数据,所以在logstash的input模块中也采用tcp的方式收集,port设置为nxlog.conf对应的port。详细的logstash配置如下:

input {
	tcp {
		port => "5001"
		type => windows
	}
}
filter{
	if [type] == "windows" {
		grok{
			match => { "message" => "<%{INT:PRI}>(%{SYSLOGTIMESTAMP:syslog_timestamp}) %{SYSLOGHOST:syslog_hostname} %{DATA:loglabel}\t%{NONNEGINT:ID}\t%{DATA:channel}\t%{INT:generateid}\t%{DATA:event_timestamp}\t%{INT:eventid_qualifier}\t%{DATA:syslog_program}\t%{DATA:AcountName}\t%{DATA:AcountType}\t%{DATA:severity_string}\t%{DATA:computer}\t%{DATA:event_type}\t\t%{GREEDYDATA:syslog_message}\t%{INT:event_recordid}"}
			remove_field => ["message","loglabel","ID","channel","generateid","event_timestamp","eventid_qualifier","AcountName","AcountType","severity_string","computer","event_type","event_recordid","syslog_timestamp"]
		}
		syslog_pri{
			syslog_pri_field_name => "PRI"
			remove_field =>["PRI","syslog_facility_code","syslog_severity_code"]
		}
	}
	if ([syslog_message] =~ ".*10.120.7.121.*") {
		drop {}
	}
}

output{
	elasticsearch{
		hosts => "10.0.0.41:9200"
		template_overwrite => true
		template => "/config/logstash.json"
		template_name => "logstash.json"
		manage_template => true
	}
}

linux

linux系统收集日志的方式有两种,syslog和rsyslog,老版本linux支持syslog,新版本linux支持rsyslog。

  1. syslog 打开/etc/services 文件,找到syslog的相关配置,可以看到syslog进程对外发送日志消息的端口以及采用的协议;经过尝试发现syslog进程目前只支持UDP协议对外发送日志;可以在这里修改相关的发送端口,要和logstash.conf文件中定义的接收端口一致; 打开 /etc/syslog.conf 文件,在文件的最后添加*.* @ 10.0.0.119:5003 第一个*号 表示系统的facility;.是分隔符; 第二个*号 表示日志的级别;@表示采用UDP的协议发送日志数据;如果是两个’@@’表示采用TCP协议发送数据;‘10.0.0.119’是接收日志信息的服务器IP地址;‘5003’是端口号。 运行命令”service syslog restart” 重启syslog进程生效。
  2. rsyslog rsyslog 客户端的配置相对syslog 比较简单,直接修改’/etc/rsyslog.conf’ 文件,在文件的最后添加*.* @@ 10.0.0.119:5014 然后终端输入”service rsyslog restart” ,重启rsyslog进程。
  3. 修改logstash配置,接收并分析linux日志 在input模块中分别用tcp和udp的方式接收rsyslog和syslog传送的日志,并用filter的grok解析日志,具体的logstash配置如下:
input {
    tcp {
        port => "50014"
        type => rsyslog
    }
    udp {
        port => "5003"
        type => syslog
    }
}
filter{
    if [type] == "rsyslog" {
        grok {
            match => { "message" => "<%{NONNEGINT:PRI}>(%{SYSLOGTIMESTAMP:syslog_timestamp}) %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}"  }
            remove_field =>["message","syslog_timestamp"]
        }
        syslog_pri{
            syslog_pri_field_name => "PRI"
            remove_field =>["PRI","syslog_facility_code","syslog_severity_code"]
        }
    }
    if [type] == "syslog" {
        grok {
            match => { "message" => "<%{NONNEGINT:PRI}>%{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}"  }            
            remove_field =>["message","syslog_timestamp"]     
        }   
        syslog_pri{
            syslog_pri_field_name => "PRI"
            remove_field =>["PRI","syslog_facility_code","syslog_severity_code"]
        }   
    }
}
output{
    elasticsearch{
        hosts => "10.0.0.41:9200"
        template_overwrite => true
        template => "/config/logstash.json"
        template_name => "logstash.json"
        manage_template => true
    }
}

mysql

目前支持收集分析mysql的慢查询日志和错误日志,但由于mysql日志属于应用日志,不能通过syslog发送,所以要在mysql的宿主服务器上安装logstash,然后通过logstash发送到日志服务器的es中。

慢查询日志

找到mysql的配置文件目录下的slowlog.cnf(不同数据库版本位置和文件不同,需具体环境具体分析)。

[mysqld]
slow_query_log                = on
slow_query_log_file           = /var/lib/mysql/slow.log
long_query_time               = 1

参数对应设置为:

  • slow_query_log设置为off关闭慢查询日志,on是打开慢查询日志
  • slow_query_log_file为设置慢查询日志文件的位置
  • long_query_time为设置慢查询临界时间,单位位秒,所有超过该时间的查询均为慢查询。

利用logstash input模块的file读取mysql的慢查询日志,并通过filter分析慢查询日志。但是不同大版本(5.1与5.5)的mysql slow log 格式不一致,相同大版本小版本不同的mysql也不一致,并且不同mysql变种(percona server) 也会不一致,即便版本都一致了,同一个slowlog中的不同记录格式也不尽相同,所以需要具体环境具体分析,编写不同的logstash配置文件。

logstash具体配置文件如下:

input {
	file {
		patch => "/var/lib/mysql/slow.log"
		type => mysql_slow
	}
}
filter {
	grok {
		match => [ "message", "(?m)^#\s+User@Host:\s+%{USER:user}\[[^\]]+\]\s+@\s+\[%{IP:clientip}\]\n#\s+Thread_id:\s+%{NUMBER:id:int}\s+Schema:\s+QC_hit:\s+%{DATA:QChit}\n#\s+Query_time:\s+%{NUMBER:query_time:float}\s+Lock_time:\s+%{NUMBER:lock_time:float}\s+Rows_sent:\s+%{NUMBER:rows_sent:int}\s+Rows_examined:\s+%{NUMBER:rows_examined:int}\n#\s+Rows_affected:\s+%{NUMBER:rows_affected:int}\nSET\s+timestamp=%{NUMBER:timestamp};\n%{DATA:MESSAGE};"]
	}
	date {
		match => [ "timestamp", "UNIX" ]
	}
}
output {
	elasticsearch{
		hosts => "10.0.0.41:9200"
		index => "mysql-%{+YYYY.MM.dd}"
	}
}
错误日志

找到mysql的配置文件目录下的logging.cnf(不同数据库版本位置和文件不同,需具体环境具体分析)。

[mysqld]
pid_file          = /var/lib/mysql/mysql.pid
log_error         = /var/lib/mysql/error.log
log_warnings      = 2
[general]
general_log       = off
general_log_file  = /var/lib/mysql/general.log
[mysqld_safe]
log_error         = /var/lib/mysql/error.log
pid_file          = /var/lib/mysql/mysql.pid

mysqld的错误日志默认是打开的;中间的gemeral默认是关闭的,记录的是mysql所有的查询操作,打开会损耗mysql的性能,不建议打开;目前mysql版本基本很少有用mysqld启动mysql的,常用mysqld_safe启动,后者较前者增加了一些安全特性,例如当出现错误时重启服务器并向错误日志文件写入运行时间信息。

记录下error日志的保存路径后,设置logstash,和搜集慢查询日志的方式一样,logstash具体的配置文件如下:

input {
	file {
		patch => "/var/lib/mysql/error.log"
		type => mysql_error
	}
}
filter {
	grok {
		match => { "message"=>"%{MYSQLDATE:timestamp}\s+%{NUMBER:some_number}\s+\[%{DATA:severity}\]\s+%{GREEDYDATA:log}"}
	}
}
output {
	elasticsearch{
		hosts => "10.0.0.41:9200"
		index => "mysql-%{+YYYY.MM.dd}"
	}
}

日志数据的集中存储

日志快速统一写入与读取

写入

日志系统采用elasticsearch存储日志数据,日志通过logstash收集后,利用其output模块配置elasticsearch插件,实现日志快速写入es中,output具体代码如下:

output{
	elasticsearch{
		hosts => "10.0.0.41:9200"
		template_overwrite => true
		template => "/config/logstash.json"
		template_name => "logstash.json"
		manage_template => true
	}
}

如果es添加了用户名/密码的验证模块,需要在output中设置user/password

读取

日志分析系统采用kibana作为读取并可视化日志的工具,kibana集成了es的索引,聚合等api,并利用聚合api对日志数据进行分类汇总,实现绘制各类图表的功能,在下面的日志索引和分析板块会有详细说明。

es的优化

为了提升es的性能,便于kibana的快速读取,我们做了以下优化:

增加es的jvm heap大小

在es镜像文件中,把./config/jvm.options里的ES_JAVA_OPTS注释掉。在./elasticsearch/elasticsearch.env中加入:

ES_JAVA_OPTS=-Xms5g -Xmx5g

具体数值根据现场服务器的内存大小修改,Xms和Xmx最好相等,且最大不超过32g。

修改es索引的分片和副本数量

由于索引是自动生成的,默认每个索引会有五个分片,一个副本,而当索引生成后,分片和副本的数量是无法更改的,所以我们需要在logstash把日志存入es之前设置索引模板。

修改logstash的配置文件中output模块为:

elasticsearch{
	hosts => "localhost:9200"
	template_overwrite => true
	template => "/config/logstash.json"
	template_name => "logstash.json"
	manage_template => true
}

其中template_overwrite设置为true是允许覆盖旧模板;

template是设置模板的路径;

template_name是设置模板在es中的名称;

manage_template设置为true是允许logstash向es中写入模板。

logstash.json的内容为:

{
	"template":"logstash*",
	"settings":{
		"index.number_of_shards":1,
		"index.number_of_replicas":0
	}
}

增加es容器的最大锁内存和最大打开文件数量

在docker-compose-elasticsearch.yml中加入如下代码:

ulimits:
	memlock:
		soft: -1
		hard: -1
	nofile:
		soft: 65536
		hard: 65536
	cap_add:
		- IPC_LOCK
	memlock的"-1"为无限unlimited,es要求nofile不得小于65536。

禁止es容器物理内存与虚拟内存的交换

在docker-compose-elasticsearch.yml的command中加入:

-Edefault.bootstrap.memory_lock=true

日志存储容量的扩大

存储日志的elasticsearch是docker容器,所以利用docker的数据卷计数,把es的索引存储目录映射到容器之外,这样的话日志存储容量跟宿主机磁盘大小有着直接的关系,扩大日志存储容量只需扩大宿主机磁盘容量即可。es的docker-compose.yml关于数据卷代码如下:

volumes:
	- /etc/localtime:/etc/localtime:ro
	- /var/local/es:/opt/elasticsearch/data

旧数据导出并删除

目前的日志数据是根据日期存放在es中的,一天一个索引,根据上面docker的数据卷技术,旧数据可以根据索引直接导出到其他备份磁盘中,而且后期如果做elk集群的话,设置索引的副本数量,日志数据会自动备份到es的其他节点中,可以有效的防止因单个节点宿主机的机器故障而导致日志数据的丢失。

日志数据的实时索引

快速新建索引

logstash把从各个目标主机收集的日志数据,通过filter过滤解析后,存入es中,每天零点自动生成当天的索引,格式为logstash的filter中匹配的各个key:value对,包含日志产生的时间,设备IP,日志类别,日志级别,日志内容等。 ELK1 也可以手动新建索引,并添加数据:

PUT test/type1/1
{
	"doc" : {
		"name" : "name"
	}
}

快速更新索引

elasticsearch提供了更新索引的api,可以更新修改已有的索引数据:

POST test/type1/1/_update
{
	"doc" : {
		"name" : "new_name"
	}
}

快速删除索引

如果需要删除旧索引数据,根据索引日期,查到该索引的uuid,然后可以去es宿主机的es数据卷映射目录下,找到该索引手动删除,查询uuid的代码如下:

GET /logstash-2017.02.17/_settings

es将会返回该索引的信息,其中就包含了该索引的uuid:

{
  "logstash-2017.02.17": {
    "settings": {
      "index": {
        "refresh_interval": "5s",
        "number_of_shards": "1",
        "provided_name": "logstash-2017.02.17",
        "creation_date": "1487289601950",
        "number_of_replicas": "0",
        "uuid": "FMpmue-GQ2CJmK7nzYkr1Q",
        "version": {
          "created": "5000199"
        }
      }
    }
  }
}

elasticsearch也提供了删除索引的api,快速删除索引:

$ curl -XDELETE 'http://localhost:9200/test’

日志数据的实时分析

elasticsearch通过聚合操作进行实时分析,es提供了多种聚合方法,如Sum,min,max,avg,count,top等,kibana已经集成了es的多种聚合方式,在图表类型中Metric就是基于单个关键字的统计计数,其他图表都能用到基于多个关键字的统计计数。当然也可以在kibana的Dev Tools板块用Query DSL直接操作elasticsearch,而且结合过滤语句,组合成不同DSL,得到各种想要的数据,下面用Qiery DSL操作解释es的聚合。

基于单个关键字的统计计数

max聚合

返回某个字段所有值里最大值,可以用来得到某host在当天memory_free的峰值(min聚合,avg聚合和sum聚合的使用和max类似)Query DSL语句如下:

GET /metricbeat-2017.02.19/_search
{
	"query": {
		"term": {
			"beat.name": "10.0.0.105-beat"
		}
	},
	"aggs": {
		"memory_free_max": {
			"max": {
				"field": "system.memory.free"
			}
		},
		"memory_free_min": {
			"min": {
				"field": "system.memory.free"
			}
		},
		"memory_free_avg": {
			"avg": {
				"field": "system.memory.free"
			}
		},
		"memory_free_sum": {
			"sum": {
				"field": "system.memory.free"
			}
		}
	}
}

运行后返回的结果为:

{
...
  "aggregations": {
    "memory_free_sum": {
      "value": 814722232320
    },
    "memory_free_min": {
      "value": 67133440
    },
    "memory_free_avg": {
      "value": 94296554.66666667
    },
    "memory_free_max": {
      "value": 107405312
    }
  }
}

count聚合

返回某字段的总个数,比如上面那个例子,如果把聚合改成value_count :

"aggs": {
  "error_log_program": {
    "value_count": {
      "field": "syslog_program.keyword"
    }
  }
}

那我们得到的将是error的总数,也就是上面例子结果中doc_count的sum值29 :

{
...
  "aggregations": {
    "error_log_program": {
      "value": 29
    }
  }
}

top聚合

按照设定的排序(正序or逆序)返回前几个文档。例如返回当天某主机最大swap利用率的两个值,Query DSL语句如下:

GET /metricbeat-2017.02.19/_search
{
	"query": {
		"bool": {
			"filter":{
				"term": {"beat.name":"10.0.0.105-beat"}
			}
		}
	},
	"aggs": {
		"top_age": {
			"top_hits": {
				"sort": [           
					{
						"system.memory.swap.used.pct": {    
							"order": "desc"
						}
					}
				],
				"_source": {
					"includes": [       
						"beat.name",
						"system.memory.swap.used.pct"
					]
				},
				"size": 2             
			}
		}
	}
}

运行后返回的结果如下:

{
  ...
  "aggregations": {
    "top_age": {
      "hits": {
        "total": 2157279,
        "max_score": null,
        "hits": [
          {
            ...
            "_source": {
              ...
              "beat": {
                "name": "10.0.0.105-beat"
              }
            },
            "sort": [
              0.195
            ]
          },
          {
            ...
            "_source": {
              ...
              "beat": {
                "name": "10.0.0.105-beat"
              }
            },
            "sort": [
              0.195
            ]
          }
        ]
      }
    }
   }
}

基于多个关键字的统计计数

多关键字统计计数要用到es的Bucket概念,Bucket分为terms Aggregation,Range Aggregation,Date Range Aggregation,Histogram Aggregation,Date Histogram Aggregation以及Missing Aggregation六种,用法基本类似,经常结合max,min等嵌套使用。

下面介绍最常用的一种Buchet聚合——terms聚合: terms Aggregation返回某字段value的所有种类以及文档数目,类似于SQL的group by,可以用来得到某主机error日志的program的分组以及每种Program下error日志的数量,Query DSL语句如下:

GET /logstash-*/_search
{
	"query": {
		"bool": {
			"filter":[
				{"term": {"host":"10.0.0.105"}},
				{"term": {"syslog_severity": "error"}}
			]
		}
	},
	"aggs": {
		"error_log_program": {
			"terms": {
				"field": "syslog_program.keyword"
			}
		}
	}
}

运行后得到的结果如下:

{
...
  "aggregations": {
    "error_log_program": {
      "doc_count_error_upper_bound": 0,
      "sum_other_doc_count": 0,
      "buckets": [
        {
          "key": "systemd",
          "doc_count": 20
        },
        {
          "key": "snmpd",
          "doc_count": 8
        },
        {
          "key": "systemd-udevd",
          "doc_count": 1
        }
      ]
    }
  }
}

例如我们想知道每个host当天memory_free的峰值,Query DSL语句如下:

GET /metricbeat-2017.02.19/_search
{
	"aggs":{
		"host":{
			"terms":{
				"field":"beat.name"
			},
			"aggs": {
				"memory_free_max": {
					"max": {
						"field": "system.memory.free"
					}
				}
			}
		}
	}
}

运行后我们得到的结果为:

{
  ...
  "aggregations": {
    "host": {
      "doc_count_error_upper_bound": 0,
      "sum_other_doc_count": 0,
      "buckets": [
        {
          "key": "10.0.0.105-beat",
          "doc_count": 2157279,
          "memory_free_max": {
            "value": 107405312
          }
        },
        {
          "key": "10.0.0.112-beat",
          "doc_count": 1923871,
          "memory_free_max": {
            "value": 93758235
          }
        }
      ]
    }
  }
}

其他类型的多关键字聚合同理。

日志数据可视化

kibana5.0支持九种图表类型,分别是Area chart,Data table,Line chart, Markdown widget,Metric,Pie chart,Tile map,Timeseries和Vertical bar chart。

基于时间及统计绘制线图

kibana的Visualize板块是用来制作图表的,其中Area chart和Line chart是用来制作线图的,前者是区块线图,后者是折线图。 For example,制作异常日志数量变化图:

  1. 点击Line chart,选择要统计的索引,logstash-*。
  2. Y坐标选择默认的count,X坐标选择Date Histogram,也就是timestamp。
  3. Split Lines选择Filters,设置异常日志的类别,分别是:
    Filter1: syslog_severity:"error"
    Filter2: syslog_severity:"warning"
    Filter3: syslog_severity:"critical"
    Filter4: syslog_severity:"alert"
    Filter5: syslog_severity:"emergency"
    
  4. 点击运行,并保存图表。

异常日志数量变化图效果如下: ELK2

基于关键字统计绘制柱状图或饼图

Pie chart是用来制作饼图的,Vertical bar chart是用来制作柱状图的。 For example,制作各个主机运行程序在各个异常级别的分布图:

  1. 点击Visualize板块的Pie chart,选择要统计的索引,logstash-*。
  2. Slice Size选择Count的聚合方式。
  3. 由上到下选择各个层级的聚合方式和聚合关键字:
    第一层:选择Terms聚合,host.keyword字段,Order by为Count,Order Size根据主机数量而定。
    第二层:选择Terms聚合,syslog_severity.keyword字段,Order by为Count,Order Size为8,因为syslog_severity有八种。
    第三层:选择Terms聚合,syslog_program.keyword字段,Order by为Count,Order size根据具体情况而定。
    
  4. 点击运行,保存图表。

各个主机运行程序在各个异常级别的分布图效果如下:

ELK3

基于关键字统计绘制表格

Data table是制作表格的,可以用来制作异常日志统计表,统计各时段各主机的异常日志数量,制作步骤如下:

  1. 在Visualize板块下选择Data table,并选择要统计的索引,logstash-*
  2. Metric选择默认的Count,用于设置统计方式为总数,这是表格的最后一列。
  3. 下面开始选择表格前面的各项列Split Rows:
第一列:选择Data Histogram聚合方式,@timestamp字段,Interval为Auto,默认根据具体情况自动划分时段单位。
第二列:选择Terms聚合方式,host.keyword字段,Order By为Count,Order Size根据主机数量而定。
第三列:选择Filters聚合方式,Filter为syslog_severity,内容根据异常日志级别而定,分别是error,warning,critical,alert和emergency。
第四列:选择Terms聚合方式,syslog_facility.keyword字段,Order By为Count,Order Size根据具体情况而定。
第五列:选择Terms聚合方式,syslog_program.keyword字段,Order By为Count,Order Size根据具体情况稳定。
第六列:选择Terms聚合方式,syslog_message.keyword字段,Order By为Count,Order Size最好填写尽可能的最大值,因为日志的message种类太多,最大2147480000。
  1. 点击运行,并保存图表。

异常日志统计表的效果如下:

ELK4

基于关键字的快速查找

如果搜索包含某个关键字的所有文档,可以直接在kibana首页的搜索框中输入该关键字,进行搜索,可以已输入通配符搜索。 如果想要更加复杂的搜索,elasticsearch支持query DSL的结构化语句查询,通过在kibana首页的搜索框中编写query DSL语句,快速查找符合DSL条件的日志,并返回显示在kibana首页中。 Query DSL的例子如下:

{
	"query":{
		"bool": {
			"filter":[
				{
					"terms": {
						"syslog_severity": [
							"warning",
							"error",
							"critical",
							"alert",
							"emergency"
						]
					}
				},
				{
					"wildcard": {
						"syslog_message": "*print*"
					}
				}
			]
		}
	}
}

其中bool支持组合过滤器,分别为must,filter,must_not,should。

  • must:所有分句都必须匹配,并且参与计算匹配分值。
  • filter:所有分句都必须匹配,但与must不同的是不参与计算匹配分值,比must快。
  • must_not:所有分句都必须不匹配。
  • should:至少有一个分句匹配。

分句支持term,terms,prefix,wildcard,regexp。

  • term:检索文档中某个field指定的内容。
  • terms:检索文档中某些field指定的内容,这些field之间的关系为OR。
  • prefix:检索文档中某个field内容符合该前缀的所有文档。
  • wildcard:检索文档中某个field内容符合该通配符的所有文档。
  • regexp:检索文档中某个field内容符合该正则的所有文档。

通过这些组合可以做到很多比较复杂的搜索,也可以增加聚合语句,做到聚合查询。

日志数据监控告警

日志监控告警利用zabbix自身的监控告警功能实现的,具体步骤为:

  1. 在zabbix上配置主机监控项
  2. 选择监控项类型为外部检查
  3. 调用python脚本
  4. 返回监控的结果
  5. 设置触发器
  6. 当监控的结果满足触发器条件时,zabbix首页告警

基线告警

For example,异常日志数目告警,利用python脚本,通过elasticsearch的api,使用query DSL语句,索引每天从零点到当前时间期间,es收集到的所有日志级别大于warning的日志的数量,设置触发器的告警基线,如果python脚本返回的日志数量大于基线,便触发告警。具体python脚本为:

#!/usr/bin/evn python

from elasticsearch import Elasticsearch
import datetime

if __name__=='__main__':

    es = Elasticsearch(hosts="10.0.0.41:9200")

    qbody={
        "query":{
            "bool":{
	            "filter":{
	                "range":{
	                    "@timestamp":{
	                        "gte":"2017-01-01 00:00:00",
	                        "lt":"2017-01-01 23:59:59",
	                        "format":"yyyy-MM-dd HH:mm:ss",
	                        "time_zone":"+08:00"
	                    }
	                }
	            },
	            "must_not":{
	                "match_phrase":{
	                    "syslog_message":""
	                }
                },    
	            "should":[
	                {"term":{"syslog_severity":"warning"}},
	                {"term":{"syslog_severity":"error"}},
	                {"term":{"syslog_severity":"critical"}},
	                {"term":{"syslog_severity":"alert"}},
	                {"term":{"syslog_severity":"emergency"}}
	            ],
	            "minimum_should_match":1
            }
        }
    }
    start_time_str = ''
    start_time_str += str(datetime.date.today()) + ' 00:00:00'
    end_time_str = ''
    end_time_str += str(datetime.date.today()) + ' 23:59:59'

    qbody['query']['bool']['filter']['range']['@timestamp']['gte'] = start_time_str
    qbody['query']['bool']['filter']['range']['@timestamp']['lt'] = end_time_str
    ret = es.search(index="logstash-*",body=qbody,size=0)
    print ret['hits']['total']

连续告警

For example,安全日志异常告警,利用python脚本,通过elasticsearch的api,使用query DSL语句,索引当前时间之前五分钟内,elsaticsearch收集到的关于security类型的日志中出现failure或者failed或者error关键字的日志数量,如果触发器设置的表达式是{XXX.py.last()}>5,则该告警的意义是:当某主机在五分钟之内连续发生大于五次的系统安全事件时触发告警。具体的python脚本为:

#!/usr/bin/evn python

from elasticsearch import Elasticsearch

if __name__=='__main__':

    es = Elasticsearch(hosts=[{"host":"10.0.0.41","port":"9200"}])

    query_body={
		"query": {
		    "bool": {
		        "must": [
			       {
			            "range": {
				            "@timestamp": {
					            "gte": "now-5m",
					            "lt": "now",
					            "time_zone": "+08:00"
			                }
			            }
			        },
			        {
			            "term": {
			                "syslog_facility": "security"
			            }
			        },
		           {
			            "query_string": {
				            "default_field": "syslog_message",
				            "query": "failure OR failed OR error"
			            }
		            }
		        ]
		    }
		}
    }
    ret = es.search(index="logstash-*",body=query_body,size=0)

    print ret['hits']['total']

好像我这篇blog已经被→_→了 次了

Search

    Table of Contents