// threadweaver/include/weaver.h - C++ header (listing metadata: 366 lines, 13 KiB)

#ifndef THWEAVER_FABRIC_H
#define THWEAVER_FABRIC_H
#include <array>
#include <atomic>
#include <bit>
#include <cstddef>
#include <cstdint>
#include <new>
#include <utility>
namespace THWeaver {
// Cache-line size (in bytes) used with alignas() to keep independently written
// members on separate cache lines.
//
// The value comes from the standard library when it advertises support via
// the feature-test macro; several toolchains do not ship
// std::hardware_destructive_interference_size, so fall back to 64 bytes there.
#ifdef __cpp_lib_hardware_interference_size
static constexpr std::size_t CLS = std::hardware_destructive_interference_size;
#else
static constexpr std::size_t CLS = 64;
#endif
// Outcome of EndpointQueue::send(). The numeric values mirror
// FanInFabricSendResult, which is produced from this enum by a cast through
// the shared 32-bit underlying type.
enum class EndpointQueueSendResult : std::uint32_t {
    Ok = 0,      // message was stored
    ErrFull = 1, // no free slot; message was not stored
};

// Outcome of EndpointQueue::recv().
enum class EndpointQueueRecvResult {
    Ok = 0,       // a message was moved into the caller's buffer
    ErrEmpty = 1, // queue held no message
};

// Outcome of FanInFabric::send(); values match EndpointQueueSendResult.
enum class FanInFabricSendResult : std::uint32_t {
    Ok = 0,
    ErrFull = 1,
};

// Outcome of FanInFabric::recv_bitmap_only().
enum class FanInFabricRecvBitmapResult {
    Ok = 0,              // a message was received
    ErrBitmapEmpty = 1,  // the hint bitmap had no bit set
    ErrBitmapFailed = 2, // a bit was set but the hinted queue was empty
};

// Outcome of FanInFabric::recv_try().
enum class FanInFabricRecvTryResult {
    Ok = 0,
    ErrTryFailed = 1,
};

// Outcome of FanInFabric::recv_aggr().
enum class FanInFabricRecvAggrResult {
    Ok = 0,
    ErrAggrFailed = 1,
};
/// Fixed-capacity ring buffer of `MessageType` addressed by two atomic
/// indices.
///
/// `send()` writes a slot and then advances `tail`; `recv()` and
/// `recv_unsafe()` read a slot and then advance `head`. One slot is always
/// left unused so that "full" (`head == tail + 1`) can be told apart from
/// "empty" (`head == tail`); the queue therefore holds at most `QSIZE - 1`
/// messages.
///
/// NOTE(review): `send()` is the only writer of `tail` and the receive
/// functions are the only writers of `head`, which suggests one sending and
/// one receiving thread per queue - confirm against the callers.
///
/// @tparam MessageType element type; it is move-assigned in and out of slots.
/// @tparam qsize_expt  log2 of the number of slots.
template <typename MessageType, std::size_t qsize_expt> class EndpointQueue {
public:
    static_assert(qsize_expt < sizeof(std::size_t) * 8,
                  "Queue size exponent does not fit in std::size_t");

    /// Number of slots. Shifted as std::size_t so large exponents do not
    /// overflow an int.
    static constexpr std::size_t QSIZE = std::size_t{1} << qsize_expt;
    /// Mask used to wrap indices into [0, QSIZE).
    static constexpr std::size_t QSIZE_MASK = QSIZE - 1;

    /// Creates an empty queue (both indices start at zero).
    EndpointQueue() noexcept {}

    EndpointQueue(const EndpointQueue<MessageType, qsize_expt> &) = delete;
    EndpointQueue &
    operator=(const EndpointQueue<MessageType, qsize_expt> &) = delete;
    EndpointQueue(EndpointQueue<MessageType, qsize_expt> &&) = delete;
    EndpointQueue &
    operator=(EndpointQueue<MessageType, qsize_expt> &&) = delete;

    /// Resets both indices to zero. Slot contents are left untouched.
    void init() noexcept {
        head.store(0);
        tail.store(0);
    }

    /// Discards all queued messages by resetting both indices to zero.
    void flush() noexcept { init(); }

    /// Number of messages currently queued, in [0, QSIZE - 1].
    ///
    /// Adding QSIZE before subtracting keeps the intermediate value
    /// non-negative when `tail` has wrapped around below `head`.
    std::size_t size() const noexcept {
        const std::size_t t = tail.load();
        const std::size_t h = head.load();
        return (t + QSIZE - h) & QSIZE_MASK;
    }

    /// True when no message is queued.
    bool is_empty() const noexcept { return head.load() == tail.load(); }

    /// True when advancing `tail` would collide with `head`.
    bool is_full() const noexcept {
        return head.load() == ((tail.load() + 1) & QSIZE_MASK);
    }

    /// Current index of the next slot to be read.
    std::size_t get_head() const noexcept { return head.load(); }
    /// Current index of the next slot to be written.
    std::size_t get_tail() const noexcept { return tail.load(); }

    /// Moves `message` into the queue.
    ///
    /// @return `Ok` on success, `ErrFull` (message untouched) when no slot is
    ///         free.
    EndpointQueueSendResult send(MessageType &&message) noexcept {
        const std::size_t h = head.load();
        const std::size_t t = tail.load();
        const std::size_t next_tail = (t + 1) & QSIZE_MASK;
        if (h == next_tail)
            return EndpointQueueSendResult::ErrFull;
        // Fill the slot first, then publish it by advancing tail.
        buffer[t] = std::move(message);
        tail.store(next_tail);
        return EndpointQueueSendResult::Ok;
    }

    /// Moves the oldest queued message into `buffer`.
    ///
    /// @return `Ok` on success, `ErrEmpty` (`buffer` untouched) when nothing
    ///         is queued.
    EndpointQueueRecvResult recv(MessageType &buffer) noexcept {
        const std::size_t h = head.load();
        const std::size_t t = tail.load();
        if (h == t)
            return EndpointQueueRecvResult::ErrEmpty;
        // Take the message out first, then release the slot by advancing head.
        buffer = std::move(this->buffer[h]);
        head.store((h + 1) & QSIZE_MASK);
        return EndpointQueueRecvResult::Ok;
    }

    /// Moves the message at slot `old_head` into `buffer` and advances `head`
    /// past it, without checking that the queue is non-empty.
    ///
    /// The caller must have read `old_head` via `get_head()` and verified
    /// that it differs from `get_tail()`.
    void recv_unsafe(MessageType &buffer, std::size_t old_head) noexcept {
        buffer = std::move(this->buffer[old_head]);
        head.store((old_head + 1) & QSIZE_MASK);
    }

private:
    // Each member sits on its own cache line.
    alignas(CLS) std::array<MessageType, QSIZE> buffer;
    alignas(CLS) std::atomic<std::size_t> head{0};
    alignas(CLS) std::atomic<std::size_t> tail{0};
};
template <typename MessageType, std::size_t qsize_expt,
std::size_t IN_THREAD_CNT>
class FanInFabric {
public:
using BitmapType = uint64_t;
static constexpr std::size_t QSIZE = 1 << qsize_expt;
static constexpr std::size_t QSIZE_MASK = QSIZE - 1;
static constexpr std::size_t BITMAP_SIZE = sizeof(BitmapType) * 8;
static constexpr BitmapType THREAD_BITMAP_MASK =
IN_THREAD_CNT == BITMAP_SIZE
? ~BitmapType(0)
: (BitmapType(1) << IN_THREAD_CNT) - 1;
static_assert(IN_THREAD_CNT <= BITMAP_SIZE, "Producer limit exceeded");
FanInFabric() noexcept {}
FanInFabric(const FanInFabric<MessageType, qsize_expt, IN_THREAD_CNT>
&) = delete;
FanInFabric &
operator=(const FanInFabric<MessageType, qsize_expt, IN_THREAD_CNT> &) =
delete;
FanInFabric(FanInFabric<MessageType, qsize_expt, IN_THREAD_CNT> &&) =
delete;
FanInFabric &
operator=(FanInFabric<MessageType, qsize_expt, IN_THREAD_CNT> &&) =
delete;
void init() noexcept {
for (int i = 0; i < IN_THREAD_CNT; ++i)
in_queues[i].init();
hint.store(0);
dl.init();
rotation = 0;
}
template <std::size_t thread_id> void flush() noexcept {
static_assert(thread_id < IN_THREAD_CNT,
"Thread ID out of bounds");
in_queues[thread_id].flush();
hint.fetch_and(~(1 << thread_id), std::memory_order_relaxed);
}
void flush_all() noexcept {
for (int i = 0; i < IN_THREAD_CNT; ++i)
in_queues[i].flush();
hint.store(0, std::memory_order_relaxed);
dl.init();
rotation = 0;
}
template <std::size_t thread_id>
FanInFabricSendResult send(MessageType &&message) noexcept {
static_assert(thread_id < IN_THREAD_CNT,
"Thread ID out of bounds");
return static_cast<FanInFabricSendResult>(static_cast<uint32_t>(
in_queues[thread_id].send(message)));
}
FanInFabricRecvBitmapResult
recv_bitmap_only(MessageType &buffer) noexcept {
BitmapType bitmap = hint.load(std::memory_order_relaxed);
if (!bitmap)
return FanInFabricRecvBitmapResult::ErrBitmapEmpty;
bitmap = rotate(bitmap);
int hinted_id =
(std::countr_zero(bitmap) + rotation) % IN_THREAD_CNT;
rotation = (hinted_id + 1) % IN_THREAD_CNT;
std::size_t head = in_queues[hinted_id].get_head();
std::size_t tail = in_queues[hinted_id].get_tail();
std::size_t qsize = ((tail + QSIZE_MASK) - head) & QSIZE_MASK;
if (qsize <= 1)
hint.fetch_and(~(1 << hinted_id),
std::memory_order_relaxed);
if (qsize) {
in_queues[hinted_id].recv_unsafe(buffer, head);
dl.move_to_last(hinted_id);
return FanInFabricRecvBitmapResult::Ok;
}
return FanInFabricRecvBitmapResult::ErrBitmapFailed;
}
FanInFabricRecvTryResult recv_try(MessageType &buffer) noexcept {
BitmapType bitmap = hint.load(std::memory_order_relaxed);
if (bitmap) {
bitmap = rotate(bitmap);
int hinted_id = (std::countr_zero(bitmap) + rotation) %
IN_THREAD_CNT;
rotation = (hinted_id + 1) % IN_THREAD_CNT;
std::size_t head = in_queues[hinted_id].get_head();
std::size_t tail = in_queues[hinted_id].get_tail();
std::size_t qsize =
((tail + QSIZE_MASK) - head) & QSIZE_MASK;
if (qsize <= 1)
hint.fetch_and(~(1 << hinted_id),
std::memory_order_relaxed);
if (qsize) {
in_queues[hinted_id].recv_unsafe(buffer, head);
dl.move_to_last(hinted_id);
return FanInFabricRecvTryResult::Ok;
}
}
std::size_t head = in_queues[dl.first].get_head();
std::size_t tail = in_queues[dl.first].get_tail();
if (head == tail)
return FanInFabricRecvTryResult::ErrTryFailed;
in_queues[dl.first].recv_unsafe(buffer, head);
dl.move_first_to_last();
return FanInFabricRecvTryResult::Ok;
}
FanInFabricRecvAggrResult recv_aggr(MessageType &buffer) noexcept {
BitmapType bitmap = hint.load(std::memory_order_relaxed);
if (bitmap) {
bitmap = rotate(bitmap);
int hinted_id = (std::countr_zero(bitmap) + rotation) %
IN_THREAD_CNT;
rotation = (hinted_id + 1) % IN_THREAD_CNT;
std::size_t head = in_queues[hinted_id].get_head();
std::size_t tail = in_queues[hinted_id].get_tail();
std::size_t qsize =
((tail + QSIZE_MASK) - head) & QSIZE_MASK;
if (qsize <= 1)
hint.fetch_and(~(1 << hinted_id),
std::memory_order_relaxed);
if (qsize) {
in_queues[hinted_id].recv_unsafe(buffer, head);
dl.move_to_last(hinted_id);
return FanInFabricRecvAggrResult::Ok;
}
}
uint8_t last = dl.last;
do {
uint8_t first = dl.first;
std::size_t head = in_queues[dl.first].get_head();
std::size_t tail = in_queues[dl.first].get_tail();
dl.move_first_to_last();
if (head != tail) {
in_queues[first].recv_unsafe(buffer, head);
return FanInFabricRecvAggrResult::Ok;
}
} while (dl.last != last);
return FanInFabricRecvAggrResult::ErrAggrFailed;
}
void full_scan(BitmapType &bitmap) noexcept {
bitmap = 0;
uint8_t curr = dl.first;
if (in_queues[curr].size())
bitmap |= 1 << curr;
do {
curr = dl.next[curr];
if (in_queues[curr].size())
bitmap |= 1 << curr;
} while (curr != dl.last);
hint.fetch_or(bitmap, std::memory_order_release);
}
template <std::size_t thread_id> std::size_t size() const noexcept {
static_assert(thread_id < IN_THREAD_CNT,
"Thread ID out of bounds");
return in_queues[thread_id].size();
}
template <std::size_t thread_id> bool is_empty() const noexcept {
static_assert(thread_id < IN_THREAD_CNT,
"Thread ID out of bounds");
return in_queues[thread_id].is_empty();
}
template <std::size_t thread_id> bool is_full() const noexcept {
static_assert(thread_id < IN_THREAD_CNT,
"Thread ID out of bounds");
return in_queues[thread_id].is_full();
}
private:
std::array<EndpointQueue<MessageType, qsize_expt>, IN_THREAD_CNT>
in_queues;
alignas(CLS) std::atomic<BitmapType> hint;
struct alignas(CLS) DiscoveryList {
std::array<uint8_t, IN_THREAD_CNT> next;
std::array<uint8_t, IN_THREAD_CNT> prev;
uint8_t first;
uint8_t last;
void init() noexcept {
for (int i = 0; i < IN_THREAD_CNT; ++i) {
next[i] = (i + 1) % IN_THREAD_CNT;
prev[i] =
(i + IN_THREAD_CNT - 1) % IN_THREAD_CNT;
}
first = 0;
last = IN_THREAD_CNT - 1;
}
void move_first_to_last() noexcept {
prev[first] = last;
next[last] = first;
last = next[last];
first = next[last];
}
void move_to_last(uint8_t index) noexcept {
if (index == last)
return;
else if (index == first) {
move_first_to_last();
} else {
prev[next[index]] = prev[index];
next[prev[index]] = next[index];
next[last] = index;
prev[index] = last;
last = index;
}
}
} dl;
std::size_t rotation;
BitmapType rotate(BitmapType bitmap) const noexcept {
bitmap &= THREAD_BITMAP_MASK;
if (rotation == 0)
return bitmap;
return ((bitmap >> rotation) |
(bitmap << (IN_THREAD_CNT - rotation))) &
THREAD_BITMAP_MASK;
}
};
} // namespace THWeaver
#endif