#!/usr/bin/env python3
"""
RinCoin Mining Pool Server
Distributes block rewards among multiple miners based on share contributions
"""

import errno
import socket
import threading
import json
import time
import requests
import hashlib
import struct
import sqlite3
from datetime import datetime
from requests.auth import HTTPBasicAuth

# Import web interface
from pool_web_interface import start_web_interface

# Import stratum base class
from stratum_proxy import RinCoinStratumBase


class RinCoinMiningPool(RinCoinStratumBase):
    """Stratum pool server that tracks shares per miner and splits block rewards.

    Networking/RPC primitives (``rpc_call``, ``submit_share``,
    ``get_block_template``, ``send_stratum_response``,
    ``send_stratum_notification``, ``decode_bech32_address``) and the
    ``clients``, ``current_job`` and ``running`` attributes are inherited from
    ``RinCoinStratumBase``.
    """

    # Extranonce1 advertised in mining.subscribe. It must be the same value
    # that is later handed to submit_share(), otherwise the miner and the pool
    # hash different coinbases.
    DEFAULT_EXTRANONCE1 = '00000000'
    # Extranonce2 size (bytes) advertised in mining.subscribe.
    EXTRANONCE2_SIZE = 4
    # Difficulty sent to miners (lower for CPU mining).
    SHARE_DIFFICULTY = 0.0001
    # Difficulty credited per accepted share (estimated for ~381 kH/s).
    ESTIMATED_SHARE_DIFFICULTY = 0.00133
    # Hashrate reported for a miner's first share, when no interval exists yet.
    DEFAULT_MINER_HASHRATE = 381000
    # Upper bound for one unterminated Stratum line held in memory per client.
    MAX_LINE_BYTES = 65536

    def __init__(self, stratum_host='0.0.0.0', stratum_port=3333,
                 rpc_host='127.0.0.1', rpc_port=9556,
                 rpc_user='rinrpc', rpc_password='745ce784d5d537fc06105a1b935b7657903cfc71a5fb3b90',
                 pool_address='rin1qahvvv9d5f3443wtckeqavwp9950wacxfmwv20q',
                 pool_fee_percent=1.0):
        """Configure the pool, create the share database and print a banner.

        Args:
            stratum_host: Interface the Stratum server binds to.
            stratum_port: TCP port the Stratum server binds to.
            rpc_host: Host of the RinCoin node RPC endpoint.
            rpc_port: Port of the RinCoin node RPC endpoint.
            rpc_user: RPC user name.
            rpc_password: RPC password.
            pool_address: Address that shares are mined to.
            pool_fee_percent: Percentage of each block reward kept as pool fee.
        """
        # NOTE(review): the RPC credential default is committed in source; it
        # is kept only so existing callers keep working. Prefer passing
        # rpc_user/rpc_password explicitly.

        # Initialize base class
        super().__init__(stratum_host, stratum_port, rpc_host, rpc_port, rpc_user, rpc_password, pool_address)

        self.pool_address = pool_address
        self.pool_fee_percent = pool_fee_percent

        # Pool statistics
        self.total_shares = 0
        self.total_blocks = 0
        self.pool_hashrate = 0

        # Database for persistent storage
        self.init_database()

        print("=== RinCoin Mining Pool Server ===")
        print(f"Stratum: {stratum_host}:{stratum_port}")
        print(f"RPC: {rpc_host}:{rpc_port}")
        print(f"Pool Address: {pool_address}")
        print(f"Pool Fee: {pool_fee_percent}%")

    def init_database(self):
        """Initialize SQLite database for miner tracking.

        Creates an in-memory database with the ``miners``, ``shares``,
        ``blocks`` and ``hashrate_samples`` tables, plus the lock that
        serializes this class's access to the shared connection.
        """
        # One connection is shared by every client thread and the stats
        # thread; check_same_thread=False only disables sqlite3's guard, so
        # statements and commits are serialized with this lock.
        # NOTE(review): start() hands the same connection to the web
        # interface, which cannot take this lock from here - confirm how it
        # accesses the database.
        self._db_lock = threading.RLock()
        self.db = sqlite3.connect(':memory:', check_same_thread=False)
        cursor = self.db.cursor()

        # Create tables
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS miners (
                id INTEGER PRIMARY KEY,
                user TEXT NOT NULL,
                worker TEXT NOT NULL,
                address TEXT,
                shares INTEGER DEFAULT 0,
                last_share TIMESTAMP,
                last_hashrate REAL DEFAULT 0,
                created TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')

        cursor.execute('''
            CREATE TABLE IF NOT EXISTS shares (
                id INTEGER PRIMARY KEY,
                miner_id INTEGER,
                job_id TEXT,
                difficulty REAL,
                submitted TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (miner_id) REFERENCES miners (id)
            )
        ''')

        cursor.execute('''
            CREATE TABLE IF NOT EXISTS blocks (
                id INTEGER PRIMARY KEY,
                block_hash TEXT,
                height INTEGER,
                reward REAL,
                pool_fee REAL,
                miner_rewards TEXT, -- JSON of {address: amount}
                found_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')

        # Samples for pool hashrate chart
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS hashrate_samples (
                id INTEGER PRIMARY KEY,
                ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                hashrate REAL
            )
        ''')

        self.db.commit()

    def get_pool_block_template(self):
        """Get new block template and create pool-style job.

        Returns:
            The current job dict with the pool-specific fields added, or
            ``None`` when no template or job is available.
        """
        if not super().get_block_template():
            return None

        job = self.current_job
        if not job:
            return None

        # Add pool-specific fields
        job["coinb1"] = "01000000" + "0" * 60
        job["coinb2"] = "ffffffff"
        job["merkle_branch"] = []
        job["clean_jobs"] = True
        return job

    def validate_rincoin_address(self, address):
        """Validate if an address is a valid RinCoin address.

        Returns:
            ``True`` when the inherited bech32 decoder returns a result,
            ``False`` when it returns ``None`` or raises.
        """
        try:
            return self.decode_bech32_address(address) is not None
        except Exception:
            # Any decoder failure means the address is unusable for payouts.
            return False

    def register_miner(self, user, worker, address=None):
        """Register or update miner in database.

        Args:
            user: Miner user name.
            worker: Worker name.
            address: Optional payout address; stored for an existing miner
                only when that miner has no address yet.

        Returns:
            The ``miners.id`` of the existing or newly created row.
        """
        # The lock makes the lookup and the insert one atomic step, so two
        # connections authorizing the same user.worker cannot both insert.
        with self._db_lock:
            cursor = self.db.cursor()

            # Check if miner exists
            cursor.execute('SELECT id, address FROM miners WHERE user = ? AND worker = ?', (user, worker))
            result = cursor.fetchone()

            if result:
                miner_id, existing_address = result
                if address and not existing_address:
                    cursor.execute('UPDATE miners SET address = ? WHERE id = ?', (address, miner_id))
                    self.db.commit()
                return miner_id

            # Create new miner
            cursor.execute('INSERT INTO miners (user, worker, address) VALUES (?, ?, ?)', (user, worker, address))
            self.db.commit()
            return cursor.lastrowid

    def record_share(self, miner_id, job_id, difficulty):
        """Record a share submission and bump the miner and pool counters."""
        with self._db_lock:
            cursor = self.db.cursor()

            # Record share
            cursor.execute('INSERT INTO shares (miner_id, job_id, difficulty) VALUES (?, ?, ?)',
                           (miner_id, job_id, difficulty))

            # Update miner stats
            cursor.execute('UPDATE miners SET shares = shares + 1, last_share = CURRENT_TIMESTAMP WHERE id = ?', (miner_id,))

            self.db.commit()
            # Incremented under the lock: "+=" is not atomic across threads.
            self.total_shares += 1

    def distribute_block_reward(self, block_hash, block_height, total_reward):
        """Distribute block reward among miners based on their shares.

        The pool fee is deducted first. The remainder is split in proportion
        to each miner's summed share difficulty over the last 24 hours. The
        portion earned by miners without a payout address stays with the pool.
        The result is stored in the ``blocks`` table.

        Args:
            block_hash: Hash recorded for the block.
            block_height: Height recorded for the block.
            total_reward: Full block reward in RIN.
        """
        # Calculate pool fee
        pool_fee = total_reward * (self.pool_fee_percent / 100.0)
        miner_reward = total_reward - pool_fee

        # Get shares from last 24 hours
        with self._db_lock:
            cursor = self.db.cursor()
            cursor.execute('''
                SELECT m.address, COUNT(s.id) as share_count, SUM(s.difficulty) as total_difficulty
                FROM miners m
                JOIN shares s ON m.id = s.miner_id
                WHERE s.submitted > datetime('now', '-1 day')
                GROUP BY m.id, m.address
                HAVING share_count > 0
            ''')
            rows = cursor.fetchall()

        if not rows:
            print("No miners with shares in last 24 hours")
            return

        # SUM() yields NULL when every difficulty in a group is NULL.
        miners = [(address, share_count, difficulty or 0)
                  for address, share_count, difficulty in rows]

        total_difficulty = sum(difficulty for _, _, difficulty in miners)
        if total_difficulty == 0:
            print("No valid difficulty found")
            return

        # Separate miners with and without addresses
        miners_with_addresses = [row for row in miners if row[0]]
        miners_without_addresses = [row for row in miners if not row[0]]

        # First, distribute to miners with valid addresses
        miner_rewards = {}
        for address, _share_count, difficulty in miners_with_addresses:
            reward_share = (difficulty / total_difficulty) * miner_reward
            # Rows are grouped per miner, so several workers may share one
            # payout address: accumulate instead of overwriting.
            miner_rewards[address] = miner_rewards.get(address, 0.0) + reward_share
            print(f"💰 Miner {address}: {reward_share:.8f} RIN ({difficulty} difficulty)")

        # Calculate undistributed rewards (from miners without addresses)
        if miners_without_addresses:
            undistributed_reward = 0.0
            for _address, _share_count, difficulty in miners_without_addresses:
                forfeited = (difficulty / total_difficulty) * miner_reward
                undistributed_reward += forfeited
                print(f"⚠️ Miner without address: {difficulty} difficulty -> {forfeited:.8f} RIN to pool")

            # Keep undistributed rewards for pool (no redistribution)
            print(f"💰 Pool keeps {undistributed_reward:.8f} RIN from miners without addresses")

        # Record block
        with self._db_lock:
            cursor = self.db.cursor()
            cursor.execute('''
                INSERT INTO blocks (block_hash, height, reward, pool_fee, miner_rewards)
                VALUES (?, ?, ?, ?, ?)
            ''', (block_hash, block_height, total_reward, pool_fee, json.dumps(miner_rewards)))
            self.db.commit()
            self.total_blocks += 1

        print(f"🎉 Block {block_height} reward distributed!")
        print(f"💰 Pool fee: {pool_fee:.8f} RIN")
        print(f"💰 Total distributed: {sum(miner_rewards.values()):.8f} RIN")

        # Summary
        if miners_without_addresses:
            print(f"📊 Summary: {len(miners_with_addresses)} miners with addresses, {len(miners_without_addresses)} without (rewards to pool)")

    # Use inherited send_stratum_response and send_stratum_notification from base class

    @staticmethod
    def _job_notify_params(job):
        """Build the mining.notify parameter list for *job*."""
        return [
            job["job_id"],
            job["prevhash"],
            job["coinb1"],
            job["coinb2"],
            job["merkle_branch"],
            f"{job['version']:08x}",
            job["bits"],
            job["ntime"],
            job["clean_jobs"]
        ]

    def _handle_subscribe(self, client, msg_id):
        """Answer mining.subscribe, then push the difficulty and an initial job."""
        # Subscribe response
        self.send_stratum_response(client, msg_id, [
            [["mining.set_difficulty", "subscription_id"], ["mining.notify", "subscription_id"]],
            self.DEFAULT_EXTRANONCE1,
            self.EXTRANONCE2_SIZE
        ])

        # Send difficulty (lower for CPU mining)
        self.send_stratum_notification(client, "mining.set_difficulty", [self.SHARE_DIFFICULTY])

        # Send initial job
        job = self.get_pool_block_template()
        if job:
            self.send_stratum_notification(client, "mining.notify", self._job_notify_params(job))

    def _handle_authorize(self, client, addr, msg_id, params):
        """Handle mining.authorize: parse ``user.worker`` and register the miner.

        A user name starting with ``rin`` is treated as a payout address and
        must validate; otherwise the miner is authorized without an address.
        """
        if len(params) < 2 or not isinstance(params[0], str):
            self.send_stratum_response(client, msg_id, False, "Invalid authorization")
            return

        # Parse user.worker format (split on the first dot only)
        user, separator, worker = params[0].partition('.')
        if not separator:
            worker = "default"

        # Check if user contains a RinCoin address (starts with 'rin')
        miner_address = None
        if user.startswith('rin'):
            if not self.validate_rincoin_address(user):
                print(f"[{addr}] ❌ Invalid RinCoin address: {user}")
                self.send_stratum_response(client, msg_id, False, "Invalid RinCoin address")
                return

            miner_address = user
            # Create a user ID from address
            # NOTE(review): only the first 8 characters are used, so two
            # addresses sharing that prefix with the same worker name map to
            # the same miners row - confirm whether that is acceptable.
            user = f"miner_{miner_address[:8]}"
            print(f"[{addr}] ✅ Miner using valid RinCoin address: {miner_address}")

        # Register miner with address
        miner_id = self.register_miner(user, worker, miner_address)

        # Store client info
        self.clients[addr] = {
            'client': client,
            'user': user,
            'worker': worker,
            'miner_id': miner_id,
            'address': miner_address,
            'shares': 0,
            'last_share': time.time(),
            'extranonce1': self.DEFAULT_EXTRANONCE1
        }

        if miner_address:
            print(f"[{addr}] ✅ Authorized: {user}.{worker} -> {miner_address}")
        else:
            print(f"[{addr}] ⚠️ Authorized: {user}.{worker} (rewards will go to pool address)")
        self.send_stratum_response(client, msg_id, True)

    def _handle_submit(self, client, addr, msg_id, params):
        """Handle mining.submit: validate the share, record it, pay out on a block.

        Exactly one response is sent per request. Failed validation and
        processing errors are still acknowledged with ``True``.
        """
        miner_info = self.clients.get(addr)
        if miner_info is None:
            self.send_stratum_response(client, msg_id, False, "Not authorized")
            return

        # Snapshot the job once: the job updater thread may replace
        # self.current_job while this share is being processed.
        job = self.current_job
        responded = False
        try:
            if not job or len(params) < 5:
                print(f"[{addr}] Invalid share parameters")
                responded = True
                self.send_stratum_response(client, msg_id, False, "Invalid parameters")
                return

            _username, job_id, extranonce2, ntime, nonce = params[:5]

            # Use base class to validate and submit share
            extranonce1 = miner_info.get('extranonce1', self.DEFAULT_EXTRANONCE1)

            # For pool mining, always mine to pool address
            success, message = self.submit_share(
                job, extranonce1, extranonce2, ntime, nonce,
                target_address=self.pool_address
            )

            if not success:
                # Accept as share for pool statistics even if block validation fails
                responded = True
                self.send_stratum_response(client, msg_id, True)
                return

            # Record share with estimated difficulty
            actual_difficulty = self.ESTIMATED_SHARE_DIFFICULTY
            self.record_share(miner_info['miner_id'], job_id, actual_difficulty)

            # Update miner stats
            now_ts = time.time()
            prev_ts = miner_info.get('last_share') or now_ts
            dt = max(now_ts - prev_ts, 1e-3)  # avoid division by zero
            miner_hashrate = actual_difficulty * (2**32) / dt
            if miner_info['shares'] == 0:
                miner_hashrate = self.DEFAULT_MINER_HASHRATE  # Default estimate
            miner_info['shares'] += 1
            miner_info['last_share'] = now_ts

            # Update database
            try:
                with self._db_lock:
                    cursor = self.db.cursor()
                    cursor.execute('UPDATE miners SET last_share = CURRENT_TIMESTAMP, last_hashrate = ? WHERE id = ?',
                                   (miner_hashrate, miner_info['miner_id']))
                    self.db.commit()
            except sqlite3.Error as e:
                print(f"DB update error: {e}")

            print(f"[{addr}] ✅ Share accepted from {miner_info['user']}.{miner_info['worker']} (Total: {miner_info['shares']})")
            responded = True
            self.send_stratum_response(client, msg_id, True)

            # If block was found, distribute rewards
            if message and "Block found" in message:
                print(f"🎉 [{addr}] BLOCK FOUND!")
                # Get block info and distribute rewards
                total_reward = job['coinbasevalue'] / 100000000
                self.distribute_block_reward("pending", job['height'], total_reward)
        except Exception as e:
            print(f"[{addr}] Share processing error: {e}")
            # Still accept the share for mining statistics, unless this
            # request has already been answered.
            if not responded:
                self.send_stratum_response(client, msg_id, True)

    def handle_stratum_message(self, client, addr, message):
        """Handle incoming Stratum message from miner.

        Args:
            client: Connected client socket.
            addr: Client address, used as the key into ``self.clients``.
            message: One JSON-encoded Stratum request.
        """
        try:
            data = json.loads(message.strip())
            method = data.get("method")
            msg_id = data.get("id")
            params = data.get("params") or []

            print(f"[{addr}] {method}: {params}")

            if method == "mining.subscribe":
                self._handle_subscribe(client, msg_id)
            elif method == "mining.extranonce.subscribe":
                # Handle extranonce subscription
                print(f"[{addr}] Extranonce subscription requested")
                self.send_stratum_response(client, msg_id, True)
            elif method == "mining.authorize":
                self._handle_authorize(client, addr, msg_id, params)
            elif method == "mining.submit":
                self._handle_submit(client, addr, msg_id, params)
            else:
                print(f"[{addr}] ⚠️ Unknown method: {method}")
                # Send null result for unknown methods (standard Stratum behavior)
                self.send_stratum_response(client, msg_id, None, None)

        except json.JSONDecodeError:
            print(f"[{addr}] Invalid JSON: {message}")
        except Exception as e:
            print(f"[{addr}] Message handling error: {e}")

    def handle_client(self, client, addr):
        """Handle individual client connection.

        Reads newline-delimited Stratum messages until the peer disconnects
        or the pool stops, then closes the socket and forgets the client.
        """
        print(f"[{addr}] Connected")
        # TCP is a byte stream: one recv() may carry several messages or only
        # part of one, so unterminated bytes are carried over between reads.
        buffer = b""

        try:
            while self.running:
                data = client.recv(4096)
                if not data:
                    break

                buffer += data
                *lines, buffer = buffer.split(b"\n")
                for raw_line in lines:
                    message = raw_line.decode('utf-8', errors='replace').strip()
                    if message:
                        self.handle_stratum_message(client, addr, message)

                # Clients that omit the trailing newline were served before:
                # dispatch the remainder as soon as it is a complete JSON
                # document (a truncated JSON object never parses).
                if buffer.strip():
                    try:
                        json.loads(buffer.decode('utf-8'))
                    except ValueError:
                        pass  # incomplete fragment - wait for more data
                    else:
                        self.handle_stratum_message(client, addr, buffer.decode('utf-8').strip())
                        buffer = b""

                if len(buffer) > self.MAX_LINE_BYTES:
                    print(f"[{addr}] Client error: message exceeds {self.MAX_LINE_BYTES} bytes")
                    break

        except Exception as e:
            print(f"[{addr}] Client error: {e}")
        finally:
            client.close()
            self.clients.pop(addr, None)
            print(f"[{addr}] Disconnected")

    def _broadcast_job(self, job):
        """Send *job* to every connected client; one failure does not stop the rest."""
        notify_params = self._job_notify_params(job)
        # Iterate over a snapshot: client threads add/remove entries concurrently.
        for addr, miner_info in list(self.clients.items()):
            try:
                self.send_stratum_notification(miner_info['client'], "mining.notify", notify_params)
            except Exception as e:
                print(f"Failed to send job to {addr}: {e}")

    def job_updater(self):
        """Periodically update mining jobs.

        Every 10 seconds the chain height is polled. A new job is created and
        broadcast when the height changed, the last job is older than 30
        seconds, or no job exists.
        """
        last_job_time = 0
        last_block_height = 0

        while self.running:
            try:
                # Check for new blocks every 10 seconds
                time.sleep(10)

                # Get current blockchain info
                blockchain_info = self.rpc_call("getblockchaininfo")
                if not blockchain_info:
                    continue
                current_height = blockchain_info.get('blocks', 0)

                # Create new job if:
                # 1. New block detected
                # 2. 30+ seconds since last job
                # 3. No current job exists
                should_create_job = (
                    current_height != last_block_height or
                    time.time() - last_job_time > 30 or
                    not self.current_job
                )
                if not should_create_job:
                    continue

                job = self.get_pool_block_template()
                if not job:
                    continue

                last_job_time = time.time()
                last_block_height = current_height
                print(f"📦 New job created: {job['job_id']} (block {current_height})")

                # Send to all connected clients
                self._broadcast_job(job)

            except Exception as e:
                print(f"Job updater error: {e}")

    def stats_updater(self):
        """Periodically update pool statistics.

        Once a minute the pool hashrate is recomputed as the sum of every
        miner's last hashrate and a sample is stored for the chart.
        """
        while self.running:
            try:
                time.sleep(60)  # Update every minute

                with self._db_lock:
                    cursor = self.db.cursor()
                    # Pool hashrate is the sum of miners' last hashrates
                    cursor.execute('SELECT COALESCE(SUM(last_hashrate), 0) FROM miners')
                    self.pool_hashrate = cursor.fetchone()[0] or 0.0
                    # Sample for chart
                    cursor.execute('INSERT INTO hashrate_samples (hashrate) VALUES (?)', (self.pool_hashrate,))
                    self.db.commit()

                print(f"📊 Pool Stats: {len(self.clients)} miners, {self.total_shares} shares, {self.pool_hashrate/1000:.2f} kH/s")

            except Exception as e:
                print(f"Stats updater error: {e}")

    def start(self):
        """Start the mining pool server.

        Verifies the node RPC connection, starts the job, stats and web
        threads, then accepts Stratum connections until ``self.running`` is
        cleared or Ctrl+C is pressed.
        """
        try:
            # Test RPC connection
            blockchain_info = self.rpc_call("getblockchaininfo")
            if not blockchain_info:
                print("❌ Failed to connect to RinCoin node!")
                return

            print(f"✅ Connected to RinCoin node (block {blockchain_info.get('blocks', 'unknown')})")

            # Start background threads
            job_thread = threading.Thread(target=self.job_updater, daemon=True)
            job_thread.start()

            stats_thread = threading.Thread(target=self.stats_updater, daemon=True)
            stats_thread.start()

            # Start web interface in background
            web_thread = threading.Thread(target=start_web_interface,
                                          args=(self.db, '0.0.0.0', 8083,
                                                self.rpc_host, self.rpc_port,
                                                self.rpc_user, self.rpc_password),
                                          daemon=True)
            web_thread.start()

            print(f"🌐 Web dashboard started on http://0.0.0.0:8083")

            # Start Stratum server; the context manager closes the listening
            # socket on every exit path.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
                server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                server_socket.bind((self.stratum_host, self.stratum_port))
                server_socket.listen(10)

                print(f"🚀 Mining pool listening on {self.stratum_host}:{self.stratum_port}")
                print("Ready for multiple miners...")
                print("")
                print(f"💰 Pool address: {self.pool_address}")
                print(f"💰 Pool fee: {self.pool_fee_percent}%")
                print("")
                print("Connect miners with:")
                print(f"./cpuminer -a rinhash -o stratum+tcp://{self.stratum_host}:{self.stratum_port} -u username.workername -p x")
                print("")

                while self.running:
                    try:
                        client, addr = server_socket.accept()
                        client_thread = threading.Thread(target=self.handle_client, args=(client, addr), daemon=True)
                        client_thread.start()
                    except KeyboardInterrupt:
                        print("\n🛑 Shutting down pool...")
                        self.running = False
                        break
                    except Exception as e:
                        print(f"Server error: {e}")

        except OSError as e:
            # Compare the error code: the message text depends on platform and locale.
            if e.errno == errno.EADDRINUSE:
                print(f"❌ Port {self.stratum_port} is already in use!")
                print("")
                print("🔍 Check what's using the port:")
                print(f"sudo netstat -tlnp | grep :{self.stratum_port}")
                print("")
                print("🛑 Kill existing process:")
                print(f"sudo lsof -ti:{self.stratum_port} | xargs sudo kill -9")
                print("")
                print("🔄 Or use a different port by editing the script")
            else:
                print(f"Failed to start server: {e}")
        except Exception as e:
            print(f"Failed to start server: {e}")
        finally:
            print("Pool server stopped")


if __name__ == "__main__":
    pool = RinCoinMiningPool()
    pool.start()