如何在 Django 通道中使用令牌基础身份验证并在邮递员上进行测试
通过使用 Django 通道,我想使用令牌身份验证并在邮递员上进行测试,但使用身份验证后无法连接。这是一个通知应用程序,用户在 websocket 上收到通知,无需刷新且无需身份验证,工作正常。我已经上传了所有文件。请检查一下,我将非常感谢你。 consumers.py
from channels.db import database_sync_to_async
from channels.generic.websocket import AsyncWebsocketConsumer
from channels.generic.websocket import WebsocketConsumer
from channels.auth import login
from asgiref.sync import async_to_sync, sync_to_async
import json
from .models import *
class OrderProgress(WebsocketConsumer):
def connect(self):
self.user = self.scope["user"]
self.room_name = self.scope['url_route']['kwargs']['order_id']
print('the result is ....', self.scope['path'])
print('the user is', self.scope["user"])
self.room_group_name = 'order_%s' % self.room_name
print('order is this......', )
print(self.room_group_name)
async_to_sync(self.channel_layer.group_add)(
self.room_group_name,
self.channel_name
)
if self.scope['user'].is_authenticated:
order = Order.give_order_details(self.room_name)
self.accept()
self.send(text_data=json.dumps({
'payload': order
}))
def disconnect(self, close_code):
async_to_sync(self.channel_layer.group_discard)(
self.room_group_name,
self.channel_name
)
def receive(self, text_data):
async_to_sync(self.channel_layer.group_send)(
self.room_group_name,
{
'type': 'order_status',
'payload': text_data
}
)
def order_status(self, event):
print('data in event............', event)
data = json.loads(event['value'])
self.send(text_data=json.dumps({
'payload': data
}))
**middeleware.py**
from django.contrib.auth.models import AnonymousUser
from rest_framework.authtoken.models import Token
from channels.db import database_sync_to_async
from channels.middleware import BaseMiddleware
@database_sync_to_async
def get_user(token_key):
try:
token = Token.objects.get(key=token_key)
return token.user
except Token.DoesNotExist:
return AnonymousUser()
class TokenAuthMiddleware(BaseMiddleware):
def __init__(self, inner):
super().__init__(inner)
async def __call__(self, scope, receive, send):
try:
token_key = (dict((x.split('=') for x in scope['query_string'].decode().split("&")))).get('token', None)
except ValueError:
token_key = None
scope['user'] = AnonymousUser() if token_key is None else await get_user(token_key)
return await super().__call__(scope, receive, send)
**asgi.py**
import os
from channels.security.websocket import AllowedHostsOriginValidator
from channels.routing import ProtocolTypeRouter, URLRouter
from home.middleware import TokenAuthMiddleware
from django.core.asgi import get_asgi_application
from channels.routing import ProtocolTypeRouter, URLRouter
import home.routing
from channels.auth import AuthMiddlewareStack
from home.token_auth import TokenAuthMiddlewareStack
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'pizza2.settings')
application = ProtocolTypeRouter({
'http': get_asgi_application(),
'websocket': TokenAuthMiddleware(
URLRouter(
home.routing.websocket_urlpatterns
))
})
**routing.py**
from django.urls import path
from . import consumers
websocket_urlpatterns = [
path('ws/pizza/<order_id>', consumers.OrderProgress.as_asgi()),
path('ws/pizza1/', consumers.OrderConsumer.as_asgi()),
]
**models.py**
# Create your models here.
from django.db import models
import string
from django.contrib.auth.models import AbstractUser
from django.contrib.auth.models import User
from django.db.models.signals import post_save
import channels.layers
from django.contrib.auth import get_user_model
from django.dispatch import receiver
from asgiref.sync import async_to_sync
import json
from django.core.validators import MinLengthValidator
import random
from channels.layers import get_channel_layer
class User(AbstractUser):
username = None
email = models.EmailField(max_length=100, null=True, blank=False, unique=True)
user_name = models.CharField(max_length=100, null=True, blank=False)
password = models.CharField(max_length=100, null=True, blank=False)
USERNAME_FIELD = 'email'
REQUIRED_FIELDS = ['user_name']
def __str__(self):
return self.email
class Pizza(models.Model):
name = models.CharField(max_length=100, null=True)
price = models.IntegerField(default=100, null=True)
image = models.CharField(max_length=100, null=True)
def __str__(self):
return self.name
def random_string_generator(size=10, chars=string.ascii_lowercase + string.digits):
return ''.join(random.choice(chars) for _ in range(size))
CHOICES = (
("Order Recieved", "Order Recieved"),
("Baking", "Baking"),
("Baked", "Baked"),
("Out for delivery", "Out for delivery"),
("Order recieved", "Order recieved"),
)
class Order(models.Model):
pizza = models.ForeignKey(Pizza, on_delete=models.CASCADE, null=True)
user = models.ForeignKey(User, on_delete=models.CASCADE, null=True)
order_id = models.CharField(max_length=100, blank=True, null=True)
amount = models.IntegerField(default=100, null=True)
status = models.CharField(max_length=100, choices=CHOICES, default="Order Recieved", null=True)
date = models.DateTimeField(auto_now_add=True, null=True)
def save(self, *args, **kwargs)
if self.order_id is None:
self.order_id = random_string_generator()
print("Error")
super(Order, self).save(*args, **kwargs)
@staticmethod
def give_order_details(order_id):
instance = Order.objects.filter(order_id=order_id).first()
data = {}
data['order_id'] = instance.order_id
data['amount'] = instance.amount
data['status'] = instance.status
data['date'] = str(instance.date)
progress_percentage = 20
if instance.status == 'Order Recieved':
progress_percentage = 20
elif instance.status == 'Baking':
progress_percentage = 40
elif instance.status == 'Baked':
progress_percentage = 60
elif instance.status == 'Out for delivery':
progress_percentage = 80
elif instance.status == 'Order recieved':
progress_percentage = 100
data['progress'] = progress_percentage
return data
def __str__(self):
return self.order_id
@receiver(post_save, sender=Order)
def order_status_handler(sender, instance, created, **kwargs):
if not created:
print("###################")
channel_layer = get_channel_layer()
data = {}
data['order_id'] = instance.order_id
data['amount'] = instance.amount
data['status'] = instance.status
data['date'] = str(instance.date)
progress_percentage = 20
if instance.status == 'Order Recieved':
progress_percentage = 20
elif instance.status == 'Baking':
progress_percentage = 40
elif instance.status == 'Baked':
progress_percentage = 60
elif instance.status == 'Out for delivery':
progress_percentage = 80
elif instance.status == 'Order recieved':
progress_percentage = 100
data['progress'] = progress_percentage
async_to_sync(channel_layer.group_send)(
'order_%s' % instance.order_id, {
'type': 'order_status',
'value': json.dumps(data)
}
)
**postman**
[I want to connect and recive meassage fromm websocket but it denied please corect me I will be thankful to you][1]
[1]: https://i.sstatic.net/k3fyP.png
By using Django channels I want to use token authentication and test on postman but I can't connect after using authentication.It's a notification app where user receive notification on websocket without refresh and without authentication It is working fine.. I have uploaded all the files .Please check it out I will be thankful to you.
consumers.py
from channels.db import database_sync_to_async
from channels.generic.websocket import AsyncWebsocketConsumer
from channels.generic.websocket import WebsocketConsumer
from channels.auth import login
from asgiref.sync import async_to_sync, sync_to_async
import json
from .models import *
class OrderProgress(WebsocketConsumer):
def connect(self):
self.user = self.scope["user"]
self.room_name = self.scope['url_route']['kwargs']['order_id']
print('the result is ....', self.scope['path'])
print('the user is', self.scope["user"])
self.room_group_name = 'order_%s' % self.room_name
print('order is this......', )
print(self.room_group_name)
async_to_sync(self.channel_layer.group_add)(
self.room_group_name,
self.channel_name
)
if self.scope['user'].is_authenticated:
order = Order.give_order_details(self.room_name)
self.accept()
self.send(text_data=json.dumps({
'payload': order
}))
def disconnect(self, close_code):
async_to_sync(self.channel_layer.group_discard)(
self.room_group_name,
self.channel_name
)
def receive(self, text_data):
async_to_sync(self.channel_layer.group_send)(
self.room_group_name,
{
'type': 'order_status',
'payload': text_data
}
)
def order_status(self, event):
print('data in event............', event)
data = json.loads(event['value'])
self.send(text_data=json.dumps({
'payload': data
}))
**middeleware.py**
from django.contrib.auth.models import AnonymousUser
from rest_framework.authtoken.models import Token
from channels.db import database_sync_to_async
from channels.middleware import BaseMiddleware
@database_sync_to_async
def get_user(token_key):
try:
token = Token.objects.get(key=token_key)
return token.user
except Token.DoesNotExist:
return AnonymousUser()
class TokenAuthMiddleware(BaseMiddleware):
def __init__(self, inner):
super().__init__(inner)
async def __call__(self, scope, receive, send):
try:
token_key = (dict((x.split('=') for x in scope['query_string'].decode().split("&")))).get('token', None)
except ValueError:
token_key = None
scope['user'] = AnonymousUser() if token_key is None else await get_user(token_key)
return await super().__call__(scope, receive, send)
**asgi.py**
import os
from channels.security.websocket import AllowedHostsOriginValidator
from channels.routing import ProtocolTypeRouter, URLRouter
from home.middleware import TokenAuthMiddleware
from django.core.asgi import get_asgi_application
from channels.routing import ProtocolTypeRouter, URLRouter
import home.routing
from channels.auth import AuthMiddlewareStack
from home.token_auth import TokenAuthMiddlewareStack
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'pizza2.settings')
application = ProtocolTypeRouter({
'http': get_asgi_application(),
'websocket': TokenAuthMiddleware(
URLRouter(
home.routing.websocket_urlpatterns
))
})
**routing.py**
from django.urls import path
from . import consumers
websocket_urlpatterns = [
path('ws/pizza/<order_id>', consumers.OrderProgress.as_asgi()),
path('ws/pizza1/', consumers.OrderConsumer.as_asgi()),
]
**models.py**
# Create your models here.
from django.db import models
import string
from django.contrib.auth.models import AbstractUser
from django.contrib.auth.models import User
from django.db.models.signals import post_save
import channels.layers
from django.contrib.auth import get_user_model
from django.dispatch import receiver
from asgiref.sync import async_to_sync
import json
from django.core.validators import MinLengthValidator
import random
from channels.layers import get_channel_layer
class User(AbstractUser):
username = None
email = models.EmailField(max_length=100, null=True, blank=False, unique=True)
user_name = models.CharField(max_length=100, null=True, blank=False)
password = models.CharField(max_length=100, null=True, blank=False)
USERNAME_FIELD = 'email'
REQUIRED_FIELDS = ['user_name']
def __str__(self):
return self.email
class Pizza(models.Model):
name = models.CharField(max_length=100, null=True)
price = models.IntegerField(default=100, null=True)
image = models.CharField(max_length=100, null=True)
def __str__(self):
return self.name
def random_string_generator(size=10, chars=string.ascii_lowercase + string.digits):
return ''.join(random.choice(chars) for _ in range(size))
CHOICES = (
("Order Recieved", "Order Recieved"),
("Baking", "Baking"),
("Baked", "Baked"),
("Out for delivery", "Out for delivery"),
("Order recieved", "Order recieved"),
)
class Order(models.Model):
pizza = models.ForeignKey(Pizza, on_delete=models.CASCADE, null=True)
user = models.ForeignKey(User, on_delete=models.CASCADE, null=True)
order_id = models.CharField(max_length=100, blank=True, null=True)
amount = models.IntegerField(default=100, null=True)
status = models.CharField(max_length=100, choices=CHOICES, default="Order Recieved", null=True)
date = models.DateTimeField(auto_now_add=True, null=True)
def save(self, *args, **kwargs)
if self.order_id is None:
self.order_id = random_string_generator()
print("Error")
super(Order, self).save(*args, **kwargs)
@staticmethod
def give_order_details(order_id):
instance = Order.objects.filter(order_id=order_id).first()
data = {}
data['order_id'] = instance.order_id
data['amount'] = instance.amount
data['status'] = instance.status
data['date'] = str(instance.date)
progress_percentage = 20
if instance.status == 'Order Recieved':
progress_percentage = 20
elif instance.status == 'Baking':
progress_percentage = 40
elif instance.status == 'Baked':
progress_percentage = 60
elif instance.status == 'Out for delivery':
progress_percentage = 80
elif instance.status == 'Order recieved':
progress_percentage = 100
data['progress'] = progress_percentage
return data
def __str__(self):
return self.order_id
@receiver(post_save, sender=Order)
def order_status_handler(sender, instance, created, **kwargs):
if not created:
print("###################")
channel_layer = get_channel_layer()
data = {}
data['order_id'] = instance.order_id
data['amount'] = instance.amount
data['status'] = instance.status
data['date'] = str(instance.date)
progress_percentage = 20
if instance.status == 'Order Recieved':
progress_percentage = 20
elif instance.status == 'Baking':
progress_percentage = 40
elif instance.status == 'Baked':
progress_percentage = 60
elif instance.status == 'Out for delivery':
progress_percentage = 80
elif instance.status == 'Order recieved':
progress_percentage = 100
data['progress'] = progress_percentage
async_to_sync(channel_layer.group_send)(
'order_%s' % instance.order_id, {
'type': 'order_status',
'value': json.dumps(data)
}
)
**postman**
[I want to connect and recive meassage fromm websocket but it denied please corect me I will be thankful to you][1]
[1]: https://i.sstatic.net/k3fyP.png
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论