[UNTESTED] Basic queue and fan-in fabric implementation without full template generalization
This commit is contained in:
267
src/fan_in.cc
Normal file
267
src/fan_in.cc
Normal file
@@ -0,0 +1,267 @@
|
||||
#include <atomic>
|
||||
#include <bit>
|
||||
#include <cstddef>
|
||||
#include <cstdint>
|
||||
#include <weaver/fabric.h>
|
||||
#include <weaver/results.h>
|
||||
|
||||
namespace THWeaver {
|
||||
|
||||
template <typename MessageType, std::size_t qsize_expt,
|
||||
std::size_t IN_THREAD_CNT>
|
||||
void FanInFabric<MessageType, qsize_expt, IN_THREAD_CNT>::init() noexcept {
|
||||
for (int i = 0; i < IN_THREAD_CNT; ++i)
|
||||
in_queues[i].init();
|
||||
hint.store(0);
|
||||
dl.init();
|
||||
rotation = 0;
|
||||
}
|
||||
|
||||
template <typename MessageType, std::size_t qsize_expt,
|
||||
std::size_t IN_THREAD_CNT>
|
||||
template <std::size_t thread_id>
|
||||
void FanInFabric<MessageType, qsize_expt, IN_THREAD_CNT>::flush() noexcept {
|
||||
static_assert(thread_id < IN_THREAD_CNT, "Thread ID out of bounds");
|
||||
|
||||
in_queues[thread_id].flush();
|
||||
hint.fetch_and(~(1 << thread_id), std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
template <typename MessageType, std::size_t qsize_expt,
|
||||
std::size_t IN_THREAD_CNT>
|
||||
void FanInFabric<MessageType, qsize_expt, IN_THREAD_CNT>::flush_all() noexcept {
|
||||
for (int i = 0; i < IN_THREAD_CNT; ++i)
|
||||
in_queues[i].flush();
|
||||
hint.store(0, std::memory_order_relaxed);
|
||||
dl.init();
|
||||
rotation = 0;
|
||||
}
|
||||
|
||||
template <typename MessageType, std::size_t qsize_expt,
|
||||
std::size_t IN_THREAD_CNT>
|
||||
template <std::size_t thread_id>
|
||||
FanInFabricSendResult FanInFabric<MessageType, qsize_expt, IN_THREAD_CNT>::send(
|
||||
MessageType &&message) noexcept {
|
||||
static_assert(thread_id < IN_THREAD_CNT, "Thread ID out of bounds");
|
||||
|
||||
return static_cast<FanInFabricSendResult>(
|
||||
static_cast<uint32_t>(in_queues[thread_id].send(message)));
|
||||
}
|
||||
|
||||
template <typename MessageType, std::size_t qsize_expt,
|
||||
std::size_t IN_THREAD_CNT>
|
||||
template <std::size_t thread_id>
|
||||
std::size_t
|
||||
FanInFabric<MessageType, qsize_expt, IN_THREAD_CNT>::size() const noexcept {
|
||||
static_assert(thread_id < IN_THREAD_CNT, "Thread ID out of bounds");
|
||||
|
||||
return in_queues[thread_id].size();
|
||||
}
|
||||
|
||||
template <typename MessageType, std::size_t qsize_expt,
|
||||
std::size_t IN_THREAD_CNT>
|
||||
template <std::size_t thread_id>
|
||||
bool FanInFabric<MessageType, qsize_expt, IN_THREAD_CNT>::is_empty()
|
||||
const noexcept {
|
||||
static_assert(thread_id < IN_THREAD_CNT, "Thread ID out of bounds");
|
||||
|
||||
return in_queues[thread_id].is_empty();
|
||||
}
|
||||
|
||||
template <typename MessageType, std::size_t qsize_expt,
|
||||
std::size_t IN_THREAD_CNT>
|
||||
template <std::size_t thread_id>
|
||||
bool FanInFabric<MessageType, qsize_expt, IN_THREAD_CNT>::is_full()
|
||||
const noexcept {
|
||||
static_assert(thread_id < IN_THREAD_CNT, "Thread ID out of bounds");
|
||||
|
||||
return in_queues[thread_id].is_full();
|
||||
}
|
||||
|
||||
template <typename MessageType, std::size_t qsize_expt,
|
||||
std::size_t IN_THREAD_CNT>
|
||||
FanInFabricRecvBitmapResult
|
||||
FanInFabric<MessageType, qsize_expt, IN_THREAD_CNT>::recv_bitmap_only(
|
||||
MessageType &buffer) noexcept {
|
||||
BitmapType bitmap = hint.load(std::memory_order_relaxed);
|
||||
|
||||
if (!bitmap)
|
||||
return FanInFabricRecvBitmapResult::ErrBitmapEmpty;
|
||||
|
||||
bitmap = rotate(bitmap);
|
||||
int hinted_id = (std::countr_zero(bitmap) + rotation) % IN_THREAD_CNT;
|
||||
rotation = (hinted_id + 1) % IN_THREAD_CNT;
|
||||
|
||||
std::size_t head = in_queues[hinted_id].get_head();
|
||||
std::size_t tail = in_queues[hinted_id].get_tail();
|
||||
std::size_t qsize = ((tail + QSIZE_MASK) - head) & QSIZE_MASK;
|
||||
|
||||
if (qsize <= 1)
|
||||
hint.fetch_add(~(1 << hinted_id), std::memory_order_relaxed);
|
||||
|
||||
if (qsize) {
|
||||
in_queues[hinted_id].recv_unsafe(buffer, head);
|
||||
dl.move_to_last(hinted_id);
|
||||
return FanInFabricRecvBitmapResult::Ok;
|
||||
}
|
||||
|
||||
return FanInFabricRecvBitmapResult::ErrBitmapFailed;
|
||||
}
|
||||
|
||||
template <typename MessageType, std::size_t qsize_expt,
|
||||
std::size_t IN_THREAD_CNT>
|
||||
FanInFabricRecvTryResult
|
||||
FanInFabric<MessageType, qsize_expt, IN_THREAD_CNT>::recv_try(
|
||||
MessageType &buffer) noexcept {
|
||||
BitmapType bitmap = hint.load(std::memory_order_relaxed);
|
||||
|
||||
if (bitmap) {
|
||||
bitmap = rotate(bitmap);
|
||||
int hinted_id =
|
||||
(std::countr_zero(bitmap) + rotation) % IN_THREAD_CNT;
|
||||
rotation = (hinted_id + 1) % IN_THREAD_CNT;
|
||||
|
||||
std::size_t head = in_queues[hinted_id].get_head();
|
||||
std::size_t tail = in_queues[hinted_id].get_tail();
|
||||
std::size_t qsize = ((tail + QSIZE_MASK) - head) & QSIZE_MASK;
|
||||
|
||||
if (qsize <= 1)
|
||||
hint.fetch_add(~(1 << hinted_id),
|
||||
std::memory_order_relaxed);
|
||||
|
||||
if (qsize) {
|
||||
in_queues[hinted_id].recv_unsafe(buffer, head);
|
||||
dl.move_to_last(hinted_id);
|
||||
return FanInFabricRecvTryResult::Ok;
|
||||
}
|
||||
}
|
||||
|
||||
std::size_t head = in_queues[dl.first].get_head();
|
||||
std::size_t tail = in_queues[dl.first].get_tail();
|
||||
|
||||
if (head == tail)
|
||||
return FanInFabricRecvTryResult::ErrTryFailed;
|
||||
|
||||
in_queues[dl.first].recv_unsafe(buffer, head);
|
||||
dl.move_first_to_last();
|
||||
return FanInFabricRecvTryResult::Ok;
|
||||
}
|
||||
|
||||
template <typename MessageType, std::size_t qsize_expt,
|
||||
std::size_t IN_THREAD_CNT>
|
||||
FanInFabricRecvAggrResult
|
||||
FanInFabric<MessageType, qsize_expt, IN_THREAD_CNT>::recv_aggr(
|
||||
MessageType &buffer) noexcept {
|
||||
BitmapType bitmap = hint.load(std::memory_order_relaxed);
|
||||
|
||||
if (bitmap) {
|
||||
bitmap = rotate(bitmap);
|
||||
int hinted_id =
|
||||
(std::countr_zero(bitmap) + rotation) % IN_THREAD_CNT;
|
||||
rotation = (hinted_id + 1) % IN_THREAD_CNT;
|
||||
|
||||
std::size_t head = in_queues[hinted_id].get_head();
|
||||
std::size_t tail = in_queues[hinted_id].get_tail();
|
||||
std::size_t qsize = ((tail + QSIZE_MASK) - head) & QSIZE_MASK;
|
||||
|
||||
if (qsize <= 1)
|
||||
hint.fetch_add(~(1 << hinted_id),
|
||||
std::memory_order_relaxed);
|
||||
|
||||
if (qsize) {
|
||||
in_queues[hinted_id].recv_unsafe(buffer, head);
|
||||
dl.move_to_last(hinted_id);
|
||||
return FanInFabricRecvAggrResult::Ok;
|
||||
}
|
||||
}
|
||||
|
||||
uint8_t last = dl.last;
|
||||
|
||||
do {
|
||||
std::size_t head = in_queues[dl.first].get_head();
|
||||
std::size_t tail = in_queues[dl.first].get_tail();
|
||||
dl.move_first_to_last();
|
||||
|
||||
if (head != tail) {
|
||||
in_queues[dl.first].recv_unsafe(buffer, head);
|
||||
return FanInFabricRecvAggrResult::Ok;
|
||||
}
|
||||
} while (dl.last != last);
|
||||
|
||||
return FanInFabricRecvAggrResult::ErrAggrFailed;
|
||||
}
|
||||
|
||||
template <typename MessageType, std::size_t qsize_expt,
|
||||
std::size_t IN_THREAD_CNT>
|
||||
void FanInFabric<MessageType, qsize_expt, IN_THREAD_CNT>::full_scan(
|
||||
BitmapType &bitmap) noexcept {
|
||||
bitmap = 0;
|
||||
|
||||
uint8_t curr = dl.first;
|
||||
|
||||
if (in_queues[curr].size())
|
||||
bitmap |= 1 << curr;
|
||||
|
||||
do {
|
||||
curr = dl.next[curr];
|
||||
if (in_queues[curr].size())
|
||||
bitmap |= 1 << curr;
|
||||
} while (curr != dl.last);
|
||||
|
||||
hint.fetch_or(bitmap, std::memory_order_release);
|
||||
}
|
||||
|
||||
template <typename MessageType, std::size_t qsize_expt,
|
||||
std::size_t IN_THREAD_CNT>
|
||||
void FanInFabric<MessageType, qsize_expt,
|
||||
IN_THREAD_CNT>::DiscoveryList::init() noexcept {
|
||||
for (int i = 0; i < IN_THREAD_CNT; ++i) {
|
||||
next[i] = (i + 1) % IN_THREAD_CNT;
|
||||
prev[i] = (i + IN_THREAD_CNT - 1) % IN_THREAD_CNT;
|
||||
}
|
||||
first = 0;
|
||||
last = IN_THREAD_CNT - 1;
|
||||
}
|
||||
|
||||
template <typename MessageType, std::size_t qsize_expt,
|
||||
std::size_t IN_THREAD_CNT>
|
||||
void FanInFabric<MessageType, qsize_expt,
|
||||
IN_THREAD_CNT>::DiscoveryList::move_first_to_last() noexcept {
|
||||
prev[first] = last;
|
||||
next[last] = first;
|
||||
last = next[last];
|
||||
first = next[last];
|
||||
}
|
||||
|
||||
template <typename MessageType, std::size_t qsize_expt,
|
||||
std::size_t IN_THREAD_CNT>
|
||||
void FanInFabric<MessageType, qsize_expt, IN_THREAD_CNT>::DiscoveryList::
|
||||
move_to_last(uint8_t index) noexcept {
|
||||
if (index == last)
|
||||
return;
|
||||
else if (index == first) {
|
||||
move_first_to_last();
|
||||
} else {
|
||||
prev[next[index]] = prev[index];
|
||||
next[prev[index]] = next[index];
|
||||
next[last] = index;
|
||||
prev[index] = last;
|
||||
last = index;
|
||||
}
|
||||
}
|
||||
|
||||
template <typename MessageType, std::size_t qsize_expt,
|
||||
std::size_t IN_THREAD_CNT>
|
||||
FanInFabric<MessageType, qsize_expt, IN_THREAD_CNT>::BitmapType
|
||||
FanInFabric<MessageType, qsize_expt, IN_THREAD_CNT>::rotate(
|
||||
BitmapType bitmap) const noexcept {
|
||||
bitmap &= THREAD_BITMAP_MASK;
|
||||
|
||||
if (rotation == 0)
|
||||
return bitmap;
|
||||
|
||||
return ((bitmap >> rotation) | (bitmap << (IN_THREAD_CNT - rotation))) &
|
||||
THREAD_BITMAP_MASK;
|
||||
}
|
||||
|
||||
} // namespace THWeaver
|
||||
89
src/queue.cc
Normal file
89
src/queue.cc
Normal file
@@ -0,0 +1,89 @@
|
||||
#include <atomic>
|
||||
#include <cstddef>
|
||||
|
||||
#include <weaver/fabric.h>
|
||||
#include <weaver/results.h>
|
||||
|
||||
namespace THWeaver {
|
||||
|
||||
template <typename MessageType, std::size_t qsize_expt>
|
||||
void EndpointQueue<MessageType, qsize_expt>::init() noexcept {
|
||||
head.store(0);
|
||||
tail.store(0);
|
||||
}
|
||||
|
||||
template <typename MessageType, std::size_t qsize_expt>
|
||||
void EndpointQueue<MessageType, qsize_expt>::flush() noexcept {
|
||||
head.store(0);
|
||||
tail.store(0);
|
||||
}
|
||||
|
||||
template <typename MessageType, std::size_t qsize_expt>
|
||||
EndpointQueueSendResult
|
||||
EndpointQueue<MessageType, qsize_expt>::send(MessageType &&message) noexcept {
|
||||
std::size_t h = head.load(std::memory_order_acquire);
|
||||
std::size_t t = tail.load(std::memory_order_acquire);
|
||||
|
||||
if (h == ((t + 1) & QSIZE_MASK))
|
||||
return EndpointQueueSendResult::ErrFull;
|
||||
|
||||
buffer[t] = std::move(message);
|
||||
|
||||
t = (t + 1) & QSIZE_MASK;
|
||||
tail.store(t);
|
||||
|
||||
return EndpointQueueSendResult::Ok;
|
||||
}
|
||||
|
||||
template <typename MessageType, std::size_t qsize_expt>
|
||||
EndpointQueueRecvResult
|
||||
EndpointQueue<MessageType, qsize_expt>::recv(MessageType &buffer) noexcept {
|
||||
std::size_t h = head.load();
|
||||
std::size_t t = tail.load();
|
||||
|
||||
if (h == t)
|
||||
return EndpointQueueRecvResult::ErrEmpty;
|
||||
|
||||
buffer = std::move(this->buffer[h]);
|
||||
|
||||
h = (h + 1) & QSIZE_MASK;
|
||||
head.store(h);
|
||||
|
||||
return EndpointQueueRecvResult::Ok;
|
||||
}
|
||||
|
||||
template <typename MessageType, std::size_t qsize_expt>
|
||||
void EndpointQueue<MessageType, qsize_expt>::recv_unsafe(
|
||||
MessageType &buffer, std::size_t old_head) noexcept {
|
||||
|
||||
buffer = std::move(this->buffer[old_head]);
|
||||
|
||||
head.store((old_head + 1) & QSIZE_MASK);
|
||||
}
|
||||
|
||||
template <typename MessageType, std::size_t qsize_expt>
|
||||
std::size_t EndpointQueue<MessageType, qsize_expt>::size() const noexcept {
|
||||
return ((tail.load() + QSIZE_MASK) - head.load()) & QSIZE_MASK;
|
||||
}
|
||||
|
||||
template <typename MessageType, std::size_t qsize_expt>
|
||||
bool EndpointQueue<MessageType, qsize_expt>::is_empty() const noexcept {
|
||||
return head.load() == tail.load();
|
||||
}
|
||||
|
||||
template <typename MessageType, std::size_t qsize_expt>
|
||||
bool EndpointQueue<MessageType, qsize_expt>::is_full() const noexcept {
|
||||
return head.load() != ((tail.load() + 1) & QSIZE_MASK);
|
||||
}
|
||||
|
||||
template <typename MessageType, std::size_t qsize_expt>
|
||||
std::size_t EndpointQueue<MessageType, qsize_expt>::get_head() const noexcept {
|
||||
return head.load();
|
||||
}
|
||||
|
||||
template <typename MessageType, std::size_t qsize_expt>
|
||||
std::size_t EndpointQueue<MessageType, qsize_expt>::get_tail() const noexcept {
|
||||
return tail.load();
|
||||
}
|
||||
|
||||
} // namespace THWeaver
|
||||
Reference in New Issue
Block a user