diy 2023-03-28 (6837fb55)
Code extracted from: https://gitlab.kitware.com/third-party/diy2.git at commit 6837fb55f24a9a38dfb2b6a481cc4de5f7ac455d (for/vtk-m-20230328-g9bea15a1).
This commit is contained in:
parent
a0083af63f
commit
928900c63d
@ -33,6 +33,13 @@ macro (diy_dependent_option variable)
|
||||
endif ()
|
||||
endmacro ()
|
||||
|
||||
set (compiler_supports_sanitizers OFF)
|
||||
if (CMAKE_CXX_COMPILER_ID STREQUAL "Clang" OR
|
||||
CMAKE_CXX_COMPILER_ID STREQUAL "AppleClang" OR
|
||||
CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
|
||||
set (compiler_supports_sanitizers ON)
|
||||
endif ()
|
||||
|
||||
diy_option (threads "Build DIY with threading" ON)
|
||||
diy_option (log "Build DIY with logging" OFF)
|
||||
diy_option (profile "Build DIY with profiling" OFF)
|
||||
@ -44,6 +51,8 @@ diy_dependent_option (BUILD_SHARED_LIBS "Create shared libraries if on"
|
||||
diy_dependent_option (build_diy_nompi_lib "Also build the nompi version of diy::mpi" OFF "mpi;build_diy_mpi_lib" OFF)
|
||||
diy_option (build_examples "Build DIY examples" ON)
|
||||
diy_option (build_tests "Build DIY tests" ON)
|
||||
diy_option (python "Build Python bindings" OFF)
|
||||
cmake_dependent_option (enable_sanitizers "Build DIY with sanitizer support" OFF "compiler_supports_sanitizers" OFF)
|
||||
|
||||
# Default to Release
|
||||
if (NOT CMAKE_BUILD_TYPE)
|
||||
@ -64,9 +73,8 @@ endif ()
|
||||
|
||||
# Logging
|
||||
if (log)
|
||||
list (APPEND diy_definitions "-DVTKMDIY_USE_SPDLOG")
|
||||
find_path (SPDLOG_INCLUDE_DIR spdlog/spdlog.h)
|
||||
list (APPEND diy_include_thirdparty_directories $<BUILD_INTERFACE:${SPDLOG_INCLUDE_DIR}>)
|
||||
list (APPEND diy_definitions "-DVTMDIY_USE_SPDLOG")
|
||||
find_package (spdlog REQUIRED)
|
||||
endif()
|
||||
|
||||
# Profiling
|
||||
@ -114,8 +122,12 @@ if (NOT DEFINED diy_export_name)
|
||||
set(diy_export_name "diy_targets")
|
||||
endif()
|
||||
|
||||
set (CMAKE_ARCHIVE_OUTPUT_DIRECTORY "${PROJECT_BINARY_DIR}/lib")
|
||||
set (CMAKE_LIBRARY_OUTPUT_DIRECTORY "${PROJECT_BINARY_DIR}/lib")
|
||||
if (NOT DEFINED CMAKE_ARCHIVE_OUTPUT_DIRECTORY)
|
||||
set (CMAKE_ARCHIVE_OUTPUT_DIRECTORY "${PROJECT_BINARY_DIR}/lib")
|
||||
endif()
|
||||
if (NOT DEFINED CMAKE_LIBRARY_OUTPUT_DIRECTORY)
|
||||
set (CMAKE_LIBRARY_OUTPUT_DIRECTORY "${PROJECT_BINARY_DIR}/lib")
|
||||
endif()
|
||||
|
||||
# for diy_developer_flags
|
||||
include(DIYCompilerFlags)
|
||||
@ -152,6 +164,9 @@ function(add_diy_mpi_library use_mpi)
|
||||
target_include_directories(${lib_name} SYSTEM PRIVATE ${diy_include_directories}) # for mpitypes.hpp
|
||||
target_include_directories(${lib_name} SYSTEM PRIVATE ${diy_include_thirdparty_directories})
|
||||
target_link_libraries(${lib_name} PRIVATE diy_developer_flags)
|
||||
if (log)
|
||||
target_link_libraries(${lib_name} PUBLIC spdlog::spdlog_header_only)
|
||||
endif ()
|
||||
if (use_mpi AND TARGET MPI::MPI_CXX)
|
||||
target_link_libraries(${lib_name} PRIVATE MPI::MPI_CXX)
|
||||
endif()
|
||||
@ -195,6 +210,9 @@ target_include_directories(${diy_prefix} SYSTEM INTERFACE ${diy_include_thirdpar
|
||||
if (diy_include_directories)
|
||||
target_include_directories(${diy_prefix} SYSTEM INTERFACE ${diy_include_directories})
|
||||
endif()
|
||||
if (log)
|
||||
target_link_libraries(${diy_prefix} INTERFACE spdlog::spdlog_header_only)
|
||||
endif ()
|
||||
target_link_libraries(${diy_prefix} INTERFACE ${diy_libraries})
|
||||
if (NOT build_diy_mpi_lib)
|
||||
if (mpi)
|
||||
@ -224,6 +242,16 @@ elseif (${diy_prefix}mpi_nompi IN_LIST diy_targets)
|
||||
endif()
|
||||
list(APPEND libraries diy_developer_flags)
|
||||
|
||||
# Sanitizers
|
||||
if (enable_sanitizers)
|
||||
set(sanitizer "address" CACHE STRING "The sanitizer to use")
|
||||
|
||||
string (APPEND CMAKE_CXX_FLAGS " -fsanitize=${sanitizer}")
|
||||
string (APPEND CMAKE_C_FLAGS " -fsanitize=${sanitizer}")
|
||||
string (APPEND CMAKE_EXE_LINKER_FLAGS " -fsanitize=${sanitizer}")
|
||||
string (APPEND CMAKE_SHARED_LINKER_FLAGS " -fsanitize=${sanitizer}")
|
||||
endif ()
|
||||
|
||||
# enable testing and CDash dashboard submission
|
||||
enable_testing ()
|
||||
include (CTest)
|
||||
@ -262,3 +290,7 @@ if (CMAKE_SOURCE_DIR STREQUAL PROJECT_SOURCE_DIR) # Only generate these files wh
|
||||
install(EXPORT ${diy_export_name} NAMESPACE DIY:: DESTINATION "." FILE diy-targets.cmake)
|
||||
install(FILES "${PROJECT_BINARY_DIR}/diy-config.cmake" DESTINATION ".")
|
||||
endif()
|
||||
|
||||
if (python)
|
||||
add_subdirectory(bindings/python)
|
||||
endif (python)
|
||||
|
@ -25,12 +25,10 @@ if (threads)
|
||||
endif()
|
||||
|
||||
if (log)
|
||||
find_path(SPDLOG_INCLUDE_DIR "spdlog/spdlog.h")
|
||||
if (SPDLOG_INCLUDE_DIR STREQUAL "SPDLOG_INCLUDE_DIR-NOTFOUND")
|
||||
find_package(spdlog ${_diy_find_quietly})
|
||||
if (NOT spdlog_FOUND)
|
||||
list(APPEND "${CMAKE_FIND_PACKAGE_NAME}_NOT_FOUND_MESSAGE" "SPDLOG not found")
|
||||
set("${CMAKE_FIND_PACKAGE_NAME}_FOUND" 0)
|
||||
else()
|
||||
target_include_directories(DIY::@diy_prefix@ INTERFACE $<INSTALL_INTERFACE:${SPDLOG_INCLUDE_DIR}>)
|
||||
endif()
|
||||
endif()
|
||||
|
||||
|
@ -17,10 +17,10 @@ namespace diy
|
||||
typedef std::vector<Element> Elements;
|
||||
typedef critical_resource<int, recursive_mutex> CInt;
|
||||
|
||||
typedef void* (*Create)();
|
||||
typedef void (*Destroy)(void*);
|
||||
typedef detail::Save Save;
|
||||
typedef detail::Load Load;
|
||||
using Create = std::function<void*()>;
|
||||
using Destroy = std::function<void(void*)>;
|
||||
using Save = detail::Save;
|
||||
using Load = detail::Load;
|
||||
|
||||
public:
|
||||
Collection(Create create__,
|
||||
|
@ -5,11 +5,13 @@ namespace diy
|
||||
int from, to;
|
||||
int nparts;
|
||||
int round;
|
||||
int nblobs;
|
||||
};
|
||||
|
||||
struct Master::InFlightSend
|
||||
{
|
||||
std::shared_ptr<MemoryBuffer> message;
|
||||
BinaryBlob blob;
|
||||
mpi::request request;
|
||||
|
||||
MessageInfo info; // for debug purposes
|
||||
@ -18,12 +20,18 @@ namespace diy
|
||||
struct Master::InFlightRecv
|
||||
{
|
||||
MemoryBuffer message;
|
||||
MessageInfo info { -1, -1, -1, -1 };
|
||||
MessageInfo info { -1, -1, -1, -1, -1 };
|
||||
bool done = false;
|
||||
MemoryManagement mem;
|
||||
|
||||
inline bool recv(mpi::communicator& comm, const mpi::status& status);
|
||||
inline void place(IncomingRound* in, bool unload, ExternalStorage* storage, IExchangeInfo* iexchange);
|
||||
void reset() { *this = InFlightRecv(); }
|
||||
void reset()
|
||||
{
|
||||
MemoryManagement mem_ = mem;
|
||||
*this = InFlightRecv();
|
||||
mem = mem_;
|
||||
}
|
||||
};
|
||||
|
||||
struct Master::InFlightRecvsMap: public std::map<int, InFlightRecv>
|
||||
@ -111,7 +119,7 @@ recv(mpi::communicator& comm, const mpi::status& status)
|
||||
|
||||
result = true;
|
||||
}
|
||||
else
|
||||
else if (info.nparts > 0)
|
||||
{
|
||||
size_t start_idx = message.buffer.size();
|
||||
size_t count = status.count<char>();
|
||||
@ -124,9 +132,24 @@ recv(mpi::communicator& comm, const mpi::status& status)
|
||||
comm.recv(status.source(), status.tag(), window);
|
||||
|
||||
info.nparts--;
|
||||
} else if (info.nblobs > 0)
|
||||
{
|
||||
size_t count = status.count<char>();
|
||||
detail::VectorWindow<char> window;
|
||||
|
||||
char* buffer = mem.allocate(info.to, count);
|
||||
|
||||
window.begin = buffer;
|
||||
window.count = count;
|
||||
|
||||
comm.recv(status.source(), status.tag(), window);
|
||||
|
||||
message.save_binary_blob(buffer, count, mem.deallocate);
|
||||
|
||||
info.nblobs--;
|
||||
}
|
||||
|
||||
if (info.nparts == 0)
|
||||
if (info.nparts == 0 && info.nblobs == 0)
|
||||
done = true;
|
||||
|
||||
return result;
|
||||
|
@ -1,3 +1,5 @@
|
||||
#include <algorithm>
|
||||
|
||||
struct diy::Master::ProcessBlock
|
||||
{
|
||||
ProcessBlock(Master& master_,
|
||||
|
@ -7,17 +7,17 @@
|
||||
#include <algorithm>
|
||||
|
||||
#include "constants.h"
|
||||
#include "thirdparty/chobo/small_vector.hpp"
|
||||
#include "thirdparty/itlib/small_vector.hpp"
|
||||
|
||||
namespace diy
|
||||
{
|
||||
|
||||
template<class Coordinate_, size_t static_size = VTKMDIY_MAX_DIM>
|
||||
class DynamicPoint: public chobo::small_vector<Coordinate_, static_size>
|
||||
class DynamicPoint: public itlib::small_vector<Coordinate_, static_size>
|
||||
{
|
||||
public:
|
||||
using Coordinate = Coordinate_;
|
||||
using Parent = chobo::small_vector<Coordinate_, static_size>;
|
||||
using Parent = itlib::small_vector<Coordinate_, static_size>;
|
||||
|
||||
template<class U>
|
||||
struct rebind { typedef DynamicPoint<U> type; };
|
||||
|
@ -30,9 +30,9 @@ class SharedOutFile: public std::ostringstream
|
||||
diy::mpi::gather(world_, contents, all_contents, root_);
|
||||
|
||||
// write the file serially
|
||||
std::ofstream out(filename_);
|
||||
std::ofstream fout(filename_);
|
||||
for (auto& cntnts : all_contents)
|
||||
out.write(cntnts.data(), cntnts.size());
|
||||
fout.write(cntnts.data(), cntnts.size());
|
||||
} else
|
||||
diy::mpi::gather(world_, contents, root_);
|
||||
}
|
||||
|
@ -5,6 +5,8 @@
|
||||
#include <direct.h>
|
||||
#include <io.h>
|
||||
#include <share.h>
|
||||
#define NOMINMAX
|
||||
#include <windows.h>
|
||||
#else
|
||||
#include <unistd.h> // mkstemp() on Mac
|
||||
#include <dirent.h>
|
||||
|
@ -55,8 +55,8 @@ set_logger(Args...)
|
||||
#include <spdlog/sinks/null_sink.h>
|
||||
#include <spdlog/sinks/stdout_sinks.h>
|
||||
|
||||
#include <spdlog/fmt/bundled/format.h>
|
||||
#include <spdlog/fmt/bundled/ostream.h>
|
||||
#include <spdlog/fmt/fmt.h>
|
||||
#include <spdlog/fmt/ostr.h>
|
||||
|
||||
namespace diy
|
||||
{
|
||||
|
@ -10,6 +10,7 @@
|
||||
#include <numeric>
|
||||
#include <memory>
|
||||
#include <chrono>
|
||||
#include <climits>
|
||||
|
||||
#include "link.hpp"
|
||||
#include "collection.hpp"
|
||||
@ -28,6 +29,23 @@
|
||||
|
||||
namespace diy
|
||||
{
|
||||
|
||||
struct MemoryManagement
|
||||
{
|
||||
using Allocate = std::function<char* (int, size_t)>;
|
||||
using Deallocate = BinaryBlob::Deleter;
|
||||
using MemCopy = std::function<void(char*, const char*, size_t)>;
|
||||
|
||||
MemoryManagement() = default;
|
||||
MemoryManagement(Allocate allocate_, Deallocate deallocate_, MemCopy copy_):
|
||||
allocate(allocate_), deallocate(deallocate_), copy(copy_) {}
|
||||
|
||||
Allocate allocate = [](int /* gid */, size_t n) { return new char[n]; };
|
||||
Deallocate deallocate = [](const char* p) { delete[] p; };
|
||||
MemCopy copy = [](char* dest, const char* src, size_t count) { std::memcpy(dest, src, count); };
|
||||
};
|
||||
|
||||
|
||||
// Stores and manages blocks; initiates serialization and communication when necessary.
|
||||
//
|
||||
// Provides a foreach function, which is meant as the main entry point.
|
||||
@ -126,6 +144,8 @@ namespace diy
|
||||
void unload(ExternalStorage* storage) { size_ = buffer_.size(); external_ = storage->put(buffer_); }
|
||||
void load(ExternalStorage* storage) { storage->get(external_, buffer_); external_ = -1; }
|
||||
|
||||
MemoryBuffer& buffer() { return buffer_; }
|
||||
|
||||
private:
|
||||
size_t size_;
|
||||
int external_;
|
||||
@ -147,7 +167,6 @@ namespace diy
|
||||
};
|
||||
typedef std::map<int, IncomingRound> IncomingRoundMap;
|
||||
|
||||
|
||||
public:
|
||||
/**
|
||||
* \ingroup Initialization
|
||||
@ -173,6 +192,7 @@ namespace diy
|
||||
inline void destroy(int i) { if (blocks_.own()) blocks_.destroy(i); }
|
||||
|
||||
inline int add(int gid, void* b, Link* l); //!< add a block
|
||||
inline int add(int gid, void* b, const Link& l){ return add(gid, b, l.clone()); }
|
||||
inline void* release(int i); //!< release ownership of the block
|
||||
|
||||
//!< return the `i`-th block
|
||||
@ -213,17 +233,17 @@ namespace diy
|
||||
bool local(int gid__) const { return lids_.find(gid__) != lids_.end(); }
|
||||
|
||||
//! exchange the queues between all the blocks (collective operation)
|
||||
inline void exchange(bool remote = false);
|
||||
inline void exchange(bool remote = false, MemoryManagement mem = MemoryManagement());
|
||||
|
||||
//! nonblocking exchange of the queues between all the blocks
|
||||
template<class Block>
|
||||
void iexchange_(const ICallback<Block>& f);
|
||||
void iexchange_(const ICallback<Block>& f, MemoryManagement mem);
|
||||
|
||||
template<class F>
|
||||
void iexchange(const F& f)
|
||||
void iexchange(const F& f, MemoryManagement mem = MemoryManagement())
|
||||
{
|
||||
using Block = typename detail::block_traits<F>::type;
|
||||
iexchange_<Block>(f);
|
||||
iexchange_<Block>(f, mem);
|
||||
}
|
||||
|
||||
inline void process_collectives();
|
||||
@ -283,29 +303,30 @@ namespace diy
|
||||
|
||||
public:
|
||||
// Communicator functionality
|
||||
inline void flush(bool remote = false); // makes sure all the serialized queues migrate to their target processors
|
||||
inline void flush(bool remote, MemoryManagement mem = MemoryManagement()); // makes sure all the serialized queues migrate to their target processors
|
||||
|
||||
private:
|
||||
// Communicator functionality
|
||||
inline void comm_exchange(GidSendOrder& gid_order, IExchangeInfo* iex = 0);
|
||||
inline void rcomm_exchange(); // possibly called in between block computations
|
||||
inline void comm_exchange(GidSendOrder& gid_order, MemoryManagement mem, IExchangeInfo* iex = 0);
|
||||
inline void rcomm_exchange(MemoryManagement mem); // possibly called in between block computations
|
||||
inline bool nudge(IExchangeInfo* iex = 0);
|
||||
inline void send_queue(int from_gid, int to_gid, int to_proc, QueueRecord& qr, bool remote, IExchangeInfo* iex);
|
||||
inline void send_queue(int from_gid, int to_gid, int to_proc, QueueRecord& qr, bool remote, MemoryManagement mem, IExchangeInfo* iex);
|
||||
inline void send_outgoing_queues(GidSendOrder& gid_order,
|
||||
bool remote,
|
||||
MemoryManagement mem,
|
||||
IExchangeInfo* iex = 0);
|
||||
inline void check_incoming_queues(IExchangeInfo* iex = 0);
|
||||
inline void check_incoming_queues(MemoryManagement mem, IExchangeInfo* iex = 0);
|
||||
inline GidSendOrder
|
||||
order_gids();
|
||||
inline void touch_queues();
|
||||
inline void send_same_rank(int from, int to, QueueRecord& qr, IExchangeInfo* iex);
|
||||
inline void send_same_rank(int from, int to, QueueRecord& qr, MemoryManagement mem, IExchangeInfo* iex);
|
||||
inline void send_different_rank(int from, int to, int proc, QueueRecord& qr, bool remote, IExchangeInfo* iex);
|
||||
|
||||
inline InFlightRecv& inflight_recv(int proc);
|
||||
inline InFlightSendsList& inflight_sends();
|
||||
|
||||
// iexchange commmunication
|
||||
inline void icommunicate(IExchangeInfo* iex); // async communication
|
||||
inline void icommunicate(IExchangeInfo* iex, MemoryManagement mem); // async communication
|
||||
|
||||
struct tags { enum {
|
||||
queue,
|
||||
@ -607,7 +628,7 @@ foreach_(const Callback<Block>& f, const Skip& skip)
|
||||
|
||||
void
|
||||
diy::Master::
|
||||
exchange(bool remote)
|
||||
exchange(bool remote, MemoryManagement mem)
|
||||
{
|
||||
auto scoped = prof.scoped("exchange");
|
||||
VTKMDIY_UNUSED(scoped);
|
||||
@ -625,7 +646,7 @@ exchange(bool remote)
|
||||
if (!remote)
|
||||
touch_queues();
|
||||
|
||||
flush(remote);
|
||||
flush(remote, mem);
|
||||
log->debug("Finished exchange");
|
||||
}
|
||||
|
||||
@ -658,7 +679,7 @@ touch_queues()
|
||||
template<class Block>
|
||||
void
|
||||
diy::Master::
|
||||
iexchange_(const ICallback<Block>& f)
|
||||
iexchange_(const ICallback<Block>& f, MemoryManagement mem)
|
||||
{
|
||||
auto scoped = prof.scoped("iexchange");
|
||||
VTKMDIY_UNUSED(scoped);
|
||||
@ -685,11 +706,11 @@ iexchange_(const ICallback<Block>& f)
|
||||
|
||||
thread comm_thread;
|
||||
if (threads() > 1)
|
||||
comm_thread = thread([this,&iex]()
|
||||
comm_thread = thread([this,&iex,mem]()
|
||||
{
|
||||
while(!iex.all_done())
|
||||
{
|
||||
icommunicate(&iex);
|
||||
icommunicate(&iex, mem);
|
||||
iex.control();
|
||||
//std::this_thread::sleep_for(std::chrono::microseconds(1));
|
||||
}
|
||||
@ -713,7 +734,7 @@ iexchange_(const ICallback<Block>& f)
|
||||
stats::Annotation::Guard g( stats::Annotation("diy.block").set(gid) );
|
||||
|
||||
if (threads() == 1)
|
||||
icommunicate(&iex);
|
||||
icommunicate(&iex, mem);
|
||||
bool done = done_result[gid];
|
||||
if (!done || !empty_incoming(gid))
|
||||
{
|
||||
@ -762,17 +783,17 @@ iexchange_(const ICallback<Block>& f)
|
||||
/* Communicator */
|
||||
void
|
||||
diy::Master::
|
||||
comm_exchange(GidSendOrder& gid_order, IExchangeInfo* iex)
|
||||
comm_exchange(GidSendOrder& gid_order, MemoryManagement mem, IExchangeInfo* iex)
|
||||
{
|
||||
auto scoped = prof.scoped("comm-exchange");
|
||||
VTKMDIY_UNUSED(scoped);
|
||||
|
||||
send_outgoing_queues(gid_order, false, iex);
|
||||
send_outgoing_queues(gid_order, false, mem, iex);
|
||||
|
||||
while(nudge(iex)) // kick requests
|
||||
;
|
||||
|
||||
check_incoming_queues(iex);
|
||||
check_incoming_queues(mem, iex);
|
||||
}
|
||||
|
||||
/* Remote communicator */
|
||||
@ -803,7 +824,7 @@ comm_exchange(GidSendOrder& gid_order, IExchangeInfo* iex)
|
||||
//
|
||||
void
|
||||
diy::Master::
|
||||
rcomm_exchange()
|
||||
rcomm_exchange(MemoryManagement mem)
|
||||
{
|
||||
bool done = false;
|
||||
bool ibarr_act = false;
|
||||
@ -814,12 +835,12 @@ rcomm_exchange()
|
||||
|
||||
while (!done)
|
||||
{
|
||||
send_outgoing_queues(gid_order, true, 0);
|
||||
send_outgoing_queues(gid_order, true, mem, 0);
|
||||
|
||||
// kick requests
|
||||
nudge();
|
||||
|
||||
check_incoming_queues();
|
||||
check_incoming_queues(mem);
|
||||
if (ibarr_act)
|
||||
{
|
||||
if (ibarr_req.test())
|
||||
@ -877,7 +898,7 @@ order_gids()
|
||||
// iexchange communicator
|
||||
void
|
||||
diy::Master::
|
||||
icommunicate(IExchangeInfo* iex)
|
||||
icommunicate(IExchangeInfo* iex, MemoryManagement mem)
|
||||
{
|
||||
auto scoped = prof.scoped("icommunicate");
|
||||
VTKMDIY_UNUSED(scoped);
|
||||
@ -887,7 +908,7 @@ icommunicate(IExchangeInfo* iex)
|
||||
auto gid_order = order_gids();
|
||||
|
||||
// exchange
|
||||
comm_exchange(gid_order, iex);
|
||||
comm_exchange(gid_order, mem, iex);
|
||||
|
||||
// cleanup
|
||||
|
||||
@ -906,6 +927,7 @@ send_queue(int from_gid,
|
||||
int to_proc,
|
||||
QueueRecord& qr,
|
||||
bool remote,
|
||||
MemoryManagement mem,
|
||||
IExchangeInfo* iex)
|
||||
{
|
||||
stats::Annotation::Guard gb( stats::Annotation("diy.block").set(from_gid) );
|
||||
@ -917,7 +939,7 @@ send_queue(int from_gid,
|
||||
log->debug("[{}] Sending queue: {} <- {} of size {}, iexchange = {}", comm_.rank(), to_gid, from_gid, qr.size(), iex ? 1 : 0);
|
||||
|
||||
if (to_proc == comm_.rank()) // sending to same rank, simply swap buffers
|
||||
send_same_rank(from_gid, to_gid, qr, iex);
|
||||
send_same_rank(from_gid, to_gid, qr, mem, iex);
|
||||
else // sending to an actual message to a different rank
|
||||
send_different_rank(from_gid, to_gid, to_proc, qr, remote, iex);
|
||||
}
|
||||
@ -926,6 +948,7 @@ void
|
||||
diy::Master::
|
||||
send_outgoing_queues(GidSendOrder& gid_order,
|
||||
bool remote, // TODO: are remote and iexchange mutually exclusive? If so, use single enum?
|
||||
MemoryManagement mem,
|
||||
IExchangeInfo* iex)
|
||||
{
|
||||
auto scoped = prof.scoped("send-outgoing-queues");
|
||||
@ -950,7 +973,7 @@ send_outgoing_queues(GidSendOrder& gid_order,
|
||||
access.unlock(); // others can push on this queue, while we are working
|
||||
assert(!qr.external());
|
||||
log->debug("Processing queue: {} <- {} of size {}", to_gid, from, qr.size());
|
||||
send_queue(from, to_gid, to_proc, qr, remote, iex);
|
||||
send_queue(from, to_gid, to_proc, qr, remote, mem, iex);
|
||||
access.lock();
|
||||
}
|
||||
}
|
||||
@ -978,7 +1001,7 @@ send_outgoing_queues(GidSendOrder& gid_order,
|
||||
// NB: send only front
|
||||
auto& qr = access->front();
|
||||
log->debug("Processing queue: {} <- {} of size {}", to_gid, from_gid, qr.size());
|
||||
send_queue(from_gid, to_gid, to_proc, qr, remote, iex);
|
||||
send_queue(from_gid, to_gid, to_proc, qr, remote, mem, iex);
|
||||
access->pop_front();
|
||||
}
|
||||
}
|
||||
@ -987,7 +1010,7 @@ send_outgoing_queues(GidSendOrder& gid_order,
|
||||
|
||||
void
|
||||
diy::Master::
|
||||
send_same_rank(int from, int to, QueueRecord& qr, IExchangeInfo*)
|
||||
send_same_rank(int from, int to, QueueRecord& qr, MemoryManagement mem, IExchangeInfo*)
|
||||
{
|
||||
auto scoped = prof.scoped("send-same-rank");
|
||||
|
||||
@ -997,9 +1020,24 @@ send_same_rank(int from, int to, QueueRecord& qr, IExchangeInfo*)
|
||||
|
||||
auto access_incoming = current_incoming.map[to][from].access();
|
||||
|
||||
// save blobs to copy them explicitly
|
||||
std::vector<BinaryBlob> blobs;
|
||||
qr.buffer().blobs.swap(blobs);
|
||||
qr.buffer().blob_position = 0;
|
||||
|
||||
access_incoming->emplace_back(std::move(qr));
|
||||
QueueRecord& in_qr = access_incoming->back();
|
||||
|
||||
// copy blobs explicitly; we cannot just move them in place, since we don't
|
||||
// own their memory and must guarantee that it's safe to free, once
|
||||
// exchange() is done
|
||||
for (BinaryBlob& blob : blobs)
|
||||
{
|
||||
char* p = mem.allocate(to, blob.size);
|
||||
mem.copy(p, blob.pointer.get(), blob.size);
|
||||
in_qr.buffer().save_binary_blob(p, blob.size, mem.deallocate);
|
||||
}
|
||||
|
||||
if (!in_qr.external())
|
||||
{
|
||||
in_qr.reset();
|
||||
@ -1029,7 +1067,7 @@ send_different_rank(int from, int to, int proc, QueueRecord& qr, bool remote, IE
|
||||
// sending to a different rank
|
||||
std::shared_ptr<MemoryBuffer> buffer = std::make_shared<MemoryBuffer>(qr.move());
|
||||
|
||||
MessageInfo info{from, to, 1, exchange_round_};
|
||||
MessageInfo info{from, to, 1, exchange_round_, static_cast<int>(buffer->nblobs())};
|
||||
// size fits in one message
|
||||
if (Serialization<MemoryBuffer>::size(*buffer) + Serialization<MessageInfo>::size(info) <= MAX_MPI_MESSAGE_COUNT)
|
||||
{
|
||||
@ -1103,11 +1141,33 @@ send_different_rank(int from, int to, int proc, QueueRecord& qr, bool remote, IE
|
||||
inflight_send.message = buffer;
|
||||
}
|
||||
} // large message broken into pieces
|
||||
|
||||
// send binary blobs
|
||||
for (size_t i = 0; i < buffer->nblobs(); ++i)
|
||||
{
|
||||
auto blob = buffer->load_binary_blob();
|
||||
assert(blob.size < MAX_MPI_MESSAGE_COUNT); // for now assume blobs are small enough that we don't need to break them into multiple parts
|
||||
|
||||
inflight_sends().emplace_back();
|
||||
auto& inflight_send = inflight_sends().back();
|
||||
|
||||
inflight_send.info = info;
|
||||
|
||||
detail::VectorWindow<char> window;
|
||||
window.begin = const_cast<char*>(blob.pointer.get());
|
||||
window.count = blob.size;
|
||||
|
||||
if (remote || iex)
|
||||
inflight_send.request = comm_.issend(proc, tags::queue, window);
|
||||
else
|
||||
inflight_send.request = comm_.isend(proc, tags::queue, window);
|
||||
inflight_send.blob = std::move(blob);
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
diy::Master::
|
||||
check_incoming_queues(IExchangeInfo* iex)
|
||||
check_incoming_queues(MemoryManagement mem, IExchangeInfo* iex)
|
||||
{
|
||||
auto scoped = prof.scoped("check-incoming-queues");
|
||||
VTKMDIY_UNUSED(scoped);
|
||||
@ -1116,6 +1176,7 @@ check_incoming_queues(IExchangeInfo* iex)
|
||||
while (ostatus)
|
||||
{
|
||||
InFlightRecv& ir = inflight_recv(ostatus->source());
|
||||
ir.mem = mem;
|
||||
|
||||
if (iex)
|
||||
iex->inc_work(); // increment work before sender's issend request can complete (so we are now responsible for the queue)
|
||||
@ -1141,7 +1202,7 @@ check_incoming_queues(IExchangeInfo* iex)
|
||||
|
||||
void
|
||||
diy::Master::
|
||||
flush(bool remote)
|
||||
flush(bool remote, MemoryManagement mem)
|
||||
{
|
||||
#ifdef VTKMDIY_DEBUG
|
||||
time_type start = get_time();
|
||||
@ -1155,13 +1216,13 @@ flush(bool remote)
|
||||
|
||||
|
||||
if (remote)
|
||||
rcomm_exchange();
|
||||
rcomm_exchange(mem);
|
||||
else
|
||||
{
|
||||
auto gid_order = order_gids();
|
||||
do
|
||||
{
|
||||
comm_exchange(gid_order);
|
||||
comm_exchange(gid_order, mem);
|
||||
|
||||
#ifdef VTKMDIY_DEBUG
|
||||
time_type cur = get_time();
|
||||
|
@ -1,6 +1,8 @@
|
||||
#ifndef VTKMDIY_MPI_CONFIG_HPP
|
||||
#define VTKMDIY_MPI_CONFIG_HPP
|
||||
|
||||
#include <utility>
|
||||
|
||||
/// We want to allow the use of `diy::mpi` in either header-only or library mode.
|
||||
/// VTKMDIY_MPI_AS_LIB is defined when using library mode.
|
||||
/// This file contains some configuration macros. To maintain backwards compatibility
|
||||
@ -49,13 +51,26 @@ struct DIY_##mpitype { \
|
||||
mpitype data; \
|
||||
};
|
||||
|
||||
#define DEFINE_DIY_MPI_TYPE_MOVE(mpitype) \
|
||||
struct DIY_##mpitype { \
|
||||
DIY_##mpitype() = default; \
|
||||
DIY_##mpitype(const mpitype&) = delete; \
|
||||
DIY_##mpitype(mpitype&& obj) : data(std::move(obj)) {} \
|
||||
DIY_##mpitype& operator=(const mpitype&) = delete; \
|
||||
DIY_##mpitype& operator=(mpitype&& obj) { data = std::move(obj); return *this; } \
|
||||
operator const mpitype&() const { return data; } \
|
||||
void reset() { data = mpitype(); } \
|
||||
private: \
|
||||
mpitype data; \
|
||||
};
|
||||
|
||||
DEFINE_DIY_MPI_TYPE(MPI_Comm)
|
||||
DEFINE_DIY_MPI_TYPE(MPI_Datatype)
|
||||
DEFINE_DIY_MPI_TYPE(MPI_Status)
|
||||
DEFINE_DIY_MPI_TYPE(MPI_Request)
|
||||
DEFINE_DIY_MPI_TYPE(MPI_Op)
|
||||
DEFINE_DIY_MPI_TYPE(MPI_File)
|
||||
DEFINE_DIY_MPI_TYPE(MPI_Win)
|
||||
DEFINE_DIY_MPI_TYPE_MOVE(MPI_Win)
|
||||
|
||||
#undef DEFINE_DIY_MPI_TYPE
|
||||
|
||||
|
@ -18,13 +18,18 @@ inline mpitype& mpi_cast(DIY_##mpitype& obj) { return *reinterpret_cast<mpitype*
|
||||
inline const mpitype& mpi_cast(const DIY_##mpitype& obj) { return *reinterpret_cast<const mpitype*>(&obj); } \
|
||||
inline DIY_##mpitype make_DIY_##mpitype(const mpitype& obj) { DIY_##mpitype ret; mpi_cast(ret) = obj; return ret; }
|
||||
|
||||
#define DEFINE_MPI_CAST_MOVE(mpitype) \
|
||||
inline mpitype& mpi_cast(DIY_##mpitype& obj) { return *reinterpret_cast<mpitype*>(&obj); } \
|
||||
inline const mpitype& mpi_cast(const DIY_##mpitype& obj) { return *reinterpret_cast<const mpitype*>(&obj); } \
|
||||
inline DIY_##mpitype make_DIY_##mpitype(mpitype&& obj) { DIY_##mpitype ret = std::move(obj); return ret; }
|
||||
|
||||
DEFINE_MPI_CAST(MPI_Comm)
|
||||
DEFINE_MPI_CAST(MPI_Datatype)
|
||||
DEFINE_MPI_CAST(MPI_Status)
|
||||
DEFINE_MPI_CAST(MPI_Request)
|
||||
DEFINE_MPI_CAST(MPI_Op)
|
||||
DEFINE_MPI_CAST(MPI_File)
|
||||
DEFINE_MPI_CAST(MPI_Win)
|
||||
DEFINE_MPI_CAST_MOVE(MPI_Win)
|
||||
|
||||
#undef DEFINE_MPI_CAST
|
||||
|
||||
|
@ -1,6 +1,8 @@
|
||||
#ifndef VTKMDIY_MPI_MPITYPES_H
|
||||
#define VTKMDIY_MPI_MPITYPES_H
|
||||
|
||||
#include <cstring>
|
||||
|
||||
#cmakedefine TYPESIZE_MPI_Comm @TYPESIZE_MPI_Comm@
|
||||
#cmakedefine TYPESIZE_MPI_Datatype @TYPESIZE_MPI_Datatype@
|
||||
#cmakedefine TYPESIZE_MPI_Status @TYPESIZE_MPI_Status@
|
||||
@ -18,6 +20,7 @@ namespace mpi
|
||||
# define ASSERT_MPI_TYPE_SIZE(mpitype) static_assert(sizeof(mpitype) <= sizeof(DIY_##mpitype), "");
|
||||
#else
|
||||
# define ASSERT_MPI_TYPE_SIZE(mpitype)
|
||||
struct MPI_Win;
|
||||
#endif
|
||||
|
||||
#define DEFINE_DIY_MPI_TYPE(mpitype) \
|
||||
@ -26,15 +29,41 @@ struct DIY_##mpitype { \
|
||||
}; \
|
||||
ASSERT_MPI_TYPE_SIZE(mpitype)
|
||||
|
||||
#define DEFINE_DIY_MPI_TYPE_MOVE(mpitype) \
|
||||
struct DIY_##mpitype \
|
||||
{ \
|
||||
DIY_##mpitype() = default; \
|
||||
DIY_##mpitype(const mpitype&) = delete; \
|
||||
DIY_##mpitype& operator=(const mpitype&) = delete; \
|
||||
DIY_##mpitype(mpitype&& obj) \
|
||||
{ \
|
||||
std::memcpy(data, &obj, TYPESIZE_##mpitype); \
|
||||
std::memset(&obj, 0, TYPESIZE_##mpitype); \
|
||||
} \
|
||||
DIY_##mpitype& operator=(mpitype&& obj) \
|
||||
{ \
|
||||
std::memcpy(data, &obj, TYPESIZE_##mpitype); \
|
||||
std::memset(&obj, 0, TYPESIZE_##mpitype); \
|
||||
return *this; \
|
||||
} \
|
||||
operator const mpitype&() const { return *reinterpret_cast<const mpitype*>(data); } \
|
||||
void reset() { std::memset(data, 0, TYPESIZE_##mpitype); } \
|
||||
\
|
||||
private: \
|
||||
char* data[TYPESIZE_##mpitype]; \
|
||||
}; \
|
||||
ASSERT_MPI_TYPE_SIZE(mpitype);
|
||||
|
||||
DEFINE_DIY_MPI_TYPE(MPI_Comm)
|
||||
DEFINE_DIY_MPI_TYPE(MPI_Datatype)
|
||||
DEFINE_DIY_MPI_TYPE(MPI_Status)
|
||||
DEFINE_DIY_MPI_TYPE(MPI_Request)
|
||||
DEFINE_DIY_MPI_TYPE(MPI_Op)
|
||||
DEFINE_DIY_MPI_TYPE(MPI_File)
|
||||
DEFINE_DIY_MPI_TYPE(MPI_Win)
|
||||
DEFINE_DIY_MPI_TYPE_MOVE(MPI_Win)
|
||||
|
||||
#undef DEFINE_DIY_MPI_TYPE
|
||||
#undef DEFINE_DIY_MPI_TYPE_MOVE
|
||||
#undef ASSERT_MPI_TYPE_SIZE
|
||||
|
||||
}
|
||||
|
@ -1,6 +1,7 @@
|
||||
#ifndef VTKMDIY_MPI_NO_MPI_HPP
|
||||
#define VTKMDIY_MPI_NO_MPI_HPP
|
||||
|
||||
#include <cassert> // std::assert
|
||||
#include <stdexcept> // std::runtime_error
|
||||
|
||||
|
||||
@ -75,7 +76,39 @@ static const int MPI_MODE_APPEND = 128;
|
||||
static const int MPI_MODE_SEQUENTIAL = 256;
|
||||
|
||||
/* define window type */
|
||||
using MPI_Win = void*;
|
||||
struct MPI_Win {
|
||||
MPI_Win(): data_(0) {}
|
||||
MPI_Win(void* data, bool owned = false): data_(uintptr_t(data) | (owned ? 0x1 : 0x0))
|
||||
{
|
||||
// We assume that pointers have at least some higher-byte alignment.
|
||||
assert(!(uintptr_t(data) & 0x1));
|
||||
}
|
||||
void* data() const { return (void*)(data_ & ~0x1); }
|
||||
bool owned() const { return data_ & 0x1; }
|
||||
|
||||
// We cannot copy owned windows.
|
||||
MPI_Win(MPI_Win const&) = delete;
|
||||
MPI_Win& operator=(MPI_Win const&) = delete;
|
||||
|
||||
// We cannot move owned windows (we don't know how to delete them in general).
|
||||
MPI_Win(MPI_Win&& rhs): data_(rhs.data_)
|
||||
{
|
||||
rhs.data_ = 0;
|
||||
}
|
||||
MPI_Win& operator=(MPI_Win&& rhs)
|
||||
{
|
||||
if (this == &rhs)
|
||||
return *this;
|
||||
|
||||
data_ = rhs.data_;
|
||||
rhs.data_ = 0;
|
||||
|
||||
return *this;
|
||||
}
|
||||
private:
|
||||
uintptr_t data_;
|
||||
};
|
||||
#define MPI_WIN_NULL MPI_Win()
|
||||
|
||||
/* window fence assertions */
|
||||
static const int MPI_MODE_NOSTORE = 1;
|
||||
|
@ -37,8 +37,8 @@ namespace mpi
|
||||
const void* address() const { return buf_; }
|
||||
|
||||
private:
|
||||
alignas(T) char buf_[sizeof(T)];
|
||||
bool init_;
|
||||
char buf_[sizeof(T)];
|
||||
};
|
||||
}
|
||||
}
|
||||
|
@ -22,6 +22,21 @@ EXPORT_MACRO const int nocheck = MPI_MODE_NOCHECK;
|
||||
namespace detail
|
||||
{
|
||||
|
||||
DIY_MPI_Win win_allocate(const communicator& comm, void** base, unsigned size, int disp)
|
||||
{
|
||||
#if VTKMDIY_HAS_MPI
|
||||
DIY_MPI_Win win;
|
||||
MPI_Win_allocate(size, disp, MPI_INFO_NULL, mpi_cast(comm.handle()), base, &mpi_cast(win));
|
||||
return win;
|
||||
#else
|
||||
(void)comm; (void)disp;
|
||||
*base = malloc(size);
|
||||
auto mpi_win = MPI_Win(*base, true);
|
||||
auto win = make_DIY_MPI_Win(std::move(mpi_win));
|
||||
return win;
|
||||
#endif
|
||||
}
|
||||
|
||||
DIY_MPI_Win win_create(const communicator& comm, void* base, unsigned size, int disp)
|
||||
{
|
||||
#if VTKMDIY_HAS_MPI
|
||||
@ -30,7 +45,8 @@ DIY_MPI_Win win_create(const communicator& comm, void* base, unsigned size, int
|
||||
return win;
|
||||
#else
|
||||
(void)comm; (void)size; (void)disp;
|
||||
auto win = make_DIY_MPI_Win(base);
|
||||
auto mpi_win = MPI_Win(base);
|
||||
auto win = make_DIY_MPI_Win(std::move(mpi_win));
|
||||
return win;
|
||||
#endif
|
||||
}
|
||||
@ -40,7 +56,9 @@ void win_free(DIY_MPI_Win& win)
|
||||
#if VTKMDIY_HAS_MPI
|
||||
MPI_Win_free(&mpi_cast(win));
|
||||
#else
|
||||
(void)win;
|
||||
auto& mpi_win = mpi_cast(win);
|
||||
if (mpi_win.owned())
|
||||
free(mpi_win.data());
|
||||
#endif
|
||||
}
|
||||
|
||||
@ -49,7 +67,7 @@ void put(const DIY_MPI_Win& win, const void* data, int count, const datatype& ty
|
||||
#if VTKMDIY_HAS_MPI
|
||||
MPI_Put(data, count, mpi_cast(type.handle), rank, offset, count, mpi_cast(type.handle), mpi_cast(win));
|
||||
#else
|
||||
void* buffer = mpi_cast(win);
|
||||
void* buffer = mpi_cast(win).data();
|
||||
size_t size = mpi_cast(type.handle);
|
||||
std::copy_n(static_cast<const int8_t*>(data),
|
||||
size * static_cast<size_t>(count),
|
||||
@ -63,7 +81,7 @@ void get(const DIY_MPI_Win& win, void* data, int count, const datatype& type, in
|
||||
#if VTKMDIY_HAS_MPI
|
||||
MPI_Get(data, count, mpi_cast(type.handle), rank, offset, count, mpi_cast(type.handle), mpi_cast(win));
|
||||
#else
|
||||
const void* buffer = mpi_cast(win);
|
||||
const void* buffer = mpi_cast(win).data();
|
||||
size_t size = mpi_cast(type.handle);
|
||||
std::copy_n(static_cast<const int8_t*>(buffer) + (offset * size),
|
||||
size * static_cast<size_t>(count),
|
||||
@ -136,7 +154,7 @@ void fetch(const DIY_MPI_Win& win, void* result, const datatype& type, int rank,
|
||||
MPI_Fetch_and_op(nullptr, result, mpi_cast(type.handle), rank, offset, MPI_NO_OP, mpi_cast(win));
|
||||
#else
|
||||
(void) rank;
|
||||
const void* buffer = mpi_cast(win);
|
||||
const void* buffer = mpi_cast(win).data();
|
||||
size_t size = mpi_cast(type.handle);
|
||||
std::copy_n(static_cast<const int8_t*>(buffer) + (offset * size),
|
||||
size,
|
||||
@ -150,7 +168,7 @@ void replace(const DIY_MPI_Win& win, const void* value, const datatype& type, in
|
||||
MPI_Fetch_and_op(value, nullptr, mpi_cast(type.handle), rank, offset, MPI_REPLACE, mpi_cast(win));
|
||||
#else
|
||||
(void) rank;
|
||||
void* buffer = mpi_cast(win);
|
||||
void* buffer = mpi_cast(win).data();
|
||||
size_t size = mpi_cast(type.handle);
|
||||
std::copy_n(static_cast<const int8_t*>(value),
|
||||
size,
|
||||
|
@ -22,6 +22,9 @@ VTKMDIY_MPI_EXPORT extern const int nocheck;
|
||||
namespace detail
|
||||
{
|
||||
|
||||
VTKMDIY_MPI_EXPORT_FUNCTION
|
||||
DIY_MPI_Win win_allocate(const communicator& comm, void** base, unsigned size, int disp);
|
||||
|
||||
VTKMDIY_MPI_EXPORT_FUNCTION
|
||||
DIY_MPI_Win win_create(const communicator& comm, void* base, unsigned size, int disp);
|
||||
|
||||
@ -96,8 +99,8 @@ void flush_local_all(const DIY_MPI_Win& win);
|
||||
inline ~window();
|
||||
|
||||
// moving is Ok
|
||||
window(window&&) = default;
|
||||
window& operator=(window&&) = default;
|
||||
inline window(window&&);
|
||||
inline window& operator=(window&&);
|
||||
|
||||
// cannot copy because of the buffer_
|
||||
window(const window&) = delete;
|
||||
@ -129,7 +132,7 @@ void flush_local_all(const DIY_MPI_Win& win);
|
||||
inline void flush_local_all();
|
||||
|
||||
private:
|
||||
std::vector<T> buffer_;
|
||||
void* buffer_;
|
||||
int rank_;
|
||||
DIY_MPI_Win window_;
|
||||
};
|
||||
@ -140,16 +143,46 @@ void flush_local_all(const DIY_MPI_Win& win);
|
||||
template<class T>
|
||||
diy::mpi::window<T>::
|
||||
window(const diy::mpi::communicator& comm, unsigned size):
|
||||
buffer_(size), rank_(comm.rank())
|
||||
buffer_(nullptr), rank_(comm.rank())
|
||||
{
|
||||
window_ = detail::win_create(comm, buffer_.data(), static_cast<unsigned>(buffer_.size()*sizeof(T)), static_cast<int>(sizeof(T)));
|
||||
window_ = detail::win_allocate(comm, &buffer_, static_cast<unsigned>(size*sizeof(T)), static_cast<int>(sizeof(T)));
|
||||
}
|
||||
|
||||
template<class T>
|
||||
diy::mpi::window<T>::
|
||||
~window()
|
||||
{
|
||||
detail::win_free(window_);
|
||||
if (buffer_)
|
||||
detail::win_free(window_);
|
||||
}
|
||||
|
||||
template<class T>
|
||||
diy::mpi::window<T>::
|
||||
window(window&& rhs):
|
||||
buffer_(rhs.buffer_), rank_(rhs.rank_), window_(std::move(rhs.window_))
|
||||
{
|
||||
rhs.buffer_ = nullptr;
|
||||
rhs.window_.reset();
|
||||
}
|
||||
|
||||
template<class T>
|
||||
diy::mpi::window<T>&
|
||||
diy::mpi::window<T>::
|
||||
operator=(window&& rhs)
|
||||
{
|
||||
if (this == &rhs)
|
||||
return *this;
|
||||
|
||||
if (buffer_)
|
||||
detail::win_free(window_);
|
||||
|
||||
buffer_ = rhs.buffer_;
|
||||
rhs.buffer_ = nullptr;
|
||||
rank_ = rhs.rank_;
|
||||
window_ = std::move(rhs.window_);
|
||||
rhs.window_.reset();
|
||||
|
||||
return *this;
|
||||
}
|
||||
|
||||
template<class T>
|
||||
|
@ -105,6 +105,12 @@ namespace diy
|
||||
void (*save)(BinaryBuffer&, const T&) = &::diy::save //!< optional serialization function
|
||||
) const;
|
||||
|
||||
void inline enqueue_blob
|
||||
(const BlockID& to, //!< target block (gid,proc)
|
||||
const char* x, //!< pointer to the data
|
||||
size_t n //!< size in data elements (eg. ints)
|
||||
) const;
|
||||
|
||||
//! Dequeue data whose size can be determined automatically (e.g., STL vector) and that was
|
||||
//! previously enqueued so that diy knows its size when it is received.
|
||||
//! In this case, diy will allocate the receive buffer; the user does not need to do so.
|
||||
@ -142,6 +148,9 @@ namespace diy
|
||||
void (*load)(BinaryBuffer&, T&) = &::diy::load //!< optional serialization function
|
||||
) const { dequeue(from.gid, x, n, load); }
|
||||
|
||||
BinaryBlob inline dequeue_blob
|
||||
(int from) const;
|
||||
|
||||
template<class T>
|
||||
EnqueueIterator<T> enqueuer(const T& x,
|
||||
void (*save)(BinaryBuffer&, const T&) = &::diy::save ) const
|
||||
@ -347,5 +356,20 @@ dequeue(int from, T* x, size_t n,
|
||||
load(bb, x[i]);
|
||||
}
|
||||
|
||||
void
|
||||
diy::Master::Proxy::
|
||||
enqueue_blob(const BlockID& to, const char* x, size_t n) const
|
||||
{
|
||||
BinaryBuffer& bb = outgoing_[to];
|
||||
bb.save_binary_blob(x,n);
|
||||
}
|
||||
|
||||
diy::BinaryBlob
|
||||
diy::Master::Proxy::
|
||||
dequeue_blob(int from) const
|
||||
{
|
||||
BinaryBuffer& bb = incoming_[from];
|
||||
return bb.load_binary_blob();
|
||||
}
|
||||
|
||||
#endif
|
||||
|
@ -138,7 +138,7 @@ void reduce(Master& master, //!< master object
|
||||
}
|
||||
}
|
||||
master.set_expected(expected);
|
||||
master.flush();
|
||||
master.flush(false);
|
||||
}
|
||||
// final round
|
||||
log->debug("Round {}", round);
|
||||
|
@ -1,22 +1,30 @@
|
||||
#ifndef VTKMDIY_SERIALIZATION_HPP
|
||||
#define VTKMDIY_SERIALIZATION_HPP
|
||||
|
||||
#include <vector>
|
||||
#include <valarray>
|
||||
#include <cassert>
|
||||
#include <fstream>
|
||||
#include <functional>
|
||||
#include <map>
|
||||
#include <memory>
|
||||
#include <set>
|
||||
#include <string>
|
||||
#include <fstream>
|
||||
|
||||
#include <tuple>
|
||||
#include <type_traits> // this is used for a safety check for default serialization
|
||||
#include <unordered_map>
|
||||
#include <unordered_set>
|
||||
#include <type_traits> // this is used for a safety check for default serialization
|
||||
|
||||
#include <cassert>
|
||||
#include <valarray>
|
||||
#include <vector>
|
||||
|
||||
namespace diy
|
||||
{
|
||||
struct BinaryBlob
|
||||
{
|
||||
using Deleter = std::function<void(const char[])>;
|
||||
using Pointer = std::unique_ptr<const char[], Deleter>;
|
||||
Pointer pointer;
|
||||
size_t size;
|
||||
};
|
||||
|
||||
//! A serialization buffer. \ingroup Serialization
|
||||
struct BinaryBuffer
|
||||
{
|
||||
@ -25,10 +33,18 @@ namespace diy
|
||||
virtual inline void append_binary(const char* x, size_t count) =0; //!< append `count` bytes from `x` to end of buffer
|
||||
virtual void load_binary(char* x, size_t count) =0; //!< copy `count` bytes into `x` from the buffer
|
||||
virtual void load_binary_back(char* x, size_t count) =0; //!< copy `count` bytes into `x` from the back of the buffer
|
||||
virtual char* grow(size_t count) =0; //!< allocate enough space for `count` bytes and return the pointer to the beginning
|
||||
virtual char* advance(size_t count) =0; //!< advance buffer position by `count` bytes and return the pointer to the beginning
|
||||
|
||||
virtual void save_binary_blob(const char*, size_t) =0;
|
||||
virtual void save_binary_blob(const char*, size_t, BinaryBlob::Deleter) = 0;
|
||||
virtual BinaryBlob load_binary_blob() =0;
|
||||
};
|
||||
|
||||
struct MemoryBuffer: public BinaryBuffer
|
||||
{
|
||||
using Blob = BinaryBlob;
|
||||
|
||||
MemoryBuffer(size_t position_ = 0):
|
||||
position(position_) {}
|
||||
|
||||
@ -41,6 +57,13 @@ namespace diy
|
||||
virtual inline void append_binary(const char* x, size_t count) override; //!< append `count` bytes from `x` to end of buffer
|
||||
virtual inline void load_binary(char* x, size_t count) override; //!< copy `count` bytes into `x` from the buffer
|
||||
virtual inline void load_binary_back(char* x, size_t count) override; //!< copy `count` bytes into `x` from the back of the buffer
|
||||
virtual inline char* grow(size_t count) override; //!< allocate enough space for `count` bytes and return the pointer to the beginning
|
||||
virtual inline char* advance(size_t count) override; //!< advance buffer position by `count` bytes and return the pointer to the beginning
|
||||
|
||||
virtual inline void save_binary_blob(const char* x, size_t count) override;
|
||||
virtual inline void save_binary_blob(const char* x, size_t count, Blob::Deleter deleter) override;
|
||||
virtual inline Blob load_binary_blob() override;
|
||||
size_t nblobs() const { return blobs.size(); }
|
||||
|
||||
void clear() { buffer.clear(); reset(); }
|
||||
void wipe() { std::vector<char>().swap(buffer); reset(); }
|
||||
@ -71,6 +94,9 @@ namespace diy
|
||||
|
||||
size_t position;
|
||||
std::vector<char> buffer;
|
||||
|
||||
size_t blob_position = 0;
|
||||
std::vector<Blob> blobs;
|
||||
};
|
||||
|
||||
namespace detail
|
||||
@ -140,7 +166,7 @@ namespace diy
|
||||
template<class T>
|
||||
void load_back(BinaryBuffer& bb, T& x) { bb.load_binary_back((char*) &x, sizeof(T)); }
|
||||
|
||||
//@}
|
||||
//!@}
|
||||
|
||||
|
||||
namespace detail
|
||||
@ -444,17 +470,7 @@ void
|
||||
diy::MemoryBuffer::
|
||||
save_binary(const char* x, size_t count)
|
||||
{
|
||||
if (position + count > buffer.capacity())
|
||||
{
|
||||
double newsize = static_cast<double>(position + count) * growth_multiplier(); // if we have to grow, grow geometrically
|
||||
buffer.reserve(static_cast<size_t>(newsize));
|
||||
}
|
||||
|
||||
if (position + count > buffer.size())
|
||||
buffer.resize(position + count);
|
||||
|
||||
std::copy_n(x, count, &buffer[position]);
|
||||
position += count;
|
||||
std::copy_n(x, count, grow(count));
|
||||
}
|
||||
|
||||
void
|
||||
@ -509,6 +525,58 @@ load_binary_back(char* x, size_t count)
|
||||
buffer.resize(buffer.size() - count);
|
||||
}
|
||||
|
||||
char*
|
||||
diy::MemoryBuffer::
|
||||
grow(size_t count)
|
||||
{
|
||||
if (position + count > buffer.capacity())
|
||||
{
|
||||
double newsize = static_cast<double>(position + count) * growth_multiplier(); // if we have to grow, grow geometrically
|
||||
buffer.reserve(static_cast<size_t>(newsize));
|
||||
}
|
||||
|
||||
if (position + count > buffer.size())
|
||||
buffer.resize(position + count);
|
||||
|
||||
char* destination = &buffer[position];
|
||||
|
||||
position += count;
|
||||
|
||||
return destination;
|
||||
}
|
||||
|
||||
char*
|
||||
diy::MemoryBuffer::
|
||||
advance(size_t count)
|
||||
{
|
||||
char* origin = &buffer[position];
|
||||
position += count;
|
||||
return origin;
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
diy::MemoryBuffer::
|
||||
save_binary_blob(const char* x, size_t count)
|
||||
{
|
||||
// empty deleter means we don't take ownership
|
||||
save_binary_blob(x, count, [](const char[]) {});
|
||||
}
|
||||
|
||||
void
|
||||
diy::MemoryBuffer::
|
||||
save_binary_blob(const char* x, size_t count, Blob::Deleter deleter)
|
||||
{
|
||||
blobs.emplace_back(Blob { Blob::Pointer {x, deleter}, count });
|
||||
}
|
||||
|
||||
diy::MemoryBuffer::Blob
|
||||
diy::MemoryBuffer::
|
||||
load_binary_blob()
|
||||
{
|
||||
return std::move(blobs[blob_position++]);
|
||||
}
|
||||
|
||||
void
|
||||
diy::MemoryBuffer::
|
||||
copy(MemoryBuffer& from, MemoryBuffer& to)
|
||||
|
@ -15,8 +15,8 @@ namespace diy
|
||||
{
|
||||
namespace detail
|
||||
{
|
||||
typedef void (*Save)(const void*, BinaryBuffer& buf);
|
||||
typedef void (*Load)(void*, BinaryBuffer& buf);
|
||||
using Save = std::function<void(const void*, BinaryBuffer&)>;
|
||||
using Load = std::function<void(void*, BinaryBuffer&)>;
|
||||
|
||||
struct FileBuffer: public BinaryBuffer
|
||||
{
|
||||
@ -34,6 +34,16 @@ namespace diy
|
||||
}
|
||||
virtual inline void load_binary(char* x, size_t count) override { auto n = fread(x, 1, count, file); VTKMDIY_UNUSED(n);}
|
||||
virtual inline void load_binary_back(char* x, size_t count) override { fseek(file, static_cast<long>(tail), SEEK_END); auto n = fread(x, 1, count, file); tail += count; fseek(file, static_cast<long>(head), SEEK_SET); VTKMDIY_UNUSED(n);}
|
||||
virtual inline char* grow(size_t) override { throw std::runtime_error("Cannot grow a FileBuffer"); }
|
||||
virtual inline char* advance(size_t) override { throw std::runtime_error("Cannot advance a FileBuffer"); }
|
||||
|
||||
// TODO: for now, we just throw, but obviously it should be possile to store binary blobs in a file; might want to fall back
|
||||
using Blob = BinaryBlob;
|
||||
virtual inline void save_binary_blob(const char*, size_t) override { throw std::runtime_error("Cannot save binary blobs in a FileBuffer"); }
|
||||
|
||||
virtual inline void save_binary_blob(const char*, size_t, Blob::Deleter) override { throw std::runtime_error("Cannot save binary blobs in a FileBuffer"); }
|
||||
|
||||
virtual inline Blob load_binary_blob() override { throw std::runtime_error("Cannot load binary blobs from a FileBuffer"); }
|
||||
|
||||
size_t size() const { return head; }
|
||||
|
||||
|
859
include/vtkmdiy/thirdparty/chobo/small_vector.hpp → include/vtkmdiy/thirdparty/itlib/small_vector.hpp
vendored
859
include/vtkmdiy/thirdparty/chobo/small_vector.hpp → include/vtkmdiy/thirdparty/itlib/small_vector.hpp
vendored
File diff suppressed because it is too large
Load Diff
@ -41,6 +41,9 @@ namespace diy
|
||||
#include "critical-resource.hpp"
|
||||
|
||||
#if !defined(VTKMDIY_NO_THREADS)
|
||||
|
||||
#include <memory> // for shared_ptr
|
||||
|
||||
template<class T, class U>
|
||||
struct diy::concurrent_map
|
||||
{
|
||||
|
@ -3,6 +3,6 @@
|
||||
|
||||
#define VTKMDIY_VERSION_MAJOR 3
|
||||
#define VTKMDIY_VERSION_MINOR 5
|
||||
#define DIY_VERSION_PATCH dev1
|
||||
#define VTKMDIY_VERSION_PATCH dev1
|
||||
|
||||
#endif
|
||||
|
Loading…
Reference in New Issue
Block a user