[UNTESTED] added fan-out implementation

This commit is contained in:
2026-01-04 10:45:51 -05:00
parent 00011f7c88
commit bc58b8982a

View File

@@ -6,8 +6,10 @@
#include <bit>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <new>
#include <utility>
#include <vector>
namespace THWeaver {
@@ -25,44 +27,63 @@ enum class FanInFabricRecvTryResult { Ok, ErrTryFailed };
enum class FanInFabricRecvAggrResult { Ok, ErrAggrFailed };
// Outcome of a fan-out send.
//   failures - bitmap of the requested consumers that did NOT get the
//              message: 0 on Ok; on a partial or full failure, the bits of
//              the policy whose consumer queue rejected the message. When no
//              buffer slot could be obtained at all, the policy is handed
//              back unchanged.
//   result   - Ok: every requested consumer got the message.
//              ErrPartialSuccess: at least one, but not all, got it.
//              ErrFullFailure: nobody got it.
template <typename BitmapType> struct FanOutFabricSendResult {
BitmapType failures;
enum { Ok, ErrPartialSuccess, ErrFullFailure } result;
};
// Result of FanOutFabric::recv(): ErrEmpty means the calling consumer's queue
// had no pending message (head == tail) and the token was left untouched.
enum class FanOutFabricRecvResult { Ok, ErrEmpty };
// Result of handing a buffer slot back for reuse: ErrReclaimQueueFull means
// the per-consumer reclaim queue rejected the slot index.
enum class FanOutFabricDeallocResult { Ok, ErrReclaimQueueFull };
template <typename MessageType, std::size_t qsize_expt> class EndpointQueue {
public:
static constexpr std::size_t QSIZE = 1 << qsize_expt;
static constexpr std::size_t QSIZE_MASK = QSIZE - 1;
EndpointQueue() noexcept {};
EndpointQueue(const EndpointQueue<MessageType, qsize_expt> &) = delete;
EndpointQueue &
operator=(const EndpointQueue<MessageType, qsize_expt> &) = delete;
EndpointQueue(EndpointQueue<MessageType, qsize_expt> &&) = delete;
EndpointQueue &
operator=(EndpointQueue<MessageType, qsize_expt> &&) = delete;
EndpointQueue(const EndpointQueue &) = delete;
EndpointQueue &operator=(const EndpointQueue &) = delete;
EndpointQueue(EndpointQueue &&) = delete;
EndpointQueue &operator=(EndpointQueue &&) = delete;
void init() noexcept {
head.store(0);
tail.store(0);
head.store(0, std::memory_order_release);
tail.store(0, std::memory_order_release);
}
void flush() noexcept {
head.store(0);
tail.store(0);
head.store(0, std::memory_order_release);
tail.store(0, std::memory_order_release);
}
std::size_t size() const noexcept {
return ((tail.load() + QSIZE_MASK) - head.load()) & QSIZE_MASK;
return ((tail.load(std::memory_order::acquire) + QSIZE_MASK) -
head.load(std::memory_order::acquire)) &
QSIZE_MASK;
}
bool is_empty() const noexcept { return head.load() == tail.load(); }
bool is_empty() const noexcept {
return head.load(std::memory_order::acquire) ==
tail.load(std::memory_order::acquire);
}
bool is_full() const noexcept {
return head.load() == ((tail.load() + 1) & QSIZE_MASK);
return head.load(std::memory_order::acquire) ==
((tail.load(std::memory_order::acquire) + 1) &
QSIZE_MASK);
}
std::size_t get_head() const noexcept { return head.load(); }
std::size_t get_tail() const noexcept { return tail.load(); }
std::size_t get_head() const noexcept {
return head.load(std::memory_order::acquire);
}
std::size_t get_tail() const noexcept {
return tail.load(std::memory_order::acquire);
}
std::array<MessageType, QSIZE> &get_buffer() noexcept { return buffer; }
EndpointQueueSendResult send(MessageType &&message) noexcept {
std::size_t h = head.load();
std::size_t t = tail.load();
std::size_t h = head.load(std::memory_order::acquire);
std::size_t t = tail.load(std::memory_order::acquire);
if (h == ((t + 1) & QSIZE_MASK))
return EndpointQueueSendResult::ErrFull;
@@ -70,14 +91,14 @@ public:
buffer[t] = std::move(message);
t = (t + 1) & QSIZE_MASK;
tail.store(t);
tail.store(t, std::memory_order_release);
return EndpointQueueSendResult::Ok;
}
EndpointQueueRecvResult recv(MessageType &buffer) noexcept {
std::size_t h = head.load();
std::size_t t = tail.load();
std::size_t h = head.load(std::memory_order::acquire);
std::size_t t = tail.load(std::memory_order::acquire);
if (h == t)
return EndpointQueueRecvResult::ErrEmpty;
@@ -85,15 +106,16 @@ public:
buffer = std::move(this->buffer[h]);
h = (h + 1) & QSIZE_MASK;
head.store(h);
head.store(h, std::memory_order_release);
return EndpointQueueRecvResult::Ok;
}
void recv_unsafe(MessageType &buffer, std::size_t old_head) noexcept {
void recv_unsafe(MessageType &buffer, std::size_t &old_head) noexcept {
buffer = std::move(this->buffer[old_head]);
old_head = (old_head + 1) & QSIZE_MASK;
head.store((old_head + 1) & QSIZE_MASK);
head.store(old_head, std::memory_order_release);
}
private:
@@ -108,7 +130,7 @@ class FanInFabric {
public:
using BitmapType = uint64_t;
static constexpr std::size_t QSIZE = 1 << qsize_expt;
static constexpr std::size_t QSIZE = 1ULL << qsize_expt;
static constexpr std::size_t QSIZE_MASK = QSIZE - 1;
static constexpr std::size_t BITMAP_SIZE = sizeof(BitmapType) * 8;
static constexpr BitmapType THREAD_BITMAP_MASK =
@@ -116,19 +138,13 @@ public:
? ~BitmapType(0)
: (BitmapType(1) << IN_THREAD_CNT) - 1;
static_assert(IN_THREAD_CNT <= BITMAP_SIZE, "Producer limit exceeded");
static_assert(IN_THREAD_CNT <= BITMAP_SIZE, "Insufficient bitmap size");
FanInFabric() noexcept {}
FanInFabric(const FanInFabric<MessageType, qsize_expt, IN_THREAD_CNT>
&) = delete;
FanInFabric &
operator=(const FanInFabric<MessageType, qsize_expt, IN_THREAD_CNT> &) =
delete;
FanInFabric(FanInFabric<MessageType, qsize_expt, IN_THREAD_CNT> &&) =
delete;
FanInFabric &
operator=(FanInFabric<MessageType, qsize_expt, IN_THREAD_CNT> &&) =
delete;
FanInFabric(const FanInFabric &) = delete;
FanInFabric &operator=(const FanInFabric &) = delete;
FanInFabric(FanInFabric &&) = delete;
FanInFabric &operator=(FanInFabric &&) = delete;
void init() noexcept {
for (int i = 0; i < IN_THREAD_CNT; ++i)
@@ -350,7 +366,9 @@ private:
}
}
} dl;
std::size_t rotation;
BitmapType rotate(BitmapType bitmap) const noexcept {
bitmap &= THREAD_BITMAP_MASK;
@@ -363,6 +381,336 @@ private:
}
};
template <typename MessageType, std::size_t consumer_qsize_expt,
std::size_t BUFFER_SIZE, std::size_t reclaim_qsize_expt,
std::size_t OUT_THREAD_CNT>
class FanOutFabric {
using BitmapType = uint64_t;
using RefCountType = uint32_t;
using IndexType = uint16_t;
static constexpr std::size_t CONSUMER_QSIZE = 1ULL
<< consumer_qsize_expt;
static constexpr std::size_t CONSUMER_QSIZE_MASK = CONSUMER_QSIZE - 1;
static constexpr std::size_t RECLAIM_QSIZE = 1ULL << reclaim_qsize_expt;
static constexpr std::size_t BITMAP_SIZE = sizeof(BitmapType) * 8;
static constexpr BitmapType THREAD_BITMAP_MASK =
OUT_THREAD_CNT == BITMAP_SIZE
? ~BitmapType(0)
: (BitmapType(1) << OUT_THREAD_CNT) - 1;
static_assert(OUT_THREAD_CNT <= BITMAP_SIZE,
"Insufficient bitmap size");
static_assert(std::numeric_limits<IndexType>::max() >= BUFFER_SIZE,
"Insuffiently large IndexType");
FanOutFabric() noexcept {}
FanOutFabric(const FanOutFabric &) = delete;
FanOutFabric &operator=(const FanOutFabric &) = delete;
FanOutFabric(FanOutFabric &&) = delete;
FanOutFabric &operator=(FanOutFabric &&) = delete;
// Read-only handle to one message slot, held by consumer `thread_id`.
//
// A default-constructed handle is empty. recv() fills it via init(); the
// slot reference is given up by release() or by the destructor.
template <std::size_t thread_id> class BorrowedRef {
  public:
    static_assert(thread_id < OUT_THREAD_CNT,
                  "Thread ID out of bounds");

    // Creates an empty handle. BUFFER_SIZE is used as an out-of-range slot.
    BorrowedRef() noexcept
        : parent(nullptr), slot(BUFFER_SIZE), payload(nullptr),
          released(true) {}

    BorrowedRef(const BorrowedRef &) = delete;
    BorrowedRef &operator=(const BorrowedRef &) = delete;
    BorrowedRef(BorrowedRef &&) = delete;
    BorrowedRef &operator=(BorrowedRef &&) = delete;

    // Gives the slot reference back if one is still held. The result of
    // release() cannot be reported from a destructor and is dropped.
    ~BorrowedRef() noexcept {
        if (is_valid())
            release();
    }

    // Binds the handle to `slot` of `parent`, whose message is `payload`.
    // Does not release a previously held slot; callers do that first.
    void init(FanOutFabric *parent, IndexType slot,
              const MessageType *payload) noexcept {
        this->parent = parent;
        this->slot = slot;
        this->payload = payload;
        released = false;
    }

    // Drops this consumer's reference to the slot.
    //
    // Returns Ok when nothing was held, when other consumers still
    // reference the slot, or when the slot was queued for reuse.
    // Returns ErrReclaimQueueFull when this was the last reference but the
    // reclaim queue rejected the slot; the handle is empty afterwards in
    // either case.
    FanOutFabricDeallocResult release() noexcept {
        // An empty or already released handle has no parent/slot to touch.
        if (!is_valid())
            return FanOutFabricDeallocResult::Ok;

        released = true;
        payload = nullptr;

        // The slot is shared by every consumer the message was sent to.
        // fetch_sub returns the value before the decrement, so only the
        // consumer that observes 1 held the last reference and may hand
        // the slot back; recycling it earlier would let the producer
        // overwrite a payload other consumers are still reading, and
        // recycling it once per consumer would free it several times.
        if (parent->buffer[slot].count.fetch_sub(1) != 1)
            return FanOutFabricDeallocResult::Ok;

        return parent->template dealloc_try<thread_id>(slot);
    }

    // True while the handle refers to a slot that has not been released.
    bool is_valid() const noexcept {
        return !released && payload != nullptr &&
               parent != nullptr;
    }

    bool is_released() const noexcept {
        return released || payload == nullptr;
    }

    // Message behind the handle. Must only be called while is_valid();
    // otherwise payload is null.
    const MessageType &get() const noexcept { return *payload; }

    IndexType get_slot() const noexcept { return slot; }

  private:
    FanOutFabric *parent;
    IndexType slot;
    const MessageType *payload;
    bool released;
};
void init() noexcept {
for (int i = 0; i < OUT_THREAD_CNT; ++i) {
reclaim_queues[i].init();
consumer_queues[i].init();
}
reclaim_hint.store(0, std::memory_order_release);
buffer.resize(BUFFER_SIZE);
fs.init();
rotation = 0;
}
// Prohibitively expensive
//
// Discards every message currently pending for consumer `thread_id`,
// dropping that consumer's reference on each one and handing slots whose
// last reference was dropped to the reclaim queue.
//
// Returns ErrReclaimQueueFull if the reclaim queue has no room; entries
// handled so far stay removed, the rest stay queued, so the call can simply
// be repeated later. Returns Ok once the queue (as seen on entry) is drained.
template <std::size_t thread_id>
FanOutFabricDeallocResult flush() noexcept {
    static_assert(thread_id < OUT_THREAD_CNT,
                  "Thread ID out of bounds");

    auto &pending = consumer_queues[thread_id];
    std::size_t head = pending.get_head();
    const std::size_t tail = pending.get_tail();

    while (head != tail) {
        // Refuse before touching the entry: once the reference count has
        // been dropped there is no way to undo it if the slot cannot be
        // queued for reuse.
        // NOTE(review): this relies on the caller being the only sender
        // into reclaim_queues[thread_id] - confirm.
        if (reclaim_queues[thread_id].is_full())
            return FanOutFabricDeallocResult::ErrReclaimQueueFull;

        // Pop the entry (advances and publishes head) so that a repeated
        // call never drops the same reference twice.
        IndexType slot;
        pending.recv_unsafe(slot, head);

        // fetch_sub returns the previous value: 1 means last reference.
        if (buffer[slot].count.fetch_sub(1) == 1 &&
            dealloc_try<thread_id>(slot) !=
                FanOutFabricDeallocResult::Ok)
            return FanOutFabricDeallocResult::ErrReclaimQueueFull;
    }

    // The queue is not reset to index 0 here: head already equals the tail
    // observed on entry, and resetting both indices could drop entries that
    // were enqueued in the meantime.
    return FanOutFabricDeallocResult::Ok;
}
// Resets the whole fabric: empties every reclaim and consumer queue, clears
// the reclaim hints, refills the free stack and restarts the reclaim
// rotation at 0. Slot reference counts are not touched.
void flush_all() noexcept {
    for (std::size_t q = 0; q != OUT_THREAD_CNT; ++q) {
        reclaim_queues[q].flush();
        consumer_queues[q].flush();
    }

    reclaim_hint.store(0, std::memory_order_release);
    fs.init();
    reclaim_rot = 0;
}
// Returns a default-constructed (empty) handle for consumer `thread_id`,
// ready to be passed to recv().
template <std::size_t thread_id>
BorrowedRef<thread_id> get_empty_borrowed_ref() const noexcept {
    return {};
}
// Number of buffer slots currently on the free stack.
IndexType free_size() const noexcept {
    return static_cast<IndexType>(fs.size());
}
// Reports the size of consumer `thread_id`'s queue as computed by
// EndpointQueue::size().
// NOTE(review): EndpointQueue::size() adds QSIZE_MASK (not QSIZE) before
// subtracting head, which yields QSIZE - 1 for an empty queue - verify that
// formula before relying on this value.
template <std::size_t thread_id> std::size_t size() const noexcept {
    static_assert(thread_id < OUT_THREAD_CNT,
                  "Thread ID out of bounds");
    const auto &queue = consumer_queues[thread_id];
    return queue.size();
}
// Publishes `message` to every consumer whose bit is set in `policy`.
//
// message        - moved into a buffer slot once a slot is available.
// policy         - bitmap of target consumers; bits at or above
//                  OUT_THREAD_CNT are ignored.
// reclaim_budget - maximum number of slots to reclaim if the free stack is
//                  empty.
//
// Returns {failures, result}:
//   Ok                - all targets received the slot; failures == 0.
//   ErrPartialSuccess - some target queues rejected it; failures has their
//                       bits.
//   ErrFullFailure    - no target received it. If no slot could be obtained
//                       `message` is left untouched; otherwise it has
//                       already been moved from.
//
// NOTE(review): the free stack is accessed without synchronization, so this
// presumably must only be called from a single producer thread - confirm.
FanOutFabricSendResult<BitmapType>
send(MessageType &&message, BitmapType policy,
     IndexType reclaim_budget) noexcept {
    using Result = FanOutFabricSendResult<BitmapType>;

    // Bits for non-existent consumers would be counted as references that
    // nobody can ever drop, pinning the slot forever.
    policy &= THREAD_BITMAP_MASK;

    if (fs.is_empty()) {
        reclaim(reclaim_budget);
        if (fs.is_empty())
            return {policy, Result::ErrFullFailure};
    }

    const IndexType slot = fs.pop_unsafe();
    const RefCountType try_cnt =
        static_cast<RefCountType>(std::popcount(policy));
    RefCountType failed_cnt = 0;

    // Payload and reference count must be in place before the slot index
    // becomes visible in any consumer queue.
    buffer[slot].payload = std::move(message);
    buffer[slot].count.store(try_cnt);

    for (std::size_t i = 0; i < OUT_THREAD_CNT; ++i) {
        const BitmapType bit = BitmapType(1) << i;
        if (!(policy & bit))
            continue;
        // EndpointQueue::send takes an rvalue; pass a temporary copy of
        // the index since `slot` is still needed afterwards.
        if (consumer_queues[i].send(IndexType{slot}) !=
            EndpointQueueSendResult::Ok)
            ++failed_cnt;
        else
            policy &= ~bit; // delivered: not a failure
    }

    if (failed_cnt == try_cnt) {
        // Nobody holds the slot (also covers an empty policy).
        buffer[slot].count.store(0);
        fs.push(slot);
        return {policy, Result::ErrFullFailure};
    }

    if (failed_cnt != 0) {
        // Remove the references reserved for consumers that never got the
        // slot. If the previous value equals what is removed, every
        // successful receiver has already dropped its reference without
        // seeing the count reach zero, so the slot must be recycled here.
        if (buffer[slot].count.fetch_sub(failed_cnt) == failed_cnt)
            fs.push(slot);
        return {policy, Result::ErrPartialSuccess};
    }

    return {0, Result::Ok};
}
// Takes the next pending message for consumer `thread_id` and binds `token`
// to it.
//
// Returns ErrEmpty, leaving `token` unchanged, when the consumer queue has
// nothing pending. Otherwise a slot still held by `token` is released first
// (the result of that release is not inspected), then the token is
// re-initialised with the newly received slot and Ok is returned.
template <std::size_t thread_id>
FanOutFabricRecvResult recv(BorrowedRef<thread_id> &token) noexcept {
    static_assert(thread_id < OUT_THREAD_CNT,
                  "Thread ID out of bounds");

    auto &inbox = consumer_queues[thread_id];
    std::size_t read_pos = inbox.get_head();
    if (read_pos == inbox.get_tail())
        return FanOutFabricRecvResult::ErrEmpty;

    if (token.is_valid())
        token.release();

    // Emptiness was checked above, so the unchecked pop is safe here.
    IndexType slot_idx;
    inbox.recv_unsafe(slot_idx, read_pos);

    token.init(this, slot_idx, &buffer[slot_idx].payload);
    return FanOutFabricRecvResult::Ok;
}
// Moves up to `budget` released slots from the reclaim queues back onto the
// free stack, visiting only queues whose hint bit is set, in round-robin
// order starting at reclaim_rot.
//
// Returns the number of slots reclaimed. Afterwards reclaim_rot points just
// past the last queue that was serviced.
IndexType reclaim(IndexType budget) noexcept {
    IndexType reclaim_cnt = 0;

    // rotate() aligns the hint bitmap so that bit i corresponds to queue
    // (i + reclaim_rot) % OUT_THREAD_CNT. That mapping is only valid for
    // the value of reclaim_rot used for the rotation, so pin it here:
    // reclaim_rot itself is advanced inside the loop.
    const std::size_t base = reclaim_rot;
    const BitmapType pending =
        rotate(reclaim_hint.load(std::memory_order_relaxed));

    for (std::size_t i = 0;
         reclaim_cnt < budget && i < OUT_THREAD_CNT; ++i) {
        if (!(pending & (BitmapType(1) << i)))
            continue;

        const std::size_t idx = (i + base) % OUT_THREAD_CNT;

        // Clear the hint before draining; it is set again below if the
        // budget runs out while the queue still has entries.
        reclaim_hint.fetch_and(~(BitmapType(1) << idx),
                               std::memory_order_relaxed);

        std::size_t head = reclaim_queues[idx].get_head();
        const std::size_t tail = reclaim_queues[idx].get_tail();
        while (reclaim_cnt < budget && head != tail) {
            IndexType tmp;
            // Advances `head` and publishes it.
            reclaim_queues[idx].recv_unsafe(tmp, head);
            fs.push(tmp);
            ++reclaim_cnt;
        }

        if (reclaim_cnt >= budget &&
            !reclaim_queues[idx].is_empty())
            reclaim_hint.fetch_or(BitmapType(1) << idx,
                                  std::memory_order_relaxed);

        // Next call starts after the queue just serviced.
        reclaim_rot = (idx + 1) % OUT_THREAD_CNT;
    }
    return reclaim_cnt;
}
// Runs reclaim() with a budget of BUFFER_SIZE slots, i.e. as many slots as
// the buffer holds.
IndexType reclaim_unlimited() noexcept {
    const IndexType whole_pool = BUFFER_SIZE;
    return reclaim(whole_pool);
}
// Like reclaim(), but ignores the hint bitmap and visits every reclaim
// queue once, in round-robin order starting at reclaim_rot, until `budget`
// slots have been moved back onto the free stack.
//
// Returns the number of slots reclaimed.
IndexType reclaim_scan_all(IndexType budget) noexcept {
    IndexType reclaim_cnt = 0;

    // reclaim_rot is advanced inside the loop; using it directly as the
    // base of the index computation would make the scan skip queues, so
    // the starting point is fixed up front.
    const std::size_t base = reclaim_rot;

    for (std::size_t i = 0;
         reclaim_cnt < budget && i < OUT_THREAD_CNT; ++i) {
        const std::size_t idx = (i + base) % OUT_THREAD_CNT;

        // Clear the hint before draining; it is set again below if the
        // budget runs out while the queue still has entries.
        reclaim_hint.fetch_and(~(BitmapType(1) << idx),
                               std::memory_order_relaxed);

        std::size_t head = reclaim_queues[idx].get_head();
        const std::size_t tail = reclaim_queues[idx].get_tail();
        while (reclaim_cnt < budget && head != tail) {
            IndexType tmp;
            // Advances `head` and publishes it.
            reclaim_queues[idx].recv_unsafe(tmp, head);
            fs.push(tmp);
            ++reclaim_cnt;
        }

        if (reclaim_cnt >= budget &&
            !reclaim_queues[idx].is_empty())
            reclaim_hint.fetch_or(BitmapType(1) << idx,
                                  std::memory_order_relaxed);

        // Next call starts after the queue just serviced.
        reclaim_rot = (idx + 1) % OUT_THREAD_CNT;
    }
    return reclaim_cnt;
}
// Queues `slot` on consumer `thread_id`'s reclaim queue so that a later
// reclaim pass can put it back on the free stack, and flags that queue in
// the reclaim hint bitmap.
//
// Returns Ok on success, ErrReclaimQueueFull if the queue rejected the slot
// (in which case the hint is left unchanged).
template <std::size_t thread_id>
FanOutFabricDeallocResult dealloc_try(IndexType slot) noexcept {
    static_assert(thread_id < OUT_THREAD_CNT,
                  "Thread ID out of bounds");

    // EndpointQueue::send takes an rvalue reference, which cannot bind to
    // the named parameter; hand it a temporary copy of the index.
    if (reclaim_queues[thread_id].send(IndexType{slot}) !=
        EndpointQueueSendResult::Ok)
        return FanOutFabricDeallocResult::ErrReclaimQueueFull;

    // The hint is set only after the entry has been enqueued.
    reclaim_hint.fetch_or(BitmapType(1) << thread_id,
                          std::memory_order_relaxed);
    return FanOutFabricDeallocResult::Ok;
}
private:
// One queue per consumer carrying indices of slots that are ready to be put
// back on the free stack (filled by dealloc_try, drained by reclaim).
std::array<EndpointQueue<IndexType, reclaim_qsize_expt>, OUT_THREAD_CNT>
reclaim_queues;
// One queue per consumer carrying indices of slots holding messages for that
// consumer (filled by send, drained by recv/flush).
std::array<EndpointQueue<IndexType, consumer_qsize_expt>,
OUT_THREAD_CNT>
consumer_queues;
// Bit i set: reclaim_queues[i] may contain entries. Set by dealloc_try,
// cleared (and possibly re-set) by the reclaim functions.
alignas(CLS) std::atomic<BitmapType> reclaim_hint;
// One message slot: the payload plus the number of consumers it was handed
// to. Aligned to CLS (defined outside this view; presumably the cache line
// size - confirm).
struct alignas(CLS) BufferSlot {
std::atomic<RefCountType> count;
MessageType payload;
};
// Slot storage; sized to BUFFER_SIZE entries by init().
std::vector<BufferSlot> buffer;
// LIFO pool of unused buffer-slot indices. `free[0 .. top)` holds the
// indices that are currently available; nothing here is synchronized.
struct alignas(CLS) FreeStack {
    std::array<IndexType, BUFFER_SIZE> free;
    int top;

