" name="sm-site-verification"/>
侧边栏壁纸
博主头像
PySuper 博主等级

千里之行,始于足下

  • 累计撰写 231 篇文章
  • 累计创建 15 个标签
  • 累计收到 2 条评论

目 录CONTENT

文章目录
Web

Django--其他工具类

PySuper
2024-11-10 / 0 评论 / 0 点赞 / 16 阅读 / 0 字
温馨提示:
所有牛逼的人都有一段苦逼的岁月。 但是你只要像SB一样去坚持,终将牛逼!!! ✊✊✊

Excel工具

import logging
from datetime import datetime
from typing import Any, Dict, List, Optional, Union

import pandas as pd
from django.http.response import HttpResponse
from openpyxl import Workbook
from openpyxl.styles import Alignment, Font, PatternFill
from openpyxl.utils import get_column_letter

logger = logging.getLogger(__name__)


# 获取表格单元格的类型并返回字符类型
def get_cell_value(cell: Any) -> str:
    """
    获取单元格的值并转换为字符串
    :param cell: 单元格对象
    :return: 字符串格式的单元格值
    """
    if isinstance(cell, (int, float)):
        return str(int(cell)).strip()
    return str(cell).strip()


# action获取通用设置更新人和更新时间
def get_update_params(request, is_create: bool = False) -> Dict:
    """
    获取更新参数
    :param request: 请求对象
    :param is_create: 是否为创建操作
    :return: 包含更新信息的字典
    """
    now = datetime.now()
    params = {"update_by": request.user, "update_date": now}
    if is_create:
        params.update({"create_by": request.user, "create_date": now})
    return params


# 基于 pandas DataFrame 下载 Excel
def pandas_download_excel(
    data: Union[List[Dict], pd.DataFrame], filename: str = None, sheet_name: str = "Sheet1", index: bool = False
) -> HttpResponse:
    """
    使用pandas导出Excel文件
    :param data: 要导出的数据
    :param filename: 文件名
    :param sheet_name: 工作表名称
    :param index: 是否包含索引
    :return: HTTP响应对象
    """
    try:
        if not isinstance(data, pd.DataFrame):
            df = pd.DataFrame(data)
        else:
            df = data

        response = HttpResponse(content_type="application/vnd.ms-excel")
        if filename:
            response["Content-Disposition"] = f'attachment; filename="{filename}.xlsx"'

        with pd.ExcelWriter(response, engine="openpyxl") as writer:
            df.to_excel(writer, index=index, sheet_name=sheet_name)

        return response
    except Exception as e:
        logger.error(f"导出Excel失败: {str(e)}")
        raise


def read_excel(
    file_path: str,
    sheet_name: Optional[Union[str, int]] = 0,
    skiprows: int = 0,
    usecols: Optional[List] = None,
) -> pd.DataFrame:
    """
    读取Excel文件
    :param file_path: Excel文件路径
    :param sheet_name: 工作表名称或索引
    :param skiprows: 跳过的行数
    :param usecols: 使用的列
    :return: DataFrame对象
    """
    try:
        return pd.read_excel(file_path, sheet_name=sheet_name, skiprows=skiprows, usecols=usecols)
    except Exception as e:
        logger.error(f"读取Excel失败: {str(e)}")
        raise


def create_styled_excel(
    data: List[Dict], headers: List[str], filename: str, sheet_name: str = "Sheet1"
) -> HttpResponse:
    """
    创建带样式的Excel文件
    :param data: 数据列表
    :param headers: 表头列表
    :param filename: 文件名
    :param sheet_name: 工作表名称
    :return: HTTP响应对象
    """
    wb = Workbook()
    ws = wb.active
    ws.title = sheet_name

    # 设置表头样式
    header_fill = PatternFill(start_color="CCCCCC", end_color="CCCCCC", fill_type="solid")
    header_font = Font(bold=True)
    header_alignment = Alignment(horizontal="center", vertical="center")

    # 写入表头
    for col, header in enumerate(headers, 1):
        cell = ws.cell(row=1, column=col, value=header)
        cell.fill = header_fill
        cell.font = header_font
        cell.alignment = header_alignment

    # 写入数据
    for row, item in enumerate(data, 2):
        for col, key in enumerate(headers, 1):
            cell = ws.cell(row=row, column=col, value=item.get(key, ""))
            cell.alignment = Alignment(horizontal="center", vertical="center")

    # 调整列宽
    for col in range(1, len(headers) + 1):
        ws.column_dimensions[get_column_letter(col)].width = 15

    response = HttpResponse(content_type="application/vnd.ms-excel")
    response["Content-Disposition"] = f'attachment; filename="{filename}.xlsx"'
    wb.save(response)
    return response


