mirror of
https://github.com/lorsanstand/Aether.git
synced 2026-06-19 12:05:16 +03:00
Edit message listener
This commit is contained in:
@@ -0,0 +1,33 @@
|
||||
name: Deploy to Server
|
||||
|
||||
on:
|
||||
push:
|
||||
branches:
|
||||
- main
|
||||
|
||||
jobs:
|
||||
deploy:
|
||||
runs-on: self-hosted
|
||||
|
||||
steps:
|
||||
- name: Checkout code
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Create .env file
|
||||
run: |
|
||||
echo "${{ secrets.ENV_FILE }}" > .env
|
||||
|
||||
- name: Build
|
||||
run: |
|
||||
echo "Building to server"
|
||||
docker compose build
|
||||
|
||||
- name: Deploying
|
||||
run: |
|
||||
echo "Deploying to server"
|
||||
docker compose up -d
|
||||
|
||||
- name: Cleanup old images
|
||||
run: |
|
||||
docker image prune -f
|
||||
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
# 🌌 Aether
|
||||
|
||||
<img src="assets/logo.png" alt="Aether logo" width="150" style="border-radius: 15px;">
|
||||
<img src="assets/mini-logo.png" alt="Aether logo" width="150" style="border-radius: 15px;">
|
||||
|
||||
**Современная full-stack платформа для чатов с мощным backend и элегантным frontend**
|
||||
|
||||
|
||||
@@ -7,7 +7,7 @@ from jose import jwt
|
||||
from sqlalchemy import or_
|
||||
|
||||
from app.utils.hash_password import verify_password
|
||||
from app.services.redis_service import RefreshTokenStorage
|
||||
from app.services.storage_service import RefreshTokenStorage
|
||||
from app.core.exceptions import InvalidTokenException
|
||||
from app.users.models import UserModel
|
||||
from app.users.dao import UserDAO
|
||||
|
||||
@@ -12,12 +12,12 @@ from app.chats.models import ChatModel, MessageModel, ParticipantModel
|
||||
from app.chats.schemas import Chat, MessageCreate, MessageCreateDB, ChatCreateDB, ParticipantCreateDB, Message, MessageUpdateDB, MessageUpdate
|
||||
from app.users.models import UserModel
|
||||
from app.core.redis import get_redis
|
||||
from app.utils.connect_manager import manager
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ChatService:
|
||||
active_connections: Dict[str, WebSocket] = {}
|
||||
|
||||
@classmethod
|
||||
async def get_chats(cls, user: UserModel, offset: int, limit: int) -> List[Chat]:
|
||||
@@ -130,38 +130,15 @@ class ChatService:
|
||||
|
||||
@classmethod
|
||||
async def save_websocket(cls, user: UserModel, ws: WebSocket):
|
||||
cls.active_connections[str(user.id)] = ws
|
||||
log.info("WebSocket connection saved", extra={"user_id": user.id, "active_connections": len(cls.active_connections) + 1})
|
||||
manager.add_connection(user_id=str(user.id), ws=ws)
|
||||
log.info("WebSocket connection saved", extra={"user_id": user.id, "active_connections": manager.count_connections + 1})
|
||||
|
||||
|
||||
|
||||
@classmethod
|
||||
async def delete_websocket(cls, user: UserModel):
|
||||
cls.active_connections.pop(str(user.id))
|
||||
log.info("WebSocket connection deleted", extra={"user_id": user.id, "active_connections": len(cls.active_connections) - 1})
|
||||
|
||||
|
||||
@classmethod
|
||||
async def message_listener(cls):
|
||||
redis_client = await get_redis()
|
||||
|
||||
pubsub = redis_client.pubsub()
|
||||
|
||||
await pubsub.subscribe("messenger_updates")
|
||||
|
||||
async for message in pubsub.listen():
|
||||
log.debug(f"Received message from Redis: {message}")
|
||||
if message["type"] == "message":
|
||||
payload = json.loads(message["data"])
|
||||
user_id = payload["user_id"]
|
||||
|
||||
if user_id in cls.active_connections:
|
||||
ws = cls.active_connections[user_id]
|
||||
await ws.send_json(payload["message"])
|
||||
log.info(f"Message sent to user {user_id} via WebSocket")
|
||||
else:
|
||||
log.debug(f"User {user_id} not connected")
|
||||
|
||||
manager.delete_connection(user_id=str(user.id))
|
||||
log.info("WebSocket connection deleted", extra={"user_id": user.id, "active_connections": manager.count_connections - 1})
|
||||
|
||||
|
||||
@classmethod
|
||||
@@ -170,7 +147,7 @@ class ChatService:
|
||||
for user_id in user_ids:
|
||||
payload = {
|
||||
"user_id": str(user_id),
|
||||
"message": message.model_dump(mode='json')
|
||||
"data": message.model_dump(mode='json')
|
||||
}
|
||||
await redis_client.publish("messenger_updates", json.dumps(payload))
|
||||
log.debug(f"Published message for user_id: {user_id}")
|
||||
|
||||
+2
-2
@@ -12,9 +12,9 @@ from app.core.redis import close_redis, init_redis
|
||||
from app.users.router import router as user_router
|
||||
from app.auth.router import router as auth_router
|
||||
from app.chats.router import router as chat_router
|
||||
from app.chats.service import ChatService
|
||||
from app.core.log_config import set_logging
|
||||
from app.core.config import settings
|
||||
from app.services.messenger_service import PubSubMessenger
|
||||
|
||||
set_logging()
|
||||
log = logging.getLogger(__name__)
|
||||
@@ -24,7 +24,7 @@ log = logging.getLogger(__name__)
|
||||
async def lifespan(app: FastAPI):
|
||||
await init_redis()
|
||||
log.info("Redis connected")
|
||||
task_send_message = asyncio.create_task(ChatService.message_listener())
|
||||
task_send_message = asyncio.create_task(PubSubMessenger.subscribe_to_channels())
|
||||
log.info("Message sender started")
|
||||
yield
|
||||
await close_redis()
|
||||
|
||||
@@ -0,0 +1,73 @@
|
||||
import json
|
||||
import logging
|
||||
|
||||
from fastapi import WebSocket
|
||||
|
||||
from app.core.redis import get_redis
|
||||
from app.utils.connect_manager import manager
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class PubSubMessenger:
|
||||
@classmethod
|
||||
def get_handlers(cls) -> dict:
|
||||
return dict(
|
||||
messenger_updates=cls.handle_message
|
||||
)
|
||||
|
||||
@classmethod
|
||||
async def subscribe_to_channels(cls):
|
||||
print("test")
|
||||
redis_client = await get_redis()
|
||||
pubsub = redis_client.pubsub()
|
||||
|
||||
await pubsub.subscribe(*cls.get_handlers().keys())
|
||||
|
||||
async for message in pubsub.listen():
|
||||
log.debug(f"Received message from Redis: {message}")
|
||||
if message["type"] != "message":
|
||||
continue
|
||||
|
||||
payload = json.loads(message["data"])
|
||||
user_id = payload["user_id"]
|
||||
|
||||
ws = manager.get_connection(str(user_id))
|
||||
|
||||
if ws is None:
|
||||
log.debug(f"User {user_id} not connected")
|
||||
continue
|
||||
|
||||
handler = cls.get_handlers().get(message['channel'])
|
||||
if handler:
|
||||
await handler(payload["data"], ws)
|
||||
|
||||
|
||||
@classmethod
|
||||
async def handle_message(cls, message, ws: WebSocket):
|
||||
await ws.send_json(message)
|
||||
log.info(f"Message sent to user via WebSocket")
|
||||
|
||||
|
||||
|
||||
# async def message_listener():
|
||||
# redis_client = await get_redis()
|
||||
#
|
||||
# pubsub = redis_client.pubsub()
|
||||
#
|
||||
# await pubsub.subscribe("messenger_updates")
|
||||
#
|
||||
# async for message in pubsub.listen():
|
||||
# log.debug(f"Received message from Redis: {message}")
|
||||
# if message["type"] == "message":
|
||||
# payload = json.loads(message["data"])
|
||||
# user_id = payload["user_id"]
|
||||
#
|
||||
# ws = manager.get_connection(str(user_id))
|
||||
#
|
||||
# if ws is None:
|
||||
# log.debug(f"User {user_id} not connected")
|
||||
# continue
|
||||
#
|
||||
# await ws.send_json(payload["message"])
|
||||
# log.info(f"Message sent to user {user_id} via WebSocket")
|
||||
@@ -1,8 +1,10 @@
|
||||
import logging
|
||||
import uuid
|
||||
from typing import Optional
|
||||
import json
|
||||
|
||||
from app.core.redis import get_redis
|
||||
from app.utils.connect_manager import manager
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
@@ -7,7 +7,7 @@ from fastapi import HTTPException, status, UploadFile
|
||||
from sqlalchemy import or_
|
||||
|
||||
from app.utils.hash_password import hash_password, verify_password
|
||||
from app.services.redis_service import EmailTokenStorage, ChangePasswordTokenStorage
|
||||
from app.services.storage_service import EmailTokenStorage, ChangePasswordTokenStorage
|
||||
from app.utils.S3_client import s3_client
|
||||
from app.core.exceptions import InvalidTokenException, TokenExpiredException, UserNotFoundException
|
||||
from app.users.models import UserModel
|
||||
|
||||
@@ -32,8 +32,6 @@ class OAuth2PasswordBearerWithCookie(OAuth2):
|
||||
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="No connection found")
|
||||
|
||||
authorization: str = connection.cookies.get("access_token")
|
||||
print(authorization)
|
||||
|
||||
scheme, param = get_authorization_scheme_param(authorization)
|
||||
if not authorization or scheme.lower() != "bearer":
|
||||
if self.auto_error:
|
||||
|
||||
@@ -0,0 +1,28 @@
|
||||
from typing import Dict, Optional
|
||||
|
||||
from fastapi import WebSocket
|
||||
|
||||
|
||||
class ConnectManager:
|
||||
def __init__(self):
|
||||
self.active_connections: Dict[str, WebSocket] = {}
|
||||
|
||||
|
||||
def get_connection(self, user_id: str) -> Optional[WebSocket]:
|
||||
return self.active_connections.get(user_id)
|
||||
|
||||
|
||||
def add_connection(self, user_id: str, ws: WebSocket):
|
||||
self.active_connections[user_id] = ws
|
||||
|
||||
|
||||
def delete_connection(self, user_id: str):
|
||||
self.active_connections.pop(user_id)
|
||||
|
||||
@property
|
||||
def count_connections(self) -> int:
|
||||
return len(self.active_connections)
|
||||
|
||||
|
||||
|
||||
manager = ConnectManager()
|
||||
Reference in New Issue
Block a user