"/>
侧边栏壁纸
博主头像
PySuper 博主等级

千里之行,始于足下

  • 累计撰写 218 篇文章
  • 累计创建 15 个标签
  • 累计收到 2 条评论

目 录CONTENT

文章目录
Web

Django--日志处理、路由器

PySuper
2024-11-10 / 0 评论 / 0 点赞 / 7 阅读 / 0 字
温馨提示:
所有牛逼的人都有一段苦逼的岁月。 但是你只要像SB一样去坚持,终将牛逼!!! ✊✊✊

自定义日志处理器

import os
import time
from logging.handlers import TimedRotatingFileHandler


# 自定义日志处理器
class CoreLogFileHandler(TimedRotatingFileHandler):
    """
    CustomTimedRotatingFileHandler
    自定义日志处理器,解决多进程环境下日志写入的冲突问题
    """

    def __init__(self, filename, when="midnight", interval=1, backupCount=20, encoding="utf-8"):
        super().__init__(filename, when=when, interval=interval, backupCount=backupCount, encoding=encoding)

    @property
    def dfn(self):
        """获取当前日志文件的名称"""
        currentTime = int(time.time())
        dstNow = time.localtime(currentTime)[-1]
        t = self.rolloverAt - self.interval
        if self.utc:
            timeTuple = time.gmtime(t)
        else:
            timeTuple = time.localtime(t)
            dstThen = timeTuple[-1]
            if dstNow != dstThen:
                addend = 3600 if dstNow else -3600
                timeTuple = time.localtime(t + addend)
        return self.rotation_filename(self.baseFilename + "." + time.strftime(self.suffix, timeTuple))

    def shouldRollover(self, record):
        """
        判断是否应该执行日志滚动操作:
        1. 如果存档文件已存在,则执行滚动操作
        2. 当前时间达到滚动时间点时执行滚动操作
        """
        dfn = self.dfn
        t = int(time.time())
        return t >= self.rolloverAt or os.path.exists(dfn)

    def doRollover(self):
        """
        执行日志滚动操作:
        1. 关闭当前日志文件句柄并打开新文件
        2. 处理备份日志文件数
        3. 更新下一次的滚动时间点
        """
        if self.stream:
            self.stream.close()
            self.stream = None

        # 获取存档日志文件的名称
        dfn = self.dfn

        # 处理存档文件已存在的情况
        if not os.path.exists(dfn):
            self.rotate(self.baseFilename, dfn)

        # 控制备份日志文件数
        if self.backupCount > 0:
            for s in self.getFilesToDelete():
                os.remove(s)

        # 打开新日志文件
        if not self.delay:
            self.stream = self._open()

        # 更新下一次的滚动时间点
        currentTime = int(time.time())
        newRolloverAt = self.computeRollover(currentTime)
        while newRolloverAt <= currentTime:
            newRolloverAt += self.interval

        # 处理夏令时变化
        if (self.when == "MIDNIGHT" or self.when.startswith("W")) and not self.utc:
            dstAtRollover = time.localtime(newRolloverAt)[-1]
            dstNow = time.localtime(currentTime)[-1]
            if dstNow != dstAtRollover:
                addend = -3600 if not dstNow else 3600
                newRolloverAt += addend
        self.rolloverAt = newRolloverAt

import logging
import os
import time
import traceback
from functools import wraps
from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler

# 创建日志目录
LOG_DIR = "logs"
if not os.path.exists(LOG_DIR):
    os.makedirs(LOG_DIR)

# 配置日志格式
formatter = logging.Formatter("[%(asctime)s] [%(levelname)s] [%(module)s:%(funcName)s:%(lineno)d] %(message)s")

# 创建logger实例
logger = logging.getLogger("django")
logger.setLevel(logging.INFO)

