From dd3d6be0736da0634e6852999ce8ddd0f51716e0 Mon Sep 17 00:00:00 2001 From: Peisong Xiao Date: Sun, 4 Jan 2026 13:21:17 -0500 Subject: [PATCH] [TESTED] patched for basic tests, moved fan-out buffer to the stack, all basic tests are runnable --- include/weaver.h | 17 ++-- tests/fan_in/basic_test.cc | 161 ++++++++++++++++++++++++++++++++++++ tests/fan_out/basic_test.cc | 134 ++++++++++++++++++++++++++++++ 3 files changed, 303 insertions(+), 9 deletions(-) create mode 100644 tests/fan_in/basic_test.cc create mode 100644 tests/fan_out/basic_test.cc diff --git a/include/weaver.h b/include/weaver.h index 9cd708e..fbc11c3 100644 --- a/include/weaver.h +++ b/include/weaver.h @@ -9,7 +9,6 @@ #include #include #include -#include namespace THWeaver { @@ -147,7 +146,7 @@ public: FanInFabric &operator=(FanInFabric &&) = delete; void init() noexcept { - for (int i = 0; i < IN_THREAD_CNT; ++i) + for (std::size_t i = 0; i < IN_THREAD_CNT; ++i) in_queues[i].init(); hint.store(0); dl.init(); @@ -336,7 +335,7 @@ private: uint8_t first; uint8_t last; void init() noexcept { - for (int i = 0; i < IN_THREAD_CNT; ++i) { + for (std::size_t i = 0; i < IN_THREAD_CNT; ++i) { next[i] = (i + 1) % IN_THREAD_CNT; prev[i] = (i + IN_THREAD_CNT - 1) % IN_THREAD_CNT; @@ -385,6 +384,7 @@ template class FanOutFabric { +public: using BitmapType = uint64_t; using RefCountType = uint32_t; using IndexType = uint16_t; @@ -462,12 +462,11 @@ class FanOutFabric { }; void init() noexcept { - for (int i = 0; i < OUT_THREAD_CNT; ++i) { + for (std::size_t i = 0; i < OUT_THREAD_CNT; ++i) { reclaim_queues[i].init(); consumer_queues[i].init(); } reclaim_hint.store(0, std::memory_order_release); - buffer.resize(BUFFER_SIZE); fs.init(); reclaim_rot = 0; } @@ -539,7 +538,7 @@ class FanOutFabric { for (std::size_t i = 0; i < OUT_THREAD_CNT; ++i) { if ((policy & (BitmapType(1) << i)) && - consumer_queues[i].send(slot) != + consumer_queues[i].send(std::move(slot)) != EndpointQueueSendResult::Ok) ++failed_cnt; else @@ -655,7 +654,7 @@ class FanOutFabric { static_assert(thread_id < OUT_THREAD_CNT, "Thread ID out of bounds"); - auto res = reclaim_queues[thread_id].send(slot); + auto res = reclaim_queues[thread_id].send(std::move(slot)); if (res == EndpointQueueSendResult::Ok) { reclaim_hint.fetch_or(BitmapType(1) << thread_id, std::memory_order_relaxed); @@ -676,12 +675,12 @@ private: std::atomic count; MessageType payload; }; - std::vector buffer; + std::array buffer; struct alignas(CLS) FreeStack { std::array free; int top; void init() { - for (int i = 0; i < BUFFER_SIZE; ++i) + for (std::size_t i = 0; i < BUFFER_SIZE; ++i) free[i] = i; top = BUFFER_SIZE; } diff --git a/tests/fan_in/basic_test.cc b/tests/fan_in/basic_test.cc new file mode 100644 index 0000000..06141d5 --- /dev/null +++ b/tests/fan_in/basic_test.cc @@ -0,0 +1,161 @@ +#include +#include +#include +#include +#include +#include + +#include + +using std::chrono::nanoseconds; + +constexpr std::size_t kThreads = 2; + +uint64_t pack_message(uint32_t producer, uint32_t seq) { + return (static_cast(producer) << 32) | + static_cast(seq); +} + +void unpack_message(uint64_t value, uint32_t &producer, uint32_t &seq) { + producer = static_cast(value >> 32); + seq = static_cast(value & 0xffffffffu); +} + +using std::chrono::nanoseconds; +using std::chrono::steady_clock; + +int main() { + int n = 0; + if (!(std::cin >> n) || n <= 0) { + std::cerr << "n must be positive\n"; + return 1; + } + + using Fabric = THWeaver::FanInFabric; + Fabric fabric; + fabric.init(); + + std::atomic failed(false); + + std::vector send_time0(n, -1); + std::vector send_time1(n, -1); + std::vector recv_time0(n, -1); + std::vector recv_time1(n, -1); + + std::thread producer0([&]() { + for (int i = 1; i <= n && !failed.load(); ++i) { + while (true) { + auto now = + std::chrono::steady_clock::now() + .time_since_epoch(); + auto ns = + std::chrono::duration_cast( + now) + .count(); + if (fabric.send<0>(pack_message(0, i)) == + THWeaver::FanInFabricSendResult::Ok) { + send_time0[i - 1] = ns; + break; + } + } + } + }); + + std::thread producer1([&]() { + for (int i = 1; i <= n && !failed.load(); ++i) { + while (true) { + auto now = + std::chrono::steady_clock::now() + .time_since_epoch(); + auto ns = + std::chrono::duration_cast( + now) + .count(); + if (fabric.send<1>(pack_message(1, i)) == + THWeaver::FanInFabricSendResult::Ok) { + send_time1[i - 1] = ns; + break; + } + } + } + }); + + std::thread consumer([&]() { + std::vector expected(kThreads, 1); + std::size_t received = 0; + const std::size_t total = + static_cast(n) * kThreads; + + while (received < total && !failed.load()) { + uint64_t value = 0; + auto res = fabric.recv_try(value); + if (res == THWeaver::FanInFabricRecvTryResult::Ok) { + uint32_t producer = 0; + uint32_t seq = 0; + unpack_message(value, producer, seq); + + if (producer >= kThreads || + seq != expected[producer]) { + std::cerr + << "fan-in mismatch: producer " + << producer << " expected " + << expected[producer] << " got " + << seq << "\n"; + failed.store(true); + break; + } + + if (producer == 0) { + auto recv_time = + std::chrono::steady_clock::now() + .time_since_epoch(); + auto recv_ns = + std::chrono::duration_cast< + nanoseconds>(recv_time) + .count(); + recv_time0[seq - 1] = recv_ns; + } else { + auto recv_time = + std::chrono::steady_clock::now() + .time_since_epoch(); + auto recv_ns = + std::chrono::duration_cast< + nanoseconds>(recv_time) + .count(); + recv_time1[seq - 1] = recv_ns; + } + + ++expected[producer]; + ++received; + continue; + } + + Fabric::BitmapType bitmap = 0; + fabric.full_scan(bitmap); + } + }); + + producer0.join(); + producer1.join(); + consumer.join(); + + if (failed.load()) { + return 1; + } + + for (int i = 1; i <= n; ++i) { + uint64_t payload0 = pack_message(0, i); + uint64_t payload1 = pack_message(1, i); + + int64_t send_ns0 = send_time0[i - 1]; + int64_t recv_ns0 = recv_time0[i - 1]; + int64_t send_ns1 = send_time1[i - 1]; + int64_t recv_ns1 = recv_time1[i - 1]; + + std::cout << payload0 << "," + << (recv_ns0 - send_ns0) << "\n"; + std::cout << payload1 << "," + << (recv_ns1 - send_ns1) << "\n"; + } + return 0; +} diff --git a/tests/fan_out/basic_test.cc b/tests/fan_out/basic_test.cc new file mode 100644 index 0000000..84bc276 --- /dev/null +++ b/tests/fan_out/basic_test.cc @@ -0,0 +1,134 @@ +#include +#include +#include +#include +#include +#include + +#include + +constexpr std::size_t kConsumers = 2; +constexpr std::size_t kBufferSize = 256; + +using std::chrono::nanoseconds; +using std::chrono::steady_clock; + +int main() { + int n = 0; + if (!(std::cin >> n) || n <= 0) { + std::cerr << "n must be positive\n"; + return 1; + } + + using Fabric = + THWeaver::FanOutFabric; + Fabric fabric; + fabric.init(); + + std::atomic failed(false); + std::atomic consumed0(0); + std::atomic consumed1(0); + std::vector send_time(n, -1); + std::vector recv_time0(n, -1); + std::vector recv_time1(n, -1); + + std::thread consumer0([&]() { + auto token = fabric.get_empty_borrowed_ref<0>(); + int expected = 1; + while (expected <= n && !failed.load()) { + if (fabric.recv<0>(token) != + THWeaver::FanOutFabricRecvResult::Ok) { + continue; + } + if (token.get() != static_cast(expected)) { + std::cerr << "fan-out mismatch on consumer 0: " + << "expected " << expected << " got " + << token.get() << "\n"; + failed.store(true); + break; + } + auto recv_time = steady_clock::now().time_since_epoch(); + auto recv_ns = + std::chrono::duration_cast( + recv_time) + .count(); + recv_time0[expected - 1] = recv_ns; + token.release(); + consumed0.store(expected); + ++expected; + } + }); + + std::thread consumer1([&]() { + auto token = fabric.get_empty_borrowed_ref<1>(); + int expected = 1; + while (expected <= n && !failed.load()) { + if (fabric.recv<1>(token) != + THWeaver::FanOutFabricRecvResult::Ok) { + continue; + } + if (token.get() != static_cast(expected)) { + std::cerr << "fan-out mismatch on consumer 1: " + << "expected " << expected << " got " + << token.get() << "\n"; + failed.store(true); + break; + } + auto recv_time = steady_clock::now().time_since_epoch(); + auto recv_ns = + std::chrono::duration_cast( + recv_time) + .count(); + recv_time1[expected - 1] = recv_ns; + token.release(); + consumed1.store(expected); + ++expected; + } + }); + + constexpr uint64_t policy = (1ULL << 0) | (1ULL << 1); + + for (int i = 1; i <= n && !failed.load(); ++i) { + while (!failed.load()) { + auto now = steady_clock::now().time_since_epoch(); + auto ns = std::chrono::duration_cast(now) + .count(); + auto res = fabric.send(static_cast(i), policy, + kBufferSize); + if (res.result == + THWeaver::FanOutFabricSendResult::Ok) { + send_time[i - 1] = ns; + break; + } + if (res.result == + THWeaver::FanOutFabricSendResult< + uint64_t>::ErrPartialSuccess) { + std::cerr << "fan-out partial success for " + << "message " << i << "\n"; + failed.store(true); + break; + } + } + + while (!failed.load() && + (consumed0.load() < i || consumed1.load() < i)) { + } + } + + consumer0.join(); + consumer1.join(); + + if (failed.load()) { + return 1; + } + + for (int i = 1; i <= n; ++i) { + int64_t send_ns = send_time[i - 1]; + int64_t recv_ns0 = recv_time0[i - 1]; + int64_t recv_ns1 = recv_time1[i - 1]; + + std::cout << i << "," << (recv_ns0 - send_ns) << "\n"; + std::cout << i << "," << (recv_ns1 - send_ns) << "\n"; + } + return 0; +}