Compare commits

...

8 Commits

Author SHA1 Message Date
Dave Pugmire
d804d0fcdf Merge branch 'sl_comm_probe' into 'master'
Draft: Use async termination.

See merge request vtk/vtk-m!3182
2024-07-02 16:33:03 -04:00
Kenneth Moreland
5df372db2b Merge topic 'env-options'
310579b9a Load options from environment variables

Acked-by: Kitware Robot <kwrobot@kitware.com>
Acked-by: Li-Ta Lo <ollie@lanl.gov>
Merge-request: !3243
2024-07-02 16:31:08 -04:00
Kenneth Moreland
310579b9a7 Load options from environment variables
Some common VTK-m options such as the device and log level could be
specified on the command line but not through environment variables. It is
not always possible to set VTK-m command line options, so environment
variables are added.

Also added documentation to the user's guide about what options are
available and how to set them.
2024-07-02 12:47:34 -04:00
Dave Pugmire
c75dcace78 Merge branch 'master' of https://gitlab.kitware.com/vtk/vtk-m into sl_comm_probe
2024-04-01 15:44:57 -04:00
Dave Pugmire
02553c0b6f Merge branch 'master' of https://gitlab.kitware.com/vtk/vtk-m into sl_comm_probe
2024-03-19 15:33:13 -04:00
Dave Pugmire
eef93ff825 Merge branch 'master' of https://gitlab.kitware.com/vtk/vtk-m into sl_comm_probe
2024-02-02 15:05:57 -05:00
Dave Pugmire
dd96f1144b Fixes for non-mpi code...
2024-01-24 07:23:55 -05:00
Dave Pugmire
4bd340156b Use async termination.
2024-01-23 16:57:10 -05:00
10 changed files with 265 additions and 95 deletions

@ -0,0 +1,9 @@
## Load options from environment variables
Some common VTK-m options such as the device and log level could be
specified on the command line but not through environment variables. It is
not always possible to set VTK-m command line options, so environment
variables are added.
Also added documentation to the user's guide about what options are
available and how to set them.

@ -19,6 +19,44 @@ But it can also optionally take the ``argc`` and ``argv`` arguments to the ``mai
|VTKm| accepts arguments that, for example, configure the compute device to use or establish logging levels.
Any arguments that are handled by |VTKm| are removed from the ``argc``/``argv`` list so that your program can then respond to the remaining arguments.

Many options can also be set with environment variables.
If both the environment variable and command line argument are provided, the command line argument is used.
The following table lists the currently supported options.

.. list-table:: |VTKm| command line arguments and environment variable options.
   :widths: 23 22 15 40
   :header-rows: 1

   * - Command Line Argument
     - Environment Variable
     - Default Value
     - Description
   * - ``--vtkm-help``
     -
     -
     - Causes the program to print information about |VTKm| command line arguments and then exit.
   * - ``--vtkm-log-level``
     - ``VTKM_LOG_LEVEL``
     - ``WARNING``
     - Specifies the logging level.
       Valid values are ``INFO``, ``WARNING``, ``ERROR``, ``FATAL``, and ``OFF``.
       This can also be set to a numeric value for the logging level.
   * - ``--vtkm-device``
     - ``VTKM_DEVICE``
     -
     - Force |VTKm| to use the specified device.
       If not specified or if ``Any`` is given, then any available device may be used.
   * - ``--vtkm-num-threads``
     - ``VTKM_NUM_THREADS``
     -
     - Set the number of threads to use on a multi-core device.
       If not specified, the device will use the cores available in the system.
   * - ``--vtkm-device-instance``
     - ``VTKM_DEVICE_INSTANCE``
     -
     - Selects the device to use when more than one device of a given type is available.
       The device is specified with a numbered index.

:func:`vtkm::cont::Initialize` returns a :struct:`vtkm::cont::InitializeResult` structure.
This structure contains information about the supported arguments and options selected during initialization.
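
The precedence rule documented in this hunk (command line argument first, then environment variable, then the default) can be sketched for a single option as follows. This is only an illustration: `ResolveOption` is a hypothetical helper written for this note and is not part of VTK-m. Only `std::getenv` and the `VTKM_LOG_LEVEL` name are taken from the change itself.

```cpp
#include <cstdlib>
#include <string>

// Hypothetical helper (not a VTK-m function): choose the value for one option.
// cliValue is the value parsed from the command line, or nullptr if the flag
// was not given. envName is the environment variable consulted otherwise.
std::string ResolveOption(const char* cliValue,
                          const char* envName,
                          const std::string& defaultValue)
{
  // The command line argument has the highest priority.
  if (cliValue != nullptr)
  {
    return cliValue;
  }
  // Otherwise fall back to the environment variable, if it is set.
  const char* envValue = std::getenv(envName);
  if (envValue != nullptr)
  {
    return envValue;
  }
  // Neither source was provided, so use the documented default.
  return defaultValue;
}
```

For example, `ResolveOption(nullptr, "VTKM_LOG_LEVEL", "WARNING")` yields the content of `VTKM_LOG_LEVEL` when that variable is set and `"WARNING"` otherwise.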

@ -17,6 +17,7 @@
#include <vtkm/thirdparty/diy/environment.h> #include <vtkm/thirdparty/diy/environment.h>
#include <cstdlib>
#include <memory> #include <memory>
#include <sstream> #include <sstream>
@ -123,7 +124,7 @@ InitializeResult Initialize(int& argc, char* argv[], InitializeOptions opts)
} }
else else
{ {
vtkm::cont::InitLogging(argc, argv, loggingFlag); vtkm::cont::InitLogging(argc, argv, loggingFlag, "VTKM_LOG_LEVEL");
} }
if (!vtkmdiy::mpi::environment::initialized()) if (!vtkmdiy::mpi::environment::initialized())
{ {
@ -225,37 +226,70 @@ InitializeResult Initialize(int& argc, char* argv[], InitializeOptions opts)
vtkm::cont::DeviceAdapterTagAny{}, runtimeDeviceOptions, argc, argv); vtkm::cont::DeviceAdapterTagAny{}, runtimeDeviceOptions, argc, argv);
} }
// Check for device on command line.
if (options[opt::OptionIndex::DEVICE]) if (options[opt::OptionIndex::DEVICE])
{ {
const char* arg = options[opt::OptionIndex::DEVICE].arg; const char* arg = options[opt::OptionIndex::DEVICE].arg;
auto id = vtkm::cont::make_DeviceAdapterId(arg); config.Device = vtkm::cont::make_DeviceAdapterId(arg);
if (id != vtkm::cont::DeviceAdapterTagAny{}) }
// If not on command line, check for device in environment variable.
if (config.Device == vtkm::cont::DeviceAdapterTagUndefined{})
{
const char* deviceEnv = std::getenv("VTKM_DEVICE");
if (deviceEnv != nullptr)
{ {
vtkm::cont::GetRuntimeDeviceTracker().ForceDevice(id); auto id = vtkm::cont::make_DeviceAdapterId(std::getenv("VTKM_DEVICE"));
if (VtkmDeviceArg::DeviceIsAvailable(id))
{
config.Device = id;
}
else
{
// Got invalid device. Log an error, but continue to do the default action for
// the device (i.e., ignore the environment variable setting).
VTKM_LOG_S(vtkm::cont::LogLevel::Error,
"Invalid device `"
<< deviceEnv
<< "` specified in VTKM_DEVICE environment variable. Ignoring.");
VTKM_LOG_S(vtkm::cont::LogLevel::Error,
"Valid devices are: " << VtkmDeviceArg::GetValidDeviceNames());
}
}
}
// If still not defined, check to see if "any" device should be added.
if ((config.Device == vtkm::cont::DeviceAdapterTagUndefined{}) &&
(opts & InitializeOptions::DefaultAnyDevice) != InitializeOptions::None)
{
config.Device = vtkm::cont::DeviceAdapterTagAny{};
}
// Set the state for the device selected.
if (config.Device == vtkm::cont::DeviceAdapterTagUndefined{})
{
if ((opts & InitializeOptions::RequireDevice) != InitializeOptions::None)
{
auto devices = VtkmDeviceArg::GetValidDeviceNames();
VTKM_LOG_S(vtkm::cont::LogLevel::Fatal, "Device not given on command line.");
std::cerr << "Target device must be specified via --vtkm-device.\n"
"Valid devices: "
<< devices << std::endl;
if ((opts & InitializeOptions::AddHelp) != InitializeOptions::None)
{
std::cerr << config.Usage;
}
exit(1);
} }
else else
{ {
vtkm::cont::GetRuntimeDeviceTracker().Reset(); // No device specified. Do nothing and let VTK-m decide what it is going to do.
} }
config.Device = id;
} }
else if ((opts & InitializeOptions::DefaultAnyDevice) != InitializeOptions::None) else if (config.Device == vtkm::cont::DeviceAdapterTagAny{})
{ {
vtkm::cont::GetRuntimeDeviceTracker().Reset(); vtkm::cont::GetRuntimeDeviceTracker().Reset();
config.Device = vtkm::cont::DeviceAdapterTagAny{};
} }
else if ((opts & InitializeOptions::RequireDevice) != InitializeOptions::None) else
{ {
auto devices = VtkmDeviceArg::GetValidDeviceNames(); vtkm::cont::GetRuntimeDeviceTracker().ForceDevice(config.Device);
VTKM_LOG_S(vtkm::cont::LogLevel::Error, "Device not given on command line.");
std::cerr << "Target device must be specified via --vtkm-device.\n"
"Valid devices: "
<< devices << std::endl;
if ((opts & InitializeOptions::AddHelp) != InitializeOptions::None)
{
std::cerr << config.Usage;
}
exit(1);
} }
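
The device selection order in the hunk above (command line, then `VTKM_DEVICE`, then the `DefaultAnyDevice` fallback, then the `RequireDevice` check) may be easier to follow as a pure function. The sketch below is a simplified model and not the VTK-m implementation: the `Device` and `Action` enums, `ParseDevice`, and `SelectDevice` are all hypothetical stand-ins, and the exact-match name parsing is an assumption made to keep the example short.

```cpp
#include <string>

// Hypothetical stand-ins for VTK-m device ids; not the library's types.
enum class Device { Undefined, Any, Serial, Cuda };

// What the initialization code does once the device has been settled.
enum class Action { Fail, LeaveDefault, ResetTracker, ForceDevice };

// Hypothetical parser: unrecognized names map to Undefined.
Device ParseDevice(const std::string& name)
{
  if (name == "Any") return Device::Any;
  if (name == "Serial") return Device::Serial;
  if (name == "Cuda") return Device::Cuda;
  return Device::Undefined;
}

Action SelectDevice(const char* cliArg,
                    const char* envValue,
                    bool defaultAnyDevice,
                    bool requireDevice,
                    Device& device)
{
  device = Device::Undefined;
  // 1. A device given on the command line takes priority.
  if (cliArg != nullptr)
  {
    device = ParseDevice(cliArg);
  }
  // 2. Otherwise consult the environment; an invalid value is ignored.
  if (device == Device::Undefined && envValue != nullptr)
  {
    device = ParseDevice(envValue);
  }
  // 3. Fall back to "any device" only if the caller asked for that.
  if (device == Device::Undefined && defaultAnyDevice)
  {
    device = Device::Any;
  }
  // 4. Apply the result.
  if (device == Device::Undefined)
  {
    return requireDevice ? Action::Fail : Action::LeaveDefault;
  }
  if (device == Device::Any)
  {
    return Action::ResetTracker;
  }
  return Action::ForceDevice;
}
```

In this model `Action::Fail` corresponds to the `exit(1)` branch, `Action::ResetTracker` to the `Reset()` call on the runtime device tracker, and `Action::ForceDevice` to `ForceDevice(config.Device)`.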

@ -30,7 +30,7 @@
#endif // VTKM_ENABLE_LOGGING #endif // VTKM_ENABLE_LOGGING
#include <cassert> #include <cstdlib>
#include <iomanip> #include <iomanip>
#include <sstream> #include <sstream>
#include <stdexcept> #include <stdexcept>
@ -108,7 +108,10 @@ namespace cont
{ {
VTKM_CONT VTKM_CONT
void InitLogging(int& argc, char* argv[], const std::string& loggingFlag) void InitLogging(int& argc,
char* argv[],
const std::string& loggingFlag,
const std::string& loggingEnv)
{ {
SetLogLevelName(vtkm::cont::LogLevel::Off, "Off"); SetLogLevelName(vtkm::cont::LogLevel::Off, "Off");
SetLogLevelName(vtkm::cont::LogLevel::Fatal, "FATL"); SetLogLevelName(vtkm::cont::LogLevel::Fatal, "FATL");
@ -130,8 +133,16 @@ void InitLogging(int& argc, char* argv[], const std::string& loggingFlag)
loguru::set_verbosity_to_name_callback(&verbosityToNameCallback); loguru::set_verbosity_to_name_callback(&verbosityToNameCallback);
loguru::set_name_to_verbosity_callback(&nameToVerbosityCallback); loguru::set_name_to_verbosity_callback(&nameToVerbosityCallback);
// Set the default log level to warning const char* envLevel = std::getenv(loggingEnv.c_str());
SetStderrLogLevel(vtkm::cont::LogLevel::Warn); if (envLevel != nullptr)
{
SetStderrLogLevel(envLevel);
}
else
{
// Set the default log level to warning
SetStderrLogLevel(vtkm::cont::LogLevel::Warn);
}
loguru::init(argc, argv, loggingFlag.c_str()); loguru::init(argc, argv, loggingFlag.c_str());
} }
#else // VTKM_ENABLE_LOGGING #else // VTKM_ENABLE_LOGGING

@ -371,7 +371,10 @@ enum class LogLevel
*/ */
VTKM_CONT_EXPORT VTKM_CONT_EXPORT
VTKM_CONT VTKM_CONT
void InitLogging(int& argc, char* argv[], const std::string& loggingFlag = "--vtkm-log-level"); void InitLogging(int& argc,
char* argv[],
const std::string& loggingFlag = "--vtkm-log-level",
const std::string& loggingEnv = "VTKM_LOG_LEVEL");
VTKM_CONT_EXPORT VTKM_CONT_EXPORT
VTKM_CONT VTKM_CONT
void InitLogging(); void InitLogging();

@ -15,6 +15,7 @@
#include <vtkm/cont/ErrorFilterExecution.h> #include <vtkm/cont/ErrorFilterExecution.h>
#include <vtkm/filter/Filter.h> #include <vtkm/filter/Filter.h>
#include <vtkm/filter/flow/FlowTypes.h> #include <vtkm/filter/flow/FlowTypes.h>
#include <vtkm/filter/flow/internal/BoundsMap.h>
#include <vtkm/filter/flow/vtkm_filter_flow_export.h> #include <vtkm/filter/flow/vtkm_filter_flow_export.h>
namespace vtkm namespace vtkm
@ -104,7 +105,7 @@ protected:
bool BlockIdsSet = false; bool BlockIdsSet = false;
std::vector<vtkm::Id> BlockIds; std::vector<vtkm::Id> BlockIds;
vtkm::filter::flow::internal::BoundsMap BoundsMap;
vtkm::Id NumberOfSteps = 0; vtkm::Id NumberOfSteps = 0;
vtkm::cont::UnknownArrayHandle Seeds; vtkm::cont::UnknownArrayHandle Seeds;
vtkm::filter::flow::IntegrationSolverType SolverType = vtkm::filter::flow::IntegrationSolverType SolverType =

@ -58,13 +58,15 @@ FilterParticleAdvectionSteadyState<Derived>::DoExecutePartitions(
DataSetIntegratorSteadyState<ParticleType, FieldType, TerminationType, AnalysisType>; DataSetIntegratorSteadyState<ParticleType, FieldType, TerminationType, AnalysisType>;
this->ValidateOptions(); this->ValidateOptions();
if (this->BlockIdsSet)
this->BoundsMap = vtkm::filter::flow::internal::BoundsMap(input, this->BlockIds);
else
this->BoundsMap = vtkm::filter::flow::internal::BoundsMap(input);
vtkm::filter::flow::internal::BoundsMap boundsMap(input);
std::vector<DSIType> dsi; std::vector<DSIType> dsi;
for (vtkm::Id i = 0; i < input.GetNumberOfPartitions(); i++) for (vtkm::Id i = 0; i < input.GetNumberOfPartitions(); i++)
{ {
vtkm::Id blockId = boundsMap.GetLocalBlockId(i); vtkm::Id blockId = this->BoundsMap.GetLocalBlockId(i);
auto dataset = input.GetPartition(i); auto dataset = input.GetPartition(i);
// Build the field for the current dataset // Build the field for the current dataset
@ -78,7 +80,7 @@ FilterParticleAdvectionSteadyState<Derived>::DoExecutePartitions(
} }
vtkm::filter::flow::internal::ParticleAdvector<DSIType> pav( vtkm::filter::flow::internal::ParticleAdvector<DSIType> pav(
boundsMap, dsi, this->UseThreadedAlgorithm, this->UseAsynchronousCommunication); this->BoundsMap, dsi, this->UseThreadedAlgorithm, this->UseAsynchronousCommunication);
vtkm::cont::ArrayHandle<ParticleType> particles; vtkm::cont::ArrayHandle<ParticleType> particles;
this->Seeds.AsArrayHandle(particles); this->Seeds.AsArrayHandle(particles);

@ -55,12 +55,15 @@ FilterParticleAdvectionUnsteadyState<Derived>::DoExecutePartitions(
using DSIType = vtkm::filter::flow::internal:: using DSIType = vtkm::filter::flow::internal::
DataSetIntegratorUnsteadyState<ParticleType, FieldType, TerminationType, AnalysisType>; DataSetIntegratorUnsteadyState<ParticleType, FieldType, TerminationType, AnalysisType>;
vtkm::filter::flow::internal::BoundsMap boundsMap(input); if (this->BlockIdsSet)
this->BoundsMap = vtkm::filter::flow::internal::BoundsMap(input, this->BlockIds);
else
this->BoundsMap = vtkm::filter::flow::internal::BoundsMap(input);
std::vector<DSIType> dsi; std::vector<DSIType> dsi;
for (vtkm::Id i = 0; i < input.GetNumberOfPartitions(); i++) for (vtkm::Id i = 0; i < input.GetNumberOfPartitions(); i++)
{ {
vtkm::Id blockId = boundsMap.GetLocalBlockId(i); vtkm::Id blockId = this->BoundsMap.GetLocalBlockId(i);
auto ds1 = input.GetPartition(i); auto ds1 = input.GetPartition(i);
auto ds2 = this->Input2.GetPartition(i); auto ds2 = this->Input2.GetPartition(i);
@ -85,7 +88,7 @@ FilterParticleAdvectionUnsteadyState<Derived>::DoExecutePartitions(
analysis); analysis);
} }
vtkm::filter::flow::internal::ParticleAdvector<DSIType> pav( vtkm::filter::flow::internal::ParticleAdvector<DSIType> pav(
boundsMap, dsi, this->UseThreadedAlgorithm, this->UseAsynchronousCommunication); this->BoundsMap, dsi, this->UseThreadedAlgorithm, this->UseAsynchronousCommunication);
vtkm::cont::ArrayHandle<ParticleType> particles; vtkm::cont::ArrayHandle<ParticleType> particles;
this->Seeds.AsArrayHandle(particles); this->Seeds.AsArrayHandle(particles);

@ -15,6 +15,10 @@
#include <vtkm/filter/flow/internal/BoundsMap.h> #include <vtkm/filter/flow/internal/BoundsMap.h>
#include <vtkm/filter/flow/internal/DataSetIntegrator.h> #include <vtkm/filter/flow/internal/DataSetIntegrator.h>
#include <vtkm/filter/flow/internal/ParticleMessenger.h> #include <vtkm/filter/flow/internal/ParticleMessenger.h>
#ifdef VTKM_ENABLE_MPI
#include <vtkm/thirdparty/diy/diy.h>
#include <vtkm/thirdparty/diy/mpi-cast.h>
#endif
namespace vtkm namespace vtkm
{ {
@ -25,6 +29,87 @@ namespace flow
namespace internal namespace internal
{ {
class AdvectAlgorithmTerminator
{
public:
#ifdef VTKM_ENABLE_MPI
AdvectAlgorithmTerminator(vtkmdiy::mpi::communicator& comm)
: MPIComm(vtkmdiy::mpi::mpi_cast(comm.handle()))
#else
AdvectAlgorithmTerminator(vtkmdiy::mpi::communicator& vtkmNotUsed(comm))
#endif
{
}
void AddWork()
{
#ifdef VTKM_ENABLE_MPI
this->Dirty = 1;
#endif
}
bool Done() const { return this->State == AdvectAlgorithmTerminatorState::DONE; }
void Control(bool haveLocalWork)
{
#ifdef VTKM_ENABLE_MPI
if (this->State == STATE_0 && !haveLocalWork)
{
MPI_Ibarrier(this->MPIComm, &this->StateReq);
this->Dirty = 0;
this->State = STATE_1;
}
else if (this->State == STATE_1)
{
MPI_Status status;
int flag;
MPI_Test(&this->StateReq, &flag, &status);
if (flag == 1)
{
int localDirty = this->Dirty;
MPI_Iallreduce(
&localDirty, &this->AllDirty, 1, MPI_INT, MPI_LOR, this->MPIComm, &this->StateReq);
this->State = STATE_2;
}
}
else if (this->State == STATE_2)
{
MPI_Status status;
int flag;
MPI_Test(&this->StateReq, &flag, &status);
if (flag == 1)
{
if (this->AllDirty == 0) //done
this->State = DONE;
else
this->State = STATE_0; //reset.
}
}
#else
if (!haveLocalWork)
this->State = DONE;
#endif
}
private:
enum AdvectAlgorithmTerminatorState
{
STATE_0,
STATE_1,
STATE_2,
DONE
};
AdvectAlgorithmTerminatorState State = AdvectAlgorithmTerminatorState::STATE_0;
#ifdef VTKM_ENABLE_MPI
std::atomic<int> Dirty;
int AllDirty = 0;
MPI_Request StateReq;
MPI_Comm MPIComm;
#endif
};
template <typename DSIType> template <typename DSIType>
class AdvectAlgorithm class AdvectAlgorithm
{ {
@ -39,6 +124,7 @@ public:
, NumRanks(this->Comm.size()) , NumRanks(this->Comm.size())
, Rank(this->Comm.rank()) , Rank(this->Comm.rank())
, UseAsynchronousCommunication(useAsyncComm) , UseAsynchronousCommunication(useAsyncComm)
, Terminator(this->Comm)
{ {
} }
@ -97,27 +183,21 @@ public:
vtkm::filter::flow::internal::ParticleMessenger<ParticleType> messenger( vtkm::filter::flow::internal::ParticleMessenger<ParticleType> messenger(
this->Comm, this->UseAsynchronousCommunication, this->BoundsMap, 1, 128); this->Comm, this->UseAsynchronousCommunication, this->BoundsMap, 1, 128);
this->ComputeTotalNumParticles(); while (!this->Terminator.Done())
while (this->TotalNumTerminatedParticles < this->TotalNumParticles)
{ {
std::vector<ParticleType> v; std::vector<ParticleType> v;
vtkm::Id numTerm = 0, blockId = -1; vtkm::Id blockId = -1;
if (this->GetActiveParticles(v, blockId)) if (this->GetActiveParticles(v, blockId))
{ {
//make this a pointer to avoid the copy? //make this a pointer to avoid the copy?
auto& block = this->GetDataSet(blockId); auto& block = this->GetDataSet(blockId);
DSIHelperInfo<ParticleType> bb(v, this->BoundsMap, this->ParticleBlockIDsMap); DSIHelperInfo<ParticleType> bb(v, this->BoundsMap, this->ParticleBlockIDsMap);
block.Advect(bb, this->StepSize); block.Advect(bb, this->StepSize);
numTerm = this->UpdateResult(bb); this->UpdateResult(bb);
} }
vtkm::Id numTermMessages = 0; this->Communicate(messenger);
this->Communicate(messenger, numTerm, numTermMessages); this->Terminator.Control(!this->Active.empty());
this->TotalNumTerminatedParticles += (numTerm + numTermMessages);
if (this->TotalNumTerminatedParticles > this->TotalNumParticles)
throw vtkm::cont::ErrorFilterExecution("Particle count error");
} }
} }
@ -128,19 +208,6 @@ public:
this->ParticleBlockIDsMap.clear(); this->ParticleBlockIDsMap.clear();
} }
void ComputeTotalNumParticles()
{
vtkm::Id numLocal = static_cast<vtkm::Id>(this->Inactive.size());
for (const auto& it : this->Active)
numLocal += it.second.size();
#ifdef VTKM_ENABLE_MPI
vtkmdiy::mpi::all_reduce(this->Comm, numLocal, this->TotalNumParticles, std::plus<vtkm::Id>{});
#else
this->TotalNumParticles = numLocal;
#endif
}
DataSetIntegrator<DSIType, ParticleType>& GetDataSet(vtkm::Id id) DataSetIntegrator<DSIType, ParticleType>& GetDataSet(vtkm::Id id)
{ {
for (auto& it : this->Blocks) for (auto& it : this->Blocks)
@ -213,9 +280,7 @@ public:
return !particles.empty(); return !particles.empty();
} }
void Communicate(vtkm::filter::flow::internal::ParticleMessenger<ParticleType>& messenger, void Communicate(vtkm::filter::flow::internal::ParticleMessenger<ParticleType>& messenger)
vtkm::Id numLocalTerminations,
vtkm::Id& numTermMessages)
{ {
std::vector<ParticleType> outgoing; std::vector<ParticleType> outgoing;
std::vector<vtkm::Id> outgoingRanks; std::vector<vtkm::Id> outgoingRanks;
@ -224,16 +289,17 @@ public:
std::vector<ParticleType> incoming; std::vector<ParticleType> incoming;
std::unordered_map<vtkm::Id, std::vector<vtkm::Id>> incomingBlockIDs; std::unordered_map<vtkm::Id, std::vector<vtkm::Id>> incomingBlockIDs;
numTermMessages = 0;
bool block = false; bool block = false;
#ifdef VTKM_ENABLE_MPI #ifdef VTKM_ENABLE_MPI
block = this->GetBlockAndWait(messenger.UsingSyncCommunication(), numLocalTerminations); block = this->GetBlockAndWait(messenger.UsingSyncCommunication());
#endif #endif
vtkm::Id numTermMessages;
messenger.Exchange(outgoing, messenger.Exchange(outgoing,
outgoingRanks, outgoingRanks,
this->ParticleBlockIDsMap, this->ParticleBlockIDsMap,
numLocalTerminations, 0,
incoming, incoming,
incomingBlockIDs, incomingBlockIDs,
numTermMessages, numTermMessages,
@ -311,17 +377,22 @@ public:
{ {
VTKM_ASSERT(particles.size() == idsMap.size()); VTKM_ASSERT(particles.size() == idsMap.size());
for (auto pit = particles.begin(); pit != particles.end(); pit++) if (!particles.empty())
{ {
vtkm::Id particleID = pit->GetID(); this->Terminator.AddWork();
const auto& it = idsMap.find(particleID);
VTKM_ASSERT(it != idsMap.end() && !it->second.empty());
vtkm::Id blockId = it->second[0];
this->Active[blockId].emplace_back(*pit);
}
for (const auto& it : idsMap) for (auto pit = particles.begin(); pit != particles.end(); pit++)
this->ParticleBlockIDsMap[it.first] = it.second; {
vtkm::Id particleID = pit->GetID();
const auto& it = idsMap.find(particleID);
VTKM_ASSERT(it != idsMap.end() && !it->second.empty());
vtkm::Id blockId = it->second[0];
this->Active[blockId].emplace_back(*pit);
}
for (const auto& it : idsMap)
this->ParticleBlockIDsMap[it.first] = it.second;
}
} }
virtual void UpdateInactive(const std::vector<ParticleType>& particles, virtual void UpdateInactive(const std::vector<ParticleType>& particles,
@ -351,7 +422,7 @@ public:
} }
virtual bool GetBlockAndWait(const bool& syncComm, const vtkm::Id& numLocalTerm) virtual bool GetBlockAndWait(const bool& syncComm)
{ {
bool haveNoWork = this->Active.empty() && this->Inactive.empty(); bool haveNoWork = this->Active.empty() && this->Inactive.empty();
@ -367,9 +438,11 @@ public:
//2. numLocalTerm + this->TotalNumberOfTerminatedParticles == this->TotalNumberOfParticles //2. numLocalTerm + this->TotalNumberOfTerminatedParticles == this->TotalNumberOfParticles
//So, if neither are true, we can safely block and wait for communication to come in. //So, if neither are true, we can safely block and wait for communication to come in.
if (haveNoWork && // if (this->Terminator.State == AdvectAlgorithmTerminator::AdvectAlgorithmTerminatorState::STATE_2)
(numLocalTerm + this->TotalNumTerminatedParticles < this->TotalNumParticles)) // return true;
return true;
// if (haveNoWork && (numLocalTerm + this->TotalNumTerminatedParticles < this->TotalNumParticles))
// return true;
return false; return false;
} }
@ -388,9 +461,8 @@ public:
std::unordered_map<vtkm::Id, std::vector<vtkm::Id>> ParticleBlockIDsMap; std::unordered_map<vtkm::Id, std::vector<vtkm::Id>> ParticleBlockIDsMap;
vtkm::Id Rank; vtkm::Id Rank;
vtkm::FloatDefault StepSize; vtkm::FloatDefault StepSize;
vtkm::Id TotalNumParticles = 0;
vtkm::Id TotalNumTerminatedParticles = 0;
bool UseAsynchronousCommunication = true; bool UseAsynchronousCommunication = true;
AdvectAlgorithmTerminator Terminator;
}; };
} }
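
The state machine in `AdvectAlgorithmTerminator` can be tried out without MPI using a single-process model. The sketch below is an approximation under stated assumptions: `TerminatorModel` is a hypothetical class, the non-blocking barrier and the logical-OR reduction are assumed to complete by the next call to `Control`, and with one rank the reduction result is simply the local dirty flag. It shows why work that arrives during the barrier forces another round instead of termination.

```cpp
// Single-process model of the three-phase termination check.
// The phase names mirror the states in the diff; the rest is illustrative.
class TerminatorModel
{
public:
  // Called when new particles arrive; marks this rank as having seen work.
  void AddWork() { this->Dirty = true; }

  bool Done() const { return this->State == Phase::Done; }

  // Advance the state machine by at most one phase per call.
  void Control(bool haveLocalWork)
  {
    switch (this->State)
    {
      case Phase::State0:
        // Only a rank with no local work enters the barrier.
        if (!haveLocalWork)
        {
          this->Dirty = false;
          this->State = Phase::State1;
        }
        break;
      case Phase::State1:
        // Model assumption: the barrier has completed by this call.
        // With one rank, the logical-OR reduction is just the local flag.
        this->AllDirty = this->Dirty;
        this->State = Phase::State2;
        break;
      case Phase::State2:
        // Model assumption: the reduction has completed by this call.
        // Any dirty rank means work moved during the barrier, so start over.
        this->State = this->AllDirty ? Phase::State0 : Phase::Done;
        break;
      case Phase::Done:
        break;
    }
  }

private:
  enum class Phase { State0, State1, State2, Done };
  Phase State = Phase::State0;
  bool Dirty = false;
  bool AllDirty = false;
};
```

Calling `Control(false)` three times on a fresh model reaches `Done()`. If `AddWork()` is called after the first of those calls, the model returns to the initial state and needs another full round.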

@ -39,7 +39,6 @@ public:
bool useAsyncComm) bool useAsyncComm)
: AdvectAlgorithm<DSIType>(bm, blocks, useAsyncComm) : AdvectAlgorithm<DSIType>(bm, blocks, useAsyncComm)
, Done(false) , Done(false)
, WorkerActivate(false)
{ {
//For threaded algorithm, the particles go out of scope in the Work method. //For threaded algorithm, the particles go out of scope in the Work method.
//When this happens, they are destructed by the time the Manage thread gets them. //When this happens, they are destructed by the time the Manage thread gets them.
@ -50,8 +49,6 @@ public:
void Go() override void Go() override
{ {
this->ComputeTotalNumParticles();
std::vector<std::thread> workerThreads; std::vector<std::thread> workerThreads;
workerThreads.emplace_back(std::thread(AdvectAlgorithmThreaded::Worker, this)); workerThreads.emplace_back(std::thread(AdvectAlgorithmThreaded::Worker, this));
this->Manage(); this->Manage();
@ -63,6 +60,13 @@ public:
} }
protected: protected:
bool HaveAnyWork()
{
std::lock_guard<std::mutex> lock(this->Mutex);
//We have work if there are particles in any queues or a worker is busy.
return !this->Active.empty() || !this->Inactive.empty() || this->WorkerActivate;
}
bool GetActiveParticles(std::vector<ParticleType>& particles, vtkm::Id& blockId) override bool GetActiveParticles(std::vector<ParticleType>& particles, vtkm::Id& blockId) override
{ {
std::lock_guard<std::mutex> lock(this->Mutex); std::lock_guard<std::mutex> lock(this->Mutex);
@ -144,38 +148,31 @@ protected:
vtkm::filter::flow::internal::ParticleMessenger<ParticleType> messenger( vtkm::filter::flow::internal::ParticleMessenger<ParticleType> messenger(
this->Comm, useAsync, this->BoundsMap, 1, 128); this->Comm, useAsync, this->BoundsMap, 1, 128);
while (this->TotalNumTerminatedParticles < this->TotalNumParticles) while (!this->Terminator.Done())
{ {
std::unordered_map<vtkm::Id, std::vector<DSIHelperInfo<ParticleType>>> workerResults; std::unordered_map<vtkm::Id, std::vector<DSIHelperInfo<ParticleType>>> workerResults;
this->GetWorkerResults(workerResults); this->GetWorkerResults(workerResults);
vtkm::Id numTerm = 0;
for (auto& it : workerResults) for (auto& it : workerResults)
{
for (auto& r : it.second) for (auto& r : it.second)
numTerm += this->UpdateResult(r); this->UpdateResult(r);
}
vtkm::Id numTermMessages = 0; this->Communicate(messenger);
this->Communicate(messenger, numTerm, numTermMessages); this->Terminator.Control(this->HaveAnyWork());
this->TotalNumTerminatedParticles += (numTerm + numTermMessages);
if (this->TotalNumTerminatedParticles > this->TotalNumParticles)
throw vtkm::cont::ErrorFilterExecution("Particle count error");
} }
//Let the workers know that we are done. //Let the workers know that we are done.
this->SetDone(); this->SetDone();
} }
bool GetBlockAndWait(const bool& syncComm, const vtkm::Id& numLocalTerm) override bool GetBlockAndWait(const bool& syncComm) override
{ {
std::lock_guard<std::mutex> lock(this->Mutex); std::lock_guard<std::mutex> lock(this->Mutex);
if (this->Done) if (this->Done)
return true; return true;
return (this->AdvectAlgorithm<DSIType>::GetBlockAndWait(syncComm, numLocalTerm) && return (this->AdvectAlgorithm<DSIType>::GetBlockAndWait(syncComm) && !this->WorkerActivate &&
!this->WorkerActivate && this->WorkerResults.empty()); this->WorkerResults.empty());
} }
void GetWorkerResults( void GetWorkerResults(
@ -193,7 +190,7 @@ protected:
std::atomic<bool> Done; std::atomic<bool> Done;
std::mutex Mutex; std::mutex Mutex;
bool WorkerActivate; bool WorkerActivate = false;
std::condition_variable WorkerActivateCondition; std::condition_variable WorkerActivateCondition;
std::unordered_map<vtkm::Id, std::vector<DSIHelperInfo<ParticleType>>> WorkerResults; std::unordered_map<vtkm::Id, std::vector<DSIHelperInfo<ParticleType>>> WorkerResults;
}; };
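
The `HaveAnyWork` check added to the threaded algorithm treats a busy worker as outstanding work, even when the queues are empty. A minimal sketch of that idea is shown below. `WorkTracker` and its members are hypothetical names chosen for this illustration, and integers stand in for particles; it is not the class from the diff.

```cpp
#include <mutex>
#include <vector>

// Hypothetical illustration of a mutex-guarded "is there any work" check.
class WorkTracker
{
public:
  // Queue one item for the worker.
  void Push(int particle)
  {
    std::lock_guard<std::mutex> lock(this->Mutex);
    this->Active.push_back(particle);
  }

  // The worker takes everything queued and is busy if it got anything.
  std::vector<int> Take()
  {
    std::lock_guard<std::mutex> lock(this->Mutex);
    std::vector<int> taken;
    taken.swap(this->Active);
    this->WorkerBusy = !taken.empty();
    return taken;
  }

  // The worker reports that it has finished processing.
  void Finish()
  {
    std::lock_guard<std::mutex> lock(this->Mutex);
    this->WorkerBusy = false;
  }

  // Work exists if anything is queued or the worker is still processing.
  bool HaveAnyWork()
  {
    std::lock_guard<std::mutex> lock(this->Mutex);
    return !this->Active.empty() || this->WorkerBusy;
  }

private:
  std::mutex Mutex;
  std::vector<int> Active;
  bool WorkerBusy = false;
};
```

Without the busy flag, a termination check made between `Take()` and `Finish()` would see empty queues and could wrongly conclude that the rank is idle.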