diy 2018-02-15 (3c908c14)

Code extracted from:

    https://gitlab.kitware.com/third-party/diy2.git

at commit 3c908c14da757bff224b91d2b0ebd7db0af1f2ab (for/vtk-m).
This commit is contained in:
Diy Upstream 2018-02-15 09:39:20 -08:00 committed by Utkarsh Ayachit
parent 6fc0794c40
commit e734bab55a
30 changed files with 670 additions and 163 deletions

@ -23,23 +23,23 @@ namespace diy
typedef detail::Load Load;
public:
Collection(Create create,
Destroy destroy,
ExternalStorage* storage,
Save save,
Load load):
create_(create),
destroy_(destroy),
storage_(storage),
save_(save),
load_(load),
Collection(Create create__,
Destroy destroy__,
ExternalStorage* storage__,
Save save__,
Load load__):
create_(create__),
destroy_(destroy__),
storage_(storage__),
save_(save__),
load_(load__),
in_memory_(0) {}
size_t size() const { return elements_.size(); }
const CInt& in_memory() const { return in_memory_; }
inline void clear();
int add(Element e) { elements_.push_back(e); external_.push_back(-1); ++(*in_memory_.access()); return elements_.size() - 1; }
int add(Element e) { elements_.push_back(e); external_.push_back(-1); ++(*in_memory_.access()); return static_cast<int>(elements_.size()) - 1; }
void* release(int i) { void* e = get(i); elements_[i] = 0; return e; }
void* find(int i) const { return elements_[i]; } // possibly returns 0, if the element is unloaded
@ -81,7 +81,7 @@ clear()
{
if (own())
for (size_t i = 0; i < size(); ++i)
destroy(i);
destroy(static_cast<int>(i));
elements_.clear();
external_.clear();
*in_memory_.access() = 0;

@ -19,4 +19,6 @@ enum
DIY_T1 = 0x80 /* maximum-side t (later) neighbor */
};
#define DIY_UNUSED(expr) do { (void)(expr); } while (0)
#endif

