diff --git a/include/weaver.h b/include/weaver.h index be6d77d..86ec264 100644 --- a/include/weaver.h +++ b/include/weaver.h @@ -6,8 +6,10 @@ #include #include #include +#include #include #include +#include namespace THWeaver { @@ -25,44 +27,63 @@ enum class FanInFabricRecvTryResult { Ok, ErrTryFailed }; enum class FanInFabricRecvAggrResult { Ok, ErrAggrFailed }; +template struct FanOutFabricSendResult { + BitmapType failures; + enum { Ok, ErrPartialSuccess, ErrFullFailure } result; +}; + +enum class FanOutFabricRecvResult { Ok, ErrEmpty }; + +enum class FanOutFabricDeallocResult { Ok, ErrReclaimQueueFull }; + template class EndpointQueue { public: static constexpr std::size_t QSIZE = 1 << qsize_expt; static constexpr std::size_t QSIZE_MASK = QSIZE - 1; EndpointQueue() noexcept {}; - EndpointQueue(const EndpointQueue &) = delete; - EndpointQueue & - operator=(const EndpointQueue &) = delete; - EndpointQueue(EndpointQueue &&) = delete; - EndpointQueue & - operator=(EndpointQueue &&) = delete; + EndpointQueue(const EndpointQueue &) = delete; + EndpointQueue &operator=(const EndpointQueue &) = delete; + EndpointQueue(EndpointQueue &&) = delete; + EndpointQueue &operator=(EndpointQueue &&) = delete; void init() noexcept { - head.store(0); - tail.store(0); + head.store(0, std::memory_order_release); + tail.store(0, std::memory_order_release); } void flush() noexcept { - head.store(0); - tail.store(0); + head.store(0, std::memory_order_release); + tail.store(0, std::memory_order_release); } std::size_t size() const noexcept { - return ((tail.load() + QSIZE_MASK) - head.load()) & QSIZE_MASK; + return ((tail.load(std::memory_order::acquire) + QSIZE_MASK) - + head.load(std::memory_order::acquire)) & + QSIZE_MASK; } - bool is_empty() const noexcept { return head.load() == tail.load(); } + bool is_empty() const noexcept { + return head.load(std::memory_order::acquire) == + tail.load(std::memory_order::acquire); + } bool is_full() const noexcept { - return head.load() == ((tail.load() + 1) & QSIZE_MASK); + return head.load(std::memory_order::acquire) == + ((tail.load(std::memory_order::acquire) + 1) & + QSIZE_MASK); } - std::size_t get_head() const noexcept { return head.load(); } - std::size_t get_tail() const noexcept { return tail.load(); } + std::size_t get_head() const noexcept { + return head.load(std::memory_order::acquire); + } + std::size_t get_tail() const noexcept { + return tail.load(std::memory_order::acquire); + } + std::array &get_buffer() noexcept { return buffer; } EndpointQueueSendResult send(MessageType &&message) noexcept { - std::size_t h = head.load(); - std::size_t t = tail.load(); + std::size_t h = head.load(std::memory_order::acquire); + std::size_t t = tail.load(std::memory_order::acquire); if (h == ((t + 1) & QSIZE_MASK)) return EndpointQueueSendResult::ErrFull; @@ -70,14 +91,14 @@ public: buffer[t] = std::move(message); t = (t + 1) & QSIZE_MASK; - tail.store(t); + tail.store(t, std::memory_order_release); return EndpointQueueSendResult::Ok; } EndpointQueueRecvResult recv(MessageType &buffer) noexcept { - std::size_t h = head.load(); - std::size_t t = tail.load(); + std::size_t h = head.load(std::memory_order::acquire); + std::size_t t = tail.load(std::memory_order::acquire); if (h == t) return EndpointQueueRecvResult::ErrEmpty; @@ -85,15 +106,16 @@ public: buffer = std::move(this->buffer[h]); h = (h + 1) & QSIZE_MASK; - head.store(h); + head.store(h, std::memory_order_release); return EndpointQueueRecvResult::Ok; } - void recv_unsafe(MessageType &buffer, std::size_t old_head) noexcept { + void recv_unsafe(MessageType &buffer, std::size_t &old_head) noexcept { buffer = std::move(this->buffer[old_head]); + old_head = (old_head + 1) & QSIZE_MASK; - head.store((old_head + 1) & QSIZE_MASK); + head.store(old_head, std::memory_order_release); } private: @@ -108,7 +130,7 @@ class FanInFabric { public: using BitmapType = uint64_t; - static constexpr std::size_t QSIZE = 1 << qsize_expt; + static constexpr std::size_t QSIZE = 1ULL << qsize_expt; static constexpr std::size_t QSIZE_MASK = QSIZE - 1; static constexpr std::size_t BITMAP_SIZE = sizeof(BitmapType) * 8; static constexpr BitmapType THREAD_BITMAP_MASK = @@ -116,19 +138,13 @@ public: ? ~BitmapType(0) : (BitmapType(1) << IN_THREAD_CNT) - 1; - static_assert(IN_THREAD_CNT <= BITMAP_SIZE, "Producer limit exceeded"); + static_assert(IN_THREAD_CNT <= BITMAP_SIZE, "Insufficient bitmap size"); FanInFabric() noexcept {} - FanInFabric(const FanInFabric - &) = delete; - FanInFabric & - operator=(const FanInFabric &) = - delete; - FanInFabric(FanInFabric &&) = - delete; - FanInFabric & - operator=(FanInFabric &&) = - delete; + FanInFabric(const FanInFabric &) = delete; + FanInFabric &operator=(const FanInFabric &) = delete; + FanInFabric(FanInFabric &&) = delete; + FanInFabric &operator=(FanInFabric &&) = delete; void init() noexcept { for (int i = 0; i < IN_THREAD_CNT; ++i) @@ -350,7 +366,9 @@ private: } } } dl; + std::size_t rotation; + BitmapType rotate(BitmapType bitmap) const noexcept { bitmap &= THREAD_BITMAP_MASK; @@ -363,6 +381,336 @@ private: } }; +template +class FanOutFabric { + using BitmapType = uint64_t; + using RefCountType = uint32_t; + using IndexType = uint16_t; + + static constexpr std::size_t CONSUMER_QSIZE = 1ULL + << consumer_qsize_expt; + static constexpr std::size_t CONSUMER_QSIZE_MASK = CONSUMER_QSIZE - 1; + static constexpr std::size_t RECLAIM_QSIZE = 1ULL << reclaim_qsize_expt; + static constexpr std::size_t BITMAP_SIZE = sizeof(BitmapType) * 8; + static constexpr BitmapType THREAD_BITMAP_MASK = + OUT_THREAD_CNT == BITMAP_SIZE + ? ~BitmapType(0) + : (BitmapType(1) << OUT_THREAD_CNT) - 1; + + static_assert(OUT_THREAD_CNT <= BITMAP_SIZE, + "Insufficient bitmap size"); + static_assert(std::numeric_limits::max() >= BUFFER_SIZE, + "Insuffiently large IndexType"); + + FanOutFabric() noexcept {} + FanOutFabric(const FanOutFabric &) = delete; + FanOutFabric &operator=(const FanOutFabric &) = delete; + FanOutFabric(FanOutFabric &&) = delete; + FanOutFabric &operator=(FanOutFabric &&) = delete; + + template class BorrowedRef { + public: + static_assert(thread_id < OUT_THREAD_CNT, + "Thread ID out of bounds"); + + BorrowedRef() noexcept + : parent(nullptr), slot(BUFFER_SIZE), payload(nullptr), + released(true) {} + BorrowedRef(const BorrowedRef &) = delete; + BorrowedRef &operator=(const BorrowedRef &) = delete; + BorrowedRef(BorrowedRef &&) = delete; + BorrowedRef &operator=(BorrowedRef &&) = delete; + + ~BorrowedRef() noexcept { + if (is_valid()) + release(); + } + + void init(FanOutFabric *parent, IndexType slot, + const MessageType *payload) noexcept { + this->parent = parent; + this->slot = slot; + this->payload = payload; + released = false; + } + + FanOutFabricDeallocResult release() noexcept { + released = true; + payload = nullptr; + return parent->dealloc_try(slot); + } + + bool is_valid() const noexcept { + return !released && payload != nullptr && + parent != nullptr; + } + + bool is_released() const noexcept { + return released || payload == nullptr; + } + + const MessageType &get() const noexcept { return *payload; } + IndexType get_slot() const noexcept { return slot; } + + private: + FanOutFabric *parent; + IndexType slot; + const MessageType *payload; + bool released; + }; + + void init() noexcept { + for (int i = 0; i < OUT_THREAD_CNT; ++i) { + reclaim_queues[i].init(); + consumer_queues[i].init(); + } + reclaim_hint.store(0, std::memory_order_release); + buffer.resize(BUFFER_SIZE); + fs.init(); + rotation = 0; + } + + // Prohibitively expensive + template + FanOutFabricDeallocResult flush() noexcept { + static_assert(thread_id < OUT_THREAD_CNT, + "Thread ID out of bounds"); + + std::size_t head = consumer_queues[thread_id].get_head(); + std::size_t tail = consumer_queues[thread_id].get_tail(); + auto &qbuffer = consumer_queues[thread_id].get_buffer(); + + for (std::size_t i = head; i != tail; + i = ((i + 1) & CONSUMER_QSIZE_MASK)) { + RefCountType cnt = + buffer[qbuffer[i]].count.fetch_sub(1); + + if (cnt == 1 && dealloc_try(qbuffer[i]) != + FanOutFabricDeallocResult::Ok) + return FanOutFabricDeallocResult:: + ErrReclaimQueueFull; + } + + consumer_queues[thread_id].flush(); + return FanOutFabricDeallocResult(); + } + + void flush_all() noexcept { + for (int i = 0; i < OUT_THREAD_CNT; ++i) { + reclaim_queues[i].flush(); + consumer_queues[i].flush(); + } + reclaim_hint.store(0, std::memory_order_release); + fs.init(); + reclaim_rot = 0; + } + + template + BorrowedRef get_empty_borrowed_ref() const noexcept { + return BorrowedRef(); + } + IndexType free_size() const noexcept { return fs.size(); } + template std::size_t size() const noexcept { + static_assert(thread_id < OUT_THREAD_CNT, + "Thread ID out of bounds"); + + return consumer_queues[thread_id].size(); + } + + FanOutFabricSendResult + send(MessageType &&message, BitmapType policy, + IndexType reclaim_budget) noexcept { + if (fs.is_empty()) { + reclaim(reclaim_budget); + + if (fs.is_empty()) + return {policy, + FanOutFabricSendResult< + BitmapType>::ErrFullFailure}; + } + + IndexType slot = fs.pop_unsafe(); + RefCountType failed_cnt = 0; + RefCountType try_cnt = std::popcount(policy); + buffer[slot].payload = std::move(message); + buffer[slot].count.store(try_cnt); + + for (std::size_t i = 0; i < OUT_THREAD_CNT; ++i) { + if ((policy & (BitmapType(1) << i)) && + consumer_queues[i].send(slot) != + EndpointQueueSendResult::Ok) + ++failed_cnt; + else + policy &= (~(BitmapType(1) << i)) & + THREAD_BITMAP_MASK; + } + + buffer[slot].count.fetch_sub(failed_cnt); + + if (try_cnt == failed_cnt) { + fs.push(slot); + return {policy, FanOutFabricSendResult< + BitmapType>::ErrFullFailure}; + } else if (failed_cnt) { + return {policy, FanOutFabricSendResult< + BitmapType>::ErrPartialSuccess}; + } else { + return {0, FanOutFabricSendResult::Ok}; + } + } + + template + FanOutFabricRecvResult recv(BorrowedRef &token) noexcept { + static_assert(thread_id < OUT_THREAD_CNT, + "Thread ID out of bounds"); + + std::size_t head = consumer_queues[thread_id].get_head(); + std::size_t tail = consumer_queues[thread_id].get_tail(); + + if (head == tail) + return FanOutFabricRecvResult::ErrEmpty; + + if (token.is_valid()) + token.release(); + + IndexType slot; + consumer_queues[thread_id].recv_unsafe(slot, head); + + token.init(this, slot, &(buffer[slot].payload)); + return FanOutFabricRecvResult::Ok; + } + + IndexType reclaim(IndexType budget) noexcept { + IndexType reclaim_cnt = 0; + + BitmapType bitmap = + reclaim_hint.load(std::memory_order_relaxed); + bitmap = rotate(bitmap); + + for (std::size_t i = 0; + reclaim_cnt < budget && i < OUT_THREAD_CNT; ++i) { + if (!(bitmap & (BitmapType(1) << i))) + continue; + + std::size_t idx = (i + reclaim_rot) % OUT_THREAD_CNT; + + reclaim_hint.fetch_and(~(BitmapType(1) << idx), + std::memory_order_relaxed); + + std::size_t head = reclaim_queues[idx].get_head(); + std::size_t tail = reclaim_queues[idx].get_tail(); + while (reclaim_cnt < budget && head != tail) { + IndexType tmp; + reclaim_queues[idx].recv_unsafe(tmp, head); + fs.push(tmp); + ++reclaim_cnt; + } + + if (reclaim_cnt >= budget && + !reclaim_queues[idx].is_empty()) + reclaim_hint.fetch_or( + BitmapType(1) << idx, + std::memory_order_relaxed); + reclaim_rot = (idx + 1) % OUT_THREAD_CNT; + } + + return reclaim_cnt; + } + IndexType reclaim_unlimited() noexcept { return reclaim(BUFFER_SIZE); } + IndexType reclaim_scan_all(IndexType budget) noexcept { + IndexType reclaim_cnt = 0; + + for (std::size_t i = 0; + reclaim_cnt < budget && i < OUT_THREAD_CNT; ++i) { + std::size_t idx = (i + reclaim_rot) % OUT_THREAD_CNT; + + reclaim_hint.fetch_and(~(BitmapType(1) << idx), + std::memory_order_relaxed); + + std::size_t head = reclaim_queues[idx].get_head(); + std::size_t tail = reclaim_queues[idx].get_tail(); + while (reclaim_cnt < budget && head != tail) { + IndexType tmp; + reclaim_queues[idx].recv_unsafe(tmp, head); + fs.push(tmp); + ++reclaim_cnt; + } + + if (reclaim_cnt >= budget && + !reclaim_queues[idx].is_empty()) + reclaim_hint.fetch_or( + BitmapType(1) << idx, + std::memory_order_relaxed); + reclaim_rot = (idx + 1) % OUT_THREAD_CNT; + } + + return reclaim_cnt; + } + + template + FanOutFabricDeallocResult dealloc_try(IndexType slot) noexcept { + static_assert(thread_id < OUT_THREAD_CNT, + "Thread ID out of bounds"); + + auto res = reclaim_queues[thread_id].send(slot); + if (res == EndpointQueueSendResult::Ok) { + reclaim_hint.fetch_or(BitmapType(1) << thread_id, + std::memory_order_relaxed); + return FanOutFabricDeallocResult::Ok; + } else { + return FanOutFabricDeallocResult::ErrReclaimQueueFull; + } + } + +private: + std::array, OUT_THREAD_CNT> + reclaim_queues; + std::array, + OUT_THREAD_CNT> + consumer_queues; + alignas(CLS) std::atomic reclaim_hint; + struct alignas(CLS) BufferSlot { + std::atomic count; + MessageType payload; + }; + std::vector buffer; + struct alignas(CLS) FreeStack { + std::array free; + int top; + void init() { + for (int i = 0; i < BUFFER_SIZE; ++i) + free[i] = i; + top = BUFFER_SIZE; + } + void push(IndexType val) { + free[top] = val; + ++top; + } + IndexType pop() { + if (top) + return free[--top]; + return BUFFER_SIZE; + } + IndexType pop_unsafe() { return free[--top]; } + bool is_empty() const { return top == 0; } + int size() const { return top; } + } fs; + std::size_t reclaim_rot; + + BitmapType rotate(BitmapType bitmap) const noexcept { + bitmap &= THREAD_BITMAP_MASK; + + if (reclaim_rot == 0) + return bitmap; + + return ((bitmap >> reclaim_rot) | + (bitmap << (OUT_THREAD_CNT - reclaim_rot))) & + THREAD_BITMAP_MASK; + } +}; + } // namespace THWeaver #endif