Merge topic 'add-gpu-to-gpu-conn'

1e2749580 diy,mpi: Enable GPU AWARE MPI buffers
662f93e07 Merge branch 'upstream-diy' into add-gpu-to-gpu-conn
928900c63 diy 2023-03-28 (6837fb55)
6d69301b7 DIY: bump new version

Acked-by: Kitware Robot <kwrobot@kitware.com>
Acked-by: Kenneth Moreland <morelandkd@ornl.gov>
Merge-request: !2987
This commit is contained in:
Kenneth Moreland 2023-05-31 21:22:50 +00:00 committed by Kitware Robot
commit 7037f64bba
42 changed files with 878 additions and 830 deletions

@ -186,6 +186,11 @@ vtkm_option(VTKm_SKIP_LIBRARY_VERSIONS "Skip versioning VTK-m libraries" OFF)
# through ctest's command-line. Doesn't affect CI unless enabled.
vtkm_option(VTKm_OVERRIDE_CTEST_TIMEOUT "Disable default ctest timeout" OFF)
# VTKm_ENABLE_GPU_MPI makes VTK-m to use DIY routines that enables GPU aware
# MPI. By default, this option is disabled. Also, this option is hidden unless
# VTKm_ENABLE_MPI=ON.
cmake_dependent_option(VTKm_ENABLE_GPU_MPI "Enable GPU AWARE MPI support" OFF "VTKm_ENABLE_MPI" OFF)
mark_as_advanced(
VTKm_ENABLE_LOGGING
VTKm_NO_ASSERT

@ -86,6 +86,7 @@ set(headers
DeviceAdapterAlgorithm.h
DeviceAdapterList.h
DeviceAdapterTag.h
DIYMemoryManagement.h
EnvironmentTracker.h
Error.h
ErrorBadAllocation.h
@ -154,6 +155,7 @@ set(sources
DataSetBuilderRectilinear.cxx
DataSetBuilderUniform.cxx
DeviceAdapterTag.cxx
DIYMemoryManagement.cxx
EnvironmentTracker.cxx
ErrorBadDevice.cxx
ErrorBadType.cxx

@ -0,0 +1,78 @@
//============================================================================
// Copyright (c) Kitware, Inc.
// All rights reserved.
// See LICENSE.txt for details.
//
// This software is distributed WITHOUT ANY WARRANTY; without even
// the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE. See the above copyright notice for more information.
//============================================================================
#include <vtkm/cont/DIYMemoryManagement.h>
#include <vtkm/cont/DeviceAdapterList.h>
#include <vtkm/cont/DeviceAdapterTag.h>
#include <vtkm/cont/RuntimeDeviceInformation.h>
#include <vtkm/cont/serial/DeviceAdapterSerial.h>
#ifdef VTKM_ENABLE_GPU_MPI
#include <vtkm/cont/kokkos/DeviceAdapterKokkos.h>
#endif
namespace
{
thread_local vtkm::cont::DeviceAdapterId DIYCurrentDeviceAdaptor =
vtkm::cont::DeviceAdapterTagSerial();
vtkm::cont::internal::DeviceAdapterMemoryManagerBase& GetMemoryManager(
vtkm::cont::DeviceAdapterId device)
{
return vtkm::cont::RuntimeDeviceInformation().GetMemoryManager(device);
}
vtkmdiy::MemoryManagement GetDIYMemoryManagement(vtkm::cont::DeviceAdapterId device)
{
return vtkmdiy::MemoryManagement(
[device](int, size_t n) {
return static_cast<char*>(GetMemoryManager(device).AllocateRawPointer(n));
},
[device](const char* p) { GetMemoryManager(device).DeleteRawPointer(const_cast<char*>(p)); },
[device](char* dest, const char* src, size_t count) {
GetMemoryManager(device).CopyDeviceToDeviceRawPointer(src, dest, count);
});
}
}
namespace vtkm
{
namespace cont
{
vtkm::cont::DeviceAdapterId GetDIYDeviceAdapter()
{
return DIYCurrentDeviceAdaptor;
}
void DIYMasterExchange(vtkmdiy::Master& master, bool remote)
{
#ifdef VTKM_ENABLE_GPU_MPI
try
{
DIYCurrentDeviceAdaptor = vtkm::cont::DeviceAdapterTagKokkos();
master.exchange(remote, GetDIYMemoryManagement(vtkm::cont::DeviceAdapterTagKokkos()));
DIYCurrentDeviceAdaptor = vtkm::cont::DeviceAdapterTagSerial();
}
catch (...)
{
DIYCurrentDeviceAdaptor = vtkm::cont::DeviceAdapterTagSerial();
throw;
}
#else
DIYCurrentDeviceAdaptor = vtkm::cont::DeviceAdapterTagSerial();
master.exchange(remote);
#endif
}
}
}

@ -0,0 +1,30 @@
//============================================================================
// Copyright (c) Kitware, Inc.
// All rights reserved.
// See LICENSE.txt for details.
//
// This software is distributed WITHOUT ANY WARRANTY; without even
// the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE. See the above copyright notice for more information.
//============================================================================
#ifndef vtk_m_cont_internal_DIYMemoryManagement_h
#define vtk_m_cont_internal_DIYMemoryManagement_h
#include <vtkm/cont/DeviceAdapterTag.h>
#include <vtkm/cont/vtkm_cont_export.h>
#include <vtkm/thirdparty/diy/diy.h>
namespace vtkm
{
namespace cont
{
VTKM_CONT_EXPORT vtkm::cont::DeviceAdapterId GetDIYDeviceAdapter();
/// \brief Wraps vtkmdiy::Master::exchange by setting its appropiate vtkmdiy::MemoryManagement.
VTKM_CONT_EXPORT void DIYMasterExchange(vtkmdiy::Master& master, bool remote = false);
}
}
#endif

@ -78,6 +78,11 @@ public:
{
throw vtkm::cont::ErrorBadDevice("Tried to manage memory on an invalid device.");
}
VTKM_CONT virtual void DeleteRawPointer(void*) const override
{
throw vtkm::cont::ErrorBadDevice("Tried to manage memory on an invalid device.");
}
};
class RuntimeDeviceConfigurationInvalid final

@ -240,6 +240,11 @@ void DeviceAdapterMemoryManager<vtkm::cont::DeviceAdapterTagCuda>::CopyDeviceToD
cudaMemcpyDeviceToDevice,
cudaStreamPerThread));
}
void DeviceAdapterMemoryManager<vtkm::cont::DeviceAdapterTagCuda>::DeleteRawPointer(void* mem) const
{
CudaDelete(mem);
};
}
}
} // namespace vtkm::cont::internal

@ -50,6 +50,8 @@ public:
VTKM_CONT virtual void CopyDeviceToDevice(
const vtkm::cont::internal::BufferInfo& src,
const vtkm::cont::internal::BufferInfo& dest) const override;
VTKM_CONT virtual void DeleteRawPointer(void* mem) const override;
};
}
}

