Compare commits

...

8 Commits

Author SHA1 Message Date
Dave Pugmire
d804d0fcdf Merge branch 'sl_comm_probe' into 'master'
Draft: Use async termination.

See merge request vtk/vtk-m!3182
2024-07-02 16:33:03 -04:00
Kenneth Moreland
5df372db2b Merge topic 'env-options'
310579b9a Load options from environment variables

Acked-by: Kitware Robot <kwrobot@kitware.com>
Acked-by: Li-Ta Lo <ollie@lanl.gov>
Merge-request: !3243
2024-07-02 16:31:08 -04:00
Kenneth Moreland
310579b9a7 Load options from environment variables
Some common VTK-m options such as the device and log level could be
specified on the command line but not through environment variables. It is
not always possible to set VTK-m command line options, so environment
variables are added.

Also added documentation to the user's guide about what options are
available and how to set them.
2024-07-02 12:47:34 -04:00
Dave Pugmire
c75dcace78 Merge branch 'master' of https://gitlab.kitware.com/vtk/vtk-m into sl_comm_probe 2024-04-01 15:44:57 -04:00
Dave Pugmire
02553c0b6f Merge branch 'master' of https://gitlab.kitware.com/vtk/vtk-m into sl_comm_probe 2024-03-19 15:33:13 -04:00
Dave Pugmire
eef93ff825 Merge branch 'master' of https://gitlab.kitware.com/vtk/vtk-m into sl_comm_probe 2024-02-02 15:05:57 -05:00
Dave Pugmire
dd96f1144b Fixes for non-mpi code... 2024-01-24 07:23:55 -05:00
Dave Pugmire
4bd340156b Use async termination. 2024-01-23 16:57:10 -05:00
10 changed files with 265 additions and 95 deletions

@ -0,0 +1,9 @@
## Load options from environment variables
Some common VTK-m options such as the device and log level could be
specified on the command line but not through environment variables. It is
not always possible to set VTK-m command line options, so environment
variables are added.
Also added documentation to the user's guide about what options are
available and how to set them.

@ -19,6 +19,44 @@ But it can also optionally take the ``argc`` and ``argv`` arguments to the ``mai
|VTKm| accepts arguments that, for example, configure the compute device to use or establish logging levels.
Any arguments that are handled by |VTKm| are removed from the ``argc``/``argv`` list so that your program can then respond to the remaining arguments.
Many options can also be set with environment variables.
If both the environment variable and command line argument are provided, the command line argument is used.
The following table lists the currently supported options.
.. list-table:: |VTKm| command line arguments and environment variable options.
:widths: 23 22 15 40
:header-rows: 1
* - Command Line Argument
- Environment Variable
- Default Value
- Description
* - ``--vtkm-help``
-
-
- Causes the program to print information about |VTKm| command line arguments and then exits the program.
* - ``--vtkm-log-level``
- ``VTKM_LOG_LEVEL``
- ``WARNING``
- Specifies the logging level.
Valid values are ``INFO``, ``WARNING``, ``ERROR``, ``FATAL``, and ``OFF``.
This can also be set to a numeric value for the logging level.
* - ``--vtkm-device``
- ``VTKM_DEVICE``
-
- Force |VTKm| to use the specified device.
If not specified or ``Any`` given, then any available device may be used.
* - ``--vtkm-num-threads``
- ``VTKM_NUM_THREADS``
-
- Set the number of threads to use on a multi-core device.
If not specified, the device will use the cores available in the system.
* - ``--vtkm-device-instance``
- ``VTKM_DEVICE_INSTANCE``
-
- Selects the device to use when more than one device device of a given type is available.
The device is specified with a numbered index.
:func:`vtkm::cont::Initialize` returns a :struct:`vtkm::cont::InitializeResult` structure.
This structure contains information about the supported arguments and options selected during initialization.

