Threaded algorithms for particle advection

This commit is contained in:
Dave Pugmire 2020-11-18 17:10:28 -05:00
parent 3239add3a8
commit 7a4b2ff0c5
12 changed files with 599 additions and 201 deletions

@ -13,7 +13,6 @@
#include <vtkm/Particle.h>
#include <vtkm/filter/FilterDataSetWithField.h>
#include <vtkm/worklet/ParticleAdvection.h>
namespace vtkm
{
@ -40,6 +39,12 @@ public:
VTKM_CONT
void SetSeeds(vtkm::cont::ArrayHandle<vtkm::Particle>& seeds);
VTKM_CONT
bool GetUseThreadedAlgorithm() { return this->UseThreadedAlgorithm; }
VTKM_CONT
void SetUseThreadedAlgorithm(bool val) { this->UseThreadedAlgorithm = val; }
template <typename DerivedPolicy>
vtkm::cont::PartitionedDataSet PrepareForExecution(
const vtkm::cont::PartitionedDataSet& input,
@ -54,6 +59,7 @@ private:
vtkm::Id NumberOfSteps;
vtkm::FloatDefault StepSize;
vtkm::cont::ArrayHandle<vtkm::Particle> Seeds;
bool UseThreadedAlgorithm;
};
}
} // namespace vtkm::filter

@ -29,6 +29,7 @@ namespace filter
//-----------------------------------------------------------------------------
inline VTKM_CONT ParticleAdvection::ParticleAdvection()
: vtkm::filter::FilterDataSetWithField<ParticleAdvection>()
, UseThreadedAlgorithm(false)
{
}
@ -59,7 +60,18 @@ inline VTKM_CONT vtkm::cont::PartitionedDataSet ParticleAdvection::PrepareForExe
dsi.push_back(DataSetIntegratorType(input.GetPartition(i), blockId, activeField));
}
vtkm::filter::particleadvection::ParticleAdvectionAlgorithm pa(boundsMap, dsi);
using AlgorithmType = vtkm::filter::particleadvection::ParticleAdvectionAlgorithm;
using ThreadedAlgorithmType = vtkm::filter::particleadvection::ParticleAdvectionThreadedAlgorithm;
if (this->UseThreadedAlgorithm)
return vtkm::filter::particleadvection::RunAlgo<ThreadedAlgorithmType>(
boundsMap, dsi, this->NumberOfSteps, this->StepSize, this->Seeds);
else
return vtkm::filter::particleadvection::RunAlgo<AlgorithmType>(
boundsMap, dsi, this->NumberOfSteps, this->StepSize, this->Seeds);
// vtkm::filter::particleadvection::ParticleAdvectionAlgorithm pa(boundsMap, dsi);
vtkm::filter::particleadvection::ParticleAdvectionThreadedAlgorithm pa(boundsMap, dsi);
pa.SetNumberOfSteps(this->NumberOfSteps);
pa.SetStepSize(this->StepSize);
pa.SetSeeds(this->Seeds);

@ -12,7 +12,6 @@
#define vtk_m_filter_Streamline_h
#include <vtkm/filter/FilterDataSetWithField.h>
#include <vtkm/worklet/ParticleAdvection.h>
namespace vtkm
{
@ -39,6 +38,12 @@ public:
VTKM_CONT
void SetSeeds(vtkm::cont::ArrayHandle<vtkm::Particle>& seeds);
VTKM_CONT
bool GetUseThreadedAlgorithm() { return this->UseThreadedAlgorithm; }
VTKM_CONT
void SetUseThreadedAlgorithm(bool val) { this->UseThreadedAlgorithm = val; }
template <typename DerivedPolicy>
vtkm::cont::PartitionedDataSet PrepareForExecution(
const vtkm::cont::PartitionedDataSet& input,
@ -53,7 +58,7 @@ private:
vtkm::Id NumberOfSteps;
vtkm::FloatDefault StepSize;
vtkm::cont::ArrayHandle<vtkm::Particle> Seeds;
vtkm::worklet::Streamline Worklet;
bool UseThreadedAlgorithm;
};
}
} // namespace vtkm::filter

@ -29,7 +29,7 @@ namespace filter
//-----------------------------------------------------------------------------
inline VTKM_CONT Streamline::Streamline()
: vtkm::filter::FilterDataSetWithField<Streamline>()
, Worklet()
, UseThreadedAlgorithm(false)
{
}
@ -59,14 +59,15 @@ inline VTKM_CONT vtkm::cont::PartitionedDataSet Streamline::PrepareForExecution(
dsi.push_back(DataSetIntegratorType(input.GetPartition(i), blockId, activeField));
}
vtkm::filter::particleadvection::StreamlineAlgorithm sa(boundsMap, dsi);
sa.SetNumberOfSteps(this->NumberOfSteps);
sa.SetStepSize(this->StepSize);
sa.SetSeeds(this->Seeds);
using AlgorithmType = vtkm::filter::particleadvection::StreamlineAlgorithm;
using ThreadedAlgorithmType = vtkm::filter::particleadvection::StreamlineThreadedAlgorithm;
sa.Go();
vtkm::cont::PartitionedDataSet output = sa.GetOutput();
return output;
if (this->UseThreadedAlgorithm)
return vtkm::filter::particleadvection::RunAlgo<ThreadedAlgorithmType>(
boundsMap, dsi, this->NumberOfSteps, this->StepSize, this->Seeds);
else
return vtkm::filter::particleadvection::RunAlgo<AlgorithmType>(
boundsMap, dsi, this->NumberOfSteps, this->StepSize, this->Seeds);
}
//-----------------------------------------------------------------------------