@ -10,6 +10,7 @@
#include <vtkm/internal/Assume.h>
#include <vtkm/cont/DIYMemoryManagement.h>
#include <vtkm/cont/DeviceAdapter.h>
#include <vtkm/cont/ErrorBadAllocation.h>
#include <vtkm/cont/ErrorBadDevice.h>
@ -1158,30 +1159,46 @@ void Serialization<vtkm::cont::internal::Buffer>::save(BinaryBuffer& bb,
const vtkm::cont::internal::Buffer& obj)
{
vtkm::BufferSizeType size = obj.GetNumberOfBytes();
vtkmdiy::save(bb, size);
std::unique_ptr<vtkm::cont::Token> token;
const void* ptr = nullptr;
if (size)
if (size > 0)
{
// NOTE: If size == 0, obj.ReadPointerHost will be a nullptr, and saving that via
// vtkmdiy causes test failure on osheim
vtkm::cont::Token token;
const vtkm::UInt8* data = reinterpret_cast<const vtkm::UInt8*>(obj.ReadPointerHost(token));
vtkmdiy::save(bb, data, static_cast<std::size_t>(size));
token.reset(new vtkm::cont::Token);
ptr = obj.ReadPointerDevice(vtkm::cont::GetDIYDeviceAdapter(), *token);
}
// We need to keep the token alive until the data is consumed by DIY,
// otherwise the pointed data could be freed before it is consumed.
// Note that we cannot simply have the unique_ptr captured by the below
// lambda since save_binary_blob 3rd argument is a std::function and
// std::function needs for every parameter to be CopyAsignable, which
// vtkm::cont::Token is not.
bb.save_binary_blob(static_cast<const char*>(ptr),
static_cast<std::size_t>(size),
[token = token.release()](const char[]) {
if (token != nullptr)
{
token->DetachFromAll();
delete token;
}
});
}
void Serialization<vtkm::cont::internal::Buffer>::load(BinaryBuffer& bb,
vtkm::cont::internal::Buffer& obj)
{
vtkm::BufferSizeType size;
vtkmdiy::load(bb, size);
vtkm::cont::Token token;
auto blob = bb.load_binary_blob();
vtkm::BufferSizeType size = blob.size;
obj.SetNumberOfBytes(size, vtkm::CopyFlag::Off, token);
if (size)
{
vtkm::UInt8* data = reinterpret_cast<vtkm::UInt8*>(obj.WritePointerHost(token));
vtkmdiy::load(bb, data, static_cast<std::size_t>(size));
auto device = vtkm::cont::GetDIYDeviceAdapter();
void* ptr = obj.WritePointerDevice(device, token);
vtkm::cont::RuntimeDeviceInformation().GetMemoryManager(device).CopyDeviceToDeviceRawPointer(
blob.pointer.get(), ptr, size);
}
}

@ -42,7 +42,11 @@
#include <cstddef>
#include <cstdlib>
namespace
namespace vtkm
{
namespace cont
{
namespace internal
{
/// A deleter object that can be used with our aligned mallocs
@ -120,15 +124,6 @@ void HostReallocate(void*& memory,
memory = container = newBuffer;
}
} // anonymous namespace
namespace vtkm
{
namespace cont
{
namespace internal
{
VTKM_CONT void InvalidRealloc(void*&, void*&, vtkm::BufferSizeType, vtkm::BufferSizeType)
{
throw vtkm::cont::ErrorBadAllocation("User provided memory does not have a reallocater.");
@ -340,6 +335,28 @@ vtkm::cont::internal::BufferInfo DeviceAdapterMemoryManagerBase::ManageArray(
return vtkm::cont::internal::BufferInfo(
this->GetDevice(), memory, container, size, deleter, reallocater);
}
void* DeviceAdapterMemoryManagerBase::AllocateRawPointer(vtkm::BufferSizeType size) const
{
return this->Allocate(size).TransferOwnership().Memory;
}
void DeviceAdapterMemoryManagerBase::CopyDeviceToDeviceRawPointer(const void* src,
void* dest,
vtkm::BufferSizeType size) const
{
this->CopyDeviceToDevice(
vtkm::cont::internal::BufferInfo(
this->GetDevice(),
const_cast<void*>(src),
const_cast<void*>(src),
size,
[](void*) {},
vtkm::cont::internal::InvalidRealloc),
vtkm::cont::internal::BufferInfo(
this->GetDevice(), dest, dest, size, [](void*) {}, vtkm::cont::internal::InvalidRealloc));
}
}
}
} // namespace vtkm::cont::internal

@ -196,6 +196,37 @@ public:
/// objects were created by a previous call to this object.
VTKM_CONT virtual void CopyDeviceToDevice(const vtkm::cont::internal::BufferInfo& src,
const vtkm::cont::internal::BufferInfo& dest) const = 0;
/// \brief Low-level method to allocate memory on the device.
///
/// This method allocates an array of the given number of bytes on the device and returns
/// a void pointer to the array. The preferred method to allocate memory is to use the
/// `Allocate` method, which returns a `BufferInfo` that manages its own memory. However,
/// for cases where you are interfacing with code outside of VTK-m and need just a raw
/// pointer, this method can be used. The returned memory can be freed with
/// `DeleteRawPointer`.
VTKM_CONT virtual void* AllocateRawPointer(vtkm::BufferSizeType size) const;
/// \brief Low-level method to copy data on the device.
///
/// This method copies data from one raw pointer to another. It performs the same
/// function as `CopyDeviceToDevice`, except that it operates on raw pointers
/// instead of `BufferInfo` objects. This is a useful low-level mechanism to move
/// data on a device in memory locations created externally to VTK-m.
VTKM_CONT virtual void CopyDeviceToDeviceRawPointer(const void* src,
void* dest,
vtkm::BufferSizeType size) const;
/// \brief Low-level method to delete memory on the device.
///
/// This method takes a pointer to memory allocated on the device and frees it.
/// The preferred method to delete memory is to use the deallocation routines in
/// `BufferInfo` objects created with `Allocate`. But for cases where you only
/// have a raw pointer to the data, this method can be used to manage it. This
/// method should only be used on memory allocated with this
/// `DeviceAdaperMemoryManager`.
VTKM_CONT virtual void DeleteRawPointer(void*) const = 0;
};
/// \brief The device adapter memory manager.
@ -207,6 +238,14 @@ public:
template <typename DeviceAdapterTag>
class DeviceAdapterMemoryManager;
VTKM_CONT_EXPORT VTKM_CONT void HostDeleter(void*);
VTKM_CONT_EXPORT VTKM_CONT void* HostAllocate(vtkm::BufferSizeType);
VTKM_CONT_EXPORT VTKM_CONT void HostReallocate(void*&,
void*&,
vtkm::BufferSizeType,
vtkm::BufferSizeType);
VTKM_CONT_EXPORT VTKM_CONT void InvalidRealloc(void*&,
void*&,
vtkm::BufferSizeType,

@ -83,6 +83,12 @@ void DeviceAdapterMemoryManagerShared::CopyDeviceToDevice(
std::memcpy(dest.GetPointer(), src.GetPointer(), static_cast<std::size_t>(src.GetSize()));
}
void DeviceAdapterMemoryManagerShared::DeleteRawPointer(void* mem) const
{
vtkm::cont::internal::HostDeleter(mem);
}
}
}
} // namespace vtkm::cont::internal

@ -50,6 +50,8 @@ public:
VTKM_CONT virtual void CopyDeviceToDevice(
const vtkm::cont::internal::BufferInfo& src,
const vtkm::cont::internal::BufferInfo& dest) const override;
VTKM_CONT virtual void DeleteRawPointer(void* mem) const override;
};
}
}

@ -153,6 +153,32 @@ void DeviceAdapterMemoryManager<vtkm::cont::DeviceAdapterTagKokkos>::CopyDeviceT
static_cast<vtkm::UInt8*>(dest.GetPointer()), static_cast<std::size_t>(size));
Kokkos::deep_copy(vtkm::cont::kokkos::internal::GetExecutionSpaceInstance(), destView, srcView);
}
// Low level memory management methods
void* DeviceAdapterMemoryManager<vtkm::cont::DeviceAdapterTagKokkos>::AllocateRawPointer(
vtkm::BufferSizeType size) const
{
return vtkm::cont::kokkos::internal::Allocate(size);
}
void DeviceAdapterMemoryManager<vtkm::cont::DeviceAdapterTagKokkos>::CopyDeviceToDeviceRawPointer(
const void* src,
void* dest,
vtkm::BufferSizeType size) const
{
Kokkos::View<char*, Kokkos::MemoryTraits<Kokkos::Unmanaged>> destView(static_cast<char*>(dest),
size);
Kokkos::View<const char*, Kokkos::MemoryTraits<Kokkos::Unmanaged>> srcView(
static_cast<const char*>(src), size);
Kokkos::deep_copy(vtkm::cont::kokkos::internal::GetExecutionSpaceInstance(), destView, srcView);
}
void DeviceAdapterMemoryManager<vtkm::cont::DeviceAdapterTagKokkos>::DeleteRawPointer(
void* mem) const
{
vtkm::cont::kokkos::internal::Free(mem);
}
}
}
} // vtkm::cont::internal