def merge_excel_files(file_paths: List[str], output_path: str) -> None:
    """
    合并多个Excel文件
    :param file_paths: Excel文件路径列表
    :param output_path: 输出文件路径
    """
    try:
        dfs = []
        for file_path in file_paths:
            df = pd.read_excel(file_path)
            dfs.append(df)

        merged_df = pd.concat(dfs, ignore_index=True)
        merged_df.to_excel(output_path, index=False)
    except Exception as e:
        logger.error(f"合并Excel文件失败: {str(e)}")
        raise

import string
import xlsxwriter


def export_to_excel(filename, col_items, datas):
    """将信息导出为excel文件并返回给前端

    Args:
        filename (str): 文件名
        col_items (list): 列名
        datas (list): 数据信息

    Returns:
        HttpResponse: 包含excel文件的响应对象
    """
    import io

    # 创建一个内存中的文件对象
    output = io.BytesIO()

    # 生成.xlsx文件
    workbook = xlsxwriter.Workbook(output, {"in_memory": True})

    # 设置sheet页签名称
    table = workbook.add_worksheet(filename)

    # 定义格式
    header_format = workbook.add_format(
        {
            "align": "center",
            "bg_color": "gray",
            "color": "white",
            "font": "宋体",
            "bold": True,
            "border": 1,
        }
    )
    data_format = workbook.add_format({"align": "center", "border": 1, "font_name": "Calibri Light"})

    # 设置列名及宽度
    for idx, (col_name, col_width) in enumerate(col_items):
        col_code = string.ascii_uppercase[idx]
        table.write(0, idx, col_name, header_format)
        table.set_column(f"{col_code}:{col_code}", col_width)

    # 写入数据
    for row, item in enumerate(datas, start=1):
        for col, value in enumerate(item):
            table.write(row, col, value, data_format)

    # 关闭工作簿
    workbook.close()

    # 将文件指针移动到开始位置
    output.seek(0)

    # 创建HttpResponse对象并返回
    response = HttpResponse(output, content_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")
    # response["Content-Disposition"] = f"attachment; filename={filename}.xlsx"
    return response



if __name__ == "__main__":

    # 构造数据
    faker_obj = Faker(locale="zh")

    # 文件名
    filename = "人员名单"

    # 列名
    cols = [("序号", 10), ("姓名", 20)]

    # 构造数据
    datas = []
    for i in range(10):
        datas.append((i + 1, faker_obj.name()))

    # 将数据信息导出到excel文件中
    export_to_excel(filename, cols, datas)

import time
from openpyxl import Workbook
from django.http import HttpResponse
from myapp.models import User

def export_data(request):
    # 记录开始时间
    start_time = time.time()

    # 获取数据
    data = User.objects.all()[:10000]

    # 创建 Excel 文件
    wb = Workbook()
    ws = wb.active
    ws.append(['姓名', '学号', '性别', '手机'])

    # 流式写入数据
    for user in data.iterator():  # 使用 iterator 避免一次性加载所有数据
        ws.append([user.nick_name, user.username, user.gender, user.phone])

    # 将文件作为 HTTP 响应返回
    response = HttpResponse(content_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")
    response['Content-Disposition'] = 'attachment; filename=users.xlsx'
    wb.save(response)

    # 记录结束时间并输出
    end_time = time.time()
    print(f"Excel 写入并返回耗时:{end_time - start_time:.2f} 秒")

    return response

下面这个方法,下载数据倒也挺快的,不过不好抽取出来,后面看情况封装

如果有优化,就再添加进来

def download(self, request):
    """下载数据"""
    self.filter_users(request)

    wb = Workbook()
    ws = wb.active
    ws.append(["姓名", "学号", "性别", "手机", "专业", "班级"])

    # 流式写入数据
    for user in self.queryset.iterator():
        ws.append(
            [
                user.nick_name,
                user.username,
                "男" if user.gender == 1 else "女",
                user.phone,
                user.dept.pid.name if user.dept.pid else None,
                user.dept.name,
            ]
        )
    response = HttpResponse(content_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")
    wb.save(response)
    return response

驼峰命名


import inflection


def convert_dict_keys_to_camel_case(data):
    """
    将字典中的键转换为驼峰命名法
    :param data: 字典数据
    :return: 转换后的字典数据
    """
    # 判断数据类型是否为字典
    if isinstance(data, dict):
        new_data = {}
        for k, v in data.items():
            # 仅在需要时转换键
            new_key = inflection.camelize(k, uppercase_first_letter=False) if '_' in k else k
            # 递归转换字典中的值
            new_data[new_key] = convert_dict_keys_to_camel_case(v)
        return new_data
    # 判断数据类型是否为列表
    elif isinstance(data, list):
        # 递归转换列表中的每一项
        return [convert_dict_keys_to_camel_case(item) for item in data]
    else:
        # 返回原始数据
        return data


Redis缓存

import hashlib
import json
import logging
from typing import Any, Optional

from django.core.cache import caches

logger = logging.getLogger(__name__)


class FastJsonRedisSerializer:
    """自定义 Redis 序列化器,使用 JSON 序列化和反序列化对象"""

    def serialize(self, obj: Any) -> Optional[bytes]:
        """
        序列化对象为 JSON 字符串
        :param obj: 要序列化的对象
        :return: 序列化后的字节串
        """
        if obj is None:
            return None
        return json.dumps(obj, default=str).encode("utf-8")

    def deserialize(self, data: Optional[bytes]) -> Any:
        """
        反序列化 JSON 字符串为对象
        :param data: 要反序列化的字节串
        :return: 反序列化后的对象
        """
        if not data:
            return None
        return json.loads(data.decode("utf-8"))


class StringRedisSerializer:
    """字符串类型 Redis 键的序列化器"""

    def serialize(self, obj: str) -> bytes:
        """序列化字符串为字节数组"""
        return json.dumps(obj).encode("utf-8")

    def deserialize(self, data: Optional[bytes]) -> Optional[str]:
        """反序列化字节数组为字符串"""
        return data.decode("utf-8") if data else None


def generate_cache_key(target: Any, method: Any, *params: Any) -> str:
    """
    生成缓存键,使用 SHA-256 哈希
    :param target: 目标对象
    :param method: 方法对象
    :param params: 参数列表
    :return: 哈希后的缓存键
    """
    container = {
        "class": str(target.__class__.__name__),
        "methodName": method.__name__,
        "package": target.__module__,
        "params": params,
    }
    json_string = json.dumps(container, default=str)
    return hashlib.sha256(json_string.encode("utf-8")).hexdigest()


class CustomCache:
    """自定义缓存类,使用 Django 的缓存系统,封装常用的缓存操作"""

    def __init__(self, cache_name: str = "default"):
        """
        初始化缓存对象
        :param cache_name: 缓存配置名称
        """
        self.cache = caches[cache_name]

    def get(self, key: str) -> Any:
        """
        获取缓存值
        :param key: 缓存键
        :return: 缓存的值
        """
        try:
            return self.cache.get(key)
        except Exception as e:
            logger.error(f"缓存获取错误,键[{key}]: {e}")
            return None

    def set(self, key: str, value: Any, timeout: int = 7200) -> bool:
        """
        设置缓存值
        :param key: 缓存键
        :param value: 缓存值
        :param timeout: 过期时间(秒)
        :return: 是否设置成功
        """
        try:
            self.cache.set(key, value, timeout)
            return True
        except Exception as e:
            logger.error(f"缓存设置错误,键[{key}]: {e}")
            return False

    def delete(self, key: str) -> bool:
        """
        删除缓存
        :param key: 缓存键
        :return: 是否删除成功
        """
        try:
            self.cache.delete(key)
            return True
        except Exception as e:
            logger.error(f"缓存删除错误,键[{key}]: {e}")
            return False

    def exists(self, key: str) -> bool:
        """
        检查键是否存在
        :param key: 缓存键
        :return: 是否存在
        """
        return self.cache.get(key) is not None

    def incr(self, key: str, delta: int = 1) -> Optional[int]:
        """
        递增计数器
        :param key: 缓存键
        :param delta: 增量值
        :return: 增加后的值
        """
        try:
            return self.cache.incr(key, delta)
        except Exception as e:
            logger.error(f"计数器递增错误,键[{key}]: {e}")
            return None

    def clear(self) -> bool:
        """
        清空所有缓存
        :return: 是否清空成功
        """
        try:
            self.cache.clear()
            return True
        except Exception as e:
            logger.error(f"缓存清空错误: {e}")
            return False

0
  1. 支付宝打赏

    qrcode alipay
  2. 微信打赏

    qrcode weixin

评论区