/*
 * Copyright (c) 2026 Peisong Xiao
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */

#ifndef THREADWEAVER_WEAVER_H
#define THREADWEAVER_WEAVER_H

#include <array>
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <new>
#include <utility>

// <bit> supplies std::countr_zero / std::popcount from C++20 on. When it is
// missing, or the library does not provide the bit operations, the helpers in
// THWeaver::detail fall back to portable loops.
#if defined(__has_include)
#if __has_include(<bit>)
#include <bit>
#endif
#else
#include <bit>
#endif

namespace THWeaver {

/// Alignment applied to members that are written by different threads.
/// Falls back to 64 when the standard library does not expose
/// std::hardware_destructive_interference_size.
#if defined(__cpp_lib_hardware_interference_size)
inline constexpr std::size_t CLS = std::hardware_destructive_interference_size;
#else
inline constexpr std::size_t CLS = 64;
#endif

namespace detail {

/// Number of trailing zero bits in @p value; 64 when @p value is zero.
inline int count_trailing_zeros(std::uint64_t value) noexcept {
#if defined(__cpp_lib_bitops)
    return std::countr_zero(value);
#else
    if (value == 0)
        return 64;
    int zeros = 0;
    while ((value & 1U) == 0) {
        value >>= 1;
        ++zeros;
    }
    return zeros;
#endif
}

/// Number of set bits in @p value.
inline int count_set_bits(std::uint64_t value) noexcept {
#if defined(__cpp_lib_bitops)
    return std::popcount(value);
#else
    int bits = 0;
    for (; value != 0; value &= value - 1)
        ++bits;
    return bits;
#endif
}

} // namespace detail

enum class EndpointQueueSendResult : std::uint32_t { Ok = 0, ErrFull = 1 };
enum class EndpointQueueRecvResult { Ok, ErrEmpty };

enum class FanInFabricSendResult : std::uint32_t { Ok = 0, ErrFull = 1 };
enum class FanInFabricRecvBitmapResult { Ok, ErrBitmapEmpty, ErrBitmapFailed };
enum class FanInFabricRecvTryResult { Ok, ErrTryFailed };
enum class FanInFabricRecvAggrResult { Ok, ErrAggrFailed };

/// Outcome of FanOutFabric::send.
/// `failures` holds one bit per requested consumer whose queue rejected the
/// message.
template <typename BitmapType>
struct FanOutFabricSendResult {
    BitmapType failures;
    enum { Ok, ErrPartialSuccess, ErrFullFailure } result;
};

enum class FanOutFabricRecvResult { Ok, ErrEmpty };
enum class FanOutFabricDeallocResult { Ok, ErrReclaimQueueFull };

/// Fixed-capacity ring buffer holding up to QSIZE - 1 messages.
///
/// send() only writes `tail` and recv()/recv_unsafe() only write `head`, so
/// the queue presumably expects exactly one sending and one receiving thread
/// (NOTE(review): confirm against callers).
///
/// NOTE(review): the template parameter order is assumed to be
/// <MessageType, qsize_expt> -- confirm against callers.
template <typename MessageType, std::size_t qsize_expt>
class EndpointQueue {
public:
    static constexpr std::size_t QSIZE = std::size_t(1) << qsize_expt;
    static constexpr std::size_t QSIZE_MASK = QSIZE - 1;

    EndpointQueue() noexcept {}
    EndpointQueue(const EndpointQueue &) = delete;
    EndpointQueue &operator=(const EndpointQueue &) = delete;
    EndpointQueue(EndpointQueue &&) = delete;
    EndpointQueue &operator=(EndpointQueue &&) = delete;

    /// Resets both indices to zero, leaving the queue empty.
    void init() noexcept {
        head.store(0, std::memory_order_release);
        tail.store(0, std::memory_order_release);
    }

    /// Discards every queued message by resetting both indices to zero.
    void flush() noexcept { init(); }

    /// Number of messages currently queued (0 .. QSIZE - 1).
    std::size_t get_size() const noexcept {
        const std::size_t t = tail.load(std::memory_order_acquire);
        const std::size_t h = head.load(std::memory_order_acquire);
        // Unsigned wrap-around followed by the mask gives the ring distance.
        return (t - h) & QSIZE_MASK;
    }

