分析Nginx日志并屏蔽高频访问IP

日志分析脚本

创建python脚本 /data/sh/block_ips.py 并设置可执行


#!/usr/bin/env python3
import re
import os
import time
import json
import argparse
from collections import Counter
from datetime import datetime, timedelta

# 配置参数
LOG_FILE = '/data/bt/wwwlogs/www.xxxxx.com.log'  # 日志文件路径
BLOCK_FILE = '/data/bt/block_ips/block_ips.conf'  # 封禁IP配置文件
TEMP_BLOCK_FILE = '/data/bt/block_ips/block_ips.conf.tmp'  # 临时封禁文件
WHITELIST_FILE = '/data/bt/block_ips/whitelist.json'  # 白名单文件
LOG_DATE_FORMAT = '%d/%b/%Y'  # 日志中的日期格式
THRESHOLD = 5000  # 封禁阈值(请求次数)
BLOCK_DURATION = 864000  # 封禁时长(秒)
ANALYZE_DAYS = 1  # 分析最近多少天的日志
NGINX_RELOAD_CMD = 'systemctl reload nginx'  # 重载Nginx命令

def load_whitelist():
    """加载IP白名单"""
    try:
        if os.path.exists(WHITELIST_FILE):
            with open(WHITELIST_FILE, 'r') as f:
                return json.load(f)
        return []
    except Exception as e:
        print(f"加载白名单失败: {e}")
        return []

def analyze_logs():
    """分析日志文件,找出需要封禁的IP"""
    print(f"开始分析日志文件: {LOG_FILE}")
    ip_counter = Counter()
    whitelist = load_whitelist()

    try:
        # 计算需要分析的日期范围
        today = datetime.now()
        date_range = [(today - timedelta(days=i)).strftime(LOG_DATE_FORMAT) 
                      for i in range(ANALYZE_DAYS)]

        with open(LOG_FILE, 'r') as f:
            line_count = 0
            for line in f:
                line_count += 1
                if line_count % 100000 == 0:
                    print(f"已处理 {line_count} 行日志")

                # 提取IP地址(假设格式为日志的第一个字段)
                match = re.search(r'^(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})', line)
                if not match:
                    continue

                ip = match.group(1)

                # 跳过白名单IP
                if ip in whitelist:
                    continue

                # 检查日志时间(假设格式为 [day/month/year:hour:minute:second zone])
                date_match = re.search(r'\[(\d{2}/\w+/\d{4}):', line)
                if date_match and date_match.group(1) in date_range:
                    ip_counter[ip] += 1

    except Exception as e:
        print(f"分析日志时出错: {e}")
        return []

    # 找出超过阈值的IP
    blocked_ips = {ip: count for ip, count in ip_counter.items() if count > THRESHOLD}
    print(f"分析完成,共找到 {len(blocked_ips)} 个需要封禁的IP")

    return blocked_ips

def load_existing_blocks():
    """加载现有的封禁列表和时间"""
    existing_blocks = {}
    if os.path.exists(BLOCK_FILE):
        try:
            with open(BLOCK_FILE, 'r') as f:
                for line in f:
                    # 匹配格式: # 封禁IP: 1.2.3.4, 封禁时间: 1620000000, 解封时间: 1620864000
                    match = re.search(r'# 封禁IP: (\d+\.\d+\.\d+\.\d+), 封禁时间: (\d+), 解封时间: (\d+)', line)
                    if match:
                        ip = match.group(1)
                        block_time = int(match.group(2))
                        unblock_time = int(match.group(3))

                        # 只保留未过期的封禁
                        if unblock_time > time.time():
                            existing_blocks[ip] = {
                                'block_time': block_time,
                                'unblock_time': unblock_time,
                                'is_new': False  # 标记为现有封禁,不是新添加的
                            }
        except Exception as e:
            print(f"加载现有封禁列表时出错: {e}")

    return existing_blocks

