Fixes to block duplication code.

This commit is contained in:
Dave Pugmire 2023-04-14 10:50:30 -04:00
parent d1e68ace91
commit eb3c91494e
4 changed files with 192 additions and 194 deletions

@ -107,7 +107,6 @@ public:
vtkm::Id numTerm = 0, blockId = -1;
if (this->GetActiveParticles(v, blockId))
{
//std::cout<<" RANK= "<<this->Rank<<" *****Advect: "<<v[0].GetID()<<" BID= "<<blockId<<std::endl;
//make this a pointer to avoid the copy?
auto& block = this->GetDataSet(blockId);
DSIHelperInfoType bb =
@ -165,6 +164,7 @@ public:
bit++;
}
//Update Active
this->Active.insert(this->Active.end(), particles.begin(), particles.end());
}
@ -230,6 +230,8 @@ public:
outgoing.reserve(this->Inactive.size());
outgoingRanks.reserve(this->Inactive.size());
std::vector<ParticleType> particlesStaying;
std::unordered_map<vtkm::Id, std::vector<vtkm::Id>> particlesStayingBlockIDs;
//Send out Everything.
for (const auto& p : this->Inactive)
{
@ -238,12 +240,14 @@ public:
auto ranks = this->BoundsMap.FindRank(bid[0]);
VTKM_ASSERT(!ranks.empty());
//std::cout<<"********************** GetOutgoing: "<<bid[0]<<" r= "<<ranks.size()<<std::endl;
if (ranks.size() == 1)
{
if (ranks[0] == this->Rank)
this->Active.emplace_back(p);
{
particlesStaying.emplace_back(p);
particlesStayingBlockIDs[p.GetID()] = this->ParticleBlockIDsMap[p.GetID()];
}
else
{
outgoing.emplace_back(p);
@ -256,9 +260,11 @@ public:
//Random selection:
vtkm::Id outRank = std::rand() % ranks.size();
//std::cout<<"******* CHOICES: "<<ranks.size()<<" --> "<<outRank<<std::endl;
if (outRank == this->Rank)
this->Active.emplace_back(p);
{
particlesStayingBlockIDs[p.GetID()] = this->ParticleBlockIDsMap[p.GetID()];
particlesStaying.emplace_back(p);
}
else
{
outgoing.emplace_back(p);
@ -269,6 +275,10 @@ public:
this->Inactive.clear();
VTKM_ASSERT(outgoing.size() == outgoingRanks.size());
VTKM_ASSERT(particlesStaying.size() == particlesStayingBlockIDs.size());
if (!particlesStaying.empty())
this->UpdateActive(particlesStaying, particlesStayingBlockIDs);
}
virtual void UpdateActive(const std::vector<ParticleType>& particles,

@ -138,6 +138,7 @@ protected:
while (this->TotalNumTerminatedParticles < this->TotalNumParticles)
{
std::unordered_map<vtkm::Id, std::vector<DSIHelperInfoType>> workerResults;
this->GetWorkerResults(workerResults);
@ -145,7 +146,9 @@ protected:
for (auto& it : workerResults)
{
for (auto& r : it.second)
{
numTerm += this->UpdateResult(r.Get<DSIHelperInfo<ParticleType>>());
}
}
vtkm::Id numTermMessages = 0;

@ -44,6 +44,7 @@ void printVec(const std::vector<T>& v)
}
std::cout << "]";
}
namespace vtkm
{
namespace filter

@ -45,6 +45,60 @@ void AddVectorFields(vtkm::cont::PartitionedDataSet& pds,
ds.AddPointField(fieldName, CreateConstantVectorField(ds.GetNumberOfPoints(), vec));
}
std::vector<vtkm::cont::PartitionedDataSet> CreateAllDataSetBounds(vtkm::Id nPerRank, bool useGhost)
{
auto comm = vtkm::cont::EnvironmentTracker::GetCommunicator();
vtkm::Id totNumBlocks = nPerRank * comm.size();
vtkm::Id numDims = 5;
vtkm::FloatDefault x0 = 0;
vtkm::FloatDefault x1 = x0 + static_cast<vtkm::FloatDefault>(numDims - 1);
vtkm::FloatDefault dx = x1 - x0;
vtkm::FloatDefault y0 = 0, y1 = numDims - 1, z0 = 0, z1 = numDims - 1;
if (useGhost)
{
numDims = numDims + 2; //add 1 extra on each side
x0 = x0 - 1;
x1 = x1 + 1;
dx = x1 - x0 - 2;
y0 = y0 - 1;
y1 = y1 + 1;
z0 = z0 - 1;
z1 = z1 + 1;
}
//Create ALL of the blocks.
std::vector<vtkm::Bounds> bounds;
for (vtkm::Id i = 0; i < totNumBlocks; i++)
{
bounds.push_back(vtkm::Bounds(x0, x1, y0, y1, z0, z1));
x0 += dx;
x1 += dx;
}
const vtkm::Id3 dims(numDims, numDims, numDims);
auto allPDS = vtkm::worklet::testing::CreateAllDataSets(bounds, dims, useGhost);
return allPDS;
}
std::vector<vtkm::Range> ExtractMaxXRanges(const vtkm::cont::PartitionedDataSet& pds, bool useGhost)
{
std::vector<vtkm::Range> xMaxRanges;
for (const auto& ds : pds.GetPartitions())
{
auto bounds = ds.GetCoordinateSystem().GetBounds();
auto xMax = bounds.X.Max;
if (useGhost)
xMax = xMax - 1;
xMaxRanges.push_back(vtkm::Range(xMax, xMax + static_cast<vtkm::FloatDefault>(.5)));
}
return xMaxRanges;
}
template <typename FilterType>
void SetFilter(FilterType& filter,
vtkm::FloatDefault stepSize,
@ -53,7 +107,7 @@ void SetFilter(FilterType& filter,
vtkm::cont::ArrayHandle<vtkm::Particle> seedArray,
bool useThreaded,
bool useBlockIds,
vtkm::Id nPerRank)
const std::vector<vtkm::Id>& blockIds)
{
filter.SetStepSize(stepSize);
filter.SetNumberOfSteps(numSteps);
@ -62,18 +116,10 @@ void SetFilter(FilterType& filter,
filter.SetUseThreadedAlgorithm(useThreaded);
if (useBlockIds)
{
auto comm = vtkm::cont::EnvironmentTracker::GetCommunicator();
std::vector<vtkm::Id> blockIds;
for (vtkm::Id i = 0; i < nPerRank; i++)
blockIds.push_back(comm.rank() * nPerRank + i);
filter.SetBlockIDs(blockIds);
}
}
void TestAMRStreamline(FilterType fType, bool useThreaded, bool useBlockIds, vtkm::Id nPerRank)
void TestAMRStreamline(FilterType fType, bool useThreaded)
{
switch (fType)
{
@ -165,15 +211,13 @@ void TestAMRStreamline(FilterType fType, bool useThreaded, bool useBlockIds, vtk
if (fType == STREAMLINE)
{
vtkm::filter::flow::Streamline streamline;
SetFilter(
streamline, stepSize, numSteps, fieldName, seedArray, useThreaded, useBlockIds, nPerRank);
SetFilter(streamline, stepSize, numSteps, fieldName, seedArray, useThreaded, false, {});
out = streamline.Execute(pds);
}
else if (fType == PATHLINE)
{
vtkm::filter::flow::Pathline pathline;
SetFilter(
pathline, stepSize, numSteps, fieldName, seedArray, useThreaded, useBlockIds, nPerRank);
SetFilter(pathline, stepSize, numSteps, fieldName, seedArray, useThreaded, false, {});
//Create timestep 2
auto pds2 = vtkm::cont::PartitionedDataSet(pds);
pathline.SetPreviousTime(0);
@ -292,17 +336,21 @@ void TestAMRStreamline(FilterType fType, bool useThreaded, bool useBlockIds, vtk
void ValidateOutput(const vtkm::cont::DataSet& out,
vtkm::Id numSeeds,
vtkm::Range xMaxRange,
FilterType fType)
const vtkm::Range& xMaxRange,
FilterType fType,
bool checkEndPoint,
bool blockDuplication)
{
//Validate the result is correct.
VTKM_TEST_ASSERT(out.GetNumberOfCoordinateSystems() == 1,
"Wrong number of coordinate systems in the output dataset");
vtkm::cont::UnknownCellSet dcells = out.GetCellSet();
//out.PrintSummary(std::cout);
//std::cout << " nSeeds= " << numSeeds << std::endl;
VTKM_TEST_ASSERT(dcells.GetNumberOfCells() == numSeeds, "Wrong number of cells");
vtkm::Id numCells = out.GetNumberOfCells();
if (!blockDuplication)
VTKM_TEST_ASSERT(numCells == numSeeds, "Wrong number of cells");
auto coords = out.GetCoordinateSystem().GetDataAsMultiplexer();
auto ptPortal = coords.ReadPortal();
@ -311,109 +359,26 @@ void ValidateOutput(const vtkm::cont::DataSet& out,
vtkm::cont::CellSetExplicit<> explicitCells;
VTKM_TEST_ASSERT(dcells.IsType<vtkm::cont::CellSetExplicit<>>(), "Wrong cell type.");
explicitCells = dcells.AsCellSet<vtkm::cont::CellSetExplicit<>>();
for (vtkm::Id j = 0; j < numSeeds; j++)
for (vtkm::Id j = 0; j < numCells; j++)
{
vtkm::cont::ArrayHandle<vtkm::Id> indices;
explicitCells.GetIndices(j, indices);
vtkm::Id nPts = indices.GetNumberOfValues();
auto iPortal = indices.ReadPortal();
vtkm::Vec3f lastPt = ptPortal.Get(iPortal.Get(nPts - 1));
VTKM_TEST_ASSERT(xMaxRange.Contains(lastPt[0]), "Wrong end point for seed");
if (checkEndPoint)
VTKM_TEST_ASSERT(xMaxRange.Contains(lastPt[0]), "Wrong end point for seed");
}
}
else if (fType == PARTICLE_ADVECTION)
{
VTKM_TEST_ASSERT(out.GetNumberOfPoints() == numSeeds, "Wrong number of coordinates");
for (vtkm::Id i = 0; i < numSeeds; i++)
VTKM_TEST_ASSERT(xMaxRange.Contains(ptPortal.Get(i)[0]), "Wrong end point for seed");
}
}
void TestDuplicatedBlocks(vtkm::Id nPerRank)
{
auto comm = vtkm::cont::EnvironmentTracker::GetCommunicator();
vtkm::Id numDims = 5;
vtkm::FloatDefault x0 = 0;
vtkm::FloatDefault x1 = x0 + static_cast<vtkm::FloatDefault>(numDims - 1);
vtkm::FloatDefault dx = x1 - x0;
vtkm::FloatDefault y0 = 0, y1 = numDims - 1, z0 = 0, z1 = numDims - 1;
//Create all of the blocks.
std::vector<vtkm::Bounds> bounds;
for (vtkm::Id i = 0; i < nPerRank * comm.size(); i++)
{
bounds.push_back(vtkm::Bounds(x0, x1, y0, y1, z0, z1));
if (comm.rank() == 0)
if (!blockDuplication)
VTKM_TEST_ASSERT(out.GetNumberOfPoints() == numSeeds, "Wrong number of coordinates");
if (checkEndPoint)
{
std::cout << __FILE__ << " " << __LINE__ << std::endl;
std::cout << i << " :: (" << x0 << ", " << x1 << ")" << std::endl;
for (vtkm::Id i = 0; i < numCells; i++)
VTKM_TEST_ASSERT(xMaxRange.Contains(ptPortal.Get(i)[0]), "Wrong end point for seed");
}
x0 += dx;
x1 += dx;
}
if (comm.rank() == 0)
{
std::cout << __FILE__ << " " << __LINE__ << std::endl;
std::cout << "Bounds: " << std::endl;
int i = 0;
for (const auto& b : bounds)
{
std::cout << " " << i << " " << b << std::endl;
i++;
}
}
comm.barrier();
const vtkm::Id3 dims(numDims, numDims, numDims);
auto allPDS = vtkm::worklet::testing::CreateAllDataSets(bounds, dims, false);
vtkm::Vec3f vecX(1, 0, 0);
std::string fieldName = "vec";
vtkm::FloatDefault stepSize = 0.1f;
vtkm::Id numSteps = 100000;
for (std::size_t n = 0; n < allPDS.size(); n++)
{
auto pds = allPDS[n];
AddVectorFields(pds, fieldName, vecX);
vtkm::cont::ArrayHandle<vtkm::Particle> seedArray;
seedArray = vtkm::cont::make_ArrayHandle({ vtkm::Particle(vtkm::Vec3f(.2f, 1.0f, .2f), 0) });
// vtkm::Particle(vtkm::Vec3f(.2f, 2.0f, .2f), 1) });
vtkm::Id numSeeds = seedArray.GetNumberOfValues();
vtkm::filter::flow::ParticleAdvection particleAdvection;
SetFilter(particleAdvection, stepSize, numSteps, fieldName, seedArray, false, false, nPerRank);
std::vector<vtkm::Id> blockIds;
if (comm.rank() == 0)
blockIds = { 0, 1, 2, 3 };
else
blockIds = { 1, 2, 3 };
vtkm::cont::PartitionedDataSet input;
for (const auto& bid : blockIds)
input.AppendPartition(pds.GetPartition(bid));
particleAdvection.SetBlockIDs(blockIds);
if (comm.rank() == 0)
{
std::cout << __FILE__ << " " << __LINE__ << std::endl;
std::cout << "all blocks: " << pds.GetNumberOfPartitions() << std::endl;
}
for (int r = 0; r < comm.size(); r++)
{
if (r == comm.rank())
std::cout << r << ": my blocks: " << input.GetNumberOfPartitions() << std::endl;
comm.barrier();
}
auto out = particleAdvection.Execute(input);
if (comm.rank() == comm.size() - 1)
out.PrintSummary(std::cout);
return;
}
}
@ -421,70 +386,61 @@ void TestPartitionedDataSet(vtkm::Id nPerRank,
bool useGhost,
FilterType fType,
bool useThreaded,
bool useBlockIds)
bool useBlockIds,
bool duplicateBlocks)
{
switch (fType)
{
case PARTICLE_ADVECTION:
std::cout << "Particle advection";
break;
case STREAMLINE:
std::cout << "Streamline";
break;
case PATHLINE:
std::cout << "Pathline";
break;
}
if (useGhost)
std::cout << " - using ghost cells";
if (useThreaded)
std::cout << " - using threaded";
std::cout << " - on a partitioned data set" << std::endl;
auto comm = vtkm::cont::EnvironmentTracker::GetCommunicator();
vtkm::Id numDims = 5;
vtkm::FloatDefault x0 = static_cast<vtkm::FloatDefault>((numDims - 1) * (nPerRank * comm.rank()));
vtkm::FloatDefault x1 = x0 + static_cast<vtkm::FloatDefault>(numDims - 1);
vtkm::FloatDefault dx = x1 - x0;
vtkm::FloatDefault y0 = 0, y1 = numDims - 1, z0 = 0, z1 = numDims - 1;
if (useGhost)
if (comm.rank() == 0)
{
numDims = numDims + 2; //add 1 extra on each side
x0 = x0 - 1;
x1 = x1 + 1;
dx = x1 - x0 - 2;
y0 = y0 - 1;
y1 = y1 + 1;
z0 = z0 - 1;
z1 = z1 + 1;
}
vtkm::FloatDefault time0 = 0;
vtkm::FloatDefault time1 = (numDims - 1) * (nPerRank * comm.size());
std::vector<vtkm::Bounds> bounds;
for (vtkm::Id i = 0; i < nPerRank; i++)
{
bounds.push_back(vtkm::Bounds(x0, x1, y0, y1, z0, z1));
x0 += dx;
x1 += dx;
}
std::vector<vtkm::Range> xMaxRanges;
for (const auto& b : bounds)
{
auto xMax = b.X.Max;
switch (fType)
{
case PARTICLE_ADVECTION:
std::cout << "Particle advection";
break;
case STREAMLINE:
std::cout << "Streamline";
break;
case PATHLINE:
std::cout << "Pathline";
break;
}
std::cout << " blocksPerRank= " << nPerRank;
if (useGhost)
xMax = xMax - 1;
xMaxRanges.push_back(vtkm::Range(xMax, xMax + static_cast<vtkm::FloatDefault>(.5)));
std::cout << " - using ghost cells";
if (useThreaded)
std::cout << " - using threaded";
if (useBlockIds)
std::cout << " - using block IDs";
if (duplicateBlocks)
std::cout << " - with duplicate blocks";
std::cout << " - on a partitioned data set" << std::endl;
}
std::vector<vtkm::Id> blockIds;
//Uniform assignment.
for (vtkm::Id i = 0; i < nPerRank; i++)
blockIds.push_back(comm.rank() * nPerRank + i);
//For block duplication, give everyone the 2nd to last block.
//We want to keep the last block on the last rank for validation.
if (duplicateBlocks && blockIds.size() > 1)
{
vtkm::Id totNumBlocks = comm.size() * nPerRank;
vtkm::Id dupBlock = totNumBlocks - 2;
for (int r = 0; r < comm.size(); r++)
{
if (std::find(blockIds.begin(), blockIds.end(), dupBlock) == blockIds.end())
blockIds.push_back(dupBlock);
}
}
std::vector<vtkm::cont::PartitionedDataSet> allPDS, allPDS2;
const vtkm::Id3 dims(numDims, numDims, numDims);
allPDS = vtkm::worklet::testing::CreateAllDataSets(bounds, dims, useGhost);
allPDS2 = vtkm::worklet::testing::CreateAllDataSets(bounds, dims, useGhost);
allPDS = CreateAllDataSetBounds(nPerRank, useGhost);
allPDS2 = CreateAllDataSetBounds(nPerRank, useGhost);
auto xMaxRanges = ExtractMaxXRanges(allPDS[0], useGhost);
vtkm::FloatDefault time0 = 0;
vtkm::FloatDefault time1 = xMaxRanges[xMaxRanges.size() - 1].Max;
vtkm::Vec3f vecX(1, 0, 0);
std::string fieldName = "vec";
@ -492,7 +448,9 @@ void TestPartitionedDataSet(vtkm::Id nPerRank,
vtkm::Id numSteps = 100000;
for (std::size_t n = 0; n < allPDS.size(); n++)
{
auto pds = allPDS[n];
vtkm::cont::PartitionedDataSet pds;
for (const auto& bid : blockIds)
pds.AppendPartition(allPDS[n].GetPartition(bid));
AddVectorFields(pds, fieldName, vecX);
vtkm::cont::ArrayHandle<vtkm::Particle> seedArray;
@ -504,11 +462,20 @@ void TestPartitionedDataSet(vtkm::Id nPerRank,
{
vtkm::filter::flow::Streamline streamline;
SetFilter(
streamline, stepSize, numSteps, fieldName, seedArray, useThreaded, useBlockIds, nPerRank);
streamline, stepSize, numSteps, fieldName, seedArray, useThreaded, useBlockIds, blockIds);
auto out = streamline.Execute(pds);
for (vtkm::Id i = 0; i < nPerRank; i++)
ValidateOutput(out.GetPartition(i), numSeeds, xMaxRanges[i], fType);
vtkm::Id numOutputs = out.GetNumberOfPartitions();
bool checkEnds = numOutputs == static_cast<vtkm::Id>(blockIds.size());
for (vtkm::Id i = 0; i < numOutputs; i++)
{
ValidateOutput(out.GetPartition(i),
numSeeds,
xMaxRanges[blockIds[i]],
fType,
checkEnds,
duplicateBlocks);
}
}
else if (fType == PARTICLE_ADVECTION)
{
@ -520,65 +487,82 @@ void TestPartitionedDataSet(vtkm::Id nPerRank,
seedArray,
useThreaded,
useBlockIds,
nPerRank);
blockIds);
auto out = particleAdvection.Execute(pds);
//Particles end up in last rank.
if (comm.rank() == comm.size() - 1)
{
bool checkEnds = out.GetNumberOfPartitions() == static_cast<vtkm::Id>(blockIds.size());
VTKM_TEST_ASSERT(out.GetNumberOfPartitions() == 1, "Wrong number of partitions in output");
ValidateOutput(out.GetPartition(0), numSeeds, xMaxRanges[xMaxRanges.size() - 1], fType);
ValidateOutput(out.GetPartition(0),
numSeeds,
xMaxRanges[xMaxRanges.size() - 1],
fType,
checkEnds,
duplicateBlocks);
}
else
VTKM_TEST_ASSERT(out.GetNumberOfPartitions() == 0, "Wrong number of partitions in output");
}
else if (fType == PATHLINE)
{
auto pds2 = allPDS2[n];
vtkm::cont::PartitionedDataSet pds2;
for (const auto& bid : blockIds)
pds2.AppendPartition(allPDS2[n].GetPartition(bid));
AddVectorFields(pds2, fieldName, vecX);
vtkm::filter::flow::Pathline pathline;
SetFilter(
pathline, stepSize, numSteps, fieldName, seedArray, useThreaded, useBlockIds, nPerRank);
pathline, stepSize, numSteps, fieldName, seedArray, useThreaded, useBlockIds, blockIds);
pathline.SetPreviousTime(time0);
pathline.SetNextTime(time1);
pathline.SetNextDataSet(pds2);
auto out = pathline.Execute(pds);
for (vtkm::Id i = 0; i < nPerRank; i++)
ValidateOutput(out.GetPartition(i), numSeeds, xMaxRanges[i], fType);
vtkm::Id numOutputs = out.GetNumberOfPartitions();
bool checkEnds = numOutputs == static_cast<vtkm::Id>(blockIds.size());
for (vtkm::Id i = 0; i < numOutputs; i++)
ValidateOutput(out.GetPartition(i),
numSeeds,
xMaxRanges[blockIds[i]],
fType,
checkEnds,
duplicateBlocks);
}
}
}
void TestStreamlineFiltersMPI()
{
std::srand(std::time(0));
TestDuplicatedBlocks(2);
return;
auto comm = vtkm::cont::EnvironmentTracker::GetCommunicator();
if (comm.rank() == 0)
std::cout << std::endl << "*** TestStreamlineFiltersMPI" << std::endl;
std::vector<bool> flags = { true, false };
std::vector<FilterType> filterTypes = { PARTICLE_ADVECTION, STREAMLINE, PATHLINE };
TestPartitionedDataSet(2, false, PARTICLE_ADVECTION, false, false);
/*
for (int n = 1; n < 3; n++)
{
for (auto useGhost : flags)
for (auto fType : filterTypes)
for (auto useThreaded : flags)
for (auto useBlockIds : flags)
TestPartitionedDataSet(n, useGhost, fType, useThreaded, useBlockIds);
}
for (auto useBlockIds : flags)
{
//Run blockIds with and without block duplication.
if (useBlockIds && comm.size() > 1)
{
TestPartitionedDataSet(n, useGhost, fType, useThreaded, useBlockIds, false);
TestPartitionedDataSet(n, useGhost, fType, useThreaded, useBlockIds, true);
}
else
TestPartitionedDataSet(n, useGhost, fType, useThreaded, useBlockIds, false);
}
for (auto fType : filterTypes)
for (auto useThreaded : flags)
TestAMRStreamline(fType, useThreaded);
*/
}
}