    bool is_empty() const noexcept {
        return head.load(std::memory_order_acquire) ==
               tail.load(std::memory_order_acquire);
    }

    /// True when one more send() would fail; one cell always stays unused.
    bool is_full() const noexcept {
        return head.load(std::memory_order_acquire) ==
               ((tail.load(std::memory_order_acquire) + 1) & QSIZE_MASK);
    }

    std::size_t get_head() const noexcept {
        return head.load(std::memory_order_acquire);
    }

    std::size_t get_tail() const noexcept {
        return tail.load(std::memory_order_acquire);
    }

    /// Direct access to the underlying cells, indexed by ring position.
    std::array<MessageType, QSIZE> &get_buffer() noexcept { return buffer; }

    /// Moves @p message into the cell at `tail` and publishes it.
    /// @return ErrFull (message untouched) when no free cell is available.
    EndpointQueueSendResult send(MessageType &&message) noexcept {
        const std::size_t h = head.load(std::memory_order_acquire);
        const std::size_t t = tail.load(std::memory_order_acquire);
        const std::size_t next_tail = (t + 1) & QSIZE_MASK;
        if (h == next_tail)
            return EndpointQueueSendResult::ErrFull;
        buffer[t] = std::move(message);
        // Release store: the cell contents become visible before the index.
        tail.store(next_tail, std::memory_order_release);
        return EndpointQueueSendResult::Ok;
    }

    /// Moves the oldest message into @p buffer.
    /// @return ErrEmpty (@p buffer untouched) when nothing is queued.
    EndpointQueueRecvResult recv(MessageType &buffer) noexcept {
        const std::size_t h = head.load(std::memory_order_acquire);
        const std::size_t t = tail.load(std::memory_order_acquire);
        if (h == t)
            return EndpointQueueRecvResult::ErrEmpty;
        buffer = std::move(this->buffer[h]);
        head.store((h + 1) & QSIZE_MASK, std::memory_order_release);
        return EndpointQueueRecvResult::Ok;
    }

    /// Pops the message at @p old_head without checking for emptiness.
    /// The caller must already know the queue is non-empty and pass the
    /// current head; @p old_head is advanced to the new head on return.
    void recv_unsafe(MessageType &buffer, std::size_t &old_head) noexcept {
        buffer = std::move(this->buffer[old_head]);
        old_head = (old_head + 1) & QSIZE_MASK;
        head.store(old_head, std::memory_order_release);
    }

private:
    alignas(CLS) std::array<MessageType, QSIZE> buffer;
    alignas(CLS) std::atomic<std::size_t> head{0};
    alignas(CLS) std::atomic<std::size_t> tail{0};
};

/// IN_THREAD_CNT independent EndpointQueues feeding one receiver.
///
/// The receiver picks a queue either from `hint` (a bitmap of queues that
/// full_scan() found non-empty) or from a discovery list that is reordered so
/// that recently served queues move to the back. send() does not touch
/// `hint`; bits are only set by full_scan(). init() must be called before
/// use.
///
/// NOTE(review): the template parameter order is assumed to be
/// <MessageType, IN_THREAD_CNT, qsize_expt> -- confirm against callers.
template <typename MessageType, std::size_t IN_THREAD_CNT,
          std::size_t qsize_expt>
class FanInFabric {
public:
    using BitmapType = std::uint64_t;

    static constexpr std::size_t QSIZE = std::size_t(1) << qsize_expt;
    static constexpr std::size_t QSIZE_MASK = QSIZE - 1;
    static constexpr std::size_t BITMAP_SIZE = sizeof(BitmapType) * 8;

    static_assert(IN_THREAD_CNT > 0, "At least one input thread is required");
    static_assert(IN_THREAD_CNT <= BITMAP_SIZE, "Insufficient bitmap size");

    static constexpr BitmapType THREAD_BITMAP_MASK =
        IN_THREAD_CNT == BITMAP_SIZE
            ? ~BitmapType(0)
            : (BitmapType(1) << (IN_THREAD_CNT % BITMAP_SIZE)) - 1;

