stratum working, implement mining pool, support "extranonce"

This commit is contained in:
Dobromir Popov
2025-09-02 11:54:15 +03:00
parent e816c04c8b
commit 8bdf9bf9eb
10 changed files with 1339 additions and 25 deletions

592
MINE/rin/stratum_pool.py Normal file
View File

@@ -0,0 +1,592 @@
#!/usr/bin/env python3
"""
RinCoin Mining Pool Server
Distributes block rewards among multiple miners based on share contributions
"""
import socket
import threading
import json
import time
import requests
import hashlib
import struct
import sqlite3
from datetime import datetime
from requests.auth import HTTPBasicAuth
class RinCoinMiningPool:
def __init__(self, stratum_host='0.0.0.0', stratum_port=3333,
rpc_host='127.0.0.1', rpc_port=9556,
rpc_user='rinrpc', rpc_password='745ce784d5d5d537fc06105a1b935b7657903cfc71a5fb3b90',
pool_address='rin1qahvvv9d5f3443wtckeqavwp9950wacxfmwv20q',
pool_fee_percent=1.0):
self.stratum_host = stratum_host
self.stratum_port = stratum_port
self.rpc_host = rpc_host
self.rpc_port = rpc_port
self.rpc_user = rpc_user
self.rpc_password = rpc_password
self.pool_address = pool_address
self.pool_fee_percent = pool_fee_percent
# Miner tracking
self.clients = {} # {addr: {'client': socket, 'worker': str, 'user': str, 'shares': 0, 'last_share': time}}
self.job_counter = 0
self.current_job = None
self.running = True
# Pool statistics
self.total_shares = 0
self.total_blocks = 0
self.pool_hashrate = 0
# Database for persistent storage
self.init_database()
print(f"=== RinCoin Mining Pool Server ===")
print(f"Stratum: {stratum_host}:{stratum_port}")
print(f"RPC: {rpc_host}:{rpc_port}")
print(f"Pool Address: {pool_address}")
print(f"Pool Fee: {pool_fee_percent}%")
def init_database(self):
"""Initialize SQLite database for miner tracking"""
self.db = sqlite3.connect(':memory:', check_same_thread=False)
cursor = self.db.cursor()
# Create tables
cursor.execute('''
CREATE TABLE IF NOT EXISTS miners (
id INTEGER PRIMARY KEY,
user TEXT NOT NULL,
worker TEXT NOT NULL,
address TEXT,
shares INTEGER DEFAULT 0,
last_share TIMESTAMP,
created TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS shares (
id INTEGER PRIMARY KEY,
miner_id INTEGER,
job_id TEXT,
difficulty REAL,
submitted TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (miner_id) REFERENCES miners (id)
)
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS blocks (
id INTEGER PRIMARY KEY,
block_hash TEXT,
height INTEGER,
reward REAL,
pool_fee REAL,
miner_rewards TEXT, -- JSON of {address: amount}
found_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
self.db.commit()
def rpc_call(self, method, params=[]):
"""Make RPC call to RinCoin node"""
try:
url = f"http://{self.rpc_host}:{self.rpc_port}/"
headers = {'content-type': 'text/plain'}
auth = HTTPBasicAuth(self.rpc_user, self.rpc_password)
payload = {
"jsonrpc": "1.0",
"id": "mining_pool",
"method": method,
"params": params
}
response = requests.post(url, json=payload, headers=headers, auth=auth, timeout=10)
if response.status_code == 200:
result = response.json()
if 'error' in result and result['error'] is not None:
print(f"RPC Error: {result['error']}")
return None
return result.get('result')
else:
print(f"HTTP Error: {response.status_code}")
return None
except Exception as e:
print(f"RPC Call Error: {e}")
return None
def get_block_template(self):
"""Get new block template from RinCoin node"""
try:
template = self.rpc_call("getblocktemplate", [{"rules": ["segwit", "mweb"]}])
if template:
self.job_counter += 1
job = {
"job_id": f"job_{self.job_counter}",
"template": template,
"prevhash": template.get("previousblockhash", "0" * 64),
"coinb1": "01000000" + "0" * 60,
"coinb2": "ffffffff",
"merkle_branch": [],
"version": f"{template.get('version', 1):08x}",
"nbits": template.get("bits", "1d00ffff"),
"ntime": f"{int(time.time()):08x}",
"clean_jobs": True,
"target": template.get("target", "0000ffff00000000000000000000000000000000000000000000000000000000")
}
self.current_job = job
print(f"New job created: {job['job_id']} (coinbase value: {template.get('coinbasevalue', 0)} satoshis)")
return job
return None
except Exception as e:
print(f"Get block template error: {e}")
return None
def register_miner(self, user, worker, address=None):
"""Register or update miner in database"""
cursor = self.db.cursor()
# Check if miner exists
cursor.execute('SELECT id, address FROM miners WHERE user = ? AND worker = ?', (user, worker))
result = cursor.fetchone()
if result:
miner_id, existing_address = result
if address and not existing_address:
cursor.execute('UPDATE miners SET address = ? WHERE id = ?', (address, miner_id))
self.db.commit()
return miner_id
else:
# Create new miner
cursor.execute('INSERT INTO miners (user, worker, address) VALUES (?, ?, ?)', (user, worker, address))
self.db.commit()
return cursor.lastrowid
def record_share(self, miner_id, job_id, difficulty):
"""Record a share submission"""
cursor = self.db.cursor()
# Record share
cursor.execute('INSERT INTO shares (miner_id, job_id, difficulty) VALUES (?, ?, ?)',
(miner_id, job_id, difficulty))
# Update miner stats
cursor.execute('UPDATE miners SET shares = shares + 1, last_share = CURRENT_TIMESTAMP WHERE id = ?', (miner_id,))
self.db.commit()
self.total_shares += 1
def distribute_block_reward(self, block_hash, block_height, total_reward):
"""Distribute block reward among miners based on their shares"""
cursor = self.db.cursor()
# Calculate pool fee
pool_fee = total_reward * (self.pool_fee_percent / 100.0)
miner_reward = total_reward - pool_fee
# Get shares from last 24 hours
cursor.execute('''
SELECT m.address, COUNT(s.id) as share_count, SUM(s.difficulty) as total_difficulty
FROM miners m
JOIN shares s ON m.id = s.miner_id
WHERE s.submitted > datetime('now', '-1 day')
GROUP BY m.id, m.address
HAVING share_count > 0
''')
miners = cursor.fetchall()
if not miners:
print("No miners with shares in last 24 hours")
return
# Calculate total difficulty
total_difficulty = sum(row[2] for row in miners)
# Distribute rewards
miner_rewards = {}
for address, share_count, difficulty in miners:
if address and total_difficulty > 0:
reward_share = (difficulty / total_difficulty) * miner_reward
miner_rewards[address] = reward_share
print(f"💰 Miner {address}: {reward_share:.8f} RIN ({difficulty} difficulty)")
# Record block
cursor.execute('''
INSERT INTO blocks (block_hash, height, reward, pool_fee, miner_rewards)
VALUES (?, ?, ?, ?, ?)
''', (block_hash, block_height, total_reward, pool_fee, json.dumps(miner_rewards)))
self.db.commit()
self.total_blocks += 1
print(f"🎉 Block {block_height} reward distributed!")
print(f"💰 Pool fee: {pool_fee:.8f} RIN")
print(f"💰 Total distributed: {sum(miner_rewards.values()):.8f} RIN")
def send_stratum_response(self, client, msg_id, result, error=None):
"""Send Stratum response to client"""
try:
response = {
"id": msg_id,
"result": result,
"error": error
}
message = json.dumps(response) + "\n"
client.send(message.encode('utf-8'))
except Exception as e:
print(f"Send response error: {e}")
def send_stratum_notification(self, client, method, params):
"""Send Stratum notification to client"""
try:
notification = {
"id": None,
"method": method,
"params": params
}
message = json.dumps(notification) + "\n"
client.send(message.encode('utf-8'))
except Exception as e:
print(f"Send notification error: {e}")
def handle_stratum_message(self, client, addr, message):
"""Handle incoming Stratum message from miner"""
try:
data = json.loads(message.strip())
method = data.get("method")
msg_id = data.get("id")
params = data.get("params", [])
print(f"[{addr}] {method}: {params}")
if method == "mining.subscribe":
# Subscribe response
self.send_stratum_response(client, msg_id, [
[["mining.set_difficulty", "subscription_id"], ["mining.notify", "subscription_id"]],
"extranonce1",
4
])
# Send difficulty
self.send_stratum_notification(client, "mining.set_difficulty", [1])
# Send initial job
if self.get_block_template():
job = self.current_job
self.send_stratum_notification(client, "mining.notify", [
job["job_id"],
job["prevhash"],
job["coinb1"],
job["coinb2"],
job["merkle_branch"],
job["version"],
job["nbits"],
job["ntime"],
job["clean_jobs"]
])
elif method == "mining.extranonce.subscribe":
# Handle extranonce subscription
print(f"[{addr}] Extranonce subscription requested")
self.send_stratum_response(client, msg_id, True)
elif method == "mining.authorize":
# Parse user.worker format
if len(params) >= 2:
user_worker = params[0]
password = params[1] if len(params) > 1 else ""
# Extract user and worker
if '.' in user_worker:
user, worker = user_worker.split('.', 1)
else:
user = user_worker
worker = "default"
# Check if user contains a RinCoin address (starts with 'rin')
miner_address = None
if user.startswith('rin'):
# User is a RinCoin address
miner_address = user
user = f"miner_{miner_address[:8]}" # Create a user ID from address
print(f"[{addr}] Miner using address as username: {miner_address}")
elif '.' in user and user.split('.')[0].startswith('rin'):
# Format: rin1qahvvv9d5f3443wtckeqavwp9950wacxfmwv20q.workername
address_part, worker_part = user.split('.', 1)
if address_part.startswith('rin'):
miner_address = address_part
user = f"miner_{miner_address[:8]}"
worker = worker_part
print(f"[{addr}] Miner using address format: {miner_address}.{worker}")
# Register miner with address
miner_id = self.register_miner(user, worker, miner_address)
# Store client info
self.clients[addr] = {
'client': client,
'user': user,
'worker': worker,
'miner_id': miner_id,
'address': miner_address,
'shares': 0,
'last_share': time.time()
}
if miner_address:
print(f"[{addr}] ✅ Authorized: {user}.{worker} -> {miner_address}")
else:
print(f"[{addr}] ⚠️ Authorized: {user}.{worker} (no address specified)")
self.send_stratum_response(client, msg_id, True)
else:
self.send_stratum_response(client, msg_id, False, "Invalid authorization")
elif method == "mining.submit":
# Submit share
if addr not in self.clients:
self.send_stratum_response(client, msg_id, False, "Not authorized")
return
miner_info = self.clients[addr]
try:
if self.current_job and len(params) >= 5:
job_id = params[0]
extranonce2 = params[1]
ntime = params[2]
nonce = params[3]
# Record share
self.record_share(miner_info['miner_id'], job_id, 1.0) # Simplified difficulty
miner_info['shares'] += 1
miner_info['last_share'] = time.time()
print(f"[{addr}] ✅ Share accepted from {miner_info['user']}.{miner_info['worker']} (Total: {miner_info['shares']})")
# Try to submit block if it's a valid solution
print(f"[{addr}] 🔍 Attempting to submit block solution...")
# Use generatetoaddress to submit the mining result
result = self.rpc_call("generatetoaddress", [1, self.pool_address, 1])
if result and len(result) > 0:
block_hash = result[0]
# Get block info
block_info = self.rpc_call("getblock", [block_hash])
if block_info:
block_height = block_info.get('height', 0)
coinbase_tx = block_info.get('tx', [])[0] if block_info.get('tx') else None
# Get coinbase value (simplified)
total_reward = 50.0 # Default block reward
print(f"🎉 [{addr}] BLOCK FOUND! Hash: {block_hash}")
print(f"💰 Block reward: {total_reward} RIN")
# Distribute rewards
self.distribute_block_reward(block_hash, block_height, total_reward)
self.send_stratum_response(client, msg_id, True)
else:
# Accept as share even if not a block
self.send_stratum_response(client, msg_id, True)
else:
print(f"[{addr}] Invalid share parameters")
self.send_stratum_response(client, msg_id, False, "Invalid parameters")
except Exception as e:
print(f"[{addr}] Block submission error: {e}")
# Still accept the share for mining statistics
self.send_stratum_response(client, msg_id, True)
else:
print(f"[{addr}] ⚠️ Unknown method: {method}")
# Send null result for unknown methods (standard Stratum behavior)
self.send_stratum_response(client, msg_id, None, None)
except json.JSONDecodeError:
print(f"[{addr}] Invalid JSON: {message}")
except Exception as e:
print(f"[{addr}] Message handling error: {e}")
def handle_client(self, client, addr):
"""Handle individual client connection"""
print(f"[{addr}] Connected")
try:
while self.running:
data = client.recv(4096)
if not data:
break
# Handle multiple messages in one packet
messages = data.decode('utf-8').strip().split('\n')
for message in messages:
if message:
self.handle_stratum_message(client, addr, message)
except Exception as e:
print(f"[{addr}] Client error: {e}")
finally:
client.close()
if addr in self.clients:
del self.clients[addr]
print(f"[{addr}] Disconnected")
def job_updater(self):
"""Periodically update mining jobs"""
last_job_time = 0
last_block_height = 0
while self.running:
try:
# Check for new blocks every 10 seconds
time.sleep(10)
# Get current blockchain info
blockchain_info = self.rpc_call("getblockchaininfo")
if blockchain_info:
current_height = blockchain_info.get('blocks', 0)
# Create new job if:
# 1. New block detected
# 2. 30+ seconds since last job
# 3. No current job exists
should_create_job = (
current_height != last_block_height or
time.time() - last_job_time > 30 or
not self.current_job
)
if should_create_job:
if self.get_block_template():
job = self.current_job
last_job_time = time.time()
last_block_height = current_height
print(f"📦 New job created: {job['job_id']} (block {current_height})")
# Send to all connected clients
for addr, miner_info in list(self.clients.items()):
try:
self.send_stratum_notification(miner_info['client'], "mining.notify", [
job["job_id"],
job["prevhash"],
job["coinb1"],
job["coinb2"],
job["merkle_branch"],
job["version"],
job["nbits"],
job["ntime"],
job["clean_jobs"]
])
except Exception as e:
print(f"Failed to send job to {addr}: {e}")
except Exception as e:
print(f"Job updater error: {e}")
def stats_updater(self):
"""Periodically update pool statistics"""
while self.running:
try:
time.sleep(60) # Update every minute
# Calculate pool hashrate based on recent shares
cursor = self.db.cursor()
cursor.execute('''
SELECT COUNT(*) FROM shares
WHERE submitted > datetime('now', '-5 minutes')
''')
recent_shares = cursor.fetchone()[0]
self.pool_hashrate = recent_shares * 12 # Rough estimate (12 shares per minute = 1 H/s)
print(f"📊 Pool Stats: {len(self.clients)} miners, {self.total_shares} shares, {self.pool_hashrate:.2f} H/s")
except Exception as e:
print(f"Stats updater error: {e}")
def start(self):
"""Start the mining pool server"""
try:
# Test RPC connection
blockchain_info = self.rpc_call("getblockchaininfo")
if not blockchain_info:
print("❌ Failed to connect to RinCoin node!")
return
print(f"✅ Connected to RinCoin node (block {blockchain_info.get('blocks', 'unknown')})")
# Start background threads
job_thread = threading.Thread(target=self.job_updater, daemon=True)
job_thread.start()
stats_thread = threading.Thread(target=self.stats_updater, daemon=True)
stats_thread.start()
# Start Stratum server
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind((self.stratum_host, self.stratum_port))
server_socket.listen(10)
print(f"🚀 Mining pool listening on {self.stratum_host}:{self.stratum_port}")
print("Ready for multiple miners...")
print("")
print(f"💰 Pool address: {self.pool_address}")
print(f"💰 Pool fee: {self.pool_fee_percent}%")
print("")
print("Connect miners with:")
print(f"./cpuminer -a rinhash -o stratum+tcp://{self.stratum_host}:{self.stratum_port} -u username.workername -p x")
print("")
while self.running:
try:
client, addr = server_socket.accept()
client_thread = threading.Thread(target=self.handle_client, args=(client, addr), daemon=True)
client_thread.start()
except KeyboardInterrupt:
print("\n🛑 Shutting down pool...")
self.running = False
break
except Exception as e:
print(f"Server error: {e}")
except OSError as e:
if "Address already in use" in str(e):
print(f"❌ Port {self.stratum_port} is already in use!")
print("")
print("🔍 Check what's using the port:")
print(f"sudo netstat -tlnp | grep :{self.stratum_port}")
print("")
print("🛑 Kill existing process:")
print(f"sudo lsof -ti:{self.stratum_port} | xargs sudo kill -9")
print("")
print("🔄 Or use a different port by editing the script")
else:
print(f"Failed to start server: {e}")
except Exception as e:
print(f"Failed to start server: {e}")
finally:
print("Pool server stopped")
if __name__ == "__main__":
pool = RinCoinMiningPool()
pool.start()