From 4bd340156b029dc63cb230965649bfdb5d05b11e Mon Sep 17 00:00:00 2001
From: Dave Pugmire
Date: Tue, 23 Jan 2024 16:57:10 -0500
Subject: [PATCH 1/3] Use async termination.

---
 vtkm/filter/flow/FilterParticleAdvection.h    |   3 +-
 .../FilterParticleAdvectionSteadyState.cxx    |  10 +-
 .../FilterParticleAdvectionUnsteadyState.cxx  |   9 +-
 vtkm/filter/flow/internal/AdvectAlgorithm.h   | 158 +++++++++++++-----
 .../flow/internal/AdvectAlgorithmThreaded.h   |  29 ++--
 5 files changed, 140 insertions(+), 69 deletions(-)

diff --git a/vtkm/filter/flow/FilterParticleAdvection.h b/vtkm/filter/flow/FilterParticleAdvection.h
index aef43bf37..49f37c633 100644
--- a/vtkm/filter/flow/FilterParticleAdvection.h
+++ b/vtkm/filter/flow/FilterParticleAdvection.h
@@ -15,6 +15,7 @@
 #include <vtkm/Particle.h>
 #include <vtkm/filter/Filter.h>
 #include <vtkm/filter/flow/FlowTypes.h>
+#include <vtkm/filter/flow/internal/BoundsMap.h>
 #include <vtkm/filter/flow/vtkm_filter_flow_export.h>
 
 namespace vtkm
@@ -104,7 +105,7 @@ protected:
 
   bool BlockIdsSet = false;
   std::vector<vtkm::Id> BlockIds;
-
+  vtkm::filter::flow::internal::BoundsMap BoundsMap;
   vtkm::Id NumberOfSteps = 0;
   vtkm::cont::UnknownArrayHandle Seeds;
   vtkm::filter::flow::IntegrationSolverType SolverType =

diff --git a/vtkm/filter/flow/FilterParticleAdvectionSteadyState.cxx b/vtkm/filter/flow/FilterParticleAdvectionSteadyState.cxx
index c70c2b3d6..5f4711a39 100644
--- a/vtkm/filter/flow/FilterParticleAdvectionSteadyState.cxx
+++ b/vtkm/filter/flow/FilterParticleAdvectionSteadyState.cxx
@@ -58,13 +58,15 @@ FilterParticleAdvectionSteadyState::DoExecutePartitions(
     DataSetIntegratorSteadyState;
 
   this->ValidateOptions();
+  if (this->BlockIdsSet)
+    this->BoundsMap = vtkm::filter::flow::internal::BoundsMap(input, this->BlockIds);
+  else
+    this->BoundsMap = vtkm::filter::flow::internal::BoundsMap(input);
 
-
-  vtkm::filter::flow::internal::BoundsMap boundsMap(input);
   std::vector<DSIType> dsi;
   for (vtkm::Id i = 0; i < input.GetNumberOfPartitions(); i++)
   {
-    vtkm::Id blockId = boundsMap.GetLocalBlockId(i);
+    vtkm::Id blockId = this->BoundsMap.GetLocalBlockId(i);
     auto dataset = input.GetPartition(i);
 
     // Build the field for the current dataset
@@ -78,7 +80,7 @@
   }
 
   vtkm::filter::flow::internal::ParticleAdvector<DSIType> pav(
-    boundsMap, dsi, this->UseThreadedAlgorithm, this->UseAsynchronousCommunication);
+    this->BoundsMap, dsi, this->UseThreadedAlgorithm, this->UseAsynchronousCommunication);
 
   vtkm::cont::ArrayHandle particles;
   this->Seeds.AsArrayHandle(particles);

diff --git a/vtkm/filter/flow/FilterParticleAdvectionUnsteadyState.cxx b/vtkm/filter/flow/FilterParticleAdvectionUnsteadyState.cxx
index 7d72cfcc8..33963047b 100644
--- a/vtkm/filter/flow/FilterParticleAdvectionUnsteadyState.cxx
+++ b/vtkm/filter/flow/FilterParticleAdvectionUnsteadyState.cxx
@@ -55,12 +55,15 @@
   using DSIType = vtkm::filter::flow::internal::
     DataSetIntegratorUnsteadyState;
 
-  vtkm::filter::flow::internal::BoundsMap boundsMap(input);
+  if (this->BlockIdsSet)
+    this->BoundsMap = vtkm::filter::flow::internal::BoundsMap(input, this->BlockIds);
+  else
+    this->BoundsMap = vtkm::filter::flow::internal::BoundsMap(input);
 
   std::vector<DSIType> dsi;
   for (vtkm::Id i = 0; i < input.GetNumberOfPartitions(); i++)
   {
-    vtkm::Id blockId = boundsMap.GetLocalBlockId(i);
+    vtkm::Id blockId = this->BoundsMap.GetLocalBlockId(i);
     auto ds1 = input.GetPartition(i);
     auto ds2 = this->Input2.GetPartition(i);
 
@@ -85,7 +88,7 @@
       analysis);
   }
 
   vtkm::filter::flow::internal::ParticleAdvector<DSIType> pav(
-    boundsMap, dsi, this->UseThreadedAlgorithm, this->UseAsynchronousCommunication);
+    this->BoundsMap, dsi, this->UseThreadedAlgorithm, this->UseAsynchronousCommunication);
 
   vtkm::cont::ArrayHandle particles;
   this->Seeds.AsArrayHandle(particles);

diff --git a/vtkm/filter/flow/internal/AdvectAlgorithm.h b/vtkm/filter/flow/internal/AdvectAlgorithm.h
index 112d98501..fdc47708a 100644
--- a/vtkm/filter/flow/internal/AdvectAlgorithm.h
+++ b/vtkm/filter/flow/internal/AdvectAlgorithm.h
@@ -15,6 +15,10 @@
 #include <vtkm/filter/flow/internal/BoundsMap.h>
 #include <vtkm/filter/flow/internal/DataSetIntegrator.h>
 #include <vtkm/filter/flow/internal/ParticleMessenger.h>
+#ifdef VTKM_ENABLE_MPI
+#include <mpi.h>
+#include <vtkm/thirdparty/diy/mpi-cast.h>
+#endif
 
 namespace vtkm
 {
@@ -25,6 +29,83 @@ namespace flow
 namespace internal
 {
 
+class AdvectAlgorithmTerminator
+{
+public:
+  AdvectAlgorithmTerminator(vtkmdiy::mpi::communicator& comm)
+    : MPIComm(vtkmdiy::mpi::mpi_cast(comm.handle()))
+  {
+  }
+
+  void AddWork()
+  {
+#ifdef VTKM_ENABLE_MPI
+    this->Dirty = 1;
+#endif
+  }
+
+  bool Done() const { return this->State == AdvectAlgorithmTerminatorState::DONE; }
+
+  void Control(bool haveLocalWork)
+  {
+#ifdef VTKM_ENABLE_MPI
+    if (this->State == STATE_0 && !haveLocalWork)
+    {
+      MPI_Ibarrier(this->MPIComm, &this->StateReq);
+      this->Dirty = 0;
+      this->State = STATE_1;
+    }
+    else if (this->State == STATE_1)
+    {
+      MPI_Status status;
+      int flag;
+      MPI_Test(&this->StateReq, &flag, &status);
+      if (flag == 1)
+      {
+        int localDirty = this->Dirty;
+        MPI_Iallreduce(
+          &localDirty, &this->AllDirty, 1, MPI_INT, MPI_LOR, this->MPIComm, &this->StateReq);
+        this->State = STATE_2;
+      }
+    }
+    else if (this->State == STATE_2)
+    {
+      MPI_Status status;
+      int flag;
+      MPI_Test(&this->StateReq, &flag, &status);
+      if (flag == 1)
+      {
+        if (this->AllDirty == 0) //done
+          this->State = DONE;
+        else
+          this->State = STATE_0; //reset.
+      }
+    }
+#else
+    if (!haveLocalWork)
+      this->State = DONE;
+#endif
+  }
+
+private:
+  enum AdvectAlgorithmTerminatorState
+  {
+    STATE_0,
+    STATE_1,
+    STATE_2,
+    DONE
+  };
+
+  AdvectAlgorithmTerminatorState State = AdvectAlgorithmTerminatorState::STATE_0;
+
+#ifdef VTKM_ENABLE_MPI
+  std::atomic<int> Dirty;
+  int AllDirty = 0;
+  MPI_Request StateReq;
+  MPI_Comm MPIComm;
+#endif
+};
+
 template <typename DSIType>
 class AdvectAlgorithm
 {
@@ -39,6 +120,7 @@
     , NumRanks(this->Comm.size())
     , Rank(this->Comm.rank())
     , UseAsynchronousCommunication(useAsyncComm)
+    , Terminator(this->Comm)
   {
   }

@@ -97,27 +179,21 @@
     vtkm::filter::flow::internal::ParticleMessenger messenger(
       this->Comm, this->UseAsynchronousCommunication, this->BoundsMap, 1, 128);
 
-    this->ComputeTotalNumParticles();
-
-    while (this->TotalNumTerminatedParticles < this->TotalNumParticles)
+    while (!this->Terminator.Done())
     {
       std::vector<ParticleType> v;
-      vtkm::Id numTerm = 0, blockId = -1;
+      vtkm::Id blockId = -1;
       if (this->GetActiveParticles(v, blockId))
       {
         //make this a pointer to avoid the copy?
         auto& block = this->GetDataSet(blockId);
         DSIHelperInfo bb(v, this->BoundsMap, this->ParticleBlockIDsMap);
         block.Advect(bb, this->StepSize);
-        numTerm = this->UpdateResult(bb);
+        this->UpdateResult(bb);
       }
 
-      vtkm::Id numTermMessages = 0;
-      this->Communicate(messenger, numTerm, numTermMessages);
-
-      this->TotalNumTerminatedParticles += (numTerm + numTermMessages);
-      if (this->TotalNumTerminatedParticles > this->TotalNumParticles)
-        throw vtkm::cont::ErrorFilterExecution("Particle count error");
+      this->Communicate(messenger);
+      this->Terminator.Control(!this->Active.empty());
     }
   }

@@ -128,19 +204,6 @@
     this->ParticleBlockIDsMap.clear();
   }
 
-  void ComputeTotalNumParticles()
-  {
-    vtkm::Id numLocal = static_cast<vtkm::Id>(this->Inactive.size());
-    for (const auto& it : this->Active)
-      numLocal += it.second.size();
-
-#ifdef VTKM_ENABLE_MPI
-    vtkmdiy::mpi::all_reduce(this->Comm, numLocal, this->TotalNumParticles, std::plus{});
-#else
-    this->TotalNumParticles = numLocal;
-#endif
-  }
-
   DataSetIntegrator& GetDataSet(vtkm::Id id)
   {
     for (auto& it : this->Blocks)

@@ -213,9 +276,7 @@
     return !particles.empty();
   }
 
-  void Communicate(vtkm::filter::flow::internal::ParticleMessenger& messenger,
-                   vtkm::Id numLocalTerminations,
-                   vtkm::Id& numTermMessages)
+  void Communicate(vtkm::filter::flow::internal::ParticleMessenger& messenger)
   {
     std::vector<ParticleType> outgoing;
     std::vector<vtkm::Id> outgoingRanks;
 
     this->GetOutgoingParticles(outgoing, outgoingRanks);
 
     std::vector<ParticleType> incoming;
     std::unordered_map<vtkm::Id, std::vector<vtkm::Id>> incomingBlockIDs;
-    numTermMessages = 0;
+
     bool block = false;
 #ifdef VTKM_ENABLE_MPI
-    block = this->GetBlockAndWait(messenger.UsingSyncCommunication(), numLocalTerminations);
+    block = this->GetBlockAndWait(messenger.UsingSyncCommunication());
 #endif
 
+    vtkm::Id numTermMessages;
     messenger.Exchange(outgoing,
                        outgoingRanks,
                        this->ParticleBlockIDsMap,
-                       numLocalTerminations,
+                       0,
                        incoming,
                        incomingBlockIDs,
                        numTermMessages,
                        block);

@@ -311,17 +373,22 @@
   {
     VTKM_ASSERT(particles.size() == idsMap.size());
 
-    for (auto pit = particles.begin(); pit != particles.end(); pit++)
+    if (!particles.empty())
     {
-      vtkm::Id particleID = pit->GetID();
-      const auto& it = idsMap.find(particleID);
-      VTKM_ASSERT(it != idsMap.end() && !it->second.empty());
-      vtkm::Id blockId = it->second[0];
-      this->Active[blockId].emplace_back(*pit);
-    }
+      this->Terminator.AddWork();
 
-    for (const auto& it : idsMap)
-      this->ParticleBlockIDsMap[it.first] = it.second;
+      for (auto pit = particles.begin(); pit != particles.end(); pit++)
+      {
+        vtkm::Id particleID = pit->GetID();
+        const auto& it = idsMap.find(particleID);
+        VTKM_ASSERT(it != idsMap.end() && !it->second.empty());
+        vtkm::Id blockId = it->second[0];
+        this->Active[blockId].emplace_back(*pit);
+      }
+
+      for (const auto& it : idsMap)
+        this->ParticleBlockIDsMap[it.first] = it.second;
+    }
   }
 
   virtual void UpdateInactive(const std::vector<ParticleType>& particles,

@@ -351,7 +418,7 @@
   }
 
-  virtual bool GetBlockAndWait(const bool& syncComm, const vtkm::Id& numLocalTerm)
+  virtual bool GetBlockAndWait(const bool& syncComm)
   {
     bool haveNoWork = this->Active.empty() && this->Inactive.empty();
 
     //2. numLocalTerm + this->TotalNumberOfTerminatedParticles == this->TotalNumberOfParticles
     //So, if neither are true, we can safely block and wait for communication to come in.
-    if (haveNoWork &&
-        (numLocalTerm + this->TotalNumTerminatedParticles < this->TotalNumParticles))
-      return true;
+    // if (this->Terminator.State == AdvectAlgorithmTerminator::AdvectAlgorithmTerminatorState::STATE_2)
+    //   return true;
+
+    // if (haveNoWork && (numLocalTerm + this->TotalNumTerminatedParticles < this->TotalNumParticles))
+    //   return true;
 
     return false;
   }

@@ -388,9 +457,8 @@
   std::unordered_map<vtkm::Id, std::vector<vtkm::Id>> ParticleBlockIDsMap;
   vtkm::Id Rank;
   vtkm::FloatDefault StepSize;
-  vtkm::Id TotalNumParticles = 0;
-  vtkm::Id TotalNumTerminatedParticles = 0;
   bool UseAsynchronousCommunication = true;
+  AdvectAlgorithmTerminator Terminator;
 };
 
 }

diff --git a/vtkm/filter/flow/internal/AdvectAlgorithmThreaded.h b/vtkm/filter/flow/internal/AdvectAlgorithmThreaded.h
index db4e651fb..8531644bb 100644
--- a/vtkm/filter/flow/internal/AdvectAlgorithmThreaded.h
+++ b/vtkm/filter/flow/internal/AdvectAlgorithmThreaded.h
@@ -50,8 +50,6 @@
 
   void Go() override
   {
-    this->ComputeTotalNumParticles();
-
     std::vector<std::thread> workerThreads;
     workerThreads.emplace_back(std::thread(AdvectAlgorithmThreaded::Worker, this));
     this->Manage();

@@ -63,6 +61,12 @@
   }
 
 protected:
+  bool HaveActiveParticles()
+  {
+    std::lock_guard<std::mutex> lock(this->Mutex);
+    return !this->Active.empty();
+  }
+
   bool GetActiveParticles(std::vector<ParticleType>& particles, vtkm::Id& blockId) override
   {
     std::lock_guard<std::mutex> lock(this->Mutex);

@@ -144,38 +148,31 @@
     vtkm::filter::flow::internal::ParticleMessenger messenger(
       this->Comm, useAsync, this->BoundsMap, 1, 128);
 
-    while (this->TotalNumTerminatedParticles < this->TotalNumParticles)
+    while (!this->Terminator.Done())
     {
       std::unordered_map<vtkm::Id, std::vector<DSIHelperInfo>> workerResults;
       this->GetWorkerResults(workerResults);
 
-      vtkm::Id numTerm = 0;
       for (auto& it : workerResults)
-      {
         for (auto& r : it.second)
-          numTerm += this->UpdateResult(r);
-      }
+          this->UpdateResult(r);
 
-      vtkm::Id numTermMessages = 0;
-      this->Communicate(messenger, numTerm, numTermMessages);
-
-      this->TotalNumTerminatedParticles += (numTerm + numTermMessages);
-      if (this->TotalNumTerminatedParticles > this->TotalNumParticles)
-        throw vtkm::cont::ErrorFilterExecution("Particle count error");
+      this->Communicate(messenger);
+      this->Terminator.Control(this->HaveActiveParticles());
     }
 
     //Let the workers know that we are done.
     this->SetDone();
   }
 
-  bool GetBlockAndWait(const bool& syncComm, const vtkm::Id& numLocalTerm) override
+  bool GetBlockAndWait(const bool& syncComm) override
   {
     std::lock_guard<std::mutex> lock(this->Mutex);
     if (this->Done)
       return true;
 
-    return (this->AdvectAlgorithm::GetBlockAndWait(syncComm, numLocalTerm) &&
-            !this->WorkerActivate && this->WorkerResults.empty());
+    return (this->AdvectAlgorithm::GetBlockAndWait(syncComm) && !this->WorkerActivate &&
+            this->WorkerResults.empty());
   }
 
   void GetWorkerResults(

From dd96f1144b4c50324e7432fd4087dee831f2ce0c Mon Sep 17 00:00:00 2001
From: Dave Pugmire
Date: Wed, 24 Jan 2024 07:23:55 -0500
Subject: [PATCH 2/3] Fixes for non-mpi code...
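Without MPI enabled, AdvectAlgorithmTerminator has no MPIComm member, so its
constructor must not reference it. Keep one constructor signature and guard
only the initializer list; this is the pattern applied in the diff below:

    #ifdef VTKM_ENABLE_MPI
      AdvectAlgorithmTerminator(vtkmdiy::mpi::communicator& comm)
        : MPIComm(vtkmdiy::mpi::mpi_cast(comm.handle()))
    #else
      AdvectAlgorithmTerminator(vtkmdiy::mpi::communicator& vtkmNotUsed(comm))
    #endif
      {
      }

Also give WorkerActivate an in-class default (= false) rather than
initializing it in AdvectAlgorithmThreaded's constructor.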
---
 vtkm/filter/flow/internal/AdvectAlgorithm.h         |  4 ++++
 vtkm/filter/flow/internal/AdvectAlgorithmThreaded.h | 10 +++++-----
 2 files changed, 9 insertions(+), 5 deletions(-)

diff --git a/vtkm/filter/flow/internal/AdvectAlgorithm.h b/vtkm/filter/flow/internal/AdvectAlgorithm.h
index fdc47708a..bb716dde8 100644
--- a/vtkm/filter/flow/internal/AdvectAlgorithm.h
+++ b/vtkm/filter/flow/internal/AdvectAlgorithm.h
@@ -32,8 +32,12 @@ namespace internal
 class AdvectAlgorithmTerminator
 {
 public:
+#ifdef VTKM_ENABLE_MPI
   AdvectAlgorithmTerminator(vtkmdiy::mpi::communicator& comm)
     : MPIComm(vtkmdiy::mpi::mpi_cast(comm.handle()))
+#else
+  AdvectAlgorithmTerminator(vtkmdiy::mpi::communicator& vtkmNotUsed(comm))
+#endif
   {
   }

diff --git a/vtkm/filter/flow/internal/AdvectAlgorithmThreaded.h b/vtkm/filter/flow/internal/AdvectAlgorithmThreaded.h
index 8531644bb..015673e65 100644
--- a/vtkm/filter/flow/internal/AdvectAlgorithmThreaded.h
+++ b/vtkm/filter/flow/internal/AdvectAlgorithmThreaded.h
@@ -39,7 +39,6 @@
                           bool useAsyncComm)
     : AdvectAlgorithm(bm, blocks, useAsyncComm)
     , Done(false)
-    , WorkerActivate(false)
   {
     //For threaded algorithm, the particles go out of scope in the Work method.
     //When this happens, they are destructed by the time the Manage thread gets them.

@@ -61,10 +60,11 @@
   }
 
 protected:
-  bool HaveActiveParticles()
+  bool HaveAnyWork()
   {
     std::lock_guard<std::mutex> lock(this->Mutex);
-    return !this->Active.empty();
+    //We have work if there are particles in any queue or a worker is busy.
+    return !this->Active.empty() || !this->Inactive.empty() || this->WorkerActivate;
   }
 
   bool GetActiveParticles(std::vector<ParticleType>& particles, vtkm::Id& blockId) override

@@ -158,7 +158,7 @@
           this->UpdateResult(r);
 
       this->Communicate(messenger);
-      this->Terminator.Control(this->HaveActiveParticles());
+      this->Terminator.Control(this->HaveAnyWork());
     }
 
     //Let the workers know that we are done.

@@ -190,7 +190,7 @@
 
   std::atomic<bool> Done;
   std::mutex Mutex;
-  bool WorkerActivate;
+  bool WorkerActivate = false;
   std::condition_variable WorkerActivateCondition;
   std::unordered_map<vtkm::Id, std::vector<DSIHelperInfo>> WorkerResults;
 };

From ade25db6d56ae8391a895b03a7fd39e490741a13 Mon Sep 17 00:00:00 2001
From: Dave Pugmire
Date: Wed, 3 Jul 2024 07:42:39 -0400
Subject: [PATCH 3/3] Create ParticleExchanger.
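Move AdvectAlgorithmTerminator into its own header and add ParticleExchanger,
a slimmed-down replacement for the particle-exchange path of ParticleMessenger.
The exchanger moves only particles and their block IDs; the termination counts
are gone, since AdvectAlgorithmTerminator now handles termination separately.
The exchanger is constructed but not yet wired in; the intended call in
AdvectAlgorithm::Communicate() is left commented out and would look like this
(outgoing/incoming are the containers Communicate() already fills):

    this->Exchanger.Exchange(
      outgoing, outgoingRanks, this->ParticleBlockIDsMap, incoming, incomingBlockIDs, block);

CleanupSendBuffers() is left as a stub; a sketch of one possible
implementation is given in the note at the end of this series.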
---
 vtkm/filter/flow/internal/AdvectAlgorithm.h   | 106 +++-----
 .../flow/internal/AdvectAlgorithmTerminator.h | 111 +++++++++
 vtkm/filter/flow/internal/CMakeLists.txt      |   2 +
 .../flow/internal/ParticleExchanger.h         | 228 ++++++++++++++++++
 4 files changed, 370 insertions(+), 77 deletions(-)
 create mode 100644 vtkm/filter/flow/internal/AdvectAlgorithmTerminator.h
 create mode 100644 vtkm/filter/flow/internal/ParticleExchanger.h

diff --git a/vtkm/filter/flow/internal/AdvectAlgorithm.h b/vtkm/filter/flow/internal/AdvectAlgorithm.h
index bb716dde8..b826a01a8 100644
--- a/vtkm/filter/flow/internal/AdvectAlgorithm.h
+++ b/vtkm/filter/flow/internal/AdvectAlgorithm.h
@@ -12,8 +12,10 @@
 #define vtk_m_filter_flow_internal_AdvectAlgorithm_h
 
 #include <vtkm/cont/PartitionedDataSet.h>
+#include <vtkm/filter/flow/internal/AdvectAlgorithmTerminator.h>
 #include <vtkm/filter/flow/internal/BoundsMap.h>
 #include <vtkm/filter/flow/internal/DataSetIntegrator.h>
+#include <vtkm/filter/flow/internal/ParticleExchanger.h>
 #include <vtkm/filter/flow/internal/ParticleMessenger.h>
 #ifdef VTKM_ENABLE_MPI
 #include <mpi.h>
 #include <vtkm/thirdparty/diy/mpi-cast.h>
 #endif
 
 namespace vtkm
 {
@@ -29,86 +31,26 @@ namespace flow
 namespace internal
 {
 
-class AdvectAlgorithmTerminator
-{
-public:
-#ifdef VTKM_ENABLE_MPI
-  AdvectAlgorithmTerminator(vtkmdiy::mpi::communicator& comm)
-    : MPIComm(vtkmdiy::mpi::mpi_cast(comm.handle()))
-#else
-  AdvectAlgorithmTerminator(vtkmdiy::mpi::communicator& vtkmNotUsed(comm))
-#endif
-  {
-  }
-
-  void AddWork()
-  {
-#ifdef VTKM_ENABLE_MPI
-    this->Dirty = 1;
-#endif
-  }
-
-  bool Done() const { return this->State == AdvectAlgorithmTerminatorState::DONE; }
-
-  void Control(bool haveLocalWork)
-  {
-#ifdef VTKM_ENABLE_MPI
-    if (this->State == STATE_0 && !haveLocalWork)
-    {
-      MPI_Ibarrier(this->MPIComm, &this->StateReq);
-      this->Dirty = 0;
-      this->State = STATE_1;
-    }
-    else if (this->State == STATE_1)
-    {
-      MPI_Status status;
-      int flag;
-      MPI_Test(&this->StateReq, &flag, &status);
-      if (flag == 1)
-      {
-        int localDirty = this->Dirty;
-        MPI_Iallreduce(
-          &localDirty, &this->AllDirty, 1, MPI_INT, MPI_LOR, this->MPIComm, &this->StateReq);
-        this->State = STATE_2;
-      }
-    }
-    else if (this->State == STATE_2)
-    {
-      MPI_Status status;
-      int flag;
-      MPI_Test(&this->StateReq, &flag, &status);
-      if (flag == 1)
-      {
-        if (this->AllDirty == 0) //done
-          this->State = DONE;
-        else
-          this->State = STATE_0; //reset.
-      }
-    }
-#else
-    if (!haveLocalWork)
-      this->State = DONE;
-#endif
-  }
-
-private:
-  enum AdvectAlgorithmTerminatorState
-  {
-    STATE_0,
-    STATE_1,
-    STATE_2,
-    DONE
-  };
-
-  AdvectAlgorithmTerminatorState State = AdvectAlgorithmTerminatorState::STATE_0;
-
-#ifdef VTKM_ENABLE_MPI
-  std::atomic<int> Dirty;
-  int AllDirty = 0;
-  MPI_Request StateReq;
-  MPI_Comm MPIComm;
-#endif
-};
+/*
+ParticleMessenger::Exchange()
+ - SendParticles(outData --> map[dstRank] = vector of pairs);
+ -- SendParticles(map...)
+ --- for each m : map SendParticles(m);
+ ---- SendParticles(dst, container)
+ ----- serialize, SendData(dst, buff);
+ ------ SendDataAsync(dst, buff)
+ ------- header??, req = mpi_isend(), store req.
+
+ - RecvAny(data, block);
+ -- RecvData(tags, buffers, block)
+ --- RecvDataAsyncProbe(tag, buffers, block)
+ ---- while (true)
+ ----- if block: MPI_Probe() msgReceived = true
+ ----- else: MPI_Iprobe msgReceived = check
+ ----- if msgRecvd: MPI_Get_count(), MPI_Recv(), buffers, blockAndWait = false
+*/
 
 template <typename DSIType>
 class AdvectAlgorithm
@@ -125,6 +67,7 @@
     , Rank(this->Comm.rank())
     , UseAsynchronousCommunication(useAsyncComm)
     , Terminator(this->Comm)
+    , Exchanger(this->Comm)
   {
   }

@@ -280,6 +223,11 @@
     return !particles.empty();
   }
 
+  void ExchangeParticles()
+  {
+    // this->Exchanger.Exchange(outgoing, outgoingRanks, this->ParticleBlockIDsMap, incoming, incomingBlockIDs, block);
+  }
+
   void Communicate(vtkm::filter::flow::internal::ParticleMessenger& messenger)
   {
     std::vector<ParticleType> outgoing;
@@ -295,6 +243,8 @@
     block = this->GetBlockAndWait(messenger.UsingSyncCommunication());
 #endif
 
+    // this->Exchanger.Exchange(outgoing, outgoingRanks, this->ParticleBlockIDsMap, incoming, incomingBlockIDs, block);
+
     vtkm::Id numTermMessages;
     messenger.Exchange(outgoing,

@@ -463,6 +413,8 @@
   vtkm::FloatDefault StepSize;
   bool UseAsynchronousCommunication = true;
   AdvectAlgorithmTerminator Terminator;
+
+  ParticleExchanger<ParticleType> Exchanger;
 };
 
 }

diff --git a/vtkm/filter/flow/internal/AdvectAlgorithmTerminator.h b/vtkm/filter/flow/internal/AdvectAlgorithmTerminator.h
new file mode 100644
index 000000000..7b955a326
--- /dev/null
+++ b/vtkm/filter/flow/internal/AdvectAlgorithmTerminator.h
@@ -0,0 +1,111 @@
+//============================================================================
+//  Copyright (c) Kitware, Inc.
+//  All rights reserved.
+//  See LICENSE.txt for details.
+//
+//  This software is distributed WITHOUT ANY WARRANTY; without even
+//  the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+//  PURPOSE.  See the above copyright notice for more information.
+//============================================================================
+
+#ifndef vtk_m_filter_flow_internal_AdvectAlgorithmTerminator_h
+#define vtk_m_filter_flow_internal_AdvectAlgorithmTerminator_h
+
+#include <atomic>
+
+#include <vtkm/thirdparty/diy/diy.h>
+#ifdef VTKM_ENABLE_MPI
+#include <mpi.h>
+#include <vtkm/thirdparty/diy/mpi-cast.h>
+#endif
+
+namespace vtkm
+{
+namespace filter
+{
+namespace flow
+{
+namespace internal
+{
+
+class AdvectAlgorithmTerminator
+{
+public:
+#ifdef VTKM_ENABLE_MPI
+  AdvectAlgorithmTerminator(vtkmdiy::mpi::communicator& comm)
+    : MPIComm(vtkmdiy::mpi::mpi_cast(comm.handle()))
+#else
+  AdvectAlgorithmTerminator(vtkmdiy::mpi::communicator& vtkmNotUsed(comm))
+#endif
+  {
+  }
+
+  void AddWork()
+  {
+#ifdef VTKM_ENABLE_MPI
+    this->Dirty = 1;
+#endif
+  }
+
+  bool Done() const { return this->State == AdvectAlgorithmTerminatorState::DONE; }
+
+  void Control(bool haveLocalWork)
+  {
+#ifdef VTKM_ENABLE_MPI
+    if (this->State == STATE_0 && !haveLocalWork)
+    {
+      MPI_Ibarrier(this->MPIComm, &this->StateReq);
+      this->Dirty = 0;
+      this->State = STATE_1;
+    }
+    else if (this->State == STATE_1)
+    {
+      MPI_Status status;
+      int flag;
+      MPI_Test(&this->StateReq, &flag, &status);
+      if (flag == 1)
+      {
+        int localDirty = this->Dirty;
+        MPI_Iallreduce(
+          &localDirty, &this->AllDirty, 1, MPI_INT, MPI_LOR, this->MPIComm, &this->StateReq);
+        this->State = STATE_2;
+      }
+    }
+    else if (this->State == STATE_2)
+    {
+      MPI_Status status;
+      int flag;
+      MPI_Test(&this->StateReq, &flag, &status);
+      if (flag == 1)
+      {
+        if (this->AllDirty == 0) //done
+          this->State = DONE;
+        else
+          this->State = STATE_0; //reset.
+      }
+    }
+#else
+    if (!haveLocalWork)
+      this->State = DONE;
+#endif
+  }
+
+private:
+  enum AdvectAlgorithmTerminatorState
+  {
+    STATE_0,
+    STATE_1,
+    STATE_2,
+    DONE
+  };
+
+  AdvectAlgorithmTerminatorState State = AdvectAlgorithmTerminatorState::STATE_0;
+
+#ifdef VTKM_ENABLE_MPI
+  std::atomic<int> Dirty;
+  int AllDirty = 0;
+  MPI_Request StateReq;
+  MPI_Comm MPIComm;
+#endif
+};
+
+}
+}
+}
+} //vtkm::filter::flow::internal
+
+#endif //vtk_m_filter_flow_internal_AdvectAlgorithmTerminator_h

diff --git a/vtkm/filter/flow/internal/CMakeLists.txt b/vtkm/filter/flow/internal/CMakeLists.txt
index b59dedceb..880c04fa5 100644
--- a/vtkm/filter/flow/internal/CMakeLists.txt
+++ b/vtkm/filter/flow/internal/CMakeLists.txt
@@ -10,6 +10,7 @@
 set(headers
   AdvectAlgorithm.h
+  AdvectAlgorithmTerminator.h
   AdvectAlgorithmThreaded.h
   BoundsMap.h
   DataSetIntegrator.h
@@ -19,6 +20,7 @@
   LagrangianStructureHelpers.h
   Messenger.h
   ParticleAdvector.h
+  ParticleExchanger.h
   ParticleMessenger.h
   )

diff --git a/vtkm/filter/flow/internal/ParticleExchanger.h b/vtkm/filter/flow/internal/ParticleExchanger.h
new file mode 100644
index 000000000..f5ad02ca6
--- /dev/null
+++ b/vtkm/filter/flow/internal/ParticleExchanger.h
@@ -0,0 +1,228 @@
+//============================================================================
+//  Copyright (c) Kitware, Inc.
+//  All rights reserved.
+//  See LICENSE.txt for details.
+//
+//  This software is distributed WITHOUT ANY WARRANTY; without even
+//  the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+//  PURPOSE.  See the above copyright notice for more information.
+//============================================================================
+
+#ifndef vtk_m_filter_flow_internal_ParticleExchanger_h
+#define vtk_m_filter_flow_internal_ParticleExchanger_h
+
+#include <iostream>
+#include <unordered_map>
+#include <vector>
+
+#include <vtkm/cont/ErrorFilterExecution.h>
+#include <vtkm/cont/Logging.h>
+#include <vtkm/thirdparty/diy/diy.h>
+#ifdef VTKM_ENABLE_MPI
+#include <mpi.h>
+#include <vtkm/thirdparty/diy/mpi-cast.h>
+#endif
+
+namespace vtkm
+{
+namespace filter
+{
+namespace flow
+{
+namespace internal
+{
+
+template <typename ParticleType>
+class ParticleExchanger
+{
+public:
+#ifdef VTKM_ENABLE_MPI
+  ParticleExchanger(vtkmdiy::mpi::communicator& comm)
+    : MPIComm(vtkmdiy::mpi::mpi_cast(comm.handle()))
+    , NumRanks(comm.size())
+    , Rank(comm.rank())
+#else
+  ParticleExchanger(vtkmdiy::mpi::communicator& vtkmNotUsed(comm))
+#endif
+  {
+  }
+#ifdef VTKM_ENABLE_MPI
+  ~ParticleExchanger() { this->CleanupSendBuffers(); }
+#endif
+
+  void Exchange(const std::vector<ParticleType>& outData,
+                const std::vector<vtkm::Id>& outRanks,
+                const std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& outBlockIDsMap,
+                std::vector<ParticleType>& inData,
+                std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& inDataBlockIDsMap,
+                bool blockAndWait)
+  {
+    VTKM_ASSERT(outData.size() == outRanks.size());
+
+    if (this->NumRanks == 1)
+      this->SerialExchange(outData, outBlockIDsMap, inData, inDataBlockIDsMap);
+#ifdef VTKM_ENABLE_MPI
+    else
+    {
+      this->SendParticles(outData, outRanks, outBlockIDsMap);
+      this->RecvParticles(inData, inDataBlockIDsMap, blockAndWait);
+      this->CleanupSendBuffers();
+    }
+#endif
+  }
+
+private:
+  // pair(vector of particles, vector of blockIds)
+  //using ParticleCommType = std::pair<std::vector<ParticleType>, std::vector<vtkm::Id>>;
+  // pair(particle, bids);
+  using ParticleCommType = std::pair<ParticleType, std::vector<vtkm::Id>>;
+
+  void CleanupSendBuffers() { std::cout << "IMPLEMENT ME!!!" << std::endl; }
+
+  void SendParticles(const std::vector<ParticleType>& outData,
+                     const std::vector<vtkm::Id>& outRanks,
+                     const std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& outBlockIDsMap)
+  {
+    if (outData.empty())
+      return;
+
+    //create the send data: vector of particles, vector of vector of blockIds.
+    std::size_t n = outData.size();
+    std::unordered_map<int, std::vector<ParticleCommType>> sendData;
+
+    // dst, vector of pair(particles, blockIds)
+    for (std::size_t i = 0; i < n; i++)
+    {
+      const auto& bids = outBlockIDsMap.find(outData[i].GetID())->second;
+      sendData[outRanks[i]].emplace_back(outData[i], bids);
+    }
+
+    //Send to dst, vector<pair<particle, bids>>
+    for (auto& si : sendData)
+      this->SendParticlesToDst(si.first, si.second);
+  }
+
+  void SendParticlesToDst(int dst, const std::vector<ParticleCommType>& data)
+  {
+    if (dst == this->Rank)
+    {
+      VTKM_LOG_S(vtkm::cont::LogLevel::Error, "Error. Sending a particle to yourself.");
+      return;
+    }
+
+    //Serialize vector(pair(particle, bids)) and send.
+    vtkmdiy::MemoryBuffer* bb = new vtkmdiy::MemoryBuffer();
+    vtkmdiy::save(*bb, data);
+
+    MPI_Request req;
+    int err = MPI_Isend(bb->buffer.data(), bb->size(), MPI_BYTE, dst, 0, this->MPIComm, &req);
+    if (err != MPI_SUCCESS)
+      throw vtkm::cont::ErrorFilterExecution(
+        "Error in MPI_Isend in ParticleExchanger::SendParticlesToDst");
+    this->SendBuffers[req] = bb;
+  }
+
+  void RecvParticles(std::vector<ParticleType>& inData,
+                     std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& inDataBlockIDsMap,
+                     bool blockAndWait) const
+  {
+    inData.resize(0);
+    inDataBlockIDsMap.clear();
+
+    std::vector<vtkmdiy::MemoryBuffer> buffers;
+
+    MPI_Status status;
+    while (true)
+    {
+      bool msgReceived = false;
+      int err;
+      if (blockAndWait)
+      {
+        err = MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, this->MPIComm, &status);
+        if (err != MPI_SUCCESS)
+          throw vtkm::cont::ErrorFilterExecution(
+            "Error in MPI_Probe in ParticleExchanger::RecvParticles");
+        msgReceived = true;
+      }
+      else
+      {
+        int flag = 0;
+        err = MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, this->MPIComm, &flag, &status);
+        if (err != MPI_SUCCESS)
+          throw vtkm::cont::ErrorFilterExecution(
+            "Error in MPI_Iprobe in ParticleExchanger::RecvParticles");
+        msgReceived = (flag == 1);
+      }
+
+      if (msgReceived)
+      {
+        int incomingSize;
+        err = MPI_Get_count(&status, MPI_BYTE, &incomingSize);
+        if (err != MPI_SUCCESS)
+          throw vtkm::cont::ErrorFilterExecution(
+            "Error in MPI_Get_count in ParticleExchanger::RecvParticles");
+
+        std::vector<char> recvBuff;
+        recvBuff.resize(incomingSize);
+        MPI_Status recvStatus;
+
+        err = MPI_Recv(recvBuff.data(),
+                       incomingSize,
+                       MPI_BYTE,
+                       status.MPI_SOURCE,
+                       status.MPI_TAG,
+                       this->MPIComm,
+                       &recvStatus);
+        if (err != MPI_SUCCESS)
+          throw vtkm::cont::ErrorFilterExecution(
+            "Error in MPI_Recv in ParticleExchanger::RecvParticles");
+
+        vtkmdiy::MemoryBuffer memBuff;
+        memBuff.buffer = std::move(recvBuff); //adopt the received bytes for deserialization.
+        buffers.emplace_back(std::move(memBuff));
+
+        blockAndWait = false; //Check one more time to see if anything else arrived.
+      }
+      else
+      {
+        break;
+      }
+    }
+
+    //Unpack buffers into particle data.
+    //buffers: vector<pair<particle, bids>>
+    for (auto& b : buffers)
+    {
+      std::vector<ParticleCommType> data;
+      vtkmdiy::load(b, data);
+
+      for (auto& d : data)
+      {
+        auto& particle = d.first;
+        auto& bids = d.second;
+        inDataBlockIDsMap[particle.GetID()] = std::move(bids);
+        inData.emplace_back(std::move(particle));
+      }
+    }
+  }
+
+  void SerialExchange(const std::vector<ParticleType>& outData,
+                      const std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& outBlockIDsMap,
+                      std::vector<ParticleType>& inData,
+                      std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& inDataBlockIDsMap)
+  {
+    //Copy output to input.
+    for (const auto& p : outData)
+    {
+      const auto& bids = outBlockIDsMap.find(p.GetID())->second;
+      inData.emplace_back(p);
+      inDataBlockIDsMap[p.GetID()] = bids;
+    }
+  }
+
+#ifdef VTKM_ENABLE_MPI
+  MPI_Comm MPIComm;
+  vtkm::Id NumRanks;
+  vtkm::Id Rank;
+  std::unordered_map<MPI_Request, vtkmdiy::MemoryBuffer*> SendBuffers;
+#else
+  vtkm::Id NumRanks = 1;
+  vtkm::Id Rank = 0;
+#endif
+};
+
+}
+}
+}
+} //vtkm::filter::flow::internal
+
+#endif //vtk_m_filter_flow_internal_ParticleExchanger_h
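A note on the CleanupSendBuffers() stub above: it is called from both
Exchange() and the destructor but never implemented in this series. A minimal
sketch of one way to complete it, assuming the SendBuffers map declared above
(an illustration only, not code from the patch):

    //Hypothetical completion of ParticleExchanger::CleanupSendBuffers().
    //Test each outstanding MPI_Isend request; once MPI reports it complete,
    //free the serialized buffer and drop the map entry.
    void CleanupSendBuffers()
    {
    #ifdef VTKM_ENABLE_MPI
      for (auto it = this->SendBuffers.begin(); it != this->SendBuffers.end();)
      {
        MPI_Request req = it->first;
        MPI_Status status;
        int flag = 0;
        if (MPI_Test(&req, &flag, &status) != MPI_SUCCESS)
          throw vtkm::cont::ErrorFilterExecution(
            "Error in MPI_Test in ParticleExchanger::CleanupSendBuffers");
        if (flag == 1) //send finished; safe to release the buffer.
        {
          delete it->second;
          it = this->SendBuffers.erase(it);
        }
        else
          ++it;
      }
    #endif
    }

A destructor-safe variant would block with MPI_Wait on each remaining request
instead of testing, so a buffer is never freed while its send is still in
flight.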