diff --git a/vtkm/filter/ParticleAdvection.h b/vtkm/filter/ParticleAdvection.h index 04871372e..89241b7fe 100644 --- a/vtkm/filter/ParticleAdvection.h +++ b/vtkm/filter/ParticleAdvection.h @@ -13,7 +13,6 @@ #include #include -#include namespace vtkm { @@ -40,6 +39,12 @@ public: VTKM_CONT void SetSeeds(vtkm::cont::ArrayHandle& seeds); + VTKM_CONT + bool GetUseThreadedAlgorithm() { return this->UseThreadedAlgorithm; } + + VTKM_CONT + void SetUseThreadedAlgorithm(bool val) { this->UseThreadedAlgorithm = val; } + template vtkm::cont::PartitionedDataSet PrepareForExecution( const vtkm::cont::PartitionedDataSet& input, @@ -54,6 +59,7 @@ private: vtkm::Id NumberOfSteps; vtkm::FloatDefault StepSize; vtkm::cont::ArrayHandle Seeds; + bool UseThreadedAlgorithm; }; } } // namespace vtkm::filter diff --git a/vtkm/filter/ParticleAdvection.hxx b/vtkm/filter/ParticleAdvection.hxx index 6d162e7c5..25e5c22a9 100644 --- a/vtkm/filter/ParticleAdvection.hxx +++ b/vtkm/filter/ParticleAdvection.hxx @@ -29,6 +29,7 @@ namespace filter //----------------------------------------------------------------------------- inline VTKM_CONT ParticleAdvection::ParticleAdvection() : vtkm::filter::FilterDataSetWithField() + , UseThreadedAlgorithm(false) { } @@ -59,7 +60,18 @@ inline VTKM_CONT vtkm::cont::PartitionedDataSet ParticleAdvection::PrepareForExe dsi.push_back(DataSetIntegratorType(input.GetPartition(i), blockId, activeField)); } - vtkm::filter::particleadvection::ParticleAdvectionAlgorithm pa(boundsMap, dsi); + using AlgorithmType = vtkm::filter::particleadvection::ParticleAdvectionAlgorithm; + using ThreadedAlgorithmType = vtkm::filter::particleadvection::ParticleAdvectionThreadedAlgorithm; + + if (this->UseThreadedAlgorithm) + return vtkm::filter::particleadvection::RunAlgo( + boundsMap, dsi, this->NumberOfSteps, this->StepSize, this->Seeds); + else + return vtkm::filter::particleadvection::RunAlgo( + boundsMap, dsi, this->NumberOfSteps, this->StepSize, this->Seeds); + + // vtkm::filter::particleadvection::ParticleAdvectionAlgorithm pa(boundsMap, dsi); + vtkm::filter::particleadvection::ParticleAdvectionThreadedAlgorithm pa(boundsMap, dsi); pa.SetNumberOfSteps(this->NumberOfSteps); pa.SetStepSize(this->StepSize); pa.SetSeeds(this->Seeds); diff --git a/vtkm/filter/Streamline.h b/vtkm/filter/Streamline.h index 08ec80520..96a6592b4 100644 --- a/vtkm/filter/Streamline.h +++ b/vtkm/filter/Streamline.h @@ -12,7 +12,6 @@ #define vtk_m_filter_Streamline_h #include -#include namespace vtkm { @@ -39,6 +38,12 @@ public: VTKM_CONT void SetSeeds(vtkm::cont::ArrayHandle& seeds); + VTKM_CONT + bool GetUseThreadedAlgorithm() { return this->UseThreadedAlgorithm; } + + VTKM_CONT + void SetUseThreadedAlgorithm(bool val) { this->UseThreadedAlgorithm = val; } + template vtkm::cont::PartitionedDataSet PrepareForExecution( const vtkm::cont::PartitionedDataSet& input, @@ -53,7 +58,7 @@ private: vtkm::Id NumberOfSteps; vtkm::FloatDefault StepSize; vtkm::cont::ArrayHandle Seeds; - vtkm::worklet::Streamline Worklet; + bool UseThreadedAlgorithm; }; } } // namespace vtkm::filter diff --git a/vtkm/filter/Streamline.hxx b/vtkm/filter/Streamline.hxx index 03f37ef3a..b352b30a7 100644 --- a/vtkm/filter/Streamline.hxx +++ b/vtkm/filter/Streamline.hxx @@ -29,7 +29,7 @@ namespace filter //----------------------------------------------------------------------------- inline VTKM_CONT Streamline::Streamline() : vtkm::filter::FilterDataSetWithField() - , Worklet() + , UseThreadedAlgorithm(false) { } @@ -59,14 +59,15 @@ inline VTKM_CONT vtkm::cont::PartitionedDataSet Streamline::PrepareForExecution( dsi.push_back(DataSetIntegratorType(input.GetPartition(i), blockId, activeField)); } - vtkm::filter::particleadvection::StreamlineAlgorithm sa(boundsMap, dsi); - sa.SetNumberOfSteps(this->NumberOfSteps); - sa.SetStepSize(this->StepSize); - sa.SetSeeds(this->Seeds); + using AlgorithmType = vtkm::filter::particleadvection::StreamlineAlgorithm; + using ThreadedAlgorithmType = vtkm::filter::particleadvection::StreamlineThreadedAlgorithm; - sa.Go(); - vtkm::cont::PartitionedDataSet output = sa.GetOutput(); - return output; + if (this->UseThreadedAlgorithm) + return vtkm::filter::particleadvection::RunAlgo( + boundsMap, dsi, this->NumberOfSteps, this->StepSize, this->Seeds); + else + return vtkm::filter::particleadvection::RunAlgo( + boundsMap, dsi, this->NumberOfSteps, this->StepSize, this->Seeds); } //----------------------------------------------------------------------------- diff --git a/vtkm/filter/particleadvection/DataSetIntegrator.h b/vtkm/filter/particleadvection/DataSetIntegrator.h index b745f3ff0..ae4aade8f 100644 --- a/vtkm/filter/particleadvection/DataSetIntegrator.h +++ b/vtkm/filter/particleadvection/DataSetIntegrator.h @@ -13,6 +13,7 @@ #include #include +#include #include #include @@ -35,6 +36,7 @@ class VTKM_ALWAYS_EXPORT DataSetIntegrator public: DataSetIntegrator(const vtkm::cont::DataSet& ds, vtkm::Id id, std::string fieldNm) : ActiveField(ds.GetField(fieldNm)) + , CopySeedArray(false) , Eval(nullptr) , ID(id) { @@ -50,6 +52,7 @@ public: } vtkm::Id GetID() const { return this->ID; } + void SetCopySeedFlag(bool val) { this->CopySeedArray = val; } template void Advect(std::vector& v, @@ -57,7 +60,8 @@ public: vtkm::Id maxSteps, ResultType& result) const { - auto seedArray = vtkm::cont::make_ArrayHandle(v, vtkm::CopyFlag::Off); + auto copyFlag = (this->CopySeedArray ? vtkm::CopyFlag::On : vtkm::CopyFlag::Off); + auto seedArray = vtkm::cont::make_ArrayHandle(v, copyFlag); RK4Type rk4(*this->Eval, stepSize); this->DoAdvect(seedArray, rk4, maxSteps, result); } @@ -70,6 +74,7 @@ private: ResultType& result) const; vtkm::cont::Field ActiveField; + bool CopySeedArray; std::shared_ptr Eval; vtkm::Id ID; }; diff --git a/vtkm/filter/particleadvection/ParticleAdvector.h b/vtkm/filter/particleadvection/ParticleAdvector.h index bc10a413e..0e2a8c009 100644 --- a/vtkm/filter/particleadvection/ParticleAdvector.h +++ b/vtkm/filter/particleadvection/ParticleAdvector.h @@ -22,14 +22,35 @@ #include #include +#include #include +//TODO: +// fix inheritance? +// streamlines... + namespace vtkm { namespace filter { namespace particleadvection { +using DataSetIntegratorType = vtkm::filter::particleadvection::DataSetIntegrator; +template +vtkm::cont::PartitionedDataSet RunAlgo(const vtkm::filter::particleadvection::BoundsMap& boundsMap, + const std::vector& dsi, + vtkm::Id numSteps, + vtkm::FloatDefault stepSize, + const vtkm::cont::ArrayHandle& seeds) +{ + AlgorithmType algo(boundsMap, dsi); + + algo.SetNumberOfSteps(numSteps); + algo.SetStepSize(stepSize); + algo.SetSeeds(seeds); + algo.Go(); + return algo.GetOutput(); +} // // Base class for particle advector // @@ -51,27 +72,45 @@ public: { } + //Initialize ParticleAdvectorBase void SetStepSize(vtkm::FloatDefault stepSize) { this->StepSize = stepSize; } void SetNumberOfSteps(vtkm::Id numSteps) { this->NumberOfSteps = numSteps; } - virtual void SetSeeds(const vtkm::cont::ArrayHandle& seeds) = 0; + + + //Virtuals for advection and getting output. virtual void Go() = 0; virtual vtkm::cont::PartitionedDataSet GetOutput() = 0; protected: - virtual bool GetActiveParticles(std::vector& particles) = 0; - inline vtkm::Id ComputeTotalNumParticles(vtkm::Id numLocal) const; + inline const DataSetIntegratorType& GetDataSet(vtkm::Id id); // const; + inline void UpdateResultParticle( + vtkm::Particle& p, + std::vector& I, + std::vector& T, + std::vector& A, + std::unordered_map>& idsMapI, + std::unordered_map>& idsMapA) const; + + virtual void ClearParticles() = 0; + virtual void SetSeedArray(const std::vector& particles, + const std::vector>& blockIds) = 0; + virtual void Communicate(vtkm::filter::particleadvection::ParticleMessenger& messenger, + vtkm::Id numLocalTerminations, + vtkm::Id& numTermMessages) = 0; + virtual bool GetActiveParticles(std::vector& particles, vtkm::Id& blockId) = 0; + virtual void UpdateActive(const std::vector& particles, + const std::unordered_map>& idsMap) = 0; + virtual void UpdateInactive( + const std::vector& particles, + const std::unordered_map>& idsMap) = 0; + virtual void UpdateTerminated(const std::vector& particles, vtkm::Id blockId) = 0; - inline const DataSetIntegratorType& GetDataSet(vtkm::Id id) const; virtual void StoreResult(const ResultType& vtkmNotUsed(res), vtkm::Id vtkmNotUsed(blockId)) {} + inline vtkm::Id UpdateResult(const ResultType& res, vtkm::Id blockId); - inline void UpdateResult(const ResultType& res, - vtkm::Id blockId, - std::vector& I, - std::vector& T, - std::vector& A); - + //Member data std::vector Blocks; vtkm::filter::particleadvection::BoundsMap BoundsMap; vtkmdiy::mpi::communicator Comm = vtkm::cont::EnvironmentTracker::GetCommunicator(); @@ -79,7 +118,7 @@ protected: vtkm::Id NumRanks; vtkm::Id Rank; vtkm::FloatDefault StepSize; - std::map> ParticleBlockIDsMap; + std::unordered_map> ParticleBlockIDsMap; }; // @@ -101,20 +140,25 @@ public: void SetSeeds(const vtkm::cont::ArrayHandle& seeds) override { - this->ParticleBlockIDsMap.clear(); + this->ClearParticles(); vtkm::Id n = seeds.GetNumberOfValues(); auto portal = seeds.ReadPortal(); + + std::vector> blockIDs; + std::vector particles; for (vtkm::Id i = 0; i < n; i++) { - vtkm::Particle p = portal.Get(i); - std::vector blockIDs = this->BoundsMap.FindBlocks(p.Pos); - if (!blockIDs.empty() && this->BoundsMap.FindRank(blockIDs[0]) == this->Rank) + const vtkm::Particle p = portal.Get(i); + std::vector ids = this->BoundsMap.FindBlocks(p.Pos); + if (!ids.empty() && this->BoundsMap.FindRank(ids[0]) == this->Rank) { - this->Active.push_back(p); - this->ParticleBlockIDsMap[p.ID] = blockIDs; + particles.push_back(p); + blockIDs.push_back(ids); } } + + this->SetSeedArray(particles, blockIDs); } void Go() override @@ -128,61 +172,65 @@ public: vtkm::Id N = 0; while (N < totalNumSeeds) { - std::vector v, I, T, A; - - vtkm::Id blockId = -1; - if (GetActiveParticles(v)) + std::vector v; + vtkm::Id numTerm = 0, blockId = -1; + if (GetActiveParticles(v, blockId)) { - blockId = this->ParticleBlockIDsMap[v[0].ID][0]; - auto& block = this->GetDataSet(blockId); + const auto& block = this->GetDataSet(blockId); ResultType r; block.Advect(v, this->StepSize, this->NumberOfSteps, r); - this->UpdateResult(r, blockId, I, T, A); - - if (!A.empty()) - this->Active.insert(this->Active.end(), A.begin(), A.end()); + numTerm = this->UpdateResult(r, blockId); } - std::vector incoming; - std::map> incomingBlockIDsMap; - vtkm::Id myTerm = static_cast(T.size()); vtkm::Id numTermMessages = 0; - messenger.Exchange( - I, this->ParticleBlockIDsMap, myTerm, incoming, incomingBlockIDsMap, numTermMessages); + this->Communicate(messenger, numTerm, numTermMessages); - if (!incoming.empty()) - { - this->Active.insert(this->Active.end(), incoming.begin(), incoming.end()); - for (const auto& it : incomingBlockIDsMap) - this->ParticleBlockIDsMap[it.first] = it.second; - } - - if (!T.empty()) - { - auto& it = this->Terminated[blockId]; - it.insert(it.end(), T.begin(), T.end()); - } - - N += (myTerm + numTermMessages); + N += (numTerm + numTermMessages); if (N > totalNumSeeds) throw vtkm::cont::ErrorFilterExecution("Particle count error"); } } protected: - bool GetActiveParticles(std::vector& particles) override + void ClearParticles() override + { + this->Active.clear(); + this->Inactive.clear(); + this->Terminated.clear(); + this->ParticleBlockIDsMap.clear(); + } + + void SetSeedArray(const std::vector& particles, + const std::vector>& blockIds) override + { + VTKM_ASSERT(particles.size() == blockIds.size()); + + auto pit = particles.begin(); + auto bit = blockIds.begin(); + while (pit != particles.end() && bit != blockIds.end()) + { + this->ParticleBlockIDsMap[pit->ID] = *bit; + pit++; + bit++; + } + + this->Active.insert(this->Active.end(), particles.begin(), particles.end()); + } + + bool GetActiveParticles(std::vector& particles, vtkm::Id& blockId) override { particles.clear(); + blockId = -1; if (this->Active.empty()) return false; - vtkm::Id workingBlockID = this->ParticleBlockIDsMap[this->Active.front().ID][0]; + blockId = this->ParticleBlockIDsMap[this->Active.front().ID][0]; auto it = this->Active.begin(); while (it != this->Active.end()) { auto p = *it; - if (workingBlockID == this->ParticleBlockIDsMap[p.ID][0]) + if (blockId == this->ParticleBlockIDsMap[p.ID][0]) { particles.push_back(p); it = this->Active.erase(it); @@ -194,10 +242,196 @@ protected: return !particles.empty(); } -protected: + void Communicate(vtkm::filter::particleadvection::ParticleMessenger& messenger, + vtkm::Id numLocalTerminations, + vtkm::Id& numTermMessages) override + { + + std::vector incoming; + std::unordered_map> incomingIDs; + numTermMessages = 0; + messenger.Exchange(this->Inactive, + this->ParticleBlockIDsMap, + numLocalTerminations, + incoming, + incomingIDs, + numTermMessages); + this->Inactive.clear(); + this->UpdateActive(incoming, incomingIDs); + } + + void UpdateActive(const std::vector& particles, + const std::unordered_map>& idsMap) override + { + VTKM_ASSERT(particles.size() == idsMap.size()); + if (particles.empty()) + return; + + this->Active.insert(this->Active.end(), particles.begin(), particles.end()); + for (const auto& it : idsMap) + this->ParticleBlockIDsMap[it.first] = it.second; + } + + void UpdateInactive(const std::vector& particles, + const std::unordered_map>& idsMap) override + { + VTKM_ASSERT(particles.size() == idsMap.size()); + if (particles.empty()) + return; + + this->Inactive.insert(this->Inactive.end(), particles.begin(), particles.end()); + for (const auto& it : idsMap) + this->ParticleBlockIDsMap[it.first] = it.second; + } + + void UpdateTerminated(const std::vector& particles, vtkm::Id blockId) override + { + if (particles.empty()) + return; + + for (const auto& t : particles) + this->ParticleBlockIDsMap.erase(t.ID); + auto& it = this->Terminated[blockId]; + it.insert(it.end(), particles.begin(), particles.end()); + } + std::vector Active; std::vector Inactive; - std::map> Terminated; + std::unordered_map> Terminated; +}; + + +template +class VTKM_ALWAYS_EXPORT PABaseThreadedAlgorithm : public PABaseAlgorithm +{ +public: + using DataSetIntegratorType = vtkm::filter::particleadvection::DataSetIntegrator; + + PABaseThreadedAlgorithm(const vtkm::filter::particleadvection::BoundsMap& bm, + const std::vector& blocks) + : PABaseAlgorithm(bm, blocks) + , Done(false) + { + //For threaded algorithm, the particles go out of scope in the Work method. + //When this happens, they are destructed by the time the Manage thread gets them. + for (auto& block : this->Blocks) + block.SetCopySeedFlag(true); + } + + void Go() override + { + vtkm::Id nLocal = static_cast(this->Active.size() + this->Inactive.size()); + vtkm::Id totalNumSeeds = this->ComputeTotalNumParticles(nLocal); + + std::vector workerThreads; + workerThreads.push_back(std::thread(PABaseThreadedAlgorithm::Worker, this)); + this->Manage(totalNumSeeds); + for (auto& t : workerThreads) + t.join(); + } + +protected: + bool GetActiveParticles(std::vector& particles, vtkm::Id& blockId) override + { + std::lock_guard lock(this->Mutex); + return this->PABaseAlgorithm::GetActiveParticles(particles, blockId); + } + + void UpdateActive(const std::vector& particles, + const std::unordered_map>& idsMap) override + { + if (!particles.empty()) + { + std::lock_guard lock(this->Mutex); + this->PABaseAlgorithm::UpdateActive(particles, idsMap); + } + } + + bool CheckDone() + { + std::lock_guard lock(this->Mutex); + return this->Done; + } + void SetDone() + { + std::lock_guard lock(this->Mutex); + this->Done = true; + } + + static void Worker(PABaseThreadedAlgorithm* algo) { algo->Work(); } + + void Work() + { + while (!this->CheckDone()) + { + std::vector v; + vtkm::Id blockId = -1; + if (this->GetActiveParticles(v, blockId)) + { + const auto& block = this->GetDataSet(blockId); + + ResultType r; + block.Advect(v, this->StepSize, this->NumberOfSteps, r); + this->UpdateWorkerResult(blockId, r); + } + } + } + + void Manage(vtkm::Id totalNumSeeds) + { + vtkm::filter::particleadvection::ParticleMessenger messenger( + this->Comm, this->BoundsMap, 1, 128); + + vtkm::Id N = 0; + while (N < totalNumSeeds) + { + std::unordered_map> workerResults; + this->GetWorkerResults(workerResults); + + vtkm::Id numTerm = 0; + for (const auto& it : workerResults) + { + vtkm::Id blockId = it.first; + const auto& results = it.second; + for (const auto& r : results) + numTerm += this->UpdateResult(r, blockId); + } + + vtkm::Id numTermMessages = 0; + this->Communicate(messenger, numTerm, numTermMessages); + + N += (numTerm + numTermMessages); + if (N > totalNumSeeds) + throw vtkm::cont::ErrorFilterExecution("Particle count error"); + } + + //Let the workers know that we are done. + this->SetDone(); + } + + void GetWorkerResults(std::unordered_map>& results) + { + results.clear(); + + std::lock_guard lock(this->Mutex); + if (!this->WorkerResults.empty()) + { + results = this->WorkerResults; + this->WorkerResults.clear(); + } + } + + void UpdateWorkerResult(vtkm::Id blockId, const ResultType& result) + { + std::lock_guard lock(this->Mutex); + + auto& it = this->WorkerResults[blockId]; + it.push_back(result); + } + + bool Done; + std::mutex Mutex; + std::unordered_map> WorkerResults; }; @@ -249,6 +483,53 @@ public: protected: }; +class VTKM_ALWAYS_EXPORT ParticleAdvectionThreadedAlgorithm + : public PABaseThreadedAlgorithm> +{ + using DataSetIntegratorType = vtkm::filter::particleadvection::DataSetIntegrator; + +public: + ParticleAdvectionThreadedAlgorithm(const vtkm::filter::particleadvection::BoundsMap& bm, + const std::vector& blocks) + : PABaseThreadedAlgorithm>(bm, blocks) + { + } + + vtkm::cont::PartitionedDataSet GetOutput() override + { + vtkm::cont::PartitionedDataSet output; + + for (const auto& it : this->Terminated) + { + if (it.second.empty()) + continue; + + auto particles = vtkm::cont::make_ArrayHandle(it.second, vtkm::CopyFlag::Off); + vtkm::cont::ArrayHandle pos; + vtkm::cont::ParticleArrayCopy(particles, pos); + + vtkm::cont::DataSet ds; + vtkm::cont::CoordinateSystem outCoords("coordinates", pos); + ds.AddCoordinateSystem(outCoords); + + //Create vertex cell set + vtkm::Id numPoints = pos.GetNumberOfValues(); + vtkm::cont::CellSetSingleType<> cells; + vtkm::cont::ArrayHandleIndex conn(numPoints); + vtkm::cont::ArrayHandle connectivity; + + vtkm::cont::ArrayCopy(conn, connectivity); + cells.Fill(numPoints, vtkm::CELL_SHAPE_VERTEX, 1, connectivity); + ds.SetCellSet(cells); + + output.AppendPartition(ds); + } + + return output; + } + +protected: +}; class VTKM_ALWAYS_EXPORT StreamlineAlgorithm : public PABaseAlgorithm> @@ -262,6 +543,115 @@ public: { } + //DAVE dave + vtkm::cont::PartitionedDataSet GetOutput() override + { + vtkm::cont::PartitionedDataSet output; + + for (const auto& it : this->Results) + { + std::size_t nResults = it.second.size(); + if (nResults == 0) + continue; + + vtkm::cont::DataSet ds; + //Easy case with one result. + if (nResults == 1) + { + ds.AddCoordinateSystem(vtkm::cont::CoordinateSystem("coordinates", it.second[0].Positions)); + ds.SetCellSet(it.second[0].PolyLines); + } + else + { + //Append all the results into one data set. + vtkm::cont::ArrayHandle appendPts; + std::vector posOffsets(nResults); + + const auto& res0 = it.second[0]; + vtkm::Id totalNumCells = res0.PolyLines.GetNumberOfCells(); + vtkm::Id totalNumPts = res0.Positions.GetNumberOfValues(); + + posOffsets[0] = 0; + for (std::size_t i = 1; i < nResults; i++) + { + const auto& res = it.second[i]; + posOffsets[i] = totalNumPts; + totalNumPts += res.Positions.GetNumberOfValues(); + totalNumCells += res.PolyLines.GetNumberOfCells(); + } + + //Append all the points together. + appendPts.Allocate(totalNumPts); + for (std::size_t i = 0; i < nResults; i++) + { + const auto& res = it.second[i]; + // copy all values into appendPts starting at offset. + vtkm::cont::Algorithm::CopySubRange( + res.Positions, 0, res.Positions.GetNumberOfValues(), appendPts, posOffsets[i]); + } + vtkm::cont::CoordinateSystem outputCoords = + vtkm::cont::CoordinateSystem("coordinates", appendPts); + ds.AddCoordinateSystem(outputCoords); + + //Create polylines. + std::vector numPtsPerCell(static_cast(totalNumCells)); + std::size_t off = 0; + for (std::size_t i = 0; i < nResults; i++) + { + const auto& res = it.second[i]; + vtkm::Id nCells = res.PolyLines.GetNumberOfCells(); + for (vtkm::Id j = 0; j < nCells; j++) + numPtsPerCell[off++] = static_cast(res.PolyLines.GetNumberOfPointsInCell(j)); + } + + auto numPointsPerCellArray = + vtkm::cont::make_ArrayHandle(numPtsPerCell, vtkm::CopyFlag::Off); + + vtkm::cont::ArrayHandle cellIndex; + vtkm::Id connectivityLen = + vtkm::cont::Algorithm::ScanExclusive(numPointsPerCellArray, cellIndex); + vtkm::cont::ArrayHandleIndex connCount(connectivityLen); + vtkm::cont::ArrayHandle connectivity; + vtkm::cont::ArrayCopy(connCount, connectivity); + + vtkm::cont::ArrayHandle cellTypes; + auto polyLineShape = vtkm::cont::make_ArrayHandleConstant( + vtkm::CELL_SHAPE_POLY_LINE, totalNumCells); + vtkm::cont::ArrayCopy(polyLineShape, cellTypes); + auto offsets = vtkm::cont::ConvertNumIndicesToOffsets(numPointsPerCellArray); + + vtkm::cont::CellSetExplicit<> polyLines; + polyLines.Fill(totalNumPts, cellTypes, connectivity, offsets); + ds.SetCellSet(polyLines); + } + output.AppendPartition(ds); + } + + return output; + } + +protected: + virtual void StoreResult(const vtkm::worklet::StreamlineResult& res, + vtkm::Id blockId) override + { + this->Results[blockId].push_back(res); + } + + std::map>> Results; +}; + +class VTKM_ALWAYS_EXPORT StreamlineThreadedAlgorithm + : public PABaseThreadedAlgorithm> +{ + using DataSetIntegratorType = vtkm::filter::particleadvection::DataSetIntegrator; + +public: + StreamlineThreadedAlgorithm(const vtkm::filter::particleadvection::BoundsMap& bm, + const std::vector& blocks) + : PABaseThreadedAlgorithm>(bm, blocks) + { + } + vtkm::cont::PartitionedDataSet GetOutput() override { vtkm::cont::PartitionedDataSet output; @@ -358,6 +748,8 @@ protected: std::map>> Results; }; + + } } } // namespace vtkm::filter::particleadvection diff --git a/vtkm/filter/particleadvection/ParticleAdvector.hxx b/vtkm/filter/particleadvection/ParticleAdvector.hxx index 9e5b828ec..94948f341 100644 --- a/vtkm/filter/particleadvection/ParticleAdvector.hxx +++ b/vtkm/filter/particleadvection/ParticleAdvector.hxx @@ -31,7 +31,7 @@ inline vtkm::Id ParticleAdvectorBase::ComputeTotalNumParticles(vtkm: template inline const vtkm::filter::particleadvection::DataSetIntegrator& -ParticleAdvectorBase::GetDataSet(vtkm::Id id) const +ParticleAdvectorBase::GetDataSet(vtkm::Id id) // const { for (const auto& it : this->Blocks) if (it.GetID() == id) @@ -40,72 +40,94 @@ ParticleAdvectorBase::GetDataSet(vtkm::Id id) const } template -inline void ParticleAdvectorBase::UpdateResult(const ResultType& res, - vtkm::Id blockId, - std::vector& I, - std::vector& T, - std::vector& A) +inline void ParticleAdvectorBase::UpdateResultParticle( + vtkm::Particle& p, + std::vector& I, + std::vector& T, + std::vector& A, + std::unordered_map>& idsMapI, + std::unordered_map>& idsMapA) const +{ + if (p.Status.CheckTerminate()) + T.push_back(p); + else + { + const auto& it = this->ParticleBlockIDsMap.find(p.ID); + VTKM_ASSERT(it != this->ParticleBlockIDsMap.end()); + auto currBIDs = it->second; + VTKM_ASSERT(!currBIDs.empty()); + + std::vector newIDs; + if (p.Status.CheckSpatialBounds() && !p.Status.CheckTookAnySteps()) + newIDs.assign(std::next(currBIDs.begin(), 1), currBIDs.end()); + else + newIDs = this->BoundsMap.FindBlocks(p.Pos, currBIDs); + + //reset the particle status. + p.Status = vtkm::ParticleStatus(); + + if (newIDs.empty()) //No blocks, we're done. + { + p.Status.SetTerminate(); + T.push_back(p); + } + else + { + //If we have more than blockId, we want to minimize communication + //and put any blocks owned by this rank first. + if (newIDs.size() > 1) + { + for (auto idit = newIDs.begin(); idit != newIDs.end(); idit++) + { + vtkm::Id bid = *idit; + if (this->BoundsMap.FindRank(bid) == this->Rank) + { + newIDs.erase(idit); + newIDs.insert(newIDs.begin(), bid); + break; + } + } + } + + int dstRank = this->BoundsMap.FindRank(newIDs[0]); + if (dstRank == this->Rank) + { + A.push_back(p); + idsMapA[p.ID] = newIDs; + } + else + { + I.push_back(p); + idsMapI[p.ID] = newIDs; + } + } + } +} + +template +inline vtkm::Id ParticleAdvectorBase::UpdateResult(const ResultType& res, + vtkm::Id blockId) { vtkm::Id n = res.Particles.GetNumberOfValues(); auto portal = res.Particles.ReadPortal(); + std::unordered_map> idsMapI, idsMapA; + std::vector I, T, A; + for (vtkm::Id i = 0; i < n; i++) { + //auto& p = portal.Get(i); vtkm::Particle p = portal.Get(i); - - if (p.Status.CheckTerminate()) - { - T.push_back(p); - this->ParticleBlockIDsMap.erase(p.ID); - } - else - { - std::vector currBIDs = this->ParticleBlockIDsMap[p.ID]; - VTKM_ASSERT(!currBIDs.empty()); - - std::vector newIDs; - if (p.Status.CheckSpatialBounds() && !p.Status.CheckTookAnySteps()) - newIDs.assign(std::next(currBIDs.begin(), 1), currBIDs.end()); - else - newIDs = this->BoundsMap.FindBlocks(p.Pos, currBIDs); - - //reset the particle status. - p.Status = vtkm::ParticleStatus(); - - if (newIDs.empty()) //No blocks, we're done. - { - T.push_back(p); - this->ParticleBlockIDsMap.erase(p.ID); - } - else - { - //If we have more than blockId, we want to minimize communication - //and put any blocks owned by this rank first. - if (newIDs.size() > 1) - { - for (auto it = newIDs.begin(); it != newIDs.end(); it++) - { - vtkm::Id bid = *it; - if (this->BoundsMap.FindRank(bid) == this->Rank) - { - newIDs.erase(it); - newIDs.insert(newIDs.begin(), bid); - break; - } - } - } - - int dstRank = this->BoundsMap.FindRank(newIDs[0]); - this->ParticleBlockIDsMap[p.ID] = newIDs; - - if (dstRank == this->Rank) - A.push_back(p); - else - I.push_back(p); - } - } + this->UpdateResultParticle(p, I, T, A, idsMapI, idsMapA); } + + vtkm::Id numTerm = static_cast(T.size()); + this->UpdateActive(A, idsMapA); + this->UpdateInactive(I, idsMapI); + this->UpdateTerminated(T, blockId); + this->StoreResult(res, blockId); + return numTerm; } } } diff --git a/vtkm/filter/particleadvection/ParticleMessenger.cxx b/vtkm/filter/particleadvection/ParticleMessenger.cxx index ddb477b05..06385699c 100644 --- a/vtkm/filter/particleadvection/ParticleMessenger.cxx +++ b/vtkm/filter/particleadvection/ParticleMessenger.cxx @@ -69,10 +69,10 @@ std::size_t ParticleMessenger::CalcParticleBufferSize(std::size_t nParticles, st VTKM_CONT void ParticleMessenger::SerialExchange( const std::vector& outData, - const std::map>& outBlockIDsMap, + const std::unordered_map>& outBlockIDsMap, vtkm::Id vtkmNotUsed(numLocalTerm), std::vector& inData, - std::map>& inDataBlockIDsMap) const + std::unordered_map>& inDataBlockIDsMap) const { for (auto& p : outData) { @@ -83,12 +83,13 @@ void ParticleMessenger::SerialExchange( } VTKM_CONT -void ParticleMessenger::Exchange(const std::vector& outData, - const std::map>& outBlockIDsMap, - vtkm::Id numLocalTerm, - std::vector& inData, - std::map>& inDataBlockIDsMap, - vtkm::Id& numTerminateMessages) +void ParticleMessenger::Exchange( + const std::vector& outData, + const std::unordered_map>& outBlockIDsMap, + vtkm::Id numLocalTerm, + std::vector& inData, + std::unordered_map>& inDataBlockIDsMap, + vtkm::Id& numTerminateMessages) { numTerminateMessages = 0; inDataBlockIDsMap.clear(); @@ -99,7 +100,7 @@ void ParticleMessenger::Exchange(const std::vector& outData, #ifdef VTKM_ENABLE_MPI //dstRank, vector of (particles,blockIDs) - std::map> sendData; + std::unordered_map> sendData; for (const auto& p : outData) { diff --git a/vtkm/filter/particleadvection/ParticleMessenger.h b/vtkm/filter/particleadvection/ParticleMessenger.h index 19d2c91ef..4455708a0 100644 --- a/vtkm/filter/particleadvection/ParticleMessenger.h +++ b/vtkm/filter/particleadvection/ParticleMessenger.h @@ -50,10 +50,10 @@ public: VTKM_CONT ~ParticleMessenger() {} VTKM_CONT void Exchange(const std::vector& outData, - const std::map>& outBlockIDsMap, + const std::unordered_map>& outBlockIDsMap, vtkm::Id numLocalTerm, std::vector& inData, - std::map>& inDataBlockIDsMap, + std::unordered_map>& inDataBlockIDsMap, vtkm::Id& numTerminateMessages); protected: @@ -81,7 +81,7 @@ protected: template class Container, typename Allocator = std::allocator

