Excel工具
import logging
from datetime import datetime
from typing import Any, Dict, List, Optional, Union
import pandas as pd
from django.http.response import HttpResponse
from openpyxl import Workbook
from openpyxl.styles import Alignment, Font, PatternFill
from openpyxl.utils import get_column_letter
logger = logging.getLogger(__name__)
# 获取表格单元格的类型并返回字符类型
def get_cell_value(cell: Any) -> str:
"""
获取单元格的值并转换为字符串
:param cell: 单元格对象
:return: 字符串格式的单元格值
"""
if isinstance(cell, (int, float)):
return str(int(cell)).strip()
return str(cell).strip()
# action获取通用设置更新人和更新时间
def get_update_params(request, is_create: bool = False) -> Dict:
"""
获取更新参数
:param request: 请求对象
:param is_create: 是否为创建操作
:return: 包含更新信息的字典
"""
now = datetime.now()
params = {"update_by": request.user, "update_date": now}
if is_create:
params.update({"create_by": request.user, "create_date": now})
return params
# 基于 pandas DataFrame 下载 Excel
def pandas_download_excel(
data: Union[List[Dict], pd.DataFrame], filename: str = None, sheet_name: str = "Sheet1", index: bool = False
) -> HttpResponse:
"""
使用pandas导出Excel文件
:param data: 要导出的数据
:param filename: 文件名
:param sheet_name: 工作表名称
:param index: 是否包含索引
:return: HTTP响应对象
"""
try:
if not isinstance(data, pd.DataFrame):
df = pd.DataFrame(data)
else:
df = data
response = HttpResponse(content_type="application/vnd.ms-excel")
if filename:
response["Content-Disposition"] = f'attachment; filename="{filename}.xlsx"'
with pd.ExcelWriter(response, engine="openpyxl") as writer:
df.to_excel(writer, index=index, sheet_name=sheet_name)
return response
except Exception as e:
logger.error(f"导出Excel失败: {str(e)}")
raise
def read_excel(
file_path: str,
sheet_name: Optional[Union[str, int]] = 0,
skiprows: int = 0,
usecols: Optional[List] = None,
) -> pd.DataFrame:
"""
读取Excel文件
:param file_path: Excel文件路径
:param sheet_name: 工作表名称或索引
:param skiprows: 跳过的行数
:param usecols: 使用的列
:return: DataFrame对象
"""
try:
return pd.read_excel(file_path, sheet_name=sheet_name, skiprows=skiprows, usecols=usecols)
except Exception as e:
logger.error(f"读取Excel失败: {str(e)}")
raise
def create_styled_excel(
data: List[Dict], headers: List[str], filename: str, sheet_name: str = "Sheet1"
) -> HttpResponse:
"""
创建带样式的Excel文件
:param data: 数据列表
:param headers: 表头列表
:param filename: 文件名
:param sheet_name: 工作表名称
:return: HTTP响应对象
"""
wb = Workbook()
ws = wb.active
ws.title = sheet_name
# 设置表头样式
header_fill = PatternFill(start_color="CCCCCC", end_color="CCCCCC", fill_type="solid")
header_font = Font(bold=True)
header_alignment = Alignment(horizontal="center", vertical="center")
# 写入表头
for col, header in enumerate(headers, 1):
cell = ws.cell(row=1, column=col, value=header)
cell.fill = header_fill
cell.font = header_font
cell.alignment = header_alignment
# 写入数据
for row, item in enumerate(data, 2):
for col, key in enumerate(headers, 1):
cell = ws.cell(row=row, column=col, value=item.get(key, ""))
cell.alignment = Alignment(horizontal="center", vertical="center")
# 调整列宽
for col in range(1, len(headers) + 1):
ws.column_dimensions[get_column_letter(col)].width = 15
response = HttpResponse(content_type="application/vnd.ms-excel")
response["Content-Disposition"] = f'attachment; filename="{filename}.xlsx"'
wb.save(response)
return response
def merge_excel_files(file_paths: List[str], output_path: str) -> None:
"""
合并多个Excel文件
:param file_paths: Excel文件路径列表
:param output_path: 输出文件路径
"""
try:
dfs = []
for file_path in file_paths:
df = pd.read_excel(file_path)
dfs.append(df)
merged_df = pd.concat(dfs, ignore_index=True)
merged_df.to_excel(output_path, index=False)
except Exception as e:
logger.error(f"合并Excel文件失败: {str(e)}")
raise
import string
import xlsxwriter
def export_to_excel(filename, col_items, datas):
"""将信息导出为excel文件并返回给前端
Args:
filename (str): 文件名
col_items (list): 列名
datas (list): 数据信息
Returns:
HttpResponse: 包含excel文件的响应对象
"""
import io
# 创建一个内存中的文件对象
output = io.BytesIO()
# 生成.xlsx文件
workbook = xlsxwriter.Workbook(output, {"in_memory": True})
# 设置sheet页签名称
table = workbook.add_worksheet(filename)
# 定义格式
header_format = workbook.add_format(
{
"align": "center",
"bg_color": "gray",
"color": "white",
"font": "宋体",
"bold": True,
"border": 1,
}
)
data_format = workbook.add_format({"align": "center", "border": 1, "font_name": "Calibri Light"})
# 设置列名及宽度
for idx, (col_name, col_width) in enumerate(col_items):
col_code = string.ascii_uppercase[idx]
table.write(0, idx, col_name, header_format)
table.set_column(f"{col_code}:{col_code}", col_width)
# 写入数据
for row, item in enumerate(datas, start=1):
for col, value in enumerate(item):
table.write(row, col, value, data_format)
# 关闭工作簿
workbook.close()
# 将文件指针移动到开始位置
output.seek(0)
# 创建HttpResponse对象并返回
response = HttpResponse(output, content_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")
# response["Content-Disposition"] = f"attachment; filename={filename}.xlsx"
return response
if __name__ == "__main__":
# 构造数据
faker_obj = Faker(locale="zh")
# 文件名
filename = "人员名单"
# 列名
cols = [("序号", 10), ("姓名", 20)]
# 构造数据
datas = []
for i in range(10):
datas.append((i + 1, faker_obj.name()))
# 将数据信息导出到excel文件中
export_to_excel(filename, cols, datas)
import time
from openpyxl import Workbook
from django.http import HttpResponse
from myapp.models import User
def export_data(request):
# 记录开始时间
start_time = time.time()
# 获取数据
data = User.objects.all()[:10000]
# 创建 Excel 文件
wb = Workbook()
ws = wb.active
ws.append(['姓名', '学号', '性别', '手机'])
# 流式写入数据
for user in data.iterator(): # 使用 iterator 避免一次性加载所有数据
ws.append([user.nick_name, user.username, user.gender, user.phone])
# 将文件作为 HTTP 响应返回
response = HttpResponse(content_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")
response['Content-Disposition'] = 'attachment; filename=users.xlsx'
wb.save(response)
# 记录结束时间并输出
end_time = time.time()
print(f"Excel 写入并返回耗时:{end_time - start_time:.2f} 秒")
return response
下面这个方法,下载数据倒也挺快的,不过不好抽取出来,后面看情况封装
如果有优化,就再添加进来
def download(self, request):
"""下载数据"""
self.filter_users(request)
wb = Workbook()
ws = wb.active
ws.append(["姓名", "学号", "性别", "手机", "专业", "班级"])
# 流式写入数据
for user in self.queryset.iterator():
ws.append(
[
user.nick_name,
user.username,
"男" if user.gender == 1 else "女",
user.phone,
user.dept.pid.name if user.dept.pid else None,
user.dept.name,
]
)
response = HttpResponse(content_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")
wb.save(response)
return response
驼峰命名
import inflection
def convert_dict_keys_to_camel_case(data):
"""
将字典中的键转换为驼峰命名法
:param data: 字典数据
:return: 转换后的字典数据
"""
# 判断数据类型是否为字典
if isinstance(data, dict):
new_data = {}
for k, v in data.items():
# 仅在需要时转换键
new_key = inflection.camelize(k, uppercase_first_letter=False) if '_' in k else k
# 递归转换字典中的值
new_data[new_key] = convert_dict_keys_to_camel_case(v)
return new_data
# 判断数据类型是否为列表
elif isinstance(data, list):
# 递归转换列表中的每一项
return [convert_dict_keys_to_camel_case(item) for item in data]
else:
# 返回原始数据
return data
Redis缓存
import hashlib
import json
import logging
from typing import Any, Optional
from django.core.cache import caches
logger = logging.getLogger(__name__)
class FastJsonRedisSerializer:
"""自定义 Redis 序列化器,使用 JSON 序列化和反序列化对象"""
def serialize(self, obj: Any) -> Optional[bytes]:
"""
序列化对象为 JSON 字符串
:param obj: 要序列化的对象
:return: 序列化后的字节串
"""
if obj is None:
return None
return json.dumps(obj, default=str).encode("utf-8")
def deserialize(self, data: Optional[bytes]) -> Any:
"""
反序列化 JSON 字符串为对象
:param data: 要反序列化的字节串
:return: 反序列化后的对象
"""
if not data:
return None
return json.loads(data.decode("utf-8"))
class StringRedisSerializer:
"""字符串类型 Redis 键的序列化器"""
def serialize(self, obj: str) -> bytes:
"""序列化字符串为字节数组"""
return json.dumps(obj).encode("utf-8")
def deserialize(self, data: Optional[bytes]) -> Optional[str]:
"""反序列化字节数组为字符串"""
return data.decode("utf-8") if data else None
def generate_cache_key(target: Any, method: Any, *params: Any) -> str:
"""
生成缓存键,使用 SHA-256 哈希
:param target: 目标对象
:param method: 方法对象
:param params: 参数列表
:return: 哈希后的缓存键
"""
container = {
"class": str(target.__class__.__name__),
"methodName": method.__name__,
"package": target.__module__,
"params": params,
}
json_string = json.dumps(container, default=str)
return hashlib.sha256(json_string.encode("utf-8")).hexdigest()
class CustomCache:
"""自定义缓存类,使用 Django 的缓存系统,封装常用的缓存操作"""
def __init__(self, cache_name: str = "default"):
"""
初始化缓存对象
:param cache_name: 缓存配置名称
"""
self.cache = caches[cache_name]
def get(self, key: str) -> Any:
"""
获取缓存值
:param key: 缓存键
:return: 缓存的值
"""
try:
return self.cache.get(key)
except Exception as e:
logger.error(f"缓存获取错误,键[{key}]: {e}")
return None
def set(self, key: str, value: Any, timeout: int = 7200) -> bool:
"""
设置缓存值
:param key: 缓存键
:param value: 缓存值
:param timeout: 过期时间(秒)
:return: 是否设置成功
"""
try:
self.cache.set(key, value, timeout)
return True
except Exception as e:
logger.error(f"缓存设置错误,键[{key}]: {e}")
return False
def delete(self, key: str) -> bool:
"""
删除缓存
:param key: 缓存键
:return: 是否删除成功
"""
try:
self.cache.delete(key)
return True
except Exception as e:
logger.error(f"缓存删除错误,键[{key}]: {e}")
return False
def exists(self, key: str) -> bool:
"""
检查键是否存在
:param key: 缓存键
:return: 是否存在
"""
return self.cache.get(key) is not None
def incr(self, key: str, delta: int = 1) -> Optional[int]:
"""
递增计数器
:param key: 缓存键
:param delta: 增量值
:return: 增加后的值
"""
try:
return self.cache.incr(key, delta)
except Exception as e:
logger.error(f"计数器递增错误,键[{key}]: {e}")
return None
def clear(self) -> bool:
"""
清空所有缓存
:return: 是否清空成功
"""
try:
self.cache.clear()
return True
except Exception as e:
logger.error(f"缓存清空错误: {e}")
return False
评论区