Create ParticleExchanger.

Dave Pugmire 2024-07-03 07:42:39 -04:00
parent c75dcace78
commit ade25db6d5
4 changed files with 370 additions and 77 deletions

vtkm/filter/flow/internal/AdvectAlgorithm.h
@@ -12,8 +12,10 @@
#define vtk_m_filter_flow_internal_AdvectAlgorithm_h
#include <vtkm/cont/PartitionedDataSet.h>
#include <vtkm/filter/flow/internal/AdvectAlgorithmTerminator.h>
#include <vtkm/filter/flow/internal/BoundsMap.h>
#include <vtkm/filter/flow/internal/DataSetIntegrator.h>
#include <vtkm/filter/flow/internal/ParticleExchanger.h>
#include <vtkm/filter/flow/internal/ParticleMessenger.h>
#ifdef VTKM_ENABLE_MPI
#include <vtkm/thirdparty/diy/diy.h>
@@ -29,86 +31,26 @@ namespace flow
namespace internal
{
class AdvectAlgorithmTerminator
{
public:
#ifdef VTKM_ENABLE_MPI
AdvectAlgorithmTerminator(vtkmdiy::mpi::communicator& comm)
: MPIComm(vtkmdiy::mpi::mpi_cast(comm.handle()))
#else
AdvectAlgorithmTerminator(vtkmdiy::mpi::communicator& vtkmNotUsed(comm))
#endif
{
}
void AddWork()
{
#ifdef VTKM_ENABLE_MPI
this->Dirty = 1;
#endif
}
bool Done() const { return this->State == AdvectAlgorithmTerminatorState::DONE; }
void Control(bool haveLocalWork)
{
#ifdef VTKM_ENABLE_MPI
if (this->State == STATE_0 && !haveLocalWork)
{
MPI_Ibarrier(this->MPIComm, &this->StateReq);
this->Dirty = 0;
this->State = STATE_1;
}
else if (this->State == STATE_1)
{
MPI_Status status;
int flag;
MPI_Test(&this->StateReq, &flag, &status);
if (flag == 1)
{
int localDirty = this->Dirty;
MPI_Iallreduce(
&localDirty, &this->AllDirty, 1, MPI_INT, MPI_LOR, this->MPIComm, &this->StateReq);
this->State = STATE_2;
}
}
else if (this->State == STATE_2)
{
MPI_Status status;
int flag;
MPI_Test(&this->StateReq, &flag, &status);
if (flag == 1)
{
if (this->AllDirty == 0) //done
this->State = DONE;
else
this->State = STATE_0; //reset.
}
}
#else
if (!haveLocalWork)
this->State = DONE;
#endif
}
private:
enum AdvectAlgorithmTerminatorState
{
STATE_0,
STATE_1,
STATE_2,
DONE
};
AdvectAlgorithmTerminatorState State = AdvectAlgorithmTerminatorState::STATE_0;
#ifdef VTKM_ENABLE_MPI
std::atomic<int> Dirty;
int AllDirty = 0;
MPI_Request StateReq;
MPI_Comm MPIComm;
#endif
};

/*
ParticleMessenger::Exchange()
- SendParticles(outData--> map[dstRank]=vector of pairs);
-- SendParticles(map...)
--- for each m : map SendParticles(m);
---- SendParticles(dst, container)
----- serialize, SendData(dst, buff);
------ SendDataAsync(dst,buff)
------- header??, req=mpi_isend(), store req.

- RecvAny(data, block);
-- RecvData(tags, buffers, block)
--- RecvDataAsyncProbe(tag, buffers, block)
---- while (true)
----- if block: MPI_Probe() msgReceived=true
----- else : MPI_Iprobe msgReceived = check
----- if msgRecvd: MPI_Get_count(), MPI_Recv(), buffers, blockAndWait=false
*/
template <typename DSIType>
class AdvectAlgorithm
@@ -125,6 +67,7 @@ public:
, Rank(this->Comm.rank())
, UseAsynchronousCommunication(useAsyncComm)
, Terminator(this->Comm)
, Exchanger(this->Comm)
{
}
@@ -280,6 +223,11 @@ public:
return !particles.empty();
}
void ExchangeParticles()
{
// this->Exchanger.Exchange(outgoing, outgoingRanks, this->ParticleBlockIDsMap, incoming, incomingBlockIDs, block);
}
void Communicate(vtkm::filter::flow::internal::ParticleMessenger<ParticleType>& messenger)
{
std::vector<ParticleType> outgoing;
@@ -295,6 +243,8 @@ public:
block = this->GetBlockAndWait(messenger.UsingSyncCommunication());
#endif
// this->Exchanger.Exchange(outgoing, outgoingRanks, this->ParticleBlockIDsMap, incoming, incomingBlockIDs, block);
vtkm::Id numTermMessages;
messenger.Exchange(outgoing,
outgoingRanks,
@@ -463,6 +413,8 @@ public:
vtkm::FloatDefault StepSize;
bool UseAsynchronousCommunication = true;
AdvectAlgorithmTerminator Terminator;
ParticleExchanger<ParticleType> Exchanger;
};
}
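[Editor's note] The /* ParticleMessenger::Exchange() */ notes above trace the send and receive paths this commit is refactoring. For orientation only, here is a condensed, self-contained version of that pattern in plain MPI; the function names sendSerialized and tryRecvSerialized are illustrative and do not exist in VTK-m. The new ParticleExchanger further down implements the same probe-and-receive loop on vtkmdiy-serialized particle data.

#include <mpi.h>

#include <cstddef>
#include <vector>

//Post a non-blocking send of an already-serialized byte buffer to rank 'dst'.
//The caller must keep 'buff' alive until the returned request completes.
inline MPI_Request sendSerialized(const std::vector<char>& buff, int dst, MPI_Comm comm)
{
  MPI_Request req;
  MPI_Isend(buff.data(), static_cast<int>(buff.size()), MPI_BYTE, dst, 0, comm, &req);
  return req;
}

//Probe for an incoming message (blocking or not), size it with MPI_Get_count,
//then receive it into 'buff'. Returns false if nothing was pending.
inline bool tryRecvSerialized(std::vector<char>& buff, bool blockAndWait, MPI_Comm comm)
{
  MPI_Status status;
  int msgPending = 0;
  if (blockAndWait)
  {
    MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, comm, &status);
    msgPending = 1;
  }
  else
  {
    MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, comm, &msgPending, &status);
  }
  if (!msgPending)
    return false;

  int numBytes = 0;
  MPI_Get_count(&status, MPI_BYTE, &numBytes);
  buff.resize(static_cast<std::size_t>(numBytes));
  MPI_Recv(buff.data(), numBytes, MPI_BYTE, status.MPI_SOURCE, status.MPI_TAG, comm, MPI_STATUS_IGNORE);
  return true;
}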

vtkm/filter/flow/internal/AdvectAlgorithmTerminator.h
@@ -0,0 +1,111 @@
//============================================================================
// Copyright (c) Kitware, Inc.
// All rights reserved.
// See LICENSE.txt for details.
//
// This software is distributed WITHOUT ANY WARRANTY; without even
// the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE. See the above copyright notice for more information.
//============================================================================
#ifndef vtk_m_filter_flow_internal_AdvectAlgorithmTerminator_h
#define vtk_m_filter_flow_internal_AdvectAlgorithmTerminator_h

//Includes this header appears to rely on (assumed; not visible in this diff view).
#include <vtkm/Types.h>
#include <vtkm/thirdparty/diy/diy.h>
#ifdef VTKM_ENABLE_MPI
#include <vtkm/thirdparty/diy/mpi-cast.h>
#include <atomic>
#include <mpi.h>
#endif
namespace vtkm
{
namespace filter
{
namespace flow
{
namespace internal
{
class AdvectAlgorithmTerminator
{
public:
#ifdef VTKM_ENABLE_MPI
AdvectAlgorithmTerminator(vtkmdiy::mpi::communicator& comm)
: MPIComm(vtkmdiy::mpi::mpi_cast(comm.handle()))
#else
AdvectAlgorithmTerminator(vtkmdiy::mpi::communicator& vtkmNotUsed(comm))
#endif
{
}
void AddWork()
{
#ifdef VTKM_ENABLE_MPI
this->Dirty = 1;
#endif
}
bool Done() const { return this->State == AdvectAlgorithmTerminatorState::DONE; }
void Control(bool haveLocalWork)
{
#ifdef VTKM_ENABLE_MPI
if (this->State == STATE_0 && !haveLocalWork)
{
MPI_Ibarrier(this->MPIComm, &this->StateReq);
this->Dirty = 0;
this->State = STATE_1;
}
else if (this->State == STATE_1)
{
MPI_Status status;
int flag;
MPI_Test(&this->StateReq, &flag, &status);
if (flag == 1)
{
int localDirty = this->Dirty;
MPI_Iallreduce(
&localDirty, &this->AllDirty, 1, MPI_INT, MPI_LOR, this->MPIComm, &this->StateReq);
this->State = STATE_2;
}
}
else if (this->State == STATE_2)
{
MPI_Status status;
int flag;
MPI_Test(&this->StateReq, &flag, &status);
if (flag == 1)
{
if (this->AllDirty == 0) //done
this->State = DONE;
else
this->State = STATE_0; //reset.
}
}
#else
if (!haveLocalWork)
this->State = DONE;
#endif
}
private:
enum AdvectAlgorithmTerminatorState
{
STATE_0,
STATE_1,
STATE_2,
DONE
};
AdvectAlgorithmTerminatorState State = AdvectAlgorithmTerminatorState::STATE_0;
#ifdef VTKM_ENABLE_MPI
std::atomic<int> Dirty;
int AllDirty = 0;
MPI_Request StateReq;
MPI_Comm MPIComm;
#endif
};
}
}
}
} //vtkm::filter::flow::internal
#endif //vtk_m_filter_flow_internal_AdvectAlgorithmTerminator_h
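[Editor's note] The terminator is a small non-blocking termination-detection state machine (MPI_Ibarrier followed by MPI_Iallreduce over a "dirty" flag). The driver loop below is illustrative only; runUntilGloballyIdle is not part of this commit, and the commented calls mark where real advection work would go. It shows the AddWork/Control/Done call pattern, which is presumably how AdvectAlgorithm drives its Terminator member.

#include <vtkm/filter/flow/internal/AdvectAlgorithmTerminator.h>
#include <vtkm/thirdparty/diy/diy.h>

//Illustrative driver loop, not code from this commit.
inline void runUntilGloballyIdle(vtkmdiy::mpi::communicator& comm)
{
  vtkm::filter::flow::internal::AdvectAlgorithmTerminator terminator(comm);
  while (!terminator.Done())
  {
    bool haveLocalWork = false;
    //... advect locally owned particles, exchange with other ranks, and set
    //    haveLocalWork = true while this rank still has active particles ...

    //If particles arrived from another rank, mark the global state dirty again:
    //terminator.AddWork();

    //Advance the Ibarrier / Iallreduce state machine. Once every rank reports
    //no local work and nothing was marked dirty, Done() becomes true.
    terminator.Control(haveLocalWork);
  }
}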

vtkm/filter/flow/internal/CMakeLists.txt
@@ -10,6 +10,7 @@
set(headers
AdvectAlgorithm.h
AdvectAlgorithmTerminator.h
AdvectAlgorithmThreaded.h
BoundsMap.h
DataSetIntegrator.h
@@ -19,6 +20,7 @@ set(headers
LagrangianStructureHelpers.h
Messenger.h
ParticleAdvector.h
ParticleExchanger.h
ParticleMessenger.h
)

vtkm/filter/flow/internal/ParticleExchanger.h
@@ -0,0 +1,228 @@
//============================================================================
// Copyright (c) Kitware, Inc.
// All rights reserved.
// See LICENSE.txt for details.
//
// This software is distributed WITHOUT ANY WARRANTY; without even
// the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE. See the above copyright notice for more information.
//============================================================================
#ifndef vtk_m_filter_flow_internal_ParticleExchanger_h
#define vtk_m_filter_flow_internal_ParticleExchanger_h

//Includes this header appears to rely on (assumed; not visible in this diff view).
#include <vtkm/Assert.h>
#include <vtkm/Types.h>
#include <vtkm/cont/ErrorFilterExecution.h>
#include <vtkm/cont/Logging.h>
#include <vtkm/thirdparty/diy/diy.h>
#ifdef VTKM_ENABLE_MPI
#include <vtkm/thirdparty/diy/mpi-cast.h>
#include <mpi.h>
#endif
#include <iostream>
#include <unordered_map>
#include <utility>
#include <vector>
namespace vtkm
{
namespace filter
{
namespace flow
{
namespace internal
{
template <typename ParticleType>
class ParticleExchanger
{
public:
#ifdef VTKM_ENABLE_MPI
ParticleExchanger(vtkmdiy::mpi::communicator& comm)
: MPIComm(vtkmdiy::mpi::mpi_cast(comm.handle()))
, NumRanks(comm.size())
, Rank(comm.rank())
#else
ParticleExchanger(vtkmdiy::mpi::communicator& vtkmNotUsed(comm))
#endif
{
}
#ifdef VTKM_ENABLE_MPI
~ParticleExchanger() { this->CleanupSendBuffers(); }
#endif
void Exchange(const std::vector<ParticleType>& outData,
const std::vector<vtkm::Id>& outRanks,
const std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& outBlockIDsMap,
std::vector<ParticleType>& inData,
std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& inDataBlockIDsMap,
bool blockAndWait)
{
VTKM_ASSERT(outData.size() == outRanks.size());
if (this->NumRanks == 1)
this->SerialExchange(outData, outBlockIDsMap, inData, inDataBlockIDsMap);
#ifdef VTKM_ENABLE_MPI
else
{
this->SendParticles(outData, outRanks, outBlockIDsMap);
this->RecvParticles(inData, inDataBlockIDsMap, blockAndWait);
this->CleanupSendBuffers();
}
#endif
}
private:
// pair(vector of particles, vector of blockIds)
//using ParticleCommType = std::pair<std::vector<ParticleType>, std::vector<vtkm::Id>>;
// pair(particle, bids);
using ParticleCommType = std::pair<ParticleType, std::vector<vtkm::Id>>;
void CleanupSendBuffers() { std::cout << "IMPLEMENT ME!!!" << std::endl; }
void SendParticles(const std::vector<ParticleType>& outData,
const std::vector<vtkm::Id>& outRanks,
const std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& outBlockIDsMap)
{
if (outData.empty())
return;
//create the send data: vector of particles, vector of vector of blockIds.
std::size_t n = outData.size();
std::unordered_map<int, std::vector<ParticleCommType>> sendData;
// dst, vector of pair(particles, blockIds)
for (std::size_t i = 0; i < n; i++)
{
const auto& bids = outBlockIDsMap.find(outData[i].GetID())->second;
//outData and bids are const references here, so copy them; std::move on a const ref would silently copy anyway.
sendData[outRanks[i]].emplace_back(outData[i], bids);
}
//Send to dst, vector<pair<particle, bids>>
for (auto& si : sendData)
this->SendParticlesToDst(si.first, si.second);
}
void SendParticlesToDst(int dst, const std::vector<ParticleCommType>& data)
{
if (dst == this->Rank)
{
VTKM_LOG_S(vtkm::cont::LogLevel::Error, "Error. Sending a particle to yourself.");
return;
}
//Serialize vector(pair(particle, bids)) and send.
vtkmdiy::MemoryBuffer* bb = new vtkmdiy::MemoryBuffer();
vtkmdiy::save(*bb, data);
MPI_Request req;
int err = MPI_Isend(bb->buffer.data(), static_cast<int>(bb->size()), MPI_BYTE, dst, 0, this->MPIComm, &req);
if (err != MPI_SUCCESS)
throw vtkm::cont::ErrorFilterExecution("Error in MPI_Isend inside ParticleExchanger::SendParticlesToDst");
this->SendBuffers[req] = bb;
}
void RecvParticles(std::vector<ParticleType>& inData,
std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& inDataBlockIDsMap,
bool blockAndWait) const
{
inData.resize(0);
inDataBlockIDsMap.clear();
std::vector<vtkmdiy::MemoryBuffer> buffers;
MPI_Status status;
while (true)
{
bool msgReceived = false;
int err;
if (blockAndWait)
{
err = MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, this->MPIComm, &status);
if (err != MPI_SUCCESS)
throw vtkm::cont::ErrorFilterExecution(
"Error in MPI_Probe in ParticleExchanger::RecvParticles");
msgReceived = true;
}
else
{
int flag = 0;
err = MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, this->MPIComm, &flag, &status);
if (err != MPI_SUCCESS)
throw vtkm::cont::ErrorFilterExecution(
"Error in MPI_Probe in ParticleExchanger::RecvParticles");
msgReceived = (flag == 1);
}
if (msgReceived)
{
int incomingSize;
err = MPI_Get_count(&status, MPI_BYTE, &incomingSize);
if (err != MPI_SUCCESS)
throw vtkm::cont::ErrorFilterExecution(
"Error in MPI_Probe in ParticleExchanger::RecvParticles");
std::vector<char> recvBuff;
recvBuff.resize(incomingSize);
MPI_Status recvStatus;
err = MPI_Recv(recvBuff.data(),
incomingSize,
MPI_BYTE,
status.MPI_SOURCE,
status.MPI_TAG,
this->MPIComm,
&recvStatus);
if (err != MPI_SUCCESS)
throw vtkm::cont::ErrorFilterExecution(
"Error in MPI_Probe in ParticleExchanger::RecvParticles");
//Wrap the raw bytes in a MemoryBuffer so they can be deserialized below.
//vtkmdiy::save would prepend a size and leave the read position at the end,
//so the bytes are adopted directly and the position rewound instead.
vtkmdiy::MemoryBuffer memBuff;
memBuff.buffer = std::move(recvBuff);
memBuff.reset();
buffers.emplace_back(std::move(memBuff));
blockAndWait = false; //Check one more time to see if anything else arrived.
}
else
{
break;
}
}
//Unpack buffers into particle data.
//buffers: vector<pair(particle, vector<vtkm::Id>)>
for (auto& b : buffers)
{
std::vector<ParticleCommType> data;
vtkmdiy::load(b, data);
for (auto& d : data)
{
//d is a non-const reference, so the particle and its block ids can really be moved out.
auto& particle = d.first;
auto& bids = d.second;
inDataBlockIDsMap[particle.GetID()] = std::move(bids);
inData.emplace_back(std::move(particle));
}
}
}
void SerialExchange(const std::vector<ParticleType>& outData,
const std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& outBlockIDsMap,
std::vector<ParticleType>& inData,
std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& inDataBlockIDsMap)
{
//Copy output to input.
for (const auto& p : outData)
{
const auto& bids = outBlockIDsMap.find(p.GetID())->second;
inData.emplace_back(p);
inDataBlockIDsMap[p.GetID()] = bids;
}
}
#ifdef VTKM_ENABLE_MPI
MPI_Comm MPIComm;
vtkm::Id NumRanks;
vtkm::Id Rank;
std::unordered_map<MPI_Request, vtkmdiy::MemoryBuffer*> SendBuffers;
#else
vtkm::Id NumRanks = 1;
vtkm::Id Rank = 0;
#endif
};
}
}
}
} //vtkm::filter::flow::internal
#endif //vtk_m_filter_flow_internal_ParticleExchanger_h
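
[Editor's note] CleanupSendBuffers() is left as an "IMPLEMENT ME!!!" stub in this commit. Given that SendParticlesToDst stores each MPI_Isend request alongside its serialized buffer, one plausible shape for it is to test each outstanding request and free the buffer once the send has completed. This is an assumption about where the code is headed, not the author's implementation; reapCompletedSends is a hypothetical free-function stand-in for the member.

#include <mpi.h>

#include <vtkm/thirdparty/diy/diy.h>

#include <unordered_map>

//Hypothetical sketch of what CleanupSendBuffers() might do: release the
//serialized send buffers whose non-blocking sends have finished.
inline void reapCompletedSends(std::unordered_map<MPI_Request, vtkmdiy::MemoryBuffer*>& sendBuffers)
{
  for (auto it = sendBuffers.begin(); it != sendBuffers.end();)
  {
    MPI_Request req = it->first; //copy; MPI_Test frees a completed request handle.
    int completed = 0;
    MPI_Status status;
    MPI_Test(&req, &completed, &status);
    if (completed)
    {
      delete it->second;          //buffer handed to MPI_Isend is no longer needed
      it = sendBuffers.erase(it); //drop the finished request
    }
    else
    {
      ++it;
    }
  }
}

The destructor, which calls CleanupSendBuffers(), would presumably want the blocking variant (MPI_Wait on everything still outstanding) so that no buffer is freed while a send is still in flight.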