    FanInFabric() noexcept {}
    FanInFabric(const FanInFabric &) = delete;
    FanInFabric &operator=(const FanInFabric &) = delete;
    FanInFabric(FanInFabric &&) = delete;
    FanInFabric &operator=(FanInFabric &&) = delete;

    /// Empties every queue and resets hint, discovery list and rotation.
    void init() noexcept {
        for (auto &queue : in_queues)
            queue.init();
        hint.store(0);
        dl.init();
        rotation = 0;
    }

    /// Discards everything queued by @p thread_id and clears its hint bit.
    template <std::size_t thread_id>
    void flush() noexcept {
        static_assert(thread_id < IN_THREAD_CNT, "Thread ID out of bounds");
        in_queues[thread_id].flush();
        hint.fetch_and(~(BitmapType(1) << thread_id),
                       std::memory_order_relaxed);
    }

    /// Discards everything in every queue and resets the receiver state.
    void flush_all() noexcept {
        for (auto &queue : in_queues)
            queue.flush();
        hint.store(0, std::memory_order_relaxed);
        dl.init();
        rotation = 0;
    }

    /// Enqueues @p message on the queue owned by @p thread_id.
    template <std::size_t thread_id>
    FanInFabricSendResult send(MessageType &&message) noexcept {
        static_assert(thread_id < IN_THREAD_CNT, "Thread ID out of bounds");
        // Both result enums share the same underlying values.
        return static_cast<FanInFabricSendResult>(static_cast<std::uint32_t>(
            in_queues[thread_id].send(std::move(message))));
    }

    /// Receives only from a queue named by the hint bitmap.
    /// @return ErrBitmapEmpty when no hint bit is set, ErrBitmapFailed when
    ///         the hinted queue turned out to be empty.
    FanInFabricRecvBitmapResult recv_bitmap_only(MessageType &buffer) noexcept {
        const BitmapType bitmap = hint.load(std::memory_order_relaxed);
        if (!bitmap)
            return FanInFabricRecvBitmapResult::ErrBitmapEmpty;
        return recv_hinted(bitmap, buffer)
                   ? FanInFabricRecvBitmapResult::Ok
                   : FanInFabricRecvBitmapResult::ErrBitmapFailed;
    }

    /// Tries the hinted queue first, then the head of the discovery list.
    FanInFabricRecvTryResult recv_try(MessageType &buffer) noexcept {
        const BitmapType bitmap = hint.load(std::memory_order_relaxed);
        if (bitmap && recv_hinted(bitmap, buffer))
            return FanInFabricRecvTryResult::Ok;

        std::size_t head = in_queues[dl.first].get_head();
        const std::size_t tail = in_queues[dl.first].get_tail();
        if (head == tail)
            return FanInFabricRecvTryResult::ErrTryFailed;
        in_queues[dl.first].recv_unsafe(buffer, head);
        dl.move_first_to_last();
        return FanInFabricRecvTryResult::Ok;
    }

    /// Tries the hinted queue first, then walks the whole discovery list
    /// once, rotating it as it goes.
    FanInFabricRecvAggrResult recv_aggr(MessageType &buffer) noexcept {
        const BitmapType bitmap = hint.load(std::memory_order_relaxed);
        if (bitmap && recv_hinted(bitmap, buffer))
            return FanInFabricRecvAggrResult::Ok;

        // After IN_THREAD_CNT rotations the original tail is the tail again.
        const std::uint8_t stop = dl.last;
        do {
            const std::uint8_t candidate = dl.first;
            std::size_t head = in_queues[candidate].get_head();
            const std::size_t tail = in_queues[candidate].get_tail();
            dl.move_first_to_last();
            if (head != tail) {
                in_queues[candidate].recv_unsafe(buffer, head);
                return FanInFabricRecvAggrResult::Ok;
            }
        } while (dl.last != stop);
        return FanInFabricRecvAggrResult::ErrAggrFailed;
    }

