From ade25db6d56ae8391a895b03a7fd39e490741a13 Mon Sep 17 00:00:00 2001
From: Dave Pugmire
Date: Wed, 3 Jul 2024 07:42:39 -0400
Subject: [PATCH] Create ParticleExchanger.

---
 vtkm/filter/flow/internal/AdvectAlgorithm.h   | 106 +++-----
 .../flow/internal/AdvectAlgorithmTerminator.h | 111 +++++++++
 vtkm/filter/flow/internal/CMakeLists.txt      |   2 +
 vtkm/filter/flow/internal/ParticleExchanger.h | 228 ++++++++++++++++++
 4 files changed, 370 insertions(+), 77 deletions(-)
 create mode 100644 vtkm/filter/flow/internal/AdvectAlgorithmTerminator.h
 create mode 100644 vtkm/filter/flow/internal/ParticleExchanger.h

diff --git a/vtkm/filter/flow/internal/AdvectAlgorithm.h b/vtkm/filter/flow/internal/AdvectAlgorithm.h
index bb716dde8..b826a01a8 100644
--- a/vtkm/filter/flow/internal/AdvectAlgorithm.h
+++ b/vtkm/filter/flow/internal/AdvectAlgorithm.h
@@ -12,8 +12,10 @@
 #define vtk_m_filter_flow_internal_AdvectAlgorithm_h
 
 #include
+#include <vtkm/filter/flow/internal/AdvectAlgorithmTerminator.h>
 #include
 #include
+#include <vtkm/filter/flow/internal/ParticleExchanger.h>
 #include
 #ifdef VTKM_ENABLE_MPI
 #include
@@ -29,86 +31,26 @@ namespace flow
 namespace internal
 {
 
-class AdvectAlgorithmTerminator
-{
-public:
-#ifdef VTKM_ENABLE_MPI
-  AdvectAlgorithmTerminator(vtkmdiy::mpi::communicator& comm)
-    : MPIComm(vtkmdiy::mpi::mpi_cast(comm.handle()))
-#else
-  AdvectAlgorithmTerminator(vtkmdiy::mpi::communicator& vtkmNotUsed(comm))
-#endif
-  {
-  }
+/*
+ParticleMessenger::Exchange()
+  - SendParticles(outData--> map[dstRank]=vector of pairs);
+ -- SendParticles(map...)
+ --- for each m : map SendParticles(m);
+ ---- SendParticles(dst, container)
+ ----- serialize, SendData(dst, buff);
+ ------ SendDataAsync(dst,buff)
+ ------- header??, req=mpi_isend(), store req.
 
-  void AddWork()
-  {
-#ifdef VTKM_ENABLE_MPI
-    this->Dirty = 1;
-#endif
-  }
+  - RecvAny(data, block);
+ -- RecvData(tags, buffers, block)
+ --- RecvDataAsyncProbe(tag, buffers, block)
+ ---- while (true)
+ ----- if block: MPI_Probe() msgReceived=true
+ ----- else : MPI_Iprobe msgReceived = check
+ ----- if msgRecvd: MPI_Get_count(), MPI_Recv(), buffers, blockAndWait=false
 
-  bool Done() const { return this->State == AdvectAlgorithmTerminatorState::DONE; }
 
-  void Control(bool haveLocalWork)
-  {
-#ifdef VTKM_ENABLE_MPI
-    if (this->State == STATE_0 && !haveLocalWork)
-    {
-      MPI_Ibarrier(this->MPIComm, &this->StateReq);
-      this->Dirty = 0;
-      this->State = STATE_1;
-    }
-    else if (this->State == STATE_1)
-    {
-      MPI_Status status;
-      int flag;
-      MPI_Test(&this->StateReq, &flag, &status);
-      if (flag == 1)
-      {
-        int localDirty = this->Dirty;
-        MPI_Iallreduce(
-          &localDirty, &this->AllDirty, 1, MPI_INT, MPI_LOR, this->MPIComm, &this->StateReq);
-        this->State = STATE_2;
-      }
-    }
-    else if (this->State == STATE_2)
-    {
-      MPI_Status status;
-      int flag;
-      MPI_Test(&this->StateReq, &flag, &status);
-      if (flag == 1)
-      {
-        if (this->AllDirty == 0) //done
-          this->State = DONE;
-        else
-          this->State = STATE_0; //reset.
-      }
-    }
-#else
-    if (!haveLocalWork)
-      this->State = DONE;
-#endif
-  }
-
-private:
-  enum AdvectAlgorithmTerminatorState
-  {
-    STATE_0,
-    STATE_1,
-    STATE_2,
-    DONE
-  };
-
-  AdvectAlgorithmTerminatorState State = AdvectAlgorithmTerminatorState::STATE_0;
-
-#ifdef VTKM_ENABLE_MPI
-  std::atomic<int> Dirty;
-  int AllDirty = 0;
-  MPI_Request StateReq;
-  MPI_Comm MPIComm;
-#endif
-};
+*/
 
 template <typename DSIType>
 class AdvectAlgorithm
@@ -125,6 +67,7 @@ public:
     , Rank(this->Comm.rank())
     , UseAsynchronousCommunication(useAsyncComm)
    , Terminator(this->Comm)
+    , Exchanger(this->Comm)
   {
   }
 
@@ -280,6 +223,11 @@ public:
     return !particles.empty();
   }
 
+  void ExchangeParticles()
+  {
+    // this->Exchanger.Exchange(outgoing, outgoingRanks, this->ParticleBlockIDsMap, incoming, incomingBlockIDs, block);
+  }
+
   void Communicate(vtkm::filter::flow::internal::ParticleMessenger<ParticleType>& messenger)
   {
     std::vector<ParticleType> outgoing;
@@ -295,6 +243,8 @@ public:
     block = this->GetBlockAndWait(messenger.UsingSyncCommunication());
 #endif
 
+    // this->Exchanger.Exchange(outgoing, outgoingRanks, this->ParticleBlockIDsMap, incoming, incomingBlockIDs, block);
+
     vtkm::Id numTermMessages;
     messenger.Exchange(outgoing,
                        outgoingRanks,
@@ -463,6 +413,8 @@ public:
   vtkm::FloatDefault StepSize;
   bool UseAsynchronousCommunication = true;
   AdvectAlgorithmTerminator Terminator;
+
+  ParticleExchanger<ParticleType> Exchanger;
 };
 
 }
diff --git a/vtkm/filter/flow/internal/AdvectAlgorithmTerminator.h b/vtkm/filter/flow/internal/AdvectAlgorithmTerminator.h
new file mode 100644
index 000000000..7b955a326
--- /dev/null
+++ b/vtkm/filter/flow/internal/AdvectAlgorithmTerminator.h
@@ -0,0 +1,111 @@
+//============================================================================
+//  Copyright (c) Kitware, Inc.
+//  All rights reserved.
+//  See LICENSE.txt for details.
+//
+//  This software is distributed WITHOUT ANY WARRANTY; without even
+//  the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+//  PURPOSE.  See the above copyright notice for more information.
+//============================================================================
+
+#ifndef vtk_m_filter_flow_internal_AdvectAlgorithmTerminator_h
+#define vtk_m_filter_flow_internal_AdvectAlgorithmTerminator_h
+
+namespace vtkm
+{
+namespace filter
+{
+namespace flow
+{
+namespace internal
+{
+
+class AdvectAlgorithmTerminator
+{
+public:
+#ifdef VTKM_ENABLE_MPI
+  AdvectAlgorithmTerminator(vtkmdiy::mpi::communicator& comm)
+    : MPIComm(vtkmdiy::mpi::mpi_cast(comm.handle()))
+#else
+  AdvectAlgorithmTerminator(vtkmdiy::mpi::communicator& vtkmNotUsed(comm))
+#endif
+  {
+  }
+
+  void AddWork()
+  {
+#ifdef VTKM_ENABLE_MPI
+    this->Dirty = 1;
+#endif
+  }
+
+  bool Done() const { return this->State == AdvectAlgorithmTerminatorState::DONE; }
+
+  void Control(bool haveLocalWork)
+  {
+#ifdef VTKM_ENABLE_MPI
+    if (this->State == STATE_0 && !haveLocalWork)
+    {
+      MPI_Ibarrier(this->MPIComm, &this->StateReq);
+      this->Dirty = 0;
+      this->State = STATE_1;
+    }
+    else if (this->State == STATE_1)
+    {
+      MPI_Status status;
+      int flag;
+      MPI_Test(&this->StateReq, &flag, &status);
+      if (flag == 1)
+      {
+        int localDirty = this->Dirty;
+        MPI_Iallreduce(
+          &localDirty, &this->AllDirty, 1, MPI_INT, MPI_LOR, this->MPIComm, &this->StateReq);
+        this->State = STATE_2;
+      }
+    }
+    else if (this->State == STATE_2)
+    {
+      MPI_Status status;
+      int flag;
+      MPI_Test(&this->StateReq, &flag, &status);
+      if (flag == 1)
+      {
+        if (this->AllDirty == 0) //done
+          this->State = DONE;
+        else
+          this->State = STATE_0; //reset.
+      }
+    }
+#else
+    if (!haveLocalWork)
+      this->State = DONE;
+#endif
+  }
+
+private:
+  enum AdvectAlgorithmTerminatorState
+  {
+    STATE_0,
+    STATE_1,
+    STATE_2,
+    DONE
+  };
+
+  AdvectAlgorithmTerminatorState State = AdvectAlgorithmTerminatorState::STATE_0;
+
+#ifdef VTKM_ENABLE_MPI
+  std::atomic<int> Dirty;
+  int AllDirty = 0;
+  MPI_Request StateReq;
+  MPI_Comm MPIComm;
+#endif
+};
+
+
+}
+}
+}
+} //vtkm::filter::flow::internal
+
+
+#endif //vtk_m_filter_flow_internal_AdvectAlgorithmTerminator_h
diff --git a/vtkm/filter/flow/internal/CMakeLists.txt b/vtkm/filter/flow/internal/CMakeLists.txt
index b59dedceb..880c04fa5 100644
--- a/vtkm/filter/flow/internal/CMakeLists.txt
+++ b/vtkm/filter/flow/internal/CMakeLists.txt
@@ -10,6 +10,7 @@
 
 set(headers
   AdvectAlgorithm.h
+  AdvectAlgorithmTerminator.h
   AdvectAlgorithmThreaded.h
   BoundsMap.h
   DataSetIntegrator.h
@@ -19,6 +20,7 @@ set(headers
   LagrangianStructureHelpers.h
   Messenger.h
   ParticleAdvector.h
+  ParticleExchanger.h
   ParticleMessenger.h
   )
 
diff --git a/vtkm/filter/flow/internal/ParticleExchanger.h b/vtkm/filter/flow/internal/ParticleExchanger.h
new file mode 100644
index 000000000..f5ad02ca6
--- /dev/null
+++ b/vtkm/filter/flow/internal/ParticleExchanger.h
@@ -0,0 +1,228 @@
+//============================================================================
+//  Copyright (c) Kitware, Inc.
+//  All rights reserved.
+//  See LICENSE.txt for details.
+//
+//  This software is distributed WITHOUT ANY WARRANTY; without even
+//  the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+//  PURPOSE.  See the above copyright notice for more information.
+//============================================================================
+
+#ifndef vtk_m_filter_flow_internal_ParticleExchanger_h
+#define vtk_m_filter_flow_internal_ParticleExchanger_h
+
+namespace vtkm
+{
+namespace filter
+{
+namespace flow
+{
+namespace internal
+{
+
+template <typename ParticleType>
+class ParticleExchanger
+{
+public:
+#ifdef VTKM_ENABLE_MPI
+  ParticleExchanger(vtkmdiy::mpi::communicator& comm)
+    : MPIComm(vtkmdiy::mpi::mpi_cast(comm.handle()))
+    , NumRanks(comm.size())
+    , Rank(comm.rank())
+#else
+  ParticleExchanger(vtkmdiy::mpi::communicator& vtkmNotUsed(comm))
+#endif
+  {
+  }
+#ifdef VTKM_ENABLE_MPI
+  ~ParticleExchanger() { this->CleanupSendBuffers(); }
+#endif
+
+  void Exchange(const std::vector<ParticleType>& outData,
+                const std::vector<vtkm::Id>& outRanks,
+                const std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& outBlockIDsMap,
+                std::vector<ParticleType>& inData,
+                std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& inDataBlockIDsMap,
+                bool blockAndWait)
+  {
+    VTKM_ASSERT(outData.size() == outRanks.size());
+
+    if (this->NumRanks == 1)
+      this->SerialExchange(outData, outBlockIDsMap, inData, inDataBlockIDsMap);
+#ifdef VTKM_ENABLE_MPI
+    else
+    {
+      this->SendParticles(outData, outRanks, outBlockIDsMap);
+      this->RecvParticles(inData, inDataBlockIDsMap, blockAndWait);
+      this->CleanupSendBuffers();
+    }
+#endif
+  }
+
+private:
+  // pair(vector of particles, vector of blockIds)
+  //using ParticleCommType = std::pair<std::vector<ParticleType>, std::vector<vtkm::Id>>;
+  // pair(particle, bids);
+  using ParticleCommType = std::pair<ParticleType, std::vector<vtkm::Id>>;
+
+  void CleanupSendBuffers() { std::cout << "IMPLEMENT ME!!!" << std::endl; }
+
+  void SendParticles(const std::vector<ParticleType>& outData,
+                     const std::vector<vtkm::Id>& outRanks,
+                     const std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& outBlockIDsMap)
+  {
+    if (outData.empty())
+      return;
+
+    //create the send data: vector of particles, vector of vector of blockIds.
+    std::size_t n = outData.size();
+    std::unordered_map<int, std::vector<ParticleCommType>> sendData;
+
+    // dst, vector of pair(particles, blockIds)
+    for (std::size_t i = 0; i < n; i++)
+    {
+      const auto& bids = outBlockIDsMap.find(outData[i].GetID())->second;
+      sendData[outRanks[i]].emplace_back(std::make_pair(std::move(outData[i]), std::move(bids)));
+    }
+
+    //Send to dst, vector<pair(particle, bids)>
+    for (auto& si : sendData)
+      this->SendParticlesToDst(si.first, si.second);
+  }
+
+  void SendParticlesToDst(int dst, const std::vector<ParticleCommType>& data)
+  {
+    if (dst == this->Rank)
+    {
+      VTKM_LOG_S(vtkm::cont::LogLevel::Error, "Error. Sending a particle to yourself.");
+      return;
+    }
+
+    //Serialize vector(pair(particle, bids)) and send.
+    vtkmdiy::MemoryBuffer* bb = new vtkmdiy::MemoryBuffer();
+    vtkmdiy::save(*bb, data);
+
+    MPI_Request req;
+    int err = MPI_Isend(bb->buffer.data(), bb->size(), MPI_BYTE, dst, 0, this->MPIComm, &req);
+    if (err != MPI_SUCCESS)
+      throw vtkm::cont::ErrorFilterExecution("Error in MPI_Isend inside Messenger::SendData");
+    this->SendBuffers[req] = bb;
+  }
+
+  void RecvParticles(std::vector<ParticleType>& inData,
+                     std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& inDataBlockIDsMap,
+                     bool blockAndWait) const
+  {
+    inData.resize(0);
+    inDataBlockIDsMap.clear();
+
+    std::vector<vtkmdiy::MemoryBuffer> buffers;
+
+    MPI_Status status;
+    while (true)
+    {
+      bool msgReceived = false;
+      int err;
+      if (blockAndWait)
+      {
+        err = MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, this->MPIComm, &status);
+        if (err != MPI_SUCCESS)
+          throw vtkm::cont::ErrorFilterExecution(
+            "Error in MPI_Probe in ParticleExchanger::RecvParticles");
+        msgReceived = true;
+      }
+      else
+      {
+        int flag = 0;
+        err = MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, this->MPIComm, &flag, &status);
+        if (err != MPI_SUCCESS)
+          throw vtkm::cont::ErrorFilterExecution(
+            "Error in MPI_Probe in ParticleExchanger::RecvParticles");
+        msgReceived = (flag == 1);
+      }
+
+      if (msgReceived)
+      {
+        int incomingSize;
+        err = MPI_Get_count(&status, MPI_BYTE, &incomingSize);
+        if (err != MPI_SUCCESS)
+          throw vtkm::cont::ErrorFilterExecution(
+            "Error in MPI_Probe in ParticleExchanger::RecvParticles");
+
+        std::vector<char> recvBuff;
+        recvBuff.resize(incomingSize);
+        MPI_Status recvStatus;
+
+        err = MPI_Recv(recvBuff.data(),
+                       incomingSize,
+                       MPI_BYTE,
+                       status.MPI_SOURCE,
+                       status.MPI_TAG,
+                       this->MPIComm,
+                       &recvStatus);
+        if (err != MPI_SUCCESS)
+          throw vtkm::cont::ErrorFilterExecution(
+            "Error in MPI_Probe in ParticleExchanger::RecvParticles");
+
+        vtkmdiy::MemoryBuffer memBuff;
+        vtkmdiy::save(memBuff, recvBuff);
+        buffers.emplace_back(std::move(memBuff));
+
+        blockAndWait = false; //Check one more time to see if anything else arrived.
+      }
+      else
+      {
+        break;
+      }
+    }
+
+    //Unpack buffers into particle data.
+    //buffers: vector<pair(particle, bids)>
+    for (auto& b : buffers)
+    {
+      std::vector<ParticleCommType> data;
+      vtkmdiy::load(b, data);
+
+      for (auto& d : data)
+      {
+        const auto& particle = d.first;
+        const auto& bids = d.second;
+        inDataBlockIDsMap[particle.GetID()] = std::move(bids);
+        inData.emplace_back(std::move(particle));
+      }
+    }
+  }
+
+  void SerialExchange(const std::vector<ParticleType>& outData,
+                      const std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& outBlockIDsMap,
+                      std::vector<ParticleType>& inData,
+                      std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& inDataBlockIDsMap)
+  {
+    //Copy output to input.
+    for (const auto& p : outData)
+    {
+      const auto& bids = outBlockIDsMap.find(p.GetID())->second;
+      inData.emplace_back(p);
+      inDataBlockIDsMap[p.GetID()] = bids;
+    }
+  }
+
+
+#ifdef VTKM_ENABLE_MPI
+  MPI_Comm MPIComm;
+  vtkm::Id NumRanks;
+  vtkm::Id Rank;
+  std::unordered_map<MPI_Request, vtkmdiy::MemoryBuffer*> SendBuffers;
+#else
+  vtkm::Id NumRanks = 1;
+  vtkm::Id Rank = 0;
+#endif
+};

+}
+}
+}
+} //vtkm::filter::flow::internal
+
+
+#endif //vtk_m_filter_flow_internal_ParticleExchanger_h
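
The AdvectAlgorithmTerminator introduced by this patch decides global termination with a non-blocking barrier followed by a dirty-flag reduction. A minimal sketch of the intended call pattern, assuming a hypothetical driver function and caller-supplied callbacks; only AddWork(), Control(), and Done() come from the patch:

#include <vtkm/filter/flow/internal/AdvectAlgorithmTerminator.h>
#include <vtkm/thirdparty/diy/diy.h>
#include <functional>

// Hypothetical driver: advect until every rank agrees there is no work left.
void AdvanceUntilGloballyDone(vtkmdiy::mpi::communicator& comm,
                              std::function<bool()> advectLocalParticles, // true if local work remains
                              std::function<bool()> receivedNewWork)      // true if particles just arrived
{
  vtkm::filter::flow::internal::AdvectAlgorithmTerminator terminator(comm);
  while (!terminator.Done())
  {
    bool haveLocalWork = advectLocalParticles();
    if (receivedNewWork())
      terminator.AddWork();            // mark this round dirty so no rank terminates early
    terminator.Control(haveLocalWork); // advance the Ibarrier -> Iallreduce state machine
  }
}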
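The two commented-out Exchange() calls in AdvectAlgorithm mark where ParticleExchanger is meant to slot in. A sketch of how ExchangeParticles() might eventually be filled in, assuming hypothetical GetOutgoingParticles() and UpdateActive() helpers for draining and re-activating particles; Exchanger, Terminator, ParticleBlockIDsMap, and GetBlockAndWait() are from the patch:

  void ExchangeParticles()
  {
    std::vector<ParticleType> outgoing, incoming;
    std::vector<vtkm::Id> outgoingRanks;
    std::unordered_map<vtkm::Id, std::vector<vtkm::Id>> incomingBlockIDs;

    this->GetOutgoingParticles(outgoing, outgoingRanks); // hypothetical: collect particles leaving this rank
    bool block = this->GetBlockAndWait(false);

    this->Exchanger.Exchange(
      outgoing, outgoingRanks, this->ParticleBlockIDsMap, incoming, incomingBlockIDs, block);

    if (!incoming.empty())
    {
      this->Terminator.AddWork();                     // new work arrived; keep the terminator from firing
      this->UpdateActive(incoming, incomingBlockIDs); // hypothetical: hand received particles back
    }
  }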
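CleanupSendBuffers() is left as a placeholder in the patch. One possible implementation, assuming SendBuffers maps each MPI_Request returned by MPI_Isend to the vtkmdiy::MemoryBuffer that backs it; this is a sketch, not the author's intended code:

  void CleanupSendBuffers()
  {
#ifdef VTKM_ENABLE_MPI
    // Free buffers whose non-blocking sends have completed; leave pending sends untouched.
    for (auto it = this->SendBuffers.begin(); it != this->SendBuffers.end();)
    {
      MPI_Request req = it->first;
      MPI_Status status;
      int flag = 0;
      MPI_Test(&req, &flag, &status);
      if (flag == 1)
      {
        delete it->second; // the send finished, so MPI no longer needs this buffer
        it = this->SendBuffers.erase(it);
      }
      else
      {
        ++it;
      }
    }
#endif
  }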