vtk-m/vtkm/filter/Filter.cxx

155 lines
4.3 KiB
C++
Raw Normal View History

2021-12-03 20:44:51 +00:00
//============================================================================
// Copyright (c) Kitware, Inc.
// All rights reserved.
// See LICENSE.txt for details.
//
// This software is distributed WITHOUT ANY WARRANTY; without even
// the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE. See the above copyright notice for more information.
//============================================================================
#include <vtkm/cont/Algorithm.h>
#include <vtkm/cont/Logging.h>
#include <vtkm/cont/RuntimeDeviceTracker.h>
#include <vtkm/filter/Filter.h>
2021-12-03 20:44:51 +00:00
#include <vtkm/filter/TaskQueue.h>
#include <future>
namespace vtkm
{
namespace filter
{
namespace
{
void RunFilter(Filter* self, vtkm::filter::DataSetQueue& input, vtkm::filter::DataSetQueue& output)
2021-12-03 20:44:51 +00:00
{
2022-08-12 19:29:38 +00:00
auto& tracker = vtkm::cont::GetRuntimeDeviceTracker();
2022-08-18 10:38:15 +00:00
bool prevVal = tracker.GetThreadFriendlyMemAlloc();
tracker.SetThreadFriendlyMemAlloc(true);
2021-12-03 20:44:51 +00:00
std::pair<vtkm::Id, vtkm::cont::DataSet> task;
while (input.GetTask(task))
{
auto outDS = self->Execute(task.second);
output.Push(std::make_pair(task.first, std::move(outDS)));
}
vtkm::cont::Algorithm::Synchronize();
2022-08-18 10:38:15 +00:00
tracker.SetThreadFriendlyMemAlloc(prevVal);
2021-12-03 20:44:51 +00:00
}
}
Filter::~Filter() = default;
bool Filter::CanThread() const
{
return true;
}
2021-12-03 20:44:51 +00:00
//----------------------------------------------------------------------------
vtkm::cont::PartitionedDataSet Filter::DoExecutePartitions(
2021-12-13 19:17:15 +00:00
const vtkm::cont::PartitionedDataSet& input)
2021-12-03 20:44:51 +00:00
{
vtkm::cont::PartitionedDataSet output;
if (this->GetRunMultiThreadedFilter())
{
vtkm::filter::DataSetQueue inputQueue(input);
vtkm::filter::DataSetQueue outputQueue;
vtkm::Id numThreads = this->DetermineNumberOfThreads(input);
//Run 'numThreads' filters.
std::vector<std::future<void>> futures(static_cast<std::size_t>(numThreads));
for (std::size_t i = 0; i < static_cast<std::size_t>(numThreads); i++)
{
auto f = std::async(
std::launch::async, RunFilter, this, std::ref(inputQueue), std::ref(outputQueue));
2021-12-03 20:44:51 +00:00
futures[i] = std::move(f);
}
for (auto& f : futures)
f.get();
//Get results from the outputQueue.
output = outputQueue.Get();
}
else
{
for (const auto& inBlock : input)
{
vtkm::cont::DataSet outBlock = this->Execute(inBlock);
output.AppendPartition(outBlock);
}
}
return this->CreateResult(input, output);
2021-12-03 20:44:51 +00:00
}
vtkm::cont::DataSet Filter::Execute(const vtkm::cont::DataSet& input)
{
return this->DoExecute(input);
}
2021-12-03 20:44:51 +00:00
vtkm::cont::PartitionedDataSet Filter::Execute(const vtkm::cont::PartitionedDataSet& input)
2021-12-03 20:44:51 +00:00
{
VTKM_LOG_SCOPE(vtkm::cont::LogLevel::Perf,
"Filter (%d partitions): '%s'",
2021-12-03 20:44:51 +00:00
(int)input.GetNumberOfPartitions(),
vtkm::cont::TypeToString<decltype(*this)>().c_str());
return this->DoExecutePartitions(input);
2021-12-03 20:44:51 +00:00
}
vtkm::cont::DataSet Filter::CreateResult(const vtkm::cont::DataSet& inDataSet) const
{
auto fieldMapper = [](vtkm::cont::DataSet& out, const vtkm::cont::Field& fieldToPass) {
out.AddField(fieldToPass);
};
return this->CreateResult(inDataSet, inDataSet.GetCellSet(), fieldMapper);
}
vtkm::cont::PartitionedDataSet Filter::CreateResult(
const vtkm::cont::PartitionedDataSet& input,
2022-08-26 16:03:20 +00:00
const vtkm::cont::PartitionedDataSet& resultPartitions) const
{
auto fieldMapper = [](vtkm::cont::PartitionedDataSet& out, const vtkm::cont::Field& fieldToPass) {
out.AddField(fieldToPass);
};
return this->CreateResult(input, resultPartitions, fieldMapper);
}
vtkm::Id Filter::DetermineNumberOfThreads(const vtkm::cont::PartitionedDataSet& input)
2021-12-03 20:44:51 +00:00
{
vtkm::Id numDS = input.GetNumberOfPartitions();
vtkm::Id availThreads = 1;
auto& tracker = vtkm::cont::GetRuntimeDeviceTracker();
if (tracker.CanRunOn(vtkm::cont::DeviceAdapterTagCuda{}))
2022-08-10 20:06:42 +00:00
availThreads = this->NumThreadsPerGPU;
2021-12-03 20:44:51 +00:00
else if (tracker.CanRunOn(vtkm::cont::DeviceAdapterTagKokkos{}))
{
//Kokkos doesn't support threading on the CPU.
#ifdef VTKM_KOKKOS_CUDA
2022-08-10 20:06:42 +00:00
availThreads = this->NumThreadsPerGPU;
2021-12-03 20:44:51 +00:00
#else
availThreads = 1;
#endif
}
else if (tracker.CanRunOn(vtkm::cont::DeviceAdapterTagSerial{}))
availThreads = 1;
else
2022-08-10 20:06:42 +00:00
availThreads = this->NumThreadsPerCPU;
2021-12-03 20:44:51 +00:00
vtkm::Id numThreads = std::min<vtkm::Id>(numDS, availThreads);
return numThreads;
}
} // namespace filter
} // namespace vtkm