#!/usr/bin/env python3
"""
RinCoin Mining Pool Server
Distributes block rewards among multiple miners based on share contributions
"""

import errno
import hashlib
import json
import socket
import sqlite3
import struct
import threading
import time
from datetime import datetime

import requests
from requests.auth import HTTPBasicAuth

# Import web interface
from pool_web_interface import start_web_interface

# Import stratum base class
from stratum_proxy import RinCoinStratumBase

# Difficulty announced to miners on subscribe (kept low for CPU mining).
_START_DIFFICULTY = 0.0001
# Difficulty credited per accepted share (estimate for ~381 kH/s).
_SHARE_DIFFICULTY_ESTIMATE = 0.00133
# Hashrate (H/s) reported for a miner's very first share, before any
# inter-share interval exists to measure from.
_INITIAL_HASHRATE_ESTIMATE = 381000
# Base units per coin, used to convert the job's `coinbasevalue`.
_BASE_UNITS_PER_COIN = 100000000
# Upper bound on bytes buffered for one unterminated Stratum message.
_MAX_PENDING_BYTES = 64 * 1024


def _is_complete_json(raw):
    """Return True if *raw* (str or bytes) parses as one complete JSON document."""
    try:
        json.loads(raw)
    except ValueError:  # JSONDecodeError and UnicodeDecodeError both derive from it
        return False
    return True


