" name="sm-site-verification"/>
侧边栏壁纸
博主头像
PySuper博主等级

千里之行,始于足下

  • 累计撰写 203 篇文章
  • 累计创建 14 个标签
  • 累计收到 1 条评论

目 录CONTENT

文章目录
Web

Django 中实现多任务处理

PySuper
2019-12-18 / 0 评论 / 0 点赞 / 9 阅读 / 12210 字
温馨提示:
所有牛逼的人都有一段苦逼的岁月。 但是你只要像SB一样去坚持,终将牛逼!!! ✊✊✊

在Django单个视图中使用asyncio实例代码如下(有多个IO任务时)

from django.views import View
import asyncio
import time
from django.http import JsonResponse
 
class TestAsyncioView(View):
    def get(self, request, *args, **kwargs):
        """
        利用asyncio和async await关键字(python3.5之前使用yield)实现协程
        """
        start_time = time.time()
        loop = asyncio.new_event_loop()  # 或 loop = asyncio.SelectorEventLoop()
        asyncio.set_event_loop(loop)
        self.loop = loop
        try:
            results = loop.run_until_complete(self.gather_tasks())
        finally:
            loop.close()
        end_time = time.time()
        return JsonResponse({'results': results, 'cost_time': (end_time - start_time)})
 
    async def gather_tasks(self):
        """
         也可以用回调函数处理results
        task1 = self.loop.run_in_executor(None, self.io_task1, 2)
        future1 = asyncio.ensure_future(task1)
        future1.add_done_callback(callback)
 
        def callback(self, future):
            print("callback:",future.result())
        """
        tasks = (
            self.make_future(self.io_task1, 2),
            self.make_future(self.io_task2, 2)
        )
        results = await asyncio.gather(*tasks)
        return results
 
    async def make_future(self, func, *args):
        future = self.loop.run_in_executor(None, func, *args)
        response = await future
        return response
 
    """
    # python3.5之前无async await写法
    import types
    @types.coroutine
    # @asyncio.coroutine  # 这个也行
    def make_future(self, func, *args):
        future = self.loop.run_in_executor(None, func, *args)
        response = yield from future
        return response
    """
 
    def io_task1(self, sleep_time):
        time.sleep(sleep_time)
        return 66
 
    def io_task2(self, sleep_time):
        time.sleep(sleep_time)
        return 77

使用async返回协程中的计算结果

import time
import asyncio
from threading import Thread


async def do_task(params):  # 执行单个任务
    print(time.time(), params)
    await asyncio.sleep(2)  # 注意这里不能用time.sleep(),否则全局都会sleep
    return params


async def do_task_group(params_group):  # 执行多个任务
    tasks = [asyncio.ensure_future(do_task(params), loop=io_loop) for params in params_group]
    results = await asyncio.gather(*tasks, loop=io_loop, return_exceptions=True)
    return results


async def do_task_groups(all_params, send_step=5):  # 执行多个任务组
    # 把所有任务按照步长分成多个任务组
    params_groups = [all_params[index: index + send_step] for index in range(0, len(all_params), send_step)]
    tasks = [asyncio.ensure_future(do_task_group(params_group), loop=io_loop) for params_group in params_groups]

    # 获取并合并任务结果
    the_results = await asyncio.gather(*tasks, loop=io_loop, return_exceptions=True)
    results = []
    for result in the_results:
        results.extend(result)
    return results


def do_all_tasks(all_params):  # 执行所有任务
    results = asyncio.run_coroutine_threadsafe(do_task_groups(all_params), io_loop)
    return results.result()  # 阻塞式获取结果,真正阻塞执行事务的地方


def start_loop():  # 启动事件循环
    io_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(io_loop)
    thread = Thread(target=io_loop.run_forever)
    thread.daemon = True
    thread.start()
    return io_loop


if __name__ == '__main__':
    io_loop = start_loop()
    all_params = list(range(10))
    print(do_all_tasks(all_params))

需要在线程启动时显式创建/设置事件循环:

loop = asyncio.new_event_loop()

asyncio.set_event_loop(loop)

在Django单个视图中使用ThreadPoolExecutor实例代码如下(有多个IO任务时)

from django.views import View
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
 
 
class TestThreadView(View):
    def get(self, request, *args, **kargs):
        start_time = time.time()
        future_set = set()
        tasks = (self.io_task1, self.io_task2)
        with ThreadPoolExecutor(len(tasks)) as executor:
            for task in tasks:
                future = executor.submit(task, 2)
                future_set.add(future)
        for future in as_completed(future_set):
            error = future.exception()
            if error is not None:
                raise error
        results = self.get_results(future_set)
        end_time = time.time()
        return JsonResponse({'results': results, 'cost_time': (end_time - start_time)})
 
    def get_results(self, future_set):
        """
        处理io任务执行结果,也可以用future.add_done_callback(self.get_result)
        def get(self, request, *args, **kargs):
            start_time = time.time()
            future_set = set()
            tasks = (self.io_task1, self.io_task2)
            with ThreadPoolExecutor(len(tasks)) as executor:
                for task in tasks:
                    future = executor.submit(task, 2).add_done_callback(self.get_result)
                    future_set.add(future)
            for future in as_completed(future_set):
                error = future.exception()
                print(dir(future))
                if error is not None:
                    raise error
            self.results = results = []
            end_time = time.time()
            return JsonResponse({'results': results, 'cost_time': (end_time - start_time)})
 
        def get_result(self, future):
            self.results.append(future.result())
        """
        results = []
        for future in future_set:
            results.append(future.result())
        return results
 
    def io_task1(self, sleep_time):
        time.sleep(sleep_time)
        return 10
 
    def io_task2(self, sleep_time):
        time.sleep(sleep_time)
        return 66

