Merge branch 'upstream-diy' into add-gpu-to-gpu-conn

# By Diy Upstream
* upstream-diy:
  diy 2023-03-28 (6837fb55)
This commit is contained in:
Vicente Adolfo Bolea Sanchez 2023-05-30 12:12:48 -04:00
commit 662f93e07a
24 changed files with 607 additions and 806 deletions

@ -33,6 +33,13 @@ macro (diy_dependent_option variable)
endif ()
endmacro ()
set (compiler_supports_sanitizers OFF)
if (CMAKE_CXX_COMPILER_ID STREQUAL "Clang" OR
CMAKE_CXX_COMPILER_ID STREQUAL "AppleClang" OR
CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
set (compiler_supports_sanitizers ON)
endif ()
diy_option (threads "Build DIY with threading" ON)
diy_option (log "Build DIY with logging" OFF)
diy_option (profile "Build DIY with profiling" OFF)
@ -44,6 +51,8 @@ diy_dependent_option (BUILD_SHARED_LIBS "Create shared libraries if on"
diy_dependent_option (build_diy_nompi_lib "Also build the nompi version of diy::mpi" OFF "mpi;build_diy_mpi_lib" OFF)
diy_option (build_examples "Build DIY examples" ON)
diy_option (build_tests "Build DIY tests" ON)
diy_option (python "Build Python bindings" OFF)
cmake_dependent_option (enable_sanitizers "Build DIY with sanitizer support" OFF "compiler_supports_sanitizers" OFF)
# Default to Release
if (NOT CMAKE_BUILD_TYPE)
@ -64,9 +73,8 @@ endif ()
# Logging
if (log)
list (APPEND diy_definitions "-DVTKMDIY_USE_SPDLOG")
find_path (SPDLOG_INCLUDE_DIR spdlog/spdlog.h)
list (APPEND diy_include_thirdparty_directories $<BUILD_INTERFACE:${SPDLOG_INCLUDE_DIR}>)
list (APPEND diy_definitions "-DVTMDIY_USE_SPDLOG")
find_package (spdlog REQUIRED)
endif()
# Profiling
@ -114,8 +122,12 @@ if (NOT DEFINED diy_export_name)
set(diy_export_name "diy_targets")
endif()
set (CMAKE_ARCHIVE_OUTPUT_DIRECTORY "${PROJECT_BINARY_DIR}/lib")
set (CMAKE_LIBRARY_OUTPUT_DIRECTORY "${PROJECT_BINARY_DIR}/lib")
if (NOT DEFINED CMAKE_ARCHIVE_OUTPUT_DIRECTORY)
set (CMAKE_ARCHIVE_OUTPUT_DIRECTORY "${PROJECT_BINARY_DIR}/lib")
endif()
if (NOT DEFINED CMAKE_LIBRARY_OUTPUT_DIRECTORY)
set (CMAKE_LIBRARY_OUTPUT_DIRECTORY "${PROJECT_BINARY_DIR}/lib")
endif()
# for diy_developer_flags
include(DIYCompilerFlags)
@ -152,6 +164,9 @@ function(add_diy_mpi_library use_mpi)
target_include_directories(${lib_name} SYSTEM PRIVATE ${diy_include_directories}) # for mpitypes.hpp
target_include_directories(${lib_name} SYSTEM PRIVATE ${diy_include_thirdparty_directories})
target_link_libraries(${lib_name} PRIVATE diy_developer_flags)
if (log)
target_link_libraries(${lib_name} PUBLIC spdlog::spdlog_header_only)
endif ()
if (use_mpi AND TARGET MPI::MPI_CXX)
target_link_libraries(${lib_name} PRIVATE MPI::MPI_CXX)
endif()
@ -195,6 +210,9 @@ target_include_directories(${diy_prefix} SYSTEM INTERFACE ${diy_include_thirdpar
if (diy_include_directories)
target_include_directories(${diy_prefix} SYSTEM INTERFACE ${diy_include_directories})
endif()
if (log)
target_link_libraries(${diy_prefix} INTERFACE spdlog::spdlog_header_only)
endif ()
target_link_libraries(${diy_prefix} INTERFACE ${diy_libraries})
if (NOT build_diy_mpi_lib)
if (mpi)
@ -224,6 +242,16 @@ elseif (${diy_prefix}mpi_nompi IN_LIST diy_targets)
endif()
list(APPEND libraries diy_developer_flags)
# Sanitizers
if (enable_sanitizers)
set(sanitizer "address" CACHE STRING "The sanitizer to use")
string (APPEND CMAKE_CXX_FLAGS " -fsanitize=${sanitizer}")
string (APPEND CMAKE_C_FLAGS " -fsanitize=${sanitizer}")
string (APPEND CMAKE_EXE_LINKER_FLAGS " -fsanitize=${sanitizer}")
string (APPEND CMAKE_SHARED_LINKER_FLAGS " -fsanitize=${sanitizer}")
endif ()
# enable testing and CDash dashboard submission
enable_testing ()
include (CTest)
@ -262,3 +290,7 @@ if (CMAKE_SOURCE_DIR STREQUAL PROJECT_SOURCE_DIR) # Only generate these files wh
install(EXPORT ${diy_export_name} NAMESPACE DIY:: DESTINATION "." FILE diy-targets.cmake)
install(FILES "${PROJECT_BINARY_DIR}/diy-config.cmake" DESTINATION ".")
endif()
if (python)
add_subdirectory(bindings/python)
endif (python)

@ -25,12 +25,10 @@ if (threads)
endif()
if (log)
find_path(SPDLOG_INCLUDE_DIR "spdlog/spdlog.h")
if (SPDLOG_INCLUDE_DIR STREQUAL "SPDLOG_INCLUDE_DIR-NOTFOUND")
find_package(spdlog ${_diy_find_quietly})
if (NOT spdlog_FOUND)
list(APPEND "${CMAKE_FIND_PACKAGE_NAME}_NOT_FOUND_MESSAGE" "SPDLOG not found")
set("${CMAKE_FIND_PACKAGE_NAME}_FOUND" 0)
else()
target_include_directories(DIY::@diy_prefix@ INTERFACE $<INSTALL_INTERFACE:${SPDLOG_INCLUDE_DIR}>)
endif()
endif()

@ -17,10 +17,10 @@ namespace diy
typedef std::vector<Element> Elements;
typedef critical_resource<int, recursive_mutex> CInt;
typedef void* (*Create)();
typedef void (*Destroy)(void*);
typedef detail::Save Save;
typedef detail::Load Load;
using Create = std::function<void*()>;
using Destroy = std::function<void(void*)>;
using Save = detail::Save;
using Load = detail::Load;
public:
Collection(Create create__,

@ -5,11 +5,13 @@ namespace diy
int from, to;
int nparts;
int round;
int nblobs;
};
struct Master::InFlightSend
{
std::shared_ptr<MemoryBuffer> message;
BinaryBlob blob;
mpi::request request;
MessageInfo info; // for debug purposes
@ -18,12 +20,18 @@ namespace diy
struct Master::InFlightRecv
{
MemoryBuffer message;
MessageInfo info { -1, -1, -1, -1 };
MessageInfo info { -1, -1, -1, -1, -1 };
bool done = false;
MemoryManagement mem;
inline bool recv(mpi::communicator& comm, const mpi::status& status);
inline void place(IncomingRound* in, bool unload, ExternalStorage* storage, IExchangeInfo* iexchange);
void reset() { *this = InFlightRecv(); }
void reset()
{
MemoryManagement mem_ = mem;
*this = InFlightRecv();
mem = mem_;
}
};
struct Master::InFlightRecvsMap: public std::map<int, InFlightRecv>
@ -111,7 +119,7 @@ recv(mpi::communicator& comm, const mpi::status& status)
result = true;
}
else
else if (info.nparts > 0)
{
size_t start_idx = message.buffer.size();
size_t count = status.count<char>();
@ -124,9 +132,24 @@ recv(mpi::communicator& comm, const mpi::status& status)
comm.recv(status.source(), status.tag(), window);
info.nparts--;
} else if (info.nblobs > 0)
{
size_t count = status.count<char>();
detail::VectorWindow<char> window;
char* buffer = mem.allocate(info.to, count);
window.begin = buffer;
window.count = count;
comm.recv(status.source(), status.tag(), window);
message.save_binary_blob(buffer, count, mem.deallocate);
info.nblobs--;
}
if (info.nparts == 0)
if (info.nparts == 0 && info.nblobs == 0)
done = true;
return result;

@ -1,3 +1,5 @@
#include <algorithm>
struct diy::Master::ProcessBlock
{
ProcessBlock(Master& master_,

@ -7,17 +7,17 @@
#include <algorithm>
#include "constants.h"
#include "thirdparty/chobo/small_vector.hpp"
#include "thirdparty/itlib/small_vector.hpp"
namespace diy
{
template<class Coordinate_, size_t static_size = VTKMDIY_MAX_DIM>
class DynamicPoint: public chobo::small_vector<Coordinate_, static_size>
class DynamicPoint: public itlib::small_vector<Coordinate_, static_size>
{
public:
using Coordinate = Coordinate_;
using Parent = chobo::small_vector<Coordinate_, static_size>;
using Parent = itlib::small_vector<Coordinate_, static_size>;
template<class U>
struct rebind { typedef DynamicPoint<U> type; };

@ -30,9 +30,9 @@ class SharedOutFile: public std::ostringstream
diy::mpi::gather(world_, contents, all_contents, root_);
// write the file serially
std::ofstream out(filename_);
std::ofstream fout(filename_);
for (auto& cntnts : all_contents)
out.write(cntnts.data(), cntnts.size());
fout.write(cntnts.data(), cntnts.size());
} else
diy::mpi::gather(world_, contents, root_);
}

@ -5,6 +5,8 @@
#include <direct.h>
#include <io.h>
#include <share.h>
#define NOMINMAX
#include <windows.h>
#else
#include <unistd.h> // mkstemp() on Mac
#include <dirent.h>

@ -55,8 +55,8 @@ set_logger(Args...)
#include <spdlog/sinks/null_sink.h>
#include <spdlog/sinks/stdout_sinks.h>
#include <spdlog/fmt/bundled/format.h>
#include <spdlog/fmt/bundled/ostream.h>
#include <spdlog/fmt/fmt.h>
#include <spdlog/fmt/ostr.h>
namespace diy
{

@ -10,6 +10,7 @@
#include <numeric>
#include <memory>
#include <chrono>
#include <climits>
#include "link.hpp"
#include "collection.hpp"
@ -28,6 +29,23 @@
namespace diy
{
struct MemoryManagement
{
using Allocate = std::function<char* (int, size_t)>;
using Deallocate = BinaryBlob::Deleter;
using MemCopy = std::function<void(char*, const char*, size_t)>;
MemoryManagement() = default;
MemoryManagement(Allocate allocate_, Deallocate deallocate_, MemCopy copy_):
allocate(allocate_), deallocate(deallocate_), copy(copy_) {}
Allocate allocate = [](int /* gid */, size_t n) { return new char[n]; };
Deallocate deallocate = [](const char* p) { delete[] p; };
MemCopy copy = [](char* dest, const char* src, size_t count) { std::memcpy(dest, src, count); };
};
// Stores and manages blocks; initiates serialization and communication when necessary.
//
// Provides a foreach function, which is meant as the main entry point.
@ -126,6 +144,8 @@ namespace diy
void unload(ExternalStorage* storage) { size_ = buffer_.size(); external_ = storage->put(buffer_); }
void load(ExternalStorage* storage) { storage->get(external_, buffer_); external_ = -1; }
MemoryBuffer& buffer() { return buffer_; }
private:
size_t size_;
int external_;
@ -147,7 +167,6 @@ namespace diy
};
typedef std::map<int, IncomingRound> IncomingRoundMap;
public:
/**
* \ingroup Initialization
@ -173,6 +192,7 @@ namespace diy
inline void destroy(int i) { if (blocks_.own()) blocks_.destroy(i); }
inline int add(int gid, void* b, Link* l); //!< add a block
inline int add(int gid, void* b, const Link& l){ return add(gid, b, l.clone()); }
inline void* release(int i); //!< release ownership of the block
//!< return the `i`-th block
@ -213,17 +233,17 @@ namespace diy
bool local(int gid__) const { return lids_.find(gid__) != lids_.end(); }
//! exchange the queues between all the blocks (collective operation)
inline void exchange(bool remote = false);
inline void exchange(bool remote = false, MemoryManagement mem = MemoryManagement());
//! nonblocking exchange of the queues between all the blocks
template<class Block>
void iexchange_(const ICallback<Block>& f);
void iexchange_(const ICallback<Block>& f, MemoryManagement mem);
template<class F>
void iexchange(const F& f)
void iexchange(const F& f, MemoryManagement mem = MemoryManagement())
{
using Block = typename detail::block_traits<F>::type;
iexchange_<Block>(f);
iexchange_<Block>(f, mem);
}
inline void process_collectives();
@ -283,29 +303,30 @@ namespace diy
public:
// Communicator functionality
inline void flush(bool remote = false); // makes sure all the serialized queues migrate to their target processors
inline void flush(bool remote, MemoryManagement mem = MemoryManagement()); // makes sure all the serialized queues migrate to their target processors
private:
// Communicator functionality
inline void comm_exchange(GidSendOrder& gid_order, IExchangeInfo* iex = 0);
inline void rcomm_exchange(); // possibly called in between block computations
inline void comm_exchange(GidSendOrder& gid_order, MemoryManagement mem, IExchangeInfo* iex = 0);
inline void rcomm_exchange(MemoryManagement mem); // possibly called in between block computations
inline bool nudge(IExchangeInfo* iex = 0);
inline void send_queue(int from_gid, int to_gid, int to_proc, QueueRecord& qr, bool remote, IExchangeInfo* iex);
inline void send_queue(int from_gid, int to_gid, int to_proc, QueueRecord& qr, bool remote, MemoryManagement mem, IExchangeInfo* iex);
inline void send_outgoing_queues(GidSendOrder& gid_order,
bool remote,
MemoryManagement mem,
IExchangeInfo* iex = 0);
inline void check_incoming_queues(IExchangeInfo* iex = 0);
inline void check_incoming_queues(MemoryManagement mem, IExchangeInfo* iex = 0);
inline GidSendOrder
order_gids();
inline void touch_queues();
inline void send_same_rank(int from, int to, QueueRecord& qr, IExchangeInfo* iex);
inline void send_same_rank(int from, int to, QueueRecord& qr, MemoryManagement mem, IExchangeInfo* iex);
inline void send_different_rank(int from, int to, int proc, QueueRecord& qr, bool remote, IExchangeInfo* iex);
inline InFlightRecv& inflight_recv(int proc);
inline InFlightSendsList& inflight_sends();
// iexchange commmunication
inline void icommunicate(IExchangeInfo* iex); // async communication
inline void icommunicate(IExchangeInfo* iex, MemoryManagement mem); // async communication
struct tags { enum {
queue,
@ -607,7 +628,7 @@ foreach_(const Callback<Block>& f, const Skip& skip)
void
diy::Master::
exchange(bool remote)
exchange(bool remote, MemoryManagement mem)
{
auto scoped = prof.scoped("exchange");
VTKMDIY_UNUSED(scoped);
@ -625,7 +646,7 @@ exchange(bool remote)
if (!remote)
touch_queues();
flush(remote);
flush(remote, mem);
log->debug("Finished exchange");
}
@ -658,7 +679,7 @@ touch_queues()
template<class Block>
void
diy::Master::
iexchange_(const ICallback<Block>& f)
iexchange_(const ICallback<Block>& f, MemoryManagement mem)
{
auto scoped = prof.scoped("iexchange");
VTKMDIY_UNUSED(scoped);
@ -685,11 +706,11 @@ iexchange_(const ICallback<Block>& f)
thread comm_thread;
if (threads() > 1)
comm_thread = thread([this,&iex]()
comm_thread = thread([this,&iex,mem]()
{
while(!iex.all_done())
{
icommunicate(&iex);
icommunicate(&iex, mem);
iex.control();
//std::this_thread::sleep_for(std::chrono::microseconds(1));
}
@ -713,7 +734,7 @@ iexchange_(const ICallback<Block>& f)
stats::Annotation::Guard g( stats::Annotation("diy.block").set(gid) );
if (threads() == 1)
icommunicate(&iex);
icommunicate(&iex, mem);
bool done = done_result[gid];
if (!done || !empty_incoming(gid))
{
@ -762,17 +783,17 @@ iexchange_(const ICallback<Block>& f)
/* Communicator */
void
diy::Master::
comm_exchange(GidSendOrder& gid_order, IExchangeInfo* iex)
comm_exchange(GidSendOrder& gid_order, MemoryManagement mem, IExchangeInfo* iex)
{
auto scoped = prof.scoped("comm-exchange");
VTKMDIY_UNUSED(scoped);
send_outgoing_queues(gid_order, false, iex);
send_outgoing_queues(gid_order, false, mem, iex);
while(nudge(iex)) // kick requests
;
check_incoming_queues(iex);
check_incoming_queues(mem, iex);
}
/* Remote communicator */
@ -803,7 +824,7 @@ comm_exchange(GidSendOrder& gid_order, IExchangeInfo* iex)
//
void
diy::Master::
rcomm_exchange()
rcomm_exchange(MemoryManagement mem)
{
bool done = false;
bool ibarr_act = false;
@ -814,12 +835,12 @@ rcomm_exchange()
while (!done)
{
send_outgoing_queues(gid_order, true, 0);
send_outgoing_queues(gid_order, true, mem, 0);
// kick requests
nudge();
check_incoming_queues();
check_incoming_queues(mem);
if (ibarr_act)
{
if (ibarr_req.test())
@ -877,7 +898,7 @@ order_gids()
// iexchange communicator
void
diy::Master::
icommunicate(IExchangeInfo* iex)
icommunicate(IExchangeInfo* iex, MemoryManagement mem)
{
auto scoped = prof.scoped("icommunicate");
VTKMDIY_UNUSED(scoped);
@ -887,7 +908,7 @@ icommunicate(IExchangeInfo* iex)
auto gid_order = order_gids();
// exchange
comm_exchange(gid_order, iex);
comm_exchange(gid_order, mem, iex);
// cleanup
@ -906,6 +927,7 @@ send_queue(int from_gid,
int to_proc,
QueueRecord& qr,
bool remote,
MemoryManagement mem,
IExchangeInfo* iex)
{
stats::Annotation::Guard gb( stats::Annotation("diy.block").set(from_gid) );
@ -917,7 +939,7 @@ send_queue(int from_gid,
log->debug("[{}] Sending queue: {} <- {} of size {}, iexchange = {}", comm_.rank(), to_gid, from_gid, qr.size(), iex ? 1 : 0);
if (to_proc == comm_.rank()) // sending to same rank, simply swap buffers
send_same_rank(from_gid, to_gid, qr, iex);
send_same_rank(from_gid, to_gid, qr, mem, iex);
else // sending to an actual message to a different rank
send_different_rank(from_gid, to_gid, to_proc, qr, remote, iex);
}
@ -926,6 +948,7 @@ void
diy::Master::
send_outgoing_queues(GidSendOrder& gid_order,
bool remote, // TODO: are remote and iexchange mutually exclusive? If so, use single enum?
MemoryManagement mem,
IExchangeInfo* iex)
{
auto scoped = prof.scoped("send-outgoing-queues");
@ -950,7 +973,7 @@ send_outgoing_queues(GidSendOrder& gid_order,
access.unlock(); // others can push on this queue, while we are working
assert(!qr.external());
log->debug("Processing queue: {} <- {} of size {}", to_gid, from, qr.size());
send_queue(from, to_gid, to_proc, qr, remote, iex);
send_queue(from, to_gid, to_proc, qr, remote, mem, iex);
access.lock();
}
}
@ -978,7 +1001,7 @@ send_outgoing_queues(GidSendOrder& gid_order,
// NB: send only front
auto& qr = access->front();
log->debug("Processing queue: {} <- {} of size {}", to_gid, from_gid, qr.size());
send_queue(from_gid, to_gid, to_proc, qr, remote, iex);
send_queue(from_gid, to_gid, to_proc, qr, remote, mem, iex);
access->pop_front();
}
}
@ -987,7 +1010,7 @@ send_outgoing_queues(GidSendOrder& gid_order,
void
diy::Master::
send_same_rank(int from, int to, QueueRecord& qr, IExchangeInfo*)
send_same_rank(int from, int to, QueueRecord& qr, MemoryManagement mem, IExchangeInfo*)
{
auto scoped = prof.scoped("send-same-rank");
@ -997,9 +1020,24 @@ send_same_rank(int from, int to, QueueRecord& qr, IExchangeInfo*)
auto access_incoming = current_incoming.map[to][from].access();
// save blobs to copy them explicitly
std::vector<BinaryBlob> blobs;
qr.buffer().blobs.swap(blobs);
qr.buffer().blob_position = 0;
access_incoming->emplace_back(std::move(qr));
QueueRecord& in_qr = access_incoming->back();
// copy blobs explicitly; we cannot just move them in place, since we don't
// own their memory and must guarantee that it's safe to free, once
// exchange() is done
for (BinaryBlob& blob : blobs)
{
char* p = mem.allocate(to, blob.size);
mem.copy(p, blob.pointer.get(), blob.size);
in_qr.buffer().save_binary_blob(p, blob.size, mem.deallocate);
}
if (!in_qr.external())
{
in_qr.reset();
@ -1029,7 +1067,7 @@ send_different_rank(int from, int to, int proc, QueueRecord& qr, bool remote, IE
// sending to a different rank
std::shared_ptr<MemoryBuffer> buffer = std::make_shared<MemoryBuffer>(qr.move());
MessageInfo info{from, to, 1, exchange_round_};
MessageInfo info{from, to, 1, exchange_round_, static_cast<int>(buffer->nblobs())};
// size fits in one message
if (Serialization<MemoryBuffer>::size(*buffer) + Serialization<MessageInfo>::size(info) <= MAX_MPI_MESSAGE_COUNT)
{
@ -1103,11 +1141,33 @@ send_different_rank(int from, int to, int proc, QueueRecord& qr, bool remote, IE
inflight_send.message = buffer;
}
} // large message broken into pieces
// send binary blobs
for (size_t i = 0; i < buffer->nblobs(); ++i)
{
auto blob = buffer->load_binary_blob();
assert(blob.size < MAX_MPI_MESSAGE_COUNT); // for now assume blobs are small enough that we don't need to break them into multiple parts
inflight_sends().emplace_back();
auto& inflight_send = inflight_sends().back();
inflight_send.info = info;
detail::VectorWindow<char> window;
window.begin = const_cast<char*>(blob.pointer.get());
window.count = blob.size;
if (remote || iex)
inflight_send.request = comm_.issend(proc, tags::queue, window);
else
inflight_send.request = comm_.isend(proc, tags::queue, window);
inflight_send.blob = std::move(blob);
}
}
void
diy::Master::
check_incoming_queues(IExchangeInfo* iex)
check_incoming_queues(MemoryManagement mem, IExchangeInfo* iex)
{
auto scoped = prof.scoped("check-incoming-queues");
VTKMDIY_UNUSED(scoped);
@ -1116,6 +1176,7 @@ check_incoming_queues(IExchangeInfo* iex)
while (ostatus)
{
InFlightRecv& ir = inflight_recv(ostatus->source());
ir.mem = mem;
if (iex)
iex->inc_work(); // increment work before sender's issend request can complete (so we are now responsible for the queue)
@ -1141,7 +1202,7 @@ check_incoming_queues(IExchangeInfo* iex)
void
diy::Master::
flush(bool remote)
flush(bool remote, MemoryManagement mem)
{
#ifdef VTKMDIY_DEBUG
time_type start = get_time();
@ -1155,13 +1216,13 @@ flush(bool remote)
if (remote)
rcomm_exchange();
rcomm_exchange(mem);
else
{
auto gid_order = order_gids();
do
{
comm_exchange(gid_order);
comm_exchange(gid_order, mem);
#ifdef VTKMDIY_DEBUG
time_type cur = get_time();

@ -1,6 +1,8 @@
#ifndef VTKMDIY_MPI_CONFIG_HPP
#define VTKMDIY_MPI_CONFIG_HPP
#include <utility>
/// We want to allow the use of `diy::mpi` in either header-only or library mode.
/// VTKMDIY_MPI_AS_LIB is defined when using library mode.
/// This file contains some configuration macros. To maintain backwards compatibility
@ -49,13 +51,26 @@ struct DIY_##mpitype { \
mpitype data; \
};
#define DEFINE_DIY_MPI_TYPE_MOVE(mpitype) \
struct DIY_##mpitype { \
DIY_##mpitype() = default; \
DIY_##mpitype(const mpitype&) = delete; \
DIY_##mpitype(mpitype&& obj) : data(std::move(obj)) {} \
DIY_##mpitype& operator=(const mpitype&) = delete; \
DIY_##mpitype& operator=(mpitype&& obj) { data = std::move(obj); return *this; } \
operator const mpitype&() const { return data; } \
void reset() { data = mpitype(); } \
private: \
mpitype data; \
};
DEFINE_DIY_MPI_TYPE(MPI_Comm)
DEFINE_DIY_MPI_TYPE(MPI_Datatype)
DEFINE_DIY_MPI_TYPE(MPI_Status)
DEFINE_DIY_MPI_TYPE(MPI_Request)
DEFINE_DIY_MPI_TYPE(MPI_Op)
DEFINE_DIY_MPI_TYPE(MPI_File)
DEFINE_DIY_MPI_TYPE(MPI_Win)
DEFINE_DIY_MPI_TYPE_MOVE(MPI_Win)
#undef DEFINE_DIY_MPI_TYPE

@ -18,13 +18,18 @@ inline mpitype& mpi_cast(DIY_##mpitype& obj) { return *reinterpret_cast<mpitype*
inline const mpitype& mpi_cast(const DIY_##mpitype& obj) { return *reinterpret_cast<const mpitype*>(&obj); } \
inline DIY_##mpitype make_DIY_##mpitype(const mpitype& obj) { DIY_##mpitype ret; mpi_cast(ret) = obj; return ret; }
#define DEFINE_MPI_CAST_MOVE(mpitype) \
inline mpitype& mpi_cast(DIY_##mpitype& obj) { return *reinterpret_cast<mpitype*>(&obj); } \
inline const mpitype& mpi_cast(const DIY_##mpitype& obj) { return *reinterpret_cast<const mpitype*>(&obj); } \
inline DIY_##mpitype make_DIY_##mpitype(mpitype&& obj) { DIY_##mpitype ret = std::move(obj); return ret; }
DEFINE_MPI_CAST(MPI_Comm)
DEFINE_MPI_CAST(MPI_Datatype)
DEFINE_MPI_CAST(MPI_Status)
DEFINE_MPI_CAST(MPI_Request)
DEFINE_MPI_CAST(MPI_Op)
DEFINE_MPI_CAST(MPI_File)
DEFINE_MPI_CAST(MPI_Win)
DEFINE_MPI_CAST_MOVE(MPI_Win)
#undef DEFINE_MPI_CAST

@ -1,6 +1,8 @@
#ifndef VTKMDIY_MPI_MPITYPES_H
#define VTKMDIY_MPI_MPITYPES_H
#include <cstring>
#cmakedefine TYPESIZE_MPI_Comm @TYPESIZE_MPI_Comm@
#cmakedefine TYPESIZE_MPI_Datatype @TYPESIZE_MPI_Datatype@
#cmakedefine TYPESIZE_MPI_Status @TYPESIZE_MPI_Status@
@ -18,6 +20,7 @@ namespace mpi
# define ASSERT_MPI_TYPE_SIZE(mpitype) static_assert(sizeof(mpitype) <= sizeof(DIY_##mpitype), "");
#else
# define ASSERT_MPI_TYPE_SIZE(mpitype)
struct MPI_Win;
#endif
#define DEFINE_DIY_MPI_TYPE(mpitype) \
@ -26,15 +29,41 @@ struct DIY_##mpitype { \
}; \
ASSERT_MPI_TYPE_SIZE(mpitype)
#define DEFINE_DIY_MPI_TYPE_MOVE(mpitype) \
struct DIY_##mpitype \
{ \
DIY_##mpitype() = default; \
DIY_##mpitype(const mpitype&) = delete; \
DIY_##mpitype& operator=(const mpitype&) = delete; \
DIY_##mpitype(mpitype&& obj) \
{ \
std::memcpy(data, &obj, TYPESIZE_##mpitype); \
std::memset(&obj, 0, TYPESIZE_##mpitype); \
} \
DIY_##mpitype& operator=(mpitype&& obj) \
{ \
std::memcpy(data, &obj, TYPESIZE_##mpitype); \
std::memset(&obj, 0, TYPESIZE_##mpitype); \
return *this; \
} \
operator const mpitype&() const { return *reinterpret_cast<const mpitype*>(data); } \
void reset() { std::memset(data, 0, TYPESIZE_##mpitype); } \
\
private: \
char* data[TYPESIZE_##mpitype]; \
}; \
ASSERT_MPI_TYPE_SIZE(mpitype);
DEFINE_DIY_MPI_TYPE(MPI_Comm)
DEFINE_DIY_MPI_TYPE(MPI_Datatype)
DEFINE_DIY_MPI_TYPE(MPI_Status)
DEFINE_DIY_MPI_TYPE(MPI_Request)
DEFINE_DIY_MPI_TYPE(MPI_Op)
DEFINE_DIY_MPI_TYPE(MPI_File)
DEFINE_DIY_MPI_TYPE(MPI_Win)
DEFINE_DIY_MPI_TYPE_MOVE(MPI_Win)
#undef DEFINE_DIY_MPI_TYPE
#undef DEFINE_DIY_MPI_TYPE_MOVE
#undef ASSERT_MPI_TYPE_SIZE
}

@ -1,6 +1,7 @@
#ifndef VTKMDIY_MPI_NO_MPI_HPP
#define VTKMDIY_MPI_NO_MPI_HPP
#include <cassert> // std::assert
#include <stdexcept> // std::runtime_error
@ -75,7 +76,39 @@ static const int MPI_MODE_APPEND = 128;
static const int MPI_MODE_SEQUENTIAL = 256;
/* define window type */
using MPI_Win = void*;
struct MPI_Win {
MPI_Win(): data_(0) {}
MPI_Win(void* data, bool owned = false): data_(uintptr_t(data) | (owned ? 0x1 : 0x0))
{
// We assume that pointers have at least some higher-byte alignment.
assert(!(uintptr_t(data) & 0x1));
}
void* data() const { return (void*)(data_ & ~0x1); }
bool owned() const { return data_ & 0x1; }
// We cannot copy owned windows.
MPI_Win(MPI_Win const&) = delete;
MPI_Win& operator=(MPI_Win const&) = delete;
// We cannot move owned windows (we don't know how to delete them in general).
MPI_Win(MPI_Win&& rhs): data_(rhs.data_)
{
rhs.data_ = 0;
}
MPI_Win& operator=(MPI_Win&& rhs)
{
if (this == &rhs)
return *this;
data_ = rhs.data_;
rhs.data_ = 0;
return *this;
}
private:
uintptr_t data_;
};
#define MPI_WIN_NULL MPI_Win()
/* window fence assertions */
static const int MPI_MODE_NOSTORE = 1;

@ -37,8 +37,8 @@ namespace mpi
const void* address() const { return buf_; }
private:
alignas(T) char buf_[sizeof(T)];
bool init_;
char buf_[sizeof(T)];
};
}
}

@ -22,6 +22,21 @@ EXPORT_MACRO const int nocheck = MPI_MODE_NOCHECK;
namespace detail
{
DIY_MPI_Win win_allocate(const communicator& comm, void** base, unsigned size, int disp)
{
#if VTKMDIY_HAS_MPI
DIY_MPI_Win win;
MPI_Win_allocate(size, disp, MPI_INFO_NULL, mpi_cast(comm.handle()), base, &mpi_cast(win));
return win;
#else
(void)comm; (void)disp;
*base = malloc(size);
auto mpi_win = MPI_Win(*base, true);
auto win = make_DIY_MPI_Win(std::move(mpi_win));
return win;
#endif
}
DIY_MPI_Win win_create(const communicator& comm, void* base, unsigned size, int disp)
{
#if VTKMDIY_HAS_MPI
@ -30,7 +45,8 @@ DIY_MPI_Win win_create(const communicator& comm, void* base, unsigned size, int
return win;
#else
(void)comm; (void)size; (void)disp;
auto win = make_DIY_MPI_Win(base);
auto mpi_win = MPI_Win(base);
auto win = make_DIY_MPI_Win(std::move(mpi_win));
return win;
#endif
}
@ -40,7 +56,9 @@ void win_free(DIY_MPI_Win& win)
#if VTKMDIY_HAS_MPI
MPI_Win_free(&mpi_cast(win));
#else
(void)win;
auto& mpi_win = mpi_cast(win);
if (mpi_win.owned())
free(mpi_win.data());
#endif
}
@ -49,7 +67,7 @@ void put(const DIY_MPI_Win& win, const void* data, int count, const datatype& ty
#if VTKMDIY_HAS_MPI
MPI_Put(data, count, mpi_cast(type.handle), rank, offset, count, mpi_cast(type.handle), mpi_cast(win));
#else
void* buffer = mpi_cast(win);
void* buffer = mpi_cast(win).data();
size_t size = mpi_cast(type.handle);
std::copy_n(static_cast<const int8_t*>(data),
size * static_cast<size_t>(count),
@ -63,7 +81,7 @@ void get(const DIY_MPI_Win& win, void* data, int count, const datatype& type, in
#if VTKMDIY_HAS_MPI
MPI_Get(data, count, mpi_cast(type.handle), rank, offset, count, mpi_cast(type.handle), mpi_cast(win));
#else
const void* buffer = mpi_cast(win);
const void* buffer = mpi_cast(win).data();
size_t size = mpi_cast(type.handle);
std::copy_n(static_cast<const int8_t*>(buffer) + (offset * size),
size * static_cast<size_t>(count),
@ -136,7 +154,7 @@ void fetch(const DIY_MPI_Win& win, void* result, const datatype& type, int rank,
MPI_Fetch_and_op(nullptr, result, mpi_cast(type.handle), rank, offset, MPI_NO_OP, mpi_cast(win));
#else
(void) rank;
const void* buffer = mpi_cast(win);
const void* buffer = mpi_cast(win).data();
size_t size = mpi_cast(type.handle);
std::copy_n(static_cast<const int8_t*>(buffer) + (offset * size),
size,
@ -150,7 +168,7 @@ void replace(const DIY_MPI_Win& win, const void* value, const datatype& type, in
MPI_Fetch_and_op(value, nullptr, mpi_cast(type.handle), rank, offset, MPI_REPLACE, mpi_cast(win));
#else
(void) rank;
void* buffer = mpi_cast(win);
void* buffer = mpi_cast(win).data();
size_t size = mpi_cast(type.handle);
std::copy_n(static_cast<const int8_t*>(value),
size,

@ -22,6 +22,9 @@ VTKMDIY_MPI_EXPORT extern const int nocheck;
namespace detail
{
VTKMDIY_MPI_EXPORT_FUNCTION
DIY_MPI_Win win_allocate(const communicator& comm, void** base, unsigned size, int disp);
VTKMDIY_MPI_EXPORT_FUNCTION
DIY_MPI_Win win_create(const communicator& comm, void* base, unsigned size, int disp);
@ -96,8 +99,8 @@ void flush_local_all(const DIY_MPI_Win& win);
inline ~window();
// moving is Ok
window(window&&) = default;
window& operator=(window&&) = default;
inline window(window&&);
inline window& operator=(window&&);
// cannot copy because of the buffer_
window(const window&) = delete;
@ -129,7 +132,7 @@ void flush_local_all(const DIY_MPI_Win& win);
inline void flush_local_all();
private:
std::vector<T> buffer_;
void* buffer_;
int rank_;
DIY_MPI_Win window_;
};
@ -140,16 +143,46 @@ void flush_local_all(const DIY_MPI_Win& win);
template<class T>
diy::mpi::window<T>::
window(const diy::mpi::communicator& comm, unsigned size):
buffer_(size), rank_(comm.rank())
buffer_(nullptr), rank_(comm.rank())
{
window_ = detail::win_create(comm, buffer_.data(), static_cast<unsigned>(buffer_.size()*sizeof(T)), static_cast<int>(sizeof(T)));
window_ = detail::win_allocate(comm, &buffer_, static_cast<unsigned>(size*sizeof(T)), static_cast<int>(sizeof(T)));
}
template<class T>
diy::mpi::window<T>::
~window()
{
detail::win_free(window_);
if (buffer_)
detail::win_free(window_);
}
template<class T>
diy::mpi::window<T>::
window(window&& rhs):
buffer_(rhs.buffer_), rank_(rhs.rank_), window_(std::move(rhs.window_))
{
rhs.buffer_ = nullptr;
rhs.window_.reset();
}
template<class T>
diy::mpi::window<T>&
diy::mpi::window<T>::
operator=(window&& rhs)
{
if (this == &rhs)
return *this;
if (buffer_)
detail::win_free(window_);
buffer_ = rhs.buffer_;
rhs.buffer_ = nullptr;
rank_ = rhs.rank_;
window_ = std::move(rhs.window_);
rhs.window_.reset();
return *this;
}
template<class T>

@ -105,6 +105,12 @@ namespace diy
void (*save)(BinaryBuffer&, const T&) = &::diy::save //!< optional serialization function
) const;
void inline enqueue_blob
(const BlockID& to, //!< target block (gid,proc)
const char* x, //!< pointer to the data
size_t n //!< size in data elements (eg. ints)
) const;
//! Dequeue data whose size can be determined automatically (e.g., STL vector) and that was
//! previously enqueued so that diy knows its size when it is received.
//! In this case, diy will allocate the receive buffer; the user does not need to do so.
@ -142,6 +148,9 @@ namespace diy
void (*load)(BinaryBuffer&, T&) = &::diy::load //!< optional serialization function
) const { dequeue(from.gid, x, n, load); }
BinaryBlob inline dequeue_blob
(int from) const;
template<class T>
EnqueueIterator<T> enqueuer(const T& x,
void (*save)(BinaryBuffer&, const T&) = &::diy::save ) const
@ -347,5 +356,20 @@ dequeue(int from, T* x, size_t n,
load(bb, x[i]);
}
void
diy::Master::Proxy::
enqueue_blob(const BlockID& to, const char* x, size_t n) const
{
BinaryBuffer& bb = outgoing_[to];
bb.save_binary_blob(x,n);
}
diy::BinaryBlob
diy::Master::Proxy::
dequeue_blob(int from) const
{
BinaryBuffer& bb = incoming_[from];
return bb.load_binary_blob();
}
#endif

@ -138,7 +138,7 @@ void reduce(Master& master, //!< master object
}
}
master.set_expected(expected);
master.flush();
master.flush(false);
}
// final round
log->debug("Round {}", round);

@ -1,22 +1,30 @@
#ifndef VTKMDIY_SERIALIZATION_HPP
#define VTKMDIY_SERIALIZATION_HPP
#include <vector>
#include <valarray>
#include <cassert>
#include <fstream>
#include <functional>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <fstream>
#include <tuple>
#include <type_traits> // this is used for a safety check for default serialization
#include <unordered_map>
#include <unordered_set>
#include <type_traits> // this is used for a safety check for default serialization
#include <cassert>
#include <valarray>
#include <vector>
namespace diy
{
struct BinaryBlob
{
using Deleter = std::function<void(const char[])>;
using Pointer = std::unique_ptr<const char[], Deleter>;
Pointer pointer;
size_t size;
};
//! A serialization buffer. \ingroup Serialization
struct BinaryBuffer
{
@ -25,10 +33,18 @@ namespace diy
virtual inline void append_binary(const char* x, size_t count) =0; //!< append `count` bytes from `x` to end of buffer
virtual void load_binary(char* x, size_t count) =0; //!< copy `count` bytes into `x` from the buffer
virtual void load_binary_back(char* x, size_t count) =0; //!< copy `count` bytes into `x` from the back of the buffer
virtual char* grow(size_t count) =0; //!< allocate enough space for `count` bytes and return the pointer to the beginning
virtual char* advance(size_t count) =0; //!< advance buffer position by `count` bytes and return the pointer to the beginning
virtual void save_binary_blob(const char*, size_t) =0;
virtual void save_binary_blob(const char*, size_t, BinaryBlob::Deleter) = 0;
virtual BinaryBlob load_binary_blob() =0;
};
struct MemoryBuffer: public BinaryBuffer
{
using Blob = BinaryBlob;
MemoryBuffer(size_t position_ = 0):
position(position_) {}
@ -41,6 +57,13 @@ namespace diy
virtual inline void append_binary(const char* x, size_t count) override; //!< append `count` bytes from `x` to end of buffer
virtual inline void load_binary(char* x, size_t count) override; //!< copy `count` bytes into `x` from the buffer
virtual inline void load_binary_back(char* x, size_t count) override; //!< copy `count` bytes into `x` from the back of the buffer
virtual inline char* grow(size_t count) override; //!< allocate enough space for `count` bytes and return the pointer to the beginning
virtual inline char* advance(size_t count) override; //!< advance buffer position by `count` bytes and return the pointer to the beginning
virtual inline void save_binary_blob(const char* x, size_t count) override;
virtual inline void save_binary_blob(const char* x, size_t count, Blob::Deleter deleter) override;
virtual inline Blob load_binary_blob() override;
size_t nblobs() const { return blobs.size(); }
void clear() { buffer.clear(); reset(); }
void wipe() { std::vector<char>().swap(buffer); reset(); }
@ -71,6 +94,9 @@ namespace diy
size_t position;
std::vector<char> buffer;
size_t blob_position = 0;
std::vector<Blob> blobs;
};
namespace detail
@ -140,7 +166,7 @@ namespace diy
template<class T>
void load_back(BinaryBuffer& bb, T& x) { bb.load_binary_back((char*) &x, sizeof(T)); }
//@}
//!@}
namespace detail
@ -444,17 +470,7 @@ void
diy::MemoryBuffer::
save_binary(const char* x, size_t count)
{
if (position + count > buffer.capacity())
{
double newsize = static_cast<double>(position + count) * growth_multiplier(); // if we have to grow, grow geometrically
buffer.reserve(static_cast<size_t>(newsize));
}
if (position + count > buffer.size())
buffer.resize(position + count);
std::copy_n(x, count, &buffer[position]);
position += count;
std::copy_n(x, count, grow(count));
}
void
@ -509,6 +525,58 @@ load_binary_back(char* x, size_t count)
buffer.resize(buffer.size() - count);
}
char*
diy::MemoryBuffer::
grow(size_t count)
{
if (position + count > buffer.capacity())
{
double newsize = static_cast<double>(position + count) * growth_multiplier(); // if we have to grow, grow geometrically
buffer.reserve(static_cast<size_t>(newsize));
}
if (position + count > buffer.size())
buffer.resize(position + count);
char* destination = &buffer[position];
position += count;
return destination;
}
char*
diy::MemoryBuffer::
advance(size_t count)
{
char* origin = &buffer[position];
position += count;
return origin;
}
void
diy::MemoryBuffer::
save_binary_blob(const char* x, size_t count)
{
// empty deleter means we don't take ownership
save_binary_blob(x, count, [](const char[]) {});
}
void
diy::MemoryBuffer::
save_binary_blob(const char* x, size_t count, Blob::Deleter deleter)
{
blobs.emplace_back(Blob { Blob::Pointer {x, deleter}, count });
}
diy::MemoryBuffer::Blob
diy::MemoryBuffer::
load_binary_blob()
{
return std::move(blobs[blob_position++]);
}
void
diy::MemoryBuffer::
copy(MemoryBuffer& from, MemoryBuffer& to)

@ -15,8 +15,8 @@ namespace diy
{
namespace detail
{
typedef void (*Save)(const void*, BinaryBuffer& buf);
typedef void (*Load)(void*, BinaryBuffer& buf);
using Save = std::function<void(const void*, BinaryBuffer&)>;
using Load = std::function<void(void*, BinaryBuffer&)>;
struct FileBuffer: public BinaryBuffer
{
@ -34,6 +34,16 @@ namespace diy
}
virtual inline void load_binary(char* x, size_t count) override { auto n = fread(x, 1, count, file); VTKMDIY_UNUSED(n);}
virtual inline void load_binary_back(char* x, size_t count) override { fseek(file, static_cast<long>(tail), SEEK_END); auto n = fread(x, 1, count, file); tail += count; fseek(file, static_cast<long>(head), SEEK_SET); VTKMDIY_UNUSED(n);}
virtual inline char* grow(size_t) override { throw std::runtime_error("Cannot grow a FileBuffer"); }
virtual inline char* advance(size_t) override { throw std::runtime_error("Cannot advance a FileBuffer"); }
// TODO: for now, we just throw, but obviously it should be possile to store binary blobs in a file; might want to fall back
using Blob = BinaryBlob;
virtual inline void save_binary_blob(const char*, size_t) override { throw std::runtime_error("Cannot save binary blobs in a FileBuffer"); }
virtual inline void save_binary_blob(const char*, size_t, Blob::Deleter) override { throw std::runtime_error("Cannot save binary blobs in a FileBuffer"); }
virtual inline Blob load_binary_blob() override { throw std::runtime_error("Cannot load binary blobs from a FileBuffer"); }
size_t size() const { return head; }

@ -41,6 +41,9 @@ namespace diy
#include "critical-resource.hpp"
#if !defined(VTKMDIY_NO_THREADS)
#include <memory> // for shared_ptr
template<class T, class U>
struct diy::concurrent_map
{

@ -3,6 +3,6 @@
#define VTKMDIY_VERSION_MAJOR 3
#define VTKMDIY_VERSION_MINOR 5
#define DIY_VERSION_PATCH dev1
#define VTKMDIY_VERSION_PATCH dev1
#endif