Add threaded filters for multiblock dataset.

This commit is contained in:
Dave Pugmire 2021-08-04 14:27:41 -04:00
parent 8f2d002465
commit f8021dbc0d
11 changed files with 583 additions and 6 deletions

@ -42,6 +42,17 @@ class VTKM_FILTER_COMMON_EXPORT CleanGrid : public vtkm::filter::FilterDataSet<C
public:
CleanGrid();
/// Build a new heap-allocated CleanGrid and copy this filter's state into it
/// through CopyStateFrom. The raw pointer is handed to the caller, who owns it.
VTKM_CONT
Filter* Clone() const override
{
  auto* duplicate = new CleanGrid();
  duplicate->CopyStateFrom(this);
  return duplicate;
}
/// CleanGrid opts in to multi-threaded execution.
VTKM_CONT
bool CanThread() const override
{
  return true;
}
/// When the CompactPointFields flag is true, the filter will identify any
/// points that are not used by the topology. This is on by default.
///
@ -96,6 +107,19 @@ public:
return this->MapFieldOntoOutput(result, field);
}
/// Copy the inherited FilterDataSet state and every CleanGrid option from
/// \c cleanGrid into this filter.
VTKM_CONT
void CopyStateFrom(const CleanGrid* cleanGrid)
{
  // Inherited state is copied by the base-class helper.
  this->FilterDataSet<CleanGrid>::CopyStateFrom(cleanGrid);

  // Options owned by CleanGrid itself.
  const CleanGrid& src = *cleanGrid;
  this->CompactPointFields = src.CompactPointFields;
  this->MergePoints = src.MergePoints;
  this->Tolerance = src.Tolerance;
  this->ToleranceIsAbsolute = src.ToleranceIsAbsolute;
  this->RemoveDegenerateCells = src.RemoveDegenerateCells;
  this->FastMerge = src.FastMerge;
}
private:
bool CompactPointFields;
bool MergePoints;

@ -34,6 +34,17 @@ class VTKM_FILTER_CONTOUR_EXPORT Contour : public vtkm::filter::FilterDataSetWit
public:
using SupportedTypes = vtkm::List<vtkm::UInt8, vtkm::Int8, vtkm::Float32, vtkm::Float64>;
/// Build a new heap-allocated Contour and copy this filter's state into it
/// through CopyStateFrom. The raw pointer is handed to the caller, who owns it.
VTKM_CONT
Filter* Clone() const override
{
  auto* duplicate = new Contour();
  duplicate->CopyStateFrom(this);
  return duplicate;
}
/// Contour opts in to multi-threaded execution.
VTKM_CONT
bool CanThread() const override
{
  return true;
}
Contour();
void SetNumberOfIsoValues(vtkm::Id num);
@ -155,6 +166,21 @@ public:
return true;
}
protected:
/// Copy the inherited FilterDataSetWithField state and every Contour option
/// from \c contour into this filter.
VTKM_CONT
void CopyStateFrom(const Contour* contour)
{
  // Inherited state is copied by the base-class helper.
  this->FilterDataSetWithField<Contour>::CopyStateFrom(contour);

  // Options owned by Contour itself.
  const Contour& src = *contour;
  this->IsoValues = src.IsoValues;
  this->GenerateNormals = src.GenerateNormals;
  this->AddInterpolationEdgeIds = src.AddInterpolationEdgeIds;
  this->ComputeFastNormalsForStructured = src.ComputeFastNormalsForStructured;
  this->ComputeFastNormalsForUnstructured = src.ComputeFastNormalsForUnstructured;
  this->NormalArrayName = src.NormalArrayName;
  this->InterpolationEdgeIdsArrayName = src.InterpolationEdgeIdsArrayName;
}
private:
std::vector<vtkm::Float64> IsoValues;
bool GenerateNormals;

