Merge branch 'upstream-diy' into correct_clang_6_warnings

* upstream-diy:
  diy 2018-05-02 (a2fdc50d)
Robert Maynard 2018-05-02 14:28:46 -04:00
commit 03ce61f299
15 changed files with 970 additions and 351 deletions

@ -2,6 +2,9 @@
#define DIY_ASSIGNER_HPP
#include <vector>
#include <tuple>
#include "mpi.hpp" // needed for DynamicAssigner
namespace diy
{
@ -25,31 +28,43 @@ namespace diy
//! returns the total number of global blocks
int nblocks() const { return nblocks_; }
//! sets the total number of global blocks
void set_nblocks(int nblocks__) { nblocks_ = nblocks__; }
//! gets the local gids for a given process rank
virtual void local_gids(int rank, std::vector<int>& gids) const =0;
virtual void set_nblocks(int nblocks__) { nblocks_ = nblocks__; }
//! returns the process rank of the block with global id gid (need not be local)
virtual int rank(int gid) const =0;
//! batch lookup; returns the process ranks of the blocks with global id in the vector gids
inline
virtual std::vector<int>
ranks(const std::vector<int>& gids) const;
private:
int size_; // total number of ranks
int nblocks_; // total number of blocks
};
class ContiguousAssigner: public Assigner
class StaticAssigner: public Assigner
{
public:
/**
* \ingroup Assignment
* \brief Intermediate type to express assignment that cannot change; adds `local_gids` query method
*/
using Assigner::Assigner;
//! gets the local gids for a given process rank
virtual void local_gids(int rank, std::vector<int>& gids) const =0;
};
class ContiguousAssigner: public StaticAssigner
{
public:
/**
* \ingroup Assignment
* \brief Assigns blocks to processes in contiguous gid (block global id) order
*/
ContiguousAssigner(int size__, //!< total number of processes
int nblocks__ //!< total (global) number of blocks
):
Assigner(size__, nblocks__) {}
using StaticAssigner::StaticAssigner;
using Assigner::size;
using Assigner::nblocks;
using StaticAssigner::size;
using StaticAssigner::nblocks;
int rank(int gid) const override
{
@ -68,25 +83,65 @@ namespace diy
void local_gids(int rank, std::vector<int>& gids) const override;
};
class RoundRobinAssigner: public Assigner
class RoundRobinAssigner: public StaticAssigner
{
public:
/**
* \ingroup Assignment
* \brief Assigns blocks to processes in cyclic or round-robin gid (block global id) order
*/
RoundRobinAssigner(int size__, //!< total number of processes
int nblocks__ //!< total (global) number of blocks
):
Assigner(size__, nblocks__) {}
using StaticAssigner::StaticAssigner;
using Assigner::size;
using Assigner::nblocks;
using StaticAssigner::size;
using StaticAssigner::nblocks;
int rank(int gid) const override { return gid % size(); }
inline
void local_gids(int rank, std::vector<int>& gids) const override;
};
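For illustration (not part of this commit): both concrete assigners now reach local_gids() through the new StaticAssigner base, so code that only needs the gid list can take a StaticAssigner reference. The process and block counts below are made up.
// Sketch: enumerating locally owned blocks via the StaticAssigner interface.
diy::ContiguousAssigner contiguous(4, 16);    // 4 ranks, 16 blocks: gids 4..7 belong to rank 1
diy::RoundRobinAssigner  round_robin(4, 16);  // rank(gid) == gid % 4
std::vector<int> gids;
contiguous.local_gids(1, gids);               // gids == {4, 5, 6, 7}
const diy::StaticAssigner& a = round_robin;   // works through the intermediate base
int owner = a.rank(10);                       // 10 % 4 == 2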
class DynamicAssigner: public Assigner
{
public:
DynamicAssigner(const mpi::communicator& comm, int size__, int nblocks__):
Assigner(size__, nblocks__),
comm_(comm),
div_(nblocks__ / size__ + ((nblocks__ % size__) == 0 ? 0 : 1)), // NB: same size window everywhere means the last rank may allocate extra space
rank_map_(comm_, div_) { rank_map_.lock_all(MPI_MODE_NOCHECK); }
~DynamicAssigner() { rank_map_.unlock_all(); }
inline
virtual void set_nblocks(int nblocks__) override;
inline
virtual int rank(int gid) const override;
inline
virtual std::vector<int>
ranks(const std::vector<int>& gids) const override;
inline std::tuple<bool,int>
get_rank(int& rk, int gid) const;
inline void set_rank(int rk, int gid, bool flush = true);
inline void set_ranks(const std::vector<std::tuple<int,int>>& rank_gids);
std::tuple<int,int>
rank_offset(int gid) const { return std::make_tuple(gid / div_, gid % div_); }
private:
mpi::communicator comm_;
int div_;
mutable mpi::window<int> rank_map_;
};
}
std::vector<int>
diy::Assigner::
ranks(const std::vector<int>& gids) const
{
std::vector<int> result(gids.size());
for (size_t i = 0; i < gids.size(); ++i)
result[i] = rank(gids[i]);
return result;
}
void
@ -123,4 +178,87 @@ local_gids(int rank_, std::vector<int>& gids) const
}
}
void
diy::DynamicAssigner::
set_nblocks(int nblocks__)
{
Assigner::set_nblocks(nblocks__);
div_ = nblocks() / size() + ((nblocks() % size()) == 0 ? 0 : 1);
rank_map_.unlock_all();
rank_map_ = mpi::window<int>(comm_, div_);
rank_map_.lock_all(MPI_MODE_NOCHECK);
}
std::tuple<bool,int>
diy::DynamicAssigner::
get_rank(int& rk, int gid) const
{
// TODO: check if the gid is in cache
int r,offset;
std::tie(r,offset) = rank_offset(gid);
rank_map_.get(rk, r, offset);
return std::make_tuple(false, r); // false indicates that the data wasn't read from cache
}
int
diy::DynamicAssigner::
rank(int gid) const
{
int rk;
auto cached_gidrk = get_rank(rk, gid);
int gidrk = std::get<1>(cached_gidrk);
rank_map_.flush_local(gidrk);
return rk;
}
std::vector<int>
diy::DynamicAssigner::
ranks(const std::vector<int>& gids) const
{
bool all_cached = true;
std::vector<int> result(gids.size());
for (size_t i = 0; i < gids.size(); ++i)
{
auto cached_gidrk = get_rank(result[i], gids[i]);
bool cached = std::get<0>(cached_gidrk);
all_cached &= cached;
}
if (!all_cached)
rank_map_.flush_local_all();
return result;
}
void
diy::DynamicAssigner::
set_rank(int rk, int gid, bool flush)
{
// TODO: update cache
int r,offset;
std::tie(r,offset) = rank_offset(gid);
rank_map_.put(rk, r, offset);
if (flush)
rank_map_.flush(r);
}
void
diy::DynamicAssigner::
set_ranks(const std::vector<std::tuple<int,int>>& rank_gids)
{
for (auto& rg : rank_gids)
set_rank(std::get<0>(rg), std::get<1>(rg), false);
rank_map_.flush_all();
}
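For illustration (not part of this commit): DynamicAssigner keeps the gid-to-rank map in an MPI window with div_ = ceil(nblocks/size) slots per rank, and rank_offset(gid) locates gid's slot as (gid / div_, gid % div_). A collective usage sketch; the communicator name, block count, and initial round-robin ownership are assumptions.
// Sketch: every rank publishes the owners of its initial blocks, then any rank
// can look up any gid with a one-sided get. With, e.g., 3 ranks and 8 blocks,
// div_ == 3, so gid 7 lives in rank 2's window at offset 1.
diy::mpi::communicator world;
diy::DynamicAssigner dynamic(world, world.size(), 8);
std::vector<std::tuple<int,int>> rank_gids;          // (rank, gid) pairs to publish
for (int gid = 0; gid < 8; ++gid)
    if (gid % world.size() == world.rank())          // assumed initial distribution
        rank_gids.emplace_back(world.rank(), gid);
dynamic.set_ranks(rank_gids);                        // puts + one flush_all
world.barrier();                                     // everyone published before anyone queries
int owner = dynamic.rank(7);                         // one-sided get from the owning window slot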
#endif

@ -113,11 +113,11 @@ namespace detail
}
// Calls create(int gid, const Bounds& bounds, const Link& link)
void decompose(int rank, const Assigner& assigner, const Creator& create);
void decompose(int rank, const StaticAssigner& assigner, const Creator& create);
void decompose(int rank, const Assigner& assigner, Master& master, const Updater& update);
void decompose(int rank, const StaticAssigner& assigner, Master& master, const Updater& update);
void decompose(int rank, const Assigner& assigner, Master& master);
void decompose(int rank, const StaticAssigner& assigner, Master& master);
// find lowest gid that owns a particular point
template<class Point>
@ -178,10 +178,10 @@ namespace detail
* `create(...)` is called with each block assigned to the local domain. See [decomposition example](#decomposition-example).
*/
template<class Bounds>
void decompose(int dim,
int rank,
const Bounds& domain,
const Assigner& assigner,
void decompose(int dim,
int rank,
const Bounds& domain,
const StaticAssigner& assigner,
const typename RegularDecomposer<Bounds>::Creator& create,
typename RegularDecomposer<Bounds>::BoolVector share_face = typename RegularDecomposer<Bounds>::BoolVector(),
typename RegularDecomposer<Bounds>::BoolVector wrap = typename RegularDecomposer<Bounds>::BoolVector(),
@ -208,11 +208,11 @@ namespace detail
* `master` must have been supplied a create function in order for this function to work.
*/
template<class Bounds>
void decompose(int dim,
int rank,
const Bounds& domain,
const Assigner& assigner,
Master& master,
void decompose(int dim,
int rank,
const Bounds& domain,
const StaticAssigner& assigner,
Master& master,
typename RegularDecomposer<Bounds>::BoolVector share_face = typename RegularDecomposer<Bounds>::BoolVector(),
typename RegularDecomposer<Bounds>::BoolVector wrap = typename RegularDecomposer<Bounds>::BoolVector(),
typename RegularDecomposer<Bounds>::CoordinateVector ghosts = typename RegularDecomposer<Bounds>::CoordinateVector(),
@ -231,9 +231,9 @@ namespace detail
* @param master gets the blocks once this function returns
*/
inline
void decompose(int rank,
const Assigner& assigner,
Master& master)
void decompose(int rank,
const StaticAssigner& assigner,
Master& master)
{
std::vector<int> local_gids;
assigner.local_gids(rank, local_gids);
@ -252,11 +252,11 @@ namespace detail
* also communicates the total number of blocks
*/
template<class Bounds>
void decompose(int dim,
int rank,
const Bounds& domain,
const Assigner& assigner,
Master& master,
void decompose(int dim,
int rank,
const Bounds& domain,
const StaticAssigner& assigner,
Master& master,
const typename RegularDecomposer<Bounds>::Updater& update,
typename RegularDecomposer<Bounds>::BoolVector share_face =
typename RegularDecomposer<Bounds>::BoolVector(),
@ -279,7 +279,7 @@ namespace detail
template<class Bounds>
void
diy::RegularDecomposer<Bounds>::
decompose(int rank, const Assigner& assigner, Master& master)
decompose(int rank, const StaticAssigner& assigner, Master& master)
{
decompose(rank, assigner, [&master](int gid, const Bounds&, const Bounds&, const Bounds&, const Link& link)
{
@ -292,7 +292,7 @@ decompose(int rank, const Assigner& assigner, Master& master)
template<class Bounds>
void
diy::RegularDecomposer<Bounds>::
decompose(int rank, const Assigner& assigner, const Creator& create)
decompose(int rank, const StaticAssigner& assigner, const Creator& create)
{
std::vector<int> gids;
assigner.local_gids(rank, gids);
@ -380,7 +380,7 @@ decompose(int rank, const Assigner& assigner, const Creator& create)
template<class Bounds>
void
diy::RegularDecomposer<Bounds>::
decompose(int rank, const Assigner& assigner, Master& master, const Updater& update)
decompose(int rank, const StaticAssigner& assigner, Master& master, const Updater& update)
{
decompose(rank, assigner, [&master,&update](int gid, const Bounds& core, const Bounds& bounds, const Bounds& domain_, const Link& link)
{
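For illustration (not part of this commit): the decompose() overloads above now take a StaticAssigner, since building a regular decomposition requires local_gids; a DynamicAssigner can no longer be passed here. A schematic call, assuming a diy::mpi::communicator named world and a diy::Master named master constructed with block create/destroy functions:
// Sketch: regular decomposition driven by a static assigner.
// Domain extents and the global block count are placeholders.
int dim = 3;
diy::DiscreteBounds domain;                       // schematic: fill in real extents
for (int i = 0; i < dim; ++i) { domain.min[i] = 0; domain.max[i] = 255; }
diy::ContiguousAssigner assigner(world.size(), 64 /* global blocks */);
diy::decompose(dim, world.rank(), domain, assigner, master);  // creates this rank's blocks in master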

@ -175,7 +175,7 @@ namespace io
void
read_blocks(const std::string& infilename, //!< input file name
const mpi::communicator& comm, //!< communicator
Assigner& assigner, //!< assigner object
StaticAssigner& assigner, //!< assigner object
Master& master, //!< master object
MemoryBuffer& extra, //!< user-defined metadata in file header
Master::LoadBlock load = 0) //!< load block function in case different than or undefined in the master
@ -246,7 +246,7 @@ namespace io
void
read_blocks(const std::string& infilename,
const mpi::communicator& comm,
Assigner& assigner,
StaticAssigner& assigner,
Master& master,
Master::LoadBlock load = 0)
{
@ -323,7 +323,7 @@ namespace split
void
read_blocks(const std::string& infilename, //!< input file name
const mpi::communicator& comm, //!< communicator
Assigner& assigner, //!< assigner object
StaticAssigner& assigner, //!< assigner object
Master& master, //!< master object
MemoryBuffer& extra, //!< user-defined metadata in file header
Master::LoadBlock load = 0) //!< block load function in case different than or undefined in master
@ -378,7 +378,7 @@ namespace split
void
read_blocks(const std::string& infilename,
const mpi::communicator& comm,
Assigner& assigner,
StaticAssigner& assigner,
Master& master,
Master::LoadBlock load = 0)
{

@ -155,7 +155,7 @@ write(const DiscreteBounds& bounds, const T* buffer, const DiscreteBounds& core,
}
MPI_Datatype fileblk, subbuffer;
MPI_Type_create_subarray(dim, (int*) &shape_[0], &subsizes[0], (int*) &bounds.min[0], MPI_ORDER_C, T_type, &fileblk);
MPI_Type_create_subarray(dim, (int*) &shape_[0], &subsizes[0], (int*) &core.min[0], MPI_ORDER_C, T_type, &fileblk);
MPI_Type_create_subarray(dim, (int*) &buffer_shape[0], &subsizes[0], (int*) &buffer_start[0], MPI_ORDER_C, T_type, &subbuffer);
MPI_Type_commit(&fileblk);
MPI_Type_commit(&subbuffer);

@ -21,6 +21,25 @@ namespace io
{
namespace utils
{
namespace detail
{
// internal helper to split a full filename into its directory path and base name.
inline void splitpath(const std::string& fullname, std::string& path, std::string& name)
{
auto pos = fullname.rfind('/');
if (pos != std::string::npos)
{
path = fullname.substr(0, pos);
name = fullname.substr(pos+1);
}
else
{
path = ".";
name = fullname;
}
}
} // namespace detail
/**
* returns true if the filename exists and refers to a directory.
*/
@ -68,23 +87,44 @@ namespace utils
inline int mkstemp(std::string& filename)
{
#if defined(_WIN32)
std::string path, name;
detail::splitpath(filename, path, name);
char temppath[MAX_PATH];
// note: GetTempFileName only uses the first 3 chars of the prefix (name)
// specified.
if (GetTempFileName(path.c_str(), name.c_str(), 0, temppath) == 0)
{
// failed! return invalid handle.
return -1;
}
int handle = -1;
// _sopen_s sets handle to -1 on error.
_sopen_s(&handle, temppath, _O_WRONLY | _O_CREAT | _O_BINARY, _SH_DENYNO, _S_IWRITE);
if (handle != -1)
filename = temppath;
return handle;
#else // defined(_WIN32)
const size_t slen = filename.size();
char *s_template = new char[filename.size() + 1];
std::copy_n(filename.c_str(), slen+1, s_template);
std::unique_ptr<char[]> s_template(new char[slen + 1]);
std::copy(filename.begin(), filename.end(), s_template.get());
s_template[slen] = 0;
int handle = -1;
#if defined(_WIN32)
if (_mktemp_s(s_template, slen+1) == 0) // <- returns 0 on success.
_sopen_s(&handle, s_template, _O_WRONLY | _O_CREAT | _O_BINARY, _SH_DENYNO, _S_IWRITE);
#elif defined(__MACH__)
#if defined(__MACH__)
// TODO: figure out how to open with O_SYNC
handle = ::mkstemp(s_template);
handle = ::mkstemp(s_template.get());
#else
handle = mkostemp(s_template, O_WRONLY | O_SYNC);
handle = mkostemp(s_template.get(), O_WRONLY | O_SYNC);
#endif
if (handle != -1)
filename = s_template;
filename = s_template.get();
return handle;
#endif // defined(_WIN32)
}
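For illustration (not part of this commit): the caller still passes a template ending in XXXXXX and gets back an open descriptor, with the string rewritten to the actual file name; on Windows the template suffix is not used and GetTempFileName derives the name from the first three characters of the base name. The path below is a placeholder.
// Sketch: create, use, and clean up a unique scratch file.
std::string filename = "/tmp/diy-scratch-XXXXXX";    // placeholder path
int fd = diy::io::utils::mkstemp(filename);          // filename now holds the real name
if (fd != -1)
{
    // ... write temporary data through fd ...
    diy::io::utils::sync(fd);
    diy::io::utils::close(fd);
    diy::io::utils::remove(filename);
}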
inline void close(int fd)
@ -99,16 +139,20 @@ namespace utils
inline void sync(int fd)
{
#if !defined(_WIN32)
fsync(fd);
#else
#if defined(_WIN32)
DIY_UNUSED(fd);
#else
fsync(fd);
#endif
}
inline bool remove(const std::string& filename)
{
#if defined(_WIN32)
return DeleteFile(filename.c_str()) != 0;
#else
return ::remove(filename.c_str()) == 0;
#endif
}
}
}

@ -223,7 +223,7 @@ namespace diy
bool local(int gid__) const { return lids_.find(gid__) != lids_.end(); }
//! exchange the queues between all the blocks (collective operation)
inline void exchange();
inline void exchange(bool remote = false);
inline void process_collectives();
inline
@ -285,12 +285,18 @@ namespace diy
public:
// Communicator functionality
inline void flush(); // makes sure all the serialized queues migrate to their target processors
inline void flush(bool remote = false); // makes sure all the serialized queues migrate to their target processors
private:
// Communicator functionality
inline void comm_exchange(ToSendList& to_send, int out_queues_limit); // possibly called in between block computations
inline void rcomm_exchange(ToSendList& to_send, int out_queues_limit); // possibly called in between block computations
inline bool nudge();
inline void send_outgoing_queues(ToSendList& to_send,
int out_queues_limit,
IncomingRound& current_incoming,
bool remote);
inline void check_incoming_queues();
void cancel_requests(); // TODO
@ -798,7 +804,7 @@ execute()
void
diy::Master::
exchange()
exchange(bool remote)
{
auto scoped = prof.scoped("exchange");
DIY_UNUSED(scoped);
@ -807,20 +813,29 @@ exchange()
log->debug("Starting exchange");
#ifdef DIY_NO_MPI
// remote doesn't need to do anything special if there is no mpi, but we also
// can't just use it because of the ibarrier
remote = false;
#endif
// make sure there is a queue for each neighbor
for (int i = 0; i < (int)size(); ++i)
if (!remote)
{
OutgoingQueues& outgoing_queues = outgoing_[gid(i)].queues;
OutQueueRecords& external_local = outgoing_[gid(i)].external_local;
if (outgoing_queues.size() < (size_t)link(i)->size())
for (unsigned j = 0; j < (unsigned)link(i)->size(); ++j)
for (int i = 0; i < (int)size(); ++i)
{
if (external_local.find(link(i)->target(j)) == external_local.end())
outgoing_queues[link(i)->target(j)]; // touch the outgoing queue, creating it if necessary
OutgoingQueues& outgoing_queues = outgoing_[gid(i)].queues;
OutQueueRecords& external_local = outgoing_[gid(i)].external_local;
if (outgoing_queues.size() < (size_t)link(i)->size())
for (unsigned j = 0; j < (unsigned)link(i)->size(); ++j)
{
if (external_local.find(link(i)->target(j)) == external_local.end())
outgoing_queues[link(i)->target(j)]; // touch the outgoing queue, creating it if necessary
}
}
}
flush();
flush(remote);
log->debug("Finished exchange");
}
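For illustration (not part of this commit): exchange() keeps the old all-neighbors behaviour by default, while exchange(true) runs the NBX-based path below and only moves queues that were actually enqueued, which also allows enqueueing to blocks outside the link. A sketch; the block type, the destination helper, and the payload member are placeholders.
// Sketch: enqueue to arbitrary blocks, then complete with the remote exchange.
master.foreach([](MyBlock* b, const diy::Master::ProxyWithLink& cp)
{
    diy::BlockID dest = pick_destination(b);      // hypothetical helper returning {gid, proc}
    cp.enqueue(dest, b->payload);                  // hypothetical payload member
});
master.exchange(true);     // remote exchange: issend + iprobe + ibarrier
// master.exchange();      // default: classic neighbor exchange over the links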
@ -861,211 +876,298 @@ void
diy::Master::
comm_exchange(ToSendList& to_send, int out_queues_limit)
{
static const size_t MAX_MPI_MESSAGE_COUNT = INT_MAX;
IncomingRound &current_incoming = incoming_[exchange_round_];
// isend outgoing queues, up to the out_queues_limit
while(inflight_sends_.size() < (size_t)out_queues_limit && !to_send.empty())
{
int from = to_send.front();
// deal with external_local queues
for (OutQueueRecords::iterator it = outgoing_[from].external_local.begin(); it != outgoing_[from].external_local.end(); ++it)
{
int to = it->first.gid;
log->debug("Processing local queue: {} <- {} of size {}", to, from, it->second.size);
QueueRecord& in_qr = current_incoming.map[to].records[from];
bool in_external = block(lid(to)) == 0;
if (in_external)
in_qr = it->second;
else
{
// load the queue
in_qr.size = it->second.size;
in_qr.external = -1;
MemoryBuffer bb;
storage_->get(it->second.external, bb);
current_incoming.map[to].queues[from].swap(bb);
}
++current_incoming.received;
}
outgoing_[from].external_local.clear();
if (outgoing_[from].external != -1)
load_outgoing(from);
to_send.pop_front();
OutgoingQueues& outgoing = outgoing_[from].queues;
for (OutgoingQueues::iterator it = outgoing.begin(); it != outgoing.end(); ++it)
{
BlockID to_proc = it->first;
int to = to_proc.gid;
int proc = to_proc.proc;
log->debug("Processing queue: {} <- {} of size {}", to, from, outgoing_[from].queues[to_proc].size());
// There may be local outgoing queues that remained in memory
if (proc == comm_.rank()) // sending to ourselves: simply swap buffers
{
log->debug("Moving queue in-place: {} <- {}", to, from);
QueueRecord& in_qr = current_incoming.map[to].records[from];
bool in_external = block(lid(to)) == 0;
if (in_external)
{
log->debug("Unloading outgoing directly as incoming: {} <- {}", to, from);
MemoryBuffer& bb = it->second;
in_qr.size = bb.size();
if (queue_policy_->unload_incoming(*this, from, to, in_qr.size))
in_qr.external = storage_->put(bb);
else
{
MemoryBuffer& in_bb = current_incoming.map[to].queues[from];
in_bb.swap(bb);
in_bb.reset();
in_qr.external = -1;
}
} else // !in_external
{
log->debug("Swapping in memory: {} <- {}", to, from);
MemoryBuffer& bb = current_incoming.map[to].queues[from];
bb.swap(it->second);
bb.reset();
in_qr.size = bb.size();
in_qr.external = -1;
}
++current_incoming.received;
continue;
}
std::shared_ptr<MemoryBuffer> buffer = std::make_shared<MemoryBuffer>();
buffer->swap(it->second);
MessageInfo info{from, to, exchange_round_};
if (buffer->size() <= (MAX_MPI_MESSAGE_COUNT - sizeof(info)))
{
diy::save(*buffer, info);
inflight_sends_.emplace_back();
inflight_sends_.back().info = info;
inflight_sends_.back().request = comm_.isend(proc, tags::queue, buffer->buffer);
inflight_sends_.back().message = buffer;
}
else
{
int npieces = static_cast<int>((buffer->size() + MAX_MPI_MESSAGE_COUNT - 1)/MAX_MPI_MESSAGE_COUNT);
// first send the head
std::shared_ptr<MemoryBuffer> hb = std::make_shared<MemoryBuffer>();
diy::save(*hb, buffer->size());
diy::save(*hb, info);
inflight_sends_.emplace_back();
inflight_sends_.back().info = info;
inflight_sends_.back().request = comm_.isend(proc, tags::piece, hb->buffer);
inflight_sends_.back().message = hb;
// send the message pieces
size_t msg_buff_idx = 0;
for (int i = 0; i < npieces; ++i, msg_buff_idx += MAX_MPI_MESSAGE_COUNT)
{
int tag = (i == (npieces - 1)) ? tags::queue : tags::piece;
detail::VectorWindow<char> window;
window.begin = &buffer->buffer[msg_buff_idx];
window.count = std::min(MAX_MPI_MESSAGE_COUNT, buffer->size() - msg_buff_idx);
inflight_sends_.emplace_back();
inflight_sends_.back().info = info;
inflight_sends_.back().request = comm_.isend(proc, tag, window);
inflight_sends_.back().message = buffer;
}
}
}
}
send_outgoing_queues(to_send, out_queues_limit, current_incoming, false);
// kick requests
while(nudge());
// check incoming queues
mpi::optional<mpi::status> ostatus = comm_.iprobe(mpi::any_source, mpi::any_tag);
while(ostatus)
{
InFlightRecv &ir = inflight_recvs_[ostatus->source()];
check_incoming_queues();
}
if (ir.info.from == -1) // uninitialized
/* Remote communicator */
// pseudocode for rexchange protocol based on NBX algorithm of Hoefler et al.,
// Scalable Communication Protocols for Dynamic Sparse Data Exchange, 2010.
//
// rcomm_exchange()
// {
// while (!done)
// while (sends_in_flight < limit_on_queues_in_memory and there are unprocessed queues)
// q = next outgoing queue (going over the in-memory queues first)
// if (q not in memory)
// load q
// issend(q)
//
// test all requests
// if (iprobe)
// recv
// if (barrier_active)
// if (test barrier)
// done = true
// else
// if (all sends finished and all queues have been processed (including out-of-core queues))
// ibarrier
// barrier_active = true
// }
//
void
diy::Master::
rcomm_exchange(ToSendList& to_send, int out_queues_limit)
{
IncomingRound &current_incoming = incoming_[exchange_round_];
bool done = false;
bool ibarr_act = false;
mpi::request ibarr_req; // mpi request associated with ibarrier
while (!done)
{
MemoryBuffer bb;
comm_.recv(ostatus->source(), ostatus->tag(), bb.buffer);
send_outgoing_queues(to_send, out_queues_limit, current_incoming, true);
if (ostatus->tag() == tags::piece)
{
size_t msg_size;
diy::load(bb, msg_size);
diy::load(bb, ir.info);
// kick requests
nudge();
ir.message.buffer.reserve(msg_size);
}
else // tags::queue
{
diy::load_back(bb, ir.info);
ir.message.swap(bb);
}
}
else
{
size_t start_idx = ir.message.buffer.size();
size_t count = ostatus->count<char>();
ir.message.buffer.resize(start_idx + count);
detail::VectorWindow<char> window;
window.begin = &ir.message.buffer[start_idx];
window.count = count;
comm_.recv(ostatus->source(), ostatus->tag(), window);
}
if (ostatus->tag() == tags::queue)
{
size_t size = ir.message.size();
int from = ir.info.from;
int to = ir.info.to;
int external = -1;
assert(ir.info.round >= exchange_round_);
IncomingRound *in = &incoming_[ir.info.round];
bool unload_queue = ((ir.info.round == exchange_round_) ? (block(lid(to)) == 0) : (limit_ != -1)) &&
queue_policy_->unload_incoming(*this, from, to, size);
if (unload_queue)
{
log->debug("Directly unloading queue {} <- {}", to, from);
external = storage_->put(ir.message); // unload directly
}
else
{
in->map[to].queues[from].swap(ir.message);
in->map[to].queues[from].reset(); // buffer position = 0
}
in->map[to].records[from] = QueueRecord(size, external);
++(in->received);
ir = InFlightRecv(); // reset
}
ostatus = comm_.iprobe(mpi::any_source, mpi::any_tag);
}
check_incoming_queues();
if (ibarr_act)
{
if (ibarr_req.test())
done = true;
}
else
{
if (to_send.empty() && inflight_sends_.empty())
{
ibarr_req = comm_.ibarrier();
ibarr_act = true;
}
}
} // while !done
}
void
diy::Master::
flush()
send_outgoing_queues(
ToSendList& to_send,
int out_queues_limit,
IncomingRound& current_incoming,
bool remote)
{
static const size_t MAX_MPI_MESSAGE_COUNT = INT_MAX;
while (inflight_sends_.size() < (size_t)out_queues_limit && !to_send.empty())
{
int from = to_send.front();
// deal with external_local queues
for (OutQueueRecords::iterator it = outgoing_[from].external_local.begin(); it != outgoing_[from].external_local.end(); ++it)
{
int to = it->first.gid;
log->debug("Processing local queue: {} <- {} of size {}", to, from, it->second.size);
QueueRecord& in_qr = current_incoming.map[to].records[from];
bool in_external = block(lid(to)) == 0;
if (in_external)
in_qr = it->second;
else
{
// load the queue
in_qr.size = it->second.size;
in_qr.external = -1;
MemoryBuffer bb;
storage_->get(it->second.external, bb);
current_incoming.map[to].queues[from].swap(bb);
}
++current_incoming.received;
}
outgoing_[from].external_local.clear();
if (outgoing_[from].external != -1)
load_outgoing(from);
to_send.pop_front();
OutgoingQueues& outgoing = outgoing_[from].queues;
for (OutgoingQueues::iterator it = outgoing.begin(); it != outgoing.end(); ++it)
{
BlockID to_proc = it->first;
int to = to_proc.gid;
int proc = to_proc.proc;
log->debug("Processing queue: {} <- {} of size {}", to, from, outgoing_[from].queues[to_proc].size());
// There may be local outgoing queues that remained in memory
if (proc == comm_.rank()) // sending to ourselves: simply swap buffers
{
log->debug("Moving queue in-place: {} <- {}", to, from);
QueueRecord& in_qr = current_incoming.map[to].records[from];
bool in_external = block(lid(to)) == 0;
if (in_external)
{
log->debug("Unloading outgoing directly as incoming: {} <- {}", to, from);
MemoryBuffer& bb = it->second;
in_qr.size = bb.size();
if (queue_policy_->unload_incoming(*this, from, to, in_qr.size))
in_qr.external = storage_->put(bb);
else
{
MemoryBuffer& in_bb = current_incoming.map[to].queues[from];
in_bb.swap(bb);
in_bb.reset();
in_qr.external = -1;
}
} else // !in_external
{
log->debug("Swapping in memory: {} <- {}", to, from);
MemoryBuffer& bb = current_incoming.map[to].queues[from];
bb.swap(it->second);
bb.reset();
in_qr.size = bb.size();
in_qr.external = -1;
}
++current_incoming.received;
continue;
}
std::shared_ptr<MemoryBuffer> buffer = std::make_shared<MemoryBuffer>();
buffer->swap(it->second);
MessageInfo info{from, to, exchange_round_};
if (buffer->size() <= (MAX_MPI_MESSAGE_COUNT - sizeof(info)))
{
diy::save(*buffer, info);
inflight_sends_.emplace_back();
inflight_sends_.back().info = info;
if (remote)
inflight_sends_.back().request = comm_.issend(proc, tags::queue, buffer->buffer);
else
inflight_sends_.back().request = comm_.isend(proc, tags::queue, buffer->buffer);
inflight_sends_.back().message = buffer;
}
else
{
int npieces = static_cast<int>((buffer->size() + MAX_MPI_MESSAGE_COUNT - 1)/MAX_MPI_MESSAGE_COUNT);
// first send the head
std::shared_ptr<MemoryBuffer> hb = std::make_shared<MemoryBuffer>();
diy::save(*hb, buffer->size());
diy::save(*hb, info);
inflight_sends_.emplace_back();
inflight_sends_.back().info = info;
if (remote)
inflight_sends_.back().request = comm_.issend(proc, tags::piece, hb->buffer);
else
inflight_sends_.back().request = comm_.isend(proc, tags::piece, hb->buffer);
inflight_sends_.back().message = hb;
// send the message pieces
size_t msg_buff_idx = 0;
for (int i = 0; i < npieces; ++i, msg_buff_idx += MAX_MPI_MESSAGE_COUNT)
{
int tag = (i == (npieces - 1)) ? tags::queue : tags::piece;
detail::VectorWindow<char> window;
window.begin = &buffer->buffer[msg_buff_idx];
window.count = std::min(MAX_MPI_MESSAGE_COUNT, buffer->size() - msg_buff_idx);
inflight_sends_.emplace_back();
inflight_sends_.back().info = info;
if (remote)
inflight_sends_.back().request = comm_.issend(proc, tag, window);
else
inflight_sends_.back().request = comm_.isend(proc, tag, window);
inflight_sends_.back().message = buffer;
}
}
} // for (OutgoingQueues::iterator it ...
} // while (inflight_sends_.size() ...
}
void
diy::Master::
check_incoming_queues()
{
mpi::optional<mpi::status> ostatus = comm_.iprobe(mpi::any_source, mpi::any_tag);
while (ostatus)
{
InFlightRecv &ir = inflight_recvs_[ostatus->source()];
if (ir.info.from == -1) // uninitialized
{
MemoryBuffer bb;
comm_.recv(ostatus->source(), ostatus->tag(), bb.buffer);
if (ostatus->tag() == tags::piece)
{
size_t msg_size;
diy::load(bb, msg_size);
diy::load(bb, ir.info);
ir.message.buffer.reserve(msg_size);
}
else // tags::queue
{
diy::load_back(bb, ir.info);
ir.message.swap(bb);
}
}
else
{
size_t start_idx = ir.message.buffer.size();
size_t count = ostatus->count<char>();
ir.message.buffer.resize(start_idx + count);
detail::VectorWindow<char> window;
window.begin = &ir.message.buffer[start_idx];
window.count = count;
comm_.recv(ostatus->source(), ostatus->tag(), window);
}
// unload message
if (ostatus->tag() == tags::queue)
{
size_t size = ir.message.size();
int from = ir.info.from;
int to = ir.info.to;
int external = -1;
assert(ir.info.round >= exchange_round_);
IncomingRound *in = &incoming_[ir.info.round];
bool unload_queue = ((ir.info.round == exchange_round_) ? (block(lid(to)) == 0) : (limit_ != -1)) &&
queue_policy_->unload_incoming(*this, from, to, size);
if (unload_queue)
{
log->debug("Directly unloading queue {} <- {}", to, from);
external = storage_->put(ir.message); // unload directly
}
else
{
in->map[to].queues[from].swap(ir.message);
in->map[to].queues[from].reset(); // buffer position = 0
}
in->map[to].records[from] = QueueRecord(size, external);
++(in->received);
ir = InFlightRecv(); // reset
}
ostatus = comm_.iprobe(mpi::any_source, mpi::any_tag);
} // while ostatus
}
void
diy::Master::
flush(bool remote)
{
#ifdef DEBUG
time_type start = get_time();
@ -1096,20 +1198,25 @@ flush()
else
out_queues_limit = static_cast<int>(std::max((size_t) 1, to_send.size()/size()*limit_)); // average number of queues per block * in-memory block limit
do
if (remote)
rcomm_exchange(to_send, out_queues_limit);
else
{
comm_exchange(to_send, out_queues_limit);
do
{
comm_exchange(to_send, out_queues_limit);
#ifdef DEBUG
time_type cur = get_time();
if (cur - start > wait*1000)
{
log->warn("Waiting in flush [{}]: {} - {} out of {}",
comm_.rank(), inflight_sends_.size(), incoming_[exchange_round_].received, expected_);
wait *= 2;
}
time_type cur = get_time();
if (cur - start > wait*1000)
{
log->warn("Waiting in flush [{}]: {} - {} out of {}",
comm_.rank(), inflight_sends_.size(), incoming_[exchange_round_].received, expected_);
wait *= 2;
}
#endif
} while (!inflight_sends_.empty() || incoming_[exchange_round_].received < expected_ || !to_send.empty());
} while (!inflight_sends_.empty() || incoming_[exchange_round_].received < expected_ || !to_send.empty());
}
outgoing_.clear();

@ -16,6 +16,7 @@
#include "mpi/communicator.hpp"
#include "mpi/collectives.hpp"
#include "mpi/io.hpp"
#include "mpi/window.hpp"
namespace diy
{

@ -13,14 +13,10 @@ namespace mpi
template<class T, class Op>
struct Collectives
{
typedef detail::mpi_datatype<T> Datatype;
static void broadcast(const communicator& comm, T& x, int root)
{
#ifndef DIY_NO_MPI
MPI_Bcast(Datatype::address(x),
Datatype::count(x),
Datatype::datatype(), root, comm);
MPI_Bcast(address(x), count(x), datatype(x), root, comm);
#else
DIY_UNUSED(comm);
DIY_UNUSED(x);
@ -31,16 +27,13 @@ namespace mpi
static void broadcast(const communicator& comm, std::vector<T>& x, int root)
{
#ifndef DIY_NO_MPI
size_t sz = x.size();
int elem_size = Datatype::count(x[0]); // size of 1 vector element in units of mpi datatype
size_t sz = x.size();
Collectives<size_t, void*>::broadcast(comm, sz, root);
if (comm.rank() != root)
x.resize(sz);
MPI_Bcast(Datatype::address(x[0]),
elem_size * x.size(),
Datatype::datatype(), root, comm);
MPI_Bcast(address(x), count(x), datatype(x), root, comm);
#else
DIY_UNUSED(comm);
DIY_UNUSED(x);
@ -52,9 +45,7 @@ namespace mpi
{
#ifndef DIY_NO_MPI
request r;
MPI_Ibcast(Datatype::address(x),
Datatype::count(x),
Datatype::datatype(), root, comm, &r.r);
MPI_Ibcast(address(x), count(x), datatype(x), root, comm, &r.r);
return r;
#else
DIY_UNUSED(comm);
@ -68,13 +59,7 @@ namespace mpi
{
out.resize(comm.size());
#ifndef DIY_NO_MPI
MPI_Gather(Datatype::address(const_cast<T&>(in)),
Datatype::count(in),
Datatype::datatype(),
Datatype::address(out[0]),
Datatype::count(in),
Datatype::datatype(),
root, comm);
MPI_Gather(address(in), count(in), datatype(in), address(out), count(in), datatype(out), root, comm);
#else
DIY_UNUSED(comm);
DIY_UNUSED(root);
@ -86,21 +71,19 @@ namespace mpi
{
#ifndef DIY_NO_MPI
std::vector<int> counts(comm.size());
int elem_size = Datatype::count(in[0]); // size of 1 vector element in units of mpi datatype
Collectives<int,void*>::gather(comm, (int)(elem_size * in.size()), counts, root);
Collectives<int,void*>::gather(comm, count(in), counts, root);
std::vector<int> offsets(comm.size(), 0);
for (unsigned i = 1; i < offsets.size(); ++i)
offsets[i] = offsets[i-1] + counts[i-1];
int elem_size = count(in[0]); // size of 1 vector element in units of mpi datatype
std::vector<T> buffer((offsets.back() + counts.back()) / elem_size);
MPI_Gatherv(Datatype::address(const_cast<T&>(in[0])),
elem_size * in.size(),
Datatype::datatype(),
Datatype::address(buffer[0]),
MPI_Gatherv(address(in), count(in), datatype(in),
address(buffer),
&counts[0],
&offsets[0],
Datatype::datatype(),
datatype(buffer),
root, comm);
out.resize(comm.size());
@ -122,13 +105,7 @@ namespace mpi
static void gather(const communicator& comm, const T& in, int root)
{
#ifndef DIY_NO_MPI
MPI_Gather(Datatype::address(const_cast<T&>(in)),
Datatype::count(in),
Datatype::datatype(),
Datatype::address(const_cast<T&>(in)),
Datatype::count(in),
Datatype::datatype(),
root, comm);
MPI_Gather(address(in), count(in), datatype(in), address(in), count(in), datatype(in), root, comm);
#else
DIY_UNUSED(comm);
DIY_UNUSED(in);
@ -140,14 +117,11 @@ namespace mpi
static void gather(const communicator& comm, const std::vector<T>& in, int root)
{
#ifndef DIY_NO_MPI
int elem_size = Datatype::count(in[0]); // size of 1 vector element in units of mpi datatype
Collectives<int,void*>::gather(comm, (int)(elem_size * in.size()), root);
Collectives<int,void*>::gather(comm, count(in), root);
MPI_Gatherv(Datatype::address(const_cast<T&>(in[0])),
elem_size * in.size(),
Datatype::datatype(),
MPI_Gatherv(address(in), count(in), datatype(in),
0, 0, 0,
Datatype::datatype(),
datatype(in),
root, comm);
#else
DIY_UNUSED(comm);
@ -161,12 +135,8 @@ namespace mpi
{
out.resize(comm.size());
#ifndef DIY_NO_MPI
MPI_Allgather(Datatype::address(const_cast<T&>(in)),
Datatype::count(in),
Datatype::datatype(),
Datatype::address(out[0]),
Datatype::count(in),
Datatype::datatype(),
MPI_Allgather(address(in), count(in), datatype(in),
address(out), count(in), datatype(in),
comm);
#else
DIY_UNUSED(comm);
@ -178,21 +148,19 @@ namespace mpi
{
#ifndef DIY_NO_MPI
std::vector<int> counts(comm.size());
int elem_size = Datatype::count(in[0]); // size of 1 vector element in units of mpi datatype
Collectives<int,void*>::all_gather(comm, (int)(elem_size * in.size()), counts);
Collectives<int,void*>::all_gather(comm, count(in), counts);
std::vector<int> offsets(comm.size(), 0);
for (unsigned i = 1; i < offsets.size(); ++i)
offsets[i] = offsets[i-1] + counts[i-1];
int elem_size = count(in[0]); // size of 1 vector element in units of mpi datatype
std::vector<T> buffer((offsets.back() + counts.back()) / elem_size);
MPI_Allgatherv(Datatype::address(const_cast<T&>(in[0])),
elem_size * in.size(),
Datatype::datatype(),
Datatype::address(buffer[0]),
MPI_Allgatherv(address(in), count(in), datatype(in),
address(buffer),
&counts[0],
&offsets[0],
Datatype::datatype(),
datatype(buffer),
comm);
out.resize(comm.size());
@ -213,10 +181,7 @@ namespace mpi
static void reduce(const communicator& comm, const T& in, T& out, int root, const Op&)
{
#ifndef DIY_NO_MPI
MPI_Reduce(Datatype::address(const_cast<T&>(in)),
Datatype::address(out),
Datatype::count(in),
Datatype::datatype(),
MPI_Reduce(address(in), address(out), count(in), datatype(in),
detail::mpi_op<Op>::get(),
root, comm);
#else
@ -229,10 +194,7 @@ namespace mpi
static void reduce(const communicator& comm, const T& in, int root, const Op&)
{
#ifndef DIY_NO_MPI
MPI_Reduce(Datatype::address(const_cast<T&>(in)),
Datatype::address(const_cast<T&>(in)),
Datatype::count(in),
Datatype::datatype(),
MPI_Reduce(address(in), address(in), count(in), datatype(in),
detail::mpi_op<Op>::get(),
root, comm);
#else
@ -246,10 +208,7 @@ namespace mpi
static void all_reduce(const communicator& comm, const T& in, T& out, const Op&)
{
#ifndef DIY_NO_MPI
MPI_Allreduce(Datatype::address(const_cast<T&>(in)),
Datatype::address(out),
Datatype::count(in),
Datatype::datatype(),
MPI_Allreduce(address(in), address(out), count(in), datatype(in),
detail::mpi_op<Op>::get(),
comm);
#else
@ -262,11 +221,8 @@ namespace mpi
{
#ifndef DIY_NO_MPI
out.resize(in.size());
int elem_size = Datatype::count(in[0]); // size of 1 vector element in units of mpi datatype
MPI_Allreduce(Datatype::address(const_cast<T&>(in[0])),
Datatype::address(out[0]),
elem_size * in.size(),
Datatype::datatype(),
MPI_Allreduce(address(in), address(out), count(in),
datatype(in),
detail::mpi_op<Op>::get(),
comm);
#else
@ -278,10 +234,7 @@ namespace mpi
static void scan(const communicator& comm, const T& in, T& out, const Op&)
{
#ifndef DIY_NO_MPI
MPI_Scan(Datatype::address(const_cast<T&>(in)),
Datatype::address(out),
Datatype::count(in),
Datatype::datatype(),
MPI_Scan(address(in), address(out), count(in), datatype(in),
detail::mpi_op<Op>::get(),
comm);
#else
@ -293,14 +246,17 @@ namespace mpi
static void all_to_all(const communicator& comm, const std::vector<T>& in, std::vector<T>& out, int n = 1)
{
#ifndef DIY_NO_MPI
int elem_size = Datatype::count(in[0]); // size of 1 vector element in units of mpi datatype
// n specifies how many elements each process sends to (and receives from) every other process;
// the sizes of in and out are expected to be n * comm.size()
int elem_size = count(in[0]); // size of 1 vector element in units of mpi datatype
// NB: this will fail if T is a vector
MPI_Alltoall(Datatype::address(const_cast<T&>(in[0])),
MPI_Alltoall(address(in),
elem_size * n,
Datatype::datatype(),
Datatype::address(out[0]),
datatype(in),
address(out),
elem_size * n,
Datatype::datatype(),
datatype(out),
comm);
#else
DIY_UNUSED(comm);

@ -63,6 +63,10 @@ namespace mpi
#endif
}
//! Non-blocking version of `ssend()`.
template<class T>
request issend(int dest, int tag, const T& x) const { return detail::issend<T>()(comm_, dest, tag, x); }
//! Non-blocking version of `recv()`.
//! If `T` is an `std::vector<...>`, its size must be big enough to accommodate the sent values.
template <class T>
@ -89,6 +93,10 @@ namespace mpi
inline
void barrier() const;
//! Nonblocking version of barrier
inline
request ibarrier() const;
operator MPI_Comm() const { return comm_; }
//! split
@ -181,3 +189,19 @@ split(int color, int key) const
return communicator();
#endif
}
diy::mpi::request
diy::mpi::communicator::
ibarrier() const
{
#ifndef DIY_NO_MPI
request r_;
MPI_Ibarrier(comm_, &r_.r);
return r_;
#else
// this is not the ideal fix; in principle we should just return a status
// that tests true, but this requires redesigning some parts of our no-mpi
// handling
DIY_UNSUPPORTED_MPI_CALL(MPI_Ibarrier);
#endif
}

@ -50,14 +50,44 @@ namespace detail
{
typedef std::vector<U> VecU;
static MPI_Datatype datatype() { return get_mpi_datatype<U>(); }
static MPI_Datatype datatype() { return mpi_datatype<U>::datatype(); }
static const void* address(const VecU& x) { return &x[0]; }
static void* address(VecU& x) { return &x[0]; }
static int count(const VecU& x) { return static_cast<int>(x.size()); }
static int count(const VecU& x) { return static_cast<int>(x.size()) * mpi_datatype<U>::count(x[0]); }
};
} // detail
template<class U>
static MPI_Datatype datatype(const U&)
{
using Datatype = detail::mpi_datatype<U>;
return Datatype::datatype();
}
template<class U>
static void* address(const U& x)
{
using Datatype = detail::mpi_datatype<U>;
return const_cast<void*>(Datatype::address(x));
}
template<class U>
static void* address(U& x)
{
using Datatype = detail::mpi_datatype<U>;
return Datatype::address(x);
}
template<class U>
static int count(const U& x)
{
using Datatype = detail::mpi_datatype<U>;
return Datatype::count(x);
}
} // mpi
} // diy
#endif
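For illustration (not part of this commit): the new free functions let generic code build MPI calls without spelling out detail::mpi_datatype<T>. A sketch, assuming a diy::mpi::communicator named world:
// Sketch: the same call works for a scalar or a std::vector of a scalar type,
// because count() folds in the per-element count and datatype() unwraps the vector.
std::vector<float> values(128, 1.0f);
#ifndef DIY_NO_MPI
MPI_Send(diy::mpi::address(values),
         diy::mpi::count(values),        // 128
         diy::mpi::datatype(values),     // MPI_FLOAT
         /* dest */ 0, /* tag */ 0,
         world);
#endif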

@ -74,3 +74,17 @@ static const int MPI_MODE_UNIQUE_OPEN = 32;
static const int MPI_MODE_EXCL = 64;
static const int MPI_MODE_APPEND = 128;
static const int MPI_MODE_SEQUENTIAL = 256;
/* define window type */
using MPI_Win = int;
/* window fence assertions */
static const int MPI_MODE_NOSTORE = 1;
static const int MPI_MODE_NOPUT = 2;
static const int MPI_MODE_NOPRECEDE = 4;
static const int MPI_MODE_NOSUCCEED = 8;
static const int MPI_MODE_NOCHECK = 16;
/* window lock types */
static const int MPI_LOCK_SHARED = 1;
static const int MPI_LOCK_EXCLUSIVE = 2;

@ -95,6 +95,30 @@ namespace detail
}
};
// issend
template< class T, class is_mpi_datatype_ = typename is_mpi_datatype<T>::type >
struct issend;
template<class T>
struct issend<T, true_type>
{
request operator()(MPI_Comm comm, int dest, int tag, const T& x) const
{
#ifndef DIY_NO_MPI
request r;
typedef mpi_datatype<T> Datatype;
MPI_Issend((void*) Datatype::address(x),
Datatype::count(x),
Datatype::datatype(),
dest, tag, comm, &r.r);
return r;
#else
(void) comm; (void) dest; (void) tag; (void) x;
DIY_UNSUPPORTED_MPI_CALL(MPI_Issend);
#endif
}
};
// irecv
template< class T, class is_mpi_datatype_ = typename is_mpi_datatype<T>::type >
struct irecv;

@ -0,0 +1,280 @@
#include <type_traits>
namespace diy
{
namespace mpi
{
//! \ingroup MPI
//! Simple wrapper around MPI window functions.
template<class T>
class window
{
static_assert(std::is_same<typename detail::is_mpi_datatype<T>::type, detail::true_type>::value, "Only MPI datatypes are allowed in windows");
public:
inline window(const communicator& comm, unsigned size);
inline ~window();
// moving is Ok
window(window&&) = default;
window& operator=(window&&) = default;
// cannot copy because of the buffer_
window(const window&) = delete;
window& operator=(const window&) = delete;
inline void put(const T& x, int rank, unsigned offset);
inline void put(const std::vector<T>& x, int rank, unsigned offset);
inline void get(T& x, int rank, unsigned offset);
inline void get(std::vector<T>& x, int rank, unsigned offset);
inline void fence(int assert);
inline void lock(int lock_type, int rank, int assert = 0);
inline void unlock(int rank);
inline void lock_all(int assert = 0);
inline void unlock_all();
inline void fetch_and_op(const T* origin, T* result, int rank, unsigned offset, MPI_Op op);
inline void fetch(T& result, int rank, unsigned offset);
inline void replace(const T& value, int rank, unsigned offset);
inline void sync();
inline void flush(int rank);
inline void flush_all();
inline void flush_local(int rank);
inline void flush_local_all();
private:
std::vector<T> buffer_;
int rank_;
MPI_Win window_;
};
} // mpi
} // diy
template<class T>
diy::mpi::window<T>::
window(const communicator& comm, unsigned size):
buffer_(size), rank_(comm.rank())
{
#ifndef DIY_NO_MPI
MPI_Win_create(buffer_.data(), buffer_.size()*sizeof(T), sizeof(T), MPI_INFO_NULL, comm, &window_);
#endif
}
template<class T>
diy::mpi::window<T>::
~window()
{
#ifndef DIY_NO_MPI
MPI_Win_free(&window_);
#endif
}
template<class T>
void
diy::mpi::window<T>::
put(const T& x, int rank, unsigned offset)
{
#ifndef DIY_NO_MPI
MPI_Put(address(x), count(x), datatype(x),
rank,
offset,
count(x), datatype(x),
window_);
#else
buffer_[offset] = x;
#endif
}
template<class T>
void
diy::mpi::window<T>::
put(const std::vector<T>& x, int rank, unsigned offset)
{
#ifndef DIY_NO_MPI
MPI_Put(address(x), count(x), datatype(x),
rank,
offset,
count(x), datatype(x),
window_);
#else
for (size_t i = 0; i < x.size(); ++i)
buffer_[offset + i] = x[i];
#endif
}
template<class T>
void
diy::mpi::window<T>::
get(T& x, int rank, unsigned offset)
{
#ifndef DIY_NO_MPI
MPI_Get(address(x), count(x), datatype(x),
rank,
offset,
count(x), datatype(x),
window_);
#else
x = buffer_[offset];
#endif
}
template<class T>
void
diy::mpi::window<T>::
get(std::vector<T>& x, int rank, unsigned offset)
{
#ifndef DIY_NO_MPI
MPI_Get(address(x), count(x), datatype(x),
rank,
offset,
count(x), datatype(x),
window_);
#else
for (size_t i = 0; i < x.size(); ++i)
x[i] = buffer_[offset + i];
#endif
}
template<class T>
void
diy::mpi::window<T>::
fence(int assert)
{
#ifndef DIY_NO_MPI
MPI_Win_fence(assert, window_);
#endif
}
template<class T>
void
diy::mpi::window<T>::
lock(int lock_type, int rank, int assert)
{
#ifndef DIY_NO_MPI
MPI_Win_lock(lock_type, rank, assert, window_);
#endif
}
template<class T>
void
diy::mpi::window<T>::
unlock(int rank)
{
#ifndef DIY_NO_MPI
MPI_Win_unlock(rank, window_);
#endif
}
template<class T>
void
diy::mpi::window<T>::
lock_all(int assert)
{
#ifndef DIY_NO_MPI
MPI_Win_lock_all(assert, window_);
#endif
}
template<class T>
void
diy::mpi::window<T>::
unlock_all()
{
#ifndef DIY_NO_MPI
MPI_Win_unlock_all(window_);
#endif
}
template<class T>
void
diy::mpi::window<T>::
fetch_and_op(const T* origin, T* result, int rank, unsigned offset, MPI_Op op)
{
#ifndef DIY_NO_MPI
MPI_Fetch_and_op(origin, result, datatype(*origin), rank, offset, op, window_);
#else
DIY_UNSUPPORTED_MPI_CALL(MPI_Fetch_and_op);
#endif
}
template<class T>
void
diy::mpi::window<T>::
fetch(T& result, int rank, unsigned offset)
{
#ifndef DIY_NO_MPI
T unused;
fetch_and_op(&unused, &result, rank, offset, MPI_NO_OP);
#else
result = buffer_[offset];
#endif
}
template<class T>
void
diy::mpi::window<T>::
replace(const T& value, int rank, unsigned offset)
{
#ifndef DIY_NO_MPI
T unused;
fetch_and_op(&value, &unused, rank, offset, MPI_REPLACE);
#else
buffer_[offset] = value;
#endif
}
template<class T>
void
diy::mpi::window<T>::
sync()
{
#ifndef DIY_NO_MPI
MPI_Win_sync(window_);
#endif
}
template<class T>
void
diy::mpi::window<T>::
flush(int rank)
{
#ifndef DIY_NO_MPI
MPI_Win_flush(rank, window_);
#endif
}
template<class T>
void
diy::mpi::window<T>::
flush_all()
{
#ifndef DIY_NO_MPI
MPI_Win_flush_all(window_);
#endif
}
template<class T>
void
diy::mpi::window<T>::
flush_local(int rank)
{
#ifndef DIY_NO_MPI
MPI_Win_flush_local(rank, window_);
#endif
}
template<class T>
void
diy::mpi::window<T>::
flush_local_all()
{
#ifndef DIY_NO_MPI
MPI_Win_flush_local_all(window_);
#endif
}
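For illustration (not part of this commit): a compact passive-target use of the new window wrapper, which is the same pattern DynamicAssigner relies on. The window size, target rank, and offset are placeholders; the sketch needs at least two ranks.
// Sketch: each rank exposes 4 ints; rank 0 writes into rank 1's window and
// everyone later reads the value back with a one-sided get.
diy::mpi::communicator world;
diy::mpi::window<int> win(world, 4);
win.lock_all(MPI_MODE_NOCHECK);           // passive-target epoch for the whole lifetime
if (world.rank() == 0)
{
    win.put(42, /* rank */ 1, /* offset */ 2);
    win.flush(1);                          // complete the put at the target
}
world.barrier();                           // order the put before the gets below
int value = -1;
win.get(value, /* rank */ 1, /* offset */ 2);
win.flush_local(1);                        // value is now usable locally
win.unlock_all();                          // must precede the destructor's MPI_Win_free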

@ -187,7 +187,7 @@ factor(int k, int tot_b, std::vector<int>& kv)
if (rem % j == 0)
{
kv.push_back(j);
rem /= k;
rem /= j;
break;
}
}

@ -18,6 +18,7 @@ namespace diy
//! A serialization buffer. \ingroup Serialization
struct BinaryBuffer
{
virtual ~BinaryBuffer() =default;
virtual void save_binary(const char* x, size_t count) =0; //!< copy `count` bytes from `x` into the buffer
virtual void load_binary(char* x, size_t count) =0; //!< copy `count` bytes into `x` from the buffer
virtual void load_binary_back(char* x, size_t count) =0; //!< copy `count` bytes into `x` from the back of the buffer