@ -281,7 +281,7 @@ void
diy::RegularDecomposer<Bounds>::
decompose(int rank, const Assigner& assigner, Master& master)
{
decompose(rank, assigner, [&master](int gid, const Bounds& core, const Bounds& bounds, const Bounds& domain, const Link& link)
decompose(rank, assigner, [&master](int gid, const Bounds& core, const Bounds& bounds, const Bounds&, const Link& link)
{
void* b = master.create();
Link* l = new Link(link);
@ -314,49 +314,49 @@ decompose(int rank, const Assigner& assigner, const Creator& create)
while (!all(offsets, 1))
{
// next offset
int i;
for (i = 0; i < dim; ++i)
if (offsets[i] == 1)
offsets[i] = -1;
int j;
for (j = 0; j < dim; ++j)
if (offsets[j] == 1)
offsets[j] = -1;
else
break;
++offsets[i];
++offsets[j];
if (all(offsets, 0)) continue; // skip ourselves
DivisionsVector nhbr_coords(dim);
Direction dir, wrap_dir;
bool inbounds = true;
for (int i = 0; i < dim; ++i)
for (int k = 0; k < dim; ++k)
{
nhbr_coords[i] = coords[i] + offsets[i];
nhbr_coords[k] = coords[k] + offsets[k];
// wrap
if (nhbr_coords[i] < 0)
if (nhbr_coords[k] < 0)
{
if (wrap[i])
if (wrap[k])
{
nhbr_coords[i] = divisions[i] - 1;
wrap_dir[i] = -1;
nhbr_coords[k] = divisions[k] - 1;
wrap_dir[k] = -1;
}
else
inbounds = false;
}
if (nhbr_coords[i] >= divisions[i])
if (nhbr_coords[k] >= divisions[k])
{
if (wrap[i])
if (wrap[k])
{
nhbr_coords[i] = 0;
wrap_dir[i] = 1;
nhbr_coords[k] = 0;
wrap_dir[k] = 1;
}
else
inbounds = false;
}
// NB: this needs to match the addressing scheme in dir_t (in constants.h)
if (offsets[i] == -1 || offsets[i] == 1)
dir[i] = offsets[i];
if (offsets[k] == -1 || offsets[k] == 1)
dir[k] = offsets[k];
}
if (!inbounds) continue;
@ -382,12 +382,12 @@ void
diy::RegularDecomposer<Bounds>::
decompose(int rank, const Assigner& assigner, Master& master, const Updater& update)
{
decompose(rank, assigner, [&master,&update](int gid, const Bounds& core, const Bounds& bounds, const Bounds& domain, const Link& link)
decompose(rank, assigner, [&master,&update](int gid, const Bounds& core, const Bounds& bounds, const Bounds& domain_, const Link& link)
{
int lid = master.lid(gid);
Link* l = new Link(link);
master.replace_link(lid, l);
update(gid, lid, core, bounds, domain, *l);
update(gid, lid, core, bounds, domain_, *l);
});
}
@ -407,7 +407,7 @@ void
diy::RegularDecomposer<Bounds>::
gid_to_coords(int gid, DivisionsVector& coords, const DivisionsVector& divisions)
{
int dim = divisions.size();
int dim = static_cast<int>(divisions.size());
for (int i = 0; i < dim; ++i)
{
coords.push_back(gid % divisions[i]);
@ -421,7 +421,7 @@ diy::RegularDecomposer<Bounds>::
coords_to_gid(const DivisionsVector& coords, const DivisionsVector& divisions)
{
int gid = 0;
for (int i = coords.size() - 1; i >= 0; --i)
for (int i = static_cast<int>(coords.size()) - 1; i >= 0; --i)
{
gid *= divisions[i];
gid += coords[i];
@ -515,21 +515,21 @@ struct Div
template<class Bounds>
void
diy::RegularDecomposer<Bounds>::
fill_divisions(std::vector<int>& divisions) const
fill_divisions(std::vector<int>& divisions_) const
{
// prod = number of blocks unconstrained by user; c = number of unconstrained dimensions
int prod = 1; int c = 0;
for (int i = 0; i < dim; ++i)
if (divisions[i] != 0)
if (divisions_[i] != 0)
{
prod *= divisions[i];
prod *= divisions_[i];
++c;
}
if (nblocks % prod != 0)
throw std::runtime_error("Total number of blocks cannot be factored into provided divs");
if (c == (int) divisions.size()) // nothing to do; user provided all divs
if (c == (int) divisions_.size()) // nothing to do; user provided all divs
return;
// factor number of blocks left in unconstrained dimensions
@ -543,7 +543,7 @@ fill_divisions(std::vector<int>& divisions) const
// init missing_divs
for (int i = 0; i < dim; i++)
{
if (divisions[i] == 0)
if (divisions_[i] == 0)
{
Div<Coordinate> div;
div.dim = i;
@ -592,7 +592,7 @@ fill_divisions(std::vector<int>& divisions) const
// assign the divisions
for (size_t i = 0; i < missing_divs.size(); i++)
divisions[missing_divs[i].dim] = missing_divs[i].nb;
divisions_[missing_divs[i].dim] = missing_divs[i].nb;
}
template<class Bounds>

@ -299,8 +299,8 @@ add_samples(Block* b, const diy::ReduceProxy& srp, Samples& samples) const
Samples smpls;
srp.dequeue(nbr_gid, smpls);
for (size_t i = 0; i < smpls.size(); ++i)
samples.push_back(smpls[i]);
for (size_t j = 0; j < smpls.size(); ++j)
samples.push_back(smpls[j]);
}
}

@ -399,8 +399,8 @@ add_histogram(Block* b, const diy::ReduceProxy& srp, Histogram& histogram) const
Histogram hist;
srp.dequeue(nbr_gid, hist);
for (size_t i = 0; i < hist.size(); ++i)
histogram[i] += hist[i];
for (size_t j = 0; j < hist.size(); ++j)
histogram[j] += hist[j];
}
}

@ -51,10 +51,10 @@ struct SampleSort
{
if (skip_self && rp.in_link().target(i).gid == rp.gid()) continue;
MemoryBuffer& in = rp.incoming(rp.in_link().target(i).gid);
size_t sz = in.size() / sizeof(T);
size_t incoming_sz = in.size() / sizeof(T);
T* bg = (T*) &in.buffer[0];
std::copy(bg, bg + sz, &v[end]);
end += sz;
std::copy(bg, bg + incoming_sz, &v[end]);
end += incoming_sz;
}
} else
{

@ -132,7 +132,6 @@ namespace detail
{
MemoryBuffer& in = srp.incoming(srp.in_link().target(i).gid);
std::pair<int, int> range;
load(in, range);
std::pair<int, int> from_to;

@ -5,15 +5,12 @@
#include <algorithm>
#include <stdexcept>
#include <unistd.h>
#include <sys/stat.h>
#include <dirent.h>
#include "../mpi.hpp"
#include "../assigner.hpp"
#include "../master.hpp"
#include "../storage.hpp"
#include "../log.hpp"
#include "utils.hpp"
// Read and write collections of blocks using MPI-IO
namespace diy
@ -89,14 +86,13 @@ namespace io
// truncate the file
if (comm.rank() == 0)
truncate(outfilename.c_str(), 0);
diy::io::utils::truncate(outfilename.c_str(), 0);
mpi::io::file f(comm, outfilename, mpi::io::file::wronly | mpi::io::file::create);
offset_t start = 0, shift;
std::vector<GidOffsetCount> offset_counts;
unsigned i;
for (i = 0; i < max_size; ++i)
for (unsigned i = 0; i < max_size; ++i)
{
offset_t count = 0,
offset;
@ -140,12 +136,12 @@ namespace io
std::vector<GidOffsetCount> all_offset_counts;
for (unsigned i = 0; i < gathered_offset_count_buffers.size(); ++i)
{
MemoryBuffer oc_buffer; oc_buffer.buffer.swap(gathered_offset_count_buffers[i]);
std::vector<GidOffsetCount> offset_counts;
diy::load(oc_buffer, offset_counts);
for (unsigned j = 0; j < offset_counts.size(); ++j)
if (offset_counts[j].gid != -1)
all_offset_counts.push_back(offset_counts[j]);
MemoryBuffer per_rank_oc_buffer; per_rank_oc_buffer.buffer.swap(gathered_offset_count_buffers[i]);
std::vector<GidOffsetCount> per_rank_offset_counts;
diy::load(per_rank_oc_buffer, per_rank_offset_counts);
for (unsigned j = 0; j < per_rank_offset_counts.size(); ++j)
if (per_rank_offset_counts[j].gid != -1)
all_offset_counts.push_back(per_rank_offset_counts[j]);
}
std::sort(all_offset_counts.begin(), all_offset_counts.end()); // sorts by gid
@ -278,13 +274,11 @@ namespace split
size_t size = 0;
if (comm.rank() == 0)
{
struct stat s;
if (stat(outfilename.c_str(), &s) == 0)
{
if (S_ISDIR(s.st_mode))
proceed = true;
} else if (mkdir(outfilename.c_str(), 0755) == 0)
proceed = true;
if (diy::io::utils::is_directory(outfilename))
proceed = true;
else if (diy::io::utils::make_directory(outfilename))
proceed = true;
mpi::broadcast(comm, proceed, 0);
mpi::reduce(comm, (size_t) master.size(), size, 0, std::plus<size_t>());
} else
@ -338,12 +332,14 @@ namespace split
// load the extra buffer and size
size_t size;
std::string filename = infilename + "/extra";
::diy::detail::FileBuffer bb(fopen(filename.c_str(), "r"));
::diy::load(bb, size);
::diy::load(bb, extra);
extra.reset();
fclose(bb.file);
{
std::string filename = infilename + "/extra";
::diy::detail::FileBuffer bb(fopen(filename.c_str(), "r"));
::diy::load(bb, size);
::diy::load(bb, extra);
extra.reset();
fclose(bb.file);
}
// Get local gids from assigner
assigner.set_nblocks(size);

@ -71,6 +71,7 @@ void
diy::io::BOV::
read(const DiscreteBounds& bounds, T* buffer, bool collective, int chunk) const
{
#ifndef DIY_NO_MPI
int dim = shape_.size();
int total = 1;
std::vector<int> subsizes;
@ -110,6 +111,10 @@ read(const DiscreteBounds& bounds, T* buffer, bool collective, int chunk) const
if (chunk != 1)
MPI_Type_free(&T_type);
MPI_Type_free(&fileblk);
#else
(void) bounds; (void) buffer; (void) collective; (void)chunk;
DIY_UNSUPPORTED_MPI_CALL(diy::io::BOV::read);
#endif
}
template<class T>
@ -125,6 +130,7 @@ void
diy::io::BOV::
write(const DiscreteBounds& bounds, const T* buffer, const DiscreteBounds& core, bool collective, int chunk)
{
#ifndef DIY_NO_MPI
int dim = shape_.size();
std::vector<int> subsizes;
std::vector<int> buffer_shape, buffer_start;
@ -166,6 +172,10 @@ write(const DiscreteBounds& bounds, const T* buffer, const DiscreteBounds& core,
MPI_Type_free(&T_type);
MPI_Type_free(&fileblk);
MPI_Type_free(&subbuffer);
#else
(void) bounds; (void) buffer;(void) core; (void) collective; (void) chunk;
DIY_UNSUPPORTED_MPI_CALL(diy::io::bov::write);
#endif
}
#endif

@ -0,0 +1,113 @@
#ifndef DIY_IO_UTILS_HPP
#define DIY_IO_UTILS_HPP
#if defined(_WIN32)
#include <direct.h>
#include <io.h>
#else
#include <unistd.h> // mkstemp() on Mac
#include <dirent.h>
#endif
#include <cstdio> // remove()
#include <cstdlib> // mkstemp() on Linux
#include <sys/stat.h>
namespace diy
{
namespace io
{
namespace utils
{
/**
* returns true if the filename exists and refers to a directory.
*/
inline bool is_directory(const std::string& filename)
{
#if defined(_WIN32)
DWORD attr = GetFileAttributes(filename.c_str());
return (attr != INVALID_FILE_ATTRIBUTES) && ((attr & FILE_ATTRIBUTE_DIRECTORY) != 0);
#else
struct stat s;
return (stat(filename.c_str(), &s) == 0 && S_ISDIR(s.st_mode));
#endif
}
/**
* creates a new directory. returns true on success.
*/
inline bool make_directory(const std::string& filename)
{
#if defined(_WIN32)
return _mkdir(filename.c_str()) == 0;
#else
return mkdir(filename.c_str(), 0755) == 0;
#endif
}
/**
* truncates a file to the given length, if the file exists and can be opened
* for writing.
*/
inline void truncate(const std::string& filename, size_t length)
{
#if defined(_WIN32)
int fd = -1;
_sopen_s(&fd, filename.c_str(), _O_WRONLY | _O_BINARY, _SH_DENYNO, _S_IWRITE);
if (fd != -1)
{
_chsize_s(fd, 0);
_close(fd);
}
#else
::truncate(filename.c_str(), length);
#endif
}
inline int mkstemp(std::string& filename)
{
const size_t slen = filename.size();
char *s_template = new char[filename.size() + 1];
std::copy_n(filename.c_str(), slen+1, s_template);
int handle = -1;
#if defined(_WIN32)
if (_mktemp_s(s_template, slen+1) == 0) // <- returns 0 on success.
_sopen_s(&handle, s_template, _O_WRONLY | _O_CREAT | _O_BINARY, _SH_DENYNO, _S_IWRITE);
#elif defined(__MACH__)
// TODO: figure out how to open with O_SYNC
handle = ::mkstemp(s_template);
#else
handle = mkostemp(s_template, O_WRONLY | O_SYNC);
#endif
if (handle != -1)
filename = s_template;
return handle;
}
inline void close(int fd)
{
#if defined(_WIN32)
_close(fd);
#else
fsync(fd);
close(fd);
#endif
}
inline void sync(int fd)
{
#if !defined(_WIN32)
fsync(fd);
#endif
}
inline bool remove(const std::string& filename)
{
return ::remove(filename.c_str()) == 0;
}
}
}
}
#endif

@ -17,7 +17,7 @@ namespace diy
public:
virtual ~Link() {} // need to be able to delete derived classes
int size() const { return neighbors_.size(); }
int size() const { return static_cast<int>(neighbors_.size()); }
inline
int size_unique() const;
BlockID target(int i) const { return neighbors_[i]; }
@ -201,7 +201,7 @@ size_unique() const
{
std::vector<BlockID> tmp(neighbors_.begin(), neighbors_.end());
std::sort(tmp.begin(), tmp.end());
return std::unique(tmp.begin(), tmp.end()) - tmp.begin();
return static_cast<int>(std::unique(tmp.begin(), tmp.end()) - tmp.begin());
}
template<class Bounds>

@ -15,12 +15,12 @@ namespace spd
struct logger
{
// logger.info(cppformat_string, arg1, arg2, arg3, ...) call style
template <typename... Args> void trace(const char* fmt, const Args&... args) {}
template <typename... Args> void debug(const char* fmt, const Args&... args) {}
template <typename... Args> void info(const char* fmt, const Args&... args) {}
template <typename... Args> void warn(const char* fmt, const Args&... args) {}
template <typename... Args> void error(const char* fmt, const Args&... args) {}
template <typename... Args> void critical(const char* fmt, const Args&... args) {}
template <typename... Args> void trace(const char*, const Args&...) {}
template <typename... Args> void debug(const char*, const Args&...) {}
template <typename... Args> void info(const char*, const Args&...) {}
template <typename... Args> void warn(const char*, const Args&...) {}
template <typename... Args> void error(const char*, const Args&...) {}
template <typename... Args> void critical(const char*, const Args&...) {}
};
}

@ -52,7 +52,7 @@ namespace diy
using Skip = std::function<bool(int, const Master&)>;
struct SkipNoIncoming;
struct NeverSkip { bool operator()(int i, const Master& master) const { return false; } };
struct NeverSkip { bool operator()(int, const Master&) const { return false; } };
// Collection
typedef Collection::Create CreateBlock;
@ -80,8 +80,8 @@ namespace diy
struct QueueSizePolicy: public QueuePolicy
{
QueueSizePolicy(size_t sz): size(sz) {}
bool unload_incoming(const Master& master, int from, int to, size_t sz) const { return sz > size; }
bool unload_outgoing(const Master& master, int from, size_t sz) const { return sz > size*master.outgoing_count(from); }
bool unload_incoming(const Master&, int, int, size_t sz) const { return sz > size; }
bool unload_outgoing(const Master& master, int from, size_t sz) const { return sz > size*master.outgoing_count(from); }
size_t size;
};
@ -162,15 +162,15 @@ namespace diy
* serialize a block
*/
Master(mpi::communicator comm, //!< communicator
int threads = 1, //!< number of threads DIY can use
int limit = -1, //!< number of blocks to store in memory
CreateBlock create = 0, //!< block create function; master manages creation if create != 0
DestroyBlock destroy = 0, //!< block destroy function; master manages destruction if destroy != 0
ExternalStorage* storage = 0, //!< storage object (path, method, etc.) for storing temporary blocks being shuffled in/out of core
SaveBlock save = 0, //!< block save function; master manages saving if save != 0
LoadBlock load = 0, //!< block load function; master manages loading if load != 0
QueuePolicy* q_policy = new QueueSizePolicy(4096)): //!< policy for managing message queues specifies maximum size of message queues to keep in memory
blocks_(create, destroy, storage, save, load),
int threads = 1, //!< number of threads DIY can use
int limit = -1, //!< number of blocks to store in memory
CreateBlock create_ = 0, //!< block create function; master manages creation if create != 0
DestroyBlock destroy_ = 0, //!< block destroy function; master manages destruction if destroy != 0
ExternalStorage* storage = 0, //!< storage object (path, method, etc.) for storing temporary blocks being shuffled in/out of core
SaveBlock save = 0, //!< block save function; master manages saving if save != 0
LoadBlock load_ = 0, //!< block load function; master manages loading if load != 0
QueuePolicy* q_policy = new QueueSizePolicy(4096)): //!< policy for managing message queues specifies maximum size of message queues to keep in memory
blocks_(create_, destroy_, storage, save, load_),
queue_policy_(q_policy),
limit_(limit),
threads_(threads == -1 ? thread::hardware_concurrency() : threads),
@ -230,7 +230,7 @@ namespace diy
ProxyWithLink proxy(int i) const;
//! return the number of local blocks
unsigned size() const { return blocks_.size(); }
unsigned int size() const { return static_cast<unsigned int>(blocks_.size()); }
void* create() const { return blocks_.create(); }
// accessors
@ -666,7 +666,7 @@ add(int gid, void* b, Link* l)
links_.push_back(l);
gids_.push_back(gid);
int lid = gids_.size() - 1;
int lid = static_cast<int>(gids_.size()) - 1;
lids_[gid] = lid;
add_expected(l->size_unique()); // NB: at every iteration we expect a message from each unique neighbor
@ -703,6 +703,8 @@ diy::Master::
foreach_(const Callback<Block>& f, const Skip& skip)
{
auto scoped = prof.scoped("foreach");
DIY_UNUSED(scoped);
commands_.push_back(new Command<Block>(f, skip));
if (immediate())
@ -715,6 +717,7 @@ execute()
{
log->debug("Entered execute()");
auto scoped = prof.scoped("execute");
DIY_UNUSED(scoped);
//show_incoming_records();
// touch the outgoing and incoming queues as well as collectives to make sure they exist
@ -798,6 +801,8 @@ diy::Master::
exchange()
{
auto scoped = prof.scoped("exchange");
DIY_UNUSED(scoped);
execute();
log->debug("Starting exchange");
@ -1087,9 +1092,9 @@ flush()
// XXX: with queues we could easily maintain a specific space limit
int out_queues_limit;
if (limit_ == -1 || size() == 0)
out_queues_limit = to_send.size();
out_queues_limit = static_cast<int>(to_send.size());
else
out_queues_limit = std::max((size_t) 1, to_send.size()/size()*limit_); // average number of queues per block * in-memory block limit
out_queues_limit = static_cast<int>(std::max((size_t) 1, to_send.size()/size()*limit_)); // average number of queues per block * in-memory block limit
do
{
@ -1119,6 +1124,7 @@ diy::Master::
process_collectives()
{
auto scoped = prof.scoped("collectives");
DIY_UNUSED(scoped);
if (collectives_.empty())
return;
@ -1157,15 +1163,17 @@ diy::Master::
nudge()
{
bool success = false;
for (InFlightSendsList::iterator it = inflight_sends_.begin(); it != inflight_sends_.end(); ++it)
for (InFlightSendsList::iterator it = inflight_sends_.begin(); it != inflight_sends_.end();)
{
mpi::optional<mpi::status> ostatus = it->request.test();
if (ostatus)
{
success = true;
InFlightSendsList::iterator rm = it;
--it;
inflight_sends_.erase(rm);
it = inflight_sends_.erase(it);
}
else
{
++it;
}
}
return success;

@ -1,7 +1,11 @@
#ifndef DIY_MPI_HPP
#define DIY_MPI_HPP
#ifndef DIY_NO_MPI
#include <mpi.h>
#else
#include "mpi/no-mpi.hpp"
#endif
#include "mpi/constants.hpp"
#include "mpi/datatypes.hpp"
@ -21,12 +25,39 @@ namespace mpi
//! \ingroup MPI
struct environment
{
environment() { int argc = 0; char** argv; MPI_Init(&argc, &argv); }
environment(int argc, char* argv[]) { MPI_Init(&argc, &argv); }
~environment() { MPI_Finalize(); }
inline environment();
inline environment(int argc, char* argv[]);
inline ~environment();
};
}
}
diy::mpi::environment::
environment()
{
#ifndef DIY_NO_MPI
int argc = 0; char** argv;
MPI_Init(&argc, &argv);
#endif
}
diy::mpi::environment::
environment(int argc, char* argv[])
{
#ifndef DIY_NO_MPI
MPI_Init(&argc, &argv);
#else
(void) argc; (void) argv;
#endif
}
diy::mpi::environment::
~environment()
{
#ifndef DIY_NO_MPI
MPI_Finalize();
#endif
}
#endif

@ -16,13 +16,16 @@ namespace mpi
static void broadcast(const communicator& comm, T& x, int root)
{
#ifndef DIY_NO_MPI
MPI_Bcast(Datatype::address(x),
Datatype::count(x),
Datatype::datatype(), root, comm);
#endif
}
static void broadcast(const communicator& comm, std::vector<T>& x, int root)
{
#ifndef DIY_NO_MPI
size_t sz = x.size();
Collectives<size_t, void*>::broadcast(comm, sz, root);
@ -32,15 +35,21 @@ namespace mpi
MPI_Bcast(Datatype::address(x[0]),
x.size(),
Datatype::datatype(), root, comm);
#endif
}
static request ibroadcast(const communicator& comm, T& x, int root)
{
#ifndef DIY_NO_MPI
request r;
MPI_Ibcast(Datatype::address(x),
Datatype::count(x),
Datatype::datatype(), root, comm, &r.r);
return r;
#else
(void)x; (void)root;
DIY_UNSUPPORTED_MPI_CALL(MPI_Ibcast);
#endif
}
static void gather(const communicator& comm, const T& in, std::vector<T>& out, int root)
@ -48,6 +57,7 @@ namespace mpi
size_t s = comm.size();
s *= Datatype::count(in);
out.resize(s);
#ifndef DIY_NO_MPI
MPI_Gather(Datatype::address(const_cast<T&>(in)),
Datatype::count(in),
Datatype::datatype(),
@ -55,10 +65,15 @@ namespace mpi
Datatype::count(in),
Datatype::datatype(),
root, comm);
#else
(void) root;
out[0] = in;
#endif
}
static void gather(const communicator& comm, const std::vector<T>& in, std::vector< std::vector<T> >& out, int root)
{
#ifndef DIY_NO_MPI
std::vector<int> counts(comm.size());
Collectives<int,void*>::gather(comm, (int) in.size(), counts, root);
@ -84,10 +99,16 @@ namespace mpi
for (unsigned j = 0; j < (unsigned)counts[i]; ++j)
out[i].push_back(buffer[cur++]);
}
#else
(void) comm; (void) root;
out.resize(1);
out[0] = in;
#endif
}
static void gather(const communicator& comm, const T& in, int root)
{
#ifndef DIY_NO_MPI
MPI_Gather(Datatype::address(const_cast<T&>(in)),
Datatype::count(in),
Datatype::datatype(),
@ -95,10 +116,14 @@ namespace mpi
Datatype::count(in),
Datatype::datatype(),
root, comm);
#else
DIY_UNSUPPORTED_MPI_CALL("MPI_Gather");
#endif
}
static void gather(const communicator& comm, const std::vector<T>& in, int root)
{
#ifndef DIY_NO_MPI
Collectives<int,void*>::gather(comm, (int) in.size(), root);
MPI_Gatherv(Datatype::address(const_cast<T&>(in[0])),
@ -107,6 +132,9 @@ namespace mpi
0, 0, 0,
Datatype::datatype(),
root, comm);
#else
DIY_UNSUPPORTED_MPI_CALL("MPI_Gatherv");
#endif
}
static void all_gather(const communicator& comm, const T& in, std::vector<T>& out)
@ -114,6 +142,7 @@ namespace mpi
size_t s = comm.size();
s *= Datatype::count(in);
out.resize(s);
#ifndef DIY_NO_MPI
MPI_Allgather(Datatype::address(const_cast<T&>(in)),
Datatype::count(in),
Datatype::datatype(),
@ -121,10 +150,14 @@ namespace mpi
Datatype::count(in),
Datatype::datatype(),
comm);
#else
out[0] = in;
#endif
}
static void all_gather(const communicator& comm, const std::vector<T>& in, std::vector< std::vector<T> >& out)
{
#ifndef DIY_NO_MPI
std::vector<int> counts(comm.size());
Collectives<int,void*>::all_gather(comm, (int) in.size(), counts);
@ -150,40 +183,60 @@ namespace mpi
for (int j = 0; j < counts[i]; ++j)
out[i].push_back(buffer[cur++]);
}
#else
(void) comm;
out.resize(1);
out[0] = in;
#endif
}
static void reduce(const communicator& comm, const T& in, T& out, int root, const Op&)
{
#ifndef DIY_NO_MPI
MPI_Reduce(Datatype::address(const_cast<T&>(in)),
Datatype::address(out),
Datatype::count(in),
Datatype::datatype(),
detail::mpi_op<Op>::get(),
root, comm);
#else
(void) comm; (void) root;
out = in;
#endif
}
static void reduce(const communicator& comm, const T& in, int root, const Op& op)
{
#ifndef DIY_NO_MPI
MPI_Reduce(Datatype::address(const_cast<T&>(in)),
Datatype::address(const_cast<T&>(in)),
Datatype::count(in),
Datatype::datatype(),
detail::mpi_op<Op>::get(),
root, comm);
#else
DIY_UNSUPPORTED_MPI_CALL("MPI_Reduce");
#endif
}
static void all_reduce(const communicator& comm, const T& in, T& out, const Op&)
{
#ifndef DIY_NO_MPI
MPI_Allreduce(Datatype::address(const_cast<T&>(in)),
Datatype::address(out),
Datatype::count(in),
Datatype::datatype(),
detail::mpi_op<Op>::get(),
comm);
#else
(void) comm;
out = in;
#endif
}
static void all_reduce(const communicator& comm, const std::vector<T>& in, std::vector<T>& out, const Op&)
{
#ifndef DIY_NO_MPI
out.resize(in.size());
MPI_Allreduce(Datatype::address(const_cast<T&>(in[0])),
Datatype::address(out[0]),
@ -191,26 +244,39 @@ namespace mpi
Datatype::datatype(),
detail::mpi_op<Op>::get(),
comm);
#else
out = in;
#endif
}
static void scan(const communicator& comm, const T& in, T& out, const Op&)
{
#ifndef DIY_NO_MPI
MPI_Scan(Datatype::address(const_cast<T&>(in)),
Datatype::address(out),
Datatype::count(in),
Datatype::datatype(),
detail::mpi_op<Op>::get(),
comm);
#else
(void) comm;
out = in;
#endif
}
static void all_to_all(const communicator& comm, const std::vector<T>& in, std::vector<T>& out, int n = 1)
{
#ifndef DIY_NO_MPI
// NB: this will fail if T is a vector
MPI_Alltoall(Datatype::address(const_cast<T&>(in[0])), n,
Datatype::datatype(),
Datatype::address(out[0]), n,
Datatype::datatype(),
comm);
#else
(void) comm; (void) n;
out = in;
#endif
}
};

@ -8,8 +8,8 @@ namespace mpi
class communicator
{
public:
communicator(MPI_Comm comm = MPI_COMM_WORLD):
comm_(comm), rank_(0), size_(1) { if (comm != MPI_COMM_NULL) { MPI_Comm_rank(comm_, &rank_); MPI_Comm_size(comm_, &size_); } }
inline
communicator(MPI_Comm comm = MPI_COMM_WORLD);
int rank() const { return rank_; }
int size() const { return size_; }
@ -38,7 +38,8 @@ namespace mpi
request irecv(int source, int tag, T& x) const { return detail::irecv<T>()(comm_, source, tag, x); }
//! probe
status probe(int source, int tag) const { status s; MPI_Probe(source, tag, comm_, &s.s); return s; }
inline
status probe(int source, int tag) const;
//! iprobe
inline
@ -46,7 +47,8 @@ namespace mpi
iprobe(int source, int tag) const;
//! barrier
void barrier() const { MPI_Barrier(comm_); }
inline
void barrier() const;
operator MPI_Comm() const { return comm_; }
@ -58,15 +60,56 @@ namespace mpi
}
}
diy::mpi::communicator::
communicator(MPI_Comm comm)
:comm_(comm), rank_(0), size_(1)
{
#ifndef DIY_NO_MPI
if (comm != MPI_COMM_NULL)
{
MPI_Comm_rank(comm_, &rank_);
MPI_Comm_size(comm_, &size_);
}
#endif
}
diy::mpi::status
diy::mpi::communicator::
probe(int source, int tag) const
{
(void) source;
(void) tag;
#ifndef DIY_NO_MPI
status s;
MPI_Probe(source, tag, comm_, &s.s);
return s;
#else
DIY_UNSUPPORTED_MPI_CALL(MPI_Probe);
#endif
}
diy::mpi::optional<diy::mpi::status>
diy::mpi::communicator::
iprobe(int source, int tag) const
{
(void) source;
(void) tag;
#ifndef DIY_NO_MPI
status s;
int flag;
MPI_Iprobe(source, tag, comm_, &flag, &s.s);
if (flag)
return s;
#endif
return optional<status>();
}
void
diy::mpi::communicator::
barrier() const
{
#ifndef DIY_NO_MPI
MPI_Barrier(comm_);
#endif
}

@ -42,7 +42,7 @@ namespace detail
static MPI_Datatype datatype() { return get_mpi_datatype<T>(); }
static const void* address(const T& x) { return &x; }
static void* address(T& x) { return &x; }
static int count(const T& x) { return 1; }
static int count(const T&) { return 1; }
};
template<class U>
@ -53,7 +53,7 @@ namespace detail
static MPI_Datatype datatype() { return get_mpi_datatype<U>(); }
static const void* address(const VecU& x) { return &x[0]; }
static void* address(VecU& x) { return &x[0]; }
static int count(const VecU& x) { return x.size(); }
static int count(const VecU& x) { return static_cast<int>(x.size()); }
};
}

@ -1,6 +1,8 @@
#ifndef DIY_MPI_IO_HPP
#define DIY_MPI_IO_HPP
#include "../constants.h"
#include <vector>
#include <string>
@ -30,15 +32,12 @@ namespace io
};
public:
file(const communicator& comm,
const std::string& filename,
int mode):
comm_(comm) { MPI_File_open(comm, const_cast<char*>(filename.c_str()), mode, MPI_INFO_NULL, &fh); }
inline file(const communicator& comm, const std::string& filename, int mode);
~file() { close(); }
void close() { if (fh != MPI_FILE_NULL) MPI_File_close(&fh); }
inline void close();
offset size() const { offset sz; MPI_File_get_size(fh, &sz); return sz; }
void resize(offset size) { MPI_File_set_size(fh, size); }
inline offset size() const;
inline void resize(offset size);
inline void read_at(offset o, char* buffer, size_t size);
inline void read_at_all(offset o, char* buffer, size_t size);
@ -70,12 +69,67 @@ namespace io
}
}
diy::mpi::io::file::
file(const communicator& comm, const std::string& filename, int mode)
: comm_(comm)
{
#ifndef DIY_NO_MPI
MPI_File_open(comm, const_cast<char*>(filename.c_str()), mode, MPI_INFO_NULL, &fh);
#else
DIY_UNUSED(filename);
DIY_UNUSED(mode);
DIY_UNSUPPORTED_MPI_CALL(MPI_File_open);
#endif
}
void
diy::mpi::io::file::
close()
{
#ifndef DIY_NO_MPI
if (fh != MPI_FILE_NULL)
MPI_File_close(&fh);
#endif
}
diy::mpi::io::offset
diy::mpi::io::file::
size() const
{
#ifndef DIY_NO_MPI
offset sz;
MPI_File_get_size(fh, &sz);
return sz;
#else
DIY_UNSUPPORTED_MPI_CALL(MPI_File_get_size);
#endif
}
void
diy::mpi::io::file::
resize(diy::mpi::io::offset size)
{
#ifndef DIY_NO_MPI
MPI_File_set_size(fh, size);
#else
DIY_UNUSED(size);
DIY_UNSUPPORTED_MPI_CALL(MPI_File_set_size);
#endif
}
void
diy::mpi::io::file::
read_at(offset o, char* buffer, size_t size)
{
#ifndef DIY_NO_MPI
status s;
MPI_File_read_at(fh, o, buffer, size, detail::get_mpi_datatype<char>(), &s.s);
MPI_File_read_at(fh, o, buffer, static_cast<int>(size), detail::get_mpi_datatype<char>(), &s.s);
#else
DIY_UNUSED(o);
DIY_UNUSED(buffer);
DIY_UNUSED(size);
DIY_UNSUPPORTED_MPI_CALL(MPI_File_read_at);
#endif
}
template<class T>
@ -90,8 +144,15 @@ void
diy::mpi::io::file::
read_at_all(offset o, char* buffer, size_t size)
{
#ifndef DIY_NO_MPI
status s;
MPI_File_read_at_all(fh, o, buffer, size, detail::get_mpi_datatype<char>(), &s.s);
MPI_File_read_at_all(fh, o, buffer, static_cast<int>(size), detail::get_mpi_datatype<char>(), &s.s);
#else
DIY_UNUSED(o);
DIY_UNUSED(buffer);
DIY_UNUSED(size);
DIY_UNSUPPORTED_MPI_CALL(MPI_File_read_at_all);
#endif
}
template<class T>
@ -106,8 +167,15 @@ void
diy::mpi::io::file::
write_at(offset o, const char* buffer, size_t size)
{
#ifndef DIY_NO_MPI
status s;
MPI_File_write_at(fh, o, (void *)buffer, size, detail::get_mpi_datatype<char>(), &s.s);
MPI_File_write_at(fh, o, (void *)buffer, static_cast<int>(size), detail::get_mpi_datatype<char>(), &s.s);
#else
DIY_UNUSED(o);
DIY_UNUSED(buffer);
DIY_UNUSED(size);
DIY_UNSUPPORTED_MPI_CALL(MPI_File_write_at);
#endif
}
template<class T>
@ -122,8 +190,15 @@ void
diy::mpi::io::file::
write_at_all(offset o, const char* buffer, size_t size)
{
#ifndef DIY_NO_MPI
status s;
MPI_File_write_at_all(fh, o, (void *)buffer, size, detail::get_mpi_datatype<char>(), &s.s);
MPI_File_write_at_all(fh, o, (void *)buffer, static_cast<int>(size), detail::get_mpi_datatype<char>(), &s.s);
#else
DIY_UNUSED(o);
DIY_UNUSED(buffer);
DIY_UNUSED(size);
DIY_UNSUPPORTED_MPI_CALL(MPI_File_write_at_all);
#endif
}
template<class T>

@ -0,0 +1,70 @@
#ifndef DIY_MPI_NO_MPI_HPP
#define DIY_MPI_NO_MPI_HPP
#include <stdexcept> // std::runtime_error
static const int MPI_SUCCESS = 0;
static const int MPI_ANY_SOURCE = -1;
static const int MPI_ANY_TAG = -1;
/* define communicator type and constants */
using MPI_Comm = int;
static const MPI_Comm MPI_COMM_NULL = 0;
static const MPI_Comm MPI_COMM_WORLD = 1;
/* define datatypes */
using MPI_Datatype = size_t;
#define DIY_NO_MPI_DATATYPE(cpp_type, mpi_type) \
static const MPI_Datatype mpi_type = sizeof(cpp_type);
DIY_NO_MPI_DATATYPE(char, MPI_BYTE);
DIY_NO_MPI_DATATYPE(int, MPI_INT);
DIY_NO_MPI_DATATYPE(unsigned, MPI_UNSIGNED);
DIY_NO_MPI_DATATYPE(long, MPI_LONG);
DIY_NO_MPI_DATATYPE(unsigned long, MPI_UNSIGNED_LONG);
DIY_NO_MPI_DATATYPE(long long, MPI_LONG_LONG_INT);
DIY_NO_MPI_DATATYPE(unsigned long long, MPI_UNSIGNED_LONG_LONG);
DIY_NO_MPI_DATATYPE(float, MPI_FLOAT);
DIY_NO_MPI_DATATYPE(double, MPI_DOUBLE);
#endif
/* status type */
struct MPI_Status
{
/* These fields are publicly defined in the MPI specification.
User applications may freely read from these fields. */
int MPI_SOURCE;
int MPI_TAG;
int MPI_ERROR;
};
/* define MPI_Request */
using MPI_Request = int;
#define DIY_UNSUPPORTED_MPI_CALL(name) \
throw std::runtime_error("`" #name "` not supported when DIY_NO_MPI is defined.");
/* define operations */
using MPI_Op = int;
static const MPI_Op MPI_MAX = 0;
static const MPI_Op MPI_MIN = 0;
static const MPI_Op MPI_SUM = 0;
static const MPI_Op MPI_PROD = 0;
static const MPI_Op MPI_LAND = 0;
static const MPI_Op MPI_LOR = 0;
/* mpi i/o stuff */
using MPI_Offset = size_t;
using MPI_File = int;
static const MPI_File MPI_FILE_NULL = 0;
static const int MPI_MODE_CREATE = 1;
static const int MPI_MODE_RDONLY = 2;
static const int MPI_MODE_WRONLY = 4;
static const int MPI_MODE_RDWR = 8;
static const int MPI_MODE_DELETE_ON_CLOSE = 16;
static const int MPI_MODE_UNIQUE_OPEN = 32;
static const int MPI_MODE_EXCL = 64;
static const int MPI_MODE_APPEND = 128;
static const int MPI_MODE_SEQUENTIAL = 256;

@ -15,11 +15,16 @@ namespace detail
{
void operator()(MPI_Comm comm, int dest, int tag, const T& x) const
{
#ifndef DIY_NO_MPI
typedef mpi_datatype<T> Datatype;
MPI_Send((void*) Datatype::address(x),
Datatype::count(x),
Datatype::datatype(),
dest, tag, comm);
#else
(void) comm; (void) dest; (void) tag; (void) x;
DIY_UNSUPPORTED_MPI_CALL(MPI_Send);
#endif
}
};
@ -32,6 +37,7 @@ namespace detail
{
status operator()(MPI_Comm comm, int source, int tag, T& x) const
{
#ifndef DIY_NO_MPI
typedef mpi_datatype<T> Datatype;
status s;
MPI_Recv((void*) Datatype::address(x),
@ -39,6 +45,10 @@ namespace detail
Datatype::datatype(),
source, tag, comm, &s.s);
return s;
#else
(void) comm; (void) source; (void) tag; (void) x;
DIY_UNSUPPORTED_MPI_CALL(MPI_Recv);
#endif
}
};
@ -47,12 +57,17 @@ namespace detail
{
status operator()(MPI_Comm comm, int source, int tag, std::vector<U>& x) const
{
#ifndef DIY_NO_MPI
status s;
MPI_Probe(source, tag, comm, &s.s);
x.resize(s.count<U>());
MPI_Recv(&x[0], x.size(), get_mpi_datatype<U>(), source, tag, comm, &s.s);
MPI_Recv(&x[0], static_cast<int>(x.size()), get_mpi_datatype<U>(), source, tag, comm, &s.s);
return s;
#else
(void) comm; (void) source; (void) tag; (void) x;
DIY_UNSUPPORTED_MPI_CALL(MPI_Recv);
#endif
}
};
@ -65,6 +80,7 @@ namespace detail
{
request operator()(MPI_Comm comm, int dest, int tag, const T& x) const
{
#ifndef DIY_NO_MPI
request r;
typedef mpi_datatype<T> Datatype;
MPI_Isend((void*) Datatype::address(x),
@ -72,6 +88,10 @@ namespace detail
Datatype::datatype(),
dest, tag, comm, &r.r);
return r;
#else
(void) comm; (void) dest; (void) tag; (void) x;
DIY_UNSUPPORTED_MPI_CALL(MPI_Isend);
#endif
}
};
@ -84,6 +104,7 @@ namespace detail
{
request operator()(MPI_Comm comm, int source, int tag, T& x) const
{
#ifndef DIY_NO_MPI
request r;
typedef mpi_datatype<T> Datatype;
MPI_Irecv(Datatype::address(x),
@ -91,6 +112,10 @@ namespace detail
Datatype::datatype(),
source, tag, comm, &r.r);
return r;
#else
(void) comm; (void) source; (void) tag; (void) x;
DIY_UNSUPPORTED_MPI_CALL(MPI_Irecv);
#endif
}
};
}

@ -4,23 +4,47 @@ namespace mpi
{
struct request
{
status wait() { status s; MPI_Wait(&r, &s.s); return s; }
inline
status wait();
inline
optional<status> test();
void cancel() { MPI_Cancel(&r); }
inline
void cancel();
MPI_Request r;
};
}
}
diy::mpi::status
diy::mpi::request::wait()
{
#ifndef DIY_NO_MPI
status s;
MPI_Wait(&r, &s.s);
return s;
#else
DIY_UNSUPPORTED_MPI_CALL(diy::mpi::request::wait);
#endif
}
diy::mpi::optional<diy::mpi::status>
diy::mpi::request::test()
{
#ifndef DIY_NO_MPI
status s;
int flag;
MPI_Test(&r, &flag, &s.s);
if (flag)
return s;
#endif
return optional<status>();
}
void
diy::mpi::request::cancel()
{
#ifndef DIY_NO_MPI
MPI_Cancel(&r);
#endif
}