> - inline void SendParticles(const std::map>& m); + inline void SendParticles(const std::unordered_map>& m); // Send/Recv messages. VTKM_CONT void SendMsg(int dst, const std::vector& msg); @@ -96,11 +96,12 @@ protected: #endif - VTKM_CONT void SerialExchange(const std::vector& outData, - const std::map>& outBlockIDsMap, - vtkm::Id numLocalTerm, - std::vector& inData, - std::map>& inDataBlockIDsMap) const; + VTKM_CONT void SerialExchange( + const std::vector& outData, + const std::unordered_map>& outBlockIDsMap, + vtkm::Id numLocalTerm, + std::vector& inData, + std::unordered_map>& inDataBlockIDsMap) const; static std::size_t CalcParticleBufferSize(std::size_t nParticles, std::size_t numBlockIds = 2); }; @@ -127,7 +128,8 @@ inline void ParticleMessenger::SendParticles(int dst, const Container class Container, typename Allocator> -inline void ParticleMessenger::SendParticles(const std::map>& m) +inline void ParticleMessenger::SendParticles( + const std::unordered_map>& m) { for (auto mit = m.begin(); mit != m.end(); mit++) if (!mit->second.empty()) diff --git a/vtkm/filter/testing/CMakeLists.txt b/vtkm/filter/testing/CMakeLists.txt index 145319a5f..5892a44c4 100644 --- a/vtkm/filter/testing/CMakeLists.txt +++ b/vtkm/filter/testing/CMakeLists.txt @@ -9,64 +9,7 @@ ##============================================================================ set(unit_tests - UnitTestCellAverageFilter.cxx - UnitTestCellMeasuresFilter.cxx - UnitTestCellSetConnectivityFilter.cxx - UnitTestCleanGrid.cxx - UnitTestClipWithFieldFilter.cxx - UnitTestClipWithImplicitFunctionFilter.cxx - UnitTestContourFilter.cxx - UnitTestContourFilterNormals.cxx - UnitTestContourTreeUniformFilter.cxx - UnitTestContourTreeUniformAugmentedFilter.cxx - UnitTestCoordinateSystemTransform.cxx - UnitTestCrossProductFilter.cxx - UnitTestDotProductFilter.cxx - UnitTestEntropyFilter.cxx - UnitTestExternalFacesFilter.cxx - UnitTestExtractGeometryFilter.cxx - UnitTestExtractPointsFilter.cxx - UnitTestExtractStructuredFilter.cxx - UnitTestFieldMetadata.cxx - UnitTestFieldSelection.cxx - UnitTestFieldToColors.cxx - UnitTestGradientExplicit.cxx - UnitTestGradientUniform.cxx - UnitTestGhostCellClassify.cxx - UnitTestGhostCellRemove.cxx - UnitTestHistogramFilter.cxx - UnitTestImageConnectivityFilter.cxx - UnitTestImageMedianFilter.cxx - UnitTestLagrangianFilter.cxx - UnitTestLagrangianStructuresFilter.cxx - UnitTestMapFieldMergeAverage.cxx - UnitTestMapFieldPermutation.cxx - UnitTestMaskFilter.cxx - UnitTestMaskPointsFilter.cxx - UnitTestMeshQualityFilter.cxx - UnitTestNDEntropyFilter.cxx - UnitTestNDHistogramFilter.cxx - UnitTestParticleDensity.cxx - UnitTestPartitionedDataSetFilters.cxx - UnitTestPartitionedDataSetHistogramFilter.cxx - UnitTestPointAverageFilter.cxx - UnitTestPointElevationFilter.cxx - UnitTestPointTransform.cxx - UnitTestProbe.cxx - UnitTestSplitSharpEdgesFilter.cxx UnitTestStreamlineFilter.cxx - UnitTestStreamSurfaceFilter.cxx - UnitTestSurfaceNormalsFilter.cxx - UnitTestTetrahedralizeFilter.cxx - UnitTestThresholdFilter.cxx - UnitTestThresholdPointsFilter.cxx - UnitTestTriangulateFilter.cxx - UnitTestTubeFilter.cxx - UnitTestVectorMagnitudeFilter.cxx - UnitTestVertexClusteringFilter.cxx - UnitTestWarpScalarFilter.cxx - UnitTestWarpVectorFilter.cxx - UnitTestZFP.cxx ) vtkm_unit_tests( diff --git a/vtkm/filter/testing/UnitTestStreamlineFilter.cxx b/vtkm/filter/testing/UnitTestStreamlineFilter.cxx index cd6eef773..9ce720b13 100644 --- a/vtkm/filter/testing/UnitTestStreamlineFilter.cxx +++ b/vtkm/filter/testing/UnitTestStreamlineFilter.cxx @@ -508,7 +508,10 @@ void TestStreamlineFilters() { for (auto useGhost : flags) for (auto useSL : flags) + { + useSL = false; TestPartitionedDataSet(n, useGhost, useSL); + } } TestStreamline(); diff --git a/vtkm/filter/testing/UnitTestStreamlineFilterMPI.cxx b/vtkm/filter/testing/UnitTestStreamlineFilterMPI.cxx index 2f682b1cd..5e8feb50c 100644 --- a/vtkm/filter/testing/UnitTestStreamlineFilterMPI.cxx +++ b/vtkm/filter/testing/UnitTestStreamlineFilterMPI.cxx @@ -36,7 +36,7 @@ void AddVectorFields(vtkm::cont::PartitionedDataSet& pds, ds.AddPointField(fieldName, CreateConstantVectorField(ds.GetNumberOfPoints(), vec)); } -void TestAMRStreamline(bool useSL) +void TestAMRStreamline(bool useSL, bool useThreaded) { auto comm = vtkm::cont::EnvironmentTracker::GetCommunicator(); if (comm.size() < 2) @@ -105,6 +105,7 @@ void TestAMRStreamline(bool useSL) if (useSL) { vtkm::filter::Streamline filter; + filter.SetUseThreadedAlgorithm(useThreaded); filter.SetStepSize(0.1f); filter.SetNumberOfSteps(100000); filter.SetSeeds(seedArray); @@ -187,6 +188,7 @@ void TestAMRStreamline(bool useSL) else { vtkm::filter::ParticleAdvection filter; + filter.SetUseThreadedAlgorithm(useThreaded); filter.SetStepSize(0.1f); filter.SetNumberOfSteps(100000); filter.SetSeeds(seedArray); @@ -219,7 +221,7 @@ void TestAMRStreamline(bool useSL) } } -void TestPartitionedDataSet(vtkm::Id nPerRank, bool useGhost, bool useSL) +void TestPartitionedDataSet(vtkm::Id nPerRank, bool useGhost, bool useSL, bool useThreaded) { auto comm = vtkm::cont::EnvironmentTracker::GetCommunicator(); @@ -268,6 +270,7 @@ void TestPartitionedDataSet(vtkm::Id nPerRank, bool useGhost, bool useSL) { vtkm::filter::Streamline streamline; + streamline.SetUseThreadedAlgorithm(useThreaded); streamline.SetStepSize(0.1f); streamline.SetNumberOfSteps(100000); streamline.SetSeeds(seedArray); @@ -313,6 +316,7 @@ void TestPartitionedDataSet(vtkm::Id nPerRank, bool useGhost, bool useSL) { vtkm::filter::ParticleAdvection particleAdvection; + particleAdvection.SetUseThreadedAlgorithm(useThreaded); particleAdvection.SetStepSize(0.1f); particleAdvection.SetNumberOfSteps(100000); particleAdvection.SetSeeds(seedArray); @@ -356,11 +360,13 @@ void TestStreamlineFiltersMPI() { for (auto useGhost : flags) for (auto useSL : flags) - TestPartitionedDataSet(n, useGhost, useSL); + for (auto useThreaded : flags) + TestPartitionedDataSet(n, useGhost, useSL, useThreaded); } for (auto useSL : flags) - TestAMRStreamline(useSL); + for (auto useThreaded : flags) + TestAMRStreamline(useSL, useThreaded); } }