mirror of
https://gitlab.kitware.com/vtk/vtk-m
synced 2024-10-05 01:49:02 +00:00
Compare commits
7 Commits
d804d0fcdf
...
e6dbc071a2
Author | SHA1 | Date | |
---|---|---|---|
|
e6dbc071a2 | ||
|
ade25db6d5 | ||
|
c75dcace78 | ||
|
02553c0b6f | ||
|
eef93ff825 | ||
|
dd96f1144b | ||
|
4bd340156b |
@ -15,6 +15,7 @@
|
||||
#include <vtkm/cont/ErrorFilterExecution.h>
|
||||
#include <vtkm/filter/Filter.h>
|
||||
#include <vtkm/filter/flow/FlowTypes.h>
|
||||
#include <vtkm/filter/flow/internal/BoundsMap.h>
|
||||
#include <vtkm/filter/flow/vtkm_filter_flow_export.h>
|
||||
|
||||
namespace vtkm
|
||||
@ -104,7 +105,7 @@ protected:
|
||||
|
||||
bool BlockIdsSet = false;
|
||||
std::vector<vtkm::Id> BlockIds;
|
||||
|
||||
vtkm::filter::flow::internal::BoundsMap BoundsMap;
|
||||
vtkm::Id NumberOfSteps = 0;
|
||||
vtkm::cont::UnknownArrayHandle Seeds;
|
||||
vtkm::filter::flow::IntegrationSolverType SolverType =
|
||||
|
@ -58,13 +58,15 @@ FilterParticleAdvectionSteadyState<Derived>::DoExecutePartitions(
|
||||
DataSetIntegratorSteadyState<ParticleType, FieldType, TerminationType, AnalysisType>;
|
||||
|
||||
this->ValidateOptions();
|
||||
if (this->BlockIdsSet)
|
||||
this->BoundsMap = vtkm::filter::flow::internal::BoundsMap(input, this->BlockIds);
|
||||
else
|
||||
this->BoundsMap = vtkm::filter::flow::internal::BoundsMap(input);
|
||||
|
||||
|
||||
vtkm::filter::flow::internal::BoundsMap boundsMap(input);
|
||||
std::vector<DSIType> dsi;
|
||||
for (vtkm::Id i = 0; i < input.GetNumberOfPartitions(); i++)
|
||||
{
|
||||
vtkm::Id blockId = boundsMap.GetLocalBlockId(i);
|
||||
vtkm::Id blockId = this->BoundsMap.GetLocalBlockId(i);
|
||||
auto dataset = input.GetPartition(i);
|
||||
|
||||
// Build the field for the current dataset
|
||||
@ -78,7 +80,7 @@ FilterParticleAdvectionSteadyState<Derived>::DoExecutePartitions(
|
||||
}
|
||||
|
||||
vtkm::filter::flow::internal::ParticleAdvector<DSIType> pav(
|
||||
boundsMap, dsi, this->UseThreadedAlgorithm, this->UseAsynchronousCommunication);
|
||||
this->BoundsMap, dsi, this->UseThreadedAlgorithm, this->UseAsynchronousCommunication);
|
||||
|
||||
vtkm::cont::ArrayHandle<ParticleType> particles;
|
||||
this->Seeds.AsArrayHandle(particles);
|
||||
|
@ -55,12 +55,15 @@ FilterParticleAdvectionUnsteadyState<Derived>::DoExecutePartitions(
|
||||
using DSIType = vtkm::filter::flow::internal::
|
||||
DataSetIntegratorUnsteadyState<ParticleType, FieldType, TerminationType, AnalysisType>;
|
||||
|
||||
vtkm::filter::flow::internal::BoundsMap boundsMap(input);
|
||||
if (this->BlockIdsSet)
|
||||
this->BoundsMap = vtkm::filter::flow::internal::BoundsMap(input, this->BlockIds);
|
||||
else
|
||||
this->BoundsMap = vtkm::filter::flow::internal::BoundsMap(input);
|
||||
|
||||
std::vector<DSIType> dsi;
|
||||
for (vtkm::Id i = 0; i < input.GetNumberOfPartitions(); i++)
|
||||
{
|
||||
vtkm::Id blockId = boundsMap.GetLocalBlockId(i);
|
||||
vtkm::Id blockId = this->BoundsMap.GetLocalBlockId(i);
|
||||
auto ds1 = input.GetPartition(i);
|
||||
auto ds2 = this->Input2.GetPartition(i);
|
||||
|
||||
@ -85,7 +88,7 @@ FilterParticleAdvectionUnsteadyState<Derived>::DoExecutePartitions(
|
||||
analysis);
|
||||
}
|
||||
vtkm::filter::flow::internal::ParticleAdvector<DSIType> pav(
|
||||
boundsMap, dsi, this->UseThreadedAlgorithm, this->UseAsynchronousCommunication);
|
||||
this->BoundsMap, dsi, this->UseThreadedAlgorithm, this->UseAsynchronousCommunication);
|
||||
|
||||
vtkm::cont::ArrayHandle<ParticleType> particles;
|
||||
this->Seeds.AsArrayHandle(particles);
|
||||
|
@ -12,9 +12,15 @@
|
||||
#define vtk_m_filter_flow_internal_AdvectAlgorithm_h
|
||||
|
||||
#include <vtkm/cont/PartitionedDataSet.h>
|
||||
#include <vtkm/filter/flow/internal/AdvectAlgorithmTerminator.h>
|
||||
#include <vtkm/filter/flow/internal/BoundsMap.h>
|
||||
#include <vtkm/filter/flow/internal/DataSetIntegrator.h>
|
||||
#include <vtkm/filter/flow/internal/ParticleExchanger.h>
|
||||
#include <vtkm/filter/flow/internal/ParticleMessenger.h>
|
||||
#ifdef VTKM_ENABLE_MPI
|
||||
#include <vtkm/thirdparty/diy/diy.h>
|
||||
#include <vtkm/thirdparty/diy/mpi-cast.h>
|
||||
#endif
|
||||
|
||||
namespace vtkm
|
||||
{
|
||||
@ -25,6 +31,27 @@ namespace flow
|
||||
namespace internal
|
||||
{
|
||||
|
||||
/*
|
||||
ParticleMessenger::Exchange()
|
||||
- SendParticles(outData--> map[dstRank]=vector of pairs);
|
||||
-- SendParticles(map...)
|
||||
--- for each m : map SendParticles(m);
|
||||
---- SendParticles(dst, container)
|
||||
----- serialize, SendData(dst, buff);
|
||||
------ SendDataAsync(dst,buff)
|
||||
------- header??, req=mpi_isend(), store req.
|
||||
|
||||
- RecvAny(data, block);
|
||||
-- RecvData(tags, buffers, block)
|
||||
--- RecvDataAsyncProbe(tag, buffers, block)
|
||||
---- while (true)
|
||||
----- if block: MPI_Probe() msgReceived=true
|
||||
----- else : MPI_Iprobe msgReceived = check
|
||||
----- if msgRecvd: MPI_Get_count(), MPI_Recv(), buffers, blockAndWait=false
|
||||
|
||||
|
||||
*/
|
||||
|
||||
template <typename DSIType>
|
||||
class AdvectAlgorithm
|
||||
{
|
||||
@ -39,6 +66,8 @@ public:
|
||||
, NumRanks(this->Comm.size())
|
||||
, Rank(this->Comm.rank())
|
||||
, UseAsynchronousCommunication(useAsyncComm)
|
||||
, Terminator(this->Comm)
|
||||
, Exchanger(this->Comm)
|
||||
{
|
||||
}
|
||||
|
||||
@ -97,27 +126,21 @@ public:
|
||||
vtkm::filter::flow::internal::ParticleMessenger<ParticleType> messenger(
|
||||
this->Comm, this->UseAsynchronousCommunication, this->BoundsMap, 1, 128);
|
||||
|
||||
this->ComputeTotalNumParticles();
|
||||
|
||||
while (this->TotalNumTerminatedParticles < this->TotalNumParticles)
|
||||
while (!this->Terminator.Done())
|
||||
{
|
||||
std::vector<ParticleType> v;
|
||||
vtkm::Id numTerm = 0, blockId = -1;
|
||||
vtkm::Id blockId = -1;
|
||||
if (this->GetActiveParticles(v, blockId))
|
||||
{
|
||||
//make this a pointer to avoid the copy?
|
||||
auto& block = this->GetDataSet(blockId);
|
||||
DSIHelperInfo<ParticleType> bb(v, this->BoundsMap, this->ParticleBlockIDsMap);
|
||||
block.Advect(bb, this->StepSize);
|
||||
numTerm = this->UpdateResult(bb);
|
||||
this->UpdateResult(bb);
|
||||
}
|
||||
|
||||
vtkm::Id numTermMessages = 0;
|
||||
this->Communicate(messenger, numTerm, numTermMessages);
|
||||
|
||||
this->TotalNumTerminatedParticles += (numTerm + numTermMessages);
|
||||
if (this->TotalNumTerminatedParticles > this->TotalNumParticles)
|
||||
throw vtkm::cont::ErrorFilterExecution("Particle count error");
|
||||
this->Communicate(messenger);
|
||||
this->Terminator.Control(!this->Active.empty());
|
||||
}
|
||||
}
|
||||
|
||||
@ -128,19 +151,6 @@ public:
|
||||
this->ParticleBlockIDsMap.clear();
|
||||
}
|
||||
|
||||
void ComputeTotalNumParticles()
|
||||
{
|
||||
vtkm::Id numLocal = static_cast<vtkm::Id>(this->Inactive.size());
|
||||
for (const auto& it : this->Active)
|
||||
numLocal += it.second.size();
|
||||
|
||||
#ifdef VTKM_ENABLE_MPI
|
||||
vtkmdiy::mpi::all_reduce(this->Comm, numLocal, this->TotalNumParticles, std::plus<vtkm::Id>{});
|
||||
#else
|
||||
this->TotalNumParticles = numLocal;
|
||||
#endif
|
||||
}
|
||||
|
||||
DataSetIntegrator<DSIType, ParticleType>& GetDataSet(vtkm::Id id)
|
||||
{
|
||||
for (auto& it : this->Blocks)
|
||||
@ -213,9 +223,12 @@ public:
|
||||
return !particles.empty();
|
||||
}
|
||||
|
||||
void Communicate(vtkm::filter::flow::internal::ParticleMessenger<ParticleType>& messenger,
|
||||
vtkm::Id numLocalTerminations,
|
||||
vtkm::Id& numTermMessages)
|
||||
void ExchangeParticles()
|
||||
{
|
||||
// this->Exchanger.Exchange(outgoing, outgoingRanks, this->ParticleBlockIDsMap, incoming, incomingBlockIDs, block);
|
||||
}
|
||||
|
||||
void Communicate(vtkm::filter::flow::internal::ParticleMessenger<ParticleType>& messenger)
|
||||
{
|
||||
std::vector<ParticleType> outgoing;
|
||||
std::vector<vtkm::Id> outgoingRanks;
|
||||
@ -224,16 +237,19 @@ public:
|
||||
|
||||
std::vector<ParticleType> incoming;
|
||||
std::unordered_map<vtkm::Id, std::vector<vtkm::Id>> incomingBlockIDs;
|
||||
numTermMessages = 0;
|
||||
|
||||
bool block = false;
|
||||
#ifdef VTKM_ENABLE_MPI
|
||||
block = this->GetBlockAndWait(messenger.UsingSyncCommunication(), numLocalTerminations);
|
||||
block = this->GetBlockAndWait(messenger.UsingSyncCommunication());
|
||||
#endif
|
||||
|
||||
// this->Exchanger.Exchange(outgoing, outgoingRanks, this->ParticleBlockIDsMap, incoming, incomingBlockIDs, block);
|
||||
|
||||
vtkm::Id numTermMessages;
|
||||
messenger.Exchange(outgoing,
|
||||
outgoingRanks,
|
||||
this->ParticleBlockIDsMap,
|
||||
numLocalTerminations,
|
||||
0,
|
||||
incoming,
|
||||
incomingBlockIDs,
|
||||
numTermMessages,
|
||||
@ -311,17 +327,22 @@ public:
|
||||
{
|
||||
VTKM_ASSERT(particles.size() == idsMap.size());
|
||||
|
||||
for (auto pit = particles.begin(); pit != particles.end(); pit++)
|
||||
if (!particles.empty())
|
||||
{
|
||||
vtkm::Id particleID = pit->GetID();
|
||||
const auto& it = idsMap.find(particleID);
|
||||
VTKM_ASSERT(it != idsMap.end() && !it->second.empty());
|
||||
vtkm::Id blockId = it->second[0];
|
||||
this->Active[blockId].emplace_back(*pit);
|
||||
}
|
||||
this->Terminator.AddWork();
|
||||
|
||||
for (const auto& it : idsMap)
|
||||
this->ParticleBlockIDsMap[it.first] = it.second;
|
||||
for (auto pit = particles.begin(); pit != particles.end(); pit++)
|
||||
{
|
||||
vtkm::Id particleID = pit->GetID();
|
||||
const auto& it = idsMap.find(particleID);
|
||||
VTKM_ASSERT(it != idsMap.end() && !it->second.empty());
|
||||
vtkm::Id blockId = it->second[0];
|
||||
this->Active[blockId].emplace_back(*pit);
|
||||
}
|
||||
|
||||
for (const auto& it : idsMap)
|
||||
this->ParticleBlockIDsMap[it.first] = it.second;
|
||||
}
|
||||
}
|
||||
|
||||
virtual void UpdateInactive(const std::vector<ParticleType>& particles,
|
||||
@ -351,7 +372,7 @@ public:
|
||||
}
|
||||
|
||||
|
||||
virtual bool GetBlockAndWait(const bool& syncComm, const vtkm::Id& numLocalTerm)
|
||||
virtual bool GetBlockAndWait(const bool& syncComm)
|
||||
{
|
||||
bool haveNoWork = this->Active.empty() && this->Inactive.empty();
|
||||
|
||||
@ -367,9 +388,11 @@ public:
|
||||
//2. numLocalTerm + this->TotalNumberOfTerminatedParticles == this->TotalNumberOfParticles
|
||||
//So, if neither are true, we can safely block and wait for communication to come in.
|
||||
|
||||
if (haveNoWork &&
|
||||
(numLocalTerm + this->TotalNumTerminatedParticles < this->TotalNumParticles))
|
||||
return true;
|
||||
// if (this->Terminator.State == AdvectAlgorithmTerminator::AdvectAlgorithmTerminatorState::STATE_2)
|
||||
// return true;
|
||||
|
||||
// if (haveNoWork && (numLocalTerm + this->TotalNumTerminatedParticles < this->TotalNumParticles))
|
||||
// return true;
|
||||
|
||||
return false;
|
||||
}
|
||||
@ -388,9 +411,10 @@ public:
|
||||
std::unordered_map<vtkm::Id, std::vector<vtkm::Id>> ParticleBlockIDsMap;
|
||||
vtkm::Id Rank;
|
||||
vtkm::FloatDefault StepSize;
|
||||
vtkm::Id TotalNumParticles = 0;
|
||||
vtkm::Id TotalNumTerminatedParticles = 0;
|
||||
bool UseAsynchronousCommunication = true;
|
||||
AdvectAlgorithmTerminator Terminator;
|
||||
|
||||
ParticleExchanger<ParticleType> Exchanger;
|
||||
};
|
||||
|
||||
}
|
||||
|
111
vtkm/filter/flow/internal/AdvectAlgorithmTerminator.h
Normal file
111
vtkm/filter/flow/internal/AdvectAlgorithmTerminator.h
Normal file
@ -0,0 +1,111 @@
|
||||
//============================================================================
|
||||
// Copyright (c) Kitware, Inc.
|
||||
// All rights reserved.
|
||||
// See LICENSE.txt for details.
|
||||
//
|
||||
// This software is distributed WITHOUT ANY WARRANTY; without even
|
||||
// the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
|
||||
// PURPOSE. See the above copyright notice for more information.
|
||||
//============================================================================
|
||||
|
||||
#ifndef vtk_m_filter_flow_internal_AdvectAlgorithmTerminator_h
|
||||
#define vtk_m_filter_flow_internal_AdvectAlgorithmTerminator_h
|
||||
|
||||
namespace vtkm
|
||||
{
|
||||
namespace filter
|
||||
{
|
||||
namespace flow
|
||||
{
|
||||
namespace internal
|
||||
{
|
||||
|
||||
class AdvectAlgorithmTerminator
|
||||
{
|
||||
public:
|
||||
#ifdef VTKM_ENABLE_MPI
|
||||
AdvectAlgorithmTerminator(vtkmdiy::mpi::communicator& comm)
|
||||
: MPIComm(vtkmdiy::mpi::mpi_cast(comm.handle()))
|
||||
#else
|
||||
AdvectAlgorithmTerminator(vtkmdiy::mpi::communicator& vtkmNotUsed(comm))
|
||||
#endif
|
||||
{
|
||||
}
|
||||
|
||||
void AddWork()
|
||||
{
|
||||
#ifdef VTKM_ENABLE_MPI
|
||||
this->Dirty = 1;
|
||||
#endif
|
||||
}
|
||||
|
||||
bool Done() const { return this->State == AdvectAlgorithmTerminatorState::DONE; }
|
||||
|
||||
void Control(bool haveLocalWork)
|
||||
{
|
||||
#ifdef VTKM_ENABLE_MPI
|
||||
if (this->State == STATE_0 && !haveLocalWork)
|
||||
{
|
||||
MPI_Ibarrier(this->MPIComm, &this->StateReq);
|
||||
this->Dirty = 0;
|
||||
this->State = STATE_1;
|
||||
}
|
||||
else if (this->State == STATE_1)
|
||||
{
|
||||
MPI_Status status;
|
||||
int flag;
|
||||
MPI_Test(&this->StateReq, &flag, &status);
|
||||
if (flag == 1)
|
||||
{
|
||||
int localDirty = this->Dirty;
|
||||
MPI_Iallreduce(
|
||||
&localDirty, &this->AllDirty, 1, MPI_INT, MPI_LOR, this->MPIComm, &this->StateReq);
|
||||
this->State = STATE_2;
|
||||
}
|
||||
}
|
||||
else if (this->State == STATE_2)
|
||||
{
|
||||
MPI_Status status;
|
||||
int flag;
|
||||
MPI_Test(&this->StateReq, &flag, &status);
|
||||
if (flag == 1)
|
||||
{
|
||||
if (this->AllDirty == 0) //done
|
||||
this->State = DONE;
|
||||
else
|
||||
this->State = STATE_0; //reset.
|
||||
}
|
||||
}
|
||||
#else
|
||||
if (!haveLocalWork)
|
||||
this->State = DONE;
|
||||
#endif
|
||||
}
|
||||
|
||||
private:
|
||||
enum AdvectAlgorithmTerminatorState
|
||||
{
|
||||
STATE_0,
|
||||
STATE_1,
|
||||
STATE_2,
|
||||
DONE
|
||||
};
|
||||
|
||||
AdvectAlgorithmTerminatorState State = AdvectAlgorithmTerminatorState::STATE_0;
|
||||
|
||||
#ifdef VTKM_ENABLE_MPI
|
||||
std::atomic<int> Dirty;
|
||||
int AllDirty = 0;
|
||||
MPI_Request StateReq;
|
||||
MPI_Comm MPIComm;
|
||||
#endif
|
||||
};
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
} //vtkm::filter::flow::internal
|
||||
|
||||
|
||||
#endif //vtk_m_filter_flow_internal_AdvectAlgorithmTerminator_h
|
@ -39,7 +39,6 @@ public:
|
||||
bool useAsyncComm)
|
||||
: AdvectAlgorithm<DSIType>(bm, blocks, useAsyncComm)
|
||||
, Done(false)
|
||||
, WorkerActivate(false)
|
||||
{
|
||||
//For threaded algorithm, the particles go out of scope in the Work method.
|
||||
//When this happens, they are destructed by the time the Manage thread gets them.
|
||||
@ -50,8 +49,6 @@ public:
|
||||
|
||||
void Go() override
|
||||
{
|
||||
this->ComputeTotalNumParticles();
|
||||
|
||||
std::vector<std::thread> workerThreads;
|
||||
workerThreads.emplace_back(std::thread(AdvectAlgorithmThreaded::Worker, this));
|
||||
this->Manage();
|
||||
@ -63,6 +60,13 @@ public:
|
||||
}
|
||||
|
||||
protected:
|
||||
bool HaveAnyWork()
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(this->Mutex);
|
||||
//We have work if there particles in any queues or a worker is busy.
|
||||
return !this->Active.empty() || !this->Inactive.empty() || this->WorkerActivate;
|
||||
}
|
||||
|
||||
bool GetActiveParticles(std::vector<ParticleType>& particles, vtkm::Id& blockId) override
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(this->Mutex);
|
||||
@ -144,38 +148,31 @@ protected:
|
||||
vtkm::filter::flow::internal::ParticleMessenger<ParticleType> messenger(
|
||||
this->Comm, useAsync, this->BoundsMap, 1, 128);
|
||||
|
||||
while (this->TotalNumTerminatedParticles < this->TotalNumParticles)
|
||||
while (!this->Terminator.Done())
|
||||
{
|
||||
std::unordered_map<vtkm::Id, std::vector<DSIHelperInfo<ParticleType>>> workerResults;
|
||||
this->GetWorkerResults(workerResults);
|
||||
|
||||
vtkm::Id numTerm = 0;
|
||||
for (auto& it : workerResults)
|
||||
{
|
||||
for (auto& r : it.second)
|
||||
numTerm += this->UpdateResult(r);
|
||||
}
|
||||
this->UpdateResult(r);
|
||||
|
||||
vtkm::Id numTermMessages = 0;
|
||||
this->Communicate(messenger, numTerm, numTermMessages);
|
||||
|
||||
this->TotalNumTerminatedParticles += (numTerm + numTermMessages);
|
||||
if (this->TotalNumTerminatedParticles > this->TotalNumParticles)
|
||||
throw vtkm::cont::ErrorFilterExecution("Particle count error");
|
||||
this->Communicate(messenger);
|
||||
this->Terminator.Control(this->HaveAnyWork());
|
||||
}
|
||||
|
||||
//Let the workers know that we are done.
|
||||
this->SetDone();
|
||||
}
|
||||
|
||||
bool GetBlockAndWait(const bool& syncComm, const vtkm::Id& numLocalTerm) override
|
||||
bool GetBlockAndWait(const bool& syncComm) override
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(this->Mutex);
|
||||
if (this->Done)
|
||||
return true;
|
||||
|
||||
return (this->AdvectAlgorithm<DSIType>::GetBlockAndWait(syncComm, numLocalTerm) &&
|
||||
!this->WorkerActivate && this->WorkerResults.empty());
|
||||
return (this->AdvectAlgorithm<DSIType>::GetBlockAndWait(syncComm) && !this->WorkerActivate &&
|
||||
this->WorkerResults.empty());
|
||||
}
|
||||
|
||||
void GetWorkerResults(
|
||||
@ -193,7 +190,7 @@ protected:
|
||||
|
||||
std::atomic<bool> Done;
|
||||
std::mutex Mutex;
|
||||
bool WorkerActivate;
|
||||
bool WorkerActivate = false;
|
||||
std::condition_variable WorkerActivateCondition;
|
||||
std::unordered_map<vtkm::Id, std::vector<DSIHelperInfo<ParticleType>>> WorkerResults;
|
||||
};
|
||||
|
@ -10,6 +10,7 @@
|
||||
|
||||
set(headers
|
||||
AdvectAlgorithm.h
|
||||
AdvectAlgorithmTerminator.h
|
||||
AdvectAlgorithmThreaded.h
|
||||
BoundsMap.h
|
||||
DataSetIntegrator.h
|
||||
@ -19,6 +20,7 @@ set(headers
|
||||
LagrangianStructureHelpers.h
|
||||
Messenger.h
|
||||
ParticleAdvector.h
|
||||
ParticleExchanger.h
|
||||
ParticleMessenger.h
|
||||
)
|
||||
|
||||
|
228
vtkm/filter/flow/internal/ParticleExchanger.h
Normal file
228
vtkm/filter/flow/internal/ParticleExchanger.h
Normal file
@ -0,0 +1,228 @@
|
||||
//============================================================================
|
||||
// Copyright (c) Kitware, Inc.
|
||||
// All rights reserved.
|
||||
// See LICENSE.txt for details.
|
||||
//
|
||||
// This software is distributed WITHOUT ANY WARRANTY; without even
|
||||
// the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
|
||||
// PURPOSE. See the above copyright notice for more information.
|
||||
//============================================================================
|
||||
|
||||
#ifndef vtk_m_filter_flow_internal_ParticleExchanger_h
|
||||
#define vtk_m_filter_flow_internal_ParticleExchanger_h
|
||||
|
||||
namespace vtkm
|
||||
{
|
||||
namespace filter
|
||||
{
|
||||
namespace flow
|
||||
{
|
||||
namespace internal
|
||||
{
|
||||
|
||||
template <typename ParticleType>
|
||||
class ParticleExchanger
|
||||
{
|
||||
public:
|
||||
#ifdef VTKM_ENABLE_MPI
|
||||
ParticleExchanger(vtkmdiy::mpi::communicator& comm)
|
||||
: MPIComm(vtkmdiy::mpi::mpi_cast(comm.handle()))
|
||||
, NumRanks(comm.size())
|
||||
, Rank(comm.rank())
|
||||
#else
|
||||
ParticleExchanger(vtkmdiy::mpi::communicator& vtkmNotUsed(comm))
|
||||
#endif
|
||||
{
|
||||
}
|
||||
#ifdef VTKM_ENABLE_MPI
|
||||
~ParticleExchanger() { this->CleanupSendBuffers(); }
|
||||
#endif
|
||||
|
||||
void Exchange(const std::vector<ParticleType>& outData,
|
||||
const std::vector<vtkm::Id>& outRanks,
|
||||
const std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& outBlockIDsMap,
|
||||
std::vector<ParticleType>& inData,
|
||||
std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& inDataBlockIDsMap,
|
||||
bool blockAndWait)
|
||||
{
|
||||
VTKM_ASSERT(outData.size() == outRanks.size());
|
||||
|
||||
if (this->NumRanks == 1)
|
||||
this->SerialExchange(outData, outBlockIDsMap, inData, inDataBlockIDsMap);
|
||||
#ifdef VTKM_ENABLE_MPI
|
||||
else
|
||||
{
|
||||
this->SendParticles(outData, outRanks, outBlockIDsMap);
|
||||
this->RecvParticles(inData, inDataBlockIDsMap, blockAndWait);
|
||||
this->CleanupSendBuffers();
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
private:
|
||||
// pair(vector of particles, vector of blockIds)
|
||||
//using ParticleCommType = std::pair<std::vector<ParticleType>, std::vector<vtkm::Id>>;
|
||||
// pair(particle, bids);
|
||||
using ParticleCommType = std::pair<ParticleType, std::vector<vtkm::Id>>;
|
||||
|
||||
void CleanupSendBuffers() { std::cout << "IMPLEMENT ME!!!" << std::endl; }
|
||||
|
||||
void SendParticles(const std::vector<ParticleType>& outData,
|
||||
const std::vector<vtkm::Id>& outRanks,
|
||||
const std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& outBlockIDsMap)
|
||||
{
|
||||
if (outData.empty())
|
||||
return;
|
||||
|
||||
//create the send data: vector of particles, vector of vector of blockIds.
|
||||
std::size_t n = outData.size();
|
||||
std::unordered_map<int, std::vector<ParticleCommType>> sendData;
|
||||
|
||||
// dst, vector of pair(particles, blockIds)
|
||||
for (std::size_t i = 0; i < n; i++)
|
||||
{
|
||||
const auto& bids = outBlockIDsMap.find(outData[i].GetID())->second;
|
||||
sendData[outRanks[i]].emplace_back(std::make_pair(std::move(outData[i]), std::move(bids)));
|
||||
}
|
||||
|
||||
//Send to dst, vector<pair<particle, bids>>
|
||||
for (auto& si : sendData)
|
||||
this->SendParticlesToDst(si.first, si.second);
|
||||
}
|
||||
|
||||
void SendParticlesToDst(int dst, const std::vector<ParticleCommType>& data)
|
||||
{
|
||||
if (dst == this->Rank)
|
||||
{
|
||||
VTKM_LOG_S(vtkm::cont::LogLevel::Error, "Error. Sending a particle to yourself.");
|
||||
return;
|
||||
}
|
||||
|
||||
//Serialize vector(pair(particle, bids)) and send.
|
||||
vtkmdiy::MemoryBuffer* bb = new vtkmdiy::MemoryBuffer();
|
||||
vtkmdiy::save(*bb, data);
|
||||
|
||||
MPI_Request req;
|
||||
int err = MPI_Isend(bb->buffer.data(), bb->size(), MPI_BYTE, dst, 0, this->MPIComm, &req);
|
||||
if (err != MPI_SUCCESS)
|
||||
throw vtkm::cont::ErrorFilterExecution("Error in MPI_Isend inside Messenger::SendData");
|
||||
this->SendBuffers[req] = bb;
|
||||
}
|
||||
|
||||
void RecvParticles(std::vector<ParticleType>& inData,
|
||||
std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& inDataBlockIDsMap,
|
||||
bool blockAndWait) const
|
||||
{
|
||||
inData.resize(0);
|
||||
inDataBlockIDsMap.clear();
|
||||
|
||||
std::vector<vtkmdiy::MemoryBuffer> buffers;
|
||||
|
||||
MPI_Status status;
|
||||
while (true)
|
||||
{
|
||||
bool msgReceived = false;
|
||||
int err;
|
||||
if (blockAndWait)
|
||||
{
|
||||
err = MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, this->MPIComm, &status);
|
||||
if (err != MPI_SUCCESS)
|
||||
throw vtkm::cont::ErrorFilterExecution(
|
||||
"Error in MPI_Probe in ParticleExchanger::RecvParticles");
|
||||
msgReceived = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
int flag = 0;
|
||||
err = MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, this->MPIComm, &flag, &status);
|
||||
if (err != MPI_SUCCESS)
|
||||
throw vtkm::cont::ErrorFilterExecution(
|
||||
"Error in MPI_Probe in ParticleExchanger::RecvParticles");
|
||||
msgReceived = (flag == 1);
|
||||
}
|
||||
|
||||
if (msgReceived)
|
||||
{
|
||||
int incomingSize;
|
||||
err = MPI_Get_count(&status, MPI_BYTE, &incomingSize);
|
||||
if (err != MPI_SUCCESS)
|
||||
throw vtkm::cont::ErrorFilterExecution(
|
||||
"Error in MPI_Probe in ParticleExchanger::RecvParticles");
|
||||
|
||||
std::vector<char> recvBuff;
|
||||
recvBuff.resize(incomingSize);
|
||||
MPI_Status recvStatus;
|
||||
|
||||
err = MPI_Recv(recvBuff.data(),
|
||||
incomingSize,
|
||||
MPI_BYTE,
|
||||
status.MPI_SOURCE,
|
||||
status.MPI_TAG,
|
||||
this->MPIComm,
|
||||
&recvStatus);
|
||||
if (err != MPI_SUCCESS)
|
||||
throw vtkm::cont::ErrorFilterExecution(
|
||||
"Error in MPI_Probe in ParticleExchanger::RecvParticles");
|
||||
|
||||
vtkmdiy::MemoryBuffer memBuff;
|
||||
vtkmdiy::save(memBuff, recvBuff);
|
||||
buffers.emplace_back(std::move(memBuff));
|
||||
|
||||
blockAndWait = false; //Check one more time to see if anything else arrived.
|
||||
}
|
||||
else
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
//Unpack buffers into particle data.
|
||||
//buffers: vector<pair(particle, vector<vtkm::Id>)>
|
||||
for (auto& b : buffers)
|
||||
{
|
||||
std::vector<ParticleCommType> data;
|
||||
vtkmdiy::load(b, data);
|
||||
|
||||
for (auto& d : data)
|
||||
{
|
||||
const auto& particle = d.first;
|
||||
const auto& bids = d.second;
|
||||
inDataBlockIDsMap[particle.GetID()] = std::move(bids);
|
||||
inData.emplace_back(std::move(particle));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void SerialExchange(const std::vector<ParticleType>& outData,
|
||||
const std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& outBlockIDsMap,
|
||||
std::vector<ParticleType>& inData,
|
||||
std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& inDataBlockIDsMap)
|
||||
{
|
||||
//Copy output to input.
|
||||
for (const auto& p : outData)
|
||||
{
|
||||
const auto& bids = outBlockIDsMap.find(p.GetID())->second;
|
||||
inData.emplace_back(p);
|
||||
inDataBlockIDsMap[p.GetID()] = bids;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#ifdef VTKM_ENABLE_MPI
|
||||
MPI_Comm MPIComm;
|
||||
vtkm::Id NumRanks;
|
||||
vtkm::Id Rank;
|
||||
std::unordered_map<MPI_Request, vtkmdiy::MemoryBuffer*> SendBuffers;
|
||||
#else
|
||||
vtkm::Id NumRanks = 1;
|
||||
vtkm::Id Rank = 0;
|
||||
#endif
|
||||
};
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
} //vtkm::filter::flow::internal
|
||||
|
||||
|
||||
#endif //vtk_m_filter_flow_internal_ParticleExchanger_h
|
Loading…
Reference in New Issue
Block a user