    /// Checks every queue, writes the set of non-empty ones to @p bitmap and
    /// ORs that set into the hint.
    void full_scan(BitmapType &bitmap) noexcept {
        bitmap = 0;
        std::uint8_t curr = dl.first;
        if (in_queues[curr].get_size())
            bitmap |= BitmapType(1) << curr;
        do {
            curr = dl.next[curr];
            if (in_queues[curr].get_size())
                bitmap |= BitmapType(1) << curr;
        } while (curr != dl.last);
        hint.fetch_or(bitmap, std::memory_order_relaxed);
    }

    template <std::size_t thread_id>
    std::size_t get_size() const noexcept {
        static_assert(thread_id < IN_THREAD_CNT, "Thread ID out of bounds");
        return in_queues[thread_id].get_size();
    }

    template <std::size_t thread_id>
    bool is_empty() const noexcept {
        static_assert(thread_id < IN_THREAD_CNT, "Thread ID out of bounds");
        return in_queues[thread_id].is_empty();
    }

    template <std::size_t thread_id>
    bool is_full() const noexcept {
        static_assert(thread_id < IN_THREAD_CNT, "Thread ID out of bounds");
        return in_queues[thread_id].is_full();
    }

private:
    std::array<EndpointQueue<MessageType, qsize_expt>, IN_THREAD_CNT>
        in_queues;
    alignas(CLS) std::atomic<BitmapType> hint{0};

    /// Index-linked list of queue ids; `first` is the next fallback
    /// candidate. Only the links between `first` and `last` are meaningful:
    /// next[last] and prev[first] are rewritten before they are read.
    struct alignas(CLS) DiscoveryList {
        std::array<std::uint8_t, IN_THREAD_CNT> next;
        std::array<std::uint8_t, IN_THREAD_CNT> prev;
        std::uint8_t first;
        std::uint8_t last;

        void init() noexcept {
            for (std::size_t i = 0; i < IN_THREAD_CNT; ++i) {
                next[i] = static_cast<std::uint8_t>((i + 1) % IN_THREAD_CNT);
                prev[i] = static_cast<std::uint8_t>((i + IN_THREAD_CNT - 1) %
                                                    IN_THREAD_CNT);
            }
            first = 0;
            last = static_cast<std::uint8_t>(IN_THREAD_CNT - 1);
        }

        void move_first_to_last() noexcept {
            prev[first] = last;
            next[last] = first;
            last = next[last];
            first = next[last];
        }

        void move_to_last(std::uint8_t index) noexcept {
            if (index == last)
                return;
            if (index == first) {
                move_first_to_last();
                return;
            }
            // Unlink from the middle, then append after the current tail.
            prev[next[index]] = prev[index];
            next[prev[index]] = next[index];
            next[last] = index;
            prev[index] = last;
            last = index;
        }
    } dl;

    std::size_t rotation = 0;

    /// Rotates the hint so that bit 0 corresponds to queue `rotation`.
    BitmapType rotate(BitmapType bitmap) const noexcept {
        bitmap &= THREAD_BITMAP_MASK;
        if (rotation == 0)
            return bitmap;
        return ((bitmap >> rotation) | (bitmap << (IN_THREAD_CNT - rotation))) &
               THREAD_BITMAP_MASK;
    }

    /// Shared hinted receive path: picks the first hinted queue at or after
    /// `rotation`, clears its hint bit when this receive leaves it empty, and
    /// pops one message.
    /// @return false when the hinted queue held nothing.
    bool recv_hinted(BitmapType bitmap, MessageType &buffer) noexcept {
        bitmap = rotate(bitmap);
        const std::size_t hinted_id =
            (static_cast<std::size_t>(detail::count_trailing_zeros(bitmap)) +
             rotation) %
            IN_THREAD_CNT;
        rotation = (hinted_id + 1) % IN_THREAD_CNT;

        std::size_t head = in_queues[hinted_id].get_head();
        const std::size_t tail = in_queues[hinted_id].get_tail();
        const std::size_t qsize = (tail - head) & QSIZE_MASK;
        if (qsize <= 1)
            hint.fetch_and(~(BitmapType(1) << hinted_id),
                           std::memory_order_relaxed);
        if (qsize == 0)
            return false;

        in_queues[hinted_id].recv_unsafe(buffer, head);
        dl.move_to_last(static_cast<std::uint8_t>(hinted_id));
        return true;
    }
};

/// One sender broadcasting messages to up to OUT_THREAD_CNT consumers.
///
/// Payloads live in a fixed pool of BUFFER_SIZE reference-counted slots.
/// Consumers receive slot indices through per-consumer queues, read the
/// payload through a BorrowedRef, and the last consumer to let go hands the
/// slot back through its reclaim queue. The sender returns reclaimed slots to
/// its free stack with reclaim()/reclaim_scan_all(). init() must be called
/// before use.
///
/// NOTE(review): the template parameter order is assumed to be
/// <MessageType, OUT_THREAD_CNT, BUFFER_SIZE, consumer_qsize_expt,
/// reclaim_qsize_expt> -- confirm against callers.
template <typename MessageType, std::size_t OUT_THREAD_CNT,
          std::size_t BUFFER_SIZE, std::size_t consumer_qsize_expt,
          std::size_t reclaim_qsize_expt>
class FanOutFabric {
public:
    using BitmapType = std::uint64_t;
    using RefCountType = std::uint32_t;
    using IndexType = std::uint16_t;

