From 59b62dcb77429f476b7440b1010793687a1d9ce7 Mon Sep 17 00:00:00 2001 From: XMRig Date: Sun, 6 Oct 2019 07:47:41 +0700 Subject: [PATCH] Added class RxQueue, class Rx now thin static wrapper on top of RxQueue. --- cmake/randomx.cmake | 2 + src/core/Miner.cpp | 18 ++-- src/crypto/rx/Rx.cpp | 145 +++--------------------------- src/crypto/rx/Rx.h | 4 +- src/crypto/rx/RxQueue.cpp | 181 ++++++++++++++++++++++++++++++++++++++ src/crypto/rx/RxQueue.h | 108 +++++++++++++++++++++++ 6 files changed, 313 insertions(+), 145 deletions(-) create mode 100644 src/crypto/rx/RxQueue.cpp create mode 100644 src/crypto/rx/RxQueue.h diff --git a/cmake/randomx.cmake b/cmake/randomx.cmake index 27b9d9f6..d05ceb89 100644 --- a/cmake/randomx.cmake +++ b/cmake/randomx.cmake @@ -8,6 +8,7 @@ if (WITH_RANDOMX) src/crypto/rx/RxCache.h src/crypto/rx/RxConfig.h src/crypto/rx/RxDataset.h + src/crypto/rx/RxQueue.h src/crypto/rx/RxSeed.h src/crypto/rx/RxVm.h ) @@ -38,6 +39,7 @@ if (WITH_RANDOMX) src/crypto/rx/RxCache.cpp src/crypto/rx/RxConfig.cpp src/crypto/rx/RxDataset.cpp + src/crypto/rx/RxQueue.cpp src/crypto/rx/RxVm.cpp ) diff --git a/src/core/Miner.cpp b/src/core/Miner.cpp index cbba3de3..45a903ba 100644 --- a/src/core/Miner.cpp +++ b/src/core/Miner.cpp @@ -72,12 +72,8 @@ class MinerPrivate public: XMRIG_DISABLE_COPY_MOVE_DEFAULT(MinerPrivate) - inline MinerPrivate(Controller *controller) : controller(controller) - { -# ifdef XMRIG_ALGO_RANDOMX - Rx::init(); -# endif - } + + inline MinerPrivate(Controller *controller) : controller(controller) {} inline ~MinerPrivate() @@ -232,9 +228,9 @@ public: # ifdef XMRIG_ALGO_RANDOMX - bool initRX(IRxListener *listener) + inline bool initRX() { - return Rx::init(job, controller->config()->rx(), controller->config()->cpu().isHugePages(), listener); + return Rx::init(job, controller->config()->rx(), controller->config()->cpu().isHugePages()); } # endif @@ -261,6 +257,10 @@ public: xmrig::Miner::Miner(Controller *controller) : d_ptr(new MinerPrivate(controller)) { +# ifdef XMRIG_ALGO_RANDOMX + Rx::init(this); +# endif + controller->addListener(this); # ifdef XMRIG_FEATURE_API @@ -402,7 +402,7 @@ void xmrig::Miner::setJob(const Job &job, bool donate) } # ifdef XMRIG_ALGO_RANDOMX - const bool ready = d_ptr->initRX(this); + const bool ready = d_ptr->initRX(); # else constexpr const bool ready = true; # endif diff --git a/src/crypto/rx/Rx.cpp b/src/crypto/rx/Rx.cpp index 0642b3cb..115c0f62 100644 --- a/src/crypto/rx/Rx.cpp +++ b/src/crypto/rx/Rx.cpp @@ -26,36 +26,10 @@ #include "crypto/rx/Rx.h" - -#include "backend/common/interfaces/IRxListener.h" -#include "backend/common/interfaces/IRxStorage.h" #include "backend/common/Tags.h" -#include "backend/cpu/Cpu.h" #include "base/io/log/Log.h" -#include "base/kernel/Platform.h" -#include "base/net/stratum/Job.h" -#include "base/tools/Buffer.h" -#include "base/tools/Chrono.h" -#include "base/tools/Handle.h" -#include "base/tools/Object.h" -#include "crypto/rx/RxAlgo.h" -#include "crypto/rx/RxBasicStorage.h" -#include "crypto/rx/RxCache.h" #include "crypto/rx/RxConfig.h" -#include "crypto/rx/RxDataset.h" -#include "crypto/rx/RxSeed.h" - - -#ifdef XMRIG_FEATURE_HWLOC -# include "crypto/rx/RxNUMAStorage.h" -#endif - - -#include -#include -#include -#include -#include +#include "crypto/rx/RxQueue.h" namespace xmrig { @@ -66,104 +40,14 @@ class RxPrivate; static const char *tag = BLUE_BG(WHITE_BOLD_S " rx ") " "; static RxPrivate *d_ptr = nullptr; -static std::mutex mutex; class RxPrivate { public: - XMRIG_DISABLE_COPY_MOVE(RxPrivate) + inline RxPrivate(IRxListener *listener) : queue(listener) {} - inline RxPrivate() : - m_pending(0) - { - m_async = new uv_async_t; - m_async->data = this; - - uv_async_init(uv_default_loop(), m_async, [](uv_async_t *handle) { static_cast(handle->data)->onReady(); }); - } - - - inline ~RxPrivate() - { - m_pending = std::numeric_limits::max(); - - std::lock_guard lock(mutex); - Handle::close(m_async); - - delete m_storage; - } - - - inline bool isReady(const Job &job) const { return pending() == 0 && m_seed == job; } - inline RxDataset *dataset(const Job &job, uint32_t nodeId) { return m_storage ? m_storage->dataset(job, nodeId) : nullptr; } - inline std::pair hugePages() { return m_storage ? m_storage->hugePages() : std::pair(0u, 0u); } - inline uint64_t pending() const { return m_pending.load(std::memory_order_relaxed); } - inline void asyncSend() { --m_pending; if (pending() == 0) { uv_async_send(m_async); } } - - - inline IRxStorage *storage(const std::vector &nodeset) - { - if (!m_storage) { -# ifdef XMRIG_FEATURE_HWLOC - if (!nodeset.empty()) { - m_storage = new RxNUMAStorage(nodeset); - } - else -# endif - { - m_storage = new RxBasicStorage(); - } - } - - return m_storage; - } - - - static void initDataset(const RxSeed &seed, const std::vector &nodeset, uint32_t threads, bool hugePages) - { - std::lock_guard lock(mutex); - - if (d_ptr->pending() > std::numeric_limits::max()) { - return; - } - - LOG_INFO("%s" MAGENTA_BOLD("init dataset%s") " algo " WHITE_BOLD("%s (") CYAN_BOLD("%u") WHITE_BOLD(" threads)") BLACK_BOLD(" seed %s..."), - tag, - nodeset.size() > 1 ? "s" : "", - seed.algorithm().shortName(), - threads, - Buffer::toHex(seed.data().data(), 8).data() - ); - - d_ptr->storage(nodeset)->init(seed, threads, hugePages); - d_ptr->asyncSend(); - } - - - inline void setState(const Job &job, IRxListener *listener) - { - m_listener = listener; - m_seed = job; - - ++m_pending; - } - - -private: - inline void onReady() - { - if (m_listener && pending() == 0) { - m_listener->onDatasetReady(); - } - } - - - IRxListener *m_listener = nullptr; - IRxStorage *m_storage = nullptr; - RxSeed m_seed; - std::atomic m_pending; - uv_async_t *m_async = nullptr; + RxQueue queue; }; @@ -176,20 +60,17 @@ const char *xmrig::rx_tag() } -bool xmrig::Rx::init(const Job &job, const RxConfig &config, bool hugePages, IRxListener *listener) +bool xmrig::Rx::init(const Job &job, const RxConfig &config, bool hugePages) { if (job.algorithm().family() != Algorithm::RANDOM_X) { return true; } - if (d_ptr->isReady(job)) { + if (isReady(job)) { return true; } - d_ptr->setState(job, listener); - - std::thread thread(RxPrivate::initDataset, job, config.nodeset(), config.threads(), hugePages); - thread.detach(); + d_ptr->queue.enqueue(job, config.nodeset(), config.threads(), hugePages); return false; } @@ -197,23 +78,19 @@ bool xmrig::Rx::init(const Job &job, const RxConfig &config, bool hugePages, IRx bool xmrig::Rx::isReady(const Job &job) { - return d_ptr->isReady(job); + return d_ptr->queue.isReady(job); } xmrig::RxDataset *xmrig::Rx::dataset(const Job &job, uint32_t nodeId) { - std::lock_guard lock(mutex); - - return d_ptr->dataset(job, nodeId); + return d_ptr->queue.dataset(job, nodeId); } std::pair xmrig::Rx::hugePages() { - std::lock_guard lock(mutex); - - return d_ptr->hugePages(); + return d_ptr->queue.hugePages(); } @@ -225,7 +102,7 @@ void xmrig::Rx::destroy() } -void xmrig::Rx::init() +void xmrig::Rx::init(IRxListener *listener) { - d_ptr = new RxPrivate(); + d_ptr = new RxPrivate(listener); } diff --git a/src/crypto/rx/Rx.h b/src/crypto/rx/Rx.h index df319413..4a81f5d5 100644 --- a/src/crypto/rx/Rx.h +++ b/src/crypto/rx/Rx.h @@ -46,12 +46,12 @@ class RxDataset; class Rx { public: - static bool init(const Job &job, const RxConfig &config, bool hugePages, IRxListener *listener); + static bool init(const Job &job, const RxConfig &config, bool hugePages); static bool isReady(const Job &job); static RxDataset *dataset(const Job &job, uint32_t nodeId); static std::pair hugePages(); static void destroy(); - static void init(); + static void init(IRxListener *listener); }; diff --git a/src/crypto/rx/RxQueue.cpp b/src/crypto/rx/RxQueue.cpp new file mode 100644 index 00000000..48a1f979 --- /dev/null +++ b/src/crypto/rx/RxQueue.cpp @@ -0,0 +1,181 @@ +/* XMRig + * Copyright 2010 Jeff Garzik + * Copyright 2012-2014 pooler + * Copyright 2014 Lucas Jones + * Copyright 2014-2016 Wolf9466 + * Copyright 2016 Jay D Dee + * Copyright 2017-2019 XMR-Stak , + * Copyright 2018 Lee Clagett + * Copyright 2018-2019 tevador + * Copyright 2018-2019 SChernykh + * Copyright 2016-2019 XMRig , + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + + +#include "crypto/rx/RxQueue.h" +#include "backend/common/Tags.h" +#include "base/io/log/Log.h" +#include "crypto/rx/RxBasicStorage.h" +#include "base/tools/Handle.h" +#include "backend/common/interfaces/IRxListener.h" + + +#ifdef XMRIG_FEATURE_HWLOC +# include "crypto/rx/RxNUMAStorage.h" +#endif + + +xmrig::RxQueue::RxQueue(IRxListener *listener) : + m_listener(listener) +{ + m_async = new uv_async_t; + m_async->data = this; + + uv_async_init(uv_default_loop(), m_async, [](uv_async_t *handle) { static_cast(handle->data)->onReady(); }); + + m_thread = std::move(std::thread(&RxQueue::backgroundInit, this)); +} + + +xmrig::RxQueue::~RxQueue() +{ + std::unique_lock lock(m_mutex); + m_state = STATE_SHUTDOWN; + lock.unlock(); + + m_cv.notify_one(); + + m_thread.join(); + + delete m_storage; + + Handle::close(m_async); +} + + +bool xmrig::RxQueue::isReady(const Job &job) +{ + std::lock_guard lock(m_mutex); + + return isReadyUnsafe(job); +} + + +xmrig::RxDataset *xmrig::RxQueue::dataset(const Job &job, uint32_t nodeId) +{ + std::lock_guard lock(m_mutex); + + if (isReadyUnsafe(job)) { + return m_storage->dataset(job, nodeId); + } + + return nullptr; +} + + +std::pair xmrig::RxQueue::hugePages() +{ + std::lock_guard lock(m_mutex); + + return m_storage && m_state == STATE_IDLE ? m_storage->hugePages() : std::pair(0u, 0u); +} + + +void xmrig::RxQueue::enqueue(const RxSeed &seed, const std::vector &nodeset, uint32_t threads, bool hugePages) +{ + std::unique_lock lock(m_mutex); + + if (!m_storage) { +# ifdef XMRIG_FEATURE_HWLOC + if (!nodeset.empty()) { + m_storage = new RxNUMAStorage(nodeset); + } + else +# endif + { + m_storage = new RxBasicStorage(); + } + } + + if (m_state == STATE_PENDING && m_seed == seed) { + return; + } + + m_queue.emplace_back(seed, nodeset, threads, hugePages); + m_seed = seed; + m_state = STATE_PENDING; + + lock.unlock(); + + m_cv.notify_one(); +} + + +bool xmrig::RxQueue::isReadyUnsafe(const Job &job) const +{ + return m_storage != nullptr && m_state == STATE_IDLE && m_seed == job; +} + + +void xmrig::RxQueue::backgroundInit() +{ + while (true) { + std::unique_lock lock(m_mutex); + + if (m_state == STATE_IDLE) { + m_cv.wait(lock, [this]{ return m_state != STATE_IDLE; }); + } + + if (m_state == STATE_SHUTDOWN) { + break; + } + + const auto item = m_queue.back(); + m_queue.clear(); + + lock.unlock(); + + LOG_INFO("%s" MAGENTA_BOLD("init dataset%s") " algo " WHITE_BOLD("%s (") CYAN_BOLD("%u") WHITE_BOLD(" threads)") BLACK_BOLD(" seed %s..."), + rx_tag(), + item.nodeset.size() > 1 ? "s" : "", + item.seed.algorithm().shortName(), + item.threads, + Buffer::toHex(item.seed.data().data(), 8).data() + ); + + m_storage->init(item.seed, item.threads, item.hugePages); + + lock = std::move(std::unique_lock(m_mutex)); + if (!m_queue.empty()) { + continue; + } + + m_state = STATE_IDLE; + uv_async_send(m_async); + } +} + + +void xmrig::RxQueue::onReady() +{ + std::unique_lock lock(m_mutex); + const bool ready = m_listener && m_state == STATE_IDLE; + lock.unlock(); + + if (ready) { + m_listener->onDatasetReady(); + } +} diff --git a/src/crypto/rx/RxQueue.h b/src/crypto/rx/RxQueue.h new file mode 100644 index 00000000..28407a87 --- /dev/null +++ b/src/crypto/rx/RxQueue.h @@ -0,0 +1,108 @@ +/* XMRig + * Copyright 2010 Jeff Garzik + * Copyright 2012-2014 pooler + * Copyright 2014 Lucas Jones + * Copyright 2014-2016 Wolf9466 + * Copyright 2016 Jay D Dee + * Copyright 2017-2019 XMR-Stak , + * Copyright 2018 Lee Clagett + * Copyright 2018-2019 tevador + * Copyright 2018-2019 SChernykh + * Copyright 2016-2019 XMRig , + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#ifndef XMRIG_RX_QUEUE_H +#define XMRIG_RX_QUEUE_H + + +#include "base/tools/Object.h" +#include "crypto/rx/RxSeed.h" + + +#include +#include +#include + + +using uv_async_t = struct uv_async_s; + + +namespace xmrig +{ + + +class IRxListener; +class IRxStorage; +class RxDataset; + + +class RxQueueItem +{ +public: + RxQueueItem(const RxSeed &seed, const std::vector &nodeset, uint32_t threads, bool hugePages) : + hugePages(hugePages), + seed(seed), + nodeset(nodeset), + threads(threads) + {} + + const bool hugePages; + const RxSeed seed; + const std::vector nodeset; + const uint32_t threads; +}; + + +class RxQueue +{ +public: + XMRIG_DISABLE_COPY_MOVE(RxQueue); + + RxQueue(IRxListener *listener); + ~RxQueue(); + + bool isReady(const Job &job); + RxDataset *dataset(const Job &job, uint32_t nodeId); + std::pair hugePages(); + void enqueue(const RxSeed &seed, const std::vector &nodeset, uint32_t threads, bool hugePages); + +private: + enum State { + STATE_IDLE, + STATE_PENDING, + STATE_SHUTDOWN + }; + + bool isReadyUnsafe(const Job &job) const; + void backgroundInit(); + void onReady(); + + IRxListener *m_listener = nullptr; + IRxStorage *m_storage = nullptr; + RxSeed m_seed; + State m_state = STATE_IDLE; + std::condition_variable m_cv; + std::mutex m_mutex; + std::thread m_thread; + std::vector m_queue; + uv_async_t *m_async = nullptr; +}; + + +} /* namespace xmrig */ + + +#endif /* XMRIG_RX_QUEUE_H */