@ -14,6 +14,7 @@
#include <vtkm/cont/DataSet.h>
#include <vtkm/cont/Field.h>
#include <vtkm/cont/Invoker.h>
#include <vtkm/cont/Logging.h>
#include <vtkm/cont/PartitionedDataSet.h>
#include <vtkm/filter/CreateResult.h>
@ -178,7 +179,35 @@ public:
Filter();
VTKM_CONT
~Filter();
virtual ~Filter();
/// Whether this filter supports multi-threaded execution. The base
/// implementation answers no; subclasses override it to opt in.
VTKM_CONT
virtual bool CanThread() const
{
  return false;
}
/// True only when the filter supports threading (CanThread) and threading
/// has been switched on with SetRunMultiThreadedFilter.
VTKM_CONT
bool GetRunMultiThreadedFilter() const
{
  if (!this->CanThread())
  {
    return false;
  }
  return this->RunFilterWithMultipleThreads;
}
/// Enable or disable multi-threaded execution for this filter.
///
/// The request is stored only when CanThread() is true. When the filter
/// cannot thread, the flag is left untouched and a request to *enable*
/// threading is reported at Info log level; a request to disable it is
/// already satisfied and is therefore not reported.
VTKM_CONT
void SetRunMultiThreadedFilter(bool val)
{
  if (this->CanThread())
  {
    this->RunFilterWithMultipleThreads = val;
    return;
  }

  if (val)
  {
    // TypeToString yields a readable type name (typeid().name() is
    // implementation-defined and usually mangled).
    std::string msg =
      "Multi threaded filter not supported for " + vtkm::cont::TypeToString<Derived>();
    VTKM_LOG_S(vtkm::cont::LogLevel::Info, msg);
  }
}
/// Produce a copy of this filter. The base implementation cannot do so and
/// always throws vtkm::cont::ErrorExecution; subclasses override it.
VTKM_CONT
virtual Filter* Clone() const
{
  const char* const reason = "You must implement Clone in the derived class.";
  throw vtkm::cont::ErrorExecution(reason);
}
/// \brief Specify which subset of types a filter supports.
///
@ -274,6 +303,10 @@ public:
/// On success, this is the dataset produced. On error, vtkm::cont::ErrorExecution will be thrown.
VTKM_CONT vtkm::cont::PartitionedDataSet Execute(const vtkm::cont::PartitionedDataSet& input);
/// Executes the filter on a partitioned input and returns the partitioned
/// result. The definition runs the pre-execute, prepare-for-execution and
/// post-execute steps with the default policy.
///
/// \param input      The partitions to process.
/// \param numThreads Accepted for the caller's convenience; the definition
///                   marks it unused (vtkmNotUsed).
VTKM_CONT vtkm::cont::PartitionedDataSet ExecuteThreaded(
const vtkm::cont::PartitionedDataSet& input,
vtkm::Id numThreads);
template <typename DerivedPolicy>
VTKM_DEPRECATED(1.6,
"Filter::Execute no longer guarantees policy modifications. "
@ -295,11 +328,20 @@ public:
/// which device adapters a filter uses.
void SetInvoker(vtkm::cont::Invoker inv) { this->Invoke = inv; }
/// Returns how many worker threads to use for \c input. The definition
/// returns the smaller of the number of partitions and a per-device thread
/// budget chosen from the runtime device tracker. Virtual, so a subclass may
/// substitute its own policy.
VTKM_CONT
virtual vtkm::Id DetermineNumberOfThreads(const vtkm::cont::PartitionedDataSet& input);
protected:
vtkm::cont::Invoker Invoke;
/// Member-wise copy assignment, available to this class and its subclasses.
vtkm::filter::Filter<Derived>& operator=(const vtkm::filter::Filter<Derived>&) = default;

/// Overwrite this object's Filter-level state with that of \c filter.
VTKM_CONT
void CopyStateFrom(const Filter<Derived>* filter)
{
  this->operator=(*filter);
}
private:
vtkm::filter::FieldSelection FieldsToPass;
bool RunFilterWithMultipleThreads = false;
};
}
} // namespace vtkm::filter

