Compare commits


10 Commits

Author SHA1 Message Date
Dave Pugmire
b5dbd0d4c8 Merge branch 'sl_comm_probe' into 'master'
Draft: Use async termination.

See merge request vtk/vtk-m!3182
2024-06-28 14:40:18 -04:00
Kenneth Moreland
fc570a75a5 Merge branch 'release' into master 2024-06-28 11:10:45 -04:00
Kenneth Moreland
cb07d8400c Merge branch 'release-2.0' into release 2024-06-28 11:10:45 -04:00
Kenneth Moreland
f610044d79 Merge topic 'split-contour-bench-2-1' into release-2.0
6f5f65487 Split the contour benchmark into structured/unstructured

Acked-by: Kitware Robot <kwrobot@kitware.com>
Acked-by: Vicente Bolea <vicente.bolea@kitware.com>
Merge-request: !3239
2024-06-28 11:10:45 -04:00
Kenneth Moreland
6f5f654878 Split the contour benchmark into structured/unstructured
We've been having problems with PerformanceTestBenchContour. In the last
few iterations, the runtime goes way up. We cannot find any reason for
this in the source code. There don't appear to be any particular
problems with memory or tables. The best we can figure is an issue with
the device hardware in the container.

The easy solution should be to break the benchmark into smaller pieces
to avoid the problem.
2024-06-26 17:47:27 -04:00
Dave Pugmire
c75dcace78 Merge branch 'master' of https://gitlab.kitware.com/vtk/vtk-m into sl_comm_probe 2024-04-01 15:44:57 -04:00
Dave Pugmire
02553c0b6f Merge branch 'master' of https://gitlab.kitware.com/vtk/vtk-m into sl_comm_probe 2024-03-19 15:33:13 -04:00
Dave Pugmire
eef93ff825 Merge branch 'master' of https://gitlab.kitware.com/vtk/vtk-m into sl_comm_probe 2024-02-02 15:05:57 -05:00
Dave Pugmire
dd96f1144b Fixes for non-mpi code... 2024-01-24 07:23:55 -05:00
Dave Pugmire
4bd340156b Use async termination. 2024-01-23 16:57:10 -05:00
5 changed files with 146 additions and 71 deletions

@@ -15,6 +15,7 @@
 #include <vtkm/cont/ErrorFilterExecution.h>
 #include <vtkm/filter/Filter.h>
 #include <vtkm/filter/flow/FlowTypes.h>
+#include <vtkm/filter/flow/internal/BoundsMap.h>
 #include <vtkm/filter/flow/vtkm_filter_flow_export.h>

 namespace vtkm
@@ -104,7 +105,7 @@ protected:
   bool BlockIdsSet = false;
   std::vector<vtkm::Id> BlockIds;

+  vtkm::filter::flow::internal::BoundsMap BoundsMap;
   vtkm::Id NumberOfSteps = 0;
   vtkm::cont::UnknownArrayHandle Seeds;
   vtkm::filter::flow::IntegrationSolverType SolverType =

@@ -58,13 +58,15 @@ FilterParticleAdvectionSteadyState<Derived>::DoExecutePartitions(
     DataSetIntegratorSteadyState<ParticleType, FieldType, TerminationType, AnalysisType>;

   this->ValidateOptions();
-  vtkm::filter::flow::internal::BoundsMap boundsMap(input);
+  if (this->BlockIdsSet)
+    this->BoundsMap = vtkm::filter::flow::internal::BoundsMap(input, this->BlockIds);
+  else
+    this->BoundsMap = vtkm::filter::flow::internal::BoundsMap(input);

   std::vector<DSIType> dsi;
   for (vtkm::Id i = 0; i < input.GetNumberOfPartitions(); i++)
   {
-    vtkm::Id blockId = boundsMap.GetLocalBlockId(i);
+    vtkm::Id blockId = this->BoundsMap.GetLocalBlockId(i);
     auto dataset = input.GetPartition(i);

     // Build the field for the current dataset
@@ -78,7 +80,7 @@ FilterParticleAdvectionSteadyState<Derived>::DoExecutePartitions(
   }

   vtkm::filter::flow::internal::ParticleAdvector<DSIType> pav(
-    boundsMap, dsi, this->UseThreadedAlgorithm, this->UseAsynchronousCommunication);
+    this->BoundsMap, dsi, this->UseThreadedAlgorithm, this->UseAsynchronousCommunication);

   vtkm::cont::ArrayHandle<ParticleType> particles;
   this->Seeds.AsArrayHandle(particles);

@@ -55,12 +55,15 @@ FilterParticleAdvectionUnsteadyState<Derived>::DoExecutePartitions(
   using DSIType = vtkm::filter::flow::internal::
     DataSetIntegratorUnsteadyState<ParticleType, FieldType, TerminationType, AnalysisType>;

-  vtkm::filter::flow::internal::BoundsMap boundsMap(input);
+  if (this->BlockIdsSet)
+    this->BoundsMap = vtkm::filter::flow::internal::BoundsMap(input, this->BlockIds);
+  else
+    this->BoundsMap = vtkm::filter::flow::internal::BoundsMap(input);

   std::vector<DSIType> dsi;
   for (vtkm::Id i = 0; i < input.GetNumberOfPartitions(); i++)
   {
-    vtkm::Id blockId = boundsMap.GetLocalBlockId(i);
+    vtkm::Id blockId = this->BoundsMap.GetLocalBlockId(i);
     auto ds1 = input.GetPartition(i);
     auto ds2 = this->Input2.GetPartition(i);
@@ -85,7 +88,7 @@ FilterParticleAdvectionUnsteadyState<Derived>::DoExecutePartitions(
     analysis);
   }

   vtkm::filter::flow::internal::ParticleAdvector<DSIType> pav(
-    boundsMap, dsi, this->UseThreadedAlgorithm, this->UseAsynchronousCommunication);
+    this->BoundsMap, dsi, this->UseThreadedAlgorithm, this->UseAsynchronousCommunication);

   vtkm::cont::ArrayHandle<ParticleType> particles;
   this->Seeds.AsArrayHandle(particles);

@@ -15,6 +15,10 @@
 #include <vtkm/filter/flow/internal/BoundsMap.h>
 #include <vtkm/filter/flow/internal/DataSetIntegrator.h>
 #include <vtkm/filter/flow/internal/ParticleMessenger.h>
+#ifdef VTKM_ENABLE_MPI
+#include <vtkm/thirdparty/diy/diy.h>
+#include <vtkm/thirdparty/diy/mpi-cast.h>
+#endif

 namespace vtkm
 {
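The #ifdef block above exists so the new terminator can pull a raw MPI_Comm out of VTK-m's DIY communicator wrapper. A minimal sketch of that conversion in isolation (assuming a build with VTKM_ENABLE_MPI; the environment and communicator setup here is illustrative, not part of this merge request):

#include <mpi.h>
#include <vtkm/thirdparty/diy/diy.h>
#include <vtkm/thirdparty/diy/mpi-cast.h>

int main(int argc, char** argv)
{
  // DIY's environment initializes and finalizes MPI (RAII).
  vtkmdiy::mpi::environment env(argc, argv);
  vtkmdiy::mpi::communicator comm; // wraps MPI_COMM_WORLD by default

  // The same conversion the terminator's constructor performs:
  MPI_Comm mpiComm = vtkmdiy::mpi::mpi_cast(comm.handle());

  int rank = -1;
  MPI_Comm_rank(mpiComm, &rank);
  return 0;
}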
@@ -25,6 +29,87 @@ namespace flow
 namespace internal
 {

+class AdvectAlgorithmTerminator
+{
+public:
+#ifdef VTKM_ENABLE_MPI
+  AdvectAlgorithmTerminator(vtkmdiy::mpi::communicator& comm)
+    : MPIComm(vtkmdiy::mpi::mpi_cast(comm.handle()))
+#else
+  AdvectAlgorithmTerminator(vtkmdiy::mpi::communicator& vtkmNotUsed(comm))
+#endif
+  {
+  }
+
+  void AddWork()
+  {
+#ifdef VTKM_ENABLE_MPI
+    this->Dirty = 1;
+#endif
+  }
+
+  bool Done() const { return this->State == AdvectAlgorithmTerminatorState::DONE; }
+
+  void Control(bool haveLocalWork)
+  {
+#ifdef VTKM_ENABLE_MPI
+    if (this->State == STATE_0 && !haveLocalWork)
+    {
+      MPI_Ibarrier(this->MPIComm, &this->StateReq);
+      this->Dirty = 0;
+      this->State = STATE_1;
+    }
+    else if (this->State == STATE_1)
+    {
+      MPI_Status status;
+      int flag;
+      MPI_Test(&this->StateReq, &flag, &status);
+      if (flag == 1)
+      {
+        int localDirty = this->Dirty;
+        MPI_Iallreduce(
+          &localDirty, &this->AllDirty, 1, MPI_INT, MPI_LOR, this->MPIComm, &this->StateReq);
+        this->State = STATE_2;
+      }
+    }
+    else if (this->State == STATE_2)
+    {
+      MPI_Status status;
+      int flag;
+      MPI_Test(&this->StateReq, &flag, &status);
+      if (flag == 1)
+      {
+        if (this->AllDirty == 0) //done
+          this->State = DONE;
+        else
+          this->State = STATE_0; //reset.
+      }
+    }
+#else
+    if (!haveLocalWork)
+      this->State = DONE;
+#endif
+  }
+
+private:
+  enum AdvectAlgorithmTerminatorState
+  {
+    STATE_0,
+    STATE_1,
+    STATE_2,
+    DONE
+  };
+
+  AdvectAlgorithmTerminatorState State = AdvectAlgorithmTerminatorState::STATE_0;
+
+#ifdef VTKM_ENABLE_MPI
+  std::atomic<int> Dirty;
+  int AllDirty = 0;
+  MPI_Request StateReq;
+  MPI_Comm MPIComm;
+#endif
+};
+
 template <typename DSIType>
 class AdvectAlgorithm
 {
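The AdvectAlgorithmTerminator added above is a nonblocking-consensus termination detector. A rank with no local work posts an MPI_Ibarrier (STATE_0 to STATE_1) to announce it is idle; once the barrier completes, every rank has been idle at least once, and an MPI_Iallreduce with MPI_LOR over the Dirty flags (STATE_1 to STATE_2) asks whether any rank picked up new work in the meantime (AddWork() sets Dirty when particles arrive, which voids the round and resets to STATE_0). Only a unanimously clean round reaches DONE, so no global particle count is needed. A self-contained sketch of the same protocol with the VTK-m scaffolding stripped away (the draining work counter stands in for the real particle queues; this is not the MR's code):

#include <mpi.h>

enum State { STATE_0, STATE_1, STATE_2, DONE };

int main(int argc, char** argv)
{
  MPI_Init(&argc, &argv);
  MPI_Comm comm = MPI_COMM_WORLD;
  int rank = 0;
  MPI_Comm_rank(comm, &rank);

  int workLeft = 3 + rank; // stand-in for this rank's active particle queue
  int dirty = 0;           // set when new work arrives mid-round (cf. AddWork)
  int allDirty = 0;
  State state = STATE_0;
  MPI_Request req;

  while (state != DONE)
  {
    if (workLeft > 0)
      --workLeft; // "advect": consume one unit of local work

    bool haveLocalWork = (workLeft > 0);
    if (state == STATE_0 && !haveLocalWork)
    {
      MPI_Ibarrier(comm, &req); // announce "I am idle" without blocking
      dirty = 0;
      state = STATE_1;
    }
    else if (state == STATE_1)
    {
      int flag = 0;
      MPI_Test(&req, &flag, MPI_STATUS_IGNORE);
      if (flag) // barrier done: every rank has been idle at least once
      {
        MPI_Iallreduce(&dirty, &allDirty, 1, MPI_INT, MPI_LOR, comm, &req);
        state = STATE_2;
      }
    }
    else if (state == STATE_2)
    {
      int flag = 0;
      MPI_Test(&req, &flag, MPI_STATUS_IGNORE);
      if (flag) // consensus: done only if no rank got new work meanwhile
        state = (allDirty == 0) ? DONE : STATE_0;
    }
  }

  MPI_Finalize();
  return 0;
}

Each call to Control() advances the state machine at most one step and never blocks, which is what lets the Go() loop below keep advecting and exchanging particles while consensus is being established.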
@@ -39,6 +124,7 @@ public:
     , NumRanks(this->Comm.size())
     , Rank(this->Comm.rank())
     , UseAsynchronousCommunication(useAsyncComm)
+    , Terminator(this->Comm)
   {
   }
@@ -97,27 +183,21 @@ public:
     vtkm::filter::flow::internal::ParticleMessenger<ParticleType> messenger(
       this->Comm, this->UseAsynchronousCommunication, this->BoundsMap, 1, 128);

-    this->ComputeTotalNumParticles();
-
-    while (this->TotalNumTerminatedParticles < this->TotalNumParticles)
+    while (!this->Terminator.Done())
     {
       std::vector<ParticleType> v;
-      vtkm::Id numTerm = 0, blockId = -1;
+      vtkm::Id blockId = -1;
       if (this->GetActiveParticles(v, blockId))
       {
         //make this a pointer to avoid the copy?
         auto& block = this->GetDataSet(blockId);
         DSIHelperInfo<ParticleType> bb(v, this->BoundsMap, this->ParticleBlockIDsMap);
         block.Advect(bb, this->StepSize);
-        numTerm = this->UpdateResult(bb);
+        this->UpdateResult(bb);
       }

-      vtkm::Id numTermMessages = 0;
-      this->Communicate(messenger, numTerm, numTermMessages);
-
-      this->TotalNumTerminatedParticles += (numTerm + numTermMessages);
-      if (this->TotalNumTerminatedParticles > this->TotalNumParticles)
-        throw vtkm::cont::ErrorFilterExecution("Particle count error");
+      this->Communicate(messenger);
+      this->Terminator.Control(!this->Active.empty());
     }
   }
@@ -128,19 +208,6 @@ public:
     this->ParticleBlockIDsMap.clear();
   }

-  void ComputeTotalNumParticles()
-  {
-    vtkm::Id numLocal = static_cast<vtkm::Id>(this->Inactive.size());
-    for (const auto& it : this->Active)
-      numLocal += it.second.size();
-
-#ifdef VTKM_ENABLE_MPI
-    vtkmdiy::mpi::all_reduce(this->Comm, numLocal, this->TotalNumParticles, std::plus<vtkm::Id>{});
-#else
-    this->TotalNumParticles = numLocal;
-#endif
-  }
-
   DataSetIntegrator<DSIType, ParticleType>& GetDataSet(vtkm::Id id)
   {
     for (auto& it : this->Blocks)
@@ -213,9 +280,7 @@ public:
     return !particles.empty();
   }

-  void Communicate(vtkm::filter::flow::internal::ParticleMessenger<ParticleType>& messenger,
-                   vtkm::Id numLocalTerminations,
-                   vtkm::Id& numTermMessages)
+  void Communicate(vtkm::filter::flow::internal::ParticleMessenger<ParticleType>& messenger)
   {
     std::vector<ParticleType> outgoing;
     std::vector<vtkm::Id> outgoingRanks;
@@ -224,16 +289,17 @@ public:
     std::vector<ParticleType> incoming;
     std::unordered_map<vtkm::Id, std::vector<vtkm::Id>> incomingBlockIDs;
-    numTermMessages = 0;
     bool block = false;
 #ifdef VTKM_ENABLE_MPI
-    block = this->GetBlockAndWait(messenger.UsingSyncCommunication(), numLocalTerminations);
+    block = this->GetBlockAndWait(messenger.UsingSyncCommunication());
 #endif

+    vtkm::Id numTermMessages;
     messenger.Exchange(outgoing,
                        outgoingRanks,
                        this->ParticleBlockIDsMap,
-                       numLocalTerminations,
+                       0,
                        incoming,
                        incomingBlockIDs,
                        numTermMessages,
@@ -311,17 +377,22 @@ public:
   {
     VTKM_ASSERT(particles.size() == idsMap.size());

-    for (auto pit = particles.begin(); pit != particles.end(); pit++)
-    {
-      vtkm::Id particleID = pit->GetID();
-      const auto& it = idsMap.find(particleID);
-      VTKM_ASSERT(it != idsMap.end() && !it->second.empty());
-      vtkm::Id blockId = it->second[0];
-      this->Active[blockId].emplace_back(*pit);
-    }
-
-    for (const auto& it : idsMap)
-      this->ParticleBlockIDsMap[it.first] = it.second;
+    if (!particles.empty())
+    {
+      this->Terminator.AddWork();
+
+      for (auto pit = particles.begin(); pit != particles.end(); pit++)
+      {
+        vtkm::Id particleID = pit->GetID();
+        const auto& it = idsMap.find(particleID);
+        VTKM_ASSERT(it != idsMap.end() && !it->second.empty());
+        vtkm::Id blockId = it->second[0];
+        this->Active[blockId].emplace_back(*pit);
+      }
+
+      for (const auto& it : idsMap)
+        this->ParticleBlockIDsMap[it.first] = it.second;
+    }
   }

   virtual void UpdateInactive(const std::vector<ParticleType>& particles,
@@ -351,7 +422,7 @@ public:
   }

-  virtual bool GetBlockAndWait(const bool& syncComm, const vtkm::Id& numLocalTerm)
+  virtual bool GetBlockAndWait(const bool& syncComm)
   {
     bool haveNoWork = this->Active.empty() && this->Inactive.empty();
@@ -367,9 +438,11 @@ public:
     //2. numLocalTerm + this->TotalNumberOfTerminatedParticles == this->TotalNumberOfParticles
     //So, if neither are true, we can safely block and wait for communication to come in.
-    if (haveNoWork &&
-        (numLocalTerm + this->TotalNumTerminatedParticles < this->TotalNumParticles))
-      return true;
+    // if (this->Terminator.State == AdvectAlgorithmTerminator::AdvectAlgorithmTerminatorState::STATE_2)
+    //   return true;
+    // if (haveNoWork && (numLocalTerm + this->TotalNumTerminatedParticles < this->TotalNumParticles))
+    //   return true;

     return false;
   }
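With the global particle counts gone, GetBlockAndWait() can no longer prove that blocking on a receive is deadlock-free, so for now it always returns false (the commented-out lines sketch a future check against the terminator's state). For reference, the retired rule reduces to a small predicate; the sketch below restates it with the old names purely for illustration:

#include <vtkm/Types.h>

// Old rule: it was safe to block for incoming messages only if this rank had
// no work AND not every particle in the whole run had terminated yet (i.e.
// some particle could still be forwarded to us).
bool SafeToBlock(bool haveLocalWork,
                 vtkm::Id numLocalTerm,
                 vtkm::Id totalNumTerminatedParticles,
                 vtkm::Id totalNumParticles)
{
  return !haveLocalWork &&
    (numLocalTerm + totalNumTerminatedParticles < totalNumParticles);
}

int main()
{
  // A 10-particle run with 9 terminations seen so far: blocking is safe,
  // since the one remaining particle may still be routed to this rank.
  return SafeToBlock(false, 0, 9, 10) ? 0 : 1;
}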
@@ -388,9 +461,8 @@ public:
   std::unordered_map<vtkm::Id, std::vector<vtkm::Id>> ParticleBlockIDsMap;
   vtkm::Id Rank;
   vtkm::FloatDefault StepSize;
-  vtkm::Id TotalNumParticles = 0;
-  vtkm::Id TotalNumTerminatedParticles = 0;
   bool UseAsynchronousCommunication = true;
+  AdvectAlgorithmTerminator Terminator;
 };
}

@@ -39,7 +39,6 @@ public:
                          bool useAsyncComm)
     : AdvectAlgorithm<DSIType>(bm, blocks, useAsyncComm)
     , Done(false)
-    , WorkerActivate(false)
   {
     //For threaded algorithm, the particles go out of scope in the Work method.
     //When this happens, they are destructed by the time the Manage thread gets them.
@@ -50,8 +49,6 @@ public:
   void Go() override
   {
-    this->ComputeTotalNumParticles();
-
     std::vector<std::thread> workerThreads;
     workerThreads.emplace_back(std::thread(AdvectAlgorithmThreaded::Worker, this));
     this->Manage();
@@ -63,6 +60,13 @@ public:
   }

 protected:
+  bool HaveAnyWork()
+  {
+    std::lock_guard<std::mutex> lock(this->Mutex);
+    //We have work if there are particles in any queue or a worker is busy.
+    return !this->Active.empty() || !this->Inactive.empty() || this->WorkerActivate;
+  }
+
   bool GetActiveParticles(std::vector<ParticleType>& particles, vtkm::Id& blockId) override
   {
     std::lock_guard<std::mutex> lock(this->Mutex);
@@ -144,38 +148,31 @@ protected:
     vtkm::filter::flow::internal::ParticleMessenger<ParticleType> messenger(
       this->Comm, useAsync, this->BoundsMap, 1, 128);

-    while (this->TotalNumTerminatedParticles < this->TotalNumParticles)
+    while (!this->Terminator.Done())
     {
       std::unordered_map<vtkm::Id, std::vector<DSIHelperInfo<ParticleType>>> workerResults;
       this->GetWorkerResults(workerResults);

-      vtkm::Id numTerm = 0;
       for (auto& it : workerResults)
       {
         for (auto& r : it.second)
-          numTerm += this->UpdateResult(r);
+          this->UpdateResult(r);
       }

-      vtkm::Id numTermMessages = 0;
-      this->Communicate(messenger, numTerm, numTermMessages);
-
-      this->TotalNumTerminatedParticles += (numTerm + numTermMessages);
-      if (this->TotalNumTerminatedParticles > this->TotalNumParticles)
-        throw vtkm::cont::ErrorFilterExecution("Particle count error");
+      this->Communicate(messenger);
+      this->Terminator.Control(this->HaveAnyWork());
     }

     //Let the workers know that we are done.
     this->SetDone();
   }

-  bool GetBlockAndWait(const bool& syncComm, const vtkm::Id& numLocalTerm) override
+  bool GetBlockAndWait(const bool& syncComm) override
   {
     std::lock_guard<std::mutex> lock(this->Mutex);
     if (this->Done)
       return true;

-    return (this->AdvectAlgorithm<DSIType>::GetBlockAndWait(syncComm, numLocalTerm) &&
-            !this->WorkerActivate && this->WorkerResults.empty());
+    return (this->AdvectAlgorithm<DSIType>::GetBlockAndWait(syncComm) && !this->WorkerActivate &&
+            this->WorkerResults.empty());
   }
void GetWorkerResults(
@@ -193,7 +190,7 @@ protected:
   std::atomic<bool> Done;
   std::mutex Mutex;
-  bool WorkerActivate;
+  bool WorkerActivate = false;
   std::condition_variable WorkerActivateCondition;
   std::unordered_map<vtkm::Id, std::vector<DSIHelperInfo<ParticleType>>> WorkerResults;
 };
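In the threaded variant, "no local work" has to account for particles a worker thread has already popped from Active but is still advecting; that is why the new HaveAnyWork() folds in the WorkerActivate flag before the result is fed to Terminator.Control(). A stripped-down sketch of that pattern (single worker, plain std::thread; the queue and flags are stand-ins for Active and WorkerActivate, not the MR's classes):

#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

std::mutex mtx;
std::condition_variable cv;
std::queue<int> active;  // stand-in for this->Active
bool workerBusy = false; // stand-in for this->WorkerActivate
bool done = false;

bool HaveAnyWork()
{
  std::lock_guard<std::mutex> lock(mtx);
  // Work sitting in the queue *or* in a worker's hands both count.
  return !active.empty() || workerBusy;
}

void Worker()
{
  for (;;)
  {
    std::unique_lock<std::mutex> lock(mtx);
    cv.wait(lock, [] { return !active.empty() || done; });
    if (done)
      return;
    active.pop();
    workerBusy = true; // stays visible to HaveAnyWork() while we advect
    lock.unlock();
    // ... advect the popped block here ...
    lock.lock();
    workerBusy = false;
  }
}

int main()
{
  for (int i = 0; i < 4; i++)
    active.push(i);
  std::thread worker(Worker);
  cv.notify_one();

  // Manager loop: without the workerBusy term, this could conclude "done"
  // while the worker still holds an in-flight block.
  while (HaveAnyWork())
    std::this_thread::yield();

  {
    std::lock_guard<std::mutex> lock(mtx);
    done = true;
  }
  cv.notify_one();
  worker.join();
  return 0;
}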