Compare commits

...

13 Commits

Author SHA1 Message Date
Dave Pugmire
3081a31951 Merge branch 'sl_comm_probe' into 'master'
Draft: Use async termination.

See merge request vtk/vtk-m!3182
2024-06-27 15:54:24 -04:00
Vicente Bolea
9c0a3aef31 Merge topic 'update-rocm'
c6f0e3698 ci: update kokkos hip to 6.1, ubuntu 2204

Acked-by: Kitware Robot <kwrobot@kitware.com>
Acked-by: Kenneth Moreland <morelandkd@ornl.gov>
Merge-request: !3237
2024-06-27 14:21:15 -04:00
Vicente Bolea
726bd0b551 Merge branch 'release' into master 2024-06-26 17:46:45 -04:00
Vicente Bolea
743a7c3ac4 Merge branch 'release-2.0' into release 2024-06-26 17:46:45 -04:00
Vicente Bolea
b6a0e4d79b Merge topic 'backport-3233' into release-2.0
638d18356 fix: perftest upload
1078d7dfb Revert "ci: fix perftest uploading"

Acked-by: Kitware Robot <kwrobot@kitware.com>
Acked-by: Kenneth Moreland <morelandkd@ornl.gov>
Merge-request: !3235
2024-06-26 17:46:45 -04:00
Vicente Adolfo Bolea Sanchez
c6f0e36986 ci: update kokkos hip to 6.1, ubuntu 2204 2024-06-26 16:31:25 -04:00
Vicente Adolfo Bolea Sanchez
638d183567 fix: perftest upload
(cherry picked from commit 76ddf7a5b2836cf4153e36f28dd2675f5ef8a735)
2024-06-21 16:50:11 -04:00
Vicente Adolfo Bolea Sanchez
1078d7dfb4 Revert "ci: fix perftest uploading"
This reverts commit 31b8b2faf90a79e069761b12deac8525cd4c0aa1.

(cherry picked from commit cdd3a55f61850c4438a0b0f890bc6e3f11c641fb)
2024-06-21 16:50:07 -04:00
Dave Pugmire
c75dcace78 Merge branch 'master' of https://gitlab.kitware.com/vtk/vtk-m into sl_comm_probe 2024-04-01 15:44:57 -04:00
Dave Pugmire
02553c0b6f Merge branch 'master' of https://gitlab.kitware.com/vtk/vtk-m into sl_comm_probe 2024-03-19 15:33:13 -04:00
Dave Pugmire
eef93ff825 Merge branch 'master' of https://gitlab.kitware.com/vtk/vtk-m into sl_comm_probe 2024-02-02 15:05:57 -05:00
Dave Pugmire
dd96f1144b Fixes for non-mpi code... 2024-01-24 07:23:55 -05:00
Dave Pugmire
4bd340156b Use async termination. 2024-01-23 16:57:10 -05:00
12 changed files with 259 additions and 137 deletions

@ -69,8 +69,8 @@
extends:
- .docker_image
.ubuntu2004_hip_kokkos: &ubuntu2004_hip_kokkos
image: "kitware/vtkm:ci-ubuntu2004_hip_kokkos-20230220"
.ubuntu2204_hip_kokkos: &ubuntu2204_hip_kokkos
image: "kitware/vtkm:ci-ubuntu2204_hip_kokkos-20240625"
extends:
- .docker_image
@ -245,4 +245,5 @@ include:
- local: '/.gitlab/ci/ubuntu1604.yml'
- local: '/.gitlab/ci/ubuntu1804.yml'
- local: '/.gitlab/ci/ubuntu2004.yml'
- local: '/.gitlab/ci/ubuntu2204.yml'
- local: '/.gitlab/ci/windows10.yml'

