生成二维码
多看看开发文档:
https://developers.weixin.qq.com/doc/ 先写到这里,后面没事儿再优化一下
import json
import requests
WeChatDict = {
"token": "https://api.weixin.qq.com/cgi-bin/token",
"qr": "https://api.weixin.qq.com/wxa/getwxacodeunlimit",
"check": "https://api.weixin.qq.com/wxa/checksession",
"reset": "https://api.weixin.qq.com/wxa/resetusersessionkey",
"app_login": "https://api.weixin.qq.com/sns/jscode2session",
"wx_login": "https://open.weixin.qq.com/connect/qrconnect",
"callback": "YOUR_CALLBACK_URL",
}
# 微信开放平台AppID和AppSecret
OPEN_APP_ID = "YOUR_OPEN_APP_ID"
OPEN_APP_SECRET = "YOUR_OPEN_APP_SECRET"
# 微信小程序AppID和AppSecret
WECHAT_APP_ID = "YOUR_WECHAT_APP_ID"
WECHAT_APP_SECRET = "YOUR_WECHAT_APP_SECRETT"
# 微信登录URL
WX_LOGIN_URL = "https://open.weixin.qq.com/connect/qrconnect?appid={appid}&redirect_uri={redirect_uri}&response_type=code&scope={scope}&state={state}"
# 微信登录回调URL
WX_CALLBACK_URL = "https://pyro.affectai.cn"
class WeChatUtils:
"""
微信工具类
"""
@staticmethod
def wechat_login():
"""微信登录"""
pass
def build_login_url(self, redirect_uri, scope):
"""
构造微信登录URL
:param redirect_uri: 回调URL
:param scope: 授权范围
:return: 微信登录URL
"""
params = {
"appid": WECHAT_APP_ID,
"redirect_uri": redirect_uri,
"response_type": "code",
"scope": scope,
"state": "STATE", # 防止 CSRF 攻击
}
query_string = WeChatAppUtils.build_query_string(params)
return f"{WX_LOGIN_URL.format(appid=APP_ID, redirect_uri=redirect_uri, scope=scope, state='STATE')}&{query_string}"
def wx_login(self):
# 构造微信登录URL
wx_login_url = self.build_login_url(WX_CALLBACK_URL, "snsapi_login")
print("微信登录URL:", wx_login_url)
def get_access_token(code):
token_url = ""
params = {
"appid": APP_ID,
"secret": APP_SECRET,
"code": code,
"grant_type": "authorization_code",
}
response = requests.get(token_url, params=params)
return response.json()
def get_user_info(access_token, openid):
userinfo_url = ""
params = {
"access_token": access_token,
"openid": openid,
"lang": "zh_CN",
}
response = requests.get(userinfo_url, params=params)
return response.json()
class WeChatAppUtils:
"""
微信小程序码生成工具
https://developers.weixin.qq.com/miniprogram/dev/OpenApiDoc/
"""
@staticmethod
def get_access_token():
"""
获取微信 access_token
:return: access_token 或 None
"""
params = {
"grant_type": "client_credential",
"appid": WECHAT_APP_ID,
"secret": WECHAT_APP_SECRET,
}
return WeChatAppUtils._make_request(WeChatDict["token"], params)
@staticmethod
def _make_request(url, params, headers=None, method="GET"):
"""
发送HTTP请求
:param url: 请求URL
:param params: 请求参数
:param headers: 请求头
:param method: 请求方法
:return: 响应数据或 None
"""
try:
if method == "GET":
response = requests.get(url, params=params, headers=headers)
else:
response = requests.post(url, headers=headers, data=json.dumps(params))
response.raise_for_status()
return response
except requests.RequestException as e:
print(f"请求失败: {e}")
return None
@staticmethod
def build_query_string(params):
"""
构造查询字符串
:param params: 查询参数
:return: 查询字符串
"""
query_string = ""
for key, value in params.items():
query_string += key + "=" + value + "&"
return query_string.rstrip("&")
@staticmethod
def generate_qrcode(data):
"""
生成微信小程序码
:param data: 需要生成二维码的参数
:return: 二维码内容或 None
"""
access_token = WeChatAppUtils.get_access_token()
if access_token:
access_token = access_token.json().get("access_token")
headers = {"Content-Type": "application/json"}
payload = {
"scene": data, # 最大 32 个可见字符,只支持数字,大小写英文以及部分特殊字符
"check_path": False, # 检查 page 是否存在
"env_version": "release", # 要打开的小程序版本,正式版为 "release",体验版为 "trial",开发版为 "develop"。默认是正式版
# "page": "pages/templates/index/index", # 默认是主页,页面 page,例如 pages/index/index
"width": 430, # 二维码宽度,默认 430px
"auto_color": False, # 自动配置线条颜色
"line_color": {"r": "0", "g": "0", "b": "0"}, # auto_color 为 false 时生效,使用 rgb 设置颜色
"is_hyaline": False, # 是否需要透明底色,默认 false
}
qrcode = WeChatAppUtils._make_request(
f"{WeChatDict['qr']}?access_token={access_token}",
payload,
headers,
method="POST",
)
if qrcode:
# with open("qrcode.png", "wb") as f:
# f.write(qrcode.content)
return qrcode.content
return None
return None
@staticmethod
def app_login(code):
"""
小程序登录
:param code: 小程序登录凭证
:return: 登录信息或 None
"""
params = {
"js_code": code,
"appid": WECHAT_APP_ID,
"secret": WECHAT_APP_SECRET,
"grant_type": "authorization_code",
}
return WeChatAppUtils._make_request(WeChatDict["app_login"], params)
@staticmethod
def check_session(session_key):
"""
检查小程序登录态
:param session_key: 会话密钥
:return: 检查结果或 None
"""
params = {
"access_token": session_key,
"openid": "openid",
"signature": "",
"sig_method": "hmac_sha256",
}
return WeChatAppUtils._make_request(WeChatDict["check"], params)
@staticmethod
def reset_session(session_key):
"""
重置小程序登录态
:param session_key: 会话密钥
:return: 重置结果或 None
"""
params = {
"access_token": session_key,
"openid": "openid",
"signature": "",
"sig_method": "hmac_sha256",
}
return WeChatAppUtils._make_request(WeChatDict["reset"], params)
@staticmethod
def get_session_key(openid):
"""
获取小程序 session_key
:param openid: 用户 openid
:return: session_key 或 None
"""
params = {
"appid": WECHAT_APP_ID,
"secret": WECHAT_APP_SECRET,
"js_code": openid,
"grant_type": "authorization_code",
}
return WeChatAppUtils._make_request("https://api.weixin.qq.com/sns/jscode2session", params)
@staticmethod
def decrypt_data(encrypted_data, iv, session_key):
"""
解密小程序用户数据
:param encrypted_data: 加密数据
:param iv: 加密算法初始向量
:param session_key: 会话密钥
:return: 解密后的数据
"""
from Crypto.Cipher import AES
from base64 import b64decode
try:
session_key = b64decode(session_key)
encrypted_data = b64decode(encrypted_data)
iv = b64decode(iv)
cipher = AES.new(session_key, AES.MODE_CBC, iv)
decrypted = cipher.decrypt(encrypted_data)
padding = decrypted[-1]
decrypted = decrypted[:-padding]
return decrypted.decode("utf-8")
except Exception as e:
print(f"解密数据失败: {e}")
return None
import base64
import io
import json
from datetime import datetime
from django.conf import settings
from django.core.cache import cache
from django.db import transaction
from django.shortcuts import redirect
from rest_framework.decorators import action
from rest_framework.response import Response
from rest_framework.viewsets import ViewSet
from wechatpy.oauth import WeChatOAuth
from apps.tasks.models import Results, Tasks
from apps.users.models import User
from config.settings.base import BUCKET_DICT
from sql.init_wenda import faker
from utils.add_queue import send_message
from utils.basic import XopsResponse
from utils.dfs import MinioDFS
from utils.error import ParamError
from utils.qrcode import WeChatAppUtils
class WeChatViewSet(ViewSet):
"""
微信相关的 API 集合
"""
@action(methods=["GET"], detail=False)
def get_qrcode(self, request):
"""
获取微信小程序的二维码
根据用户提交的task_id,生成当前任务的微信小程序的二维码
"""
task_id = request.query_params.get("task_id")
if not task_id:
raise ParamError("请提供任务ID!")
task = Tasks.objects.filter(id=task_id).first()
if not task:
raise ParamError("任务不存在!")
qrcode = WeChatAppUtils.generate_qrcode(task_id)
if qrcode is None:
raise ParamError("生成二维码失败!")
# 将二进制数据转换为base64字符串
qrcode_base64 = base64.b64encode(qrcode).decode("utf-8")
return Response({"code": 200, "message": "获取二维码成功", "data": qrcode_base64})
@action(methods=["GET"], detail=False)
def wechat_login(self, request):
redirect_uri = "http://pyro.affectai.cn/api/wechat/wechat_callback/"
oauth = WeChatOAuth(settings.WECHAT_APP_ID, settings.WECHAT_APP_SECRET, redirect_uri, scope="snsapi_userinfo")
authorize_url = oauth.authorize_url
return redirect(authorize_url)
@action(methods=["GET"], detail=False)
def wechat_callback(self, request):
code = request.GET.get("code")
oauth = WeChatOAuth(settings.WECHAT_APP_ID, settings.WECHAT_APP_SECRET, "")
oauth.fetch_access_token(code)
user_info = oauth.get_user_info()
openid = user_info["openid"]
print(openid)
@action(methods=["POST"], detail=False)
def wechat_logout(self, request):
"""
微信小程序退出,销毁的时候
"""
pass
@action(methods=["POST"], detail=False)
def bind_openid(self, request):
"""
微信扫码绑定
:param request:
:return:
"""
pass
@action(methods=["GET"], detail=False)
def get_signature(self, request):
"""
获取微信的签名
"""
pass
@action(methods=["POST"], detail=False)
def login(self, request):
"""
小程序登录
通过手机号和OpenID识别用户身份,并生成临时登录凭证
"""
task_id = request.data.get("task_id")
mobile = request.data.get("mobile")
print("=" * 20)
print(task_id, mobile)
print("=" * 20)
# 参数校验
if not all([task_id, mobile]):
raise ParamError("请提供完整的登录信息")
# 查询任务和用户
task = Tasks.objects.filter(id=task_id).first()
user = User.objects.filter(phone=mobile).first()
if not task or not user:
raise ParamError("任务或用户不存在")
# 生成登录凭证
uuid_cache_key = f"task-{task_id}-{user.phone}"
cache_data = cache.get(uuid_cache_key)
if cache_data:
return XopsResponse(data=cache_data})
login_data = {"task_id": task_id, "uuid": faker.uuid4(), "date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")}
cache.set(uuid_cache_key, login_data, 60 * 60)
return XopsResponse(data=login_data)
@action(methods=["POST"], detail=False)
def uploadfile(self, request):
"""
接收微信上传的文件并保存到MinIO
"""
data = request.POST
file = request.FILES.get("file")
required_fields = ["uuid", "mobile", "task_id", "filename"]
# 参数校验
if not file or not all(data.get(field) for field in required_fields):
raise ParamError("缺少必要参数")
# 查询用户和任务
user = User.objects.filter(phone=data["mobile"]).first()
task = Tasks.objects.filter(id=data["task_id"]).first()
if not user or not task:
raise ParamError("用户或任务不存在")
# 验证登录状态
uuid_cache_key = f"task-{data['task_id']}-{user.phone}"
cache_data = cache.get(uuid_cache_key)
if not cache_data or cache_data.get("uuid") != data["uuid"]:
raise ParamError("请重新登录")
# 上传文件
suffix = file.name.split(".")[-1]
date = cache_data.get("date")
folder_path = f"{data['task_id']}/{data['mobile']}/{date}/{data['filename']}.{suffix}"
path = MinioDFS().upload(BUCKET_DICT.get("result"), file, folder_path)
return XopsResponse(data={"info": "上传成功", "path": path})
@action(methods=["POST"], detail=False)
def submit_data(self, request):
"""
接收并保存微信端上传的量表数据
"""
required_fields = ["uuid", "mobile", "task_id", "status"]
data = request.data
# 参数校验
if not all(data.get(field) for field in required_fields):
raise ParamError("缺少必要参数")
# 查询用户和任务
user = User.objects.get(phone=data["mobile"])
task = Tasks.objects.get(id=data["task_id"])
# 验证登录状态
uuid_cache_key = f"task-{data['task_id']}-{user.phone}"
cache_data = cache.get(uuid_cache_key)
if not cache_data or cache_data.get("uuid") != data["uuid"]:
raise ParamError("请先登录")
# 处理任务结束
if data["status"] == "end":
folder_path = f"{data['task_id']}/{data['mobile']}/{cache_data.get('date')}/"
end_stream = io.BytesIO(b"task completed")
MinioDFS().upload_data(BUCKET_DICT.get("result"), end_stream, "end", folder_path)
# 删除缓存数据
cache.delete(uuid_cache_key)
# 发送推送消息
send_message(
{
"task_id": data["task_id"],
"phone": data["mobile"],
"date": cache_data.get("date"),
"folder_path": folder_path,
}
)
return XopsResponse(data={"info": "任务已完成"})
# 保存量表数据
if not data.get("data"):
raise ParamError("缺少量表数据")
date = cache_data.get("date")
folder_path = f"{data['task_id']}/{data['mobile']}/{date}/"
data_bytes = json.dumps(data["data"]).encode("utf-8")
data_stream = io.BytesIO(data_bytes)
path = MinioDFS().upload_data(BUCKET_DICT.get("result"), data_stream, "result.json", folder_path)
return XopsResponse(data={"info": "上传成功", "path": path})
@transaction.atomic
@action(methods=["POST"], detail=False)
def submit_person_info(self, request):
"""
保存用户个人信息和测试结果
"""
data = request.data
required_fields = ["uuid", "mobile", "task_id", "result"]
# 参数校验
if not all(data.get(field) for field in required_fields):
raise ParamError("缺少必要参数")
# 查询用户和任务
user = User.objects.get(phone=data["mobile"])
task = Tasks.objects.get(id=data["task_id"])
# 验证登录状态
uuid_cache_key = f"task-{data['task_id']}-{user.phone}"
cache_data = cache.get(uuid_cache_key)
if not cache_data or cache_data.get("uuid") != data["uuid"]:
raise ParamError("请先登录")
# 构建个人信息
result_dict = {
"姓名": user.nick_name,
"ID编号": user.username,
"手机号": user.phone,
"时间": cache_data.get("date"),
"年龄": user.age,
"结果": data["result"],
}
# 保存测试结果
personality = data["result"]["personality"]
result = Results.objects.create(
depression=data["result"]["depression"],
anxiety=data["result"]["anxiety"],
bipolar_disorder=data["result"]["bipolar_disorder"]
)
result.task.add(task)
result.executor.add(user)
# 保存个人信息
folder_path = f"{data['task_id']}/{data['mobile']}/{cache_data.get('date')}/"
data_bytes = json.dumps(result_dict).encode("utf-8")
data_stream = io.BytesIO(data_bytes)
path = MinioDFS().upload_data(BUCKET_DICT.get("result"), data_stream, "person_info.json", folder_path)
return XopsResponse(data={"info": "上传成功", "path": path})
评论区