vtk-m2/vtkm/filter/NewFilter.cxx
Li-Ta Lo 130d0d9dfe Updated Doxygen comments
fixed some typos

more Doxygen updates, remove unnecessary #include

clarify name lookup for overloaded virtual function

minor refine on the name lookup rule

rename subdirectory

move virtual functions to .cxx, apply NVI pattern

fixed EXPORT macro

make namespace reflect directory structure
2021-12-13 09:28:13 -07:00

140 lines
3.7 KiB
C++

//============================================================================
// Copyright (c) Kitware, Inc.
// All rights reserved.
// See LICENSE.txt for details.
//
// This software is distributed WITHOUT ANY WARRANTY; without even
// the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE. See the above copyright notice for more information.
//============================================================================
#include <vtkm/cont/Algorithm.h>
#include <vtkm/cont/Logging.h>
#include <vtkm/cont/RuntimeDeviceTracker.h>
#include <vtkm/filter/NewFilter.h>
#include <vtkm/filter/TaskQueue.h>
#include <future>
#include <typeinfo>
namespace vtkm
{
namespace filter
{
namespace
{
// Worker routine executed by each thread of the multi-threaded path.
//
// Repeatedly pulls (partition index, DataSet) tasks from `input` until
// GetTask() reports no more work, runs `self->Execute` on each DataSet, and
// pushes the (same index, result) pair onto `output`.
// After the queue is drained it calls Algorithm::Synchronize()
// (presumably so device work issued by this worker has completed before the
// thread finishes — confirm).
void RunFilter(NewFilter* self,
               vtkm::filter::DataSetQueue& input,
               vtkm::filter::DataSetQueue& output)
{
  for (std::pair<vtkm::Id, vtkm::cont::DataSet> work; input.GetTask(work);)
  {
    vtkm::cont::DataSet result = self->Execute(work.second);
    output.Push(std::make_pair(work.first, std::move(result)));
  }

  vtkm::cont::Algorithm::Synchronize();
}
} // anonymous namespace
// Defaulted destructor, defined out-of-line here in the .cxx rather than inline in the header.
NewFilter::~NewFilter() = default;
// Base-class answer to "may this filter run on multiple threads?": always yes.
// NOTE(review): presumably overridden by filters that are not thread safe —
// confirm against the declaration in NewFilter.h.
bool NewFilter::CanThread() const
{
  constexpr bool threadable = true;
  return threadable;
}
//----------------------------------------------------------------------------
// Run the filter over every partition of `input` and collect the results.
//
// If GetRunMultiThreadedFilter() is false, each partition is passed to the
// single-DataSet Execute() on the calling thread, and the outputs are appended
// in iteration order.
//
// Otherwise the partitions are placed in a shared input queue, and
// DetermineNumberOfThreads(input) workers are launched with std::async. Each
// worker runs RunFilter and pushes (index, result) pairs to a shared output
// queue, which is then converted back into a PartitionedDataSet via Get().
// NOTE(review): ordering of the threaded output depends on DataSetQueue::Get
// (presumably it uses the stored indices) — confirm in TaskQueue.h.
vtkm::cont::PartitionedDataSet NewFilter::DoExecute(const vtkm::cont::PartitionedDataSet& input)
{
  vtkm::cont::PartitionedDataSet result;

  if (!this->GetRunMultiThreadedFilter())
  {
    for (const auto& partition : input)
    {
      vtkm::cont::DataSet filtered = this->Execute(partition);
      result.AppendPartition(filtered);
    }
    return result;
  }

  // The queues are declared before the futures so that, if get() throws, the
  // futures are destroyed first. That destruction waits for the still-running
  // workers, which hold references to the queues.
  vtkm::filter::DataSetQueue pending(input);
  vtkm::filter::DataSetQueue finished;

  const auto workerCount = static_cast<std::size_t>(this->DetermineNumberOfThreads(input));
  std::vector<std::future<void>> workers;
  workers.reserve(workerCount);
  for (std::size_t w = 0; w < workerCount; ++w)
  {
    workers.push_back(
      std::async(std::launch::async, RunFilter, this, std::ref(pending), std::ref(finished)));
  }

  // Join every worker; get() also rethrows an exception raised inside a worker.
  for (auto& worker : workers)
  {
    worker.get();
  }

  result = finished.Get();
  return result;
}
// Run the filter on a single DataSet by dispatching straight to the DataSet
// overload of DoExecute. Unlike the PartitionedDataSet overload of Execute,
// this path calls neither PreExecute/PostExecute nor opens a perf log scope.
vtkm::cont::DataSet NewFilter::Execute(const vtkm::cont::DataSet& input)
{
  vtkm::cont::DataSet result = this->DoExecute(input);
  return result;
}
// Public entry point for a PartitionedDataSet (non-virtual interface).
//
// Opens a Perf-level log scope, then runs PreExecute(input), DoExecute(input)
// and PostExecute(input, output) in that order, and returns the output.
vtkm::cont::PartitionedDataSet NewFilter::Execute(const vtkm::cont::PartitionedDataSet& input)
{
  // typeid applied to `*this` (a polymorphic object) yields the dynamic,
  // most-derived filter type. The previous `TypeToString<decltype(*this)>()`
  // always named the static type NewFilter, so every log line looked the same.
  VTKM_LOG_SCOPE(vtkm::cont::LogLevel::Perf,
                 "NewFilter (%d partitions): '%s'",
                 static_cast<int>(input.GetNumberOfPartitions()),
                 vtkm::cont::TypeToString(typeid(*this)).c_str());

  this->PreExecute(input);
  vtkm::cont::PartitionedDataSet output = this->DoExecute(input);
  // PostExecute receives both the input and the freshly produced output.
  this->PostExecute(input, output);
  return output;
}
// Choose how many worker threads the multi-threaded path of DoExecute uses.
//
// A per-device thread budget is picked from the runtime device tracker, checked
// in priority order: Cuda, then Kokkos, then Serial, with a CPU fallback. The
// result is capped by the number of partitions, so an empty input yields 0.
// NOTE(review): Serial is checked before the CPU fallback. If the Serial device
// is enabled alongside TBB/OpenMP, this returns a single thread — confirm that
// this is intended.
vtkm::Id NewFilter::DetermineNumberOfThreads(const vtkm::cont::PartitionedDataSet& input)
{
  const vtkm::Id partitionCount = input.GetNumberOfPartitions();

  // Arbitrary constants.
  constexpr vtkm::Id kThreadsPerGPU = 8;
  constexpr vtkm::Id kThreadsPerCPU = 4;

  auto& deviceTracker = vtkm::cont::GetRuntimeDeviceTracker();

  vtkm::Id threadBudget = kThreadsPerCPU;
  if (deviceTracker.CanRunOn(vtkm::cont::DeviceAdapterTagCuda{}))
  {
    threadBudget = kThreadsPerGPU;
  }
  else if (deviceTracker.CanRunOn(vtkm::cont::DeviceAdapterTagKokkos{}))
  {
    // Per the original note, Kokkos doesn't support threading on the CPU, so
    // only a CUDA-backed Kokkos build gets the GPU budget.
#ifdef VTKM_KOKKOS_CUDA
    threadBudget = kThreadsPerGPU;
#else
    threadBudget = 1;
#endif
  }
  else if (deviceTracker.CanRunOn(vtkm::cont::DeviceAdapterTagSerial{}))
  {
    threadBudget = 1;
  }

  return std::min<vtkm::Id>(partitionCount, threadBudget);
}
} // namespace filter
} // namespace vtkm