Allow block duplication

This commit is contained in:
Dave Pugmire 2023-04-07 09:50:29 -04:00
parent a0cdc62200
commit 6de278d8a4
9 changed files with 525 additions and 108 deletions

@ -53,6 +53,12 @@ public:
this->Seeds = vtkm::cont::make_ArrayHandle(seeds, copyFlag);
}
VTKM_CONT void SetBlockIDs(const std::vector<vtkm::Id>& blockIds)
{
this->BlockIds = blockIds;
this->BlockIdsSet = true;
}
VTKM_CONT
void SetSolverRK4() { this->SolverType = vtkm::filter::flow::IntegrationSolverType::RK4_TYPE; }
@ -90,6 +96,8 @@ protected:
VTKM_CONT virtual vtkm::filter::flow::FlowResultType GetResultType() const = 0;
bool BlockIdsSet = false;
std::vector<vtkm::Id> BlockIds;
vtkm::Id NumberOfSteps = 0;
vtkm::cont::UnknownArrayHandle Seeds;
vtkm::filter::flow::IntegrationSolverType SolverType =

@ -83,7 +83,12 @@ VTKM_CONT vtkm::cont::PartitionedDataSet FilterParticleAdvectionSteadyState::DoE
variant.Emplace<DSIType::ElectroMagneticFieldNameType>(electric, magnetic);
}
vtkm::filter::flow::internal::BoundsMap boundsMap(input);
vtkm::filter::flow::internal::BoundsMap boundsMap;
if (this->BlockIdsSet)
boundsMap = vtkm::filter::flow::internal::BoundsMap(input, this->BlockIds);
else
boundsMap = vtkm::filter::flow::internal::BoundsMap(input);
auto dsi = CreateDataSetIntegrators(
input, variant, boundsMap, this->SolverType, this->VecFieldType, this->GetResultType());

