diff --git a/NN/models/dqn_agent.py b/NN/models/dqn_agent.py index 0b53ea3..e4a9454 100644 --- a/NN/models/dqn_agent.py +++ b/NN/models/dqn_agent.py @@ -136,7 +136,7 @@ class DQNAgent: # Trade action fee and confidence thresholds self.trade_action_fee = 0.0005 # Small fee to discourage unnecessary trading - self.minimum_action_confidence = 0.5 # Minimum confidence to consider trading + self.minimum_action_confidence = 0.3 # Minimum confidence to consider trading (lowered from 0.5) self.recent_actions = [] # Track recent actions to avoid oscillations # Violent move detection diff --git a/dataprovider_realtime.py b/dataprovider_realtime.py index 0665031..6fd472d 100644 --- a/dataprovider_realtime.py +++ b/dataprovider_realtime.py @@ -2,6 +2,21 @@ import asyncio import json import logging +# Fix PIL import issue that causes plotly JSON serialization errors +import os +os.environ['MPLBACKEND'] = 'Agg' # Use non-interactive backend +try: + # Try to fix PIL import issue + import PIL.Image + # Disable PIL in plotly to prevent circular import issues + import plotly.io as pio + pio.kaleido.scope.default_format = "png" +except ImportError: + pass +except Exception: + # Suppress any PIL-related errors during import + pass + from typing import Dict, List, Optional, Tuple, Union import websockets import plotly.graph_objects as go @@ -1814,553 +1829,354 @@ class RealTimeChart: return html.Div(f"Error loading chart: {str(e)}", style={"color": "red", "padding": "20px"}) def _setup_callbacks(self): - """Set up all the callbacks for the dashboard""" - - # Callback for timeframe selection - @self.app.callback( - [Output('interval-store', 'data'), - Output('btn-1s', 'style'), - Output('btn-5s', 'style'), - Output('btn-15s', 'style'), - Output('btn-1m', 'style'), - Output('btn-5m', 'style'), - Output('btn-15m', 'style'), - Output('btn-1h', 'style')], - [Input('btn-1s', 'n_clicks'), - Input('btn-5s', 'n_clicks'), - Input('btn-15s', 'n_clicks'), - Input('btn-1m', 'n_clicks'), - Input('btn-5m', 'n_clicks'), - Input('btn-15m', 'n_clicks'), - Input('btn-1h', 'n_clicks')], - [dash.dependencies.State('interval-store', 'data')] - ) - def update_interval(n1, n5, n15, n60, n300, n900, n3600, data): - ctx = dash.callback_context - if not ctx.triggered: - # Default state (1s selected) - return ({'interval': 1}, - self.active_button_style, - self.button_style, - self.button_style, - self.button_style, - self.button_style, - self.button_style, - self.button_style) + """Setup Dash callbacks for the real-time chart""" + if self.app is None: + return - button_id = ctx.triggered[0]['prop_id'].split('.')[0] + try: + # Update chart with all components based on interval + @self.app.callback( + [ + Output('live-chart', 'figure'), + Output('secondary-charts', 'figure'), + Output('positions-list', 'children'), + Output('current-price', 'children'), + Output('current-balance', 'children'), + Output('accumulated-pnl', 'children'), + Output('trade-rate-second', 'children'), + Output('trade-rate-minute', 'children'), + Output('trade-rate-hour', 'children') + ], + [ + Input('interval-component', 'n_intervals'), + Input('interval-store', 'data') + ] + ) + def update_all(n_intervals, interval_data): + """Update all chart components""" + try: + # Get selected interval + interval_seconds = interval_data.get('interval', 1) if interval_data else 1 + + # Update main chart - limit data for performance + main_chart = self._update_main_chart(interval_seconds) + + # Update secondary charts - limit data for performance + secondary_charts = self._update_secondary_charts() + + # Update positions list + positions_list = self._get_position_list_rows() + + # Update current price and balance + current_price = f"${self.latest_price:.2f}" if self.latest_price else "Error" + current_balance = f"${self.current_balance:.2f}" + accumulated_pnl = f"${self.accumulative_pnl:.2f}" + + # Calculate trade rates + trade_rate = self._calculate_trade_rate() + trade_rate_second = f"{trade_rate['per_second']:.1f}" + trade_rate_minute = f"{trade_rate['per_minute']:.1f}" + trade_rate_hour = f"{trade_rate['per_hour']:.1f}" + + return (main_chart, secondary_charts, positions_list, + current_price, current_balance, accumulated_pnl, + trade_rate_second, trade_rate_minute, trade_rate_hour) + + except Exception as e: + logger.error(f"Error in update_all callback: {str(e)}") + # Return empty/error states + import plotly.graph_objects as go + empty_fig = go.Figure() + empty_fig.add_annotation(text="Chart Loading...", xref="paper", yref="paper", x=0.5, y=0.5) + + return (empty_fig, empty_fig, [], "Loading...", "$0.00", "$0.00", "0.0", "0.0", "0.0") - # Initialize all buttons to inactive - button_styles = [self.button_style] * 7 + # Timeframe selection callbacks + @self.app.callback( + [Output('interval-store', 'data'), + Output('btn-1s', 'style'), Output('btn-5s', 'style'), Output('btn-15s', 'style'), + Output('btn-1m', 'style'), Output('btn-5m', 'style'), Output('btn-15m', 'style'), + Output('btn-1h', 'style')], + [Input('btn-1s', 'n_clicks'), Input('btn-5s', 'n_clicks'), Input('btn-15s', 'n_clicks'), + Input('btn-1m', 'n_clicks'), Input('btn-5m', 'n_clicks'), Input('btn-15m', 'n_clicks'), + Input('btn-1h', 'n_clicks')] + ) + def update_timeframe(n1s, n5s, n15s, n1m, n5m, n15m, n1h): + """Update selected timeframe based on button clicks""" + ctx = dash.callback_context + if not ctx.triggered: + # Default to 1s + styles = [self.active_button_style] + [self.button_style] * 6 + return {'interval': 1}, *styles + + button_id = ctx.triggered[0]['prop_id'].split('.')[0] + + # Map button to interval seconds + interval_map = { + 'btn-1s': 1, 'btn-5s': 5, 'btn-15s': 15, + 'btn-1m': 60, 'btn-5m': 300, 'btn-15m': 900, 'btn-1h': 3600 + } + + selected_interval = interval_map.get(button_id, 1) + + # Create styles - active for selected, normal for others + button_names = ['btn-1s', 'btn-5s', 'btn-15s', 'btn-1m', 'btn-5m', 'btn-15m', 'btn-1h'] + styles = [] + for name in button_names: + if name == button_id: + styles.append(self.active_button_style) + else: + styles.append(self.button_style) + + return {'interval': selected_interval}, *styles + + logger.info("Dash callbacks registered successfully") - # Set the active button and interval - if button_id == 'btn-1s': - button_styles[0] = self.active_button_style - return ({'interval': 1}, *button_styles) - elif button_id == 'btn-5s': - button_styles[1] = self.active_button_style - return ({'interval': 5}, *button_styles) - elif button_id == 'btn-15s': - button_styles[2] = self.active_button_style - return ({'interval': 15}, *button_styles) - elif button_id == 'btn-1m': - button_styles[3] = self.active_button_style - return ({'interval': 60}, *button_styles) - elif button_id == 'btn-5m': - button_styles[4] = self.active_button_style - return ({'interval': 300}, *button_styles) - elif button_id == 'btn-15m': - button_styles[5] = self.active_button_style - return ({'interval': 900}, *button_styles) - elif button_id == 'btn-1h': - button_styles[6] = self.active_button_style - return ({'interval': 3600}, *button_styles) + except Exception as e: + logger.error(f"Error setting up callbacks: {str(e)}") + import traceback + logger.error(traceback.format_exc()) + + def _calculate_trade_rate(self): + """Calculate trading rate per second, minute, and hour""" + try: + now = datetime.now() + current_time = time.time() - # Default - keep current interval - current_interval = data.get('interval', 1) - # Set the appropriate button as active - if current_interval == 1: - button_styles[0] = self.active_button_style - elif current_interval == 5: - button_styles[1] = self.active_button_style - elif current_interval == 15: - button_styles[2] = self.active_button_style - elif current_interval == 60: - button_styles[3] = self.active_button_style - elif current_interval == 300: - button_styles[4] = self.active_button_style - elif current_interval == 900: - button_styles[5] = self.active_button_style - elif current_interval == 3600: - button_styles[6] = self.active_button_style - - return (data, *button_styles) - - # Main update callback - @self.app.callback( - [Output('live-chart', 'figure'), - Output('secondary-charts', 'figure'), - Output('positions-list', 'children'), - Output('current-price', 'children'), - Output('current-balance', 'children'), - Output('accumulated-pnl', 'children'), - Output('trade-rate-second', 'children'), - Output('trade-rate-minute', 'children'), - Output('trade-rate-hour', 'children')], - [Input('interval-component', 'n_intervals'), - Input('interval-store', 'data')] - ) - def update_all(n, interval_data): - try: - # Get selected interval - interval = interval_data.get('interval', 1) - - # Get updated chart figures - main_fig = self._update_main_chart(interval) - secondary_fig = self._update_secondary_charts() - - # Get updated positions list - positions = self._get_position_list_rows() - - # Format the current price - current_price = "$ ---.--" - if self.latest_price is not None: - current_price = f"${self.latest_price:.2f}" - - # Format balance and PnL - balance_text = f"${self.current_balance:.2f}" - pnl_text = f"${self.accumulative_pnl:.2f}" - - # Get trade rate statistics - trade_rate = self.calculate_trade_rate() - per_second = f"{trade_rate['per_second']:.1f}" - per_minute = f"{trade_rate['per_minute']:.1f}" - per_hour = f"{trade_rate['per_hour']:.1f}" - - return main_fig, secondary_fig, positions, current_price, balance_text, pnl_text, per_second, per_minute, per_hour - except Exception as e: - logger.error(f"Error in update callback: {str(e)}") - import traceback - logger.error(traceback.format_exc()) - # Return empty updates on error - return {}, {}, [], "Error", "$0.00", "$0.00", "0.0", "0.0", "0.0" + # Filter trades within different time windows + trades_last_second = sum(1 for trade_time in self.trade_times if current_time - trade_time <= 1) + trades_last_minute = sum(1 for trade_time in self.trade_times if current_time - trade_time <= 60) + trades_last_hour = sum(1 for trade_time in self.trade_times if current_time - trade_time <= 3600) + + return { + "per_second": trades_last_second, + "per_minute": trades_last_minute, + "per_hour": trades_last_hour + } + except Exception as e: + logger.warning(f"Error calculating trade rate: {str(e)}") + return {"per_second": 0.0, "per_minute": 0.0, "per_hour": 0.0} def _update_secondary_charts(self): - """Update the secondary charts for other timeframes""" + """Create secondary charts for volume and indicators""" try: - # For each timeframe, create a small chart - secondary_timeframes = ['1m', '5m', '15m', '1h'] - - if not all(tf in self.tick_storage.candles for tf in secondary_timeframes): - logger.warning("Not all secondary timeframes available") - # Return empty figure with a message - fig = make_subplots(rows=1, cols=4) - for i, tf in enumerate(secondary_timeframes, 1): - fig.add_annotation( - text=f"No data for {tf}", - xref=f"x{i}", yref=f"y{i}", - x=0.5, y=0.5, showarrow=False - ) - return fig - - # Create subplots for each timeframe + # Create subplots for secondary charts fig = make_subplots( - rows=1, cols=4, - subplot_titles=secondary_timeframes, - shared_yaxes=True + rows=2, cols=1, + subplot_titles=['Volume', 'Technical Indicators'], + shared_xaxes=True, + vertical_spacing=0.1, + row_heights=[0.3, 0.7] ) - # Loop through each timeframe - for i, timeframe in enumerate(secondary_timeframes, 1): - interval_key = timeframe + # Get latest candles (limit for performance) + candles = self.tick_storage.candles.get("1m", [])[-100:] # Last 100 candles for performance + + if not candles: + fig.add_annotation(text="No data available", xref="paper", yref="paper", x=0.5, y=0.5) + fig.update_layout( + title="Secondary Charts", + template="plotly_dark", + height=400 + ) + return fig + + # Extract data + timestamps = [candle['timestamp'] for candle in candles] + volumes = [candle['volume'] for candle in candles] + closes = [candle['close'] for candle in candles] + + # Volume chart + colors = ['#26a69a' if i == 0 or closes[i] >= closes[i-1] else '#ef5350' for i in range(len(closes))] + fig.add_trace( + go.Bar( + x=timestamps, + y=volumes, + name='Volume', + marker_color=colors, + showlegend=False + ), + row=1, col=1 + ) + + # Technical indicators + if len(closes) >= 20: + # Simple moving average + sma_20 = pd.Series(closes).rolling(window=20).mean() + fig.add_trace( + go.Scatter( + x=timestamps, + y=sma_20, + name='SMA 20', + line=dict(color='#ffeb3b', width=2) + ), + row=2, col=1 + ) - # Get candles for this timeframe - if interval_key in self.tick_storage.candles and self.tick_storage.candles[interval_key]: - # For rendering, limit to the last 100 candles for performance - candles = self.tick_storage.candles[interval_key][-100:] - - if candles: - # Extract OHLC values - timestamps = [candle['timestamp'] for candle in candles] - opens = [candle['open'] for candle in candles] - highs = [candle['high'] for candle in candles] - lows = [candle['low'] for candle in candles] - closes = [candle['close'] for candle in candles] - - # Add candlestick trace - fig.add_trace(go.Candlestick( + # RSI calculation + if len(closes) >= 14: + rsi = self._calculate_rsi(closes, 14) + fig.add_trace( + go.Scatter( x=timestamps, - open=opens, - high=highs, - low=lows, - close=closes, - name=interval_key, - increasing_line_color='rgba(0, 180, 0, 0.7)', - decreasing_line_color='rgba(255, 0, 0, 0.7)', - showlegend=False - ), row=1, col=i) - else: - # Add empty annotation if no data - fig.add_annotation( - text=f"No data for {interval_key}", - xref=f"x{i}", yref=f"y{i}", - x=0.5, y=0.5, showarrow=False + y=rsi, + name='RSI', + line=dict(color='#29b6f6', width=2), + yaxis='y3' + ), + row=2, col=1 ) # Update layout fig.update_layout( - height=250, + title="Volume & Technical Indicators", template="plotly_dark", - showlegend=False, - margin=dict(l=0, r=0, t=30, b=0), + height=400, + showlegend=True, + legend=dict(x=0, y=1, bgcolor='rgba(0,0,0,0)') ) - # Format Y-axis with 2 decimal places - fig.update_yaxes(tickformat=".2f") - - # Format X-axis to show only the date (no time) - for i in range(1, 5): - fig.update_xaxes( - row=1, col=i, - rangeslider_visible=False, - rangebreaks=[dict(bounds=["sat", "mon"])], # hide weekends - tickformat="%m-%d" # Show month-day only - ) + # Update y-axes + fig.update_yaxes(title="Volume", row=1, col=1) + fig.update_yaxes(title="Price", row=2, col=1) return fig except Exception as e: - logger.error(f"Error updating secondary charts: {str(e)}") - import traceback - logger.error(traceback.format_exc()) + logger.error(f"Error creating secondary charts: {str(e)}") # Return empty figure on error - return make_subplots(rows=1, cols=4) + fig = go.Figure() + fig.add_annotation(text=f"Error: {str(e)}", xref="paper", yref="paper", x=0.5, y=0.5) + fig.update_layout(template="plotly_dark", height=400) + return fig + + def _calculate_rsi(self, prices, period=14): + """Calculate RSI indicator""" + try: + prices = pd.Series(prices) + delta = prices.diff() + gain = (delta.where(delta > 0, 0)).rolling(window=period).mean() + loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean() + rs = gain / loss + rsi = 100 - (100 / (1 + rs)) + return rsi.fillna(50).tolist() # Fill NaN with neutral RSI value + except Exception: + return [50] * len(prices) # Return neutral RSI on error def _get_position_list_rows(self): - """Generate HTML for the positions list (last 10 positions only)""" + """Get list of current positions for display""" try: - if not hasattr(self, 'positions') or not self.positions: - # Return placeholder if no positions - return html.Div("No positions to display", style={"textAlign": "center", "padding": "20px"}) - - # Create table headers - table_header = [ - html.Thead(html.Tr([ - html.Th("ID"), - html.Th("Action"), - html.Th("Entry Price"), - html.Th("Exit Price"), - html.Th("Amount"), - html.Th("PnL"), - html.Th("Time") - ])) - ] + if not self.positions: + return [html.Div("No open positions", style={"color": "#888", "padding": "10px"})] - # Create table rows for only the last 10 positions to avoid overcrowding rows = [] - last_positions = self.positions[-10:] if len(self.positions) > 10 else self.positions - - for position in last_positions: - # Format times - entry_time = position.entry_timestamp.strftime("%H:%M:%S") - exit_time = position.exit_timestamp.strftime("%H:%M:%S") if position.exit_timestamp else "-" - - # Format PnL - pnl_value = position.pnl if position.pnl is not None else 0 - pnl_text = f"${pnl_value:.2f}" if position.pnl is not None else "-" - pnl_style = {"color": "green" if position.pnl and position.pnl > 0 else "red"} - - # Create row - row = html.Tr([ - html.Td(position.trade_id), - html.Td(position.action), - html.Td(f"${position.entry_price:.2f}"), - html.Td(f"${position.exit_price:.2f}" if position.exit_price else "-"), - html.Td(f"{position.amount:.4f}"), - html.Td(pnl_text, style=pnl_style), - html.Td(f"{entry_time} → {exit_time}") - ]) - rows.append(row) - - table_body = [html.Tbody(rows)] - - # Add summary row for total PnL and other statistics - total_trades = len(self.positions) - winning_trades = sum(1 for p in self.positions if p.pnl and p.pnl > 0) - win_rate = winning_trades / total_trades * 100 if total_trades > 0 else 0 - - # Format display colors for PnL - pnl_color = "green" if self.accumulative_pnl >= 0 else "red" - - summary_row = html.Tr([ - html.Td("SUMMARY", colSpan=2, style={"fontWeight": "bold"}), - html.Td(f"Trades: {total_trades}"), - html.Td(f"Win Rate: {win_rate:.1f}%"), - html.Td("Total PnL:", style={"fontWeight": "bold"}), - html.Td(f"${self.accumulative_pnl:.2f}", - style={"color": pnl_color, "fontWeight": "bold"}), - html.Td(f"Balance: ${self.current_balance:.2f}") - ], style={"backgroundColor": "rgba(80, 80, 80, 0.3)"}) - - # Create the table with improved styling - table = html.Table( - table_header + table_body + [html.Tfoot([summary_row])], - style={ - "width": "100%", - "textAlign": "center", - "borderCollapse": "collapse", - "marginTop": "20px" - }, - className="table table-striped table-dark" - ) - - return table - - except Exception as e: - logger.error(f"Error generating position list: {str(e)}") - import traceback - logger.error(traceback.format_exc()) - return html.Div("Error displaying positions") - - def get_candles(self, interval_seconds=60): - """Get candles for the specified interval""" - try: - # Get candles from tick storage - interval_key = self._get_interval_key(interval_seconds) - candles_list = self.tick_storage.get_candles(interval_key) - - if not candles_list: - logger.warning(f"No candle data available for {interval_key} - trying to load historical data") - # Try to load historical data if we don't have any - self.tick_storage.load_historical_data(self.symbol) - candles_list = self.tick_storage.get_candles(interval_key) - - if not candles_list: - logger.error(f"Still no candle data available for {interval_key} after loading historical data") - return [] - - logger.info(f"Retrieved {len(candles_list)} candles for {interval_key}") - return candles_list - - except Exception as e: - logger.error(f"Error getting candles: {str(e)}") - import traceback - logger.error(traceback.format_exc()) - return [] # Return empty list on error - - def _get_interval_key(self, interval_seconds): - """Convert interval seconds to a key used in the tick storage""" - if interval_seconds < 60: - return f"{interval_seconds}s" - elif interval_seconds < 3600: - return f"{interval_seconds // 60}m" - elif interval_seconds < 86400: - return f"{interval_seconds // 3600}h" - else: - return f"{interval_seconds // 86400}d" - - def calculate_trade_rate(self): - """Calculate and return trading rate statistics""" - now = datetime.now() - - # Only calculate once per second to avoid unnecessary processing - if (now - self.last_trade_rate_calculation).total_seconds() < 1.0: - return self.trade_rate - - self.last_trade_rate_calculation = now - - # Clean up old trade times (older than 1 hour) - one_hour_ago = now - timedelta(hours=1) - self.trade_times = [t for t in self.trade_times if t > one_hour_ago] - - if not self.trade_times: - self.trade_rate = {"per_second": 0, "per_minute": 0, "per_hour": 0} - return self.trade_rate - - # Calculate rates based on time windows - last_second = now - timedelta(seconds=1) - last_minute = now - timedelta(minutes=1) - - # Count trades in each time window - trades_last_second = sum(1 for t in self.trade_times if t > last_second) - trades_last_minute = sum(1 for t in self.trade_times if t > last_minute) - trades_last_hour = len(self.trade_times) # All remaining trades are from last hour - - # Calculate rates - self.trade_rate = { - "per_second": trades_last_second, - "per_minute": trades_last_minute, - "per_hour": trades_last_hour - } - - return self.trade_rate - - def _update_chart_and_positions(self): - """Update the chart with current data and positions""" - try: - # Force an update of the charts - self._update_main_chart(1) # Update 1s chart by default - self._update_secondary_charts() - logger.debug("Updated charts and positions") - return True - except Exception as e: - logger.error(f"Error updating chart and positions: {str(e)}") - import traceback - logger.error(traceback.format_exc()) - return False - - async def start_websocket(self): - """Start the websocket connection for real-time data""" - try: - # Step 1: Clear everything related to positions FIRST, before any other operations - logger.info(f"Initializing fresh chart for {self.symbol} - clearing all previous positions") - self.positions = [] # Clear positions list - self.accumulative_pnl = 0.0 # Reset accumulated PnL - self.current_balance = 100.0 # Reset balance - - # Step 2: Clear any previous tick data to avoid using stale data from previous training sessions - self.tick_storage.ticks = [] - - # Step 3: Clear any previous 1s candles before loading historical data - self.tick_storage.candles['1s'] = [] - - logger.info("Initialized empty 1s candles, tick collection, and positions for fresh data") - - # Load historical data first to ensure we have candles for all timeframes - logger.info(f"Loading historical data for {self.symbol}") - - # Load historical data directly from tick_storage - self.tick_storage.load_historical_data(self.symbol) - - # Double check that we have the 1s timeframe initialized - if '1s' not in self.tick_storage.candles: - self.tick_storage.candles['1s'] = [] - logger.info(f"After loading historical data, 1s candles count: {len(self.tick_storage.candles['1s'])}") - - # Make sure we update the charts once with historical data before websocket starts - # Update all the charts with the initial historical data - self._update_chart_and_positions() - - # Initialize websocket - self.websocket = ExchangeWebSocket(self.symbol) - await self.websocket.connect() - - logger.info(f"WebSocket connected for {self.symbol}") - - # Counter for received ticks - tick_count = 0 - last_update_time = time.time() - - # Start receiving data - while self.websocket.running: + for i, position in enumerate(self.positions): try: - data = await self.websocket.receive() - if data: - # Process the received data - if 'price' in data: - tick_count += 1 - - # Create a proper tick with timestamp object - tick = { - 'price': data['price'], - 'quantity': data.get('volume', 0), - 'timestamp': pd.Timestamp(data['timestamp'], unit='ms') - } - - # Update tick storage with proper tick object - self.tick_storage.add_tick(tick) - - # Store latest values - self.latest_price = data['price'] - self.latest_volume = data.get('volume', 0) - self.latest_timestamp = datetime.fromtimestamp(data['timestamp'] / 1000) - - # Force chart update every 5 seconds - current_time = time.time() - if current_time - last_update_time >= 5.0: - self._update_chart_and_positions() - last_update_time = current_time - logger.debug("Forced chart update after new ticks") - - # Log tick processing for debugging (every 100 ticks) - if tick_count % 100 == 0: - logger.info(f"Processed {tick_count} ticks, current price: ${self.latest_price:.2f}") - logger.info(f"Current 1s candles count: {len(self.tick_storage.candles['1s'])}") - + # Calculate current PnL + current_pnl = (self.latest_price - position.entry_price) * position.amount + if position.action.upper() == 'SELL': + current_pnl = -current_pnl + + # Create position row + row = html.Div([ + html.Span(f"#{i+1}: ", style={"fontWeight": "bold"}), + html.Span(f"{position.action.upper()} ", + style={"color": "#00e676" if position.action.upper() == "BUY" else "#ff1744"}), + html.Span(f"{position.amount:.4f} @ ${position.entry_price:.2f} "), + html.Span(f"PnL: ${current_pnl:.2f}", + style={"color": "#00e676" if current_pnl >= 0 else "#ff1744"}) + ], style={"padding": "5px", "borderBottom": "1px solid #333"}) + + rows.append(row) except Exception as e: - logger.error(f"Error processing websocket data: {str(e)}") - await asyncio.sleep(1) # Wait before retrying + logger.warning(f"Error formatting position {i}: {str(e)}") + + return rows except Exception as e: - logger.error(f"WebSocket error for {self.symbol}: {str(e)}") - import traceback - logger.error(traceback.format_exc()) - finally: - if hasattr(self, 'websocket'): - await self.websocket.close() + logger.error(f"Error getting position list: {str(e)}") + return [html.Div("Error loading positions", style={"color": "red", "padding": "10px"})] - def run(self, host='localhost', port=8050): - """Run the Dash app on the specified host and port""" + def add_trade(self, action, price, amount, timestamp=None, trade_id=None): + """Add a trade to the chart and update tracking""" try: - logger.info("="*60) - logger.info(f"🚀 STARTING WEB UI FOR {self.symbol}") - logger.info(f"📱 Web interface available at: http://{host}:{port}/") - logger.info(f"🌐 Open this URL in your browser to view the trading chart") - logger.info("="*60) - self.app.run(debug=False, use_reloader=False, host=host, port=port) + if timestamp is None: + timestamp = datetime.now() + + # Create trade record + trade = { + 'id': trade_id or str(uuid.uuid4()), + 'action': action.upper(), + 'price': float(price), + 'amount': float(amount), + 'timestamp': timestamp, + 'value': float(price) * float(amount) + } + + # Add to trades list + self.all_trades.append(trade) + + # Update trade rate tracking + self.trade_times.append(time.time()) + # Keep only last hour of trade times + cutoff_time = time.time() - 3600 + self.trade_times = [t for t in self.trade_times if t > cutoff_time] + + # Update positions + if action.upper() in ['BUY', 'SELL']: + position = Position( + action=action.upper(), + entry_price=float(price), + amount=float(amount), + timestamp=timestamp, + trade_id=trade['id'] + ) + self.positions.append(position) + + # Update balance and PnL + if action.upper() == 'BUY': + self.current_balance -= trade['value'] + else: # SELL + self.current_balance += trade['value'] + + # Calculate PnL for this trade + if len(self.all_trades) > 1: + # Simple PnL calculation - more sophisticated logic could be added + last_opposite_trades = [t for t in reversed(self.all_trades[:-1]) + if t['action'] != action.upper()] + if last_opposite_trades: + last_trade = last_opposite_trades[0] + if action.upper() == 'SELL': + pnl = (float(price) - last_trade['price']) * float(amount) + else: # BUY + pnl = (last_trade['price'] - float(price)) * float(amount) + self.accumulative_pnl += pnl + + logger.info(f"Added trade: {action.upper()} {amount} @ ${price:.2f}") + except Exception as e: - logger.error(f"Error running Dash app: {str(e)}") - import traceback - logger.error(traceback.format_exc()) + logger.error(f"Error adding trade: {str(e)}") - def add_trade(self, action, price, amount, timestamp): - """ - Adds a trade to the chart's positions and tracks the trade time. - Handles closing previous open positions if necessary. - """ - try: - trade_timestamp = datetime.fromtimestamp(timestamp / 1000) if isinstance(timestamp, (int, float)) else timestamp - trade_timestamp = trade_timestamp or datetime.now() - - # Close the previous open position if this trade is opposite - last_open_position = next((pos for pos in reversed(self.positions) if pos.is_open), None) - - if last_open_position: - if (action == 'SELL' and last_open_position.action == 'BUY') or \ - (action == 'BUY' and last_open_position.action == 'SELL'): - closed_pnl = last_open_position.close(price, trade_timestamp) - self.accumulative_pnl += closed_pnl - self.current_balance += closed_pnl # Simplified balance update - logger.info(f"Closed {last_open_position.action} position {last_open_position.trade_id} at {price:.2f}. PnL: {closed_pnl:.4f}") - - - # Create and add the new position - new_position = Position( - action=action, - entry_price=price, - amount=amount, - timestamp=trade_timestamp - ) - self.positions.append(new_position) - self.trade_times.append(datetime.now()) # Use current time for rate calculation accuracy - - logger.info(f"Added new {action} position {new_position.trade_id} at {price:.2f}, Time: {trade_timestamp}") - - # Limit the number of stored positions and trade times to prevent memory issues - max_history = 1000 - if len(self.positions) > max_history: - self.positions = self.positions[-max_history:] - if len(self.trade_times) > max_history * 5: # Keep more trade times for rate calc - self.trade_times = self.trade_times[-max_history*5:] - - except Exception as e: - logger.error(f"Error adding trade to chart: {e}") - import traceback - logger.error(traceback.format_exc()) - - def add_nn_signal(self, symbol, signal, confidence, timestamp): - # Placeholder for adding NN signals if needed in the future - pass + def _get_interval_key(self, interval_seconds): + """Convert interval seconds to timeframe key""" + if interval_seconds <= 1: + return "1s" + elif interval_seconds <= 5: + return "5s" if "5s" in self.tick_storage.timeframes else "1s" + elif interval_seconds <= 15: + return "15s" if "15s" in self.tick_storage.timeframes else "1m" + elif interval_seconds <= 60: + return "1m" + elif interval_seconds <= 300: + return "5m" + elif interval_seconds <= 900: + return "15m" + elif interval_seconds <= 3600: + return "1h" + elif interval_seconds <= 14400: + return "4h" + else: + return "1d" def _update_main_chart(self, interval_seconds): """Update the main chart for the specified interval""" @@ -2368,46 +2184,28 @@ class RealTimeChart: # Convert interval seconds to timeframe key interval_key = self._get_interval_key(interval_seconds) - # Get candles for this timeframe - if interval_key not in self.tick_storage.candles or not self.tick_storage.candles[interval_key]: + # Get candles for this timeframe (limit to last 100 for performance) + candles = self.tick_storage.candles.get(interval_key, [])[-100:] + + if not candles: logger.warning(f"No candle data available for {interval_key}") # Return empty figure with a message fig = go.Figure() fig.add_annotation( text=f"No data available for {interval_key}", xref="paper", yref="paper", - x=0.5, y=0.5, showarrow=False, - font=dict(size=20, color="white") + x=0.5, y=0.5, + showarrow=False, + font=dict(size=16, color="white") ) fig.update_layout( - template="plotly_dark", - height=600, title=f"{self.symbol} - {interval_key} Chart", - xaxis_title="Time", - yaxis_title="Price ($)" - ) - return fig - - # Get candles (limit to last 500 for performance) - candles = self.tick_storage.candles[interval_key][-500:] - - if not candles: - # Return empty figure if no candles - fig = go.Figure() - fig.add_annotation( - text=f"No candles available for {interval_key}", - xref="paper", yref="paper", - x=0.5, y=0.5, showarrow=False, - font=dict(size=20, color="white") - ) - fig.update_layout( template="plotly_dark", - height=600, - title=f"{self.symbol} - {interval_key} Chart" + height=600 ) return fig - # Extract OHLC values + # Extract data from candles timestamps = [candle['timestamp'] for candle in candles] opens = [candle['open'] for candle in candles] highs = [candle['high'] for candle in candles] @@ -2415,7 +2213,7 @@ class RealTimeChart: closes = [candle['close'] for candle in candles] volumes = [candle['volume'] for candle in candles] - # Create the main figure + # Create candlestick chart fig = go.Figure() # Add candlestick trace @@ -2425,80 +2223,110 @@ class RealTimeChart: high=highs, low=lows, close=closes, - name=f"{self.symbol}", - increasing_line_color='rgba(0, 200, 0, 0.8)', - decreasing_line_color='rgba(255, 0, 0, 0.8)', - increasing_fillcolor='rgba(0, 200, 0, 0.3)', - decreasing_fillcolor='rgba(255, 0, 0, 0.3)' + name="Price", + increasing_line_color='#26a69a', + decreasing_line_color='#ef5350', + increasing_fillcolor='#26a69a', + decreasing_fillcolor='#ef5350' )) - # Add trade markers if we have positions - if hasattr(self, 'positions') and self.positions: - # Get recent positions (last 50 to avoid clutter) - recent_positions = self.positions[-50:] if len(self.positions) > 50 else self.positions + # Add trade markers if we have trades + if self.all_trades: + # Filter trades to match the current timeframe window + start_time = timestamps[0] if timestamps else datetime.now() - timedelta(hours=1) + end_time = timestamps[-1] if timestamps else datetime.now() - for position in recent_positions: - # Add entry marker - fig.add_trace(go.Scatter( - x=[position.entry_timestamp], - y=[position.entry_price], - mode='markers', - marker=dict( - symbol='triangle-up' if position.action == 'BUY' else 'triangle-down', - size=12, - color='green' if position.action == 'BUY' else 'red', - line=dict(width=2, color='white') - ), - name=f"{position.action} Entry", - hovertemplate=f"{position.action} Entry
" + - f"Price: ${position.entry_price:.2f}
" + - f"Time: {position.entry_timestamp}
" + - f"ID: {position.trade_id}", - showlegend=False - )) + filtered_trades = [ + trade for trade in self.all_trades + if start_time <= trade['timestamp'] <= end_time + ] + + if filtered_trades: + buy_trades = [t for t in filtered_trades if t['action'] == 'BUY'] + sell_trades = [t for t in filtered_trades if t['action'] == 'SELL'] - # Add exit marker if position is closed - if not position.is_open and position.exit_price and position.exit_timestamp: + # Add BUY markers + if buy_trades: fig.add_trace(go.Scatter( - x=[position.exit_timestamp], - y=[position.exit_price], + x=[t['timestamp'] for t in buy_trades], + y=[t['price'] for t in buy_trades], mode='markers', marker=dict( - symbol='triangle-down' if position.action == 'BUY' else 'triangle-up', + symbol='triangle-up', size=12, - color='blue', - line=dict(width=2, color='white') + color='#00e676', + line=dict(color='white', width=1) ), - name=f"{position.action} Exit", - hovertemplate=f"{position.action} Exit
" + - f"Price: ${position.exit_price:.2f}
" + - f"Time: {position.exit_timestamp}
" + - f"PnL: ${position.pnl:.2f}
" + - f"ID: {position.trade_id}", - showlegend=False + name='BUY', + text=[f"BUY {t['amount']:.4f} @ ${t['price']:.2f}" for t in buy_trades], + hovertemplate='%{text}
Time: %{x}' )) + + # Add SELL markers + if sell_trades: + fig.add_trace(go.Scatter( + x=[t['timestamp'] for t in sell_trades], + y=[t['price'] for t in sell_trades], + mode='markers', + marker=dict( + symbol='triangle-down', + size=12, + color='#ff1744', + line=dict(color='white', width=1) + ), + name='SELL', + text=[f"SELL {t['amount']:.4f} @ ${t['price']:.2f}" for t in sell_trades], + hovertemplate='%{text}
Time: %{x}' + )) + + # Add moving averages if we have enough data + if len(closes) >= 20: + # 20-period SMA + sma_20 = pd.Series(closes).rolling(window=20).mean() + fig.add_trace(go.Scatter( + x=timestamps, + y=sma_20, + name='SMA 20', + line=dict(color='#ffeb3b', width=1), + opacity=0.7 + )) + + if len(closes) >= 50: + # 50-period SMA + sma_50 = pd.Series(closes).rolling(window=50).mean() + fig.add_trace(go.Scatter( + x=timestamps, + y=sma_50, + name='SMA 50', + line=dict(color='#ff9800', width=1), + opacity=0.7 + )) # Update layout fig.update_layout( + title=f"{self.symbol} - {interval_key} Chart ({len(candles)} candles)", template="plotly_dark", height=600, - title=f"{self.symbol} - {interval_key} Chart (Live Trading)", xaxis_title="Time", yaxis_title="Price ($)", - showlegend=True, - margin=dict(l=0, r=0, t=40, b=0), - xaxis_rangeslider_visible=False, - hovermode='x unified' + legend=dict( + yanchor="top", + y=0.99, + xanchor="left", + x=0.01, + bgcolor="rgba(0,0,0,0.5)" + ), + hovermode='x unified', + dragmode='pan' ) - # Format Y-axis with appropriate decimal places - fig.update_yaxes(tickformat=".2f") + # Remove range slider for better performance + fig.update_layout(xaxis_rangeslider_visible=False) - # Format X-axis - fig.update_xaxes( - rangeslider_visible=False, - type='date' - ) + # Update the latest price + if closes: + self.latest_price = closes[-1] + self.latest_timestamp = timestamps[-1] return fig @@ -2510,195 +2338,153 @@ class RealTimeChart: # Return error figure fig = go.Figure() fig.add_annotation( - text=f"Error loading chart: {str(e)}", + text=f"Chart Error: {str(e)}", xref="paper", yref="paper", - x=0.5, y=0.5, showarrow=False, + x=0.5, y=0.5, + showarrow=False, font=dict(size=16, color="red") ) fig.update_layout( + title="Chart Error", template="plotly_dark", - height=600, - title=f"{self.symbol} Chart - Error" + height=600 ) return fig -class BinanceWebSocket: - """Binance WebSocket implementation for real-time tick data""" - def __init__(self, symbol: str): - self.symbol = symbol.replace('/', '').lower() - self.ws = None - self.running = False - self.reconnect_delay = 1 - self.max_reconnect_delay = 60 - self.message_count = 0 - - # Binance WebSocket configuration - self.ws_url = f"wss://stream.binance.com:9443/ws/{self.symbol}@trade" - logger.info(f"Initialized Binance WebSocket for symbol: {self.symbol}") + def set_trading_env(self, trading_env): + """Set the trading environment to monitor for new trades""" + self.trading_env = trading_env + if hasattr(trading_env, 'add_trade_callback'): + trading_env.add_trade_callback(self.add_trade) + logger.info("Trading environment integrated with chart") - async def connect(self): - while True: - try: - logger.info(f"Attempting to connect to {self.ws_url}") - self.ws = await websockets.connect(self.ws_url) - logger.info("WebSocket connection established") - - self.running = True - self.reconnect_delay = 1 - logger.info(f"Successfully connected to Binance WebSocket for {self.symbol}") - return True - except Exception as e: - logger.error(f"WebSocket connection error: {str(e)}") - await asyncio.sleep(self.reconnect_delay) - self.reconnect_delay = min(self.reconnect_delay * 2, self.max_reconnect_delay) - continue + def set_agent(self, agent): + """Set the agent to monitor for trading decisions""" + self.agent = agent + logger.info("Agent integrated with chart") - async def receive(self) -> Optional[Dict]: - if not self.ws: - return None - + def update_from_env(self, env_data): + """Update chart data from trading environment""" try: - message = await self.ws.recv() - self.message_count += 1 + if 'latest_price' in env_data: + self.latest_price = env_data['latest_price'] - if self.message_count % 100 == 0: # Log every 100th message to avoid spam - logger.info(f"Received message #{self.message_count}") - logger.debug(f"Raw message: {message[:200]}...") + if 'balance' in env_data: + self.current_balance = env_data['balance'] - data = json.loads(message) + if 'pnl' in env_data: + self.accumulative_pnl = env_data['pnl'] - # Process trade data - if 'e' in data and data['e'] == 'trade': - trade_data = { - 'timestamp': data['T'], # Trade time - 'price': float(data['p']), # Price - 'volume': float(data['q']), # Quantity - 'type': 'trade' - } - logger.debug(f"Processed trade data: {trade_data}") - return trade_data - - return None - except websockets.exceptions.ConnectionClosed: - logger.warning("WebSocket connection closed") - self.running = False - return None - except json.JSONDecodeError as e: - logger.error(f"JSON decode error: {str(e)}, message: {message[:200]}...") - return None + if 'trades' in env_data: + # Add any new trades + for trade in env_data['trades']: + if trade not in self.all_trades: + self.add_trade( + action=trade.get('action', 'HOLD'), + price=trade.get('price', self.latest_price), + amount=trade.get('amount', 0.1), + timestamp=trade.get('timestamp', datetime.now()), + trade_id=trade.get('id') + ) except Exception as e: - logger.error(f"Error receiving message: {str(e)}") - return None + logger.error(f"Error updating from environment: {str(e)}") - async def close(self): - """Close the WebSocket connection""" - if self.ws: - await self.ws.close() + def get_latest_data(self): + """Get the latest data for external systems""" + return { + 'latest_price': self.latest_price, + 'latest_volume': self.latest_volume, + 'latest_timestamp': self.latest_timestamp, + 'current_balance': self.current_balance, + 'accumulative_pnl': self.accumulative_pnl, + 'positions': len(self.positions), + 'trade_count': len(self.all_trades), + 'trade_rate': self._calculate_trade_rate() + } -class ExchangeWebSocket: - """Generic WebSocket interface for cryptocurrency exchanges""" - def __init__(self, symbol: str, exchange: str = "binance"): - self.symbol = symbol - self.exchange = exchange.lower() - self.ws = None - - # Initialize the appropriate WebSocket implementation - if self.exchange == "binance": - self.ws = BinanceWebSocket(symbol) - else: - raise ValueError(f"Unsupported exchange: {exchange}") - - async def connect(self): - """Connect to the exchange WebSocket""" - return await self.ws.connect() - - async def receive(self) -> Optional[Dict]: - """Receive data from the WebSocket""" - return await self.ws.receive() - - async def close(self): - """Close the WebSocket connection""" - await self.ws.close() - - @property - def running(self): - """Check if the WebSocket is running""" - return self.ws.running if self.ws else False - -async def main(): - global charts # Make charts globally accessible for NN integration - symbols = ["ETH/USDT"] # Only use one symbol to simplify debugging - logger.info(f"Starting application for symbols: {symbols}") - - # Initialize neural network if enabled - if NN_ENABLED: - logger.info("Initializing Neural Network integration...") - if setup_neural_network(): - logger.info("Neural Network integration initialized successfully") - else: - logger.warning("Neural Network integration failed to initialize") - - charts = [] - websocket_tasks = [] - - # Create a chart and websocket task for each symbol - for symbol in symbols: + async def start_websocket(self): + """Start the websocket connection for real-time data""" try: - # Create a proper Dash app for each chart - app = dash.Dash(__name__, - external_stylesheets=[dbc.themes.DARKLY], - suppress_callback_exceptions=True) - - # Initialize the chart with the app - chart = RealTimeChart( - app=app, - symbol=symbol, - timeframe='1m', - standalone=True, - chart_title=f"{symbol} Realtime Trading Chart", - debug_mode=True, - port=8050, - height=800, - width=1200 - ) + logger.info("Starting websocket connection for real-time data") - charts.append(chart) + # Start the websocket data fetching + websocket_url = "wss://stream.binance.com:9443/ws/ethusdt@ticker" - # Start the chart's websocket in a separate task - websocket_task = asyncio.create_task(chart.start_websocket()) - websocket_tasks.append(websocket_task) - - # Run the Dash app in a separate thread - port = 8050 + len(charts) - 1 # Use different ports for each chart - logger.info(f"Starting chart for {chart.symbol} on port {port}") - thread = Thread(target=lambda c=chart, p=port: c.run(port=p)) - thread.daemon = True - thread.start() - logger.info(f"Thread started for {chart.symbol} on port {port}") + async def websocket_handler(): + """Handle websocket connection and data updates""" + try: + async with websockets.connect(websocket_url) as websocket: + logger.info(f"WebSocket connected for {self.symbol}") + message_count = 0 + + async for message in websocket: + try: + data = json.loads(message) + + # Update tick storage with new price data + tick = { + 'price': float(data['c']), # Current price + 'volume': float(data['v']), # Volume + 'timestamp': pd.Timestamp.now() + } + + self.tick_storage.add_tick(tick) + + # Update chart's latest price and volume + self.latest_price = float(data['c']) + self.latest_volume = float(data['v']) + self.latest_timestamp = pd.Timestamp.now() + + message_count += 1 + + # Log periodic updates + if message_count % 100 == 0: + logger.info(f"Received message #{message_count}") + logger.info(f"Processed {message_count} ticks, current price: ${self.latest_price:.2f}") + + # Log candle counts + candle_count = len(self.tick_storage.candles.get("1s", [])) + logger.info(f"Current 1s candles count: {candle_count}") + + except json.JSONDecodeError as e: + logger.warning(f"Failed to parse websocket message: {str(e)}") + except Exception as e: + logger.error(f"Error processing websocket message: {str(e)}") + + except websockets.exceptions.ConnectionClosed: + logger.warning("WebSocket connection closed") + except Exception as e: + logger.error(f"WebSocket error: {str(e)}") + + # Start the websocket handler in the background + await websocket_handler() + except Exception as e: - logger.error(f"Error initializing chart for {symbol}: {str(e)}") + logger.error(f"Error starting websocket: {str(e)}") import traceback logger.error(traceback.format_exc()) - - try: - # Keep the main task running - while True: - await asyncio.sleep(1) - except KeyboardInterrupt: - logger.info("Shutting down...") - except Exception as e: - logger.error(f"Unexpected error: {str(e)}") - finally: - for task in websocket_tasks: - task.cancel() - try: - await task - except asyncio.CancelledError: - pass - -if __name__ == "__main__": - try: - asyncio.run(main()) - except KeyboardInterrupt: - logger.info("Application terminated by user") + def run(self, host='127.0.0.1', port=8050, debug=False): + """Run the Dash app""" + try: + if self.app is None: + logger.error("No Dash app instance available") + return + + logger.info("="*60) + logger.info("🔗 ACCESS WEB UI AT: http://localhost:8050/") + logger.info("📊 View live trading data and charts in your browser") + logger.info("="*60) + + # Run the app + self.app.run_server( + host=host, + port=port, + debug=debug, + use_reloader=False, # Disable reloader to avoid conflicts + threaded=True # Enable threading for better performance + ) + except Exception as e: + logger.error(f"Error running Dash app: {str(e)}") + import traceback + logger.error(traceback.format_exc())