@ -50,6 +50,12 @@ public:
VTKM_CONT virtual void CopyDeviceToDevice(
const vtkm::cont::internal::BufferInfo& src,
const vtkm::cont::internal::BufferInfo& dest) const override;
VTKM_CONT void* AllocateRawPointer(vtkm::BufferSizeType size) const override;
VTKM_CONT void CopyDeviceToDeviceRawPointer(const void* src,
void* dest,
vtkm::BufferSizeType size) const override;
VTKM_CONT void DeleteRawPointer(void* mem) const override;
};
}
}

@ -13,6 +13,7 @@
#include <vtkm/cont/ArrayHandle.h>
#include <vtkm/cont/testing/Testing.h>
#include <vtkm/cont/DIYMemoryManagement.h>
#include <vtkm/thirdparty/diy/serialization.h>
#include <random>
@ -189,7 +190,9 @@ void TestSerialization(const T& obj, const TestEqualFunctor& test)
master.foreach ([](Block<T>* b, const vtkmdiy::Master::ProxyWithLink& cp) {
cp.enqueue(cp.link()->target(0), b->send);
});
master.exchange();
vtkm::cont::DIYMasterExchange(master);
master.foreach ([](Block<T>* b, const vtkmdiy::Master::ProxyWithLink& cp) {
cp.dequeue(cp.link()->target(1).gid, b->received);
});

@ -26,6 +26,7 @@ set(VTKM_ENABLE_OPENMP ${VTKm_ENABLE_OPENMP})
set(VTKM_ENABLE_TBB ${VTKm_ENABLE_TBB})
set(VTKM_ENABLE_MPI ${VTKm_ENABLE_MPI})
set(VTKM_ENABLE_GPU_MPI ${VTKm_ENABLE_GPU_MPI})
if(VTKM_ENABLE_CUDA)
string(REGEX REPLACE "([0-9]+)\\.([0-9]+).*" "\\1" VTKM_CUDA_VERSION_MAJOR ${CMAKE_CUDA_COMPILER_VERSION})

@ -308,6 +308,9 @@
//Mark if we are building with MPI enabled.
#cmakedefine VTKM_ENABLE_MPI
//Mark if we are building with GPU AWARE MPI enabled.
#cmakedefine VTKM_ENABLE_GPU_MPI
//Mark what version of the CUDA compiler we have. This is needed to correctly
//choose consistent implementation ( so we don't violate ODR ) when we compile
//with CUDA 7.5

@ -8,7 +8,7 @@ readonly name="diy"
readonly ownership="Diy Upstream <kwrobot@kitware.com>"
readonly subtree="vtkm/thirdparty/$name/vtkm$name"
readonly repo="https://gitlab.kitware.com/third-party/diy2.git"
readonly tag="for/vtk-m-20220914-master-g0f1c387"
readonly tag="for/vtk-m-20230328-g9bea15a1"
readonly paths="
cmake
include