    static constexpr std::size_t CONSUMER_QSIZE = std::size_t(1)
                                                  << consumer_qsize_expt;
    static constexpr std::size_t CONSUMER_QSIZE_MASK = CONSUMER_QSIZE - 1;
    static constexpr std::size_t RECLAIM_QSIZE = std::size_t(1)
                                                 << reclaim_qsize_expt;
    static constexpr std::size_t BITMAP_SIZE = sizeof(BitmapType) * 8;

    static_assert(OUT_THREAD_CNT > 0, "At least one output thread is required");
    static_assert(OUT_THREAD_CNT <= BITMAP_SIZE, "Insufficient bitmap size");
    static_assert(std::numeric_limits<IndexType>::max() >= BUFFER_SIZE,
                  "Insuffiently large IndexType");

    static constexpr BitmapType THREAD_BITMAP_MASK =
        OUT_THREAD_CNT == BITMAP_SIZE
            ? ~BitmapType(0)
            : (BitmapType(1) << (OUT_THREAD_CNT % BITMAP_SIZE)) - 1;

    FanOutFabric() noexcept {}
    FanOutFabric(const FanOutFabric &) = delete;
    FanOutFabric &operator=(const FanOutFabric &) = delete;
    FanOutFabric(FanOutFabric &&) = delete;
    FanOutFabric &operator=(FanOutFabric &&) = delete;

    /// Read-only handle on a payload slot held by consumer @p thread_id.
    /// Releasing the handle (explicitly or in the destructor) drops this
    /// consumer's reference on the slot.
    template <std::size_t thread_id>
    class BorrowedRef {
    public:
        static_assert(thread_id < OUT_THREAD_CNT, "Thread ID out of bounds");

        BorrowedRef() noexcept
            : parent(nullptr), slot(static_cast<IndexType>(BUFFER_SIZE)),
              payload(nullptr), released(true), dealloc_pending(false) {}
        BorrowedRef(const BorrowedRef &) = delete;
        BorrowedRef &operator=(const BorrowedRef &) = delete;
        BorrowedRef(BorrowedRef &&) = delete;
        BorrowedRef &operator=(BorrowedRef &&) = delete;

        ~BorrowedRef() noexcept {
            if (is_valid() || dealloc_pending)
                release();
        }

        /// Binds the handle to @p slot of @p parent. Any reference the handle
        /// still holds is forgotten, so release it first.
        void init(FanOutFabric *parent, IndexType slot,
                  const MessageType *payload) noexcept {
            this->parent = parent;
            this->slot = slot;
            this->payload = payload;
            released = false;
            dealloc_pending = false;
        }

        /// Drops this consumer's reference. The slot is queued for
        /// reclamation only when this was the last outstanding reference.
        /// @return ErrReclaimQueueFull when the slot could not be queued;
        ///         calling release() again retries just the hand-back.
        FanOutFabricDeallocResult release() noexcept {
            if (parent == nullptr)
                return FanOutFabricDeallocResult::Ok; // never initialised
            if (!released) {
                released = true;
                payload = nullptr;
                // acq_rel: reads of the payload by every holder happen
                // before the slot travels back to the sender.
                const RefCountType holders =
                    parent->buffer[slot].count.fetch_sub(
                        1, std::memory_order_acq_rel);
                dealloc_pending = (holders == 1);
            }
            if (!dealloc_pending)
                return FanOutFabricDeallocResult::Ok;
            const FanOutFabricDeallocResult res =
                parent->template dealloc_try<thread_id>(slot);
            dealloc_pending = (res != FanOutFabricDeallocResult::Ok);
            return res;
        }

