Replace MemStream with the diy serialization code.

This commit is contained in:
Dave Pugmire 2020-08-25 11:38:01 -04:00
parent cdc41cc987
commit 63b31ffb24
9 changed files with 166 additions and 461 deletions

@ -13,6 +13,7 @@
#include <vtkm/Bitset.h>
#include <vtkm/VecVariable.h>
#include <vtkm/VectorAnalysis.h>
#include <vtkm/cont/Serialization.h>
namespace vtkm
{
@ -258,4 +259,31 @@ private:
} //namespace vtkm
namespace mangled_diy_namespace
{
template <>
struct Serialization<vtkm::Massless>
{
public:
static VTKM_CONT void save(BinaryBuffer& bb, const vtkm::Massless& p)
{
vtkmdiy::save(bb, p.Pos);
vtkmdiy::save(bb, p.ID);
vtkmdiy::save(bb, p.NumSteps);
vtkmdiy::save(bb, p.Status);
vtkmdiy::save(bb, p.Time);
}
static VTKM_CONT void load(BinaryBuffer& bb, vtkm::Massless& p)
{
vtkmdiy::load(bb, p.Pos);
vtkmdiy::load(bb, p.ID);
vtkmdiy::load(bb, p.NumSteps);
vtkmdiy::load(bb, p.Status);
vtkmdiy::load(bb, p.Time);
}
};
}
#endif // vtk_m_Particle_h

@ -160,7 +160,6 @@ set(extra_sources_device
PointAverage.cxx
VectorMagnitude.cxx
ExternalFaces.cxx
particleadvection/MemStream.cxx
particleadvection/Messenger.cxx
particleadvection/ParticleMessenger.cxx
)

@ -11,11 +11,10 @@
set(headers
BoundsMap.h
DataSetIntegrator.h
MemStream.h
Messenger.h
ParticleMessenger.h
ParticleAdvector.h
ParticleAdvector.hxx
ParticleMessenger.h
)
#-----------------------------------------------------------------------------

@ -1,101 +0,0 @@
//============================================================================
// Copyright (c) Kitware, Inc.
// All rights reserved.
// See LICENSE.txt for details.
//
// This software is distributed WITHOUT ANY WARRANTY; without even
// the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE. See the above copyright notice for more information.
//============================================================================
#include <vtkm/filter/particleadvection/MemStream.h>
namespace vtkm
{
namespace filter
{
namespace particleadvection
{
MemStream::MemStream(std::size_t sz0)
: Data(nullptr)
, Len(0)
, MaxLen(0)
, Pos(0)
{
CheckSize(sz0);
}
MemStream::MemStream(std::size_t sz, const unsigned char* buff)
: Data(nullptr)
, Len(sz)
, MaxLen(sz)
, Pos(0)
{
this->Data = new unsigned char[this->Len];
std::memcpy(this->Data, buff, this->Len);
}
MemStream::MemStream(const MemStream& s)
{
this->Pos = s.GetPos();
this->Len = s.GetLen();
this->MaxLen = this->Len;
this->Data = new unsigned char[this->Len];
std::memcpy(this->Data, s.GetData(), this->Len);
}
MemStream::MemStream(MemStream&& s)
{
this->Pos = 0;
this->Len = s.GetLen();
this->MaxLen = this->Len;
this->Data = s.Data;
s.Pos = 0;
s.Len = 0;
s.MaxLen = 0;
s.Data = nullptr;
}
MemStream::~MemStream()
{
this->ClearMemStream();
}
void MemStream::ClearMemStream()
{
if (this->Data)
{
delete[] this->Data;
this->Data = nullptr;
}
this->Pos = 0;
this->Len = 0;
this->MaxLen = 0;
}
void MemStream::CheckSize(std::size_t sz)
{
std::size_t reqLen = this->Pos + sz;
if (reqLen > this->MaxLen)
{
std::size_t newLen = 2 * this->MaxLen; // double current size.
if (newLen < reqLen)
newLen = reqLen;
unsigned char* newData = new unsigned char[newLen];
if (this->Data)
{
std::memcpy(newData, this->Data, this->Len); // copy existing data to new buffer.
delete[] this->Data;
}
this->Data = newData;
this->MaxLen = newLen;
}
}
}
}
} // namespace vtkm::filter::particleadvection

