#!/usr/bin/env python3
"""
RinCoin Mining Pool Server
Distributes block rewards among multiple miners based on share contributions
"""

import errno
import hashlib
import json
import socket
import sqlite3
import struct
import threading
import time
from datetime import datetime

import requests
from requests.auth import HTTPBasicAuth


class RinCoinMiningPool:
    """Stratum front-end that hands out jobs from a RinCoin node and tracks shares.

    The pool accepts miner connections over a newline-delimited JSON (Stratum)
    TCP socket, fetches block templates from the node over JSON-RPC, records
    every submitted share in an in-memory SQLite database and, when the node
    reports a generated block, computes how the reward would be split among
    miners in proportion to their recent share difficulty.
    """

    # Reward (in RIN) assumed for every found block.
    # NOTE(review): the real value is available from the block template's
    # "coinbasevalue"; this constant mirrors the original simplification.
    DEFAULT_BLOCK_REWARD = 50.0

    # Upper bound on bytes buffered for a single unterminated client message.
    MAX_PENDING_BYTES = 64 * 1024

    def __init__(self, stratum_host='0.0.0.0', stratum_port=3333,
                 rpc_host='127.0.0.1', rpc_port=9556,
                 rpc_user='rinrpc',
                 rpc_password='745ce784d5d5d537fc06105a1b935b7657903cfc71a5fb3b90',
                 pool_address='rin1qahvvv9d5f3443wtckeqavwp9950wacxfmwv20q',
                 pool_fee_percent=1.0):
        """Store configuration, reset counters and create the database.

        Args:
            stratum_host: Interface the Stratum server binds to.
            stratum_port: TCP port the Stratum server listens on.
            rpc_host: Host of the RinCoin node's JSON-RPC endpoint.
            rpc_port: Port of the RinCoin node's JSON-RPC endpoint.
            rpc_user: JSON-RPC user name (HTTP basic auth).
            rpc_password: JSON-RPC password (HTTP basic auth).
            pool_address: Address passed to the node when generating blocks.
            pool_fee_percent: Percentage of each block reward kept by the pool.
        """
        # NOTE(security): the default rpc_password / pool_address are hard-coded
        # credentials kept only for interface compatibility; pass real values
        # explicitly rather than relying on these defaults.
        self.stratum_host = stratum_host
        self.stratum_port = stratum_port
        self.rpc_host = rpc_host
        self.rpc_port = rpc_port
        self.rpc_user = rpc_user
        self.rpc_password = rpc_password
        self.pool_address = pool_address
        self.pool_fee_percent = pool_fee_percent

        # Miner tracking: {addr: {'client', 'user', 'worker', 'miner_id',
        #                         'address', 'shares', 'last_share'}}
        self.clients = {}
        self.job_counter = 0
        self.current_job = None
        self.running = True

        # Pool statistics
        self.total_shares = 0
        self.total_blocks = 0
        self.pool_hashrate = 0

        # Database for miner/share/block bookkeeping
        self.init_database()

        print("=== RinCoin Mining Pool Server ===")
        print(f"Stratum: {stratum_host}:{stratum_port}")
        print(f"RPC: {rpc_host}:{rpc_port}")
        print(f"Pool Address: {pool_address}")
        print(f"Pool Fee: {pool_fee_percent}%")

    def init_database(self):
        """Create the in-memory SQLite database and its three tables.

        The connection is opened with ``check_same_thread=False`` because it
        is used from the client threads and the stats thread; ``self.db_lock``
        serializes that access.
        """
        self.db = sqlite3.connect(':memory:', check_same_thread=False)
        # sqlite3 leaves serialization of cross-thread writes to the caller.
        self.db_lock = threading.Lock()
        cursor = self.db.cursor()

        cursor.execute('''
            CREATE TABLE IF NOT EXISTS miners (
                id INTEGER PRIMARY KEY,
                user TEXT NOT NULL,
                worker TEXT NOT NULL,
                address TEXT,
                shares INTEGER DEFAULT 0,
                last_share TIMESTAMP,
                created TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')

        cursor.execute('''
            CREATE TABLE IF NOT EXISTS shares (
                id INTEGER PRIMARY KEY,
                miner_id INTEGER,
                job_id TEXT,
                difficulty REAL,
                submitted TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (miner_id) REFERENCES miners (id)
            )
        ''')

        cursor.execute('''
            CREATE TABLE IF NOT EXISTS blocks (
                id INTEGER PRIMARY KEY,
                block_hash TEXT,
                height INTEGER,
                reward REAL,
                pool_fee REAL,
                miner_rewards TEXT,  -- JSON of {address: amount}
                found_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')

        self.db.commit()

    def rpc_call(self, method, params=None):
        """Make a JSON-RPC call to the RinCoin node.

        Args:
            method: RPC method name.
            params: Positional RPC parameters; ``None`` means no parameters.

        Returns:
            The ``result`` member of the RPC reply, or ``None`` when the HTTP
            status is not 200, the reply carries an error, or the request
            fails for any reason (the failure is printed, never raised).
        """
        if params is None:
            params = []
        try:
            url = f"http://{self.rpc_host}:{self.rpc_port}/"
            headers = {'content-type': 'text/plain'}
            auth = HTTPBasicAuth(self.rpc_user, self.rpc_password)

            payload = {
                "jsonrpc": "1.0",
                "id": "mining_pool",
                "method": method,
                "params": params
            }

            response = requests.post(url, json=payload, headers=headers,
                                     auth=auth, timeout=10)

            if response.status_code != 200:
                print(f"HTTP Error: {response.status_code}")
                return None

            result = response.json()
            if 'error' in result and result['error'] is not None:
                print(f"RPC Error: {result['error']}")
                return None
            return result.get('result')
        except Exception as e:
            # Boundary handler: every caller treats None as "call failed".
            print(f"RPC Call Error: {e}")
            return None

    def get_block_template(self):
        """Fetch a block template from the node and make it the current job.

        Returns:
            The new job dict (also stored in ``self.current_job``), or
            ``None`` when the node returned no template or an error occurred.
        """
        try:
            template = self.rpc_call("getblocktemplate",
                                     [{"rules": ["segwit", "mweb"]}])
            if not template:
                return None

            self.job_counter += 1

            job = {
                "job_id": f"job_{self.job_counter}",
                "template": template,
                "prevhash": template.get("previousblockhash", "0" * 64),
                # Placeholder coinbase halves; no real coinbase is assembled.
                "coinb1": "01000000" + "0" * 60,
                "coinb2": "ffffffff",
                "merkle_branch": [],
                "version": f"{template.get('version', 1):08x}",
                "nbits": template.get("bits", "1d00ffff"),
                "ntime": f"{int(time.time()):08x}",
                "clean_jobs": True,
                "target": template.get("target", "0000ffff00000000000000000000000000000000000000000000000000000000")
            }

            self.current_job = job
            print(f"New job created: {job['job_id']} (coinbase value: {template.get('coinbasevalue', 0)} satoshis)")
            return job
        except Exception as e:
            print(f"Get block template error: {e}")
            return None

    @staticmethod
    def _job_notify_params(job):
        """Return the ``mining.notify`` parameter list for *job*."""
        return [
            job["job_id"],
            job["prevhash"],
            job["coinb1"],
            job["coinb2"],
            job["merkle_branch"],
            job["version"],
            job["nbits"],
            job["ntime"],
            job["clean_jobs"]
        ]

    def register_miner(self, user, worker, address=None):
        """Register a miner, or return the existing row for user/worker.

        If the miner already exists without a payout address and *address* is
        given, the address is stored on the existing row.

        Args:
            user: Pool user name.
            worker: Worker name under that user.
            address: Optional payout address.

        Returns:
            The miner's row id in the ``miners`` table.
        """
        with self.db_lock:
            cursor = self.db.cursor()

            cursor.execute('SELECT id, address FROM miners WHERE user = ? AND worker = ?',
                           (user, worker))
            result = cursor.fetchone()

            if result:
                miner_id, existing_address = result
                if address and not existing_address:
                    cursor.execute('UPDATE miners SET address = ? WHERE id = ?',
                                   (address, miner_id))
                    self.db.commit()
                return miner_id

            cursor.execute('INSERT INTO miners (user, worker, address) VALUES (?, ?, ?)',
                           (user, worker, address))
            self.db.commit()
            return cursor.lastrowid

    def record_share(self, miner_id, job_id, difficulty):
        """Insert one share row and bump the miner's and the pool's counters.

        Args:
            miner_id: Row id returned by :meth:`register_miner`.
            job_id: Job identifier the miner submitted against.
            difficulty: Difficulty credited for this share.
        """
        with self.db_lock:
            cursor = self.db.cursor()

            cursor.execute('INSERT INTO shares (miner_id, job_id, difficulty) VALUES (?, ?, ?)',
                           (miner_id, job_id, difficulty))

            cursor.execute('UPDATE miners SET shares = shares + 1, last_share = CURRENT_TIMESTAMP WHERE id = ?',
                           (miner_id,))

            self.db.commit()
            self.total_shares += 1

    def distribute_block_reward(self, block_hash, block_height, total_reward):
        """Split a block reward among miners by share difficulty and record it.

        The pool fee is deducted first; the remainder is divided in proportion
        to each miner's summed share difficulty over the last 24 hours. Only
        the computed split is stored (as JSON in the ``blocks`` table) and
        printed; this method does not send any payment.

        Miners without a payout address still count towards the total
        difficulty, so their portion is left undistributed.

        Args:
            block_hash: Hash of the found block.
            block_height: Height of the found block.
            total_reward: Full block reward before the pool fee.
        """
        pool_fee = total_reward * (self.pool_fee_percent / 100.0)
        miner_reward = total_reward - pool_fee

        with self.db_lock:
            cursor = self.db.cursor()
            cursor.execute('''
                SELECT m.address, COUNT(s.id) as share_count, SUM(s.difficulty) as total_difficulty
                FROM miners m
                JOIN shares s ON m.id = s.miner_id
                WHERE s.submitted > datetime('now', '-1 day')
                GROUP BY m.id, m.address
                HAVING share_count > 0
            ''')
            miners = cursor.fetchall()

        if not miners:
            print("No miners with shares in last 24 hours")
            return

        total_difficulty = sum(row[2] for row in miners)

        miner_rewards = {}
        if total_difficulty > 0:
            for address, _share_count, difficulty in miners:
                if not address:
                    continue
                reward_share = (difficulty / total_difficulty) * miner_reward
                # Rows are per miner (user/worker); several workers may pay to
                # the same address, so accumulate instead of overwriting.
                miner_rewards[address] = miner_rewards.get(address, 0.0) + reward_share
                print(f"💰 Miner {address}: {reward_share:.8f} RIN ({difficulty} difficulty)")

        with self.db_lock:
            cursor = self.db.cursor()
            cursor.execute('''
                INSERT INTO blocks (block_hash, height, reward, pool_fee, miner_rewards)
                VALUES (?, ?, ?, ?, ?)
            ''', (block_hash, block_height, total_reward, pool_fee, json.dumps(miner_rewards)))
            self.db.commit()
            self.total_blocks += 1

        print(f"🎉 Block {block_height} reward distributed!")
        print(f"💰 Pool fee: {pool_fee:.8f} RIN")
        print(f"💰 Total distributed: {sum(miner_rewards.values()):.8f} RIN")

    def send_stratum_response(self, client, msg_id, result, error=None):
        """Send a Stratum response line to *client*; failures are printed.

        Args:
            client: Connected client socket.
            msg_id: ``id`` of the request being answered.
            result: Value for the ``result`` member.
            error: Value for the ``error`` member (``None`` on success).
        """
        try:
            response = {
                "id": msg_id,
                "result": result,
                "error": error
            }
            message = json.dumps(response) + "\n"
            # sendall: plain send() may transmit only part of the message.
            client.sendall(message.encode('utf-8'))
        except Exception as e:
            print(f"Send response error: {e}")

    def send_stratum_notification(self, client, method, params):
        """Send a Stratum notification line to *client*; failures are printed.

        Args:
            client: Connected client socket.
            method: Notification method name, e.g. ``"mining.notify"``.
            params: Notification parameter list.
        """
        try:
            notification = {
                "id": None,
                "method": method,
                "params": params
            }
            message = json.dumps(notification) + "\n"
            # sendall: plain send() may transmit only part of the message.
            client.sendall(message.encode('utf-8'))
        except Exception as e:
            print(f"Send notification error: {e}")

    def handle_stratum_message(self, client, addr, message):
        """Parse one Stratum JSON message from a miner and dispatch it.

        Invalid JSON and handler errors are printed and swallowed so a bad
        message never tears down the connection loop.

        Args:
            client: Connected client socket.
            addr: Client address, used as the key in ``self.clients``.
            message: One JSON-encoded Stratum message.
        """
        try:
            data = json.loads(message.strip())
            method = data.get("method")
            msg_id = data.get("id")
            # Treat a missing or null "params" as an empty list.
            params = data.get("params") or []

            print(f"[{addr}] {method}: {params}")

            if method == "mining.subscribe":
                self._handle_subscribe(client, msg_id)
            elif method == "mining.extranonce.subscribe":
                print(f"[{addr}] Extranonce subscription requested")
                self.send_stratum_response(client, msg_id, True)
            elif method == "mining.authorize":
                self._handle_authorize(client, addr, msg_id, params)
            elif method == "mining.submit":
                self._handle_submit(client, addr, msg_id, params)
            else:
                print(f"[{addr}] ⚠️ Unknown method: {method}")
                # Unknown methods are answered with a null result.
                self.send_stratum_response(client, msg_id, None, None)

        except json.JSONDecodeError:
            print(f"[{addr}] Invalid JSON: {message}")
        except Exception as e:
            print(f"[{addr}] Message handling error: {e}")

    def _handle_subscribe(self, client, msg_id):
        """Answer ``mining.subscribe`` and push difficulty plus a first job."""
        self.send_stratum_response(client, msg_id, [
            [["mining.set_difficulty", "subscription_id"], ["mining.notify", "subscription_id"]],
            "extranonce1",
            4
        ])

        self.send_stratum_notification(client, "mining.set_difficulty", [1])

        job = self.get_block_template()
        if job:
            self.send_stratum_notification(client, "mining.notify",
                                           self._job_notify_params(job))

    def _handle_authorize(self, client, addr, msg_id, params):
        """Answer ``mining.authorize``: register the miner and remember the client.

        The login is ``user.worker``; a user part starting with ``rin`` is
        taken to be the miner's payout address.
        """
        if len(params) < 2:
            self.send_stratum_response(client, msg_id, False, "Invalid authorization")
            return

        user_worker = params[0]
        user, separator, worker = user_worker.partition('.')
        if not separator:
            worker = "default"

        miner_address = None
        if user.startswith('rin'):
            miner_address = user
            # NOTE(review): the user id keeps only the first 8 characters of
            # the address, so two addresses sharing that prefix map to the
            # same user (and, with equal worker names, the same miner row).
            user = f"miner_{miner_address[:8]}"
            print(f"[{addr}] Miner using address as username: {miner_address}")

        miner_id = self.register_miner(user, worker, miner_address)

        self.clients[addr] = {
            'client': client,
            'user': user,
            'worker': worker,
            'miner_id': miner_id,
            'address': miner_address,
            'shares': 0,
            'last_share': time.time()
        }

        if miner_address:
            print(f"[{addr}] ✅ Authorized: {user}.{worker} -> {miner_address}")
        else:
            print(f"[{addr}] ⚠️ Authorized: {user}.{worker} (no address specified)")
        self.send_stratum_response(client, msg_id, True)

    def _handle_submit(self, client, addr, msg_id, params):
        """Answer ``mining.submit``: record the share and try to produce a block.

        The share is credited at difficulty 1.0 without validating the
        submitted nonce. Errors after the parameter check are printed and the
        share is still acknowledged.
        """
        miner_info = self.clients.get(addr)
        if miner_info is None:
            self.send_stratum_response(client, msg_id, False, "Not authorized")
            return

        if not (self.current_job and isinstance(params, list) and len(params) >= 5):
            print(f"[{addr}] Invalid share parameters")
            self.send_stratum_response(client, msg_id, False, "Invalid parameters")
            return

        try:
            job_id = params[0]

            self.record_share(miner_info['miner_id'], job_id, 1.0)  # Simplified difficulty

            miner_info['shares'] += 1
            miner_info['last_share'] = time.time()

            print(f"[{addr}] ✅ Share accepted from {miner_info['user']}.{miner_info['worker']} (Total: {miner_info['shares']})")

            self._try_generate_block(addr)
        except Exception as e:
            print(f"[{addr}] Block submission error: {e}")

        # The share is acknowledged whether or not a block was produced.
        self.send_stratum_response(client, msg_id, True)

    def _try_generate_block(self, addr):
        """Ask the node to generate a block and, on success, split its reward."""
        print(f"[{addr}] 🔍 Attempting to submit block solution...")

        # NOTE(review): this asks the node to mine (one try) to the pool
        # address on every share rather than submitting the miner's solution.
        result = self.rpc_call("generatetoaddress", [1, self.pool_address, 1])
        if not result:
            return

        block_hash = result[0]
        block_info = self.rpc_call("getblock", [block_hash])
        if not block_info:
            return

        block_height = block_info.get('height', 0)
        total_reward = self.DEFAULT_BLOCK_REWARD

        print(f"🎉 [{addr}] BLOCK FOUND! Hash: {block_hash}")
        print(f"💰 Block reward: {total_reward} RIN")

        self.distribute_block_reward(block_hash, block_height, total_reward)

    @staticmethod
    def _split_messages(buffer):
        """Split buffered bytes into complete messages and an unfinished tail.

        Messages are newline-delimited. A trailing fragment that already
        parses as JSON is treated as a complete message too, so clients that
        omit the final newline keep working.

        Returns:
            ``(messages, remainder)``: a list of non-empty decoded message
            strings and the bytes still waiting for more data.
        """
        *lines, remainder = buffer.split(b'\n')

        if not remainder.strip():
            remainder = b''
        else:
            try:
                json.loads(remainder)
            except ValueError:
                pass  # Incomplete message; wait for the rest.
            else:
                lines.append(remainder)
                remainder = b''

        messages = []
        for line in lines:
            text = line.decode('utf-8', errors='replace').strip()
            if text:
                messages.append(text)
        return messages, remainder

    def handle_client(self, client, addr):
        """Serve one miner connection until it closes or the pool stops.

        Received bytes are buffered so that messages split across TCP
        segments are reassembled before being handled.

        Args:
            client: Connected client socket (closed on exit).
            addr: Client address, used as the key in ``self.clients``.
        """
        print(f"[{addr}] Connected")

        pending = b''
        try:
            while self.running:
                data = client.recv(4096)
                if not data:
                    break

                pending += data
                messages, pending = self._split_messages(pending)
                for message in messages:
                    self.handle_stratum_message(client, addr, message)

                if len(pending) > self.MAX_PENDING_BYTES:
                    print(f"[{addr}] Client error: message exceeds {self.MAX_PENDING_BYTES} bytes")
                    break

        except Exception as e:
            print(f"[{addr}] Client error: {e}")
        finally:
            client.close()
            self.clients.pop(addr, None)
            print(f"[{addr}] Disconnected")

    def job_updater(self):
        """Poll the node every 10 seconds and broadcast new jobs to miners.

        A new job is created when the chain height changed, when more than 30
        seconds passed since the last job, or when no job exists yet.
        """
        last_job_time = 0
        last_block_height = 0

        while self.running:
            try:
                time.sleep(10)

                blockchain_info = self.rpc_call("getblockchaininfo")
                if not blockchain_info:
                    continue

                current_height = blockchain_info.get('blocks', 0)

                should_create_job = (
                    current_height != last_block_height or
                    time.time() - last_job_time > 30 or
                    not self.current_job
                )
                if not should_create_job:
                    continue

                job = self.get_block_template()
                if not job:
                    continue

                last_job_time = time.time()
                last_block_height = current_height

                print(f"📦 New job created: {job['job_id']} (block {current_height})")

                notify_params = self._job_notify_params(job)
                # Iterate over a snapshot: client threads add/remove entries.
                for addr, miner_info in list(self.clients.items()):
                    try:
                        self.send_stratum_notification(miner_info['client'],
                                                       "mining.notify",
                                                       notify_params)
                    except Exception as e:
                        print(f"Failed to send job to {addr}: {e}")

            except Exception as e:
                print(f"Job updater error: {e}")

    def stats_updater(self):
        """Recompute and print pool statistics once a minute."""
        while self.running:
            try:
                time.sleep(60)  # Update every minute

                with self.db_lock:
                    cursor = self.db.cursor()
                    cursor.execute('''
                        SELECT COUNT(*) FROM shares
                        WHERE submitted > datetime('now', '-5 minutes')
                    ''')
                    recent_shares = cursor.fetchone()[0]

                # Rough estimate derived from shares in the last 5 minutes.
                self.pool_hashrate = recent_shares * 12

                print(f"📊 Pool Stats: {len(self.clients)} miners, {self.total_shares} shares, {self.pool_hashrate:.2f} H/s")

            except Exception as e:
                print(f"Stats updater error: {e}")

    def start(self):
        """Run the pool: check the node, start workers, accept miners.

        Blocks in the accept loop until interrupted with Ctrl-C. Returns
        early (after printing a message) when the node is unreachable or the
        Stratum port cannot be bound. The listening socket is always closed
        on exit.
        """
        try:
            blockchain_info = self.rpc_call("getblockchaininfo")
            if not blockchain_info:
                print("❌ Failed to connect to RinCoin node!")
                return

            print(f"✅ Connected to RinCoin node (block {blockchain_info.get('blocks', 'unknown')})")

            job_thread = threading.Thread(target=self.job_updater, daemon=True)
            job_thread.start()

            stats_thread = threading.Thread(target=self.stats_updater, daemon=True)
            stats_thread.start()

            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
                server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                server_socket.bind((self.stratum_host, self.stratum_port))
                server_socket.listen(10)

                print(f"🚀 Mining pool listening on {self.stratum_host}:{self.stratum_port}")
                print("Ready for multiple miners...")
                print("")
                print(f"💰 Pool address: {self.pool_address}")
                print(f"💰 Pool fee: {self.pool_fee_percent}%")
                print("")
                print("Connect miners with:")
                print(f"./cpuminer -a rinhash -o stratum+tcp://{self.stratum_host}:{self.stratum_port} -u username.workername -p x")
                print("")

                while self.running:
                    try:
                        client, addr = server_socket.accept()
                        client_thread = threading.Thread(target=self.handle_client,
                                                         args=(client, addr),
                                                         daemon=True)
                        client_thread.start()
                    except KeyboardInterrupt:
                        print("\n🛑 Shutting down pool...")
                        self.running = False
                        break
                    except Exception as e:
                        print(f"Server error: {e}")

        except OSError as e:
            # Compare the error code; the message text varies by platform.
            if e.errno == errno.EADDRINUSE or "Address already in use" in str(e):
                print(f"❌ Port {self.stratum_port} is already in use!")
                print("")
                print("🔍 Check what's using the port:")
                print(f"sudo netstat -tlnp | grep :{self.stratum_port}")
                print("")
                print("🛑 Kill existing process:")
                print(f"sudo lsof -ti:{self.stratum_port} | xargs sudo kill -9")
                print("")
                print("🔄 Or use a different port by editing the script")
            else:
                print(f"Failed to start server: {e}")
        except Exception as e:
            print(f"Failed to start server: {e}")
        finally:
            print("Pool server stopped")


if __name__ == "__main__":
    pool = RinCoinMiningPool()
    pool.start()