propagate the sync/async comm flag down.

This commit is contained in:
Dave Pugmire 2023-03-23 14:46:35 -04:00
parent f6ac8d2857
commit ad49b1e25b
8 changed files with 46 additions and 28 deletions

@ -87,9 +87,13 @@ public:
VTKM_CONT
void SetUseAsynchronousCommunication() { this->UseAsynchronousCommunication = true; }
VTKM_CONT
bool GetUseAsynchronousCommunication() { return this->UseAsynchronousCommunication; }
VTKM_CONT
void SetUseSynchronousCommunication() { this->UseAsynchronousCommunication = false; }
VTKM_CONT
bool GetUseSynchronousCommunication() { return !this->GetUseAsynchronousCommunication(); }
protected:
VTKM_CONT virtual void ValidateOptions() const;

@ -87,8 +87,12 @@ VTKM_CONT vtkm::cont::PartitionedDataSet FilterParticleAdvectionSteadyState::DoE
auto dsi = CreateDataSetIntegrators(
input, variant, boundsMap, this->SolverType, this->VecFieldType, this->GetResultType());
vtkm::filter::flow::internal::ParticleAdvector<DSIType> pav(
boundsMap, dsi, this->UseThreadedAlgorithm, this->GetResultType());
vtkm::filter::flow::internal::ParticleAdvector<DSIType> pav(boundsMap,
dsi,
this->UseThreadedAlgorithm,
this->UseAsynchronousCommunication,
this->GetResultType());
return pav.Execute(this->NumberOfSteps, this->StepSize, this->Seeds);
}

@ -75,8 +75,11 @@ VTKM_CONT vtkm::cont::PartitionedDataSet FilterParticleAdvectionUnsteadyState::D
this->VecFieldType,
this->GetResultType());
vtkm::filter::flow::internal::ParticleAdvector<DSIType> pav(
boundsMap, dsi, this->UseThreadedAlgorithm, this->GetResultType());
vtkm::filter::flow::internal::ParticleAdvector<DSIType> pav(boundsMap,
dsi,
this->UseThreadedAlgorithm,
this->UseAsynchronousCommunication,
this->GetResultType());
return pav.Execute(this->NumberOfSteps, this->StepSize, this->Seeds);
}

@ -29,11 +29,14 @@ template <typename DSIType, template <typename> class ResultType, typename Parti
class AdvectAlgorithm
{
public:
AdvectAlgorithm(const vtkm::filter::flow::internal::BoundsMap& bm, std::vector<DSIType>& blocks)
AdvectAlgorithm(const vtkm::filter::flow::internal::BoundsMap& bm,
std::vector<DSIType>& blocks,
bool useAsyncComm)
: Blocks(blocks)
, BoundsMap(bm)
, NumRanks(this->Comm.size())
, Rank(this->Comm.rank())
, UseAsynchronousCommunication(useAsyncComm)
{
}

@ -33,11 +33,16 @@ class AdvectAlgorithmThreaded : public AdvectAlgorithm<DSIType, ResultType, Part
{
public:
AdvectAlgorithmThreaded(const vtkm::filter::flow::internal::BoundsMap& bm,
std::vector<DSIType>& blocks)
: AdvectAlgorithm<DSIType, ResultType, ParticleType>(bm, blocks)
std::vector<DSIType>& blocks,
bool useAsyncComm)
: AdvectAlgorithm<DSIType, ResultType, ParticleType>(bm, blocks, useAsyncComm)
, Done(false)
, WorkerActivate(false)
{
VTKM_LOG_S(vtkm::cont::LogLevel::Info,
"Synchronous communication not supported for AdvectAlgorithmThreaded. Forcing "
"asynchronous communication.");
//For threaded algorithm, the particles go out of scope in the Work method.
//When this happens, they are destructed by the time the Manage thread gets them.
//Set the copy flag so the std::vector is copied into the ArrayHandle
@ -52,8 +57,6 @@ public:
std::vector<std::thread> workerThreads;
workerThreads.emplace_back(std::thread(AdvectAlgorithmThreaded::Worker, this));
this->Comm.barrier();
//MPI_Barrier(this->Comm);
this->Manage();
//This will only work for 1 thread. For > 1, the Blocks will need a mutex.
@ -135,8 +138,14 @@ protected:
void Manage()
{
if (!this->UseAsynchronousCommunication)
VTKM_LOG_S(vtkm::cont::LogLevel::Info,
"Synchronous communication not supported for AdvectAlgorithmThreaded. Forcing "
"asynchronous communication.");
bool useAsync = true;
vtkm::filter::flow::internal::ParticleMessenger<ParticleType> messenger(
this->Comm, this->UseAsynchronousCommunication, this->BoundsMap, 1, 128);
this->Comm, useAsync, this->BoundsMap, 1, 128);
messenger.Log << "Begin" << std::endl;
while (this->TotalNumTerminatedParticles < this->TotalNumParticles)
@ -164,10 +173,6 @@ protected:
messenger.Log << "DONE" << std::endl;
//Let the workers know that we are done.
this->SetDone();
//Do one last communicate to send out the messages.
vtkm::Id dummy;
this->Communicate(messenger, 0, dummy);
}
bool GetBlockAndWait(const bool& syncComm, const vtkm::Id& numLocalTerm) override

@ -32,10 +32,12 @@ public:
ParticleAdvector(const vtkm::filter::flow::internal::BoundsMap& bm,
const std::vector<DSIType>& blocks,
const bool& useThreaded,
const bool& useAsyncComm,
const vtkm::filter::flow::FlowResultType& parType)
: Blocks(blocks)
, BoundsMap(bm)
, ResultType(parType)
, UseAsynchronousCommunication(useAsyncComm)
, UseThreadedAlgorithm(useThreaded)
{
}
@ -61,7 +63,7 @@ private:
vtkm::FloatDefault stepSize,
const vtkm::cont::ArrayHandle<ParticleType>& seeds)
{
AlgorithmType algo(this->BoundsMap, this->Blocks);
AlgorithmType algo(this->BoundsMap, this->Blocks, this->UseAsynchronousCommunication);
algo.Execute(numSteps, stepSize, seeds);
return algo.GetOutput();
}
@ -113,6 +115,7 @@ private:
std::vector<DSIType> Blocks;
vtkm::filter::flow::internal::BoundsMap BoundsMap;
FlowResultType ResultType;
bool UseAsynchronousCommunication = true;
bool UseThreadedAlgorithm;
};

