Add useAsyncComm flags.

This commit is contained in:
Dave Pugmire 2023-03-23 13:41:57 -04:00
parent b74e9efc06
commit f6ac8d2857
8 changed files with 71 additions and 35 deletions

@ -85,6 +85,12 @@ public:
VTKM_CONT
void SetUseThreadedAlgorithm(bool val) { this->UseThreadedAlgorithm = val; }
VTKM_CONT
void SetUseAsynchronousCommunication() { this->UseAsynchronousCommunication = true; }
VTKM_CONT
void SetUseSynchronousCommunication() { this->UseAsynchronousCommunication = false; }
protected:
VTKM_CONT virtual void ValidateOptions() const;
@ -95,6 +101,7 @@ protected:
vtkm::filter::flow::IntegrationSolverType SolverType =
vtkm::filter::flow::IntegrationSolverType::RK4_TYPE;
vtkm::FloatDefault StepSize = 0;
bool UseAsynchronousCommunication = true;
bool UseThreadedAlgorithm = false;
vtkm::filter::flow::VectorFieldType VecFieldType =
vtkm::filter::flow::VectorFieldType::VELOCITY_FIELD_TYPE;

@ -91,7 +91,7 @@ public:
virtual void Go()
{
vtkm::filter::flow::internal::ParticleMessenger<ParticleType> messenger(
this->Comm, this->BoundsMap, 1, 128);
this->Comm, this->UseAsynchronousCommunication, this->BoundsMap, 1, 128);
vtkm::Id nLocal = static_cast<vtkm::Id>(this->Active.size() + this->Inactive.size());
this->ComputeTotalNumParticles(nLocal);
@ -295,6 +295,7 @@ public:
vtkm::FloatDefault StepSize;
vtkm::Id TotalNumParticles = 0;
vtkm::Id TotalNumTerminatedParticles = 0;
bool UseAsynchronousCommunication = true;
};
}

@ -136,7 +136,7 @@ protected:
void Manage()
{
vtkm::filter::flow::internal::ParticleMessenger<ParticleType> messenger(
this->Comm, this->BoundsMap, 1, 128);
this->Comm, this->UseAsynchronousCommunication, this->BoundsMap, 1, 128);
messenger.Log << "Begin" << std::endl;
while (this->TotalNumTerminatedParticles < this->TotalNumParticles)

