Files
gogo2/COBY/main.py
Dobromir Popov dc326acf85 launch configs
2025-08-05 17:36:58 +03:00

231 lines
8.2 KiB
Python

#!/usr/bin/env python3
"""
COBY Multi-Exchange Data Aggregation System
Main application entry point for Docker deployment
"""
import asyncio
import signal
import sys
import os
import argparse
from typing import Optional
# Add the current directory to Python path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
try:
from .utils.logging import get_logger, setup_logging
from .simple_config import Config
except ImportError:
from utils.logging import get_logger, setup_logging
from simple_config import Config
try:
# Try relative imports first (when run as module)
from .monitoring.metrics_collector import metrics_collector
from .monitoring.performance_monitor import get_performance_monitor
from .monitoring.memory_monitor import memory_monitor
from .api.rest_api import create_app
from .api.simple_websocket_server import WebSocketServer
except ImportError:
# Fall back to absolute imports (when run directly)
from monitoring.metrics_collector import metrics_collector
from monitoring.performance_monitor import get_performance_monitor
from monitoring.memory_monitor import memory_monitor
from api.rest_api import create_app
from api.simple_websocket_server import WebSocketServer
logger = get_logger(__name__)
class COBYApplication:
"""Main COBY application orchestrator"""
def __init__(self, config: Config):
self.config = config
self.running = False
self.tasks = []
self.websocket_server: Optional[WebSocketServer] = None
async def start(self):
"""Start all application components"""
try:
logger.info("Starting COBY Multi-Exchange Data Aggregation System")
# Start monitoring systems
logger.info("Starting monitoring systems...")
metrics_collector.start_collection()
get_performance_monitor().start_monitoring()
memory_monitor.start_monitoring()
# Start WebSocket server
logger.info("Starting WebSocket server...")
self.websocket_server = WebSocketServer(
host=self.config.api.host,
port=self.config.api.websocket_port
)
websocket_task = asyncio.create_task(self.websocket_server.start())
self.tasks.append(websocket_task)
# Start REST API server (includes static file serving)
logger.info("Starting REST API server with static file serving...")
app = create_app(self.config)
api_task = asyncio.create_task(
self._run_api_server(app, self.config.api.host, self.config.api.port)
)
self.tasks.append(api_task)
# Start exchange connectors
logger.info("Starting exchange connectors...")
await self._start_exchange_connectors()
# Start data processing pipeline
logger.info("Starting data processing pipeline...")
await self._start_data_processing()
self.running = True
logger.info("COBY system started successfully")
# Wait for all tasks
await asyncio.gather(*self.tasks, return_exceptions=True)
except Exception as e:
logger.error(f"Error starting COBY application: {e}")
raise
async def stop(self):
"""Stop all application components"""
if not self.running:
return
logger.info("Stopping COBY Multi-Exchange Data Aggregation System")
try:
# Stop WebSocket server
if self.websocket_server:
await self.websocket_server.stop()
# Cancel all tasks
for task in self.tasks:
if not task.done():
task.cancel()
# Wait for tasks to complete
if self.tasks:
await asyncio.gather(*self.tasks, return_exceptions=True)
# Stop monitoring systems
memory_monitor.stop_monitoring()
get_performance_monitor().stop_monitoring()
metrics_collector.stop_collection()
self.running = False
logger.info("COBY system stopped successfully")
except Exception as e:
logger.error(f"Error stopping COBY application: {e}")
async def _start_exchange_connectors(self):
"""Start exchange connectors"""
try:
# Import connectors
from connectors.binance_connector import BinanceConnector
from connectors.kucoin_connector import KucoinConnector
from connectors.coinbase_connector import CoinbaseConnector
# Initialize connectors
self.connectors = {
'binance': BinanceConnector(),
'kucoin': KucoinConnector(),
'coinbase': CoinbaseConnector()
}
# Start connectors
for name, connector in self.connectors.items():
try:
logger.info(f"Starting {name} connector...")
connector_task = asyncio.create_task(self._run_connector(connector))
self.tasks.append(connector_task)
except Exception as e:
logger.error(f"Failed to start {name} connector: {e}")
except Exception as e:
logger.error(f"Error starting exchange connectors: {e}")
async def _run_connector(self, connector):
"""Run a single connector"""
try:
# Connect to exchange
if await connector.connect():
logger.info(f"Connected to {connector.exchange_name}")
# Subscribe to default symbols
default_symbols = ['BTCUSDT', 'ETHUSDT', 'ADAUSDT', 'SOLUSDT']
for symbol in default_symbols:
try:
await connector.subscribe_orderbook(symbol)
await connector.subscribe_trades(symbol)
logger.info(f"Subscribed to {symbol} on {connector.exchange_name}")
except Exception as e:
logger.warning(f"Failed to subscribe to {symbol} on {connector.exchange_name}: {e}")
# Keep connector running
while connector.is_connected:
await asyncio.sleep(1)
async def main():
"""Main application entry point"""
parser = argparse.ArgumentParser(description='COBY Multi-Exchange Data Aggregation System')
parser.add_argument('--debug', action='store_true', help='Enable debug mode')
parser.add_argument('--reload', action='store_true', help='Enable auto-reload (development)')
parser.add_argument('--config', type=str, help='Configuration file path')
args = parser.parse_args()
# Setup logging
log_level = 'DEBUG' if args.debug else 'INFO'
setup_logging(level=log_level)
# Load configuration
config = Config()
if args.debug:
config.debug = True
config.logging.level = 'DEBUG'
# Create and start application
app = COBYApplication(config)
# Setup signal handlers
def signal_handler(signum, frame):
logger.info(f"Received signal {signum}, shutting down...")
asyncio.create_task(app.stop())
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
try:
await app.start()
except KeyboardInterrupt:
logger.info("Received keyboard interrupt, shutting down...")
except Exception as e:
logger.error(f"Application error: {e}")
sys.exit(1)
finally:
await app.stop()
if __name__ == "__main__":
# Ensure we're running in the correct directory
os.chdir(os.path.dirname(os.path.abspath(__file__)))
# Run the application
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\nShutdown complete")
except Exception as e:
print(f"Fatal error: {e}")
sys.exit(1)