在Django单个视图中使用asyncio实例代码如下(有多个IO任务时)
from django.views import View
import asyncio
import time
from django.http import JsonResponse
class TestAsyncioView(View):
    """Django view that runs multiple blocking I/O tasks concurrently on a
    per-request asyncio event loop, using async/await coroutines.

    (Before Python 3.5 the same pattern used ``@asyncio.coroutine`` /
    ``@types.coroutine`` with ``yield from`` instead of ``async``/``await``.)
    """

    def get(self, request, *args, **kwargs):
        """Run both I/O tasks concurrently and return their results plus
        the total wall-clock time as JSON."""
        start_time = time.time()
        # A fresh loop per request: Django sync views run in worker threads,
        # which do not have a running event loop of their own.
        loop = asyncio.new_event_loop()  # or: loop = asyncio.SelectorEventLoop()
        asyncio.set_event_loop(loop)
        self.loop = loop  # make_future() schedules executor jobs on this loop
        try:
            results = loop.run_until_complete(self.gather_tasks())
        finally:
            # Always close the loop, even if a task raised.
            loop.close()
        end_time = time.time()
        return JsonResponse({'results': results, 'cost_time': (end_time - start_time)})

    async def gather_tasks(self):
        """Fan both blocking tasks out to the default executor and await all.

        Results could equally be consumed with a done-callback per future
        (``future.add_done_callback(cb)``) instead of gathering them here.
        """
        tasks = (
            self.make_future(self.io_task1, 2),
            self.make_future(self.io_task2, 2),
        )
        results = await asyncio.gather(*tasks)
        return results

    async def make_future(self, func, *args):
        """Run blocking ``func(*args)`` in the loop's default thread pool and
        await its result."""
        future = self.loop.run_in_executor(None, func, *args)
        response = await future
        return response

    def io_task1(self, sleep_time):
        # Simulated blocking I/O.
        time.sleep(sleep_time)
        return 66

    def io_task2(self, sleep_time):
        # Simulated blocking I/O.
        time.sleep(sleep_time)
        return 77
使用async返回协程中的计算结果
import time
import asyncio
from threading import Thread
async def do_task(params, sleep_time=2):  # run a single task
    """Log, wait ``sleep_time`` seconds without blocking the loop, and echo
    ``params`` back.

    ``sleep_time`` defaults to 2 so existing one-argument callers behave
    exactly as before.
    """
    print(time.time(), params)
    # asyncio.sleep() yields to the event loop; time.sleep() here would
    # block the whole loop and every other task scheduled on it.
    await asyncio.sleep(sleep_time)
    return params
async def do_task_group(params_group):  # run one group of tasks
    """Run every task in ``params_group`` concurrently and return the list of
    results (exceptions are returned in place of results, not raised).

    The explicit ``loop=io_loop`` arguments were dropped: ``ensure_future``
    and ``gather`` use the currently running loop, and the ``loop`` keyword
    was deprecated in Python 3.8 and removed in 3.10 (it raised TypeError).
    """
    tasks = [asyncio.ensure_future(do_task(params)) for params in params_group]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return results
async def do_task_groups(all_params, send_step=5):  # run several task groups
    """Split ``all_params`` into chunks of ``send_step``, run each chunk as a
    concurrent group, and return the flattened list of results.

    The removed-in-3.10 ``loop=`` keyword was dropped from ``ensure_future``
    and ``gather``; both pick up the running loop automatically.
    """
    # Chunk all tasks into groups of at most `send_step`.
    params_groups = [all_params[index: index + send_step]
                     for index in range(0, len(all_params), send_step)]
    tasks = [asyncio.ensure_future(do_task_group(params_group))
             for params_group in params_groups]
    # Gather per-group result lists, then flatten them.
    the_results = await asyncio.gather(*tasks, return_exceptions=True)
    results = []
    for result in the_results:
        results.extend(result)
    return results