@ -13,6 +13,7 @@
#include <vtkm/cont/ArrayHandleTransform.h>
#include <vtkm/cont/DataSet.h>
#include <vtkm/worklet/ParticleAdvection.h>
#include <vtkm/worklet/particleadvection/RK4Integrator.h>
#include <memory>
@ -35,6 +36,7 @@ class VTKM_ALWAYS_EXPORT DataSetIntegrator
public:
DataSetIntegrator(const vtkm::cont::DataSet& ds, vtkm::Id id, std::string fieldNm)
: ActiveField(ds.GetField(fieldNm))
, CopySeedArray(false)
, Eval(nullptr)
, ID(id)
{
@ -50,6 +52,7 @@ public:
}
vtkm::Id GetID() const { return this->ID; }
void SetCopySeedFlag(bool val) { this->CopySeedArray = val; }
template <typename ResultType>
void Advect(std::vector<vtkm::Particle>& v,
@ -57,7 +60,8 @@ public:
vtkm::Id maxSteps,
ResultType& result) const
{
auto seedArray = vtkm::cont::make_ArrayHandle(v, vtkm::CopyFlag::Off);
auto copyFlag = (this->CopySeedArray ? vtkm::CopyFlag::On : vtkm::CopyFlag::Off);
auto seedArray = vtkm::cont::make_ArrayHandle(v, copyFlag);
RK4Type rk4(*this->Eval, stepSize);
this->DoAdvect(seedArray, rk4, maxSteps, result);
}
@ -70,6 +74,7 @@ private:
ResultType& result) const;
vtkm::cont::Field ActiveField;
bool CopySeedArray;
std::shared_ptr<GridEvalType> Eval;
vtkm::Id ID;
};

