This commit is contained in:
Dobromir Popov
2025-09-07 15:03:47 +03:00
parent 00cda24e71
commit 2d2653551b
132 changed files with 34281 additions and 5 deletions

View File

@@ -0,0 +1,14 @@
set(SOURCES
PoolURI.cpp PoolURI.h
PoolClient.h
PoolManager.h PoolManager.cpp
testing/SimulateClient.h testing/SimulateClient.cpp
stratum/EthStratumClient.h stratum/EthStratumClient.cpp
getwork/EthGetworkClient.h getwork/EthGetworkClient.cpp
)
find_package(OpenSSL REQUIRED)
add_library(poolprotocols ${SOURCES})
target_link_libraries(poolprotocols PRIVATE devcore progminer-buildinfo ethash Boost::system jsoncpp OpenSSL::SSL OpenSSL::Crypto)
target_include_directories(poolprotocols PRIVATE ..)

View File

@@ -0,0 +1,121 @@
#pragma once
#include <queue>
#include <boost/asio/ip/address.hpp>
#include <libethcore/Miner.h>
#include <libpoolprotocols/PoolURI.h>
extern boost::asio::io_service g_io_service;
using namespace std;
namespace dev
{
namespace eth
{
struct Session
{
// Tstamp of sessio start
chrono::steady_clock::time_point start = chrono::steady_clock::now();
// Whether or not worker is subscribed
atomic<bool> subscribed = {false};
// Whether or not worker is authorized
atomic<bool> authorized = {false};
// Total duration of session in minutes
unsigned long duration()
{
return (chrono::duration_cast<chrono::minutes>(chrono::steady_clock::now() - start))
.count();
}
// EthereumStratum (1 and 2)
// Extranonce currently active
uint64_t extraNonce = 0;
// Length of extranonce in bytes
unsigned int extraNonceSizeBytes = 0;
// Next work target
h256 nextWorkBoundary =
h256("0x00000000ffff0000000000000000000000000000000000000000000000000000");
// EthereumStratum (2 only)
bool firstMiningSet = false;
unsigned int timeout = 30; // Default to 30 seconds
string sessionId = "";
string workerId = "";
string algo = "ethash";
unsigned int epoch = 0;
chrono::steady_clock::time_point lastTxStamp = chrono::steady_clock::now();
};
class PoolClient
{
public:
virtual ~PoolClient() noexcept = default;
// Sets the connection definition to be used by the client
void setConnection(std::shared_ptr<URI> _conn)
{
m_conn = _conn;
m_conn->Responds(false);
}
// Gets a pointer to the currently active connection definition
std::shared_ptr<URI> getConnection() { return m_conn; }
// Releases the pointer to the connection definition
void unsetConnection() { m_conn = nullptr; }
virtual void connect() = 0;
virtual void disconnect() = 0;
virtual void submitHashrate(uint64_t const& rate, string const& id) = 0;
virtual void submitSolution(const Solution& solution) = 0;
virtual bool isConnected() { return m_connected.load(memory_order_relaxed); }
virtual bool isPendingState() { return false; }
virtual bool isSubscribed()
{
return (m_session ? m_session->subscribed.load(memory_order_relaxed) : false);
}
virtual bool isAuthorized()
{
return (m_session ? m_session->authorized.load(memory_order_relaxed) : false);
}
virtual string ActiveEndPoint()
{
return (m_connected.load(memory_order_relaxed) ? " [" + toString(m_endpoint) + "]" : "");
}
using SolutionAccepted = function<void(chrono::milliseconds const&, unsigned const&, bool)>;
using SolutionRejected = function<void(chrono::milliseconds const&, unsigned const&)>;
using Disconnected = function<void()>;
using Connected = function<void()>;
using WorkReceived = function<void(WorkPackage const&)>;
void onSolutionAccepted(SolutionAccepted const& _handler) { m_onSolutionAccepted = _handler; }
void onSolutionRejected(SolutionRejected const& _handler) { m_onSolutionRejected = _handler; }
void onDisconnected(Disconnected const& _handler) { m_onDisconnected = _handler; }
void onConnected(Connected const& _handler) { m_onConnected = _handler; }
void onWorkReceived(WorkReceived const& _handler) { m_onWorkReceived = _handler; }
protected:
unique_ptr<Session> m_session = nullptr;
std::atomic<bool> m_connected = {false}; // This is related to socket ! Not session
boost::asio::ip::basic_endpoint<boost::asio::ip::tcp> m_endpoint;
std::shared_ptr<URI> m_conn = nullptr;
SolutionAccepted m_onSolutionAccepted;
SolutionRejected m_onSolutionRejected;
Disconnected m_onDisconnected;
Connected m_onConnected;
WorkReceived m_onWorkReceived;
};
} // namespace eth
} // namespace dev

View File