@ -33,6 +33,13 @@ macro (diy_dependent_option variable)
endif ()
endmacro ()
set (compiler_supports_sanitizers OFF)
if (CMAKE_CXX_COMPILER_ID STREQUAL "Clang" OR
CMAKE_CXX_COMPILER_ID STREQUAL "AppleClang" OR
CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
set (compiler_supports_sanitizers ON)
endif ()
diy_option (threads "Build DIY with threading" ON)
diy_option (log "Build DIY with logging" OFF)
diy_option (profile "Build DIY with profiling" OFF)
@ -44,6 +51,8 @@ diy_dependent_option (BUILD_SHARED_LIBS "Create shared libraries if on"
diy_dependent_option (build_diy_nompi_lib "Also build the nompi version of diy::mpi" OFF "mpi;build_diy_mpi_lib" OFF)
diy_option (build_examples "Build DIY examples" ON)
diy_option (build_tests "Build DIY tests" ON)
diy_option (python "Build Python bindings" OFF)
cmake_dependent_option (enable_sanitizers "Build DIY with sanitizer support" OFF "compiler_supports_sanitizers" OFF)
# Default to Release
if (NOT CMAKE_BUILD_TYPE)
@ -64,9 +73,8 @@ endif ()
# Logging
if (log)
list (APPEND diy_definitions "-DVTKMDIY_USE_SPDLOG")
find_path (SPDLOG_INCLUDE_DIR spdlog/spdlog.h)
list (APPEND diy_include_thirdparty_directories $<BUILD_INTERFACE:${SPDLOG_INCLUDE_DIR}>)
list (APPEND diy_definitions "-DVTMDIY_USE_SPDLOG")
find_package (spdlog REQUIRED)
endif()
# Profiling
@ -114,8 +122,12 @@ if (NOT DEFINED diy_export_name)
set(diy_export_name "diy_targets")
endif()
set (CMAKE_ARCHIVE_OUTPUT_DIRECTORY "${PROJECT_BINARY_DIR}/lib")
set (CMAKE_LIBRARY_OUTPUT_DIRECTORY "${PROJECT_BINARY_DIR}/lib")
if (NOT DEFINED CMAKE_ARCHIVE_OUTPUT_DIRECTORY)
set (CMAKE_ARCHIVE_OUTPUT_DIRECTORY "${PROJECT_BINARY_DIR}/lib")
endif()
if (NOT DEFINED CMAKE_LIBRARY_OUTPUT_DIRECTORY)
set (CMAKE_LIBRARY_OUTPUT_DIRECTORY "${PROJECT_BINARY_DIR}/lib")
endif()
# for diy_developer_flags
include(DIYCompilerFlags)
@ -152,6 +164,9 @@ function(add_diy_mpi_library use_mpi)
target_include_directories(${lib_name} SYSTEM PRIVATE ${diy_include_directories}) # for mpitypes.hpp
target_include_directories(${lib_name} SYSTEM PRIVATE ${diy_include_thirdparty_directories})
target_link_libraries(${lib_name} PRIVATE diy_developer_flags)
if (log)
target_link_libraries(${lib_name} PUBLIC spdlog::spdlog_header_only)
endif ()
if (use_mpi AND TARGET MPI::MPI_CXX)
target_link_libraries(${lib_name} PRIVATE MPI::MPI_CXX)
endif()
@ -195,6 +210,9 @@ target_include_directories(${diy_prefix} SYSTEM INTERFACE ${diy_include_thirdpar
if (diy_include_directories)
target_include_directories(${diy_prefix} SYSTEM INTERFACE ${diy_include_directories})
endif()
if (log)
target_link_libraries(${diy_prefix} INTERFACE spdlog::spdlog_header_only)
endif ()
target_link_libraries(${diy_prefix} INTERFACE ${diy_libraries})
if (NOT build_diy_mpi_lib)
if (mpi)
@ -224,6 +242,16 @@ elseif (${diy_prefix}mpi_nompi IN_LIST diy_targets)
endif()
list(APPEND libraries diy_developer_flags)
# Sanitizers
if (enable_sanitizers)
set(sanitizer "address" CACHE STRING "The sanitizer to use")
string (APPEND CMAKE_CXX_FLAGS " -fsanitize=${sanitizer}")
string (APPEND CMAKE_C_FLAGS " -fsanitize=${sanitizer}")
string (APPEND CMAKE_EXE_LINKER_FLAGS " -fsanitize=${sanitizer}")
string (APPEND CMAKE_SHARED_LINKER_FLAGS " -fsanitize=${sanitizer}")
endif ()
# enable testing and CDash dashboard submission
enable_testing ()
include (CTest)
@ -262,3 +290,7 @@ if (CMAKE_SOURCE_DIR STREQUAL PROJECT_SOURCE_DIR) # Only generate these files wh
install(EXPORT ${diy_export_name} NAMESPACE DIY:: DESTINATION "." FILE diy-targets.cmake)
install(FILES "${PROJECT_BINARY_DIR}/diy-config.cmake" DESTINATION ".")
endif()
if (python)
add_subdirectory(bindings/python)
endif (python)

@ -25,12 +25,10 @@ if (threads)
endif()
if (log)
find_path(SPDLOG_INCLUDE_DIR "spdlog/spdlog.h")
if (SPDLOG_INCLUDE_DIR STREQUAL "SPDLOG_INCLUDE_DIR-NOTFOUND")
find_package(spdlog ${_diy_find_quietly})
if (NOT spdlog_FOUND)
list(APPEND "${CMAKE_FIND_PACKAGE_NAME}_NOT_FOUND_MESSAGE" "SPDLOG not found")
set("${CMAKE_FIND_PACKAGE_NAME}_FOUND" 0)
else()
target_include_directories(DIY::@diy_prefix@ INTERFACE $<INSTALL_INTERFACE:${SPDLOG_INCLUDE_DIR}>)
endif()
endif()

@ -17,10 +17,10 @@ namespace diy
typedef std::vector<Element> Elements;
typedef critical_resource<int, recursive_mutex> CInt;
typedef void* (*Create)();
typedef void (*Destroy)(void*);
typedef detail::Save Save;
typedef detail::Load Load;
using Create = std::function<void*()>;
using Destroy = std::function<void(void*)>;
using Save = detail::Save;
using Load = detail::Load;
public:
Collection(Create create__,

@ -5,11 +5,13 @@ namespace diy
int from, to;
int nparts;
int round;
int nblobs;
};
struct Master::InFlightSend
{
std::shared_ptr<MemoryBuffer> message;
BinaryBlob blob;
mpi::request request;
MessageInfo info; // for debug purposes
@ -18,12 +20,18 @@ namespace diy
struct Master::InFlightRecv
{
MemoryBuffer message;
MessageInfo info { -1, -1, -1, -1 };
MessageInfo info { -1, -1, -1, -1, -1 };
bool done = false;
MemoryManagement mem;
inline bool recv(mpi::communicator& comm, const mpi::status& status);
inline void place(IncomingRound* in, bool unload, ExternalStorage* storage, IExchangeInfo* iexchange);
void reset() { *this = InFlightRecv(); }
void reset()
{
MemoryManagement mem_ = mem;
*this = InFlightRecv();
mem = mem_;
}
};
struct Master::InFlightRecvsMap: public std::map<int, InFlightRecv>
@ -111,7 +119,7 @@ recv(mpi::communicator& comm, const mpi::status& status)
result = true;
}
else
else if (info.nparts > 0)
{
size_t start_idx = message.buffer.size();
size_t count = status.count<char>();
@ -124,9 +132,24 @@ recv(mpi::communicator& comm, const mpi::status& status)
comm.recv(status.source(), status.tag(), window);
info.nparts--;
} else if (info.nblobs > 0)
{
size_t count = status.count<char>();
detail::VectorWindow<char> window;
char* buffer = mem.allocate(info.to, count);
window.begin = buffer;
window.count = count;
comm.recv(status.source(), status.tag(), window);
message.save_binary_blob(buffer, count, mem.deallocate);
info.nblobs--;
}
if (info.nparts == 0)
if (info.nparts == 0 && info.nblobs == 0)
done = true;
return result;

@ -1,3 +1,5 @@
#include <algorithm>
struct diy::Master::ProcessBlock
{
ProcessBlock(Master& master_,

@ -7,17 +7,17 @@
#include <algorithm>
#include "constants.h"
#include "thirdparty/chobo/small_vector.hpp"
#include "thirdparty/itlib/small_vector.hpp"
namespace diy
{
template<class Coordinate_, size_t static_size = VTKMDIY_MAX_DIM>
class DynamicPoint: public chobo::small_vector<Coordinate_, static_size>
class DynamicPoint: public itlib::small_vector<Coordinate_, static_size>
{
public:
using Coordinate = Coordinate_;
using Parent = chobo::small_vector<Coordinate_, static_size>;
using Parent = itlib::small_vector<Coordinate_, static_size>;
template<class U>
struct rebind { typedef DynamicPoint<U> type; };

@ -30,9 +30,9 @@ class SharedOutFile: public std::ostringstream
diy::mpi::gather(world_, contents, all_contents, root_);
// write the file serially
std::ofstream out(filename_);
std::ofstream fout(filename_);
for (auto& cntnts : all_contents)
out.write(cntnts.data(), cntnts.size());
fout.write(cntnts.data(), cntnts.size());
} else
diy::mpi::gather(world_, contents, root_);
}

@ -5,6 +5,8 @@
#include <direct.h>
#include <io.h>
#include <share.h>
#define NOMINMAX
#include <windows.h>
#else
#include <unistd.h> // mkstemp() on Mac
#include <dirent.h>

@ -55,8 +55,8 @@ set_logger(Args...)
#include <spdlog/sinks/null_sink.h>
#include <spdlog/sinks/stdout_sinks.h>
#include <spdlog/fmt/bundled/format.h>
#include <spdlog/fmt/bundled/ostream.h>
#include <spdlog/fmt/fmt.h>
#include <spdlog/fmt/ostr.h>
namespace diy
{

@ -10,6 +10,7 @@
#include <numeric>
#include <memory>
#include <chrono>
#include <climits>
#include "link.hpp"
#include "collection.hpp"
@ -28,6 +29,23 @@
namespace diy
{
struct MemoryManagement
{
using Allocate = std::function<char* (int, size_t)>;
using Deallocate = BinaryBlob::Deleter;
using MemCopy = std::function<void(char*, const char*, size_t)>;
MemoryManagement() = default;
MemoryManagement(Allocate allocate_, Deallocate deallocate_, MemCopy copy_):
allocate(allocate_), deallocate(deallocate_), copy(copy_) {}
Allocate allocate = [](int /* gid */, size_t n) { return new char[n]; };
Deallocate deallocate = [](const char* p) { delete[] p; };
MemCopy copy = [](char* dest, const char* src, size_t count) { std::memcpy(dest, src, count); };
};
// Stores and manages blocks; initiates serialization and communication when necessary.
//
// Provides a foreach function, which is meant as the main entry point.
@ -126,6 +144,8 @@ namespace diy
void unload(ExternalStorage* storage) { size_ = buffer_.size(); external_ = storage->put(buffer_); }
void load(ExternalStorage* storage) { storage->get(external_, buffer_); external_ = -1; }
MemoryBuffer& buffer() { return buffer_; }
private:
size_t size_;
int external_;
@ -147,7 +167,6 @@ namespace diy
};
typedef std::map<int, IncomingRound> IncomingRoundMap;
public:
/**
* \ingroup Initialization
@ -173,6 +192,7 @@ namespace diy
inline void destroy(int i) { if (blocks_.own()) blocks_.destroy(i); }
inline int add(int gid, void* b, Link* l); //!< add a block
inline int add(int gid, void* b, const Link& l){ return add(gid, b, l.clone()); }
inline void* release(int i); //!< release ownership of the block
//!< return the `i`-th block
@ -213,17 +233,17 @@ namespace diy
bool local(int gid__) const { return lids_.find(gid__) != lids_.end(); }
//! exchange the queues between all the blocks (collective operation)
inline void exchange(bool remote = false);
inline void exchange(bool remote = false, MemoryManagement mem = MemoryManagement());
//! nonblocking exchange of the queues between all the blocks
template<class Block>
void iexchange_(const ICallback<Block>& f);
void iexchange_(const ICallback<Block>& f, MemoryManagement mem);
template<class F>
void iexchange(const F& f)
void iexchange(const F& f, MemoryManagement mem = MemoryManagement())
{
using Block = typename detail::block_traits<F>::type;
iexchange_<Block>(f);
iexchange_<Block>(f, mem);
}
inline void process_collectives();
@ -283,29 +303,30 @@ namespace diy
public:
// Communicator functionality
inline void flush(bool remote = false); // makes sure all the serialized queues migrate to their target processors
inline void flush(bool remote, MemoryManagement mem = MemoryManagement()); // makes sure all the serialized queues migrate to their target processors
private:
// Communicator functionality
inline void comm_exchange(GidSendOrder& gid_order, IExchangeInfo* iex = 0);
inline void rcomm_exchange(); // possibly called in between block computations
inline void comm_exchange(GidSendOrder& gid_order, MemoryManagement mem, IExchangeInfo* iex = 0);
inline void rcomm_exchange(MemoryManagement mem); // possibly called in between block computations
inline bool nudge(IExchangeInfo* iex = 0);
inline void send_queue(int from_gid, int to_gid, int to_proc, QueueRecord& qr, bool remote, IExchangeInfo* iex);
inline void send_queue(int from_gid, int to_gid, int to_proc, QueueRecord& qr, bool remote, MemoryManagement mem, IExchangeInfo* iex);
inline void send_outgoing_queues(GidSendOrder& gid_order,
bool remote,
MemoryManagement mem,
IExchangeInfo* iex = 0);
inline void check_incoming_queues(IExchangeInfo* iex = 0);
inline void check_incoming_queues(MemoryManagement mem, IExchangeInfo* iex = 0);
inline GidSendOrder
order_gids();
inline void touch_queues();
inline void send_same_rank(int from, int to, QueueRecord& qr, IExchangeInfo* iex);
inline void send_same_rank(int from, int to, QueueRecord& qr, MemoryManagement mem, IExchangeInfo* iex);
inline void send_different_rank(int from, int to, int proc, QueueRecord& qr, bool remote, IExchangeInfo* iex);
inline InFlightRecv& inflight_recv(int proc);
inline InFlightSendsList& inflight_sends();
// iexchange commmunication
inline void icommunicate(IExchangeInfo* iex); // async communication
inline void icommunicate(IExchangeInfo* iex, MemoryManagement mem); // async communication
struct tags { enum {
queue,
@ -607,7 +628,7 @@ foreach_(const Callback<Block>& f, const Skip& skip)
void
diy::Master::
exchange(bool remote)
exchange(bool remote, MemoryManagement mem)
{
auto scoped = prof.scoped("exchange");
VTKMDIY_UNUSED(scoped);
@ -625,7 +646,7 @@ exchange(bool remote)
if (!remote)
touch_queues();
flush(remote);
flush(remote, mem);
log->debug("Finished exchange");
}
@ -658,7 +679,7 @@ touch_queues()
template<class Block>
void
diy::Master::
iexchange_(const ICallback<Block>& f)
iexchange_(const ICallback<Block>& f, MemoryManagement mem)
{
auto scoped = prof.scoped("iexchange");
VTKMDIY_UNUSED(scoped);
@ -685,11 +706,11 @@ iexchange_(const ICallback<Block>& f)
thread comm_thread;
if (threads() > 1)
comm_thread = thread([this,&iex]()
comm_thread = thread([this,&iex,mem]()
{
while(!iex.all_done())
{
icommunicate(&iex);
icommunicate(&iex, mem);
iex.control();
//std::this_thread::sleep_for(std::chrono::microseconds(1));
}
@ -713,7 +734,7 @@ iexchange_(const ICallback<Block>& f)
stats::Annotation::Guard g( stats::Annotation("diy.block").set(gid) );
if (threads() == 1)
icommunicate(&iex);
icommunicate(&iex, mem);
bool done = done_result[gid];
if (!done || !empty_incoming(gid))
{
@ -762,17 +783,17 @@ iexchange_(const ICallback<Block>& f)
/* Communicator */
void
diy::Master::
comm_exchange(GidSendOrder& gid_order, IExchangeInfo* iex)
comm_exchange(GidSendOrder& gid_order, MemoryManagement mem, IExchangeInfo* iex)
{
auto scoped = prof.scoped("comm-exchange");
VTKMDIY_UNUSED(scoped);
send_outgoing_queues(gid_order, false, iex);
send_outgoing_queues(gid_order, false, mem, iex);
while(nudge(iex)) // kick requests
;
check_incoming_queues(iex);
check_incoming_queues(mem, iex);
}
/* Remote communicator */
@ -803,7 +824,7 @@ comm_exchange(GidSendOrder& gid_order, IExchangeInfo* iex)
//
void
diy::Master::
rcomm_exchange()
rcomm_exchange(MemoryManagement mem)
{
bool done = false;
bool ibarr_act = false;
@ -814,12 +835,12 @@ rcomm_exchange()
while (!done)
{
send_outgoing_queues(gid_order, true, 0);
send_outgoing_queues(gid_order, true, mem, 0);
// kick requests
nudge();
check_incoming_queues();
check_incoming_queues(mem);
if (ibarr_act)
{
if (ibarr_req.test())
@ -877,7 +898,7 @@ order_gids()
// iexchange communicator
void
diy::Master::
icommunicate(IExchangeInfo* iex)
icommunicate(IExchangeInfo* iex, MemoryManagement mem)
{
auto scoped = prof.scoped("icommunicate");
VTKMDIY_UNUSED(scoped);
@ -887,7 +908,7 @@ icommunicate(IExchangeInfo* iex)
auto gid_order = order_gids();
// exchange
comm_exchange(gid_order, iex);
comm_exchange(gid_order, mem, iex);
// cleanup
@ -906,6 +927,7 @@ send_queue(int from_gid,
int to_proc,
QueueRecord& qr,
bool remote,
MemoryManagement mem,
IExchangeInfo* iex)
{
stats::Annotation::Guard gb( stats::Annotation("diy.block").set(from_gid) );
@ -917,7 +939,7 @@ send_queue(int from_gid,
log->debug("[{}] Sending queue: {} <- {} of size {}, iexchange = {}", comm_.rank(), to_gid, from_gid, qr.size(), iex ? 1 : 0);
if (to_proc == comm_.rank()) // sending to same rank, simply swap buffers
send_same_rank(from_gid, to_gid, qr, iex);
send_same_rank(from_gid, to_gid, qr, mem, iex);
else // sending to an actual message to a different rank
send_different_rank(from_gid, to_gid, to_proc, qr, remote, iex);
}
@ -926,6 +948,7 @@ void
diy::Master::
send_outgoing_queues(GidSendOrder& gid_order,
bool remote, // TODO: are remote and iexchange mutually exclusive? If so, use single enum?
MemoryManagement mem,
IExchangeInfo* iex)
{
auto scoped = prof.scoped("send-outgoing-queues");
@ -950,7 +973,7 @@ send_outgoing_queues(GidSendOrder& gid_order,
access.unlock(); // others can push on this queue, while we are working
assert(!qr.external());
log->debug("Processing queue: {} <- {} of size {}", to_gid, from, qr.size());
send_queue(from, to_gid, to_proc, qr, remote, iex);
send_queue(from, to_gid, to_proc, qr, remote, mem, iex);
access.lock();
}
}
@ -978,7 +1001,7 @@ send_outgoing_queues(GidSendOrder& gid_order,
// NB: send only front
auto& qr = access->front();
log->debug("Processing queue: {} <- {} of size {}", to_gid, from_gid, qr.size());
send_queue(from_gid, to_gid, to_proc, qr, remote, iex);
send_queue(from_gid, to_gid, to_proc, qr, remote, mem, iex);
access->pop_front();
}
}
@ -987,7 +1010,7 @@ send_outgoing_queues(GidSendOrder& gid_order,
void
diy::Master::
send_same_rank(int from, int to, QueueRecord& qr, IExchangeInfo*)
send_same_rank(int from, int to, QueueRecord& qr, MemoryManagement mem, IExchangeInfo*)
{
auto scoped = prof.scoped("send-same-rank");
@ -997,9 +1020,24 @@ send_same_rank(int from, int to, QueueRecord& qr, IExchangeInfo*)
auto access_incoming = current_incoming.map[to][from].access();
// save blobs to copy them explicitly
std::vector<BinaryBlob> blobs;
qr.buffer().blobs.swap(blobs);
qr.buffer().blob_position = 0;
access_incoming->emplace_back(std::move(qr));
QueueRecord& in_qr = access_incoming->back();
// copy blobs explicitly; we cannot just move them in place, since we don't
// own their memory and must guarantee that it's safe to free, once
// exchange() is done
for (BinaryBlob& blob : blobs)
{
char* p = mem.allocate(to, blob.size);
mem.copy(p, blob.pointer.get(), blob.size);
in_qr.buffer().save_binary_blob(p, blob.size, mem.deallocate);
}
if (!in_qr.external())
{
in_qr.reset();
@ -1029,7 +1067,7 @@ send_different_rank(int from, int to, int proc, QueueRecord& qr, bool remote, IE
// sending to a different rank
std::shared_ptr<MemoryBuffer> buffer = std::make_shared<MemoryBuffer>(qr.move());
MessageInfo info{from, to, 1, exchange_round_};
MessageInfo info{from, to, 1, exchange_round_, static_cast<int>(buffer->nblobs())};
// size fits in one message
if (Serialization<MemoryBuffer>::size(*buffer) + Serialization<MessageInfo>::size(info) <= MAX_MPI_MESSAGE_COUNT)
{
@ -1103,11 +1141,33 @@ send_different_rank(int from, int to, int proc, QueueRecord& qr, bool remote, IE
inflight_send.message = buffer;
}
} // large message broken into pieces
// send binary blobs
for (size_t i = 0; i < buffer->nblobs(); ++i)
{
auto blob = buffer->load_binary_blob();
assert(blob.size < MAX_MPI_MESSAGE_COUNT); // for now assume blobs are small enough that we don't need to break them into multiple parts
inflight_sends().emplace_back();
auto& inflight_send = inflight_sends().back();
inflight_send.info = info;
detail::VectorWindow<char> window;
window.begin = const_cast<char*>(blob.pointer.get());
window.count = blob.size;
if (remote || iex)
inflight_send.request = comm_.issend(proc, tags::queue, window);
else
inflight_send.request = comm_.isend(proc, tags::queue, window);
inflight_send.blob = std::move(blob);
}
}
void
diy::Master::
check_incoming_queues(IExchangeInfo* iex)
check_incoming_queues(MemoryManagement mem, IExchangeInfo* iex)
{
auto scoped = prof.scoped("check-incoming-queues");
VTKMDIY_UNUSED(scoped);
@ -1116,6 +1176,7 @@ check_incoming_queues(IExchangeInfo* iex)
while (ostatus)
{
InFlightRecv& ir = inflight_recv(ostatus->source());
ir.mem = mem;
if (iex)
iex->inc_work(); // increment work before sender's issend request can complete (so we are now responsible for the queue)
@ -1141,7 +1202,7 @@ check_incoming_queues(IExchangeInfo* iex)
void
diy::Master::
flush(bool remote)
flush(bool remote, MemoryManagement mem)
{
#ifdef VTKMDIY_DEBUG
time_type start = get_time();
@ -1155,13 +1216,13 @@ flush(bool remote)
if (remote)
rcomm_exchange();
rcomm_exchange(mem);
else
{
auto gid_order = order_gids();
do
{
comm_exchange(gid_order);
comm_exchange(gid_order, mem);
#ifdef VTKMDIY_DEBUG
time_type cur = get_time();

@ -1,6 +1,8 @@
#ifndef VTKMDIY_MPI_CONFIG_HPP
#define VTKMDIY_MPI_CONFIG_HPP
#include <utility>
/// We want to allow the use of `diy::mpi` in either header-only or library mode.
/// VTKMDIY_MPI_AS_LIB is defined when using library mode.
/// This file contains some configuration macros. To maintain backwards compatibility
@ -49,13 +51,26 @@ struct DIY_##mpitype { \
mpitype data; \
};
#define DEFINE_DIY_MPI_TYPE_MOVE(mpitype) \
struct DIY_##mpitype { \
DIY_##mpitype() = default; \
DIY_##mpitype(const mpitype&) = delete; \
DIY_##mpitype(mpitype&& obj) : data(std::move(obj)) {} \
DIY_##mpitype& operator=(const mpitype&) = delete; \
DIY_##mpitype& operator=(mpitype&& obj) { data = std::move(obj); return *this; } \
operator const mpitype&() const { return data; } \
void reset() { data = mpitype(); } \
private: \
mpitype data; \
};
DEFINE_DIY_MPI_TYPE(MPI_Comm)
DEFINE_DIY_MPI_TYPE(MPI_Datatype)
DEFINE_DIY_MPI_TYPE(MPI_Status)
DEFINE_DIY_MPI_TYPE(MPI_Request)
DEFINE_DIY_MPI_TYPE(MPI_Op)
DEFINE_DIY_MPI_TYPE(MPI_File)
DEFINE_DIY_MPI_TYPE(MPI_Win)
DEFINE_DIY_MPI_TYPE_MOVE(MPI_Win)
#undef DEFINE_DIY_MPI_TYPE

@ -18,13 +18,18 @@ inline mpitype& mpi_cast(DIY_##mpitype& obj) { return *reinterpret_cast<mpitype*
inline const mpitype& mpi_cast(const DIY_##mpitype& obj) { return *reinterpret_cast<const mpitype*>(&obj); } \
inline DIY_##mpitype make_DIY_##mpitype(const mpitype& obj) { DIY_##mpitype ret; mpi_cast(ret) = obj; return ret; }
#define DEFINE_MPI_CAST_MOVE(mpitype) \
inline mpitype& mpi_cast(DIY_##mpitype& obj) { return *reinterpret_cast<mpitype*>(&obj); } \
inline const mpitype& mpi_cast(const DIY_##mpitype& obj) { return *reinterpret_cast<const mpitype*>(&obj); } \
inline DIY_##mpitype make_DIY_##mpitype(mpitype&& obj) { DIY_##mpitype ret = std::move(obj); return ret; }
DEFINE_MPI_CAST(MPI_Comm)
DEFINE_MPI_CAST(MPI_Datatype)
DEFINE_MPI_CAST(MPI_Status)
DEFINE_MPI_CAST(MPI_Request)
DEFINE_MPI_CAST(MPI_Op)
DEFINE_MPI_CAST(MPI_File)
DEFINE_MPI_CAST(MPI_Win)
DEFINE_MPI_CAST_MOVE(MPI_Win)
#undef DEFINE_MPI_CAST

@ -1,6 +1,8 @@
#ifndef VTKMDIY_MPI_MPITYPES_H
#define VTKMDIY_MPI_MPITYPES_H
#include <cstring>
#cmakedefine TYPESIZE_MPI_Comm @TYPESIZE_MPI_Comm@
#cmakedefine TYPESIZE_MPI_Datatype @TYPESIZE_MPI_Datatype@
#cmakedefine TYPESIZE_MPI_Status @TYPESIZE_MPI_Status@
@ -18,6 +20,7 @@ namespace mpi
# define ASSERT_MPI_TYPE_SIZE(mpitype) static_assert(sizeof(mpitype) <= sizeof(DIY_##mpitype), "");
#else
# define ASSERT_MPI_TYPE_SIZE(mpitype)
struct MPI_Win;
#endif
#define DEFINE_DIY_MPI_TYPE(mpitype) \
@ -26,15 +29,41 @@ struct DIY_##mpitype { \
}; \
ASSERT_MPI_TYPE_SIZE(mpitype)
#define DEFINE_DIY_MPI_TYPE_MOVE(mpitype) \
struct DIY_##mpitype \
{ \
DIY_##mpitype() = default; \
DIY_##mpitype(const mpitype&) = delete; \
DIY_##mpitype& operator=(const mpitype&) = delete; \
DIY_##mpitype(mpitype&& obj) \
{ \
std::memcpy(data, &obj, TYPESIZE_##mpitype); \
std::memset(&obj, 0, TYPESIZE_##mpitype); \
} \
DIY_##mpitype& operator=(mpitype&& obj) \
{ \
std::memcpy(data, &obj, TYPESIZE_##mpitype); \
std::memset(&obj, 0, TYPESIZE_##mpitype); \
return *this; \
} \
operator const mpitype&() const { return *reinterpret_cast<const mpitype*>(data); } \
void reset() { std::memset(data, 0, TYPESIZE_##mpitype); } \
\
private: \
char* data[TYPESIZE_##mpitype]; \
}; \
ASSERT_MPI_TYPE_SIZE(mpitype);
DEFINE_DIY_MPI_TYPE(MPI_Comm)
DEFINE_DIY_MPI_TYPE(MPI_Datatype)
DEFINE_DIY_MPI_TYPE(MPI_Status)
DEFINE_DIY_MPI_TYPE(MPI_Request)
DEFINE_DIY_MPI_TYPE(MPI_Op)
DEFINE_DIY_MPI_TYPE(MPI_File)
DEFINE_DIY_MPI_TYPE(MPI_Win)
DEFINE_DIY_MPI_TYPE_MOVE(MPI_Win)
#undef DEFINE_DIY_MPI_TYPE
#undef DEFINE_DIY_MPI_TYPE_MOVE
#undef ASSERT_MPI_TYPE_SIZE
}

@ -1,6 +1,7 @@
#ifndef VTKMDIY_MPI_NO_MPI_HPP
#define VTKMDIY_MPI_NO_MPI_HPP
#include <cassert> // std::assert
#include <stdexcept> // std::runtime_error
@ -75,7 +76,39 @@ static const int MPI_MODE_APPEND = 128;
static const int MPI_MODE_SEQUENTIAL = 256;
/* define window type */
using MPI_Win = void*;
struct MPI_Win {
MPI_Win(): data_(0) {}
MPI_Win(void* data, bool owned = false): data_(uintptr_t(data) | (owned ? 0x1 : 0x0))
{
// We assume that pointers have at least some higher-byte alignment.
assert(!(uintptr_t(data) & 0x1));
}
void* data() const { return (void*)(data_ & ~0x1); }
bool owned() const { return data_ & 0x1; }
// We cannot copy owned windows.
MPI_Win(MPI_Win const&) = delete;
MPI_Win& operator=(MPI_Win const&) = delete;
// We cannot move owned windows (we don't know how to delete them in general).
MPI_Win(MPI_Win&& rhs): data_(rhs.data_)
{
rhs.data_ = 0;
}
MPI_Win& operator=(MPI_Win&& rhs)
{
if (this == &rhs)
return *this;
data_ = rhs.data_;
rhs.data_ = 0;
return *this;
}
private:
uintptr_t data_;
};
#define MPI_WIN_NULL MPI_Win()
/* window fence assertions */
static const int MPI_MODE_NOSTORE = 1;

@ -37,8 +37,8 @@ namespace mpi
const void* address() const { return buf_; }
private:
alignas(T) char buf_[sizeof(T)];
bool init_;
char buf_[sizeof(T)];
};
}
}

@ -22,6 +22,21 @@ EXPORT_MACRO const int nocheck = MPI_MODE_NOCHECK;
namespace detail
{
DIY_MPI_Win win_allocate(const communicator& comm, void** base, unsigned size, int disp)
{
#if VTKMDIY_HAS_MPI
DIY_MPI_Win win;
MPI_Win_allocate(size, disp, MPI_INFO_NULL, mpi_cast(comm.handle()), base, &mpi_cast(win));
return win;
#else
(void)comm; (void)disp;
*base = malloc(size);
auto mpi_win = MPI_Win(*base, true);
auto win = make_DIY_MPI_Win(std::move(mpi_win));
return win;
#endif
}
DIY_MPI_Win win_create(const communicator& comm, void* base, unsigned size, int disp)
{
#if VTKMDIY_HAS_MPI
@ -30,7 +45,8 @@ DIY_MPI_Win win_create(const communicator& comm, void* base, unsigned size, int
return win;
#else
(void)comm; (void)size; (void)disp;
auto win = make_DIY_MPI_Win(base);
auto mpi_win = MPI_Win(base);
auto win = make_DIY_MPI_Win(std::move(mpi_win));
return win;
#endif
}
@ -40,7 +56,9 @@ void win_free(DIY_MPI_Win& win)
#if VTKMDIY_HAS_MPI
MPI_Win_free(&mpi_cast(win));
#else
(void)win;
auto& mpi_win = mpi_cast(win);
if (mpi_win.owned())
free(mpi_win.data());
#endif
}
@ -49,7 +67,7 @@ void put(const DIY_MPI_Win& win, const void* data, int count, const datatype& ty
#if VTKMDIY_HAS_MPI
MPI_Put(data, count, mpi_cast(type.handle), rank, offset, count, mpi_cast(type.handle), mpi_cast(win));
#else
void* buffer = mpi_cast(win);
void* buffer = mpi_cast(win).data();
size_t size = mpi_cast(type.handle);
std::copy_n(static_cast<const int8_t*>(data),
size * static_cast<size_t>(count),
@ -63,7 +81,7 @@ void get(const DIY_MPI_Win& win, void* data, int count, const datatype& type, in
#if VTKMDIY_HAS_MPI
MPI_Get(data, count, mpi_cast(type.handle), rank, offset, count, mpi_cast(type.handle), mpi_cast(win));
#else
const void* buffer = mpi_cast(win);
const void* buffer = mpi_cast(win).data();
size_t size = mpi_cast(type.handle);
std::copy_n(static_cast<const int8_t*>(buffer) + (offset * size),
size * static_cast<size_t>(count),
@ -136,7 +154,7 @@ void fetch(const DIY_MPI_Win& win, void* result, const datatype& type, int rank,
MPI_Fetch_and_op(nullptr, result, mpi_cast(type.handle), rank, offset, MPI_NO_OP, mpi_cast(win));
#else
(void) rank;
const void* buffer = mpi_cast(win);
const void* buffer = mpi_cast(win).data();
size_t size = mpi_cast(type.handle);
std::copy_n(static_cast<const int8_t*>(buffer) + (offset * size),
size,
@ -150,7 +168,7 @@ void replace(const DIY_MPI_Win& win, const void* value, const datatype& type, in
MPI_Fetch_and_op(value, nullptr, mpi_cast(type.handle), rank, offset, MPI_REPLACE, mpi_cast(win));
#else
(void) rank;
void* buffer = mpi_cast(win);
void* buffer = mpi_cast(win).data();
size_t size = mpi_cast(type.handle);
std::copy_n(static_cast<const int8_t*>(value),
size,

@ -22,6 +22,9 @@ VTKMDIY_MPI_EXPORT extern const int nocheck;
namespace detail
{
VTKMDIY_MPI_EXPORT_FUNCTION
DIY_MPI_Win win_allocate(const communicator& comm, void** base, unsigned size, int disp);
VTKMDIY_MPI_EXPORT_FUNCTION
DIY_MPI_Win win_create(const communicator& comm, void* base, unsigned size, int disp);
@ -96,8 +99,8 @@ void flush_local_all(const DIY_MPI_Win& win);
inline ~window();
// moving is Ok
window(window&&) = default;
window& operator=(window&&) = default;
inline window(window&&);
inline window& operator=(window&&);
// cannot copy because of the buffer_
window(const window&) = delete;
@ -129,7 +132,7 @@ void flush_local_all(const DIY_MPI_Win& win);
inline void flush_local_all();
private:
std::vector<T> buffer_;
void* buffer_;
int rank_;
DIY_MPI_Win window_;
};
@ -140,16 +143,46 @@ void flush_local_all(const DIY_MPI_Win& win);
template<class T>
diy::mpi::window<T>::
window(const diy::mpi::communicator& comm, unsigned size):
buffer_(size), rank_(comm.rank())
buffer_(nullptr), rank_(comm.rank())
{
window_ = detail::win_create(comm, buffer_.data(), static_cast<unsigned>(buffer_.size()*sizeof(T)), static_cast<int>(sizeof(T)));
window_ = detail::win_allocate(comm, &buffer_, static_cast<unsigned>(size*sizeof(T)), static_cast<int>(sizeof(T)));
}
template<class T>
diy::mpi::window<T>::
~window()
{
detail::win_free(window_);
if (buffer_)
detail::win_free(window_);
}
template<class T>
diy::mpi::window<T>::
window(window&& rhs):
buffer_(rhs.buffer_), rank_(rhs.rank_), window_(std::move(rhs.window_))
{
rhs.buffer_ = nullptr;
rhs.window_.reset();
}
template<class T>
diy::mpi::window<T>&
diy::mpi::window<T>::
operator=(window&& rhs)
{
if (this == &rhs)
return *this;
if (buffer_)
detail::win_free(window_);
buffer_ = rhs.buffer_;
rhs.buffer_ = nullptr;
rank_ = rhs.rank_;
window_ = std::move(rhs.window_);
rhs.window_.reset();
return *this;
}
template<class T>

@ -105,6 +105,12 @@ namespace diy
void (*save)(BinaryBuffer&, const T&) = &::diy::save //!< optional serialization function
) const;
void inline enqueue_blob
(const BlockID& to, //!< target block (gid,proc)
const char* x, //!< pointer to the data
size_t n //!< size in data elements (eg. ints)
) const;
//! Dequeue data whose size can be determined automatically (e.g., STL vector) and that was
//! previously enqueued so that diy knows its size when it is received.
//! In this case, diy will allocate the receive buffer; the user does not need to do so.
@ -142,6 +148,9 @@ namespace diy
void (*load)(BinaryBuffer&, T&) = &::diy::load //!< optional serialization function
) const { dequeue(from.gid, x, n, load); }
BinaryBlob inline dequeue_blob
(int from) const;
template<class T>
EnqueueIterator<T> enqueuer(const T& x,
void (*save)(BinaryBuffer&, const T&) = &::diy::save ) const
@ -347,5 +356,20 @@ dequeue(int from, T* x, size_t n,
load(bb, x[i]);
}
void
diy::Master::Proxy::
enqueue_blob(const BlockID& to, const char* x, size_t n) const
{
BinaryBuffer& bb = outgoing_[to];
bb.save_binary_blob(x,n);
}
diy::BinaryBlob
diy::Master::Proxy::
dequeue_blob(int from) const
{
BinaryBuffer& bb = incoming_[from];
return bb.load_binary_blob();
}
#endif

@ -138,7 +138,7 @@ void reduce(Master& master, //!< master object
}
}
master.set_expected(expected);
master.flush();
master.flush(false);
}
// final round
log->debug("Round {}", round);

@ -1,22 +1,30 @@
#ifndef VTKMDIY_SERIALIZATION_HPP
#define VTKMDIY_SERIALIZATION_HPP
#include <vector>
#include <valarray>
#include <cassert>
#include <fstream>
#include <functional>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <fstream>
#include <tuple>
#include <type_traits> // this is used for a safety check for default serialization
#include <unordered_map>
#include <unordered_set>
#include <type_traits> // this is used for a safety check for default serialization
#include <cassert>
#include <valarray>
#include <vector>
namespace diy
{
struct BinaryBlob
{
using Deleter = std::function<void(const char[])>;
using Pointer = std::unique_ptr<const char[], Deleter>;
Pointer pointer;
size_t size;
};
//! A serialization buffer. \ingroup Serialization
struct BinaryBuffer
{
@ -25,10 +33,18 @@ namespace diy
virtual inline void append_binary(const char* x, size_t count) =0; //!< append `count` bytes from `x` to end of buffer
virtual void load_binary(char* x, size_t count) =0; //!< copy `count` bytes into `x` from the buffer
virtual void load_binary_back(char* x, size_t count) =0; //!< copy `count` bytes into `x` from the back of the buffer
virtual char* grow(size_t count) =0; //!< allocate enough space for `count` bytes and return the pointer to the beginning
virtual char* advance(size_t count) =0; //!< advance buffer position by `count` bytes and return the pointer to the beginning
virtual void save_binary_blob(const char*, size_t) =0;
virtual void save_binary_blob(const char*, size_t, BinaryBlob::Deleter) = 0;
virtual BinaryBlob load_binary_blob() =0;
};
struct MemoryBuffer: public BinaryBuffer
{
using Blob = BinaryBlob;
MemoryBuffer(size_t position_ = 0):
position(position_) {}
@ -41,6 +57,13 @@ namespace diy
virtual inline void append_binary(const char* x, size_t count) override; //!< append `count` bytes from `x` to end of buffer
virtual inline void load_binary(char* x, size_t count) override; //!< copy `count` bytes into `x` from the buffer
virtual inline void load_binary_back(char* x, size_t count) override; //!< copy `count` bytes into `x` from the back of the buffer
virtual inline char* grow(size_t count) override; //!< allocate enough space for `count` bytes and return the pointer to the beginning
virtual inline char* advance(size_t count) override; //!< advance buffer position by `count` bytes and return the pointer to the beginning
virtual inline void save_binary_blob(const char* x, size_t count) override;
virtual inline void save_binary_blob(const char* x, size_t count, Blob::Deleter deleter) override;
virtual inline Blob load_binary_blob() override;
size_t nblobs() const { return blobs.size(); }
void clear() { buffer.clear(); reset(); }
void wipe() { std::vector<char>().swap(buffer); reset(); }
@ -71,6 +94,9 @@ namespace diy
size_t position;
std::vector<char> buffer;
size_t blob_position = 0;
std::vector<Blob> blobs;
};
namespace detail
@ -140,7 +166,7 @@ namespace diy
template<class T>
void load_back(BinaryBuffer& bb, T& x) { bb.load_binary_back((char*) &x, sizeof(T)); }
//@}
//!@}
namespace detail
@ -444,17 +470,7 @@ void
diy::MemoryBuffer::
save_binary(const char* x, size_t count)
{
if (position + count > buffer.capacity())
{
double newsize = static_cast<double>(position + count) * growth_multiplier(); // if we have to grow, grow geometrically
buffer.reserve(static_cast<size_t>(newsize));
}
if (position + count > buffer.size())
buffer.resize(position + count);
std::copy_n(x, count, &buffer[position]);
position += count;
std::copy_n(x, count, grow(count));
}
void
@ -509,6 +525,58 @@ load_binary_back(char* x, size_t count)
buffer.resize(buffer.size() - count);
}
char*
diy::MemoryBuffer::
grow(size_t count)
{
if (position + count > buffer.capacity())
{
double newsize = static_cast<double>(position + count) * growth_multiplier(); // if we have to grow, grow geometrically
buffer.reserve(static_cast<size_t>(newsize));
}
if (position + count > buffer.size())
buffer.resize(position + count);
char* destination = &buffer[position];
position += count;
return destination;
}
char*
diy::MemoryBuffer::
advance(size_t count)
{
char* origin = &buffer[position];
position += count;
return origin;
}
void
diy::MemoryBuffer::
save_binary_blob(const char* x, size_t count)
{
// empty deleter means we don't take ownership
save_binary_blob(x, count, [](const char[]) {});
}
void
diy::MemoryBuffer::
save_binary_blob(const char* x, size_t count, Blob::Deleter deleter)
{
blobs.emplace_back(Blob { Blob::Pointer {x, deleter}, count });
}
diy::MemoryBuffer::Blob
diy::MemoryBuffer::
load_binary_blob()
{
return std::move(blobs[blob_position++]);
}
void
diy::MemoryBuffer::
copy(MemoryBuffer& from, MemoryBuffer& to)

@ -15,8 +15,8 @@ namespace diy
{
namespace detail
{
typedef void (*Save)(const void*, BinaryBuffer& buf);
typedef void (*Load)(void*, BinaryBuffer& buf);
using Save = std::function<void(const void*, BinaryBuffer&)>;
using Load = std::function<void(void*, BinaryBuffer&)>;
struct FileBuffer: public BinaryBuffer
{
@ -34,6 +34,16 @@ namespace diy
}
virtual inline void load_binary(char* x, size_t count) override { auto n = fread(x, 1, count, file); VTKMDIY_UNUSED(n);}
virtual inline void load_binary_back(char* x, size_t count) override { fseek(file, static_cast<long>(tail), SEEK_END); auto n = fread(x, 1, count, file); tail += count; fseek(file, static_cast<long>(head), SEEK_SET); VTKMDIY_UNUSED(n);}
virtual inline char* grow(size_t) override { throw std::runtime_error("Cannot grow a FileBuffer"); }
virtual inline char* advance(size_t) override { throw std::runtime_error("Cannot advance a FileBuffer"); }
// TODO: for now, we just throw, but obviously it should be possile to store binary blobs in a file; might want to fall back
using Blob = BinaryBlob;
virtual inline void save_binary_blob(const char*, size_t) override { throw std::runtime_error("Cannot save binary blobs in a FileBuffer"); }
virtual inline void save_binary_blob(const char*, size_t, Blob::Deleter) override { throw std::runtime_error("Cannot save binary blobs in a FileBuffer"); }
virtual inline Blob load_binary_blob() override { throw std::runtime_error("Cannot load binary blobs from a FileBuffer"); }
size_t size() const { return head; }

@ -41,6 +41,9 @@ namespace diy
#include "critical-resource.hpp"
#if !defined(VTKMDIY_NO_THREADS)
#include <memory> // for shared_ptr
template<class T, class U>
struct diy::concurrent_map
{

@ -3,6 +3,6 @@
#define VTKMDIY_VERSION_MAJOR 3
#define VTKMDIY_VERSION_MINOR 5
#define DIY_VERSION_PATCH dev1
#define VTKMDIY_VERSION_PATCH dev1
#endif