Add distributed-memory (MPI) support for the streamline and particle advection filters.

This commit is contained in:
dpugmire 2020-08-13 09:53:57 -04:00
parent 6dbf304000
commit c806403e2a
25 changed files with 2656 additions and 22 deletions

@ -28,6 +28,7 @@ if(VTKm_ENABLE_EXAMPLES)
add_subdirectory(multi_backend)
add_subdirectory(oscillator)
add_subdirectory(particle_advection)
add_subdirectory(streamline_mpi)
add_subdirectory(polyline_archimedean_helix)
add_subdirectory(redistribute_points)
add_subdirectory(temporal_advection)

@ -0,0 +1,27 @@
##============================================================================
## Copyright (c) Kitware, Inc.
## All rights reserved.
## See LICENSE.txt for details.
##
## This software is distributed WITHOUT ANY WARRANTY; without even
## the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
## PURPOSE. See the above copyright notice for more information.
##============================================================================
cmake_minimum_required(VERSION 3.12...3.15 FATAL_ERROR)
project(StreamlineMPI CXX)
#Find the VTK-m package
find_package(VTKm REQUIRED QUIET)
# The example is only built when VTK-m was configured with MPI support.
if (VTKm_ENABLE_MPI)
add_executable(StreamlineMPI StreamlineMPI.cxx)
target_compile_definitions(StreamlineMPI PRIVATE "MPI_ENABLED")
target_link_libraries(StreamlineMPI PRIVATE vtkm_filter vtkm_io MPI::MPI_CXX)
# Let VTK-m set up device-specific compile flags for the source file.
vtkm_add_target_information(StreamlineMPI
DROP_UNUSED_SYMBOLS MODIFY_CUDA_FLAGS
DEVICE_SOURCES StreamlineMPI.cxx)
endif()
# NOTE(review): the commented block below refers to a target named
# "streamline_mpi", but the target above is "StreamlineMPI" -- update if
# it is ever re-enabled.
#if(TARGET vtkm::tbb)
# target_compile_definitions(streamline_mpi PRIVATE BUILDING_TBB_VERSION)
#endif()

@ -0,0 +1,120 @@
//============================================================================
// Copyright (c) Kitware, Inc.
// All rights reserved.
// See LICENSE.txt for details.
//
// This software is distributed WITHOUT ANY WARRANTY; without even
// the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE. See the above copyright notice for more information.
//============================================================================
#include <vtkm/cont/AssignerPartitionedDataSet.h>
#include <vtkm/cont/DataSet.h>
#include <vtkm/cont/EnvironmentTracker.h>
#include <vtkm/cont/Field.h>
#include <vtkm/cont/Initialize.h>
#include <vtkm/cont/PartitionedDataSet.h>
#include <vtkm/filter/Streamline.h>
#include <vtkm/io/VTKDataSetReader.h>
#include <vtkm/io/VTKDataSetWriter.h>
#include <vtkm/io/reader/VTKDataSetReader.h>
#include <mpi.h>
#include <vtkm/thirdparty/diy/diy.h>
#include <vtkm/thirdparty/diy/mpi-cast.h>
#include <vtkm/filter/ParticleAdvection.h>
#include <vtkm/filter/particleadvection/BoundsMap.h>
#include <vtkm/filter/particleadvection/ParticleMessenger.h>
// Read a .visit master file and load this rank's share of the listed VTK
// blocks into dataSets. Blocks are assigned contiguously: rank r gets
// [r*nPer, (r+1)*nPer), with the last rank absorbing the remainder.
// Throws (a std::string / const char*) on a missing or non-.visit file.
void LoadData(std::string& fname, std::vector<vtkm::cont::DataSet>& dataSets, int rank, int nRanks)
{
std::string buff;
std::ifstream is;
is.open(fname);
std::cout << "Opening: " << fname << std::endl;
if (!is)
{
std::cout << "File not found! : " << fname << std::endl;
throw "unknown file: " + fname;
}
// Only the .visit master-file format is supported.
auto p0 = fname.rfind(".visit");
if (p0 == std::string::npos)
throw "Only .visit files are supported.";
// Block filenames in the .visit file are relative to its directory.
auto tmp = fname.substr(0, p0);
auto p1 = tmp.rfind("/");
auto dir = tmp.substr(0, p1);
// First line is expected to look like: "!NBLOCKS <n>".
std::getline(is, buff);
auto numBlocks = std::stoi(buff.substr(buff.find("!NBLOCKS ") + 9, buff.size()));
if (rank == 0)
std::cout << "numBlocks= " << numBlocks << std::endl;
// Contiguous block range [b0, b1) owned by this rank.
int nPer = numBlocks / nRanks;
int b0 = rank * nPer, b1 = (rank + 1) * nPer;
if (rank == (nRanks - 1))
b1 = numBlocks;
for (int i = 0; i < numBlocks; i++)
{
std::getline(is, buff);
if (i >= b0 && i < b1)
{
vtkm::cont::DataSet ds;
std::string vtkFile = dir + "/" + buff;
vtkm::io::reader::VTKDataSetReader reader(vtkFile);
ds = reader.ReadDataSet();
auto f = ds.GetField("grad").GetData();
vtkm::cont::ArrayHandle<vtkm::Vec<double, 3>> fieldArray;
fieldArray = f.Cast<vtkm::cont::ArrayHandle<vtkm::Vec<double, 3>>>();
int n = fieldArray.GetNumberOfValues();
auto portal = fieldArray.WritePortal();
// NOTE(review): this overwrites the entire "grad" field with a
// constant (1,0,0) vector -- looks like leftover debugging code.
// Confirm whether the real field values should be kept.
for (int ii = 0; ii < n; ii++)
portal.Set(ii, vtkm::Vec<double, 3>(1, 0, 0));
dataSets.push_back(ds);
}
}
}
// Example computing streamlines.
// An example vector field is available in the vtk-m data directory: magField.vtk
// Example usage:
// this will advect 200 particles 50 steps using a step size of 0.01
//
// Particle_Advection <path-to-data-dir>/magField.vtk vec 200 50 0.01 output.vtk
//
int main(int argc, char** argv)
{
MPI_Init(&argc, &argv);
auto comm = vtkm::cont::EnvironmentTracker::GetCommunicator();
int rank = comm.rank();
int size = comm.size();
std::string dataFile = argv[1];
std::vector<vtkm::cont::DataSet> dataSets;
LoadData(dataFile, dataSets, rank, size);
vtkm::filter::ParticleAdvection pa;
vtkm::cont::ArrayHandle<vtkm::Massless> seedArray;
std::vector<vtkm::Massless> seeds;
seeds.push_back(vtkm::Massless(vtkm::Vec3f(.1f, .1f, .9f), 0));
seeds.push_back(vtkm::Massless(vtkm::Vec3f(.1f, .6f, .6f), 1));
seeds.push_back(vtkm::Massless(vtkm::Vec3f(.1f, .9f, .1f), 2));
seedArray = vtkm::cont::make_ArrayHandle(seeds);
pa.SetStepSize(0.001f);
pa.SetNumberOfSteps(10000);
pa.SetSeeds(seedArray);
pa.SetActiveField("grad");
vtkm::cont::PartitionedDataSet pds(dataSets);
auto output = pa.Execute(pds);
output.PrintSummary(std::cout);
return 0;
}

@ -28,7 +28,8 @@ namespace cont
/// \brief Copy the position of each particle in \c inP into \c outPos.
///
/// When \c CopyTerminatedOnly is true, only the positions of particles
/// whose status is terminated are copied (outPos is resized accordingly).
/// The diff rendering left a stale duplicate of the outPos parameter line
/// here; this is the cleaned-up declaration.
template <typename ParticleType>
VTKM_ALWAYS_EXPORT inline void ParticleArrayCopy(
  const vtkm::cont::ArrayHandle<ParticleType, vtkm::cont::StorageTagBasic>& inP,
  vtkm::cont::ArrayHandle<vtkm::Vec3f, vtkm::cont::StorageTagBasic>& outPos,
  bool CopyTerminatedOnly = false);
/// \brief Copy all fields in vtkm::Particle to standard types.
///