@ -7,7 +7,9 @@ namespace mpi
int source() const { return s.MPI_SOURCE; }
int tag() const { return s.MPI_TAG; }
int error() const { return s.MPI_ERROR; }
bool cancelled() const { int flag; MPI_Test_cancelled(const_cast<MPI_Status*>(&s), &flag); return flag; }
inline
bool cancelled() const;
template<class T>
int count() const;
@ -20,11 +22,28 @@ namespace mpi
}
}
bool
diy::mpi::status::cancelled() const
{
#ifndef DIY_NO_MPI
int flag;
MPI_Test_cancelled(const_cast<MPI_Status*>(&s), &flag);
return flag;
#else
DIY_UNSUPPORTED_MPI_CALL(diy::mpi::status::cancelled);
#endif
}
template<class T>
int
diy::mpi::status::count() const
{
#ifndef DIY_NO_MPI
int c;
MPI_Get_count(const_cast<MPI_Status*>(&s), detail::get_mpi_datatype<T>(), &c);
return c;
#else
DIY_UNSUPPORTED_MPI_CALL(diy::mpi::status::count);
#endif
}

@ -45,7 +45,7 @@ struct RegularAllReducePartners: public RegularMergePartners
//! returns whether a given block in a given round has dropped out of the merge yet or not
inline bool active(int round, int gid, const Master& m) const { return Parent::active(parent_round(round), gid, m); }
//! returns what the current round would be in the first or second parent merge reduction
int parent_round(int round) const { return round < (int) Parent::rounds() ? round : rounds() - round; }
int parent_round(int round) const { return round < (int) Parent::rounds() ? round : static_cast<int>(rounds()) - round; }
// incoming is only valid for an active gid; it will only be called with an active gid
inline void incoming(int round, int gid, std::vector<int>& partners, const Master& m) const