@ -77,10 +77,15 @@ public:
const ParticleType p = portal.Get(i);
std::vector<vtkm::Id> ids = this->BoundsMap.FindBlocks(p.GetPosition());
if (!ids.empty() && this->BoundsMap.FindRank(ids[0]) == this->Rank)
//DRP
if (!ids.empty())
{
particles.emplace_back(p);
blockIDs.emplace_back(ids);
auto ranks = this->BoundsMap.FindRank(ids[0]);
if (!ranks.empty() && this->Rank == ranks[0])
{
particles.emplace_back(p);
blockIDs.emplace_back(ids);
}
}
}
@ -102,6 +107,8 @@ public:
vtkm::Id numTerm = 0, blockId = -1;
if (this->GetActiveParticles(v, blockId))
{
std::cout << " RANK= " << this->Rank << " *****Advect: " << v[0].GetID()
<< " BID= " << blockId << std::endl;
//make this a pointer to avoid the copy?
auto& block = this->GetDataSet(blockId);
DSIHelperInfoType bb =
@ -165,6 +172,7 @@ public:
virtual bool GetActiveParticles(std::vector<ParticleType>& particles, vtkm::Id& blockId)
{
//DRP: particles = std::move(this->Active2[blockId]);
particles.clear();
blockId = -1;
if (this->Active.empty())
@ -187,23 +195,83 @@ public:
return !particles.empty();
}
virtual void Communicate(vtkm::filter::flow::internal::ParticleMessenger<ParticleType>& messenger,
vtkm::Id numLocalTerminations,
vtkm::Id& numTermMessages)
void Communicate(vtkm::filter::flow::internal::ParticleMessenger<ParticleType>& messenger,
vtkm::Id numLocalTerminations,
vtkm::Id& numTermMessages)
{
std::vector<ParticleType> outgoing;
std::vector<vtkm::Id> outgoingRanks;
this->GetOutgoingParticles(outgoing, outgoingRanks);
std::vector<ParticleType> incoming;
std::unordered_map<vtkm::Id, std::vector<vtkm::Id>> incomingIDs;
std::unordered_map<vtkm::Id, std::vector<vtkm::Id>> incomingBlockIDs;
numTermMessages = 0;
messenger.Exchange(this->Inactive,
messenger.Exchange(outgoing,
outgoingRanks,
this->ParticleBlockIDsMap,
numLocalTerminations,
incoming,
incomingIDs,
incomingBlockIDs,
numTermMessages,
this->GetBlockAndWait(numLocalTerminations));
//Cleanup what was sent.
for (const auto& p : outgoing)
this->ParticleBlockIDsMap.erase(p.GetID());
this->UpdateActive(incoming, incomingBlockIDs);
}
void GetOutgoingParticles(std::vector<ParticleType>& outgoing,
std::vector<vtkm::Id>& outgoingRanks)
{
outgoing.clear();
outgoingRanks.clear();
outgoing.reserve(this->Inactive.size());
outgoingRanks.reserve(this->Inactive.size());
//Send out Everything.
for (const auto& p : this->Inactive)
{
const auto& bid = this->ParticleBlockIDsMap[p.GetID()];
VTKM_ASSERT(!bid.empty());
auto ranks = this->BoundsMap.FindRank(bid[0]);
VTKM_ASSERT(!ranks.empty());
std::cout << "********************** GetOutgoing: " << bid[0] << " r= " << ranks.size()
<< std::endl;
if (ranks.size() == 1)
{
if (ranks[0] == this->Rank)
this->Active.emplace_back(p);
else
{
outgoing.emplace_back(p);
outgoingRanks.emplace_back(ranks[0]);
}
}
else
{
//Decide where it should go...
//Random selection:
vtkm::Id outRank = std::rand() % ranks.size();
std::cout << "******* CHOICES: " << ranks.size() << " --> " << outRank << std::endl;
if (outRank == this->Rank)
this->Active.emplace_back(p);
else
{
outgoing.emplace_back(p);
outgoingRanks.emplace_back(outRank);
}
}
}
this->Inactive.clear();
this->UpdateActive(incoming, incomingIDs);
VTKM_ASSERT(outgoing.size() == outgoingRanks.size());
}
virtual void UpdateActive(const std::vector<ParticleType>& particles,
@ -231,8 +299,8 @@ public:
vtkm::Id UpdateResult(const DSIHelperInfo<ParticleType>& stuff)
{
this->UpdateActive(stuff.A, stuff.IdMapA);
this->UpdateInactive(stuff.I, stuff.IdMapI);
this->UpdateActive(stuff.InBounds.Particles, stuff.InBounds.BlockIDs);
this->UpdateInactive(stuff.OutOfBounds.Particles, stuff.OutOfBounds.BlockIDs);
vtkm::Id numTerm = static_cast<vtkm::Id>(stuff.TermID.size());
//Update terminated particles.
@ -269,11 +337,22 @@ public:
std::vector<ParticleType> Inactive;
vtkm::Id NumberOfSteps;
vtkm::Id NumRanks;
//{particleId : {block IDs}}
std::unordered_map<vtkm::Id, std::vector<vtkm::Id>> ParticleBlockIDsMap;
vtkm::Id Rank;
vtkm::FloatDefault StepSize;
vtkm::Id TotalNumParticles = 0;
vtkm::Id TotalNumTerminatedParticles = 0;
//Active particles:
// {blockID : {particles}
// std::unordered_map<vtkm::Id, std::vector<ParticleType>> Active2;
// {ParticleID : {blockIds}
// std::unordered_map<vtkm::Id, std::vector<vtkm::Id>> ActiveBlockIds;
// std::vector<ParticleType> Inactive2;
// {ParticleID : {blockIds}
// std::unordered_map<vtkm::Id, std::vector<vtkm::Id>> InactiveBlockIds;
};
}

@ -15,6 +15,7 @@
#include <vtkm/cont/AssignerPartitionedDataSet.h>
#include <vtkm/cont/DataSet.h>
#include <vtkm/cont/EnvironmentTracker.h>
#include <vtkm/cont/ErrorFilterExecution.h>
#include <vtkm/cont/Field.h>
#include <vtkm/cont/PartitionedDataSet.h>
@ -25,8 +26,24 @@
#include <vtkm/thirdparty/diy/mpi-cast.h>
#endif
#include <algorithm>
#include <iostream>
#include <set>
#include <vector>
template <typename T>
void printVec(const std::vector<T>& v)
{
std::cout << "[";
int n = v.size();
if (n > 0)
{
for (int i = 0; i < n - 1; i++)
std::cout << v[i] << " ";
std::cout << v[n - 1];
}
std::cout << "]";
}
namespace vtkm
{
namespace filter
@ -39,31 +56,22 @@ namespace internal
class VTKM_ALWAYS_EXPORT BoundsMap
{
public:
BoundsMap()
: LocalNumBlocks(0)
, TotalNumBlocks(0)
BoundsMap() {}
BoundsMap(const vtkm::cont::DataSet& dataSet) { this->Init({ dataSet }); }
BoundsMap(const vtkm::cont::DataSet& dataSet, const vtkm::Id& blockId)
{
this->Init({ dataSet }, { blockId });
}
BoundsMap(const vtkm::cont::DataSet& dataSet)
: LocalNumBlocks(1)
, TotalNumBlocks(0)
{
this->Init({ dataSet });
}
BoundsMap(const std::vector<vtkm::cont::DataSet>& dataSets) { this->Init(dataSets); }
BoundsMap(const std::vector<vtkm::cont::DataSet>& dataSets)
: LocalNumBlocks(static_cast<vtkm::Id>(dataSets.size()))
, TotalNumBlocks(0)
{
this->Init(dataSets);
}
BoundsMap(const vtkm::cont::PartitionedDataSet& pds) { this->Init(pds.GetPartitions()); }
BoundsMap(const vtkm::cont::PartitionedDataSet& pds)
: LocalNumBlocks(pds.GetNumberOfPartitions())
, TotalNumBlocks(0)
BoundsMap(const vtkm::cont::PartitionedDataSet& pds, const std::vector<vtkm::Id>& blockIds)
{
this->Init(pds.GetPartitions());
this->Init(pds.GetPartitions(), blockIds);
}
vtkm::Id GetLocalBlockId(vtkm::Id idx) const
@ -72,11 +80,16 @@ public:
return this->LocalIDs[static_cast<std::size_t>(idx)];
}
int FindRank(vtkm::Id blockId) const
std::vector<int> FindRank(vtkm::Id blockId) const
{
auto it = this->BlockToRankMap.find(blockId);
std::cout << "FindRank(" << blockId << ") --> ";
printVec(it->second);
std::cout << std::endl;
if (it == this->BlockToRankMap.end())
return -1;
return {};
return it->second;
}
@ -110,8 +123,124 @@ public:
vtkm::Id GetLocalNumBlocks() const { return this->LocalNumBlocks; }
private:
void Init(const std::vector<vtkm::cont::DataSet>& dataSets, const std::vector<vtkm::Id>& blockIds)
{
if (dataSets.size() != blockIds.size())
throw vtkm::cont::ErrorFilterExecution("Number of datasets and block ids must match");
this->LocalIDs = blockIds;
this->LocalNumBlocks = dataSets.size();
vtkmdiy::mpi::communicator comm = vtkm::cont::EnvironmentTracker::GetCommunicator();
if (comm.rank() == 0)
std::cout << "Init: " << blockIds.size() << std::endl;
//1. Get the min/max blockId
vtkm::Id locMinId = 0, locMaxId = 0;
if (!this->LocalIDs.empty())
{
locMinId = *std::min_element(this->LocalIDs.begin(), this->LocalIDs.end());
locMaxId = *std::max_element(this->LocalIDs.begin(), this->LocalIDs.end());
}
vtkm::Id globalMinId = 0, globalMaxId = 0;
vtkmdiy::mpi::all_reduce(comm, locMinId, globalMinId, vtkmdiy::mpi::minimum<vtkm::Id>{});
vtkmdiy::mpi::all_reduce(comm, locMaxId, globalMaxId, vtkmdiy::mpi::maximum<vtkm::Id>{});
if (globalMinId != 0 || (globalMaxId - globalMinId) < 1)
throw vtkm::cont::ErrorFilterExecution("Invalid block ids");
//2. Find out how many blocks everyone has.
std::vector<vtkm::Id> locBlockCounts(comm.size(), 0), globalBlockCounts(comm.size(), 0);
locBlockCounts[comm.rank()] = this->LocalIDs.size();
vtkmdiy::mpi::all_reduce(comm, locBlockCounts, globalBlockCounts, std::plus<vtkm::Id>{});
//note: there might be duplicates...
vtkm::Id globalNumBlocks =
std::accumulate(globalBlockCounts.begin(), globalBlockCounts.end(), 0);
//3. given the counts per rank, calc offset for this rank.
vtkm::Id offset = 0;
for (int i = 0; i < comm.rank(); i++)
offset += globalBlockCounts[i];
//4. calc the blocks on each rank.
std::vector<vtkm::Id> localBlockIds(globalNumBlocks, 0);
vtkm::Id idx = 0;
for (const auto& bid : this->LocalIDs)
localBlockIds[offset + idx++] = bid;
//use an MPI_Alltoallv instead.
std::vector<vtkm::Id> globalBlockIds(globalNumBlocks, 0);
vtkmdiy::mpi::all_reduce(comm, localBlockIds, globalBlockIds, std::plus<vtkm::Id>{});
//5. create a rank -> blockId map.
// rankToBlockIds[rank] = {this->LocalIDs on rank}.
std::vector<std::vector<vtkm::Id>> rankToBlockIds(comm.size());
offset = 0;
for (int rank = 0; rank < comm.size(); rank++)
{
vtkm::Id numBIds = globalBlockCounts[rank];
rankToBlockIds[rank].resize(numBIds);
for (vtkm::Id i = 0; i < numBIds; i++)
rankToBlockIds[rank][i] = globalBlockIds[offset + i];
offset += numBIds;
}
//6. there might be duplicates, so count number of UNIQUE blocks.
std::set<vtkm::Id> globalUniqueBlockIds;
globalUniqueBlockIds.insert(globalBlockIds.begin(), globalBlockIds.end());
this->TotalNumBlocks = globalUniqueBlockIds.size();
//Build a vector of : blockIdsToRank[blockId] = {ranks that have this blockId}
std::vector<std::vector<vtkm::Id>> blockIdsToRank(this->TotalNumBlocks);
for (int rank = 0; rank < comm.size(); rank++)
{
for (const auto& bid : rankToBlockIds[rank])
{
blockIdsToRank[bid].push_back(rank);
this->BlockToRankMap[bid].push_back(rank);
}
}
if (comm.rank() == 0)
{
std::cout << "Rank --> BlockIds" << std::endl;
for (int i = 0; i < comm.size(); i++)
{
std::cout << i << ": ";
printVec(rankToBlockIds[i]);
std::cout << std::endl;
}
std::cout << "BlockIds --> Ranks" << std::endl;
for (const auto& it : this->BlockToRankMap)
{
std::cout << it.first << " : ";
printVec(it.second);
std::cout << std::endl;
}
}
this->Build(dataSets);
//ensure dataSets.size() == blockIds.size()
//find min/max block id.
//count number of copies of each block. It better be >= 1 for each.
//find blocks per rank for all ranks.
//exchange who has what blocks.
//exchange ranks for each block
//building block tree will be the same: duplicates wont mess up the min/max
//
}
void Init(const std::vector<vtkm::cont::DataSet>& dataSets)
{
this->LocalNumBlocks = dataSets.size();
vtkm::cont::AssignerPartitionedDataSet assigner(this->LocalNumBlocks);
this->TotalNumBlocks = assigner.nblocks();
std::vector<int> ids;
@ -122,7 +251,7 @@ private:
this->LocalIDs.emplace_back(static_cast<vtkm::Id>(i));
for (vtkm::Id id = 0; id < this->TotalNumBlocks; id++)
this->BlockToRankMap[id] = assigner.rank(static_cast<int>(id));
this->BlockToRankMap[id] = { assigner.rank(static_cast<int>(id)) };
this->Build(dataSets);
}
@ -131,48 +260,68 @@ private:
std::vector<vtkm::Float64> vals(static_cast<std::size_t>(this->TotalNumBlocks * 6), 0);
std::vector<vtkm::Float64> vals2(vals.size());
std::vector<vtkm::Float64> localMins((this->TotalNumBlocks * 3),
std::numeric_limits<vtkm::Float64>::max());
std::vector<vtkm::Float64> localMaxs((this->TotalNumBlocks * 3),
-std::numeric_limits<vtkm::Float64>::max());
for (std::size_t i = 0; i < this->LocalIDs.size(); i++)
{
const vtkm::cont::DataSet& ds = dataSets[static_cast<std::size_t>(i)];
vtkm::Bounds bounds = ds.GetCoordinateSystem().GetBounds();
std::size_t idx = static_cast<std::size_t>(this->LocalIDs[i] * 6);
vals[idx++] = bounds.X.Min;
vals[idx++] = bounds.X.Max;
vals[idx++] = bounds.Y.Min;
vals[idx++] = bounds.Y.Max;
vals[idx++] = bounds.Z.Min;
vals[idx++] = bounds.Z.Max;
vtkm::Id localID = this->LocalIDs[i];
localMins[localID * 3 + 0] = bounds.X.Min;
localMins[localID * 3 + 1] = bounds.Y.Min;
localMins[localID * 3 + 2] = bounds.Z.Min;
localMaxs[localID * 3 + 0] = bounds.X.Max;
localMaxs[localID * 3 + 1] = bounds.Y.Max;
localMaxs[localID * 3 + 2] = bounds.Z.Max;
}
std::vector<vtkm::Float64> globalMins, globalMaxs;
#ifdef VTKM_ENABLE_MPI
vtkmdiy::mpi::communicator Comm = vtkm::cont::EnvironmentTracker::GetCommunicator();
MPI_Comm mpiComm = vtkmdiy::mpi::mpi_cast(Comm.handle());
MPI_Allreduce(vals.data(), vals2.data(), vals.size(), MPI_DOUBLE, MPI_SUM, mpiComm);
globalMins.resize(this->TotalNumBlocks * 3);
globalMaxs.resize(this->TotalNumBlocks * 3);
vtkmdiy::mpi::communicator comm = vtkm::cont::EnvironmentTracker::GetCommunicator();
vtkmdiy::mpi::all_reduce(comm, localMins, globalMins, vtkmdiy::mpi::minimum<vtkm::Float64>{});
vtkmdiy::mpi::all_reduce(comm, localMaxs, globalMaxs, vtkmdiy::mpi::maximum<vtkm::Float64>{});
#else
vals2 = vals;
globalMins = localMins;
globalMaxs = localMaxs;
#endif
this->BlockBounds.resize(static_cast<std::size_t>(this->TotalNumBlocks));
this->GlobalBounds = vtkm::Bounds();
std::size_t idx = 0;
for (auto& block : this->BlockBounds)
{
block = vtkm::Bounds(vals2[idx + 0],
vals2[idx + 1],
vals2[idx + 2],
vals2[idx + 3],
vals2[idx + 4],
vals2[idx + 5]);
block = vtkm::Bounds(globalMins[idx + 0],
globalMaxs[idx + 0],
globalMins[idx + 1],
globalMaxs[idx + 1],
globalMins[idx + 2],
globalMaxs[idx + 2]);
this->GlobalBounds.Include(block);
idx += 6;
idx += 3;
}
if (comm.rank() == 0)
{
std::cout << "BlockBounds: " << this->GlobalBounds << std::endl;
for (const auto& block : this->BlockBounds)
std::cout << " " << block << std::endl;
}
}
vtkm::Id LocalNumBlocks;
vtkm::Id LocalNumBlocks = 0;
std::vector<vtkm::Id> LocalIDs;
std::map<vtkm::Id, vtkm::Int32> BlockToRankMap;
vtkm::Id TotalNumBlocks;
std::map<vtkm::Id, std::vector<vtkm::Int32>> BlockToRankMap;
vtkm::Id TotalNumBlocks = 0;
std::vector<vtkm::Bounds> BlockBounds;
vtkm::Bounds GlobalBounds;
};

@ -42,16 +42,63 @@ public:
const std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& particleBlockIDsMap)
: BoundsMap(boundsMap)
, ParticleBlockIDsMap(particleBlockIDsMap)
, V(v)
, Particles(v)
{
}
struct ParticleBlockIds
{
void Clear()
{
this->Particles.clear();
this->BlockIDs.clear();
}
void Add(const ParticleType& p, const std::vector<vtkm::Id>& bids)
{
this->Particles.emplace_back(p);
this->BlockIDs[p.GetID()] = std::move(bids);
}
std::vector<ParticleType> Particles;
std::unordered_map<vtkm::Id, std::vector<vtkm::Id>> BlockIDs;
};
void Clear()
{
this->InBounds.Clear();
this->OutOfBounds.Clear();
this->TermIdx.clear();
this->TermID.clear();
}
void Validate(vtkm::Id num)
{
#ifndef NDEBUG
//Make sure we didn't miss anything. Every particle goes into a single bucket.
VTKM_ASSERT(static_cast<std::size_t>(num) ==
(this->InBounds.Particles.size() + this->OutOfBounds.Particles.size() +
this->TermIdx.size()));
VTKM_ASSERT(this->InBounds.Particles.size() == this->InBounds.BlockIDs.size());
VTKM_ASSERT(this->OutOfBounds.Particles.size() == this->OutOfBounds.BlockIDs.size());
VTKM_ASSERT(this->TermIdx.size() == this->TermID.size());
#endif
}
void AddTerminated(vtkm::Id idx, vtkm::Id pID)
{
this->TermIdx.emplace_back(idx);
this->TermID.emplace_back(pID);
}
const vtkm::filter::flow::internal::BoundsMap BoundsMap;
const std::unordered_map<vtkm::Id, std::vector<vtkm::Id>> ParticleBlockIDsMap;
std::vector<ParticleType> A, I, V;
std::unordered_map<vtkm::Id, std::vector<vtkm::Id>> IdMapA, IdMapI;
std::vector<vtkm::Id> TermIdx, TermID;
ParticleBlockIds InBounds;
ParticleBlockIds OutOfBounds;
std::vector<ParticleType> Particles;
std::vector<vtkm::Id> TermID;
std::vector<vtkm::Id> TermIdx;
};
using DSIHelperInfoType =
@ -147,12 +194,13 @@ VTKM_CONT inline void DataSetIntegrator<Derived>::ClassifyParticles(
const vtkm::cont::ArrayHandle<ParticleType>& particles,
DSIHelperInfo<ParticleType>& dsiInfo) const
{
dsiInfo.A.clear();
dsiInfo.I.clear();
dsiInfo.TermID.clear();
dsiInfo.TermIdx.clear();
dsiInfo.IdMapI.clear();
dsiInfo.IdMapA.clear();
/*
each particle: --> T,I,A
if T: update TermIdx, TermID
if A: update IdMapA;
if I: update IdMapI;
*/
dsiInfo.Clear();
auto portal = particles.WritePortal();
vtkm::Id n = portal.GetNumberOfValues();
@ -161,13 +209,15 @@ VTKM_CONT inline void DataSetIntegrator<Derived>::ClassifyParticles(
{
auto p = portal.Get(i);
//Terminated.
if (p.GetStatus().CheckTerminate())
{
dsiInfo.TermIdx.emplace_back(i);
dsiInfo.TermID.emplace_back(p.GetID());
dsiInfo.AddTerminated(i, p.GetID());
}
else
{
//Didn't terminate.
//Get the blockIDs.
const auto& it = dsiInfo.ParticleBlockIDsMap.find(p.GetID());
VTKM_ASSERT(it != dsiInfo.ParticleBlockIDsMap.end());
auto currBIDs = it->second;
@ -175,9 +225,17 @@ VTKM_CONT inline void DataSetIntegrator<Derived>::ClassifyParticles(
std::vector<vtkm::Id> newIDs;
if (p.GetStatus().CheckSpatialBounds() && !p.GetStatus().CheckTookAnySteps())
{
//particle is OUTSIDE but didn't take any steps.
//this means that the particle wasn't in the block.
//assign newIDs to currBIDs[1:]
newIDs.assign(std::next(currBIDs.begin(), 1), currBIDs.end());
}
else
{
//Otherwise, get new blocks from the current position.
newIDs = dsiInfo.BoundsMap.FindBlocks(p.GetPosition(), currBIDs);
}
//reset the particle status.
p.GetStatus() = vtkm::ParticleStatus();
@ -185,19 +243,19 @@ VTKM_CONT inline void DataSetIntegrator<Derived>::ClassifyParticles(
if (newIDs.empty()) //No blocks, we're done.
{
p.GetStatus().SetTerminate();
dsiInfo.TermIdx.emplace_back(i);
dsiInfo.TermID.emplace_back(p.GetID());
dsiInfo.AddTerminated(i, p.GetID());
}
else
{
//If we have more than blockId, we want to minimize communication
//If we have more than one blockId, we want to minimize communication
//and put any blocks owned by this rank first.
if (newIDs.size() > 1)
{
for (auto idit = newIDs.begin(); idit != newIDs.end(); idit++)
{
vtkm::Id bid = *idit;
if (dsiInfo.BoundsMap.FindRank(bid) == this->Rank)
auto ranks = dsiInfo.BoundsMap.FindRank(bid);
if (std::find(ranks.begin(), ranks.end(), this->Rank) != ranks.end())
{
newIDs.erase(idit);
newIDs.insert(newIDs.begin(), bid);
@ -206,26 +264,14 @@ VTKM_CONT inline void DataSetIntegrator<Derived>::ClassifyParticles(
}
}
int dstRank = dsiInfo.BoundsMap.FindRank(newIDs[0]);
if (dstRank == this->Rank)
{
dsiInfo.A.emplace_back(p);
dsiInfo.IdMapA[p.GetID()] = newIDs;
}
else
{
dsiInfo.I.emplace_back(p);
dsiInfo.IdMapI[p.GetID()] = newIDs;
}
dsiInfo.OutOfBounds.Add(p, newIDs);
}
portal.Set(i, p);
}
portal.Set(i, p);
}
//Make sure we didn't miss anything. Every particle goes into a single bucket.
VTKM_ASSERT(static_cast<std::size_t>(n) ==
(dsiInfo.A.size() + dsiInfo.I.size() + dsiInfo.TermIdx.size()));
VTKM_ASSERT(dsiInfo.TermIdx.size() == dsiInfo.TermID.size());
//Make sure everything is copacetic.
dsiInfo.Validate(n);
}
template <typename Derived>

@ -199,7 +199,7 @@ VTKM_CONT inline void DataSetIntegratorSteadyState::DoAdvect(DSIHelperInfo<vtkm:
using ArrayType = vtkm::cont::ArrayHandle<vtkm::Vec3f>;
auto copyFlag = (this->CopySeedArray ? vtkm::CopyFlag::On : vtkm::CopyFlag::Off);
auto seedArray = vtkm::cont::make_ArrayHandle(b.V, copyFlag);
auto seedArray = vtkm::cont::make_ArrayHandle(b.Particles, copyFlag);
if (this->VecFieldType == VectorFieldType::VELOCITY_FIELD_TYPE)
{
@ -238,7 +238,7 @@ VTKM_CONT inline void DataSetIntegratorSteadyState::DoAdvect(
using ArrayType = vtkm::cont::ArrayHandle<vtkm::Vec3f>;
auto copyFlag = (this->CopySeedArray ? vtkm::CopyFlag::On : vtkm::CopyFlag::Off);
auto seedArray = vtkm::cont::make_ArrayHandle(b.V, copyFlag);
auto seedArray = vtkm::cont::make_ArrayHandle(b.Particles, copyFlag);
if (this->VecFieldType == VectorFieldType::ELECTRO_MAGNETIC_FIELD_TYPE)
{

@ -200,7 +200,7 @@ VTKM_CONT inline void DataSetIntegratorUnsteadyState::DoAdvect(DSIHelperInfo<vtk
using ArrayType = vtkm::cont::ArrayHandle<vtkm::Vec3f>;
auto copyFlag = (this->CopySeedArray ? vtkm::CopyFlag::On : vtkm::CopyFlag::Off);
auto seedArray = vtkm::cont::make_ArrayHandle(b.V, copyFlag);
auto seedArray = vtkm::cont::make_ArrayHandle(b.Particles, copyFlag);
using AHType = internal::AdvectHelper<internal::UnsteadyStateGridEvalType, vtkm::Particle>;

@ -51,6 +51,7 @@ public:
VTKM_CONT ~ParticleMessenger() {}
VTKM_CONT void Exchange(const std::vector<ParticleType>& outData,
const std::vector<vtkm::Id>& outRanks,
const std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& outBlockIDsMap,
vtkm::Id numLocalTerm,
std::vector<ParticleType>& inData,
@ -96,6 +97,7 @@ protected:
VTKM_CONT void SerialExchange(
const std::vector<ParticleType>& outData,
const std::vector<vtkm::Id>& outRanks,
const std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& outBlockIDsMap,
vtkm::Id numLocalTerm,
std::vector<ParticleType>& inData,
@ -166,6 +168,7 @@ VTKM_CONT
template <typename ParticleType>
void ParticleMessenger<ParticleType>::SerialExchange(
const std::vector<ParticleType>& outData,
const std::vector<vtkm::Id>& vtkmNotUsed(outRanks),
const std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& outBlockIDsMap,
vtkm::Id vtkmNotUsed(numLocalTerm),
std::vector<ParticleType>& inData,
@ -184,6 +187,7 @@ VTKM_CONT
template <typename ParticleType>
void ParticleMessenger<ParticleType>::Exchange(
const std::vector<ParticleType>& outData,
const std::vector<vtkm::Id>& outRanks,
const std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& outBlockIDsMap,
vtkm::Id numLocalTerm,
std::vector<ParticleType>& inData,
@ -191,23 +195,25 @@ void ParticleMessenger<ParticleType>::Exchange(
vtkm::Id& numTerminateMessages,
bool blockAndWait)
{
VTKM_ASSERT(outData.size() == outRanks.size());
numTerminateMessages = 0;
inDataBlockIDsMap.clear();
if (this->GetNumRanks() == 1)
return this->SerialExchange(
outData, outBlockIDsMap, numLocalTerm, inData, inDataBlockIDsMap, blockAndWait);
outData, outRanks, outBlockIDsMap, numLocalTerm, inData, inDataBlockIDsMap, blockAndWait);
#ifdef VTKM_ENABLE_MPI
//dstRank, vector of (particles,blockIDs)
std::unordered_map<int, std::vector<ParticleCommType>> sendData;
for (const auto& p : outData)
std::size_t numP = outData.size();
for (std::size_t i = 0; i < numP; i++)
{
const auto& bids = outBlockIDsMap.find(p.GetID())->second;
int dstRank = this->BoundsMap.FindRank(bids[0]);
sendData[dstRank].emplace_back(std::make_pair(p, bids));
const auto& bids = outBlockIDsMap.find(outData[i].GetID())->second;
sendData[outRanks[i]].emplace_back(std::make_pair(outData[i], bids));
}
//Do all the sends first.

@ -51,16 +51,29 @@ void SetFilter(FilterType& filter,
vtkm::Id numSteps,
const std::string& fieldName,
vtkm::cont::ArrayHandle<vtkm::Particle> seedArray,
bool useThreaded)
bool useThreaded,
bool useBlockIds,
vtkm::Id nPerRank)
{
filter.SetStepSize(stepSize);
filter.SetNumberOfSteps(numSteps);
filter.SetSeeds(seedArray);
filter.SetActiveField(fieldName);
filter.SetUseThreadedAlgorithm(useThreaded);
if (useBlockIds)
{
auto comm = vtkm::cont::EnvironmentTracker::GetCommunicator();
std::vector<vtkm::Id> blockIds;
for (vtkm::Id i = 0; i < nPerRank; i++)
blockIds.push_back(comm.rank() * nPerRank + i);
filter.SetBlockIDs(blockIds);
}
}
void TestAMRStreamline(FilterType fType, bool useThreaded)
void TestAMRStreamline(FilterType fType, bool useThreaded, bool useBlockIds, vtkm::Id nPerRank)
{
switch (fType)
{
@ -152,13 +165,15 @@ void TestAMRStreamline(FilterType fType, bool useThreaded)
if (fType == STREAMLINE)
{
vtkm::filter::flow::Streamline streamline;
SetFilter(streamline, stepSize, numSteps, fieldName, seedArray, useThreaded);
SetFilter(
streamline, stepSize, numSteps, fieldName, seedArray, useThreaded, useBlockIds, nPerRank);
out = streamline.Execute(pds);
}
else if (fType == PATHLINE)
{
vtkm::filter::flow::Pathline pathline;
SetFilter(pathline, stepSize, numSteps, fieldName, seedArray, useThreaded);
SetFilter(
pathline, stepSize, numSteps, fieldName, seedArray, useThreaded, useBlockIds, nPerRank);
//Create timestep 2
auto pds2 = vtkm::cont::PartitionedDataSet(pds);
pathline.SetPreviousTime(0);
@ -285,8 +300,8 @@ void ValidateOutput(const vtkm::cont::DataSet& out,
"Wrong number of coordinate systems in the output dataset");
vtkm::cont::UnknownCellSet dcells = out.GetCellSet();
out.PrintSummary(std::cout);
std::cout << " nSeeds= " << numSeeds << std::endl;
//out.PrintSummary(std::cout);
//std::cout << " nSeeds= " << numSeeds << std::endl;
VTKM_TEST_ASSERT(dcells.GetNumberOfCells() == numSeeds, "Wrong number of cells");
auto coords = out.GetCoordinateSystem().GetDataAsMultiplexer();
auto ptPortal = coords.ReadPortal();
@ -314,7 +329,98 @@ void ValidateOutput(const vtkm::cont::DataSet& out,
}
}
void TestPartitionedDataSet(vtkm::Id nPerRank, bool useGhost, FilterType fType, bool useThreaded)
void TestDuplicatedBlocks(vtkm::Id nPerRank)
{
auto comm = vtkm::cont::EnvironmentTracker::GetCommunicator();
vtkm::Id numDims = 5;
vtkm::FloatDefault x0 = 0;
vtkm::FloatDefault x1 = x0 + static_cast<vtkm::FloatDefault>(numDims - 1);
vtkm::FloatDefault dx = x1 - x0;
vtkm::FloatDefault y0 = 0, y1 = numDims - 1, z0 = 0, z1 = numDims - 1;
std::vector<vtkm::Bounds> bounds;
for (vtkm::Id i = 0; i < nPerRank * comm.size(); i++)
{
bounds.push_back(vtkm::Bounds(x0, x1, y0, y1, z0, z1));
if (comm.rank() == 0)
{
std::cout << __FILE__ << " " << __LINE__ << std::endl;
std::cout << i << " :: (" << x0 << ", " << x1 << ")" << std::endl;
}
x0 += dx;
x1 += dx;
}
if (comm.rank() == 0)
{
std::cout << __FILE__ << " " << __LINE__ << std::endl;
std::cout << "Bounds: " << std::endl;
int i = 0;
for (const auto& b : bounds)
{
std::cout << " " << i << " " << b << std::endl;
i++;
}
}
comm.barrier();
const vtkm::Id3 dims(numDims, numDims, numDims);
auto allPDS = vtkm::worklet::testing::CreateAllDataSets(bounds, dims, false);
vtkm::Vec3f vecX(1, 0, 0);
std::string fieldName = "vec";
vtkm::FloatDefault stepSize = 0.1f;
vtkm::Id numSteps = 100000;
for (std::size_t n = 0; n < allPDS.size(); n++)
{
auto pds = allPDS[n];
AddVectorFields(pds, fieldName, vecX);
vtkm::cont::ArrayHandle<vtkm::Particle> seedArray;
seedArray = vtkm::cont::make_ArrayHandle({ vtkm::Particle(vtkm::Vec3f(.2f, 1.0f, .2f), 0) });
// vtkm::Particle(vtkm::Vec3f(.2f, 2.0f, .2f), 1) });
vtkm::Id numSeeds = seedArray.GetNumberOfValues();
vtkm::filter::flow::ParticleAdvection particleAdvection;
SetFilter(particleAdvection, stepSize, numSteps, fieldName, seedArray, false, false, nPerRank);
std::vector<vtkm::Id> blockIds;
if (comm.rank() == 0)
blockIds = { 0, 1, 2, 3 };
else
blockIds = { 1, 2, 3 };
vtkm::cont::PartitionedDataSet input;
for (const auto& bid : blockIds)
input.AppendPartition(pds.GetPartition(bid));
particleAdvection.SetBlockIDs(blockIds);
if (comm.rank() == 0)
{
std::cout << __FILE__ << " " << __LINE__ << std::endl;
std::cout << "all blocks: " << pds.GetNumberOfPartitions() << std::endl;
}
for (int r = 0; r < comm.size(); r++)
{
if (r == comm.rank())
std::cout << r << ": my blocks: " << input.GetNumberOfPartitions() << std::endl;
comm.barrier();
}
auto out = particleAdvection.Execute(input);
if (comm.rank() == comm.size() - 1)
out.PrintSummary(std::cout);
return;
}
}
void TestPartitionedDataSet(vtkm::Id nPerRank,
bool useGhost,
FilterType fType,
bool useThreaded,
bool useBlockIds)
{
switch (fType)
{
@ -374,7 +480,6 @@ void TestPartitionedDataSet(vtkm::Id nPerRank, bool useGhost, FilterType fType,
xMaxRanges.push_back(vtkm::Range(xMax, xMax + static_cast<vtkm::FloatDefault>(.5)));
}
std::vector<vtkm::cont::PartitionedDataSet> allPDS, allPDS2;
const vtkm::Id3 dims(numDims, numDims, numDims);
allPDS = vtkm::worklet::testing::CreateAllDataSets(bounds, dims, useGhost);
@ -397,7 +502,8 @@ void TestPartitionedDataSet(vtkm::Id nPerRank, bool useGhost, FilterType fType,
if (fType == STREAMLINE)
{
vtkm::filter::flow::Streamline streamline;
SetFilter(streamline, stepSize, numSteps, fieldName, seedArray, useThreaded);
SetFilter(
streamline, stepSize, numSteps, fieldName, seedArray, useThreaded, useBlockIds, nPerRank);
auto out = streamline.Execute(pds);
for (vtkm::Id i = 0; i < nPerRank; i++)
@ -406,7 +512,15 @@ void TestPartitionedDataSet(vtkm::Id nPerRank, bool useGhost, FilterType fType,
else if (fType == PARTICLE_ADVECTION)
{
vtkm::filter::flow::ParticleAdvection particleAdvection;
SetFilter(particleAdvection, stepSize, numSteps, fieldName, seedArray, useThreaded);
SetFilter(particleAdvection,
stepSize,
numSteps,
fieldName,
seedArray,
useThreaded,
useBlockIds,
nPerRank);
auto out = particleAdvection.Execute(pds);
//Particles end up in last rank.
@ -424,7 +538,8 @@ void TestPartitionedDataSet(vtkm::Id nPerRank, bool useGhost, FilterType fType,
AddVectorFields(pds2, fieldName, vecX);
vtkm::filter::flow::Pathline pathline;
SetFilter(pathline, stepSize, numSteps, fieldName, seedArray, useThreaded);
SetFilter(
pathline, stepSize, numSteps, fieldName, seedArray, useThreaded, useBlockIds, nPerRank);
pathline.SetPreviousTime(time0);
pathline.SetNextTime(time1);
@ -439,20 +554,29 @@ void TestPartitionedDataSet(vtkm::Id nPerRank, bool useGhost, FilterType fType,
void TestStreamlineFiltersMPI()
{
std::srand(std::time(0));
TestDuplicatedBlocks(2);
return;
std::vector<bool> flags = { true, false };
std::vector<FilterType> filterTypes = { PARTICLE_ADVECTION, STREAMLINE, PATHLINE };
TestPartitionedDataSet(2, false, PARTICLE_ADVECTION, false, false);
/*
for (int n = 1; n < 3; n++)
{
for (auto useGhost : flags)
for (auto fType : filterTypes)
for (auto useThreaded : flags)
TestPartitionedDataSet(n, useGhost, fType, useThreaded);
for (auto useBlockIds : flags)
TestPartitionedDataSet(n, useGhost, fType, useThreaded, useBlockIds);
}
for (auto fType : filterTypes)
for (auto useThreaded : flags)
TestAMRStreamline(fType, useThreaded);
*/
}
}