@ -11,6 +11,9 @@
#ifndef vtk_m_cont_ParticleArrayCopy_hxx
#define vtk_m_cont_ParticleArrayCopy_hxx
#include <vtkm/cont/Algorithm.h>
#include <vtkm/cont/ArrayCopy.h>
#include <vtkm/cont/ArrayHandleTransform.h>
#include <vtkm/cont/Invoker.h>
#include <vtkm/cont/ParticleArrayCopy.h>
#include <vtkm/worklet/WorkletMapField.h>
@ -22,14 +25,17 @@ namespace cont
namespace detail
{
struct CopyParticlePositionWorklet : public vtkm::worklet::WorkletMapField
{
using ControlSignature = void(FieldIn inParticle, FieldOut outPos);
VTKM_EXEC void operator()(const vtkm::Particle& inParticle, vtkm::Vec3f& outPos) const
{
outPos = inParticle.Pos;
}
// Transform functor: maps a particle to its spatial position.
struct ExtractPositionFunctor
{
VTKM_EXEC_CONT
vtkm::Vec3f operator()(const vtkm::Massless& p) const { return p.Pos; }
};
// Transform functor: true when the particle's status is terminated.
// Used as the stencil for CopyIf in ParticleArrayCopy.
struct ExtractTerminatedFunctor
{
VTKM_EXEC_CONT
bool operator()(const vtkm::Massless& p) const { return p.Status.CheckTerminate(); }
};
struct CopyParticleAllWorklet : public vtkm::worklet::WorkletMapField
@ -62,12 +68,18 @@ struct CopyParticleAllWorklet : public vtkm::worklet::WorkletMapField
/// \brief Copy particle positions from \c inP into \c outPos.
///
/// When \c CopyTerminatedOnly is true, only terminated particles are kept
/// (stream-compaction via CopyIf); otherwise all positions are copied.
/// The rendered diff also contained a stale worklet invocation whose output
/// was always overwritten by the copy below -- removed here as dead work.
template <typename ParticleType>
VTKM_ALWAYS_EXPORT inline void ParticleArrayCopy(
  const vtkm::cont::ArrayHandle<ParticleType, vtkm::cont::StorageTagBasic>& inP,
  vtkm::cont::ArrayHandle<vtkm::Vec3f, vtkm::cont::StorageTagBasic>& outPos,
  bool CopyTerminatedOnly)
{
  // Lazily view the particle array as an array of positions.
  auto posTrn = vtkm::cont::make_ArrayHandleTransform(inP, detail::ExtractPositionFunctor());

  if (CopyTerminatedOnly)
  {
    // Stencil view: true for terminated particles only.
    auto termTrn = vtkm::cont::make_ArrayHandleTransform(inP, detail::ExtractTerminatedFunctor());
    vtkm::cont::Algorithm::CopyIf(posTrn, termTrn, outPos);
  }
  else
  {
    vtkm::cont::ArrayCopy(posTrn, outPos);
  }
}
/// \brief Copy all fields in vtkm::Particle to standard types.

@ -0,0 +1,187 @@
//============================================================================
// Copyright (c) Kitware, Inc.
// All rights reserved.
// See LICENSE.txt for details.
//
// This software is distributed WITHOUT ANY WARRANTY; without even
// the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE. See the above copyright notice for more information.
//============================================================================
#ifndef vtk_m_filter_BoundsMap_h
#define vtk_m_filter_BoundsMap_h
#include <vtkm/Bounds.h>
#include <vtkm/cont/AssignerPartitionedDataSet.h>
#include <vtkm/cont/DataSet.h>
#include <vtkm/cont/EnvironmentTracker.h>
#include <vtkm/cont/Field.h>
#include <vtkm/cont/PartitionedDataSet.h>
#include <vtkm/thirdparty/diy/diy.h>
#ifdef VTKM_ENABLE_MPI
#include <mpi.h>
#include <vtkm/thirdparty/diy/mpi-cast.h>
#endif
#include <vector>
namespace vtkm
{
namespace filter
{
// Maps data-set blocks to MPI ranks and spatial bounds. Every rank holds
// the bounds of ALL blocks (gathered via an all-reduce), so any rank can
// answer "which block(s) contain point p, and which rank owns them".
class VTKM_ALWAYS_EXPORT BoundsMap
{
public:
BoundsMap()
: LocalNumBlocks(0)
, TotalNumBlocks(0)
{
}
// Single local block.
BoundsMap(const vtkm::cont::DataSet& dataSet)
: LocalNumBlocks(1)
, TotalNumBlocks(0)
{
this->Init({ dataSet });
}
// One local block per entry of dataSets.
BoundsMap(const std::vector<vtkm::cont::DataSet>& dataSets)
: LocalNumBlocks(static_cast<vtkm::Id>(dataSets.size()))
, TotalNumBlocks(0)
{
this->Init(dataSets);
}
// One local block per partition.
BoundsMap(const vtkm::cont::PartitionedDataSet& pds)
: LocalNumBlocks(pds.GetNumberOfPartitions())
, TotalNumBlocks(0)
{
this->Init(pds.GetPartitions());
}
// Global block id of the idx-th block owned by this rank.
vtkm::Id GetLocalBlockId(vtkm::Id idx) const
{
VTKM_ASSERT(idx >= 0 && idx < this->LocalNumBlocks);
return this->LocalIDs[static_cast<std::size_t>(idx)];
}
// Rank that owns the given global block id, or -1 if unknown.
int FindRank(vtkm::Id blockId) const
{
auto it = BlockToRankMap.find(blockId);
if (it == BlockToRankMap.end())
return -1;
return it->second;
}
// All blocks whose bounds contain p.
std::vector<vtkm::Id> FindBlocks(const vtkm::Vec3f& p) const { return this->FindBlocks(p, -1); }
// NOTE(review): only the FIRST entry of oldIDs is excluded from the
// search -- confirm that callers never pass more than one prior block.
std::vector<vtkm::Id> FindBlocks(const vtkm::Vec3f& p, const std::vector<vtkm::Id>& oldIDs) const
{
vtkm::Id ignoreID = (oldIDs.empty() ? -1 : oldIDs[0]);
return FindBlocks(p, ignoreID);
}
// All blocks containing p, skipping ignoreBlock (pass -1 to skip none).
std::vector<vtkm::Id> FindBlocks(const vtkm::Vec3f& p, vtkm::Id ignoreBlock) const
{
std::vector<vtkm::Id> blockIDs;
// Quick reject: outside the global bounds no block can match.
if (this->GlobalBounds.Contains(p))
{
vtkm::Id blockId = 0;
for (auto& it : this->BlockBounds)
{
if (blockId != ignoreBlock && it.Contains(p))
blockIDs.push_back(blockId);
blockId++;
}
}
return blockIDs;
}
private:
// Assign global block ids across ranks, then gather all bounds.
void Init(const std::vector<vtkm::cont::DataSet>& dataSets)
{
vtkm::cont::AssignerPartitionedDataSet assigner(this->LocalNumBlocks);
this->TotalNumBlocks = assigner.nblocks();
std::vector<int> ids;
vtkmdiy::mpi::communicator Comm = vtkm::cont::EnvironmentTracker::GetCommunicator();
assigner.local_gids(Comm.rank(), ids);
for (const auto& i : ids)
this->LocalIDs.push_back(static_cast<vtkm::Id>(i));
for (vtkm::Id id = 0; id < this->TotalNumBlocks; id++)
this->BlockToRankMap[id] = assigner.rank(static_cast<int>(id));
this->Build(dataSets);
}
// Fill each local block's 6 bound values into a zero-initialized global
// array, then MPI_Allreduce(SUM) -- the sum works because every slot is
// written by exactly one rank and is zero everywhere else.
void Build(const std::vector<vtkm::cont::DataSet>& dataSets)
{
std::vector<vtkm::Float64> vals(static_cast<std::size_t>(this->TotalNumBlocks * 6), 0);
std::vector<vtkm::Float64> vals2(vals.size());
// Assumes dataSets[i] corresponds to LocalIDs[i] -- i.e. the datasets
// were passed in the same order the assigner handed out local gids.
for (std::size_t i = 0; i < this->LocalIDs.size(); i++)
{
const vtkm::cont::DataSet& ds = dataSets[static_cast<std::size_t>(i)];
vtkm::Bounds bounds = ds.GetCoordinateSystem().GetBounds();
std::size_t idx = static_cast<std::size_t>(this->LocalIDs[i] * 6);
vals[idx++] = bounds.X.Min;
vals[idx++] = bounds.X.Max;
vals[idx++] = bounds.Y.Min;
vals[idx++] = bounds.Y.Max;
vals[idx++] = bounds.Z.Min;
vals[idx++] = bounds.Z.Max;
}
#ifdef VTKM_ENABLE_MPI
vtkmdiy::mpi::communicator Comm = vtkm::cont::EnvironmentTracker::GetCommunicator();
MPI_Comm mpiComm = vtkmdiy::mpi::mpi_cast(Comm.handle());
// NOTE(review): vals.size() is a size_t narrowed to MPI's int count.
MPI_Allreduce(vals.data(), vals2.data(), vals.size(), MPI_DOUBLE, MPI_SUM, mpiComm);
#else
vals2 = vals;
#endif
// Rebuild per-block Bounds and the enclosing global Bounds.
this->BlockBounds.resize(static_cast<std::size_t>(this->TotalNumBlocks));
this->GlobalBounds = vtkm::Bounds();
std::size_t idx = 0;
for (auto& block : this->BlockBounds)
{
block = vtkm::Bounds(vals2[idx + 0],
vals2[idx + 1],
vals2[idx + 2],
vals2[idx + 3],
vals2[idx + 4],
vals2[idx + 5]);
this->GlobalBounds.Include(block);
idx += 6;
}
/*
if (rank == 0)
{
std::cout << "GlobalBounds: " << this->GlobalBounds << std::endl;
int bid = 0;
for (auto& it : this->BlockBounds)
{
std::cout << " " << bid << " " << it << " owner: " << this->FindRank(bid) << std::endl;
bid++;
}
}
*/
}
vtkm::Id LocalNumBlocks;       // number of blocks resident on this rank
std::vector<vtkm::Id> LocalIDs; // global ids of the local blocks
std::map<vtkm::Id, vtkm::Int32> BlockToRankMap; // global id -> owning rank
vtkm::Id TotalNumBlocks;       // block count across all ranks
std::vector<vtkm::Bounds> BlockBounds; // bounds of every block, indexed by global id
vtkm::Bounds GlobalBounds;     // union of all block bounds
};
}
} // namespace vtkm::filter
#endif

@ -0,0 +1,24 @@
##============================================================================
## Copyright (c) Kitware, Inc.
## All rights reserved.
## See LICENSE.txt for details.
##
## This software is distributed WITHOUT ANY WARRANTY; without even
## the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
## PURPOSE. See the above copyright notice for more information.
##============================================================================
# Public headers installed for the particleadvection filter component.
set(headers
BoundsMap.h
DataSetIntegrator.h
Logger.h
MemStream.h
Messenger.h
ParticleMessenger.h
ParticleAdvector.h
ParticleAdvector.hxx
StreamUtil.h
)
#-----------------------------------------------------------------------------
vtkm_declare_headers(${headers})

@ -0,0 +1,105 @@
//============================================================================
// Copyright (c) Kitware, Inc.
// All rights reserved.
// See LICENSE.txt for details.
//
// This software is distributed WITHOUT ANY WARRANTY; without even
// the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE. See the above copyright notice for more information.
//============================================================================
#ifndef vtk_m_filter_DataSetIntegrator_h
#define vtk_m_filter_DataSetIntegrator_h
#include <vtkm/cont/ArrayHandleTransform.h>
#include <vtkm/cont/DataSet.h>
#include <memory>
#include <vector>
namespace vtkm
{
namespace filter
{
// Wraps one data-set block with an RK4 grid evaluator so particles can be
// advected through it. The block is identified by a global block id.
// NOTE(review): uses vtkm::worklet::particleadvection types and
// vtkm::cont::ArrayCopy that are not included by this header -- presumably
// they arrive transitively; confirm or add the includes.
class VTKM_ALWAYS_EXPORT DataSetIntegrator
{
using FieldHandle = vtkm::cont::ArrayHandle<vtkm::Vec3f>;
using FieldType = vtkm::worklet::particleadvection::VelocityField<FieldHandle>;
using GridEvalType = vtkm::worklet::particleadvection::GridEvaluator<FieldType>;
using RK4Type = vtkm::worklet::particleadvection::RK4Integrator<GridEvalType>;
public:
// Build an evaluator over ds's cells/coords using field fieldNm as the
// velocity. Non-Vec3f fields are converted via ArrayCopy.
DataSetIntegrator(const vtkm::cont::DataSet& ds, vtkm::Id id, std::string fieldNm)
: ActiveField(ds.GetField(fieldNm))
, Eval(nullptr)
, ID(id)
{
const vtkm::cont::DynamicCellSet& cells = ds.GetCellSet();
const vtkm::cont::CoordinateSystem& coords = ds.GetCoordinateSystem();
auto fieldData = this->ActiveField.GetData();
FieldHandle fieldArray;
if (fieldData.IsType<FieldHandle>())
fieldArray = fieldData.Cast<FieldHandle>();
else
vtkm::cont::ArrayCopy(fieldData.ResetTypes<vtkm::TypeListFieldVec3>(), fieldArray);
this->Eval = std::shared_ptr<GridEvalType>(new GridEvalType(coords, cells, fieldArray));
}
// Global block id this integrator advects through.
vtkm::Id GetID() const { return this->ID; }
// Advect the particles in v. ResultType selects the worklet via the
// DoAdvect specializations (ParticleAdvectionResult or StreamlineResult).
// CopyFlag::Off: the array handle aliases v, so v must outlive the call.
template <typename ResultType>
void Advect(std::vector<vtkm::Massless>& v,
vtkm::FloatDefault stepSize,
vtkm::Id maxSteps,
ResultType& result) const
{
auto seedArray = vtkm::cont::make_ArrayHandle(v, vtkm::CopyFlag::Off);
RK4Type rk4(*this->Eval, stepSize);
this->DoAdvect(seedArray, rk4, maxSteps, result);
}
private:
// Dispatched on ResultType; specializations follow the class.
template <typename ResultType>
inline void DoAdvect(vtkm::cont::ArrayHandle<vtkm::Massless>& seeds,
const RK4Type& rk4,
vtkm::Id maxSteps,
ResultType& result) const;
vtkm::cont::Field ActiveField;      // velocity field pulled from the data set
std::shared_ptr<GridEvalType> Eval; // grid evaluator shared across copies
vtkm::Id ID;                        // global block id
};
//-----
// Specialization for ParticleAdvection worklet
// Specialization for the ParticleAdvection worklet: advects seeds and
// records only the final particle states.
template <>
inline void DataSetIntegrator::DoAdvect(
vtkm::cont::ArrayHandle<vtkm::Massless>& seeds,
const RK4Type& rk4,
vtkm::Id maxSteps,
vtkm::worklet::ParticleAdvectionResult<vtkm::Massless>& result) const
{
vtkm::worklet::ParticleAdvection Worklet;
result = Worklet.Run(rk4, seeds, maxSteps);
}
//-----
// Specialization for Streamline worklet
// Specialization for the Streamline worklet: advects seeds and records the
// full polyline trajectory of each particle.
template <>
inline void DataSetIntegrator::DoAdvect(
vtkm::cont::ArrayHandle<vtkm::Massless>& seeds,
const RK4Type& rk4,
vtkm::Id maxSteps,
vtkm::worklet::StreamlineResult<vtkm::Massless>& result) const
{
vtkm::worklet::Streamline Worklet;
result = Worklet.Run(rk4, seeds, maxSteps);
}
}
} // namespace vtkm::filter
#endif

@ -0,0 +1,44 @@
//============================================================================
// Copyright (c) Kitware, Inc.
// All rights reserved.
// See LICENSE.txt for details.
//
// This software is distributed WITHOUT ANY WARRANTY; without even
// the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE. See the above copyright notice for more information.
//============================================================================
#include <vtkm/cont/EnvironmentTracker.h>
#include <vtkm/filter/particleadvection/Logger.h>
#include <vtkm/thirdparty/diy/diy.h>
namespace vtkm
{
namespace filter
{
std::map<std::string, vtkm::filter::Logger*> vtkm::filter::Logger::Loggers;
// Open the log file "<name>[.<rank>].log"; with MPI enabled each rank gets
// its own file. Failure to open is reported but not fatal.
Logger::Logger(const std::string& name)
{
  std::stringstream logName;
  logName << name;
#ifdef VTKM_ENABLE_MPI
  // One log file per MPI rank.
  vtkmdiy::mpi::communicator comm = vtkm::cont::EnvironmentTracker::GetCommunicator();
  logName << "." << comm.rank();
#endif
  logName << ".log";

  this->Stream.open(logName.str().c_str(), std::ofstream::out);
  if (!this->Stream.is_open())
    // Message referenced "vtkh" (the VTK-h project this code was ported
    // from); corrected to VTK-m.
    std::cout << "Warning: could not open the VTK-m log file\n";
}
// Flush and close the log file if one was successfully opened.
Logger::~Logger()
{
  if (!this->Stream.is_open())
    return;
  this->Stream.close();
}
}
} // vtkm::filter

@ -0,0 +1,47 @@
//============================================================================
// Copyright (c) Kitware, Inc.
// All rights reserved.
// See LICENSE.txt for details.
//
// This software is distributed WITHOUT ANY WARRANTY; without even
// the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE. See the above copyright notice for more information.
//============================================================================
#ifndef vtk_m_filter_Logger_h
#define vtk_m_filter_Logger_h
#include <iostream>
#include <map>
#include <string>
#include <vtkm/filter/particleadvection/Logger.h>
namespace vtkm
{
namespace filter
{
// Per-name singleton logger; each named logger owns an output file stream
// (file naming is handled in Logger.cxx).
// NOTE(review): relies on <fstream> arriving transitively for
// std::ofstream -- consider including it explicitly. The registry is not
// synchronized; confirm loggers are only created from one thread.
class Logger
{
public:
  // Return the logger registered under 'name', creating it on first use.
  // Instances are never freed in the code shown (process-lifetime logs).
  static Logger* GetInstance(const std::string& name)
  {
    // Single lookup instead of the original find + two operator[] calls.
    auto it = Logger::Loggers.find(name);
    if (it == Logger::Loggers.end())
      it = Logger::Loggers.insert(std::make_pair(name, new vtkm::filter::Logger(name))).first;
    return it->second;
  }
  ~Logger();
  // Stream clients write log messages to.
  std::ofstream& GetStream() { return Stream; }
protected:
  Logger(const std::string& name);
  std::ofstream Stream;
  // Registry of named singleton loggers.
  static std::map<std::string, Logger*> Loggers;
};
}
}; // namespace vtkm::filter
#endif

@ -0,0 +1,85 @@
//============================================================================
// Copyright (c) Kitware, Inc.
// All rights reserved.
// See LICENSE.txt for details.
//
// This software is distributed WITHOUT ANY WARRANTY; without even
// the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE. See the above copyright notice for more information.
//============================================================================
#include <vtkm/filter/particleadvection/MemStream.h>
namespace vtkm
{
namespace filter
{
// Construct an empty stream, pre-allocating sz0 bytes of capacity.
MemStream::MemStream(std::size_t sz0)
  : Data(nullptr) // nullptr instead of NULL (file already uses C++11)
  , Len(0)
  , MaxLen(0)
  , Pos(0)
{
  CheckSize(sz0);
}
// Construct a stream holding a copy of the sz bytes at buff.
// Assumes buff points at least sz readable bytes -- TODO confirm callers.
MemStream::MemStream(std::size_t sz, const unsigned char* buff)
  : Data(nullptr) // nullptr instead of NULL (file already uses C++11)
  , Len(sz)
  , MaxLen(sz)
  , Pos(0)
{
  this->Data = new unsigned char[this->Len];
  std::memcpy(this->Data, buff, this->Len);
}
// Copy constructor: deep-copies the buffer. The read position is reset to
// 0 rather than copied from s -- NOTE(review): confirm this is intended.
MemStream::MemStream(const MemStream& s)
{
this->Pos = 0;
this->Len = s.GetLen();
this->MaxLen = this->Len;
this->Data = new unsigned char[this->Len];
std::memcpy(this->Data, s.GetData(), this->Len);
}
// Release the buffer and reset all bookkeeping.
MemStream::~MemStream()
{
this->ClearMemStream();
}
// Free the buffer (if any) and reset position, length, and capacity to 0.
void MemStream::ClearMemStream()
{
  if (this->Data)
  {
    delete[] this->Data;
    this->Data = nullptr; // nullptr instead of NULL (file already uses C++11)
  }
  this->Pos = 0;
  this->Len = 0;
  this->MaxLen = 0;
}
// Ensure the buffer can hold sz more bytes past the current position,
// growing geometrically (doubling) to amortize reallocation cost.
void MemStream::CheckSize(std::size_t sz)
{
std::size_t reqLen = this->Pos + sz;
if (reqLen > this->MaxLen)
{
std::size_t newLen = 2 * this->MaxLen; // double current size.
if (newLen < reqLen)
newLen = reqLen;
unsigned char* newData = new unsigned char[newLen];
if (this->Data)
{
std::memcpy(newData, this->Data, this->Len); // copy existing data to new buffer.
delete[] this->Data;
}
this->Data = newData;
this->MaxLen = newLen;
}
}
}
} // namespace vtkm::filter

@ -0,0 +1,210 @@
//============================================================================
// Copyright (c) Kitware, Inc.
// All rights reserved.
// See LICENSE.txt for details.
//
// This software is distributed WITHOUT ANY WARRANTY; without even
// the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE. See the above copyright notice for more information.
//============================================================================
#ifndef vtk_m_filter_MemStream_h
#define vtk_m_filter_MemStream_h
#include <vtkm/filter/vtkm_filter_export.h>
#include <cstring>
#include <iostream>
#include <list>
#include <string>
#include <vector>
namespace vtkm
{
namespace filter
{
namespace detail
{
}
// Growable in-memory byte buffer with a single read/write cursor, used to
// (de)serialize messages for MPI exchange (see Serialization<> below).
class MemStream
{
public:
MemStream(std::size_t sz0 = 32);
// Takes a copy of the sz bytes at buff.
MemStream(std::size_t sz, const unsigned char* buff);
MemStream(const MemStream& s);
~MemStream();
// Reset the cursor to the beginning (e.g. to read back written data).
void Rewind() { this->Pos = 0; }
std::size_t GetPos() const { return this->Pos; }
void SetPos(std::size_t p);
// Number of bytes written so far (not the allocated capacity).
std::size_t GetLen() const { return this->Len; }
std::size_t GetCapacity() const { return this->MaxLen; }
unsigned char* GetData() const { return this->Data; }
//Read from buffer.
void ReadBinary(unsigned char* buff, const std::size_t& size);
//Write to buffer.
void WriteBinary(const unsigned char* buff, std::size_t size);
void ClearMemStream();
private:
// data members
unsigned char* Data;   // owned buffer (raw new[]/delete[])
std::size_t Len;       // bytes written
std::size_t MaxLen;    // allocated capacity
std::size_t Pos;       // read/write cursor
// Grow the buffer so sz more bytes fit past Pos.
void CheckSize(std::size_t sz);
// Debug-print summary (data dump itself is commented out).
friend std::ostream& operator<<(std::ostream& out, const MemStream& m)
{
out << " MemStream(p= " << m.GetPos() << ", l= " << m.GetLen() << "[" << m.GetCapacity()
<< "]) data=[";
/*
for (std::size_t i=0; i < m.GetLen(); i++)
out<<(int)(m.Data[i])<<" ";
*/
out << "]";
return out;
}
};
// Copy size bytes from the cursor into buff and advance the cursor.
// NOTE(review): no bounds check against Len -- a short buffer reads past
// the written data; callers must size reads correctly.
inline void MemStream::ReadBinary(unsigned char* buff, const std::size_t& size)
{
std::size_t nBytes = sizeof(unsigned char) * size;
std::memcpy(buff, &this->Data[this->Pos], nBytes);
this->Pos += nBytes;
}
// Copy size bytes from buff at the cursor (growing the buffer as needed),
// advance the cursor, and extend Len if we wrote past the previous end.
inline void MemStream::WriteBinary(const unsigned char* buff, std::size_t size)
{
std::size_t nBytes = sizeof(unsigned char) * size;
this->CheckSize(nBytes);
std::memcpy(&this->Data[this->Pos], buff, nBytes);
this->Pos += nBytes;
if (this->Pos > this->Len)
this->Len = this->Pos;
}
// Set the cursor position. Validates BEFORE mutating so a failed call
// leaves the stream unchanged (the original corrupted Pos, then threw).
inline void MemStream::SetPos(std::size_t p)
{
  if (p > this->GetLen())
    throw "MemStream::setPos failed";
  this->Pos = p;
}
// Default serializer: raw byte copy of the object. Safe only for
// trivially copyable types (statically enforced where the trait exists);
// container/pair types get their own specializations below.
template <typename T>
struct Serialization
{
#if (defined(__clang__) && !defined(__ppc64__)) || (defined(__GNUC__) && __GNUC__ >= 5)
static_assert(std::is_trivially_copyable<T>::value,
"Default serialization works only for trivially copyable types");
#endif
static void write(MemStream& memstream, const T& data)
{
memstream.WriteBinary((const unsigned char*)&data, sizeof(T));
}
static void read(MemStream& memstream, T& data)
{
memstream.ReadBinary((unsigned char*)&data, sizeof(T));
}
};
// Convenience wrappers dispatching to Serialization<T>. NOTE(review):
// 'static' in a header gives each TU its own copy; harmless here, but
// 'inline' would be the conventional choice.
template <typename T>
static void write(MemStream& memstream, const T& data)
{
Serialization<T>::write(memstream, data);
}
template <typename T>
static void read(MemStream& memstream, T& data)
{
Serialization<T>::read(memstream, data);
}
// Vectors serialize as an element count followed by each element in order.
template <class T>
struct Serialization<std::vector<T>>
{
  static void write(MemStream& memstream, const std::vector<T>& data)
  {
    const std::size_t numElements = data.size();
    vtkm::filter::write(memstream, numElements);
    for (const T& element : data)
      vtkm::filter::write(memstream, element);
  }

