vtk-m/examples/multi_backend/MultiDeviceGradient.hxx

//============================================================================
// Copyright (c) Kitware, Inc.
// All rights reserved.
// See LICENSE.txt for details.
//
// This software is distributed WITHOUT ANY WARRANTY; without even
// the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE. See the above copyright notice for more information.
//============================================================================
#include <vtkm/cont/RuntimeDeviceTracker.h>
#include <vtkm/cont/cuda/DeviceAdapterCuda.h>
#include <vtkm/cont/openmp/DeviceAdapterOpenMP.h>
#include <vtkm/cont/tbb/DeviceAdapterTBB.h>
#include <vtkm/filter/Gradient.h>

#include <functional>
#include <iostream>
#include <thread>
#include <vector>

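//Note: this .hxx file is the implementation half of MultiDeviceGradient.h,
//which is expected to declare the filter class and provide RuntimeTaskQueue
//(in the vtk-m example sources it comes from a TaskQueue.h helper). Based
//purely on how the queue is used below, it needs roughly this interface:
//
//  push(task)                  -- enqueue one unit of work
//  pop()                       -- dequeue; may return an empty task
//  hasTasks()                  -- work remains or the queue is still open
//  completedTask()             -- mark one popped task as finished
//  reset()                     -- reopen the queue for a new Execute
//  shutdown()                  -- release any blocked workers
//  waitForAllTasksToComplete() -- block until the queue drains
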
namespace
{

int determine_cuda_gpu_count()
{
  int count = 0;
#if defined(VTKM_ENABLE_CUDA)
  int numberOfDevices = 0;
  auto res = cudaGetDeviceCount(&numberOfDevices);
  if (res == cudaSuccess)
  {
    count = numberOfDevices;
  }
#endif
  return count;
}

void process_partition_tbb(RuntimeTaskQueue& queue)
{
  //Step 1. Set the device adapter for this thread to TBB.
  //This makes sure that any vtkm::filters used by our
  //task operate only on TBB. The "global" thread tracker
  //is actually thread-local, so we can use that.
  //
  vtkm::cont::GetRuntimeDeviceTracker().ForceDevice(vtkm::cont::DeviceAdapterTagTBB{});

  while (queue.hasTasks())
  {
    //Step 2. Get the task to run on TBB
    auto task = queue.pop();

    //Step 3. Run the task on TBB. We check the validity
    //of the task since we could be given an empty task
    //when the queue is empty and we are shutting down
    if (task != nullptr)
    {
      task();
    }

    //Step 4. Notify the queue that we finished processing this task
    queue.completedTask();
    std::cout << "finished a partition on tbb (" << std::this_thread::get_id() << ")" << std::endl;
  }
}

void process_partition_openMP(RuntimeTaskQueue& queue)
{
  //Step 1. Set the device adapter for this thread to openMP.
  //This makes sure that any vtkm::filters used by our
  //task operate only on openMP. The "global" thread tracker
  //is actually thread-local, so we can use that.
  //
  vtkm::cont::GetRuntimeDeviceTracker().ForceDevice(vtkm::cont::DeviceAdapterTagOpenMP{});

  while (queue.hasTasks())
  {
    //Step 2. Get the task to run on openMP
    auto task = queue.pop();

    //Step 3. Run the task on openMP. We check the validity
    //of the task since we could be given an empty task
    //when the queue is empty and we are shutting down
    if (task != nullptr)
    {
      task();
    }

    //Step 4. Notify the queue that we finished processing this task
    queue.completedTask();
    std::cout << "finished a partition on openMP (" << std::this_thread::get_id() << ")"
              << std::endl;
  }
}

void process_partition_cuda(RuntimeTaskQueue& queue, int gpuId)
{
  //Step 1. Set the device adapter for this thread to cuda.
  //This makes sure that any vtkm::filters used by our
  //task operate only on cuda. The "global" thread tracker
  //is actually thread-local, so we can use that.
  //
  vtkm::cont::GetRuntimeDeviceTracker().ForceDevice(vtkm::cont::DeviceAdapterTagCuda{});

  //gpuId is currently unused; with multiple GPUs one would select the
  //device here (e.g. with cudaSetDevice) before processing any tasks
  (void)gpuId;

  while (queue.hasTasks())
  {
    //Step 2. Get the task to run on cuda
    auto task = queue.pop();

    //Step 3. Run the task on cuda. We check the validity
    //of the task since we could be given an empty task
    //when the queue is empty and we are shutting down
    if (task != nullptr)
    {
      task();
    }

    //Step 4. Notify the queue that we finished processing this task
    queue.completedTask();
    std::cout << "finished a partition on cuda (" << std::this_thread::get_id() << ")" << std::endl;
  }
}

} //namespace
//-----------------------------------------------------------------------------
VTKM_CONT MultiDeviceGradient::MultiDeviceGradient()
  : ComputePointGradient(false)
  , Queue()
  , Workers()
{
  //Step 1. Determine the number of workers we want
  auto& tracker = vtkm::cont::GetRuntimeDeviceTracker();
  const bool runOnCuda = tracker.CanRunOn(vtkm::cont::DeviceAdapterTagCuda{});
  const bool runOnOpenMP = tracker.CanRunOn(vtkm::cont::DeviceAdapterTagOpenMP{});
  const bool runOnTbb = tracker.CanRunOn(vtkm::cont::DeviceAdapterTagTBB{});

  //Note: the virtual implementation currently has some issues. In a
  //multi-threaded environment we can use either cuda alone or all of
  //the SMP backends (Serial, TBB, OpenMP), but not a mix of the two.
  //Once this issue is resolved we can enable CUDA + TBB in this example.

  //Step 2. Launch workers that will use cuda (if enabled).
  //The threads share a queue object so we need to explicitly pass it
  //by reference (the std::ref call)
  if (runOnCuda)
  {
    std::cout << "adding cuda workers" << std::endl;
    const int gpu_count = determine_cuda_gpu_count();
    for (int i = 0; i < gpu_count; ++i)
    {
      //The number of workers per GPU (four here) is currently arbitrary,
      //but in general we want more than one so that compute and data
      //transfer can overlap
      for (int j = 0; j < 4; ++j)
      {
        this->Workers.emplace_back(std::bind(process_partition_cuda, std::ref(this->Queue), i));
      }
    }
  }
  //Step 3. Launch a worker that will use openMP (if enabled).
  //The threads share a queue object so we need to explicitly pass it
  //by reference (the std::ref call)
  else if (runOnOpenMP)
  {
    std::cout << "adding an openMP worker" << std::endl;
    this->Workers.emplace_back(std::bind(process_partition_openMP, std::ref(this->Queue)));
  }
  //Step 4. Launch a worker that will use tbb (if enabled).
  //The threads share a queue object so we need to explicitly pass it
  //by reference (the std::ref call)
  else if (runOnTbb)
  {
    std::cout << "adding a tbb worker" << std::endl;
    this->Workers.emplace_back(std::bind(process_partition_tbb, std::ref(this->Queue)));
  }
}
//-----------------------------------------------------------------------------
VTKM_CONT MultiDeviceGradient::~MultiDeviceGradient()
{
  //release any workers blocked on the queue and wait for them to exit
  this->Queue.shutdown();
  for (auto&& thread : this->Workers)
  {
    thread.join();
  }
}
//-----------------------------------------------------------------------------
template <typename DerivedPolicy>
inline VTKM_CONT vtkm::cont::PartitionedDataSet MultiDeviceGradient::PrepareForExecution(
  const vtkm::cont::PartitionedDataSet& pds,
  const vtkm::filter::PolicyBase<DerivedPolicy>&)
{
  //Step 1. Mark that we have nothing queued up yet for this PartitionedDataSet.
  //This needs to happen for each Execute call since we want to support
  //using the same filter for multiple inputs
  this->Queue.reset();

  //Step 2. Construct the PartitionedDataSet we are going to fill. Appending
  //default-constructed DataSets just reserves one slot per input partition
  vtkm::cont::PartitionedDataSet output;
  output.AppendPartitions(
    std::vector<vtkm::cont::DataSet>(static_cast<size_t>(pds.GetNumberOfPartitions())));
  vtkm::cont::PartitionedDataSet* outPtr = &output;

  //Step 3. Construct the filter we want to run on each partition
  vtkm::filter::Gradient gradient;
  gradient.SetComputePointGradient(this->GetComputePointGradient());
  gradient.SetActiveField(this->GetActiveFieldName());

  //Step 3b. Post one partition as work and block until it is
  //complete. This is needed because constructing the virtual
  //point coordinates is currently not thread safe
  auto partition = pds.cbegin();
  {
    vtkm::cont::DataSet input = *partition;
    this->Queue.push( //build a lambda that is the work to do
      [=]() {
        vtkm::filter::Gradient perThreadGrad = gradient;
        vtkm::cont::DataSet result = perThreadGrad.Execute(input);
        outPtr->ReplacePartition(0, result);
      });
    this->Queue.waitForAllTasksToComplete();
    partition++;
  }

  //Step 4. For each remaining input partition construct a lambda
  //and add it to the queue for workers to take. This allows
  //multiple pieces of work to execute in a non-blocking manner
  vtkm::Id index = 1;
  for (; partition != pds.cend(); ++partition)
  {
    vtkm::cont::DataSet input = *partition;
    this->Queue.push( //build a lambda that is the work to do
      [=]() {
        vtkm::filter::Gradient perThreadGrad = gradient;
        vtkm::cont::DataSet result = perThreadGrad.Execute(input);
        outPtr->ReplacePartition(index, result);
      });
    index++;
  }

  //Step 5. Wait on all workers to finish
  this->Queue.waitForAllTasksToComplete();

  return output;
}
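
//-----------------------------------------------------------------------------
//Example usage (a minimal sketch; the driver code, field name, and input
//construction below are assumptions for illustration and are not part of
//this file -- in the vtk-m sources the filter is driven by the
//multi_backend example's main program):
//
//  vtkm::cont::PartitionedDataSet pds = ...; //partitions with a point field
//  MultiDeviceGradient gradient;
//  gradient.SetComputePointGradient(true);
//  gradient.SetActiveField("values");
//  vtkm::cont::PartitionedDataSet result = gradient.Execute(pds);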