diff --git a/vtkm/filter/flow/FilterParticleAdvection.h b/vtkm/filter/flow/FilterParticleAdvection.h index 8083905e3..9f0ecb71c 100644 --- a/vtkm/filter/flow/FilterParticleAdvection.h +++ b/vtkm/filter/flow/FilterParticleAdvection.h @@ -15,6 +15,7 @@ #include #include #include +#include #include namespace vtkm @@ -104,7 +105,7 @@ protected: bool BlockIdsSet = false; std::vector BlockIds; - + vtkm::filter::flow::internal::BoundsMap BoundsMap; vtkm::Id NumberOfSteps = 0; vtkm::cont::UnknownArrayHandle Seeds; vtkm::filter::flow::IntegrationSolverType SolverType = diff --git a/vtkm/filter/flow/FilterParticleAdvectionSteadyState.cxx b/vtkm/filter/flow/FilterParticleAdvectionSteadyState.cxx index c70c2b3d6..5f4711a39 100644 --- a/vtkm/filter/flow/FilterParticleAdvectionSteadyState.cxx +++ b/vtkm/filter/flow/FilterParticleAdvectionSteadyState.cxx @@ -58,13 +58,15 @@ FilterParticleAdvectionSteadyState::DoExecutePartitions( DataSetIntegratorSteadyState; this->ValidateOptions(); + if (this->BlockIdsSet) + this->BoundsMap = vtkm::filter::flow::internal::BoundsMap(input, this->BlockIds); + else + this->BoundsMap = vtkm::filter::flow::internal::BoundsMap(input); - - vtkm::filter::flow::internal::BoundsMap boundsMap(input); std::vector dsi; for (vtkm::Id i = 0; i < input.GetNumberOfPartitions(); i++) { - vtkm::Id blockId = boundsMap.GetLocalBlockId(i); + vtkm::Id blockId = this->BoundsMap.GetLocalBlockId(i); auto dataset = input.GetPartition(i); // Build the field for the current dataset @@ -78,7 +80,7 @@ FilterParticleAdvectionSteadyState::DoExecutePartitions( } vtkm::filter::flow::internal::ParticleAdvector pav( - boundsMap, dsi, this->UseThreadedAlgorithm, this->UseAsynchronousCommunication); + this->BoundsMap, dsi, this->UseThreadedAlgorithm, this->UseAsynchronousCommunication); vtkm::cont::ArrayHandle particles; this->Seeds.AsArrayHandle(particles); diff --git a/vtkm/filter/flow/FilterParticleAdvectionUnsteadyState.cxx b/vtkm/filter/flow/FilterParticleAdvectionUnsteadyState.cxx index 7d72cfcc8..33963047b 100644 --- a/vtkm/filter/flow/FilterParticleAdvectionUnsteadyState.cxx +++ b/vtkm/filter/flow/FilterParticleAdvectionUnsteadyState.cxx @@ -55,12 +55,15 @@ FilterParticleAdvectionUnsteadyState::DoExecutePartitions( using DSIType = vtkm::filter::flow::internal:: DataSetIntegratorUnsteadyState; - vtkm::filter::flow::internal::BoundsMap boundsMap(input); + if (this->BlockIdsSet) + this->BoundsMap = vtkm::filter::flow::internal::BoundsMap(input, this->BlockIds); + else + this->BoundsMap = vtkm::filter::flow::internal::BoundsMap(input); std::vector dsi; for (vtkm::Id i = 0; i < input.GetNumberOfPartitions(); i++) { - vtkm::Id blockId = boundsMap.GetLocalBlockId(i); + vtkm::Id blockId = this->BoundsMap.GetLocalBlockId(i); auto ds1 = input.GetPartition(i); auto ds2 = this->Input2.GetPartition(i); @@ -85,7 +88,7 @@ FilterParticleAdvectionUnsteadyState::DoExecutePartitions( analysis); } vtkm::filter::flow::internal::ParticleAdvector pav( - boundsMap, dsi, this->UseThreadedAlgorithm, this->UseAsynchronousCommunication); + this->BoundsMap, dsi, this->UseThreadedAlgorithm, this->UseAsynchronousCommunication); vtkm::cont::ArrayHandle particles; this->Seeds.AsArrayHandle(particles); diff --git a/vtkm/filter/flow/internal/AdvectAlgorithm.h b/vtkm/filter/flow/internal/AdvectAlgorithm.h index 112d98501..bb716dde8 100644 --- a/vtkm/filter/flow/internal/AdvectAlgorithm.h +++ b/vtkm/filter/flow/internal/AdvectAlgorithm.h @@ -15,6 +15,10 @@ #include #include #include +#ifdef VTKM_ENABLE_MPI +#include +#include +#endif namespace vtkm { @@ -25,6 +29,87 @@ namespace flow namespace internal { +class AdvectAlgorithmTerminator +{ +public: +#ifdef VTKM_ENABLE_MPI + AdvectAlgorithmTerminator(vtkmdiy::mpi::communicator& comm) + : MPIComm(vtkmdiy::mpi::mpi_cast(comm.handle())) +#else + AdvectAlgorithmTerminator(vtkmdiy::mpi::communicator& vtkmNotUsed(comm)) +#endif + { + } + + void AddWork() + { +#ifdef VTKM_ENABLE_MPI + this->Dirty = 1; +#endif + } + + bool Done() const { return this->State == AdvectAlgorithmTerminatorState::DONE; } + + void Control(bool haveLocalWork) + { +#ifdef VTKM_ENABLE_MPI + if (this->State == STATE_0 && !haveLocalWork) + { + MPI_Ibarrier(this->MPIComm, &this->StateReq); + this->Dirty = 0; + this->State = STATE_1; + } + else if (this->State == STATE_1) + { + MPI_Status status; + int flag; + MPI_Test(&this->StateReq, &flag, &status); + if (flag == 1) + { + int localDirty = this->Dirty; + MPI_Iallreduce( + &localDirty, &this->AllDirty, 1, MPI_INT, MPI_LOR, this->MPIComm, &this->StateReq); + this->State = STATE_2; + } + } + else if (this->State == STATE_2) + { + MPI_Status status; + int flag; + MPI_Test(&this->StateReq, &flag, &status); + if (flag == 1) + { + if (this->AllDirty == 0) //done + this->State = DONE; + else + this->State = STATE_0; //reset. + } + } +#else + if (!haveLocalWork) + this->State = DONE; +#endif + } + +private: + enum AdvectAlgorithmTerminatorState + { + STATE_0, + STATE_1, + STATE_2, + DONE + }; + + AdvectAlgorithmTerminatorState State = AdvectAlgorithmTerminatorState::STATE_0; + +#ifdef VTKM_ENABLE_MPI + std::atomic Dirty; + int AllDirty = 0; + MPI_Request StateReq; + MPI_Comm MPIComm; +#endif +}; + template class AdvectAlgorithm { @@ -39,6 +124,7 @@ public: , NumRanks(this->Comm.size()) , Rank(this->Comm.rank()) , UseAsynchronousCommunication(useAsyncComm) + , Terminator(this->Comm) { } @@ -97,27 +183,21 @@ public: vtkm::filter::flow::internal::ParticleMessenger messenger( this->Comm, this->UseAsynchronousCommunication, this->BoundsMap, 1, 128); - this->ComputeTotalNumParticles(); - - while (this->TotalNumTerminatedParticles < this->TotalNumParticles) + while (!this->Terminator.Done()) { std::vector v; - vtkm::Id numTerm = 0, blockId = -1; + vtkm::Id blockId = -1; if (this->GetActiveParticles(v, blockId)) { //make this a pointer to avoid the copy? auto& block = this->GetDataSet(blockId); DSIHelperInfo bb(v, this->BoundsMap, this->ParticleBlockIDsMap); block.Advect(bb, this->StepSize); - numTerm = this->UpdateResult(bb); + this->UpdateResult(bb); } - vtkm::Id numTermMessages = 0; - this->Communicate(messenger, numTerm, numTermMessages); - - this->TotalNumTerminatedParticles += (numTerm + numTermMessages); - if (this->TotalNumTerminatedParticles > this->TotalNumParticles) - throw vtkm::cont::ErrorFilterExecution("Particle count error"); + this->Communicate(messenger); + this->Terminator.Control(!this->Active.empty()); } } @@ -128,19 +208,6 @@ public: this->ParticleBlockIDsMap.clear(); } - void ComputeTotalNumParticles() - { - vtkm::Id numLocal = static_cast(this->Inactive.size()); - for (const auto& it : this->Active) - numLocal += it.second.size(); - -#ifdef VTKM_ENABLE_MPI - vtkmdiy::mpi::all_reduce(this->Comm, numLocal, this->TotalNumParticles, std::plus{}); -#else - this->TotalNumParticles = numLocal; -#endif - } - DataSetIntegrator& GetDataSet(vtkm::Id id) { for (auto& it : this->Blocks) @@ -213,9 +280,7 @@ public: return !particles.empty(); } - void Communicate(vtkm::filter::flow::internal::ParticleMessenger& messenger, - vtkm::Id numLocalTerminations, - vtkm::Id& numTermMessages) + void Communicate(vtkm::filter::flow::internal::ParticleMessenger& messenger) { std::vector outgoing; std::vector outgoingRanks; @@ -224,16 +289,17 @@ public: std::vector incoming; std::unordered_map> incomingBlockIDs; - numTermMessages = 0; + bool block = false; #ifdef VTKM_ENABLE_MPI - block = this->GetBlockAndWait(messenger.UsingSyncCommunication(), numLocalTerminations); + block = this->GetBlockAndWait(messenger.UsingSyncCommunication()); #endif + vtkm::Id numTermMessages; messenger.Exchange(outgoing, outgoingRanks, this->ParticleBlockIDsMap, - numLocalTerminations, + 0, incoming, incomingBlockIDs, numTermMessages, @@ -311,17 +377,22 @@ public: { VTKM_ASSERT(particles.size() == idsMap.size()); - for (auto pit = particles.begin(); pit != particles.end(); pit++) + if (!particles.empty()) { - vtkm::Id particleID = pit->GetID(); - const auto& it = idsMap.find(particleID); - VTKM_ASSERT(it != idsMap.end() && !it->second.empty()); - vtkm::Id blockId = it->second[0]; - this->Active[blockId].emplace_back(*pit); - } + this->Terminator.AddWork(); - for (const auto& it : idsMap) - this->ParticleBlockIDsMap[it.first] = it.second; + for (auto pit = particles.begin(); pit != particles.end(); pit++) + { + vtkm::Id particleID = pit->GetID(); + const auto& it = idsMap.find(particleID); + VTKM_ASSERT(it != idsMap.end() && !it->second.empty()); + vtkm::Id blockId = it->second[0]; + this->Active[blockId].emplace_back(*pit); + } + + for (const auto& it : idsMap) + this->ParticleBlockIDsMap[it.first] = it.second; + } } virtual void UpdateInactive(const std::vector& particles, @@ -351,7 +422,7 @@ public: } - virtual bool GetBlockAndWait(const bool& syncComm, const vtkm::Id& numLocalTerm) + virtual bool GetBlockAndWait(const bool& syncComm) { bool haveNoWork = this->Active.empty() && this->Inactive.empty(); @@ -367,9 +438,11 @@ public: //2. numLocalTerm + this->TotalNumberOfTerminatedParticles == this->TotalNumberOfParticles //So, if neither are true, we can safely block and wait for communication to come in. - if (haveNoWork && - (numLocalTerm + this->TotalNumTerminatedParticles < this->TotalNumParticles)) - return true; + // if (this->Terminator.State == AdvectAlgorithmTerminator::AdvectAlgorithmTerminatorState::STATE_2) + // return true; + + // if (haveNoWork && (numLocalTerm + this->TotalNumTerminatedParticles < this->TotalNumParticles)) + // return true; return false; } @@ -388,9 +461,8 @@ public: std::unordered_map> ParticleBlockIDsMap; vtkm::Id Rank; vtkm::FloatDefault StepSize; - vtkm::Id TotalNumParticles = 0; - vtkm::Id TotalNumTerminatedParticles = 0; bool UseAsynchronousCommunication = true; + AdvectAlgorithmTerminator Terminator; }; } diff --git a/vtkm/filter/flow/internal/AdvectAlgorithmThreaded.h b/vtkm/filter/flow/internal/AdvectAlgorithmThreaded.h index db4e651fb..015673e65 100644 --- a/vtkm/filter/flow/internal/AdvectAlgorithmThreaded.h +++ b/vtkm/filter/flow/internal/AdvectAlgorithmThreaded.h @@ -39,7 +39,6 @@ public: bool useAsyncComm) : AdvectAlgorithm(bm, blocks, useAsyncComm) , Done(false) - , WorkerActivate(false) { //For threaded algorithm, the particles go out of scope in the Work method. //When this happens, they are destructed by the time the Manage thread gets them. @@ -50,8 +49,6 @@ public: void Go() override { - this->ComputeTotalNumParticles(); - std::vector workerThreads; workerThreads.emplace_back(std::thread(AdvectAlgorithmThreaded::Worker, this)); this->Manage(); @@ -63,6 +60,13 @@ public: } protected: + bool HaveAnyWork() + { + std::lock_guard lock(this->Mutex); + //We have work if there particles in any queues or a worker is busy. + return !this->Active.empty() || !this->Inactive.empty() || this->WorkerActivate; + } + bool GetActiveParticles(std::vector& particles, vtkm::Id& blockId) override { std::lock_guard lock(this->Mutex); @@ -144,38 +148,31 @@ protected: vtkm::filter::flow::internal::ParticleMessenger messenger( this->Comm, useAsync, this->BoundsMap, 1, 128); - while (this->TotalNumTerminatedParticles < this->TotalNumParticles) + while (!this->Terminator.Done()) { std::unordered_map>> workerResults; this->GetWorkerResults(workerResults); - vtkm::Id numTerm = 0; for (auto& it : workerResults) - { for (auto& r : it.second) - numTerm += this->UpdateResult(r); - } + this->UpdateResult(r); - vtkm::Id numTermMessages = 0; - this->Communicate(messenger, numTerm, numTermMessages); - - this->TotalNumTerminatedParticles += (numTerm + numTermMessages); - if (this->TotalNumTerminatedParticles > this->TotalNumParticles) - throw vtkm::cont::ErrorFilterExecution("Particle count error"); + this->Communicate(messenger); + this->Terminator.Control(this->HaveAnyWork()); } //Let the workers know that we are done. this->SetDone(); } - bool GetBlockAndWait(const bool& syncComm, const vtkm::Id& numLocalTerm) override + bool GetBlockAndWait(const bool& syncComm) override { std::lock_guard lock(this->Mutex); if (this->Done) return true; - return (this->AdvectAlgorithm::GetBlockAndWait(syncComm, numLocalTerm) && - !this->WorkerActivate && this->WorkerResults.empty()); + return (this->AdvectAlgorithm::GetBlockAndWait(syncComm) && !this->WorkerActivate && + this->WorkerResults.empty()); } void GetWorkerResults( @@ -193,7 +190,7 @@ protected: std::atomic Done; std::mutex Mutex; - bool WorkerActivate; + bool WorkerActivate = false; std::condition_variable WorkerActivateCondition; std::unordered_map>> WorkerResults; };