Rsyslog 是 RHEL6 开始的默认系统 syslog 应用软件(当然,RHEL 自带的版本较低,实际官方稳定版本已经到 v8 了)。官网地址:http://www.rsyslog.com
目前 Rsyslog 本身也支持多种输入输出方式,内部逻辑判断和模板处理。
不同模块插件在 rsyslog 流程中发挥作用的原理,可以阅读:http://www.rsyslog.com/doc/master/configuration/modules/workflow.html
流程中可以使用 mmnormalize 组件来完成数据的切分(相当于 logstash 的 filters/grok 功能)。
rsyslog 从 v7 版本开始带有 omelasticsearch 插件可以直接写入数据到 elasticsearch 集群,配合 mmnormalize 的使用示例见: http://puppetlabs.com/blog/use-rsyslog-and-elasticsearch-powerful-log-aggregation
而 normalize 语法说明见: http://www.liblognorm.com/files/manual/index.html?sampledatabase.htm
类似的还有 mmjsonparse 组件。
使用示例见:http://blog.sematext.com/2013/05/28/structured-logging-with-rsyslog-and-elasticsearch/
虽然 Rsyslog 很早就支持直接输出数据给 elasticsearch,但如果你使用的是 v8.4 以下的版本,我们这里并不推荐这种方式。因为normalize 语法还是比较简单,只支持时间,字符串,数字,ip 地址等几种。在复杂条件下远比不上完整的正则引擎。
那么,怎么使用 rsyslog 作为日志收集和传输组件,来配合 logstash 工作呢?
如果只是简单的 syslog 数据,直接单个 logstash 运行即可,配置方式见本书 2.4 章节。
如果你运行着一个高负荷运行的 rsyslog 系统,每秒传输的数据远大过单个 logstash 能处理的能力,你可以运行多个 logstash 在多个端口,然后让 rsyslog 做轮训转发(事实上,单个 omfwd 本身的转发能力也有限,所以推荐这种做法):
Ruleset( name="forwardRuleSet" ) {
Action ( type="mmsequence" mode="instance" from="0" to="4" var="$.seq" )
if $.seq == "0" then {
action (type="omfwd" Target="127.0.0.1" Port="5140" Protocol="tcp" queue.size="150000" queue.dequeuebatchsize="2000" )
}
if $.seq == "1" then {
action (type="omfwd" Target="127.0.0.1" Port="5141" Protocol="tcp" queue.size="150000" queue.dequeuebatchsize="2000" )
}
if $.seq == "2" then {
action (type="omfwd" Target="127.0.0.1" Port="5142" Protocol="tcp" queue.size="150000" queue.dequeuebatchsize="2000" )
}
if $.seq == "3" then {
action (type="omfwd" Target="127.0.0.1" Port="5143" Protocol="tcp" queue.size="150000" queue.dequeuebatchsize="2000" )
}
}
如果你使用的是最新的 v8.4 版 rsyslog,其中有一个新加入的 mmexternal 模块。该模块是在 v7 的 omprog 模块基础上发展出来的,可以让你使用任意脚本,接收标准输入,自行处理以后再输出回来,而 rsyslog 接收到这个输出再进行下一步处理,这就解决了前面提到的 “normalize 语法太简单”的问题!
下面是使用 rsyslog 的 mmexternal 和 omelasticsearch 完成 Nginx 访问日志直接解析存储的配置。
rsyslog 配置如下:
module(load="imuxsock" SysSock.RateLimit.Interval="0")
module(load="mmexternal")
module(load="omelasticsearch")
template(name="logstash-index" type="list") {
constant(value="logstash-")
property(name="timereported" dateFormat="rfc3339" position.from="1" position.to="4")
constant(value=".")
property(name="timereported" dateFormat="rfc3339" position.from="6" position.to="7")
constant(value=".")
property(name="timereported" dateFormat="rfc3339" position.from="9" position.to="10")
}
template( name="nginx-log" type="string" string="%msg%\n" )
if ( $syslogfacility-text == 'local6' and $programname startswith 'wb-www-access-' and not ($msg contains '/2/remind/unread_count' or $msg contains '/2/remind/group_unread') ) then
{
action( type="mmexternal" binary="/usr/local/bin/rsyslog-nginx-elasticsearch.py" interface.input="fulljson" )
action( type="omelasticsearch"
template="nginx-log"
server="eshost.example.com"
searchIndex="logstash-index"
searchType="nginxaccess"
asyncrepl="on"
bulkmode="on"
queue.type="linkedlist"
queue.size="10000"
queue.dequeuebatchsize="2000"
dynSearchIndex="on"
)
stop
}
其中调用的 python 脚本示例如下:
#! /usr/bin/python
import sys
import json
import datetime
def nginxLog(data):
hostname = data['hostname']
logline = data['msg']
time_local, http_x_up_calling_line_id, request, http_user_agent, staTus, remote_addr, http_x_log_uid, http_referer, request_time, body_bytes_sent, http_x_forwarded_proto, http_x_forwarded_for, request_uid, http_host, http_cookie, upstream_response_time = logline.split('`')
try:
upstream_response_time = float(upstream_response_time)
except:
upstream_response_time = None
method, uri, verb = request.split(' ')
arg = {}
try:
url_path, url_args = uri.split('?')
for args in url_args.split('&'):
k, v = args.split('=')
arg[k] = v
except:
url_path = uri
# Why %z do not implement?
ret = {
"@timestamp": datetime.datetime.strptime(time_local, ' [%d/%b/%Y:%H:%M:%S +0800]').strftime('%FT%T+0800'),
"host": hostname,
"method": method.lstrip('"'),
"url_path": url_path,
"url_args": arg,
"verb": verb.rstrip('"'),
"http_x_up_calling_line_id": http_x_up_calling_line_id,
"http_user_agent": http_user_agent,
"status": int(staTus),
"remote_addr": remote_addr.strip('[]'),
"http_x_log_uid": http_x_log_uid,
"http_referer": http_referer,
"request_time": float(request_time),
"body_bytes_sent": int(body_bytes_sent),
"http_x_forwarded_proto": http_x_forwarded_proto,
"http_x_forwarded_for": http_x_forwarded_for,
"request_uid": request_uid,
"http_host": http_host,
"http_cookie": http_cookie,
"upstream_response_time": upstream_response_time
}
return ret
def onInit():
""" Do everything that is needed to initialize processing
"""
def onReceive(msg):
data = json.loads(msg)
ret = nginxLog(data)
print json.dumps({'msg': ret})
def onExit():
""" Do everything that is needed to finish processing. This is being called immediately before exiting.
"""
# most often, nothing to do here
onInit()
keepRunning = 1
while keepRunning == 1:
msg = sys.stdin.readline()
if msg:
msg = msg[:len(msg)-1]
onReceive(msg)
sys.stdout.flush()
else:
keepRunning = 0
onExit()
sys.stdout.flush()
注意输出的时候,顶层的 key 是不能变的,msg 还得叫 msg,如果是 hostname 还得叫 hostname ,等等。否则,rsyslog 会当做处理无效,直接传递原有数据内容给下一步。
慎用提示
从实际运行效果看,rsyslog 对 mmexternal 的程序没有最大并发数限制!所以如果你发送的数据量较大的事情,rsyslog 并不会像普通的转发模式那样缓冲在磁盘队列上,而是持续 fork 出新的 mmexternal 程序,几千个进程后,你的服务器就挂了!!