Support for synchronous communication.

Dave Pugmire 2022-12-01 11:04:47 -05:00
parent 2c2f5677d2
commit c9f2c7fe67
6 changed files with 260 additions and 26 deletions

@ -96,18 +96,22 @@ public:
vtkm::Id nLocal = static_cast<vtkm::Id>(this->Active.size() + this->Inactive.size());
this->ComputeTotalNumParticles(nLocal);
messenger.Log << "Begin" << std::endl;
while (this->TotalNumTerminatedParticles < this->TotalNumParticles)
{
messenger.Log << " TotNumTerminated= " << this->TotalNumTerminatedParticles << std::endl;
std::vector<ParticleType> v;
vtkm::Id numTerm = 0, blockId = -1;
if (this->GetActiveParticles(v, blockId))
{
messenger.Log << " Advect " << v.size() << " in block " << blockId << std::endl;
//make this a pointer to avoid the copy?
auto& block = this->GetDataSet(blockId);
DSIHelperInfoType bb =
DSIHelperInfo<ParticleType>(v, this->BoundsMap, this->ParticleBlockIDsMap);
block.Advect(bb, this->StepSize, this->NumberOfSteps);
numTerm = this->UpdateResult(bb.Get<DSIHelperInfo<ParticleType>>());
messenger.Log << " numTerm= " << numTerm << std::endl;
}
vtkm::Id numTermMessages = 0;
@ -117,6 +121,7 @@ public:
if (this->TotalNumTerminatedParticles > this->TotalNumParticles)
throw vtkm::cont::ErrorFilterExecution("Particle count error");
}
messenger.Log << "Done" << std::endl;
}
@ -194,16 +199,23 @@ public:
std::vector<ParticleType> incoming;
std::unordered_map<vtkm::Id, std::vector<vtkm::Id>> incomingIDs;
numTermMessages = 0;
bool block = this->GetBlockAndWait(messenger.UsingSyncCommunication(), numLocalTerminations);
messenger.Log << " Communicate: AI= " << this->Active.size() << " " << this->Inactive.size()
<< " localTerm= " << numLocalTerminations << " Block= " << block << std::endl;
messenger.Exchange(this->Inactive,
this->ParticleBlockIDsMap,
numLocalTerminations,
incoming,
incomingIDs,
numTermMessages,
this->GetBlockAndWait(numLocalTerminations));
block);
//this->GetBlockAndWait(messenger.UsingSyncCommunication(), numLocalTerminations));
this->Inactive.clear();
this->UpdateActive(incoming, incomingIDs);
messenger.Log << " Communicate Done: AI= " << this->Active.size() << " "
<< this->Inactive.size() << " numTermMsg= " << numTermMessages << std::endl;
}
virtual void UpdateActive(const std::vector<ParticleType>& particles,
@ -245,20 +257,29 @@ public:
return numTerm;
}
virtual bool GetBlockAndWait(const vtkm::Id& numLocalTerm)
virtual bool GetBlockAndWait(const bool& syncComm, const vtkm::Id& numLocalTerm)
{
//There are only two cases where blocking would deadlock.
//1. There are active particles.
//2. numLocalTerm + this->TotalNumberOfTerminatedParticles == this->TotalNumberOfParticles
//So, if neither are true, we can safely block and wait for communication to come in.
bool haveNoWork = this->Active.empty() && this->Inactive.empty();
if (this->Active.empty() && this->Inactive.empty() &&
(numLocalTerm + this->TotalNumTerminatedParticles < this->TotalNumParticles))
//Using synchronous communication, we should only block and wait if we have no particles.
if (syncComm)
{
return true;
return haveNoWork;
}
else
{
//Otherwise, for asynchronous communication, there are only two cases where blocking would deadlock:
//1. There are active particles.
//2. numLocalTerm + this->TotalNumTerminatedParticles == this->TotalNumParticles
//So, if neither is true, we can safely block and wait for communication to come in.
return false;
if (haveNoWork &&
(numLocalTerm + this->TotalNumTerminatedParticles < this->TotalNumParticles))
return true;
return false;
}
}
//Member data

@ -52,6 +52,8 @@ public:
std::vector<std::thread> workerThreads;
workerThreads.emplace_back(std::thread(AdvectAlgorithmThreaded::Worker, this));
this->Comm.barrier();
//MPI_Barrier(this->Comm);
this->Manage();
//This will only work for 1 thread. For > 1, the Blocks will need a mutex.
@ -136,6 +138,7 @@ protected:
vtkm::filter::flow::internal::ParticleMessenger<ParticleType> messenger(
this->Comm, this->BoundsMap, 1, 128);
messenger.Log << "Begin" << std::endl;
while (this->TotalNumTerminatedParticles < this->TotalNumParticles)
{
std::unordered_map<vtkm::Id, std::vector<DSIHelperInfoType>> workerResults;
@ -147,6 +150,8 @@ protected:
for (auto& r : it.second)
numTerm += this->UpdateResult(r.Get<DSIHelperInfo<ParticleType>>());
}
messenger.Log << " Advected: " << workerResults.size() << " numTerm= " << numTerm
<< std::endl;
vtkm::Id numTermMessages = 0;
this->Communicate(messenger, numTerm, numTermMessages);
@ -156,17 +161,24 @@ protected:
throw vtkm::cont::ErrorFilterExecution("Particle count error");
}
messenger.Log << "DONE" << std::endl;
//Let the workers know that we are done.
this->SetDone();
//Do one last communicate to send out the messages.
vtkm::Id dummy;
this->Communicate(messenger, 0, dummy);
}
bool GetBlockAndWait(const vtkm::Id& numLocalTerm) override
bool GetBlockAndWait(const bool& syncComm, const vtkm::Id& numLocalTerm) override
{
std::lock_guard<std::mutex> lock(this->Mutex);
if (this->Done)
return true;
return (
this->AdvectAlgorithm<DSIType, ResultType, ParticleType>::GetBlockAndWait(numLocalTerm) &&
!this->WorkerActivate && this->WorkerResults.empty());
return (this->AdvectAlgorithm<DSIType, ResultType, ParticleType>::GetBlockAndWait(
syncComm, numLocalTerm) &&
!this->WorkerActivate && this->WorkerResults.empty());
}
void GetWorkerResults(std::unordered_map<vtkm::Id, std::vector<DSIHelperInfoType>>& results)

@ -8,6 +8,9 @@
// PURPOSE. See the above copyright notice for more information.
//============================================================================
#include <vtkm/Math.h>
#include <vtkm/cont/ErrorFilterExecution.h>
#include <vtkm/filter/flow/internal/Messenger.h>
@ -39,6 +42,8 @@ Messenger::Messenger(vtkmdiy::mpi::communicator& comm)
Messenger::Messenger(vtkmdiy::mpi::communicator& vtkmNotUsed(comm))
#endif
{
std::string fname = "out." + std::to_string(this->Rank) + ".log";
this->Log.open(fname, std::ofstream::out);
}
#ifdef VTKM_ENABLE_MPI
@ -244,7 +249,7 @@ void Messenger::PrepareForSend(int tag,
}
}
void Messenger::SendData(int dst, int tag, const vtkmdiy::MemoryBuffer& buff)
void Messenger::SendDataAsync(int dst, int tag, const vtkmdiy::MemoryBuffer& buff)
{
std::vector<char*> bufferList;
@ -266,6 +271,27 @@ void Messenger::SendData(int dst, int tag, const vtkmdiy::MemoryBuffer& buff)
}
}
void Messenger::SendDataSync(int dst, int tag, vtkmdiy::MemoryBuffer& buff)
{
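//Synchronous sends are deferred: the buffer is queued under its tag in SyncSendBuffers,
//and the actual MPI sends are issued later from RecvDataSync.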
this->Log << " SendDataSync: sendTo= " << dst << " tag= " << tag << " sz= " << buff.size()
<< std::endl;
auto entry = std::make_pair(dst, std::move(buff));
auto it = this->SyncSendBuffers.find(tag);
if (it == this->SyncSendBuffers.end())
{
std::vector<std::pair<int, vtkmdiy::MemoryBuffer>> vec;
vec.push_back(std::move(entry));
this->SyncSendBuffers.insert(std::make_pair(tag, std::move(vec)));
}
else
it->second.emplace_back(std::move(entry));
it = this->SyncSendBuffers.find(tag);
this->Log << " SendDataSync: SyncSendBuffs[tag] = " << it->second.size() << std::endl;
}
/*
bool Messenger::RecvData(int tag, std::vector<vtkmdiy::MemoryBuffer>& buffers, bool blockAndWait)
{
std::set<int> setTag;
@ -281,10 +307,11 @@ bool Messenger::RecvData(int tag, std::vector<vtkmdiy::MemoryBuffer>& buffers, b
}
return false;
}
*/
bool Messenger::RecvData(const std::set<int>& tags,
std::vector<std::pair<int, vtkmdiy::MemoryBuffer>>& buffers,
bool blockAndWait)
bool Messenger::RecvDataAsync(const std::set<int>& tags,
std::vector<std::pair<int, vtkmdiy::MemoryBuffer>>& buffers,
bool blockAndWait)
{
buffers.resize(0);
@ -316,6 +343,120 @@ bool Messenger::RecvData(const std::set<int>& tags,
return !buffers.empty();
}
bool Messenger::RecvDataSync(const std::set<int>& tags,
std::vector<std::pair<int, vtkmdiy::MemoryBuffer>>& buffers,
bool blockAndWait)
{
buffers.resize(0);
this->Log << " RecvDataSync: block= " << blockAndWait << std::endl;
if (!blockAndWait)
return false;
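//Past this point the exchange is collective: all ranks must participate so the
//Allreduce calls, the point-to-point sends/receives, and the final barrier match up.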
//Exchange data
for (auto tag : tags)
{
//Determine the number of messages being sent to each rank and the maximum size.
std::vector<int> maxBuffSize(this->NumRanks, 0), numMessages(this->NumRanks, 0);
const auto& it = this->SyncSendBuffers.find(tag);
if (it != this->SyncSendBuffers.end())
{
for (const auto& i : it->second)
{
int buffSz = static_cast<int>(i.second.size());
maxBuffSize[i.first] = vtkm::Max(maxBuffSize[i.first], buffSz);
numMessages[i.first]++;
}
}
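//Combine the per-rank tallies: MPI_MAX yields the largest buffer each rank will receive,
//and MPI_SUM yields the total number of messages destined for each rank.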
int err = MPI_Allreduce(
MPI_IN_PLACE, maxBuffSize.data(), this->NumRanks, MPI_INT, MPI_MAX, this->MPIComm);
if (err != MPI_SUCCESS)
throw vtkm::cont::ErrorFilterExecution("Error in MPI_Isend inside Messenger::RecvDataSync");
err = MPI_Allreduce(
MPI_IN_PLACE, numMessages.data(), this->NumRanks, MPI_INT, MPI_SUM, this->MPIComm);
if (err != MPI_SUCCESS)
throw vtkm::cont::ErrorFilterExecution("Error in MPI_Isend inside Messenger::RecvDataSync");
this->Log << " tag= " << tag << std::endl;
this->Log << " numMessages= " << numMessages << std::endl;
this->Log << " maxBuffSize= " << maxBuffSize << std::endl;
MPI_Status status;
std::vector<char> recvBuff;
for (int r = 0; r < this->NumRanks; r++)
{
int numMsgs = numMessages[r];
this->Log << " R= " << r << " nMsgs= " << numMsgs << std::endl;
if (numMsgs == 0)
continue;
//Rank r is expecting numMsgs messages: if we are rank r, post the receives; otherwise, send it any buffers queued for it.
if (this->Rank == r)
{
int maxSz = maxBuffSize[r];
recvBuff.resize(maxSz);
for (int n = 0; n < numMsgs; n++)
{
this->Log << " Recv " << n << " of " << numMsgs << " sz= " << maxSz << std::endl;
err =
MPI_Recv(recvBuff.data(), maxSz, MPI_BYTE, MPI_ANY_SOURCE, 0, this->MPIComm, &status);
if (err != MPI_SUCCESS)
throw vtkm::cont::ErrorFilterExecution(
"Error in MPI_Recv inside Messenger::RecvDataSync");
this->Log << " Recv " << n << " of " << numMsgs << " sz= " << maxSz << std::endl;
this->Log << " Recv complete: " << status.MPI_SOURCE << " " << status.MPI_TAG << " "
<< status.MPI_ERROR << std::endl;
std::pair<int, vtkmdiy::MemoryBuffer> entry;
entry.first = tag;
entry.second.buffer = recvBuff;
buffers.emplace_back(std::move(entry));
}
}
else
{
if (it != this->SyncSendBuffers.end())
{
//it = <tag, <dst,buffer>>
this->Log << " Send to: " << r << " I have " << it->second.size() << std::endl;
for (const auto& dst_buff : it->second)
{
this->Log << " SendDst= " << dst_buff.first << std::endl;
if (dst_buff.first == r)
{
this->Log << " Send to " << r << " sz= " << dst_buff.second.size() << std::endl;
err = MPI_Send(dst_buff.second.buffer.data(),
dst_buff.second.size(),
MPI_BYTE,
r,
0,
this->MPIComm);
if (err != MPI_SUCCESS)
throw vtkm::cont::ErrorFilterExecution(
"Error in MPI_Send inside Messenger::RecvDataSync");
this->Log << " Send complete" << std::endl;
}
}
}
}
}
//Clean up the send buffers.
if (it != this->SyncSendBuffers.end())
this->SyncSendBuffers.erase(it);
}
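//Make sure every rank has finished the exchange before anyone returns.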
this->Log << " Barrier Begin" << std::endl;
MPI_Barrier(this->MPIComm);
this->Log << " Barrier End" << std::endl;
this->Log << " buffers.size() = " << buffers.size() << std::endl;
return !buffers.empty();
}
void Messenger::ProcessReceivedBuffers(std::vector<char*>& incomingBuffers,
std::vector<std::pair<int, vtkmdiy::MemoryBuffer>>& buffers)
{

@ -50,18 +50,43 @@ public:
#ifdef VTKM_ENABLE_MPI
VTKM_CONT void RegisterTag(int tag, std::size_t numRecvs, std::size_t size);
bool UsingSyncCommunication() const { return !this->UsingAsyncCommunication(); }
bool UsingAsyncCommunication() const { return this->AsyncCommunication; }
std::ofstream Log;
protected:
static std::size_t CalcMessageBufferSize(std::size_t msgSz);
void InitializeBuffers();
void CheckPendingSendRequests();
void CleanupRequests(int tag = TAG_ANY);
void SendData(int dst, int tag, const vtkmdiy::MemoryBuffer& buff);
void SendData(int dst, int tag, vtkmdiy::MemoryBuffer& buff)
{
if (this->AsyncCommunication)
this->SendDataAsync(dst, tag, buff);
else
this->SendDataSync(dst, tag, buff);
}
bool RecvData(const std::set<int>& tags,
std::vector<std::pair<int, vtkmdiy::MemoryBuffer>>& buffers,
bool blockAndWait = false);
bool blockAndWait = false)
{
if (this->AsyncCommunication)
return this->RecvDataAsync(tags, buffers, blockAndWait);
else
return this->RecvDataSync(tags, buffers, blockAndWait);
}
private:
void SendDataAsync(int dst, int tag, const vtkmdiy::MemoryBuffer& buff);
void SendDataSync(int dst, int tag, vtkmdiy::MemoryBuffer& buff);
bool RecvDataAsync(const std::set<int>& tags,
std::vector<std::pair<int, vtkmdiy::MemoryBuffer>>& buffers,
bool blockAndWait);
bool RecvDataSync(const std::set<int>& tags,
std::vector<std::pair<int, vtkmdiy::MemoryBuffer>>& buffers,
bool blockAndWait);
void PostRecv(int tag);
void PostRecv(int tag, std::size_t sz, int src = -1);
@ -73,7 +98,7 @@ private:
std::size_t id, numPackets, packet, packetSz, dataSz;
} Header;
bool RecvData(int tag, std::vector<vtkmdiy::MemoryBuffer>& buffers, bool blockAndWait = false);
//bool RecvData(int tag, std::vector<vtkmdiy::MemoryBuffer>& buffers, bool blockAndWait = false);
void PrepareForSend(int tag, const vtkmdiy::MemoryBuffer& buff, std::vector<char*>& buffList);
vtkm::Id GetMsgID() { return this->MsgID++; }
@ -86,6 +111,9 @@ private:
using RankIdPair = std::pair<int, int>;
//Member data
bool AsyncCommunication = false; //true;
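//When false, SendData()/RecvData() dispatch to the synchronous SendDataSync/RecvDataSync paths.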
// <tag, {dst, buffer}>
std::map<int, std::vector<std::pair<int, vtkmdiy::MemoryBuffer>>> SyncSendBuffers;
std::map<int, std::pair<std::size_t, std::size_t>> MessageTagInfo;
MPI_Comm MPIComm;
std::size_t MsgID;
@ -100,6 +128,7 @@ private:
const std::set<int>& tags,
bool BlockAndWait,
std::vector<RequestTagPair>& reqTags);
#else
protected:
static constexpr int NumRanks = 1;
@ -107,6 +136,21 @@ protected:
#endif
};
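//Stream a std::vector to an ostream; used by the diagnostic logging in RecvDataSync
//(e.g., numMessages and maxBuffSize).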
template <typename T>
std::ostream& operator<<(std::ostream& os, const std::vector<T>& v)
{
os << "[";
for (std::size_t i = 0; i < v.size(); ++i)
{
os << v[i];
if (i != v.size() - 1)
os << ", ";
}
os << "]";
return os;
}
}
}
}

@ -62,7 +62,9 @@ protected:
#ifdef VTKM_ENABLE_MPI
static constexpr int MSG_TERMINATE = 1;
enum { MESSAGE_TAG = 0x42000, PARTICLE_TAG = 0x42001 };
//enum { MESSAGE_TAG = 0x42000, PARTICLE_TAG = 0x42001 };
enum { MESSAGE_TAG = 100, PARTICLE_TAG = 200 };
//int MESSAGE_TAG = 101, PARTICLE_TAG = 102;
VTKM_CONT void RegisterMessages(int msgSz, int nParticles, int numBlockIds);

@ -285,8 +285,8 @@ void ValidateOutput(const vtkm::cont::DataSet& out,
"Wrong number of coordinate systems in the output dataset");
vtkm::cont::UnknownCellSet dcells = out.GetCellSet();
out.PrintSummary(std::cout);
std::cout << " nSeeds= " << numSeeds << std::endl;
//out.PrintSummary(std::cout);
//std::cout << " nSeeds= " << numSeeds << std::endl;
VTKM_TEST_ASSERT(dcells.GetNumberOfCells() == numSeeds, "Wrong number of cells");
auto coords = out.GetCoordinateSystem().GetDataAsMultiplexer();
auto ptPortal = coords.ReadPortal();
@ -390,8 +390,8 @@ void TestPartitionedDataSet(vtkm::Id nPerRank, bool useGhost, FilterType fType,
AddVectorFields(pds, fieldName, vecX);
vtkm::cont::ArrayHandle<vtkm::Particle> seedArray;
seedArray = vtkm::cont::make_ArrayHandle({ vtkm::Particle(vtkm::Vec3f(.2f, 1.0f, .2f), 0),
vtkm::Particle(vtkm::Vec3f(.2f, 2.0f, .2f), 1) });
seedArray = vtkm::cont::make_ArrayHandle({ vtkm::Particle(vtkm::Vec3f(.2f, 1.0f, .2f), 0) });
// vtkm::Particle(vtkm::Vec3f(.2f, 2.0f, .2f), 1) });
vtkm::Id numSeeds = seedArray.GetNumberOfValues();
if (fType == STREAMLINE)
@ -441,18 +441,32 @@ void TestStreamlineFiltersMPI()
{
std::vector<bool> flags = { true, false };
std::vector<FilterType> filterTypes = { PARTICLE_ADVECTION, STREAMLINE, PATHLINE };
//filterTypes = {filterTypes[1]};
//flags = {false};
//TestPartitionedDataSet(1, false, PARTICLE_ADVECTION, true);
auto comm = vtkm::cont::EnvironmentTracker::GetCommunicator();
for (int n = 1; n < 3; n++)
{
for (auto useGhost : flags)
for (auto fType : filterTypes)
for (auto useThreaded : flags)
{
useThreaded = false;
if (comm.rank() == 0)
std::cout << " N= " << n << " " << useGhost << " " << useThreaded << std::endl;
TestPartitionedDataSet(n, useGhost, fType, useThreaded);
}
}
//streamline, threaded will sometimes hang.
for (auto fType : filterTypes)
for (auto useThreaded : flags)
{
useThreaded = false;
TestAMRStreamline(fType, useThreaded);
}
}
}