@ -22,14 +22,35 @@
#include <vtkm/filter/particleadvection/ParticleMessenger.h>
#include <map>
#include <thread>
#include <vector>
//TODO:
// fix inheritance?
// streamlines...
namespace vtkm
{
namespace filter
{
namespace particleadvection
{
using DataSetIntegratorType = vtkm::filter::particleadvection::DataSetIntegrator;
template <typename AlgorithmType>
vtkm::cont::PartitionedDataSet RunAlgo(const vtkm::filter::particleadvection::BoundsMap& boundsMap,
const std::vector<DataSetIntegratorType>& dsi,
vtkm::Id numSteps,
vtkm::FloatDefault stepSize,
const vtkm::cont::ArrayHandle<vtkm::Particle>& seeds)
{
AlgorithmType algo(boundsMap, dsi);
algo.SetNumberOfSteps(numSteps);
algo.SetStepSize(stepSize);
algo.SetSeeds(seeds);
algo.Go();
return algo.GetOutput();
}
//
// Base class for particle advector
//
@ -51,27 +72,45 @@ public:
{
}
//Initialize ParticleAdvectorBase
void SetStepSize(vtkm::FloatDefault stepSize) { this->StepSize = stepSize; }
void SetNumberOfSteps(vtkm::Id numSteps) { this->NumberOfSteps = numSteps; }
virtual void SetSeeds(const vtkm::cont::ArrayHandle<vtkm::Particle>& seeds) = 0;
//Virtuals for advection and getting output.
virtual void Go() = 0;
virtual vtkm::cont::PartitionedDataSet GetOutput() = 0;
protected:
virtual bool GetActiveParticles(std::vector<vtkm::Particle>& particles) = 0;
inline vtkm::Id ComputeTotalNumParticles(vtkm::Id numLocal) const;
inline const DataSetIntegratorType& GetDataSet(vtkm::Id id); // const;
inline void UpdateResultParticle(
vtkm::Particle& p,
std::vector<vtkm::Particle>& I,
std::vector<vtkm::Particle>& T,
std::vector<vtkm::Particle>& A,
std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& idsMapI,
std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& idsMapA) const;
virtual void ClearParticles() = 0;
virtual void SetSeedArray(const std::vector<vtkm::Particle>& particles,
const std::vector<std::vector<vtkm::Id>>& blockIds) = 0;
virtual void Communicate(vtkm::filter::particleadvection::ParticleMessenger& messenger,
vtkm::Id numLocalTerminations,
vtkm::Id& numTermMessages) = 0;
virtual bool GetActiveParticles(std::vector<vtkm::Particle>& particles, vtkm::Id& blockId) = 0;
virtual void UpdateActive(const std::vector<vtkm::Particle>& particles,
const std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& idsMap) = 0;
virtual void UpdateInactive(
const std::vector<vtkm::Particle>& particles,
const std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& idsMap) = 0;
virtual void UpdateTerminated(const std::vector<vtkm::Particle>& particles, vtkm::Id blockId) = 0;
inline const DataSetIntegratorType& GetDataSet(vtkm::Id id) const;
virtual void StoreResult(const ResultType& vtkmNotUsed(res), vtkm::Id vtkmNotUsed(blockId)) {}
inline vtkm::Id UpdateResult(const ResultType& res, vtkm::Id blockId);
inline void UpdateResult(const ResultType& res,
vtkm::Id blockId,
std::vector<vtkm::Particle>& I,
std::vector<vtkm::Particle>& T,
std::vector<vtkm::Particle>& A);
//Member data
std::vector<DataSetIntegratorType> Blocks;
vtkm::filter::particleadvection::BoundsMap BoundsMap;
vtkmdiy::mpi::communicator Comm = vtkm::cont::EnvironmentTracker::GetCommunicator();
@ -79,7 +118,7 @@ protected:
vtkm::Id NumRanks;
vtkm::Id Rank;
vtkm::FloatDefault StepSize;
std::map<vtkm::Id, std::vector<vtkm::Id>> ParticleBlockIDsMap;
std::unordered_map<vtkm::Id, std::vector<vtkm::Id>> ParticleBlockIDsMap;
};
//
@ -101,20 +140,25 @@ public:
void SetSeeds(const vtkm::cont::ArrayHandle<vtkm::Particle>& seeds) override
{
this->ParticleBlockIDsMap.clear();
this->ClearParticles();
vtkm::Id n = seeds.GetNumberOfValues();
auto portal = seeds.ReadPortal();
std::vector<std::vector<vtkm::Id>> blockIDs;
std::vector<vtkm::Particle> particles;
for (vtkm::Id i = 0; i < n; i++)
{
vtkm::Particle p = portal.Get(i);
std::vector<vtkm::Id> blockIDs = this->BoundsMap.FindBlocks(p.Pos);
if (!blockIDs.empty() && this->BoundsMap.FindRank(blockIDs[0]) == this->Rank)
const vtkm::Particle p = portal.Get(i);
std::vector<vtkm::Id> ids = this->BoundsMap.FindBlocks(p.Pos);
if (!ids.empty() && this->BoundsMap.FindRank(ids[0]) == this->Rank)
{
this->Active.push_back(p);
this->ParticleBlockIDsMap[p.ID] = blockIDs;
particles.push_back(p);
blockIDs.push_back(ids);
}
}
this->SetSeedArray(particles, blockIDs);
}
void Go() override
@ -128,61 +172,65 @@ public:
vtkm::Id N = 0;
while (N < totalNumSeeds)
{
std::vector<vtkm::Particle> v, I, T, A;
vtkm::Id blockId = -1;
if (GetActiveParticles(v))
std::vector<vtkm::Particle> v;
vtkm::Id numTerm = 0, blockId = -1;
if (GetActiveParticles(v, blockId))
{
blockId = this->ParticleBlockIDsMap[v[0].ID][0];
auto& block = this->GetDataSet(blockId);
const auto& block = this->GetDataSet(blockId);
ResultType r;
block.Advect(v, this->StepSize, this->NumberOfSteps, r);
this->UpdateResult(r, blockId, I, T, A);
if (!A.empty())
this->Active.insert(this->Active.end(), A.begin(), A.end());
numTerm = this->UpdateResult(r, blockId);
}
std::vector<vtkm::Particle> incoming;
std::map<vtkm::Id, std::vector<vtkm::Id>> incomingBlockIDsMap;
vtkm::Id myTerm = static_cast<vtkm::Id>(T.size());
vtkm::Id numTermMessages = 0;
messenger.Exchange(
I, this->ParticleBlockIDsMap, myTerm, incoming, incomingBlockIDsMap, numTermMessages);
this->Communicate(messenger, numTerm, numTermMessages);
if (!incoming.empty())
{
this->Active.insert(this->Active.end(), incoming.begin(), incoming.end());
for (const auto& it : incomingBlockIDsMap)
this->ParticleBlockIDsMap[it.first] = it.second;
}
if (!T.empty())
{
auto& it = this->Terminated[blockId];
it.insert(it.end(), T.begin(), T.end());
}
N += (myTerm + numTermMessages);
N += (numTerm + numTermMessages);
if (N > totalNumSeeds)
throw vtkm::cont::ErrorFilterExecution("Particle count error");
}
}
protected:
bool GetActiveParticles(std::vector<vtkm::Particle>& particles) override
void ClearParticles() override
{
this->Active.clear();
this->Inactive.clear();
this->Terminated.clear();
this->ParticleBlockIDsMap.clear();
}
void SetSeedArray(const std::vector<vtkm::Particle>& particles,
const std::vector<std::vector<vtkm::Id>>& blockIds) override
{
VTKM_ASSERT(particles.size() == blockIds.size());
auto pit = particles.begin();
auto bit = blockIds.begin();
while (pit != particles.end() && bit != blockIds.end())
{
this->ParticleBlockIDsMap[pit->ID] = *bit;
pit++;
bit++;
}
this->Active.insert(this->Active.end(), particles.begin(), particles.end());
}
bool GetActiveParticles(std::vector<vtkm::Particle>& particles, vtkm::Id& blockId) override
{
particles.clear();
blockId = -1;
if (this->Active.empty())
return false;
vtkm::Id workingBlockID = this->ParticleBlockIDsMap[this->Active.front().ID][0];
blockId = this->ParticleBlockIDsMap[this->Active.front().ID][0];
auto it = this->Active.begin();
while (it != this->Active.end())
{
auto p = *it;
if (workingBlockID == this->ParticleBlockIDsMap[p.ID][0])
if (blockId == this->ParticleBlockIDsMap[p.ID][0])
{
particles.push_back(p);
it = this->Active.erase(it);
@ -194,10 +242,196 @@ protected:
return !particles.empty();
}
protected:
void Communicate(vtkm::filter::particleadvection::ParticleMessenger& messenger,
vtkm::Id numLocalTerminations,
vtkm::Id& numTermMessages) override
{
std::vector<vtkm::Particle> incoming;
std::unordered_map<vtkm::Id, std::vector<vtkm::Id>> incomingIDs;
numTermMessages = 0;
messenger.Exchange(this->Inactive,
this->ParticleBlockIDsMap,
numLocalTerminations,
incoming,
incomingIDs,
numTermMessages);
this->Inactive.clear();
this->UpdateActive(incoming, incomingIDs);
}
void UpdateActive(const std::vector<vtkm::Particle>& particles,
const std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& idsMap) override
{
VTKM_ASSERT(particles.size() == idsMap.size());
if (particles.empty())
return;
this->Active.insert(this->Active.end(), particles.begin(), particles.end());
for (const auto& it : idsMap)
this->ParticleBlockIDsMap[it.first] = it.second;
}
void UpdateInactive(const std::vector<vtkm::Particle>& particles,
const std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& idsMap) override
{
VTKM_ASSERT(particles.size() == idsMap.size());
if (particles.empty())
return;
this->Inactive.insert(this->Inactive.end(), particles.begin(), particles.end());
for (const auto& it : idsMap)
this->ParticleBlockIDsMap[it.first] = it.second;
}
void UpdateTerminated(const std::vector<vtkm::Particle>& particles, vtkm::Id blockId) override
{
if (particles.empty())
return;
for (const auto& t : particles)
this->ParticleBlockIDsMap.erase(t.ID);
auto& it = this->Terminated[blockId];
it.insert(it.end(), particles.begin(), particles.end());
}
std::vector<vtkm::Particle> Active;
std::vector<vtkm::Particle> Inactive;
std::map<vtkm::Id, std::vector<vtkm::Particle>> Terminated;
std::unordered_map<vtkm::Id, std::vector<vtkm::Particle>> Terminated;
};
template <typename ResultType>
class VTKM_ALWAYS_EXPORT PABaseThreadedAlgorithm : public PABaseAlgorithm<ResultType>
{
public:
using DataSetIntegratorType = vtkm::filter::particleadvection::DataSetIntegrator;
PABaseThreadedAlgorithm(const vtkm::filter::particleadvection::BoundsMap& bm,
const std::vector<DataSetIntegratorType>& blocks)
: PABaseAlgorithm<ResultType>(bm, blocks)
, Done(false)
{
//For threaded algorithm, the particles go out of scope in the Work method.
//When this happens, they are destructed by the time the Manage thread gets them.
for (auto& block : this->Blocks)
block.SetCopySeedFlag(true);
}
void Go() override
{
vtkm::Id nLocal = static_cast<vtkm::Id>(this->Active.size() + this->Inactive.size());
vtkm::Id totalNumSeeds = this->ComputeTotalNumParticles(nLocal);
std::vector<std::thread> workerThreads;
workerThreads.push_back(std::thread(PABaseThreadedAlgorithm::Worker, this));
this->Manage(totalNumSeeds);
for (auto& t : workerThreads)
t.join();
}
protected:
bool GetActiveParticles(std::vector<vtkm::Particle>& particles, vtkm::Id& blockId) override
{
std::lock_guard<std::mutex> lock(this->Mutex);
return this->PABaseAlgorithm<ResultType>::GetActiveParticles(particles, blockId);
}
void UpdateActive(const std::vector<vtkm::Particle>& particles,
const std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& idsMap) override
{
if (!particles.empty())
{
std::lock_guard<std::mutex> lock(this->Mutex);
this->PABaseAlgorithm<ResultType>::UpdateActive(particles, idsMap);
}
}
bool CheckDone()
{
std::lock_guard<std::mutex> lock(this->Mutex);
return this->Done;
}
void SetDone()
{
std::lock_guard<std::mutex> lock(this->Mutex);
this->Done = true;
}
static void Worker(PABaseThreadedAlgorithm* algo) { algo->Work(); }
void Work()
{
while (!this->CheckDone())
{
std::vector<vtkm::Particle> v;
vtkm::Id blockId = -1;
if (this->GetActiveParticles(v, blockId))
{
const auto& block = this->GetDataSet(blockId);
ResultType r;
block.Advect(v, this->StepSize, this->NumberOfSteps, r);
this->UpdateWorkerResult(blockId, r);
}
}
}
void Manage(vtkm::Id totalNumSeeds)
{
vtkm::filter::particleadvection::ParticleMessenger messenger(
this->Comm, this->BoundsMap, 1, 128);
vtkm::Id N = 0;
while (N < totalNumSeeds)
{
std::unordered_map<vtkm::Id, std::vector<ResultType>> workerResults;
this->GetWorkerResults(workerResults);
vtkm::Id numTerm = 0;
for (const auto& it : workerResults)
{
vtkm::Id blockId = it.first;
const auto& results = it.second;
for (const auto& r : results)
numTerm += this->UpdateResult(r, blockId);
}
vtkm::Id numTermMessages = 0;
this->Communicate(messenger, numTerm, numTermMessages);
N += (numTerm + numTermMessages);
if (N > totalNumSeeds)
throw vtkm::cont::ErrorFilterExecution("Particle count error");
}
//Let the workers know that we are done.
this->SetDone();
}
void GetWorkerResults(std::unordered_map<vtkm::Id, std::vector<ResultType>>& results)
{
results.clear();
std::lock_guard<std::mutex> lock(this->Mutex);
if (!this->WorkerResults.empty())
{
results = this->WorkerResults;
this->WorkerResults.clear();
}
}
void UpdateWorkerResult(vtkm::Id blockId, const ResultType& result)
{
std::lock_guard<std::mutex> lock(this->Mutex);
auto& it = this->WorkerResults[blockId];
it.push_back(result);
}
bool Done;
std::mutex Mutex;
std::unordered_map<vtkm::Id, std::vector<ResultType>> WorkerResults;
};
@ -249,6 +483,53 @@ public:
protected:
};
class VTKM_ALWAYS_EXPORT ParticleAdvectionThreadedAlgorithm
: public PABaseThreadedAlgorithm<vtkm::worklet::ParticleAdvectionResult<vtkm::Particle>>
{
using DataSetIntegratorType = vtkm::filter::particleadvection::DataSetIntegrator;
public:
ParticleAdvectionThreadedAlgorithm(const vtkm::filter::particleadvection::BoundsMap& bm,
const std::vector<DataSetIntegratorType>& blocks)
: PABaseThreadedAlgorithm<vtkm::worklet::ParticleAdvectionResult<vtkm::Particle>>(bm, blocks)
{
}
vtkm::cont::PartitionedDataSet GetOutput() override
{
vtkm::cont::PartitionedDataSet output;
for (const auto& it : this->Terminated)
{
if (it.second.empty())
continue;
auto particles = vtkm::cont::make_ArrayHandle(it.second, vtkm::CopyFlag::Off);
vtkm::cont::ArrayHandle<vtkm::Vec3f> pos;
vtkm::cont::ParticleArrayCopy(particles, pos);
vtkm::cont::DataSet ds;
vtkm::cont::CoordinateSystem outCoords("coordinates", pos);
ds.AddCoordinateSystem(outCoords);
//Create vertex cell set
vtkm::Id numPoints = pos.GetNumberOfValues();
vtkm::cont::CellSetSingleType<> cells;
vtkm::cont::ArrayHandleIndex conn(numPoints);
vtkm::cont::ArrayHandle<vtkm::Id> connectivity;
vtkm::cont::ArrayCopy(conn, connectivity);
cells.Fill(numPoints, vtkm::CELL_SHAPE_VERTEX, 1, connectivity);
ds.SetCellSet(cells);
output.AppendPartition(ds);
}
return output;
}
protected:
};
class VTKM_ALWAYS_EXPORT StreamlineAlgorithm
: public PABaseAlgorithm<vtkm::worklet::StreamlineResult<vtkm::Particle>>
@ -262,6 +543,115 @@ public:
{
}
//DAVE dave
vtkm::cont::PartitionedDataSet GetOutput() override
{
vtkm::cont::PartitionedDataSet output;
for (const auto& it : this->Results)
{
std::size_t nResults = it.second.size();
if (nResults == 0)
continue;
vtkm::cont::DataSet ds;
//Easy case with one result.
if (nResults == 1)
{
ds.AddCoordinateSystem(vtkm::cont::CoordinateSystem("coordinates", it.second[0].Positions));
ds.SetCellSet(it.second[0].PolyLines);
}
else
{
//Append all the results into one data set.
vtkm::cont::ArrayHandle<vtkm::Vec3f> appendPts;
std::vector<vtkm::Id> posOffsets(nResults);
const auto& res0 = it.second[0];
vtkm::Id totalNumCells = res0.PolyLines.GetNumberOfCells();
vtkm::Id totalNumPts = res0.Positions.GetNumberOfValues();
posOffsets[0] = 0;
for (std::size_t i = 1; i < nResults; i++)
{
const auto& res = it.second[i];
posOffsets[i] = totalNumPts;
totalNumPts += res.Positions.GetNumberOfValues();
totalNumCells += res.PolyLines.GetNumberOfCells();
}
//Append all the points together.
appendPts.Allocate(totalNumPts);
for (std::size_t i = 0; i < nResults; i++)
{
const auto& res = it.second[i];
// copy all values into appendPts starting at offset.
vtkm::cont::Algorithm::CopySubRange(
res.Positions, 0, res.Positions.GetNumberOfValues(), appendPts, posOffsets[i]);
}
vtkm::cont::CoordinateSystem outputCoords =
vtkm::cont::CoordinateSystem("coordinates", appendPts);
ds.AddCoordinateSystem(outputCoords);
//Create polylines.
std::vector<vtkm::Id> numPtsPerCell(static_cast<std::size_t>(totalNumCells));
std::size_t off = 0;
for (std::size_t i = 0; i < nResults; i++)
{
const auto& res = it.second[i];
vtkm::Id nCells = res.PolyLines.GetNumberOfCells();
for (vtkm::Id j = 0; j < nCells; j++)
numPtsPerCell[off++] = static_cast<vtkm::Id>(res.PolyLines.GetNumberOfPointsInCell(j));
}
auto numPointsPerCellArray =
vtkm::cont::make_ArrayHandle(numPtsPerCell, vtkm::CopyFlag::Off);
vtkm::cont::ArrayHandle<vtkm::Id> cellIndex;
vtkm::Id connectivityLen =
vtkm::cont::Algorithm::ScanExclusive(numPointsPerCellArray, cellIndex);
vtkm::cont::ArrayHandleIndex connCount(connectivityLen);
vtkm::cont::ArrayHandle<vtkm::Id> connectivity;
vtkm::cont::ArrayCopy(connCount, connectivity);
vtkm::cont::ArrayHandle<vtkm::UInt8> cellTypes;
auto polyLineShape = vtkm::cont::make_ArrayHandleConstant<vtkm::UInt8>(
vtkm::CELL_SHAPE_POLY_LINE, totalNumCells);
vtkm::cont::ArrayCopy(polyLineShape, cellTypes);
auto offsets = vtkm::cont::ConvertNumIndicesToOffsets(numPointsPerCellArray);
vtkm::cont::CellSetExplicit<> polyLines;
polyLines.Fill(totalNumPts, cellTypes, connectivity, offsets);
ds.SetCellSet(polyLines);
}
output.AppendPartition(ds);
}
return output;
}
protected:
virtual void StoreResult(const vtkm::worklet::StreamlineResult<vtkm::Particle>& res,
vtkm::Id blockId) override
{
this->Results[blockId].push_back(res);
}
std::map<vtkm::Id, std::vector<vtkm::worklet::StreamlineResult<vtkm::Particle>>> Results;
};
class VTKM_ALWAYS_EXPORT StreamlineThreadedAlgorithm
: public PABaseThreadedAlgorithm<vtkm::worklet::StreamlineResult<vtkm::Particle>>
{
using DataSetIntegratorType = vtkm::filter::particleadvection::DataSetIntegrator;
public:
StreamlineThreadedAlgorithm(const vtkm::filter::particleadvection::BoundsMap& bm,
const std::vector<DataSetIntegratorType>& blocks)
: PABaseThreadedAlgorithm<vtkm::worklet::StreamlineResult<vtkm::Particle>>(bm, blocks)
{
}
vtkm::cont::PartitionedDataSet GetOutput() override
{
vtkm::cont::PartitionedDataSet output;
@ -358,6 +748,8 @@ protected:
std::map<vtkm::Id, std::vector<vtkm::worklet::StreamlineResult<vtkm::Particle>>> Results;
};
}
}
} // namespace vtkm::filter::particleadvection

