Add proper enqueuing of Tokens for ArrayHandle

An issue that was identified for the thread safety of `ArrayHandle` is
that if several threads are waiting to use an `ArrayHandle`, there might
be an expectation of the order in which the operations happen. For
example, if one thread is modifying the contents of an `ArrayHandle` and
another is reading those results, we would need the first one to start
before the second one.

To solve this, a queue is added to `ArrayHandle` such that when waiting
to read or write an `ArrayHandle` the `Token` has to be at the top of
the queue in addition to other requirements being met.

Additionally, an `Enqueue` method is added to add a `Token` to the queue
without blocking. This allows a control thread to queue the access and
then spawn a thread where the actual work will be done. As long as
everything is enqueued on the main thread, the operations will happen in
the expected order.
Kenneth Moreland 2020-06-08 12:25:07 -06:00
parent aeb94c0dd8
commit 99e14ab8a6
10 changed files with 849 additions and 294 deletions

@ -0,0 +1,93 @@
# Order asynchronous `ArrayHandle` access
The recent feature of [tokens that scope access to
`ArrayHandle`s](scoping-tokens.md) allows multiple threads to use the same
`ArrayHandle`s without read/write hazards. The intent is twofold. First, it
allows two separate threads in the control environment to independently
schedule tasks. Second, it allows us to move toward scheduling worklets and
other algorithms asynchronously.
However, there was a flaw with the original implementation. Once requests
to an `ArrayHandle` queue up, they are resolved in arbitrary order, so
operations can run in a surprising and incorrect order.
## Problematic use case
To demonstrate the flaw in the original implementation, consider a future
scenario in which invoking a worklet (on OpenMP or TBB) returns immediately
and the actual work is scheduled asynchronously. Now let us say we have a
sequence of 3 worklets we wish to
run: `Worklet1`, `Worklet2`, and `Worklet3`. One of `Worklet1`'s parameters
is a `FieldOut` that creates an intermediate `ArrayHandle` that we will
simply call `array`. `Worklet2` is given `array` as a `FieldInOut` to
modify its values. Finally, `Worklet3` is given `array` as a `FieldIn`. It
is clear that for the computation to be correct, the three worklets _must_
execute in the correct order of `Worklet1`, `Worklet2`, and `Worklet3`.
The problem is that if `Worklet2` and `Worklet3` are both scheduled before
`Worklet1` finishes, the order they are executed could be arbitrary. Let us
say that `Worklet1` is invoked, and the invoke call returns before the
execution of `Worklet1` finishes.
The calling code immediately invokes `Worklet2`. Because `array` is already
locked by `Worklet1`, `Worklet2` does not execute right away. Instead, it
waits on a condition variable of `array` until it is free. But even though
the scheduling of `Worklet2` is blocked, the invoke returns because we are
scheduling asynchronously.
Likewise, the calling code then immediately calls invoke for `Worklet3`.
`Worklet3` similarly waits on the condition variable of `array` until it is
free.
Let us assume the likely event that both `Worklet2` and `Worklet3` get
scheduled before `Worklet1` finishes. When `Worklet1` later finishes, its
token relinquishes the lock on `array`, which wakes up the threads waiting
for access to `array`. However, nothing imposes an order in which the
waiting threads acquire the lock and run. (At least, I'm not aware of
anything imposing an order.) Thus, it is quite possible that `Worklet3`
will wake up first. It will see that `array` is no longer locked (because
`Worklet1` has released it and `Worklet2` has not had a chance to claim
it).
Oops. Now `Worklet3` is operating on `array` before `Worklet2` has had a
chance to put the correct values in it. The results will be wrong.
## Queuing requests
What we want is to impose the restriction that locks to an `ArrayHandle`
get resolved in the order that they are requested. In the previous example,
we have 3 requests on an array that happen in a known order. We want
control given to them in the same order.
To implement this, we need to impose another restriction on the
`condition_variable` when waiting to read or write. We want the lock to go
to the thread that first started waiting. To do this, we added an
internal queue of `Token`s to the `ArrayHandle`.
`ArrayHandle::WaitToRead` and `ArrayHandle::WaitToWrite` first add the
given `Token` to the back of the queue and then wait on the condition
variable. The `CanRead` and `CanWrite` methods check this queue to see if
the provided `Token` is at the front. If not, the lock is denied and the
thread must continue to wait.
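
The queue check described above can be sketched in isolation. This is a minimal illustration, not the VTK-m implementation: `ArrayState`, `IsNext`, and the integer token ids are hypothetical stand-ins, and the sketch leaves out the case where a `Token` is already attached to the array.

```cpp
#include <cassert>
#include <list>

// Hypothetical stand-in for the state an ArrayHandle guards with its mutex.
struct ArrayState
{
  int ReadCount = 0;
  int WriteCount = 0;
  std::list<int> Queue; // ids of waiting tokens, oldest request at the front
};

// A token may proceed only if nobody else is ahead of it in the queue.
bool IsNext(const ArrayState& state, int token)
{
  return state.Queue.empty() || (state.Queue.front() == token);
}

// Reading requires being next in line and no active writer.
bool CanRead(const ArrayState& state, int token)
{
  return IsNext(state, token) && (state.WriteCount < 1);
}

// Writing requires being next in line and no active reader or writer.
bool CanWrite(const ArrayState& state, int token)
{
  return IsNext(state, token) && (state.WriteCount < 1) && (state.ReadCount < 1);
}

// Walks through the three-worklet scenario using token ids 1, 2, and 3.
void DemonstrateOrdering()
{
  ArrayState array;
  array.WriteCount = 1;   // Worklet1 (token 1) holds the write lock
  array.Queue = { 2, 3 }; // Worklet2, then Worklet3, are waiting

  assert(!CanWrite(array, 2)); // Worklet1 is still writing

  array.WriteCount = 0;        // Worklet1 releases the array
  assert(!CanRead(array, 3));  // Worklet3 is not at the front, so it keeps waiting
  assert(CanWrite(array, 2));  // Worklet2 is at the front and may proceed

  array.Queue.pop_front();     // Worklet2 acquires the array and leaves the queue
  array.WriteCount = 1;
  assert(!CanRead(array, 3));  // now at the front, but a writer is active

  array.WriteCount = 0;        // Worklet2 releases the array
  assert(CanRead(array, 3));   // Worklet3 finally reads the correct values
}
```

In this sketch, the extra `IsNext` test is the only difference from a plain reader/writer predicate, which is why the condition variable can still wake waiters in any order without changing the outcome.
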
## Early enqueuing
Another issue that can happen in the previous example is that as threads
are spawned for the 3 different worklets, they may actually start running
in an unexpected order. So the thread running `Worklet3` might actually
start before the other 2 and place itself in the queue first.
The solution is to add a method to `ArrayHandle` called `Enqueue`. This
method takes a `Token` object and adds that `Token` to the queue. However,
regardless of where the `Token` ends up on the queue, the method
immediately returns. It does not attempt to lock the `ArrayHandle`.
So now we can ensure that `Worklet1` properly locks `array` with this
sequence of events. First, the main thread calls `array.Enqueue`. Then a
thread is spawned to call `PrepareForOutput`.
Even if control returns to the calling code and it calls invoke for
`Worklet2` before this spawned thread starts, `Worklet2` cannot start
first. When `Worklet2` calls its `Prepare` method on `array`
(`PrepareForInPlace` for a `FieldInOut`), its `Token` is queued after the
`Token` for `Worklet1`, even if `Worklet1` has not yet started waiting on
`array`.
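
The effect of early enqueuing can be sketched with standard C++ threading primitives. This is a simplified model under stated assumptions, not the VTK-m code: `OrderedResource`, `Acquire`, `Release`, and the integer ids are hypothetical, and a single `Busy` flag replaces the separate read and write counts.

```cpp
#include <algorithm>
#include <condition_variable>
#include <list>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical miniature of the queueing scheme; not the VTK-m classes.
class OrderedResource
{
  std::mutex Mutex;
  std::condition_variable ConditionVariable;
  std::list<int> Queue; // ids of pending requests, oldest first
  bool Busy = false;

  // Adds `id` to the back of the queue unless it is already queued.
  // The caller must hold Mutex.
  void EnqueueLocked(int id)
  {
    if (std::find(this->Queue.begin(), this->Queue.end(), id) == this->Queue.end())
    {
      this->Queue.push_back(id);
    }
  }

public:
  // Reserves a place in line and returns immediately; never waits for the resource.
  void Enqueue(int id)
  {
    std::lock_guard<std::mutex> lock(this->Mutex);
    this->EnqueueLocked(id);
  }

  // Blocks until `id` is at the front of the queue and the resource is free.
  void Acquire(int id)
  {
    std::unique_lock<std::mutex> lock(this->Mutex);
    this->EnqueueLocked(id); // no effect if Enqueue was called earlier
    this->ConditionVariable.wait(
      lock, [this, id] { return !this->Busy && (this->Queue.front() == id); });
    this->Queue.pop_front();
    this->Busy = true;
  }

  // Frees the resource and wakes every waiter so each can recheck its turn.
  void Release()
  {
    {
      std::lock_guard<std::mutex> lock(this->Mutex);
      this->Busy = false;
    }
    this->ConditionVariable.notify_all();
  }
};

// Enqueues ids 1, 2, 3 on the calling thread, then deliberately spawns the
// workers in reverse order. Returns the order in which they actually ran.
std::vector<int> RunWorkletsInReverseSpawnOrder()
{
  OrderedResource array;
  std::vector<int> executionOrder;

  for (int id = 1; id <= 3; ++id)
  {
    array.Enqueue(id); // the order is fixed here, before any worker exists
  }

  std::vector<std::thread> workers;
  for (int id = 3; id >= 1; --id)
  {
    workers.emplace_back([&array, &executionOrder, id] {
      array.Acquire(id);
      executionOrder.push_back(id); // safe: only one holder at a time
      array.Release();
    });
  }
  for (auto& worker : workers)
  {
    worker.join();
  }
  return executionOrder;
}
```

Because the positions are reserved on the controlling thread, the workers run as 1, 2, 3 regardless of which thread the operating system starts first. As in the real `Enqueue`, an id that is enqueued but never acquired would block every request behind it.
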

@ -30,6 +30,7 @@
#include <algorithm>
#include <iterator>
#include <list>
#include <memory>
#include <mutex>
#include <vector>
@ -400,39 +401,33 @@ public:
typename StorageType::PortalConstType GetPortalConstControl() const;
/// \endcond
/// \@{
/// \brief Get an array portal that can be used in the control environment.
///
/// The returned array can be used in the control environment to read values from the array. (It
/// is not possible to write to the returned portal. That is `Get` will work on the portal, but
/// `Set` will not.)
///
/// **Note:** The returned portal will prevent any writes or modifications to the array. To
/// ensure that the data pointed to by the portal is valid, this `ArrayHandle` will be locked to
/// any modifications while the portal remains in scope. (You can call `Detach` on the returned
/// portal to unlock the array. However, this will invalidate the portal.)
///
/// **Note:** The returned portal cannot be used in the execution environment. This is because
/// the portal will not work on some devices like GPUs. To get a portal that will work in the
/// execution environment, use `PrepareForInput`.
///
VTKM_CONT ReadPortalType ReadPortal() const;
/// \@}
/// \@{
/// \brief Get an array portal that can be used in the control environment.
///
/// The returned array can be used in the control environment to read and write values to the
/// array.
///
/// **Note:** The returned portal will prevent any reads, writes, or modifications to the array.
/// To ensure that the data pointed to by the portal is valid, this `ArrayHandle` will be locked
/// to any modifications while the portal remains in scope. Also, to make sure that no reads get
/// out of sync, reads other than the returned portal are also blocked. (You can call `Detach` on
/// the returned portal to unlock the array. However, this will invalidate the portal.)
///
/// **Note:** The returned portal cannot be used in the execution environment. This is because
/// the portal will not work on some devices like GPUs. To get a portal that will work in the
/// execution environment, use `PrepareForInput`.
///
VTKM_CONT WritePortalType WritePortal() const;
/// \@}
/// Returns the number of entries in the array.
///
@ -454,14 +449,20 @@ public:
VTKM_CONT
void Allocate(vtkm::Id numberOfValues)
{
LockType lock = this->GetLock();
this->WaitToWrite(lock, vtkm::cont::Token{});
this->ReleaseResourcesExecutionInternal(lock);
this->Internals->GetControlArray(lock)->Allocate(numberOfValues);
// Set to false and then to true to ensure anything pointing to an array before the allocate
// is invalidated.
this->Internals->SetControlArrayValid(lock, false);
this->Internals->SetControlArrayValid(lock, true);
// A Token should not be declared within the scope of a lock. When the token goes out of scope
// it will attempt to acquire the lock, which is undefined behavior if the thread already has
// the lock.
vtkm::cont::Token token;
{
LockType lock = this->GetLock();
this->WaitToWrite(lock, token);
this->ReleaseResourcesExecutionInternal(lock, token);
this->Internals->GetControlArray(lock)->Allocate(numberOfValues);
// Set to false and then to true to ensure anything pointing to an array before the allocate
// is invalidated.
this->Internals->SetControlArrayValid(lock, false);
this->Internals->SetControlArrayValid(lock, true);
}
}
/// \brief Reduces the size of the array without changing its values.
@ -479,28 +480,40 @@ public:
///
VTKM_CONT void ReleaseResourcesExecution()
{
LockType lock = this->GetLock();
this->WaitToWrite(lock, vtkm::cont::Token{});
// A Token should not be declared within the scope of a lock. When the token goes out of scope
// it will attempt to acquire the lock, which is undefined behavior if the thread already has
// the lock.
vtkm::cont::Token token;
{
LockType lock = this->GetLock();
this->WaitToWrite(lock, token);
// Save any data in the execution environment by making sure it is synced
// with the control environment.
this->SyncControlArray(lock);
// Save any data in the execution environment by making sure it is synced
// with the control environment.
this->SyncControlArray(lock, token);
this->ReleaseResourcesExecutionInternal(lock);
this->ReleaseResourcesExecutionInternal(lock, token);
}
}
/// Releases all resources in both the control and execution environments.
///
VTKM_CONT void ReleaseResources()
{
LockType lock = this->GetLock();
this->ReleaseResourcesExecutionInternal(lock);
if (this->Internals->IsControlArrayValid(lock))
// A Token should not be declared within the scope of a lock. When the token goes out of scope
// it will attempt to acquire the lock, which is undefined behavior if the thread already has
// the lock.
vtkm::cont::Token token;
{
this->Internals->GetControlArray(lock)->ReleaseResources();
this->Internals->SetControlArrayValid(lock, false);
LockType lock = this->GetLock();
this->ReleaseResourcesExecutionInternal(lock, token);
if (this->Internals->IsControlArrayValid(lock))
{
this->Internals->GetControlArray(lock)->ReleaseResources();
this->Internals->SetControlArrayValid(lock, false);
}
}
}
@ -604,10 +617,39 @@ public:
///
VTKM_CONT void SyncControlArray() const
{
LockType lock = this->GetLock();
this->SyncControlArray(lock);
// A Token should not be declared within the scope of a lock. When the token goes out of scope
// it will attempt to acquire the lock, which is undefined behavior if the thread already has
// the lock.
vtkm::cont::Token token;
{
LockType lock = this->GetLock();
this->SyncControlArray(lock, token);
}
}
/// \brief Enqueue a token for access to this ArrayHandle.
///
/// This method places the given `Token` into the queue of `Token`s waiting for
/// access to this `ArrayHandle` and then returns immediately. When this token
/// is later used to get data from this `ArrayHandle` (for example, in a call to
/// `PrepareForInput`), it will use this place in the queue while waiting for
/// access.
///
/// This method is to be used to ensure that a set of accesses to an `ArrayHandle`
/// that happen on multiple threads occur in a specified order. For example, if
/// you spawn off a job to modify data in an `ArrayHandle` and then spawn off a job
/// that reads that same data, you need to make sure that the first job gets
/// access to the `ArrayHandle` before the second. If they both just attempt to call
/// their respective `Prepare` methods, there is no guarantee in which order they
/// will occur. Having the spawning thread first call this method will ensure the order.
///
/// \warning After calling this method it is required to subsequently
/// call a method like one of the `Prepare` methods that attaches the token
/// to this `ArrayHandle`. Otherwise, the enqueued token will block any subsequent
/// access to the `ArrayHandle`, even if the `Token` is destroyed.
///
VTKM_CONT void Enqueue(const vtkm::cont::Token& token) const;
private:
/// Acquires a lock on the internals of this `ArrayHandle`. The calling
/// function should keep the returned lock and let it go out of scope
@ -617,47 +659,19 @@ private:
/// Returns true if read operations can currently be performed.
///
VTKM_CONT bool CanRead(const LockType& lock, const vtkm::cont::Token& token) const
{
return ((*this->Internals->GetWriteCount(lock) < 1) ||
(token.IsAttached(this->Internals->GetWriteCount(lock))));
}
VTKM_CONT bool CanRead(const LockType& lock, const vtkm::cont::Token& token) const;
/// Returns true if write operations can currently be performed.
///
VTKM_CONT bool CanWrite(const LockType& lock, const vtkm::cont::Token& token) const
{
return (((*this->Internals->GetWriteCount(lock) < 1) ||
(token.IsAttached(this->Internals->GetWriteCount(lock)))) &&
((*this->Internals->GetReadCount(lock) < 1) ||
((*this->Internals->GetReadCount(lock) == 1) &&
token.IsAttached(this->Internals->GetReadCount(lock)))));
}
VTKM_CONT bool CanWrite(const LockType& lock, const vtkm::cont::Token& token) const;
/// Will block the current thread until a read can be performed.
///
VTKM_CONT void WaitToRead(LockType& lock, const vtkm::cont::Token& token) const
{
// Note that if you deadlocked here, that means that you are trying to do a read operation on
// an array where an object is writing to it. This could happen on the same thread. For
// example, if you call `GetPortalControl()` then no other operation that can result in reading
// or writing data in the array can happen while the resulting portal is still in scope.
this->Internals->ConditionVariable.wait(
lock, [&lock, &token, this] { return this->CanRead(lock, token); });
}
VTKM_CONT void WaitToRead(LockType& lock, vtkm::cont::Token& token) const;
/// Will block the current thread until a write can be performed.
///
VTKM_CONT void WaitToWrite(LockType& lock, const vtkm::cont::Token& token) const
{
// Note that if you deadlocked here, that means that you are trying to do a write operation on
// an array where an object is reading or writing to it. This could happen on the same thread.
// For example, if you call `GetPortalControl()` then no other operation that can result in
// reading or writing data in the array can happen while the resulting portal is still in
// scope.
this->Internals->ConditionVariable.wait(
lock, [&lock, &token, this] { return this->CanWrite(lock, token); });
}
VTKM_CONT void WaitToWrite(LockType& lock, vtkm::cont::Token& token) const;
/// Gets this array handle ready to interact with the given device. If the
/// array handle has already interacted with this device, then this method
@ -665,7 +679,7 @@ private:
/// method is declared const because logically the data does not.
///
template <typename DeviceAdapterTag>
VTKM_CONT void PrepareForDevice(LockType& lock, DeviceAdapterTag) const;
VTKM_CONT void PrepareForDevice(LockType& lock, vtkm::cont::Token& token, DeviceAdapterTag) const;
/// Synchronizes the control array with the execution array. If either the
/// user array or control array is already valid, this method does nothing
@ -673,16 +687,16 @@ private:
/// Although the internal state of this class can change, the method is
/// declared const because logically the data does not.
///
VTKM_CONT void SyncControlArray(LockType& lock) const;
VTKM_CONT void SyncControlArray(LockType& lock, vtkm::cont::Token& token) const;
vtkm::Id GetNumberOfValues(LockType& lock) const;
VTKM_CONT
void ReleaseResourcesExecutionInternal(LockType& lock) const
void ReleaseResourcesExecutionInternal(LockType& lock, vtkm::cont::Token& token) const
{
if (this->Internals->IsExecutionArrayValid(lock))
{
this->WaitToWrite(lock, vtkm::cont::Token{});
this->WaitToWrite(lock, token);
// Note that it is possible that while waiting someone else deleted the execution array.
// That is why we check again.
}
@ -693,6 +707,8 @@ private:
}
}
VTKM_CONT void Enqueue(const LockType& lock, const vtkm::cont::Token& token) const;
class VTKM_ALWAYS_EXPORT InternalStruct
{
mutable StorageType ControlArray;
@ -704,6 +720,8 @@ private:
mutable vtkm::cont::Token::ReferenceCount ReadCount = 0;
mutable vtkm::cont::Token::ReferenceCount WriteCount = 0;
mutable std::list<vtkm::cont::Token::Reference> Queue;
VTKM_CONT void CheckLock(const LockType& lock) const
{
VTKM_ASSERT((lock.mutex() == &this->Mutex) && (lock.owns_lock()));
@ -812,6 +830,11 @@ private:
this->CheckLock(lock);
return &this->WriteCount;
}
VTKM_CONT std::list<vtkm::cont::Token::Reference>& GetQueue(const LockType& lock) const
{
this->CheckLock(lock);
return this->Queue;
}
};
VTKM_CONT

@ -86,55 +86,73 @@ ArrayHandle<T, S>& ArrayHandle<T, S>::operator=(ArrayHandle<T, S>&& src) noexcep
template <typename T, typename S>
typename ArrayHandle<T, S>::StorageType& ArrayHandle<T, S>::GetStorage()
{
LockType lock = this->GetLock();
// A Token should not be declared within the scope of a lock. When the token goes out of scope
// it will attempt to acquire the lock, which is undefined behavior if the thread already has
// the lock.
vtkm::cont::Token token;
{
LockType lock = this->GetLock();
this->SyncControlArray(lock);
if (this->Internals->IsControlArrayValid(lock))
{
return *this->Internals->GetControlArray(lock);
}
else
{
throw vtkm::cont::ErrorInternal(
"ArrayHandle::SyncControlArray did not make control array valid.");
this->SyncControlArray(lock, token);
if (this->Internals->IsControlArrayValid(lock))
{
return *this->Internals->GetControlArray(lock);
}
else
{
throw vtkm::cont::ErrorInternal(
"ArrayHandle::SyncControlArray did not make control array valid.");
}
}
}
template <typename T, typename S>
const typename ArrayHandle<T, S>::StorageType& ArrayHandle<T, S>::GetStorage() const
{
LockType lock = this->GetLock();
// A Token should not be declared within the scope of a lock. When the token goes out of scope
// it will attempt to acquire the lock, which is undefined behavior if the thread already has
// the lock.
vtkm::cont::Token token;
{
LockType lock = this->GetLock();
this->SyncControlArray(lock);
if (this->Internals->IsControlArrayValid(lock))
{
return *this->Internals->GetControlArray(lock);
}
else
{
throw vtkm::cont::ErrorInternal(
"ArrayHandle::SyncControlArray did not make control array valid.");
this->SyncControlArray(lock, token);
if (this->Internals->IsControlArrayValid(lock))
{
return *this->Internals->GetControlArray(lock);
}
else
{
throw vtkm::cont::ErrorInternal(
"ArrayHandle::SyncControlArray did not make control array valid.");
}
}
}
template <typename T, typename S>
typename ArrayHandle<T, S>::StorageType::PortalType ArrayHandle<T, S>::GetPortalControl()
{
LockType lock = this->GetLock();
// A Token should not be declared within the scope of a lock. When the token goes out of scope
// it will attempt to acquire the lock, which is undefined behavior if the thread already has
// the lock.
vtkm::cont::Token token;
{
LockType lock = this->GetLock();
this->SyncControlArray(lock);
if (this->Internals->IsControlArrayValid(lock))
{
// If the user writes into the iterator we return, then the execution
// array will become invalid. Play it safe and release the execution
// resources. (Use the const version to preserve the execution array.)
this->ReleaseResourcesExecutionInternal(lock);
return this->Internals->GetControlArray(lock)->GetPortal();
}
else
{
throw vtkm::cont::ErrorInternal(
"ArrayHandle::SyncControlArray did not make control array valid.");
this->SyncControlArray(lock, token);
if (this->Internals->IsControlArrayValid(lock))
{
// If the user writes into the iterator we return, then the execution
// array will become invalid. Play it safe and release the execution
// resources. (Use the const version to preserve the execution array.)
this->ReleaseResourcesExecutionInternal(lock, token);
return this->Internals->GetControlArray(lock)->GetPortal();
}
else
{
throw vtkm::cont::ErrorInternal(
"ArrayHandle::SyncControlArray did not make control array valid.");
}
}
}
@ -142,65 +160,77 @@ template <typename T, typename S>
typename ArrayHandle<T, S>::StorageType::PortalConstType ArrayHandle<T, S>::GetPortalConstControl()
const
{
LockType lock = this->GetLock();
// A Token should not be declared within the scope of a lock. When the token goes out of scope
// it will attempt to acquire the lock, which is undefined behavior if the thread already has
// the lock.
vtkm::cont::Token token;
{
LockType lock = this->GetLock();
this->SyncControlArray(lock);
if (this->Internals->IsControlArrayValid(lock))
{
return this->Internals->GetControlArray(lock)->GetPortalConst();
}
else
{
throw vtkm::cont::ErrorInternal(
"ArrayHandle::SyncControlArray did not make control array valid.");
this->SyncControlArray(lock, token);
if (this->Internals->IsControlArrayValid(lock))
{
return this->Internals->GetControlArray(lock)->GetPortalConst();
}
else
{
throw vtkm::cont::ErrorInternal(
"ArrayHandle::SyncControlArray did not make control array valid.");
}
}
}
template <typename T, typename S>
typename ArrayHandle<T, S>::ReadPortalType ArrayHandle<T, S>::ReadPortal() const
{
LockType lock = this->GetLock();
// A Token should not be declared within the scope of a lock. When the token goes out of scope
// it will attempt to acquire the lock, which is undefined behavior if the thread already has
// the lock.
vtkm::cont::Token token;
{
vtkm::cont::Token token;
LockType lock = this->GetLock();
this->WaitToRead(lock, token);
}
this->SyncControlArray(lock);
if (this->Internals->IsControlArrayValid(lock))
{
return ReadPortalType(this->Internals->GetControlArrayValidPointer(lock),
this->Internals->GetControlArray(lock)->GetPortalConst());
}
else
{
throw vtkm::cont::ErrorInternal(
"ArrayHandle::SyncControlArray did not make control array valid.");
this->SyncControlArray(lock, token);
if (this->Internals->IsControlArrayValid(lock))
{
return ReadPortalType(this->Internals->GetControlArrayValidPointer(lock),
this->Internals->GetControlArray(lock)->GetPortalConst());
}
else
{
throw vtkm::cont::ErrorInternal(
"ArrayHandle::SyncControlArray did not make control array valid.");
}
}
}
template <typename T, typename S>
typename ArrayHandle<T, S>::WritePortalType ArrayHandle<T, S>::WritePortal() const
{
LockType lock = this->GetLock();
// A Token should not be declared within the scope of a lock. When the token goes out of scope
// it will attempt to acquire the lock, which is undefined behavior if the thread already has
// the lock.
vtkm::cont::Token token;
{
vtkm::cont::Token token;
LockType lock = this->GetLock();
this->WaitToWrite(lock, token);
}
this->SyncControlArray(lock);
if (this->Internals->IsControlArrayValid(lock))
{
// If the user writes into the iterator we return, then the execution
// array will become invalid. Play it safe and release the execution
// resources. (Use the const version to preserve the execution array.)
this->ReleaseResourcesExecutionInternal(lock);
return WritePortalType(this->Internals->GetControlArrayValidPointer(lock),
this->Internals->GetControlArray(lock)->GetPortal());
}
else
{
throw vtkm::cont::ErrorInternal(
"ArrayHandle::SyncControlArray did not make control array valid.");
this->SyncControlArray(lock, token);
if (this->Internals->IsControlArrayValid(lock))
{
// If the user writes into the iterator we return, then the execution
// array will become invalid. Play it safe and release the execution
// resources. (Use the const version to preserve the execution array.)
this->ReleaseResourcesExecutionInternal(lock, token);
return WritePortalType(this->Internals->GetControlArrayValidPointer(lock),
this->Internals->GetControlArray(lock)->GetPortal());
}
else
{
throw vtkm::cont::ErrorInternal(
"ArrayHandle::SyncControlArray did not make control array valid.");
}
}
}
@ -226,6 +256,11 @@ void ArrayHandle<T, S>::Shrink(vtkm::Id numberOfValues)
{
VTKM_ASSERT(numberOfValues >= 0);
// A Token should not be declared within the scope of a lock. When the token goes out of scope
// it will attempt to acquire the lock, which is undefined behavior if the thread already has
// the lock.
vtkm::cont::Token token;
if (numberOfValues > 0)
{
LockType lock = this->GetLock();
@ -234,7 +269,7 @@ void ArrayHandle<T, S>::Shrink(vtkm::Id numberOfValues)
if (numberOfValues < originalNumberOfValues)
{
this->WaitToWrite(lock, vtkm::cont::Token{});
this->WaitToWrite(lock, token);
if (this->Internals->IsControlArrayValid(lock))
{
this->Internals->GetControlArray(lock)->Shrink(numberOfValues);
@ -282,15 +317,12 @@ ArrayHandle<T, S>::PrepareForInput(DeviceAdapterTag device, vtkm::cont::Token& t
this->Internals->SetControlArrayValid(lock, true);
}
this->PrepareForDevice(lock, device);
this->PrepareForDevice(lock, token, device);
auto portal = this->Internals->GetExecutionArray(lock)->PrepareForInput(
!this->Internals->IsExecutionArrayValid(lock), device, token);
this->Internals->SetExecutionArrayValid(lock, true);
token.Attach(
*this, this->Internals->GetReadCount(lock), lock, &this->Internals->ConditionVariable);
return portal;
}
@ -311,7 +343,7 @@ ArrayHandle<T, S>::PrepareForOutput(vtkm::Id numberOfValues,
// idea when shared with execution.
this->Internals->SetControlArrayValid(lock, false);
this->PrepareForDevice(lock, device);
this->PrepareForDevice(lock, token, device);
auto portal =
this->Internals->GetExecutionArray(lock)->PrepareForOutput(numberOfValues, device, token);
@ -326,9 +358,6 @@ ArrayHandle<T, S>::PrepareForOutput(vtkm::Id numberOfValues,
// assumption anyway.)
this->Internals->SetExecutionArrayValid(lock, true);
token.Attach(
*this, this->Internals->GetWriteCount(lock), lock, &this->Internals->ConditionVariable);
return portal;
}
@ -350,7 +379,7 @@ ArrayHandle<T, S>::PrepareForInPlace(DeviceAdapterTag device, vtkm::cont::Token&
this->Internals->SetControlArrayValid(lock, true);
}
this->PrepareForDevice(lock, device);
this->PrepareForDevice(lock, token, device);
auto portal = this->Internals->GetExecutionArray(lock)->PrepareForInPlace(
!this->Internals->IsExecutionArrayValid(lock), device, token);
@ -361,15 +390,14 @@ ArrayHandle<T, S>::PrepareForInPlace(DeviceAdapterTag device, vtkm::cont::Token&
// array. It may be shared as the execution array.
this->Internals->SetControlArrayValid(lock, false);
token.Attach(
*this, this->Internals->GetWriteCount(lock), lock, &this->Internals->ConditionVariable);
return portal;
}
template <typename T, typename S>
template <typename DeviceAdapterTag>
void ArrayHandle<T, S>::PrepareForDevice(LockType& lock, DeviceAdapterTag device) const
void ArrayHandle<T, S>::PrepareForDevice(LockType& lock,
vtkm::cont::Token& token,
DeviceAdapterTag device) const
{
if (this->Internals->GetExecutionArray(lock) != nullptr)
{
@ -389,8 +417,8 @@ void ArrayHandle<T, S>::PrepareForDevice(LockType& lock, DeviceAdapterTag device
// could change the ExecutionInterface, which would cause problems. In the future we should
// support multiple devices, in which case we would not have to delete one execution array
// to load another.
this->WaitToWrite(lock, vtkm::cont::Token{}); // Make sure no one is reading device array
this->SyncControlArray(lock);
this->WaitToWrite(lock, token); // Make sure no one is reading device array
this->SyncControlArray(lock, token);
// Need to change some state that does not change the logical state from
// an external point of view.
this->Internals->DeleteExecutionArray(lock);
@ -403,7 +431,7 @@ void ArrayHandle<T, S>::PrepareForDevice(LockType& lock, DeviceAdapterTag device
}
template <typename T, typename S>
void ArrayHandle<T, S>::SyncControlArray(LockType& lock) const
void ArrayHandle<T, S>::SyncControlArray(LockType& lock, vtkm::cont::Token& token) const
{
if (!this->Internals->IsControlArrayValid(lock))
{
@ -411,7 +439,7 @@ void ArrayHandle<T, S>::SyncControlArray(LockType& lock) const
// However, if we are here, that `Token` should not already be attached to this array.
// If it were, then there should be no reason to move data around (unless the `Token`
// was used when preparing for multiple devices, which it should not be used like that).
this->WaitToRead(lock, vtkm::cont::Token{});
this->WaitToRead(lock, token);
// Need to change some state that does not change the logical state from
// an external point of view.
@ -431,6 +459,122 @@ void ArrayHandle<T, S>::SyncControlArray(LockType& lock) const
}
}
}
template <typename T, typename S>
bool ArrayHandle<T, S>::CanRead(const LockType& lock, const vtkm::cont::Token& token) const
{
// If the token is already attached to this array, then we allow reading.
if (token.IsAttached(this->Internals->GetWriteCount(lock)) ||
token.IsAttached(this->Internals->GetReadCount(lock)))
{
return true;
}
// If there is anyone else waiting at the top of the queue, we cannot access this array.
auto& queue = this->Internals->GetQueue(lock);
if (!queue.empty() && (queue.front() != token))
{
return false;
}
// No one else is waiting, so we can read the array as long as no one else is writing.
return (*this->Internals->GetWriteCount(lock) < 1);
}
template <typename T, typename S>
bool ArrayHandle<T, S>::CanWrite(const LockType& lock, const vtkm::cont::Token& token) const
{
// If the token is already attached to this array, then we allow writing.
if (token.IsAttached(this->Internals->GetWriteCount(lock)) ||
token.IsAttached(this->Internals->GetReadCount(lock)))
{
return true;
}
// If there is anyone else waiting at the top of the queue, we cannot access this array.
auto& queue = this->Internals->GetQueue(lock);
if (!queue.empty() && (queue.front() != token))
{
return false;
}
// No one else is waiting, so we can write the array as long as no one else is reading or writing.
return ((*this->Internals->GetWriteCount(lock) < 1) &&
(*this->Internals->GetReadCount(lock) < 1));
}
template <typename T, typename S>
void ArrayHandle<T, S>::WaitToRead(LockType& lock, vtkm::cont::Token& token) const
{
this->Enqueue(lock, token);
// Note that if you deadlocked here, that means that you are trying to do a read operation on an
// array where an object is writing to it.
this->Internals->ConditionVariable.wait(
lock, [&lock, &token, this] { return this->CanRead(lock, token); });
token.Attach(this->Internals,
this->Internals->GetReadCount(lock),
lock,
&this->Internals->ConditionVariable);
// We successfully attached the token. Pop it off the queue.
auto& queue = this->Internals->GetQueue(lock);
if (!queue.empty() && queue.front() == token)
{
queue.pop_front();
}
}
template <typename T, typename S>
void ArrayHandle<T, S>::WaitToWrite(LockType& lock, vtkm::cont::Token& token) const
{
this->Enqueue(lock, token);
// Note that if you deadlocked here, that means that you are trying to do a write operation on an
// array where an object is reading or writing to it.
this->Internals->ConditionVariable.wait(
lock, [&lock, &token, this] { return this->CanWrite(lock, token); });
token.Attach(this->Internals,
this->Internals->GetWriteCount(lock),
lock,
&this->Internals->ConditionVariable);
// We successfully attached the token. Pop it off the queue.
auto& queue = this->Internals->GetQueue(lock);
if (!queue.empty() && queue.front() == token)
{
queue.pop_front();
}
}
template <typename T, typename S>
void ArrayHandle<T, S>::Enqueue(const vtkm::cont::Token& token) const
{
LockType lock = this->GetLock();
this->Enqueue(lock, token);
}
template <typename T, typename S>
void ArrayHandle<T, S>::Enqueue(const LockType& lock, const vtkm::cont::Token& token) const
{
if (token.IsAttached(this->Internals->GetWriteCount(lock)) ||
token.IsAttached(this->Internals->GetReadCount(lock)))
{
// Do not need to enqueue if we are already attached.
return;
}
auto& queue = this->Internals->GetQueue(lock);
if (std::find(queue.begin(), queue.end(), token.GetReference()) != queue.end())
{
// This token is already in the queue.
return;
}
this->Internals->GetQueue(lock).push_back(token.GetReference());
}
}
} // vtkm::cont
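
The queueing logic in `Enqueue`, `WaitToRead`, and `WaitToWrite` above can be illustrated outside of VTK-m. The following is a minimal sketch, not the VTK-m implementation: `OrderedGate`, its integer `Ticket`, and `RunOrdered` are hypothetical names invented for this example, and the gate only models exclusive (write-like) access. It assumes that tickets are enqueued on one thread before the workers start, which is the usage the commit message describes.

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <list>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for an array guarded by a FIFO of waiting tickets.
// Integer tickets play the role that Token references play in the diff.
class OrderedGate
{
public:
  using Ticket = int;

  // Non-blocking: record the ticket's place in line.
  void Enqueue(Ticket ticket)
  {
    std::lock_guard<std::mutex> lock(this->Mutex);
    this->EnqueueLocked(ticket);
  }

  // Blocking: wait until the ticket is at the front and the gate is free.
  void Acquire(Ticket ticket)
  {
    std::unique_lock<std::mutex> lock(this->Mutex);
    this->EnqueueLocked(ticket);
    this->Condition.wait(
      lock, [this, ticket] { return !this->Busy && (this->Queue.front() == ticket); });
    this->Busy = true;
    this->Queue.pop_front();
  }

  // Free the gate and wake every waiter so the new front can re-check.
  void Release()
  {
    {
      std::lock_guard<std::mutex> lock(this->Mutex);
      assert(this->Busy);
      this->Busy = false;
    }
    this->Condition.notify_all();
  }

private:
  // Caller must hold Mutex. Does nothing if the ticket is already queued.
  void EnqueueLocked(Ticket ticket)
  {
    if (std::find(this->Queue.begin(), this->Queue.end(), ticket) == this->Queue.end())
    {
      this->Queue.push_back(ticket);
    }
  }

  std::mutex Mutex;
  std::condition_variable Condition;
  std::list<Ticket> Queue;
  bool Busy = false;
};

// Enqueue every ticket on the calling thread, then let the workers race.
// Earlier tickets sleep longer, so without the queue they would tend to
// reach the gate last.
std::vector<int> RunOrdered(int numThreads)
{
  OrderedGate gate;
  std::vector<int> order;
  std::vector<std::thread> workers;
  for (int i = 0; i < numThreads; ++i)
  {
    gate.Enqueue(i);
    workers.emplace_back([&gate, &order, i, numThreads] {
      std::this_thread::sleep_for(std::chrono::milliseconds(5 * (numThreads - i)));
      gate.Acquire(i);
      order.push_back(i); // Only the current gate holder touches `order`.
      gate.Release();
    });
  }
  for (auto& worker : workers)
  {
    worker.join();
  }
  return order;
}
```

Calling `RunOrdered(4)` should record the workers in ticket order even though the worker holding ticket 0 sleeps the longest, because each worker must wait for its ticket to reach the front of the queue. This mirrors the idea behind `IncrementArrayOrdered` in the test changes of this commit.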

@ -85,6 +85,16 @@ void vtkm::cont::Token::DetachFromAll()
heldReferences->clear();
}
vtkm::cont::Token::Reference vtkm::cont::Token::GetReference() const
{
if (!this->Internals)
{
this->Internals.reset(new InternalStruct);
}
return this->Internals.get();
}
void vtkm::cont::Token::Attach(std::unique_ptr<vtkm::cont::Token::ObjectReference>&& objectRef,
vtkm::cont::Token::ReferenceCount* referenceCountPointer,
std::unique_lock<std::mutex>& lock,

@ -35,7 +35,7 @@ namespace cont
class VTKM_CONT_EXPORT Token final
{
class InternalStruct;
std::unique_ptr<InternalStruct> Internals;
mutable std::unique_ptr<InternalStruct> Internals;
struct HeldReference;
@ -129,6 +129,38 @@ public:
///
VTKM_CONT bool IsAttached(vtkm::cont::Token::ReferenceCount* referenceCountPointer) const;
class Reference
{
friend Token;
const void* InternalsPointer;
VTKM_CONT Reference(const void* internalsPointer)
: InternalsPointer(internalsPointer)
{
}
public:
VTKM_CONT bool operator==(const Reference& rhs) const
{
return this->InternalsPointer == rhs.InternalsPointer;
}
VTKM_CONT bool operator!=(const Reference& rhs) const
{
return this->InternalsPointer != rhs.InternalsPointer;
}
};
/// \brief Returns a reference object to this `Token`.
///
/// `Token` objects cannot be copied and generally are not shared. However, there are cases
/// where you need to save a reference to a `Token` belonging to someone else so that it can
/// later be compared. Saving a pointer to a `Token` is not always safe because `Token`s can
/// be moved. To get around this problem, you can save a `Reference` to the `Token`. You
/// cannot use the `Reference` to manipulate the `Token` in any way (because you do not
/// own it). Rather, a `Reference` can just be used to compare to a `Token` object (or another
/// `Reference`).
///
VTKM_CONT Reference GetReference() const;
private:
VTKM_CONT void Attach(std::unique_ptr<vtkm::cont::Token::ObjectReference>&& objectReference,
vtkm::cont::Token::ReferenceCount* referenceCountPointer,
@ -138,6 +170,28 @@ private:
VTKM_CONT bool IsAttached(std::unique_lock<std::mutex>& lock,
vtkm::cont::Token::ReferenceCount* referenceCountPointer) const;
};
VTKM_CONT inline bool operator==(const vtkm::cont::Token& token,
const vtkm::cont::Token::Reference& ref)
{
return token.GetReference() == ref;
}
VTKM_CONT inline bool operator!=(const vtkm::cont::Token& token,
const vtkm::cont::Token::Reference& ref)
{
return token.GetReference() != ref;
}
VTKM_CONT inline bool operator==(const vtkm::cont::Token::Reference& ref,
const vtkm::cont::Token& token)
{
return ref == token.GetReference();
}
VTKM_CONT inline bool operator!=(const vtkm::cont::Token::Reference& ref,
const vtkm::cont::Token& token)
{
return ref != token.GetReference();
}
}
} // namespace vtkm::cont
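
The reason `Token::Reference` stores the address of the internals rather than the address of the `Token` itself can be shown with a small standalone sketch. The `Handle` class and the helper functions below are hypothetical and written only for illustration; they assume, as in the diff above, that the internals live in a `std::unique_ptr` that is allocated lazily and travels with the object when it is moved.

```cpp
#include <cassert>
#include <memory>
#include <utility>

// Hypothetical miniature of a move-only object whose identity is the address
// of a lazily created heap allocation rather than the address of the object.
class Handle
{
  struct State
  {
  };
  mutable std::unique_ptr<State> Internals;

public:
  class Reference
  {
    friend class Handle;
    const void* Pointer;
    explicit Reference(const void* pointer)
      : Pointer(pointer)
    {
    }

  public:
    bool operator==(const Reference& rhs) const { return this->Pointer == rhs.Pointer; }
    bool operator!=(const Reference& rhs) const { return this->Pointer != rhs.Pointer; }
  };

  Handle() = default;
  Handle(Handle&&) = default;
  Handle& operator=(Handle&&) = default;

  // Creates the internals on first use, which is why the member is mutable.
  Reference GetReference() const
  {
    if (!this->Internals)
    {
      this->Internals.reset(new State);
    }
    return Reference(this->Internals.get());
  }
};

// A reference taken before a move still identifies the moved-to object,
// because the heap allocation is transferred rather than copied.
bool ReferenceSurvivesMove()
{
  Handle original;
  Handle::Reference saved = original.GetReference();
  Handle moved = std::move(original);
  return moved.GetReference() == saved;
}

// Two live handles own different allocations, so their references differ.
bool DistinctHandlesDiffer()
{
  Handle a;
  Handle b;
  return a.GetReference() != b.GetReference();
}

void DemoReferences()
{
  assert(ReferenceSurvivesMove());
  assert(DistinctHandlesDiffer());
}
```

A raw pointer to `original` would no longer identify the object after the move, whereas the saved `Reference` does. This appears to be the property the queue relies on when a `Token` is enqueued on one thread and then moved into another.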

@ -28,8 +28,8 @@
ArrayHandle<Type, StorageTagBasic>::ExecutionTypes<Device>::Portal \
ArrayHandle<Type, StorageTagBasic>::PrepareForInPlace(Device, vtkm::cont::Token&); \
extern template VTKM_CONT_TEMPLATE_EXPORT void \
ArrayHandle<Type, StorageTagBasic>::PrepareForDevice(std::unique_lock<std::mutex>&, Device) \
const;
ArrayHandle<Type, StorageTagBasic>::PrepareForDevice( \
std::unique_lock<std::mutex>&, vtkm::cont::Token&, Device) const;
#define VTKM_EXPORT_ARRAYHANDLE_FOR_DEVICE_ADAPTER(BasicType, Device) \
VTKM_EXPORT_ARRAYHANDLE_FOR_VALUE_TYPE_AND_DEVICE_ADAPTER(BasicType, Device) \
@ -69,7 +69,7 @@
template VTKM_CONT_EXPORT ArrayHandle<Type, StorageTagBasic>::ExecutionTypes<Device>::Portal \
ArrayHandle<Type, StorageTagBasic>::PrepareForInPlace(Device, vtkm::cont::Token&); \
template VTKM_CONT_EXPORT void ArrayHandle<Type, StorageTagBasic>::PrepareForDevice( \
std::unique_lock<std::mutex>&, Device) const;
std::unique_lock<std::mutex>&, vtkm::cont::Token&, Device) const;
#define VTKM_INSTANTIATE_ARRAYHANDLE_FOR_DEVICE_ADAPTER(BasicType, Device) \
VTKM_INSTANTIATE_ARRAYHANDLE_FOR_VALUE_TYPE_AND_DEVICE_ADAPTER(BasicType, Device) \

@ -95,10 +95,13 @@ vtkm::Id ArrayHandleImpl::GetNumberOfValues(const LockType& lock, vtkm::UInt64 s
}
}
void ArrayHandleImpl::Allocate(LockType& lock, vtkm::Id numberOfValues, vtkm::UInt64 sizeOfT)
void ArrayHandleImpl::Allocate(LockType& lock,
vtkm::cont::Token& token,
vtkm::Id numberOfValues,
vtkm::UInt64 sizeOfT)
{
this->WaitToWrite(lock, vtkm::cont::Token{});
this->ReleaseResourcesExecutionInternal(lock);
this->WaitToWrite(lock, token);
this->ReleaseResourcesExecutionInternal(lock, token);
this->Internals->GetControlArray(lock)->AllocateValues(numberOfValues, sizeOfT);
// Set to false and then to true to ensure anything pointing to an array before the allocate
// is invalidated.
@ -106,7 +109,10 @@ void ArrayHandleImpl::Allocate(LockType& lock, vtkm::Id numberOfValues, vtkm::UI
this->Internals->SetControlArrayValid(lock, true);
}
void ArrayHandleImpl::Shrink(LockType& lock, vtkm::Id numberOfValues, vtkm::UInt64 sizeOfT)
void ArrayHandleImpl::Shrink(LockType& lock,
vtkm::cont::Token& token,
vtkm::Id numberOfValues,
vtkm::UInt64 sizeOfT)
{
VTKM_ASSERT(numberOfValues >= 0);
@ -116,7 +122,7 @@ void ArrayHandleImpl::Shrink(LockType& lock, vtkm::Id numberOfValues, vtkm::UInt
if (numberOfValues < originalNumberOfValues)
{
this->WaitToWrite(lock, vtkm::cont::Token{});
this->WaitToWrite(lock, token);
if (this->Internals->IsControlArrayValid(lock))
{
this->Internals->GetControlArray(lock)->Shrink(numberOfValues);
@ -144,14 +150,14 @@ void ArrayHandleImpl::Shrink(LockType& lock, vtkm::Id numberOfValues, vtkm::UInt
// If we are shrinking to 0, there is nothing to save and we might as well
// free up memory. Plus, some storage classes expect that data will be
// deallocated when the size goes to zero.
this->Allocate(lock, 0, sizeOfT);
this->Allocate(lock, token, 0, sizeOfT);
}
}
void ArrayHandleImpl::ReleaseResources(LockType& lock)
void ArrayHandleImpl::ReleaseResources(LockType& lock, vtkm::cont::Token& token)
{
this->WaitToWrite(lock, vtkm::cont::Token{});
this->ReleaseResourcesExecutionInternal(lock);
this->WaitToWrite(lock, token);
this->ReleaseResourcesExecutionInternal(lock, token);
if (this->Internals->IsControlArrayValid(lock))
{
@ -192,11 +198,6 @@ void ArrayHandleImpl::PrepareForInput(LockType& lock,
this->Internals->GetControlArray(lock)->GetBasePointer(),
this->Internals->GetExecutionArray(lock),
numBytes);
token.Attach(this->Internals,
this->Internals->GetReadCount(lock),
lock,
&this->Internals->ConditionVariable);
}
void ArrayHandleImpl::PrepareForOutput(LockType& lock,
@ -221,11 +222,6 @@ void ArrayHandleImpl::PrepareForOutput(LockType& lock,
numBytes);
this->Internals->SetExecutionArrayValid(lock, true);
token.Attach(this->Internals,
this->Internals->GetWriteCount(lock),
lock,
&this->Internals->ConditionVariable);
}
void ArrayHandleImpl::PrepareForInPlace(LockType& lock,
@ -265,14 +261,10 @@ void ArrayHandleImpl::PrepareForInPlace(LockType& lock,
// Invalidate the control array, since we expect the values to be modified:
this->Internals->SetControlArrayValid(lock, false);
token.Attach(this->Internals,
this->Internals->GetWriteCount(lock),
lock,
&this->Internals->ConditionVariable);
}
bool ArrayHandleImpl::PrepareForDevice(LockType& lock,
vtkm::cont::Token& token,
DeviceAdapterId devId,
vtkm::UInt64 sizeOfT) const
{
@ -292,8 +284,8 @@ bool ArrayHandleImpl::PrepareForDevice(LockType& lock,
// could change the ExecutionInterface, which would cause problems. In the future we should
// support multiple devices, in which case we would not have to delete one execution array
// to load another.
this->WaitToWrite(lock, vtkm::cont::Token{}); // Make sure no one is reading device array
this->SyncControlArray(lock, sizeOfT);
this->WaitToWrite(lock, token); // Make sure no one is reading device array
this->SyncControlArray(lock, token, sizeOfT);
TypelessExecutionArray execArray = this->Internals->MakeTypelessExecutionArray(lock);
this->Internals->GetExecutionInterface(lock)->Free(execArray);
this->Internals->SetExecutionArrayValid(lock, false);
@ -313,7 +305,9 @@ DeviceAdapterId ArrayHandleImpl::GetDeviceAdapterId(const LockType& lock) const
}
void ArrayHandleImpl::SyncControlArray(LockType& lock, vtkm::UInt64 sizeOfT) const
void ArrayHandleImpl::SyncControlArray(LockType& lock,
vtkm::cont::Token& token,
vtkm::UInt64 sizeOfT) const
{
if (!this->Internals->IsControlArrayValid(lock))
{
@ -325,7 +319,7 @@ void ArrayHandleImpl::SyncControlArray(LockType& lock, vtkm::UInt64 sizeOfT) con
// However, if we are here, that `Token` should not already be attached to this array.
// If it were, then there should be no reason to move data around (unless the `Token`
// was used when preparing for multiple devices, which is not a supported use).
this->WaitToRead(lock, vtkm::cont::Token{});
this->WaitToRead(lock, token);
const vtkm::UInt64 numBytes =
static_cast<vtkm::UInt64>(static_cast<char*>(this->Internals->GetExecutionArrayEnd(lock)) -
static_cast<char*>(this->Internals->GetExecutionArray(lock)));
@ -349,11 +343,11 @@ void ArrayHandleImpl::SyncControlArray(LockType& lock, vtkm::UInt64 sizeOfT) con
}
}
void ArrayHandleImpl::ReleaseResourcesExecutionInternal(LockType& lock)
void ArrayHandleImpl::ReleaseResourcesExecutionInternal(LockType& lock, vtkm::cont::Token& token)
{
if (this->Internals->IsExecutionArrayValid(lock))
{
this->WaitToWrite(lock, vtkm::cont::Token{});
this->WaitToWrite(lock, token);
// Note that it is possible that while waiting someone else deleted the execution array.
// That is why we check again.
}
@ -367,37 +361,110 @@ void ArrayHandleImpl::ReleaseResourcesExecutionInternal(LockType& lock)
bool ArrayHandleImpl::CanRead(const LockType& lock, const vtkm::cont::Token& token) const
{
return ((*this->Internals->GetWriteCount(lock) < 1) ||
(token.IsAttached(this->Internals->GetWriteCount(lock))));
// If the token is already attached to this array, then we allow reading.
if (token.IsAttached(this->Internals->GetWriteCount(lock)) ||
token.IsAttached(this->Internals->GetReadCount(lock)))
{
return true;
}
// If there is anyone else waiting at the top of the queue, we cannot access this array.
auto& queue = this->Internals->GetQueue(lock);
if (!queue.empty() && (queue.front() != token))
{
return false;
}
// No one else is waiting, so we can read the array as long as no one else is writing.
return (*this->Internals->GetWriteCount(lock) < 1);
}
bool ArrayHandleImpl::CanWrite(const LockType& lock, const vtkm::cont::Token& token) const
{
return (((*this->Internals->GetWriteCount(lock) < 1) ||
(token.IsAttached(this->Internals->GetWriteCount(lock)))) &&
((*this->Internals->GetReadCount(lock) < 1) ||
((*this->Internals->GetReadCount(lock) == 1) &&
token.IsAttached(this->Internals->GetReadCount(lock)))));
// If the token is already attached to this array, then we allow writing.
if (token.IsAttached(this->Internals->GetWriteCount(lock)) ||
token.IsAttached(this->Internals->GetReadCount(lock)))
{
return true;
}
// If there is anyone else waiting at the top of the queue, we cannot access this array.
auto& queue = this->Internals->GetQueue(lock);
if (!queue.empty() && (queue.front() != token))
{
return false;
}
// No one else is waiting, so we can write the array as long as no one else is reading or writing.
return ((*this->Internals->GetWriteCount(lock) < 1) &&
(*this->Internals->GetReadCount(lock) < 1));
}
void ArrayHandleImpl::WaitToRead(LockType& lock, const vtkm::cont::Token& token) const
void ArrayHandleImpl::WaitToRead(LockType& lock, vtkm::cont::Token& token) const
{
this->Enqueue(lock, token);
// Note that if you deadlocked here, that means that you are trying to do a read operation on an
// array where an object is writing to it. This could happen on the same thread. For example, if
// you call `WritePortal()` then no other operation that can result in reading or writing
// data in the array can happen while the resulting portal is still in scope.
// array where an object is writing to it.
this->Internals->ConditionVariable.wait(
lock, [&lock, &token, this] { return this->CanRead(lock, token); });
token.Attach(this->Internals,
this->Internals->GetReadCount(lock),
lock,
&this->Internals->ConditionVariable);
// We successfully attached the token. Pop it off the queue.
auto& queue = this->Internals->GetQueue(lock);
if (!queue.empty() && queue.front() == token)
{
queue.pop_front();
// It might be the case that the next Token in the queue is also waiting. Wake up threads
// waiting on the condition variable so that they may also access this array.
this->Internals->ConditionVariable.notify_all();
}
}
void ArrayHandleImpl::WaitToWrite(LockType& lock, const vtkm::cont::Token& token) const
void ArrayHandleImpl::WaitToWrite(LockType& lock, vtkm::cont::Token& token) const
{
this->Enqueue(lock, token);
// Note that if you deadlocked here, that means that you are trying to do a write operation on an
// array where an object is reading or writing to it. This could happen on the same thread. For
// example, if you call `WritePortal()` then no other operation that can result in reading
// or writing data in the array can happen while the resulting portal is still in scope.
// array where an object is reading or writing to it.
this->Internals->ConditionVariable.wait(
lock, [&lock, &token, this] { return this->CanWrite(lock, token); });
token.Attach(this->Internals,
this->Internals->GetWriteCount(lock),
lock,
&this->Internals->ConditionVariable);
// We successfully attached the token. Pop it off the queue.
auto& queue = this->Internals->GetQueue(lock);
if (!queue.empty() && queue.front() == token)
{
queue.pop_front();
}
}
void ArrayHandleImpl::Enqueue(const LockType& lock, const vtkm::cont::Token& token) const
{
if (token.IsAttached(this->Internals->GetWriteCount(lock)) ||
token.IsAttached(this->Internals->GetReadCount(lock)))
{
// Do not need to enqueue if we are already attached.
return;
}
auto& queue = this->Internals->GetQueue(lock);
if (std::find(queue.begin(), queue.end(), token.GetReference()) != queue.end())
{
// This token is already in the queue.
return;
}
this->Internals->GetQueue(lock).push_back(token.GetReference());
}
} // end namespace internal

@ -159,12 +159,20 @@ struct VTKM_CONT_EXPORT ArrayHandleImpl
VTKM_CONT void CheckControlArrayValid(const LockType& lock) noexcept(false);
VTKM_CONT vtkm::Id GetNumberOfValues(const LockType& lock, vtkm::UInt64 sizeOfT) const;
VTKM_CONT void Allocate(LockType& lock, vtkm::Id numberOfValues, vtkm::UInt64 sizeOfT);
VTKM_CONT void Shrink(LockType& lock, vtkm::Id numberOfValues, vtkm::UInt64 sizeOfT);
VTKM_CONT void Allocate(LockType& lock,
vtkm::cont::Token& token,
vtkm::Id numberOfValues,
vtkm::UInt64 sizeOfT);
VTKM_CONT void Shrink(LockType& lock,
vtkm::cont::Token& token,
vtkm::Id numberOfValues,
vtkm::UInt64 sizeOfT);
VTKM_CONT void SyncControlArray(LockType& lock, vtkm::UInt64 sizeofT) const;
VTKM_CONT void ReleaseResources(LockType& lock);
VTKM_CONT void ReleaseResourcesExecutionInternal(LockType& lock);
VTKM_CONT void SyncControlArray(LockType& lock,
vtkm::cont::Token& token,
vtkm::UInt64 sizeofT) const;
VTKM_CONT void ReleaseResources(LockType& lock, vtkm::cont::Token& token);
VTKM_CONT void ReleaseResourcesExecutionInternal(LockType& lock, vtkm::cont::Token& token);
VTKM_CONT void PrepareForInput(LockType& lock,
vtkm::UInt64 sizeofT,
@ -180,6 +188,7 @@ struct VTKM_CONT_EXPORT ArrayHandleImpl
// ExecutionInterface instance.
// Returns true when the caller needs to reallocate ExecutionInterface
VTKM_CONT bool PrepareForDevice(LockType& lock,
vtkm::cont::Token& token,
DeviceAdapterId devId,
vtkm::UInt64 sizeofT) const;
@ -190,10 +199,14 @@ struct VTKM_CONT_EXPORT ArrayHandleImpl
//// Returns true if write operations can currently be performed.
VTKM_CONT bool CanWrite(const LockType& lock, const vtkm::cont::Token& token) const;
//// Will block the current thread until a read can be performed.
VTKM_CONT void WaitToRead(LockType& lock, const vtkm::cont::Token& token) const;
//// Will block the current thread until a write can be performed.
VTKM_CONT void WaitToWrite(LockType& lock, const vtkm::cont::Token& token) const;
/// Will block the current thread until a read can be performed.
/// The token will get attached to this read count.
VTKM_CONT void WaitToRead(LockType& lock, vtkm::cont::Token& token) const;
/// Will block the current thread until a write can be performed.
/// The token will get attached to this write count.
VTKM_CONT void WaitToWrite(LockType& lock, vtkm::cont::Token& token) const;
VTKM_CONT void Enqueue(const LockType& lock, const vtkm::cont::Token& token) const;
/// Acquires a lock on the internals of this `ArrayHandle`. The calling
/// function should keep the returned lock and let it go out of scope
@ -215,6 +228,8 @@ struct VTKM_CONT_EXPORT ArrayHandleImpl
mutable vtkm::cont::Token::ReferenceCount ReadCount = 0;
mutable vtkm::cont::Token::ReferenceCount WriteCount = 0;
mutable std::list<vtkm::cont::Token::Reference> Queue;
VTKM_CONT void CheckLock(const LockType& lock) const
{
VTKM_ASSERT((lock.mutex() == &this->Mutex) && (lock.owns_lock()));
@ -331,6 +346,11 @@ struct VTKM_CONT_EXPORT ArrayHandleImpl
this->CheckLock(lock);
return &this->WriteCount;
}
VTKM_CONT std::list<vtkm::cont::Token::Reference>& GetQueue(const LockType& lock) const
{
this->CheckLock(lock);
return this->Queue;
}
VTKM_CONT TypelessExecutionArray MakeTypelessExecutionArray(const LockType& lock);
};
@ -416,6 +436,9 @@ public:
typename StorageType::PortalConstType GetPortalConstControl() const;
/// @endcond
VTKM_CONT ReadPortalType ReadPortal(vtkm::cont::Token& token) const;
VTKM_CONT WritePortalType WritePortal(vtkm::cont::Token& token) const;
VTKM_CONT ReadPortalType ReadPortal() const;
VTKM_CONT WritePortalType WritePortal() const;
@ -463,17 +486,19 @@ public:
}
template <typename DeviceAdapterTag>
VTKM_CONT void PrepareForDevice(LockType& lock, DeviceAdapterTag) const;
VTKM_CONT void PrepareForDevice(LockType& lock, vtkm::cont::Token& token, DeviceAdapterTag) const;
VTKM_CONT DeviceAdapterId GetDeviceAdapterId() const;
VTKM_CONT void SyncControlArray() const;
VTKM_CONT void Enqueue(const vtkm::cont::Token& token) const;
std::shared_ptr<internal::ArrayHandleImpl> Internals;
private:
VTKM_CONT void SyncControlArray(LockType& lock) const;
VTKM_CONT void ReleaseResourcesExecutionInternal(LockType& lock) const;
VTKM_CONT void SyncControlArray(LockType& lock, vtkm::cont::Token& token) const;
VTKM_CONT void ReleaseResourcesExecutionInternal(LockType& lock, vtkm::cont::Token& token) const;
/// Acquires a lock on the internals of this `ArrayHandle`. The calling
/// function should keep the returned lock and let it go out of scope

@ -96,72 +96,93 @@ VTKM_CONT bool ArrayHandle<T, StorageTagBasic>::operator!=(const ArrayHandle<VT,
template <typename T>
typename ArrayHandle<T, StorageTagBasic>::StorageType& ArrayHandle<T, StorageTagBasic>::GetStorage()
{
LockType lock = this->GetLock();
this->SyncControlArray(lock);
this->Internals->CheckControlArrayValid(lock);
//CheckControlArrayValid will throw an exception if this->Internals->ControlArrayValid
//is not valid
// A Token should not be declared within the scope of a lock. When the token goes out of scope
// it will attempt to acquire the lock, which is undefined behavior if the thread already has
// the lock.
vtkm::cont::Token token;
{
LockType lock = this->GetLock();
this->SyncControlArray(lock, token);
this->Internals->CheckControlArrayValid(lock);
//CheckControlArrayValid will throw an exception if this->Internals->ControlArrayValid
//is not valid
return *(static_cast<StorageType*>(this->Internals->Internals->GetControlArray(lock)));
return *(static_cast<StorageType*>(this->Internals->Internals->GetControlArray(lock)));
}
}
template <typename T>
const typename ArrayHandle<T, StorageTagBasic>::StorageType&
ArrayHandle<T, StorageTagBasic>::GetStorage() const
{
LockType lock = this->GetLock();
this->SyncControlArray(lock);
this->Internals->CheckControlArrayValid(lock);
//CheckControlArrayValid will throw an exception if this->Internals->ControlArrayValid
//is not valid
// A Token should not be declared within the scope of a lock. When the token goes out of scope
// it will attempt to acquire the lock, which is undefined behavior if the thread already has
// the lock.
vtkm::cont::Token token;
{
LockType lock = this->GetLock();
this->SyncControlArray(lock, token);
this->Internals->CheckControlArrayValid(lock);
//CheckControlArrayValid will throw an exception if this->Internals->ControlArrayValid
//is not valid
return *(static_cast<const StorageType*>(this->Internals->Internals->GetControlArray(lock)));
return *(static_cast<const StorageType*>(this->Internals->Internals->GetControlArray(lock)));
}
}
template <typename T>
typename ArrayHandle<T, StorageTagBasic>::StorageType::PortalType
ArrayHandle<T, StorageTagBasic>::GetPortalControl()
{
LockType lock = this->GetLock();
this->SyncControlArray(lock);
this->Internals->CheckControlArrayValid(lock);
//CheckControlArrayValid will throw an exception if this->Internals->ControlArrayValid
//is not valid
// A Token should not be declared within the scope of a lock. When the token goes out of scope
// it will attempt to acquire the lock, which is undefined behavior if the thread already has
// the lock.
vtkm::cont::Token token;
{
LockType lock = this->GetLock();
this->SyncControlArray(lock, token);
this->Internals->CheckControlArrayValid(lock);
//CheckControlArrayValid will throw an exception if this->Internals->ControlArrayValid
//is not valid
// If the user writes into the iterator we return, then the execution
// array will become invalid. Play it safe and release the execution
// resources. (Use the const version to preserve the execution array.)
this->ReleaseResourcesExecutionInternal(lock);
StorageType* privStorage =
static_cast<StorageType*>(this->Internals->Internals->GetControlArray(lock));
return privStorage->GetPortal();
// If the user writes into the iterator we return, then the execution
// array will become invalid. Play it safe and release the execution
// resources. (Use the const version to preserve the execution array.)
this->ReleaseResourcesExecutionInternal(lock, token);
StorageType* privStorage =
static_cast<StorageType*>(this->Internals->Internals->GetControlArray(lock));
return privStorage->GetPortal();
}
}
template <typename T>
typename ArrayHandle<T, StorageTagBasic>::StorageType::PortalConstType
ArrayHandle<T, StorageTagBasic>::GetPortalConstControl() const
{
LockType lock = this->GetLock();
this->SyncControlArray(lock);
this->Internals->CheckControlArrayValid(lock);
//CheckControlArrayValid will throw an exception if this->Internals->ControlArrayValid
//is not valid
// A Token should not be declared within the scope of a lock. When the token goes out of scope
// it will attempt to acquire the lock, which is undefined behavior if the thread already has
// the lock.
vtkm::cont::Token token;
{
LockType lock = this->GetLock();
this->SyncControlArray(lock, token);
this->Internals->CheckControlArrayValid(lock);
//CheckControlArrayValid will throw an exception if this->Internals->ControlArrayValid
//is not valid
StorageType* privStorage =
static_cast<StorageType*>(this->Internals->Internals->GetControlArray(lock));
return privStorage->GetPortalConst();
StorageType* privStorage =
static_cast<StorageType*>(this->Internals->Internals->GetControlArray(lock));
return privStorage->GetPortalConst();
}
}
template <typename T>
typename ArrayHandle<T, StorageTagBasic>::ReadPortalType
ArrayHandle<T, StorageTagBasic>::ReadPortal() const
ArrayHandle<T, StorageTagBasic>::ReadPortal(vtkm::cont::Token& token) const
{
LockType lock = this->GetLock();
{
vtkm::cont::Token token;
this->Internals->WaitToRead(lock, token);
}
this->SyncControlArray(lock);
this->Internals->WaitToRead(lock, token);
this->SyncControlArray(lock, token);
this->Internals->CheckControlArrayValid(lock);
//CheckControlArrayValid will throw an exception if this->Internals->ControlArrayValid
//is not valid
@ -172,16 +193,21 @@ ArrayHandle<T, StorageTagBasic>::ReadPortal() const
this->Internals->Internals->GetControlArrayValidPointer(lock), privStorage->GetPortalConst());
}
template <typename T>
typename ArrayHandle<T, StorageTagBasic>::ReadPortalType
ArrayHandle<T, StorageTagBasic>::ReadPortal() const
{
vtkm::cont::Token token;
return this->ReadPortal(token);
}
template <typename T>
typename ArrayHandle<T, StorageTagBasic>::WritePortalType
ArrayHandle<T, StorageTagBasic>::WritePortal() const
ArrayHandle<T, StorageTagBasic>::WritePortal(vtkm::cont::Token& token) const
{
LockType lock = this->GetLock();
{
vtkm::cont::Token token;
this->Internals->WaitToWrite(lock, token);
}
this->SyncControlArray(lock);
this->Internals->WaitToWrite(lock, token);
this->SyncControlArray(lock, token);
this->Internals->CheckControlArrayValid(lock);
//CheckControlArrayValid will throw an exception if this->Internals->ControlArrayValid
//is not valid
@ -189,13 +215,21 @@ ArrayHandle<T, StorageTagBasic>::WritePortal() const
// If the user writes into the iterator we return, then the execution
// array will become invalid. Play it safe and release the execution
// resources. (Use the const version to preserve the execution array.)
this->ReleaseResourcesExecutionInternal(lock);
this->ReleaseResourcesExecutionInternal(lock, token);
StorageType* privStorage =
static_cast<StorageType*>(this->Internals->Internals->GetControlArray(lock));
return ArrayHandle<T, StorageTagBasic>::WritePortalType(
this->Internals->Internals->GetControlArrayValidPointer(lock), privStorage->GetPortal());
}
template <typename T>
typename ArrayHandle<T, StorageTagBasic>::WritePortalType
ArrayHandle<T, StorageTagBasic>::WritePortal() const
{
vtkm::cont::Token token;
return this->WritePortal(token);
}
template <typename T>
vtkm::Id ArrayHandle<T, StorageTagBasic>::GetNumberOfValues() const
{
@ -206,32 +240,56 @@ vtkm::Id ArrayHandle<T, StorageTagBasic>::GetNumberOfValues() const
template <typename T>
void ArrayHandle<T, StorageTagBasic>::Allocate(vtkm::Id numberOfValues)
{
LockType lock = this->GetLock();
this->Internals->Allocate(lock, numberOfValues, sizeof(T));
// A Token should not be declared within the scope of a lock. When the token goes out of scope
// it will attempt to acquire the lock, which is undefined behavior if the thread already has
// the lock.
vtkm::cont::Token token;
{
LockType lock = this->GetLock();
this->Internals->Allocate(lock, token, numberOfValues, sizeof(T));
}
}
template <typename T>
void ArrayHandle<T, StorageTagBasic>::Shrink(vtkm::Id numberOfValues)
{
LockType lock = this->GetLock();
this->Internals->Shrink(lock, numberOfValues, sizeof(T));
// A Token should not be declared within the scope of a lock. When the token goes out of scope
// it will attempt to acquire the lock, which is undefined behavior if the thread already has
// the lock.
vtkm::cont::Token token;
{
LockType lock = this->GetLock();
this->Internals->Shrink(lock, token, numberOfValues, sizeof(T));
}
}
template <typename T>
void ArrayHandle<T, StorageTagBasic>::ReleaseResourcesExecution()
{
LockType lock = this->GetLock();
// Save any data in the execution environment by making sure it is synced
// with the control environment.
this->SyncControlArray(lock);
this->Internals->ReleaseResourcesExecutionInternal(lock);
// A Token should not be declared within the scope of a lock. When the token goes out of scope
// it will attempt to acquire the lock, which is undefined behavior if the thread already has
// the lock.
vtkm::cont::Token token;
{
LockType lock = this->GetLock();
// Save any data in the execution environment by making sure it is synced
// with the control environment.
this->SyncControlArray(lock, token);
this->Internals->ReleaseResourcesExecutionInternal(lock, token);
}
}
template <typename T>
void ArrayHandle<T, StorageTagBasic>::ReleaseResources()
{
LockType lock = this->GetLock();
this->Internals->ReleaseResources(lock);
// A Token should not be declared within the scope of a lock. When the token goes out of scope
// it will attempt to acquire the lock, which is undefined behavior if the thread already has
// the lock.
vtkm::cont::Token token;
{
LockType lock = this->GetLock();
this->Internals->ReleaseResources(lock, token);
}
}
template <typename T>
@ -242,7 +300,7 @@ ArrayHandle<T, StorageTagBasic>::PrepareForInput(DeviceAdapterTag device,
{
VTKM_IS_DEVICE_ADAPTER_TAG(DeviceAdapterTag);
LockType lock = this->GetLock();
this->PrepareForDevice(lock, device);
this->PrepareForDevice(lock, token, device);
this->Internals->PrepareForInput(lock, sizeof(T), token);
return PortalFactory<DeviceAdapterTag>::CreatePortalConst(
@ -259,7 +317,7 @@ ArrayHandle<T, StorageTagBasic>::PrepareForOutput(vtkm::Id numVals,
{
VTKM_IS_DEVICE_ADAPTER_TAG(DeviceAdapterTag);
LockType lock = this->GetLock();
this->PrepareForDevice(lock, device);
this->PrepareForDevice(lock, token, device);
this->Internals->PrepareForOutput(lock, numVals, sizeof(T), token);
return PortalFactory<DeviceAdapterTag>::CreatePortal(
@ -275,7 +333,7 @@ ArrayHandle<T, StorageTagBasic>::PrepareForInPlace(DeviceAdapterTag device,
{
VTKM_IS_DEVICE_ADAPTER_TAG(DeviceAdapterTag);
LockType lock = this->GetLock();
this->PrepareForDevice(lock, device);
this->PrepareForDevice(lock, token, device);
this->Internals->PrepareForInPlace(lock, sizeof(T), token);
return PortalFactory<DeviceAdapterTag>::CreatePortal(
@ -286,9 +344,10 @@ ArrayHandle<T, StorageTagBasic>::PrepareForInPlace(DeviceAdapterTag device,
template <typename T>
template <typename DeviceAdapterTag>
void ArrayHandle<T, StorageTagBasic>::PrepareForDevice(LockType& lock,
vtkm::cont::Token& token,
DeviceAdapterTag device) const
{
bool needToRealloc = this->Internals->PrepareForDevice(lock, device, sizeof(T));
bool needToRealloc = this->Internals->PrepareForDevice(lock, token, device, sizeof(T));
if (needToRealloc)
{
this->Internals->Internals->SetExecutionInterface(
@ -307,21 +366,37 @@ DeviceAdapterId ArrayHandle<T, StorageTagBasic>::GetDeviceAdapterId() const
template <typename T>
void ArrayHandle<T, StorageTagBasic>::SyncControlArray() const
{
// A Token should not be declared within the scope of a lock. When the token goes out of scope
// it will attempt to acquire the lock, which is undefined behavior if the thread already has
// the lock.
vtkm::cont::Token token;
{
LockType lock = this->GetLock();
this->SyncControlArray(lock, token);
}
}
template <typename T>
void ArrayHandle<T, StorageTagBasic>::SyncControlArray(LockType& lock,
vtkm::cont::Token& token) const
{
this->Internals->SyncControlArray(lock, token, sizeof(T));
}
template <typename T>
void ArrayHandle<T, StorageTagBasic>::Enqueue(const vtkm::cont::Token& token) const
{
LockType lock = this->GetLock();
this->Internals->SyncControlArray(lock, sizeof(T));
this->Internals->Enqueue(lock, token);
}
template <typename T>
void ArrayHandle<T, StorageTagBasic>::SyncControlArray(LockType& lock) const
void ArrayHandle<T, StorageTagBasic>::ReleaseResourcesExecutionInternal(
LockType& lock,
vtkm::cont::Token& token) const
{
this->Internals->SyncControlArray(lock, sizeof(T));
}
template <typename T>
void ArrayHandle<T, StorageTagBasic>::ReleaseResourcesExecutionInternal(LockType& lock) const
{
this->Internals->ReleaseResourcesExecutionInternal(lock);
this->Internals->ReleaseResourcesExecutionInternal(lock, token);
}
}
} // end namespace vtkm::cont

@ -18,7 +18,9 @@
#include <vtkm/cont/testing/Testing.h>
#include <array>
#include <chrono>
#include <future>
#include <thread>
namespace
{
@ -35,7 +37,7 @@ bool IncrementArray(vtkm::cont::ArrayHandle<ValueType, Storage> array)
auto portal = array.PrepareForInPlace(vtkm::cont::DeviceAdapterTagSerial{}, token);
if (portal.GetNumberOfValues() != ARRAY_SIZE)
{
std::cout << "!!!!! Wrong array size: " << portal.GetNumberOfValues();
std::cout << "!!!!! Wrong array size: " << portal.GetNumberOfValues() << std::endl;
return false;
}
@ -54,6 +56,42 @@ bool IncrementArray(vtkm::cont::ArrayHandle<ValueType, Storage> array)
return true;
}
template <typename Storage>
bool IncrementArrayOrdered(vtkm::cont::ArrayHandle<ValueType, Storage> array,
vtkm::cont::Token&& token_,
std::size_t threadNum)
{
// Make sure the Token is moved to the proper scope.
vtkm::cont::Token token = std::move(token_);
// Earlier threads sleep longer, so later threads must wait on earlier threads that are still
// sleeping. This checks that access follows the enqueue order, not the wake-up order.
std::this_thread::sleep_for(
std::chrono::milliseconds(10 * static_cast<long long>(NUM_THREADS - threadNum)));
auto portal = array.PrepareForInPlace(vtkm::cont::DeviceAdapterTagSerial{}, token);
if (portal.GetNumberOfValues() != ARRAY_SIZE)
{
std::cout << "!!!!! Wrong array size: " << portal.GetNumberOfValues() << std::endl;
return false;
}
for (vtkm::Id index = 0; index < ARRAY_SIZE; ++index)
{
ValueType value = portal.Get(index);
ValueType base = TestValue(index, ValueType{});
if (!test_equal(value, base + static_cast<ValueType>(threadNum)))
{
std::cout << "!!!!! Unexpected value in array: " << value << std::endl;
std::cout << "!!!!! ArrayHandle access likely out of order." << std::endl;
return false;
}
portal.Set(index, value + 1);
}
return true;
}
template <typename Storage>
bool CheckArray(vtkm::cont::ArrayHandle<ValueType, Storage> array)
{
@ -61,7 +99,7 @@ bool CheckArray(vtkm::cont::ArrayHandle<ValueType, Storage> array)
auto portal = array.PrepareForInput(vtkm::cont::DeviceAdapterTagSerial{}, token);
if (portal.GetNumberOfValues() != ARRAY_SIZE)
{
std::cout << "!!!!! Wrong array size: " << portal.GetNumberOfValues();
std::cout << "!!!!! Wrong array size: " << portal.GetNumberOfValues() << std::endl;
return false;
}
@ -86,7 +124,7 @@ bool DecrementArray(vtkm::cont::ArrayHandle<ValueType, Storage> array)
auto portal = array.PrepareForInPlace(vtkm::cont::DeviceAdapterTagSerial{}, token);
if (portal.GetNumberOfValues() != ARRAY_SIZE)
{
std::cout << "!!!!! Wrong array size: " << portal.GetNumberOfValues();
std::cout << "!!!!! Wrong array size: " << portal.GetNumberOfValues() << std::endl;
return false;
}
@ -180,6 +218,31 @@ void ThreadsDecrementArray(vtkm::cont::ArrayHandle<ValueType, Storage> array)
CheckPortal(array.ReadPortal());
}
template <typename Storage>
void ThreadsIncrementToArrayOrdered(vtkm::cont::ArrayHandle<ValueType, Storage> array)
{
VTKM_TEST_ASSERT(array.GetNumberOfValues() == ARRAY_SIZE);
SetPortal(array.WritePortal());
std::cout << " Starting ordered write threads" << std::endl;
std::array<decltype(std::async(std::launch::async, IncrementArray<Storage>, array)), NUM_THREADS>
futures;
for (std::size_t index = 0; index < NUM_THREADS; ++index)
{
vtkm::cont::Token token;
array.Enqueue(token);
futures[index] = std::async(
std::launch::async, IncrementArrayOrdered<Storage>, array, std::move(token), index);
}
std::cout << " Wait for threads to complete" << std::endl;
for (std::size_t index = 0; index < NUM_THREADS; ++index)
{
bool futureResult = futures[index].get();
VTKM_TEST_ASSERT(futureResult, "Failure in IncrementArrayOrdered");
}
}
template <typename Storage>
void InvalidateControlPortal(vtkm::cont::ArrayHandle<ValueType, Storage> array)
{
@ -219,6 +282,7 @@ void DoThreadSafetyTest(vtkm::cont::ArrayHandle<ValueType, Storage> array)
ThreadsIncrementToArray(array);
ThreadsCheckArray(array);
ThreadsDecrementArray(array);
ThreadsIncrementToArrayOrdered(array);
InvalidateControlPortal(array);
}