        bool is_valid() const noexcept {
            return !released && payload != nullptr && parent != nullptr;
        }

        bool is_released() const noexcept {
            return released || payload == nullptr;
        }

        /// The borrowed payload; only meaningful while is_valid() is true.
        const MessageType &get() const noexcept { return *payload; }

        IndexType get_slot() const noexcept { return slot; }

    private:
        FanOutFabric *parent;
        IndexType slot;
        const MessageType *payload;
        bool released;
        // True when the reference was dropped but the slot could not yet be
        // pushed onto the reclaim queue.
        bool dealloc_pending;
    };

    /// Empties every queue and marks every slot as free.
    void init() noexcept {
        for (std::size_t i = 0; i < OUT_THREAD_CNT; ++i) {
            reclaim_queues[i].init();
            consumer_queues[i].init();
        }
        reclaim_hint.store(0, std::memory_order_release);
        fs.init();
        reclaim_rot = 0;
    }

    /// Drops every message currently queued for consumer @p thread_id.
    /// Prohibitively expensive: every queued slot has its reference count
    /// updated.
    /// @return ErrReclaimQueueFull when a slot whose last reference was
    ///         dropped could not be queued for reclamation. Entries handled
    ///         so far stay consumed and the failing entry stays queued, so
    ///         the call can be repeated later.
    template <std::size_t thread_id>
    FanOutFabricDeallocResult flush() noexcept {
        static_assert(thread_id < OUT_THREAD_CNT, "Thread ID out of bounds");
        auto &queue = consumer_queues[thread_id];
        std::size_t head = queue.get_head();
        const std::size_t tail = queue.get_tail();
        while (head != tail) {
            const IndexType slot = queue.get_buffer()[head];
            std::atomic<RefCountType> &count = buffer[slot].count;
            if (count.fetch_sub(1, std::memory_order_acq_rel) == 1 &&
                this->template dealloc_try<thread_id>(slot) !=
                    FanOutFabricDeallocResult::Ok) {
                // We held the last reference, so nobody else touches the
                // counter: restore it and leave the entry for a retry.
                count.fetch_add(1, std::memory_order_relaxed);
                return FanOutFabricDeallocResult::ErrReclaimQueueFull;
            }
            IndexType consumed;
            queue.recv_unsafe(consumed, head); // records progress in `head`
        }
        return FanOutFabricDeallocResult::Ok;
    }

    /// Resets every queue and the free stack without looking at reference
    /// counts; all slots become free again.
    void flush_all() noexcept {
        for (std::size_t i = 0; i < OUT_THREAD_CNT; ++i) {
            reclaim_queues[i].flush();
            consumer_queues[i].flush();
        }
        reclaim_hint.store(0, std::memory_order_release);
        fs.init();
        reclaim_rot = 0;
    }

    template <std::size_t thread_id>
    BorrowedRef<thread_id> get_empty_borrowed_ref() const noexcept {
        return BorrowedRef<thread_id>();
    }

    /// Number of slots currently on the free stack.
    IndexType free_get_size() const noexcept {
        return static_cast<IndexType>(fs.get_size());
    }

    template <std::size_t thread_id>
    std::size_t get_size() const noexcept {
        static_assert(thread_id < OUT_THREAD_CNT, "Thread ID out of bounds");
        return consumer_queues[thread_id].get_size();
    }

