multi_backend shows how a filter can use multiple device adapter

This commit is contained in:
Robert Maynard 2018-05-04 14:42:46 -04:00
parent 7cf0926172
commit ad545dad5b
8 changed files with 748 additions and 112 deletions

@ -23,15 +23,28 @@ cmake_minimum_required(VERSION 3.3 FATAL_ERROR)
project(MultiBackend CXX)
#Find the VTK-m package
find_package(VTKm REQUIRED QUIET)
find_package(VTKm REQUIRED)
find_package(Threads REQUIRED QUIET)
set(headers
IOGenerator.h
MultiDeviceGradient.h
TaskQueue.h
)
set(device_srcs
MultiDeviceGradient.cxx
)
set(srcs
MultiBackend.cxx)
IOGenerator.cxx
MultiBackend.cxx
)
if(TARGET vtkm::cuda)
vtkm_compile_as_cuda(cuda_srcs ${srcs})
set(srcs ${cuda_srcs})
vtkm_compile_as_cuda(cuda_srcs ${device_srcs})
set(device_srcs ${cuda_srcs})
endif()
add_executable(MultiBackend ${srcs})
target_link_libraries(MultiBackend PRIVATE vtkm_cont)
add_executable(MultiBackend ${device_srcs} ${srcs} ${headers})
target_link_libraries(MultiBackend PRIVATE vtkm_cont Threads::Threads)

@ -0,0 +1,111 @@
//============================================================================
// Copyright (c) Kitware, Inc.
// All rights reserved.
// See LICENSE.txt for details.
// This software is distributed WITHOUT ANY WARRANTY; without even
// the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE. See the above copyright notice for more information.
//
// Copyright 2014 National Technology & Engineering Solutions of Sandia, LLC (NTESS).
// Copyright 2014 UT-Battelle, LLC.
// Copyright 2014 Los Alamos National Security.
//
// Under the terms of Contract DE-NA0003525 with NTESS,
// the U.S. Government retains certain rights in this software.
//
// Under the terms of Contract DE-AC52-06NA25396 with Los Alamos National
// Laboratory (LANL), the U.S. Government retains certain rights in
// this software.
//============================================================================
#include "IOGenerator.h"
#include <vtkm/Math.h>
#include <vtkm/cont/DataSetBuilderUniform.h>
#include <vtkm/cont/DataSetFieldAdd.h>
#include <vtkm/worklet/DispatcherMapField.h>
#include <vtkm/worklet/WorkletMapField.h>
#include <vtkm/cont/serial/DeviceAdapterSerial.h>
#include <chrono>
#include <random>
struct WaveField : public vtkm::worklet::WorkletMapField
{
typedef void ControlSignature(FieldIn<Vec3>, FieldOut<Vec3>);
typedef void ExecutionSignature(_1, _2);
template <typename T>
VTKM_EXEC void operator()(const vtkm::Vec<T, 3>& input, vtkm::Vec<T, 3>& output) const
{
output[0] = input[0];
output[1] = 0.25f * vtkm::Sin(input[0]) * vtkm::Cos(input[2]);
output[2] = input[2];
}
};
vtkm::cont::DataSet make_test3DImageData(int xdim, int ydim, int zdim)
{
using Builder = vtkm::cont::DataSetBuilderUniform;
using FieldAdd = vtkm::cont::DataSetFieldAdd;
vtkm::cont::DataSet ds = Builder::Create(vtkm::Id3{ xdim, ydim, zdim });
vtkm::cont::ArrayHandle<vtkm::Vec<vtkm::Float32, 3>> field;
vtkm::worklet::DispatcherMapField<WaveField, vtkm::cont::DeviceAdapterTagSerial> dispatcher;
dispatcher.Invoke(ds.GetCoordinateSystem(), field);
FieldAdd::AddPointField(ds, "vec_field", field);
return ds;
}
//=================================================================
void io_generator(TaskQueue<vtkm::cont::MultiBlock>& queue, std::size_t numberOfTasks)
{
//Step 1. We want to build an initial set of blocks
//that vary in size. This way we can generate uneven
//work to show off the vtk-m filter work distribution
vtkm::cont::DataSet small = make_test3DImageData(128, 128, 128);
vtkm::cont::DataSet medium = make_test3DImageData(256, 256, 128);
vtkm::cont::DataSet large = make_test3DImageData(512, 512, 128);
std::vector<vtkm::cont::DataSet> blocks;
blocks.push_back(small);
blocks.push_back(medium);
blocks.push_back(large);
std::mt19937 rng;
//uniform_int_distribution is a closed interval [] so both the min and max
//can be chosen values
std::uniform_int_distribution<vtkm::Id> blockNumGen(6, 32);
std::uniform_int_distribution<std::size_t> blockPicker(0, blocks.size() - 1);
for (std::size_t i = 0; i < numberOfTasks; ++i)
{
//Step 2. Construct a random number of blocks
const vtkm::Id numberOfBlocks = blockNumGen(rng);
//Step 3. Randomly pick the blocks in the dataset
vtkm::cont::MultiBlock mb(numberOfBlocks);
for (vtkm::Id b = 0; b < numberOfBlocks; ++b)
{
mb.AddBlock(blocks[blockPicker(rng)]);
}
std::cout << "adding multi-block with " << mb.GetNumberOfBlocks() << " blocks" << std::endl;
//Step 4. Add the multi-block to the queue. We explicitly
//use std::move to signal that this thread can't use the
//mb object after this call
queue.push(std::move(mb));
//Step 5. Go to sleep for a period of time to replicate
//data stream in
// std::this_thread::sleep_for(std::chrono::seconds(1));
}
//Step 6. Tell the queue that we are done submitting work
queue.shutdown();
std::cout << "io_generator finished" << std::endl;
}