class RinCoinMiningPool(RinCoinStratumBase):
    """Stratum pool server that tracks miners and shares in SQLite.

    Relies on attributes and helpers that are not defined in this file and are
    presumably provided by ``RinCoinStratumBase``: ``clients``, ``running``,
    ``current_job``, ``rpc_call``, ``submit_share``, ``get_block_template``,
    ``decode_bech32_address``, ``send_stratum_response`` and
    ``send_stratum_notification``.
    """

    # NOTE(security): the default RPC credentials below are hard-coded and are
    # kept only so existing callers keep working; pass real credentials
    # explicitly instead of relying on them.
    def __init__(self, stratum_host='0.0.0.0', stratum_port=3333,
                 rpc_host='127.0.0.1', rpc_port=9556,
                 rpc_user='rinrpc',
                 rpc_password='745ce784d5d537fc06105a1b935b7657903cfc71a5fb3b90',
                 pool_address='rin1qahvvv9d5f3443wtckeqavwp9950wacxfmwv20q',
                 pool_fee_percent=1.0,
                 db_path=':memory:'):
        """Configure the pool, create the database schema and print a banner.

        Args:
            stratum_host: Interface the Stratum server binds to.
            stratum_port: TCP port the Stratum server listens on.
            rpc_host, rpc_port, rpc_user, rpc_password: Node RPC settings,
                forwarded to the base class.
            pool_address: Address all work is mined to.
            pool_fee_percent: Percentage of each block reward kept as fee.
            db_path: SQLite database location. The default ``':memory:'``
                keeps the original behavior (all data is lost on exit); pass
                a file path to persist miners, shares and blocks.
        """
        # Initialize base class
        super().__init__(stratum_host, stratum_port, rpc_host, rpc_port,
                         rpc_user, rpc_password, pool_address)

        self.pool_address = pool_address
        self.pool_fee_percent = pool_fee_percent

        # Pool statistics
        self.total_shares = 0
        self.total_blocks = 0
        self.pool_hashrate = 0

        # Database for miner/share/block tracking (in-memory unless db_path is set)
        self.db_path = db_path
        self.init_database()

        print("=== RinCoin Mining Pool Server ===")
        print(f"Stratum: {stratum_host}:{stratum_port}")
        print(f"RPC: {rpc_host}:{rpc_port}")
        print(f"Pool Address: {pool_address}")
        print(f"Pool Fee: {pool_fee_percent}%")

    def init_database(self):
        """Open the SQLite database and create the pool tables if missing.

        The connection is opened with ``check_same_thread=False`` because it
        is used from the client, job, stats and web-interface threads.
        """
        self.db = sqlite3.connect(getattr(self, 'db_path', ':memory:'),
                                  check_same_thread=False)
        cursor = self.db.cursor()

        # Create tables
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS miners (
                id INTEGER PRIMARY KEY,
                user TEXT NOT NULL,
                worker TEXT NOT NULL,
                address TEXT,
                shares INTEGER DEFAULT 0,
                last_share TIMESTAMP,
                last_hashrate REAL DEFAULT 0,
                created TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')

        cursor.execute('''
            CREATE TABLE IF NOT EXISTS shares (
                id INTEGER PRIMARY KEY,
                miner_id INTEGER,
                job_id TEXT,
                difficulty REAL,
                submitted TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (miner_id) REFERENCES miners (id)
            )
        ''')

        cursor.execute('''
            CREATE TABLE IF NOT EXISTS blocks (
                id INTEGER PRIMARY KEY,
                block_hash TEXT,
                height INTEGER,
                reward REAL,
                pool_fee REAL,
                miner_rewards TEXT,  -- JSON of {address: amount}
                found_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')

        # Samples for pool hashrate chart
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS hashrate_samples (
                id INTEGER PRIMARY KEY,
                ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                hashrate REAL
            )
        ''')

        self.db.commit()

    def get_pool_block_template(self):
        """Fetch a new block template and decorate the current job for miners.

        Returns:
            ``self.current_job`` with the pool-specific fields added, or
            ``None`` when no template or no current job is available.
        """
        template = super().get_block_template()
        if not template:
            return None

        job = self.current_job
        if not job:
            return None

        # Add pool-specific fields
        job["coinb1"] = "01000000" + "0" * 60
        job["coinb2"] = "ffffffff"
        job["merkle_branch"] = []
        job["clean_jobs"] = True
        return job

    def validate_rincoin_address(self, address):
        """Return True if *address* decodes as a RinCoin bech32 address.

        Any decoding failure is treated as "invalid" rather than raised.
        """
        if not isinstance(address, str) or not address:
            return False
        try:
            return self.decode_bech32_address(address) is not None
        except Exception:
            return False

    def register_miner(self, user, worker, address=None):
        """Insert the (user, worker) miner or return the existing row's id.

        An existing miner only has its address filled in when it had none;
        an address that is already stored is never overwritten.

        Returns:
            The miner's database id.
        """
        cursor = self.db.cursor()

        # Check if miner exists
        cursor.execute('SELECT id, address FROM miners WHERE user = ? AND worker = ?',
                       (user, worker))
        row = cursor.fetchone()

        if row:
            miner_id, existing_address = row
            if address and not existing_address:
                cursor.execute('UPDATE miners SET address = ? WHERE id = ?',
                               (address, miner_id))
                self.db.commit()
            return miner_id

        # Create new miner
        cursor.execute('INSERT INTO miners (user, worker, address) VALUES (?, ?, ?)',
                       (user, worker, address))
        self.db.commit()
        return cursor.lastrowid

    def record_share(self, miner_id, job_id, difficulty):
        """Store one share and bump the miner's and the pool's share counters."""
        cursor = self.db.cursor()

        # Record share
        cursor.execute('INSERT INTO shares (miner_id, job_id, difficulty) VALUES (?, ?, ?)',
                       (miner_id, job_id, difficulty))

        # Update miner stats
        cursor.execute('UPDATE miners SET shares = shares + 1, '
                       'last_share = CURRENT_TIMESTAMP WHERE id = ?',
                       (miner_id,))

        self.db.commit()
        self.total_shares += 1

    def distribute_block_reward(self, block_hash, block_height, total_reward):
        """Split a block reward by share difficulty and record the block.

        The pool fee is taken first; the remainder is split in proportion to
        the difficulty each miner submitted during the last 24 hours. The
        portion earned by miners without a payout address stays with the pool.
        Nothing is recorded when there are no recent shares or their total
        difficulty is zero.

        Args:
            block_hash: Hash stored with the block row.
            block_height: Height stored with the block row.
            total_reward: Full block reward, in coins.
        """
        cursor = self.db.cursor()

        # Calculate pool fee
        pool_fee = total_reward * (self.pool_fee_percent / 100.0)
        miner_reward = total_reward - pool_fee

        # Get shares from last 24 hours (COALESCE guards against NULL difficulties)
        cursor.execute('''
            SELECT m.address,
                   COUNT(s.id) as share_count,
                   COALESCE(SUM(s.difficulty), 0) as total_difficulty
            FROM miners m
            JOIN shares s ON m.id = s.miner_id
            WHERE s.submitted > datetime('now', '-1 day')
            GROUP BY m.id, m.address
            HAVING share_count > 0
        ''')
        rows = cursor.fetchall()

        if not rows:
            print("No miners with shares in last 24 hours")
            return

        # Separate miners with and without addresses
        miners_with_addresses = [row for row in rows if row[0]]
        miners_without_addresses = [row for row in rows if not row[0]]

        total_difficulty = sum(row[2] for row in rows)
        if total_difficulty == 0:
            print("No valid difficulty found")
            return

        # Distribute rewards. The query yields one row per miner (worker), so
        # several rows can share a payout address; accumulate rather than
        # overwrite so no worker's contribution is lost.
        miner_rewards = {}
        for address, _share_count, difficulty in miners_with_addresses:
            reward_share = (difficulty / total_difficulty) * miner_reward
            miner_rewards[address] = miner_rewards.get(address, 0.0) + reward_share
            print(f"💰 Miner {address}: {reward_share:.8f} RIN ({difficulty} difficulty)")

        # Rewards earned by miners without addresses stay with the pool
        # (no redistribution).
        if miners_without_addresses:
            undistributed_reward = 0.0
            for _address, _share_count, difficulty in miners_without_addresses:
                unclaimed = (difficulty / total_difficulty) * miner_reward
                undistributed_reward += unclaimed
                print(f"⚠️ Miner without address: {difficulty} difficulty -> {unclaimed:.8f} RIN to pool")
            print(f"💰 Pool keeps {undistributed_reward:.8f} RIN from miners without addresses")

        # Record block
        cursor.execute('''
            INSERT INTO blocks (block_hash, height, reward, pool_fee, miner_rewards)
            VALUES (?, ?, ?, ?, ?)
        ''', (block_hash, block_height, total_reward, pool_fee, json.dumps(miner_rewards)))

        self.db.commit()
        self.total_blocks += 1

        print(f"🎉 Block {block_height} reward distributed!")
        print(f"💰 Pool fee: {pool_fee:.8f} RIN")
        print(f"💰 Total distributed: {sum(miner_rewards.values()):.8f} RIN")

        # Summary
        if miners_without_addresses:
            print(f"📊 Summary: {len(miners_with_addresses)} miners with addresses, "
                  f"{len(miners_without_addresses)} without (rewards to pool)")

    # Use inherited send_stratum_response and send_stratum_notification from base class

    def _send_pool_job(self, client, job):
        """Send *job* to one client as a ``mining.notify`` notification."""
        self.send_stratum_notification(client, "mining.notify", [
            job["job_id"],
            job["prevhash"],
            job["coinb1"],
            job["coinb2"],
            job["merkle_branch"],
            f"{job['version']:08x}",
            job["bits"],
            job["ntime"],
            job["clean_jobs"],
        ])

    def _pool_subscribe(self, client, msg_id):
        """Answer ``mining.subscribe`` and push the difficulty and first job."""
        # NOTE(review): the extranonce1 slot carries the literal string
        # "extranonce1" while authorized clients are tracked with '00000000';
        # left unchanged here - confirm against the base class before altering.
        self.send_stratum_response(client, msg_id, [
            [["mining.set_difficulty", "subscription_id"],
             ["mining.notify", "subscription_id"]],
            "extranonce1",
            4,
        ])

        # Send difficulty (lower for CPU mining)
        self.send_stratum_notification(client, "mining.set_difficulty",
                                       [_START_DIFFICULTY])

        # Send initial job
        job = self.get_pool_block_template()
        if job:
            self._send_pool_job(client, job)

    def _pool_authorize(self, client, addr, msg_id, params):
        """Handle ``mining.authorize``: parse ``user[.worker]`` and register.

        A user name starting with ``rin`` is treated as a payout address and
        must validate; any other user name is accepted without an address, so
        that miner's rewards stay with the pool.
        """
        if len(params) < 2:
            self.send_stratum_response(client, msg_id, False, "Invalid authorization")
            return

        # Parse user.worker format
        user_worker = params[0]
        if '.' in user_worker:
            user, worker = user_worker.split('.', 1)
        else:
            user, worker = user_worker, "default"

        # Check if user is a RinCoin address (starts with 'rin')
        miner_address = None
        if user.startswith('rin'):
            if not self.validate_rincoin_address(user):
                print(f"[{addr}] ❌ Invalid RinCoin address: {user}")
                self.send_stratum_response(client, msg_id, False, "Invalid RinCoin address")
                return
            miner_address = user
            user = f"miner_{miner_address[:8]}"  # Create a user ID from address
            print(f"[{addr}] ✅ Miner using valid RinCoin address: {miner_address}")

        # Register miner with address
        miner_id = self.register_miner(user, worker, miner_address)

        # Store client info
        self.clients[addr] = {
            'client': client,
            'user': user,
            'worker': worker,
            'miner_id': miner_id,
            'address': miner_address,
            'shares': 0,
            'last_share': time.time(),
            'extranonce1': '00000000',  # Default extranonce1
        }

        if miner_address:
            print(f"[{addr}] ✅ Authorized: {user}.{worker} -> {miner_address}")
        else:
            print(f"[{addr}] ⚠️ Authorized: {user}.{worker} (rewards will go to pool address)")
        self.send_stratum_response(client, msg_id, True)

    def _pool_submit(self, client, addr, msg_id, params):
        """Handle ``mining.submit``: validate, record and reward a share.

        Shares that fail validation or raise during processing are still
        acknowledged as accepted; that best-effort policy is deliberate and
        kept from the original implementation.
        """
        miner_info = self.clients.get(addr)
        if miner_info is None:
            self.send_stratum_response(client, msg_id, False, "Not authorized")
            return

        # Snapshot the job so the reward/height used below belong to the job
        # that was actually submitted against, even if the job updater thread
        # swaps `current_job` in the meantime.
        job = self.current_job
        try:
            if not (job and len(params) >= 5):
                print(f"[{addr}] Invalid share parameters")
                self.send_stratum_response(client, msg_id, False, "Invalid parameters")
                return

            job_id, extranonce2, ntime, nonce = params[1:5]
            extranonce1 = miner_info.get('extranonce1', '00000000')

            # For pool mining, always mine to pool address
            success, result_message = self.submit_share(
                job, extranonce1, extranonce2, ntime, nonce,
                target_address=self.pool_address
            )

            if not success:
                # Accept as share for pool statistics even if block validation fails
                self.send_stratum_response(client, msg_id, True)
                return

            # Record share with estimated difficulty
            actual_difficulty = _SHARE_DIFFICULTY_ESTIMATE
            self.record_share(miner_info['miner_id'], job_id, actual_difficulty)

            # Estimate hashrate from the time since this miner's previous share
            now_ts = time.time()
            prev_ts = miner_info.get('last_share') or now_ts
            dt = max(now_ts - prev_ts, 1e-3)  # avoid division by zero
            miner_hashrate = actual_difficulty * (2 ** 32) / dt
            if miner_info['shares'] == 0:
                miner_hashrate = _INITIAL_HASHRATE_ESTIMATE  # Default estimate
            miner_info['shares'] += 1
            miner_info['last_share'] = now_ts

            # Update database
            try:
                cursor = self.db.cursor()
                cursor.execute('UPDATE miners SET last_share = CURRENT_TIMESTAMP, '
                               'last_hashrate = ? WHERE id = ?',
                               (miner_hashrate, miner_info['miner_id']))
                self.db.commit()
            except Exception as e:
                print(f"DB update error: {e}")

            print(f"[{addr}] ✅ Share accepted from {miner_info['user']}.{miner_info['worker']} "
                  f"(Total: {miner_info['shares']})")
            self.send_stratum_response(client, msg_id, True)
        except Exception as e:
            print(f"[{addr}] Share processing error: {e}")
            # Still accept the share for mining statistics
            self.send_stratum_response(client, msg_id, True)
            return

        # The miner has already been answered; a failure while distributing
        # the reward must not produce a second reply for the same request id.
        try:
            if "Block found" in result_message:
                print(f"🎉 [{addr}] BLOCK FOUND!")
                total_reward = job['coinbasevalue'] / _BASE_UNITS_PER_COIN
                self.distribute_block_reward("pending", job['height'], total_reward)
        except Exception as e:
            print(f"[{addr}] Block reward distribution error: {e}")

    def handle_stratum_message(self, client, addr, message):
        """Parse one Stratum JSON message and dispatch it by method name.

        Handles ``mining.subscribe``, ``mining.extranonce.subscribe``,
        ``mining.authorize`` and ``mining.submit``; any other method gets a
        null result. Errors are logged and never propagate to the caller.
        """
        try:
            data = json.loads(message.strip())
            method = data.get("method")
            msg_id = data.get("id")
            params = data.get("params", [])

            print(f"[{addr}] {method}: {params}")

            if method == "mining.subscribe":
                self._pool_subscribe(client, msg_id)
            elif method == "mining.extranonce.subscribe":
                # Handle extranonce subscription
                print(f"[{addr}] Extranonce subscription requested")
                self.send_stratum_response(client, msg_id, True)
            elif method == "mining.authorize":
                self._pool_authorize(client, addr, msg_id, params)
            elif method == "mining.submit":
                self._pool_submit(client, addr, msg_id, params)
            else:
                print(f"[{addr}] ⚠️ Unknown method: {method}")
                # Send null result for unknown methods (standard Stratum behavior)
                self.send_stratum_response(client, msg_id, None, None)

        except json.JSONDecodeError:
            print(f"[{addr}] Invalid JSON: {message}")
        except Exception as e:
            print(f"[{addr}] Message handling error: {e}")

    def handle_client(self, client, addr):
        """Serve one miner connection until it closes or the pool stops.

        Stratum messages are newline-delimited, but TCP may split one message
        across several ``recv`` calls or batch several into one, so bytes are
        buffered and only complete lines are dispatched. A trailing fragment
        that already forms a complete JSON document is dispatched immediately
        so clients that omit the final newline keep working.
        """
        print(f"[{addr}] Connected")
        pending = b""

        try:
            while self.running:
                data = client.recv(4096)
                if not data:
                    break

                pending += data
                # Everything before the last newline is complete; the last
                # element is the (possibly empty) unfinished remainder.
                *lines, pending = pending.split(b"\n")
                for raw_line in lines:
                    message = raw_line.decode('utf-8', errors='replace').strip()
                    if message:
                        self.handle_stratum_message(client, addr, message)

                tail = pending.strip()
                if not tail:
                    pending = b""
                elif _is_complete_json(tail):
                    pending = b""
                    self.handle_stratum_message(
                        client, addr, tail.decode('utf-8', errors='replace'))
                elif len(pending) > _MAX_PENDING_BYTES:
                    # Refuse to buffer an unbounded unterminated message.
                    print(f"[{addr}] Client error: unterminated message exceeds "
                          f"{_MAX_PENDING_BYTES} bytes")
                    break

        except Exception as e:
            print(f"[{addr}] Client error: {e}")
        finally:
            client.close()
            self.clients.pop(addr, None)
            print(f"[{addr}] Disconnected")

    def job_updater(self):
        """Poll the node every 10 seconds and broadcast new jobs to miners.

        A new job is created when the chain height changes, when more than
        30 seconds have passed since the last job, or when no job exists.
        """
        last_job_time = 0
        last_block_height = 0

        while self.running:
            try:
                # Check for new blocks every 10 seconds
                time.sleep(10)

                # Get current blockchain info
                blockchain_info = self.rpc_call("getblockchaininfo")
                if not blockchain_info:
                    continue
                current_height = blockchain_info.get('blocks', 0)

                should_create_job = (
                    current_height != last_block_height
                    or time.time() - last_job_time > 30
                    or not self.current_job
                )
                if not should_create_job:
                    continue

                job = self.get_pool_block_template()
                if not job:
                    continue

                last_job_time = time.time()
                last_block_height = current_height
                print(f"📦 New job created: {job['job_id']} (block {current_height})")

                # Send to all connected clients (iterate a snapshot: client
                # threads add and remove entries concurrently)
                for addr, miner_info in list(self.clients.items()):
                    try:
                        self._send_pool_job(miner_info['client'], job)
                    except Exception as e:
                        print(f"Failed to send job to {addr}: {e}")

            except Exception as e:
                print(f"Job updater error: {e}")

    def stats_updater(self):
        """Once a minute, recompute the pool hashrate and store a chart sample."""
        while self.running:
            try:
                time.sleep(60)  # Update every minute

                cursor = self.db.cursor()
                # Pool hashrate is the sum of miners' last hashrates
                cursor.execute('SELECT COALESCE(SUM(last_hashrate), 0) FROM miners')
                self.pool_hashrate = cursor.fetchone()[0] or 0.0

                # Sample for chart
                cursor.execute('INSERT INTO hashrate_samples (hashrate) VALUES (?)',
                               (self.pool_hashrate,))
                self.db.commit()

                print(f"📊 Pool Stats: {len(self.clients)} miners, {self.total_shares} shares, "
                      f"{self.pool_hashrate/1000:.2f} kH/s")

            except Exception as e:
                print(f"Stats updater error: {e}")

    def start(self):
        """Start the background threads, web dashboard and Stratum accept loop.

        Returns early when the node RPC is unreachable. Blocks until
        ``self.running`` becomes false or the process is interrupted; the
        listening socket is always closed on the way out.
        """
        try:
            # Test RPC connection
            blockchain_info = self.rpc_call("getblockchaininfo")
            if not blockchain_info:
                print("❌ Failed to connect to RinCoin node!")
                return

            print(f"✅ Connected to RinCoin node (block {blockchain_info.get('blocks', 'unknown')})")

            # Start background threads
            job_thread = threading.Thread(target=self.job_updater, daemon=True)
            job_thread.start()

            stats_thread = threading.Thread(target=self.stats_updater, daemon=True)
            stats_thread.start()

            # Start web interface in background
            web_thread = threading.Thread(
                target=start_web_interface,
                args=(self.db, '0.0.0.0', 8083,
                      self.rpc_host, self.rpc_port, self.rpc_user, self.rpc_password),
                daemon=True,
            )
            web_thread.start()

            print(f"🌐 Web dashboard started on http://0.0.0.0:8083")

            # Start Stratum server; `with` guarantees the listening socket is closed
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
                server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                server_socket.bind((self.stratum_host, self.stratum_port))
                server_socket.listen(10)

                print(f"🚀 Mining pool listening on {self.stratum_host}:{self.stratum_port}")
                print("Ready for multiple miners...")
                print("")
                print(f"💰 Pool address: {self.pool_address}")
                print(f"💰 Pool fee: {self.pool_fee_percent}%")
                print("")
                print("Connect miners with:")
                print(f"./cpuminer -a rinhash -o stratum+tcp://{self.stratum_host}:{self.stratum_port} "
                      f"-u username.workername -p x")
                print("")

                while self.running:
                    try:
                        client, addr = server_socket.accept()
                        client_thread = threading.Thread(
                            target=self.handle_client, args=(client, addr), daemon=True)
                        client_thread.start()
                    except KeyboardInterrupt:
                        print("\n🛑 Shutting down pool...")
                        self.running = False
                        break
                    except Exception as e:
                        print(f"Server error: {e}")

        except OSError as e:
            # errno is checked first because the message text varies by platform.
            if e.errno == errno.EADDRINUSE or "Address already in use" in str(e):
                print(f"❌ Port {self.stratum_port} is already in use!")
                print("")
                print("🔍 Check what's using the port:")
                print(f"sudo netstat -tlnp | grep :{self.stratum_port}")
                print("")
                print("🛑 Kill existing process:")
                print(f"sudo lsof -ti:{self.stratum_port} | xargs sudo kill -9")
                print("")
                print("🔄 Or use a different port by editing the script")
            else:
                print(f"Failed to start server: {e}")
        except Exception as e:
            print(f"Failed to start server: {e}")
        finally:
            print("Pool server stopped")


if __name__ == "__main__":
    pool = RinCoinMiningPool()
    pool.start()