@ -10,7 +10,7 @@
##
##=============================================================================
FROM rocm/dev-ubuntu-20.04
FROM rocm/dev-ubuntu-22.04
LABEL maintainer "Vicente Adolfo Bolea Sanchez<vicente.bolea@gmail.com>"
# Base dependencies for building VTK-m projects
@ -58,11 +58,22 @@ ENV PATH "/opt/cmake/bin:${PATH}"
ENV CMAKE_PREFIX_PATH "/opt/rocm/lib/cmake:/opt/rocm/lib:${CMAKE_PREFIX_PATH}"
ENV CMAKE_GENERATOR "Ninja"
# Build and install Kokkos
# Each Kokkos version is installed under its own prefix, /opt/kokkos/<version>.
# The prefix is passed through CMAKE_INSTALL_PREFIX, the variable that
# `cmake --install` honours; CMAKE_PREFIX_INSTALL is not a CMake variable and
# is silently ignored.
ARG KOKKOS_VERSION=3.7.01
ENV KOKKOS_VERSION=3.7.01
COPY kokkos_cmake_config.cmake kokkos_cmake_config.cmake
RUN curl -L https://github.com/kokkos/kokkos/archive/refs/tags/$KOKKOS_VERSION.tar.gz | tar -xzf - && \
cmake -S kokkos-$KOKKOS_VERSION -B build -C kokkos_cmake_config.cmake && \
cmake -S kokkos-$KOKKOS_VERSION -B build -C kokkos_cmake_config.cmake \
-DCMAKE_INSTALL_PREFIX=/opt/kokkos/$KOKKOS_VERSION \
-DKokkos_ARCH_VEGA900=ON && \
cmake --build build -v && \
cmake --install build && \
rm -rf build kokkos-$KOKKOS_VERSION
# Second Kokkos build (4.3.01, VEGA906), installed under /opt/kokkos/<version>.
# The prefix is passed through CMAKE_INSTALL_PREFIX, the variable that
# `cmake --install` honours; CMAKE_PREFIX_INSTALL is not a CMake variable and
# is silently ignored.
ENV KOKKOS_VERSION=4.3.01
COPY kokkos_cmake_config.cmake kokkos_cmake_config.cmake
RUN curl -L https://github.com/kokkos/kokkos/archive/refs/tags/$KOKKOS_VERSION.tar.gz | tar -xzf - && \
cmake -S kokkos-$KOKKOS_VERSION -B build -C kokkos_cmake_config.cmake \
-DCMAKE_INSTALL_PREFIX=/opt/kokkos/$KOKKOS_VERSION \
-DKokkos_ARCH_VEGA906=ON && \
cmake --build build -v && \
cmake --install build && \
rm -rf build kokkos-$KOKKOS_VERSION

@ -9,13 +9,10 @@
##============================================================================
set(CMAKE_BUILD_TYPE "release" CACHE STRING "")
set(CMAKE_INSTALL_PREFIX /opt/kokkos CACHE PATH "")
set(CMAKE_C_COMPILER /opt/rocm/llvm/bin/clang CACHE FILEPATH "")
set(CMAKE_CXX_COMPILER /opt/rocm/llvm/bin/clang++ CACHE FILEPATH "")
set(CMAKE_CXX_STANDARD "14" CACHE STRING "")
set(CMAKE_POSITION_INDEPENDENT_CODE ON CACHE BOOL "")
set(Kokkos_ENABLE_SERIAL ON CACHE BOOL "")
set(Kokkos_ARCH_VEGA900 ON CACHE BOOL "")
set(Kokkos_ENABLE_HIP ON CACHE BOOL "")
set(Kokkos_ENABLE_HIP_RELOCATABLE_DEVICE_CODE OFF CACHE BOOL "")

@ -196,7 +196,7 @@ test:ubuntu1804_clang8:
# Build on ubuntu1804 with kokkos and test on ubuntu1804
# Uses CUDA 11
build:ubuntu1804_kokkos:
build:ubuntu1804_kokkos37:
tags:
- build
- vtkm
@ -212,7 +212,7 @@ build:ubuntu1804_kokkos:
CMAKE_BUILD_TYPE: Release
VTKM_SETTINGS: "benchmarks+kokkos+turing+64bit_floats+shared"
test:ubuntu1804_kokkos:
test:ubuntu1804_kokkos37:
tags:
- test
- vtkm
@ -225,9 +225,9 @@ test:ubuntu1804_kokkos:
- .cmake_test_linux
- .run_automatically
dependencies:
- build:ubuntu1804_kokkos
- build:ubuntu1804_kokkos37
needs:
- build:ubuntu1804_kokkos
- build:ubuntu1804_kokkos37
build:ubuntu1804_cuda_perftest:
tags:

