Source code for django_admin_collaborator.consumers

import json
import datetime as dt
import time
from typing import Dict, Any, Optional, cast

import redis
from redis.exceptions import ConnectionError, TimeoutError
from channels.db import database_sync_to_async
from channels.generic.websocket import AsyncWebsocketConsumer
from django.conf import settings
from django.contrib.auth import get_user_model
from django.db import connections
from django.utils import timezone
import logging

"""
Django Admin Collaborator WebSocket consumers.

This module contains the WebSocket consumers for real-time collaboration
in the Django admin interface, including collaborative editing and chat.

Redis connection settings can be configured in your Django settings:

    # Redis connection URL
    ADMIN_COLLABORATOR_REDIS_URL = 'redis://localhost:6379/0'

    # Maximum number of retry attempts for Redis operations (default: 3)
    ADMIN_COLLABORATOR_REDIS_MAX_RETRIES = 3

    # Delay between retries in seconds (default: 0.5)
    ADMIN_COLLABORATOR_REDIS_RETRY_DELAY = 0.5

    # Redis connection timeout in seconds (default: 5)
    ADMIN_COLLABORATOR_REDIS_SOCKET_TIMEOUT = 5

    # Redis connection pool settings (default: 10)
    ADMIN_COLLABORATOR_REDIS_MAX_CONNECTIONS = 10
"""

logger = logging.getLogger(__name__)

User = get_user_model()

# Maximum number of retry attempts for Redis operations
REDIS_MAX_RETRIES = getattr(settings, 'ADMIN_COLLABORATOR_REDIS_MAX_RETRIES', 3)
# Delay between retries in seconds
REDIS_RETRY_DELAY = getattr(settings, 'ADMIN_COLLABORATOR_REDIS_RETRY_DELAY', 0.5)
# Redis connection timeout in seconds
REDIS_SOCKET_TIMEOUT = getattr(settings, 'ADMIN_COLLABORATOR_REDIS_SOCKET_TIMEOUT', 5)
# Redis connection pool settings
REDIS_MAX_CONNECTIONS = getattr(settings, 'ADMIN_COLLABORATOR_REDIS_MAX_CONNECTIONS', 10)


def redis_operation_with_retry(func):
    """
    Decorator that adds retry logic for Redis operations.
    Catches ConnectionError and TimeoutError and retries the operation.
    """
    def wrapper(*args, **kwargs):
        retries = 0
        while retries < REDIS_MAX_RETRIES:
            try:
                return func(*args, **kwargs)
            except (ConnectionError, TimeoutError) as e:
                retries += 1
                if retries >= REDIS_MAX_RETRIES:
                    logger.error(f"Redis operation failed after {REDIS_MAX_RETRIES} retries: {e}")
                    raise
                logger.warning(f"Redis connection error (attempt {retries}/{REDIS_MAX_RETRIES}): {e}")
                time.sleep(REDIS_RETRY_DELAY * (2 ** (retries - 1)))  # Exponential backoff
    return wrapper


def _close_thread_db_connections() -> None:
    """
    Force-close every DB connection bound to the current thread.

    Channels' built-in ``close_old_connections()`` only closes connections older
    than ``CONN_MAX_AGE``. In a long-running ASGI process the ``sync_to_async``
    thread pool keeps threads (and their Django ``connections`` objects) alive
    indefinitely, so with ``CONN_MAX_AGE > 0`` every concurrent websocket connect
    can spawn a new thread that holds a Postgres connection until the next call
    into that thread closes it. Under heavy refresh / many concurrent admin
    users this exhausts Postgres ``max_connections``.

    Calling ``conn.close()`` here closes the connection unconditionally so the
    package is safe regardless of the host project's ``CONN_MAX_AGE`` setting.
    Trade-off: each call pays a fresh-connection cost (~few ms), which is
    acceptable for the once-per-websocket auth check we use it for.
    """
    for conn in connections.all():
        try:
            conn.close()
        except Exception:
            # Cleanup must never raise; the connection may already be broken.
            pass