def do_all_tasks(all_params):  # run everything
    """Submit the whole batch to the background event loop and block until done."""
    future = asyncio.run_coroutine_threadsafe(do_task_groups(all_params), io_loop)
    # .result() blocks this thread — the one place where we actually wait.
    return future.result()
def start_loop():
    """Create a new event loop, run it forever on a daemon thread, return it."""
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    worker = Thread(target=loop.run_forever)
    worker.daemon = True  # don't keep the process alive for the loop thread
    worker.start()
    return loop
if __name__ == '__main__':
    # Background loop shared (as a module global) by the helpers above.
    io_loop = start_loop()
    all_params = list(range(10))
    print(do_all_tasks(all_params))
注意:由于事件循环运行在后台线程中,必须在启动该线程前显式创建并设置事件循环:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
在Django单个视图中使用ThreadPoolExecutor实例代码如下(有多个IO任务时)
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

from django.http import JsonResponse
from django.views import View
class TestThreadView(View):
    """Django view that runs several blocking I/O tasks on a thread pool."""

    def get(self, request, *args, **kargs):
        """Submit every task to the pool, wait for all, return results as JSON."""
        start_time = time.time()
        future_set = set()
        tasks = (self.io_task1, self.io_task2)
        with ThreadPoolExecutor(len(tasks)) as executor:
            for task in tasks:
                future_set.add(executor.submit(task, 2))
            # as_completed yields futures as they finish; surface the first
            # failure immediately instead of collecting a partial result.
            for done in as_completed(future_set):
                error = done.exception()
                if error is not None:
                    raise error
        results = self.get_results(future_set)
        end_time = time.time()
        return JsonResponse({'results': results, 'cost_time': (end_time - start_time)})

    def get_results(self, future_set):
        """Collect every finished future's result into a list.

        A per-future done-callback (``future.add_done_callback``) would work
        as well; collecting after the fact keeps the flow linear.
        """
        return [future.result() for future in future_set]

    def io_task1(self, sleep_time):
        # Simulated blocking I/O.
        time.sleep(sleep_time)
        return 10

    def io_task2(self, sleep_time):
        # Simulated blocking I/O.
        time.sleep(sleep_time)
        return 66
