// ==++== // // Copyright (c) Microsoft Corporation. All rights reserved. // // ==--== // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ // // WorkStealingQueue.h // // Header file containing the core implementation of the work stealing data structures and algorithms. // // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- #pragma once #define AFFINITY_EXECUTED 0x1 #define IS_AFFINITIZED_TASK(t) ((ULONG_PTR)t & 0x1) #define STRIP_AFFINITY_INDICATOR(type, t) ((type *)((ULONG_PTR)t & ~((ULONG_PTR)0x1))) #define ADD_AFFINITY_INDICATOR(type, t) ((type *)((ULONG_PTR)(t) | 0x1)) namespace Concurrency { namespace details { /// /// A WorkStealingQueue is a wait-free, lock-free structure associated with a single /// thread that can Push and Pop elements. Other threads can do Steal operations /// on the other end of the WorkStealingQueue with little contention. /// template class WorkStealingQueue { // A 'WorkStealingQueue' always runs its code in a single OS thread. We call this the // 'bound' thread. Only the code in the Steal operation can be executed by // other 'foreign' threads that try to steal work. // // The queue is implemented as an array. The m_head and m_tail index this // array. To avoid copying elements, the m_head and m_tail index the array modulo // the size of the array. By making this a power of two, we can use a cheap // bit-and operation to take the modulus. The "m_mask" is always equal to the // size of the task array minus one (where the size is a power of two). // // The m_head and m_tail are volatile as they can be updated from different OS threads. // The "m_head" is only updated by foreign threads as they Steal a task from // this queue. By putting a lock in Steal, there is at most one foreign thread // changing m_head at a time. The m_tail is only updated by the bound thread. // // invariants: // tasks.length is a power of 2 // m_mask == tasks.length-1 // m_head is only written to by foreign threads // m_tail is only written to by the bound thread // At most one foreign thread can do a Steal // All methods except Steal are executed from a single bound thread // m_tail points to the first unused location // // This work stealing implementation also supports the notion of out-of-order waiting // and out-of-order removal from the bound thread given that it is initialized to do so. // There is additional cost to performing this. // public: /// /// The callback for a sweep of the workstealing queue. This will be called under the stealing lock on the owning thread /// for every chore matching a predefined predicate. If true is returned, the item is pulled from the WSQ. If false is returned, /// the item stays in the WSQ. /// typedef bool (*SweepFunction)(T *pObject, void *pData); /// /// A predicate for a WSQ sweep. /// typedef bool (*SweepPredicate)(T *pObject, void *pData); /// /// Constructs a new work stealing queue /// WorkStealingQueue(LOCK *pLock) : m_pLock(pLock) { ASSERT(m_pLock != NULL); ASSERT(s_initialSize > 1); Reinitialize(); m_mask = s_initialSize - 1; m_pTasks = _concrt_new T*[s_initialSize]; m_pSlots = _concrt_new typename Mailbox::Slot[s_initialSize]; } /// /// Reinitializes a workqueue to the state just after construction. This is used when recycling a work /// queue from its ListArray /// void Reinitialize() { m_head = 0; m_tail = 0; m_detachmentTail = 0; m_fMarkedForDetachment = false; m_cookieBase = 0; } /// /// Unlocked count /// int Count() const { return (m_tail - m_head); } /// /// Unlocked check if empty /// bool Empty() const { return (m_tail - m_head <= 0); } /// /// Check whether to skip the steal /// bool MarkedForDetachment() const { return m_fMarkedForDetachment; } // // Push/Pop and Steal can be executed interleaved. In particular: // 1) A steal and pop should be careful when there is just one element // in the queue. This is done by first incrementing the m_head/decrementing the m_tail // and than checking if it interleaved (m_head > m_tail). // 2) A push and steal can interleave in the sense that a push can overwrite the // value that is just stolen. To account for this, we check conservatively in // the push to assume that the size is one less than it actually is. // /// /// Attempts to steal the oldest element in the queue. This handles potential interleaving with both /// a Pop and TryPop operation. /// T* UnlockedSteal(bool fForceStealLocalized, bool = false) { T* pResult = NULL; for (;;) { // // increment the m_head. Save in local h for efficiency // int h = m_head; InterlockedExchange((volatile LONG*)&m_head, h + 1); // // insert a memory fence here -- memory may not be sequentially consistent // if (0 < m_tail - h) { // // Do not allow a steal from this work stealing queue if the bottom task was mailed to a location which has active searchers. // This will not block finalization in any way as the last pass SFW will pull the task out of the mailbox regardless of affinity. // We should be careful not to do this if the current context's virtual processor is in the affinity set of the segment. // If not, there could be cases where all virtual processors deactivate, but the scheduler does not shutdown since there are chores // in the queue, even if they have been dequeued via the mailbox. // if(IS_AFFINITIZED_TASK(m_pTasks[h & m_mask])) { if (!fForceStealLocalized && m_pSlots[h & m_mask].DeferToAffineSearchers()) { // // Skip this workqueue if there are affine searchers and we are not one of them. // m_head = h; return NULL; } } // // If the queue is detached and we've crossed the point of detachment, end the detachment marker. // if (m_fMarkedForDetachment && 0 >= m_detachmentTail - m_head) m_fMarkedForDetachment = false; // // == (h+1 <= m_tail) == (m_head <= m_tail) // // When we allow out-of-order waits, it's entirely possible that a TryPop // executing on the bound thread will grab this out from underneath us. Not // only do we need guards against interleave with ordered pop, but we also // need a guard against an out-of-order trypop. // pResult = reinterpret_cast (InterlockedExchangePointer( reinterpret_cast( &(m_pTasks[h & m_mask])), (PVOID) NULL )); if (pResult != NULL) { // // If the task had an associated affinity, we must deal with the possibility that it was already executed by a virtual // processor to which it was affine. // if (IS_AFFINITIZED_TASK(pResult)) { pResult = STRIP_AFFINITY_INDICATOR(T, pResult); // // If the task was already executed via a mailbox dequeue, move on and try to steal again. // if (!m_pSlots[h & m_mask].Claim()) { pResult = NULL; continue; } } break; } } else { // // failure: either empty or single element interleaving with pop // m_head = h; // restore the m_head break; } } return pResult; } // only used in a test T* Steal(bool fForceStealLocalized = false) { typename LOCK::_Scoped_lock lockHolder(*m_pLock); return UnlockedSteal(fForceStealLocalized); } /// /// Attempts to pop the newest element on the work stealing queue. It may return NULL if there is no such /// item (either unbalanced push/pop, a chore stolen) /// T* Pop() { T* pResult = NULL; int t; for(;;) { // // decrement the m_tail. Use local t for efficiency. // t = m_tail - 1; InterlockedExchange((volatile LONG*)&m_tail, t); // // insert a memory fence here (InterlockedExchange does the job) -- // memory may not be sequentially consistent // if (0 <= t - m_head) { // // == (m_head <= m_tail) // pResult = m_pTasks[t & m_mask]; // // Out of order TryPops on the bound thread will set this without // the need for a fence. // if (pResult == NULL) continue; break; } else { // // failure: either empty or single element interleaving with steal // m_tail = t + 1; // restore the m_tail return SyncPop(); // do a single-threaded pop } } if (IS_AFFINITIZED_TASK(pResult)) { pResult = STRIP_AFFINITY_INDICATOR(T, pResult); // // If the task was already executed via a mailbox dequeue, return an indication to the caller. // if (!m_pSlots[t & m_mask].Claim()) { return (T*)AFFINITY_EXECUTED; } } return pResult; } /// /// Tries to pop a previously pushed element from the work stealing queue. Note that this executes /// a potentially out-of-order wait. /// /// /// The value returned from a Push() call for the work in question /// T* TryPop(int cookie) { cookie = (cookie - m_cookieBase); // // TryPop() has Pop() semantics if we try the topmost element. We only need to do something // "special" in the out of order case. // if (cookie == m_tail - 1) return Pop(); if (cookie - m_tail >= 0 || cookie - m_head < 0) return NULL; T* pResult = reinterpret_cast (InterlockedExchangePointer( reinterpret_cast( &(m_pTasks[cookie & m_mask])), (PVOID) NULL )); if (IS_AFFINITIZED_TASK(pResult)) { pResult = STRIP_AFFINITY_INDICATOR(T, pResult); // // If the task was already executed via a mailbox dequeue, return an indication to the caller. // if (!m_pSlots[cookie & m_mask].Claim()) { return (T*)AFFINITY_EXECUTED; } } return pResult; } /// /// Pushes an element onto the work stealing queue. The returned cookie can be utilized to identify /// the work item for a future TryPop() call. Note that the returned cookie is only valid until a Pop() /// or TryPop() call removes the work in question. /// int Push(T* elem, typename ::Concurrency::details::Mailbox::Slot affinitySlot) { int t = m_tail; // // Careful here since we might interleave with Steal. // This is no problem since we just conservatively check if there is // enough space left (t < m_head + size). However, Steal might just have // incremented m_head and we could potentially overwrite the old m_head // entry, so we always leave at least one extra 'buffer' element and // check (m_tail < m_head + size - 1). This also plays nicely with our // initial m_mask of 0, where size is 2^0 == 1, but the tasks array is // still null. // if (0 < m_head + m_mask - t) // == t < m_head + size - 1 { if (!affinitySlot.IsEmpty()) { // // Flag the element as affinitized. On popping this element, the box slot must be cleared to prevent // multiple execution. // m_pSlots[t & m_mask] = affinitySlot; elem = ADD_AFFINITY_INDICATOR(T, elem); } m_pTasks[t & m_mask] = elem; // Only increment once we have initialized the task entry. This is a volatile write and has release semantics on weaker memory models m_tail = t + 1; return t + m_cookieBase; } else { // // failure: we need to resize or re-index // return SyncPush(elem, affinitySlot); } } /// /// Pushes an element onto the work stealing queue. The returned cookie can be utilized to identify /// the work item for a future TryPop() call. Note that the returned cookie is only valid until a Pop() /// or TryPop() call removes the work in question. /// int Push(T* elem) { int t = m_tail; // // Careful here since we might interleave with Steal. // This is no problem since we just conservatively check if there is // enough space left (t < m_head + size). However, Steal might just have // incremented m_head and we could potentially overwrite the old m_head // entry, so we always leave at least one extra 'buffer' element and // check (m_tail < m_head + size - 1). This also plays nicely with our // initial m_mask of 0, where size is 2^0 == 1, but the tasks array is // still null. // if (0 < m_head + m_mask - t) // == t < m_head + size - 1 { m_pTasks[t & m_mask] = elem; // Only increment once we have initialized the task entry. This is a volatile write and has release semantics on weaker memory models m_tail = t + 1; return t + m_cookieBase; } else { // // failure: we need to resize or re-index // return SyncPush(elem, Mailbox::Slot()); } } /// /// Only called from the bound thread, this sweeps the work stealing queue under the steal lock for any chores matching the /// specified predicate. /// void Sweep(SweepPredicate pPredicate, void *pData, SweepFunction pSweepFn) { typename LOCK::_Scoped_lock lockHolder(*m_pLock); int nt = m_tail; int t = m_tail - 1; while (t - m_head >= 0) { T* pResult = m_pTasks[t & m_mask]; if (pResult != NULL) { if (pPredicate(pResult, pData)) { if (pSweepFn(pResult, pData)) { // // If it's atop the WSQ, just decrement the tail (nt == new tail); otherwise, // make sure to NULL out the entry to indicate an out-of-order rip. // if (t + 1 == nt) nt--; else m_pTasks[t & m_mask] = NULL; } } } t--; } InterlockedExchange((volatile LONG *)&m_tail, nt); } /// /// Marks the work stealing queue as detached. The current head pointer marks the end point of detachment. Note /// that this should only be called when there is a guarantee of no concurrent pushes or pops from the owning thread. /// void MarkDetachment() { typename LOCK::_Scoped_lock lockHolder(*m_pLock); m_fMarkedForDetachment = true; m_detachmentTail = m_tail; } /// /// Destroys a work stealing queue. /// ~WorkStealingQueue() { delete [] m_pTasks; delete [] m_pSlots; } private: // The m_head and m_tail are volatile as they can be updated from different OS threads. // The "m_head" is only updated by foreign threads as they Steal a task from // this queue. By putting a lock in Steal, there is at most one foreign thread // changing m_head at a time. The m_tail is only updated by the bound thread. // // invariants: // tasks.length is a power of 2 // m_mask == tasks.length-1 // m_head is only written to by foreign threads // m_tail is only written to by the bound thread // At most one foreign thread can do a Steal // All methods except Steal are executed from a single bound thread // m_tail points to the first unused location // static const int s_initialSize = 64; // must be a power of 2 volatile int m_head; // only updated by Steal volatile int m_tail; // only updated by Push and Pop int m_mask; // the m_mask for taking modulus int m_cookieBase; // the base cookie index LOCK *m_pLock; // the lock that guards stealing from push/pops bool m_fMarkedForDetachment; // Indicates whether or not the work stealing queue is marked for detachment int m_detachmentTail; // The tail pointer for detachment. When the head crosses this, the mark ends T** m_pTasks; // the array of tasks typename Mailbox::Slot *m_pSlots; // the array of side-band affinity structures /// /// Pushes an element onto the work stealing queue under the queue lock. This guarantees that no steal /// interleaves and guarantees the ability to reallocate the physical store. The returned value is a cookie /// per Push(). /// int SyncPush(T* elem, typename ::Concurrency::details::Mailbox::Slot affinitySlot) { // // Because WorkStealingQueue is used for LRC and LRC needs to be searched in a SFW from a UMS primary, the lock here is a hyper // lock and no memory allocations can happen inside its scope. Preallocate everything up front! // // Keep in mind that the only thing that's going to happen without the lock held is a steal. No one else will try to resize, // pop, push, etc... // // // == (count >= size-1) // int oldsize = m_mask + 1; int newsize = 2 * oldsize; // highly unlikely, but throw out-of-memory if this overflows // // Yes -- it's entirely possible that we allocate and DON'T need to in rare circumstances - steal just opened up a slot. In that particular // case, we will just do the resizing since it's almost full. // T** pNewTasks = _concrt_new T*[newsize]; // // Again, for reasons of UMS, we cannot delete the old array until after we release the hyper lock. Stash it away // and defer the deletion. // T** pOldTasks = m_pTasks; typename Mailbox::Slot *pNewSlots = _concrt_new typename Mailbox::Slot[newsize]; typename Mailbox::Slot *pOldSlots = m_pSlots; { // // ensure that no Steal interleaves here // typename LOCK::_Scoped_lock lockHolder(*m_pLock); // // cache m_head, and calculate number of tasks // int h = m_head; const int count = m_tail - h; // // normalize indices // h = h & m_mask; // normalize m_head m_cookieBase += m_tail - (h + count); m_head = h; m_tail = h + count; #pragma warning(push) #pragma warning(disable:26010) // we get here the first time we've overflowed, // so as long as m_mask >= 1, which is asserted in the ctor, there's plenty of room CONCRT_COREASSERT(count < newsize); CONCRT_COREASSERT(pNewTasks != NULL); for (int i = 0; i < count; ++i) { pNewTasks[i] = m_pTasks[(h + i) & m_mask]; pNewSlots[i] = m_pSlots[(h + i) & m_mask]; } m_pTasks = pNewTasks; m_pSlots = pNewSlots; #pragma warning(pop) // // Rebase the cookie index. We can't hand out duplicate cookies due to this. // m_cookieBase += m_head; // // Rebase the detachment point if necessary. // if (m_fMarkedForDetachment) { CONCRT_COREASSERT(m_detachmentTail - m_head >= 0); m_detachmentTail -= m_head; } m_mask = newsize - 1; m_head = 0; m_tail = count; CONCRT_COREASSERT(count < m_mask); // // push the element // int t = m_tail; if (!affinitySlot.IsEmpty()) { // // Flag the element as affinitized. On popping this element, the box slot must be cleared to prevent // multiple execution. // m_pSlots[t & m_mask] = affinitySlot; elem = ADD_AFFINITY_INDICATOR(T, elem); } m_pTasks[t & m_mask] = elem; m_tail = t + 1; } delete[] pOldTasks; delete[] pOldSlots; return m_tail - 1 + m_cookieBase; } /// /// Synchronously pops an element from the work stealing queue. Note that this is called in the case where /// a Pop() call and a Steal() call interleave. /// T* SyncPop() { // // ensure that no Steal interleaves with this pop // typename LOCK::_Scoped_lock lockHolder(*m_pLock); typename Mailbox::Slot affinitySlot; T* pResult = NULL; int t = m_tail - 1; m_tail = t; if (0 <= t - m_head) { // // == (m_head <= m_tail) // pResult = m_pTasks[t & m_mask]; // // Because this was a single element / interleave with steal, there is nothing // below this in the WSQ in the event of a NULL return. Hence, we do not need // to perform an explicit skip as in Pop(). // affinitySlot = m_pSlots[t & m_mask]; } else { m_tail = t + 1; // restore m_tail } if (0 >= t - m_head) { // // Rebase the cookie index so we guarantee that currently handed out cookie values are // still valid until they are trypop()'d. // m_cookieBase += m_head; // // queue is empty: reset m_head and m_tail // m_head = 0; m_tail = 0; m_detachmentTail = 0; m_fMarkedForDetachment = false; } if (IS_AFFINITIZED_TASK(pResult)) { pResult = STRIP_AFFINITY_INDICATOR(T, pResult); // // If the task was already executed via a mailbox dequeue, return an indication to the caller. // if (!affinitySlot.Claim()) { return (T*)AFFINITY_EXECUTED; } } return pResult; } }; } // namespace details } // namespace Concurrency