Merge topic 'threaded_particleAdvection'

8f2ec9e46 add better comment.
967fb9fad Remove unneeded cast in worklet, clarify the code in threaded algo.
5297c36e2 fix compile error.
5992f9f69 Restore the cmake after the clip issue was fixed.
9cdcddb0c Merge branch 'master' of https://gitlab.kitware.com/vtk/vtk-m into threaded_particleAdvection
eb7d791f2 Rename the worker-sync variables.
b0f97f480 Add condition_variable to sleep the worker.
675f2d50f Add a debug mode assert to verify buffer size.
...

Acked-by: Kitware Robot <kwrobot@kitware.com>
Acked-by: Kenneth Moreland <kmorel@sandia.gov>
Merge-request: !2323
Dave Pugmire 2020-11-24 21:48:29 +00:00 committed by Kitware Robot
commit 2b6c0df780
18 changed files with 946 additions and 542 deletions

@ -13,7 +13,6 @@
#include <vtkm/Particle.h>
#include <vtkm/filter/FilterDataSetWithField.h>
#include <vtkm/worklet/ParticleAdvection.h>
namespace vtkm
{
@ -40,6 +39,12 @@ public:
VTKM_CONT
void SetSeeds(vtkm::cont::ArrayHandle<vtkm::Particle>& seeds);
VTKM_CONT
bool GetUseThreadedAlgorithm() { return this->UseThreadedAlgorithm; }
VTKM_CONT
void SetUseThreadedAlgorithm(bool val) { this->UseThreadedAlgorithm = val; }
template <typename DerivedPolicy>
vtkm::cont::PartitionedDataSet PrepareForExecution(
const vtkm::cont::PartitionedDataSet& input,
@ -54,6 +59,7 @@ private:
vtkm::Id NumberOfSteps;
vtkm::FloatDefault StepSize;
vtkm::cont::ArrayHandle<vtkm::Particle> Seeds;
bool UseThreadedAlgorithm;
};
}
} // namespace vtkm::filter

@ -19,7 +19,7 @@
#include <vtkm/cont/ParticleArrayCopy.h>
#include <vtkm/filter/particleadvection/BoundsMap.h>
#include <vtkm/filter/particleadvection/DataSetIntegrator.h>
#include <vtkm/filter/particleadvection/ParticleAdvector.h>
#include <vtkm/filter/particleadvection/ParticleAdvectionAlgorithm.h>
namespace vtkm
{
@ -29,6 +29,7 @@ namespace filter
//-----------------------------------------------------------------------------
inline VTKM_CONT ParticleAdvection::ParticleAdvection()
: vtkm::filter::FilterDataSetWithField<ParticleAdvection>()
, UseThreadedAlgorithm(false)
{
}
@ -59,14 +60,15 @@ inline VTKM_CONT vtkm::cont::PartitionedDataSet ParticleAdvection::PrepareForExe
dsi.push_back(DataSetIntegratorType(input.GetPartition(i), blockId, activeField));
}
vtkm::filter::particleadvection::ParticleAdvectionAlgorithm pa(boundsMap, dsi);
pa.SetNumberOfSteps(this->NumberOfSteps);
pa.SetStepSize(this->StepSize);
pa.SetSeeds(this->Seeds);
using AlgorithmType = vtkm::filter::particleadvection::ParticleAdvectionAlgorithm;
using ThreadedAlgorithmType = vtkm::filter::particleadvection::ParticleAdvectionThreadedAlgorithm;
pa.Go();
vtkm::cont::PartitionedDataSet output = pa.GetOutput();
return output;
if (this->UseThreadedAlgorithm)
return vtkm::filter::particleadvection::RunAlgo<ThreadedAlgorithmType>(
boundsMap, dsi, this->NumberOfSteps, this->StepSize, this->Seeds);
else
return vtkm::filter::particleadvection::RunAlgo<AlgorithmType>(
boundsMap, dsi, this->NumberOfSteps, this->StepSize, this->Seeds);
}
//-----------------------------------------------------------------------------
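A minimal usage sketch of the new toggle (not part of the commit; seedArray, fieldName, and pds are placeholder variables, and SetActiveField/Execute are assumed from the existing filter interface):

vtkm::filter::ParticleAdvection particleAdvection;
particleAdvection.SetUseThreadedAlgorithm(true); // new option; defaults to false
particleAdvection.SetStepSize(0.1f);
particleAdvection.SetNumberOfSteps(100000);
particleAdvection.SetSeeds(seedArray);           // vtkm::cont::ArrayHandle<vtkm::Particle>
particleAdvection.SetActiveField(fieldName);
vtkm::cont::PartitionedDataSet output = particleAdvection.Execute(pds);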

@ -11,8 +11,8 @@
#ifndef vtk_m_filter_Streamline_h
#define vtk_m_filter_Streamline_h
#include <vtkm/Particle.h>
#include <vtkm/filter/FilterDataSetWithField.h>
#include <vtkm/worklet/ParticleAdvection.h>
namespace vtkm
{
@ -39,6 +39,12 @@ public:
VTKM_CONT
void SetSeeds(vtkm::cont::ArrayHandle<vtkm::Particle>& seeds);
VTKM_CONT
bool GetUseThreadedAlgorithm() { return this->UseThreadedAlgorithm; }
VTKM_CONT
void SetUseThreadedAlgorithm(bool val) { this->UseThreadedAlgorithm = val; }
template <typename DerivedPolicy>
vtkm::cont::PartitionedDataSet PrepareForExecution(
const vtkm::cont::PartitionedDataSet& input,
@ -53,7 +59,7 @@ private:
vtkm::Id NumberOfSteps;
vtkm::FloatDefault StepSize;
vtkm::cont::ArrayHandle<vtkm::Particle> Seeds;
vtkm::worklet::Streamline Worklet;
bool UseThreadedAlgorithm;
};
}
} // namespace vtkm::filter

@ -16,10 +16,10 @@
#include <vtkm/cont/ArrayHandleIndex.h>
#include <vtkm/cont/ErrorFilterExecution.h>
#include <vtkm/cont/ParticleArrayCopy.h>
#include <vtkm/filter/particleadvection/BoundsMap.h>
#include <vtkm/filter/particleadvection/DataSetIntegrator.h>
#include <vtkm/filter/particleadvection/ParticleAdvector.h>
#include <vtkm/filter/particleadvection/StreamlineAlgorithm.h>
namespace vtkm
{
@ -29,7 +29,7 @@ namespace filter
//-----------------------------------------------------------------------------
inline VTKM_CONT Streamline::Streamline()
: vtkm::filter::FilterDataSetWithField<Streamline>()
, Worklet()
, UseThreadedAlgorithm(false)
{
}
@ -59,14 +59,15 @@ inline VTKM_CONT vtkm::cont::PartitionedDataSet Streamline::PrepareForExecution(
dsi.push_back(DataSetIntegratorType(input.GetPartition(i), blockId, activeField));
}
vtkm::filter::particleadvection::StreamlineAlgorithm sa(boundsMap, dsi);
sa.SetNumberOfSteps(this->NumberOfSteps);
sa.SetStepSize(this->StepSize);
sa.SetSeeds(this->Seeds);
using AlgorithmType = vtkm::filter::particleadvection::StreamlineAlgorithm;
using ThreadedAlgorithmType = vtkm::filter::particleadvection::StreamlineThreadedAlgorithm;
sa.Go();
vtkm::cont::PartitionedDataSet output = sa.GetOutput();
return output;
if (this->UseThreadedAlgorithm)
return vtkm::filter::particleadvection::RunAlgo<ThreadedAlgorithmType>(
boundsMap, dsi, this->NumberOfSteps, this->StepSize, this->Seeds);
else
return vtkm::filter::particleadvection::RunAlgo<AlgorithmType>(
boundsMap, dsi, this->NumberOfSteps, this->StepSize, this->Seeds);
}
//-----------------------------------------------------------------------------

@ -0,0 +1,379 @@
//============================================================================
// Copyright (c) Kitware, Inc.
// All rights reserved.
// See LICENSE.txt for details.
//
// This software is distributed WITHOUT ANY WARRANTY; without even
// the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE. See the above copyright notice for more information.
//============================================================================
#ifndef vtk_m_filter_particleadvection_AdvectorBaseAlgorithm_h
#define vtk_m_filter_particleadvection_AdvectorBaseAlgorithm_h
#include <vtkm/cont/ErrorFilterExecution.h>
#include <vtkm/filter/particleadvection/DataSetIntegrator.h>
#include <vtkm/filter/particleadvection/ParticleMessenger.h>
#include <map>
#include <vector>
namespace vtkm
{
namespace filter
{
namespace particleadvection
{
using DataSetIntegratorType = vtkm::filter::particleadvection::DataSetIntegrator;
template <typename AlgorithmType>
vtkm::cont::PartitionedDataSet RunAlgo(const vtkm::filter::particleadvection::BoundsMap& boundsMap,
const std::vector<DataSetIntegratorType>& dsi,
vtkm::Id numSteps,
vtkm::FloatDefault stepSize,
const vtkm::cont::ArrayHandle<vtkm::Particle>& seeds)
{
AlgorithmType algo(boundsMap, dsi);
algo.SetNumberOfSteps(numSteps);
algo.SetStepSize(stepSize);
algo.SetSeeds(seeds);
algo.Go();
return algo.GetOutput();
}
//
// Base class for particle advector
//
template <typename ResultType>
class VTKM_ALWAYS_EXPORT AdvectorBaseAlgorithm
{
using DataSetIntegratorType = vtkm::filter::particleadvection::DataSetIntegrator;
public:
AdvectorBaseAlgorithm(const vtkm::filter::particleadvection::BoundsMap& bm,
const std::vector<DataSetIntegratorType>& blocks)
: Blocks(blocks)
, BoundsMap(bm)
, NumberOfSteps(0)
, NumRanks(this->Comm.size())
, Rank(this->Comm.rank())
, StepSize(0)
, TotalNumParticles(0)
, TotalNumTerminatedParticles(0)
{
}
//Initialize the AdvectorBaseAlgorithm.
void SetStepSize(vtkm::FloatDefault stepSize) { this->StepSize = stepSize; }
void SetNumberOfSteps(vtkm::Id numSteps) { this->NumberOfSteps = numSteps; }
void SetSeeds(const vtkm::cont::ArrayHandle<vtkm::Particle>& seeds)
{
this->ClearParticles();
vtkm::Id n = seeds.GetNumberOfValues();
auto portal = seeds.ReadPortal();
std::vector<std::vector<vtkm::Id>> blockIDs;
std::vector<vtkm::Particle> particles;
for (vtkm::Id i = 0; i < n; i++)
{
const vtkm::Particle p = portal.Get(i);
std::vector<vtkm::Id> ids = this->BoundsMap.FindBlocks(p.Pos);
if (!ids.empty() && this->BoundsMap.FindRank(ids[0]) == this->Rank)
{
particles.push_back(p);
blockIDs.push_back(ids);
}
}
this->SetSeedArray(particles, blockIDs);
}
//Advect all the particles.
virtual void Go()
{
vtkm::filter::particleadvection::ParticleMessenger messenger(
this->Comm, this->BoundsMap, 1, 128);
vtkm::Id nLocal = static_cast<vtkm::Id>(this->Active.size() + this->Inactive.size());
this->ComputeTotalNumParticles(nLocal);
this->TotalNumTerminatedParticles = 0;
while (this->TotalNumTerminatedParticles < this->TotalNumParticles)
{
std::vector<vtkm::Particle> v;
vtkm::Id numTerm = 0, blockId = -1;
if (GetActiveParticles(v, blockId))
{
const auto& block = this->GetDataSet(blockId);
ResultType r;
block.Advect(v, this->StepSize, this->NumberOfSteps, r);
numTerm = this->UpdateResult(r, blockId);
}
vtkm::Id numTermMessages = 0;
this->Communicate(messenger, numTerm, numTermMessages);
this->TotalNumTerminatedParticles += (numTerm + numTermMessages);
if (this->TotalNumTerminatedParticles > this->TotalNumParticles)
throw vtkm::cont::ErrorFilterExecution("Particle count error");
}
}
inline vtkm::cont::PartitionedDataSet GetOutput();
protected:
virtual void ClearParticles()
{
this->Active.clear();
this->Inactive.clear();
this->Terminated.clear();
this->ParticleBlockIDsMap.clear();
}
void ComputeTotalNumParticles(const vtkm::Id& numLocal)
{
long long total = static_cast<long long>(numLocal);
#ifdef VTKM_ENABLE_MPI
MPI_Comm mpiComm = vtkmdiy::mpi::mpi_cast(this->Comm.handle());
MPI_Allreduce(MPI_IN_PLACE, &total, 1, MPI_LONG_LONG, MPI_SUM, mpiComm);
#endif
this->TotalNumParticles = static_cast<vtkm::Id>(total);
}
const DataSetIntegratorType& GetDataSet(vtkm::Id id) const
{
for (const auto& it : this->Blocks)
if (it.GetID() == id)
return it;
throw vtkm::cont::ErrorFilterExecution("Bad block");
}
void UpdateResultParticle(vtkm::Particle& p,
std::vector<vtkm::Particle>& I,
std::vector<vtkm::Particle>& T,
std::vector<vtkm::Particle>& A,
std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& idsMapI,
std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& idsMapA) const
{
if (p.Status.CheckTerminate())
T.push_back(p);
else
{
const auto& it = this->ParticleBlockIDsMap.find(p.ID);
VTKM_ASSERT(it != this->ParticleBlockIDsMap.end());
auto currBIDs = it->second;
VTKM_ASSERT(!currBIDs.empty());
std::vector<vtkm::Id> newIDs;
if (p.Status.CheckSpatialBounds() && !p.Status.CheckTookAnySteps())
newIDs.assign(std::next(currBIDs.begin(), 1), currBIDs.end());
else
newIDs = this->BoundsMap.FindBlocks(p.Pos, currBIDs);
//reset the particle status.
p.Status = vtkm::ParticleStatus();
if (newIDs.empty()) //No blocks, we're done.
{
p.Status.SetTerminate();
T.push_back(p);
}
else
{
//If we have more than one block, we want to minimize communication
//and put any blocks owned by this rank first.
if (newIDs.size() > 1)
{
for (auto idit = newIDs.begin(); idit != newIDs.end(); idit++)
{
vtkm::Id bid = *idit;
if (this->BoundsMap.FindRank(bid) == this->Rank)
{
newIDs.erase(idit);
newIDs.insert(newIDs.begin(), bid);
break;
}
}
}
int dstRank = this->BoundsMap.FindRank(newIDs[0]);
if (dstRank == this->Rank)
{
A.push_back(p);
idsMapA[p.ID] = newIDs;
}
else
{
I.push_back(p);
idsMapI[p.ID] = newIDs;
}
}
}
}
virtual void SetSeedArray(const std::vector<vtkm::Particle>& particles,
const std::vector<std::vector<vtkm::Id>>& blockIds)
{
VTKM_ASSERT(particles.size() == blockIds.size());
auto pit = particles.begin();
auto bit = blockIds.begin();
while (pit != particles.end() && bit != blockIds.end())
{
this->ParticleBlockIDsMap[pit->ID] = *bit;
pit++;
bit++;
}
this->Active.insert(this->Active.end(), particles.begin(), particles.end());
}
virtual bool GetActiveParticles(std::vector<vtkm::Particle>& particles, vtkm::Id& blockId)
{
particles.clear();
blockId = -1;
if (this->Active.empty())
return false;
blockId = this->ParticleBlockIDsMap[this->Active.front().ID][0];
auto it = this->Active.begin();
while (it != this->Active.end())
{
auto p = *it;
if (blockId == this->ParticleBlockIDsMap[p.ID][0])
{
particles.push_back(p);
it = this->Active.erase(it);
}
else
it++;
}
return !particles.empty();
}
virtual void Communicate(vtkm::filter::particleadvection::ParticleMessenger& messenger,
vtkm::Id numLocalTerminations,
vtkm::Id& numTermMessages)
{
std::vector<vtkm::Particle> incoming;
std::unordered_map<vtkm::Id, std::vector<vtkm::Id>> incomingIDs;
numTermMessages = 0;
messenger.Exchange(this->Inactive,
this->ParticleBlockIDsMap,
numLocalTerminations,
incoming,
incomingIDs,
numTermMessages,
this->GetBlockAndWait(numLocalTerminations));
this->Inactive.clear();
this->UpdateActive(incoming, incomingIDs);
}
virtual void UpdateActive(const std::vector<vtkm::Particle>& particles,
const std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& idsMap)
{
VTKM_ASSERT(particles.size() == idsMap.size());
if (particles.empty())
return;
this->Active.insert(this->Active.end(), particles.begin(), particles.end());
for (const auto& it : idsMap)
this->ParticleBlockIDsMap[it.first] = it.second;
}
virtual void UpdateInactive(const std::vector<vtkm::Particle>& particles,
const std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& idsMap)
{
VTKM_ASSERT(particles.size() == idsMap.size());
if (particles.empty())
return;
this->Inactive.insert(this->Inactive.end(), particles.begin(), particles.end());
for (const auto& it : idsMap)
this->ParticleBlockIDsMap[it.first] = it.second;
}
virtual void UpdateTerminated(const std::vector<vtkm::Particle>& particles, vtkm::Id blockId)
{
if (particles.empty())
return;
for (const auto& t : particles)
this->ParticleBlockIDsMap.erase(t.ID);
auto& it = this->Terminated[blockId];
it.insert(it.end(), particles.begin(), particles.end());
}
vtkm::Id UpdateResult(const ResultType& res, vtkm::Id blockId)
{
vtkm::Id n = res.Particles.GetNumberOfValues();
auto portal = res.Particles.ReadPortal();
std::unordered_map<vtkm::Id, std::vector<vtkm::Id>> idsMapI, idsMapA;
std::vector<vtkm::Particle> I, T, A;
for (vtkm::Id i = 0; i < n; i++)
{
vtkm::Particle p = portal.Get(i);
this->UpdateResultParticle(p, I, T, A, idsMapI, idsMapA);
}
vtkm::Id numTerm = static_cast<vtkm::Id>(T.size());
this->UpdateActive(A, idsMapA);
this->UpdateInactive(I, idsMapI);
this->UpdateTerminated(T, blockId);
this->StoreResult(res, blockId);
return numTerm;
}
virtual bool GetBlockAndWait(const vtkm::Id& numLocalTerm)
{
//There are only two cases where blocking would deadlock:
//1. There are active particles to process.
//2. numLocalTerm + this->TotalNumTerminatedParticles == this->TotalNumParticles
//So, if neither is true, we can safely block and wait for communication to come in.
if (this->Active.empty() && this->Inactive.empty() &&
(numLocalTerm + this->TotalNumTerminatedParticles < this->TotalNumParticles))
{
return true;
}
return false;
}
inline void StoreResult(const ResultType& res, vtkm::Id blockId);
//Member data
std::vector<vtkm::Particle> Active;
std::vector<DataSetIntegratorType> Blocks;
vtkm::filter::particleadvection::BoundsMap BoundsMap;
vtkmdiy::mpi::communicator Comm = vtkm::cont::EnvironmentTracker::GetCommunicator();
std::vector<vtkm::Particle> Inactive;
vtkm::Id NumberOfSteps;
vtkm::Id NumRanks;
std::unordered_map<vtkm::Id, std::vector<vtkm::Id>> ParticleBlockIDsMap;
vtkm::Id Rank;
std::map<vtkm::Id, std::vector<ResultType>> Results;
vtkm::FloatDefault StepSize;
vtkm::Id TotalNumParticles;
vtkm::Id TotalNumTerminatedParticles;
std::unordered_map<vtkm::Id, std::vector<vtkm::Particle>> Terminated;
};
}
}
} // namespace vtkm::filter::particleadvection
#ifndef vtk_m_filter_particleadvection_AdvectorBaseAlgorithm_hxx
#include <vtkm/filter/particleadvection/AdvectorBaseAlgorithm.hxx>
#endif
#endif

@ -0,0 +1,161 @@
//============================================================================
// Copyright (c) Kitware, Inc.
// All rights reserved.
// See LICENSE.txt for details.
//
// This software is distributed WITHOUT ANY WARRANTY; without even
// the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE. See the above copyright notice for more information.
//============================================================================
#ifndef vtk_m_filter_particleadvection_AdvectorBaseAlgorithm_hxx
#define vtk_m_filter_particleadvection_AdvectorBaseAlgorithm_hxx
namespace vtkm
{
namespace filter
{
namespace particleadvection
{
using PAResultType = vtkm::worklet::ParticleAdvectionResult<vtkm::Particle>;
using SLResultType = vtkm::worklet::StreamlineResult<vtkm::Particle>;
//Result-specific implementations.
template <>
inline void AdvectorBaseAlgorithm<PAResultType>::StoreResult(const PAResultType& vtkmNotUsed(res),
vtkm::Id vtkmNotUsed(blockId))
{
}
template <>
inline void AdvectorBaseAlgorithm<SLResultType>::StoreResult(const SLResultType& res,
vtkm::Id blockId)
{
this->Results[blockId].push_back(res);
}
template <>
inline vtkm::cont::PartitionedDataSet AdvectorBaseAlgorithm<PAResultType>::GetOutput()
{
vtkm::cont::PartitionedDataSet output;
for (const auto& it : this->Terminated)
{
if (it.second.empty())
continue;
auto particles = vtkm::cont::make_ArrayHandle(it.second, vtkm::CopyFlag::Off);
vtkm::cont::ArrayHandle<vtkm::Vec3f> pos;
vtkm::cont::ParticleArrayCopy(particles, pos);
vtkm::cont::DataSet ds;
vtkm::cont::CoordinateSystem outCoords("coordinates", pos);
ds.AddCoordinateSystem(outCoords);
//Create vertex cell set
vtkm::Id numPoints = pos.GetNumberOfValues();
vtkm::cont::CellSetSingleType<> cells;
vtkm::cont::ArrayHandleIndex conn(numPoints);
vtkm::cont::ArrayHandle<vtkm::Id> connectivity;
vtkm::cont::ArrayCopy(conn, connectivity);
cells.Fill(numPoints, vtkm::CELL_SHAPE_VERTEX, 1, connectivity);
ds.SetCellSet(cells);
output.AppendPartition(ds);
}
return output;
}
template <>
inline vtkm::cont::PartitionedDataSet AdvectorBaseAlgorithm<SLResultType>::GetOutput()
{
vtkm::cont::PartitionedDataSet output;
for (const auto& it : this->Results)
{
std::size_t nResults = it.second.size();
if (nResults == 0)
continue;
vtkm::cont::DataSet ds;
//Easy case with one result.
if (nResults == 1)
{
ds.AddCoordinateSystem(vtkm::cont::CoordinateSystem("coordinates", it.second[0].Positions));
ds.SetCellSet(it.second[0].PolyLines);
}
else
{
//Append all the results into one data set.
vtkm::cont::ArrayHandle<vtkm::Vec3f> appendPts;
std::vector<vtkm::Id> posOffsets(nResults);
const auto& res0 = it.second[0];
vtkm::Id totalNumCells = res0.PolyLines.GetNumberOfCells();
vtkm::Id totalNumPts = res0.Positions.GetNumberOfValues();
posOffsets[0] = 0;
for (std::size_t i = 1; i < nResults; i++)
{
const auto& res = it.second[i];
posOffsets[i] = totalNumPts;
totalNumPts += res.Positions.GetNumberOfValues();
totalNumCells += res.PolyLines.GetNumberOfCells();
}
//Append all the points together.
appendPts.Allocate(totalNumPts);
for (std::size_t i = 0; i < nResults; i++)
{
const auto& res = it.second[i];
// copy all values into appendPts starting at offset.
vtkm::cont::Algorithm::CopySubRange(
res.Positions, 0, res.Positions.GetNumberOfValues(), appendPts, posOffsets[i]);
}
vtkm::cont::CoordinateSystem outputCoords =
vtkm::cont::CoordinateSystem("coordinates", appendPts);
ds.AddCoordinateSystem(outputCoords);
//Create polylines.
std::vector<vtkm::Id> numPtsPerCell(static_cast<std::size_t>(totalNumCells));
std::size_t off = 0;
for (std::size_t i = 0; i < nResults; i++)
{
const auto& res = it.second[i];
vtkm::Id nCells = res.PolyLines.GetNumberOfCells();
for (vtkm::Id j = 0; j < nCells; j++)
numPtsPerCell[off++] = static_cast<vtkm::Id>(res.PolyLines.GetNumberOfPointsInCell(j));
}
auto numPointsPerCellArray = vtkm::cont::make_ArrayHandle(numPtsPerCell, vtkm::CopyFlag::Off);
vtkm::cont::ArrayHandle<vtkm::Id> cellIndex;
vtkm::Id connectivityLen =
vtkm::cont::Algorithm::ScanExclusive(numPointsPerCellArray, cellIndex);
vtkm::cont::ArrayHandleIndex connCount(connectivityLen);
vtkm::cont::ArrayHandle<vtkm::Id> connectivity;
vtkm::cont::ArrayCopy(connCount, connectivity);
vtkm::cont::ArrayHandle<vtkm::UInt8> cellTypes;
auto polyLineShape = vtkm::cont::make_ArrayHandleConstant<vtkm::UInt8>(
vtkm::CELL_SHAPE_POLY_LINE, totalNumCells);
vtkm::cont::ArrayCopy(polyLineShape, cellTypes);
auto offsets = vtkm::cont::ConvertNumIndicesToOffsets(numPointsPerCellArray);
vtkm::cont::CellSetExplicit<> polyLines;
polyLines.Fill(totalNumPts, cellTypes, connectivity, offsets);
ds.SetCellSet(polyLines);
}
output.AppendPartition(ds);
}
return output;
}
}
}
} // namespace vtkm::filter::particleadvection
#endif //vtk_m_filter_particleadvection_AdvectorBaseAlgorithm_hxx

@ -0,0 +1,189 @@
//============================================================================
// Copyright (c) Kitware, Inc.
// All rights reserved.
// See LICENSE.txt for details.
//
// This software is distributed WITHOUT ANY WARRANTY; without even
// the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE. See the above copyright notice for more information.
//============================================================================
#ifndef vtk_m_filter_particleadvection_AdvectorBaseThreadedAlgorithm_h
#define vtk_m_filter_particleadvection_AdvectorBaseThreadedAlgorithm_h
#include <vtkm/filter/particleadvection/AdvectorBaseAlgorithm.h>
#include <thread>
namespace vtkm
{
namespace filter
{
namespace particleadvection
{
template <typename ResultType>
class VTKM_ALWAYS_EXPORT AdvectorBaseThreadedAlgorithm : public AdvectorBaseAlgorithm<ResultType>
{
public:
using DataSetIntegratorType = vtkm::filter::particleadvection::DataSetIntegrator;
AdvectorBaseThreadedAlgorithm(const vtkm::filter::particleadvection::BoundsMap& bm,
const std::vector<DataSetIntegratorType>& blocks)
: AdvectorBaseAlgorithm<ResultType>(bm, blocks)
, Done(false)
, WorkerActivate(false)
{
//For the threaded algorithm, the particle vector goes out of scope in the Work method,
//so it may already be destroyed by the time the Manage thread uses the result.
//Set the copy flag so the std::vector is copied into the ArrayHandle.
for (auto& block : this->Blocks)
block.SetCopySeedFlag(true);
}
void Go() override
{
vtkm::Id nLocal = static_cast<vtkm::Id>(this->Active.size() + this->Inactive.size());
this->ComputeTotalNumParticles(nLocal);
std::vector<std::thread> workerThreads;
workerThreads.push_back(std::thread(AdvectorBaseThreadedAlgorithm::Worker, this));
this->Manage();
for (auto& t : workerThreads)
t.join();
}
protected:
bool GetActiveParticles(std::vector<vtkm::Particle>& particles, vtkm::Id& blockId) override
{
std::lock_guard<std::mutex> lock(this->Mutex);
bool val = this->AdvectorBaseAlgorithm<ResultType>::GetActiveParticles(particles, blockId);
this->WorkerActivate = val;
return val;
}
void UpdateActive(const std::vector<vtkm::Particle>& particles,
const std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& idsMap) override
{
if (!particles.empty())
{
std::lock_guard<std::mutex> lock(this->Mutex);
this->AdvectorBaseAlgorithm<ResultType>::UpdateActive(particles, idsMap);
//Let workers know there is new work
this->WorkerActivateCondition.notify_all();
this->WorkerActivate = true;
}
}
bool CheckDone()
{
std::lock_guard<std::mutex> lock(this->Mutex);
return this->Done;
}
void SetDone()
{
std::lock_guard<std::mutex> lock(this->Mutex);
this->Done = true;
this->WorkerActivateCondition.notify_all();
}
static void Worker(AdvectorBaseThreadedAlgorithm* algo) { algo->Work(); }
void WorkerWait()
{
std::unique_lock<std::mutex> lock(this->Mutex);
this->WorkerActivateCondition.wait(lock, [this] { return WorkerActivate || Done; });
}
void Work()
{
while (!this->CheckDone())
{
std::vector<vtkm::Particle> v;
vtkm::Id blockId = -1;
if (this->GetActiveParticles(v, blockId))
{
const auto& block = this->GetDataSet(blockId);
ResultType r;
block.Advect(v, this->StepSize, this->NumberOfSteps, r);
this->UpdateWorkerResult(blockId, r);
}
else
this->WorkerWait();
}
}
void Manage()
{
vtkm::filter::particleadvection::ParticleMessenger messenger(
this->Comm, this->BoundsMap, 1, 128);
while (this->TotalNumTerminatedParticles < this->TotalNumParticles)
{
std::unordered_map<vtkm::Id, std::vector<ResultType>> workerResults;
this->GetWorkerResults(workerResults);
vtkm::Id numTerm = 0;
for (const auto& it : workerResults)
{
vtkm::Id blockId = it.first;
const auto& results = it.second;
for (const auto& r : results)
numTerm += this->UpdateResult(r, blockId);
}
vtkm::Id numTermMessages = 0;
this->Communicate(messenger, numTerm, numTermMessages);
this->TotalNumTerminatedParticles += (numTerm + numTermMessages);
if (this->TotalNumTerminatedParticles > this->TotalNumParticles)
throw vtkm::cont::ErrorFilterExecution("Particle count error");
}
//Let the workers know that we are done.
this->SetDone();
}
bool GetBlockAndWait(const vtkm::Id& numLocalTerm) override
{
std::lock_guard<std::mutex> lock(this->Mutex);
return (this->AdvectorBaseAlgorithm<ResultType>::GetBlockAndWait(numLocalTerm) &&
!this->WorkerActivate && this->WorkerResults.empty());
}
void GetWorkerResults(std::unordered_map<vtkm::Id, std::vector<ResultType>>& results)
{
results.clear();
std::lock_guard<std::mutex> lock(this->Mutex);
if (!this->WorkerResults.empty())
{
results = this->WorkerResults;
this->WorkerResults.clear();
}
}
void UpdateWorkerResult(vtkm::Id blockId, const ResultType& result)
{
std::lock_guard<std::mutex> lock(this->Mutex);
auto& it = this->WorkerResults[blockId];
it.push_back(result);
}
std::atomic<bool> Done;
std::mutex Mutex;
bool WorkerActivate;
std::condition_variable WorkerActivateCondition;
std::unordered_map<vtkm::Id, std::vector<ResultType>> WorkerResults;
};
}
}
} // namespace vtkm::filter::particleadvection
#endif
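A self-contained sketch (plain C++, not VTK-m code) of the worker-sync pattern used above by WorkerWait/UpdateActive/SetDone, for readers unfamiliar with condition variables; the WorkQueue type is invented for illustration:

#include <condition_variable>
#include <mutex>
#include <queue>

// The manager pushes work (UpdateActive) or signals completion (SetDone);
// the worker sleeps on the condition variable until either happens (WorkerWait).
struct WorkQueue
{
  std::mutex Mutex;
  std::condition_variable WorkerActivateCondition;
  std::queue<int> Work;
  bool Done = false;

  void Push(int item)
  {
    std::lock_guard<std::mutex> lock(this->Mutex);
    this->Work.push(item);
    this->WorkerActivateCondition.notify_all();
  }

  void SetDone()
  {
    std::lock_guard<std::mutex> lock(this->Mutex);
    this->Done = true;
    this->WorkerActivateCondition.notify_all();
  }

  bool Pop(int& item)
  {
    std::unique_lock<std::mutex> lock(this->Mutex);
    this->WorkerActivateCondition.wait(lock, [this] { return !this->Work.empty() || this->Done; });
    if (this->Work.empty())
      return false; // Done was set and no work remains.
    item = this->Work.front();
    this->Work.pop();
    return true;
  }
};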

@ -9,12 +9,15 @@
##============================================================================
set(headers
AdvectorBaseAlgorithm.h
AdvectorBaseAlgorithm.hxx
AdvectorBaseThreadedAlgorithm.h
BoundsMap.h
DataSetIntegrator.h
Messenger.h
ParticleAdvector.h
ParticleAdvector.hxx
ParticleAdvectionAlgorithm.h
ParticleMessenger.h
StreamlineAlgorithm.h
)
#-----------------------------------------------------------------------------

@ -13,6 +13,7 @@
#include <vtkm/cont/ArrayHandleTransform.h>
#include <vtkm/cont/DataSet.h>
#include <vtkm/worklet/ParticleAdvection.h>
#include <vtkm/worklet/particleadvection/RK4Integrator.h>
#include <memory>
@ -35,6 +36,7 @@ class VTKM_ALWAYS_EXPORT DataSetIntegrator
public:
DataSetIntegrator(const vtkm::cont::DataSet& ds, vtkm::Id id, std::string fieldNm)
: ActiveField(ds.GetField(fieldNm))
, CopySeedArray(false)
, Eval(nullptr)
, ID(id)
{
@ -50,6 +52,7 @@ public:
}
vtkm::Id GetID() const { return this->ID; }
void SetCopySeedFlag(bool val) { this->CopySeedArray = val; }
template <typename ResultType>
void Advect(std::vector<vtkm::Particle>& v,
@ -57,7 +60,8 @@ public:
vtkm::Id maxSteps,
ResultType& result) const
{
auto seedArray = vtkm::cont::make_ArrayHandle(v, vtkm::CopyFlag::Off);
auto copyFlag = (this->CopySeedArray ? vtkm::CopyFlag::On : vtkm::CopyFlag::Off);
auto seedArray = vtkm::cont::make_ArrayHandle(v, copyFlag);
RK4Type rk4(*this->Eval, stepSize);
this->DoAdvect(seedArray, rk4, maxSteps, result);
}
@ -70,6 +74,7 @@ private:
ResultType& result) const;
vtkm::cont::Field ActiveField;
bool CopySeedArray;
std::shared_ptr<GridEvalType> Eval;
vtkm::Id ID;
};
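A small sketch (not part of the commit) of the seed-lifetime issue that the new CopySeedArray flag addresses; MakeSeedHandle is an invented helper:

#include <vector>
#include <vtkm/Particle.h>
#include <vtkm/cont/ArrayHandle.h>

// With CopyFlag::Off the handle only references the vector's storage and
// dangles once the vector is destroyed; CopyFlag::On copies the data into
// storage owned by the ArrayHandle, which is what the threaded path needs.
vtkm::cont::ArrayHandle<vtkm::Particle> MakeSeedHandle()
{
  std::vector<vtkm::Particle> seeds(10);
  // Unsafe once this function returns:
  // auto handle = vtkm::cont::make_ArrayHandle(seeds, vtkm::CopyFlag::Off);
  return vtkm::cont::make_ArrayHandle(seeds, vtkm::CopyFlag::On);
}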

@ -0,0 +1,56 @@
//============================================================================
// Copyright (c) Kitware, Inc.
// All rights reserved.
// See LICENSE.txt for details.
//
// This software is distributed WITHOUT ANY WARRANTY; without even
// the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE. See the above copyright notice for more information.
//============================================================================
#ifndef vtk_m_filter_particleadvection_ParticleAdvectionAlgorithm_h
#define vtk_m_filter_particleadvection_ParticleAdvectionAlgorithm_h
#include <vtkm/filter/particleadvection/AdvectorBaseAlgorithm.h>
#include <vtkm/filter/particleadvection/AdvectorBaseThreadedAlgorithm.h>
#include <vtkm/filter/particleadvection/DataSetIntegrator.h>
namespace vtkm
{
namespace filter
{
namespace particleadvection
{
class VTKM_ALWAYS_EXPORT ParticleAdvectionAlgorithm
: public AdvectorBaseAlgorithm<vtkm::worklet::ParticleAdvectionResult<vtkm::Particle>>
{
using DataSetIntegratorType = vtkm::filter::particleadvection::DataSetIntegrator;
public:
ParticleAdvectionAlgorithm(const vtkm::filter::particleadvection::BoundsMap& bm,
const std::vector<DataSetIntegratorType>& blocks)
: AdvectorBaseAlgorithm<vtkm::worklet::ParticleAdvectionResult<vtkm::Particle>>(bm, blocks)
{
}
};
class VTKM_ALWAYS_EXPORT ParticleAdvectionThreadedAlgorithm
: public AdvectorBaseThreadedAlgorithm<vtkm::worklet::ParticleAdvectionResult<vtkm::Particle>>
{
using DataSetIntegratorType = vtkm::filter::particleadvection::DataSetIntegrator;
public:
ParticleAdvectionThreadedAlgorithm(const vtkm::filter::particleadvection::BoundsMap& bm,
const std::vector<DataSetIntegratorType>& blocks)
: AdvectorBaseThreadedAlgorithm<vtkm::worklet::ParticleAdvectionResult<vtkm::Particle>>(bm,
blocks)
{
}
};
}
}
} // namespace vtkm::filter::particleadvection
#endif //vtk_m_filter_particleadvection_ParticleAdvectionAlgorithm_h

@ -1,370 +0,0 @@
//============================================================================
// Copyright (c) Kitware, Inc.
// All rights reserved.
// See LICENSE.txt for details.
//
// This software is distributed WITHOUT ANY WARRANTY; without even
// the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE. See the above copyright notice for more information.
//============================================================================
#ifndef vtk_m_filter_ParticleAdvector_h
#define vtk_m_filter_ParticleAdvector_h
#include <vtkm/cont/Algorithm.h>
#include <vtkm/cont/ArrayCopy.h>
#include <vtkm/cont/ArrayHandleIndex.h>
#include <vtkm/cont/ArrayHandleTransform.h>
#include <vtkm/cont/DataSet.h>
#include <vtkm/cont/ErrorFilterExecution.h>
#include <vtkm/cont/ParticleArrayCopy.h>
#include <vtkm/filter/particleadvection/DataSetIntegrator.h>
#include <vtkm/filter/particleadvection/ParticleMessenger.h>
#include <map>
#include <vector>
namespace vtkm
{
namespace filter
{
namespace particleadvection
{
//
// Base class for particle advector
//
template <typename ResultType>
class VTKM_ALWAYS_EXPORT ParticleAdvectorBase
{
using DataSetIntegratorType = vtkm::filter::particleadvection::DataSetIntegrator;
public:
ParticleAdvectorBase(const vtkm::filter::particleadvection::BoundsMap& bm,
const std::vector<DataSetIntegratorType>& blocks)
: Blocks(blocks)
, BoundsMap(bm)
, NumberOfSteps(0)
, NumRanks(this->Comm.size())
, Rank(this->Comm.rank())
, StepSize(0)
{
}
void SetStepSize(vtkm::FloatDefault stepSize) { this->StepSize = stepSize; }
void SetNumberOfSteps(vtkm::Id numSteps) { this->NumberOfSteps = numSteps; }
virtual void SetSeeds(const vtkm::cont::ArrayHandle<vtkm::Particle>& seeds) = 0;
virtual void Go() = 0;
virtual vtkm::cont::PartitionedDataSet GetOutput() = 0;
protected:
virtual bool GetActiveParticles(std::vector<vtkm::Particle>& particles) = 0;
inline vtkm::Id ComputeTotalNumParticles(vtkm::Id numLocal) const;
inline const DataSetIntegratorType& GetDataSet(vtkm::Id id) const;
virtual void StoreResult(const ResultType& vtkmNotUsed(res), vtkm::Id vtkmNotUsed(blockId)) {}
inline void UpdateResult(const ResultType& res,
vtkm::Id blockId,
std::vector<vtkm::Particle>& I,
std::vector<vtkm::Particle>& T,
std::vector<vtkm::Particle>& A);
std::vector<DataSetIntegratorType> Blocks;
vtkm::filter::particleadvection::BoundsMap BoundsMap;
vtkmdiy::mpi::communicator Comm = vtkm::cont::EnvironmentTracker::GetCommunicator();
vtkm::Id NumberOfSteps;
vtkm::Id NumRanks;
vtkm::Id Rank;
vtkm::FloatDefault StepSize;
std::map<vtkm::Id, std::vector<vtkm::Id>> ParticleBlockIDsMap;
};
//
// Base algorithm for particle advection.
// This is a single-threaded algorithm.
//
template <typename ResultType>
class VTKM_ALWAYS_EXPORT PABaseAlgorithm : public ParticleAdvectorBase<ResultType>
{
using DataSetIntegratorType = vtkm::filter::particleadvection::DataSetIntegrator;
public:
PABaseAlgorithm(const vtkm::filter::particleadvection::BoundsMap& bm,
const std::vector<DataSetIntegratorType>& blocks)
: ParticleAdvectorBase<ResultType>(bm, blocks)
{
}
void SetSeeds(const vtkm::cont::ArrayHandle<vtkm::Particle>& seeds) override
{
this->ParticleBlockIDsMap.clear();
vtkm::Id n = seeds.GetNumberOfValues();
auto portal = seeds.ReadPortal();
for (vtkm::Id i = 0; i < n; i++)
{
vtkm::Particle p = portal.Get(i);
std::vector<vtkm::Id> blockIDs = this->BoundsMap.FindBlocks(p.Pos);
if (!blockIDs.empty() && this->BoundsMap.FindRank(blockIDs[0]) == this->Rank)
{
this->Active.push_back(p);
this->ParticleBlockIDsMap[p.ID] = blockIDs;
}
}
}
void Go() override
{
vtkm::filter::particleadvection::ParticleMessenger messenger(
this->Comm, this->BoundsMap, 1, 128);
vtkm::Id nLocal = static_cast<vtkm::Id>(this->Active.size() + this->Inactive.size());
vtkm::Id totalNumSeeds = this->ComputeTotalNumParticles(nLocal);
vtkm::Id N = 0;
while (N < totalNumSeeds)
{
std::vector<vtkm::Particle> v, I, T, A;
vtkm::Id blockId = -1;
if (GetActiveParticles(v))
{
blockId = this->ParticleBlockIDsMap[v[0].ID][0];
auto& block = this->GetDataSet(blockId);
ResultType r;
block.Advect(v, this->StepSize, this->NumberOfSteps, r);
this->UpdateResult(r, blockId, I, T, A);
if (!A.empty())
this->Active.insert(this->Active.end(), A.begin(), A.end());
}
std::vector<vtkm::Particle> incoming;
std::map<vtkm::Id, std::vector<vtkm::Id>> incomingBlockIDsMap;
vtkm::Id myTerm = static_cast<vtkm::Id>(T.size());
vtkm::Id numTermMessages = 0;
messenger.Exchange(
I, this->ParticleBlockIDsMap, myTerm, incoming, incomingBlockIDsMap, numTermMessages);
if (!incoming.empty())
{
this->Active.insert(this->Active.end(), incoming.begin(), incoming.end());
for (const auto& it : incomingBlockIDsMap)
this->ParticleBlockIDsMap[it.first] = it.second;
}
if (!T.empty())
{
auto& it = this->Terminated[blockId];
it.insert(it.end(), T.begin(), T.end());
}
N += (myTerm + numTermMessages);
if (N > totalNumSeeds)
throw vtkm::cont::ErrorFilterExecution("Particle count error");
}
}
protected:
bool GetActiveParticles(std::vector<vtkm::Particle>& particles) override
{
particles.clear();
if (this->Active.empty())
return false;
vtkm::Id workingBlockID = this->ParticleBlockIDsMap[this->Active.front().ID][0];
auto it = this->Active.begin();
while (it != this->Active.end())
{
auto p = *it;
if (workingBlockID == this->ParticleBlockIDsMap[p.ID][0])
{
particles.push_back(p);
it = this->Active.erase(it);
}
else
it++;
}
return !particles.empty();
}
protected:
std::vector<vtkm::Particle> Active;
std::vector<vtkm::Particle> Inactive;
std::map<vtkm::Id, std::vector<vtkm::Particle>> Terminated;
};
class VTKM_ALWAYS_EXPORT ParticleAdvectionAlgorithm
: public PABaseAlgorithm<vtkm::worklet::ParticleAdvectionResult<vtkm::Particle>>
{
using DataSetIntegratorType = vtkm::filter::particleadvection::DataSetIntegrator;
public:
ParticleAdvectionAlgorithm(const vtkm::filter::particleadvection::BoundsMap& bm,
const std::vector<DataSetIntegratorType>& blocks)
: PABaseAlgorithm<vtkm::worklet::ParticleAdvectionResult<vtkm::Particle>>(bm, blocks)
{
}
vtkm::cont::PartitionedDataSet GetOutput() override
{
vtkm::cont::PartitionedDataSet output;
for (const auto& it : this->Terminated)
{
if (it.second.empty())
continue;
auto particles = vtkm::cont::make_ArrayHandle(it.second, vtkm::CopyFlag::Off);
vtkm::cont::ArrayHandle<vtkm::Vec3f> pos;
vtkm::cont::ParticleArrayCopy(particles, pos);
vtkm::cont::DataSet ds;
vtkm::cont::CoordinateSystem outCoords("coordinates", pos);
ds.AddCoordinateSystem(outCoords);
//Create vertex cell set
vtkm::Id numPoints = pos.GetNumberOfValues();
vtkm::cont::CellSetSingleType<> cells;
vtkm::cont::ArrayHandleIndex conn(numPoints);
vtkm::cont::ArrayHandle<vtkm::Id> connectivity;
vtkm::cont::ArrayCopy(conn, connectivity);
cells.Fill(numPoints, vtkm::CELL_SHAPE_VERTEX, 1, connectivity);
ds.SetCellSet(cells);
output.AppendPartition(ds);
}
return output;
}
protected:
};
class VTKM_ALWAYS_EXPORT StreamlineAlgorithm
: public PABaseAlgorithm<vtkm::worklet::StreamlineResult<vtkm::Particle>>
{
using DataSetIntegratorType = vtkm::filter::particleadvection::DataSetIntegrator;
public:
StreamlineAlgorithm(const vtkm::filter::particleadvection::BoundsMap& bm,
const std::vector<DataSetIntegratorType>& blocks)
: PABaseAlgorithm<vtkm::worklet::StreamlineResult<vtkm::Particle>>(bm, blocks)
{
}
vtkm::cont::PartitionedDataSet GetOutput() override
{
vtkm::cont::PartitionedDataSet output;
for (const auto& it : this->Results)
{
std::size_t nResults = it.second.size();
if (nResults == 0)
continue;
vtkm::cont::DataSet ds;
//Easy case with one result.
if (nResults == 1)
{
ds.AddCoordinateSystem(vtkm::cont::CoordinateSystem("coordinates", it.second[0].Positions));
ds.SetCellSet(it.second[0].PolyLines);
}
else
{
//Append all the results into one data set.
vtkm::cont::ArrayHandle<vtkm::Vec3f> appendPts;
std::vector<vtkm::Id> posOffsets(nResults);
const auto& res0 = it.second[0];
vtkm::Id totalNumCells = res0.PolyLines.GetNumberOfCells();
vtkm::Id totalNumPts = res0.Positions.GetNumberOfValues();
posOffsets[0] = 0;
for (std::size_t i = 1; i < nResults; i++)
{
const auto& res = it.second[i];
posOffsets[i] = totalNumPts;
totalNumPts += res.Positions.GetNumberOfValues();
totalNumCells += res.PolyLines.GetNumberOfCells();
}
//Append all the points together.
appendPts.Allocate(totalNumPts);
for (std::size_t i = 0; i < nResults; i++)
{
const auto& res = it.second[i];
// copy all values into appendPts starting at offset.
vtkm::cont::Algorithm::CopySubRange(
res.Positions, 0, res.Positions.GetNumberOfValues(), appendPts, posOffsets[i]);
}
vtkm::cont::CoordinateSystem outputCoords =
vtkm::cont::CoordinateSystem("coordinates", appendPts);
ds.AddCoordinateSystem(outputCoords);
//Create polylines.
std::vector<vtkm::Id> numPtsPerCell(static_cast<std::size_t>(totalNumCells));
std::size_t off = 0;
for (std::size_t i = 0; i < nResults; i++)
{
const auto& res = it.second[i];
vtkm::Id nCells = res.PolyLines.GetNumberOfCells();
for (vtkm::Id j = 0; j < nCells; j++)
numPtsPerCell[off++] = static_cast<vtkm::Id>(res.PolyLines.GetNumberOfPointsInCell(j));
}
auto numPointsPerCellArray =
vtkm::cont::make_ArrayHandle(numPtsPerCell, vtkm::CopyFlag::Off);
vtkm::cont::ArrayHandle<vtkm::Id> cellIndex;
vtkm::Id connectivityLen =
vtkm::cont::Algorithm::ScanExclusive(numPointsPerCellArray, cellIndex);
vtkm::cont::ArrayHandleIndex connCount(connectivityLen);
vtkm::cont::ArrayHandle<vtkm::Id> connectivity;
vtkm::cont::ArrayCopy(connCount, connectivity);
vtkm::cont::ArrayHandle<vtkm::UInt8> cellTypes;
auto polyLineShape = vtkm::cont::make_ArrayHandleConstant<vtkm::UInt8>(
vtkm::CELL_SHAPE_POLY_LINE, totalNumCells);
vtkm::cont::ArrayCopy(polyLineShape, cellTypes);
auto offsets = vtkm::cont::ConvertNumIndicesToOffsets(numPointsPerCellArray);
vtkm::cont::CellSetExplicit<> polyLines;
polyLines.Fill(totalNumPts, cellTypes, connectivity, offsets);
ds.SetCellSet(polyLines);
}
output.AppendPartition(ds);
}
return output;
}
protected:
virtual void StoreResult(const vtkm::worklet::StreamlineResult<vtkm::Particle>& res,
vtkm::Id blockId) override
{
this->Results[blockId].push_back(res);
}
std::map<vtkm::Id, std::vector<vtkm::worklet::StreamlineResult<vtkm::Particle>>> Results;
};
}
}
} // namespace vtkm::filter::particleadvection
#ifndef vtk_m_filter_ParticleAdvector_hxx
#include <vtkm/filter/particleadvection/ParticleAdvector.hxx>
#endif
#endif

@ -1,114 +0,0 @@
//============================================================================
// Copyright (c) Kitware, Inc.
// All rights reserved.
// See LICENSE.txt for details.
//
// This software is distributed WITHOUT ANY WARRANTY; without even
// the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE. See the above copyright notice for more information.
//============================================================================
#ifndef vtk_m_filter_ParticleAdvector_hxx
#define vtk_m_filter_ParticleAdvector_hxx
namespace vtkm
{
namespace filter
{
namespace particleadvection
{
template <typename ResultType>
inline vtkm::Id ParticleAdvectorBase<ResultType>::ComputeTotalNumParticles(vtkm::Id numLocal) const
{
long long totalNumParticles = static_cast<long long>(numLocal);
#ifdef VTKM_ENABLE_MPI
MPI_Comm mpiComm = vtkmdiy::mpi::mpi_cast(this->Comm.handle());
MPI_Allreduce(MPI_IN_PLACE, &totalNumParticles, 1, MPI_LONG_LONG, MPI_SUM, mpiComm);
#endif
return static_cast<vtkm::Id>(totalNumParticles);
}
template <typename ResultType>
inline const vtkm::filter::particleadvection::DataSetIntegrator&
ParticleAdvectorBase<ResultType>::GetDataSet(vtkm::Id id) const
{
for (const auto& it : this->Blocks)
if (it.GetID() == id)
return it;
throw vtkm::cont::ErrorFilterExecution("Bad block");
}
template <typename ResultType>
inline void ParticleAdvectorBase<ResultType>::UpdateResult(const ResultType& res,
vtkm::Id blockId,
std::vector<vtkm::Particle>& I,
std::vector<vtkm::Particle>& T,
std::vector<vtkm::Particle>& A)
{
vtkm::Id n = res.Particles.GetNumberOfValues();
auto portal = res.Particles.ReadPortal();
for (vtkm::Id i = 0; i < n; i++)
{
vtkm::Particle p = portal.Get(i);
if (p.Status.CheckTerminate())
{
T.push_back(p);
this->ParticleBlockIDsMap.erase(p.ID);
}
else
{
std::vector<vtkm::Id> currBIDs = this->ParticleBlockIDsMap[p.ID];
VTKM_ASSERT(!currBIDs.empty());
std::vector<vtkm::Id> newIDs;
if (p.Status.CheckSpatialBounds() && !p.Status.CheckTookAnySteps())
newIDs.assign(std::next(currBIDs.begin(), 1), currBIDs.end());
else
newIDs = this->BoundsMap.FindBlocks(p.Pos, currBIDs);
//reset the particle status.
p.Status = vtkm::ParticleStatus();
if (newIDs.empty()) //No blocks, we're done.
{
T.push_back(p);
this->ParticleBlockIDsMap.erase(p.ID);
}
else
{
//If we have more than blockId, we want to minimize communication
//and put any blocks owned by this rank first.
if (newIDs.size() > 1)
{
for (auto it = newIDs.begin(); it != newIDs.end(); it++)
{
vtkm::Id bid = *it;
if (this->BoundsMap.FindRank(bid) == this->Rank)
{
newIDs.erase(it);
newIDs.insert(newIDs.begin(), bid);
break;
}
}
}
int dstRank = this->BoundsMap.FindRank(newIDs[0]);
this->ParticleBlockIDsMap[p.ID] = newIDs;
if (dstRank == this->Rank)
A.push_back(p);
else
I.push_back(p);
}
}
}
this->StoreResult(res, blockId);
}
}
}
} // namespace vtkm::filter::particleadvection
#endif

@ -51,6 +51,16 @@ std::size_t ParticleMessenger::CalcParticleBufferSize(std::size_t nParticles, st
+ sizeof(vtkm::UInt8) // Status
+ sizeof(vtkm::FloatDefault); // Time
#ifndef NDEBUG
vtkmdiy::MemoryBuffer buff;
vtkm::Particle p;
vtkmdiy::save(buff, p);
//If this assert fires, vtkm::Particle changed
//and pSize should be updated.
VTKM_ASSERT(pSize == buff.size());
#endif
return
// rank
sizeof(int)
@ -69,10 +79,11 @@ std::size_t ParticleMessenger::CalcParticleBufferSize(std::size_t nParticles, st
VTKM_CONT
void ParticleMessenger::SerialExchange(
const std::vector<vtkm::Particle>& outData,
const std::map<vtkm::Id, std::vector<vtkm::Id>>& outBlockIDsMap,
const std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& outBlockIDsMap,
vtkm::Id vtkmNotUsed(numLocalTerm),
std::vector<vtkm::Particle>& inData,
std::map<vtkm::Id, std::vector<vtkm::Id>>& inDataBlockIDsMap) const
std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& inDataBlockIDsMap,
bool vtkmNotUsed(blockAndWait)) const
{
for (auto& p : outData)
{
@ -83,23 +94,26 @@ void ParticleMessenger::SerialExchange(
}
VTKM_CONT
void ParticleMessenger::Exchange(const std::vector<vtkm::Particle>& outData,
const std::map<vtkm::Id, std::vector<vtkm::Id>>& outBlockIDsMap,
vtkm::Id numLocalTerm,
std::vector<vtkm::Particle>& inData,
std::map<vtkm::Id, std::vector<vtkm::Id>>& inDataBlockIDsMap,
vtkm::Id& numTerminateMessages)
void ParticleMessenger::Exchange(
const std::vector<vtkm::Particle>& outData,
const std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& outBlockIDsMap,
vtkm::Id numLocalTerm,
std::vector<vtkm::Particle>& inData,
std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& inDataBlockIDsMap,
vtkm::Id& numTerminateMessages,
bool blockAndWait)
{
numTerminateMessages = 0;
inDataBlockIDsMap.clear();
if (this->NumRanks == 1)
return this->SerialExchange(outData, outBlockIDsMap, numLocalTerm, inData, inDataBlockIDsMap);
return this->SerialExchange(
outData, outBlockIDsMap, numLocalTerm, inData, inDataBlockIDsMap, blockAndWait);
#ifdef VTKM_ENABLE_MPI
//dstRank, vector of (particles,blockIDs)
std::map<int, std::vector<ParticleCommType>> sendData;
std::unordered_map<int, std::vector<ParticleCommType>> sendData;
for (const auto& p : outData)
{
@ -108,10 +122,16 @@ void ParticleMessenger::Exchange(const std::vector<vtkm::Particle>& outData,
sendData[dstRank].push_back(std::make_pair(p, bids));
}
//Do all the sends first.
if (numLocalTerm > 0)
SendAllMsg({ MSG_TERMINATE, static_cast<int>(numLocalTerm) });
this->SendParticles(sendData);
this->CheckPendingSendRequests();
//Check if we have anything coming in.
std::vector<ParticleRecvCommType> particleData;
std::vector<MsgCommType> msgData;
if (RecvAny(&msgData, &particleData, false))
if (RecvAny(&msgData, &particleData, blockAndWait))
{
for (const auto& it : particleData)
for (const auto& v : it.second)
@ -128,13 +148,6 @@ void ParticleMessenger::Exchange(const std::vector<vtkm::Particle>& outData,
numTerminateMessages += static_cast<vtkm::Id>(m.second[1]);
}
}
//Do all the sending...
if (numLocalTerm > 0)
SendAllMsg({ MSG_TERMINATE, static_cast<int>(numLocalTerm) });
this->SendParticles(sendData);
this->CheckPendingSendRequests();
#endif
}

@ -50,11 +50,12 @@ public:
VTKM_CONT ~ParticleMessenger() {}
VTKM_CONT void Exchange(const std::vector<vtkm::Particle>& outData,
const std::map<vtkm::Id, std::vector<vtkm::Id>>& outBlockIDsMap,
const std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& outBlockIDsMap,
vtkm::Id numLocalTerm,
std::vector<vtkm::Particle>& inData,
std::map<vtkm::Id, std::vector<vtkm::Id>>& inDataBlockIDsMap,
vtkm::Id& numTerminateMessages);
std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& inDataBlockIDsMap,
vtkm::Id& numTerminateMessages,
bool blockAndWait = false);
protected:
#ifdef VTKM_ENABLE_MPI
@ -81,7 +82,7 @@ protected:
template <typename, typename>
class Container,
typename Allocator = std::allocator<P>>
inline void SendParticles(const std::map<int, Container<P, Allocator>>& m);
inline void SendParticles(const std::unordered_map<int, Container<P, Allocator>>& m);
// Send/Recv messages.
VTKM_CONT void SendMsg(int dst, const std::vector<int>& msg);
@ -96,11 +97,13 @@ protected:
#endif
VTKM_CONT void SerialExchange(const std::vector<vtkm::Particle>& outData,
const std::map<vtkm::Id, std::vector<vtkm::Id>>& outBlockIDsMap,
vtkm::Id numLocalTerm,
std::vector<vtkm::Particle>& inData,
std::map<vtkm::Id, std::vector<vtkm::Id>>& inDataBlockIDsMap) const;
VTKM_CONT void SerialExchange(
const std::vector<vtkm::Particle>& outData,
const std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& outBlockIDsMap,
vtkm::Id numLocalTerm,
std::vector<vtkm::Particle>& inData,
std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& inDataBlockIDsMap,
bool blockAndWait) const;
static std::size_t CalcParticleBufferSize(std::size_t nParticles, std::size_t numBlockIds = 2);
};
@ -127,7 +130,8 @@ inline void ParticleMessenger::SendParticles(int dst, const Container<P, Allocat
VTKM_CONT
template <typename P, template <typename, typename> class Container, typename Allocator>
inline void ParticleMessenger::SendParticles(const std::map<int, Container<P, Allocator>>& m)
inline void ParticleMessenger::SendParticles(
const std::unordered_map<int, Container<P, Allocator>>& m)
{
for (auto mit = m.begin(); mit != m.end(); mit++)
if (!mit->second.empty())

@ -0,0 +1,55 @@
//============================================================================
// Copyright (c) Kitware, Inc.
// All rights reserved.
// See LICENSE.txt for details.
//
// This software is distributed WITHOUT ANY WARRANTY; without even
// the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE. See the above copyright notice for more information.
//============================================================================
#ifndef vtk_m_filter_particleadvection_StreamlineAlgorithm_h
#define vtk_m_filter_particleadvection_StreamlineAlgorithm_h
#include <vtkm/filter/particleadvection/AdvectorBaseAlgorithm.h>
#include <vtkm/filter/particleadvection/AdvectorBaseThreadedAlgorithm.h>
#include <vtkm/filter/particleadvection/DataSetIntegrator.h>
namespace vtkm
{
namespace filter
{
namespace particleadvection
{
class VTKM_ALWAYS_EXPORT StreamlineAlgorithm
: public AdvectorBaseAlgorithm<vtkm::worklet::StreamlineResult<vtkm::Particle>>
{
using DataSetIntegratorType = vtkm::filter::particleadvection::DataSetIntegrator;
public:
StreamlineAlgorithm(const vtkm::filter::particleadvection::BoundsMap& bm,
const std::vector<DataSetIntegratorType>& blocks)
: AdvectorBaseAlgorithm<vtkm::worklet::StreamlineResult<vtkm::Particle>>(bm, blocks)
{
}
};
class VTKM_ALWAYS_EXPORT StreamlineThreadedAlgorithm
: public AdvectorBaseThreadedAlgorithm<vtkm::worklet::StreamlineResult<vtkm::Particle>>
{
using DataSetIntegratorType = vtkm::filter::particleadvection::DataSetIntegrator;
public:
StreamlineThreadedAlgorithm(const vtkm::filter::particleadvection::BoundsMap& bm,
const std::vector<DataSetIntegratorType>& blocks)
: AdvectorBaseThreadedAlgorithm<vtkm::worklet::StreamlineResult<vtkm::Particle>>(bm, blocks)
{
}
};
}
}
} // namespace vtkm::filter::particleadvection
#endif //vtk_m_filter_particleadvection_StreamlineAlgorithm_h

@ -508,7 +508,10 @@ void TestStreamlineFilters()
{
for (auto useGhost : flags)
for (auto useSL : flags)
{
useSL = false;
TestPartitionedDataSet(n, useGhost, useSL);
}
}
TestStreamline();

@ -36,7 +36,7 @@ void AddVectorFields(vtkm::cont::PartitionedDataSet& pds,
ds.AddPointField(fieldName, CreateConstantVectorField(ds.GetNumberOfPoints(), vec));
}
void TestAMRStreamline(bool useSL)
void TestAMRStreamline(bool useSL, bool useThreaded)
{
auto comm = vtkm::cont::EnvironmentTracker::GetCommunicator();
if (comm.size() < 2)
@ -105,6 +105,7 @@ void TestAMRStreamline(bool useSL)
if (useSL)
{
vtkm::filter::Streamline filter;
filter.SetUseThreadedAlgorithm(useThreaded);
filter.SetStepSize(0.1f);
filter.SetNumberOfSteps(100000);
filter.SetSeeds(seedArray);
@ -187,6 +188,7 @@ void TestAMRStreamline(bool useSL)
else
{
vtkm::filter::ParticleAdvection filter;
filter.SetUseThreadedAlgorithm(useThreaded);
filter.SetStepSize(0.1f);
filter.SetNumberOfSteps(100000);
filter.SetSeeds(seedArray);
@ -219,7 +221,7 @@ void TestAMRStreamline(bool useSL)
}
}
void TestPartitionedDataSet(vtkm::Id nPerRank, bool useGhost, bool useSL)
void TestPartitionedDataSet(vtkm::Id nPerRank, bool useGhost, bool useSL, bool useThreaded)
{
auto comm = vtkm::cont::EnvironmentTracker::GetCommunicator();
@ -268,6 +270,7 @@ void TestPartitionedDataSet(vtkm::Id nPerRank, bool useGhost, bool useSL)
{
vtkm::filter::Streamline streamline;
streamline.SetUseThreadedAlgorithm(useThreaded);
streamline.SetStepSize(0.1f);
streamline.SetNumberOfSteps(100000);
streamline.SetSeeds(seedArray);
@ -313,6 +316,7 @@ void TestPartitionedDataSet(vtkm::Id nPerRank, bool useGhost, bool useSL)
{
vtkm::filter::ParticleAdvection particleAdvection;
particleAdvection.SetUseThreadedAlgorithm(useThreaded);
particleAdvection.SetStepSize(0.1f);
particleAdvection.SetNumberOfSteps(100000);
particleAdvection.SetSeeds(seedArray);
@ -356,11 +360,13 @@ void TestStreamlineFiltersMPI()
{
for (auto useGhost : flags)
for (auto useSL : flags)
TestPartitionedDataSet(n, useGhost, useSL);
for (auto useThreaded : flags)
TestPartitionedDataSet(n, useGhost, useSL, useThreaded);
}
for (auto useSL : flags)
TestAMRStreamline(useSL);
for (auto useThreaded : flags)
TestAMRStreamline(useSL, useThreaded);
}
}

@ -225,8 +225,7 @@ public:
vtkm::cont::make_ArrayHandleConstant<vtkm::UInt8>(vtkm::CELL_SHAPE_POLY_LINE, numSeeds);
vtkm::cont::ArrayCopy(polyLineShape, cellTypes);
auto numIndices = vtkm::cont::make_ArrayHandleCast(numPoints, vtkm::IdComponent());
auto offsets = vtkm::cont::ConvertNumIndicesToOffsets(numIndices);
auto offsets = vtkm::cont::ConvertNumIndicesToOffsets(numPoints);
polyLines.Fill(positions.GetNumberOfValues(), cellTypes, connectivity, offsets);
}
};
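The last hunk (commit 967fb9fad) drops the ArrayHandleCast because ConvertNumIndicesToOffsets accepts the vtkm::Id counts array directly. A small worked example of the conversion, assuming the numCells+1 offsets layout used by CellSetExplicit:

// Per-polyline point counts {3, 2, 4} become offsets {0, 3, 5, 9};
// the final entry is the total connectivity length.
auto numPoints = vtkm::cont::make_ArrayHandle<vtkm::Id>({ 3, 2, 4 });
auto offsets = vtkm::cont::ConvertNumIndicesToOffsets(numPoints);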