  static void read(MemStream& memstream, std::vector<T>& data)
  {
    std::size_t numElements = 0;
    vtkm::filter::read(memstream, numElements);
    data.resize(numElements);
    for (T& element : data)
      vtkm::filter::read(memstream, element);
  }
};
// Lists serialize exactly like vectors: count, then the elements in order.
template <class T>
struct Serialization<std::list<T>>
{
  static void write(MemStream& memstream, const std::list<T>& data)
  {
    vtkm::filter::write(memstream, data.size());
    for (const T& element : data)
      vtkm::filter::write(memstream, element);
  }

  static void read(MemStream& memstream, std::list<T>& data)
  {
    std::size_t numElements = 0;
    vtkm::filter::read(memstream, numElements);
    for (std::size_t i = 0; i < numElements; i++)
    {
      T value;
      vtkm::filter::read(memstream, value);
      data.push_back(value);
    }
  }
};
// A pair serializes as its first member followed by its second.
template <class T, class U>
struct Serialization<std::pair<T, U>>
{
  static void write(MemStream& memstream, const std::pair<T, U>& p)
  {
    vtkm::filter::write(memstream, p.first);
    vtkm::filter::write(memstream, p.second);
  }

  static void read(MemStream& memstream, std::pair<T, U>& p)
  {
    vtkm::filter::read(memstream, p.first);
    vtkm::filter::read(memstream, p.second);
  }
};
//template<>
//struct Serialization<std::string>
//{
// static void write(MemStream &memstream, const std::string &data)
// {
// std::size_t sz = data.size();
// memstream.write(sz);
// memstream.write(data.data(), sz);
// }
//
// static void read(MemStream &memstream, std::string &data)
// {
// std::size_t sz;
// memstream.read(sz);
// data.resize(sz);
// memstream.read(&data[0], sz);
// }
//};
}
} // namespace vtkm::filter
#endif

@ -0,0 +1,396 @@
//============================================================================
// Copyright (c) Kitware, Inc.
// All rights reserved.
// See LICENSE.txt for details.
//
// This software is distributed WITHOUT ANY WARRANTY; without even
// the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE. See the above copyright notice for more information.
//============================================================================
#include <iostream>
#include <sstream>
#include <string.h>
#include <vtkm/filter/particleadvection/Messenger.h>
#ifdef VTKM_ENABLE_MPI
#include <vtkm/thirdparty/diy/mpi-cast.h>
#endif
namespace vtkm
{
namespace filter
{
// Construct the messenger from a DIY communicator. With MPI enabled, the
// raw MPI_Comm plus rank/size are cached; the preprocessor splits the
// member-init list, so only one of the two signatures is compiled.
// NOTE(review): in the non-MPI build none of those members are
// initialized here -- confirm the header gives them default values.
VTKM_CONT
#ifdef VTKM_ENABLE_MPI
Messenger::Messenger(vtkmdiy::mpi::communicator& comm)
: MPIComm(vtkmdiy::mpi::mpi_cast(comm.handle()))
, NumRanks(comm.size())
, Rank(comm.rank())
#else
Messenger::Messenger(vtkmdiy::mpi::communicator& vtkmNotUsed(comm))
#endif
{
}
#ifdef VTKM_ENABLE_MPI
VTKM_CONT
// Register a message tag with its receive count and maximum payload size.
// TAG_ANY is reserved, and a tag may only be registered once.
void Messenger::RegisterTag(int tag, int num_recvs, int size)
{
  bool alreadyRegistered = this->MessageTagInfo.find(tag) != this->MessageTagInfo.end();
  if (alreadyRegistered || tag == TAG_ANY)
  {
    std::stringstream msg;
    msg << "Invalid message tag: " << tag << std::endl;
    throw msg.str();
  }
  this->MessageTagInfo[tag] = std::make_pair(num_recvs, size);
}
// Compute the serialized size of a message carrying msgSz ints plus the
// sender's rank, by actually serializing a dummy message and measuring it.
int Messenger::CalcMessageBufferSize(int msgSz)
{
MemStream buff;
int rank = 0;
std::vector<int> m(static_cast<std::size_t>(msgSz));
vtkm::filter::write(buff, rank);
vtkm::filter::write(buff, m);
return static_cast<int>(buff.GetLen());
}
// Post the configured number of non-blocking receives for every
// registered message tag.
void Messenger::InitializeBuffers()
{
  //Setup receive buffers.
  for (const auto& tagInfo : this->MessageTagInfo)
  {
    int tag = tagInfo.first;
    int numRecvs = tagInfo.second.first;
    for (int i = 0; i < numRecvs; i++)
      PostRecv(tag);
  }
}
// Cancel and free all outstanding receives for the given tag (TAG_ANY
// cancels everything). NOTE(review): MPI_Cancel is called on a copy of
// the stored MPI_Request handle, and the request is never completed with
// MPI_Wait/MPI_Request_free afterwards -- confirm this is sufficient.
void Messenger::CleanupRequests(int tag)
{
std::vector<RequestTagPair> delKeys;
for (auto i = this->RecvBuffers.begin(); i != this->RecvBuffers.end(); i++)
{
if (tag == TAG_ANY || tag == i->first.second)
delKeys.push_back(i->first);
}
// Erase outside the scan so iterators are never invalidated mid-loop.
if (!delKeys.empty())
{
std::vector<RequestTagPair>::const_iterator it;
for (it = delKeys.begin(); it != delKeys.end(); it++)
{
RequestTagPair v = *it;
unsigned char* buff = this->RecvBuffers[v];
MPI_Cancel(&(v.first));
delete[] buff;
this->RecvBuffers.erase(v);
}
}
}
// Post a receive for a registered tag using that tag's configured buffer
// size. Silently does nothing for unregistered tags.
void Messenger::PostRecv(int tag)
{
auto it = this->MessageTagInfo.find(tag);
if (it != this->MessageTagInfo.end())
PostRecv(tag, it->second.second);
}
// Post a non-blocking receive of sz payload bytes (plus header) for the
// tag, from a specific source rank or from any rank when src == -1.
// The buffer is owned by RecvBuffers until the request completes.
void Messenger::PostRecv(int tag, int sz, int src)
{
sz += sizeof(Messenger::Header);
unsigned char* buff = new unsigned char[sz];
memset(buff, 0, sz);
MPI_Request req;
if (src == -1)
MPI_Irecv(buff, sz, MPI_BYTE, MPI_ANY_SOURCE, tag, this->MPIComm, &req);
else
MPI_Irecv(buff, sz, MPI_BYTE, src, tag, this->MPIComm, &req);
RequestTagPair entry(req, tag);
this->RecvBuffers[entry] = buff;
}
void Messenger::CheckPendingSendRequests()
{
std::vector<MPI_Request> req, copy;
std::vector<int> tags;
for (auto it = this->SendBuffers.begin(); it != this->SendBuffers.end(); it++)
{
req.push_back(it->first.first);
copy.push_back(it->first.first);
tags.push_back(it->first.second);
}
if (req.empty())
return;
//See if any sends are done.
int num = 0, *indices = new int[req.size()];
MPI_Status* status = new MPI_Status[req.size()];
int err = MPI_Testsome(req.size(), &req[0], &num, indices, status);
if (err != MPI_SUCCESS)
{
std::cerr << "Err with MPI_Testsome in PARIC algorithm" << std::endl;
}
for (int i = 0; i < num; i++)
{
MPI_Request r = copy[indices[i]];
int tag = tags[indices[i]];
RequestTagPair k(r, tag);
auto entry = this->SendBuffers.find(k);
if (entry != this->SendBuffers.end())
{
delete[] entry->second;
this->SendBuffers.erase(entry);
}
}
delete[] indices;
delete[] status;
}
bool Messenger::PacketCompare(const unsigned char* a, const unsigned char* b)
{
Messenger::Header ha, hb;
memcpy(&ha, a, sizeof(ha));
memcpy(&hb, b, sizeof(hb));
return ha.packet < hb.packet;
}
// Split a serialized message into one or more raw packets, each prefixed
// with a Header, sized to the tag's registered maximum payload. Fills
// buffList with newly allocated buffers (ownership passes to the caller /
// SendData). Throws for unregistered tags.
void Messenger::PrepareForSend(int tag, MemStream* buff, std::vector<unsigned char*>& buffList)
{
auto it = this->MessageTagInfo.find(tag);
if (it == this->MessageTagInfo.end())
{
std::stringstream msg;
msg << "Message tag not found: " << tag << std::endl;
throw msg.str();
}
int bytesLeft = buff->GetLen();
int maxDataLen = it->second.second;
Messenger::Header header;
header.tag = tag;
header.rank = this->Rank;
// Monotonically increasing id lets the receiver group packets of the
// same logical message.
header.id = this->MsgID;
header.numPackets = 1;
if (buff->GetLen() > (unsigned int)maxDataLen)
header.numPackets += buff->GetLen() / maxDataLen;
header.packet = 0;
header.packetSz = 0;
header.dataSz = 0;
this->MsgID++;
buffList.resize(header.numPackets);
size_t pos = 0;
for (int i = 0; i < header.numPackets; i++)
{
header.packet = i;
// Last packet carries the remainder; all others are full.
if (i == (header.numPackets - 1))
header.dataSz = bytesLeft;
else
header.dataSz = maxDataLen;
header.packetSz = header.dataSz + sizeof(header);
unsigned char* b = new unsigned char[header.packetSz];
//Write the header.
unsigned char* bPtr = b;
memcpy(bPtr, &header, sizeof(header));
bPtr += sizeof(header);
//Write the data.
memcpy(bPtr, &buff->GetData()[pos], header.dataSz);
pos += header.dataSz;
buffList[i] = b;
bytesLeft -= maxDataLen;
}
}
// Asynchronously send a serialized message to rank dst under the given
// tag. Takes ownership of buff and deletes it after packetizing; packet
// buffers stay alive in SendBuffers until CheckPendingSendRequests frees
// them on completion.
void Messenger::SendData(int dst, int tag, MemStream* buff)
{
std::vector<unsigned char*> bufferList;
//Add headers, break into multiple buffers if needed.
PrepareForSend(tag, buff, bufferList);
Messenger::Header header;
for (size_t i = 0; i < bufferList.size(); i++)
{
memcpy(&header, bufferList[i], sizeof(header));
MPI_Request req;
int err = MPI_Isend(bufferList[i], header.packetSz, MPI_BYTE, dst, tag, this->MPIComm, &req);
if (err != MPI_SUCCESS)
{
std::cerr << "Err with MPI_Isend in SendData algorithm" << std::endl;
}
//Add it to sendBuffers
RequestTagPair entry(req, tag);
this->SendBuffers[entry] = bufferList[i];
}
delete buff;
}
// Convenience overload: receive only messages carrying a single tag,
// returning just the payload streams. Returns true if anything arrived.
bool Messenger::RecvData(int tag, std::vector<MemStream*>& buffers, bool blockAndWait)
{
  buffers.resize(0);

  std::set<int> setTag;
  setTag.insert(tag);
  std::vector<std::pair<int, MemStream*>> tagged;
  if (!RecvData(setTag, tagged, blockAndWait))
    return false;

  // Strip the tag; the caller asked for one tag only.
  buffers.resize(tagged.size());
  for (size_t i = 0; i < tagged.size(); i++)
    buffers[i] = tagged[i].second;
  return true;
}
bool Messenger::RecvData(std::set<int>& tags,
std::vector<std::pair<int, MemStream*>>& buffers,
bool blockAndWait)
{
buffers.resize(0);
//Find all recv of type tag.
std::vector<MPI_Request> req, copy;
std::vector<int> reqTags;
for (auto i = this->RecvBuffers.begin(); i != this->RecvBuffers.end(); i++)
{
if (tags.find(i->first.second) != tags.end())
{
req.push_back(i->first.first);
copy.push_back(i->first.first);
reqTags.push_back(i->first.second);
}
}
if (req.empty())
return false;
MPI_Status* status = new MPI_Status[req.size()];
int *indices = new int[req.size()], num = 0;
if (blockAndWait)
MPI_Waitsome(req.size(), &req[0], &num, indices, status);
else
MPI_Testsome(req.size(), &req[0], &num, indices, status);
if (num == 0)
{
delete[] status;
delete[] indices;
return false;
}
std::vector<unsigned char*> incomingBuffers(num);
for (int i = 0; i < num; i++)
{
RequestTagPair entry(copy[indices[i]], reqTags[indices[i]]);
auto it = this->RecvBuffers.find(entry);
if (it == this->RecvBuffers.end())
{
delete[] status;
delete[] indices;
throw "receive buffer not found";
}
incomingBuffers[i] = it->second;
this->RecvBuffers.erase(it);
}
ProcessReceivedBuffers(incomingBuffers, buffers);
for (int i = 0; i < num; i++)
PostRecv(reqTags[indices[i]]);
delete[] status;
delete[] indices;
return !buffers.empty();
}
//Convert raw received packet buffers into complete (tag, MemStream) messages.
//Single-packet messages are unwrapped immediately. Packets of a multi-packet
//message are parked in RecvPackets (keyed by sender rank and message id)
//until all have arrived, then sorted into order and merged into one stream.
//Takes ownership of the incoming buffers: each is deleted once consumed.
void Messenger::ProcessReceivedBuffers(std::vector<unsigned char*>& incomingBuffers,
                                       std::vector<std::pair<int, MemStream*>>& buffers)
{
  for (size_t i = 0; i < incomingBuffers.size(); i++)
  {
    unsigned char* buff = incomingBuffers[i];

    //Grab the header.
    Messenger::Header header;
    memcpy(&header, buff, sizeof(header));

    //Only 1 packet, strip off header and add to list.
    if (header.numPackets == 1)
    {
      MemStream* b = new MemStream(header.dataSz, (buff + sizeof(header)));
      b->Rewind();
      std::pair<int, MemStream*> entry(header.tag, b);
      buffers.push_back(entry);
      delete[] buff;
    }
    //Multi packet....
    else
    {
      RankIdPair k(header.rank, header.id);
      auto i2 = this->RecvPackets.find(k);

      //First packet. Create a new list and add it.
      if (i2 == this->RecvPackets.end())
      {
        std::list<unsigned char*> l;
        l.push_back(buff);
        this->RecvPackets[k] = l;
      }
      else
      {
        i2->second.push_back(buff);

        // The last packet came in, merge into one MemStream.
        if (i2->second.size() == (size_t)header.numPackets)
        {
          //Sort the packets into proper order.
          i2->second.sort(Messenger::PacketCompare);

          MemStream* mergedBuff = new MemStream;
          std::list<unsigned char*>::iterator listIt;
          for (listIt = i2->second.begin(); listIt != i2->second.end(); listIt++)
          {
            unsigned char* bi = *listIt;

            //Each packet still carries its own header; skip it and append
            //only the payload bytes.
            Messenger::Header header2;
            memcpy(&header2, bi, sizeof(header2));
            mergedBuff->WriteBinary((bi + sizeof(header2)), header2.dataSz);
            delete[] bi;
          }

          mergedBuff->Rewind();
          std::pair<int, MemStream*> entry(header.tag, mergedBuff);
          buffers.push_back(entry);
          this->RecvPackets.erase(i2);
        }
      }
    }
  }
}
#endif
}
} // namespace vtkm::filter

@ -0,0 +1,99 @@
//============================================================================
// Copyright (c) Kitware, Inc.
// All rights reserved.
// See LICENSE.txt for details.
//
// This software is distributed WITHOUT ANY WARRANTY; without even
// the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE. See the above copyright notice for more information.
//============================================================================
#ifndef vtk_m_filter_Messenger_h
#define vtk_m_filter_Messenger_h
#include <vtkm/Types.h>
#include <vtkm/filter/particleadvection/MemStream.h>
#include <vtkm/thirdparty/diy/diy.h>
#include <list>
#include <map>
#include <set>
#include <vector>
#ifdef VTKM_ENABLE_MPI
#include <mpi.h>
#endif
namespace vtkm
{
namespace filter
{
//Base class for asynchronous, tag-based MPI point-to-point messaging.
//Payloads travel as MemStream bytes, split into header-prefixed packets
//when they exceed a tag's registered buffer size. When MPI is disabled
//only the single-rank constants remain.
class VTKM_ALWAYS_EXPORT Messenger
{
public:
  VTKM_CONT Messenger(vtkmdiy::mpi::communicator& comm);
  VTKM_CONT virtual ~Messenger()
  {
#ifdef VTKM_ENABLE_MPI
    this->CleanupRequests();
#endif
  }

#ifdef VTKM_ENABLE_MPI
  //Register a tag: 'numRecvs' receives of 'size' bytes each are kept posted.
  VTKM_CONT void RegisterTag(int tag, int numRecvs, int size);

protected:
  //Post the initial receives for all registered tags.
  void InitializeBuffers();
  //Tear down outstanding requests (all tags, or just 'tag').
  void CleanupRequests(int tag = TAG_ANY);
  //Free buffers whose sends have completed.
  void CheckPendingSendRequests();
  void PostRecv(int tag);
  void PostRecv(int tag, int sz, int src = -1);
  //Send buff to dst; takes ownership of buff.
  void SendData(int dst, int tag, MemStream* buff);
  //Gather completed receives for the given tags; true if any were produced.
  bool RecvData(std::set<int>& tags,
                std::vector<std::pair<int, MemStream*>>& buffers,
                bool blockAndWait = false);