@@ -0,0 +1,525 @@
#include <chrono>
#include "PoolManager.h"
using namespace std;
using namespace dev;
using namespace eth;
PoolManager* PoolManager::m_this = nullptr;
PoolManager::PoolManager(PoolSettings _settings)
: m_Settings(std::move(_settings)),
m_io_strand(g_io_service),
m_failovertimer(g_io_service),
m_submithrtimer(g_io_service)
{
m_this = this;
m_currentWp.header = h256();
Farm::f().onMinerRestart([&]() {
cnote << "Restart miners...";
if (Farm::f().isMining())
{
cnote << "Shutting down miners...";
Farm::f().stop();
}
cnote << "Spinning up miners...";
Farm::f().start();
});
Farm::f().onSolutionFound([&](const Solution& sol) {
// Solution should passthrough only if client is
// properly connected. Otherwise we'll have the bad behavior
// to log nonce submission but receive no response
if (p_client && p_client->isConnected())
{
p_client->submitSolution(sol);
}
else
{
cnote << string(EthOrange "Solution 0x") + toHex(sol.nonce)
<< " wasted. Waiting for connection...";
}
return false;
});
}
void PoolManager::setClientHandlers()
{
p_client->onConnected([&]() {
{
// If HostName is already an IP address no need to append the
// effective ip address.
if (p_client->getConnection()->HostNameType() == dev::UriHostNameType::Dns ||
p_client->getConnection()->HostNameType() == dev::UriHostNameType::Basic)
{
string ep = p_client->ActiveEndPoint();
if (!ep.empty())
m_selectedHost = p_client->getConnection()->Host() + ep;
}
cnote << "Established connection to " << m_selectedHost;
// Reset current WorkPackage
m_currentWp.job.clear();
m_currentWp.header = h256();
// Shuffle if needed
if (Farm::f().get_ergodicity() == 1U)
Farm::f().shuffle();
// Rough implementation to return to primary pool
// after specified amount of time
if (m_activeConnectionIdx != 0 && m_Settings.poolFailoverTimeout)
{
m_failovertimer.expires_from_now(
boost::posix_time::minutes(m_Settings.poolFailoverTimeout));
m_failovertimer.async_wait(m_io_strand.wrap(boost::bind(
&PoolManager::failovertimer_elapsed, this, boost::asio::placeholders::error)));
}
else
{
m_failovertimer.cancel();
}
}
if (!Farm::f().isMining())
{
cnote << "Spinning up miners...";
Farm::f().start();
}
else if (Farm::f().paused())
{
cnote << "Resume mining ...";
Farm::f().resume();
}
// Activate timing for HR submission
if (m_Settings.reportHashrate)
{
m_submithrtimer.expires_from_now(boost::posix_time::seconds(m_Settings.hashRateInterval));
m_submithrtimer.async_wait(m_io_strand.wrap(boost::bind(
&PoolManager::submithrtimer_elapsed, this, boost::asio::placeholders::error)));
}
// Signal async operations have completed
m_async_pending.store(false, std::memory_order_relaxed);
});
p_client->onDisconnected([&]() {
cnote << "Disconnected from " << m_selectedHost;
// Clear current connection
p_client->unsetConnection();
m_currentWp.header = h256();
// Stop timing actors
m_failovertimer.cancel();
m_submithrtimer.cancel();
if (m_stopping.load(std::memory_order_relaxed))
{
if (Farm::f().isMining())
{
cnote << "Shutting down miners...";
Farm::f().stop();
}
m_running.store(false, std::memory_order_relaxed);
}
else
{
// Signal we will reconnect async
m_async_pending.store(true, std::memory_order_relaxed);
// Suspend mining and submit new connection request
cnote << "No connection. Suspend mining ...";
Farm::f().pause();
g_io_service.post(m_io_strand.wrap(boost::bind(&PoolManager::rotateConnect, this)));
}
});
p_client->onWorkReceived([&](WorkPackage const& wp) {
// Should not happen !
if (!wp)
return;
int _currentEpoch = m_currentWp.epoch;
bool newEpoch = (_currentEpoch == -1);
// In EthereumStratum/2.0.0 epoch number is set in session
if (!newEpoch)
{
if (p_client->getConnection()->StratumMode() == 3)
newEpoch = (wp.epoch != m_currentWp.epoch);
else
newEpoch = (wp.seed != m_currentWp.seed);
}
bool newDiff = (wp.boundary != m_currentWp.boundary);
m_currentWp = wp;
if (newEpoch)
{
m_epochChanges.fetch_add(1, std::memory_order_relaxed);
// If epoch is valued in workpackage take it
if (wp.epoch == -1)
{
if (m_currentWp.block > 0)
m_currentWp.epoch = m_currentWp.block / 30000;
else
m_currentWp.epoch = ethash::find_epoch_number(
ethash::hash256_from_bytes(m_currentWp.seed.data()));
}
}
else
{
m_currentWp.epoch = _currentEpoch;
}
if (newDiff || newEpoch)
showMiningAt();
cnote << "Job: " EthWhite << m_currentWp.header.abridged()
<< (m_currentWp.block != -1 ? (" block " + to_string(m_currentWp.block)) : "")
<< EthReset << " " << m_selectedHost;
Farm::f().setWork(m_currentWp);
});
p_client->onSolutionAccepted(
[&](std::chrono::milliseconds const& _responseDelay, unsigned const& _minerIdx, bool _asStale) {
std::stringstream ss;
ss << std::setw(4) << std::setfill(' ') << _responseDelay.count() << " ms. "
<< m_selectedHost;
cnote << EthLime "**Accepted" << (_asStale ? " stale": "") << EthReset << ss.str();
Farm::f().accountSolution(_minerIdx, SolutionAccountingEnum::Accepted);
});
p_client->onSolutionRejected(
[&](std::chrono::milliseconds const& _responseDelay, unsigned const& _minerIdx) {
std::stringstream ss;
ss << std::setw(4) << std::setfill(' ') << _responseDelay.count() << " ms. "
<< m_selectedHost;
cwarn << EthRed "**Rejected" EthReset << ss.str();
Farm::f().accountSolution(_minerIdx, SolutionAccountingEnum::Rejected);
});
}
void PoolManager::stop()
{
if (m_running.load(std::memory_order_relaxed))
{
m_async_pending.store(true, std::memory_order_relaxed);
m_stopping.store(true, std::memory_order_relaxed);
if (p_client && p_client->isConnected())
{
p_client->disconnect();
// Wait for async operations to complete
while (m_running.load(std::memory_order_relaxed))
this_thread::sleep_for(chrono::milliseconds(500));
p_client = nullptr;
}
else
{
// Stop timing actors
m_failovertimer.cancel();
m_submithrtimer.cancel();
if (Farm::f().isMining())
{
cnote << "Shutting down miners...";
Farm::f().stop();
}
}
}
}
void PoolManager::addConnection(std::string _connstring)
{
m_Settings.connections.push_back(std::shared_ptr<URI>(new URI(_connstring)));
}
void PoolManager::addConnection(std::shared_ptr<URI> _uri)
{
m_Settings.connections.push_back(_uri);
}
/*
* Remove a connection
* Returns: 0 on success
* -1 failure (out of bounds)
* -2 failure (active connection should be deleted)
*/
void PoolManager::removeConnection(unsigned int idx)
{
// Are there any outstanding operations ?
if (m_async_pending.load(std::memory_order_relaxed))
throw std::runtime_error("Outstanding operations. Retry ...");
// Check bounds
if (idx >= m_Settings.connections.size())
throw std::runtime_error("Index out-of bounds.");
// Can't delete active connection
if (idx == m_activeConnectionIdx)
throw std::runtime_error("Can't remove active connection");
// Remove the selected connection
m_Settings.connections.erase(m_Settings.connections.begin() + idx);
if (m_activeConnectionIdx > idx)
m_activeConnectionIdx--;
}
void PoolManager::setActiveConnectionCommon(unsigned int idx)
{
// Are there any outstanding operations ?
bool ex = false;
if (!m_async_pending.compare_exchange_weak(ex, true, std::memory_order_relaxed))
throw std::runtime_error("Outstanding operations. Retry ...");
if (idx != m_activeConnectionIdx)
{
m_connectionSwitches.fetch_add(1, std::memory_order_relaxed);
m_activeConnectionIdx = idx;
m_connectionAttempt = 0;
p_client->disconnect();
}
else
{
// Release the flag immediately
m_async_pending.store(false, std::memory_order_relaxed);
}
}
/*
* Sets the active connection
* Returns: 0 on success, -1 on failure (out of bounds)
*/
void PoolManager::setActiveConnection(unsigned int idx)
{
// Sets the active connection to the requested index
if (idx >= m_Settings.connections.size())
throw std::runtime_error("Index out-of bounds.");
setActiveConnectionCommon(idx);
}
void PoolManager::setActiveConnection(std::string& _connstring)
{
bool found = false;
for (size_t idx = 0; idx < m_Settings.connections.size(); idx++)
if (boost::iequals(m_Settings.connections[idx]->str(), _connstring))
{
setActiveConnectionCommon(idx);
break;
}
if (!found)
throw std::runtime_error("Not found.");
}
std::shared_ptr<URI> PoolManager::getActiveConnection()
{
try
{
return m_Settings.connections.at(m_activeConnectionIdx);
}
catch (const std::exception&)
{
return nullptr;
}
}
Json::Value PoolManager::getConnectionsJson()
{
// Returns the list of configured connections
Json::Value jRes;
for (size_t i = 0; i < m_Settings.connections.size(); i++)
{
Json::Value JConn;
JConn["index"] = (unsigned)i;
JConn["active"] = (i == m_activeConnectionIdx ? true : false);
JConn["uri"] = m_Settings.connections[i]->str();
jRes.append(JConn);
}
return jRes;
}
void PoolManager::start()
{
m_running.store(true, std::memory_order_relaxed);
m_async_pending.store(true, std::memory_order_relaxed);
m_connectionSwitches.fetch_add(1, std::memory_order_relaxed);
g_io_service.post(m_io_strand.wrap(boost::bind(&PoolManager::rotateConnect, this)));
}
void PoolManager::rotateConnect()
{
if (p_client && p_client->isConnected())
return;
// Check we're within bounds
if (m_activeConnectionIdx >= m_Settings.connections.size())
m_activeConnectionIdx = 0;
// If this connection is marked Unrecoverable then discard it
if (m_Settings.connections.at(m_activeConnectionIdx)->IsUnrecoverable())
{
m_Settings.connections.erase(m_Settings.connections.begin() + m_activeConnectionIdx);
m_connectionAttempt = 0;
if (m_activeConnectionIdx >= m_Settings.connections.size())
m_activeConnectionIdx = 0;
m_connectionSwitches.fetch_add(1, std::memory_order_relaxed);
}
else if (m_connectionAttempt >= m_Settings.connectionMaxRetries)
{
// If this is the only connection we can't rotate
// forever
if (m_Settings.connections.size() == 1)
{
m_Settings.connections.erase(m_Settings.connections.begin() + m_activeConnectionIdx);
}
// Rotate connections if above max attempts threshold
else
{
m_connectionAttempt = 0;
m_activeConnectionIdx++;
if (m_activeConnectionIdx >= m_Settings.connections.size())
m_activeConnectionIdx = 0;
m_connectionSwitches.fetch_add(1, std::memory_order_relaxed);
}
}
if (!m_Settings.connections.empty() && m_Settings.connections.at(m_activeConnectionIdx)->Host() != "exit")
{
if (p_client)
p_client = nullptr;
if (m_Settings.connections.at(m_activeConnectionIdx)->Family() == ProtocolFamily::GETWORK)
p_client =
std::unique_ptr<PoolClient>(new EthGetworkClient(m_Settings.noWorkTimeout, m_Settings.getWorkPollInterval));
if (m_Settings.connections.at(m_activeConnectionIdx)->Family() == ProtocolFamily::STRATUM)
p_client = std::unique_ptr<PoolClient>(
new EthStratumClient(m_Settings.noWorkTimeout, m_Settings.noResponseTimeout));
if (m_Settings.connections.at(m_activeConnectionIdx)->Family() == ProtocolFamily::SIMULATION)
p_client = std::unique_ptr<PoolClient>(
new SimulateClient(m_Settings.benchmarkBlock, m_Settings.benchmarkDiff));
if (p_client)
setClientHandlers();
// Count connectionAttempts
m_connectionAttempt++;
// Invoke connections
m_selectedHost = m_Settings.connections.at(m_activeConnectionIdx)->Host() + ":" +
to_string(m_Settings.connections.at(m_activeConnectionIdx)->Port());
p_client->setConnection(m_Settings.connections.at(m_activeConnectionIdx));
cnote << "Selected pool " << m_selectedHost;
p_client->connect();
}
else
{
if (m_Settings.connections.empty())
cnote << "No more connections to try. Exiting...";
else
cnote << "'exit' failover just got hit. Exiting...";
// Stop mining if applicable
if (Farm::f().isMining())
{
cnote << "Shutting down miners...";
Farm::f().stop();
}
m_running.store(false, std::memory_order_relaxed);
raise(SIGTERM);
}
}
void PoolManager::showMiningAt()
{
// Should not happen
if (!m_currentWp)
return;
double d = dev::getHashesToTarget(m_currentWp.boundary.hex(HexPrefix::Add));
cnote << "Epoch : " EthWhite << m_currentWp.epoch << EthReset << " Difficulty : " EthWhite
<< dev::getFormattedHashes(d) << EthReset;
}
void PoolManager::failovertimer_elapsed(const boost::system::error_code& ec)
{
if (!ec)
{
if (m_running.load(std::memory_order_relaxed))
{
if (m_activeConnectionIdx != 0)
{
m_activeConnectionIdx = 0;
m_connectionAttempt = 0;
m_connectionSwitches.fetch_add(1, std::memory_order_relaxed);
cnote << "Failover timeout reached, retrying connection to primary pool";
p_client->disconnect();
}
}
}
}
void PoolManager::submithrtimer_elapsed(const boost::system::error_code& ec)
{
if (!ec)
{
if (m_running.load(std::memory_order_relaxed))
{
if (p_client && p_client->isConnected())
p_client->submitHashrate((uint32_t)Farm::f().HashRate(), m_Settings.hashRateId);
// Resubmit actor
m_submithrtimer.expires_from_now(boost::posix_time::seconds(m_Settings.hashRateInterval));
m_submithrtimer.async_wait(m_io_strand.wrap(boost::bind(
&PoolManager::submithrtimer_elapsed, this, boost::asio::placeholders::error)));
}
}
}
int PoolManager::getCurrentEpoch()
{
return m_currentWp.epoch;
}
double PoolManager::getCurrentDifficulty()
{
if (!m_currentWp)
return 0.0;
return dev::getHashesToTarget(m_currentWp.boundary.hex(HexPrefix::Add));
}
unsigned PoolManager::getConnectionSwitches()
{
return m_connectionSwitches.load(std::memory_order_relaxed);
}
unsigned PoolManager::getEpochChanges()
{
return m_epochChanges.load(std::memory_order_relaxed);
}