@ -0,0 +1,30 @@
//============================================================================
// Copyright (c) Kitware, Inc.
// All rights reserved.
// See LICENSE.txt for details.
// This software is distributed WITHOUT ANY WARRANTY; without even
// the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE. See the above copyright notice for more information.
//
// Copyright 2014 National Technology & Engineering Solutions of Sandia, LLC (NTESS).
// Copyright 2014 UT-Battelle, LLC.
// Copyright 2014 Los Alamos National Security.
//
// Under the terms of Contract DE-NA0003525 with NTESS,
// the U.S. Government retains certain rights in this software.
//
// Under the terms of Contract DE-AC52-06NA25396 with Los Alamos National
// Laboratory (LANL), the U.S. Government retains certain rights in
// this software.
//============================================================================
#ifndef vtk_m_examples_multibackend_IOWorker_h
#define vtk_m_examples_multibackend_IOWorker_h
#include "TaskQueue.h"
#include <vtkm/cont/DataSet.h>
#include <vtkm/cont/MultiBlock.h>
vtkm::cont::DataSet make_test3DImageData(int xdim, int ydim, int zdim);
void io_generator(TaskQueue<vtkm::cont::MultiBlock>& queue, std::size_t numberOfTasks);
#endif

@ -17,118 +17,96 @@
// Laboratory (LANL), the U.S. Government retains certain rights in
// this software.
//============================================================================
#include <iostream>
#include <thread>
#include <vtkm/Math.h>
#include <vtkm/cont/ArrayHandle.h>
#include <vtkm/cont/RuntimeDeviceInformation.h>
#include <vtkm/cont/MultiBlock.h>
#include <vtkm/worklet/DispatcherMapField.h>
#include <vtkm/worklet/WorkletMapField.h>
#include "IOGenerator.h"
#include "MultiDeviceGradient.h"
#include "TaskQueue.h"
#include <vtkm/cont/TryExecute.h>
#include <vtkm/cont/cuda/DeviceAdapterCuda.h>
#include <vtkm/cont/serial/DeviceAdapterSerial.h>
#include <vtkm/cont/tbb/DeviceAdapterTBB.h>
using FloatVec3 = vtkm::Vec<vtkm::Float32, 3>;
using Uint8Vec4 = vtkm::Vec<vtkm::UInt8, 4>;
struct GenerateSurfaceWorklet : public vtkm::worklet::WorkletMapField
{
vtkm::Float32 t;
GenerateSurfaceWorklet(vtkm::Float32 st)
: t(st)
{
}
typedef void ControlSignature(FieldIn<>, FieldOut<>, FieldOut<>);
typedef void ExecutionSignature(_1, _2, _3);
template <typename T>
VTKM_EXEC void operator()(const vtkm::Vec<T, 3>& input,
vtkm::Vec<T, 3>& output,
vtkm::Vec<vtkm::UInt8, 4>& color) const
{
output[0] = input[0];
output[1] = 0.25f * vtkm::Sin(input[0] * 10.f + t) * vtkm::Cos(input[2] * 10.f + t);
output[2] = input[2];
color[0] = 0;
color[1] = static_cast<vtkm::UInt8>(160 + (96 * vtkm::Sin(input[0] * 10.f + t)));
color[2] = static_cast<vtkm::UInt8>(160 + (96 * vtkm::Cos(input[2] * 5.f + t)));
color[3] = 255;
}
};
struct RunGenerateSurfaceWorklet
{
template <typename DeviceAdapterTag>
bool operator()(DeviceAdapterTag) const
{
//At this point we know we have runtime support
using DeviceTraits = vtkm::cont::DeviceAdapterTraits<DeviceAdapterTag>;
using DispatcherType =
vtkm::worklet::DispatcherMapField<GenerateSurfaceWorklet, DeviceAdapterTag>;
std::cout << "Running a worklet on device adapter: " << DeviceTraits::GetName() << std::endl;
GenerateSurfaceWorklet worklet(0.05f);
DispatcherType(worklet).Invoke(this->In, this->Out, this->Color);
return true;
}
vtkm::cont::ArrayHandle<FloatVec3> In;
vtkm::cont::ArrayHandle<FloatVec3> Out;
vtkm::cont::ArrayHandle<Uint8Vec4> Color;
};
template <typename T>
std::vector<vtkm::Vec<T, 3>> make_testData(int size)
{
std::vector<vtkm::Vec<T, 3>> data;
data.reserve(static_cast<std::size_t>(size * size));
for (int i = 0; i < size; ++i)
{
for (int j = 0; j < size; ++j)
{
data.push_back(vtkm::Vec<T, 3>(
2.f * static_cast<T>(i / size) - 1.f, 0.f, 2.f * static_cast<T>(j / size) - 1.f));
}
}
return data;
}
//This is the list of devices to compile in support for. The order of the
//devices determines the runtime preference.
struct DevicesToTry : vtkm::ListTagBase<vtkm::cont::DeviceAdapterTagCuda,
vtkm::cont::DeviceAdapterTagTBB,
vtkm::cont::DeviceAdapterTagSerial>
{
};
//This demo shows off using vtk-m in multiple threads in two different ways.
//
//At a high level we have 2 primary threads, an IO thread and a Worker thread
//The IO thread will generate all data using the vtk-m serial device, and
//will post this data to a worker queue as a vtk-m multiblock.
//The Worker thread will pull down these vtk-m multiblock data and run a
//vtk-m filter on the multiblock.
//The vtk-m filter it runs will itself have a worker pool which it will
//distribute work too. The number of workers is based on what device adapters
//are enabled but uses the following logic:
// - If TBB is enabled construct a single TBB worker
// - If CUDA is enabled construct 4 workers for each GPU on the machine
//
//Unfortunately due to some thread unsafe logic in VTK-m it is currently not
//possible to have CUDA and TBB workers at the same time. So the class will
//choose CUDA over TBB when possible.
//Once the thread unsafe logic is fixed a machine that has a single CPU
//and single GPU we should expect that we will have 2 primary 'main loop'
//threads, and 5 threads for heavy 'task' work.
void multiblock_processing(TaskQueue<vtkm::cont::MultiBlock>& queue);
int main(int, char**)
{
std::vector<FloatVec3> data = make_testData<vtkm::Float32>(1024);
//Step 1. Construct the two primary 'main loops'. The threads
//share a queue object so we need to explicitly pass it
//by reference (the std::ref call)
TaskQueue<vtkm::cont::MultiBlock> queue;
std::thread io(io_generator, std::ref(queue), 12);
std::thread worker(multiblock_processing, std::ref(queue));
//make array handles for the data
// TryExecutes takes a functor and a list of devices. It then tries to run
// the functor for each device (in the order given in the list) until the
// execution succeeds. This allows you to compile in support for multiple
// devices which have runtime requirements ( GPU / HW Accelerator ) and
// correctly choose the best device at runtime.
//
// The functor parentheses operator should take exactly one argument, which is
// the DeviceAdapterTag to use. The functor should return true if the execution
// succeeds.
//
// This function also optionally takes a vtkm::cont::RuntimeDeviceTracker, which
// will monitor for certain failures across calls to TryExecute and skip trying
// devices with a history of failure.
RunGenerateSurfaceWorklet task;
task.In = vtkm::cont::make_ArrayHandle(data);
vtkm::cont::TryExecute(task, DevicesToTry());
//Step N. Wait for the work to finish
io.join();
worker.join();
return 0;
}
//=================================================================
void multiblock_processing(TaskQueue<vtkm::cont::MultiBlock>& queue)
{
//Step 1. Construct the gradient filter outside the work loop
//so that we can reuse the thread pool it constructs
MultiDeviceGradient gradient;
gradient.SetComputePointGradient(true);
while (queue.hasTasks())
{
//Step 2. grab the next multi-block skipping any that are empty
//as empty ones can be returned when the queue is about
//to say it has no work
vtkm::cont::MultiBlock mb = queue.pop();
if (mb.GetNumberOfBlocks() == 0)
{
continue;
}
//Step 3. Get the first field name from the multi-block
std::string fieldName = mb.GetBlock(0).GetField(0).GetName();
//Step 4. Run a multi device gradient
gradient.SetActiveField(fieldName);
vtkm::cont::MultiBlock result = gradient.Execute(mb);
std::cout << "finished processing a multi-block" << std::endl;
//Step 5. Verify each block has a "Gradients" field
for (auto&& block : result)
{
// std::cout << std::endl << std::endl << std::endl;
// std::cout << "block: " << std::endl;
// block.PrintSummary(std::cout);
try
{
const auto& field = block.GetField("Gradients", vtkm::cont::Field::ASSOC_POINTS);
(void)field;
}
catch (vtkm::cont::ErrorBadValue)
{
std::cerr << "gradient filter failed!" << std::endl;
break;
}
}
}
std::cout << "multiblock_processing finished" << std::endl;
}