  //Message headers.
  //Prefixed to every wire packet; identifies the sender, the message, and
  //this packet's position (packet/numPackets) and sizes within it.
  typedef struct
  {
    int rank, id, tag, numPackets, packet, packetSz, dataSz;
  } Header;

  //Single-tag convenience overload.
  bool RecvData(int tag, std::vector<MemStream*>& buffers, bool blockAndWait = false);

  void AddHeader(MemStream* buff);
  void RemoveHeader(MemStream* input, MemStream* header, MemStream* buff);

  template <typename P>
  bool DoSendICs(int dst, std::vector<P>& ics);
  //Split buff into header-prefixed packets ready for MPI_Isend.
  void PrepareForSend(int tag, MemStream* buff, std::vector<unsigned char*>& buffList);
  //Ordering predicate used to sort a message's packets before merging.
  static bool PacketCompare(const unsigned char* a, const unsigned char* b);
  //Assemble raw packets into complete (tag, stream) messages.
  void ProcessReceivedBuffers(std::vector<unsigned char*>& incomingBuffers,
                              std::vector<std::pair<int, MemStream*>>& buffers);

  // Send/Recv buffer management structures.
  using RequestTagPair = std::pair<MPI_Request, int>;
  using RankIdPair = std::pair<int, int>;

  //Member data
  //tag -> (number of receives to keep posted, buffer size in bytes).
  std::map<int, std::pair<int, int>> MessageTagInfo;
  MPI_Comm MPIComm;
  vtkm::Id MsgID;
  int NumRanks;
  int Rank;
  //Posted receives / in-flight sends, keyed by (request, tag).
  std::map<RequestTagPair, unsigned char*> RecvBuffers;
  //Partially assembled multi-packet messages, keyed by (rank, message id).
  std::map<RankIdPair, std::list<unsigned char*>> RecvPackets;
  std::map<RequestTagPair, unsigned char*> SendBuffers;
  static constexpr int TAG_ANY = -1;
#else
  static constexpr int NumRanks = 1;
  static constexpr int Rank = 0;
#endif
  //Wire-buffer size for a message payload of msgSz — presumably bytes plus
  //header overhead; confirm against the implementation.
  static int CalcMessageBufferSize(int msgSz);
};
}
} // namespace vtkm::filter
#endif

@ -0,0 +1,367 @@
//============================================================================
// Copyright (c) Kitware, Inc.
// All rights reserved.
// See LICENSE.txt for details.
//
// This software is distributed WITHOUT ANY WARRANTY; without even
// the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE. See the above copyright notice for more information.
//============================================================================
#ifndef vtk_m_filter_ParticleAdvector_h
#define vtk_m_filter_ParticleAdvector_h
#include <vtkm/cont/Algorithm.h>
#include <vtkm/cont/ArrayCopy.h>
#include <vtkm/cont/ArrayHandleTransform.h>
#include <vtkm/cont/DataSet.h>
#include <vtkm/cont/ParticleArrayCopy.h>
#include <vtkm/filter/particleadvection/DataSetIntegrator.h>
#include <vtkm/filter/particleadvection/ParticleMessenger.h>
#include <map>
#include <vector>
namespace vtkm
{
namespace filter
{
//
// Base class for particle advector
//
// Holds the state shared by the distributed advection algorithms: the local
// data blocks, the global block-bounds map, the diy communicator, stepping
// parameters, and the particle-to-candidate-blocks routing table. Concrete
// subclasses implement seeding, the advection loop, and output assembly.
//
template <typename ResultType>
class VTKM_ALWAYS_EXPORT ParticleAdvectorBase
{
public:
  ParticleAdvectorBase(const vtkm::filter::BoundsMap& bm,
                       const std::vector<vtkm::filter::DataSetIntegrator>& blocks)
    : Blocks(blocks)
    , BoundsMap(bm)
    , NumberOfSteps(0)
    , NumRanks(this->Comm.size())
    , Rank(this->Comm.rank())
    , StepSize(0)
  {
  }

  //Integration step length.
  void SetStepSize(vtkm::FloatDefault stepSize) { this->StepSize = stepSize; }
  //Maximum number of integration steps per particle.
  void SetNumberOfSteps(vtkm::Id numSteps) { this->NumberOfSteps = numSteps; }
  //Provide the seed particles (implementations keep only locally-owned ones).
  virtual void SetSeeds(const vtkm::cont::ArrayHandle<vtkm::Massless>& seeds) = 0;
  //Run the advection to completion.
  virtual void Go() = 0;
  //Collect the results into a partitioned data set.
  virtual vtkm::cont::PartitionedDataSet GetOutput() = 0;

protected:
  //Pop the next batch of locally-active particles; false when none remain.
  virtual bool GetActiveParticles(std::vector<vtkm::Massless>& particles) = 0;
  //Global particle count (MPI all-reduce over the local counts).
  inline vtkm::Id ComputeTotalNumParticles(vtkm::Id numLocal) const;
  //Find the local DataSetIntegrator for a block id; throws if not found.
  inline const vtkm::filter::DataSetIntegrator& GetDataSet(vtkm::Id id) const;
  //Hook for subclasses that accumulate per-block results (no-op by default).
  virtual void StoreResult(const ResultType& vtkmNotUsed(res), vtkm::Id vtkmNotUsed(blockId)) {}
  //Classify an advection result's particles into I (send to another rank),
  //T (terminated), and A (still active on this rank), updating the routing
  //table as it goes.
  inline void UpdateResult(const ResultType& res,
                           vtkm::Id blockId,
                           std::vector<vtkm::Massless>& I,
                           std::vector<vtkm::Massless>& T,
                           std::vector<vtkm::Massless>& A);