View File

@@ -0,0 +1,98 @@
#pragma once
#include <iostream>
#include <jsoncpp/json/json.h>
#include <libdevcore/Worker.h>
#include <libethcore/Farm.h>
#include <libethcore/Miner.h>
#include "PoolClient.h"
#include "getwork/EthGetworkClient.h"
#include "stratum/EthStratumClient.h"
#include "testing/SimulateClient.h"
using namespace std;
namespace dev
{
namespace eth
{
struct PoolSettings
{
std::vector<std::shared_ptr<URI>> connections; // List of connection definitions
unsigned getWorkPollInterval = 500; // Interval (ms) between getwork requests
unsigned noWorkTimeout = 100000; // If no new jobs in this number of seconds drop connection
unsigned noResponseTimeout = 2; // If no response in this number of seconds drop connection
unsigned poolFailoverTimeout = 0; // Return to primary pool after this number of minutes
bool reportHashrate = false; // Whether or not to report hashrate to pool
unsigned hashRateInterval = 60; // Interval in seconds among hashrate submissions
std::string hashRateId =
h256::random().hex(HexPrefix::Add); // Unique identifier for HashRate submission
unsigned connectionMaxRetries = 3; // Max number of connection retries
unsigned benchmarkBlock = 0; // Block number used by SimulateClient to test performances
float benchmarkDiff = 1.0; // Difficulty used by SimulateClient to test performances
};
class PoolManager
{
public:
PoolManager(PoolSettings _settings);
static PoolManager& p() { return *m_this; }
void addConnection(std::string _connstring);
void addConnection(std::shared_ptr<URI> _uri);
Json::Value getConnectionsJson();
void setActiveConnection(unsigned int idx);
void setActiveConnection(std::string& _connstring);
std::shared_ptr<URI> getActiveConnection();
void removeConnection(unsigned int idx);
void start();
void stop();
bool isConnected() { return p_client->isConnected(); };
bool isRunning() { return m_running; };
int getCurrentEpoch();
double getCurrentDifficulty();
unsigned getConnectionSwitches();
unsigned getEpochChanges();
private:
void rotateConnect();
void setClientHandlers();
void showMiningAt();
void setActiveConnectionCommon(unsigned int idx);
PoolSettings m_Settings;
void failovertimer_elapsed(const boost::system::error_code& ec);
void submithrtimer_elapsed(const boost::system::error_code& ec);
std::atomic<bool> m_running = {false};
std::atomic<bool> m_stopping = {false};
std::atomic<bool> m_async_pending = {false};
unsigned m_connectionAttempt = 0;
std::string m_selectedHost = ""; // Holds host name (and endpoint) of selected connection
std::atomic<unsigned> m_connectionSwitches = {0};
unsigned m_activeConnectionIdx = 0;
WorkPackage m_currentWp;
boost::asio::io_service::strand m_io_strand;
boost::asio::deadline_timer m_failovertimer;
boost::asio::deadline_timer m_submithrtimer;
std::unique_ptr<PoolClient> p_client = nullptr;
std::atomic<unsigned> m_epochChanges = {0};
static PoolManager* m_this;
};
} // namespace eth
} // namespace dev

View File

