" name="sm-site-verification"/>
侧边栏壁纸
博主头像
PySuper 博主等级

千里之行,始于足下

  • 累计撰写 235 篇文章
  • 累计创建 15 个标签
  • 累计收到 2 条评论

目 录CONTENT

文章目录
Web

WeChat 二维码--小程序

PySuper
2024-12-14 / 0 评论 / 0 点赞 / 25 阅读 / 0 字
温馨提示:
本文最后更新于2024-12-14,若内容或图片失效,请留言反馈。 所有牛逼的人都有一段苦逼的岁月。 但是你只要像SB一样去坚持,终将牛逼!!! ✊✊✊

生成二维码

多看看开发文档:https://developers.weixin.qq.com/doc/

先写到这里,后面没事儿再优化一下

import json

import requests


# WeChat endpoint URLs used by the helper classes in this module, keyed by purpose.
# "callback" is a placeholder that must be replaced with the deployment's callback URL.
WeChatDict = {
    "token": "https://api.weixin.qq.com/cgi-bin/token",
    "qr": "https://api.weixin.qq.com/wxa/getwxacodeunlimit",
    "check": "https://api.weixin.qq.com/wxa/checksession",
    "reset": "https://api.weixin.qq.com/wxa/resetusersessionkey",
    "app_login": "https://api.weixin.qq.com/sns/jscode2session",
    "wx_login": "https://open.weixin.qq.com/connect/qrconnect",
    "callback": "YOUR_CALLBACK_URL",
}

# WeChat Open Platform AppID and AppSecret (placeholders)
OPEN_APP_ID = "YOUR_OPEN_APP_ID"
OPEN_APP_SECRET = "YOUR_OPEN_APP_SECRET"

# WeChat mini-program AppID and AppSecret (placeholders)
# NOTE(review): the secret placeholder ends in "SECRETT" — looks like a typo; harmless once
# the real secret is filled in.
WECHAT_APP_ID = "YOUR_WECHAT_APP_ID"
WECHAT_APP_SECRET = "YOUR_WECHAT_APP_SECRETT"

# WeChat login URL template; expects appid, redirect_uri, scope and state.
# NOTE(review): shares its base URL with WeChatDict["wx_login"] — consider deriving one from the other.
WX_LOGIN_URL = "https://open.weixin.qq.com/connect/qrconnect?appid={appid}&redirect_uri={redirect_uri}&response_type=code&scope={scope}&state={state}"

# WeChat login callback URL
WX_CALLBACK_URL = "https://pyro.affectai.cn"


class WeChatUtils:
    """
    Helpers for the WeChat Open Platform website (QR-code) login flow.

    The flow is: send the user to the URL from ``build_login_url``, receive a
    ``code`` on the callback, exchange it with ``get_access_token`` and then
    read the profile with ``get_user_info``.
    """

    # OAuth2 endpoints of the website login flow (see the WeChat Open Platform docs).
    TOKEN_URL = "https://api.weixin.qq.com/sns/oauth2/access_token"
    USERINFO_URL = "https://api.weixin.qq.com/sns/userinfo"

    # requests has no default timeout; without one a stalled call blocks forever.
    REQUEST_TIMEOUT = 10

    @staticmethod
    def wechat_login():
        """WeChat login (not implemented yet)."""
        pass

    def build_login_url(self, redirect_uri, scope, state="STATE"):
        """
        Build the WeChat QR-code login URL.

        :param redirect_uri: callback URL; it is percent-encoded here, pass it raw
        :param scope: authorization scope, e.g. ``snsapi_login``
        :param state: opaque value echoed back on the callback. The default keeps
            the previous constant, but a constant gives no CSRF protection —
            callers should pass a per-session random value and verify it.
        :return: the login URL
        """
        from urllib.parse import quote

        # The template already carries every query parameter, so nothing is
        # appended afterwards (the old code duplicated the whole query string).
        # NOTE(review): the qrconnect flow belongs to the Open Platform, hence
        # OPEN_APP_ID — confirm this is the intended application.
        return WX_LOGIN_URL.format(
            appid=OPEN_APP_ID,
            redirect_uri=quote(redirect_uri, safe=""),
            scope=scope,
            state=quote(state, safe=""),
        )

    def wx_login(self):
        """
        Build the login URL for the configured callback and print it.

        :return: the login URL (also printed, as before)
        """
        wx_login_url = self.build_login_url(WX_CALLBACK_URL, "snsapi_login")
        print("微信登录URL:", wx_login_url)
        return wx_login_url

    @staticmethod
    def get_access_token(code):
        """
        Exchange the authorization ``code`` from the callback for an access token.

        Declared static so it works both as ``WeChatUtils.get_access_token(code)``
        and on an instance (the old plain function received ``self`` as ``code``).

        :param code: authorization code received on the callback
        :return: decoded JSON response of the token endpoint
        :raises requests.RequestException: on network failure or timeout
        """
        params = {
            "appid": OPEN_APP_ID,
            "secret": OPEN_APP_SECRET,
            "code": code,
            "grant_type": "authorization_code",
        }
        response = requests.get(WeChatUtils.TOKEN_URL, params=params, timeout=WeChatUtils.REQUEST_TIMEOUT)
        return response.json()

    @staticmethod
    def get_user_info(access_token, openid):
        """
        Fetch the profile of an authorized user.

        :param access_token: token returned by ``get_access_token``
        :param openid: the user's openid returned with the token
        :return: decoded JSON response of the userinfo endpoint
        :raises requests.RequestException: on network failure or timeout
        """
        params = {
            "access_token": access_token,
            "openid": openid,
            "lang": "zh_CN",
        }
        response = requests.get(WeChatUtils.USERINFO_URL, params=params, timeout=WeChatUtils.REQUEST_TIMEOUT)
        return response.json()