  std::vector<vtkm::filter::DataSetIntegrator> Blocks;
  vtkm::filter::BoundsMap BoundsMap;
  vtkmdiy::mpi::communicator Comm = vtkm::cont::EnvironmentTracker::GetCommunicator();
  vtkm::Id NumberOfSteps;
  vtkm::Id NumRanks;
  vtkm::Id Rank;
  vtkm::FloatDefault StepSize;
  //particle ID -> candidate block ids; the first entry is the block the
  //particle is currently assigned to.
  std::map<vtkm::Id, std::vector<vtkm::Id>> ParticleBlockIDsMap;
};
//
// Base algorithm for particle advection.
// This is a single-threaded algorithm.
//
// Implements the advect/exchange/terminate loop shared by the particle
// advection and streamline algorithms; subclasses provide result storage
// and output assembly via StoreResult/GetOutput.
//
template <typename ResultType>
class VTKM_ALWAYS_EXPORT PABaseAlgorithm : public ParticleAdvectorBase<ResultType>
{
public:
  PABaseAlgorithm(const vtkm::filter::BoundsMap& bm,
                  const std::vector<vtkm::filter::DataSetIntegrator>& blocks)
    : ParticleAdvectorBase<ResultType>(bm, blocks)
  {
  }

  //Keep only seeds whose first containing block is owned by this rank, and
  //record each kept seed's candidate block list for routing.
  void SetSeeds(const vtkm::cont::ArrayHandle<vtkm::Massless>& seeds) override
  {
    this->ParticleBlockIDsMap.clear();
    vtkm::Id n = seeds.GetNumberOfValues();
    auto portal = seeds.ReadPortal();
    for (vtkm::Id i = 0; i < n; i++)
    {
      vtkm::Massless p = portal.Get(i);
      std::vector<vtkm::Id> blockIDs = this->BoundsMap.FindBlocks(p.Pos);
      if (!blockIDs.empty() && this->BoundsMap.FindRank(blockIDs[0]) == this->Rank)
      {
        this->Active.push_back(p);
        this->ParticleBlockIDsMap[p.ID] = blockIDs;
      }
    }
  }