# # 添加控制台处理器
# # 创建一个 StreamHandler 实例,默认情况下,它将日志消息输出到控制台(标准输出 sys.stdout)
# console_handler = logging.StreamHandler()
# # 为 console_handler 设置一个格式化器 formatter。格式化器定义了日志消息的输出格式,比如时间戳、日志级别、消息内容等
# console_handler.setFormatter(formatter)
# # 将 console_handler 添加到 logger 中。这意味着 logger 记录的日志消息将通过 console_handler 输出到控制台
# logger.addHandler(console_handler)

# 添加按大小轮转的文件处理器
file_handler = RotatingFileHandler(  # 这是一个文件处理器,它会在日志文件达到指定大小时进行轮转
    os.path.join(LOG_DIR, "django.log"),
    maxBytes=10 * 1024 * 1024,  # 设置日志文件的最大大小为 10 MB, 当日志文件达到这个大小时,会创建一个新的日志文件
    backupCount=5,  # 保留最多 5 个旧的日志文件。超过这个数量的旧日志文件将被删除。
    encoding="utf-8",  # 使用 UTF-8 编码写入日志文件
)
file_handler.setFormatter(formatter)  # 设置日志格式
logger.addHandler(file_handler)  # 将处理器添加到 logger,使其生效。


# # 添加按时间轮转的文件处理器
# time_handler = TimedRotatingFileHandler(  # 这是一个文件处理器,它会在指定的时间间隔后进行轮转
#     os.path.join(LOG_DIR, "django.log"),
#     when="midnight",  # when="midnight": 每天午夜进行日志文件轮转
#     interval=1,  # 轮转间隔为 1 天。
#     backupCount=30,  #  保留最多 30 个旧的日志文件。超过这个数量的旧日志文件将被删除。
#     encoding="utf-8",  # 使用 UTF-8 编码写入日志文件
# )
# time_handler.setFormatter(formatter)
# logger.addHandler(time_handler)


# 性能监控装饰器 - 用于记录函数执行时间
def log_time(func):
    """
    装饰器函数,用于记录被装饰函数的执行时间
    :param func: 被装饰的函数
    :return: wrapper函数
    """

    @wraps(func)  # 保留原函数的元数据
    def wrapper(*args, **kwargs):
        start_time = time.time()  # 记录开始时间
        result = func(*args, **kwargs)  # 执行原函数
        end_time = time.time()  # 记录结束时间
        duration = (end_time - start_time) * 1000  # 计算执行时间(毫秒)
        logger.info(f"函数 {func.__name__} 执行时间: {duration:.2f}ms")
        return result

    return wrapper


# 异常捕获装饰器 - 用于记录函数执行过程中的异常信息
def log_exception(func):
    """
    装饰器函数,用于捕获并记录被装饰函数执行时的异常
    :param func: 被装饰的函数
    :return: wrapper函数
    """

    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            # 记录详细的异常信息
            logger.error(f"函数 {func.__name__} 执行异常: {str(e)}")
            logger.error(f"异常堆栈信息: {traceback.format_exc()}")
            raise  # 重新抛出异常,保持原有的异常处理流程

    return wrapper


# 代码块执行时间上下文管理器
class LogTimeContext:
    """
    上下文管理器类,用于记录代码块的执行时间
    使用方法:
    with LogTimeContext("业务操作名称"):
        # 需要计时的代码块
    """

    def __init__(self, name):
        """
        初始化上下文管理器
        :param name: 代码块名称标识
        """
        self.name = name
        self.start_time = None

    def __enter__(self):
        self.start_time = time.time()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        duration = (time.time() - self.start_time) * 1000
        logger.info(f"{self.name} 执行时间: {duration:.2f}ms")