@@ -0,0 +1,428 @@
/*
This file is part of progminer.
progminer is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
progminer is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with progminer. If not, see <http://www.gnu.org/licenses/>.
*/
#include <algorithm>
#include <iostream>
#include <map>
#include <sstream>
#include <cstring>
#include <libpoolprotocols/PoolURI.h>
using namespace dev;
struct SchemeAttributes
{
ProtocolFamily family;
SecureLevel secure;
unsigned version;
};
static std::map<std::string, SchemeAttributes> s_schemes = {
/*
This schemes are kept for backwards compatibility.
Progminer do perform stratum autodetection
*/
{"stratum+tcp", {ProtocolFamily::STRATUM, SecureLevel::NONE, 0}},
{"stratum1+tcp", {ProtocolFamily::STRATUM, SecureLevel::NONE, 1}},
{"stratum2+tcp", {ProtocolFamily::STRATUM, SecureLevel::NONE, 2}},
{"stratum3+tcp", {ProtocolFamily::STRATUM, SecureLevel::NONE, 3}},
{"stratum+tls", {ProtocolFamily::STRATUM, SecureLevel::TLS, 0}},
{"stratum1+tls", {ProtocolFamily::STRATUM, SecureLevel::TLS, 1}},
{"stratum2+tls", {ProtocolFamily::STRATUM, SecureLevel::TLS, 2}},
{"stratum3+tls", {ProtocolFamily::STRATUM, SecureLevel::TLS, 3}},
{"stratum+tls12", {ProtocolFamily::STRATUM, SecureLevel::TLS12, 0}},
{"stratum1+tls12", {ProtocolFamily::STRATUM, SecureLevel::TLS12, 1}},
{"stratum2+tls12", {ProtocolFamily::STRATUM, SecureLevel::TLS12, 2}},
{"stratum3+tls12", {ProtocolFamily::STRATUM, SecureLevel::TLS12, 3}},
{"stratum+ssl", {ProtocolFamily::STRATUM, SecureLevel::TLS12, 0}},
{"stratum1+ssl", {ProtocolFamily::STRATUM, SecureLevel::TLS12, 1}},
{"stratum2+ssl", {ProtocolFamily::STRATUM, SecureLevel::TLS12, 2}},
{"stratum3+ssl", {ProtocolFamily::STRATUM, SecureLevel::TLS12, 3}},
{"http", {ProtocolFamily::GETWORK, SecureLevel::NONE, 0}},
{"getwork", {ProtocolFamily::GETWORK, SecureLevel::NONE, 0}},
/*
Any TCP scheme has, at the moment, only STRATUM protocol thus
reiterating "stratum" word would be pleonastic
Version 9 means auto-detect stratum mode
*/
{"stratum", {ProtocolFamily::STRATUM, SecureLevel::NONE, 999}},
{"stratums", {ProtocolFamily::STRATUM, SecureLevel::TLS, 999}},
{"stratumss", {ProtocolFamily::STRATUM, SecureLevel::TLS12, 999}},
/*
The following scheme is only meant for simulation operations
It's not meant to be used with -P arguments
*/
{"simulation", {ProtocolFamily::SIMULATION, SecureLevel::NONE, 999}}
};
static bool url_decode(const std::string& in, std::string& out)
{
out.clear();
out.reserve(in.size());
for (std::size_t i = 0; i < in.size(); ++i)
{
if (in[i] == '%')
{
if (i + 3 <= in.size())
{
int value = 0;
std::istringstream is(in.substr(i + 1, 2));
if (is >> std::hex >> value)
{
out += static_cast<char>(value);
i += 2;
}
else
{
return false;
}
}
else
{
return false;
}
}
else if (in[i] == '+')
{
out += ' ';
}
else
{
out += in[i];
}
}
return true;
}
/*
For a well designed explanation of URI parts
refer to https://cpp-netlib.org/0.10.1/in_depth/uri.html
*/
URI::URI(std::string uri, bool _sim) : m_uri{std::move(uri)}
{
std::regex sch_auth("^([a-zA-Z0-9\\+]{1,})\\:\\/\\/(.*)$");
std::smatch matches;
if (!std::regex_search(m_uri, matches, sch_auth, std::regex_constants::match_default))
return;
// Split scheme and authoority
// Authority MUST be valued
m_scheme = matches[1].str();
boost::algorithm::to_lower(m_scheme);
m_authority = matches[2].str();
// Missing authority is not possible
if (m_authority.empty())
throw std::runtime_error("Invalid authority");
// Simulation scheme is only allowed if specifically set
if (!_sim && m_scheme == "simulation")
throw std::runtime_error("Invalid scheme");
// Check scheme is allowed
if ((s_schemes.find(m_scheme) == s_schemes.end()))
throw std::runtime_error("Invalid scheme");
// Now let's see if authority part can be split into userinfo and "the rest"
std::regex usr_url("^(.*)\\@(.*)$");
if (std::regex_search(m_authority, matches, usr_url, std::regex_constants::match_default))
{
m_userinfo = matches[1].str();
m_urlinfo = matches[2].str();
}
else
{
m_urlinfo = m_authority;
}
/*
If m_userinfo present and valued it can be composed by either :
- user
- user.worker
- user.worker:password
- user:password
In other words . delimits the beginning of worker and : delimits
the beginning of password
*/
if (!m_userinfo.empty())
{
// Save all parts enclosed in backticks into a dictionary
// and replace them with tokens in the authority
std::regex btick("`((?:[^`])*)`");
std::map<std::string, std::string> btick_blocks;
auto btick_blocks_begin =
std::sregex_iterator(m_authority.begin(), m_authority.end(), btick);
auto btick_blocks_end = std::sregex_iterator();
int i = 0;
for (std::sregex_iterator it = btick_blocks_begin; it != btick_blocks_end; ++it)
{
std::smatch match = *it;
std::string match_str = match[1].str();
btick_blocks["_" + std::to_string(i++)] = match[1].str();
}
if (btick_blocks.size())
{
std::map<std::string, std::string>::iterator it;
for (it = btick_blocks.begin(); it != btick_blocks.end(); it++)
boost::replace_all(m_userinfo, "`" + it->second + "`", "`" + it->first + "`");
}
std::vector<std::regex> usr_patterns;
usr_patterns.push_back(std::regex("^(.*)\\.(.*)\\:(.*)$"));
usr_patterns.push_back(std::regex("^(.*)\\:(.*)$"));
usr_patterns.push_back(std::regex("^(.*)\\.(.*)$"));
bool usrMatchFound = false;
for (size_t i = 0; i < usr_patterns.size() && !usrMatchFound; i++)
{
if (std::regex_search(
m_userinfo, matches, usr_patterns.at(i), std::regex_constants::match_default))
{
usrMatchFound = true;
switch (i)
{
case 0:
m_user = matches[1].str();
m_worker = matches[2].str();
m_password = matches[3].str();
break;
case 1:
m_user = matches[1];
m_password = matches[2];
break;
case 2:
m_user = matches[1];
m_worker = matches[2];
break;
default:
break;
}
}
}
// If no matches found after this loop it means all the user
// part is only user login
if (!usrMatchFound)
m_user = m_userinfo;
// Replace all tokens with their respective values
if (btick_blocks.size())
{
std::map<std::string, std::string>::iterator it;
for (it = btick_blocks.begin(); it != btick_blocks.end(); it++)
{
boost::replace_all(m_userinfo, "`" + it->first + "`", it->second);
boost::replace_all(m_user, "`" + it->first + "`", it->second);
boost::replace_all(m_worker, "`" + it->first + "`", it->second);
boost::replace_all(m_password, "`" + it->first + "`", it->second);
}
}
}
/*
Let's process the url part which must contain at least a host
an optional port and eventually a path (which may include a query
and a fragment)
Host can be a DNS host or an IP address.
Thus we can have
- host
- host/path
- host:port
- host:port/path
*/
size_t offset = m_urlinfo.find('/');
if (offset != std::string::npos)
{
m_hostinfo = m_urlinfo.substr(0, offset);
m_pathinfo = m_urlinfo.substr(offset);
}
else
{
m_hostinfo = m_urlinfo;
}
boost::algorithm::to_lower(m_hostinfo); // needed to ensure we properly hit "exit" as host
std::regex host_pattern("^(.*)\\:([0-9]{1,5})$");
if (std::regex_search(m_hostinfo, matches, host_pattern, std::regex_constants::match_default))
{
m_host = matches[1].str();
m_port = boost::lexical_cast<short>(matches[2].str());
}
else
{
m_host = m_hostinfo;
}
// Host info must be present and valued
if (m_host.empty())
throw std::runtime_error("Missing host");
/*
Eventually split path info into path query fragment
*/
if (!m_pathinfo.empty())
{
// Url Decode Path
std::vector<std::regex> path_patterns;
path_patterns.push_back(std::regex("(\\/.*)\\?(.*)\\#(.*)$"));
path_patterns.push_back(std::regex("(\\/.*)\\#(.*)$"));
path_patterns.push_back(std::regex("(\\/.*)\\?(.*)$"));
bool pathMatchFound = false;
for (size_t i = 0; i < path_patterns.size() && !pathMatchFound; i++)
{
if (std::regex_search(
m_pathinfo, matches, path_patterns.at(i), std::regex_constants::match_default))
{
pathMatchFound = true;
switch (i)
{
case 0:
m_path = matches[1].str();
m_query = matches[2].str();
m_fragment = matches[3].str();
break;
case 1:
m_path = matches[1].str();
m_fragment = matches[2].str();
break;
case 2:
m_path = matches[1].str();
m_query = matches[2].str();
break;
default:
break;
}
}
// If no matches found after this loop it means all the pathinfo
// part is only path login
if (!pathMatchFound)
m_path = m_pathinfo;
}
}
// Determine host type
boost::system::error_code ec;
boost::asio::ip::address address = boost::asio::ip::address::from_string(m_host, ec);
if (!ec)
{
// This is a valid Ip Address
if (address.is_v4())
m_hostType = UriHostNameType::IPV4;
if (address.is_v6())
m_hostType = UriHostNameType::IPV6;
m_isLoopBack = address.is_loopback();
}
else
{
// Check if valid DNS hostname
std::regex hostNamePattern(
"^(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\\-]*[a-zA-Z0-9])\\.)*([A-Za-z0-9]|[A-Za-z0-9][A-"
"Za-z0-9\\-]*[A-Za-z0-9])$");
if (std::regex_match(m_host, hostNamePattern))
m_hostType = UriHostNameType::Dns;
else
m_hostType = UriHostNameType::Basic;
}
if (!m_user.empty())
boost::replace_all(m_user, "`", "");
if (!m_password.empty())
boost::replace_all(m_password, "`", "");
if (!m_worker.empty())
boost::replace_all(m_worker, "`", "");
// Eventually decode every encoded char
std::string tmpStr;
if (url_decode(m_userinfo, tmpStr))
m_userinfo = tmpStr;
if (url_decode(m_urlinfo, tmpStr))
m_urlinfo = tmpStr;
if (url_decode(m_hostinfo, tmpStr))
m_hostinfo = tmpStr;
if (url_decode(m_pathinfo, tmpStr))
m_pathinfo = tmpStr;
if (url_decode(m_path, tmpStr))
m_path = tmpStr;
if (url_decode(m_query, tmpStr))
m_query = tmpStr;
if (url_decode(m_fragment, tmpStr))
m_fragment = tmpStr;
if (url_decode(m_user, tmpStr))
m_user = tmpStr;
if (url_decode(m_password, tmpStr))
m_password = tmpStr;
if (url_decode(m_worker, tmpStr))
m_worker = tmpStr;
}
ProtocolFamily URI::Family() const
{
return s_schemes[m_scheme].family;
}
unsigned URI::Version() const
{
return s_schemes[m_scheme].version;
}
std::string URI::UserDotWorker() const
{
std::string _ret = m_user;
if (!m_worker.empty())
_ret.append("." + m_worker);
return _ret;
}
SecureLevel URI::SecLevel() const
{
return s_schemes[m_scheme].secure;
}
UriHostNameType URI::HostNameType() const
{
return m_hostType;
}
bool URI::IsLoopBack() const
{
return m_isLoopBack;
}
std::string URI::KnownSchemes(ProtocolFamily family)
{
std::string schemes;
for (const auto& s : s_schemes)
{
if ((s.second.family == family) && (s.second.version != 999))
schemes += s.first + " ";
}
return schemes;
}

