[TESTED] patched for basic tests, moved fan-out buffer to the stack, all basic tests are runnable

This commit is contained in:
2026-01-04 13:21:17 -05:00
parent 103903be8e
commit dd3d6be073
3 changed files with 303 additions and 9 deletions

View File

@@ -9,7 +9,6 @@
#include <limits> #include <limits>
#include <new> #include <new>
#include <utility> #include <utility>
#include <vector>
namespace THWeaver { namespace THWeaver {
@@ -147,7 +146,7 @@ public:
FanInFabric &operator=(FanInFabric &&) = delete; FanInFabric &operator=(FanInFabric &&) = delete;
void init() noexcept { void init() noexcept {
for (int i = 0; i < IN_THREAD_CNT; ++i) for (std::size_t i = 0; i < IN_THREAD_CNT; ++i)
in_queues[i].init(); in_queues[i].init();
hint.store(0); hint.store(0);
dl.init(); dl.init();
@@ -336,7 +335,7 @@ private:
uint8_t first; uint8_t first;
uint8_t last; uint8_t last;
void init() noexcept { void init() noexcept {
for (int i = 0; i < IN_THREAD_CNT; ++i) { for (std::size_t i = 0; i < IN_THREAD_CNT; ++i) {
next[i] = (i + 1) % IN_THREAD_CNT; next[i] = (i + 1) % IN_THREAD_CNT;
prev[i] = prev[i] =
(i + IN_THREAD_CNT - 1) % IN_THREAD_CNT; (i + IN_THREAD_CNT - 1) % IN_THREAD_CNT;
@@ -385,6 +384,7 @@ template <typename MessageType, std::size_t consumer_qsize_expt,
std::size_t BUFFER_SIZE, std::size_t reclaim_qsize_expt, std::size_t BUFFER_SIZE, std::size_t reclaim_qsize_expt,
std::size_t OUT_THREAD_CNT> std::size_t OUT_THREAD_CNT>
class FanOutFabric { class FanOutFabric {
public:
using BitmapType = uint64_t; using BitmapType = uint64_t;
using RefCountType = uint32_t; using RefCountType = uint32_t;
using IndexType = uint16_t; using IndexType = uint16_t;
@@ -462,12 +462,11 @@ class FanOutFabric {
}; };
void init() noexcept { void init() noexcept {
for (int i = 0; i < OUT_THREAD_CNT; ++i) { for (std::size_t i = 0; i < OUT_THREAD_CNT; ++i) {
reclaim_queues[i].init(); reclaim_queues[i].init();
consumer_queues[i].init(); consumer_queues[i].init();
} }
reclaim_hint.store(0, std::memory_order_release); reclaim_hint.store(0, std::memory_order_release);
buffer.resize(BUFFER_SIZE);
fs.init(); fs.init();
reclaim_rot = 0; reclaim_rot = 0;
} }
@@ -539,7 +538,7 @@ class FanOutFabric {
for (std::size_t i = 0; i < OUT_THREAD_CNT; ++i) { for (std::size_t i = 0; i < OUT_THREAD_CNT; ++i) {
if ((policy & (BitmapType(1) << i)) && if ((policy & (BitmapType(1) << i)) &&
consumer_queues[i].send(slot) != consumer_queues[i].send(std::move(slot)) !=
EndpointQueueSendResult::Ok) EndpointQueueSendResult::Ok)
++failed_cnt; ++failed_cnt;
else else
@@ -655,7 +654,7 @@ class FanOutFabric {
static_assert(thread_id < OUT_THREAD_CNT, static_assert(thread_id < OUT_THREAD_CNT,
"Thread ID out of bounds"); "Thread ID out of bounds");
auto res = reclaim_queues[thread_id].send(slot); auto res = reclaim_queues[thread_id].send(std::move(slot));
if (res == EndpointQueueSendResult::Ok) { if (res == EndpointQueueSendResult::Ok) {
reclaim_hint.fetch_or(BitmapType(1) << thread_id, reclaim_hint.fetch_or(BitmapType(1) << thread_id,
std::memory_order_relaxed); std::memory_order_relaxed);
@@ -676,12 +675,12 @@ private:
std::atomic<RefCountType> count; std::atomic<RefCountType> count;
MessageType payload; MessageType payload;
}; };
std::vector<BufferSlot> buffer; std::array<BufferSlot, BUFFER_SIZE> buffer;
struct alignas(CLS) FreeStack { struct alignas(CLS) FreeStack {
std::array<IndexType, BUFFER_SIZE> free; std::array<IndexType, BUFFER_SIZE> free;
int top; int top;
void init() { void init() {
for (int i = 0; i < BUFFER_SIZE; ++i) for (std::size_t i = 0; i < BUFFER_SIZE; ++i)
free[i] = i; free[i] = i;
top = BUFFER_SIZE; top = BUFFER_SIZE;
} }

161
tests/fan_in/basic_test.cc Normal file
View File

@@ -0,0 +1,161 @@
#include <atomic>
#include <chrono>
#include <cstdint>
#include <iostream>
#include <thread>
#include <vector>
#include <weaver.h>
using std::chrono::nanoseconds;
constexpr std::size_t kThreads = 2;
uint64_t pack_message(uint32_t producer, uint32_t seq) {
return (static_cast<uint64_t>(producer) << 32) |
static_cast<uint64_t>(seq);
}
void unpack_message(uint64_t value, uint32_t &producer, uint32_t &seq) {
producer = static_cast<uint32_t>(value >> 32);
seq = static_cast<uint32_t>(value & 0xffffffffu);
}
using std::chrono::nanoseconds;
using std::chrono::steady_clock;
int main() {
int n = 0;
if (!(std::cin >> n) || n <= 0) {
std::cerr << "n must be positive\n";
return 1;
}
using Fabric = THWeaver::FanInFabric<uint64_t, 10, kThreads>;
Fabric fabric;
fabric.init();
std::atomic<bool> failed(false);
std::vector<int64_t> send_time0(n, -1);
std::vector<int64_t> send_time1(n, -1);
std::vector<int64_t> recv_time0(n, -1);
std::vector<int64_t> recv_time1(n, -1);
std::thread producer0([&]() {
for (int i = 1; i <= n && !failed.load(); ++i) {
while (true) {
auto now =
std::chrono::steady_clock::now()
.time_since_epoch();
auto ns =
std::chrono::duration_cast<nanoseconds>(
now)
.count();
if (fabric.send<0>(pack_message(0, i)) ==
THWeaver::FanInFabricSendResult::Ok) {
send_time0[i - 1] = ns;
break;
}
}
}
});
std::thread producer1([&]() {
for (int i = 1; i <= n && !failed.load(); ++i) {
while (true) {
auto now =
std::chrono::steady_clock::now()
.time_since_epoch();
auto ns =
std::chrono::duration_cast<nanoseconds>(
now)
.count();
if (fabric.send<1>(pack_message(1, i)) ==
THWeaver::FanInFabricSendResult::Ok) {
send_time1[i - 1] = ns;
break;
}
}
}
});
std::thread consumer([&]() {
std::vector<uint32_t> expected(kThreads, 1);
std::size_t received = 0;
const std::size_t total =
static_cast<std::size_t>(n) * kThreads;
while (received < total && !failed.load()) {
uint64_t value = 0;
auto res = fabric.recv_try(value);
if (res == THWeaver::FanInFabricRecvTryResult::Ok) {
uint32_t producer = 0;
uint32_t seq = 0;
unpack_message(value, producer, seq);
if (producer >= kThreads ||
seq != expected[producer]) {
std::cerr
<< "fan-in mismatch: producer "
<< producer << " expected "
<< expected[producer] << " got "
<< seq << "\n";
failed.store(true);
break;
}
if (producer == 0) {
auto recv_time =
std::chrono::steady_clock::now()
.time_since_epoch();
auto recv_ns =
std::chrono::duration_cast<
nanoseconds>(recv_time)
.count();
recv_time0[seq - 1] = recv_ns;
} else {
auto recv_time =
std::chrono::steady_clock::now()
.time_since_epoch();
auto recv_ns =
std::chrono::duration_cast<
nanoseconds>(recv_time)
.count();
recv_time1[seq - 1] = recv_ns;
}
++expected[producer];
++received;
continue;
}
Fabric::BitmapType bitmap = 0;
fabric.full_scan(bitmap);
}
});
producer0.join();
producer1.join();
consumer.join();
if (failed.load()) {
return 1;
}
for (int i = 1; i <= n; ++i) {
uint64_t payload0 = pack_message(0, i);
uint64_t payload1 = pack_message(1, i);
int64_t send_ns0 = send_time0[i - 1];
int64_t recv_ns0 = recv_time0[i - 1];
int64_t send_ns1 = send_time1[i - 1];
int64_t recv_ns1 = recv_time1[i - 1];
std::cout << payload0 << ","
<< (recv_ns0 - send_ns0) << "\n";
std::cout << payload1 << ","
<< (recv_ns1 - send_ns1) << "\n";
}
return 0;
}

134
tests/fan_out/basic_test.cc Normal file
View File

@@ -0,0 +1,134 @@
#include <atomic>
#include <chrono>
#include <cstdint>
#include <iostream>
#include <thread>
#include <vector>
#include <weaver.h>
constexpr std::size_t kConsumers = 2;
constexpr std::size_t kBufferSize = 256;
using std::chrono::nanoseconds;
using std::chrono::steady_clock;
int main() {
int n = 0;
if (!(std::cin >> n) || n <= 0) {
std::cerr << "n must be positive\n";
return 1;
}
using Fabric =
THWeaver::FanOutFabric<uint64_t, 8, kBufferSize, 8, kConsumers>;
Fabric fabric;
fabric.init();
std::atomic<bool> failed(false);
std::atomic<int> consumed0(0);
std::atomic<int> consumed1(0);
std::vector<int64_t> send_time(n, -1);
std::vector<int64_t> recv_time0(n, -1);
std::vector<int64_t> recv_time1(n, -1);
std::thread consumer0([&]() {
auto token = fabric.get_empty_borrowed_ref<0>();
int expected = 1;
while (expected <= n && !failed.load()) {
if (fabric.recv<0>(token) !=
THWeaver::FanOutFabricRecvResult::Ok) {
continue;
}
if (token.get() != static_cast<uint64_t>(expected)) {
std::cerr << "fan-out mismatch on consumer 0: "
<< "expected " << expected << " got "
<< token.get() << "\n";
failed.store(true);
break;
}
auto recv_time = steady_clock::now().time_since_epoch();
auto recv_ns =
std::chrono::duration_cast<nanoseconds>(
recv_time)
.count();
recv_time0[expected - 1] = recv_ns;
token.release();
consumed0.store(expected);
++expected;
}
});
std::thread consumer1([&]() {
auto token = fabric.get_empty_borrowed_ref<1>();
int expected = 1;
while (expected <= n && !failed.load()) {
if (fabric.recv<1>(token) !=
THWeaver::FanOutFabricRecvResult::Ok) {
continue;
}
if (token.get() != static_cast<uint64_t>(expected)) {
std::cerr << "fan-out mismatch on consumer 1: "
<< "expected " << expected << " got "
<< token.get() << "\n";
failed.store(true);
break;
}
auto recv_time = steady_clock::now().time_since_epoch();
auto recv_ns =
std::chrono::duration_cast<nanoseconds>(
recv_time)
.count();
recv_time1[expected - 1] = recv_ns;
token.release();
consumed1.store(expected);
++expected;
}
});
constexpr uint64_t policy = (1ULL << 0) | (1ULL << 1);
for (int i = 1; i <= n && !failed.load(); ++i) {
while (!failed.load()) {
auto now = steady_clock::now().time_since_epoch();
auto ns = std::chrono::duration_cast<nanoseconds>(now)
.count();
auto res = fabric.send(static_cast<uint64_t>(i), policy,
kBufferSize);
if (res.result ==
THWeaver::FanOutFabricSendResult<uint64_t>::Ok) {
send_time[i - 1] = ns;
break;
}
if (res.result ==
THWeaver::FanOutFabricSendResult<
uint64_t>::ErrPartialSuccess) {
std::cerr << "fan-out partial success for "
<< "message " << i << "\n";
failed.store(true);
break;
}
}
while (!failed.load() &&
(consumed0.load() < i || consumed1.load() < i)) {
}
}
consumer0.join();
consumer1.join();
if (failed.load()) {
return 1;
}
for (int i = 1; i <= n; ++i) {
int64_t send_ns = send_time[i - 1];
int64_t recv_ns0 = recv_time0[i - 1];
int64_t recv_ns1 = recv_time1[i - 1];
std::cout << i << "," << (recv_ns0 - send_ns) << "\n";
std::cout << i << "," << (recv_ns1 - send_ns) << "\n";
}
return 0;
}