  //Main loop. Each iteration advects one block's worth of local particles,
  //routes the results (I: ship elsewhere, T: terminated, A: keep local),
  //then exchanges particles and termination counts with other ranks.
  //Terminates once the global termination count N reaches the global seed
  //count.
  void Go() override
  {
    vtkm::filter::ParticleMessenger messenger(this->Comm, this->BoundsMap, 1, 128);

    vtkm::Id nLocal = static_cast<vtkm::Id>(this->Active.size() + this->Inactive.size());
    vtkm::Id totalNumSeeds = this->ComputeTotalNumParticles(nLocal);

    vtkm::Id N = 0;
    while (N < totalNumSeeds)
    {
      std::vector<vtkm::Massless> v, I, T, A;
      vtkm::Id blockId = -1;
      if (GetActiveParticles(v))
      {
        //GetActiveParticles guarantees all of v share the same first block.
        blockId = this->ParticleBlockIDsMap[v[0].ID][0];
        auto& block = this->GetDataSet(blockId);

        //NOTE(review): std::cout tracing below looks like leftover debug
        //output; consider removing or routing through the logger.
        std::cout << "block_" << blockId << ".Advect() " << v.size() << std::endl;
        ResultType r;
        block.Advect(v, this->StepSize, this->NumberOfSteps, r);
        auto portal = r.Particles.ReadPortal();
        for (int i = 0; i < r.Particles.GetNumberOfValues(); i++)
        {
          std::cout << " " << i << ": " << portal.Get(i).ID
                    << " Status= " << portal.Get(i).Status << std::endl;
        }
        this->UpdateResult(r, blockId, I, T, A);
        if (!A.empty())
          this->Active.insert(this->Active.end(), A.begin(), A.end());
      }

      std::vector<vtkm::Massless> incoming;
      std::map<vtkm::Id, std::vector<vtkm::Id>> incomingBlockIDsMap;
      vtkm::Id myTerm = static_cast<vtkm::Id>(T.size());
      vtkm::Id numTermMessages = 0;
      messenger.Exchange(
        I, this->ParticleBlockIDsMap, myTerm, incoming, incomingBlockIDsMap, numTermMessages);

      //Adopt particles handed to us by other ranks, with their routing info.
      if (!incoming.empty())
      {
        this->Active.insert(this->Active.end(), incoming.begin(), incoming.end());
        for (const auto& it : incomingBlockIDsMap)
          this->ParticleBlockIDsMap[it.first] = it.second;
      }

      if (!T.empty())
      {
        std::cout << "Terminations: " << T.size() << " in block " << blockId << std::endl;
        auto& it = this->Terminated[blockId];
        it.insert(it.end(), T.begin(), T.end());
      }

      //Count local terminations plus those reported by other ranks.
      N += (myTerm + numTermMessages);
      if (N > totalNumSeeds)
        throw "Particle count error";
    }
    std::cout << "Done: " << this->Rank << " Terminated= " << this->Terminated.size() << std::endl;
  }

protected:
  //Move every active particle that shares a block with the front particle
  //into 'particles'. Returns false if nothing is active.
  bool GetActiveParticles(std::vector<vtkm::Massless>& particles) override
  {
    particles.clear();
    if (this->Active.empty())
      return false;

    vtkm::Id workingBlockID = this->ParticleBlockIDsMap[this->Active.front().ID][0];
    auto it = this->Active.begin();
    while (it != this->Active.end())
    {
      auto p = *it;
      if (workingBlockID == this->ParticleBlockIDsMap[p.ID][0])
      {
        particles.push_back(p);
        it = this->Active.erase(it);
      }
      else
        it++;
    }
    return !particles.empty();
  }

protected:
  //Particles waiting to be advected on this rank.
  std::vector<vtkm::Massless> Active;
  std::vector<vtkm::Massless> Inactive;
  //block id -> particles that finished in that block.
  std::map<vtkm::Id, std::vector<vtkm::Massless>> Terminated;
};
//Distributed particle advection: the output holds one vertex cell per final
//particle position, one partition per block that terminated particles.
class VTKM_ALWAYS_EXPORT ParticleAdvectionAlgorithm
  : public PABaseAlgorithm<vtkm::worklet::ParticleAdvectionResult<vtkm::Massless>>
{
public:
  ParticleAdvectionAlgorithm(const vtkm::filter::BoundsMap& bm,
                             const std::vector<vtkm::filter::DataSetIntegrator>& blocks)
    : PABaseAlgorithm<vtkm::worklet::ParticleAdvectionResult<vtkm::Massless>>(bm, blocks)
  {
  }

  //Build one DataSet per block from the terminated particles: coordinates
  //are the final positions, cells are single vertices (one per particle).
  vtkm::cont::PartitionedDataSet GetOutput() override
  {
    vtkm::cont::PartitionedDataSet output;
    for (const auto& it : this->Terminated)
    {
      if (it.second.empty())
        continue;

      //CopyFlag::Off: reference the particle vector owned by Terminated
      //rather than copying it; the positions are extracted just below.
      auto particles = vtkm::cont::make_ArrayHandle(it.second, vtkm::CopyFlag::Off);
      vtkm::cont::ArrayHandle<vtkm::Vec3f> pos;
      vtkm::cont::ParticleArrayCopy(particles, pos);

      vtkm::cont::DataSet ds;
      vtkm::cont::CoordinateSystem outCoords("coordinates", pos);
      ds.AddCoordinateSystem(outCoords);

      //Create vertex cell set
      vtkm::Id numPoints = pos.GetNumberOfValues();
      vtkm::cont::CellSetSingleType<> cells;
      vtkm::cont::ArrayHandleIndex conn(numPoints);
      vtkm::cont::ArrayHandle<vtkm::Id> connectivity;
      vtkm::cont::ArrayCopy(conn, connectivity);
      cells.Fill(numPoints, vtkm::CELL_SHAPE_VERTEX, 1, connectivity);
      ds.SetCellSet(cells);

      output.AppendPartition(ds);
    }
    return output;
  }

protected:
};
//Distributed streamline computation: accumulates one StreamlineResult per
//advection round per block and assembles them into polyline data sets.
class VTKM_ALWAYS_EXPORT StreamlineAlgorithm
  : public PABaseAlgorithm<vtkm::worklet::StreamlineResult<vtkm::Massless>>
{
public:
  StreamlineAlgorithm(const vtkm::filter::BoundsMap& bm,
                      const std::vector<vtkm::filter::DataSetIntegrator>& blocks)
    : PABaseAlgorithm<vtkm::worklet::StreamlineResult<vtkm::Massless>>(bm, blocks)
  {
  }

  //Build one DataSet per block. A block with a single result is used as-is;
  //multiple results are appended into one point array and one explicit
  //polyline cell set.
  vtkm::cont::PartitionedDataSet GetOutput() override
  {
    vtkm::cont::PartitionedDataSet output;
    for (const auto& it : this->Results)
    {
      std::size_t nResults = it.second.size();
      if (nResults == 0)
        continue;

      vtkm::cont::DataSet ds;
      //Easy case with one result.
      if (nResults == 1)
      {
        ds.AddCoordinateSystem(vtkm::cont::CoordinateSystem("coordinates", it.second[0].Positions));
        ds.SetCellSet(it.second[0].PolyLines);
      }
      else
      {
        //Append all the results into one data set.
        vtkm::cont::ArrayHandle<vtkm::Vec3f> appendPts;
        std::vector<vtkm::Id> posOffsets(nResults);

        //First pass: per-result offsets into the appended point array plus
        //total point and cell counts.
        const auto& res0 = it.second[0];
        vtkm::Id totalNumCells = res0.PolyLines.GetNumberOfCells();
        vtkm::Id totalNumPts = res0.Positions.GetNumberOfValues();

        posOffsets[0] = 0;
        for (std::size_t i = 1; i < nResults; i++)
        {
          const auto& res = it.second[i];
          posOffsets[i] = totalNumPts;
          totalNumPts += res.Positions.GetNumberOfValues();
          totalNumCells += res.PolyLines.GetNumberOfCells();
        }

        //Append all the points together.
        appendPts.Allocate(totalNumPts);
        for (std::size_t i = 0; i < nResults; i++)
        {
          const auto& res = it.second[i];
          // copy all values into appendPts starting at offset.
          vtkm::cont::Algorithm::CopySubRange(
            res.Positions, 0, res.Positions.GetNumberOfValues(), appendPts, posOffsets[i]);
        }
        vtkm::cont::CoordinateSystem outputCoords =
          vtkm::cont::CoordinateSystem("coordinates", appendPts);
        ds.AddCoordinateSystem(outputCoords);

        //Create polylines.
        //Gather per-cell point counts from every result, in append order.
        std::vector<vtkm::Id> numPtsPerCell(static_cast<std::size_t>(totalNumCells));
        std::size_t off = 0;
        for (std::size_t i = 0; i < nResults; i++)
        {
          const auto& res = it.second[i];
          vtkm::Id nCells = res.PolyLines.GetNumberOfCells();
          for (vtkm::Id j = 0; j < nCells; j++)
            numPtsPerCell[off++] = static_cast<vtkm::Id>(res.PolyLines.GetNumberOfPointsInCell(j));
        }

        auto numPointsPerCellArray =
          vtkm::cont::make_ArrayHandle(numPtsPerCell, vtkm::CopyFlag::Off);

        //Connectivity is simply 0..connectivityLen-1 (points are stored in
        //cell order).
        vtkm::cont::ArrayHandle<vtkm::Id> cellIndex;
        vtkm::Id connectivityLen =
          vtkm::cont::Algorithm::ScanExclusive(numPointsPerCellArray, cellIndex);
        vtkm::cont::ArrayHandleCounting<vtkm::Id> connCount(0, 1, connectivityLen);
        vtkm::cont::ArrayHandle<vtkm::Id> connectivity;
        vtkm::cont::ArrayCopy(connCount, connectivity);

        //Every cell is a polyline.
        vtkm::cont::ArrayHandle<vtkm::UInt8> cellTypes;
        auto polyLineShape = vtkm::cont::make_ArrayHandleConstant<vtkm::UInt8>(
          vtkm::CELL_SHAPE_POLY_LINE, totalNumCells);
        vtkm::cont::ArrayCopy(polyLineShape, cellTypes);
        auto offsets = vtkm::cont::ConvertNumIndicesToOffsets(numPointsPerCellArray);

        vtkm::cont::CellSetExplicit<> polyLines;
        polyLines.Fill(totalNumPts, cellTypes, connectivity, offsets);
        ds.SetCellSet(polyLines);
      }
      output.AppendPartition(ds);
    }
    return output;
  }

protected:
  //Accumulate each advection round's result under its block id.
  virtual void StoreResult(const vtkm::worklet::StreamlineResult<vtkm::Massless>& res,
                           vtkm::Id blockId) override
  {
    this->Results[blockId].push_back(res);
  }

  //block id -> results from each advection round run in that block.
  std::map<vtkm::Id, std::vector<vtkm::worklet::StreamlineResult<vtkm::Massless>>> Results;
};
}
} // namespace vtkm::filter
#ifndef vtk_m_filter_ParticleAdvector_hxx
#include <vtkm/filter/particleadvection/ParticleAdvector.hxx>
#endif
#endif

@ -0,0 +1,111 @@
//============================================================================
// Copyright (c) Kitware, Inc.
// All rights reserved.
// See LICENSE.txt for details.
//
// This software is distributed WITHOUT ANY WARRANTY; without even
// the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE. See the above copyright notice for more information.
//============================================================================
#ifndef vtk_m_filter_ParticleAdvector_hxx
#define vtk_m_filter_ParticleAdvector_hxx
namespace vtkm
{
namespace filter
{
//Sum the per-rank particle counts across all ranks.
//Uses long long so the buffer matches MPI_LONG_LONG; without MPI this is
//simply the local count.
template <typename ResultType>
inline vtkm::Id ParticleAdvectorBase<ResultType>::ComputeTotalNumParticles(vtkm::Id numLocal) const
{
  long long totalNumParticles = static_cast<long long>(numLocal);
#ifdef VTKM_ENABLE_MPI
  MPI_Comm mpiComm = vtkmdiy::mpi::mpi_cast(this->Comm.handle());
  MPI_Allreduce(MPI_IN_PLACE, &totalNumParticles, 1, MPI_LONG_LONG, MPI_SUM, mpiComm);
#endif
  return static_cast<vtkm::Id>(totalNumParticles);
}
//Look up the local DataSetIntegrator with the given block id.
//Throws ErrorFilterExecution if no local block matches.
template <typename ResultType>
inline const vtkm::filter::DataSetIntegrator& ParticleAdvectorBase<ResultType>::GetDataSet(
  vtkm::Id id) const
{
  for (auto blockIt = this->Blocks.begin(); blockIt != this->Blocks.end(); ++blockIt)
  {
    if (blockIt->GetID() == id)
      return *blockIt;
  }
  throw vtkm::cont::ErrorFilterExecution("Bad block");
}
//Route the particles of an advection result.
//Each particle lands in exactly one of:
//  T: terminated (max steps reached, or no candidate block remains),
//  A: continues in a block owned by this rank,
//  I: must be shipped to another rank.
//The ParticleBlockIDsMap routing table is updated to match, and the raw
//result is handed to StoreResult for subclasses that keep it.
template <typename ResultType>
inline void ParticleAdvectorBase<ResultType>::UpdateResult(const ResultType& res,
                                                           vtkm::Id blockId,
                                                           std::vector<vtkm::Massless>& I,
                                                           std::vector<vtkm::Massless>& T,
                                                           std::vector<vtkm::Massless>& A)
{
  vtkm::Id n = res.Particles.GetNumberOfValues();
  auto portal = res.Particles.ReadPortal();
  for (vtkm::Id i = 0; i < n; i++)
  {
    vtkm::Massless p = portal.Get(i);

    if (p.Status.CheckTerminate())
    {
      T.push_back(p);
      this->ParticleBlockIDsMap.erase(p.ID);
    }
    else
    {
      //Copy (not reference): the map entry is overwritten below.
      std::vector<vtkm::Id> currBIDs = this->ParticleBlockIDsMap[p.ID];
      VTKM_ASSERT(!currBIDs.empty());

      std::vector<vtkm::Id> newIDs;
      //Exited without taking a step: the first candidate block did not
      //actually contain the particle, so fall through to the remaining
      //candidates.
      if (p.Status.CheckSpatialBounds() && !p.Status.CheckTookAnySteps())
        newIDs.assign(std::next(currBIDs.begin(), 1), currBIDs.end());
      else
        newIDs = this->BoundsMap.FindBlocks(p.Pos, currBIDs);

      //reset the particle status.
      p.Status = vtkm::ParticleStatus();

      if (newIDs.empty()) //No blocks, we're done.
      {
        T.push_back(p);
        this->ParticleBlockIDsMap.erase(p.ID);
      }
      else
      {
        //If we have more than blockId, we want to minimize communication
        //and put any blocks owned by this rank first.
        if (newIDs.size() > 1)
        {
          for (auto it = newIDs.begin(); it != newIDs.end(); it++)
          {
            vtkm::Id bid = *it;
            if (this->BoundsMap.FindRank(bid) == this->Rank)
            {
              newIDs.erase(it);
              newIDs.insert(newIDs.begin(), bid);
              break;
            }
          }
        }

        int dstRank = this->BoundsMap.FindRank(newIDs[0]);
        this->ParticleBlockIDsMap[p.ID] = newIDs;
        if (dstRank == this->Rank)
          A.push_back(p);
        else
          I.push_back(p);
      }
    }
  }
  this->StoreResult(res, blockId);
}
}
} // namespace vtkm::filter
#endif

@ -0,0 +1,271 @@
//============================================================================
// Copyright (c) Kitware, Inc.
// All rights reserved.
// See LICENSE.txt for details.
//
// This software is distributed WITHOUT ANY WARRANTY; without even
// the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE. See the above copyright notice for more information.
//============================================================================
#include <vtkm/filter/particleadvection/MemStream.h>
#include <vtkm/filter/particleadvection/ParticleMessenger.h>
#include <iostream>
#include <string.h>
#include <vtkm/filter/particleadvection/Logger.h>
#if 0
#define DBG(msg) vtkm::filter::Logger::GetInstance("out")->GetStream() << msg
#define WDBG(msg) vtkm::filter::Logger::GetInstance("wout")->GetStream() << msg
#else
#define DBG(msg)
#define WDBG(msg)
#endif
namespace vtkm
{
namespace filter
{
//Construct a particle messenger on the given communicator.
//msgSz sizes the control-message buffers and numParticles sizes the
//particle-exchange buffers (see RegisterMessages). Without MPI the extra
//arguments are unused and no tags are registered.
VTKM_CONT
ParticleMessenger::ParticleMessenger(vtkmdiy::mpi::communicator& comm,
                                     const vtkm::filter::BoundsMap& boundsMap,
                                     int msgSz,
                                     int numParticles)
  : Messenger(comm)
#ifdef VTKM_ENABLE_MPI
  , BoundsMap(boundsMap)
#endif
{
#ifdef VTKM_ENABLE_MPI
  this->RegisterMessages(msgSz, numParticles);
#else
  (void)(boundsMap);
  (void)(msgSz);
  (void)(numParticles);
#endif
}
int ParticleMessenger::CalcParticleBufferSize(int nParticles, int numBlockIds)
{
MemStream buff;
int r = 0;
//Make a vector of particles where each particle has 'numBlockIds' in the blockId array.
std::vector<vtkm::Massless> v(static_cast<std::size_t>(nParticles));
std::vector<vtkm::Id> blockIDs(static_cast<std::size_t>(numBlockIds), 0);
vtkm::filter::write(buff, r);
vtkm::filter::write(buff, v);
for (int i = 0; i < nParticles; i++)
vtkm::filter::write(buff, blockIDs);
return static_cast<int>(buff.GetLen());
}
//Single-rank exchange: every outgoing particle is immediately "received"
//back along with its block-id list; nothing crosses the network.
VTKM_CONT
void ParticleMessenger::SerialExchange(
  const std::vector<vtkm::Massless>& outData,
  const std::map<vtkm::Id, std::vector<vtkm::Id>>& outBlockIDsMap,
  vtkm::Id vtkmNotUsed(numLocalTerm),
  std::vector<vtkm::Massless>& inData,
  std::map<vtkm::Id, std::vector<vtkm::Id>>& inDataBlockIDsMap) const
{
  for (const auto& particle : outData)
  {
    inData.push_back(particle);
    inDataBlockIDsMap[particle.ID] = outBlockIDsMap.find(particle.ID)->second;
  }
}
//Exchange particles and termination counts with the other ranks.
//outData particles are sent to the rank owning their first candidate block;
//a nonzero numLocalTerm is broadcast to all ranks as a TERMINATE message.
//Anything that has arrived since the last call is returned in inData /
//inDataBlockIDsMap, and numTerminateMessages accumulates the termination
//counts reported by other ranks. Non-blocking: returns even when nothing
//is pending. Single-rank runs short-circuit to SerialExchange.
VTKM_CONT
void ParticleMessenger::Exchange(const std::vector<vtkm::Massless>& outData,
                                 const std::map<vtkm::Id, std::vector<vtkm::Id>>& outBlockIDsMap,
                                 vtkm::Id numLocalTerm,
                                 std::vector<vtkm::Massless>& inData,
                                 std::map<vtkm::Id, std::vector<vtkm::Id>>& inDataBlockIDsMap,
                                 vtkm::Id& numTerminateMessages)
{
  numTerminateMessages = 0;
  inDataBlockIDsMap.clear();

  if (this->NumRanks == 1)
    return this->SerialExchange(outData, outBlockIDsMap, numLocalTerm, inData, inDataBlockIDsMap);

#ifdef VTKM_ENABLE_MPI
  //dstRank, vector of (particles,blockIDs)
  std::map<int, std::vector<ParticleCommType>> sendData;
  for (auto& p : outData)
  {
    const auto& bids = outBlockIDsMap.find(p.ID)->second;
    int dstRank = this->BoundsMap.FindRank(bids[0]);
    sendData[dstRank].push_back(std::make_pair(p, bids));
  }

  //Check if we have anything coming in.
  std::vector<ParticleRecvCommType> particleData;
  std::vector<MsgCommType> msgData;
  if (RecvAny(&msgData, &particleData, false))
  {
    DBG("-----Recv: M: " << msgData.size() << " P: " << particleData.size() << std::endl);
    //Unpack received particles along with their routing info.
    for (auto& it : particleData)
      for (const auto& v : it.second)
      {
        const auto& p = v.first;
        const auto& bids = v.second;
        inData.push_back(p);
        inDataBlockIDsMap[p.ID] = bids;
      }
    //Tally termination counts announced by other ranks.
    for (auto& m : msgData)
    {
      if (m.second[0] == MSG_TERMINATE)
      {
        numTerminateMessages += static_cast<vtkm::Id>(m.second[1]);
        DBG("-----TERMinate: Recv: " << m.second[1] << std::endl);
      }
    }
  }

  //Do all the sending...
  if (numLocalTerm > 0)
  {
    std::vector<int> msg = { MSG_TERMINATE, static_cast<int>(numLocalTerm) };
    DBG("-----SendAllMsg: msg=" << msg << std::endl);
    SendAllMsg(msg);
  }
  this->SendParticles(sendData);
  this->CheckPendingSendRequests();
#endif
}
#ifdef VTKM_ENABLE_MPI
//Register the two communication tags and post their initial receives.
//Buffer sizes are derived from the message length (msgSz) and the particle
//batch size (nParticles).
VTKM_CONT
void ParticleMessenger::RegisterMessages(int msgSz, int nParticles)
{
  //Determine buffer size for msg and particle tags.
  const int msgBytes = CalcMessageBufferSize(msgSz + 1);
  const int particleBytes = CalcParticleBufferSize(nParticles);

  //Cap the number of receives kept posted per tag.
  const int recvCount = std::min(64, this->NumRanks - 1);

  this->RegisterTag(ParticleMessenger::MESSAGE_TAG, recvCount, msgBytes);
  this->RegisterTag(ParticleMessenger::PARTICLE_TAG, recvCount, particleBytes);

  this->InitializeBuffers();
}
//Send a control message to rank dst.
//The stream holds the sender rank followed by the payload; SendData takes
//ownership of the stream.
VTKM_CONT
void ParticleMessenger::SendMsg(int dst, const std::vector<int>& msg)
{
  auto* stream = new MemStream();
  vtkm::filter::write(*stream, this->Rank);
  vtkm::filter::write(*stream, msg);
  this->SendData(dst, ParticleMessenger::MESSAGE_TAG, stream);
}
//Broadcast a control message to every other rank (self is skipped).
VTKM_CONT
void ParticleMessenger::SendAllMsg(const std::vector<int>& msg)
{
  for (int dst = 0; dst < this->NumRanks; dst++)
  {
    if (dst == this->Rank)
      continue;
    this->SendMsg(dst, msg);
  }
}
//Drain all completed receives for the requested categories.
//Pass nullptr for msgs or recvParticles to skip that category. Each stream
//is deserialized as (sender rank, payload) and appended to the matching
//output vector; the streams are deleted here. With blockAndWait the call
//waits for at least one arrival. Returns true if anything was received.
VTKM_CONT
bool ParticleMessenger::RecvAny(std::vector<MsgCommType>* msgs,
                                std::vector<ParticleRecvCommType>* recvParticles,
                                bool blockAndWait)
{
  std::set<int> tags;
  if (msgs)
  {
    tags.insert(ParticleMessenger::MESSAGE_TAG);
    msgs->resize(0);
  }
  if (recvParticles)
  {
    tags.insert(ParticleMessenger::PARTICLE_TAG);
    recvParticles->resize(0);
  }

  if (tags.empty())
    return false;

  std::vector<std::pair<int, MemStream*>> buffers;
  if (!this->RecvData(tags, buffers, blockAndWait))
    return false;

  for (size_t i = 0; i < buffers.size(); i++)
  {
    //Message streams: sender rank, then the int payload.
    if (buffers[i].first == ParticleMessenger::MESSAGE_TAG)
    {
      int sendRank;
      std::vector<int> m;
      vtkm::filter::read(*buffers[i].second, sendRank);
      vtkm::filter::read(*buffers[i].second, m);
      msgs->push_back(std::make_pair(sendRank, m));
    }
    //Particle streams: sender rank, count, then 'count' (particle, blockIds)
    //pairs.
    else if (buffers[i].first == ParticleMessenger::PARTICLE_TAG)
    {
      int sendRank;
      std::size_t num;
      vtkm::filter::read(*buffers[i].second, sendRank);
      vtkm::filter::read(*buffers[i].second, num);
      if (num > 0)
      {
        std::vector<ParticleCommType> particles(num);
        for (std::size_t j = 0; j < num; j++)
        {
          vtkm::filter::read(*(buffers[i].second), particles[j].first);
          vtkm::filter::read(*(buffers[i].second), particles[j].second);
        }
        recvParticles->push_back(std::make_pair(sendRank, particles));
      }
    }
    delete buffers[i].second;
  }

  return true;
}
//Serialize and send a container of (particle, blockIds) entries to rank dst.
//Empty containers are skipped; sending to self is reported and ignored.
//SendData takes ownership of the stream.
VTKM_CONT
template <typename P, template <typename, typename> class Container, typename Allocator>
void ParticleMessenger::SendParticles(int dst, const Container<P, Allocator>& c)
{
  if (dst == this->Rank)
  {
    std::cerr << "Error. Sending IC to yourself" << std::endl;
    return;
  }
  if (c.empty())
    return;

  auto* stream = new vtkm::filter::MemStream();
  vtkm::filter::write(*stream, this->Rank);
  vtkm::filter::write(*stream, c);
  this->SendData(dst, ParticleMessenger::PARTICLE_TAG, stream);
}
//Send each destination rank its batch of particles; empty batches are
//skipped.
VTKM_CONT
template <typename P, template <typename, typename> class Container, typename Allocator>
void ParticleMessenger::SendParticles(const std::map<int, Container<P, Allocator>>& m)
{
  for (const auto& entry : m)
  {
    if (!entry.second.empty())
      this->SendParticles(entry.first, entry.second);
  }
}
#endif
}
} // vtkm::filter

@ -0,0 +1,130 @@
//============================================================================
// Copyright (c) Kitware, Inc.
// All rights reserved.
// See LICENSE.txt for details.
//
// This software is distributed WITHOUT ANY WARRANTY; without even
// the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE. See the above copyright notice for more information.
//============================================================================
#ifndef vtk_m_filter_ParticleMessenger_h
#define vtk_m_filter_ParticleMessenger_h
#include <vtkm/Particle.h>
#include <vtkm/filter/particleadvection/BoundsMap.h>
#include <vtkm/filter/particleadvection/Messenger.h>
#include <list>
#include <map>
#include <set>
#include <vector>
namespace vtkm
{
namespace filter
{
//Messenger specialization for particle advection.
//Exchanges two kinds of traffic: control messages (int vectors, e.g.
//termination counts) on MESSAGE_TAG and batches of particles with their
//candidate block ids on PARTICLE_TAG. Without MPI everything degenerates
//to the serial exchange path.
class VTKM_ALWAYS_EXPORT ParticleMessenger : public vtkm::filter::Messenger
{
  //sendRank, message
  using MsgCommType = std::pair<int, std::vector<int>>;