@ -33,13 +33,14 @@ namespace internal
VTKM_CONT
#ifdef VTKM_ENABLE_MPI
Messenger::Messenger(vtkmdiy::mpi::communicator& comm)
Messenger::Messenger(vtkmdiy::mpi::communicator& comm, bool useAsyncComm)
: MPIComm(vtkmdiy::mpi::mpi_cast(comm.handle()))
, MsgID(0)
, NumRanks(comm.size())
, Rank(comm.rank())
, UseAsynchronousCommunication(useAsyncComm)
#else
Messenger::Messenger(vtkmdiy::mpi::communicator& vtkmNotUsed(comm))
Messenger::Messenger(vtkmdiy::mpi::communicator& vtkmNotUsed(comm), bool vtkmNotUsed(useAsyncComm))
#endif
{
std::string fname = "out." + std::to_string(this->Rank) + ".log";

@ -36,7 +36,7 @@ namespace internal
class VTKM_FILTER_FLOW_EXPORT Messenger
{
public:
VTKM_CONT Messenger(vtkmdiy::mpi::communicator& comm);
VTKM_CONT Messenger(vtkmdiy::mpi::communicator& comm, bool useAsyncComm);
VTKM_CONT virtual ~Messenger()
{
#ifdef VTKM_ENABLE_MPI
@ -51,7 +51,7 @@ public:
VTKM_CONT void RegisterTag(int tag, std::size_t numRecvs, std::size_t size);
bool UsingSyncCommunication() const { return !this->UsingAsyncCommunication(); }
bool UsingAsyncCommunication() const { return this->AsyncCommunication; }
bool UsingAsyncCommunication() const { return this->UseAsynchronousCommunication; }
std::ofstream Log;
@ -63,7 +63,7 @@ protected:
void CleanupRequests(int tag = TAG_ANY);
void SendData(int dst, int tag, vtkmdiy::MemoryBuffer& buff)
{
if (this->AsyncCommunication)
if (this->UseAsynchronousCommunication)
this->SendDataAsync(dst, tag, buff);
else
this->SendDataSync(dst, tag, buff);
@ -72,7 +72,7 @@ protected:
std::vector<std::pair<int, vtkmdiy::MemoryBuffer>>& buffers,
bool blockAndWait = false)
{
if (this->AsyncCommunication)
if (this->UseAsynchronousCommunication)
return this->RecvDataAsync(tags, buffers, blockAndWait);
else
return this->RecvDataSync(tags, buffers, blockAndWait);
@ -111,7 +111,6 @@ private:
using RankIdPair = std::pair<int, int>;
//Member data
bool AsyncCommunication = false; //true;
// <tag, {dst, buffer}>
std::map<int, std::vector<std::pair<int, vtkmdiy::MemoryBuffer>>> SyncSendBuffers;
std::map<int, std::pair<std::size_t, std::size_t>> MessageTagInfo;
@ -123,6 +122,7 @@ private:
std::map<RankIdPair, std::list<char*>> RecvPackets;
std::map<RequestTagPair, char*> SendBuffers;
static constexpr int TAG_ANY = -1;
bool UseAsynchronousCommunication = true;
void CheckRequests(const std::map<RequestTagPair, char*>& buffer,
const std::set<int>& tags,

@ -44,6 +44,7 @@ class VTKM_FILTER_FLOW_EXPORT ParticleMessenger : public vtkm::filter::flow::int
public:
VTKM_CONT ParticleMessenger(vtkmdiy::mpi::communicator& comm,
bool useAsyncComm,
const vtkm::filter::flow::internal::BoundsMap& bm,
int msgSz = 1,
int numParticles = 128,
@ -113,11 +114,12 @@ VTKM_CONT
template <typename ParticleType>
ParticleMessenger<ParticleType>::ParticleMessenger(
vtkmdiy::mpi::communicator& comm,
bool useAsyncComm,
const vtkm::filter::flow::internal::BoundsMap& boundsMap,
int msgSz,
int numParticles,
int numBlockIds)
: Messenger(comm)
: Messenger(comm, useAsyncComm)
#ifdef VTKM_ENABLE_MPI
, BoundsMap(boundsMap)
#endif

@ -27,11 +27,12 @@ class TestMessenger : public vtkm::filter::flow::internal::ParticleMessenger<vtk
{
public:
TestMessenger(vtkmdiy::mpi::communicator& comm,
bool useAsyncComm,
const vtkm::filter::flow::internal::BoundsMap& bm,
int msgSz = 1,
int numParticles = 1,
int numBlockIds = 1)
: ParticleMessenger(comm, bm, msgSz, numParticles, numBlockIds)
: ParticleMessenger(comm, useAsyncComm, bm, msgSz, numParticles, numBlockIds)
{
}
@ -115,7 +116,7 @@ void ValidateReceivedMessage(int sendRank,
VTKM_TEST_ASSERT(reqMsg[i] == recvMsg[i], "Wrong message value received");
}
void TestParticleMessenger()
void TestParticleMessenger(bool useAsyncComm)
{
auto comm = vtkm::cont::EnvironmentTracker::GetCommunicator();
@ -127,7 +128,8 @@ void TestParticleMessenger()
int maxMsgSz = 100;
int maxNumParticles = 128;
int maxNumBlockIds = 5 * comm.size();
TestMessenger messenger(comm, boundsMap, maxMsgSz / 2, maxNumParticles / 2, maxNumBlockIds / 2);
TestMessenger messenger(
comm, useAsyncComm, boundsMap, maxMsgSz / 2, maxNumParticles / 2, maxNumBlockIds / 2);
//create some data.
std::vector<std::vector<vtkm::Particle>> particles(comm.size());
@ -240,7 +242,7 @@ void TestParticleMessenger()
comm.barrier();
}
void TestBufferSizes()
void TestBufferSizes(bool useAsyncComm)
{
//Make sure the buffer sizes are correct.
auto comm = vtkm::cont::EnvironmentTracker::GetCommunicator();
@ -254,7 +256,7 @@ void TestBufferSizes()
for (const auto& numP : numPs)
for (const auto& nBids : numBids)
{
TestMessenger messenger(comm, boundsMap);
TestMessenger messenger(comm, useAsyncComm, boundsMap);
std::size_t pSize, mSize;
messenger.GetBufferSizes(numP, nBids, mSz, pSize, mSize);
@ -285,8 +287,11 @@ void TestBufferSizes()
void TestParticleMessengerMPI()
{
TestBufferSizes();
TestParticleMessenger();
for (const auto& flag : { true, false })
{
TestBufferSizes(flag);
TestParticleMessenger(flag);
}
}
}

@ -51,16 +51,21 @@ void SetFilter(FilterType& filter,
vtkm::Id numSteps,
const std::string& fieldName,
vtkm::cont::ArrayHandle<vtkm::Particle> seedArray,
bool useThreaded)
bool useThreaded,
bool useAsyncComm)
{
filter.SetStepSize(stepSize);
filter.SetNumberOfSteps(numSteps);
filter.SetSeeds(seedArray);
filter.SetActiveField(fieldName);
filter.SetUseThreadedAlgorithm(useThreaded);
if (useAsyncComm)
filter.SetUseAsynchronousCommunication();
else
filter.SetUseSynchronousCommunication();
}
void TestAMRStreamline(FilterType fType, bool useThreaded)
void TestAMRStreamline(FilterType fType, bool useThreaded, bool useAsyncComm)
{
switch (fType)
{
@ -76,6 +81,10 @@ void TestAMRStreamline(FilterType fType, bool useThreaded)
}
if (useThreaded)
std::cout << " - using threaded";
if (useAsyncComm)
std::cout << " - usingAsyncComm";
else
std::cout << " - usingSyncComm";
std::cout << " - on an AMR data set" << std::endl;
auto comm = vtkm::cont::EnvironmentTracker::GetCommunicator();
@ -152,13 +161,13 @@ void TestAMRStreamline(FilterType fType, bool useThreaded)
if (fType == STREAMLINE)
{
vtkm::filter::flow::Streamline streamline;
SetFilter(streamline, stepSize, numSteps, fieldName, seedArray, useThreaded);
SetFilter(streamline, stepSize, numSteps, fieldName, seedArray, useThreaded, useAsyncComm);
out = streamline.Execute(pds);
}
else if (fType == PATHLINE)
{
vtkm::filter::flow::Pathline pathline;
SetFilter(pathline, stepSize, numSteps, fieldName, seedArray, useThreaded);
SetFilter(pathline, stepSize, numSteps, fieldName, seedArray, useThreaded, useAsyncComm);
//Create timestep 2
auto pds2 = vtkm::cont::PartitionedDataSet(pds);
pathline.SetPreviousTime(0);
@ -314,7 +323,11 @@ void ValidateOutput(const vtkm::cont::DataSet& out,
}
}
void TestPartitionedDataSet(vtkm::Id nPerRank, bool useGhost, FilterType fType, bool useThreaded)
void TestPartitionedDataSet(vtkm::Id nPerRank,
bool useGhost,
FilterType fType,
bool useThreaded,
bool useAsyncComm)
{
switch (fType)
{
@ -332,6 +345,10 @@ void TestPartitionedDataSet(vtkm::Id nPerRank, bool useGhost, FilterType fType,
std::cout << " - using ghost cells";
if (useThreaded)
std::cout << " - using threaded";
if (useAsyncComm)
std::cout << " - usingAsyncComm";
else
std::cout << " - usingSyncComm";
std::cout << " - on a partitioned data set" << std::endl;
auto comm = vtkm::cont::EnvironmentTracker::GetCommunicator();
@ -397,7 +414,7 @@ void TestPartitionedDataSet(vtkm::Id nPerRank, bool useGhost, FilterType fType,
if (fType == STREAMLINE)
{
vtkm::filter::flow::Streamline streamline;
SetFilter(streamline, stepSize, numSteps, fieldName, seedArray, useThreaded);
SetFilter(streamline, stepSize, numSteps, fieldName, seedArray, useThreaded, useAsyncComm);
auto out = streamline.Execute(pds);
for (vtkm::Id i = 0; i < nPerRank; i++)
@ -406,7 +423,8 @@ void TestPartitionedDataSet(vtkm::Id nPerRank, bool useGhost, FilterType fType,
else if (fType == PARTICLE_ADVECTION)
{
vtkm::filter::flow::ParticleAdvection particleAdvection;
SetFilter(particleAdvection, stepSize, numSteps, fieldName, seedArray, useThreaded);
SetFilter(
particleAdvection, stepSize, numSteps, fieldName, seedArray, useThreaded, useAsyncComm);
auto out = particleAdvection.Execute(pds);
//Particles end up in last rank.
@ -424,7 +442,7 @@ void TestPartitionedDataSet(vtkm::Id nPerRank, bool useGhost, FilterType fType,
AddVectorFields(pds2, fieldName, vecX);
vtkm::filter::flow::Pathline pathline;
SetFilter(pathline, stepSize, numSteps, fieldName, seedArray, useThreaded);
SetFilter(pathline, stepSize, numSteps, fieldName, seedArray, useThreaded, useAsyncComm);
pathline.SetPreviousTime(time0);
pathline.SetNextTime(time1);
@ -452,21 +470,23 @@ void TestStreamlineFiltersMPI()
for (auto useGhost : flags)
for (auto fType : filterTypes)
for (auto useThreaded : flags)
{
useThreaded = false;
if (comm.rank() == 0)
std::cout << " N= " << n << " " << useGhost << " " << useThreaded << std::endl;
TestPartitionedDataSet(n, useGhost, fType, useThreaded);
}
for (auto useAsyncComm : flags)
{
if (useThreaded) // && !useAsyncComm)
continue;
TestPartitionedDataSet(n, useGhost, fType, useThreaded, useAsyncComm);
}
}
//streamline, threaded will sometimes hang.
for (auto fType : filterTypes)
for (auto useThreaded : flags)
{
useThreaded = false;
TestAMRStreamline(fType, useThreaded);
}
for (auto useAsyncComm : flags)
{
if (useThreaded) // && !useAsyncComm)
continue;
TestAMRStreamline(fType, useThreaded, useAsyncComm);
}
}
}