@ -17,6 +17,7 @@
#include <vtkm/thirdparty/diy/environment.h>
#include <cstdlib>
#include <memory>
#include <sstream>
@ -123,7 +124,7 @@ InitializeResult Initialize(int& argc, char* argv[], InitializeOptions opts)
}
else
{
vtkm::cont::InitLogging(argc, argv, loggingFlag);
vtkm::cont::InitLogging(argc, argv, loggingFlag, "VTKM_LOG_LEVEL");
}
if (!vtkmdiy::mpi::environment::initialized())
{
@ -225,37 +226,70 @@ InitializeResult Initialize(int& argc, char* argv[], InitializeOptions opts)
vtkm::cont::DeviceAdapterTagAny{}, runtimeDeviceOptions, argc, argv);
}
// Check for device on command line.
if (options[opt::OptionIndex::DEVICE])
{
const char* arg = options[opt::OptionIndex::DEVICE].arg;
auto id = vtkm::cont::make_DeviceAdapterId(arg);
if (id != vtkm::cont::DeviceAdapterTagAny{})
config.Device = vtkm::cont::make_DeviceAdapterId(arg);
}
// If not on command line, check for device in environment variable.
if (config.Device == vtkm::cont::DeviceAdapterTagUndefined{})
{
const char* deviceEnv = std::getenv("VTKM_DEVICE");
if (deviceEnv != nullptr)
{
vtkm::cont::GetRuntimeDeviceTracker().ForceDevice(id);
auto id = vtkm::cont::make_DeviceAdapterId(std::getenv("VTKM_DEVICE"));
if (VtkmDeviceArg::DeviceIsAvailable(id))
{
config.Device = id;
}
else
{
// Got invalid device. Log an error, but continue to do the default action for
// the device (i.e., ignore the environment variable setting).
VTKM_LOG_S(vtkm::cont::LogLevel::Error,
"Invalid device `"
<< deviceEnv
<< "` specified in VTKM_DEVICE environment variable. Ignoring.");
VTKM_LOG_S(vtkm::cont::LogLevel::Error,
"Valid devices are: " << VtkmDeviceArg::GetValidDeviceNames());
}
}
}
// If still not defined, check to see if "any" device should be added.
if ((config.Device == vtkm::cont::DeviceAdapterTagUndefined{}) &&
(opts & InitializeOptions::DefaultAnyDevice) != InitializeOptions::None)
{
config.Device = vtkm::cont::DeviceAdapterTagAny{};
}
// Set the state for the device selected.
if (config.Device == vtkm::cont::DeviceAdapterTagUndefined{})
{
if ((opts & InitializeOptions::RequireDevice) != InitializeOptions::None)
{
auto devices = VtkmDeviceArg::GetValidDeviceNames();
VTKM_LOG_S(vtkm::cont::LogLevel::Fatal, "Device not given on command line.");
std::cerr << "Target device must be specified via --vtkm-device.\n"
"Valid devices: "
<< devices << std::endl;
if ((opts & InitializeOptions::AddHelp) != InitializeOptions::None)
{
std::cerr << config.Usage;
}
exit(1);
}
else
{
vtkm::cont::GetRuntimeDeviceTracker().Reset();
// No device specified. Do nothing and let VTK-m decide what it is going to do.
}
config.Device = id;
}
else if ((opts & InitializeOptions::DefaultAnyDevice) != InitializeOptions::None)
else if (config.Device == vtkm::cont::DeviceAdapterTagAny{})
{
vtkm::cont::GetRuntimeDeviceTracker().Reset();
config.Device = vtkm::cont::DeviceAdapterTagAny{};
}
else if ((opts & InitializeOptions::RequireDevice) != InitializeOptions::None)
else
{
auto devices = VtkmDeviceArg::GetValidDeviceNames();
VTKM_LOG_S(vtkm::cont::LogLevel::Error, "Device not given on command line.");
std::cerr << "Target device must be specified via --vtkm-device.\n"
"Valid devices: "
<< devices << std::endl;
if ((opts & InitializeOptions::AddHelp) != InitializeOptions::None)
{
std::cerr << config.Usage;
}
exit(1);
vtkm::cont::GetRuntimeDeviceTracker().ForceDevice(config.Device);
}

