分析Nginx日志并屏蔽高频访问IP
日志分析脚本
创建python脚本 /data/sh/block_ips.py
并设置可执行
#!/usr/bin/env python3
import re
import os
import time
import json
import argparse
from collections import Counter
from datetime import datetime, timedelta
# 配置参数
LOG_FILE = '/data/bt/wwwlogs/www.xxxxx.com.log' # 日志文件路径
BLOCK_FILE = '/data/bt/block_ips/block_ips.conf' # 封禁IP配置文件
TEMP_BLOCK_FILE = '/data/bt/block_ips/block_ips.conf.tmp' # 临时封禁文件
WHITELIST_FILE = '/data/bt/block_ips/whitelist.json' # 白名单文件
LOG_DATE_FORMAT = '%d/%b/%Y' # 日志中的日期格式
THRESHOLD = 5000 # 封禁阈值(请求次数)
BLOCK_DURATION = 864000 # 封禁时长(秒)
ANALYZE_DAYS = 1 # 分析最近多少天的日志
NGINX_RELOAD_CMD = 'systemctl reload nginx' # 重载Nginx命令
def load_whitelist():
"""加载IP白名单"""
try:
if os.path.exists(WHITELIST_FILE):
with open(WHITELIST_FILE, 'r') as f:
return json.load(f)
return []
except Exception as e:
print(f"加载白名单失败: {e}")
return []
def analyze_logs():
"""分析日志文件,找出需要封禁的IP"""
print(f"开始分析日志文件: {LOG_FILE}")
ip_counter = Counter()
whitelist = load_whitelist()
try:
# 计算需要分析的日期范围
today = datetime.now()
date_range = [(today - timedelta(days=i)).strftime(LOG_DATE_FORMAT)
for i in range(ANALYZE_DAYS)]
with open(LOG_FILE, 'r') as f:
line_count = 0
for line in f:
line_count += 1
if line_count % 100000 == 0:
print(f"已处理 {line_count} 行日志")
# 提取IP地址(假设格式为日志的第一个字段)
match = re.search(r'^(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})', line)
if not match:
continue
ip = match.group(1)
# 跳过白名单IP
if ip in whitelist:
continue
# 检查日志时间(假设格式为 [day/month/year:hour:minute:second zone])
date_match = re.search(r'\[(\d{2}/\w+/\d{4}):', line)
if date_match and date_match.group(1) in date_range:
ip_counter[ip] += 1
except Exception as e:
print(f"分析日志时出错: {e}")
return []
# 找出超过阈值的IP
blocked_ips = {ip: count for ip, count in ip_counter.items() if count > THRESHOLD}
print(f"分析完成,共找到 {len(blocked_ips)} 个需要封禁的IP")
return blocked_ips
def load_existing_blocks():
"""加载现有的封禁列表和时间"""
existing_blocks = {}
if os.path.exists(BLOCK_FILE):
try:
with open(BLOCK_FILE, 'r') as f:
for line in f:
# 匹配格式: # 封禁IP: 1.2.3.4, 封禁时间: 1620000000, 解封时间: 1620864000
match = re.search(r'# 封禁IP: (\d+\.\d+\.\d+\.\d+), 封禁时间: (\d+), 解封时间: (\d+)', line)
if match:
ip = match.group(1)
block_time = int(match.group(2))
unblock_time = int(match.group(3))
# 只保留未过期的封禁
if unblock_time > time.time():
existing_blocks[ip] = {
'block_time': block_time,
'unblock_time': unblock_time,
'is_new': False # 标记为现有封禁,不是新添加的
}
except Exception as e:
print(f"加载现有封禁列表时出错: {e}")
return existing_blocks
def generate_block_file(new_blocks, existing_blocks):
"""生成Nginx封禁IP配置文件"""
try:
current_time = int(time.time())
# 合并新封禁和现有封禁
all_blocks = existing_blocks.copy()
for ip, count in new_blocks.items():
if ip not in all_blocks:
all_blocks[ip] = {
'block_time': current_time,
'unblock_time': current_time + BLOCK_DURATION,
'is_new': True # 标记为新封禁
}
print(f"总共有 {len(all_blocks)} 个IP被封禁(包括现有封禁)")
# 生成临时封禁文件
with open(TEMP_BLOCK_FILE, 'w') as f:
f.write("# 自动生成的IP封禁列表\n")
f.write(f"# 生成时间: {time.strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write("# 格式: # 封禁IP: IP地址, 封禁时间: 时间戳, 解封时间: 时间戳\n")
f.write("# deny IP地址;\n\n")
# 写入封禁IP和时间信息
for ip, info in all_blocks.items():
f.write(f"# 封禁IP: {ip}, 封禁时间: {info['block_time']}, 解封时间: {info['unblock_time']}\n")
f.write(f"deny {ip};\n\n")
# 替换正式封禁文件
os.replace(TEMP_BLOCK_FILE, BLOCK_FILE)
print(f"封禁文件已更新: {BLOCK_FILE}")
# 返回新添加的封禁IP列表
new_ips = [ip for ip, info in all_blocks.items() if info['is_new']]
return new_ips
except Exception as e:
print(f"生成封禁文件时出错: {e}")
return []
def reload_nginx():
"""重新加载Nginx配置"""
try:
# 检查配置文件语法
check_cmd = "nginx -t"
check_result = os.system(check_cmd)
if check_result != 0:
print("Nginx配置检查失败,不会重新加载")
return False
# 重新加载配置
reload_result = os.system(NGINX_RELOAD_CMD)
if reload_result != 0:
print("重新加载Nginx失败")
return False
print("Nginx配置已成功重新加载")
return True
except Exception as e:
print(f"重新加载Nginx时出错: {e}")
return False
def main():
# 解析命令行参数
parser = argparse.ArgumentParser(description='Nginx IP 封禁自动化工具')
parser.add_argument('--threshold', type=int, default=THRESHOLD,
help='封禁阈值(请求次数)')
parser.add_argument('--duration', type=int, default=BLOCK_DURATION,
help='封禁时长(秒)')
parser.add_argument('--days', type=int, default=ANALYZE_DAYS,
help='分析最近多少天的日志')
parser.add_argument('--test', action='store_true',
help='测试模式,不实际修改配置文件')
args = parser.parse_args()
# 更新全局配置
# global THRESHOLD, BLOCK_DURATION, ANALYZE_DAYS
# THRESHOLD = args.threshold
# BLOCK_DURATION = args.duration
# ANALYZE_DAYS = args.days
print(f"开始执行IP封禁分析,阈值: {THRESHOLD},封禁时长: {BLOCK_DURATION}秒")
# 分析日志
new_blocks = analyze_logs()
if not new_blocks:
print("没有找到需要封禁的IP,退出")
return
# 加载现有封禁
existing_blocks = load_existing_blocks()
if args.test:
print("测试模式: 以下IP将被封禁")
for ip, count in new_blocks.items():
print(f" {ip}: {count} 次请求")
print("测试模式,不会修改配置文件")
return
# 生成封禁文件
new_ips = generate_block_file(new_blocks, existing_blocks)
if new_ips:
print(f"新封禁了 {len(new_ips)} 个IP")
# 重新加载Nginx
reload_nginx()
else:
print("没有新增封禁IP")
if __name__ == "__main__":
main()
白名单文件
创建白名单文件 /data/bt/block_ips/whitelist.json
[
"127.0.0.1",
"192.168.1.0/24", # 示例:允许整个网段
"203.0.113.10" # 示例:允许单个IP
]
配置 Nginx
# 在 http 段或 server 段中添加
include /etc/nginx/block_ips.conf;
设置定时任务
crontab -e
30 23 * * * /data/sh/block_ips.py > /var/log/block_ips.log 2>&1
在防火墙中禁用ip
文件名 iptables_ipset_denyip.py
在iptables
防火墙中管理禁用ip功能, 这里我们使用了ipset
来管理防火墙ip集合.
命令 python3 /data/sh/iptables_ipset_denyip.py
#!/usr/bin/env python3
import re
import os
import time
import json
import argparse
import subprocess
from collections import Counter
from datetime import datetime, timedelta
# 脚本说明:
# 1.脚本功能: linux防火墙中批量拒绝高频访问ip/正则匹配指定日志字符串
# 2.需要开启防火墙iptables, 或者在宝塔面板中开启防火墙
# 3.这里默认使用宝塔面板的ipset集合名称in_bt_user_drop_ipset
# 配置参数
LOG_FILE = '/data/bt/wwwlogs/www.jobshaigui.com.log' # 日志文件路径
WHITELIST_FILE = '/data/bt/block_ips/whitelist.json' # 白名单文件
LOG_DATE_FORMAT = '%d/%b/%Y' # 日志中的日期格式
THRESHOLD = 5000 # 普通IP封禁阈值(请求次数)
REGEX_THRESHOLD = 100 # 正则匹配IP的封禁阈值(请求次数)
BLOCK_DURATION = 86400 # 封禁时长(秒,默认为10天)
ANALYZE_DAYS = 1 # 分析最近多少天的日志(1当天)
IPSET_NAME = 'in_bt_user_drop_ipset' # ipset集合名称
IPTABLES_CHAIN = 'INPUT' # iptables链名称
# 正则表达式列表,用于匹配特殊请求模式
REGEX_PATTERNS = [
#r'claudebot',
#r'bingbot',
]
def log_message(message):
"""记录日志"""
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
log_entry = f"[{timestamp}] {message}"
print(log_entry)
def load_whitelist():
"""加载IP白名单"""
try:
if os.path.exists(WHITELIST_FILE):
with open(WHITELIST_FILE, 'r') as f:
return json.load(f)
return []
except Exception as e:
log_message(f"加载白名单失败: {e}")
return []
def analyze_logs():
"""分析日志文件,找出需要封禁的IP"""
log_message(f"开始分析日志文件: {LOG_FILE}")
ip_counter = Counter() # 普通IP计数器
regex_ip_counter = Counter() # 正则匹配IP计数器
whitelist = load_whitelist()
try:
# 编译正则表达式
compiled_regexes = [re.compile(pattern, re.IGNORECASE) for pattern in REGEX_PATTERNS]
# 计算需要分析的日期范围
today = datetime.now()
date_range = [(today - timedelta(days=i)).strftime(LOG_DATE_FORMAT)
for i in range(ANALYZE_DAYS)]
with open(LOG_FILE, 'r') as f:
line_count = 0
for line in f:
line_count += 1
if line_count % 100000 == 0:
log_message(f"已处理 {line_count} 行日志")
# 提取IP地址(假设格式为日志的第一个字段)
match = re.search(r'^(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})', line)
if not match:
continue
ip = match.group(1)
# 跳过白名单IP
if ip in whitelist:
continue
# 检查日志时间(假设格式为 [day/month/year:hour:minute:second zone])
date_match = re.search(r'\[(\d{2}/\w+/\d{4}):', line)
if not date_match or date_match.group(1) not in date_range:
continue
# 普通计数
ip_counter[ip] += 1
# 检查是否匹配任何正则表达式
for regex in compiled_regexes:
if regex.search(line):
regex_ip_counter[ip] += 1
break # 只要匹配一个正则就计数,不重复计数
except Exception as e:
log_message(f"分析日志时出错: {e}")
return [], []
# 找出超过阈值的IP
normal_blocked_ips = {ip: count for ip, count in ip_counter.items() if count > THRESHOLD}
regex_blocked_ips = {ip: count for ip, count in regex_ip_counter.items() if count > REGEX_THRESHOLD}
log_message(f"分析完成,普通IP找到 {len(normal_blocked_ips)} 个需要封禁的IP")
log_message(f"分析完成,正则匹配IP找到 {len(regex_blocked_ips)} 个需要封禁的IP")
return normal_blocked_ips, regex_blocked_ips
def create_ipset_if_not_exists():
"""创建ipset集合(如果不存在)"""
try:
# 检查集合是否存在
check_cmd = ['sudo', 'ipset', 'list', IPSET_NAME]
check_result = subprocess.run(check_cmd, capture_output=True, text=True)
if check_result.returncode != 0:
# 集合不存在,创建新集合
create_cmd = ['sudo', 'ipset', 'create', IPSET_NAME, 'hash:ip', 'timeout', str(BLOCK_DURATION)]
create_result = subprocess.run(create_cmd, capture_output=True, text=True)
if create_result.returncode != 0:
log_message(f"创建ipset集合失败: {create_result.stderr}")
return False
# 添加ipset到iptables规则
iptables_cmd = ['sudo', 'iptables', '-I', IPTABLES_CHAIN, '-m', 'set', '--match-set', IPSET_NAME, 'src', '-j', 'DROP']
iptables_result = subprocess.run(iptables_cmd, capture_output=True, text=True)
if iptables_result.returncode != 0:
log_message(f"添加ipset到iptables规则失败: {iptables_result.stderr}")
return False
log_message(f"成功创建ipset集合: {IPSET_NAME}")
return True
except Exception as e:
log_message(f"创建ipset集合时出错: {e}")
return False
def get_currently_blocked_ips():
"""获取当前被封禁的IP列表"""
try:
result = subprocess.run(['sudo', 'ipset', 'list', IPSET_NAME],
capture_output=True, text=True)
if result.returncode != 0:
log_message(f"获取ipset列表失败: {result.stderr}")
return set()
lines = result.stdout.split('\n')
blocked_ips = set()
in_members_section = False
for line in lines:
if line.strip() == "Members:":
in_members_section = True
continue
if in_members_section and line.strip():
# 匹配IP地址(忽略超时信息)
match = re.search(r'(\d+\.\d+\.\d+\.\d+)', line)
if match:
blocked_ips.add(match.group(1))
log_message(f"当前ipset集合中有 {len(blocked_ips)} 个被封禁的IP")
return blocked_ips
except Exception as e:
log_message(f"获取当前封禁IP列表时出错: {e}")
return set()
def block_ip_with_ipset(ip, is_regex=False):
"""使用ipset封禁指定IP"""
try:
# 检查IP是否已被封禁
currently_blocked = get_currently_blocked_ips()
if ip in currently_blocked:
log_message(f"IP {ip} 已被封禁,跳过")
return True
# 添加IP到ipset集合,设置超时时间
reason = "正则匹配" if is_regex else "高频访问"
cmd = ['sudo', 'ipset', 'add', IPSET_NAME, ip, 'timeout', str(BLOCK_DURATION)]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
log_message(f"封禁IP {ip} 失败: {result.stderr}")
return False
log_message(f"成功封禁IP: {ip} (原因: {reason}, 超时: {BLOCK_DURATION}秒)")
return True
except Exception as e:
log_message(f"封禁IP {ip} 时出错: {e}")
return False
def main():
log_message(f"开始执行IP封禁分析,普通IP阈值: {THRESHOLD},正则匹配IP阈值: {REGEX_THRESHOLD}")
log_message(f"封禁时长: {BLOCK_DURATION}秒")
# 创建ipset集合(如果不存在)
if not create_ipset_if_not_exists():
log_message("创建ipset集合失败,退出")
return
# 分析日志
normal_blocks, regex_blocks = analyze_logs()
if not normal_blocks and not regex_blocks:
log_message("没有找到需要封禁的IP,退出")
return
# 获取当前已封禁的IP
currently_blocked = get_currently_blocked_ips()
# 计算需要新封禁的IP
new_normal_ips = [ip for ip in normal_blocks.keys() if ip not in currently_blocked]
new_regex_ips = [ip for ip in regex_blocks.keys() if ip not in currently_blocked]
# 合并所有需要封禁的IP
all_new_ips = new_normal_ips + new_regex_ips
if not all_new_ips:
log_message("没有新的IP需要封禁")
return
log_message(f"准备封禁 {len(all_new_ips)} 个新IP")
# 封禁新IP,优先处理正则匹配的IP(更危险)
for ip in new_regex_ips:
block_ip_with_ipset(ip, is_regex=True)
for ip in new_normal_ips:
block_ip_with_ipset(ip, is_regex=False)
log_message(f"已完成封禁操作,其中正则匹配IP: {len(new_regex_ips)} 个,普通高频IP: {len(new_normal_ips)} 个")
if __name__ == "__main__":
main()
手动管理防火墙中禁用ip
命令 python3 /data/sh/iptables_ipset_manage.py 1.1.1.1,2.2.2.2,3.3.3.0/24 add/del
#!/usr/bin/env python3
import re
import os
import sys
import argparse
import subprocess
from datetime import datetime
import ipaddress # 新增模块用于处理CIDR表示法
# 配置参数
IPSET_NAME = 'in_bt_user_drop_ipset' # ipset集合名称
DEFAULT_ACTION = 'add' # 默认操作
BLOCK_DURATION = 864000 # 默认封禁时间(秒),即10天
IPTABLES_CHAIN = 'INPUT' # iptables链名称
def log_message(message):
"""记录日志"""
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
log_entry = f"[{timestamp}] {message}"
print(log_entry)
def validate_ip_or_cidr(ip_str):
"""验证IP地址或CIDR格式"""
try:
# 尝试解析为IP地址
ipaddress.ip_address(ip_str)
return True
except ValueError:
try:
# 尝试解析为CIDR网络
ipaddress.ip_network(ip_str, strict=False)
return True
except ValueError:
return False
def validate_ip_list(ip_list):
"""验证IP列表格式"""
invalid_ips = [ip for ip in ip_list if not validate_ip_or_cidr(ip)]
if invalid_ips:
log_message(f"以下IP/CIDR格式无效: {', '.join(invalid_ips)}")
return False
return True
def get_currently_blocked_ips():
"""获取当前被封禁的IP/CIDR列表"""
try:
result = subprocess.run(['sudo', 'ipset', 'list', IPSET_NAME],
capture_output=True, text=True)
if result.returncode != 0:
log_message(f"获取ipset列表失败: {result.stderr}")
return set()
lines = result.stdout.split('\n')
blocked_ips = set()
in_members_section = False
for line in lines:
if line.strip() == "Members:":
in_members_section = True
continue
if in_members_section and line.strip():
# 匹配IP或CIDR(忽略超时信息)
match = re.search(r'(\d+\.\d+\.\d+\.\d+(?:/\d+)?)(?: timeout \d+)?', line)
if match:
blocked_ips.add(match.group(1))
log_message(f"当前ipset集合中有 {len(blocked_ips)} 个被封禁的IP/CIDR")
return blocked_ips
except Exception as e:
log_message(f"获取当前封禁IP/CIDR列表时出错: {e}")
return set()
def create_ipset_if_not_exists():
"""创建ipset集合(如果不存在)"""
try:
# 检查集合是否存在
check_cmd = ['sudo', 'ipset', 'list', IPSET_NAME]
check_result = subprocess.run(check_cmd, capture_output=True, text=True)
if check_result.returncode != 0:
# 集合不存在,创建新集合
create_cmd = ['sudo', 'ipset', 'create', IPSET_NAME, 'hash:net', 'timeout', str(BLOCK_DURATION)]
create_result = subprocess.run(create_cmd, capture_output=True, text=True)
if create_result.returncode != 0:
log_message(f"创建ipset集合失败: {create_result.stderr}")
return False
# 添加ipset到iptables规则
iptables_cmd = ['sudo', 'iptables', '-I', IPTABLES_CHAIN, '-m', 'set', '--match-set', IPSET_NAME, 'src', '-j', 'DROP']
iptables_result = subprocess.run(iptables_cmd, capture_output=True, text=True)
if iptables_result.returncode != 0:
log_message(f"添加ipset到iptables规则失败: {iptables_result.stderr}")
return False
log_message(f"成功创建ipset集合: {IPSET_NAME}")
return True
except Exception as e:
log_message(f"创建ipset集合时出错: {e}")
return False
def add_ip_to_ipset(ip, timeout=BLOCK_DURATION):
"""向ipset集合中添加指定IP或CIDR,并设置超时时间"""
try:
# 检查IP/CIDR是否已被封禁
currently_blocked = get_currently_blocked_ips()
if ip in currently_blocked:
log_message(f"IP/CIDR {ip} 已被封禁,跳过")
return True
# 确保ipset集合存在
if not create_ipset_if_not_exists():
return False
# 向ipset集合中添加IP/CIDR并设置超时
cmd = ['sudo', 'ipset', 'add', IPSET_NAME, ip, 'timeout', str(timeout)]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
log_message(f"添加IP/CIDR {ip} 失败: {result.stderr}")
return False
log_message(f"成功添加封禁IP/CIDR: {ip},超时时间: {timeout}秒")
return True
except Exception as e:
log_message(f"添加IP/CIDR {ip} 时出错: {e}")
return False
def delete_ip_from_ipset(ip):
"""从ipset集合中删除指定IP或CIDR"""
try:
# 检查IP/CIDR是否被封禁
currently_blocked = get_currently_blocked_ips()
if ip not in currently_blocked:
log_message(f"IP/CIDR {ip} 未被封禁,跳过")
return True
# 从ipset集合中删除IP/CIDR
cmd = ['sudo', 'ipset', 'del', IPSET_NAME, ip]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
log_message(f"删除IP/CIDR {ip} 失败: {result.stderr}")
return False
log_message(f"成功删除封禁IP/CIDR: {ip}")
return True
except Exception as e:
log_message(f"删除IP/CIDR {ip} 时出错: {e}")
return False
def batch_process_ips(ip_list, action, timeout=BLOCK_DURATION):
"""批量处理IP/CIDR列表"""
if not validate_ip_list(ip_list):
return 0, len(ip_list)
if action == 'add':
action_func = lambda ip: add_ip_to_ipset(ip, timeout)
action_name = f'添加(超时{timeout}秒)'
else:
action_func = delete_ip_from_ipset
action_name = '删除'
success_count = 0
fail_count = 0
for ip in ip_list:
if action_func(ip):
success_count += 1
else:
fail_count += 1
log_message(f"批量{action_name}操作完成: 成功{action_name.split('(')[0]} {success_count} 个IP/CIDR,失败 {fail_count} 个IP/CIDR")
return success_count, fail_count
def main():
# 解析命令行参数
parser = argparse.ArgumentParser(description='批量添加或删除ipset集合中的IP/CIDR')
parser.add_argument('ips', metavar='IPS', type=str,
help='要处理的IP或CIDR,多个条目用逗号分隔')
parser.add_argument('action', metavar='ACTION', type=str, nargs='?',
default=DEFAULT_ACTION, choices=['add', 'del'],
help=f"操作类型: add(添加,默认)或 del(删除)")
parser.add_argument('-t', '--timeout', type=int, default=BLOCK_DURATION,
help=f"封禁超时时间(秒),仅适用于添加操作,默认值: {BLOCK_DURATION}秒")
args = parser.parse_args()
# 分割IP/CIDR列表
ip_list = [ip.strip() for ip in args.ips.split(',') if ip.strip()]
if not ip_list:
log_message("没有提供有效的IP/CIDR,退出")
sys.exit(1)
log_message(f"开始处理IP/CIDR {args.action} 请求,共 {len(ip_list)} 个条目")
# 批量处理IP/CIDR
success_count, fail_count = batch_process_ips(ip_list, args.action, args.timeout)
# 根据处理结果设置退出码
if fail_count == 0:
sys.exit(0) # 全部成功
elif success_count == 0:
sys.exit(2) # 全部失败
else:
sys.exit(1) # 部分成功
if __name__ == "__main__":
main()
最后更新于 2025-06-25 11:05:58 并被添加「」标签,已有 298 位童鞋阅读过。
本站使用「署名 4.0 国际」创作共享协议,可自由转载、引用,但需署名作者且注明文章出处
此处评论已关闭