    // Marks every slot 0 .. BUFFER_SIZE-1 as available.
    void init() {
        int next = 0;
        while (next < BUFFER_SIZE) {
            free[next] = next;
            ++next;
        }
        top = BUFFER_SIZE;
    }

    // Returns a slot to the pool. No capacity check is performed.
    void push(IndexType val) { free[top++] = val; }

    // Takes a slot from the pool, or yields BUFFER_SIZE when it is empty.
    IndexType pop() {
        if (top == 0)
            return BUFFER_SIZE;
        --top;
        return free[top];
    }

    // Takes a slot without checking for emptiness; callers check first.
    IndexType pop_unsafe() {
        --top;
        return free[top];
    }

    bool is_empty() const { return !top; }

    // Number of slots currently available.
    int size() const { return top; }
} fs;
// Index of the reclaim queue at which the next reclaim pass starts; the
// reclaim functions set it to one past the queue they serviced last.
std::size_t reclaim_rot;
// Rotates the low OUT_THREAD_CNT bits of `bitmap` right by reclaim_rot, so
// that bit i of the result is bit (i + reclaim_rot) % OUT_THREAD_CNT of the
// input. Bits outside the thread mask are discarded.
BitmapType rotate(BitmapType bitmap) const noexcept {
    const BitmapType live = bitmap & THREAD_BITMAP_MASK;
    if (reclaim_rot != 0) {
        const BitmapType low_part = live >> reclaim_rot;
        const BitmapType wrapped = live
                                   << (OUT_THREAD_CNT - reclaim_rot);
        return (low_part | wrapped) & THREAD_BITMAP_MASK;
    }
    // A zero rotation is returned directly; it would otherwise require a
    // shift by OUT_THREAD_CNT, which may equal the full bit width.
    return live;
}
};
} // namespace THWeaver
#endif