@ -31,7 +31,7 @@ inline vtkm::Id ParticleAdvectorBase<ResultType>::ComputeTotalNumParticles(vtkm:
template <typename ResultType>
inline const vtkm::filter::particleadvection::DataSetIntegrator&
ParticleAdvectorBase<ResultType>::GetDataSet(vtkm::Id id) const
ParticleAdvectorBase<ResultType>::GetDataSet(vtkm::Id id) // const
{
for (const auto& it : this->Blocks)
if (it.GetID() == id)
@ -40,72 +40,94 @@ ParticleAdvectorBase<ResultType>::GetDataSet(vtkm::Id id) const
}
template <typename ResultType>
inline void ParticleAdvectorBase<ResultType>::UpdateResult(const ResultType& res,
vtkm::Id blockId,
std::vector<vtkm::Particle>& I,
std::vector<vtkm::Particle>& T,
std::vector<vtkm::Particle>& A)
inline void ParticleAdvectorBase<ResultType>::UpdateResultParticle(
vtkm::Particle& p,
std::vector<vtkm::Particle>& I,
std::vector<vtkm::Particle>& T,
std::vector<vtkm::Particle>& A,
std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& idsMapI,
std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& idsMapA) const
{
if (p.Status.CheckTerminate())
T.push_back(p);
else
{
const auto& it = this->ParticleBlockIDsMap.find(p.ID);
VTKM_ASSERT(it != this->ParticleBlockIDsMap.end());
auto currBIDs = it->second;
VTKM_ASSERT(!currBIDs.empty());
std::vector<vtkm::Id> newIDs;
if (p.Status.CheckSpatialBounds() && !p.Status.CheckTookAnySteps())
newIDs.assign(std::next(currBIDs.begin(), 1), currBIDs.end());
else
newIDs = this->BoundsMap.FindBlocks(p.Pos, currBIDs);
//reset the particle status.
p.Status = vtkm::ParticleStatus();
if (newIDs.empty()) //No blocks, we're done.
{
p.Status.SetTerminate();
T.push_back(p);
}
else
{
//If we have more than blockId, we want to minimize communication
//and put any blocks owned by this rank first.
if (newIDs.size() > 1)
{
for (auto idit = newIDs.begin(); idit != newIDs.end(); idit++)
{
vtkm::Id bid = *idit;
if (this->BoundsMap.FindRank(bid) == this->Rank)
{
newIDs.erase(idit);
newIDs.insert(newIDs.begin(), bid);
break;
}
}
}
int dstRank = this->BoundsMap.FindRank(newIDs[0]);
if (dstRank == this->Rank)
{
A.push_back(p);
idsMapA[p.ID] = newIDs;
}
else
{
I.push_back(p);
idsMapI[p.ID] = newIDs;
}
}
}
}
template <typename ResultType>
inline vtkm::Id ParticleAdvectorBase<ResultType>::UpdateResult(const ResultType& res,
vtkm::Id blockId)
{
vtkm::Id n = res.Particles.GetNumberOfValues();
auto portal = res.Particles.ReadPortal();
std::unordered_map<vtkm::Id, std::vector<vtkm::Id>> idsMapI, idsMapA;
std::vector<vtkm::Particle> I, T, A;
for (vtkm::Id i = 0; i < n; i++)
{
//auto& p = portal.Get(i);
vtkm::Particle p = portal.Get(i);
if (p.Status.CheckTerminate())
{
T.push_back(p);
this->ParticleBlockIDsMap.erase(p.ID);
}
else
{
std::vector<vtkm::Id> currBIDs = this->ParticleBlockIDsMap[p.ID];
VTKM_ASSERT(!currBIDs.empty());
std::vector<vtkm::Id> newIDs;
if (p.Status.CheckSpatialBounds() && !p.Status.CheckTookAnySteps())
newIDs.assign(std::next(currBIDs.begin(), 1), currBIDs.end());
else
newIDs = this->BoundsMap.FindBlocks(p.Pos, currBIDs);
//reset the particle status.
p.Status = vtkm::ParticleStatus();
if (newIDs.empty()) //No blocks, we're done.
{
T.push_back(p);
this->ParticleBlockIDsMap.erase(p.ID);
}
else
{
//If we have more than blockId, we want to minimize communication
//and put any blocks owned by this rank first.
if (newIDs.size() > 1)
{
for (auto it = newIDs.begin(); it != newIDs.end(); it++)
{
vtkm::Id bid = *it;
if (this->BoundsMap.FindRank(bid) == this->Rank)
{
newIDs.erase(it);
newIDs.insert(newIDs.begin(), bid);
break;
}
}
}
int dstRank = this->BoundsMap.FindRank(newIDs[0]);
this->ParticleBlockIDsMap[p.ID] = newIDs;
if (dstRank == this->Rank)
A.push_back(p);
else
I.push_back(p);
}
}
this->UpdateResultParticle(p, I, T, A, idsMapI, idsMapA);
}
vtkm::Id numTerm = static_cast<vtkm::Id>(T.size());
this->UpdateActive(A, idsMapA);
this->UpdateInactive(I, idsMapI);
this->UpdateTerminated(T, blockId);
this->StoreResult(res, blockId);
return numTerm;
}
}
}

