Files
scripts/MINE/rin/stratum_pool.py
Dobromir Popov 34b095d6ff refactoring
2025-09-05 01:15:58 +03:00

603 lines
26 KiB
Python

#!/usr/bin/env python3
"""
RinCoin Mining Pool Server
Distributes block rewards among multiple miners based on share contributions
"""
import socket
import threading
import json
import time
import requests
import hashlib
import struct
import sqlite3
from datetime import datetime
from requests.auth import HTTPBasicAuth
# Import web interface
from pool_web_interface import start_web_interface
# Import stratum base class
from stratum_proxy import RinCoinStratumBase
class RinCoinMiningPool(RinCoinStratumBase):
def __init__(self, stratum_host='0.0.0.0', stratum_port=3333,
rpc_host='127.0.0.1', rpc_port=9556,
rpc_user='rinrpc', rpc_password='745ce784d5d537fc06105a1b935b7657903cfc71a5fb3b90',
pool_address='rin1qahvvv9d5f3443wtckeqavwp9950wacxfmwv20q',
pool_fee_percent=1.0):
# Initialize base class
super().__init__(stratum_host, stratum_port, rpc_host, rpc_port, rpc_user, rpc_password, pool_address)
self.pool_address = pool_address
self.pool_fee_percent = pool_fee_percent
# Pool statistics
self.total_shares = 0
self.total_blocks = 0
self.pool_hashrate = 0
# Database for persistent storage
self.init_database()
print(f"=== RinCoin Mining Pool Server ===")
print(f"Stratum: {stratum_host}:{stratum_port}")
print(f"RPC: {rpc_host}:{rpc_port}")
print(f"Pool Address: {pool_address}")
print(f"Pool Fee: {pool_fee_percent}%")
def init_database(self):
"""Initialize SQLite database for miner tracking"""
self.db = sqlite3.connect(':memory:', check_same_thread=False)
cursor = self.db.cursor()
# Create tables
cursor.execute('''
CREATE TABLE IF NOT EXISTS miners (
id INTEGER PRIMARY KEY,
user TEXT NOT NULL,
worker TEXT NOT NULL,
address TEXT,
shares INTEGER DEFAULT 0,
last_share TIMESTAMP,
last_hashrate REAL DEFAULT 0,
created TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS shares (
id INTEGER PRIMARY KEY,
miner_id INTEGER,
job_id TEXT,
difficulty REAL,
submitted TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (miner_id) REFERENCES miners (id)
)
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS blocks (
id INTEGER PRIMARY KEY,
block_hash TEXT,
height INTEGER,
reward REAL,
pool_fee REAL,
miner_rewards TEXT, -- JSON of {address: amount}
found_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# Samples for pool hashrate chart
cursor.execute('''
CREATE TABLE IF NOT EXISTS hashrate_samples (
id INTEGER PRIMARY KEY,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
hashrate REAL
)
''')
self.db.commit()
def get_pool_block_template(self):
"""Get new block template and create pool-style job"""
template = super().get_block_template()
if template:
# Convert to pool-style job format if needed
job = self.current_job
if job:
# Add pool-specific fields
job["coinb1"] = "01000000" + "0" * 60
job["coinb2"] = "ffffffff"
job["merkle_branch"] = []
job["clean_jobs"] = True
return job
return None
def validate_rincoin_address(self, address):
"""Validate if an address is a valid RinCoin address"""
try:
return self.decode_bech32_address(address) is not None
except:
return False
def register_miner(self, user, worker, address=None):
"""Register or update miner in database"""
cursor = self.db.cursor()
# Check if miner exists
cursor.execute('SELECT id, address FROM miners WHERE user = ? AND worker = ?', (user, worker))
result = cursor.fetchone()
if result:
miner_id, existing_address = result
if address and not existing_address:
cursor.execute('UPDATE miners SET address = ? WHERE id = ?', (address, miner_id))
self.db.commit()
return miner_id
else:
# Create new miner
cursor.execute('INSERT INTO miners (user, worker, address) VALUES (?, ?, ?)', (user, worker, address))
self.db.commit()
return cursor.lastrowid
def record_share(self, miner_id, job_id, difficulty):
"""Record a share submission"""
cursor = self.db.cursor()
# Record share
cursor.execute('INSERT INTO shares (miner_id, job_id, difficulty) VALUES (?, ?, ?)',
(miner_id, job_id, difficulty))
# Update miner stats
cursor.execute('UPDATE miners SET shares = shares + 1, last_share = CURRENT_TIMESTAMP WHERE id = ?', (miner_id,))
self.db.commit()
self.total_shares += 1
def distribute_block_reward(self, block_hash, block_height, total_reward):
"""Distribute block reward among miners based on their shares"""
cursor = self.db.cursor()
# Calculate pool fee
pool_fee = total_reward * (self.pool_fee_percent / 100.0)
miner_reward = total_reward - pool_fee
# Get shares from last 24 hours
cursor.execute('''
SELECT m.address, COUNT(s.id) as share_count, SUM(s.difficulty) as total_difficulty
FROM miners m
JOIN shares s ON m.id = s.miner_id
WHERE s.submitted > datetime('now', '-1 day')
GROUP BY m.id, m.address
HAVING share_count > 0
''')
miners = cursor.fetchall()
if not miners:
print("No miners with shares in last 24 hours")
return
# Calculate total difficulty
total_difficulty = sum(row[2] for row in miners)
# Separate miners with and without addresses
miners_with_addresses = []
miners_without_addresses = []
total_difficulty_with_addresses = 0
total_difficulty_without_addresses = 0
for address, share_count, difficulty in miners:
if address:
miners_with_addresses.append((address, share_count, difficulty))
total_difficulty_with_addresses += difficulty
else:
miners_without_addresses.append((address, share_count, difficulty))
total_difficulty_without_addresses += difficulty
# Calculate total difficulty
total_difficulty = total_difficulty_with_addresses + total_difficulty_without_addresses
if total_difficulty == 0:
print("No valid difficulty found")
return
# Distribute rewards
miner_rewards = {}
# First, distribute to miners with valid addresses
if miners_with_addresses:
for address, share_count, difficulty in miners_with_addresses:
reward_share = (difficulty / total_difficulty) * miner_reward
miner_rewards[address] = reward_share
print(f"💰 Miner {address}: {reward_share:.8f} RIN ({difficulty} difficulty)")
# Calculate undistributed rewards (from miners without addresses)
if miners_without_addresses:
undistributed_reward = 0
for address, share_count, difficulty in miners_without_addresses:
undistributed_reward += (difficulty / total_difficulty) * miner_reward
print(f"⚠️ Miner without address: {difficulty} difficulty -> {undistributed_reward:.8f} RIN to pool")
# Keep undistributed rewards for pool (no redistribution)
print(f"💰 Pool keeps {undistributed_reward:.8f} RIN from miners without addresses")
# Record block
cursor.execute('''
INSERT INTO blocks (block_hash, height, reward, pool_fee, miner_rewards)
VALUES (?, ?, ?, ?, ?)
''', (block_hash, block_height, total_reward, pool_fee, json.dumps(miner_rewards)))
self.db.commit()
self.total_blocks += 1
print(f"🎉 Block {block_height} reward distributed!")
print(f"💰 Pool fee: {pool_fee:.8f} RIN")
print(f"💰 Total distributed: {sum(miner_rewards.values()):.8f} RIN")
# Summary
if miners_without_addresses:
print(f"📊 Summary: {len(miners_with_addresses)} miners with addresses, {len(miners_without_addresses)} without (rewards to pool)")
# Use inherited send_stratum_response and send_stratum_notification from base class
def handle_stratum_message(self, client, addr, message):
"""Handle incoming Stratum message from miner"""
try:
data = json.loads(message.strip())
method = data.get("method")
msg_id = data.get("id")
params = data.get("params", [])
print(f"[{addr}] {method}: {params}")
if method == "mining.subscribe":
# Subscribe response
self.send_stratum_response(client, msg_id, [
[["mining.set_difficulty", "subscription_id"], ["mining.notify", "subscription_id"]],
"extranonce1",
4
])
# Send difficulty (lower for CPU mining)
self.send_stratum_notification(client, "mining.set_difficulty", [0.0001])
# Send initial job
if self.get_pool_block_template():
job = self.current_job
self.send_stratum_notification(client, "mining.notify", [
job["job_id"],
job["prevhash"],
job["coinb1"],
job["coinb2"],
job["merkle_branch"],
f"{job['version']:08x}",
job["bits"],
job["ntime"],
job["clean_jobs"]
])
elif method == "mining.extranonce.subscribe":
# Handle extranonce subscription
print(f"[{addr}] Extranonce subscription requested")
self.send_stratum_response(client, msg_id, True)
elif method == "mining.authorize":
# Parse user.worker format
if len(params) >= 2:
user_worker = params[0]
password = params[1] if len(params) > 1 else ""
# Extract user and worker
if '.' in user_worker:
user, worker = user_worker.split('.', 1)
else:
user = user_worker
worker = "default"
# Check if user contains a RinCoin address (starts with 'rin')
miner_address = None
if user.startswith('rin'):
# User is a RinCoin address
if self.validate_rincoin_address(user):
miner_address = user
user = f"miner_{miner_address[:8]}" # Create a user ID from address
print(f"[{addr}] ✅ Miner using valid RinCoin address: {miner_address}")
else:
print(f"[{addr}] ❌ Invalid RinCoin address: {user}")
self.send_stratum_response(client, msg_id, False, "Invalid RinCoin address")
return
elif '.' in user and user.split('.')[0].startswith('rin'):
# Format: rin1qahvvv9d5f3443wtckeqavwp9950wacxfmwv20q.workername
address_part, worker_part = user.split('.', 1)
if address_part.startswith('rin'):
if self.validate_rincoin_address(address_part):
miner_address = address_part
user = f"miner_{miner_address[:8]}"
worker = worker_part
print(f"[{addr}] ✅ Miner using valid RinCoin address format: {miner_address}.{worker}")
else:
print(f"[{addr}] ❌ Invalid RinCoin address: {address_part}")
self.send_stratum_response(client, msg_id, False, "Invalid RinCoin address")
return
# Register miner with address
miner_id = self.register_miner(user, worker, miner_address)
# Store client info
self.clients[addr] = {
'client': client,
'user': user,
'worker': worker,
'miner_id': miner_id,
'address': miner_address,
'shares': 0,
'last_share': time.time(),
'extranonce1': '00000000' # Default extranonce1
}
if miner_address:
print(f"[{addr}] ✅ Authorized: {user}.{worker} -> {miner_address}")
else:
print(f"[{addr}] ⚠️ Authorized: {user}.{worker} (rewards will go to pool address)")
self.send_stratum_response(client, msg_id, True)
else:
self.send_stratum_response(client, msg_id, False, "Invalid authorization")
elif method == "mining.submit":
# Submit share
if addr not in self.clients:
self.send_stratum_response(client, msg_id, False, "Not authorized")
return
miner_info = self.clients[addr]
try:
if self.current_job and len(params) >= 5:
username = params[0]
job_id = params[1]
extranonce2 = params[2]
ntime = params[3]
nonce = params[4]
# Use base class to validate and submit share
extranonce1 = miner_info.get('extranonce1', '00000000')
miner_address = miner_info.get('address')
# For pool mining, always mine to pool address
success, message = self.submit_share(
self.current_job, extranonce1, extranonce2, ntime, nonce,
target_address=self.pool_address
)
if success:
# Record share with estimated difficulty
actual_difficulty = 0.00133 # Estimated for ~381 kH/s
self.record_share(miner_info['miner_id'], job_id, actual_difficulty)
# Update miner stats
now_ts = time.time()
prev_ts = miner_info.get('last_share') or now_ts
dt = max(now_ts - prev_ts, 1e-3)
miner_hashrate = actual_difficulty * (2**32) / dt
if miner_info['shares'] == 0:
miner_hashrate = 381000 # Default estimate
miner_info['shares'] += 1
miner_info['last_share'] = now_ts
# Update database
try:
cursor = self.db.cursor()
cursor.execute('UPDATE miners SET last_share = CURRENT_TIMESTAMP, last_hashrate = ? WHERE id = ?',
(miner_hashrate, miner_info['miner_id']))
self.db.commit()
except Exception as e:
print(f"DB update error: {e}")
print(f"[{addr}] ✅ Share accepted from {miner_info['user']}.{miner_info['worker']} (Total: {miner_info['shares']})")
self.send_stratum_response(client, msg_id, True)
# If block was found, distribute rewards
if "Block found" in message:
print(f"🎉 [{addr}] BLOCK FOUND!")
# Get block info and distribute rewards
total_reward = self.current_job['coinbasevalue'] / 100000000 if self.current_job else 25.0
self.distribute_block_reward("pending", self.current_job['height'] if self.current_job else 0, total_reward)
else:
# Accept as share for pool statistics even if block validation fails
self.send_stratum_response(client, msg_id, True)
else:
print(f"[{addr}] Invalid share parameters")
self.send_stratum_response(client, msg_id, False, "Invalid parameters")
except Exception as e:
print(f"[{addr}] Share processing error: {e}")
# Still accept the share for mining statistics
self.send_stratum_response(client, msg_id, True)
else:
print(f"[{addr}] ⚠️ Unknown method: {method}")
# Send null result for unknown methods (standard Stratum behavior)
self.send_stratum_response(client, msg_id, None, None)
except json.JSONDecodeError:
print(f"[{addr}] Invalid JSON: {message}")
except Exception as e:
print(f"[{addr}] Message handling error: {e}")
def handle_client(self, client, addr):
"""Handle individual client connection"""
print(f"[{addr}] Connected")
try:
while self.running:
data = client.recv(4096)
if not data:
break
# Handle multiple messages in one packet
messages = data.decode('utf-8').strip().split('\n')
for message in messages:
if message:
self.handle_stratum_message(client, addr, message)
except Exception as e:
print(f"[{addr}] Client error: {e}")
finally:
client.close()
if addr in self.clients:
del self.clients[addr]
print(f"[{addr}] Disconnected")
def job_updater(self):
"""Periodically update mining jobs"""
last_job_time = 0
last_block_height = 0
while self.running:
try:
# Check for new blocks every 10 seconds
time.sleep(10)
# Get current blockchain info
blockchain_info = self.rpc_call("getblockchaininfo")
if blockchain_info:
current_height = blockchain_info.get('blocks', 0)
# Create new job if:
# 1. New block detected
# 2. 30+ seconds since last job
# 3. No current job exists
should_create_job = (
current_height != last_block_height or
time.time() - last_job_time > 30 or
not self.current_job
)
if should_create_job:
if self.get_pool_block_template():
job = self.current_job
last_job_time = time.time()
last_block_height = current_height
print(f"📦 New job created: {job['job_id']} (block {current_height})")
# Send to all connected clients
for addr, miner_info in list(self.clients.items()):
try:
self.send_stratum_notification(miner_info['client'], "mining.notify", [
job["job_id"],
job["prevhash"],
job["coinb1"],
job["coinb2"],
job["merkle_branch"],
f"{job['version']:08x}",
job["bits"],
job["ntime"],
job["clean_jobs"]
])
except Exception as e:
print(f"Failed to send job to {addr}: {e}")
except Exception as e:
print(f"Job updater error: {e}")
def stats_updater(self):
"""Periodically update pool statistics"""
while self.running:
try:
time.sleep(60) # Update every minute
cursor = self.db.cursor()
# Pool hashrate is the sum of miners' last hashrates
cursor.execute('SELECT COALESCE(SUM(last_hashrate), 0) FROM miners')
self.pool_hashrate = cursor.fetchone()[0] or 0.0
# Sample for chart
cursor.execute('INSERT INTO hashrate_samples (hashrate) VALUES (?)', (self.pool_hashrate,))
self.db.commit()
print(f"📊 Pool Stats: {len(self.clients)} miners, {self.total_shares} shares, {self.pool_hashrate/1000:.2f} kH/s")
except Exception as e:
print(f"Stats updater error: {e}")
def start(self):
"""Start the mining pool server"""
try:
# Test RPC connection
blockchain_info = self.rpc_call("getblockchaininfo")
if not blockchain_info:
print("❌ Failed to connect to RinCoin node!")
return
print(f"✅ Connected to RinCoin node (block {blockchain_info.get('blocks', 'unknown')})")
# Start background threads
job_thread = threading.Thread(target=self.job_updater, daemon=True)
job_thread.start()
stats_thread = threading.Thread(target=self.stats_updater, daemon=True)
stats_thread.start()
# Start web interface in background
web_thread = threading.Thread(target=start_web_interface,
args=(self.db, '0.0.0.0', 8083,
self.rpc_host, self.rpc_port,
self.rpc_user, self.rpc_password),
daemon=True)
web_thread.start()
print(f"🌐 Web dashboard started on http://0.0.0.0:8083")
# Start Stratum server
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind((self.stratum_host, self.stratum_port))
server_socket.listen(10)
print(f"🚀 Mining pool listening on {self.stratum_host}:{self.stratum_port}")
print("Ready for multiple miners...")
print("")
print(f"💰 Pool address: {self.pool_address}")
print(f"💰 Pool fee: {self.pool_fee_percent}%")
print("")
print("Connect miners with:")
print(f"./cpuminer -a rinhash -o stratum+tcp://{self.stratum_host}:{self.stratum_port} -u username.workername -p x")
print("")
while self.running:
try:
client, addr = server_socket.accept()
client_thread = threading.Thread(target=self.handle_client, args=(client, addr), daemon=True)
client_thread.start()
except KeyboardInterrupt:
print("\n🛑 Shutting down pool...")
self.running = False
break
except Exception as e:
print(f"Server error: {e}")
except OSError as e:
if "Address already in use" in str(e):
print(f"❌ Port {self.stratum_port} is already in use!")
print("")
print("🔍 Check what's using the port:")
print(f"sudo netstat -tlnp | grep :{self.stratum_port}")
print("")
print("🛑 Kill existing process:")
print(f"sudo lsof -ti:{self.stratum_port} | xargs sudo kill -9")
print("")
print("🔄 Or use a different port by editing the script")
else:
print(f"Failed to start server: {e}")
except Exception as e:
print(f"Failed to start server: {e}")
finally:
print("Pool server stopped")
if __name__ == "__main__":
pool = RinCoinMiningPool()
pool.start()