@ -17,6 +17,13 @@
#include <vtkm/cont/ErrorFilterExecution.h>
#include <vtkm/cont/Field.h>
#include <vtkm/cont/Logging.h>
#include <vtkm/cont/RuntimeDeviceInformation.h>
#include <vtkm/cont/RuntimeDeviceTracker.h>
#include <vtkm/cont/internal/RuntimeDeviceConfiguration.h>
#include <vtkm/filter/TaskQueue.h>
#include <future>
namespace vtkm
{
@ -170,6 +177,26 @@ InputType CallPrepareForExecutionInternal(std::true_type,
return self->PrepareForExecution(input, policy);
}
/// Worker body for multi-threaded execution over a partitioned data set.
///
/// Clones \c self, then repeatedly takes an (index, DataSet) task from
/// \c input, runs the clone on it, maps fields onto the result and pushes
/// (index, result) to \c output, until \c input reports no more tasks.
///
/// \param self   Filter to clone; the clone, not \c self, executes the tasks.
/// \param policy Policy forwarded to the execution helpers.
/// \param input  Queue of pending partitions, shared between workers.
/// \param output Queue receiving the processed partitions.
///
/// The clone is released on every exit path, including when one of the
/// execution helpers throws.
template <typename Derived, typename DerivedPolicy>
void RunFilter(Derived* self,
               const vtkm::filter::PolicyBase<DerivedPolicy>& policy,
               vtkm::filter::DataSetQueue& input,
               vtkm::filter::DataSetQueue& output)
{
  // Minimal scope guard: Clone() returns an owning raw pointer, and the loop
  // below may throw, so the delete must not depend on reaching the end.
  struct CloneOwner
  {
    explicit CloneOwner(Derived* ptr)
      : Ptr(ptr)
    {
    }
    ~CloneOwner() { delete this->Ptr; }
    CloneOwner(const CloneOwner&) = delete;
    CloneOwner& operator=(const CloneOwner&) = delete;

    Derived* Ptr;
  };
  CloneOwner filterClone(static_cast<Derived*>(self->Clone()));

  std::pair<vtkm::Id, vtkm::cont::DataSet> task;
  while (input.GetTask(task))
  {
    auto outDS = CallPrepareForExecution(filterClone.Ptr, task.second, policy);
    CallMapFieldOntoOutput(filterClone.Ptr, task.second, outDS, policy);
    // Keep the partition index so the results can be put back in order.
    output.Push(std::make_pair(task.first, std::move(outDS)));
  }

  vtkm::cont::Algorithm::Synchronize();
}
//--------------------------------------------------------------------------------
// specialization for PartitionedDataSet input when `PrepareForExecution` is not provided
// by the subclass. we iterate over blocks and execute for each block
@ -182,12 +209,64 @@ vtkm::cont::PartitionedDataSet CallPrepareForExecutionInternal(
const vtkm::filter::PolicyBase<DerivedPolicy>& policy)
{
vtkm::cont::PartitionedDataSet output;
for (const auto& inBlock : input)
if (self->GetRunMultiThreadedFilter())
{
vtkm::cont::DataSet outBlock = CallPrepareForExecution(self, inBlock, policy);
CallMapFieldOntoOutput(self, inBlock, outBlock, policy);
output.AppendPartition(outBlock);
vtkm::filter::DataSetQueue inputQueue(input);
vtkm::filter::DataSetQueue outputQueue;
vtkm::Id numThreads = self->DetermineNumberOfThreads(input);
std::cout << "nThreads= " << numThreads << std::endl;
#if 0
std::vector<std::thread> threads;
for (vtkm::Id i = 0; i < numThreads; i++)
{
//auto clone = self->Clone();
//auto filterClone = static_cast<Derived*>(clone.get());
std::thread t(RunFilter<Derived, DerivedPolicy>,
self,
policy,
std::ref(inputQueue),
std::ref(outputQueue));
threads.push_back(std::move(t));
}
for (auto& t : threads)
t.join();
output = outputQueue.Get();
#endif
#if 1
//Run 'numThreads' filters.
std::vector<std::future<void>> futures(static_cast<std::size_t>(numThreads));
for (std::size_t i = 0; i < static_cast<std::size_t>(numThreads); i++)
{
auto f = std::async(std::launch::async,
RunFilter<Derived, DerivedPolicy>,
self,
policy,
std::ref(inputQueue),
std::ref(outputQueue));
futures[i] = std::move(f);
}
for (auto& f : futures)
f.get();
//Get results from the outputQueue.
output = outputQueue.Get();
#endif
}
else
{
for (const auto& inBlock : input)
{
vtkm::cont::DataSet outBlock = CallPrepareForExecution(self, inBlock, policy);
CallMapFieldOntoOutput(self, inBlock, outBlock, policy);
output.AppendPartition(outBlock);
}
}
return output;
}
@ -258,7 +337,6 @@ inline VTKM_CONT vtkm::cont::DataSet Filter<Derived>::Execute(const vtkm::cont::
return output.GetNumberOfPartitions() == 1 ? output.GetPartition(0) : vtkm::cont::DataSet();
}
//----------------------------------------------------------------------------
template <typename Derived>
inline VTKM_CONT vtkm::cont::PartitionedDataSet Filter<Derived>::Execute(
const vtkm::cont::PartitionedDataSet& input)
@ -283,6 +361,73 @@ inline VTKM_CONT vtkm::cont::PartitionedDataSet Filter<Derived>::Execute(
return output;
}
/// Choose how many worker threads to use for \c input.
///
/// A per-device thread budget is picked from the devices the runtime tracker
/// reports as usable, and the result is the smaller of that budget and the
/// number of partitions (so it is 0 for an empty input).
///
/// Accelerated devices are examined before falling back to a single thread.
/// The serial device is deliberately not queried: it is the fallback case and
/// maps to the default budget of one thread. Testing it first would cap the
/// budget at one whenever serial is usable alongside another device.
template <typename Derived>
inline VTKM_CONT vtkm::Id Filter<Derived>::DetermineNumberOfThreads(
  const vtkm::cont::PartitionedDataSet& input)
{
  const vtkm::Id numDS = input.GetNumberOfPartitions();

  //Arbitrary constants.
  const vtkm::Id threadsPerGPU = 8;
  const vtkm::Id threadsPerCPU = 4;

  auto& tracker = vtkm::cont::GetRuntimeDeviceTracker();

  // Default covers the serial device and the case where nothing else matches.
  vtkm::Id availThreads = 1;
  if (tracker.CanRunOn(vtkm::cont::DeviceAdapterTagCuda{}))
  {
    availThreads = threadsPerGPU;
  }
  else if (tracker.CanRunOn(vtkm::cont::DeviceAdapterTagOpenMP{}))
  {
    availThreads = threadsPerCPU;
  }
  else if (tracker.CanRunOn(vtkm::cont::DeviceAdapterTagKokkos{}))
  {
    // Only a CUDA-backed Kokkos build gets the GPU budget.
#ifdef VTKM_KOKKOS_CUDA
    availThreads = threadsPerGPU;
#else
    availThreads = 1;
#endif
  }

  // Never start more workers than there are partitions to process.
  return std::min<vtkm::Id>(numDS, availThreads);
}
/// Run the filter over a partitioned data set with the default policy:
/// pre-execute hook, prepare-for-execution, then post-execute hook.
/// The \c numThreads argument is not used by this implementation.
template <typename Derived>
inline VTKM_CONT vtkm::cont::PartitionedDataSet Filter<Derived>::ExecuteThreaded(
  const vtkm::cont::PartitionedDataSet& input,
  vtkm::Id vtkmNotUsed(numThreads))
{
  VTKM_LOG_SCOPE(vtkm::cont::LogLevel::Perf,
                 "Filter (%d partitions): '%s'",
                 static_cast<int>(input.GetNumberOfPartitions()),
                 vtkm::cont::TypeToString<Derived>().c_str());

  vtkm::filter::PolicyDefault defaultPolicy;
  Derived* derived = static_cast<Derived*>(this);

  // Optional hook: `Derived::PreExecute(input, policy)`.
  internal::CallPreExecute(derived, input, defaultPolicy);

  // The actual work happens in `PrepareForExecution`.
  vtkm::cont::PartitionedDataSet result =
    internal::CallPrepareForExecution(derived, input, defaultPolicy);

  // Optional hook: `Derived::PostExecute(input, output, policy)`.
  internal::CallPostExecute(derived, input, result, defaultPolicy);

  return result;
}
//----------------------------------------------------------------------------
template <typename Derived>

@ -57,6 +57,12 @@ public:
VTKM_CONT vtkm::cont::DataSet PrepareForExecution(const vtkm::cont::DataSet& input,
vtkm::filter::PolicyBase<DerivedPolicy> policy);
protected:
/// Member-wise copy assignment, used by CopyStateFrom below.
vtkm::filter::FilterDataSet<Derived>& operator=(const vtkm::filter::FilterDataSet<Derived>&) =
  default;

/// Overwrite this object's FilterDataSet-level state (including what it
/// inherits) with that of \c filter.
VTKM_CONT
void CopyStateFrom(const FilterDataSet<Derived>* filter)
{
  this->operator=(*filter);
}
private:
vtkm::Id CoordinateSystemIndex;

@ -83,6 +83,13 @@ public:
VTKM_CONT vtkm::cont::DataSet PrepareForExecution(const vtkm::cont::DataSet& input,
vtkm::filter::PolicyBase<DerivedPolicy> policy);
protected:
/// Member-wise copy assignment, used by CopyStateFrom below.
vtkm::filter::FilterDataSetWithField<Derived>& operator=(
  const vtkm::filter::FilterDataSetWithField<Derived>&) = default;

/// Overwrite this object's FilterDataSetWithField-level state (including
/// what it inherits) with that of \c filter.
VTKM_CONT
void CopyStateFrom(const FilterDataSetWithField<Derived>* filter)
{
  this->operator=(*filter);
}
private:
template <typename DerivedPolicy>
VTKM_CONT vtkm::cont::DataSet PrepareForExecution(const vtkm::cont::DataSet& input,

@ -98,6 +98,11 @@ public:
//@}
protected:
/// Member-wise copy assignment, used by CopyStateFrom below.
vtkm::filter::FilterField<Derived>& operator=(const vtkm::filter::FilterField<Derived>&) =
  default;

/// Overwrite this object's FilterField-level state (including what it
/// inherits) with that of \c filter.
VTKM_CONT
void CopyStateFrom(const FilterField<Derived>* filter)
{
  this->operator=(*filter);
}
private:
std::string OutputFieldName;
vtkm::Id CoordinateSystemIndex;

@ -96,6 +96,30 @@ public:
const vtkm::cont::ArrayHandle<T, StorageType>& field,
const vtkm::filter::FieldMetadata& fieldMeta,
vtkm::filter::PolicyBase<DerivedPolicy> policy);
/// Build a new heap-allocated Gradient and copy this filter's state into it
/// through CopyStateFrom. The raw pointer is handed to the caller, who owns it.
VTKM_CONT
Filter* Clone() const override
{
  auto* duplicate = new Gradient();
  duplicate->CopyStateFrom(this);
  return duplicate;
}
/// Gradient opts in to multi-threaded execution.
VTKM_CONT
bool CanThread() const override
{
  return true;
}
protected:
/// Copy the inherited FilterField state and every Gradient option from
/// \c gradient into this filter.
VTKM_CONT
void CopyStateFrom(const Gradient* gradient)
{
  // Inherited state is copied by the base-class helper.
  this->FilterField<Gradient>::CopyStateFrom(gradient);

  // Options owned by Gradient itself.
  const Gradient& src = *gradient;
  this->ComputePointGradient = src.ComputePointGradient;
  this->ComputeDivergence = src.ComputeDivergence;
  this->ComputeVorticity = src.ComputeVorticity;
  this->ComputeQCriterion = src.ComputeQCriterion;
  this->StoreGradient = src.StoreGradient;
  this->RowOrdering = src.RowOrdering;
}
private:
bool ComputePointGradient = false;

123
vtkm/filter/TaskQueue.h Normal file

@ -0,0 +1,123 @@
//============================================================================
// Copyright (c) Kitware, Inc.
// All rights reserved.
// See LICENSE.txt for details.
//
// This software is distributed WITHOUT ANY WARRANTY; without even
// the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE. See the above copyright notice for more information.
//============================================================================
#ifndef vtk_m_filter_TaskQueue_h
#define vtk_m_filter_TaskQueue_h
#include <vtkm/cont/DataSet.h>
#include <vtkm/cont/PartitionedDataSet.h>

#include <mutex>
#include <queue>
#include <utility>
#include <vector>
namespace vtkm
{
namespace filter
{
/// \brief A FIFO queue of work items whose operations are serialized by a mutex.
///
/// Every member function takes the internal lock for its whole duration, so
/// individual calls may be issued from several threads. A sequence of calls
/// (for example HasTasks() followed by Pop()) is not atomic as a whole; use
/// GetTask() to test-and-take in one step.
///
/// The queue itself can be neither copied nor moved.
template <typename T>
class TaskQueue
{
public:
  TaskQueue() = default;

