Compare commits
6 Commits
ffd5ce5b12
...
dd3d6be073
| Author | SHA1 | Date | |
|---|---|---|---|
| dd3d6be073 | |||
| 103903be8e | |||
| c1745b7bfc | |||
| 9258d6c197 | |||
| bc58b8982a | |||
| 00011f7c88 |
445
include/weaver.h
445
include/weaver.h
@@ -1,11 +1,14 @@
|
||||
#ifndef THWEAVER_FABRIC_H
|
||||
#define THWEAVER_FABRIC_H
|
||||
#ifndef THREADWEAVER_WEAVER_H
|
||||
#define THREADWEAVER_WEAVER_H
|
||||
|
||||
#include <array>
|
||||
#include <atomic>
|
||||
#include <bit>
|
||||
#include <cstddef>
|
||||
#include <cstdint>
|
||||
#include <limits>
|
||||
#include <new>
|
||||
#include <utility>
|
||||
|
||||
namespace THWeaver {
|
||||
|
||||
@@ -23,44 +26,63 @@ enum class FanInFabricRecvTryResult { Ok, ErrTryFailed };
|
||||
|
||||
// Result code with an Ok / ErrAggrFailed outcome. NOTE(review): presumably
// returned by the fan-in aggregated-receive path; the function that uses it is
// not visible in this hunk -- confirm against FanInFabric.
enum class FanInFabricRecvAggrResult { Ok, ErrAggrFailed };
|
||||
|
||||
/**
 * Outcome of a fan-out send.
 *
 * Kept as a plain aggregate so it can be brace-initialised as
 * `{failures, status}`; the member order is therefore part of the interface.
 *
 * @tparam BitmapType Unsigned integer type holding one bit per consumer.
 */
template <typename BitmapType> struct FanOutFabricSendResult {
    /// One bit per consumer that did NOT receive the message.
    BitmapType failures;

    /// Overall status. Unscoped on purpose so callers can spell the values as
    /// `FanOutFabricSendResult<T>::Ok`, etc.
    enum { Ok, ErrPartialSuccess, ErrFullFailure } result;
};
|
||||
|
||||
// Result of FanOutFabric::recv(): Ok when a message was handed to the caller's
// BorrowedRef, ErrEmpty when the consumer's queue had nothing to deliver.
enum class FanOutFabricRecvResult { Ok, ErrEmpty };
|
||||
|
||||
// Result of handing a buffer slot back to FanOutFabric: Ok on success,
// ErrReclaimQueueFull when the consumer's reclaim queue rejected the slot.
enum class FanOutFabricDeallocResult { Ok, ErrReclaimQueueFull };
|
||||
|
||||
template <typename MessageType, std::size_t qsize_expt> class EndpointQueue {
|
||||
public:
|
||||
static constexpr std::size_t QSIZE = 1 << qsize_expt;
|
||||
static constexpr std::size_t QSIZE_MASK = QSIZE - 1;
|
||||
|
||||
EndpointQueue() noexcept {};
|
||||
EndpointQueue(const EndpointQueue<MessageType, qsize_expt> &) = delete;
|
||||
EndpointQueue &
|
||||
operator=(const EndpointQueue<MessageType, qsize_expt> &) = delete;
|
||||
EndpointQueue(EndpointQueue<MessageType, qsize_expt> &&) = delete;
|
||||
EndpointQueue &
|
||||
operator=(EndpointQueue<MessageType, qsize_expt> &&) = delete;
|
||||
EndpointQueue(const EndpointQueue &) = delete;
|
||||
EndpointQueue &operator=(const EndpointQueue &) = delete;
|
||||
EndpointQueue(EndpointQueue &&) = delete;
|
||||
EndpointQueue &operator=(EndpointQueue &&) = delete;
|
||||
|
||||
void init() noexcept {
|
||||
head.store(0);
|
||||
tail.store(0);
|
||||
head.store(0, std::memory_order_release);
|
||||
tail.store(0, std::memory_order_release);
|
||||
}
|
||||
|
||||
void flush() noexcept {
|
||||
head.store(0);
|
||||
tail.store(0);
|
||||
head.store(0, std::memory_order_release);
|
||||
tail.store(0, std::memory_order_release);
|
||||
}
|
||||
|
||||
std::size_t size() const noexcept {
|
||||
return ((tail.load() + QSIZE_MASK) - head.load()) & QSIZE_MASK;
|
||||
return ((tail.load(std::memory_order::acquire) + QSIZE_MASK) -
|
||||
head.load(std::memory_order::acquire)) &
|
||||
QSIZE_MASK;
|
||||
}
|
||||
|
||||
bool is_empty() const noexcept { return head.load() == tail.load(); }
|
||||
bool is_empty() const noexcept {
|
||||
return head.load(std::memory_order::acquire) ==
|
||||
tail.load(std::memory_order::acquire);
|
||||
}
|
||||
bool is_full() const noexcept {
|
||||
return head.load() == ((tail.load() + 1) & QSIZE_MASK);
|
||||
return head.load(std::memory_order::acquire) ==
|
||||
((tail.load(std::memory_order::acquire) + 1) &
|
||||
QSIZE_MASK);
|
||||
}
|
||||
|
||||
std::size_t get_head() const noexcept { return head.load(); }
|
||||
std::size_t get_tail() const noexcept { return tail.load(); }
|
||||
std::size_t get_head() const noexcept {
|
||||
return head.load(std::memory_order::acquire);
|
||||
}
|
||||
std::size_t get_tail() const noexcept {
|
||||
return tail.load(std::memory_order::acquire);
|
||||
}
|
||||
std::array<MessageType, QSIZE> &get_buffer() noexcept { return buffer; }
|
||||
|
||||
EndpointQueueSendResult send(MessageType &&message) noexcept {
|
||||
std::size_t h = head.load();
|
||||
std::size_t t = tail.load();
|
||||
std::size_t h = head.load(std::memory_order::acquire);
|
||||
std::size_t t = tail.load(std::memory_order::acquire);
|
||||
|
||||
if (h == ((t + 1) & QSIZE_MASK))
|
||||
return EndpointQueueSendResult::ErrFull;
|
||||
@@ -68,14 +90,14 @@ public:
|
||||
buffer[t] = std::move(message);
|
||||
|
||||
t = (t + 1) & QSIZE_MASK;
|
||||
tail.store(t);
|
||||
tail.store(t, std::memory_order_release);
|
||||
|
||||
return EndpointQueueSendResult::Ok;
|
||||
}
|
||||
|
||||
EndpointQueueRecvResult recv(MessageType &buffer) noexcept {
|
||||
std::size_t h = head.load();
|
||||
std::size_t t = tail.load();
|
||||
std::size_t h = head.load(std::memory_order::acquire);
|
||||
std::size_t t = tail.load(std::memory_order::acquire);
|
||||
|
||||
if (h == t)
|
||||
return EndpointQueueRecvResult::ErrEmpty;
|
||||
@@ -83,15 +105,16 @@ public:
|
||||
buffer = std::move(this->buffer[h]);
|
||||
|
||||
h = (h + 1) & QSIZE_MASK;
|
||||
head.store(h);
|
||||
head.store(h, std::memory_order_release);
|
||||
|
||||
return EndpointQueueRecvResult::Ok;
|
||||
}
|
||||
|
||||
void recv_unsafe(MessageType &buffer, std::size_t old_head) noexcept {
|
||||
void recv_unsafe(MessageType &buffer, std::size_t &old_head) noexcept {
|
||||
buffer = std::move(this->buffer[old_head]);
|
||||
old_head = (old_head + 1) & QSIZE_MASK;
|
||||
|
||||
head.store((old_head + 1) & QSIZE_MASK);
|
||||
head.store(old_head, std::memory_order_release);
|
||||
}
|
||||
|
||||
private:
|
||||
@@ -106,7 +129,7 @@ class FanInFabric {
|
||||
public:
|
||||
using BitmapType = uint64_t;
|
||||
|
||||
static constexpr std::size_t QSIZE = 1 << qsize_expt;
|
||||
static constexpr std::size_t QSIZE = 1ULL << qsize_expt;
|
||||
static constexpr std::size_t QSIZE_MASK = QSIZE - 1;
|
||||
static constexpr std::size_t BITMAP_SIZE = sizeof(BitmapType) * 8;
|
||||
static constexpr BitmapType THREAD_BITMAP_MASK =
|
||||
@@ -114,22 +137,16 @@ public:
|
||||
? ~BitmapType(0)
|
||||
: (BitmapType(1) << IN_THREAD_CNT) - 1;
|
||||
|
||||
static_assert(IN_THREAD_CNT <= BITMAP_SIZE, "Producer limit exceeded");
|
||||
static_assert(IN_THREAD_CNT <= BITMAP_SIZE, "Insufficient bitmap size");
|
||||
|
||||
FanInFabric() noexcept {}
|
||||
FanInFabric(const FanInFabric<MessageType, qsize_expt, IN_THREAD_CNT>
|
||||
&) = delete;
|
||||
FanInFabric &
|
||||
operator=(const FanInFabric<MessageType, qsize_expt, IN_THREAD_CNT> &) =
|
||||
delete;
|
||||
FanInFabric(FanInFabric<MessageType, qsize_expt, IN_THREAD_CNT> &&) =
|
||||
delete;
|
||||
FanInFabric &
|
||||
operator=(FanInFabric<MessageType, qsize_expt, IN_THREAD_CNT> &&) =
|
||||
delete;
|
||||
FanInFabric(const FanInFabric &) = delete;
|
||||
FanInFabric &operator=(const FanInFabric &) = delete;
|
||||
FanInFabric(FanInFabric &&) = delete;
|
||||
FanInFabric &operator=(FanInFabric &&) = delete;
|
||||
|
||||
void init() noexcept {
|
||||
for (int i = 0; i < IN_THREAD_CNT; ++i)
|
||||
for (std::size_t i = 0; i < IN_THREAD_CNT; ++i)
|
||||
in_queues[i].init();
|
||||
hint.store(0);
|
||||
dl.init();
|
||||
@@ -141,7 +158,8 @@ public:
|
||||
"Thread ID out of bounds");
|
||||
|
||||
in_queues[thread_id].flush();
|
||||
hint.fetch_and(~(1 << thread_id), std::memory_order_relaxed);
|
||||
hint.fetch_and(~(BitmapType(1) << thread_id),
|
||||
std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
void flush_all() noexcept {
|
||||
@@ -158,7 +176,7 @@ public:
|
||||
"Thread ID out of bounds");
|
||||
|
||||
return static_cast<FanInFabricSendResult>(static_cast<uint32_t>(
|
||||
in_queues[thread_id].send(message)));
|
||||
in_queues[thread_id].send(std::move(message))));
|
||||
}
|
||||
|
||||
FanInFabricRecvBitmapResult
|
||||
@@ -178,7 +196,7 @@ public:
|
||||
std::size_t qsize = ((tail + QSIZE_MASK) - head) & QSIZE_MASK;
|
||||
|
||||
if (qsize <= 1)
|
||||
hint.fetch_and(~(1 << hinted_id),
|
||||
hint.fetch_and(~(BitmapType(1) << hinted_id),
|
||||
std::memory_order_relaxed);
|
||||
|
||||
if (qsize) {
|
||||
@@ -205,7 +223,7 @@ public:
|
||||
((tail + QSIZE_MASK) - head) & QSIZE_MASK;
|
||||
|
||||
if (qsize <= 1)
|
||||
hint.fetch_and(~(1 << hinted_id),
|
||||
hint.fetch_and(~(BitmapType(1) << hinted_id),
|
||||
std::memory_order_relaxed);
|
||||
|
||||
if (qsize) {
|
||||
@@ -241,7 +259,7 @@ public:
|
||||
((tail + QSIZE_MASK) - head) & QSIZE_MASK;
|
||||
|
||||
if (qsize <= 1)
|
||||
hint.fetch_and(~(1 << hinted_id),
|
||||
hint.fetch_and(~(BitmapType(1) << hinted_id),
|
||||
std::memory_order_relaxed);
|
||||
|
||||
if (qsize) {
|
||||
@@ -274,15 +292,15 @@ public:
|
||||
uint8_t curr = dl.first;
|
||||
|
||||
if (in_queues[curr].size())
|
||||
bitmap |= 1 << curr;
|
||||
bitmap |= BitmapType(1) << curr;
|
||||
|
||||
do {
|
||||
curr = dl.next[curr];
|
||||
if (in_queues[curr].size())
|
||||
bitmap |= 1 << curr;
|
||||
bitmap |= BitmapType(1) << curr;
|
||||
} while (curr != dl.last);
|
||||
|
||||
hint.fetch_or(bitmap, std::memory_order_release);
|
||||
hint.fetch_or(bitmap, std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
template <std::size_t thread_id> std::size_t size() const noexcept {
|
||||
@@ -317,7 +335,7 @@ private:
|
||||
uint8_t first;
|
||||
uint8_t last;
|
||||
void init() noexcept {
|
||||
for (int i = 0; i < IN_THREAD_CNT; ++i) {
|
||||
for (std::size_t i = 0; i < IN_THREAD_CNT; ++i) {
|
||||
next[i] = (i + 1) % IN_THREAD_CNT;
|
||||
prev[i] =
|
||||
(i + IN_THREAD_CNT - 1) % IN_THREAD_CNT;
|
||||
@@ -347,7 +365,9 @@ private:
|
||||
}
|
||||
}
|
||||
} dl;
|
||||
|
||||
std::size_t rotation;
|
||||
|
||||
BitmapType rotate(BitmapType bitmap) const noexcept {
|
||||
bitmap &= THREAD_BITMAP_MASK;
|
||||
|
||||
@@ -360,6 +380,337 @@ private:
|
||||
}
|
||||
};
|
||||
|
||||
template <typename MessageType, std::size_t consumer_qsize_expt,
|
||||
std::size_t BUFFER_SIZE, std::size_t reclaim_qsize_expt,
|
||||
std::size_t OUT_THREAD_CNT>
|
||||
class FanOutFabric {
|
||||
public:
|
||||
using BitmapType = uint64_t;
|
||||
using RefCountType = uint32_t;
|
||||
using IndexType = uint16_t;
|
||||
|
||||
static constexpr std::size_t CONSUMER_QSIZE = 1ULL
|
||||
<< consumer_qsize_expt;
|
||||
static constexpr std::size_t CONSUMER_QSIZE_MASK = CONSUMER_QSIZE - 1;
|
||||
static constexpr std::size_t RECLAIM_QSIZE = 1ULL << reclaim_qsize_expt;
|
||||
static constexpr std::size_t BITMAP_SIZE = sizeof(BitmapType) * 8;
|
||||
static constexpr BitmapType THREAD_BITMAP_MASK =
|
||||
OUT_THREAD_CNT == BITMAP_SIZE
|
||||
? ~BitmapType(0)
|
||||
: (BitmapType(1) << OUT_THREAD_CNT) - 1;
|
||||
|
||||
static_assert(OUT_THREAD_CNT <= BITMAP_SIZE,
|
||||
"Insufficient bitmap size");
|
||||
static_assert(std::numeric_limits<IndexType>::max() >= BUFFER_SIZE,
|
||||
"Insuffiently large IndexType");
|
||||
|
||||
FanOutFabric() noexcept {}
|
||||
FanOutFabric(const FanOutFabric &) = delete;
|
||||
FanOutFabric &operator=(const FanOutFabric &) = delete;
|
||||
FanOutFabric(FanOutFabric &&) = delete;
|
||||
FanOutFabric &operator=(FanOutFabric &&) = delete;
|
||||
|
||||
template <std::size_t thread_id> class BorrowedRef {
|
||||
public:
|
||||
static_assert(thread_id < OUT_THREAD_CNT,
|
||||
"Thread ID out of bounds");
|
||||
|
||||
BorrowedRef() noexcept
|
||||
: parent(nullptr), slot(BUFFER_SIZE), payload(nullptr),
|
||||
released(true) {}
|
||||
BorrowedRef(const BorrowedRef &) = delete;
|
||||
BorrowedRef &operator=(const BorrowedRef &) = delete;
|
||||
BorrowedRef(BorrowedRef &&) = delete;
|
||||
BorrowedRef &operator=(BorrowedRef &&) = delete;
|
||||
|
||||
~BorrowedRef() noexcept {
|
||||
if (is_valid())
|
||||
release();
|
||||
}
|
||||
|
||||
void init(FanOutFabric *parent, IndexType slot,
|
||||
const MessageType *payload) noexcept {
|
||||
this->parent = parent;
|
||||
this->slot = slot;
|
||||
this->payload = payload;
|
||||
released = false;
|
||||
}
|
||||
|
||||
FanOutFabricDeallocResult release() noexcept {
|
||||
released = true;
|
||||
payload = nullptr;
|
||||
return parent->dealloc_try<thread_id>(slot);
|
||||
}
|
||||
|
||||
bool is_valid() const noexcept {
|
||||
return !released && payload != nullptr &&
|
||||
parent != nullptr;
|
||||
}
|
||||
|
||||
bool is_released() const noexcept {
|
||||
return released || payload == nullptr;
|
||||
}
|
||||
|
||||
const MessageType &get() const noexcept { return *payload; }
|
||||
IndexType get_slot() const noexcept { return slot; }
|
||||
|
||||
private:
|
||||
FanOutFabric *parent;
|
||||
IndexType slot;
|
||||
const MessageType *payload;
|
||||
bool released;
|
||||
};
|
||||
|
||||
void init() noexcept {
|
||||
for (std::size_t i = 0; i < OUT_THREAD_CNT; ++i) {
|
||||
reclaim_queues[i].init();
|
||||
consumer_queues[i].init();
|
||||
}
|
||||
reclaim_hint.store(0, std::memory_order_release);
|
||||
fs.init();
|
||||
reclaim_rot = 0;
|
||||
}
|
||||
|
||||
// Prohibitively expensive
|
||||
template <std::size_t thread_id>
|
||||
FanOutFabricDeallocResult flush() noexcept {
|
||||
static_assert(thread_id < OUT_THREAD_CNT,
|
||||
"Thread ID out of bounds");
|
||||
|
||||
std::size_t head = consumer_queues[thread_id].get_head();
|
||||
std::size_t tail = consumer_queues[thread_id].get_tail();
|
||||
auto &qbuffer = consumer_queues[thread_id].get_buffer();
|
||||
|
||||
for (std::size_t i = head; i != tail;
|
||||
i = ((i + 1) & CONSUMER_QSIZE_MASK)) {
|
||||
RefCountType cnt = buffer[qbuffer[i]].count.fetch_sub(
|
||||
1, std::memory_order_relaxed);
|
||||
|
||||
if (cnt == 1 && dealloc_try<thread_id>(qbuffer[i]) !=
|
||||
FanOutFabricDeallocResult::Ok)
|
||||
return FanOutFabricDeallocResult::
|
||||
ErrReclaimQueueFull;
|
||||
}
|
||||
|
||||
consumer_queues[thread_id].flush();
|
||||
return FanOutFabricDeallocResult();
|
||||
}
|
||||
|
||||
void flush_all() noexcept {
|
||||
for (int i = 0; i < OUT_THREAD_CNT; ++i) {
|
||||
reclaim_queues[i].flush();
|
||||
consumer_queues[i].flush();
|
||||
}
|
||||
reclaim_hint.store(0, std::memory_order_release);
|
||||
fs.init();
|
||||
reclaim_rot = 0;
|
||||
}
|
||||
|
||||
template <std::size_t thread_id>
|
||||
BorrowedRef<thread_id> get_empty_borrowed_ref() const noexcept {
|
||||
return BorrowedRef<thread_id>();
|
||||
}
|
||||
IndexType free_size() const noexcept { return fs.size(); }
|
||||
template <std::size_t thread_id> std::size_t size() const noexcept {
|
||||
static_assert(thread_id < OUT_THREAD_CNT,
|
||||
"Thread ID out of bounds");
|
||||
|
||||
return consumer_queues[thread_id].size();
|
||||
}
|
||||
|
||||
FanOutFabricSendResult<BitmapType>
|
||||
send(MessageType &&message, BitmapType policy,
|
||||
IndexType reclaim_budget) noexcept {
|
||||
if (fs.is_empty()) {
|
||||
reclaim(reclaim_budget);
|
||||
|
||||
if (fs.is_empty())
|
||||
return {policy,
|
||||
FanOutFabricSendResult<
|
||||
BitmapType>::ErrFullFailure};
|
||||
}
|
||||
|
||||
IndexType slot = fs.pop_unsafe();
|
||||
RefCountType failed_cnt = 0;
|
||||
RefCountType try_cnt = std::popcount(policy);
|
||||
buffer[slot].payload = std::move(message);
|
||||
buffer[slot].count.store(try_cnt);
|
||||
|
||||
for (std::size_t i = 0; i < OUT_THREAD_CNT; ++i) {
|
||||
if ((policy & (BitmapType(1) << i)) &&
|
||||
consumer_queues[i].send(std::move(slot)) !=
|
||||
EndpointQueueSendResult::Ok)
|
||||
++failed_cnt;
|
||||
else
|
||||
policy &= (~(BitmapType(1) << i)) &
|
||||
THREAD_BITMAP_MASK;
|
||||
}
|
||||
|
||||
buffer[slot].count.fetch_sub(failed_cnt,
|
||||
std::memory_order_relaxed);
|
||||
|
||||
if (try_cnt == failed_cnt) {
|
||||
fs.push(slot);
|
||||
return {policy, FanOutFabricSendResult<
|
||||
BitmapType>::ErrFullFailure};
|
||||
} else if (failed_cnt) {
|
||||
return {policy, FanOutFabricSendResult<
|
||||
BitmapType>::ErrPartialSuccess};
|
||||
} else {
|
||||
return {0, FanOutFabricSendResult<BitmapType>::Ok};
|
||||
}
|
||||
}
|
||||
|
||||
template <std::size_t thread_id>
|
||||
FanOutFabricRecvResult recv(BorrowedRef<thread_id> &token) noexcept {
|
||||
static_assert(thread_id < OUT_THREAD_CNT,
|
||||
"Thread ID out of bounds");
|
||||
|
||||
std::size_t head = consumer_queues[thread_id].get_head();
|
||||
std::size_t tail = consumer_queues[thread_id].get_tail();
|
||||
|
||||
if (head == tail)
|
||||
return FanOutFabricRecvResult::ErrEmpty;
|
||||
|
||||
if (token.is_valid())
|
||||
token.release();
|
||||
|
||||
IndexType slot;
|
||||
consumer_queues[thread_id].recv_unsafe(slot, head);
|
||||
|
||||
token.init(this, slot, &(buffer[slot].payload));
|
||||
return FanOutFabricRecvResult::Ok;
|
||||
}
|
||||
|
||||
IndexType reclaim(IndexType budget) noexcept {
|
||||
IndexType reclaim_cnt = 0;
|
||||
|
||||
BitmapType bitmap =
|
||||
reclaim_hint.load(std::memory_order_relaxed);
|
||||
bitmap = rotate(bitmap);
|
||||
|
||||
for (std::size_t i = 0;
|
||||
reclaim_cnt < budget && i < OUT_THREAD_CNT; ++i) {
|
||||
if (!(bitmap & (BitmapType(1) << i)))
|
||||
continue;
|
||||
|
||||
std::size_t idx = (i + reclaim_rot) % OUT_THREAD_CNT;
|
||||
|
||||
reclaim_hint.fetch_and(~(BitmapType(1) << idx),
|
||||
std::memory_order_relaxed);
|
||||
|
||||
std::size_t head = reclaim_queues[idx].get_head();
|
||||
std::size_t tail = reclaim_queues[idx].get_tail();
|
||||
while (reclaim_cnt < budget && head != tail) {
|
||||
IndexType tmp;
|
||||
reclaim_queues[idx].recv_unsafe(tmp, head);
|
||||
fs.push(tmp);
|
||||
++reclaim_cnt;
|
||||
}
|
||||
|
||||
if (reclaim_cnt >= budget &&
|
||||
head != reclaim_queues[idx].get_tail())
|
||||
reclaim_hint.fetch_or(
|
||||
BitmapType(1) << idx,
|
||||
std::memory_order_relaxed);
|
||||
reclaim_rot = (idx + 1) % OUT_THREAD_CNT;
|
||||
}
|
||||
|
||||
return reclaim_cnt;
|
||||
}
|
||||
IndexType reclaim_unlimited() noexcept { return reclaim(BUFFER_SIZE); }
|
||||
IndexType reclaim_scan_all(IndexType budget) noexcept {
|
||||
IndexType reclaim_cnt = 0;
|
||||
|
||||
for (std::size_t i = 0;
|
||||
reclaim_cnt < budget && i < OUT_THREAD_CNT; ++i) {
|
||||
std::size_t idx = (i + reclaim_rot) % OUT_THREAD_CNT;
|
||||
|
||||
reclaim_hint.fetch_and(~(BitmapType(1) << idx),
|
||||
std::memory_order_relaxed);
|
||||
|
||||
std::size_t head = reclaim_queues[idx].get_head();
|
||||
std::size_t tail = reclaim_queues[idx].get_tail();
|
||||
while (reclaim_cnt < budget && head != tail) {
|
||||
IndexType tmp;
|
||||
reclaim_queues[idx].recv_unsafe(tmp, head);
|
||||
fs.push(tmp);
|
||||
++reclaim_cnt;
|
||||
}
|
||||
|
||||
if (reclaim_cnt >= budget &&
|
||||
head != reclaim_queues[idx].get_tail())
|
||||
reclaim_hint.fetch_or(
|
||||
BitmapType(1) << idx,
|
||||
std::memory_order_relaxed);
|
||||
reclaim_rot = (idx + 1) % OUT_THREAD_CNT;
|
||||
}
|
||||
|
||||
return reclaim_cnt;
|
||||
}
|
||||
|
||||
template <std::size_t thread_id>
|
||||
FanOutFabricDeallocResult dealloc_try(IndexType slot) noexcept {
|
||||
static_assert(thread_id < OUT_THREAD_CNT,
|
||||
"Thread ID out of bounds");
|
||||
|
||||
auto res = reclaim_queues[thread_id].send(std::move(slot));
|
||||
if (res == EndpointQueueSendResult::Ok) {
|
||||
reclaim_hint.fetch_or(BitmapType(1) << thread_id,
|
||||
std::memory_order_relaxed);
|
||||
return FanOutFabricDeallocResult::Ok;
|
||||
} else {
|
||||
return FanOutFabricDeallocResult::ErrReclaimQueueFull;
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
std::array<EndpointQueue<IndexType, reclaim_qsize_expt>, OUT_THREAD_CNT>
|
||||
reclaim_queues;
|
||||
std::array<EndpointQueue<IndexType, consumer_qsize_expt>,
|
||||
OUT_THREAD_CNT>
|
||||
consumer_queues;
|
||||
alignas(CLS) std::atomic<BitmapType> reclaim_hint;
|
||||
struct alignas(CLS) BufferSlot {
|
||||
std::atomic<RefCountType> count;
|
||||
MessageType payload;
|
||||
};
|
||||
std::array<BufferSlot, BUFFER_SIZE> buffer;
|
||||
struct alignas(CLS) FreeStack {
|
||||
std::array<IndexType, BUFFER_SIZE> free;
|
||||
int top;
|
||||
void init() {
|
||||
for (std::size_t i = 0; i < BUFFER_SIZE; ++i)
|
||||
free[i] = i;
|
||||
top = BUFFER_SIZE;
|
||||
}
|
||||
void push(IndexType val) {
|
||||
free[top] = val;
|
||||
++top;
|
||||
}
|
||||
IndexType pop() {
|
||||
if (top)
|
||||
return free[--top];
|
||||
return BUFFER_SIZE;
|
||||
}
|
||||
IndexType pop_unsafe() { return free[--top]; }
|
||||
bool is_empty() const { return top == 0; }
|
||||
int size() const { return top; }
|
||||
} fs;
|
||||
std::size_t reclaim_rot;
|
||||
|
||||
BitmapType rotate(BitmapType bitmap) const noexcept {
|
||||
bitmap &= THREAD_BITMAP_MASK;
|
||||
|
||||
if (reclaim_rot == 0)
|
||||
return bitmap;
|
||||
|
||||
return ((bitmap >> reclaim_rot) |
|
||||
(bitmap << (OUT_THREAD_CNT - reclaim_rot))) &
|
||||
THREAD_BITMAP_MASK;
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace THWeaver
|
||||
|
||||
#endif
|
||||
|
||||
161
tests/fan_in/basic_test.cc
Normal file
161
tests/fan_in/basic_test.cc
Normal file
@@ -0,0 +1,161 @@
|
||||
#include <atomic>
|
||||
#include <chrono>
|
||||
#include <cstdint>
|
||||
#include <iostream>
|
||||
#include <thread>
|
||||
#include <vector>
|
||||
|
||||
#include <weaver.h>
|
||||
|
||||
using std::chrono::nanoseconds;
|
||||
|
||||
constexpr std::size_t kThreads = 2;
|
||||
|
||||
/// Packs a producer id (upper 32 bits) and a sequence number (lower 32 bits)
/// into one 64-bit message value.
uint64_t pack_message(uint32_t producer, uint32_t seq) {
    return (static_cast<uint64_t>(producer) << 32) |
           static_cast<uint64_t>(seq);
}
|
||||
|
||||
/// Splits a 64-bit message value into its producer id (upper 32 bits) and
/// sequence number (lower 32 bits), written to the two out-parameters.
void unpack_message(uint64_t value, uint32_t &producer, uint32_t &seq) {
    producer = static_cast<uint32_t>(value >> 32);
    seq = static_cast<uint32_t>(value & 0xffffffffu);
}
|
||||
|
||||
using std::chrono::nanoseconds;
|
||||
using std::chrono::steady_clock;
|
||||
|
||||
int main() {
|
||||
int n = 0;
|
||||
if (!(std::cin >> n) || n <= 0) {
|
||||
std::cerr << "n must be positive\n";
|
||||
return 1;
|
||||
}
|
||||
|
||||
using Fabric = THWeaver::FanInFabric<uint64_t, 10, kThreads>;
|
||||
Fabric fabric;
|
||||
fabric.init();
|
||||
|
||||
std::atomic<bool> failed(false);
|
||||
|
||||
std::vector<int64_t> send_time0(n, -1);
|
||||
std::vector<int64_t> send_time1(n, -1);
|
||||
std::vector<int64_t> recv_time0(n, -1);
|
||||
std::vector<int64_t> recv_time1(n, -1);
|
||||
|
||||
std::thread producer0([&]() {
|
||||
for (int i = 1; i <= n && !failed.load(); ++i) {
|
||||
while (true) {
|
||||
auto now =
|
||||
std::chrono::steady_clock::now()
|
||||
.time_since_epoch();
|
||||
auto ns =
|
||||
std::chrono::duration_cast<nanoseconds>(
|
||||
now)
|
||||
.count();
|
||||
if (fabric.send<0>(pack_message(0, i)) ==
|
||||
THWeaver::FanInFabricSendResult::Ok) {
|
||||
send_time0[i - 1] = ns;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
std::thread producer1([&]() {
|
||||
for (int i = 1; i <= n && !failed.load(); ++i) {
|
||||
while (true) {
|
||||
auto now =
|
||||
std::chrono::steady_clock::now()
|
||||
.time_since_epoch();
|
||||
auto ns =
|
||||
std::chrono::duration_cast<nanoseconds>(
|
||||
now)
|
||||
.count();
|
||||
if (fabric.send<1>(pack_message(1, i)) ==
|
||||
THWeaver::FanInFabricSendResult::Ok) {
|
||||
send_time1[i - 1] = ns;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
std::thread consumer([&]() {
|
||||
std::vector<uint32_t> expected(kThreads, 1);
|
||||
std::size_t received = 0;
|
||||
const std::size_t total =
|
||||
static_cast<std::size_t>(n) * kThreads;
|
||||
|
||||
while (received < total && !failed.load()) {
|
||||
uint64_t value = 0;
|
||||
auto res = fabric.recv_try(value);
|
||||
if (res == THWeaver::FanInFabricRecvTryResult::Ok) {
|
||||
uint32_t producer = 0;
|
||||
uint32_t seq = 0;
|
||||
unpack_message(value, producer, seq);
|
||||
|
||||
if (producer >= kThreads ||
|
||||
seq != expected[producer]) {
|
||||
std::cerr
|
||||
<< "fan-in mismatch: producer "
|
||||
<< producer << " expected "
|
||||
<< expected[producer] << " got "
|
||||
<< seq << "\n";
|
||||
failed.store(true);
|
||||
break;
|
||||
}
|
||||
|
||||
if (producer == 0) {
|
||||
auto recv_time =
|
||||
std::chrono::steady_clock::now()
|
||||
.time_since_epoch();
|
||||
auto recv_ns =
|
||||
std::chrono::duration_cast<
|
||||
nanoseconds>(recv_time)
|
||||
.count();
|
||||
recv_time0[seq - 1] = recv_ns;
|
||||
} else {
|
||||
auto recv_time =
|
||||
std::chrono::steady_clock::now()
|
||||
.time_since_epoch();
|
||||
auto recv_ns =
|
||||
std::chrono::duration_cast<
|
||||
nanoseconds>(recv_time)
|
||||
.count();
|
||||
recv_time1[seq - 1] = recv_ns;
|
||||
}
|
||||
|
||||
++expected[producer];
|
||||
++received;
|
||||
continue;
|
||||
}
|
||||
|
||||
Fabric::BitmapType bitmap = 0;
|
||||
fabric.full_scan(bitmap);
|
||||
}
|
||||
});
|
||||
|
||||
producer0.join();
|
||||
producer1.join();
|
||||
consumer.join();
|
||||
|
||||
if (failed.load()) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
for (int i = 1; i <= n; ++i) {
|
||||
uint64_t payload0 = pack_message(0, i);
|
||||
uint64_t payload1 = pack_message(1, i);
|
||||
|
||||
int64_t send_ns0 = send_time0[i - 1];
|
||||
int64_t recv_ns0 = recv_time0[i - 1];
|
||||
int64_t send_ns1 = send_time1[i - 1];
|
||||
int64_t recv_ns1 = recv_time1[i - 1];
|
||||
|
||||
std::cout << payload0 << ","
|
||||
<< (recv_ns0 - send_ns0) << "\n";
|
||||
std::cout << payload1 << ","
|
||||
<< (recv_ns1 - send_ns1) << "\n";
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
134
tests/fan_out/basic_test.cc
Normal file
134
tests/fan_out/basic_test.cc
Normal file
@@ -0,0 +1,134 @@
|
||||
#include <atomic>
|
||||
#include <chrono>
|
||||
#include <cstdint>
|
||||
#include <iostream>
|
||||
#include <thread>
|
||||
#include <vector>
|
||||
|
||||
#include <weaver.h>
|
||||
|
||||
constexpr std::size_t kConsumers = 2;
|
||||
constexpr std::size_t kBufferSize = 256;
|
||||
|
||||
using std::chrono::nanoseconds;
|
||||
using std::chrono::steady_clock;
|
||||
|
||||
int main() {
|
||||
int n = 0;
|
||||
if (!(std::cin >> n) || n <= 0) {
|
||||
std::cerr << "n must be positive\n";
|
||||
return 1;
|
||||
}
|
||||
|
||||
using Fabric =
|
||||
THWeaver::FanOutFabric<uint64_t, 8, kBufferSize, 8, kConsumers>;
|
||||
Fabric fabric;
|
||||
fabric.init();
|
||||
|
||||
std::atomic<bool> failed(false);
|
||||
std::atomic<int> consumed0(0);
|
||||
std::atomic<int> consumed1(0);
|
||||
std::vector<int64_t> send_time(n, -1);
|
||||
std::vector<int64_t> recv_time0(n, -1);
|
||||
std::vector<int64_t> recv_time1(n, -1);
|
||||
|
||||
std::thread consumer0([&]() {
|
||||
auto token = fabric.get_empty_borrowed_ref<0>();
|
||||
int expected = 1;
|
||||
while (expected <= n && !failed.load()) {
|
||||
if (fabric.recv<0>(token) !=
|
||||
THWeaver::FanOutFabricRecvResult::Ok) {
|
||||
continue;
|
||||
}
|
||||
if (token.get() != static_cast<uint64_t>(expected)) {
|
||||
std::cerr << "fan-out mismatch on consumer 0: "
|
||||
<< "expected " << expected << " got "
|
||||
<< token.get() << "\n";
|
||||
failed.store(true);
|
||||
break;
|
||||
}
|
||||
auto recv_time = steady_clock::now().time_since_epoch();
|
||||
auto recv_ns =
|
||||
std::chrono::duration_cast<nanoseconds>(
|
||||
recv_time)
|
||||
.count();
|
||||
recv_time0[expected - 1] = recv_ns;
|
||||
token.release();
|
||||
consumed0.store(expected);
|
||||
++expected;
|
||||
}
|
||||
});
|
||||
|
||||
std::thread consumer1([&]() {
|
||||
auto token = fabric.get_empty_borrowed_ref<1>();
|
||||
int expected = 1;
|
||||
while (expected <= n && !failed.load()) {
|
||||
if (fabric.recv<1>(token) !=
|
||||
THWeaver::FanOutFabricRecvResult::Ok) {
|
||||
continue;
|
||||
}
|
||||
if (token.get() != static_cast<uint64_t>(expected)) {
|
||||
std::cerr << "fan-out mismatch on consumer 1: "
|
||||
<< "expected " << expected << " got "
|
||||
<< token.get() << "\n";
|
||||
failed.store(true);
|
||||
break;
|
||||
}
|
||||
auto recv_time = steady_clock::now().time_since_epoch();
|
||||
auto recv_ns =
|
||||
std::chrono::duration_cast<nanoseconds>(
|
||||
recv_time)
|
||||
.count();
|
||||
recv_time1[expected - 1] = recv_ns;
|
||||
token.release();
|
||||
consumed1.store(expected);
|
||||
++expected;
|
||||
}
|
||||
});
|
||||
|
||||
constexpr uint64_t policy = (1ULL << 0) | (1ULL << 1);
|
||||
|
||||
for (int i = 1; i <= n && !failed.load(); ++i) {
|
||||
while (!failed.load()) {
|
||||
auto now = steady_clock::now().time_since_epoch();
|
||||
auto ns = std::chrono::duration_cast<nanoseconds>(now)
|
||||
.count();
|
||||
auto res = fabric.send(static_cast<uint64_t>(i), policy,
|
||||
kBufferSize);
|
||||
if (res.result ==
|
||||
THWeaver::FanOutFabricSendResult<uint64_t>::Ok) {
|
||||
send_time[i - 1] = ns;
|
||||
break;
|
||||
}
|
||||
if (res.result ==
|
||||
THWeaver::FanOutFabricSendResult<
|
||||
uint64_t>::ErrPartialSuccess) {
|
||||
std::cerr << "fan-out partial success for "
|
||||
<< "message " << i << "\n";
|
||||
failed.store(true);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
while (!failed.load() &&
|
||||
(consumed0.load() < i || consumed1.load() < i)) {
|
||||
}
|
||||
}
|
||||
|
||||
consumer0.join();
|
||||
consumer1.join();
|
||||
|
||||
if (failed.load()) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
for (int i = 1; i <= n; ++i) {
|
||||
int64_t send_ns = send_time[i - 1];
|
||||
int64_t recv_ns0 = recv_time0[i - 1];
|
||||
int64_t recv_ns1 = recv_time1[i - 1];
|
||||
|
||||
std::cout << i << "," << (recv_ns0 - send_ns) << "\n";
|
||||
std::cout << i << "," << (recv_ns1 - send_ns) << "\n";
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
Reference in New Issue
Block a user