@ -113,7 +113,7 @@ fill(int round, int gid, std::vector<int>& partners) const
{
partner += step;
coords[kv.dim] = partner;
int partner_gid = Decomposer::coords_to_gid(coords, divisions_);
partner_gid = Decomposer::coords_to_gid(coords, divisions_);
partners.push_back(partner_gid);
}
}

@ -32,7 +32,7 @@ struct RegularSwapPartners: public RegularPartners
):
Parent(divs, kvs, contiguous) {}
bool active(int round, int gid, const Master&) const { return true; } // in swap-reduce every block is always active
bool active(int, int, const Master&) const { return true; } // in swap-reduce every block is always active
void incoming(int round, int gid, std::vector<int>& partners, const Master&) const { Parent::fill(round - 1, gid, partners); }
void outgoing(int round, int gid, std::vector<int>& partners, const Master&) const { Parent::fill(round, gid, partners); }

@ -22,6 +22,7 @@ all_to_all(Master& master, //!< block owner
)
{
auto scoped = master.prof.scoped("all_to_all");
(void)scoped;
RegularDecomposer<DiscreteBounds> decomposer(1, interval(0,assigner.nblocks()-1), assigner.nblocks());
RegularSwapPartners partners(decomposer, k, false);
reduce(master, assigner, partners, detail::AllToAllReduce<Op>(op, assigner), detail::SkipIntermediate(partners.rounds()));

@ -95,7 +95,7 @@ namespace detail
struct ReduceNeverSkip
{
bool operator()(int round, int lid, const Master& master) const { return false; }
bool operator()(int, int, const Master&) const { return false; }
};
}
@ -133,7 +133,7 @@ void reduce(Master& master, //!< master object
{
std::vector<int> incoming_gids;
partners.incoming(round + 1, master.gid(i), incoming_gids, master);
expected += incoming_gids.size();
expected += static_cast<int>(incoming_gids.size());
master.incoming(master.gid(i)).clear();
}
}