@ -69,10 +69,10 @@ std::size_t ParticleMessenger::CalcParticleBufferSize(std::size_t nParticles, st
VTKM_CONT
void ParticleMessenger::SerialExchange(
const std::vector<vtkm::Particle>& outData,
const std::map<vtkm::Id, std::vector<vtkm::Id>>& outBlockIDsMap,
const std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& outBlockIDsMap,
vtkm::Id vtkmNotUsed(numLocalTerm),
std::vector<vtkm::Particle>& inData,
std::map<vtkm::Id, std::vector<vtkm::Id>>& inDataBlockIDsMap) const
std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& inDataBlockIDsMap) const
{
for (auto& p : outData)
{
@ -83,12 +83,13 @@ void ParticleMessenger::SerialExchange(
}
VTKM_CONT
void ParticleMessenger::Exchange(const std::vector<vtkm::Particle>& outData,
const std::map<vtkm::Id, std::vector<vtkm::Id>>& outBlockIDsMap,
vtkm::Id numLocalTerm,
std::vector<vtkm::Particle>& inData,
std::map<vtkm::Id, std::vector<vtkm::Id>>& inDataBlockIDsMap,
vtkm::Id& numTerminateMessages)
void ParticleMessenger::Exchange(
const std::vector<vtkm::Particle>& outData,
const std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& outBlockIDsMap,
vtkm::Id numLocalTerm,
std::vector<vtkm::Particle>& inData,
std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& inDataBlockIDsMap,
vtkm::Id& numTerminateMessages)
{
numTerminateMessages = 0;
inDataBlockIDsMap.clear();
@ -99,7 +100,7 @@ void ParticleMessenger::Exchange(const std::vector<vtkm::Particle>& outData,
#ifdef VTKM_ENABLE_MPI
//dstRank, vector of (particles,blockIDs)
std::map<int, std::vector<ParticleCommType>> sendData;
std::unordered_map<int, std::vector<ParticleCommType>> sendData;
for (const auto& p : outData)
{

@ -50,10 +50,10 @@ public:
VTKM_CONT ~ParticleMessenger() {}
VTKM_CONT void Exchange(const std::vector<vtkm::Particle>& outData,
const std::map<vtkm::Id, std::vector<vtkm::Id>>& outBlockIDsMap,
const std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& outBlockIDsMap,
vtkm::Id numLocalTerm,
std::vector<vtkm::Particle>& inData,
std::map<vtkm::Id, std::vector<vtkm::Id>>& inDataBlockIDsMap,
std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& inDataBlockIDsMap,
vtkm::Id& numTerminateMessages);
protected:
@ -81,7 +81,7 @@ protected:
template <typename, typename>
class Container,
typename Allocator = std::allocator<P>>
inline void SendParticles(const std::map<int, Container<P, Allocator>>& m);
inline void SendParticles(const std::unordered_map<int, Container<P, Allocator>>& m);
// Send/Recv messages.
VTKM_CONT void SendMsg(int dst, const std::vector<int>& msg);
@ -96,11 +96,12 @@ protected:
#endif
VTKM_CONT void SerialExchange(const std::vector<vtkm::Particle>& outData,
const std::map<vtkm::Id, std::vector<vtkm::Id>>& outBlockIDsMap,
vtkm::Id numLocalTerm,
std::vector<vtkm::Particle>& inData,
std::map<vtkm::Id, std::vector<vtkm::Id>>& inDataBlockIDsMap) const;
VTKM_CONT void SerialExchange(
const std::vector<vtkm::Particle>& outData,
const std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& outBlockIDsMap,
vtkm::Id numLocalTerm,
std::vector<vtkm::Particle>& inData,
std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& inDataBlockIDsMap) const;
static std::size_t CalcParticleBufferSize(std::size_t nParticles, std::size_t numBlockIds = 2);
};
@ -127,7 +128,8 @@ inline void ParticleMessenger::SendParticles(int dst, const Container<P, Allocat
VTKM_CONT
template <typename P, template <typename, typename> class Container, typename Allocator>
inline void ParticleMessenger::SendParticles(const std::map<int, Container<P, Allocator>>& m)
inline void ParticleMessenger::SendParticles(
const std::unordered_map<int, Container<P, Allocator>>& m)
{
for (auto mit = m.begin(); mit != m.end(); mit++)
if (!mit->second.empty())

@ -9,64 +9,7 @@
##============================================================================
set(unit_tests
UnitTestCellAverageFilter.cxx
UnitTestCellMeasuresFilter.cxx
UnitTestCellSetConnectivityFilter.cxx
UnitTestCleanGrid.cxx
UnitTestClipWithFieldFilter.cxx
UnitTestClipWithImplicitFunctionFilter.cxx
UnitTestContourFilter.cxx
UnitTestContourFilterNormals.cxx
UnitTestContourTreeUniformFilter.cxx
UnitTestContourTreeUniformAugmentedFilter.cxx
UnitTestCoordinateSystemTransform.cxx
UnitTestCrossProductFilter.cxx
UnitTestDotProductFilter.cxx
UnitTestEntropyFilter.cxx
UnitTestExternalFacesFilter.cxx
UnitTestExtractGeometryFilter.cxx
UnitTestExtractPointsFilter.cxx
UnitTestExtractStructuredFilter.cxx
UnitTestFieldMetadata.cxx
UnitTestFieldSelection.cxx
UnitTestFieldToColors.cxx
UnitTestGradientExplicit.cxx
UnitTestGradientUniform.cxx
UnitTestGhostCellClassify.cxx
UnitTestGhostCellRemove.cxx
UnitTestHistogramFilter.cxx
UnitTestImageConnectivityFilter.cxx
UnitTestImageMedianFilter.cxx
UnitTestLagrangianFilter.cxx
UnitTestLagrangianStructuresFilter.cxx
UnitTestMapFieldMergeAverage.cxx
UnitTestMapFieldPermutation.cxx
UnitTestMaskFilter.cxx
UnitTestMaskPointsFilter.cxx
UnitTestMeshQualityFilter.cxx
UnitTestNDEntropyFilter.cxx
UnitTestNDHistogramFilter.cxx
UnitTestParticleDensity.cxx
UnitTestPartitionedDataSetFilters.cxx
UnitTestPartitionedDataSetHistogramFilter.cxx
UnitTestPointAverageFilter.cxx
UnitTestPointElevationFilter.cxx
UnitTestPointTransform.cxx
UnitTestProbe.cxx
UnitTestSplitSharpEdgesFilter.cxx
UnitTestStreamlineFilter.cxx
UnitTestStreamSurfaceFilter.cxx
UnitTestSurfaceNormalsFilter.cxx
UnitTestTetrahedralizeFilter.cxx
UnitTestThresholdFilter.cxx
UnitTestThresholdPointsFilter.cxx
UnitTestTriangulateFilter.cxx
UnitTestTubeFilter.cxx
UnitTestVectorMagnitudeFilter.cxx
UnitTestVertexClusteringFilter.cxx
UnitTestWarpScalarFilter.cxx
UnitTestWarpVectorFilter.cxx
UnitTestZFP.cxx
)
vtkm_unit_tests(

@ -508,7 +508,10 @@ void TestStreamlineFilters()
{
for (auto useGhost : flags)
for (auto useSL : flags)
{
useSL = false;
TestPartitionedDataSet(n, useGhost, useSL);
}
}
TestStreamline();

@ -36,7 +36,7 @@ void AddVectorFields(vtkm::cont::PartitionedDataSet& pds,
ds.AddPointField(fieldName, CreateConstantVectorField(ds.GetNumberOfPoints(), vec));
}
void TestAMRStreamline(bool useSL)
void TestAMRStreamline(bool useSL, bool useThreaded)
{
auto comm = vtkm::cont::EnvironmentTracker::GetCommunicator();
if (comm.size() < 2)
@ -105,6 +105,7 @@ void TestAMRStreamline(bool useSL)
if (useSL)
{
vtkm::filter::Streamline filter;
filter.SetUseThreadedAlgorithm(useThreaded);
filter.SetStepSize(0.1f);
filter.SetNumberOfSteps(100000);
filter.SetSeeds(seedArray);
@ -187,6 +188,7 @@ void TestAMRStreamline(bool useSL)
else
{
vtkm::filter::ParticleAdvection filter;
filter.SetUseThreadedAlgorithm(useThreaded);
filter.SetStepSize(0.1f);
filter.SetNumberOfSteps(100000);
filter.SetSeeds(seedArray);
@ -219,7 +221,7 @@ void TestAMRStreamline(bool useSL)
}
}
void TestPartitionedDataSet(vtkm::Id nPerRank, bool useGhost, bool useSL)
void TestPartitionedDataSet(vtkm::Id nPerRank, bool useGhost, bool useSL, bool useThreaded)
{
auto comm = vtkm::cont::EnvironmentTracker::GetCommunicator();
@ -268,6 +270,7 @@ void TestPartitionedDataSet(vtkm::Id nPerRank, bool useGhost, bool useSL)
{
vtkm::filter::Streamline streamline;
streamline.SetUseThreadedAlgorithm(useThreaded);
streamline.SetStepSize(0.1f);
streamline.SetNumberOfSteps(100000);
streamline.SetSeeds(seedArray);
@ -313,6 +316,7 @@ void TestPartitionedDataSet(vtkm::Id nPerRank, bool useGhost, bool useSL)
{
vtkm::filter::ParticleAdvection particleAdvection;
particleAdvection.SetUseThreadedAlgorithm(useThreaded);
particleAdvection.SetStepSize(0.1f);
particleAdvection.SetNumberOfSteps(100000);
particleAdvection.SetSeeds(seedArray);
@ -356,11 +360,13 @@ void TestStreamlineFiltersMPI()
{
for (auto useGhost : flags)
for (auto useSL : flags)
TestPartitionedDataSet(n, useGhost, useSL);
for (auto useThreaded : flags)
TestPartitionedDataSet(n, useGhost, useSL, useThreaded);
}
for (auto useSL : flags)
TestAMRStreamline(useSL);
for (auto useThreaded : flags)
TestAMRStreamline(useSL, useThreaded);
}
}