Files
gogo2/COBY/api/simple_websocket_server.py
Dobromir Popov 622d059aae 18: tests, fixes
2025-08-05 14:11:49 +03:00

53 lines
1.6 KiB
Python

"""
Simple WebSocket server for COBY system.
"""
import asyncio
import json
import logging
from typing import Set, Dict, Any
logger = logging.getLogger(__name__)
class WebSocketServer:
    """Minimal placeholder WebSocket server for the COBY system.

    The class does not open a network socket itself. It keeps a set of
    connection objects registered by the caller and offers a ``start``
    coroutine that idles until ``stop`` is called.

    Public attributes:
        host: Host value supplied at construction (used only for logging here).
        port: Port value supplied at construction (used only for logging here).
        connections: Connection objects currently registered.
        running: ``True`` only while ``start`` is inside its keep-alive loop.
    """

    def __init__(self, host: str = "0.0.0.0", port: int = 8081):
        """Store the configuration and start with no connections.

        Args:
            host: Host name or address to report in log messages.
            port: Port number to report in log messages.
        """
        self.host = host
        self.port = port
        self.connections: Set[Any] = set()
        self.running = False

    async def start(self) -> None:
        """Run the keep-alive loop until ``stop`` is called.

        The loop wakes once per second to re-check ``running``, so a call to
        ``stop`` takes effect within about one second. Unexpected exceptions
        are logged and swallowed, so the coroutine returns normally.
        """
        try:
            logger.info("Starting WebSocket server on %s:%s", self.host, self.port)
            self.running = True
            # Simple implementation - just keep running
            while self.running:
                await asyncio.sleep(1)
        except Exception as e:
            # Best-effort: report the failure (with traceback), do not re-raise.
            logger.exception("WebSocket server error: %s", e)
        finally:
            # Always clear the flag so that `running` never claims the server
            # is alive after the loop ended through an error or cancellation.
            self.running = False

    async def stop(self) -> None:
        """Ask the keep-alive loop in ``start`` to finish."""
        logger.info("Stopping WebSocket server")
        self.running = False

    async def broadcast(self, message: Dict[str, Any]) -> None:
        """Log a broadcast to the registered connections.

        NOTE(review): this is a stub. ``message`` is not transmitted to any
        connection; only a debug line with the connection count is emitted,
        and only when at least one connection is registered.

        Args:
            message: Payload intended for all connections (currently unused).
        """
        if self.connections:
            logger.debug("Broadcasting to %d connections", len(self.connections))

    def add_connection(self, websocket) -> None:
        """Register a connection object.

        Args:
            websocket: Hashable connection object; adding one that is already
                registered leaves the set unchanged.
        """
        self.connections.add(websocket)
        logger.info("WebSocket connection added. Total: %d", len(self.connections))

    def remove_connection(self, websocket) -> None:
        """Unregister a connection object.

        Args:
            websocket: Connection object to remove; removing one that is not
                registered is not an error.
        """
        self.connections.discard(websocket)
        logger.info("WebSocket connection removed. Total: %d", len(self.connections))