    /// Stores @p message in a free slot and offers it to every consumer whose
    /// bit is set in @p policy (bits without a consumer are ignored).
    /// @param reclaim_budget maximum number of slots to reclaim when the free
    ///        stack is empty.
    /// @return Ok with no failures, ErrPartialSuccess with the bits of the
    ///         consumers whose queue was full, or ErrFullFailure when nothing
    ///         was delivered (no free slot, empty policy, or every queue
    ///         full).
    FanOutFabricSendResult<BitmapType> send(MessageType &&message,
                                            BitmapType policy,
                                            IndexType reclaim_budget) noexcept {
        using Result = FanOutFabricSendResult<BitmapType>;

        // Bits beyond OUT_THREAD_CNT would inflate the reference count and
        // keep the slot from ever being reclaimed.
        policy &= THREAD_BITMAP_MASK;

        if (fs.is_empty()) {
            reclaim(reclaim_budget);
            if (fs.is_empty())
                return {policy, Result::ErrFullFailure};
        }
        if (policy == 0)
            return {policy, Result::ErrFullFailure};

        const IndexType slot = fs.pop_unsafe();
        const RefCountType try_cnt =
            static_cast<RefCountType>(detail::count_set_bits(policy));
        buffer[slot].payload = std::move(message);
        buffer[slot].count.store(try_cnt);

        BitmapType failures = 0;
        RefCountType failed_cnt = 0;
        for (std::size_t i = 0; i < OUT_THREAD_CNT; ++i) {
            const BitmapType bit = BitmapType(1) << i;
            if (!(policy & bit))
                continue;
            IndexType handle = slot;
            if (consumer_queues[i].send(std::move(handle)) !=
                EndpointQueueSendResult::Ok) {
                failures |= bit;
                ++failed_cnt;
            }
        }

        if (failed_cnt == 0)
            return {0, Result::Ok};

        // Drop the references of the consumers that never got the slot. If
        // that leaves none, either nothing was delivered or every receiver
        // has already released it, so the slot comes straight back to us.
        const RefCountType before = buffer[slot].count.fetch_sub(
            failed_cnt, std::memory_order_acq_rel);
        if (before == failed_cnt)
            fs.push(slot);

        return {failures, failed_cnt == try_cnt ? Result::ErrFullFailure
                                                : Result::ErrPartialSuccess};
    }

    /// Pops the next slot queued for consumer @p thread_id into @p token.
    /// A reference still held by @p token is released first; the result of
    /// that release is not reported.
    /// @return ErrEmpty (@p token untouched) when nothing is queued.
    template <std::size_t thread_id>
    FanOutFabricRecvResult recv(BorrowedRef<thread_id> &token) noexcept {
        static_assert(thread_id < OUT_THREAD_CNT, "Thread ID out of bounds");
        std::size_t head = consumer_queues[thread_id].get_head();
        const std::size_t tail = consumer_queues[thread_id].get_tail();
        if (head == tail)
            return FanOutFabricRecvResult::ErrEmpty;

        // No-op for an empty token; also retries a pending hand-back.
        (void)token.release();

        IndexType slot;
        consumer_queues[thread_id].recv_unsafe(slot, head);
        token.init(this, slot, &(buffer[slot].payload));
        return FanOutFabricRecvResult::Ok;
    }

    /// Moves up to @p budget slots from the reclaim queues flagged in the
    /// hint bitmap back onto the free stack.
    /// @return number of slots reclaimed.
    IndexType reclaim(IndexType budget) noexcept {
        IndexType reclaim_cnt = 0;
        // drain_reclaim_queue() advances reclaim_rot, so fix the starting
        // offset that the rotated bitmap refers to.
        const std::size_t base = reclaim_rot;
        const BitmapType bitmap =
            rotate(reclaim_hint.load(std::memory_order_relaxed));
        for (std::size_t i = 0; reclaim_cnt < budget && i < OUT_THREAD_CNT;
             ++i) {
            if (!(bitmap & (BitmapType(1) << i)))
                continue;
            drain_reclaim_queue((i + base) % OUT_THREAD_CNT, budget,
                                reclaim_cnt);
        }
        return reclaim_cnt;
    }

    IndexType reclaim_unlimited() noexcept {
        return reclaim(static_cast<IndexType>(BUFFER_SIZE));
    }