  //don't want copies of this
  TaskQueue(const TaskQueue& rhs) = delete;
  TaskQueue& operator=(const TaskQueue& rhs) = delete;
  TaskQueue(TaskQueue&& rhs) = delete;
  TaskQueue& operator=(TaskQueue&& rhs) = delete;

  /// Add a task to the back of the queue. The item is moved in.
  void Push(T&& item)
  {
    std::lock_guard<std::mutex> lock(this->Lock);
    // `item` is an lvalue here; without std::move it would be copied.
    this->Queue.push(std::move(item));
  }

  /// Returns true when at least one task is queued at the time of the call.
  bool HasTasks()
  {
    std::lock_guard<std::mutex> lock(this->Lock);
    return !(this->Queue.empty());
  }

  /// Take the oldest task, if any.
  ///
  /// \param item Receives the task on success; left untouched otherwise.
  /// \return true if a task was taken, false if the queue was empty.
  bool GetTask(T& item)
  {
    std::lock_guard<std::mutex> lock(this->Lock);
    if (this->Queue.empty())
      return false;

    // The front element is discarded right after, so move out of it.
    item = std::move(this->Queue.front());
    this->Queue.pop();
    return true;
  }

  /// Take and return the oldest task. When the queue is empty a
  /// default-constructed T is returned instead.
  T Pop()
  {
    T item;
    std::lock_guard<std::mutex> lock(this->Lock);
    if (!this->Queue.empty())
    {
      item = std::move(this->Queue.front());
      this->Queue.pop();
    }
    return item;
  }

protected:
  /// Number of tasks queued at the time of the call.
  vtkm::Id Length()
  {
    std::lock_guard<std::mutex> lock(this->Lock);
    return static_cast<vtkm::Id>(this->Queue.size());
  }

private:
  std::mutex Lock;
  std::queue<T> Queue;
};
/// \brief A TaskQueue of (partition index, DataSet) pairs.
///
/// The index travels with each data set so that results produced in any
/// order can be reassembled in their original partition order by Get().
class DataSetQueue : public TaskQueue<std::pair<vtkm::Id, vtkm::cont::DataSet>>
{
public:
  /// Create an empty queue.
  DataSetQueue() = default;

