From 2711270d9be0396b22e299e002d16f072a3c5afe Mon Sep 17 00:00:00 2001 From: lorsan Date: Sun, 25 Jan 2026 10:29:17 +0300 Subject: [PATCH] Edit message listener --- .github/workflows/deploy.yml | 33 +++++++++ README.md | 2 +- backend/app/auth/service.py | 2 +- backend/app/chats/service.py | 35 ++------- backend/app/main.py | 4 +- backend/app/services/messenger_service.py | 73 +++++++++++++++++++ .../{redis_service.py => storage_service.py} | 2 + backend/app/users/service.py | 2 +- backend/app/utils/OAuth2WithCookie.py | 2 - backend/app/utils/connect_manager.py | 28 +++++++ 10 files changed, 147 insertions(+), 36 deletions(-) create mode 100644 backend/app/services/messenger_service.py rename backend/app/services/{redis_service.py => storage_service.py} (97%) create mode 100644 backend/app/utils/connect_manager.py diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml index e69de29..13cca30 100644 --- a/.github/workflows/deploy.yml +++ b/.github/workflows/deploy.yml @@ -0,0 +1,33 @@ +name: Deploy to Server + +on: + push: + branches: + - main + +jobs: + deploy: + runs-on: self-hosted + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Create .env file + run: | + echo "${{ secrets.ENV_FILE }}" > .env + + - name: Build + run: | + echo "Building to server" + docker compose build + + - name: Deploying + run: | + echo "Deploying to server" + docker compose up -d + + - name: Cleanup old images + run: | + docker image prune -f + diff --git a/README.md b/README.md index f2b3892..9140469 100755 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ # 🌌 Aether -Aether logo +Aether logo **Современная full-stack платформа для чатов с мощным backend и элегантным frontend** diff --git a/backend/app/auth/service.py b/backend/app/auth/service.py index 196eea7..7b8e687 100644 --- a/backend/app/auth/service.py +++ b/backend/app/auth/service.py @@ -7,7 +7,7 @@ from jose import jwt from sqlalchemy import or_ from app.utils.hash_password import verify_password -from app.services.redis_service import RefreshTokenStorage +from app.services.storage_service import RefreshTokenStorage from app.core.exceptions import InvalidTokenException from app.users.models import UserModel from app.users.dao import UserDAO diff --git a/backend/app/chats/service.py b/backend/app/chats/service.py index bfe1a65..9ed0532 100644 --- a/backend/app/chats/service.py +++ b/backend/app/chats/service.py @@ -12,12 +12,12 @@ from app.chats.models import ChatModel, MessageModel, ParticipantModel from app.chats.schemas import Chat, MessageCreate, MessageCreateDB, ChatCreateDB, ParticipantCreateDB, Message, MessageUpdateDB, MessageUpdate from app.users.models import UserModel from app.core.redis import get_redis +from app.utils.connect_manager import manager log = logging.getLogger(__name__) class ChatService: - active_connections: Dict[str, WebSocket] = {} @classmethod async def get_chats(cls, user: UserModel, offset: int, limit: int) -> List[Chat]: @@ -130,38 +130,15 @@ class ChatService: @classmethod async def save_websocket(cls, user: UserModel, ws: WebSocket): - cls.active_connections[str(user.id)] = ws - log.info("WebSocket connection saved", extra={"user_id": user.id, "active_connections": len(cls.active_connections) + 1}) + manager.add_connection(user_id=str(user.id), ws=ws) + log.info("WebSocket connection saved", extra={"user_id": user.id, "active_connections": manager.count_connections + 1}) @classmethod async def delete_websocket(cls, user: UserModel): - cls.active_connections.pop(str(user.id)) - log.info("WebSocket connection deleted", extra={"user_id": user.id, "active_connections": len(cls.active_connections) - 1}) - - - @classmethod - async def message_listener(cls): - redis_client = await get_redis() - - pubsub = redis_client.pubsub() - - await pubsub.subscribe("messenger_updates") - - async for message in pubsub.listen(): - log.debug(f"Received message from Redis: {message}") - if message["type"] == "message": - payload = json.loads(message["data"]) - user_id = payload["user_id"] - - if user_id in cls.active_connections: - ws = cls.active_connections[user_id] - await ws.send_json(payload["message"]) - log.info(f"Message sent to user {user_id} via WebSocket") - else: - log.debug(f"User {user_id} not connected") - + manager.delete_connection(user_id=str(user.id)) + log.info("WebSocket connection deleted", extra={"user_id": user.id, "active_connections": manager.count_connections - 1}) @classmethod @@ -170,7 +147,7 @@ class ChatService: for user_id in user_ids: payload = { "user_id": str(user_id), - "message": message.model_dump(mode='json') + "data": message.model_dump(mode='json') } await redis_client.publish("messenger_updates", json.dumps(payload)) log.debug(f"Published message for user_id: {user_id}") diff --git a/backend/app/main.py b/backend/app/main.py index c1c52b7..8d0ea00 100755 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -12,9 +12,9 @@ from app.core.redis import close_redis, init_redis from app.users.router import router as user_router from app.auth.router import router as auth_router from app.chats.router import router as chat_router -from app.chats.service import ChatService from app.core.log_config import set_logging from app.core.config import settings +from app.services.messenger_service import PubSubMessenger set_logging() log = logging.getLogger(__name__) @@ -24,7 +24,7 @@ log = logging.getLogger(__name__) async def lifespan(app: FastAPI): await init_redis() log.info("Redis connected") - task_send_message = asyncio.create_task(ChatService.message_listener()) + task_send_message = asyncio.create_task(PubSubMessenger.subscribe_to_channels()) log.info("Message sender started") yield await close_redis() diff --git a/backend/app/services/messenger_service.py b/backend/app/services/messenger_service.py new file mode 100644 index 0000000..b595266 --- /dev/null +++ b/backend/app/services/messenger_service.py @@ -0,0 +1,73 @@ +import json +import logging + +from fastapi import WebSocket + +from app.core.redis import get_redis +from app.utils.connect_manager import manager + +log = logging.getLogger(__name__) + + +class PubSubMessenger: + @classmethod + def get_handlers(cls) -> dict: + return dict( + messenger_updates=cls.handle_message + ) + + @classmethod + async def subscribe_to_channels(cls): + print("test") + redis_client = await get_redis() + pubsub = redis_client.pubsub() + + await pubsub.subscribe(*cls.get_handlers().keys()) + + async for message in pubsub.listen(): + log.debug(f"Received message from Redis: {message}") + if message["type"] != "message": + continue + + payload = json.loads(message["data"]) + user_id = payload["user_id"] + + ws = manager.get_connection(str(user_id)) + + if ws is None: + log.debug(f"User {user_id} not connected") + continue + + handler = cls.get_handlers().get(message['channel']) + if handler: + await handler(payload["data"], ws) + + + @classmethod + async def handle_message(cls, message, ws: WebSocket): + await ws.send_json(message) + log.info(f"Message sent to user via WebSocket") + + + +# async def message_listener(): +# redis_client = await get_redis() +# +# pubsub = redis_client.pubsub() +# +# await pubsub.subscribe("messenger_updates") +# +# async for message in pubsub.listen(): +# log.debug(f"Received message from Redis: {message}") +# if message["type"] == "message": +# payload = json.loads(message["data"]) +# user_id = payload["user_id"] +# +# ws = manager.get_connection(str(user_id)) +# +# if ws is None: +# log.debug(f"User {user_id} not connected") +# continue +# +# await ws.send_json(payload["message"]) +# log.info(f"Message sent to user {user_id} via WebSocket") \ No newline at end of file diff --git a/backend/app/services/redis_service.py b/backend/app/services/storage_service.py similarity index 97% rename from backend/app/services/redis_service.py rename to backend/app/services/storage_service.py index 0220e50..ae1d194 100644 --- a/backend/app/services/redis_service.py +++ b/backend/app/services/storage_service.py @@ -1,8 +1,10 @@ import logging import uuid from typing import Optional +import json from app.core.redis import get_redis +from app.utils.connect_manager import manager log = logging.getLogger(__name__) diff --git a/backend/app/users/service.py b/backend/app/users/service.py index 13848f6..f0b061a 100644 --- a/backend/app/users/service.py +++ b/backend/app/users/service.py @@ -7,7 +7,7 @@ from fastapi import HTTPException, status, UploadFile from sqlalchemy import or_ from app.utils.hash_password import hash_password, verify_password -from app.services.redis_service import EmailTokenStorage, ChangePasswordTokenStorage +from app.services.storage_service import EmailTokenStorage, ChangePasswordTokenStorage from app.utils.S3_client import s3_client from app.core.exceptions import InvalidTokenException, TokenExpiredException, UserNotFoundException from app.users.models import UserModel diff --git a/backend/app/utils/OAuth2WithCookie.py b/backend/app/utils/OAuth2WithCookie.py index 40f9764..0a6c5e3 100644 --- a/backend/app/utils/OAuth2WithCookie.py +++ b/backend/app/utils/OAuth2WithCookie.py @@ -32,8 +32,6 @@ class OAuth2PasswordBearerWithCookie(OAuth2): raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="No connection found") authorization: str = connection.cookies.get("access_token") - print(authorization) - scheme, param = get_authorization_scheme_param(authorization) if not authorization or scheme.lower() != "bearer": if self.auto_error: diff --git a/backend/app/utils/connect_manager.py b/backend/app/utils/connect_manager.py new file mode 100644 index 0000000..5dba3f8 --- /dev/null +++ b/backend/app/utils/connect_manager.py @@ -0,0 +1,28 @@ +from typing import Dict, Optional + +from fastapi import WebSocket + + +class ConnectManager: + def __init__(self): + self.active_connections: Dict[str, WebSocket] = {} + + + def get_connection(self, user_id: str) -> Optional[WebSocket]: + return self.active_connections.get(user_id) + + + def add_connection(self, user_id: str, ws: WebSocket): + self.active_connections[user_id] = ws + + + def delete_connection(self, user_id: str): + self.active_connections.pop(user_id) + + @property + def count_connections(self) -> int: + return len(self.active_connections) + + + +manager = ConnectManager() \ No newline at end of file