@ -186,7 +186,8 @@ namespace diy
{
size_t s = v.size();
diy::save(bb, s);
diy::save(bb, &v[0], v.size());
if (s > 0)
diy::save(bb, &v[0], v.size());
}
static void load(BinaryBuffer& bb, Vector& v)
@ -194,7 +195,8 @@ namespace diy
size_t s;
diy::load(bb, s);
v.resize(s);
diy::load(bb, &v[0], s);
if (s > 0)
diy::load(bb, &v[0], s);
}
};
@ -207,7 +209,8 @@ namespace diy
{
size_t s = v.size();
diy::save(bb, s);
diy::save(bb, &v[0], v.size());
if (s > 0)
diy::save(bb, &v[0], v.size());
}
static void load(BinaryBuffer& bb, ValArray& v)
@ -215,7 +218,8 @@ namespace diy
size_t s;
diy::load(bb, s);
v.resize(s);
diy::load(bb, &v[0], s);
if (s > 0)
diy::load(bb, &v[0], s);
}
};
@ -413,12 +417,15 @@ diy::MemoryBuffer::
save_binary(const char* x, size_t count)
{
if (position + count > buffer.capacity())
buffer.reserve((position + count) * growth_multiplier()); // if we have to grow, grow geometrically
{
double newsize = static_cast<double>(position + count) * growth_multiplier(); // if we have to grow, grow geometrically
buffer.reserve(static_cast<size_t>(newsize));
}
if (position + count > buffer.size())
buffer.resize(position + count);
std::copy(x, x + count, &buffer[position]);
std::copy_n(x, count, &buffer[position]);
position += count;
}
@ -426,7 +433,7 @@ void
diy::MemoryBuffer::
load_binary(char* x, size_t count)
{
std::copy(&buffer[position], &buffer[position + count], x);
std::copy_n(&buffer[position], count, x);
position += count;
}
@ -434,7 +441,7 @@ void
diy::MemoryBuffer::
load_binary_back(char* x, size_t count)
{
std::copy(&buffer[buffer.size() - count], &buffer[buffer.size()], x);
std::copy_n(&buffer[buffer.size() - count], count, x);
buffer.resize(buffer.size() - count);
}
@ -448,7 +455,7 @@ copy(MemoryBuffer& from, MemoryBuffer& to)
size_t total = sizeof(size_t) + sz;
to.buffer.resize(to.position + total);
std::copy(&from.buffer[from.position], &from.buffer[from.position + total], &to.buffer[to.position]);
std::copy_n(&from.buffer[from.position], total, &to.buffer[to.position]);
to.position += total;
from.position += total;
}