# 批量日志处理器
class BatchLogger:
    """
    批量日志处理类,用于批量记录日志,避免频繁IO操作
    当日志数量达到阈值时自动写入,也可手动调用flush方法写入
    """

    def __init__(self, max_batch=100):
        """
        初始化批量日志处理器
        :param max_batch: 最大批处理数量,默认100条
        """
        self.logs = []  # 日志缓存列表
        self.max_batch = max_batch  # 最大批处理数量

    def add(self, message, level=logging.INFO):
        """
        添加日志消息
        :param message: 日志消息内容
        :param level: 日志级别,默认INFO
        """
        self.logs.append((level, message))
        if len(self.logs) >= self.max_batch:
            self.flush()

    def flush(self):
        """
        将缓存的日志写入到日志文件
        """
        for level, message in self.logs:
            logger.log(level, message)
        self.logs.clear()  # 清空缓存

路由器

api路由

from django.conf import settings
from django.urls import path
from rest_framework.documentation import include_docs_urls
from rest_framework.permissions import IsAuthenticated
from rest_framework.routers import DefaultRouter, SimpleRouter

"""
根据项目的 DEBUG 模式配置 REST Framework 的路由对象
并将生成的路由 URL 列表赋值给 urlpatterns,用于 Django 的 URL 路由配置
用于管理 API 路由和文档生成

if settings.DEBUG:
    # 在 DEBUG 模式下使用 DefaultRouter,它会自动为根 URL 添加一个可浏览的 API 文档(在浏览器中访问时会显示 API 根目录)
    router = DefaultRouter()
else:
    # 在非 DEBUG 模式(如生产环境)中使用 SimpleRouter,不会添加根 URL 的 API 浏览视图
    router = SimpleRouter()

# 在 Django 项目中为该路由配置命名空间,便于在其他地方引用该 URL 路由时使用 reverse("api:route_name") 的方式来生成 URL
app_name = "api"

# 将 router 中注册的路由列表赋值给 urlpatterns,供 Django 项目的主 URL 配置使用
urlpatterns = router.urls
"""


class APIRouter:
    """API路由管理类"""

    def __init__(self):
        # 根据DEBUG模式选择路由器类型
        self.router = DefaultRouter() if settings.DEBUG else SimpleRouter()

    def register_viewset(self, prefix, viewset, basename=None):
        """
        注册视图集到路由
        :param prefix: URL前缀
        :param viewset: 视图集类
        :param basename: 基础名称(可选)
        """
        self.router.register(prefix, viewset, basename=basename)

    def get_urls(self):
        """
        获取所有注册的URL模式
        :return: URL模式列表
        """
        # API文档URL配置
        api_doc_urls = (
            [path("docs/", include_docs_urls(title="API文档", permission_classes=[IsAuthenticated]))]
            if settings.DEBUG
            else []
        )

        # 合并所有URL
        return api_doc_urls + self.router.urls


# 创建路由实例
router = APIRouter()

# 配置命名空间
app_name = "api"

# 生成URL模式列表
urlpatterns = router.get_urls()

"""
使用示例:
from myapp.views import UserViewSet

# 注册视图集
router.register_viewset('users', UserViewSet, basename='user')

# 在views.py中创建视图集
class UserViewSet(viewsets.ModelViewSet):
    queryset = User.objects.all()
    serializer_class = UserSerializer
    permission_classes = [IsAuthenticated]
"""

database路由

# -*- coding: utf-8 -*-
from typing import Any, Optional

from django.conf import settings

# 从 settings 中获取数据库映射配置
DATABASE_MAPPING = settings.DATABASE_APPS_MAPPING