@ -116,7 +116,7 @@ void ValidateReceivedMessage(int sendRank,
VTKM_TEST_ASSERT(reqMsg[i] == recvMsg[i], "Wrong message value received");
}
void TestParticleMessenger(bool useAsyncComm)
void TestParticleMessenger()
{
auto comm = vtkm::cont::EnvironmentTracker::GetCommunicator();
@ -129,7 +129,7 @@ void TestParticleMessenger(bool useAsyncComm)
int maxNumParticles = 128;
int maxNumBlockIds = 5 * comm.size();
TestMessenger messenger(
comm, useAsyncComm, boundsMap, maxMsgSz / 2, maxNumParticles / 2, maxNumBlockIds / 2);
comm, true, boundsMap, maxMsgSz / 2, maxNumParticles / 2, maxNumBlockIds / 2);
//create some data.
std::vector<std::vector<vtkm::Particle>> particles(comm.size());
@ -242,7 +242,7 @@ void TestParticleMessenger(bool useAsyncComm)
comm.barrier();
}
void TestBufferSizes(bool useAsyncComm)
void TestBufferSizes()
{
//Make sure the buffer sizes are correct.
auto comm = vtkm::cont::EnvironmentTracker::GetCommunicator();
@ -256,7 +256,7 @@ void TestBufferSizes(bool useAsyncComm)
for (const auto& numP : numPs)
for (const auto& nBids : numBids)
{
TestMessenger messenger(comm, useAsyncComm, boundsMap);
TestMessenger messenger(comm, true, boundsMap);
std::size_t pSize, mSize;
messenger.GetBufferSizes(numP, nBids, mSz, pSize, mSize);
@ -287,11 +287,8 @@ void TestBufferSizes(bool useAsyncComm)
void TestParticleMessengerMPI()
{
for (const auto& flag : { true, false })
{
TestBufferSizes(flag);
TestParticleMessenger(flag);
}
TestBufferSizes();
TestParticleMessenger();
}
}

@ -464,6 +464,9 @@ void TestStreamlineFiltersMPI()
//TestPartitionedDataSet(1, false, PARTICLE_ADVECTION, true);
//TestPartitionedDataSet(1, false, PARTICLE_ADVECTION, true, true);
//return;
auto comm = vtkm::cont::EnvironmentTracker::GetCommunicator();
for (int n = 1; n < 3; n++)
{
@ -472,8 +475,6 @@ void TestStreamlineFiltersMPI()
for (auto useThreaded : flags)
for (auto useAsyncComm : flags)
{
if (useThreaded) // && !useAsyncComm)
continue;
TestPartitionedDataSet(n, useGhost, fType, useThreaded, useAsyncComm);
}
}
@ -483,8 +484,6 @@ void TestStreamlineFiltersMPI()
for (auto useThreaded : flags)
for (auto useAsyncComm : flags)
{
if (useThreaded) // && !useAsyncComm)
continue;
TestAMRStreamline(fType, useThreaded, useAsyncComm);
}
}