[docs]def get_utc_timestamp() -> str: """Return the current time as a UTC ISO-8601 string with timezone info.""" return timezone.now().astimezone(dt.timezone.utc).isoformat()
@database_sync_to_async def _is_staff_user(scope) -> bool: """ Authorize an ASGI scope's user as authenticated + staff. Runs in the ``database_sync_to_async`` thread pool. See ``_close_thread_db_connections`` for why we force-close the DB connection in the finally block. """ try: user = cast(User, scope.get('user')) if user is None: return False return bool(user.is_authenticated and getattr(user, 'is_staff', False)) finally: _close_thread_db_connections()
[docs]class RedisClientMixin: """ Shared Redis client + pool + retry-wrapped helpers for the WebSocket consumers. Both consumer classes used to duplicate this block verbatim. A single pool on the mixin means both classes share Redis connections (lower client count against managed Redis plans like Heroku Redis). """ _redis_connection_pool = None def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._redis_client = None @property def redis_client(self): if self._redis_client is None: if RedisClientMixin._redis_connection_pool is None: redis_url = getattr( settings, 'ADMIN_COLLABORATOR_REDIS_URL', getattr(settings, 'REDIS_URL', 'redis://localhost:6379/0'), ) RedisClientMixin._redis_connection_pool = redis.ConnectionPool.from_url( redis_url, max_connections=REDIS_MAX_CONNECTIONS, socket_timeout=REDIS_SOCKET_TIMEOUT, socket_keepalive=True, retry_on_timeout=True, ) self._redis_client = redis.Redis(connection_pool=RedisClientMixin._redis_connection_pool) return self._redis_client @redis_operation_with_retry def redis_exists(self, key): return self.redis_client.exists(key) @redis_operation_with_retry def redis_get(self, key): return self.redis_client.get(key) @redis_operation_with_retry def redis_set(self, key, value, ex=None): return self.redis_client.set(key, value, ex=ex) @redis_operation_with_retry def redis_delete(self, key): return self.redis_client.delete(key) @redis_operation_with_retry def redis_setex(self, key, time, value): return self.redis_client.setex(key, time, value) @redis_operation_with_retry def redis_hset(self, name, key, value): return self.redis_client.hset(name, key, value) @redis_operation_with_retry def redis_hdel(self, name, key): return self.redis_client.hdel(name, key) @redis_operation_with_retry def redis_hgetall(self, name): return self.redis_client.hgetall(name) @redis_operation_with_retry def redis_pipeline_execute(self, pipeline): return pipeline.execute()
[docs] async def check_user_authorization(self) -> bool: """Validate the connection's user is authenticated + has staff permissions.""" return await _is_staff_user(self.scope)
[docs] def get_timestamp(self) -> str: """Compat shim — prefer the module-level ``get_utc_timestamp``.""" return get_utc_timestamp()
[docs]class AdminCollaborationConsumer(RedisClientMixin, AsyncWebsocketConsumer): """ WebSocket consumer for real-time collaborative editing in Django admin. This consumer manages edit locks for admin model instances, ensuring only one staff user can edit a specific object at a time. It also broadcasts editing status and updates to all connected clients. Communication is coordinated through Redis for lock management and message distribution via Django Channels layer for WebSocket messaging. Redis client, connection pool, auth check, and timestamp helpers live on ``RedisClientMixin``. """
[docs] async def connect(self) -> None: """ Handle WebSocket connection establishment. - Extracts model and object identifiers from URL - Authenticates the user - Sets up channel group for this specific object - Notifies other users about this user's presence - Retrieves and maintains last modified timestamp Closes the connection if user is not authorized. """ # Get parameters from the URL self.app_label: str = self.scope['url_route']['kwargs']['app_label'] self.model_name: str = self.scope['url_route']['kwargs']['model_name'] self.object_id: str = self.scope['url_route']['kwargs']['object_id'] # Perform authentication check with database_sync_to_async is_authorized: bool = await self.check_user_authorization() if not is_authorized: await self.close() return # Create a unique channel group name for this object self.room_group_name: str = f"admin_{self.app_label}_{self.model_name}_{self.object_id}" self.user_id: int = self.scope['user'].id self.email: str = self.scope['user'].email # Get avatar URL if configured avatar_url = None avatar_field = getattr(settings, 'ADMIN_COLLABORATOR_OPTIONS', {}).get('avatar_field') if avatar_field and hasattr(self.scope['user'], avatar_field): avatar = getattr(self.scope['user'], avatar_field) if avatar: avatar_url = avatar.url # Redis keys for this object self.editor_key: str = f"editor:{self.room_group_name}" self.last_modified_key: str = f"last_modified:{self.room_group_name}" # Join room group await self.channel_layer.group_add( self.room_group_name, self.channel_name ) # Accept the WebSocket connection await self.accept() # Get-or-init the last_modified timestamp in a single round-trip. # Previously this took 3 ops (exists + set + get); GET + conditional SET # covers the same cases with at most 2 ops, and exactly 1 on the hot path # (the key is set after the first connect to this object). try: last_modified_bytes = self.redis_get(self.last_modified_key) if last_modified_bytes: last_modified: str = last_modified_bytes.decode('utf-8') else: last_modified = get_utc_timestamp() self.redis_set(self.last_modified_key, last_modified) except Exception as e: logger.exception(f"Redis error during connect: {e}") last_modified = get_utc_timestamp() # Notify the group about this user's presence await self.channel_layer.group_send( self.room_group_name, { 'type': 'user_joined', 'user_id': self.user_id, 'username': self.email, 'timestamp': self.get_timestamp(), 'last_modified': last_modified, 'avatar_url': avatar_url } )
[docs] @database_sync_to_async def get_last_modified(self) -> str: """ Retrieve the last modified timestamp for the current object. This is a placeholder that should be implemented based on your actual model. Default implementation returns current timestamp. Returns: str: Last modified timestamp in ISO format """ return self.get_timestamp()
[docs] async def disconnect(self, close_code: int) -> None: """ Handle WebSocket disconnection. - Removes user from the editor role if they were the active editor - Notifies other users about this user leaving - Leaves the channel group - Stores editor ID as previous editor to allow reclaiming on reconnection Args: close_code (int): WebSocket close code """ try: if hasattr(self, 'room_group_name'): # First, check if this user was the editor and capture that information is_editor = False try: editor_data_bytes = self.redis_get(self.editor_key) if editor_data_bytes: editor_info: Dict[str, Any] = json.loads(editor_data_bytes) if editor_info.get('editor_id') == self.user_id: # Clear editor for this room self.redis_delete(self.editor_key) is_editor = True # Store this editor ID as the previous editor to allow reclaiming on reconnection previous_editor_key = f"previous_editor:{self.room_group_name}" self.redis_setex( previous_editor_key, dt.timedelta(minutes=5), # 5-minute grace period for reconnection str(self.user_id) ) except Exception as e: logger.exception(f"Redis error during disconnect (checking editor): {e}") # Prepare message data before leaving the group message_data = { 'type': 'user_left', 'user_id': self.user_id, 'username': self.email, } # Send messages to group (before we leave the group) await self.channel_layer.group_send( self.room_group_name, message_data ) # Send lock_released message if we were the editor if is_editor: await self.channel_layer.group_send( self.room_group_name, { 'type': 'lock_released', 'user_id': self.user_id, 'username': self.email, } ) # Now leave the group - do this after sending messages but before potential exception handling await self.channel_layer.group_discard( self.room_group_name, self.channel_name ) except Exception as e: logger.exception(f"Error during disconnect: {e}")
[docs] async def receive(self, text_data: str) -> None: """ Process incoming WebSocket messages. Routes each message to the appropriate handler based on its type. Args: text_data (str): JSON string containing the message data """ try: data: Dict[str, Any] = json.loads(text_data) message_type: str = data.get('type') if message_type == 'request_editor_status': await self.handle_editor_status_request() elif message_type == 'claim_editor': await self.handle_claim_editor(data.get('timestamp')) elif message_type == 'heartbeat': await self.handle_heartbeat() elif message_type == 'content_updated': await self.handle_content_updated(data.get('timestamp')) elif message_type == 'release_lock': await self.handle_release_lock() elif message_type == 'request_attention': await self.handle_request_attention() except Exception as e: logger.exception(f"Error processing message: {e}")
[docs] async def handle_editor_status_request(self) -> None: """ Handle requests for the current editor status. Checks Redis for current editor information and sends it to the requester. If the current editor hasn't sent a heartbeat recently, clears the editor lock. """ # Get current editor status from Redis editor_id: Optional[int] = None editor_name: Optional[str] = None try: editor_data_bytes = self.redis_get(self.editor_key) if editor_data_bytes: editor_info: Dict[str, Any] = json.loads(editor_data_bytes) # Check if the editor's heartbeat is recent (within last 2 minutes) try: # Parse the ISO format timestamp into a datetime object with UTC timezone last_heartbeat: dt.datetime = dt.datetime.fromisoformat(editor_info['last_heartbeat']) current_time: dt.datetime = timezone.now().astimezone(dt.timezone.utc) # Using timedelta directly instead of timezone.now() to avoid DB connections if current_time - last_heartbeat > dt.timedelta(minutes=2): # Editor timed out self.redis_delete(self.editor_key) else: editor_id = editor_info['editor_id'] editor_name = editor_info['editor_name'] except (ValueError, TypeError): # Handle invalid timestamp format self.redis_delete(self.editor_key) except Exception as e: logger.exception(f"Redis error during editor status request: {e}") await self.send(text_data=json.dumps({ 'type': 'editor_status', 'editor_id': editor_id, 'editor_name': editor_name, }))
[docs] async def handle_claim_editor(self, timestamp: Optional[str]) -> None: """ Process a request to claim editor status for the current object. Only assigns editor status if no other user currently has it. Sets a 3-minute expiration on the editor lock to handle disconnections. Gives priority to the previous editor during reconnection for a short grace period. Args: timestamp (Optional[str]): Optional timestamp string in ISO format """ try: # Use a Redis transaction to protect against race conditions during editor claiming # Get current editor data and previous editor data editor_key = self.editor_key previous_editor_key = f"previous_editor:{self.room_group_name}" # Get editor and previous editor data editor_data_bytes = self.redis_get(editor_key) previous_editor_data_bytes = self.redis_get(previous_editor_key) previous_editor_id = None if previous_editor_data_bytes: try: previous_editor_id = int(previous_editor_data_bytes.decode('utf-8')) except (ValueError, TypeError): previous_editor_id = None # Check if this user is the previous editor is_previous_editor = previous_editor_id == self.user_id # Attempt to claim editor status based on conditions if editor_data_bytes: # There's already an active editor editor_info = json.loads(editor_data_bytes) current_editor_id = editor_info.get('editor_id') # Only allow reclaiming if this user was the previous editor and is not the current active editor if is_previous_editor and current_editor_id != self.user_id: # Record new editor session with reclaim editor_info = { 'editor_id': self.user_id, 'editor_name': self.email, 'last_heartbeat': self.get_timestamp(), 'reclaimed': True # Mark as a reclaimed session } # Update the editor with expiration self.redis_setex( editor_key, dt.timedelta(minutes=3), json.dumps(editor_info) ) # Clear previous editor key self.redis_delete(previous_editor_key) # Broadcast the new editor status await self.channel_layer.group_send( self.room_group_name, { 'type': 'editor_status', 'editor_id': self.user_id, 'editor_name': self.email, } ) logger.info(f"User {self.user_id} ({self.email}) reclaimed editor status") else: # No active editor editor_info = { 'editor_id': self.user_id, 'editor_name': self.email, 'last_heartbeat': self.get_timestamp() } # Give priority to previous editors during claim attempts # If this user was the previous editor, set the editor immediately # Otherwise, check if there's a grace period for a previous editor that might reconnect if is_previous_editor: # This was the previous editor - claim immediately self.redis_setex( editor_key, dt.timedelta(minutes=3), json.dumps(editor_info) ) # Clear previous editor key self.redis_delete(previous_editor_key) # Broadcast the new editor status await self.channel_layer.group_send( self.room_group_name, { 'type': 'editor_status', 'editor_id': self.user_id, 'editor_name': self.email, } ) logger.info(f"Previous editor {self.user_id} ({self.email}) claimed editor status") elif not previous_editor_id: # No previous editor or grace period expired - new user can claim self.redis_setex( editor_key, dt.timedelta(minutes=3), json.dumps(editor_info) ) # Broadcast the new editor status await self.channel_layer.group_send( self.room_group_name, { 'type': 'editor_status', 'editor_id': self.user_id, 'editor_name': self.email, } ) logger.info(f"New editor {self.user_id} ({self.email}) claimed editor status") except Exception as e: logger.exception(f"Redis error during claim editor: {e}")
[docs] async def handle_heartbeat(self) -> None: """ Process heartbeat messages from the active editor. Updates the last heartbeat timestamp and resets the expiration time for the editor lock in Redis. Only processes heartbeats from the current editor. Also stores the current editor ID as the previous editor with a longer expiration time to handle temporary disconnections. """ try: editor_data_bytes = self.redis_get(self.editor_key) if not editor_data_bytes: return editor_info: Dict[str, Any] = json.loads(editor_data_bytes) if editor_info.get('editor_id') != self.user_id: return editor_info['last_heartbeat'] = get_utc_timestamp() previous_editor_key = f"previous_editor:{self.room_group_name}" # Pipeline the two writes — they are independent and the heartbeat # fires every ~15s per active editor, so saving one Redis RTT here # adds up under many concurrent editors. pipeline = self.redis_client.pipeline() pipeline.setex(self.editor_key, dt.timedelta(minutes=3), json.dumps(editor_info)) pipeline.setex(previous_editor_key, dt.timedelta(minutes=10), str(self.user_id)) self.redis_pipeline_execute(pipeline) except Exception as e: logger.exception(f"Redis error during heartbeat: {e}")
[docs] async def handle_content_updated(self, timestamp: Optional[str]) -> None: """ Process content update notifications. Updates the last modified timestamp and notifies all connected clients that content has changed. Args: timestamp (Optional[str]): Optional timestamp string in ISO format """ try: # Use provided timestamp if valid, otherwise generate new one if timestamp: try: # Ensure timestamp is in the expected format dt.datetime.fromisoformat(timestamp) new_timestamp: str = timestamp except (ValueError, TypeError): new_timestamp = self.get_timestamp() else: new_timestamp = self.get_timestamp() # Update the last modified timestamp self.redis_set(self.last_modified_key, new_timestamp) # Notify all clients that content has been updated await self.channel_layer.group_send( self.room_group_name, { 'type': 'content_updated', 'user_id': self.user_id, 'username': self.email, 'timestamp': new_timestamp } ) except Exception as e: logger.exception(f"Redis error during content update: {e}")
[docs] async def handle_release_lock(self) -> None: """ Process a request to release the editor lock. Only allows the current editor to release their own lock. Updates the last modified timestamp and notifies all clients that the lock has been released. """ try: # Only allow the current editor to release the lock editor_data_bytes = self.redis_get(self.editor_key) if editor_data_bytes: editor_info: Dict[str, Any] = json.loads(editor_data_bytes) if editor_info.get('editor_id') == self.user_id: # Clear the editor self.redis_delete(self.editor_key) # Since this is a deliberate release, also clear previous editor record previous_editor_key = f"previous_editor:{self.room_group_name}" self.redis_delete(previous_editor_key) # Get the latest data timestamp latest_timestamp: str = await self.get_last_modified() self.redis_set(self.last_modified_key, latest_timestamp) # Notify all clients that the lock has been released await self.channel_layer.group_send( self.room_group_name, { 'type': 'lock_released', 'user_id': self.user_id, 'username': self.email, 'timestamp': latest_timestamp } ) except Exception as e: logger.exception(f"Redis error during lock release: {e}")
[docs] async def handle_request_attention(self) -> None: """ Process a request for the editor's attention from a viewer. Checks if the user can send a notification based on rate limiting, then forwards the request to the current editor. """ try: # Get current editor status from Redis editor_data_bytes = self.redis_get(self.editor_key) if not editor_data_bytes: # No editor to notify return editor_info: Dict[str, Any] = json.loads(editor_data_bytes) editor_id = editor_info.get('editor_id') if editor_id == self.user_id: # User is the editor, no need to notify self return # Rate limiting key specific to this user for this object rate_limit_key = f"attention_request:{self.room_group_name}:{self.user_id}" # Get the notification interval from settings (default 15 seconds) notification_interval = getattr( settings, 'ADMIN_COLLABORATOR_OPTIONS', {} ).get('notification_request_interval', 15) # Check if user has sent a request recently if self.redis_exists(rate_limit_key): # Too soon to send another request return # Set rate limiting key with expiration self.redis_setex( rate_limit_key, notification_interval, # Expires after the configured interval 1 ) # Notify the editor await self.channel_layer.group_send( self.room_group_name, { 'type': 'attention_requested', 'user_id': self.user_id, 'username': self.email, 'timestamp': self.get_timestamp() } ) except Exception as e: logger.exception(f"Redis error during attention request: {e}")
# Event handlers for channel layer messages
[docs] async def user_joined(self, event: Dict[str, Any]) -> None: """ Handle user_joined events from the channel layer. Args: event (Dict[str, Any]): Event data including user_id, username, and timestamp """ await self.send(text_data=json.dumps(event))
[docs] async def user_left(self, event: Dict[str, Any]) -> None: """ Handle user_left events from the channel layer. Args: event (Dict[str, Any]): Event data including user_id and username """ await self.send(text_data=json.dumps(event))
[docs] async def editor_status(self, event: Dict[str, Any]) -> None: """ Handle editor_status events from the channel layer. Args: event (Dict[str, Any]): Event data including editor_id and editor_name """ await self.send(text_data=json.dumps(event))
[docs] async def content_updated(self, event: Dict[str, Any]) -> None: """ Handle content_updated events from the channel layer. Args: event (Dict[str, Any]): Event data including user_id, username, and timestamp """ await self.send(text_data=json.dumps(event))
[docs] async def lock_released(self, event: Dict[str, Any]) -> None: """ Handle lock_released events from the channel layer. Args: event (Dict[str, Any]): Event data including user_id, username, and timestamp """ await self.send(text_data=json.dumps(event))
[docs] async def attention_requested(self, event: Dict[str, Any]) -> None: """ Handle attention_requested events from the channel layer. Args: event (Dict[str, Any]): Event data including user_id, username, and timestamp """ await self.send(text_data=json.dumps(event))
[docs]class ChatConsumer(RedisClientMixin, AsyncWebsocketConsumer): """ WebSocket consumer for real-time chat between users on the same page. Manages presence tracking and message routing between users viewing the same page in the Django admin. Redis client, connection pool, auth check, and timestamp helpers live on ``RedisClientMixin``. """
[docs] async def connect(self) -> None: """ Handle WebSocket connection establishment. - Extracts current page path from URL - Authenticates the user - Sets up channel group for users on this page - Adds user to the presence list - Notifies other users about this user's presence """ # Get parameters from the URL self.page_path = self.scope['url_route']['kwargs']['page_path'] # Perform authentication check is_authorized: bool = await self.check_user_authorization() if not is_authorized: await self.close() return # Create a unique channel group name for this page self.room_group_name: str = f"chat_{self.page_path}" self.user_id: int = self.scope['user'].id self.username: str = self.scope['user'].username self.email: str = self.scope['user'].email # Get avatar URL if configured self.avatar_url = None avatar_field = getattr(settings, 'ADMIN_COLLABORATOR_OPTIONS', {}).get('avatar_field') if avatar_field and hasattr(self.scope['user'], avatar_field): avatar = getattr(self.scope['user'], avatar_field) if avatar: self.avatar_url = avatar.url # Join room group await self.channel_layer.group_add( self.room_group_name, self.channel_name ) # Accept the WebSocket connection await self.accept() try: # Save active users in Redis with expiration active_users_key = f"chat_active_users:{self.room_group_name}" user_data = json.dumps({ 'user_id': self.user_id, 'username': self.username, 'email': self.email, 'avatar_url': self.avatar_url, 'last_seen': self.get_timestamp() }) self.redis_hset(active_users_key, self.user_id, user_data) # Get all active users active_users = self.redis_hgetall(active_users_key) users_list = [] for user_id, user_data in active_users.items(): if user_id != str(self.user_id).encode(): # Exclude self users_list.append(json.loads(user_data)) # Send active users list to the new user await self.send(text_data=json.dumps({ 'type': 'active_users', 'users': users_list })) except Exception as e: logger.exception(f"Redis error during chat connect: {e}") users_list = [] # Fallback to empty list if Redis operation fails # Notify others about this user's presence await self.channel_layer.group_send( self.room_group_name, { 'type': 'user_joined', 'user_id': self.user_id, 'username': self.username, 'email': self.email, 'avatar_url': self.avatar_url, 'timestamp': self.get_timestamp() } )
[docs] async def disconnect(self, close_code: int) -> None: """ Handle WebSocket disconnection. - Removes user from active users list - Notifies other users about this user leaving - Leaves the channel group """ try: if hasattr(self, 'room_group_name'): try: # Remove user from active users in Redis active_users_key = f"chat_active_users:{self.room_group_name}" self.redis_hdel(active_users_key, self.user_id) except Exception as e: logger.exception(f"Redis error during chat disconnect: {e}") # Notify the group that the user has left (before leaving the group) await self.channel_layer.group_send( self.room_group_name, { 'type': 'user_left', 'user_id': self.user_id, 'username': self.username, 'email': self.email } ) # Leave room group (after sending messages) await self.channel_layer.group_discard( self.room_group_name, self.channel_name ) except Exception as e: logger.exception(f"Error during disconnect: {e}")
[docs] async def receive(self, text_data: str) -> None: """ Process incoming WebSocket messages. Routes each message to the appropriate handler based on its type. Args: text_data (str): JSON string containing the message data """ try: data: Dict[str, Any] = json.loads(text_data) message_type: str = data.get('type') if message_type == 'chat_message': await self.handle_chat_message( data.get('recipient_id'), data.get('message') ) elif message_type == 'heartbeat': await self.handle_heartbeat() except Exception as e: logger.exception(f"Error processing message: {e}")
[docs] async def handle_chat_message(self, recipient_id: int, message: str) -> None: """ Process and route chat messages between users. Args: recipient_id (int): ID of the user receiving the message message (str): Content of the chat message """ if not message or not recipient_id: return timestamp = self.get_timestamp() # Send to recipient and sender (so both see the message) await self.channel_layer.group_send( self.room_group_name, { 'type': 'chat_message', 'sender_id': self.user_id, 'sender_name': self.username, 'sender_email': self.email, 'sender_avatar': self.avatar_url, 'recipient_id': recipient_id, 'message': message, 'timestamp': timestamp } )
[docs] async def handle_heartbeat(self) -> None: """ Process heartbeat messages to keep track of active users. Updates the user's last_seen timestamp in Redis """ try: # Update user's last seen timestamp active_users_key = f"chat_active_users:{self.room_group_name}" user_data = json.dumps({ 'user_id': self.user_id, 'username': self.username, 'email': self.email, 'avatar_url': self.avatar_url, 'last_seen': self.get_timestamp() }) self.redis_hset(active_users_key, self.user_id, user_data) except Exception as e: logger.exception(f"Redis error during chat heartbeat: {e}")
[docs] async def user_joined(self, event: Dict[str, Any]) -> None: """ Handle user_joined events and relay to the WebSocket. Args: event (Dict[str, Any]): Event data containing user information """ # Forward the event to the WebSocket await self.send(text_data=json.dumps(event))
[docs] async def user_left(self, event: Dict[str, Any]) -> None: """ Handle user_left events and relay to the WebSocket. Args: event (Dict[str, Any]): Event data containing user information """ # Forward the event to the WebSocket await self.send(text_data=json.dumps(event))
[docs] async def chat_message(self, event: Dict[str, Any]) -> None: """ Handle chat_message events and relay to the WebSocket. Args: event (Dict[str, Any]): Event data containing message information """ # Only send the message to the sender and recipient if self.user_id == event['sender_id'] or self.user_id == event['recipient_id']: await self.send(text_data=json.dumps(event))