class WeChatAppUtils:
    """
    Helpers for the WeChat mini-program server API (access token, mini-program
    code, login/session handling, user-data decryption).
    https://developers.weixin.qq.com/miniprogram/dev/OpenApiDoc/
    """

    @staticmethod
    def get_access_token():
        """
        Request a mini-program access token.

        :return: the HTTP response of the token endpoint (call ``.json()`` and
            read ``access_token`` from it), or None if the request failed
        """
        params = {
            "grant_type": "client_credential",
            "appid": WECHAT_APP_ID,
            "secret": WECHAT_APP_SECRET,
        }
        return WeChatAppUtils._make_request(WeChatDict["token"], params)

    @staticmethod
    def _make_request(url, params, headers=None, method="GET", timeout=10):
        """
        Send an HTTP request.

        :param url: request URL
        :param params: query parameters for GET, JSON body for any other method
        :param headers: optional request headers
        :param method: "GET" sends a GET; anything else sends a POST
        :param timeout: seconds to wait before giving up (requests has no default)
        :return: the response object, or None on network/HTTP error
        """
        try:
            if method == "GET":
                response = requests.get(url, params=params, headers=headers, timeout=timeout)
            else:
                response = requests.post(url, headers=headers, data=json.dumps(params), timeout=timeout)
            response.raise_for_status()
            return response
        except requests.RequestException as e:
            print(f"请求失败: {e}")
            return None

    @staticmethod
    def _fetch_access_token_value():
        """Return the access-token string, or None if it could not be obtained."""
        response = WeChatAppUtils.get_access_token()
        if response is None:
            return None
        try:
            payload = response.json()
        except ValueError as e:
            print(f"请求失败: {e}")
            return None
        token = payload.get("access_token")
        if not token:
            # WeChat reports failures as HTTP 200 with an errcode body.
            print(f"请求失败: {payload}")
            return None
        return token

    @staticmethod
    def build_query_string(params):
        """
        Build a URL query string.

        Values are converted to text and percent-encoded, so non-string values
        and values containing reserved characters are handled.

        :param params: mapping of query parameters
        :return: the encoded query string ("" for an empty mapping)
        """
        from urllib.parse import urlencode

        return urlencode(params)

    @staticmethod
    def generate_qrcode(data):
        """
        Generate a mini-program code.

        :param data: value for the ``scene`` field of the code
        :return: image bytes, or None on any failure
        """
        access_token = WeChatAppUtils._fetch_access_token_value()
        if not access_token:
            return None

        headers = {"Content-Type": "application/json"}
        payload = {
            "scene": data,  # at most 32 visible characters: digits, letters and some special characters
            "check_path": False,  # whether to check that the page exists
            "env_version": "release",  # "release", "trial" or "develop"; release is the default
            # "page": "pages/templates/index/index",  # defaults to the home page
            "width": 430,  # code width in px, default 430
            "auto_color": False,  # automatically choose the line colour
            "line_color": {"r": "0", "g": "0", "b": "0"},  # used when auto_color is false
            "is_hyaline": False,  # transparent background, default false
        }
        qrcode = WeChatAppUtils._make_request(
            f"{WeChatDict['qr']}?access_token={access_token}",
            payload,
            headers,
            method="POST",
        )
        if qrcode is None:
            return None

        content = qrcode.content
        # On failure the endpoint still answers HTTP 200 but with a JSON error
        # body; image data never starts with "{", so treat that as a failure
        # instead of handing the error text back as an image.
        if content.lstrip()[:1] == b"{":
            print(f"请求失败: {qrcode.text}")
            return None
        return content

    @staticmethod
    def app_login(code):
        """
        Mini-program login (code2Session).

        :param code: login credential obtained by the mini-program
        :return: the HTTP response, or None on failure
        """
        params = {
            "js_code": code,
            "appid": WECHAT_APP_ID,
            "secret": WECHAT_APP_SECRET,
            "grant_type": "authorization_code",
        }
        return WeChatAppUtils._make_request(WeChatDict["app_login"], params)

    @staticmethod
    def _session_params(session_key, openid, access_token):
        """Build the query parameters shared by the session check/reset calls, or None without a token."""
        import hashlib
        import hmac

        if access_token is None:
            access_token = WeChatAppUtils._fetch_access_token_value()
        if not access_token:
            return None

        key = session_key.encode("utf-8") if isinstance(session_key, str) else session_key
        # The API expects signature = hmac_sha256(session_key, "").
        # NOTE(review): the session key is used as given (not base64-decoded) — confirm against the API docs.
        signature = hmac.new(key, b"", hashlib.sha256).hexdigest()
        return {
            "access_token": access_token,
            "openid": openid,
            "signature": signature,
            "sig_method": "hmac_sha256",
        }

    @staticmethod
    def check_session(session_key, openid="openid", access_token=None):
        """
        Check whether a mini-program session key is still valid.

        :param session_key: the user's session key
        :param openid: the user's openid; the default only preserves the old
            placeholder, real callers must pass the actual openid
        :param access_token: interface access token; fetched when omitted
        :return: the HTTP response, or None on failure
        """
        params = WeChatAppUtils._session_params(session_key, openid, access_token)
        if params is None:
            return None
        return WeChatAppUtils._make_request(WeChatDict["check"], params)

    @staticmethod
    def reset_session(session_key, openid="openid", access_token=None):
        """
        Reset a mini-program session key.

        :param session_key: the user's current session key
        :param openid: the user's openid; the default only preserves the old
            placeholder, real callers must pass the actual openid
        :param access_token: interface access token; fetched when omitted
        :return: the HTTP response, or None on failure
        """
        params = WeChatAppUtils._session_params(session_key, openid, access_token)
        if params is None:
            return None
        return WeChatAppUtils._make_request(WeChatDict["reset"], params)

    @staticmethod
    def get_session_key(openid):
        """
        Obtain a session key through the code2Session endpoint.

        NOTE(review): despite its name, the argument is sent as ``js_code``, so
        it must be the login code from the mini-program, not an openid. The
        name is kept for existing callers; this is equivalent to ``app_login``.

        :param openid: value sent as ``js_code``
        :return: the HTTP response, or None on failure
        """
        return WeChatAppUtils.app_login(openid)

    @staticmethod
    def decrypt_data(encrypted_data, iv, session_key):
        """
        Decrypt mini-program user data (AES-CBC, all inputs base64-encoded).

        :param encrypted_data: base64 ciphertext
        :param iv: base64 initialization vector
        :param session_key: base64 session key
        :return: decrypted text, or None if decoding, decryption or padding
            validation fails
        """
        from Crypto.Cipher import AES
        from base64 import b64decode

        try:
            key = b64decode(session_key)
            ciphertext = b64decode(encrypted_data)
            vector = b64decode(iv)

            cipher = AES.new(key, AES.MODE_CBC, vector)
            plaintext = cipher.decrypt(ciphertext)
            if not plaintext:
                raise ValueError("empty plaintext")

            # The last byte gives the padding length; validate it rather than
            # slicing blindly, so a wrong key yields None instead of garbage.
            pad = plaintext[-1]
            if not 1 <= pad <= len(plaintext) or plaintext[-pad:] != bytes([pad]) * pad:
                raise ValueError("invalid padding")

            return plaintext[:-pad].decode("utf-8")
        except (ValueError, TypeError) as e:
            print(f"解密数据失败: {e}")
            return None

