自定义日志处理器
import os
import time
from logging.handlers import TimedRotatingFileHandler
# 自定义日志处理器
class CoreLogFileHandler(TimedRotatingFileHandler):
"""
CustomTimedRotatingFileHandler
自定义日志处理器,解决多进程环境下日志写入的冲突问题
"""
def __init__(self, filename, when="midnight", interval=1, backupCount=20, encoding="utf-8"):
super().__init__(filename, when=when, interval=interval, backupCount=backupCount, encoding=encoding)
@property
def dfn(self):
"""获取当前日志文件的名称"""
currentTime = int(time.time())
dstNow = time.localtime(currentTime)[-1]
t = self.rolloverAt - self.interval
if self.utc:
timeTuple = time.gmtime(t)
else:
timeTuple = time.localtime(t)
dstThen = timeTuple[-1]
if dstNow != dstThen:
addend = 3600 if dstNow else -3600
timeTuple = time.localtime(t + addend)
return self.rotation_filename(self.baseFilename + "." + time.strftime(self.suffix, timeTuple))
def shouldRollover(self, record):
"""
判断是否应该执行日志滚动操作:
1. 如果存档文件已存在,则执行滚动操作
2. 当前时间达到滚动时间点时执行滚动操作
"""
dfn = self.dfn
t = int(time.time())
return t >= self.rolloverAt or os.path.exists(dfn)
def doRollover(self):
"""
执行日志滚动操作:
1. 关闭当前日志文件句柄并打开新文件
2. 处理备份日志文件数
3. 更新下一次的滚动时间点
"""
if self.stream:
self.stream.close()
self.stream = None
# 获取存档日志文件的名称
dfn = self.dfn
# 处理存档文件已存在的情况
if not os.path.exists(dfn):
self.rotate(self.baseFilename, dfn)
# 控制备份日志文件数
if self.backupCount > 0:
for s in self.getFilesToDelete():
os.remove(s)
# 打开新日志文件
if not self.delay:
self.stream = self._open()
# 更新下一次的滚动时间点
currentTime = int(time.time())
newRolloverAt = self.computeRollover(currentTime)
while newRolloverAt <= currentTime:
newRolloverAt += self.interval
# 处理夏令时变化
if (self.when == "MIDNIGHT" or self.when.startswith("W")) and not self.utc:
dstAtRollover = time.localtime(newRolloverAt)[-1]
dstNow = time.localtime(currentTime)[-1]
if dstNow != dstAtRollover:
addend = -3600 if not dstNow else 3600
newRolloverAt += addend
self.rolloverAt = newRolloverAt
import logging
import os
import time
import traceback
from functools import wraps
from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler
# 创建日志目录
LOG_DIR = "logs"
if not os.path.exists(LOG_DIR):
os.makedirs(LOG_DIR)
# 配置日志格式
formatter = logging.Formatter("[%(asctime)s] [%(levelname)s] [%(module)s:%(funcName)s:%(lineno)d] %(message)s")
# 创建logger实例
logger = logging.getLogger("django")
logger.setLevel(logging.INFO)
# # 添加控制台处理器
# # 创建一个 StreamHandler 实例,默认情况下,它将日志消息输出到控制台(标准输出 sys.stdout)
# console_handler = logging.StreamHandler()
# # 为 console_handler 设置一个格式化器 formatter。格式化器定义了日志消息的输出格式,比如时间戳、日志级别、消息内容等
# console_handler.setFormatter(formatter)
# # 将 console_handler 添加到 logger 中。这意味着 logger 记录的日志消息将通过 console_handler 输出到控制台
# logger.addHandler(console_handler)
# 添加按大小轮转的文件处理器
file_handler = RotatingFileHandler( # 这是一个文件处理器,它会在日志文件达到指定大小时进行轮转
os.path.join(LOG_DIR, "django.log"),
maxBytes=10 * 1024 * 1024, # 设置日志文件的最大大小为 10 MB, 当日志文件达到这个大小时,会创建一个新的日志文件
backupCount=5, # 保留最多 5 个旧的日志文件。超过这个数量的旧日志文件将被删除。
encoding="utf-8", # 使用 UTF-8 编码写入日志文件
)
file_handler.setFormatter(formatter) # 设置日志格式
logger.addHandler(file_handler) # 将处理器添加到 logger,使其生效。
# # 添加按时间轮转的文件处理器
# time_handler = TimedRotatingFileHandler( # 这是一个文件处理器,它会在指定的时间间隔后进行轮转
# os.path.join(LOG_DIR, "django.log"),
# when="midnight", # when="midnight": 每天午夜进行日志文件轮转
# interval=1, # 轮转间隔为 1 天。
# backupCount=30, # 保留最多 30 个旧的日志文件。超过这个数量的旧日志文件将被删除。
# encoding="utf-8", # 使用 UTF-8 编码写入日志文件
# )
# time_handler.setFormatter(formatter)
# logger.addHandler(time_handler)
# 性能监控装饰器 - 用于记录函数执行时间
def log_time(func):
"""
装饰器函数,用于记录被装饰函数的执行时间
:param func: 被装饰的函数
:return: wrapper函数
"""
@wraps(func) # 保留原函数的元数据
def wrapper(*args, **kwargs):
start_time = time.time() # 记录开始时间
result = func(*args, **kwargs) # 执行原函数
end_time = time.time() # 记录结束时间
duration = (end_time - start_time) * 1000 # 计算执行时间(毫秒)
logger.info(f"函数 {func.__name__} 执行时间: {duration:.2f}ms")
return result
return wrapper
# 异常捕获装饰器 - 用于记录函数执行过程中的异常信息
def log_exception(func):
"""
装饰器函数,用于捕获并记录被装饰函数执行时的异常
:param func: 被装饰的函数
:return: wrapper函数
"""
@wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as e:
# 记录详细的异常信息
logger.error(f"函数 {func.__name__} 执行异常: {str(e)}")
logger.error(f"异常堆栈信息: {traceback.format_exc()}")
raise # 重新抛出异常,保持原有的异常处理流程
return wrapper
# 代码块执行时间上下文管理器
class LogTimeContext:
"""
上下文管理器类,用于记录代码块的执行时间
使用方法:
with LogTimeContext("业务操作名称"):
# 需要计时的代码块
"""
def __init__(self, name):
"""
初始化上下文管理器
:param name: 代码块名称标识
"""
self.name = name
self.start_time = None
def __enter__(self):
self.start_time = time.time()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
duration = (time.time() - self.start_time) * 1000
logger.info(f"{self.name} 执行时间: {duration:.2f}ms")
# 批量日志处理器
class BatchLogger:
"""
批量日志处理类,用于批量记录日志,避免频繁IO操作
当日志数量达到阈值时自动写入,也可手动调用flush方法写入
"""
def __init__(self, max_batch=100):
"""
初始化批量日志处理器
:param max_batch: 最大批处理数量,默认100条
"""
self.logs = [] # 日志缓存列表
self.max_batch = max_batch # 最大批处理数量
def add(self, message, level=logging.INFO):
"""
添加日志消息
:param message: 日志消息内容
:param level: 日志级别,默认INFO
"""
self.logs.append((level, message))
if len(self.logs) >= self.max_batch:
self.flush()
def flush(self):
"""
将缓存的日志写入到日志文件
"""
for level, message in self.logs:
logger.log(level, message)
self.logs.clear() # 清空缓存
路由器
api路由
from django.conf import settings
from django.urls import path
from rest_framework.documentation import include_docs_urls
from rest_framework.permissions import IsAuthenticated
from rest_framework.routers import DefaultRouter, SimpleRouter
"""
根据项目的 DEBUG 模式配置 REST Framework 的路由对象
并将生成的路由 URL 列表赋值给 urlpatterns,用于 Django 的 URL 路由配置
用于管理 API 路由和文档生成
if settings.DEBUG:
# 在 DEBUG 模式下使用 DefaultRouter,它会自动为根 URL 添加一个可浏览的 API 文档(在浏览器中访问时会显示 API 根目录)
router = DefaultRouter()
else:
# 在非 DEBUG 模式(如生产环境)中使用 SimpleRouter,不会添加根 URL 的 API 浏览视图
router = SimpleRouter()
# 在 Django 项目中为该路由配置命名空间,便于在其他地方引用该 URL 路由时使用 reverse("api:route_name") 的方式来生成 URL
app_name = "api"
# 将 router 中注册的路由列表赋值给 urlpatterns,供 Django 项目的主 URL 配置使用
urlpatterns = router.urls
"""
class APIRouter:
"""API路由管理类"""
def __init__(self):
# 根据DEBUG模式选择路由器类型
self.router = DefaultRouter() if settings.DEBUG else SimpleRouter()
def register_viewset(self, prefix, viewset, basename=None):
"""
注册视图集到路由
:param prefix: URL前缀
:param viewset: 视图集类
:param basename: 基础名称(可选)
"""
self.router.register(prefix, viewset, basename=basename)
def get_urls(self):
"""
获取所有注册的URL模式
:return: URL模式列表
"""
# API文档URL配置
api_doc_urls = (
[path("docs/", include_docs_urls(title="API文档", permission_classes=[IsAuthenticated]))]
if settings.DEBUG
else []
)
# 合并所有URL
return api_doc_urls + self.router.urls
# 创建路由实例
router = APIRouter()
# 配置命名空间
app_name = "api"
# 生成URL模式列表
urlpatterns = router.get_urls()
"""
使用示例:
from myapp.views import UserViewSet
# 注册视图集
router.register_viewset('users', UserViewSet, basename='user')
# 在views.py中创建视图集
class UserViewSet(viewsets.ModelViewSet):
queryset = User.objects.all()
serializer_class = UserSerializer
permission_classes = [IsAuthenticated]
"""
database路由
# -*- coding: utf-8 -*-
from typing import Any, Optional
from django.conf import settings
# 从 settings 中获取数据库映射配置
DATABASE_MAPPING = settings.DATABASE_APPS_MAPPING
class DatabaseAppsRouter:
"""
数据库路由器
用于处理多数据库的读写分离、关系和迁移操作
"""
def db_for_read(self, model: Any, **hints) -> Optional[str]:
"""
处理数据库读操作路由
:param model: 数据模型
:param hints: 额外的提示信息
:return: 数据库别名
"""
# 根据模型的 app_label 确定读取操作的数据库
if model._meta.app_label in DATABASE_MAPPING:
return DATABASE_MAPPING[model._meta.app_label] # 返回指定数据库
return None # 默认返回 None
def db_for_write(self, model: Any, **hints) -> Optional[str]:
"""
处理数据库写操作路由
:param model: 数据模型
:param hints: 额外的提示信息
:return: 数据库别名
"""
# 根据模型的 app_label 确定写入操作的数据库
if model._meta.app_label in DATABASE_MAPPING:
return DATABASE_MAPPING[model._meta.app_label] # 返回指定数据库
return None # 默认返回 None
def allow_relation(self, obj1: Any, obj2: Any, **hints) -> Optional[bool]:
"""
判断两个对象是否允许建立关系
:param obj1: 第一个对象
:param obj2: 第二个对象
:param hints: 额外的提示信息
:return: True允许/False禁止/None默认行为
"""
# 获取两个对象所属的数据库
db_obj1 = DATABASE_MAPPING.get(obj1._meta.app_label)
db_obj2 = DATABASE_MAPPING.get(obj2._meta.app_label)
# if db_obj1 and db_obj2:
# return db_obj1 == db_obj2
# return None
# 如果两个对象在同一个数据库中,允许关系
if db_obj1 and db_obj2:
if db_obj1 == db_obj2:
return True # 同一数据库
else:
return False # 不同数据库,禁止关系
return None # 默认返回 None
def allow_migrate(self, db: str, app_label: str, model_name: Optional[str] = None, **hints) -> Optional[bool]:
"""
控制数据库迁移操作
:param db: 数据库别名
:param app_label: 应用标签
:param model_name: 模型名称
:param hints: 额外的提示信息
:return: True允许/False禁止/None默认行为
"""
# 处理特定模型的迁移
if "model" in hints:
model = hints["model"]
if hasattr(model, "_meta"):
if model._meta.app_label in DATABASE_MAPPING:
return db == DATABASE_MAPPING[model._meta.app_label]
# 处理整个应用的迁移
if db in DATABASE_MAPPING.values():
return DATABASE_MAPPING.get(app_label) == db
elif app_label in DATABASE_MAPPING:
return False
return None
def allow_syncdb(self, db: str, model: Any) -> Optional[bool]:
"""
控制数据库同步操作(已废弃,仅用于旧版Django)
:param db: 数据库别名
:param model: 数据模型
:return: True允许/False禁止/None默认行为
"""
if db in DATABASE_MAPPING.values():
return DATABASE_MAPPING.get(model._meta.app_label) == db
elif model._meta.app_label in DATABASE_MAPPING:
return False
return None
# # for Django 1.4 - Django 1.6
# def allow_syncdb(self, db, model):
# """Make sure that apps only appear in the related database."""
# # 确保应用只在相关数据库中出现
# if db in DATABASE_MAPPING.values():
# return DATABASE_MAPPING.get(model._meta.app_label) == db # 检查模型的 app_label 是否与数据库匹配
# elif model._meta.app_label in DATABASE_MAPPING:
# return False # 如果模型在数据库映射中,返回 False
# return None # 默认返回 None
#
# # Django 1.7 - Django 1.11
# def allow_migrate(self, db, app_label, model_name=None, **hints):
# """控制迁移操作的数据库"""
# print(db, app_label, model_name, hints) # 打印调试信息
# # 确保迁移操作只在相关数据库中进行
# if db in DATABASE_MAPPING.values():
# return DATABASE_MAPPING.get(app_label) == db # 检查应用标签与数据库是否匹配
# elif app_label in DATABASE_MAPPING:
# return False # 如果应用在数据库映射中,返回 False
# return None # 默认返回 None
评论区