@ -4,15 +4,12 @@
#include <string>
#include <map>
#include <fstream>
#include <unistd.h> // mkstemp() on Mac
#include <cstdlib> // mkstemp() on Linux
#include <cstdio> // remove()
#include <fcntl.h>
#include "serialization.hpp"
#include "thread.hpp"
#include "log.hpp"
#include "io/utils.hpp"
namespace diy
{
@ -75,11 +72,14 @@ namespace diy
log->debug("FileStorage::put(): {}; buffer size: {}", filename, bb.size());
size_t sz = bb.buffer.size();
#if defined(_WIN32)
size_t written = _write(fh, &bb.buffer[0], static_cast<unsigned int>(sz));
#else
size_t written = write(fh, &bb.buffer[0], sz);
#endif
if (written < sz || written == (size_t)-1)
log->warn("Could not write the full buffer to {}: written = {}; size = {}", filename, written, sz);
fsync(fh);
close(fh);
io::utils::close(fh);
bb.wipe();
#if 0 // double-check the written file size: only for extreme debugging
@ -98,12 +98,15 @@ namespace diy
{
std::string filename;
int fh = open_random(filename);
#if defined(_WIN32)
detail::FileBuffer fb(_fdopen(fh, "wb"));
#else
detail::FileBuffer fb(fdopen(fh, "w"));
#endif
save(x, fb);
size_t sz = fb.size();
fclose(fb.file);
fsync(fh);
io::utils::sync(fh);
return make_file_record(filename, sz);
}
@ -116,10 +119,15 @@ namespace diy
bb.buffer.reserve(fr.size + extra);
bb.buffer.resize(fr.size);
#if defined(_WIN32)
int fh = -1;
_sopen_s(&fh, fr.name.c_str(), _O_RDONLY | _O_BINARY, _SH_DENYNO, _S_IREAD);
_read(fh, &bb.buffer[0], static_cast<unsigned int>(fr.size));
#else
int fh = open(fr.name.c_str(), O_RDONLY | O_SYNC, 0600);
read(fh, &bb.buffer[0], fr.size);
close(fh);
#endif
io::utils::close(fh);
remove_file(fr);
}
@ -128,8 +136,14 @@ namespace diy
FileRecord fr = extract_file_record(i);
//int fh = open(fr.name.c_str(), O_RDONLY | O_SYNC, 0600);
#if defined(_WIN32)
int fh = -1;
_sopen_s(&fh, fr.name.c_str(), _O_RDONLY | _O_BINARY, _SH_DENYNO, _S_IREAD);
detail::FileBuffer fb(_fdopen(fh, "rb"));
#else
int fh = open(fr.name.c_str(), O_RDONLY, 0600);
detail::FileBuffer fb(fdopen(fh, "r"));
#endif
load(x, fb);
fclose(fb.file);
@ -144,7 +158,7 @@ namespace diy
fr = (*accessor)[i];
accessor->erase(i);
}
remove(fr.name.c_str());
io::utils::remove(fr.name);
(*current_size_.access()) -= fr.size;
}
@ -158,7 +172,7 @@ namespace diy
it != filenames_.const_access()->end();
++it)
{
remove(it->second.name.c_str());
io::utils::remove(it->second.name);
}
}
@ -172,13 +186,7 @@ namespace diy
// pick a template at random (very basic load balancing mechanism)
filename = filename_templates_[std::rand() % filename_templates_.size()].c_str();
}
#ifdef __MACH__
// TODO: figure out how to open with O_SYNC
int fh = mkstemp(const_cast<char*>(filename.c_str()));
#else
int fh = mkostemp(const_cast<char*>(filename.c_str()), O_WRONLY | O_SYNC);
#endif
int fh = diy::io::utils::mkstemp(filename);
return fh;
}
@ -208,7 +216,7 @@ namespace diy
void remove_file(const FileRecord& fr)
{
remove(fr.name.c_str());
io::utils::remove(fr.name);
(*current_size_.access()) -= fr.size;
}