@ -0,0 +1,28 @@
//============================================================================
// Copyright (c) Kitware, Inc.
// All rights reserved.
// See LICENSE.txt for details.
// This software is distributed WITHOUT ANY WARRANTY; without even
// the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE. See the above copyright notice for more information.
//
// Copyright 2014 National Technology & Engineering Solutions of Sandia, LLC (NTESS).
// Copyright 2014 UT-Battelle, LLC.
// Copyright 2014 Los Alamos National Security.
//
// Under the terms of Contract DE-NA0003525 with NTESS,
// the U.S. Government retains certain rights in this software.
//
// Under the terms of Contract DE-AC52-06NA25396 with Los Alamos National
// Laboratory (LANL), the U.S. Government retains certain rights in
// this software.
//============================================================================
#define vtk_m_examples_multibackend_MultiDeviceGradient_cxx
#include "MultiDeviceGradient.h"
#include "MultiDeviceGradient.hxx"
template vtkm::cont::MultiBlock MultiDeviceGradient::PrepareForExecution<
vtkm::filter::PolicyDefault>(const vtkm::cont::MultiBlock&,
const vtkm::filter::PolicyBase<vtkm::filter::PolicyDefault>&);

@ -0,0 +1,94 @@
//============================================================================
// Copyright (c) Kitware, Inc.
// All rights reserved.
// See LICENSE.txt for details.
// This software is distributed WITHOUT ANY WARRANTY; without even
// the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE. See the above copyright notice for more information.
//
// Copyright 2014 National Technology & Engineering Solutions of Sandia, LLC (NTESS).
// Copyright 2014 UT-Battelle, LLC.
// Copyright 2014 Los Alamos National Security.
//
// Under the terms of Contract DE-NA0003525 with NTESS,
// the U.S. Government retains certain rights in this software.
//
// Under the terms of Contract DE-AC52-06NA25396 with Los Alamos National
// Laboratory (LANL), the U.S. Government retains certain rights in
// this software.
//============================================================================
#ifndef vtk_m_examples_multibackend_MultiDeviceGradient_h
#define vtk_m_examples_multibackend_MultiDeviceGradient_h
#include <vtkm/filter/FilterField.h>
#include <vtkm/filter/FilterTraits.h>
#include "TaskQueue.h"
#include <thread>
using RuntimeTaskQueue = TaskQueue<std::function<void(const vtkm::cont::RuntimeDeviceTracker&)>>;
/// \brief Construct a MultiDeviceGradient for a given multiblock dataset
///
/// The Policy used with MultiDeviceGradient must include the TBB and CUDA
/// backends.
class MultiDeviceGradient : public vtkm::filter::FilterField<MultiDeviceGradient>
{
public:
//Construct a MultiDeviceGradient and worker pool
VTKM_CONT
MultiDeviceGradient();
//Needed so that we can shut down the worker pool properly
VTKM_CONT
~MultiDeviceGradient();
/// When this flag is on (default is off), the gradient filter will provide a
/// point based gradients, which are significantly more costly since for each
/// point we need to compute the gradient of each cell that uses it.
void SetComputePointGradient(bool enable) { ComputePointGradient = enable; }
bool GetComputePointGradient() const { return ComputePointGradient; }
/// Will submit each block to a work queue that the threads will
/// pull work from
template <typename DerivedPolicy>
VTKM_CONT vtkm::cont::MultiBlock PrepareForExecution(
const vtkm::cont::MultiBlock&,
const vtkm::filter::PolicyBase<DerivedPolicy>&);
private:
bool ComputePointGradient;
RuntimeTaskQueue Queue;
std::vector<std::thread> Workers;
};
namespace vtkm
{
namespace filter
{
template <>
class FilterTraits<MultiDeviceGradient>
{
public:
struct TypeListTagGradientInputs : vtkm::ListTagBase<vtkm::Float32,
vtkm::Float64,
vtkm::Vec<vtkm::Float32, 3>,
vtkm::Vec<vtkm::Float64, 3>>
{
};
using InputFieldTypeList = TypeListTagGradientInputs;
};
}
} // namespace vtkm::filter
#ifndef vtk_m_examples_multibackend_MultiDeviceGradient_cxx
extern template vtkm::cont::MultiBlock MultiDeviceGradient::PrepareForExecution<
vtkm::filter::PolicyDefault>(const vtkm::cont::MultiBlock&,
const vtkm::filter::PolicyBase<vtkm::filter::PolicyDefault>&);
#endif
#endif

