Files
Aether/backend/app/chats/service.py
T
2026-01-25 11:30:30 +03:00

218 lines
8.6 KiB
Python

import json
import uuid
from typing import List, Dict
import logging
from fastapi import HTTPException, status, WebSocket
from sqlalchemy import and_
from app.core.database import async_session_maker
from app.chats.dao import ChatDAO, MessageDAO, ParticipantDAO
from app.chats.models import ChatModel, MessageModel, ParticipantModel
from app.chats.schemas import Chat, MessageCreate, MessageCreateDB, ChatCreateDB, ParticipantCreateDB, Message, MessageUpdateDB, MessageUpdate
from app.users.models import UserModel
from app.core.redis import get_redis
log = logging.getLogger(__name__)
class ChatService:
active_connections: Dict[str, WebSocket] = {}
@classmethod
async def get_chats(cls, user: UserModel, offset: int, limit: int) -> List[Chat]:
log.debug("Getting chats", extra={"user_id": user.id, "offset": offset, "limit": limit})
async with async_session_maker() as session:
chats = await ChatDAO.get_chats(session, user.id, offset, limit)
log.debug("Retrieved chats", extra={"user_id": user.id, "count": len(chats)})
return chats
@classmethod
async def send_message(cls, sender: UserModel, message: MessageCreate) -> Message:
log.info("Sending message", extra={"sender_id": sender.id, "chat_id": message.chat_id, "recipient_id": message.recipient_id})
async with async_session_maker() as session:
target_chat_id = message.chat_id
if target_chat_id is None:
if message.recipient_id is None:
log.warning("Message send failed: missing chat_id and recipient_id", extra={"sender_id": sender.id})
raise HTTPException(status.HTTP_400_BAD_REQUEST, "Need chat_id or user_id")
target_chat_id = await ChatDAO.get_chat_id(session, sender.id, message.recipient_id)
if target_chat_id is None:
log.info("Creating new chat", extra={"sender_id": sender.id, "recipient_id": message.recipient_id})
target_chat_db = await ChatDAO.add(
session,
obj_in=ChatCreateDB(
is_group=False,
last_message=message.content
)
)
target_chat_id: uuid.UUID = target_chat_db.id
await ParticipantDAO.add(
session,
obj_in=ParticipantCreateDB(
user_id=sender.id,
chat_id=target_chat_id
)
)
await ParticipantDAO.add(
session,
obj_in=ParticipantCreateDB(
user_id=message.recipient_id,
chat_id=target_chat_id
)
)
log.info("Created new chat", extra={"chat_id": target_chat_id, "sender_id": sender.id, "recipient_id": message.recipient_id})
members = await ParticipantDAO.find_all(
session,
None,
None,
ParticipantModel.chat_id==target_chat_id
)
members_ids = [member.user_id for member in members]
if not sender.id in members_ids :
log.warning("Access denied to chat", extra={"user_id": sender.id, "chat_id": message.chat_id})
raise HTTPException(status.HTTP_403_FORBIDDEN, detail="Access denied")
message_db = await MessageDAO.add(
session,
obj_in=MessageCreateDB(
sender_id=sender.id,
chat_id=target_chat_id,
content=message.content
)
)
await cls._send_ws_message(members_ids, Message.model_validate(message_db))
await ChatDAO.update(
session,
ChatModel.id==target_chat_id,
obj_in={"last_message": message.content}
)
await session.commit()
log.info("Message sent", extra={"message_id": message_db.id, "sender_id": sender.id, "chat_id": target_chat_id})
return message_db
@classmethod
async def get_chat(cls, chat_id: uuid.UUID, user: UserModel, offset: int = 0, limit: int = 0) -> List[Message]:
log.debug("Getting chat messages", extra={"user_id": user.id, "chat_id": chat_id, "offset": offset, "limit": limit})
async with async_session_maker() as session:
chat_exist = await ChatDAO.get_chat_with_participant(session, chat_id, user.id)
if chat_exist is None:
log.warning("Chat not found", extra={"user_id": user.id, "chat_id": chat_id})
raise HTTPException(status.HTTP_404_NOT_FOUND, "Chat not found")
if chat_exist.participant_id is None:
log.warning("Access denied to chat", extra={"user_id": user.id, "chat_id": chat_id})
raise HTTPException(status.HTTP_403_FORBIDDEN, detail="Access denied")
messages = await MessageDAO.find_all_desc(
session,
offset,
limit,
MessageModel.chat_id==chat_id
)
log.debug("Retrieved chat messages", extra={"user_id": user.id, "chat_id": chat_id, "count": len(messages)})
return messages
@classmethod
async def save_websocket(cls, user: UserModel, ws: WebSocket):
cls.active_connections[str(user.id)] = ws
log.info("WebSocket connection saved", extra={"user_id": user.id, "active_connections": len(cls.active_connections) + 1})
@classmethod
async def delete_websocket(cls, user: UserModel):
cls.active_connections.pop(str(user.id))
log.info("WebSocket connection deleted", extra={"user_id": user.id, "active_connections": len(cls.active_connections) - 1})
@classmethod
async def message_listener(cls):
redis_client = await get_redis()
pubsub = redis_client.pubsub()
await pubsub.subscribe("messenger_updates")
async for message in pubsub.listen():
log.debug(f"Received message from Redis: {message}")
if message["type"] == "message":
payload = json.loads(message["data"])
user_id = payload["user_id"]
if user_id in cls.active_connections:
ws = cls.active_connections[user_id]
await ws.send_json(payload["message"])
log.info(f"Message sent to user {user_id} via WebSocket")
else:
log.debug(f"User {user_id} not connected")
@classmethod
async def _send_ws_message(cls, user_ids: List[int], message: Message):
redis_client = await get_redis()
for user_id in user_ids:
payload = {
"user_id": str(user_id),
"message": message.model_dump(mode='json')
}
await redis_client.publish("messenger_updates", json.dumps(payload))
log.debug(f"Published message for user_id: {user_id}")
@classmethod
async def update_message(cls, user: UserModel, message_update: MessageUpdate) -> Message:
async with async_session_maker() as session:
message_exist = await MessageDAO.find_one_or_none(
session,
and_(
MessageModel.id==message_update.id,
MessageModel.sender_id==user.id
)
)
if message_exist is None:
log.warning("Message not found", extra={"user_id": user.id, "message_id": message_update.id})
raise HTTPException(status.HTTP_404_NOT_FOUND, detail="Message not found")
message_update_db = await MessageDAO.update(
session,
MessageModel.id==message_update.id,
obj_in=MessageUpdateDB(
content=message_update.content,
is_edited=True
)
)
members = await ParticipantDAO.find_all(
session,
None,
None,
ParticipantModel.chat_id==message_exist.chat_id
)
member_ids = [member.user_id for member in members]
await cls._send_ws_message(member_ids, Message.model_validate(message_update_db))
await session.commit()
log.info("Message update successfully", extra={"user_id": user.id, "message_id": message_update.id})
return message_update_db