def generate_block_file(new_blocks, existing_blocks):
    """生成Nginx封禁IP配置文件"""
    try:
        current_time = int(time.time())

        # 合并新封禁和现有封禁
        all_blocks = existing_blocks.copy()
        for ip, count in new_blocks.items():
            if ip not in all_blocks:
                all_blocks[ip] = {
                    'block_time': current_time,
                    'unblock_time': current_time + BLOCK_DURATION,
                    'is_new': True  # 标记为新封禁
                }

        print(f"总共有 {len(all_blocks)} 个IP被封禁(包括现有封禁)")

        # 生成临时封禁文件
        with open(TEMP_BLOCK_FILE, 'w') as f:
            f.write("# 自动生成的IP封禁列表\n")
            f.write(f"# 生成时间: {time.strftime('%Y-%m-%d %H:%M:%S')}\n")
            f.write("# 格式: # 封禁IP: IP地址, 封禁时间: 时间戳, 解封时间: 时间戳\n")
            f.write("# deny IP地址;\n\n")

            # 写入封禁IP和时间信息
            for ip, info in all_blocks.items():
                f.write(f"# 封禁IP: {ip}, 封禁时间: {info['block_time']}, 解封时间: {info['unblock_time']}\n")
                f.write(f"deny {ip};\n\n")

        # 替换正式封禁文件
        os.replace(TEMP_BLOCK_FILE, BLOCK_FILE)
        print(f"封禁文件已更新: {BLOCK_FILE}")

        # 返回新添加的封禁IP列表
        new_ips = [ip for ip, info in all_blocks.items() if info['is_new']]
        return new_ips

    except Exception as e:
        print(f"生成封禁文件时出错: {e}")
        return []

def reload_nginx():
    """重新加载Nginx配置"""
    try:
        # 检查配置文件语法
        check_cmd = "nginx -t"
        check_result = os.system(check_cmd)

        if check_result != 0:
            print("Nginx配置检查失败,不会重新加载")
            return False

        # 重新加载配置
        reload_result = os.system(NGINX_RELOAD_CMD)

        if reload_result != 0:
            print("重新加载Nginx失败")
            return False

        print("Nginx配置已成功重新加载")
        return True
    except Exception as e:
        print(f"重新加载Nginx时出错: {e}")
        return False

def main():
    # 解析命令行参数
    parser = argparse.ArgumentParser(description='Nginx IP 封禁自动化工具')
    parser.add_argument('--threshold', type=int, default=THRESHOLD, 
                        help='封禁阈值(请求次数)')
    parser.add_argument('--duration', type=int, default=BLOCK_DURATION, 
                        help='封禁时长(秒)')
    parser.add_argument('--days', type=int, default=ANALYZE_DAYS, 
                        help='分析最近多少天的日志')
    parser.add_argument('--test', action='store_true', 
                        help='测试模式,不实际修改配置文件')
    args = parser.parse_args()

    # 更新全局配置
    # global THRESHOLD, BLOCK_DURATION, ANALYZE_DAYS
    # THRESHOLD = args.threshold
    # BLOCK_DURATION = args.duration
    # ANALYZE_DAYS = args.days

    print(f"开始执行IP封禁分析,阈值: {THRESHOLD},封禁时长: {BLOCK_DURATION}秒")

    # 分析日志
    new_blocks = analyze_logs()

    if not new_blocks:
        print("没有找到需要封禁的IP,退出")
        return

    # 加载现有封禁
    existing_blocks = load_existing_blocks()

    if args.test:
        print("测试模式: 以下IP将被封禁")
        for ip, count in new_blocks.items():
            print(f"  {ip}: {count} 次请求")
        print("测试模式,不会修改配置文件")
        return

    # 生成封禁文件
    new_ips = generate_block_file(new_blocks, existing_blocks)

    if new_ips:
        print(f"新封禁了 {len(new_ips)} 个IP")
        # 重新加载Nginx
        reload_nginx()
    else:
        print("没有新增封禁IP")

if __name__ == "__main__":
    main()

白名单文件

创建白名单文件 /data/bt/block_ips/whitelist.json

[
    "127.0.0.1",
    "192.168.1.0/24",  # 示例:允许整个网段
    "203.0.113.10"     # 示例:允许单个IP
]

配置 Nginx

# 在 http 段或 server 段中添加
include /etc/nginx/block_ips.conf;

设置定时任务

crontab -e
30 23 * * * /data/sh/block_ips.py > /var/log/block_ips.log 2>&1

此处评论已关闭