@ -0,0 +1,231 @@
//============================================================================
// Copyright (c) Kitware, Inc.
// All rights reserved.
// See LICENSE.txt for details.
// This software is distributed WITHOUT ANY WARRANTY; without even
// the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE. See the above copyright notice for more information.
//
// Copyright 2014 National Technology & Engineering Solutions of Sandia, LLC (NTESS).
// Copyright 2014 UT-Battelle, LLC.
// Copyright 2014 Los Alamos National Security.
//
// Under the terms of Contract DE-NA0003525 with NTESS,
// the U.S. Government retains certain rights in this software.
//
// Under the terms of Contract DE-AC52-06NA25396 with Los Alamos National
// Laboratory (LANL), the U.S. Government retains certain rights in
// this software.
//============================================================================
#include <vtkm/cont/RuntimeDeviceTracker.h>
#include <vtkm/cont/cuda/DeviceAdapterCuda.h>
#include <vtkm/cont/tbb/DeviceAdapterTBB.h>
#include <vtkm/filter/Gradient.h>
namespace
{
vtkm::Id deterine_cuda_gpu_count()
{
vtkm::Id count = 0;
#if defined(VTKM_ENABLE_CUDA)
int numberOfDevices = 0;
auto res = cudaGetDeviceCount(&numberOfDevices);
if (res == cudaSuccess)
{
count = static_cast<vtkm::Id>(numberOfDevices);
}
#endif
return count;
}
void process_block_tbb(RuntimeTaskQueue& queue)
{
//Step 1. Set the device adapter to this thread to TBB.
//This makes sure that any vtkm::filters used by our
//task operate only on TBB
//
vtkm::cont::RuntimeDeviceTracker tracker;
tracker.ForceDevice(vtkm::cont::DeviceAdapterTagTBB{});
while (queue.hasTasks())
{
//Step 2. Get the task to run on TBB
auto task = queue.pop();
//Step 3. Run the task on TBB. We check the validity
//of the task since we could be given an empty task
//when the queue is empty and we are shutting down
if (task != nullptr)
{
task(tracker);
}
//Step 4. Notify the queue that we finished processing this task
queue.completedTask();
std::cout << "finished a block on tbb (" << std::this_thread::get_id() << ")" << std::endl;
}
}
void process_block_cuda(RuntimeTaskQueue& queue, int gpuId)
{
//Step 1. Set the device adapter to this thread to cuda.
//This makes sure that any vtkm::filters used by our
//task operate only on cuda
//
vtkm::cont::RuntimeDeviceTracker tracker;
#if defined(VTKM_ENABLE_CUDA)
auto error = cudaSetDevice(gpuId);
tracker.ForceDevice(vtkm::cont::DeviceAdapterTagCuda{});
#endif
(void)gpuId;
while (queue.hasTasks())
{
//Step 2. Get the task to run on cuda
auto task = queue.pop();
//Step 3. Run the task on TBB. We check the validity
//of the task since we could be given an empty task
//when the queue is empty and we are shutting down
if (task != nullptr)
{
task(tracker);
}
//Step 4. Notify the queue that we finished processing this task
queue.completedTask();
std::cout << "finished a block on cuda (" << std::this_thread::get_id() << ")" << std::endl;
}
}
} //namespace
//-----------------------------------------------------------------------------
VTKM_CONT MultiDeviceGradient::MultiDeviceGradient()
: ComputePointGradient(false)
, Queue()
, Workers()
{
//Step 1. Determine the number of workers we want
vtkm::cont::RuntimeDeviceTracker tracker;
const bool runOnTbb = tracker.CanRunOn(vtkm::cont::DeviceAdapterTagTBB{});
const bool runOnCuda = tracker.CanRunOn(vtkm::cont::DeviceAdapterTagCuda{});
//Note currently the virtual implementation has some issues
//In a multi-threaded enviornment only cuda can be used or
//all SMP backends ( Serial, TBB, OpenMP ).
//Once this issue is resolved we can enable CUDA + TBB in
//this example
//Step 2. Launch workers that will use cuda (if enabled).
//The threads share a queue object so we need to explicitly pass it
//by reference (the std::ref call)
if (runOnCuda)
{
std::cout << "adding cuda workers" << std::endl;
const vtkm::Id gpu_count = deterine_cuda_gpu_count();
for (vtkm::Id i = 0; i < gpu_count; ++i)
{
//The number of workers per GPU is purely arbitrary currently,
//but in general we want multiple of them so we can overlap compute
//and transfer
this->Workers.emplace_back(process_block_cuda, std::ref(this->Queue), i);
this->Workers.emplace_back(process_block_cuda, std::ref(this->Queue), i);
this->Workers.emplace_back(process_block_cuda, std::ref(this->Queue), i);
this->Workers.emplace_back(process_block_cuda, std::ref(this->Queue), i);
}
}
//Step 3. Launch a worker that will use tbb (if enabled).
//The threads share a queue object so we need to explicitly pass it
//by reference (the std::ref call)
else if (runOnTbb)
{
std::cout << "adding a tbb worker" << std::endl;
this->Workers.emplace_back(process_block_tbb, std::ref(this->Queue));
}
}
//-----------------------------------------------------------------------------
VTKM_CONT MultiDeviceGradient::~MultiDeviceGradient()
{
this->Queue.shutdown();
//shutdown all workers
for (auto&& thread : this->Workers)
{
thread.join();
}
}
//-----------------------------------------------------------------------------
template <typename DerivedPolicy>
inline VTKM_CONT vtkm::cont::MultiBlock MultiDeviceGradient::PrepareForExecution(
const vtkm::cont::MultiBlock& mb,
const vtkm::filter::PolicyBase<DerivedPolicy>& policy)
{
//Step 1. Say that we have no more to submit for this multi block
//This is needed to happen for each execute as we want to support
//the same filter being used for multiple inputs
this->Queue.reset();
//Step 2. Construct the multi-block we are going to fill. The size signature
//to MultiBlock just reserves size
vtkm::cont::MultiBlock output;
output.AddBlocks(std::vector<vtkm::cont::DataSet>(mb.GetNumberOfBlocks()));
vtkm::cont::MultiBlock* outPtr = &output;
//Step 3. Construct the filter we want to run on each block
vtkm::filter::Gradient gradient;
gradient.SetComputePointGradient(this->GetComputePointGradient());
gradient.SetActiveField(this->GetActiveFieldName());
//Step 3b. Post 1 block up as work and block intil it is
//complete. This is needed as currently constructing the virtual
//Point Coordinates is not thread safe.
auto block = mb.cbegin();
{
vtkm::cont::DataSet input = *block;
this->Queue.push( //build a lambda that is the work to do
[=](const vtkm::cont::RuntimeDeviceTracker& tracker) {
//make a per thread copy of the filter
//and give it the device tracker
vtkm::filter::Gradient perThreadGrad = gradient;
perThreadGrad.SetRuntimeDeviceTracker(tracker);
vtkm::cont::DataSet result = perThreadGrad.Execute(input, policy);
outPtr->ReplaceBlock(0, result);
});
this->Queue.waitForAllTasksToComplete();
block++;
}
vtkm::Id index = 1;
for (; block != mb.cend(); ++block)
{
vtkm::cont::DataSet input = *block;
//Step 4. For each input block construct a lambda
//and add it to the queue for workers to take. This
//will allows us to have multiple works execute in a non
//blocking manner
this->Queue.push( //build a lambda that is the work to do
[=](const vtkm::cont::RuntimeDeviceTracker& tracker) {
//make a per thread copy of the filter
//and give it the device tracker
vtkm::filter::Gradient perThreadGrad = gradient;
perThreadGrad.SetRuntimeDeviceTracker(tracker);
vtkm::cont::DataSet result = perThreadGrad.Execute(input, policy);
outPtr->ReplaceBlock(index, result);
});
index++;
}
// Step 5. Wait on all workers to finish
this->Queue.waitForAllTasksToComplete();
return output;
}

