Files
scripts/MINE/rin/stratum_pool.py
2025-09-02 13:50:08 +03:00

710 lines
31 KiB
Python

#!/usr/bin/env python3
"""
RinCoin Mining Pool Server
Distributes block rewards among multiple miners based on share contributions
"""
import socket
import threading
import json
import time
import requests
import hashlib
import struct
import sqlite3
from datetime import datetime
from requests.auth import HTTPBasicAuth
# Import web interface
from pool_web_interface import start_web_interface
class RinCoinMiningPool:
    """Stratum front-end for a RinCoin node that tracks shares per miner.

    The pool accepts Stratum (JSON-over-TCP, one message per line) connections,
    records every submitted share in SQLite, and splits block rewards between
    miners in proportion to the difficulty of their recent shares.

    NOTE(review): shares are *not* cryptographically validated here; every
    well-formed ``mining.submit`` is accepted and credited with a fixed
    difficulty. The job fields sent to miners (``coinb1``/``coinb2``/merkle
    branch) are placeholders.
    """

    # Difficulty announced to miners via ``mining.set_difficulty``.
    STRATUM_DIFFICULTY = 0.001
    # Difficulty credited for every accepted share (derived by the original
    # author from D = H * dt / 2^32 with H ~ 381 kH/s and dt ~ 15 s).
    ASSUMED_SHARE_DIFFICULTY = 0.00133
    # Hashrate (H/s) assumed for a miner's very first share, when no
    # inter-share interval is available yet.
    ASSUMED_INITIAL_HASHRATE = 381000
    # Block reward (in RIN) used for payouts; the real coinbase value is not
    # read back from the node.
    DEFAULT_BLOCK_REWARD = 50.0
    # Upper bound on a single buffered Stratum line, to stop a client that
    # never sends a newline from growing the receive buffer without limit.
    MAX_LINE_BYTES = 64 * 1024

    def __init__(self, stratum_host='0.0.0.0', stratum_port=3333,
                 rpc_host='127.0.0.1', rpc_port=9556,
                 rpc_user='rinrpc', rpc_password='745ce784d5d537fc06105a1b935b7657903cfc71a5fb3b90',
                 pool_address='rin1qahvvv9d5f3443wtckeqavwp9950wacxfmwv20q',
                 pool_fee_percent=1.0, db_path=':memory:'):
        """Configure the pool and create its database.

        Args:
            stratum_host: Interface the Stratum server binds to.
            stratum_port: TCP port the Stratum server listens on.
            rpc_host: Host of the RinCoin node's JSON-RPC endpoint.
            rpc_port: Port of the RinCoin node's JSON-RPC endpoint.
            rpc_user: JSON-RPC user name.
            rpc_password: JSON-RPC password.
            pool_address: Address passed to the node when generating blocks.
            pool_fee_percent: Percentage of each block reward kept as fee.
            db_path: SQLite database path. Defaults to ``':memory:'`` (the
                original behaviour), which loses all data on exit.

        NOTE(review): the default RPC password is a credential committed to
        source control; callers should pass their own value.
        """
        self.stratum_host = stratum_host
        self.stratum_port = stratum_port
        self.rpc_host = rpc_host
        self.rpc_port = rpc_port
        self.rpc_user = rpc_user
        self.rpc_password = rpc_password
        self.pool_address = pool_address
        self.pool_fee_percent = pool_fee_percent
        self.db_path = db_path

        # Miner tracking: {addr: {'client', 'user', 'worker', 'miner_id',
        # 'address', 'shares', 'last_share'}} keyed by the peer address tuple.
        self.clients = {}
        self.job_counter = 0
        self.current_job = None
        self.running = True

        # Pool statistics
        self.total_shares = 0
        self.total_blocks = 0
        self.pool_hashrate = 0

        # Database for miner/share/block bookkeeping
        self.init_database()

        print("=== RinCoin Mining Pool Server ===")
        print(f"Stratum: {stratum_host}:{stratum_port}")
        print(f"RPC: {rpc_host}:{rpc_port}")
        print(f"Pool Address: {pool_address}")
        print(f"Pool Fee: {pool_fee_percent}%")

    def init_database(self):
        """Open the SQLite connection and create the tables if missing.

        The connection is opened with ``check_same_thread=False`` because it
        is shared by the client, job, stats and web-interface threads.
        NOTE(review): no lock serialises access to this shared connection.
        """
        self.db = sqlite3.connect(getattr(self, 'db_path', ':memory:'), check_same_thread=False)
        cursor = self.db.cursor()

        # One row per (user, worker) pair.
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS miners (
                id INTEGER PRIMARY KEY,
                user TEXT NOT NULL,
                worker TEXT NOT NULL,
                address TEXT,
                shares INTEGER DEFAULT 0,
                last_share TIMESTAMP,
                last_hashrate REAL DEFAULT 0,
                created TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        # One row per accepted share.
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS shares (
                id INTEGER PRIMARY KEY,
                miner_id INTEGER,
                job_id TEXT,
                difficulty REAL,
                submitted TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (miner_id) REFERENCES miners (id)
            )
        ''')
        # One row per block whose reward was distributed.
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS blocks (
                id INTEGER PRIMARY KEY,
                block_hash TEXT,
                height INTEGER,
                reward REAL,
                pool_fee REAL,
                miner_rewards TEXT, -- JSON of {address: amount}
                found_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        # Samples for pool hashrate chart
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS hashrate_samples (
                id INTEGER PRIMARY KEY,
                ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                hashrate REAL
            )
        ''')
        self.db.commit()

    def rpc_call(self, method, params=None):
        """Make a JSON-RPC call to the RinCoin node.

        Args:
            method: RPC method name.
            params: Positional RPC parameters; ``None`` means no parameters.

        Returns:
            The ``result`` field of the reply, or ``None`` on any HTTP, RPC
            or transport error (the error is printed, never raised).
        """
        try:
            url = f"http://{self.rpc_host}:{self.rpc_port}/"
            headers = {'content-type': 'text/plain'}
            auth = HTTPBasicAuth(self.rpc_user, self.rpc_password)
            payload = {
                "jsonrpc": "1.0",
                "id": "mining_pool",
                "method": method,
                # A fresh list per call; a shared default list could be
                # mutated by one caller and leak into the next request.
                "params": [] if params is None else params,
            }
            response = requests.post(url, json=payload, headers=headers, auth=auth, timeout=10)
            if response.status_code != 200:
                print(f"HTTP Error: {response.status_code}")
                return None
            result = response.json()
            if result.get('error') is not None:
                print(f"RPC Error: {result['error']}")
                return None
            return result.get('result')
        except Exception as e:
            # Boundary with the network: report and degrade to "no result".
            print(f"RPC Call Error: {e}")
            return None

    def get_block_template(self):
        """Fetch a block template from the node and make it the current job.

        Returns:
            The new job dict (also stored in ``self.current_job``), or
            ``None`` when the node returned no template.
        """
        try:
            template = self.rpc_call("getblocktemplate", [{"rules": ["segwit", "mweb"]}])
            if not template:
                return None
            self.job_counter += 1
            job = {
                "job_id": f"job_{self.job_counter}",
                "template": template,
                "prevhash": template.get("previousblockhash", "0" * 64),
                # Placeholder coinbase halves; no real coinbase is built.
                "coinb1": "01000000" + "0" * 60,
                "coinb2": "ffffffff",
                "merkle_branch": [],
                "version": f"{template.get('version', 1):08x}",
                "nbits": template.get("bits", "1d00ffff"),
                "ntime": f"{int(time.time()):08x}",
                "clean_jobs": True,
                "target": template.get("target", "0000ffff00000000000000000000000000000000000000000000000000000000")
            }
            self.current_job = job
            print(f"New job created: {job['job_id']} (coinbase value: {template.get('coinbasevalue', 0)} satoshis)")
            return job
        except Exception as e:
            print(f"Get block template error: {e}")
            return None

    def validate_rincoin_address(self, address):
        """Return ``True`` only if the node reports *address* as valid.

        Addresses that are empty, not strings, or do not start with ``rin``
        are rejected without contacting the node.
        """
        if not isinstance(address, str) or not address.startswith('rin'):
            return False
        result = self.rpc_call("validateaddress", [address])
        # Always hand back a real bool, never None or the raw RPC dict.
        return isinstance(result, dict) and bool(result.get('isvalid', False))

    def register_miner(self, user, worker, address=None):
        """Return the database id for (user, worker), creating it if needed.

        An existing row's address is only filled in when it was empty; a
        previously stored address is never overwritten.
        """
        cursor = self.db.cursor()
        cursor.execute('SELECT id, address FROM miners WHERE user = ? AND worker = ?', (user, worker))
        row = cursor.fetchone()
        if row is None:
            cursor.execute('INSERT INTO miners (user, worker, address) VALUES (?, ?, ?)', (user, worker, address))
            self.db.commit()
            return cursor.lastrowid

        miner_id, existing_address = row
        if address and not existing_address:
            cursor.execute('UPDATE miners SET address = ? WHERE id = ?', (address, miner_id))
            self.db.commit()
        return miner_id

    def record_share(self, miner_id, job_id, difficulty):
        """Store one share and bump the miner's and the pool's counters."""
        cursor = self.db.cursor()
        cursor.execute('INSERT INTO shares (miner_id, job_id, difficulty) VALUES (?, ?, ?)',
                       (miner_id, job_id, difficulty))
        cursor.execute('UPDATE miners SET shares = shares + 1, last_share = CURRENT_TIMESTAMP WHERE id = ?', (miner_id,))
        self.db.commit()
        self.total_shares += 1

    def distribute_block_reward(self, block_hash, block_height, total_reward):
        """Split a block reward by share difficulty and record the block.

        The pool fee is taken first. The remainder is split between all
        miners with shares in the last 24 hours, weighted by summed share
        difficulty. Portions belonging to miners without a payout address
        are kept by the pool. Nothing is recorded when there are no recent
        shares or their total difficulty is zero.

        Args:
            block_hash: Hash of the found block.
            block_height: Height of the found block.
            total_reward: Full block reward in RIN.
        """
        cursor = self.db.cursor()
        pool_fee = total_reward * (self.pool_fee_percent / 100.0)
        miner_reward = total_reward - pool_fee

        # One row per miner (i.e. per worker) active in the last 24 hours.
        cursor.execute('''
            SELECT m.address, COUNT(s.id) as share_count, COALESCE(SUM(s.difficulty), 0) as total_difficulty
            FROM miners m
            JOIN shares s ON m.id = s.miner_id
            WHERE s.submitted > datetime('now', '-1 day')
            GROUP BY m.id, m.address
            HAVING share_count > 0
        ''')
        miners = cursor.fetchall()
        if not miners:
            print("No miners with shares in last 24 hours")
            return

        total_difficulty = sum(difficulty for _, _, difficulty in miners)
        if total_difficulty == 0:
            print("No valid difficulty found")
            return

        miner_rewards = {}
        paid_rows = 0
        unpaid_rows = 0
        undistributed_reward = 0.0
        for address, _share_count, difficulty in miners:
            reward_share = (difficulty / total_difficulty) * miner_reward
            if address:
                paid_rows += 1
                # Several workers may share one payout address: accumulate
                # instead of overwriting, or all but one worker's share of
                # the reward would silently disappear.
                miner_rewards[address] = miner_rewards.get(address, 0.0) + reward_share
                print(f"💰 Miner {address}: {reward_share:.8f} RIN ({difficulty} difficulty)")
            else:
                unpaid_rows += 1
                undistributed_reward += reward_share
                print(f"⚠️ Miner without address: {difficulty} difficulty -> {reward_share:.8f} RIN to pool")

        if unpaid_rows:
            # Keep undistributed rewards for pool (no redistribution)
            print(f"💰 Pool keeps {undistributed_reward:.8f} RIN from miners without addresses")

        cursor.execute('''
            INSERT INTO blocks (block_hash, height, reward, pool_fee, miner_rewards)
            VALUES (?, ?, ?, ?, ?)
        ''', (block_hash, block_height, total_reward, pool_fee, json.dumps(miner_rewards)))
        self.db.commit()
        self.total_blocks += 1

        print(f"🎉 Block {block_height} reward distributed!")
        print(f"💰 Pool fee: {pool_fee:.8f} RIN")
        print(f"💰 Total distributed: {sum(miner_rewards.values()):.8f} RIN")
        if unpaid_rows:
            print(f"📊 Summary: {paid_rows} miners with addresses, {unpaid_rows} without (rewards to pool)")

    def send_stratum_response(self, client, msg_id, result, error=None):
        """Send a Stratum response line; send errors are printed, not raised."""
        try:
            response = {
                "id": msg_id,
                "result": result,
                "error": error
            }
            message = json.dumps(response) + "\n"
            # sendall: plain send() may transmit only part of the message.
            client.sendall(message.encode('utf-8'))
        except Exception as e:
            print(f"Send response error: {e}")

    def send_stratum_notification(self, client, method, params):
        """Send a Stratum notification line; send errors are printed, not raised."""
        try:
            notification = {
                "id": None,
                "method": method,
                "params": params
            }
            message = json.dumps(notification) + "\n"
            # sendall: plain send() may transmit only part of the message.
            client.sendall(message.encode('utf-8'))
        except Exception as e:
            print(f"Send notification error: {e}")

    @staticmethod
    def _notify_params(job):
        """Build the ``mining.notify`` parameter list for *job*."""
        return [
            job["job_id"],
            job["prevhash"],
            job["coinb1"],
            job["coinb2"],
            job["merkle_branch"],
            job["version"],
            job["nbits"],
            job["ntime"],
            job["clean_jobs"],
        ]

    def handle_stratum_message(self, client, addr, message):
        """Parse one Stratum JSON line from a miner and dispatch it.

        Invalid JSON and handler failures are printed and swallowed so one
        bad message does not drop the connection.
        """
        try:
            data = json.loads(message.strip())
            method = data.get("method")
            msg_id = data.get("id")
            # Treat a missing or null "params" as an empty list.
            params = data.get("params") or []
            print(f"[{addr}] {method}: {params}")

            if method == "mining.subscribe":
                self._handle_subscribe(client, msg_id)
            elif method == "mining.extranonce.subscribe":
                print(f"[{addr}] Extranonce subscription requested")
                self.send_stratum_response(client, msg_id, True)
            elif method == "mining.authorize":
                self._handle_authorize(client, addr, msg_id, params)
            elif method == "mining.submit":
                self._handle_submit(client, addr, msg_id, params)
            else:
                print(f"[{addr}] ⚠️ Unknown method: {method}")
                # Send null result for unknown methods
                self.send_stratum_response(client, msg_id, None, None)
        except json.JSONDecodeError:
            print(f"[{addr}] Invalid JSON: {message}")
        except Exception as e:
            print(f"[{addr}] Message handling error: {e}")

    def _handle_subscribe(self, client, msg_id):
        """Answer ``mining.subscribe`` and push difficulty plus a first job."""
        self.send_stratum_response(client, msg_id, [
            [["mining.set_difficulty", "subscription_id"], ["mining.notify", "subscription_id"]],
            "extranonce1",
            4
        ])
        # Send difficulty (lower for CPU mining)
        self.send_stratum_notification(client, "mining.set_difficulty", [self.STRATUM_DIFFICULTY])
        # Send initial job
        if self.get_block_template():
            self.send_stratum_notification(client, "mining.notify", self._notify_params(self.current_job))

    def _handle_authorize(self, client, addr, msg_id, params):
        """Answer ``mining.authorize`` and register the miner.

        ``params[0]`` is ``user`` or ``user.worker``. When the user part
        starts with ``rin`` it is treated as a payout address and must be
        accepted by the node, otherwise authorization is refused.
        """
        if len(params) < 2:
            self.send_stratum_response(client, msg_id, False, "Invalid authorization")
            return

        # Split "user.worker"; the worker defaults to "default" when absent.
        user, dot, worker = params[0].partition('.')
        if not dot:
            worker = "default"

        miner_address = None
        if user.startswith('rin'):
            if not self.validate_rincoin_address(user):
                print(f"[{addr}] ❌ Invalid RinCoin address: {user}")
                self.send_stratum_response(client, msg_id, False, "Invalid RinCoin address")
                return
            miner_address = user
            # NOTE(review): only the first 8 characters of the address form
            # the user id, so different addresses can map to the same id.
            user = f"miner_{miner_address[:8]}"
            print(f"[{addr}] ✅ Miner using valid RinCoin address: {miner_address}")

        miner_id = self.register_miner(user, worker, miner_address)
        self.clients[addr] = {
            'client': client,
            'user': user,
            'worker': worker,
            'miner_id': miner_id,
            'address': miner_address,
            'shares': 0,
            'last_share': time.time()
        }
        if miner_address:
            print(f"[{addr}] ✅ Authorized: {user}.{worker} -> {miner_address}")
        else:
            print(f"[{addr}] ⚠️ Authorized: {user}.{worker} (rewards will go to pool address)")
        self.send_stratum_response(client, msg_id, True)

    def _handle_submit(self, client, addr, msg_id, params):
        """Answer ``mining.submit``: credit the share, then try for a block.

        Exactly one response is sent per request id.
        """
        miner_info = self.clients.get(addr)
        if miner_info is None:
            self.send_stratum_response(client, msg_id, False, "Not authorized")
            return
        if not self.current_job or len(params) < 5:
            print(f"[{addr}] Invalid share parameters")
            self.send_stratum_response(client, msg_id, False, "Invalid parameters")
            return

        responded = False
        try:
            # Stratum order: worker name, job id, extranonce2, ntime, nonce.
            job_id = params[1]
            self._account_share(miner_info, job_id)
            print(f"[{addr}] ✅ Share accepted from {miner_info['user']}.{miner_info['worker']} (Total: {miner_info['shares']})")
            self.send_stratum_response(client, msg_id, True, None)
            responded = True
            self._try_submit_block(addr)
        except Exception as e:
            print(f"[{addr}] Block submission error: {e}")
            # Still accept the share for mining statistics, but never answer
            # the same request id twice.
            if not responded:
                self.send_stratum_response(client, msg_id, True)

    def _account_share(self, miner_info, job_id):
        """Record one share and refresh the miner and pool hashrate estimates."""
        difficulty = self.ASSUMED_SHARE_DIFFICULTY
        self.record_share(miner_info['miner_id'], job_id, difficulty)

        # Instantaneous hashrate from the time between shares: H = D * 2^32 / dt
        now_ts = time.time()
        prev_ts = miner_info.get('last_share') or now_ts
        dt = max(now_ts - prev_ts, 1e-3)  # Minimum 1ms to avoid division by zero
        miner_hashrate = difficulty * (2**32) / dt
        if miner_info['shares'] == 0:
            # First share: no meaningful interval yet, use the assumed rate.
            miner_hashrate = self.ASSUMED_INITIAL_HASHRATE
        miner_info['shares'] += 1
        miner_info['last_share'] = now_ts

        # Persist miner last_hashrate
        try:
            cursor = self.db.cursor()
            cursor.execute('UPDATE miners SET last_share = CURRENT_TIMESTAMP, last_hashrate = ? WHERE id = ?',
                           (miner_hashrate, miner_info['miner_id']))
            self.db.commit()
        except sqlite3.Error as e:
            print(f"DB update last_hashrate error: {e}")

        # Update pool hashrate as sum of miners' last rates
        try:
            cursor = self.db.cursor()
            cursor.execute('SELECT COALESCE(SUM(last_hashrate), 0) FROM miners')
            self.pool_hashrate = cursor.fetchone()[0] or 0.0
        except sqlite3.Error as e:
            print(f"Pool hashrate sum error: {e}")

    def _try_submit_block(self, addr):
        """Ask the node to generate a block and distribute its reward.

        NOTE(review): this calls ``generatetoaddress`` for every accepted
        share instead of submitting the miner's own solution.
        """
        print(f"[{addr}] 🔍 Attempting to submit block solution...")
        # Always use pool address for block submission (rewards will be distributed later)
        result = self.rpc_call("generatetoaddress", [1, self.pool_address, 1])
        if not result:
            return
        block_hash = result[0]
        block_info = self.rpc_call("getblock", [block_hash])
        if not block_info:
            return
        block_height = block_info.get('height', 0)
        total_reward = self.DEFAULT_BLOCK_REWARD
        print(f"🎉 [{addr}] BLOCK FOUND! Hash: {block_hash}")
        print(f"💰 Block reward: {total_reward} RIN")
        self.distribute_block_reward(block_hash, block_height, total_reward)

    def handle_client(self, client, addr):
        """Serve one miner connection until it closes or the pool stops.

        Incoming bytes are buffered and split on newlines, so a message that
        arrives across several TCP segments is reassembled before parsing.
        """
        print(f"[{addr}] Connected")
        pending = b""
        try:
            while self.running:
                data = client.recv(4096)
                if not data:
                    break
                pending += data
                # Everything before the last newline is complete; the tail
                # (possibly empty) waits for the next recv().
                *lines, pending = pending.split(b"\n")
                for raw in lines:
                    message = raw.decode('utf-8', errors='replace').strip()
                    if message:
                        self.handle_stratum_message(client, addr, message)
                if len(pending) > self.MAX_LINE_BYTES:
                    print(f"[{addr}] Client error: message exceeds {self.MAX_LINE_BYTES} bytes")
                    break
        except Exception as e:
            print(f"[{addr}] Client error: {e}")
        finally:
            client.close()
            self.clients.pop(addr, None)
            print(f"[{addr}] Disconnected")

    def job_updater(self):
        """Poll the node every 10 seconds and broadcast new jobs.

        A new job is created when the chain height changed, when more than
        30 seconds passed since the last job, or when no job exists yet.
        """
        last_job_time = 0
        last_block_height = 0
        while self.running:
            try:
                time.sleep(10)
                blockchain_info = self.rpc_call("getblockchaininfo")
                if not blockchain_info:
                    continue
                current_height = blockchain_info.get('blocks', 0)
                should_create_job = (
                    current_height != last_block_height or
                    time.time() - last_job_time > 30 or
                    not self.current_job
                )
                if not should_create_job or not self.get_block_template():
                    continue

                job = self.current_job
                last_job_time = time.time()
                last_block_height = current_height
                print(f"📦 New job created: {job['job_id']} (block {current_height})")

                # Snapshot the dict: client threads add and remove entries.
                notify_params = self._notify_params(job)
                for addr, miner_info in list(self.clients.items()):
                    try:
                        self.send_stratum_notification(miner_info['client'], "mining.notify", notify_params)
                    except Exception as e:
                        print(f"Failed to send job to {addr}: {e}")
            except Exception as e:
                print(f"Job updater error: {e}")

    def stats_updater(self):
        """Once a minute, recompute the pool hashrate and store a sample."""
        while self.running:
            try:
                time.sleep(60)  # Update every minute
                cursor = self.db.cursor()
                # Pool hashrate is the sum of miners' last hashrates
                cursor.execute('SELECT COALESCE(SUM(last_hashrate), 0) FROM miners')
                self.pool_hashrate = cursor.fetchone()[0] or 0.0
                # Sample for chart
                cursor.execute('INSERT INTO hashrate_samples (hashrate) VALUES (?)', (self.pool_hashrate,))
                self.db.commit()
                print(f"📊 Pool Stats: {len(self.clients)} miners, {self.total_shares} shares, {self.pool_hashrate/1000:.2f} kH/s")
            except Exception as e:
                print(f"Stats updater error: {e}")

    def start(self):
        """Start the background threads and run the Stratum accept loop.

        Returns early if the node cannot be reached. Blocks until
        ``self.running`` becomes false or Ctrl-C is pressed; the listening
        socket is always closed on exit.
        """
        server_socket = None
        try:
            # Test RPC connection
            blockchain_info = self.rpc_call("getblockchaininfo")
            if not blockchain_info:
                print("❌ Failed to connect to RinCoin node!")
                return
            print(f"✅ Connected to RinCoin node (block {blockchain_info.get('blocks', 'unknown')})")

            # Start background threads
            threading.Thread(target=self.job_updater, daemon=True).start()
            threading.Thread(target=self.stats_updater, daemon=True).start()

            # Start web interface in background
            web_thread = threading.Thread(target=start_web_interface, args=(self.db, '0.0.0.0', 8083), daemon=True)
            web_thread.start()
            print("🌐 Web dashboard started on http://0.0.0.0:8083")

            # Start Stratum server
            server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            server_socket.bind((self.stratum_host, self.stratum_port))
            server_socket.listen(10)

            print(f"🚀 Mining pool listening on {self.stratum_host}:{self.stratum_port}")
            print("Ready for multiple miners...")
            print("")
            print(f"💰 Pool address: {self.pool_address}")
            print(f"💰 Pool fee: {self.pool_fee_percent}%")
            print("")
            print("Connect miners with:")
            print(f"./cpuminer -a rinhash -o stratum+tcp://{self.stratum_host}:{self.stratum_port} -u username.workername -p x")
            print("")

            while self.running:
                try:
                    client, addr = server_socket.accept()
                    client_thread = threading.Thread(target=self.handle_client, args=(client, addr), daemon=True)
                    client_thread.start()
                except KeyboardInterrupt:
                    print("\n🛑 Shutting down pool...")
                    self.running = False
                    break
                except Exception as e:
                    print(f"Server error: {e}")
        except OSError as e:
            if "Address already in use" in str(e):
                print(f"❌ Port {self.stratum_port} is already in use!")
                print("")
                print("🔍 Check what's using the port:")
                print(f"sudo netstat -tlnp | grep :{self.stratum_port}")
                print("")
                print("🛑 Kill existing process:")
                print(f"sudo lsof -ti:{self.stratum_port} | xargs sudo kill -9")
                print("")
                print("🔄 Or use a different port by editing the script")
            else:
                print(f"Failed to start server: {e}")
        except Exception as e:
            print(f"Failed to start server: {e}")
        finally:
            # Release the listening port even when startup or the loop failed.
            if server_socket is not None:
                server_socket.close()
            print("Pool server stopped")
if __name__ == "__main__":
    # Script entry point: build a pool with the default settings and serve
    # until it is stopped.
    RinCoinMiningPool().start()