要使用Logstash收集Nginx默认格式的日志并按天写入MySQL,请按照以下步骤操作:
1. 安装Logstash JDBC插件
确保已安装logstash-output-jdbc
插件:
复制bin/logstash-plugin install logstash-output-jdbc
2. 准备MySQL JDBC驱动
下载MySQL Connector/J驱动(如mysql-connector-java-8.0.x.jar
),并将其放入Logstash的插件目录(如logstash/vendor/jar/jdbc/
)。
3. 创建MySQL表结构
在MySQL中创建日志数据库和按天分表的存储结构。例如:
复制CREATE DATABASE IF NOT EXISTS nginx_logs; -- 手动创建表(或通过事件自动创建) CREATE TABLE IF NOT EXISTS access_logs_20240517 ( id INT AUTO_INCREMENT PRIMARY KEY, client_ip VARCHAR(45), remote_user VARCHAR(255), timestamp DATETIME, request VARCHAR(1024), status INT, bytes_sent INT, referrer VARCHAR(1024), user_agent VARCHAR(1024) );
4. 配置Logstash
创建Logstash配置文件(如nginx-to-mysql.conf
),内容如下:
输入配置
复制input { file { path => "/var/log/nginx/access.log" # Nginx日志路径 start_position => "beginning" # 从文件开头读取 sincedb_path => "/dev/null" # 测试时忽略sincedb(生产环境请移除) } }
过滤配置
复制filter { grok { match => { "message" => "%{COMBINEDAPACHELOG}" } # 使用内置模式解析Nginx日志 remove_field => ["message"] # 可选:删除原始日志 } date { match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ] # 解析日志时间 target => "@timestamp" # 替换事件时间戳 timezone => "Asia/Shanghai" # 按需调整时区 } }
输出配置
复制output { jdbc { driver_class => "com.mysql.cj.jdbc.Driver" # JDBC驱动类 connection_string => "jdbc:mysql://localhost:3306/nginx_logs?useSSL=false&serverTimezone=Asia/Shanghai" username => "logstash_user" # 数据库用户名 password => "secure_password" # 数据库密码 statement => [ "INSERT INTO access_logs_%{+YYYYMMdd} (client_ip, remote_user, timestamp, request, status, bytes_sent, referrer, user_agent) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", "%{clientip}", # 客户端IP "%{auth}", # 远程用户(对应Nginx的$remote_user) "%{@timestamp}", # 解析后的时间戳 "%{request}", # 请求路径(如 "GET /index.html HTTP/1.1") "%{response}", # HTTP状态码(如200、404) "%{bytes}", # 发送的字节数 "%{referrer}", # 来源页面 "%{agent}" # 用户代理 ] } }
5. 运行Logstash
启动Logstash并指定配置文件:
复制bin/logstash -f nginx-to-mysql.conf
6. 自动创建MySQL表(可选)
通过MySQL事件自动创建每日表:
复制DELIMITER $$ CREATE EVENT IF NOT EXISTS create_daily_access_log_table ON SCHEDULE EVERY 1 DAY STARTS CURRENT_DATE + INTERVAL 1 DAY DO BEGIN SET @table_name = CONCAT('access_logs_', DATE_FORMAT(NOW(), '%Y%m%d')); SET @sql = CONCAT('CREATE TABLE IF NOT EXISTS ', @table_name, ' ( id INT AUTO_INCREMENT PRIMARY KEY, client_ip VARCHAR(45), remote_user VARCHAR(255), timestamp DATETIME, request VARCHAR(1024), status INT, bytes_sent INT, referrer VARCHAR(1024), user_agent VARCHAR(1024) )'); PREPARE stmt FROM @sql; EXECUTE stmt; DEALLOCATE PREPARE stmt; END $$ DELIMITER ;
注意事项
- 时区问题:确保Logstash的
date
插件和MySQL连接的时区配置一致。 - 字段映射:验证grok解析后的字段名称是否与SQL语句中的占位符匹配。
- 性能优化:在高流量场景下,调整
jdbc
插件的flush_size
和idle_flush_time
参数以提高性能。 - 错误处理:添加
stdout { codec => rubydebug }
输出以调试解析问题。
通过以上步骤,Logstash将每天自动将Nginx日志写入以日期后缀命名的MySQL表中。