@ -0,0 +1,151 @@
//============================================================================
// Copyright (c) Kitware, Inc.
// All rights reserved.
// See LICENSE.txt for details.
// This software is distributed WITHOUT ANY WARRANTY; without even
// the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE. See the above copyright notice for more information.
//
// Copyright 2014 National Technology & Engineering Solutions of Sandia, LLC (NTESS).
// Copyright 2014 UT-Battelle, LLC.
// Copyright 2014 Los Alamos National Security.
//
// Under the terms of Contract DE-NA0003525 with NTESS,
// the U.S. Government retains certain rights in this software.
//
// Under the terms of Contract DE-AC52-06NA25396 with Los Alamos National
// Laboratory (LANL), the U.S. Government retains certain rights in
// this software.
//============================================================================
#ifndef vtk_m_examples_multibackend_TaskQueue_h
#define vtk_m_examples_multibackend_TaskQueue_h
#include <vtkm/cont/MultiBlock.h>
#include <condition_variable>
#include <mutex>
#include <queue>
template <typename T>
class TaskQueue
{
public:
TaskQueue() = default;
void reset()
{
{
std::unique_lock<std::mutex> lock(this->Lock);
this->ShutdownOnceTasksCompleted = false;
this->TaskCount = 0;
}
this->CV.notify_all();
}
void shutdown()
{
{
std::unique_lock<std::mutex> lock(this->Lock);
this->ShutdownOnceTasksCompleted = true;
}
this->CV.notify_all();
}
//Say we always have tasks while the producer (IO) hasn't
//reported it is finished adding tasks. Once it has finished
//submitting tasks, we run until the queue is empty
bool hasTasks()
{
{
std::unique_lock<std::mutex> lock(this->Lock);
if (this->ShutdownOnceTasksCompleted)
{
return this->Queue.size() > 0;
}
return true;
}
}
//Add a task to the Queue.
void push(T&& item)
{
{
std::unique_lock<std::mutex> lock(this->Lock);
this->Queue.push(item);
this->TaskCount++;
} //unlock before we notify so we don't deadlock
this->CV.notify_all();
}
//Get a task from the Queue.
T pop()
{
T item;
{
//wait for a job to come into the queue
std::unique_lock<std::mutex> lock(this->Lock);
this->CV.wait(lock, [this] {
//if we are shutting down we need to always wake up
if (this->ShutdownOnceTasksCompleted)
{
return true;
}
//if we aren't shutting down sleep when we have no work
return this->Queue.size() > 0;
});
//When shutting down we don't check the queue size
//so make sure we have something to pop
if (this->Queue.size() > 0)
{
//take the job
item = this->Queue.front();
this->Queue.pop();
}
} //unlock before we notify so we don't deadlock
this->CV.notify_all();
return item;
}
//Report that you finished processing a task popped from
//the Queue
void completedTask()
{
{
std::unique_lock<std::mutex> lock(this->Lock);
this->TaskCount--;
} //unlock before we notify so we don't deadlock
this->CV.notify_all();
}
//Wait for all task to be removed from the queue
//and to be completed
//For this to , threads after processing the
//data they got from pop() must call didTask()
//
void waitForAllTasksToComplete()
{
{
std::unique_lock<std::mutex> lock(this->Lock);
this->CV.wait(lock, [this] { return this->TaskCount == 0; });
}
this->CV.notify_all();
}
private:
std::mutex Lock;
std::queue<T> Queue;
std::condition_variable CV;
int TaskCount = 0;
bool ShutdownOnceTasksCompleted = false;
//don't want copies of this
TaskQueue(const TaskQueue& rhs) = delete;
TaskQueue& operator=(const TaskQueue& rhs) = delete;
TaskQueue(TaskQueue&& rhs) = delete;
TaskQueue& operator=(TaskQueue&& rhs) = delete;
};
#endif