import base64
import io
import json
import uuid
from datetime import datetime

from django.conf import settings
from django.core.cache import cache
from django.db import transaction
from django.shortcuts import redirect
from rest_framework.decorators import action
from rest_framework.response import Response
from rest_framework.viewsets import ViewSet
from wechatpy.oauth import WeChatOAuth

from apps.tasks.models import Results, Tasks
from apps.users.models import User
from config.settings.base import BUCKET_DICT
from sql.init_wenda import faker
from utils.add_queue import send_message
from utils.basic import XopsResponse
from utils.dfs import MinioDFS
from utils.error import ParamError
from utils.qrcode import WeChatAppUtils


class WeChatViewSet(ViewSet):
    """
    WeChat-related API endpoints: mini-program code, OAuth login, and the
    mini-program task session (login, file upload, data submission).
    """

    @staticmethod
    def _not_implemented():
        """Answer 501 for actions that are declared but not implemented yet."""
        return Response({"code": 501, "message": "功能暂未实现"}, status=501)

    @staticmethod
    def _load_session(data, login_message):
        """
        Resolve user and task from ``data`` and verify the cached login session.

        :param data: request payload holding "mobile", "task_id" and "uuid"
        :param login_message: error text used when the session is missing or stale
        :return: (user, task, cache key, cached session dict)
        :raises ParamError: if the user/task is unknown or the session is invalid
        """
        user = User.objects.filter(phone=data["mobile"]).first()
        task = Tasks.objects.filter(id=data["task_id"]).first()
        if not user or not task:
            raise ParamError("用户或任务不存在")

        cache_key = f"task-{data['task_id']}-{user.phone}"
        session = cache.get(cache_key)
        if not session or session.get("uuid") != data["uuid"]:
            raise ParamError(login_message)
        return user, task, cache_key, session

    @action(methods=["GET"], detail=False)
    def get_qrcode(self, request):
        """
        Return the mini-program code for a task as a base64 string.

        Reads ``task_id`` from the query string and uses it as the code's scene.
        """
        task_id = request.query_params.get("task_id")
        if not task_id:
            raise ParamError("请提供任务ID!")
        if not Tasks.objects.filter(id=task_id).exists():
            raise ParamError("任务不存在!")

        qrcode = WeChatAppUtils.generate_qrcode(task_id)
        if qrcode is None:
            raise ParamError("生成二维码失败!")

        # Binary image data is base64-encoded so it can travel in JSON.
        qrcode_base64 = base64.b64encode(qrcode).decode("utf-8")
        return Response({"code": 200, "message": "获取二维码成功", "data": qrcode_base64})

    @action(methods=["GET"], detail=False)
    def wechat_login(self, request):
        """Redirect the client to the WeChat OAuth authorization page."""
        redirect_uri = "http://pyro.affectai.cn/api/wechat/wechat_callback/"
        oauth = WeChatOAuth(settings.WECHAT_APP_ID, settings.WECHAT_APP_SECRET, redirect_uri, scope="snsapi_userinfo")
        return redirect(oauth.authorize_url)

    @action(methods=["GET"], detail=False)
    def wechat_callback(self, request):
        """
        OAuth callback: exchange ``code`` for a token and return the user's openid.

        A response is always returned; DRF rejects views that return None.
        """
        code = request.GET.get("code")
        if not code:
            raise ParamError("缺少必要参数")
        oauth = WeChatOAuth(settings.WECHAT_APP_ID, settings.WECHAT_APP_SECRET, "")
        oauth.fetch_access_token(code)
        user_info = oauth.get_user_info()
        return XopsResponse(data={"openid": user_info["openid"]})

    @action(methods=["POST"], detail=False)
    def wechat_logout(self, request):
        """
        Mini-program logout / teardown (not implemented yet).
        """
        return self._not_implemented()

    @action(methods=["POST"], detail=False)
    def bind_openid(self, request):
        """
        Bind an openid after scanning the code (not implemented yet).
        """
        return self._not_implemented()

    @action(methods=["GET"], detail=False)
    def get_signature(self, request):
        """
        Return the WeChat signature (not implemented yet).
        """
        return self._not_implemented()

    @action(methods=["POST"], detail=False)
    def login(self, request):
        """
        Mini-program login.

        Identifies the user by mobile number and task, and issues a temporary
        login credential that is cached for one hour. A still-valid cached
        credential is returned unchanged.
        """
        task_id = request.data.get("task_id")
        mobile = request.data.get("mobile")

        if not all([task_id, mobile]):
            raise ParamError("请提供完整的登录信息")

        task = Tasks.objects.filter(id=task_id).first()
        user = User.objects.filter(phone=mobile).first()
        if not task or not user:
            raise ParamError("任务或用户不存在")

        uuid_cache_key = f"task-{task_id}-{user.phone}"
        cache_data = cache.get(uuid_cache_key)
        if cache_data:
            return XopsResponse(data=cache_data)

        login_data = {
            "task_id": task_id,
            # The uuid acts as the session credential, so it comes from the
            # standard library rather than from a test-data generator.
            "uuid": str(uuid.uuid4()),
            "date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        }
        cache.set(uuid_cache_key, login_data, 60 * 60)
        return XopsResponse(data=login_data)

    @action(methods=["POST"], detail=False)
    def uploadfile(self, request):
        """
        Receive a file uploaded from WeChat and store it in MinIO.

        Requires form fields uuid, mobile, task_id, filename and a ``file`` part.
        """
        data = request.POST
        file = request.FILES.get("file")
        required_fields = ["uuid", "mobile", "task_id", "filename"]

        if not file or not all(data.get(field) for field in required_fields):
            raise ParamError("缺少必要参数")

        _, _, _, cache_data = self._load_session(data, "请重新登录")

        # filename is client-supplied and becomes part of the object path, so
        # it must not be able to leave the task/user/date folder.
        filename = data["filename"]
        if "/" in filename or "\\" in filename or ".." in filename:
            raise ParamError("文件名不合法")

        # Keep the uploaded file's extension; a name without a dot has none.
        _, dot, extension = file.name.rpartition(".")
        object_name = f"{filename}.{extension}" if dot else filename

        date = cache_data.get("date")
        folder_path = f"{data['task_id']}/{data['mobile']}/{date}/{object_name}"
        path = MinioDFS().upload(BUCKET_DICT.get("result"), file, folder_path)
        return XopsResponse(data={"info": "上传成功", "path": path})

    @action(methods=["POST"], detail=False)
    def submit_data(self, request):
        """
        Receive and store questionnaire data uploaded from WeChat.

        With status "end" the task is closed: an end marker is stored, the
        session is dropped and a message is queued. Otherwise the "data" field
        is stored as result.json.
        """
        required_fields = ["uuid", "mobile", "task_id", "status"]
        data = request.data

        if not all(data.get(field) for field in required_fields):
            raise ParamError("缺少必要参数")

        _, _, uuid_cache_key, cache_data = self._load_session(data, "请先登录")

        date = cache_data.get("date")
        folder_path = f"{data['task_id']}/{data['mobile']}/{date}/"

        if data["status"] == "end":
            end_stream = io.BytesIO(b"task completed")
            MinioDFS().upload_data(BUCKET_DICT.get("result"), end_stream, "end", folder_path)
            cache.delete(uuid_cache_key)
            send_message(
                {
                    "task_id": data["task_id"],
                    "phone": data["mobile"],
                    "date": date,
                    "folder_path": folder_path,
                }
            )
            return XopsResponse(data={"info": "任务已完成"})

        if not data.get("data"):
            raise ParamError("缺少量表数据")

        data_stream = io.BytesIO(json.dumps(data["data"]).encode("utf-8"))
        path = MinioDFS().upload_data(BUCKET_DICT.get("result"), data_stream, "result.json", folder_path)
        return XopsResponse(data={"info": "上传成功", "path": path})

    @transaction.atomic
    @action(methods=["POST"], detail=False)
    def submit_person_info(self, request):
        """
        Store the user's personal information and test result.

        Creates a Results row linked to the task and user, and stores a
        person_info.json document in MinIO.
        """
        data = request.data
        required_fields = ["uuid", "mobile", "task_id", "result"]

        if not all(data.get(field) for field in required_fields):
            raise ParamError("缺少必要参数")

        user, task, _, cache_data = self._load_session(data, "请先登录")

        # Reject malformed results up front instead of failing with a KeyError.
        scores = data["result"]
        score_fields = ("depression", "anxiety", "bipolar_disorder")
        if not isinstance(scores, dict) or any(field not in scores for field in score_fields):
            raise ParamError("缺少必要参数")

        result_dict = {
            "姓名": user.nick_name,
            "ID编号": user.username,
            "手机号": user.phone,
            "时间": cache_data.get("date"),
            "年龄": user.age,
            "结果": scores,
        }

        result = Results.objects.create(
            depression=scores["depression"],
            anxiety=scores["anxiety"],
            bipolar_disorder=scores["bipolar_disorder"],
        )
        result.task.add(task)
        result.executor.add(user)

        folder_path = f"{data['task_id']}/{data['mobile']}/{cache_data.get('date')}/"
        data_stream = io.BytesIO(json.dumps(result_dict).encode("utf-8"))
        path = MinioDFS().upload_data(BUCKET_DICT.get("result"), data_stream, "person_info.json", folder_path)
        return XopsResponse(data={"info": "上传成功", "path": path})

0

评论区