@ -1,12 +1,13 @@
#ifndef DIY_TIME_HPP
#define DIY_TIME_HPP
#ifndef _WIN32
#include <sys/time.h>
#ifdef __MACH__
#include <mach/clock.h>
#include <mach/mach.h>
#endif
#endif // __MACH__
#endif // ifndef _WIN32
namespace diy
{
@ -21,11 +22,20 @@ inline time_type get_time()
host_get_clock_service(mach_host_self(), CALENDAR_CLOCK, &cclock);
clock_get_time(cclock, &ts);
mach_port_deallocate(mach_task_self(), cclock);
return ts.tv_sec*1000 + ts.tv_nsec/1000000;
#elif defined(_WIN32)
// SOURCE: http://stackoverflow.com/questions/5404277/porting-clock-gettime-to-windows
__int64 wintime;
GetSystemTimeAsFileTime((FILETIME*)&wintime);
wintime -=116444736000000000i64; //1jan1601 to 1jan1970
long tv_sec = static_cast<long>(wintime / 10000000i64); //seconds
long tv_nsec = static_cast<long>(wintime % 10000000i64 *100); //nano-seconds
return static_cast<time_type>(tv_sec*1000 + tv_nsec/1000000);
#else
timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
#endif
return ts.tv_sec*1000 + ts.tv_nsec/1000000;
#endif
}
}