  //particle + blockIDs.
  using ParticleCommType = std::pair<vtkm::Massless, std::vector<vtkm::Id>>;

  //sendRank, vector of ParticleCommType.
  using ParticleRecvCommType = std::pair<int, std::vector<ParticleCommType>>;

public:
  //msgSz and numParticles size the pre-posted receive buffers.
  VTKM_CONT ParticleMessenger(vtkmdiy::mpi::communicator& comm,
                              const vtkm::filter::BoundsMap& bm,
                              int msgSz = 1,
                              int numParticles = 128);
  VTKM_CONT ~ParticleMessenger() {}

  //Send outgoing particles / termination counts and collect whatever has
  //arrived from other ranks. See the implementation for details.
  VTKM_CONT void Exchange(const std::vector<vtkm::Massless>& outData,
                          const std::map<vtkm::Id, std::vector<vtkm::Id>>& outBlockIDsMap,
                          vtkm::Id numLocalTerm,
                          std::vector<vtkm::Massless>& inData,
                          std::map<vtkm::Id, std::vector<vtkm::Id>>& inDataBlockIDsMap,
                          vtkm::Id& numTerminateMessages);

protected:
#ifdef VTKM_ENABLE_MPI
  static constexpr int MSG_TERMINATE = 1;

  enum
  {
    MESSAGE_TAG = 0x42000,
    PARTICLE_TAG = 0x42001
  };

  //Register the two tags above and post their initial receives.
  VTKM_CONT void RegisterMessages(int msgSz, int nParticles);

  // Send/Recv Integral curves.
  VTKM_CONT
  template <typename P,
            template <typename, typename> class Container,
            typename Allocator = std::allocator<P>>
  void SendParticles(int dst, const Container<P, Allocator>& c);

  VTKM_CONT
  template <typename P,
            template <typename, typename> class Container,
            typename Allocator = std::allocator<P>>
  void SendParticles(const std::map<int, Container<P, Allocator>>& m);

  // Send/Recv messages.
  VTKM_CONT void SendMsg(int dst, const std::vector<int>& msg);
  VTKM_CONT void SendAllMsg(const std::vector<int>& msg);
  VTKM_CONT bool RecvMsg(std::vector<MsgCommType>& msgs) { return RecvAny(&msgs, NULL, false); }

  // Send/Recv datasets.
  VTKM_CONT bool RecvAny(std::vector<MsgCommType>* msgs,
                         std::vector<ParticleRecvCommType>* recvParticles,
                         bool blockAndWait);
  const vtkm::filter::BoundsMap& BoundsMap;

#endif
  //Single-rank fallback: outgoing particles come straight back in.
  VTKM_CONT void SerialExchange(const std::vector<vtkm::Massless>& outData,
                                const std::map<vtkm::Id, std::vector<vtkm::Id>>& outBlockIDsMap,
                                vtkm::Id numLocalTerm,
                                std::vector<vtkm::Massless>& inData,
                                std::map<vtkm::Id, std::vector<vtkm::Id>>& inDataBlockIDsMap) const;

  //Bytes needed for a batch of nParticles particles with numBlockIds block
  //ids each.
  static int CalcParticleBufferSize(int nParticles, int numBlockIds = 2);
};
//MemStream serialization for vtkm::Massless particles.
//The field order (Pos x/y/z, ID, Status, NumSteps, Time) defines the wire
//format; write and read must stay in lock-step.
template <>
struct Serialization<vtkm::Massless>
{
  static void write(vtkm::filter::MemStream& memstream, const vtkm::Massless& data)
  {
    for (int d = 0; d < 3; d++)
      vtkm::filter::write(memstream, data.Pos[d]);
    vtkm::filter::write(memstream, data.ID);
    vtkm::filter::write(memstream, data.Status);
    vtkm::filter::write(memstream, data.NumSteps);
    vtkm::filter::write(memstream, data.Time);
  }