class DatabaseAppsRouter:
    """
    数据库路由器
    用于处理多数据库的读写分离、关系和迁移操作
    """

    def db_for_read(self, model: Any, **hints) -> Optional[str]:
        """
        处理数据库读操作路由
        :param model: 数据模型
        :param hints: 额外的提示信息
        :return: 数据库别名
        """
        # 根据模型的 app_label 确定读取操作的数据库
        if model._meta.app_label in DATABASE_MAPPING:
            return DATABASE_MAPPING[model._meta.app_label]  # 返回指定数据库
        return None  # 默认返回 None

    def db_for_write(self, model: Any, **hints) -> Optional[str]:
        """
        处理数据库写操作路由
        :param model: 数据模型
        :param hints: 额外的提示信息
        :return: 数据库别名
        """
        # 根据模型的 app_label 确定写入操作的数据库
        if model._meta.app_label in DATABASE_MAPPING:
            return DATABASE_MAPPING[model._meta.app_label]  # 返回指定数据库
        return None  # 默认返回 None

    def allow_relation(self, obj1: Any, obj2: Any, **hints) -> Optional[bool]:
        """
        判断两个对象是否允许建立关系
        :param obj1: 第一个对象
        :param obj2: 第二个对象
        :param hints: 额外的提示信息
        :return: True允许/False禁止/None默认行为
        """
        # 获取两个对象所属的数据库
        db_obj1 = DATABASE_MAPPING.get(obj1._meta.app_label)
        db_obj2 = DATABASE_MAPPING.get(obj2._meta.app_label)

        # if db_obj1 and db_obj2:
        #     return db_obj1 == db_obj2
        # return None

        # 如果两个对象在同一个数据库中,允许关系
        if db_obj1 and db_obj2:
            if db_obj1 == db_obj2:
                return True  # 同一数据库
            else:
                return False  # 不同数据库,禁止关系
        return None  # 默认返回 None

    def allow_migrate(self, db: str, app_label: str, model_name: Optional[str] = None, **hints) -> Optional[bool]:
        """
        控制数据库迁移操作
        :param db: 数据库别名
        :param app_label: 应用标签
        :param model_name: 模型名称
        :param hints: 额外的提示信息
        :return: True允许/False禁止/None默认行为
        """
        # 处理特定模型的迁移
        if "model" in hints:
            model = hints["model"]
            if hasattr(model, "_meta"):
                if model._meta.app_label in DATABASE_MAPPING:
                    return db == DATABASE_MAPPING[model._meta.app_label]

        # 处理整个应用的迁移
        if db in DATABASE_MAPPING.values():
            return DATABASE_MAPPING.get(app_label) == db
        elif app_label in DATABASE_MAPPING:
            return False
        return None

    def allow_syncdb(self, db: str, model: Any) -> Optional[bool]:
        """
        控制数据库同步操作(已废弃,仅用于旧版Django)
        :param db: 数据库别名
        :param model: 数据模型
        :return: True允许/False禁止/None默认行为
        """
        if db in DATABASE_MAPPING.values():
            return DATABASE_MAPPING.get(model._meta.app_label) == db
        elif model._meta.app_label in DATABASE_MAPPING:
            return False
        return None

    # # for Django 1.4 - Django 1.6
    # def allow_syncdb(self, db, model):
    #     """Make sure that apps only appear in the related database."""
    #     # 确保应用只在相关数据库中出现
    #     if db in DATABASE_MAPPING.values():
    #         return DATABASE_MAPPING.get(model._meta.app_label) == db  # 检查模型的 app_label 是否与数据库匹配
    #     elif model._meta.app_label in DATABASE_MAPPING:
    #         return False  # 如果模型在数据库映射中,返回 False
    #     return None  # 默认返回 None
    #
    # # Django 1.7 - Django 1.11
    # def allow_migrate(self, db, app_label, model_name=None, **hints):
    #     """控制迁移操作的数据库"""
    #     print(db, app_label, model_name, hints)  # 打印调试信息
    #     # 确保迁移操作只在相关数据库中进行
    #     if db in DATABASE_MAPPING.values():
    #         return DATABASE_MAPPING.get(app_label) == db  # 检查应用标签与数据库是否匹配
    #     elif app_label in DATABASE_MAPPING:
    #         return False  # 如果应用在数据库映射中,返回 False
    #     return None  # 默认返回 None

0
  1. 支付宝打赏

    qrcode alipay
  2. 微信打赏

    qrcode weixin

评论区