使用进程间通信完成计算结果(CPU密集型)
import os
import re
from multiprocessing import cpu_count, Pool, Manager
from algorithm.class_calcuate import *
from algorithm.calculate_name import CalculateNameDict
from algorithm.read_file import read_file
# Sample request payload: an algorithm name plus the two-level file/channel
# tree as sent by the front end.
item = {
    "calculate": "qwe",
    "file_info": {
        "checked": "True",
        "children": [
            {
                "children": [{"id": 2, "title": "qwe"}, {"id": 2, "title": "qwe"}],
                "id": 1,
                "title": "qweqweqwe",
            },
            {
                "children": [{"id": 2, "title": "xxxxx"}, {"id": 2, "title": "xxxxx"}],
                "id": 1,
                "title": "文件夹名",
            },
        ],
        "field": "name1",
        "id": 1,
        "spread": "True",
        "title": "文件夹名",
    },
}
# NOTE: this section has two parts, implemented one at a time:
#   1. I/O-bound: reading file info (threads/coroutines), all I/O in one function
#   2. CPU-bound: algorithm computation (multiple processes)
class ParseTask():
    # Two-phase pipeline: parse_json() streams per-file records (I/O-bound),
    # parse_file_info() fans the computation out to a process pool (CPU-bound),
    # and run() drains the shared queue of worker results.

    def __init__(self):
        # Manager().Queue() is process-safe, so pool workers can push results
        # back to the parent process.
        self.queue = Manager().Queue()

    def parse_json(self):
        # Generator over the front-end payload; yields one record per file.
        # (Intended to cooperate with coroutine-style I/O via yield.)
        calculate_name = item["calculate"]  # algorithm name
        channel_info_list = item["file_info"]["children"]  # file list sent by the front end (level 1)
        calculate_class_name = CalculateNameDict[calculate_name]  # factory lookup: algorithm name -> class name
        for info_dict in channel_info_list:
            filename = info_dict["title"]
            channel_front_list = info_dict["children"]  # level-2 list
            # File reads are where we want to switch tasks within this process
            # (coroutine-style I/O).
            channel_file_info, channel_data = read_file(filename)
            yield channel_file_info, channel_front_list, calculate_class_name, filename, channel_data

    def parse_file_info(self):
        # Submit one computation per channel to a process pool, then wait.
        items = []
        pool = Pool(cpu_count())
        # queue = Manager().Queue()
        # CPU-bound computation uses multiprocessing.
        for channel_file_list, channel_front_list, calculate_class_name, filename, channel_data in self.parse_json():
            channel_location = {}
            for channel in channel_front_list:
                # NOTE(review): removing from channel_front_list while iterating
                # it skips the element after each removal — confirm intended.
                if channel['title'] in ["EngineRPM"]:
                    # if channel['title'] in ["RPM"]:
                    channel_front_list.remove(channel)
                # Reverse-lookup the file-side channel key by its display title,
                # then extract its numeric index.
                channel_key = list(channel_file_list.keys())[list(channel_file_list.values()).index(channel["title"])]
                channel_key_num = re.match(r'.*?(\d+)', channel_key).group(1)
                channel_location[channel["title"]] = int(channel_key_num)
                """同步执行"""
                # img_path = eval(calculate_class_name)(
                #     filename,
                #     channel_data,
                #     channel["title"],
                #     int(0),
                #     int(channel_location[channel["title"]]),
                #     int(channel_location["EngineRPM"])
                #     # int(channel_location["RPM"])
                # ).run()
                # img_path = re.match(r'.*?(/front_end/calculate_image/.*)', img_path).group(1)
                # print(img_path)
                """异步执行"""
                # print(os.getpid())
                # pool.apply_async(calculate_process, args=(
                #     calculate_class_name,
                #     filename,
                #     channel_data,
                #     channel["title"],
                #     int(0),
                #     int(channel_location[channel["title"]]),
                #     int(channel_location["EngineRPM"]),
                # ))
                # pool.close()
                # pool.join()
                """异步返回"""
                # Use the queue to receive return values from the pool workers.
                pool.apply_async(self.calculate_process, args=(
                    self.queue,
                    calculate_class_name,
                    filename,
                    channel_data,
                    channel["title"],
                    int(0),
                    int(channel_location[channel["title"]]),
                    int(channel_location["xxxxx"]),
                ))
        # NOTE(review): close/join placed after all submissions; the flattened
        # source made the original indentation ambiguous — confirm.
        pool.close()
        pool.join()
        # print(time.time() - start_time)  # final elapsed time

    def calculate_process(self, queue, calculate_class_name, file_name, channel_data, channel_name, raw_time_num, raw_data_num, raw_rpm_num):
        # Runs inside a pool worker process; pushes its result onto `queue`.
        print(calculate_class_name, os.getpid())
        # img_path = eval(calculate_class_name)(
        #     file_name,
        #     channel_data,
        #     channel_name,
        #     raw_time_num,
        #     raw_data_num,
        #     raw_rpm_num
        # ).run()
        # img_path = re.match(r'.*?(/front_end/calculate_image/.*)', img_path).group(1)
        # The data returned to the front end is assembled here.
        data = {"asd":12, "qwe": 32}
        queue.put(data)
        # print(img_path)

    def run(self):
        # Use inter-process communication (the queue) to collect every
        # worker's result after the pool has finished.
        items = []
        self.parse_file_info()
        while True:
            if self.queue.empty():
                break
            items.append(self.queue.get(True))
        print(items)
# if __name__ == '__main__':
#     parse_file_info()
# Kick off the whole parse/compute pipeline (runs at import time).
ParseTask().run()
评论区