  /// Create a queue holding every partition of \c input, each tagged with
  /// its position (0, 1, 2, ...) in \c input.
  DataSetQueue(const vtkm::cont::PartitionedDataSet& input)
  {
    vtkm::Id idx = 0;
    for (auto ds : input)
      this->Push(std::make_pair(idx++, std::move(ds)));
  }

  /// Drain the queue and return its data sets as a PartitionedDataSet,
  /// placed according to the index stored with each one. An empty queue
  /// yields an empty PartitionedDataSet.
  ///
  /// Each stored index is used directly as a slot in an array sized to the
  /// number of queued items, so indices must be unique and lie in
  /// [0, number of queued items); nothing here checks that. The size is read
  /// before draining, so this is meant to be called once all producers have
  /// finished pushing.
  vtkm::cont::PartitionedDataSet Get()
  {
    vtkm::cont::PartitionedDataSet pds;
    vtkm::Id num = this->Length();

    if (num > 0)
    {
      std::vector<vtkm::cont::DataSet> dataSets(static_cast<std::size_t>(num));

      //Insert them back in the same order.
      std::pair<vtkm::Id, vtkm::cont::DataSet> task;
      while (this->GetTask(task))
      {
        dataSets[static_cast<std::size_t>(task.first)] = std::move(task.second);
      }

      pds.AppendPartitions(dataSets);
    }

    return pds;
  }
};
}
}
#endif

@ -50,6 +50,7 @@ set(unit_tests
UnitTestMaskFilter.cxx
UnitTestMaskPointsFilter.cxx
UnitTestMeshQualityFilter.cxx
UnitTestMultiBlockFilter.cxx
UnitTestNDEntropyFilter.cxx
UnitTestNDHistogramFilter.cxx
UnitTestParticleDensity.cxx

@ -0,0 +1,174 @@
//============================================================================
// Copyright (c) Kitware, Inc.
// All rights reserved.
// See LICENSE.txt for details.
//
// This software is distributed WITHOUT ANY WARRANTY; without even
// the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE. See the above copyright notice for more information.
//============================================================================
#include <vtkm/Math.h>
#include <vtkm/cont/DataSet.h>
#include <vtkm/cont/testing/MakeTestDataSet.h>
#include <vtkm/cont/testing/Testing.h>
#include <vtkm/filter/CleanGrid.h>
#include <vtkm/filter/ClipWithField.h>
#include <vtkm/filter/Contour.h>
#include <vtkm/filter/Gradient.h>
#include <vtkm/io/VTKDataSetReader.h>
#include <vtkm/source/Tangle.h>
namespace
{
/// Absolute difference between two scalar values.
template <typename T>
vtkm::FloatDefault ValueDifference(const T& a, const T& b)
{
  const auto delta = a - b;
  return vtkm::Abs(delta);
}
/// Sum of the absolute per-component differences of two 3-vectors.
template <typename T>
vtkm::FloatDefault ValueDifference(const vtkm::Vec<T, 3>& a, const vtkm::Vec<T, 3>& b)
{
  const auto dx = vtkm::Abs(a[0] - b[0]);
  const auto dy = vtkm::Abs(a[1] - b[1]);
  const auto dz = vtkm::Abs(a[2] - b[2]);
  return dx + dy + dz;
}
template <typename ArrayType>
void ValidateField(const ArrayType& truthField, const ArrayType& resultField)
{
VTKM_TEST_ASSERT(truthField.GetNumberOfValues() == resultField.GetNumberOfValues(),
"Wrong number of field values");
const vtkm::FloatDefault tol = static_cast<vtkm::FloatDefault>(1e-3);
vtkm::Id numPts = truthField.GetNumberOfValues();
const auto truthPortal = truthField.ReadPortal();
const auto resultPortal = resultField.ReadPortal();
for (vtkm::Id j = 0; j < numPts; j++)
{
auto diff = ValueDifference(truthPortal.Get(j), resultPortal.Get(j));
if (diff > tol)
std::cout << "****** error at j= " << j << " diff= " << diff << " nPts= " << numPts
<< std::endl;
VTKM_TEST_ASSERT(ValueDifference(truthPortal.Get(j), resultPortal.Get(j)) < tol,
"Wrong value in field");
}
}
/// Compare two partitioned results partition by partition.
///
/// Both must have the same number of partitions; each pair of partitions
/// must agree on point count and cell count, the result must carry
/// \c varName, and the field values must match (see ValidateField).
///
/// \param truth    Reference output.
/// \param result   Output under test.
/// \param varName  Name of the field to compare.
/// \param isScalar true to read the field as Float32 scalars, false to read
///                 it as 3-component Float32 vectors.
void ValidateResults(const vtkm::cont::PartitionedDataSet& truth,
                     const vtkm::cont::PartitionedDataSet& result,
                     const std::string& varName,
                     bool isScalar = true)
{
  VTKM_TEST_ASSERT(truth.GetNumberOfPartitions() == result.GetNumberOfPartitions());

  const vtkm::Id numDS = truth.GetNumberOfPartitions();
  for (vtkm::Id i = 0; i < numDS; ++i)
  {
    auto truthDS = truth.GetPartition(i);
    auto resultDS = result.GetPartition(i);

    std::cout << "Validate: " << truthDS.GetNumberOfPoints()
              << " :: " << resultDS.GetNumberOfPoints() << std::endl;

    // Topology sizes first, then the field itself.
    VTKM_TEST_ASSERT(truthDS.GetNumberOfPoints() == resultDS.GetNumberOfPoints(),
                     "Wrong number of points");
    VTKM_TEST_ASSERT(truthDS.GetNumberOfCells() == resultDS.GetNumberOfCells(),
                     "Wrong number of cells");
    VTKM_TEST_ASSERT(resultDS.HasField(varName), "Missing field");

    if (!isScalar)
    {
      vtkm::cont::ArrayHandle<vtkm::Vec<vtkm::Float32, 3>> truthField, resultField;
      truthDS.GetField(varName).GetData().AsArrayHandle(truthField);
      resultDS.GetField(varName).GetData().AsArrayHandle(resultField);
      ValidateField(truthField, resultField);
      continue;
    }

    vtkm::cont::ArrayHandle<vtkm::Float32> truthField, resultField;
    truthDS.GetField(varName).GetData().AsArrayHandle(truthField);
    resultDS.GetField(varName).GetData().AsArrayHandle(resultField);
    ValidateField(truthField, resultField);
  }
}
} //namespace
/// Run several filters over a ten-partition Tangle data set twice, once with
/// multi-threading requested off and once requested on, and check that both
/// runs produce matching output.
void TestMultiBlockFilter()
{
  // Ten partitions of growing size: 10^3, 11^3, ... 19^3.
  vtkm::cont::PartitionedDataSet pds;
  for (int i = 0; i < 10; ++i)
  {
    const int side = 10 + i;
    vtkm::Id3 dims(side, side, side);
    vtkm::source::Tangle tangle(dims);
    pds.AppendPartition(tangle.Execute());
  }

  // Index 0 holds the run with threading off, index 1 the run with it on.
  const bool threadingModes[] = { false, true };
  std::vector<vtkm::cont::PartitionedDataSet> results;

  // NOTE(review): no CanThread override for ClipWithField is visible here; if
  // it has none, requesting threading is ignored and both runs are
  // single-threaded -- confirm.
  std::cout << "ClipWithField" << std::endl;
  for (const bool doThreading : threadingModes)
  {
    vtkm::filter::ClipWithField clip;
    clip.SetRunMultiThreadedFilter(doThreading);
    clip.SetClipValue(0.0);
    clip.SetActiveField("nodevar");
    clip.SetFieldsToPass("nodevar", vtkm::cont::Field::Association::POINTS);

    auto result = clip.Execute(pds);
    VTKM_TEST_ASSERT(result.GetNumberOfPartitions() == pds.GetNumberOfPartitions());
    results.push_back(result);
  }
  ValidateResults(results[0], results[1], "nodevar");

  std::cout << "Contour" << std::endl;
  results.clear();
  for (const bool doThreading : threadingModes)
  {
    vtkm::filter::Contour mc;
    mc.SetRunMultiThreadedFilter(doThreading);
    mc.SetGenerateNormals(true);
    mc.SetIsoValue(0, 0.5);
    mc.SetActiveField("nodevar");
    mc.SetFieldsToPass("nodevar", vtkm::cont::Field::Association::POINTS);

    auto result = mc.Execute(pds);
    VTKM_TEST_ASSERT(result.GetNumberOfPartitions() == pds.GetNumberOfPartitions());
    results.push_back(result);
  }
  ValidateResults(results[0], results[1], "nodevar");

  std::cout << "CleanGrid" << std::endl;
  results.clear();
  for (const bool doThreading : threadingModes)
  {
    vtkm::filter::CleanGrid clean;
    clean.SetRunMultiThreadedFilter(doThreading);
    clean.SetCompactPointFields(true);
    clean.SetMergePoints(true);

    auto result = clean.Execute(pds);
    VTKM_TEST_ASSERT(result.GetNumberOfPartitions() == pds.GetNumberOfPartitions());
    results.push_back(result);
  }
  ValidateResults(results[0], results[1], "nodevar");

  std::cout << "Gradient" << std::endl;
  results.clear();
  for (const bool doThreading : threadingModes)
  {
    vtkm::filter::Gradient grad;
    grad.SetRunMultiThreadedFilter(doThreading);
    grad.SetComputePointGradient(true);
    grad.SetActiveField("nodevar");
    grad.SetOutputFieldName("gradient");

    auto result = grad.Execute(pds);
    VTKM_TEST_ASSERT(result.GetNumberOfPartitions() == pds.GetNumberOfPartitions());
    results.push_back(result);
  }
  // The gradient output is compared as a vector field.
  ValidateResults(results[0], results[1], "gradient", false);
}
/// Test-driver entry point: hands TestMultiBlockFilter and the command-line
/// arguments to the VTK-m testing harness and returns its status code.
int UnitTestMultiBlockFilter(int argc, char* argv[])
{
  const int status = vtkm::cont::testing::Testing::Run(TestMultiBlockFilter, argc, argv);
  return status;
}