@ -1,192 +0,0 @@
//============================================================================
// Copyright (c) Kitware, Inc.
// All rights reserved.
// See LICENSE.txt for details.
//
// This software is distributed WITHOUT ANY WARRANTY; without even
// the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE. See the above copyright notice for more information.
//============================================================================
#ifndef vtk_m_filter_MemStream_h
#define vtk_m_filter_MemStream_h
#include <vtkm/filter/vtkm_filter_extra_export.h>
#include <cstring>
#include <iostream>
#include <list>
#include <string>
#include <vector>
namespace vtkm
{
namespace filter
{
namespace particleadvection
{
class MemStream
{
public:
MemStream(std::size_t sz0 = 32);
MemStream(std::size_t sz, const unsigned char* buff);
MemStream(const MemStream& s);
MemStream(MemStream&& s);
~MemStream();
void Rewind() { this->Pos = 0; }
std::size_t GetPos() const { return this->Pos; }
void SetPos(std::size_t p);
std::size_t GetLen() const { return this->Len; }
std::size_t GetCapacity() const { return this->MaxLen; }
unsigned char* GetData() const { return this->Data; }
//Read from buffer.
void ReadBinary(unsigned char* buff, const std::size_t& size);
//Write to buffer.
void WriteBinary(const unsigned char* buff, std::size_t size);
void ClearMemStream();
private:
// data members
unsigned char* Data;
std::size_t Len;
std::size_t MaxLen;
std::size_t Pos;
void CheckSize(std::size_t sz);
friend std::ostream& operator<<(std::ostream& out, const MemStream& m)
{
out << " MemStream(p= " << m.GetPos() << ", l= " << m.GetLen() << "[" << m.GetCapacity()
<< "]) data=[";
/*
for (std::size_t i=0; i < m.GetLen(); i++)
out<<(int)(m.Data[i])<<" ";
*/
out << "]";
return out;
}
};
inline void MemStream::ReadBinary(unsigned char* buff, const std::size_t& size)
{
std::size_t nBytes = sizeof(unsigned char) * size;
std::memcpy(buff, &this->Data[this->Pos], nBytes);
this->Pos += nBytes;
}
inline void MemStream::WriteBinary(const unsigned char* buff, std::size_t size)
{
std::size_t nBytes = sizeof(unsigned char) * size;
this->CheckSize(nBytes);
std::memcpy(&this->Data[this->Pos], buff, nBytes);
this->Pos += nBytes;
if (this->Pos > this->Len)
this->Len = this->Pos;
}
inline void MemStream::SetPos(std::size_t p)
{
this->Pos = p;
if (this->Pos > this->GetLen())
throw "MemStream::setPos failed";
}
template <typename T>
struct Serialization
{
#if (defined(__clang__) && !defined(__ppc64__)) || (defined(__GNUC__) && __GNUC__ >= 5)
static_assert(std::is_trivially_copyable<T>::value,
"Default serialization works only for trivially copyable types");
#endif
static void write(MemStream& memstream, const T& data)
{
memstream.WriteBinary((const unsigned char*)&data, sizeof(T));
}
static void read(MemStream& memstream, T& data)
{
memstream.ReadBinary((unsigned char*)&data, sizeof(T));
}
};
template <typename T>
static void write(MemStream& memstream, const T& data)
{
Serialization<T>::write(memstream, data);
}
template <typename T>
static void read(MemStream& memstream, T& data)
{
Serialization<T>::read(memstream, data);
}
template <class T>
struct Serialization<std::vector<T>>
{
static void write(MemStream& memstream, const std::vector<T>& data)
{
const std::size_t sz = data.size();
vtkm::filter::particleadvection::write(memstream, sz);
for (std::size_t i = 0; i < sz; i++)
vtkm::filter::particleadvection::write(memstream, data[i]);
}
static void read(MemStream& memstream, std::vector<T>& data)
{
std::size_t sz;
vtkm::filter::particleadvection::read(memstream, sz);
data.resize(sz);
for (std::size_t i = 0; i < sz; i++)
vtkm::filter::particleadvection::read(memstream, data[i]);
}
};
template <class T>
struct Serialization<std::list<T>>
{
static void write(MemStream& memstream, const std::list<T>& data)
{
vtkm::filter::particleadvection::write(memstream, data.size());
typename std::list<T>::const_iterator it;
for (it = data.begin(); it != data.end(); it++)
vtkm::filter::particleadvection::write(memstream, *it);
}
static void read(MemStream& memstream, std::list<T>& data)
{
std::size_t sz;
vtkm::filter::particleadvection::read(memstream, sz);
for (std::size_t i = 0; i < sz; i++)
{
T v;
vtkm::filter::particleadvection::read(memstream, v);
data.push_back(v);
}
}
};
template <class T, class U>
struct Serialization<std::pair<T, U>>
{
static void write(MemStream& memstream, const std::pair<T, U>& data)
{
vtkm::filter::particleadvection::write(memstream, data.first);
vtkm::filter::particleadvection::write(memstream, data.second);
}
static void read(MemStream& memstream, std::pair<T, U>& data)
{
vtkm::filter::particleadvection::read(memstream, data.first);
vtkm::filter::particleadvection::read(memstream, data.second);
}
};
}
}
} // namespace vtkm::filter::particleadvection
#endif