@ -10,7 +10,7 @@
##
##=============================================================================
build:ubuntu2004_kokkos:
build:ubuntu2004_kokkos37:
tags:
- build
- vtkm
@ -25,7 +25,7 @@ build:ubuntu2004_kokkos:
CMAKE_PREFIX_PATH: "/opt/anari"
VTKM_SETTINGS: "kokkos+shared+64bit_floats+rendering+anari"
test:ubuntu2004_kokkos:
test:ubuntu2004_kokkos37:
tags:
- test
- vtkm
@ -36,55 +36,6 @@ test:ubuntu2004_kokkos:
- .cmake_test_linux
- .run_automatically
dependencies:
- build:ubuntu2004_kokkos
- build:ubuntu2004_kokkos37
needs:
- build:ubuntu2004_kokkos
build:ubuntu2004_hip_kokkos:
tags:
- vtkm
- docker
- linux-x86_64
- radeon
extends:
- .ubuntu2004_hip_kokkos
- .cmake_build_linux
- .run_automatically
variables:
CMAKE_BUILD_TYPE: "RelWithDebInfo"
VTKM_SETTINGS: "benchmarks+kokkos+hip+no_rendering+ccache"
CMAKE_PREFIX_PATH: "/opt/rocm/lib/cmake"
LD_LIBRARY_PATH: "/opt/rocm/lib"
CMAKE_HIP_COMPILER: "/opt/rocm/llvm/bin/clang++"
Kokkos_CXX_COMPILER: "/opt/rocm/llvm/bin/clang++"
CMAKE_HIP_ARCHITECTURES: "gfx900"
# -isystem= is not affected by CCACHE_BASEDIR, thus we must ignore it
CCACHE_IGNOREOPTIONS: "-isystem=*"
CCACHE_BASEDIR: "$CI_PROJECT_DIR"
CCACHE_COMPILERCHECK: "content"
CCACHE_NOHASHDIR: "true"
CCACHE_RESHARE: "true"
after_script:
- ccache -v -s
- ccache -z
test:ubuntu2004_hip_kokkos:
tags:
- vtkm
- docker
- linux-x86_64
- radeon
extends:
- .ubuntu2004_hip_kokkos
- .cmake_test_linux
- .run_upstream_branches
variables:
CTEST_MAX_PARALLELISM: 1
CTEST_EXCLUSIONS: "UnitTestWorkletParticleAdvection"
dependencies:
- build:ubuntu2004_hip_kokkos
needs:
- build:ubuntu2004_hip_kokkos
timeout: 3 hours
- build:ubuntu2004_kokkos37

85
.gitlab/ci/ubuntu2204.yml Normal file

