" name="sm-site-verification"/>
侧边栏壁纸
博主头像
PySuper 博主等级

千里之行,始于足下

  • 累计撰写 206 篇文章
  • 累计创建 14 个标签
  • 累计收到 2 条评论

目 录CONTENT

文章目录

Python 爬虫 | 获取装备前线发车数据

PySuper
2021-10-26 / 0 评论 / 0 点赞 / 13 阅读 / 0 字
温馨提示:
本文最后更新于2024-05-28,若内容或图片失效,请留言反馈。 所有牛逼的人都有一段苦逼的岁月。 但是你只要像SB一样去坚持,终将牛逼!!! ✊✊✊

爬虫有风险,抓数需谨慎!

分析

1、解析网页

  • 1、找到指定页面
  • 2、把页面拉到最下面,查看获取数据的请求
  • 3、F12审查元素,检查请求,并清空之前的请求
  • 4、点击XHR按钮,筛选Ajax请求
  • 5、当我们点击更早的发车按钮时,可以看到左侧有个请求被发送了
  • 6、点击请求,我们可以看到response是json类型的数据
k8s-ns-1.png

2、解析数据

  • 建议直接使用json解析工具,查看json数据结构
  • 通过json数据,我们可以看到:数据都在data-->groups-->list
  • 直接遍历这个list数组就可以拿到数据了
k8s-ns-1.png

3、问题解析

3.1、代理

3.2、随机UA

  • 这里我给大家一个文件,以后直接从这个文件中导入UA_LIST就可以了:点击查看
  • 使用"User-Agent": choice(UA_LIST)实现随机UA

3.3、页面结构变化

  • 在不断获取数据的过程中,有时候页面结构发生变化,这应该是该网站的技术变更导致的
  • 我们在代码中,再添加一段网页解析+数据解析的代码就好了
  • 使用try...except...if...else...returncontinuebreak

3.4、加速获取

这里并不建议太快的获取数据

更快的速度获取,在我们开启爬虫的时候,对网站后台的压力会骤增,可能会导致网站崩溃

  • 多任务:多进程多线程多协程
  • 使用asyncaiphttpcelery
  • 框架:scrapy

3.5、安装lxml

如果使用其他的解析方式,如:xpath,需要安装lxml,但是windows下安装lxml还有些烦

源码

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# FileName :zfrontier.py
# Author   :zheng xingtao
# Date     :2021/10/16 20:21


import copy
import json
import os.path
import re
import time
import urllib.request
from random import choice
from urllib.request import Request

import requests

from utils import IP_LIST, UA_LIST


class Zfrontier:
    def __init__(self):
        """
        初始化,准备变量,执行main()函数
        """
        self.items = []
        self.save_path = "zf"
        self.web_url = "https://www.zfrontier.com"
        self.token = "16346058898c07b7d010bcec07c093526984ff32"
        self.detail_url = "https://www.zfrontier.com/v2/mch/info"
        self.start_url = "https://www.zfrontier.com/api/circle/calendar"
        self.post_data = {"time": f"{round(time.time())}", "t": "8e8168180c57d703859d68a0b4748f63"}
        self.main()

    def html(self, url, post_data):
        """
        获取html/json数据,自动重连三次
        :param url:分类页面url
        :param post_data: POST请求需要提交的数据
        :return: html文档
        """
        i = 0
        while i < 3:
            try:
                response = requests.post(
                    url,
                    proxies=choice(IP_LIST),
                    headers={"X-CSRF-TOKEN": self.token, "User-Agent": choice(UA_LIST)},
                    data=post_data,
                    timeout=2
                )
                if response.status_code == 200:
                    response.encoding = response.apparent_encoding
                    return response.text
            except requests.exceptions.RequestException:
                i += 1
        return None

    def category(self, data):
        """
        从分类页面的json数据中获取有效数据
        :param data: 分类页面中的json数据
        :return:使用yield返回Dict数据
        """
        data_json = json.loads(data)
        for item in data_json["data"]["groups"]:
            for good in item["list"]:
                yield {
                    "time": item["title"],
                    "id": good["id"],
                    "title": good["title"],
                    "url": self.web_url + good["url"],
                    "status": good["status"],
                    "cover": good["cover"]
                }

    @staticmethod
    def detail(detail_html):
        """
        从json数据中获取需要的数据,直接返回
        :param detail_html: 详情页的json数据
        :return: json中的有用数据
        """
        data = detail_html["data"]
        return {
            "covers": data["covers"],
            "shop_info": data["ship_info"]
        }

    def save_info(self, good):
        """
        在main函数中进行迭代,这里针对一个good处理
        :param good: 单个商品信息
        :return: 保存数据的状态
        """
        for img_url in good["covers"]:

            proxy_support = urllib.request.ProxyHandler({'sock5': choice(IP_LIST)})
            opener = urllib.request.build_opener(proxy_support)
            urllib.request.install_opener(opener)

            req = Request(
                url=img_url,
                headers={"X-CSRF-TOKEN": self.token, "User-Agent": choice(UA_LIST)}
            )
            content = urllib.request.urlopen(req).read()
            img_name = re.findall("\\d+/(.*)", img_url)[0]

            save_path = self.save_path + "/" + good["time"].replace(".", "") + "/" + \
                        good["title"].strip().replace("/", "-")
            if not os.path.exists(save_path):
                os.makedirs(save_path, exist_ok=True)

            img_info = save_path + "/" + img_name + ".png"
            if os.path.exists(img_info):
                continue
            with open(img_info, 'wb') as f:
                f.write(content)
                print(img_info, "下载完成!")
            # break

    def main(self):
        """函数入口"""
        time_ = time.localtime(time.time())
        category_data = copy.copy(self.post_data)
        data = self.html(
            self.start_url,
            category_data.update({
                "type": "all",
                "startDate": f"{str(time_.tm_year) + '.' + str(time_.tm_mon + 1)}"
            })
        )
        if data:
            for item in self.category(data):
                print(item["title"])
                save_path = self.save_path + "/" + re.findall("/\d{6}.*?", item["cover"])[0] + "/" + \
                            item["title"].strip().replace("/", "-")
                if os.path.exists(save_path):
                    continue

                detail_data = copy.copy(self.post_data)
                detail_data.update({"id": re.findall("mch/(.*)", item["url"])[0]})
                detail_html = self.html(self.detail_url, detail_data)
                detail_html_json = json.loads(detail_html)

                if detail_html_json["ok"] == 302:
                    detail_302_data = copy.copy(self.post_data)
                    detail_302_data.update({
                        "id": re.findall("mch/(.*)", detail_html_json["data"]["url"])[0]
                    })
                    detail_html = self.html(self.detail_url, detail_302_data)
                elif detail_html_json["ok"] == 404:
                    print("data error!")
                    continue

                item.update(self.detail(json.loads(detail_html)))
                self.save_info(item)
        else:
            print(data)
            return "category_html is None"


Zfrontier()


0
  1. 支付宝打赏

    qrcode alipay
  2. 微信打赏

    qrcode weixin

评论区