    /// Like reclaim(), but visits every reclaim queue regardless of the hint.
    IndexType reclaim_scan_all(IndexType budget) noexcept {
        IndexType reclaim_cnt = 0;
        const std::size_t base = reclaim_rot;
        for (std::size_t i = 0; reclaim_cnt < budget && i < OUT_THREAD_CNT;
             ++i)
            drain_reclaim_queue((i + base) % OUT_THREAD_CNT, budget,
                                reclaim_cnt);
        return reclaim_cnt;
    }

    /// Queues @p slot on consumer @p thread_id's reclaim queue and flags the
    /// queue in the hint. Does not touch the slot's reference count.
    template <std::size_t thread_id>
    FanOutFabricDeallocResult dealloc_try(IndexType slot) noexcept {
        static_assert(thread_id < OUT_THREAD_CNT, "Thread ID out of bounds");
        if (reclaim_queues[thread_id].send(std::move(slot)) !=
            EndpointQueueSendResult::Ok)
            return FanOutFabricDeallocResult::ErrReclaimQueueFull;
        reclaim_hint.fetch_or(BitmapType(1) << thread_id,
                              std::memory_order_relaxed);
        return FanOutFabricDeallocResult::Ok;
    }

private:
    std::array<EndpointQueue<IndexType, reclaim_qsize_expt>, OUT_THREAD_CNT>
        reclaim_queues;
    std::array<EndpointQueue<IndexType, consumer_qsize_expt>, OUT_THREAD_CNT>
        consumer_queues;
    alignas(CLS) std::atomic<BitmapType> reclaim_hint{0};

    struct BufferSlot {
        alignas(CLS) MessageType payload;
        // Number of consumers that still hold or are yet to receive the slot.
        alignas(CLS) std::atomic<RefCountType> count;
    };
    std::array<BufferSlot, BUFFER_SIZE> buffer;

    /// Stack of free slot indices.
    struct alignas(CLS) FreeStack {
        std::array<IndexType, BUFFER_SIZE> free;
        int top;

        void init() {
            for (std::size_t i = 0; i < BUFFER_SIZE; ++i)
                free[i] = static_cast<IndexType>(i);
            top = static_cast<int>(BUFFER_SIZE);
        }

        void push(IndexType val) {
            free[top] = val;
            ++top;
        }

        /// Pops a slot, or returns BUFFER_SIZE when the stack is empty.
        IndexType pop() {
            if (top)
                return free[--top];
            return static_cast<IndexType>(BUFFER_SIZE);
        }

        /// Pops without checking; the stack must not be empty.
        IndexType pop_unsafe() { return free[--top]; }

        bool is_empty() const { return top == 0; }

        int get_size() const { return top; }
    } fs;

    std::size_t reclaim_rot = 0;

    /// Rotates the hint so that bit 0 corresponds to queue `reclaim_rot`.
    BitmapType rotate(BitmapType bitmap) const noexcept {
        bitmap &= THREAD_BITMAP_MASK;
        if (reclaim_rot == 0)
            return bitmap;
        return ((bitmap >> reclaim_rot) |
                (bitmap << (OUT_THREAD_CNT - reclaim_rot))) &
               THREAD_BITMAP_MASK;
    }

    /// Moves slots from reclaim queue @p idx onto the free stack until the
    /// queue is empty or @p reclaim_cnt reaches @p budget, then makes the
    /// next queue the starting point of the following reclaim pass.
    void drain_reclaim_queue(std::size_t idx, IndexType budget,
                             IndexType &reclaim_cnt) noexcept {
        const BitmapType bit = BitmapType(1) << idx;
        // Clear first; the bit is restored below if entries are left behind.
        reclaim_hint.fetch_and(~bit, std::memory_order_relaxed);

        std::size_t head = reclaim_queues[idx].get_head();
        const std::size_t tail = reclaim_queues[idx].get_tail();
        while (reclaim_cnt < budget && head != tail) {
            IndexType slot;
            reclaim_queues[idx].recv_unsafe(slot, head);
            fs.push(slot);
            ++reclaim_cnt;
        }
        if (reclaim_cnt >= budget && head != reclaim_queues[idx].get_tail())
            reclaim_hint.fetch_or(bit, std::memory_order_relaxed);
        reclaim_rot = (idx + 1) % OUT_THREAD_CNT;
    }
};

} // namespace THWeaver

#endif