@ -29,6 +29,7 @@ VTKM_CONT
#ifdef VTKM_ENABLE_MPI
Messenger::Messenger(vtkmdiy::mpi::communicator& comm)
: MPIComm(vtkmdiy::mpi::mpi_cast(comm.handle()))
, MsgID(0)
, NumRanks(comm.size())
, Rank(comm.rank())
#else
@ -39,7 +40,7 @@ Messenger::Messenger(vtkmdiy::mpi::communicator& vtkmNotUsed(comm))
#ifdef VTKM_ENABLE_MPI
VTKM_CONT
void Messenger::RegisterTag(int tag, int num_recvs, int size)
void Messenger::RegisterTag(int tag, std::size_t num_recvs, std::size_t size)
{
if (this->MessageTagInfo.find(tag) != this->MessageTagInfo.end() || tag == TAG_ANY)
{
@ -47,28 +48,29 @@ void Messenger::RegisterTag(int tag, int num_recvs, int size)
msg << "Invalid message tag: " << tag << std::endl;
throw vtkm::cont::ErrorFilterExecution(msg.str());
}
this->MessageTagInfo[tag] = std::pair<int, int>(num_recvs, size);
this->MessageTagInfo[tag] = std::pair<std::size_t, std::size_t>(num_recvs, size);
}
int Messenger::CalcMessageBufferSize(int msgSz)
std::size_t Messenger::CalcMessageBufferSize(std::size_t msgSz)
{
return static_cast<int>(sizeof(int)) // rank
return sizeof(int) // rank
// std::vector<int> msg;
// msg.size()
+ static_cast<int>(sizeof(std::size_t))
+ sizeof(std::size_t)
// msgSz ints.
+ msgSz * static_cast<int>(sizeof(int));
+ msgSz * sizeof(int);
}
void Messenger::InitializeBuffers()
{
//Setup receive buffers.
std::map<int, std::pair<int, int>>::const_iterator it;
for (it = this->MessageTagInfo.begin(); it != this->MessageTagInfo.end(); it++)
for (const auto& it : this->MessageTagInfo)
{
int tag = it->first, num = it->second.first;
for (int i = 0; i < num; i++)
this->PostRecv(tag);
int tag = it.first;
std::size_t num = it.second.first;
std::size_t sz = it.second.second;
for (std::size_t i = 0; i < num; i++)
this->PostRecv(tag, sz);
}
}
@ -83,12 +85,11 @@ void Messenger::CleanupRequests(int tag)
if (!delKeys.empty())
{
std::vector<RequestTagPair>::const_iterator it;
for (it = delKeys.begin(); it != delKeys.end(); it++)
for (const auto& it : delKeys)
{
RequestTagPair v = *it;
RequestTagPair v = it;
unsigned char* buff = this->RecvBuffers[v];
char* buff = this->RecvBuffers[v];
MPI_Cancel(&(v.first));
delete[] buff;
this->RecvBuffers.erase(v);
@ -103,10 +104,10 @@ void Messenger::PostRecv(int tag)
this->PostRecv(tag, it->second.second);
}
void Messenger::PostRecv(int tag, int sz, int src)
void Messenger::PostRecv(int tag, std::size_t sz, int src)
{
sz += sizeof(Messenger::Header);
unsigned char* buff = new unsigned char[sz];
char* buff = new char[sz];
memset(buff, 0, sz);
MPI_Request req;
@ -139,9 +140,9 @@ void Messenger::CheckPendingSendRequests()
MPI_Status* status = new MPI_Status[req.size()];
int err = MPI_Testsome(req.size(), &req[0], &num, indices, status);
if (err != MPI_SUCCESS)
{
std::cerr << "Err with MPI_Testsome in PARIC algorithm" << std::endl;
}
throw vtkm::cont::ErrorFilterExecution(
"Error iwth MPI_Testsome in Messenger::CheckPendingSendRequests");
for (int i = 0; i < num; i++)
{
MPI_Request r = copy[indices[i]];
@ -160,7 +161,7 @@ void Messenger::CheckPendingSendRequests()
delete[] status;
}
bool Messenger::PacketCompare(const unsigned char* a, const unsigned char* b)
bool Messenger::PacketCompare(const char* a, const char* b)
{
Messenger::Header ha, hb;
memcpy(&ha, a, sizeof(ha));
@ -169,7 +170,9 @@ bool Messenger::PacketCompare(const unsigned char* a, const unsigned char* b)
return ha.packet < hb.packet;
}
void Messenger::PrepareForSend(int tag, MemStream* buff, std::vector<unsigned char*>& buffList)
void Messenger::PrepareForSend(int tag,
const vtkmdiy::MemoryBuffer& buff,
std::vector<char*>& buffList)
{
auto it = this->MessageTagInfo.find(tag);
if (it == this->MessageTagInfo.end())
@ -179,24 +182,23 @@ void Messenger::PrepareForSend(int tag, MemStream* buff, std::vector<unsigned ch
throw vtkm::cont::ErrorFilterExecution(msg.str());
}
int bytesLeft = buff->GetLen();
int maxDataLen = it->second.second;
std::size_t bytesLeft = buff.size();
std::size_t maxDataLen = it->second.second;
Messenger::Header header;
header.tag = tag;
header.rank = this->Rank;
header.id = this->MsgID;
header.id = this->GetMsgID();
header.numPackets = 1;
if (buff->GetLen() > (unsigned int)maxDataLen)
header.numPackets += buff->GetLen() / maxDataLen;
if (buff.size() > maxDataLen)
header.numPackets += buff.size() / maxDataLen;
header.packet = 0;
header.packetSz = 0;
header.dataSz = 0;
this->MsgID++;
buffList.resize(header.numPackets);
size_t pos = 0;
for (int i = 0; i < header.numPackets; i++)
std::size_t pos = 0;
for (std::size_t i = 0; i < header.numPackets; i++)
{
header.packet = i;
if (i == (header.numPackets - 1))
@ -205,15 +207,15 @@ void Messenger::PrepareForSend(int tag, MemStream* buff, std::vector<unsigned ch
header.dataSz = maxDataLen;
header.packetSz = header.dataSz + sizeof(header);
unsigned char* b = new unsigned char[header.packetSz];
char* b = new char[header.packetSz];
//Write the header.
unsigned char* bPtr = b;
char* bPtr = b;
memcpy(bPtr, &header, sizeof(header));
bPtr += sizeof(header);
//Write the data.
memcpy(bPtr, &buff->GetData()[pos], header.dataSz);
memcpy(bPtr, &buff.buffer[pos], header.dataSz);
pos += header.dataSz;
buffList[i] = b;
@ -221,50 +223,46 @@ void Messenger::PrepareForSend(int tag, MemStream* buff, std::vector<unsigned ch
}
}
void Messenger::SendData(int dst, int tag, MemStream* buff)
void Messenger::SendData(int dst, int tag, const vtkmdiy::MemoryBuffer& buff)
{
std::vector<unsigned char*> bufferList;
std::vector<char*> bufferList;
//Add headers, break into multiple buffers if needed.
PrepareForSend(tag, buff, bufferList);
Messenger::Header header;
for (size_t i = 0; i < bufferList.size(); i++)
for (std::size_t i = 0; i < bufferList.size(); i++)
{
memcpy(&header, bufferList[i], sizeof(header));
MPI_Request req;
int err = MPI_Isend(bufferList[i], header.packetSz, MPI_BYTE, dst, tag, this->MPIComm, &req);
if (err != MPI_SUCCESS)
{
std::cerr << "Err with MPI_Isend in SendData algorithm" << std::endl;
}
throw vtkm::cont::ErrorFilterExecution("Error in MPI_Isend inside Messenger::SendData");
//Add it to sendBuffers
RequestTagPair entry(req, tag);
this->SendBuffers[entry] = bufferList[i];
}
delete buff;
}
bool Messenger::RecvData(int tag, std::vector<MemStream*>& buffers, bool blockAndWait)
bool Messenger::RecvData(int tag, std::vector<vtkmdiy::MemoryBuffer>& buffers, bool blockAndWait)
{
std::set<int> setTag;
setTag.insert(tag);
std::vector<std::pair<int, MemStream*>> b;
std::vector<std::pair<int, vtkmdiy::MemoryBuffer>> b;
buffers.resize(0);
if (RecvData(setTag, b, blockAndWait))
{
buffers.resize(b.size());
for (size_t i = 0; i < b.size(); i++)
buffers[i] = b[i].second;
for (std::size_t i = 0; i < b.size(); i++)
buffers[i] = std::move(b[i].second);
return true;
}
return false;
}
bool Messenger::RecvData(std::set<int>& tags,
std::vector<std::pair<int, MemStream*>>& buffers,
std::vector<std::pair<int, vtkmdiy::MemoryBuffer>>& buffers,
bool blockAndWait)
{
buffers.resize(0);
@ -299,7 +297,7 @@ bool Messenger::RecvData(std::set<int>& tags,
return false;
}
std::vector<unsigned char*> incomingBuffers(num);
std::vector<char*> incomingBuffers(num);
for (int i = 0; i < num; i++)
{
RequestTagPair entry(copy[indices[i]], reqTags[indices[i]]);
@ -326,12 +324,12 @@ bool Messenger::RecvData(std::set<int>& tags,
return !buffers.empty();
}
void Messenger::ProcessReceivedBuffers(std::vector<unsigned char*>& incomingBuffers,
std::vector<std::pair<int, MemStream*>>& buffers)
void Messenger::ProcessReceivedBuffers(std::vector<char*>& incomingBuffers,
std::vector<std::pair<int, vtkmdiy::MemoryBuffer>>& buffers)
{
for (size_t i = 0; i < incomingBuffers.size(); i++)
for (std::size_t i = 0; i < incomingBuffers.size(); i++)
{
unsigned char* buff = incomingBuffers[i];
char* buff = incomingBuffers[i];
//Grab the header.
Messenger::Header header;
@ -340,10 +338,12 @@ void Messenger::ProcessReceivedBuffers(std::vector<unsigned char*>& incomingBuff
//Only 1 packet, strip off header and add to list.
if (header.numPackets == 1)
{
MemStream* b = new MemStream(header.dataSz, (buff + sizeof(header)));
b->Rewind();
std::pair<int, MemStream*> entry(header.tag, b);
buffers.push_back(entry);
std::pair<int, vtkmdiy::MemoryBuffer> entry;
entry.first = header.tag;
entry.second.save_binary((char*)(buff + sizeof(header)), header.dataSz);
entry.second.reset();
buffers.push_back(std::move(entry));
delete[] buff;
}
@ -356,7 +356,7 @@ void Messenger::ProcessReceivedBuffers(std::vector<unsigned char*>& incomingBuff
//First packet. Create a new list and add it.
if (i2 == this->RecvPackets.end())
{
std::list<unsigned char*> l;
std::list<char*> l;
l.push_back(buff);
this->RecvPackets[k] = l;
}
@ -365,33 +365,32 @@ void Messenger::ProcessReceivedBuffers(std::vector<unsigned char*>& incomingBuff
i2->second.push_back(buff);
// The last packet came in, merge into one MemStream.
if (i2->second.size() == (size_t)header.numPackets)
if (i2->second.size() == header.numPackets)
{
//Sort the packets into proper order.
i2->second.sort(Messenger::PacketCompare);
MemStream* mergedBuff = new MemStream;
std::list<unsigned char*>::iterator listIt;
for (listIt = i2->second.begin(); listIt != i2->second.end(); listIt++)
std::pair<int, vtkmdiy::MemoryBuffer> entry;
entry.first = header.tag;
for (const auto& listIt : i2->second)
{
unsigned char* bi = *listIt;
char* bi = listIt;
Messenger::Header header2;
memcpy(&header2, bi, sizeof(header2));
mergedBuff->WriteBinary((bi + sizeof(header2)), header2.dataSz);
entry.second.save_binary((char*)(bi + sizeof(header2)), header2.dataSz);
delete[] bi;
}
mergedBuff->Rewind();
std::pair<int, MemStream*> entry(header.tag, mergedBuff);
buffers.push_back(entry);
entry.second.reset();
buffers.push_back(std::move(entry));
this->RecvPackets.erase(i2);
}
}
}
}
}
#endif
}
}

@ -11,8 +11,10 @@
#ifndef vtk_m_filter_Messenger_h
#define vtk_m_filter_Messenger_h
#include <vtkm/filter/vtkm_filter_extra_export.h>
#include <vtkm/Types.h>
#include <vtkm/filter/particleadvection/MemStream.h>
#include <vtkm/cont/Serialization.h>
#include <vtkm/thirdparty/diy/diy.h>
#include <list>
@ -43,56 +45,54 @@ public:
}
#ifdef VTKM_ENABLE_MPI
VTKM_CONT void RegisterTag(int tag, int numRecvs, int size);
VTKM_CONT void RegisterTag(int tag, std::size_t numRecvs, std::size_t size);
protected:
void InitializeBuffers();
void CleanupRequests(int tag = TAG_ANY);
void CheckPendingSendRequests();
void PostRecv(int tag);
void PostRecv(int tag, int sz, int src = -1);
void SendData(int dst, int tag, MemStream* buff);
void PostRecv(int tag, std::size_t sz, int src = -1);
void SendData(int dst, int tag, const vtkmdiy::MemoryBuffer& buff);
bool RecvData(std::set<int>& tags,
std::vector<std::pair<int, MemStream*>>& buffers,
std::vector<std::pair<int, vtkmdiy::MemoryBuffer>>& buffers,
bool blockAndWait = false);
//Message headers.
typedef struct
{
int rank, id, tag, numPackets, packet, packetSz, dataSz;
int rank, tag;
std::size_t id, numPackets, packet, packetSz, dataSz;
} Header;
bool RecvData(int tag, std::vector<MemStream*>& buffers, bool blockAndWait = false);
void AddHeader(MemStream* buff);
void RemoveHeader(MemStream* input, MemStream* header, MemStream* buff);
bool RecvData(int tag, std::vector<vtkmdiy::MemoryBuffer>& buffers, bool blockAndWait = false);
template <typename P>
bool DoSendICs(int dst, std::vector<P>& ics);
void PrepareForSend(int tag, MemStream* buff, std::vector<unsigned char*>& buffList);
static bool PacketCompare(const unsigned char* a, const unsigned char* b);
void ProcessReceivedBuffers(std::vector<unsigned char*>& incomingBuffers,
std::vector<std::pair<int, MemStream*>>& buffers);
void PrepareForSend(int tag, const vtkmdiy::MemoryBuffer& buff, std::vector<char*>& buffList);
vtkm::Id GetMsgID() { return this->MsgID++; }
static bool PacketCompare(const char* a, const char* b);
void ProcessReceivedBuffers(std::vector<char*>& incomingBuffers,
std::vector<std::pair<int, vtkmdiy::MemoryBuffer>>& buffers);
// Send/Recv buffer management structures.
using RequestTagPair = std::pair<MPI_Request, int>;
using RankIdPair = std::pair<int, int>;
//Member data
std::map<int, std::pair<int, int>> MessageTagInfo;
std::map<int, std::pair<std::size_t, std::size_t>> MessageTagInfo;
MPI_Comm MPIComm;
vtkm::Id MsgID;
std::size_t MsgID;
int NumRanks;
int Rank;
std::map<RequestTagPair, unsigned char*> RecvBuffers;
std::map<RankIdPair, std::list<unsigned char*>> RecvPackets;
std::map<RequestTagPair, unsigned char*> SendBuffers;
std::map<RequestTagPair, char*> RecvBuffers;
std::map<RankIdPair, std::list<char*>> RecvPackets;
std::map<RequestTagPair, char*> SendBuffers;
static constexpr int TAG_ANY = -1;
#else
static constexpr int NumRanks = 1;
static constexpr int Rank = 0;
#endif
static int CalcMessageBufferSize(int msgSz);
static std::size_t CalcMessageBufferSize(std::size_t msgSz);
};
}
}

@ -8,12 +8,12 @@
// PURPOSE. See the above copyright notice for more information.
//============================================================================
#include <vtkm/filter/particleadvection/MemStream.h>
#include <vtkm/filter/particleadvection/ParticleMessenger.h>
#include <iostream>
#include <string.h>
#include <vtkm/cont/Logging.h>
#include <vtkm/cont/Serialization.h>
namespace vtkm
{
@ -26,36 +26,44 @@ VTKM_CONT
ParticleMessenger::ParticleMessenger(vtkmdiy::mpi::communicator& comm,
const vtkm::filter::particleadvection::BoundsMap& boundsMap,
int msgSz,
int numParticles)
int numParticles,
int numBlockIds)
: Messenger(comm)
#ifdef VTKM_ENABLE_MPI
, BoundsMap(boundsMap)
#endif
{
#ifdef VTKM_ENABLE_MPI
this->RegisterMessages(msgSz, numParticles);
this->RegisterMessages(msgSz, numParticles, numBlockIds);
#else
(void)(boundsMap);
(void)(msgSz);
(void)(numParticles);
(void)(numBlockIds);
#endif
}
int ParticleMessenger::CalcParticleBufferSize(int nParticles, int nBlockIds)
std::size_t ParticleMessenger::CalcParticleBufferSize(std::size_t nParticles, std::size_t nBlockIds)
{
constexpr std::size_t pSize = sizeof(vtkm::Vec3f) // Pos
+ sizeof(vtkm::Id) // ID
+ sizeof(vtkm::Id) // NumSteps
+ sizeof(vtkm::UInt8) // Status
+ sizeof(vtkm::FloatDefault); // Time
return
// rank
static_cast<int>(sizeof(int))
sizeof(int)
//std::vector<vtkm::Massless> p;
//p.size()
+ static_cast<int>(sizeof(std::size_t))
+ sizeof(std::size_t)
//nParticles of vtkm::Massless
+ nParticles * static_cast<int>(sizeof(vtkm::Massless))
+ nParticles * pSize
// std::vector<vtkm::Id> blockIDs for each particle.
// blockIDs.size() for each particle
+ nParticles * static_cast<int>(sizeof(std::size_t))
+ nParticles * sizeof(std::size_t)
// nBlockIDs of vtkm::Id for each particle.
+ nParticles * nBlockIds * static_cast<int>(sizeof(vtkm::Id));
+ nParticles * nBlockIds * sizeof(vtkm::Id);
}
VTKM_CONT
@ -93,7 +101,7 @@ void ParticleMessenger::Exchange(const std::vector<vtkm::Massless>& outData,
//dstRank, vector of (particles,blockIDs)
std::map<int, std::vector<ParticleCommType>> sendData;
for (auto& p : outData)
for (const auto& p : outData)
{
const auto& bids = outBlockIDsMap.find(p.ID)->second;
int dstRank = this->BoundsMap.FindRank(bids[0]);
@ -105,7 +113,7 @@ void ParticleMessenger::Exchange(const std::vector<vtkm::Massless>& outData,
std::vector<MsgCommType> msgData;
if (RecvAny(&msgData, &particleData, false))
{
for (auto& it : particleData)
for (const auto& it : particleData)
for (const auto& v : it.second)
{
const auto& p = v.first;
@ -114,7 +122,7 @@ void ParticleMessenger::Exchange(const std::vector<vtkm::Massless>& outData,
inDataBlockIDsMap[p.ID] = bids;
}
for (auto& m : msgData)
for (const auto& m : msgData)
{
if (m.second[0] == MSG_TERMINATE)
numTerminateMessages += static_cast<vtkm::Id>(m.second[1]);
@ -123,10 +131,7 @@ void ParticleMessenger::Exchange(const std::vector<vtkm::Massless>& outData,
//Do all the sending...
if (numLocalTerm > 0)
{
std::vector<int> msg = { MSG_TERMINATE, static_cast<int>(numLocalTerm) };
SendAllMsg(msg);
}
SendAllMsg({ MSG_TERMINATE, static_cast<int>(numLocalTerm) });
this->SendParticles(sendData);
this->CheckPendingSendRequests();
@ -136,14 +141,18 @@ void ParticleMessenger::Exchange(const std::vector<vtkm::Massless>& outData,
#ifdef VTKM_ENABLE_MPI
VTKM_CONT
void ParticleMessenger::RegisterMessages(int msgSz, int nParticles)
void ParticleMessenger::RegisterMessages(int msgSz, int nParticles, int numBlockIds)
{
//Determine buffer size for msg and particle tags.
int messageBuffSz = CalcMessageBufferSize(msgSz + 1);
int particleBuffSz = CalcParticleBufferSize(nParticles);
std::size_t messageBuffSz = CalcMessageBufferSize(msgSz + 1);
std::size_t particleBuffSz = CalcParticleBufferSize(nParticles, numBlockIds);
int numRecvs = std::min(64, this->NumRanks - 1);
if (this->Rank == 0)
std::cout << "RegisterMessages: np= " << nParticles << " " << numBlockIds
<< " buffsz= " << particleBuffSz << std::endl;
this->RegisterTag(ParticleMessenger::MESSAGE_TAG, numRecvs, messageBuffSz);
this->RegisterTag(ParticleMessenger::PARTICLE_TAG, numRecvs, particleBuffSz);
@ -153,11 +162,11 @@ void ParticleMessenger::RegisterMessages(int msgSz, int nParticles)
VTKM_CONT
void ParticleMessenger::SendMsg(int dst, const std::vector<int>& msg)
{
MemStream* buff = new MemStream();
vtkmdiy::MemoryBuffer buff;
//Write data.
vtkm::filter::particleadvection::write(*buff, this->Rank);
vtkm::filter::particleadvection::write(*buff, msg);
vtkmdiy::save(buff, this->Rank);
vtkmdiy::save(buff, msg);
this->SendData(dst, ParticleMessenger::MESSAGE_TAG, buff);
}
@ -189,7 +198,7 @@ bool ParticleMessenger::RecvAny(std::vector<MsgCommType>* msgs,
if (tags.empty())
return false;
std::vector<std::pair<int, MemStream*>> buffers;
std::vector<std::pair<int, vtkmdiy::MemoryBuffer>> buffers;
if (!this->RecvData(tags, buffers, blockAndWait))
return false;
@ -199,30 +208,19 @@ bool ParticleMessenger::RecvAny(std::vector<MsgCommType>* msgs,
{
int sendRank;
std::vector<int> m;
vtkm::filter::particleadvection::read(*buffers[i].second, sendRank);
vtkm::filter::particleadvection::read(*buffers[i].second, m);
vtkmdiy::load(buffers[i].second, sendRank);
vtkmdiy::load(buffers[i].second, m);
msgs->push_back(std::make_pair(sendRank, m));
}
else if (buffers[i].first == ParticleMessenger::PARTICLE_TAG)
{
int sendRank;
std::size_t num;
vtkm::filter::particleadvection::read(*buffers[i].second, sendRank);
vtkm::filter::particleadvection::read(*buffers[i].second, num);
if (num > 0)
{
std::vector<ParticleCommType> particles(num);
for (std::size_t j = 0; j < num; j++)
{
vtkm::filter::particleadvection::read(*(buffers[i].second), particles[j].first);
vtkm::filter::particleadvection::read(*(buffers[i].second), particles[j].second);
}
recvParticles->push_back(std::make_pair(sendRank, particles));
}
}
std::vector<ParticleCommType> particles;
delete buffers[i].second;
vtkmdiy::load(buffers[i].second, sendRank);
vtkmdiy::load(buffers[i].second, particles);
recvParticles->push_back(std::make_pair(sendRank, particles));
}
}
return true;

@ -11,7 +11,10 @@
#ifndef vtk_m_filter_ParticleMessenger_h
#define vtk_m_filter_ParticleMessenger_h
#include <vtkm/filter/vtkm_filter_extra_export.h>
#include <vtkm/Particle.h>
#include <vtkm/cont/Serialization.h>
#include <vtkm/filter/particleadvection/BoundsMap.h>
#include <vtkm/filter/particleadvection/Messenger.h>
@ -42,7 +45,8 @@ public:
VTKM_CONT ParticleMessenger(vtkmdiy::mpi::communicator& comm,
const vtkm::filter::particleadvection::BoundsMap& bm,
int msgSz = 1,
int numParticles = 128);
int numParticles = 128,
int numBlockIds = 2);
VTKM_CONT ~ParticleMessenger() {}
VTKM_CONT void Exchange(const std::vector<vtkm::Massless>& outData,
@ -62,9 +66,9 @@ protected:
PARTICLE_TAG = 0x42001
};
VTKM_CONT void RegisterMessages(int msgSz, int nParticles);
VTKM_CONT void RegisterMessages(int msgSz, int nParticles, int numBlockIds);
// Send/Recv Integral curves.
// Send/Recv particles
VTKM_CONT
template <typename P,
template <typename, typename>
@ -98,7 +102,7 @@ protected:
std::vector<vtkm::Massless>& inData,
std::map<vtkm::Id, std::vector<vtkm::Id>>& inDataBlockIDsMap) const;
static int CalcParticleBufferSize(int nParticles, int numBlockIds = 2);
static std::size_t CalcParticleBufferSize(std::size_t nParticles, std::size_t numBlockIds = 2);
};
@ -115,11 +119,10 @@ inline void ParticleMessenger::SendParticles(int dst, const Container<P, Allocat
if (c.empty())
return;
vtkm::filter::particleadvection::MemStream* buff =
new vtkm::filter::particleadvection::MemStream();
vtkm::filter::particleadvection::write(*buff, this->Rank);
vtkm::filter::particleadvection::write(*buff, c);
this->SendData(dst, ParticleMessenger::PARTICLE_TAG, buff);
vtkmdiy::MemoryBuffer bb;
vtkmdiy::save(bb, this->Rank);
vtkmdiy::save(bb, c);
this->SendData(dst, ParticleMessenger::PARTICLE_TAG, bb);
}
VTKM_CONT
@ -131,34 +134,6 @@ inline void ParticleMessenger::SendParticles(const std::map<int, Container<P, Al
this->SendParticles(mit->first, mit->second);
}
#endif
template <>
struct Serialization<vtkm::Massless>
{
static void write(vtkm::filter::particleadvection::MemStream& memstream,
const vtkm::Massless& data)
{
vtkm::filter::particleadvection::write(memstream, data.Pos[0]);
vtkm::filter::particleadvection::write(memstream, data.Pos[1]);
vtkm::filter::particleadvection::write(memstream, data.Pos[2]);
vtkm::filter::particleadvection::write(memstream, data.ID);
vtkm::filter::particleadvection::write(memstream, data.Status);
vtkm::filter::particleadvection::write(memstream, data.NumSteps);
vtkm::filter::particleadvection::write(memstream, data.Time);
}
static void read(vtkm::filter::particleadvection::MemStream& memstream, vtkm::Massless& data)
{
vtkm::filter::particleadvection::read(memstream, data.Pos[0]);
vtkm::filter::particleadvection::read(memstream, data.Pos[1]);
vtkm::filter::particleadvection::read(memstream, data.Pos[2]);
vtkm::filter::particleadvection::read(memstream, data.ID);
vtkm::filter::particleadvection::read(memstream, data.Status);
vtkm::filter::particleadvection::read(memstream, data.NumSteps);
vtkm::filter::particleadvection::read(memstream, data.Time);
}
};
}
}
} // namespace vtkm::filter::particleadvection