@ -30,7 +30,7 @@
#endif // VTKM_ENABLE_LOGGING
#include <cassert>
#include <cstdlib>
#include <iomanip>
#include <sstream>
#include <stdexcept>
@ -108,7 +108,10 @@ namespace cont
{
VTKM_CONT
void InitLogging(int& argc, char* argv[], const std::string& loggingFlag)
void InitLogging(int& argc,
char* argv[],
const std::string& loggingFlag,
const std::string& loggingEnv)
{
SetLogLevelName(vtkm::cont::LogLevel::Off, "Off");
SetLogLevelName(vtkm::cont::LogLevel::Fatal, "FATL");
@ -130,8 +133,16 @@ void InitLogging(int& argc, char* argv[], const std::string& loggingFlag)
loguru::set_verbosity_to_name_callback(&verbosityToNameCallback);
loguru::set_name_to_verbosity_callback(&nameToVerbosityCallback);
// Set the default log level to warning
SetStderrLogLevel(vtkm::cont::LogLevel::Warn);
const char* envLevel = std::getenv(loggingEnv.c_str());
if (envLevel != nullptr)
{
SetStderrLogLevel(envLevel);
}
else
{
// Set the default log level to warning
SetStderrLogLevel(vtkm::cont::LogLevel::Warn);
}
loguru::init(argc, argv, loggingFlag.c_str());
}
#else // VTKM_ENABLE_LOGGING

@ -371,7 +371,10 @@ enum class LogLevel
*/
VTKM_CONT_EXPORT
VTKM_CONT
void InitLogging(int& argc, char* argv[], const std::string& loggingFlag = "--vtkm-log-level");
void InitLogging(int& argc,
char* argv[],
const std::string& loggingFlag = "--vtkm-log-level",
const std::string& loggingEnv = "VTKM_LOG_LEVEL");
VTKM_CONT_EXPORT
VTKM_CONT
void InitLogging();

@ -15,6 +15,7 @@
#include <vtkm/cont/ErrorFilterExecution.h>
#include <vtkm/filter/Filter.h>
#include <vtkm/filter/flow/FlowTypes.h>
#include <vtkm/filter/flow/internal/BoundsMap.h>
#include <vtkm/filter/flow/vtkm_filter_flow_export.h>
namespace vtkm
@ -104,7 +105,7 @@ protected:
bool BlockIdsSet = false;
std::vector<vtkm::Id> BlockIds;
vtkm::filter::flow::internal::BoundsMap BoundsMap;
vtkm::Id NumberOfSteps = 0;
vtkm::cont::UnknownArrayHandle Seeds;
vtkm::filter::flow::IntegrationSolverType SolverType =

@ -58,13 +58,15 @@ FilterParticleAdvectionSteadyState<Derived>::DoExecutePartitions(
DataSetIntegratorSteadyState<ParticleType, FieldType, TerminationType, AnalysisType>;
this->ValidateOptions();
if (this->BlockIdsSet)
this->BoundsMap = vtkm::filter::flow::internal::BoundsMap(input, this->BlockIds);
else
this->BoundsMap = vtkm::filter::flow::internal::BoundsMap(input);
vtkm::filter::flow::internal::BoundsMap boundsMap(input);
std::vector<DSIType> dsi;
for (vtkm::Id i = 0; i < input.GetNumberOfPartitions(); i++)
{
vtkm::Id blockId = boundsMap.GetLocalBlockId(i);
vtkm::Id blockId = this->BoundsMap.GetLocalBlockId(i);
auto dataset = input.GetPartition(i);
// Build the field for the current dataset
@ -78,7 +80,7 @@ FilterParticleAdvectionSteadyState<Derived>::DoExecutePartitions(
}
vtkm::filter::flow::internal::ParticleAdvector<DSIType> pav(
boundsMap, dsi, this->UseThreadedAlgorithm, this->UseAsynchronousCommunication);
this->BoundsMap, dsi, this->UseThreadedAlgorithm, this->UseAsynchronousCommunication);
vtkm::cont::ArrayHandle<ParticleType> particles;
this->Seeds.AsArrayHandle(particles);

@ -55,12 +55,15 @@ FilterParticleAdvectionUnsteadyState<Derived>::DoExecutePartitions(
using DSIType = vtkm::filter::flow::internal::
DataSetIntegratorUnsteadyState<ParticleType, FieldType, TerminationType, AnalysisType>;
vtkm::filter::flow::internal::BoundsMap boundsMap(input);
if (this->BlockIdsSet)
this->BoundsMap = vtkm::filter::flow::internal::BoundsMap(input, this->BlockIds);
else
this->BoundsMap = vtkm::filter::flow::internal::BoundsMap(input);
std::vector<DSIType> dsi;
for (vtkm::Id i = 0; i < input.GetNumberOfPartitions(); i++)
{
vtkm::Id blockId = boundsMap.GetLocalBlockId(i);
vtkm::Id blockId = this->BoundsMap.GetLocalBlockId(i);
auto ds1 = input.GetPartition(i);
auto ds2 = this->Input2.GetPartition(i);
@ -85,7 +88,7 @@ FilterParticleAdvectionUnsteadyState<Derived>::DoExecutePartitions(
analysis);
}
vtkm::filter::flow::internal::ParticleAdvector<DSIType> pav(
boundsMap, dsi, this->UseThreadedAlgorithm, this->UseAsynchronousCommunication);
this->BoundsMap, dsi, this->UseThreadedAlgorithm, this->UseAsynchronousCommunication);
vtkm::cont::ArrayHandle<ParticleType> particles;
this->Seeds.AsArrayHandle(particles);

@ -15,6 +15,10 @@
#include <vtkm/filter/flow/internal/BoundsMap.h>
#include <vtkm/filter/flow/internal/DataSetIntegrator.h>
#include <vtkm/filter/flow/internal/ParticleMessenger.h>
#ifdef VTKM_ENABLE_MPI
#include <vtkm/thirdparty/diy/diy.h>
#include <vtkm/thirdparty/diy/mpi-cast.h>
#endif
namespace vtkm
{
@ -25,6 +29,87 @@ namespace flow
namespace internal
{
class AdvectAlgorithmTerminator
{
public:
#ifdef VTKM_ENABLE_MPI
AdvectAlgorithmTerminator(vtkmdiy::mpi::communicator& comm)
: MPIComm(vtkmdiy::mpi::mpi_cast(comm.handle()))
#else
AdvectAlgorithmTerminator(vtkmdiy::mpi::communicator& vtkmNotUsed(comm))
#endif
{
}
void AddWork()
{
#ifdef VTKM_ENABLE_MPI
this->Dirty = 1;
#endif
}
bool Done() const { return this->State == AdvectAlgorithmTerminatorState::DONE; }
void Control(bool haveLocalWork)
{
#ifdef VTKM_ENABLE_MPI
if (this->State == STATE_0 && !haveLocalWork)
{
MPI_Ibarrier(this->MPIComm, &this->StateReq);
this->Dirty = 0;
this->State = STATE_1;
}
else if (this->State == STATE_1)
{
MPI_Status status;
int flag;
MPI_Test(&this->StateReq, &flag, &status);
if (flag == 1)
{
int localDirty = this->Dirty;
MPI_Iallreduce(
&localDirty, &this->AllDirty, 1, MPI_INT, MPI_LOR, this->MPIComm, &this->StateReq);
this->State = STATE_2;
}
}
else if (this->State == STATE_2)
{
MPI_Status status;
int flag;
MPI_Test(&this->StateReq, &flag, &status);
if (flag == 1)
{
if (this->AllDirty == 0) //done
this->State = DONE;
else
this->State = STATE_0; //reset.
}
}
#else
if (!haveLocalWork)
this->State = DONE;
#endif
}
private:
enum AdvectAlgorithmTerminatorState
{
STATE_0,
STATE_1,
STATE_2,
DONE
};
AdvectAlgorithmTerminatorState State = AdvectAlgorithmTerminatorState::STATE_0;
#ifdef VTKM_ENABLE_MPI
std::atomic<int> Dirty;
int AllDirty = 0;
MPI_Request StateReq;
MPI_Comm MPIComm;
#endif
};
template <typename DSIType>
class AdvectAlgorithm
{
@ -39,6 +124,7 @@ public:
, NumRanks(this->Comm.size())
, Rank(this->Comm.rank())
, UseAsynchronousCommunication(useAsyncComm)
, Terminator(this->Comm)
{
}
@ -97,27 +183,21 @@ public:
vtkm::filter::flow::internal::ParticleMessenger<ParticleType> messenger(
this->Comm, this->UseAsynchronousCommunication, this->BoundsMap, 1, 128);
this->ComputeTotalNumParticles();
while (this->TotalNumTerminatedParticles < this->TotalNumParticles)
while (!this->Terminator.Done())
{
std::vector<ParticleType> v;
vtkm::Id numTerm = 0, blockId = -1;
vtkm::Id blockId = -1;
if (this->GetActiveParticles(v, blockId))
{
//make this a pointer to avoid the copy?
auto& block = this->GetDataSet(blockId);
DSIHelperInfo<ParticleType> bb(v, this->BoundsMap, this->ParticleBlockIDsMap);
block.Advect(bb, this->StepSize);
numTerm = this->UpdateResult(bb);
this->UpdateResult(bb);
}
vtkm::Id numTermMessages = 0;
this->Communicate(messenger, numTerm, numTermMessages);
this->TotalNumTerminatedParticles += (numTerm + numTermMessages);
if (this->TotalNumTerminatedParticles > this->TotalNumParticles)
throw vtkm::cont::ErrorFilterExecution("Particle count error");
this->Communicate(messenger);
this->Terminator.Control(!this->Active.empty());
}
}
@ -128,19 +208,6 @@ public:
this->ParticleBlockIDsMap.clear();
}
void ComputeTotalNumParticles()
{
vtkm::Id numLocal = static_cast<vtkm::Id>(this->Inactive.size());
for (const auto& it : this->Active)
numLocal += it.second.size();
#ifdef VTKM_ENABLE_MPI
vtkmdiy::mpi::all_reduce(this->Comm, numLocal, this->TotalNumParticles, std::plus<vtkm::Id>{});
#else
this->TotalNumParticles = numLocal;
#endif
}
DataSetIntegrator<DSIType, ParticleType>& GetDataSet(vtkm::Id id)
{
for (auto& it : this->Blocks)
@ -213,9 +280,7 @@ public:
return !particles.empty();
}
void Communicate(vtkm::filter::flow::internal::ParticleMessenger<ParticleType>& messenger,
vtkm::Id numLocalTerminations,
vtkm::Id& numTermMessages)
void Communicate(vtkm::filter::flow::internal::ParticleMessenger<ParticleType>& messenger)
{
std::vector<ParticleType> outgoing;
std::vector<vtkm::Id> outgoingRanks;
@ -224,16 +289,17 @@ public:
std::vector<ParticleType> incoming;
std::unordered_map<vtkm::Id, std::vector<vtkm::Id>> incomingBlockIDs;
numTermMessages = 0;
bool block = false;
#ifdef VTKM_ENABLE_MPI
block = this->GetBlockAndWait(messenger.UsingSyncCommunication(), numLocalTerminations);
block = this->GetBlockAndWait(messenger.UsingSyncCommunication());
#endif
vtkm::Id numTermMessages;
messenger.Exchange(outgoing,
outgoingRanks,
this->ParticleBlockIDsMap,
numLocalTerminations,
0,
incoming,
incomingBlockIDs,
numTermMessages,
@ -311,17 +377,22 @@ public:
{
VTKM_ASSERT(particles.size() == idsMap.size());
for (auto pit = particles.begin(); pit != particles.end(); pit++)
if (!particles.empty())
{
vtkm::Id particleID = pit->GetID();
const auto& it = idsMap.find(particleID);
VTKM_ASSERT(it != idsMap.end() && !it->second.empty());
vtkm::Id blockId = it->second[0];
this->Active[blockId].emplace_back(*pit);
}
this->Terminator.AddWork();
for (const auto& it : idsMap)
this->ParticleBlockIDsMap[it.first] = it.second;
for (auto pit = particles.begin(); pit != particles.end(); pit++)
{
vtkm::Id particleID = pit->GetID();
const auto& it = idsMap.find(particleID);
VTKM_ASSERT(it != idsMap.end() && !it->second.empty());
vtkm::Id blockId = it->second[0];
this->Active[blockId].emplace_back(*pit);
}
for (const auto& it : idsMap)
this->ParticleBlockIDsMap[it.first] = it.second;
}
}
virtual void UpdateInactive(const std::vector<ParticleType>& particles,
@ -351,7 +422,7 @@ public:
}
virtual bool GetBlockAndWait(const bool& syncComm, const vtkm::Id& numLocalTerm)
virtual bool GetBlockAndWait(const bool& syncComm)
{
bool haveNoWork = this->Active.empty() && this->Inactive.empty();
@ -367,9 +438,11 @@ public:
//2. numLocalTerm + this->TotalNumberOfTerminatedParticles == this->TotalNumberOfParticles
//So, if neither are true, we can safely block and wait for communication to come in.
if (haveNoWork &&
(numLocalTerm + this->TotalNumTerminatedParticles < this->TotalNumParticles))
return true;
// if (this->Terminator.State == AdvectAlgorithmTerminator::AdvectAlgorithmTerminatorState::STATE_2)
// return true;
// if (haveNoWork && (numLocalTerm + this->TotalNumTerminatedParticles < this->TotalNumParticles))
// return true;
return false;
}
@ -388,9 +461,8 @@ public:
std::unordered_map<vtkm::Id, std::vector<vtkm::Id>> ParticleBlockIDsMap;
vtkm::Id Rank;
vtkm::FloatDefault StepSize;
vtkm::Id TotalNumParticles = 0;
vtkm::Id TotalNumTerminatedParticles = 0;
bool UseAsynchronousCommunication = true;
AdvectAlgorithmTerminator Terminator;
};
}

@ -39,7 +39,6 @@ public:
bool useAsyncComm)
: AdvectAlgorithm<DSIType>(bm, blocks, useAsyncComm)
, Done(false)
, WorkerActivate(false)
{
//For threaded algorithm, the particles go out of scope in the Work method.
//When this happens, they are destructed by the time the Manage thread gets them.
@ -50,8 +49,6 @@ public:
void Go() override
{
this->ComputeTotalNumParticles();
std::vector<std::thread> workerThreads;
workerThreads.emplace_back(std::thread(AdvectAlgorithmThreaded::Worker, this));
this->Manage();
@ -63,6 +60,13 @@ public:
}
protected:
bool HaveAnyWork()
{
std::lock_guard<std::mutex> lock(this->Mutex);
//We have work if there particles in any queues or a worker is busy.
return !this->Active.empty() || !this->Inactive.empty() || this->WorkerActivate;
}
bool GetActiveParticles(std::vector<ParticleType>& particles, vtkm::Id& blockId) override
{
std::lock_guard<std::mutex> lock(this->Mutex);
@ -144,38 +148,31 @@ protected:
vtkm::filter::flow::internal::ParticleMessenger<ParticleType> messenger(
this->Comm, useAsync, this->BoundsMap, 1, 128);
while (this->TotalNumTerminatedParticles < this->TotalNumParticles)
while (!this->Terminator.Done())
{
std::unordered_map<vtkm::Id, std::vector<DSIHelperInfo<ParticleType>>> workerResults;
this->GetWorkerResults(workerResults);
vtkm::Id numTerm = 0;
for (auto& it : workerResults)
{
for (auto& r : it.second)
numTerm += this->UpdateResult(r);
}
this->UpdateResult(r);
vtkm::Id numTermMessages = 0;
this->Communicate(messenger, numTerm, numTermMessages);
this->TotalNumTerminatedParticles += (numTerm + numTermMessages);
if (this->TotalNumTerminatedParticles > this->TotalNumParticles)
throw vtkm::cont::ErrorFilterExecution("Particle count error");
this->Communicate(messenger);
this->Terminator.Control(this->HaveAnyWork());
}
//Let the workers know that we are done.
this->SetDone();
}
bool GetBlockAndWait(const bool& syncComm, const vtkm::Id& numLocalTerm) override
bool GetBlockAndWait(const bool& syncComm) override
{
std::lock_guard<std::mutex> lock(this->Mutex);
if (this->Done)
return true;
return (this->AdvectAlgorithm<DSIType>::GetBlockAndWait(syncComm, numLocalTerm) &&
!this->WorkerActivate && this->WorkerResults.empty());
return (this->AdvectAlgorithm<DSIType>::GetBlockAndWait(syncComm) && !this->WorkerActivate &&
this->WorkerResults.empty());
}
void GetWorkerResults(
@ -193,7 +190,7 @@ protected:
std::atomic<bool> Done;
std::mutex Mutex;
bool WorkerActivate;
bool WorkerActivate = false;
std::condition_variable WorkerActivateCondition;
std::unordered_map<vtkm::Id, std::vector<DSIHelperInfo<ParticleType>>> WorkerResults;
};