@ -0,0 +1,85 @@
##=============================================================================
##
## Copyright (c) Kitware, Inc.
## All rights reserved.
## See LICENSE.txt for details.
##
## This software is distributed WITHOUT ANY WARRANTY; without even
## the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
## PURPOSE. See the above copyright notice for more information.
##
##=============================================================================
# Variables shared by the ROCm/HIP Kokkos build jobs in this file; those jobs
# list `.kokkos_rocm_vars` under `extends`.
.kokkos_rocm_vars: &kokkos_rocm_vars
variables:
# ccache settings.
CCACHE_BASEDIR: "$CI_PROJECT_DIR"
CCACHE_COMPILERCHECK: "content"
# -isystem= is not affected by CCACHE_BASEDIR, thus we must ignore it
CCACHE_IGNOREOPTIONS: "-isystem=*"
CCACHE_NOHASHDIR: "true"
CCACHE_RESHARE: "true"
CMAKE_BUILD_TYPE: "RelWithDebInfo"
# Compilers taken from the ROCm installation under /opt/rocm.
CMAKE_HIP_COMPILER: "/opt/rocm/llvm/bin/clang++"
Kokkos_CXX_COMPILER: "/opt/rocm/llvm/bin/clang++"
LD_LIBRARY_PATH: "/opt/rocm/lib"
# The C++ compiler is selected through the CXX environment variable.
CXX: "hipcc"
# Build job: Kokkos + HIP configuration using the Kokkos install at
# /opt/kokkos/3.7.01/ and HIP architecture gfx900.
build:ubuntu2204_hip_kokkos37:
tags:
- vtkm
- docker
- linux-x86_64
- radeon
extends:
- .ubuntu2204_hip_kokkos
- .cmake_build_linux
- .kokkos_rocm_vars
- .run_automatically
variables:
# NOTE(review): CMAKE_BUILD_TYPE repeats the value already set in
# .kokkos_rocm_vars.
CMAKE_BUILD_TYPE: "RelWithDebInfo"
CMAKE_HIP_ARCHITECTURES: "gfx900"
Kokkos_DIR: "/opt/kokkos/3.7.01/"
VTKM_SETTINGS: "benchmarks+kokkos+hip+no_rendering+ccache"
after_script:
# Print the ccache statistics, then zero the counters.
- ccache -v -s
- ccache -z
# Test job paired with build:ubuntu2204_hip_kokkos37 (referenced through both
# `dependencies` and `needs`).
test:ubuntu2204_hip_kokkos37:
tags:
- vtkm
- docker
- linux-x86_64
- radeon
extends:
- .ubuntu2204_hip_kokkos
- .cmake_test_linux
- .run_upstream_branches
variables:
# Test parallelism is limited to 1 and UnitTestWorkletParticleAdvection is
# excluded; the reason is not recorded here.
CTEST_MAX_PARALLELISM: 1
CTEST_EXCLUSIONS: "UnitTestWorkletParticleAdvection"
dependencies:
- build:ubuntu2204_hip_kokkos37
needs:
- build:ubuntu2204_hip_kokkos37
timeout: 3 hours
# Build job: Kokkos + HIP configuration using the Kokkos install at
# /opt/kokkos/4.3.01/ and HIP architecture gfx906.
# NOTE(review): no test job consuming this build is visible in this file.
build:ubuntu2204_hip_kokkos43:
tags:
- vtkm
- docker
- linux-x86_64
- radeon
extends:
- .ubuntu2204_hip_kokkos
- .cmake_build_linux
- .kokkos_rocm_vars
- .run_automatically
variables:
# NOTE(review): CMAKE_BUILD_TYPE repeats the value already set in
# .kokkos_rocm_vars.
CMAKE_BUILD_TYPE: "RelWithDebInfo"
CMAKE_HIP_ARCHITECTURES: "gfx906"
Kokkos_DIR: "/opt/kokkos/4.3.01/"
VTKM_SETTINGS: "benchmarks+kokkos+hip+no_rendering+ccache"
after_script:
# Print the ccache statistics, then zero the counters.
- ccache -v -s
- ccache -z