View File

@@ -0,0 +1,122 @@
/*
This file is part of progminer.
progminer is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
progminer is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with progminer. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <regex>
#include <string>
#include <boost/algorithm/string.hpp>
#include <boost/asio.hpp>
#include <boost/lexical_cast.hpp>
// A simple URI parser specifically for mining pool endpoints
namespace dev
{
enum class SecureLevel
{
NONE = 0,
TLS12,
TLS
};
enum class ProtocolFamily
{
GETWORK = 0,
STRATUM,
SIMULATION
};
enum class UriHostNameType
{
Unknown = 0, // The type of the host name is not supplied
Basic = 1, // The host is set, but the type cannot be determined
Dns = 2, // The host name is a domain name system(DNS) style host name
IPV4 = 3, // The host name is an Internet Protocol(IP) version 4 host address
IPV6 = 4 // The host name is an Internet Protocol(IP) version 6 host address.
};
class URI
{
public:
URI() = delete;
URI(std::string uri, bool _sim = false);
std::string Scheme() const { return m_scheme; }
std::string Host() const { return m_host; }
std::string Path() const { return m_path; }
unsigned short Port() const { return m_port; }
std::string User() const { return m_user; }
std::string Pass() const { return m_password; }
std::string Workername() const { return m_worker; }
std::string UserDotWorker() const;
SecureLevel SecLevel() const;
ProtocolFamily Family() const;
UriHostNameType HostNameType() const;
bool IsLoopBack() const;
unsigned Version() const;
std::string str() const { return m_uri; }
static std::string KnownSchemes(ProtocolFamily family);
void SetStratumMode(unsigned mode, bool confirmed)
{
m_stratumMode = mode;
m_stratumModeConfirmed = confirmed;
}
void SetStratumMode(unsigned mode) { m_stratumMode = mode; }
unsigned StratumMode() { return m_stratumMode; }
bool StratumModeConfirmed() { return m_stratumModeConfirmed; }
bool IsUnrecoverable() { return m_unrecoverable; }
void MarkUnrecoverable() { m_unrecoverable = true; }
bool Responds() { return m_responds; }
void Responds(bool _value) { m_responds = _value; }
void addDuration(unsigned long _minutes) { m_totalDuration += _minutes; }
unsigned long getDuration() { return m_totalDuration; }
private:
std::string m_scheme;
std::string m_authority; // Contains all text after scheme
std::string m_userinfo; // Contains the userinfo part
std::string m_urlinfo; // Contains the urlinfo part
std::string m_hostinfo; // Contains the hostinfo part
std::string m_pathinfo; // Contains the pathinfo part
std::string m_host;
std::string m_path;
std::string m_query;
std::string m_fragment;
std::string m_user;
std::string m_password = "X";
std::string m_worker;
std::string m_uri;
unsigned short m_stratumMode = 999; // Initial value 999 means not tested yet
unsigned short m_port = 0;
bool m_stratumModeConfirmed = false;
bool m_unrecoverable = false;
bool m_responds = false;
UriHostNameType m_hostType = UriHostNameType::Unknown;
bool m_isLoopBack;
unsigned long m_totalDuration; // Total duration on this connection in minutes
};
} // namespace dev

View File

@@ -0,0 +1,583 @@
#include "EthGetworkClient.h"
#include <chrono>
#include <boost/bind/bind.hpp>
#include <ethash/ethash.hpp>
using namespace std;
using namespace dev;
using namespace eth;
using boost::asio::ip::tcp;
EthGetworkClient::EthGetworkClient(int worktimeout, unsigned farmRecheckPeriod)
: PoolClient(),
m_farmRecheckPeriod(farmRecheckPeriod),
m_io_strand(g_io_service),
m_socket(g_io_service),
m_resolver(g_io_service),
m_endpoints(),
m_getwork_timer(g_io_service),
m_worktimeout(worktimeout)
{
m_jSwBuilder.settings_["indentation"] = "";
Json::Value jGetWork;
jGetWork["id"] = unsigned(1);
jGetWork["jsonrpc"] = "2.0";
jGetWork["method"] = "eth_getWork";
jGetWork["params"] = Json::Value(Json::arrayValue);
m_jsonGetWork = std::string(Json::writeString(m_jSwBuilder, jGetWork));
}
EthGetworkClient::~EthGetworkClient()
{
// Do not stop io service.
// It's global
}
void EthGetworkClient::connect()
{
// Prevent unnecessary and potentially dangerous recursion
bool expected = false;
if (!m_connecting.compare_exchange_weak(expected, true, memory_order::memory_order_relaxed))
return;
// Reset status flags
m_getwork_timer.cancel();
// Initialize a new queue of end points
m_endpoints = std::queue<boost::asio::ip::basic_endpoint<boost::asio::ip::tcp>>();
m_endpoint = boost::asio::ip::basic_endpoint<boost::asio::ip::tcp>();
if (m_conn->HostNameType() == dev::UriHostNameType::Dns ||
m_conn->HostNameType() == dev::UriHostNameType::Basic)
{
// Begin resolve all ips associated to hostname
// calling the resolver each time is useful as most
// load balancers will give Ips in different order
m_resolver = boost::asio::ip::tcp::resolver(g_io_service);
boost::asio::ip::tcp::resolver::query q(m_conn->Host(), toString(m_conn->Port()));
// Start resolving async
m_resolver.async_resolve(
q, m_io_strand.wrap(boost::bind(&EthGetworkClient::handle_resolve, this,
boost::asio::placeholders::error, boost::asio::placeholders::iterator)));
}
else
{
// No need to use the resolver if host is already an IP address
m_endpoints.push(boost::asio::ip::tcp::endpoint(
boost::asio::ip::address::from_string(m_conn->Host()), m_conn->Port()));
send(m_jsonGetWork);
}
}
void EthGetworkClient::disconnect()
{
// Release session
m_connected.store(false, memory_order_relaxed);
m_conn->addDuration(m_session->duration());
m_session = nullptr;
m_connecting.store(false, std::memory_order_relaxed);
m_txPending.store(false, std::memory_order_relaxed);
m_getwork_timer.cancel();
m_txQueue.consume_all([](std::string* l) { delete l; });
m_request.consume(m_request.capacity());
m_response.consume(m_response.capacity());
if (m_onDisconnected)
m_onDisconnected();
}
void EthGetworkClient::begin_connect()
{
if (!m_endpoints.empty())
{
// Pick the first endpoint in list.
// Eventually endpoints get discarded on connection errors
m_endpoint = m_endpoints.front();
m_socket.async_connect(
m_endpoint, m_io_strand.wrap(boost::bind(&EthGetworkClient::handle_connect, this, boost::placeholders::_1)));
}
else
{
cwarn << "No more IP addresses to try for host: " << m_conn->Host();
disconnect();
}
}
void EthGetworkClient::handle_connect(const boost::system::error_code& ec)
{
if (!ec && m_socket.is_open())
{
// If in "connecting" phase raise the proper event
if (m_connecting.load(std::memory_order_relaxed))
{
// Initialize new session
m_connected.store(true, memory_order_relaxed);
m_session = unique_ptr<Session>(new Session);
m_session->subscribed.store(true, memory_order_relaxed);
m_session->authorized.store(true, memory_order_relaxed);
m_connecting.store(false, std::memory_order_relaxed);
if (m_onConnected)
m_onConnected();
m_current_tstamp = std::chrono::steady_clock::now();
}
// Retrieve 1st line waiting in the queue and submit
// if other lines waiting they will be processed
// at the end of the processed request
Json::Reader jRdr;
std::string* line;
std::ostream os(&m_request);
if (!m_txQueue.empty())
{
while (m_txQueue.pop(line))
{
if (line->size())
{
jRdr.parse(*line, m_pendingJReq);
m_pending_tstamp = std::chrono::steady_clock::now();
// Make sure path begins with "/"
string _path = (m_conn->Path().empty() ? "/" : m_conn->Path());
os << "POST " << _path << " HTTP/1.0\r\n";
os << "Host: " << m_conn->Host() << "\r\n";
os << "Content-Type: application/json"
<< "\r\n";
os << "Content-Length: " << line->length() << "\r\n";
os << "Connection: close\r\n\r\n"; // Double line feed to mark the
// beginning of body
// The payload
os << *line;
// Out received message only for debug purpouses
if (g_logOptions & LOG_JSON)
cnote << " >> " << *line;
delete line;
async_write(m_socket, m_request,
m_io_strand.wrap(boost::bind(&EthGetworkClient::handle_write, this,
boost::asio::placeholders::error)));
break;
}
delete line;
}
}
else
{
m_txPending.store(false, std::memory_order_relaxed);
}
}
else
{
if (ec != boost::asio::error::operation_aborted)
{
// This endpoint does not respond
// Pop it and retry
cwarn << "Error connecting to " << m_conn->Host() << ":" << toString(m_conn->Port())
<< " : " << ec.message();
m_endpoints.pop();
begin_connect();
}
}
}
void EthGetworkClient::handle_write(const boost::system::error_code& ec)
{
if (!ec)
{
// Transmission succesfully sent.
// Read the response async.
async_read(m_socket, m_response, boost::asio::transfer_at_least(1),
m_io_strand.wrap(boost::bind(&EthGetworkClient::handle_read, this,
boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)));
}
else
{
if (ec != boost::asio::error::operation_aborted)
{
cwarn << "Error writing to " << m_conn->Host() << ":" << toString(m_conn->Port())
<< " : " << ec.message();
m_endpoints.pop();
begin_connect();
}
}
}
void EthGetworkClient::handle_read(
const boost::system::error_code& ec, std::size_t bytes_transferred)
{
if (!ec)
{
// Close socket
if (m_socket.is_open())
m_socket.close();
// Get the whole message
std::string rx_message(
boost::asio::buffer_cast<const char*>(m_response.data()), bytes_transferred);
m_response.consume(bytes_transferred);
// Empty response ?
if (!rx_message.size())
{
cwarn << "Invalid response from " << m_conn->Host() << ":" << toString(m_conn->Port());
disconnect();
return;
}
// Read message by lines.
// First line is http status
// Other lines are headers
// A double "\r\n" identifies begin of body
// The rest is body
std::string line;
std::string linedelimiter = "\r\n";
std::size_t delimiteroffset = rx_message.find(linedelimiter);
unsigned int linenum = 0;
bool isHeader = true;
while (rx_message.length() && delimiteroffset != std::string::npos)
{
linenum++;
line = rx_message.substr(0, delimiteroffset);
rx_message.erase(0, delimiteroffset + 2);
// This identifies the beginning of body
if (line.empty())
{
isHeader = false;
delimiteroffset = rx_message.find(linedelimiter);
if (delimiteroffset != std::string::npos)
continue;
boost::replace_all(rx_message, "\n", "");
line = rx_message;
}
// Http status
if (isHeader && linenum == 1)
{
if (line.substr(0, 7) != "HTTP/1.")
{
cwarn << "Invalid response from " << m_conn->Host() << ":"
<< toString(m_conn->Port());
disconnect();
return;
}
std::size_t spaceoffset = line.find(' ');
if (spaceoffset == std::string::npos)
{
cwarn << "Invalid response from " << m_conn->Host() << ":"
<< toString(m_conn->Port());
disconnect();
return;
}
std::string status = line.substr(spaceoffset + 1);
if (status.substr(0, 3) != "200")
{
cwarn << m_conn->Host() << ":" << toString(m_conn->Port())
<< " reported status " << status;
disconnect();
return;
}
}
// Body
if (!isHeader)
{
// Out received message only for debug purpouses
if (g_logOptions & LOG_JSON)
cnote << " << " << line;
// Test validity of chunk and process
Json::Value jRes;
Json::Reader jRdr;
if (jRdr.parse(line, jRes))
{
// Run in sync so no 2 different async reads may overlap
processResponse(jRes);
}
else
{
string what = jRdr.getFormattedErrorMessages();
boost::replace_all(what, "\n", " ");
cwarn << "Got invalid Json message : " << what;
}
}
delimiteroffset = rx_message.find(linedelimiter);
}
// Is there anything else in the queue
if (!m_txQueue.empty())
{
begin_connect();
}
else
{
// Signal end of async send/receive operations
m_txPending.store(false, std::memory_order_relaxed);
}
}
else
{
if (ec != boost::asio::error::operation_aborted)
{
cwarn << "Error reading from :" << m_conn->Host() << ":" << toString(m_conn->Port())
<< " : "
<< ec.message();
disconnect();
}
}
}
void EthGetworkClient::handle_resolve(
const boost::system::error_code& ec, tcp::resolver::iterator i)
{
if (!ec)
{
while (i != tcp::resolver::iterator())
{
m_endpoints.push(i->endpoint());
i++;
}
m_resolver.cancel();
// Resolver has finished so invoke connection asynchronously
send(m_jsonGetWork);
}
else
{
cwarn << "Could not resolve host " << m_conn->Host() << ", " << ec.message();
disconnect();
}
}
void EthGetworkClient::processResponse(Json::Value& JRes)
{
unsigned _id = 0; // This SHOULD be the same id as the request it is responding to
bool _isSuccess = false; // Whether or not this is a succesful or failed response
string _errReason = ""; // Content of the error reason
if (!JRes.isMember("id"))
{
cwarn << "Missing id member in response from " << m_conn->Host() << ":"
<< toString(m_conn->Port());
return;
}
// We get the id from pending jrequest
// It's not guaranteed we get response labelled with same id
// For instance Dwarfpool always responds with "id":0
_id = m_pendingJReq.get("id", unsigned(0)).asUInt();
_isSuccess = JRes.get("error", Json::Value::null).empty();
_errReason = (_isSuccess ? "" : processError(JRes));
// We have only theese possible ids
// 0 or 1 as job notification
// 9 as response for eth_submitHashrate
// 40+ for responses to mining submissions
if (_id == 0 || _id == 1)
{
// Getwork might respond with an error to
// a request. (eg. node is still syncing)
// In such case delay further requests
// by 30 seconds.
// Otherwise resubmit another getwork request
// with a delay of m_farmRecheckPeriod ms.
if (!_isSuccess)
{
cwarn << "Got " << _errReason << " from " << m_conn->Host() << ":"
<< toString(m_conn->Port());
m_getwork_timer.expires_from_now(boost::posix_time::seconds(30));
m_getwork_timer.async_wait(
m_io_strand.wrap(boost::bind(&EthGetworkClient::getwork_timer_elapsed, this,
boost::asio::placeholders::error)));
}
else
{
if (!JRes.isMember("result"))
{
cwarn << "Missing data for eth_getWork request from " << m_conn->Host() << ":"
<< toString(m_conn->Port());
}
else
{
Json::Value JPrm = JRes.get("result", Json::Value::null);
WorkPackage newWp;
newWp.header = h256(JPrm.get(Json::Value::ArrayIndex(0), "").asString());
newWp.seed = h256(JPrm.get(Json::Value::ArrayIndex(1), "").asString());
newWp.boundary = h256(JPrm.get(Json::Value::ArrayIndex(2), "").asString());
newWp.block = strtoul(JPrm.get(Json::Value::ArrayIndex(3), "").asString().c_str(), nullptr, 0);
newWp.job = newWp.header.hex();
if (m_current.header != newWp.header)
{
m_current = newWp;
m_current_tstamp = std::chrono::steady_clock::now();
if (m_onWorkReceived)
m_onWorkReceived(m_current);
}
m_getwork_timer.expires_from_now(boost::posix_time::milliseconds(m_farmRecheckPeriod));
m_getwork_timer.async_wait(
m_io_strand.wrap(boost::bind(&EthGetworkClient::getwork_timer_elapsed, this,
boost::asio::placeholders::error)));
}
}
}
else if (_id == 9)
{
// Response to hashrate submission
// Actually don't do anything
}
else if (_id >= 40 && _id <= m_solution_submitted_max_id)
{
if (_isSuccess && JRes["result"].isConvertibleTo(Json::ValueType::booleanValue))
_isSuccess = JRes["result"].asBool();
std::chrono::milliseconds _delay = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - m_pending_tstamp);
const unsigned miner_index = _id - 40;
if (_isSuccess)
{
if (m_onSolutionAccepted)
m_onSolutionAccepted(_delay, miner_index, false);
}
else
{
if (m_onSolutionRejected)
m_onSolutionRejected(_delay, miner_index);
}
}
}
std::string EthGetworkClient::processError(Json::Value& JRes)
{
std::string retVar;
if (JRes.isMember("error") &&
!JRes.get("error", Json::Value::null).isNull())
{
if (JRes["error"].isConvertibleTo(Json::ValueType::stringValue))
{
retVar = JRes.get("error", "Unknown error").asString();
}
else if (JRes["error"].isConvertibleTo(Json::ValueType::arrayValue))
{
for (auto i : JRes["error"])
{
retVar += i.asString() + " ";
}
}
else if (JRes["error"].isConvertibleTo(Json::ValueType::objectValue))
{
for (Json::Value::iterator i = JRes["error"].begin(); i != JRes["error"].end(); ++i)
{
Json::Value k = i.key();
Json::Value v = (*i);
retVar += (std::string)i.name() + ":" + v.asString() + " ";
}
}
}
else
{
retVar = "Unknown error";
}
return retVar;
}
void EthGetworkClient::send(Json::Value const& jReq)
{
send(std::string(Json::writeString(m_jSwBuilder, jReq)));
}
void EthGetworkClient::send(std::string const& sReq)
{
std::string* line = new std::string(sReq);
m_txQueue.push(line);
bool ex = false;
if (m_txPending.compare_exchange_weak(ex, true, std::memory_order_relaxed))
begin_connect();
}
void EthGetworkClient::submitHashrate(uint64_t const& rate, string const& id)
{
// No need to check for authorization
if (m_session)
{
Json::Value jReq;
jReq["id"] = unsigned(9);
jReq["jsonrpc"] = "2.0";
jReq["method"] = "eth_submitHashrate";
jReq["params"] = Json::Value(Json::arrayValue);
jReq["params"].append(toHex(rate, HexPrefix::Add)); // Already expressed as hex
jReq["params"].append(id); // Already prefixed by 0x
send(jReq);
}
}
void EthGetworkClient::submitSolution(const Solution& solution)
{
if (m_session)
{
Json::Value jReq;
string nonceHex = toHex(solution.nonce);
unsigned id = 40 + solution.midx;
jReq["id"] = id;
jReq["jsonrpc"] = "2.0";
m_solution_submitted_max_id = max(m_solution_submitted_max_id, id);
jReq["method"] = "eth_submitWork";
jReq["params"] = Json::Value(Json::arrayValue);
jReq["params"].append("0x" + nonceHex);
jReq["params"].append("0x" + solution.work.header.hex());
jReq["params"].append("0x" + solution.mixHash.hex());
send(jReq);
}
}
void EthGetworkClient::getwork_timer_elapsed(const boost::system::error_code& ec)
{
// Triggers the resubmission of a getWork request
if (!ec)
{
// Check if last work is older than timeout
std::chrono::seconds _delay = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::steady_clock::now() - m_current_tstamp);
if (_delay.count() > m_worktimeout)
{
cwarn << "No new work received in " << m_worktimeout << " seconds.";
m_endpoints.pop();
disconnect();
}
else
{
send(m_jsonGetWork);
}
}
}

View File

@@ -0,0 +1,72 @@
#pragma once
#include <iostream>
#include <string>
#include <boost/asio.hpp>
#include <boost/algorithm/string/predicate.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/lockfree/queue.hpp>
#include <jsoncpp/json/json.h>
#include "../PoolClient.h"
using namespace std;
using namespace dev;
using namespace eth;
class EthGetworkClient : public PoolClient
{
public:
EthGetworkClient(int worktimeout, unsigned farmRecheckPeriod);
~EthGetworkClient();
void connect() override;
void disconnect() override;
void submitHashrate(uint64_t const& rate, string const& id) override;
void submitSolution(const Solution& solution) override;
private:
unsigned m_farmRecheckPeriod = 500; // In milliseconds
void begin_connect();
void handle_resolve(
const boost::system::error_code& ec, boost::asio::ip::tcp::resolver::iterator i);
void handle_connect(const boost::system::error_code& ec);
void handle_write(const boost::system::error_code& ec);
void handle_read(const boost::system::error_code& ec, std::size_t bytes_transferred);
std::string processError(Json::Value& JRes);
void processResponse(Json::Value& JRes);
void send(Json::Value const& jReq);
void send(std::string const& sReq);
void getwork_timer_elapsed(const boost::system::error_code& ec);
WorkPackage m_current;
std::atomic<bool> m_connecting = {false}; // Whether or not socket is on first try connect
std::atomic<bool> m_txPending = {false}; // Whether or not an async socket operation is pending
boost::lockfree::queue<std::string*> m_txQueue;
boost::asio::io_service::strand m_io_strand;
boost::asio::ip::tcp::socket m_socket;
boost::asio::ip::tcp::resolver m_resolver;
std::queue<boost::asio::ip::basic_endpoint<boost::asio::ip::tcp>> m_endpoints;
boost::asio::streambuf m_request;
boost::asio::streambuf m_response;
Json::StreamWriterBuilder m_jSwBuilder;
std::string m_jsonGetWork;
Json::Value m_pendingJReq;
std::chrono::time_point<std::chrono::steady_clock> m_pending_tstamp;
boost::asio::deadline_timer m_getwork_timer; // The timer which triggers getWork requests
// seconds to trigger a work_timeout (overwritten in constructor)
int m_worktimeout;
std::chrono::time_point<std::chrono::steady_clock> m_current_tstamp;
unsigned m_solution_submitted_max_id; // maximum json id we used to send a solution
};

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,162 @@
#pragma once
#include <iostream>
#include <boost/array.hpp>
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/bind.hpp>
#include <boost/lockfree/queue.hpp>
#include <jsoncpp/json/json.h>
#include <libdevcore/FixedHash.h>
#include <libdevcore/Log.h>
#include <libethcore/EthashAux.h>
#include <libethcore/Farm.h>
#include <libethcore/Miner.h>
#include "../PoolClient.h"
using namespace std;
using namespace dev;
using namespace dev::eth;
template <typename Verifier>
class verbose_verification
{
public:
verbose_verification(Verifier verifier) : verifier_(verifier) {}
bool operator()(bool preverified, boost::asio::ssl::verify_context& ctx)
{
char subject_name[256];
X509* cert = X509_STORE_CTX_get_current_cert(ctx.native_handle());
X509_NAME_oneline(X509_get_subject_name(cert), subject_name, 256);
bool verified = verifier_(preverified, ctx);
#ifdef DEV_BUILD
cnote << "Certificate: " << subject_name << " " << (verified ? "Ok" : "Failed");
#else
if (!verified)
cnote << "Certificate: " << subject_name << " "
<< "Failed";
#endif
return verified;
}
private:
Verifier verifier_;
};
class EthStratumClient : public PoolClient
{
public:
enum StratumProtocol
{
STRATUM = 0,
ETHPROXY,
ETHEREUMSTRATUM,
ETHEREUMSTRATUM2
};
EthStratumClient(int worktimeout, int responsetimeout);
void init_socket();
void connect() override;
void disconnect() override;
// Connected and Connection Statuses
bool isConnected() override
{
bool _ret = PoolClient::isConnected();
return _ret && !isPendingState();
}
bool isPendingState() override
{
return (m_connecting.load(std::memory_order_relaxed) ||
m_disconnecting.load(std::memory_order_relaxed));
}
void submitHashrate(uint64_t const& rate, string const& id) override;
void submitSolution(const Solution& solution) override;
h256 currentHeaderHash() { return m_current.header; }
bool current() { return static_cast<bool>(m_current); }
private:
void startSession();
void disconnect_finalize();
void enqueue_response_plea();
std::chrono::milliseconds dequeue_response_plea();
void clear_response_pleas();
void resolve_handler(
const boost::system::error_code& ec, boost::asio::ip::tcp::resolver::iterator i);
void start_connect();
void connect_handler(const boost::system::error_code& ec);
void workloop_timer_elapsed(const boost::system::error_code& ec);
void processResponse(Json::Value& responseObject);
std::string processError(Json::Value& erroresponseObject);
void processExtranonce(std::string& enonce);
void recvSocketData();
void onRecvSocketDataCompleted(
const boost::system::error_code& ec, std::size_t bytes_transferred);
void send(Json::Value const& jReq);
void sendSocketData();
void onSendSocketDataCompleted(const boost::system::error_code& ec);
void onSSLShutdownCompleted(const boost::system::error_code& ec);
std::atomic<bool> m_disconnecting = {false};
std::atomic<bool> m_connecting = {false};
std::atomic<bool> m_authpending = {false};
// seconds to trigger a work_timeout (overwritten in constructor)
int m_worktimeout;
// seconds timeout for responses and connection (overwritten in constructor)
int m_responsetimeout;
// default interval for workloop timer (milliseconds)
int m_workloop_interval = 1000;
WorkPackage m_current;
std::chrono::time_point<std::chrono::steady_clock> m_current_timestamp;
boost::asio::io_service& m_io_service; // The IO service reference passed in the constructor
boost::asio::io_service::strand m_io_strand;
boost::asio::ip::tcp::socket* m_socket;
std::string m_message; // The internal message string buffer
bool m_newjobprocessed = false;
// Use shared ptrs to avoid crashes due to async_writes
// see
// https://stackoverflow.com/questions/41526553/can-async-write-cause-segmentation-fault-when-this-is-deleted
std::shared_ptr<boost::asio::ssl::stream<boost::asio::ip::tcp::socket>> m_securesocket;
std::shared_ptr<boost::asio::ip::tcp::socket> m_nonsecuresocket;
boost::asio::streambuf m_sendBuffer;
boost::asio::streambuf m_recvBuffer;
Json::StreamWriterBuilder m_jSwBuilder;
boost::asio::deadline_timer m_workloop_timer;
std::atomic<int> m_response_pleas_count = {0};
std::atomic<std::chrono::steady_clock::duration> m_response_plea_older;
boost::lockfree::queue<std::chrono::steady_clock::time_point> m_response_plea_times;
std::atomic<bool> m_txPending = {false};
boost::lockfree::queue<std::string*> m_txQueue;
boost::asio::ip::tcp::resolver m_resolver;
std::queue<boost::asio::ip::basic_endpoint<boost::asio::ip::tcp>> m_endpoints;
unsigned m_solution_submitted_max_id; // maximum json id we used to send a solution
///@brief Auxiliary function to make verbose_verification objects.
template <typename Verifier>
verbose_verification<Verifier> make_verbose_verification(Verifier verifier)
{
return verbose_verification<Verifier>(verifier);
}
};

View File

@@ -0,0 +1,103 @@
#include <libdevcore/Log.h>
#include <chrono>
#include "SimulateClient.h"
using namespace std;
using namespace std::chrono;
using namespace dev;
using namespace eth;
SimulateClient::SimulateClient(unsigned const& block, float const& difficulty)
: PoolClient(), Worker("sim"), m_block(block), m_difficulty(difficulty)
{
}
SimulateClient::~SimulateClient() = default;
void SimulateClient::connect()
{
// Initialize new session
m_connected.store(true, memory_order_relaxed);
m_session = unique_ptr<Session>(new Session);
m_session->subscribed.store(true, memory_order_relaxed);
m_session->authorized.store(true, memory_order_relaxed);
if (m_onConnected)
m_onConnected();
// No need to worry about starting again.
// Worker class prevents that
startWorking();
}
void SimulateClient::disconnect()
{
cnote << "Simulation results : " << EthWhiteBold << "Max "
<< dev::getFormattedHashes((double)hr_max, ScaleSuffix::Add, 6) << " Mean "
<< dev::getFormattedHashes((double)hr_mean, ScaleSuffix::Add, 6) << EthReset;
m_conn->addDuration(m_session->duration());
m_session = nullptr;
m_connected.store(false, memory_order_relaxed);
if (m_onDisconnected)
m_onDisconnected();
}
void SimulateClient::submitHashrate(uint64_t const& rate, string const& id)
{
(void)rate;
(void)id;
}
void SimulateClient::submitSolution(const Solution& solution)
{
// This is a fake submission only evaluated locally
std::chrono::steady_clock::time_point submit_start = std::chrono::steady_clock::now();
bool accepted =
EthashAux::eval(solution.work.epoch, solution.work.block, solution.work.header, solution.nonce).value <=
solution.work.boundary;
std::chrono::milliseconds response_delay_ms =
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - submit_start);
if (accepted)
{
if (m_onSolutionAccepted)
m_onSolutionAccepted(response_delay_ms, solution.midx, false);
}
else
{
if (m_onSolutionRejected)
m_onSolutionRejected(response_delay_ms, solution.midx);
}
}
// Handles all logic here
void SimulateClient::workLoop()
{
m_start_time = std::chrono::steady_clock::now();
// apply exponential sliding average
// ref: https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
WorkPackage current;
current.seed = h256::random(); // We don't actually need a real seed as the epoch
// is calculated upon block number (see poolmanager)
current.header = h256::random();
current.block = m_block;
current.boundary = h256(dev::getTargetFromDiff(m_difficulty));
m_onWorkReceived(current); // submit new fake job
cnote << "Using block " << m_block << ", difficulty " << m_difficulty;
while (m_session)
{
float hr = Farm::f().HashRate();
hr_max = std::max(hr_max, hr);
hr_mean = hr_alpha * hr_mean + (1.0f - hr_alpha) * hr;
this_thread::sleep_for(chrono::milliseconds(200));
}
}

View File

@@ -0,0 +1,41 @@
#pragma once
#include <iostream>
#include <libdevcore/Worker.h>
#include <libethcore/EthashAux.h>
#include <libethcore/Farm.h>
#include <libethcore/Miner.h>
#include "../PoolClient.h"
using namespace std;
using namespace dev;
using namespace eth;
class SimulateClient : public PoolClient, Worker
{
public:
SimulateClient(unsigned const& block, float const& difficulty);
~SimulateClient() override;
void connect() override;
void disconnect() override;
bool isPendingState() override { return false; }
string ActiveEndPoint() override { return ""; };
void submitHashrate(uint64_t const& rate, string const& id) override;
void submitSolution(const Solution& solution) override;
private:
void workLoop() override;
unsigned m_block;
float m_difficulty;
std::chrono::steady_clock::time_point m_start_time;
float hr_alpha = 0.45f;
float hr_max = 0.0f;
float hr_mean = 0.0f;
};