Django Channels
Channels是一个为Django提供异步扩展的库,通常主要用来提供WebSocket支持和后台任务。
网络架构
Protocol Type Router
:协议类型解析器,对不同类型的协议进行解析
处理请求
Django 请求
- Django是一个同步框架:接收到请求之后,必须处理完这个请求,返回了之后,浏览器才能显示加载完毕
- 业务场景:Django处理一个时间很长的请求
- 面临的问题
- 客户端浏览器会一直等待
- Nginx会超时(TimeOut),关闭连接
- 可以考虑的方案
- 使用celery,将比较耗时的任务传递给celery,进行异步处理;Django正常返回
- 问题:
celery
执行是否成功/执行成功之后的结果,都无法再通过Django主动发送给前端
- 面临的问题
Django Channels
interface server
:接口服务器,负责对协议进行解析,将不同协议分发到不同的ChannelChannel Leyer
:频道层,可以是一个FIFO(先进先出,first in;first out)队列,通常使用Redisconsumer
:消费者,接受和处理消息
Channels中文件和配置的含义
- 配置步骤
- 在
INSTALLED_APPS
中注册Channels应用 - 在settings.py的配置文件中,添加
CHANNEL_LAYERS
缓存
- 在
from channels.layers import get_channel_layer
channel_layer = get_channel_layer() # ChannelLayerManager() 频道层管理器实例 ==> 相当于ORM中查询集的管理器QuerySet
# 频道层的API
channel_layer.group_add("第一个参数是组名", "第二个参数是频道的名字")
channel_layer.group_discard() # 离开某一个组
- asgi.py:介于网络协议服务和Python应用之间的标准接口,能够处理多重通用类型协议,包括HTTP、HTTP2和WebSocket
- channel_layers:在settings.py中配置,类似于一个通道,发送者(producer)在一段发送消息,消费者(consumer)在另一端监听
- routungs.py:相当于Django中的urls.py
- consumers.py:相当于Django中的views.py
- views.py: 用来开发
符合WSGI
规范的应用 - consumers.py: 用来开发
符合ASGI
接口规范的Python应用
- views.py: 用来开发
WSGI 和 ASGI
- WSGI:
- 为Python语言定义的Web服务器和Web应用程序或框架之前的一种简单而通用的接口
- uWSGI:一个Web服务器, 可安装的软件,提供服务的
- WSGI:一套标准,(HTTP,HTTPS)
- ASGI:
- 异步服务网关接口,一个介于网络协议器和Python之间的标准接口
- 能够处理多种通用的协议类型,包括HTTP、HTTPS和WebSocket
- 部署:
- HTTP、HTTP2:Nginx/Apache + WSGI(uWSGI) + Django/Flask/Python3
- HTTP、HTTP2、WebSocket:Nginx/Apache + ASGI)(Daphne)+ Django/Flask/Python3
- 区别
- WSGI:基于HTTP协议模式的,不支持WebSocket
- ASGI:就是为了支持Python常用的WSGI所不支持的新的协议标准
- 即:
ASGI是WSGI的扩展,而且能通过asyncio异步运行
Consumer的使用
- event loop:事件循环
- event handler:事件处理
- sync:同步
- async:异步
- scope:在ASGI接口规范中定义了,相当于WSGI中的request
from channels.consumer import SyncConsumer
class EchoConsumer(SyncConsumer):
"""同步的Consumer"""
def websocket_connect(self, event):
"""
建立连接
event: 连接的事件
"""
self.send({
# 这里的这个字典的key是固定的
# 这里写的是一个字符串,但是对应的是websocket.accept() 这个方法
# -->接收websocket的连接
"type": "websocket.accept",
})
def websocket_receive(self, event):
"""
接受消息
event:接受的事件
"""
self.send({
# 这里写的是一个字符串,但是对应的是websocket.send() 这个方法
# --> 从后端主动发送websocket的消息
"type": "websocket.send",
"text": event["text"] # 后端返回给前端的数据
})
ORM 同步到异步
什么时候用sync什么时候用async
异步的代码里面不要写入同步的逻辑,否则起不到真正意义上的异步并发
class EchoAsyncConsumer(AsyncConsumer):
async def websocket_receive(self, event):
# 如果在异步的逻辑里面,出现同步的代码 --> 当前方法的事件循环会卡住,这时候就要把同步代码改成异步代码
# ORM同步到异步
user = User.objects.get(username=username)
from channels.db import database_sync_to_async
# 方式一:
# user = await database_sync_to_async(user = User.objects.get(username=username))
# 方式二:
@database_sync_to_async
def get_user(username):
return user = User.objects.get(username=username)
socp
在ASGI接口规范中定义了,相当于WSGI中的request
user = self.scope["user"]
path = self.scope["path"] # Request请求的路径==> HTTP/WebSocket
header = self.scope["headers"]
method = self.scope["method"] # 注意:这个只针对于HTTP请求
Generic Consumer同步与异步通信
from channels.generic.websocket import WebsocketConsumer, AsyncWebsocketConsumer
class MyConsumer(WebsocketConsumer):
"""WebsocketConsumer:对SyncConsumer的进一步封装"""
def connect(self):
"""同步,接收连接"""
# self.accept() 如果不传参,表示接受websocket的连接
# self.accept() 传参,subprotocol==>自定义的子协议
self.accept(subprotocol="you protocol")
# 拒绝连接,给客户端发送一个状态码403,表示权限错误
self.close(code=403)
def receive(self, text_data=None, bytes_data=None):
"""接收数据"""
self.send(text_data="") # 返回文本
self.send(bytes_data="") # 把字符串转换成二进制的帧返回
self.close()
def disconnect(self, code):
"""断开连接"""
pass
class MyAsyncConsumer(AsyncWebsocketConsumer):
"""
把上面同步的Consumer变成异步的Consumer
步骤:def()前面添加async,函数内部调用的方法前面添加await
"""
async def connect(self):
"""同步,接收连接"""
await self.accept(subprotocol="you protocol")
await self.close(code=403)
async def receive(self, text_data=None, bytes_data=None):
"""接收数据"""
await self.send(text_data="")
await self.send(bytes_data="")
await self.close()
async def disconnect(self, code):
"""断开连接"""
pass
Channels的路由Routing开发
- ProtocolTypeRouter:协议类型解析
- self.scope[‘type’]获取协议类型
- self.scope[‘url_route’][‘kwargs’][‘username’]获取url中关键字参数
- channels routing是scope级别的,一个连接只能由一个consumer接收和处理
from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.security.websocket import AllowedHostsOriginValidator
from django.urls import path
from zanhu.messager.consumers import MessagesConsumer
application = ProtocolTypeRouter({
# 'http': # 普通的HTTP请求不需要我们手动在这里添加,框架会自动加载
'websocket':
# 使用AllowedHostsOriginValidator,允许的访问的源站与settings.py文件中的ALLOWED_HOSTS相同
AllowedHostsOriginValidator(
# 认证中间件站(兼容Django认证系统):
# 用于WebSocket认证,集成了CookieMiddleware, SessionMiddleware, AuthMiddleware
AuthMiddlewareStack(
# URL路由
URLRouter([
# URL路由匹配
path('ws/notifications/', NotificationsConsumer),
path('ws/<str:username>/', MessagesConsumer),
])
)
)
})
- OriginValidator、AllowedHostsOriginValidator可以防止通过WebSocket进行CSRF攻击
- OriginValidator需要手动添加允许访问的源站,如:
from channels.security.websocket import OriginValidator
application = ProtocolTypeRouter({
'websocket': OriginValidator(
AuthMiddlewareStack(
URLRouter([
...
])
),
# 第二个参数,手动添加的允许访问的源站
[".imooc.com", "http://.imooc.com:80", "http://muke.site.com"]
)
})
评论区