使用进程间通信完成计算结果(CPU密集型)

import os
import re
from multiprocessing import cpu_count, Pool, Manager

from algorithm.class_calcuate import *
from algorithm.calculate_name import CalculateNameDict
from algorithm.read_file import read_file



item = {
    "calculate": "qwe",
    "file_info": {
        "checked": "True",
        "children": [
            {
                "children": [{"id": 2, "title": "qwe"}, {"id": 2, "title": "qwe"}],
                "id": 1,
                "title": "qweqweqwe"
                # "title": "xxxxx"
            },{
                "children": [{"id": 2, "title": "xxxxx"}, {"id": 2, "title": "xxxxx"}],
                "id": 1,
                "title": "xxxxx"
                # "title": "xxxxx"
            }
        ],
        "field": "name1",
        "id": 1,
        "spread": "True",
        "title": "文件夹名"
    }
}

"""
Note:这部分有两个部分(一个一个实现)
    1. IO密集:读取文件信息(多线程),把所有的IO写在一个函数里
    2. CPU密集:算法计算(多进程)
"""

class ParseTask():
    def __init__(self):
        self.queue = Manager().Queue()

    def parse_json(self):
        # 使用yield完成多协程的IO操作
        calculate_name = item["calculate"]  # 名称
        channel_info_list = item["file_info"]["children"]  # 前端发送的文件列表数据(一级列表)
        calculate_class_name = CalculateNameDict[calculate_name]  # 将算法名称传递到工厂模式中 ==> 拿到当前算法中的类名

        for info_dict in channel_info_list:
            filename = info_dict["title"]
            channel_front_list = info_dict["children"]  # 二级列表

            # 读文件的时候就要切换线程 ==> 就在当前进程中切换 ==> 协程
            channel_file_info, channel_data = read_file(filename)
            yield channel_file_info, channel_front_list, calculate_class_name, filename, channel_data


    def parse_file_info(self):
        items = []
        pool = Pool(cpu_count())
        # queue = Manager().Queue()
        # 计算的时候使用多进程完成CPU密集型
        for channel_file_list, channel_front_list, calculate_class_name, filename, channel_data in self.parse_json():

            channel_location = {}
            for channel in channel_front_list:
                if channel['title'] in ["EngineRPM"]:
                # if channel['title'] in ["RPM"]:
                    channel_front_list.remove(channel)
                channel_key = list(channel_file_list.keys())[list(channel_file_list.values()).index(channel["title"])]
                channel_key_num = re.match(r'.*?(\d+)', channel_key).group(1)
                channel_location[channel["title"]] = int(channel_key_num)

                """同步执行"""
                # img_path = eval(calculate_class_name)(
                #     filename,
                #     channel_data,
                #     channel["title"],
                #     int(0),
                #     int(channel_location[channel["title"]]),
                #     int(channel_location["EngineRPM"])
                #     # int(channel_location["RPM"])
                # ).run()
                # img_path = re.match(r'.*?(/front_end/calculate_image/.*)', img_path).group(1)
                # print(img_path)

                """异步执行"""
        #         print(os.getpid())
        #         pool.apply_async(calculate_process, args=(
        #             calculate_class_name,
        #             filename,
        #             channel_data,
        #             channel["title"],
        #             int(0),
        #             int(channel_location[channel["title"]]),
        #             int(channel_location["EngineRPM"]),
        #         ))
        # pool.close()
        # pool.join()

                """异步返回"""
                # 使用队列接受多进程中的数据返回值
                pool.apply_async(self.calculate_process, args=(
                            self.queue,
                            calculate_class_name,
                            filename,
                            channel_data,
                            channel["title"],
                            int(0),
                            int(channel_location[channel["title"]]),
                            int(channel_location["xxxxx"]),
                        ))
        pool.close()
        pool.join()

        # print(time.time() - start_time)   # 最后打印的时间


    def calculate_process(self, queue, calculate_class_name, file_name, channel_data, channel_name, raw_time_num, raw_data_num, raw_rpm_num):
        print(calculate_class_name, os.getpid())
        # img_path = eval(calculate_class_name)(
        #     file_name,
        #     channel_data,
        #     channel_name,
        #     raw_time_num,
        #     raw_data_num,
        #     raw_rpm_num
        # ).run()
        # img_path = re.match(r'.*?(/front_end/calculate_image/.*)', img_path).group(1)
        # 这里确定返回到前端的数据
        data = {"asd":12, "qwe": 32}
        queue.put(data)
        # print(img_path)

    def run(self):
        # 使用进程间通信 返回多个数据的返回
        items = []
        self.parse_file_info()
        while True:
            if self.queue.empty():
                break
            items.append(self.queue.get(True))
        print(items)

# if __name__ == '__main__':
#     parse_file_info()

ParseTask().run()
0
  1. 支付宝打赏

    qrcode alipay
  2. 微信打赏

    qrcode weixin

评论区