@ -81,6 +81,8 @@ Optional dependencies are:
+ Kokkos Device Adapter
+ [Kokkos](https://kokkos.github.io/) 3.7+
+ The CXX environment variable or CMAKE_CXX_COMPILER should be set to
hipcc when using the Kokkos device adapter with HIP (ROCm >= 6).
+ CUDA Device Adapter
+ [Cuda Toolkit 9.2, >= 10.2](https://developer.nvidia.com/cuda-toolkit)
+ Note CUDA >= 10.2 is required on Windows

@ -15,6 +15,7 @@
#include <vtkm/cont/ErrorFilterExecution.h>
#include <vtkm/filter/Filter.h>
#include <vtkm/filter/flow/FlowTypes.h>
#include <vtkm/filter/flow/internal/BoundsMap.h>
#include <vtkm/filter/flow/vtkm_filter_flow_export.h>
namespace vtkm
@ -104,7 +105,7 @@ protected:
bool BlockIdsSet = false;
std::vector<vtkm::Id> BlockIds;
vtkm::filter::flow::internal::BoundsMap BoundsMap;
vtkm::Id NumberOfSteps = 0;
vtkm::cont::UnknownArrayHandle Seeds;
vtkm::filter::flow::IntegrationSolverType SolverType =

@ -58,13 +58,15 @@ FilterParticleAdvectionSteadyState<Derived>::DoExecutePartitions(
DataSetIntegratorSteadyState<ParticleType, FieldType, TerminationType, AnalysisType>;
this->ValidateOptions();
if (this->BlockIdsSet)
this->BoundsMap = vtkm::filter::flow::internal::BoundsMap(input, this->BlockIds);
else
this->BoundsMap = vtkm::filter::flow::internal::BoundsMap(input);
vtkm::filter::flow::internal::BoundsMap boundsMap(input);
std::vector<DSIType> dsi;
for (vtkm::Id i = 0; i < input.GetNumberOfPartitions(); i++)
{
vtkm::Id blockId = boundsMap.GetLocalBlockId(i);
vtkm::Id blockId = this->BoundsMap.GetLocalBlockId(i);
auto dataset = input.GetPartition(i);
// Build the field for the current dataset
@ -78,7 +80,7 @@ FilterParticleAdvectionSteadyState<Derived>::DoExecutePartitions(
}
vtkm::filter::flow::internal::ParticleAdvector<DSIType> pav(
boundsMap, dsi, this->UseThreadedAlgorithm, this->UseAsynchronousCommunication);
this->BoundsMap, dsi, this->UseThreadedAlgorithm, this->UseAsynchronousCommunication);
vtkm::cont::ArrayHandle<ParticleType> particles;
this->Seeds.AsArrayHandle(particles);

@ -55,12 +55,15 @@ FilterParticleAdvectionUnsteadyState<Derived>::DoExecutePartitions(
using DSIType = vtkm::filter::flow::internal::
DataSetIntegratorUnsteadyState<ParticleType, FieldType, TerminationType, AnalysisType>;
vtkm::filter::flow::internal::BoundsMap boundsMap(input);
if (this->BlockIdsSet)
this->BoundsMap = vtkm::filter::flow::internal::BoundsMap(input, this->BlockIds);
else
this->BoundsMap = vtkm::filter::flow::internal::BoundsMap(input);
std::vector<DSIType> dsi;
for (vtkm::Id i = 0; i < input.GetNumberOfPartitions(); i++)
{
vtkm::Id blockId = boundsMap.GetLocalBlockId(i);
vtkm::Id blockId = this->BoundsMap.GetLocalBlockId(i);
auto ds1 = input.GetPartition(i);
auto ds2 = this->Input2.GetPartition(i);
@ -85,7 +88,7 @@ FilterParticleAdvectionUnsteadyState<Derived>::DoExecutePartitions(
analysis);
}
vtkm::filter::flow::internal::ParticleAdvector<DSIType> pav(
boundsMap, dsi, this->UseThreadedAlgorithm, this->UseAsynchronousCommunication);
this->BoundsMap, dsi, this->UseThreadedAlgorithm, this->UseAsynchronousCommunication);
vtkm::cont::ArrayHandle<ParticleType> particles;
this->Seeds.AsArrayHandle(particles);

@ -15,6 +15,10 @@
#include <vtkm/filter/flow/internal/BoundsMap.h>
#include <vtkm/filter/flow/internal/DataSetIntegrator.h>
#include <vtkm/filter/flow/internal/ParticleMessenger.h>
#ifdef VTKM_ENABLE_MPI
#include <vtkm/thirdparty/diy/diy.h>
#include <vtkm/thirdparty/diy/mpi-cast.h>
#endif
namespace vtkm
{
@ -25,6 +29,87 @@ namespace flow
namespace internal
{
// Tracks whether the advection loop may stop.
//
// Callers report newly received work with AddWork(), call Control() once per
// loop iteration, and stop looping once Done() returns true.
//
// With VTKM_ENABLE_MPI, Control() advances a small state machine built on
// non-blocking collectives:
//   STATE_0: when there is no local work, start an MPI_Ibarrier, clear the
//            dirty flag and move to STATE_1.
//   STATE_1: once the barrier has completed, start an MPI_Iallreduce
//            (logical OR) of the dirty flag and move to STATE_2.
//   STATE_2: once the reduction has completed, become DONE if no rank was
//            dirty; otherwise go back to STATE_0.
// Without MPI, the terminator becomes DONE the first time Control() is called
// with haveLocalWork == false.
class AdvectAlgorithmTerminator
{
public:
#ifdef VTKM_ENABLE_MPI
  explicit AdvectAlgorithmTerminator(vtkmdiy::mpi::communicator& comm)
    : MPIComm(vtkmdiy::mpi::mpi_cast(comm.handle()))
#else
  explicit AdvectAlgorithmTerminator(vtkmdiy::mpi::communicator& vtkmNotUsed(comm))
#endif
  {
  }

  // Marks this rank as dirty (work was added). Does nothing without MPI.
  void AddWork()
  {
#ifdef VTKM_ENABLE_MPI
    this->Dirty = 1;
#endif
  }

  // True once termination has been decided.
  bool Done() const { return this->State == AdvectAlgorithmTerminatorState::DONE; }

  // Advances the termination protocol by at most one step.
  // haveLocalWork: whether the caller still has work to process locally.
  void Control(bool haveLocalWork)
  {
#ifdef VTKM_ENABLE_MPI
    switch (this->State)
    {
      case STATE_0:
        if (!haveLocalWork)
        {
          MPI_Ibarrier(this->MPIComm, &this->StateReq);
          this->Dirty = 0;
          this->State = STATE_1;
        }
        break;

      case STATE_1:
        if (this->RequestCompleted())
        {
          // The send buffer of a non-blocking collective must stay valid until
          // the request completes, which happens in a later Control() call.
          // It therefore lives in a member rather than in a stack local.
          this->LocalDirty = this->Dirty;
          MPI_Iallreduce(&this->LocalDirty,
                         &this->AllDirty,
                         1,
                         MPI_INT,
                         MPI_LOR,
                         this->MPIComm,
                         &this->StateReq);
          this->State = STATE_2;
        }
        break;

      case STATE_2:
        if (this->RequestCompleted())
        {
          if (this->AllDirty == 0) //done
            this->State = DONE;
          else
            this->State = STATE_0; //reset.
        }
        break;

      case DONE:
        break;
    }
#else
    if (!haveLocalWork)
      this->State = DONE;
#endif
  }

private:
  enum AdvectAlgorithmTerminatorState
  {
    STATE_0,
    STATE_1,
    STATE_2,
    DONE
  };

#ifdef VTKM_ENABLE_MPI
  // Non-blocking check of the outstanding request in StateReq.
  bool RequestCompleted()
  {
    int flag = 0;
    MPI_Test(&this->StateReq, &flag, MPI_STATUS_IGNORE);
    return flag != 0;
  }
#endif

  AdvectAlgorithmTerminatorState State = AdvectAlgorithmTerminatorState::STATE_0;
#ifdef VTKM_ENABLE_MPI
  // Set by AddWork(), cleared when a barrier is started. Initialised so the
  // atomic never holds an indeterminate value.
  std::atomic<int> Dirty{ 0 };
  // Send buffer for the in-flight MPI_Iallreduce.
  int LocalDirty = 0;
  // Receive buffer for the in-flight MPI_Iallreduce.
  int AllDirty = 0;
  MPI_Request StateReq = MPI_REQUEST_NULL;
  MPI_Comm MPIComm;
#endif
};
template <typename DSIType>
class AdvectAlgorithm
{
@ -39,6 +124,7 @@ public:
, NumRanks(this->Comm.size())
, Rank(this->Comm.rank())
, UseAsynchronousCommunication(useAsyncComm)
, Terminator(this->Comm)
{
}
@ -97,27 +183,21 @@ public:
vtkm::filter::flow::internal::ParticleMessenger<ParticleType> messenger(
this->Comm, this->UseAsynchronousCommunication, this->BoundsMap, 1, 128);
this->ComputeTotalNumParticles();
while (this->TotalNumTerminatedParticles < this->TotalNumParticles)
while (!this->Terminator.Done())
{
std::vector<ParticleType> v;
vtkm::Id numTerm = 0, blockId = -1;
vtkm::Id blockId = -1;
if (this->GetActiveParticles(v, blockId))
{
//make this a pointer to avoid the copy?
auto& block = this->GetDataSet(blockId);
DSIHelperInfo<ParticleType> bb(v, this->BoundsMap, this->ParticleBlockIDsMap);
block.Advect(bb, this->StepSize);
numTerm = this->UpdateResult(bb);
this->UpdateResult(bb);
}
vtkm::Id numTermMessages = 0;
this->Communicate(messenger, numTerm, numTermMessages);
this->TotalNumTerminatedParticles += (numTerm + numTermMessages);
if (this->TotalNumTerminatedParticles > this->TotalNumParticles)
throw vtkm::cont::ErrorFilterExecution("Particle count error");
this->Communicate(messenger);
this->Terminator.Control(!this->Active.empty());
}
}
@ -128,19 +208,6 @@ public:
this->ParticleBlockIDsMap.clear();
}
void ComputeTotalNumParticles()
{
vtkm::Id numLocal = static_cast<vtkm::Id>(this->Inactive.size());
for (const auto& it : this->Active)
numLocal += it.second.size();
#ifdef VTKM_ENABLE_MPI
vtkmdiy::mpi::all_reduce(this->Comm, numLocal, this->TotalNumParticles, std::plus<vtkm::Id>{});
#else
this->TotalNumParticles = numLocal;
#endif
}
DataSetIntegrator<DSIType, ParticleType>& GetDataSet(vtkm::Id id)
{
for (auto& it : this->Blocks)
@ -213,9 +280,7 @@ public:
return !particles.empty();
}
void Communicate(vtkm::filter::flow::internal::ParticleMessenger<ParticleType>& messenger,
vtkm::Id numLocalTerminations,
vtkm::Id& numTermMessages)
void Communicate(vtkm::filter::flow::internal::ParticleMessenger<ParticleType>& messenger)
{
std::vector<ParticleType> outgoing;
std::vector<vtkm::Id> outgoingRanks;
@ -224,16 +289,17 @@ public:
std::vector<ParticleType> incoming;
std::unordered_map<vtkm::Id, std::vector<vtkm::Id>> incomingBlockIDs;
numTermMessages = 0;
bool block = false;
#ifdef VTKM_ENABLE_MPI
block = this->GetBlockAndWait(messenger.UsingSyncCommunication(), numLocalTerminations);
block = this->GetBlockAndWait(messenger.UsingSyncCommunication());
#endif
vtkm::Id numTermMessages;
messenger.Exchange(outgoing,
outgoingRanks,
this->ParticleBlockIDsMap,
numLocalTerminations,
0,
incoming,
incomingBlockIDs,
numTermMessages,
@ -311,17 +377,22 @@ public:
{
VTKM_ASSERT(particles.size() == idsMap.size());
for (auto pit = particles.begin(); pit != particles.end(); pit++)
if (!particles.empty())
{
vtkm::Id particleID = pit->GetID();
const auto& it = idsMap.find(particleID);
VTKM_ASSERT(it != idsMap.end() && !it->second.empty());
vtkm::Id blockId = it->second[0];
this->Active[blockId].emplace_back(*pit);
}
this->Terminator.AddWork();
for (const auto& it : idsMap)
this->ParticleBlockIDsMap[it.first] = it.second;
for (auto pit = particles.begin(); pit != particles.end(); pit++)
{
vtkm::Id particleID = pit->GetID();
const auto& it = idsMap.find(particleID);
VTKM_ASSERT(it != idsMap.end() && !it->second.empty());
vtkm::Id blockId = it->second[0];
this->Active[blockId].emplace_back(*pit);
}
for (const auto& it : idsMap)
this->ParticleBlockIDsMap[it.first] = it.second;
}
}
virtual void UpdateInactive(const std::vector<ParticleType>& particles,
@ -351,7 +422,7 @@ public:
}
virtual bool GetBlockAndWait(const bool& syncComm, const vtkm::Id& numLocalTerm)
virtual bool GetBlockAndWait(const bool& syncComm)
{
bool haveNoWork = this->Active.empty() && this->Inactive.empty();
@ -367,9 +438,11 @@ public:
//2. numLocalTerm + this->TotalNumberOfTerminatedParticles == this->TotalNumberOfParticles
//So, if neither are true, we can safely block and wait for communication to come in.
if (haveNoWork &&
(numLocalTerm + this->TotalNumTerminatedParticles < this->TotalNumParticles))
return true;
// if (this->Terminator.State == AdvectAlgorithmTerminator::AdvectAlgorithmTerminatorState::STATE_2)
// return true;
// if (haveNoWork && (numLocalTerm + this->TotalNumTerminatedParticles < this->TotalNumParticles))
// return true;
return false;
}
@ -388,9 +461,8 @@ public:
std::unordered_map<vtkm::Id, std::vector<vtkm::Id>> ParticleBlockIDsMap;
vtkm::Id Rank;
vtkm::FloatDefault StepSize;
vtkm::Id TotalNumParticles = 0;
vtkm::Id TotalNumTerminatedParticles = 0;
bool UseAsynchronousCommunication = true;
AdvectAlgorithmTerminator Terminator;
};
}

@ -39,7 +39,6 @@ public:
bool useAsyncComm)
: AdvectAlgorithm<DSIType>(bm, blocks, useAsyncComm)
, Done(false)
, WorkerActivate(false)
{
//For threaded algorithm, the particles go out of scope in the Work method.
//When this happens, they are destructed by the time the Manage thread gets them.
@ -50,8 +49,6 @@ public:
void Go() override
{
this->ComputeTotalNumParticles();
std::vector<std::thread> workerThreads;
workerThreads.emplace_back(std::thread(AdvectAlgorithmThreaded::Worker, this));
this->Manage();
@ -63,6 +60,13 @@ public:
}
protected:
bool HaveAnyWork()
{
  //Inspect the queues and the worker flag while holding the mutex.
  std::lock_guard<std::mutex> guard(this->Mutex);

  //We have work if there are particles in any queue or a worker is busy.
  const bool queuesEmpty = this->Active.empty() && this->Inactive.empty();
  return !queuesEmpty || this->WorkerActivate;
}
bool GetActiveParticles(std::vector<ParticleType>& particles, vtkm::Id& blockId) override
{
std::lock_guard<std::mutex> lock(this->Mutex);
@ -144,38 +148,31 @@ protected:
vtkm::filter::flow::internal::ParticleMessenger<ParticleType> messenger(
this->Comm, useAsync, this->BoundsMap, 1, 128);
while (this->TotalNumTerminatedParticles < this->TotalNumParticles)
while (!this->Terminator.Done())
{
std::unordered_map<vtkm::Id, std::vector<DSIHelperInfo<ParticleType>>> workerResults;
this->GetWorkerResults(workerResults);
vtkm::Id numTerm = 0;
for (auto& it : workerResults)
{
for (auto& r : it.second)
numTerm += this->UpdateResult(r);
}
this->UpdateResult(r);
vtkm::Id numTermMessages = 0;
this->Communicate(messenger, numTerm, numTermMessages);
this->TotalNumTerminatedParticles += (numTerm + numTermMessages);
if (this->TotalNumTerminatedParticles > this->TotalNumParticles)
throw vtkm::cont::ErrorFilterExecution("Particle count error");
this->Communicate(messenger);
this->Terminator.Control(this->HaveAnyWork());
}
//Let the workers know that we are done.
this->SetDone();
}
bool GetBlockAndWait(const bool& syncComm, const vtkm::Id& numLocalTerm) override
bool GetBlockAndWait(const bool& syncComm) override
{
std::lock_guard<std::mutex> lock(this->Mutex);
if (this->Done)
return true;
return (this->AdvectAlgorithm<DSIType>::GetBlockAndWait(syncComm, numLocalTerm) &&
!this->WorkerActivate && this->WorkerResults.empty());
return (this->AdvectAlgorithm<DSIType>::GetBlockAndWait(syncComm) && !this->WorkerActivate &&
this->WorkerResults.empty());
}
void GetWorkerResults(
@ -193,7 +190,7 @@ protected:
std::atomic<bool> Done;
std::mutex Mutex;
bool WorkerActivate;
bool WorkerActivate = false;
std::condition_variable WorkerActivateCondition;
std::unordered_map<vtkm::Id, std::vector<DSIHelperInfo<ParticleType>>> WorkerResults;
};