  static void read(vtkm::filter::MemStream& memstream, vtkm::Massless& data)
  {
    for (int d = 0; d < 3; d++)
      vtkm::filter::read(memstream, data.Pos[d]);
    vtkm::filter::read(memstream, data.ID);
    vtkm::filter::read(memstream, data.Status);
    vtkm::filter::read(memstream, data.NumSteps);
    vtkm::filter::read(memstream, data.Time);
  }
};
}
} // namespace vtkm::filter
#endif

@ -0,0 +1,98 @@
//============================================================================
// Copyright (c) Kitware, Inc.
// All rights reserved.
// See LICENSE.txt for details.
//
// This software is distributed WITHOUT ANY WARRANTY; without even
// the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE. See the above copyright notice for more information.
//============================================================================
#ifndef vtk_m_filter_StreamUtil_h
#define vtk_m_filter_StreamUtil_h
#include <deque>
#include <iostream>
#include <list>
#include <map>
#include <vector>
#include <vtkm/cont/DataSet.h>
namespace vtkm
{
namespace filter
{
//Stream a std::list as "{a b c }" (brace-delimited, space after each item).
template <class T>
inline std::ostream& operator<<(std::ostream& os, const std::list<T>& l)
{
  os << "{";
  for (const auto& elem : l)
    os << elem << " ";
  os << "}";
  return os;
}
//Stream a std::deque as "{a b c }" (brace-delimited, space after each item).
template <typename T>
inline std::ostream& operator<<(std::ostream& os, const std::deque<T>& l)
{
  os << "{";
  for (const auto& elem : l)
    os << elem << " ";
  os << "}";
  return os;
}
// Forward declaration so we can have pairs with vectors
template <typename T>
std::ostream& operator<<(std::ostream& os, const std::vector<T>& v);
//Stream a std::pair as "(first,second)".
template <typename T1, typename T2>
inline std::ostream& operator<<(std::ostream& os, const std::pair<T1, T2>& p)
{
  os << "(";
  os << p.first << "," << p.second;
  os << ")";
  return os;
}
//Stream a std::vector as "[a b c]" (space-separated, no trailing space).
template <typename T>
std::ostream& operator<<(std::ostream& os, const std::vector<T>& v)
{
  os << "[";
  //Use std::size_t: the previous 'int n = v.size()' implicitly narrowed the
  //unsigned size and could overflow for very large vectors.
  const std::size_t n = v.size();
  for (std::size_t i = 0; i < n; i++)
  {
    os << v[i];
    if (i + 1 < n)
      os << " ";
  }
  os << "]";
  return os;
}
//Stream a std::map as "{(k,v) (k,v) }" in key order.
template <typename T1, typename T2>
inline std::ostream& operator<<(std::ostream& os, const std::map<T1, T2>& m)
{
  os << "{";
  for (const auto& kv : m)
    os << "(" << kv.first << "," << kv.second << ") ";
  os << "}";
  return os;
}
//Stream a DataSet by delegating to its own PrintSummary.
inline std::ostream& operator<<(std::ostream& os, const vtkm::cont::DataSet& ds)
{
  ds.PrintSummary(os);
  return os;
}
/*
inline std::ostream &operator<<(std::ostream &os, const vtkm::Bounds &b)
{
os<<"{("<<b.X.Min<<":"<<b.X.Max<<")("<<b.Y.Min<<":"<<b.Y.Max<<")("<<b.Z.Min<<":"<<b.Z.Max<<")}";
return os;
}
*/
}
} // namespace vtkm
#endif

@ -75,3 +75,18 @@ vtkm_unit_tests(
ALL_BACKENDS
USE_VTKM_JOB_POOL
)
# add distributed tests i.e. test to run with MPI
# if MPI is enabled.
if (VTKm_ENABLE_MPI)
  # Tests exercising the distributed (MPI) advection code paths.
  set(mpi_unit_tests
    UnitTestParticleAdvectionFilterMPI.cxx
    UnitTestStreamlineFilterMPI.cxx
    )
  # MPI keyword: run these under the MPI launcher.
  vtkm_unit_tests(
    MPI SOURCES ${mpi_unit_tests}
    LIBRARIES vtkm_filter vtkm_source vtkm_io
    ALL_BACKENDS
    USE_VTKM_JOB_POOL
  )
endif()

@ -12,10 +12,14 @@
#include <vtkm/cont/testing/Testing.h>
#include <vtkm/filter/ParticleAdvection.h>
#include <vtkm/io/VTKDataSetReader.h>
#include <vtkm/thirdparty/diy/environment.h>
namespace
{
vtkm::cont::DataSet CreateDataSet(const vtkm::Id3& dims, const vtkm::Vec3f& vec)
vtkm::cont::DataSet CreateDataSet(const vtkm::Id3& dims,
const vtkm::Vec3f& origin,
const vtkm::Vec3f& spacing,
const vtkm::Vec3f& vec)
{
vtkm::Id numPoints = dims[0] * dims[1] * dims[2];
@ -25,7 +29,7 @@ vtkm::cont::DataSet CreateDataSet(const vtkm::Id3& dims, const vtkm::Vec3f& vec)
vtkm::cont::DataSetBuilderUniform dataSetBuilder;
vtkm::cont::DataSet ds = dataSetBuilder.Create(dims);
vtkm::cont::DataSet ds = dataSetBuilder.Create(dims, origin, spacing);
ds.AddPointField("vector", vectorField);
return ds;
@ -34,13 +38,13 @@ vtkm::cont::DataSet CreateDataSet(const vtkm::Id3& dims, const vtkm::Vec3f& vec)
void TestBasic()
{
const vtkm::Id3 dims(5, 5, 5);
const vtkm::Vec3f vecX(1, 0, 0);
vtkm::cont::DataSet ds = CreateDataSet(dims, vecX);
const vtkm::Vec3f origin(0, 0, 0), spacing(1, 1, 1), vecX(1, 0, 0);
vtkm::cont::DataSet ds = CreateDataSet(dims, origin, spacing, vecX);
vtkm::cont::ArrayHandle<vtkm::Massless> seedArray =
vtkm::cont::make_ArrayHandle({ vtkm::Massless(vtkm::Vec3f(.2f, 1.0f, .2f), 0),
vtkm::Massless(vtkm::Vec3f(.2f, 2.0f, .2f), 1),
vtkm::Massless(vtkm::Vec3f(.2f, 3.0f, .2f), 2) });
vtkm::Massless(vtkm::Vec3f(.2f, 3.0f, .2f), 2),
vtkm::Massless(vtkm::Vec3f(.2f, 3.2f, .2f), 3) });
vtkm::filter::ParticleAdvection particleAdvection;
@ -56,10 +60,66 @@ void TestBasic()
"Wrong number of coordinate systems in the output dataset");
vtkm::cont::CoordinateSystem coords = output.GetCoordinateSystem();
VTKM_TEST_ASSERT(coords.GetNumberOfPoints() == 3, "Wrong number of coordinates");
VTKM_TEST_ASSERT(coords.GetNumberOfPoints() == 4, "Wrong number of coordinates");
vtkm::cont::DynamicCellSet dcells = output.GetCellSet();
VTKM_TEST_ASSERT(dcells.GetNumberOfCells() == 3, "Wrong number of cells");
VTKM_TEST_ASSERT(dcells.GetNumberOfCells() == 4, "Wrong number of cells");
}
//-----------------------------------------------------------------------------
// Advect particles through a PartitionedDataSet of three abutting uniform
// blocks tiled along X ([0,4], [4,8], [8,12]) carrying a constant +X vector
// field. Every seed should be carried to the right edge of the global domain,
// and all results should be merged into a single output partition.
void TestPartitionedDataSet()
{
  const vtkm::Id3 dims(5, 5, 5);
  // Block origins 4 apart: 5 points with spacing 1 => each block is 4 wide.
  const vtkm::Vec3f o1(0, 0, 0), o2(4, 0, 0), o3(8, 0, 0);
  const vtkm::Vec3f spacing(1, 1, 1);
  const vtkm::Vec3f vecX(1, 0, 0);

  vtkm::cont::PartitionedDataSet pds;
  pds.AppendPartition(CreateDataSet(dims, o1, spacing, vecX));
  pds.AppendPartition(CreateDataSet(dims, o2, spacing, vecX));
  pds.AppendPartition(CreateDataSet(dims, o3, spacing, vecX));

  // Seeds: two in the first block, one each in the second and third blocks.
  vtkm::cont::ArrayHandle<vtkm::Massless> seedArray;
  seedArray = vtkm::cont::make_ArrayHandle({ vtkm::Massless(vtkm::Vec3f(.2f, 1.0f, .2f), 0),
                                             vtkm::Massless(vtkm::Vec3f(.2f, 2.0f, .2f), 1),
                                             vtkm::Massless(vtkm::Vec3f(4.2f, 1.0f, .2f), 2),
                                             vtkm::Massless(vtkm::Vec3f(8.2f, 1.0f, .2f), 3) });
  vtkm::Id numSeeds = seedArray.GetNumberOfValues();

  vtkm::filter::ParticleAdvection particleAdvection;
  particleAdvection.SetStepSize(0.1f);
  particleAdvection.SetNumberOfSteps(1000);
  particleAdvection.SetSeeds(seedArray);
  particleAdvection.SetActiveField("vector");
  auto out = particleAdvection.Execute(pds);

  // All terminated particles are gathered into one partition.
  // (Debug prints of each output partition were removed from this test.)
  VTKM_TEST_ASSERT(out.GetNumberOfPartitions() == 1, "Wrong number of partitions in output");
  auto ds = out.GetPartition(0);

  //Validate the result is correct.
  VTKM_TEST_ASSERT(ds.GetNumberOfCoordinateSystems() == 1,
                   "Wrong number of coordinate systems in the output dataset");
  auto coords = ds.GetCoordinateSystem().GetDataAsMultiplexer();
  VTKM_TEST_ASSERT(ds.GetNumberOfPoints() == numSeeds, "Wrong number of coordinates");

  // With a +X field every particle must end at/beyond x == 12, the right
  // edge of the global domain.
  auto ptPortal = coords.ReadPortal();
  vtkm::Id nPts = ptPortal.GetNumberOfValues();
  for (vtkm::Id i = 0; i < nPts; i++)
    VTKM_TEST_ASSERT(ptPortal.Get(i)[0] >= 12);

  vtkm::cont::DynamicCellSet dcells = ds.GetCellSet();
  VTKM_TEST_ASSERT(dcells.GetNumberOfCells() == numSeeds, "Wrong number of cells");
}
void TestFile(const std::string& fname,
@ -117,6 +177,7 @@ void TestFile(const std::string& fname,
void TestParticleAdvectionFilter()
{
TestBasic();
TestPartitionedDataSet();
std::string basePath = vtkm::cont::testing::Testing::GetTestDataBasePath();
@ -152,5 +213,8 @@ void TestParticleAdvectionFilter()
int UnitTestParticleAdvectionFilter(int argc, char* argv[])
{
  // Set up the MPI environment: this test is not intended to be run in
  // parallel, but the filter still makes MPI calls (via vtkmdiy), so an
  // environment must exist for the duration of the test.
  vtkmdiy::mpi::environment env(argc, argv);
  return vtkm::cont::testing::Testing::Run(TestParticleAdvectionFilter, argc, argv);
}

@ -0,0 +1,105 @@
//============================================================================
// Copyright (c) Kitware, Inc.
// All rights reserved.
// See LICENSE.txt for details.
//
// This software is distributed WITHOUT ANY WARRANTY; without even
// the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE. See the above copyright notice for more information.
//============================================================================
#include <vtkm/cont/DataSetBuilderUniform.h>
#include <vtkm/cont/EnvironmentTracker.h>
#include <vtkm/cont/testing/Testing.h>
#include <vtkm/filter/ParticleAdvection.h>
#include <vtkm/thirdparty/diy/diy.h>
namespace
{
// Build a uniform-grid DataSet of the given dimensions/origin/spacing with a
// point field named "vector" holding the constant vector `vec` at every point.
vtkm::cont::DataSet CreateDataSet(const vtkm::Id3& dims,
                                  const vtkm::Vec3f& origin,
                                  const vtkm::Vec3f& spacing,
                                  const vtkm::Vec3f& vec)
{
  const vtkm::Id numPoints = dims[0] * dims[1] * dims[2];
  // Fill-construct the constant field in one shot.
  std::vector<vtkm::Vec3f> vectorField(static_cast<std::size_t>(numPoints), vec);

  vtkm::cont::DataSetBuilderUniform builder;
  vtkm::cont::DataSet dataSet = builder.Create(dims, origin, spacing);
  dataSet.AddPointField("vector", vectorField);
  return dataSet;
}
//-----------------------------------------------------------------------------
// Distributed test: each MPI rank contributes nPerRank uniform blocks tiled
// along X (each block 4 units wide), all carrying a constant +X vector field.
// Particles are seeded in the first block and must be handed off rank-to-rank
// until they exit the global domain on the highest rank.
void TestPartitionedDataSet(int nPerRank)
{
  const vtkm::Id3 dims(5, 5, 5);
  const vtkm::Vec3f spacing(1, 1, 1);
  const vtkm::Vec3f vecX(1, 0, 0);

  vtkm::cont::PartitionedDataSet pds;
  //Create nPerRank partitions per rank with X=4*rank.
  auto comm = vtkm::cont::EnvironmentTracker::GetCommunicator();
  for (int i = 0; i < nPerRank; i++)
  {
    vtkm::FloatDefault x = static_cast<vtkm::FloatDefault>(4 * (nPerRank * comm.rank() + i));
    vtkm::Vec3f origin(x, 0, 0);
    pds.AppendPartition(CreateDataSet(dims, origin, spacing, vecX));
  }

  // Two seeds inside the very first block (x = 0.2, owned by rank 0).
  vtkm::cont::ArrayHandle<vtkm::Massless> seedArray;
  seedArray = vtkm::cont::make_ArrayHandle({ vtkm::Massless(vtkm::Vec3f(.2f, 1.0f, .2f), 0),
                                             vtkm::Massless(vtkm::Vec3f(.2f, 2.0f, .2f), 1) });
  vtkm::Id numSeeds = seedArray.GetNumberOfValues();

  vtkm::filter::ParticleAdvection particleAdvection;
  particleAdvection.SetStepSize(0.1f);
  // Large step budget so particles can traverse the whole global domain.
  particleAdvection.SetNumberOfSteps(100000);
  particleAdvection.SetSeeds(seedArray);
  particleAdvection.SetActiveField("vector");
  auto out = particleAdvection.Execute(pds);

  //Particles end up in last rank.
  if (comm.rank() == comm.size() - 1)
  {
    // Right edge of the global domain: (blocks per rank) * (ranks) * (width 4).
    vtkm::FloatDefault globalMaxX = static_cast<vtkm::FloatDefault>(nPerRank * comm.size() * 4);
    VTKM_TEST_ASSERT(out.GetNumberOfPartitions() == 1, "Wrong number of partitions in output");
    auto ds = out.GetPartition(0);
    //Validate the result is correct.
    VTKM_TEST_ASSERT(ds.GetNumberOfCoordinateSystems() == 1,
                     "Wrong number of coordinate systems in the output dataset");
    auto coords = ds.GetCoordinateSystem().GetDataAsMultiplexer();
    VTKM_TEST_ASSERT(ds.GetNumberOfPoints() == numSeeds, "Wrong number of coordinates");
    // Terminated particles must sit at (or beyond) the global domain edge.
    auto ptPortal = coords.ReadPortal();
    for (vtkm::Id i = 0; i < numSeeds; i++)
      VTKM_TEST_ASSERT(ptPortal.Get(i)[0] >= globalMaxX);
    vtkm::cont::DynamicCellSet dcells = ds.GetCellSet();
    VTKM_TEST_ASSERT(dcells.GetNumberOfCells() == numSeeds, "Wrong number of cells");
  }
  else
    VTKM_TEST_ASSERT(out.GetNumberOfPartitions() == 0, "Wrong number of partitions in output");
}
// Driver: exercise the distributed particle advection test with 1 through 9
// partitions per rank.
void TestParticleAdvectionFilterMPI()
{
  for (int numBlocksPerRank = 1; numBlocksPerRank < 10; ++numBlocksPerRank)
    TestPartitionedDataSet(numBlocksPerRank);
}
}
int UnitTestParticleAdvectionFilterMPI(int argc, char* argv[])
{
  // NOTE(review): unlike the serial unit tests, no vtkmdiy::mpi::environment
  // is constructed here — presumably the MPI test harness (vtkm_unit_tests
  // with the MPI keyword) initializes MPI before this entry point; verify.
  return vtkm::cont::testing::Testing::Run(TestParticleAdvectionFilterMPI, argc, argv);
}

@ -123,7 +123,7 @@ void TestStreamlineFile(const std::string& fname,
std::vector<vtkm::Massless> seeds;
for (vtkm::Id i = 0; i < numPoints; i++)
seeds.push_back(vtkm::Massless(pts[static_cast<std::size_t>(i)], i));
auto seedArray = vtkm::cont::make_ArrayHandle(seeds, vtkm::CopyFlag::On);
auto seedArray = vtkm::cont::make_ArrayHandle(seeds, vtkm::CopyFlag::Off);
vtkm::filter::Streamline streamline;
streamline.SetStepSize(stepSize);
@ -192,5 +192,8 @@ void TestStreamlineFilters()
int UnitTestStreamlineFilter(int argc, char* argv[])
{
  // Set up the MPI environment: this test is not intended to be run in
  // parallel, but the filter still makes MPI calls (via vtkmdiy), so an
  // environment must exist for the duration of the test.
  vtkmdiy::mpi::environment env(argc, argv);
  return vtkm::cont::testing::Testing::Run(TestStreamlineFilters, argc, argv);
}

@ -0,0 +1,112 @@
//============================================================================
// Copyright (c) Kitware, Inc.
// All rights reserved.
// See LICENSE.txt for details.
//
// This software is distributed WITHOUT ANY WARRANTY; without even
// the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE. See the above copyright notice for more information.
//============================================================================
#include <vtkm/cont/DataSetBuilderUniform.h>
#include <vtkm/cont/EnvironmentTracker.h>
#include <vtkm/cont/testing/Testing.h>
#include <vtkm/filter/Streamline.h>
#include <vtkm/thirdparty/diy/diy.h>
namespace
{
// Build a uniform-grid DataSet of the given dimensions/origin/spacing with a
// point field named "vector" holding the constant vector `vec` at every point.
vtkm::cont::DataSet CreateDataSet(const vtkm::Id3& dims,
                                  const vtkm::Vec3f& origin,
                                  const vtkm::Vec3f& spacing,
                                  const vtkm::Vec3f& vec)
{
  const vtkm::Id numPoints = dims[0] * dims[1] * dims[2];
  // Fill-construct the constant field in one shot.
  std::vector<vtkm::Vec3f> vectorField(static_cast<std::size_t>(numPoints), vec);

  vtkm::cont::DataSetBuilderUniform builder;
  vtkm::cont::DataSet dataSet = builder.Create(dims, origin, spacing);
  dataSet.AddPointField("vector", vectorField);
  return dataSet;
}
//-----------------------------------------------------------------------------
// Distributed streamline test: each rank contributes nPerRank uniform blocks
// tiled along X (each 4 units wide) with a constant +X vector field. With a
// large step budget, the last point of every computed streamline must lie
// past the X.Max of the partition it was computed on, proving the particle
// crossed the block boundary. (A previous revision also built an unused
// XPartitionRanges vector; it has been removed as dead code.)
void TestPartitionedDataSet(vtkm::Id nPerRank)
{
  const vtkm::Id3 dims(5, 5, 5);
  const vtkm::Vec3f spacing(1, 1, 1);
  const vtkm::Vec3f vecX(1, 0, 0);

  vtkm::cont::PartitionedDataSet pds;
  //Create nPerRank partitions per rank with X=4*rank.
  auto comm = vtkm::cont::EnvironmentTracker::GetCommunicator();
  for (vtkm::Id i = 0; i < nPerRank; i++)
  {
    vtkm::FloatDefault x = static_cast<vtkm::FloatDefault>(4 * (nPerRank * comm.rank() + i));
    vtkm::Vec3f origin(x, 0, 0);
    pds.AppendPartition(CreateDataSet(dims, origin, spacing, vecX));
  }

  // Two seeds inside the very first block (x = 0.2).
  vtkm::cont::ArrayHandle<vtkm::Massless> seedArray;
  seedArray = vtkm::cont::make_ArrayHandle({ vtkm::Massless(vtkm::Vec3f(.2f, 1.0f, .2f), 0),
                                             vtkm::Massless(vtkm::Vec3f(.2f, 2.0f, .2f), 1) });
  vtkm::Id numSeeds = seedArray.GetNumberOfValues();

  vtkm::filter::Streamline streamline;
  streamline.SetStepSize(0.1f);
  streamline.SetNumberOfSteps(100000);
  streamline.SetSeeds(seedArray);
  streamline.SetActiveField("vector");
  auto out = streamline.Execute(pds);

  for (vtkm::Id i = 0; i < nPerRank; i++)
  {
    auto inputDS = pds.GetPartition(i);
    auto inputBounds = inputDS.GetCoordinateSystem().GetBounds();
    auto outputDS = out.GetPartition(i);
    VTKM_TEST_ASSERT(outputDS.GetNumberOfCoordinateSystems() == 1,
                     "Wrong number of coordinate systems in the output dataset");

    // One polyline per seed in each output partition.
    vtkm::cont::DynamicCellSet dcells = outputDS.GetCellSet();
    VTKM_TEST_ASSERT(dcells.GetNumberOfCells() == numSeeds, "Wrong number of cells");
    auto coords = outputDS.GetCoordinateSystem().GetDataAsMultiplexer();
    auto ptPortal = coords.ReadPortal();

    vtkm::cont::CellSetExplicit<> explicitCells;
    VTKM_TEST_ASSERT(dcells.IsType<vtkm::cont::CellSetExplicit<>>(), "Wrong cell type.");
    explicitCells = dcells.Cast<vtkm::cont::CellSetExplicit<>>();

    // The last point of each streamline must lie past this partition's X max.
    for (vtkm::Id j = 0; j < numSeeds; j++)
    {
      vtkm::cont::ArrayHandle<vtkm::Id> indices;
      explicitCells.GetIndices(j, indices);
      vtkm::Id nPts = indices.GetNumberOfValues();
      auto iPortal = indices.ReadPortal();
      vtkm::Vec3f lastPt = ptPortal.Get(iPortal.Get(nPts - 1));
      VTKM_TEST_ASSERT(lastPt[0] > inputBounds.X.Max, "Wrong end point for seed");
    }
  }
}
// Driver: exercise the distributed streamline test with 1 through 9
// partitions per rank.
void TestStreamlineFiltersMPI()
{
  for (int numBlocksPerRank = 1; numBlocksPerRank < 10; ++numBlocksPerRank)
    TestPartitionedDataSet(numBlocksPerRank);
}
}
int UnitTestStreamlineFilterMPI(int argc, char* argv[])
{
  // NOTE(review): unlike the serial unit tests, no vtkmdiy::mpi::environment
  // is constructed here — presumably the MPI test harness (vtkm_unit_tests
  // with the MPI keyword) initializes MPI before this entry point; verify.
  return vtkm::cont::testing::Testing::Run(TestStreamlineFiltersMPI, argc, argv);
}

@ -168,7 +168,7 @@ public:
void PreStepUpdate(const vtkm::Id& idx)
{
ParticleType p = this->ParticleExecutionObject<Device, ParticleType>::GetParticle(idx);
if (p.NumSteps == 0)
if (this->StepCount.Get(idx) == 0)
{
vtkm::Id loc = idx * Length;
this->History.Set(loc, p.Pos);