// Fan-out delivery/latency driver for THWeaver::FanOutFabric using one
// producer (the main thread) and two consumer threads.
//
// Flow, as written below:
//   1. Reads a message count n from stdin. If the read fails or n <= 0 it
//      prints "n must be positive" to stderr and returns 1.
//   2. Creates a Fabric, calls init(), and builds three n-element tables
//      (send_time, recv_time0, recv_time1), each pre-filled with -1.
//   3. Starts consumer0 and consumer1. Each one takes a token from
//      get_empty_borrowed_ref<K>() and loops while expected <= n and `failed`
//      is not set, calling recv<K>(token). Any result other than Ok retries
//      immediately (busy-wait, no sleep/yield). On Ok it compares token.get()
//      with the next expected value (1, 2, ..., n); on a match it records
//      steady_clock::now(), calls token.release(), stores `expected` into
//      consumed0 / consumed1 and advances. On a mismatch it reports both
//      values on stderr, sets `failed`, and leaves the loop without calling
//      token.release().
//   4. The main thread sends the values 1..n. For each value it re-samples
//      steady_clock::now() before every send() attempt and retries until the
//      result is Ok, so send_time[i - 1] holds the sample taken just before
//      the successful call. ErrPartialSuccess is reported on stderr and sets
//      `failed`; every other non-Ok result is retried. After a successful
//      send it spins until both consumed0 and consumed1 are >= i, so value
//      i + 1 is never sent before both consumers have finished value i.
//   5. Joins both consumer threads. Returns 1 if `failed` is set; otherwise
//      prints, for each i in 1..n, two lines "i,<recv - send>" (consumer 0
//      first, then consumer 1) to stdout and returns 0.
//
// NOTE(review): timestamps are presumably nanoseconds (variables are named
//   *_ns and std::chrono::nanoseconds is pulled in by a using-declaration),
//   but the duration_cast target type is missing in this copy - confirm.
// NOTE(review): policy = (1ULL << 0) | (1ULL << 1) looks like a bitmask that
//   selects consumers 0 and 1 - confirm against FanOutFabric::send.
// NOTE(review): kBufferSize is passed as the third argument of send(); what
//   send() does with it is not visible here. kConsumers is not referenced in
//   the visible code - TODO confirm where it is used.
// NOTE(review): this copy appears to have lost its angle-bracket text: the
//   #include targets and the template arguments of std::atomic, std::vector,
//   static_cast, std::chrono::duration_cast, THWeaver::FanOutFabric and some
//   result enums are blank. Restore them from the original file before
//   building; the code tokens below are left exactly as found.
#include #include #include #include #include #include #include constexpr std::size_t kConsumers = 2; constexpr std::size_t kBufferSize = 256; using std::chrono::nanoseconds; using std::chrono::steady_clock; int main() { int n = 0; if (!(std::cin >> n) || n <= 0) { std::cerr << "n must be positive\n"; return 1; } using Fabric = THWeaver::FanOutFabric; Fabric fabric; fabric.init(); std::atomic failed(false); std::atomic consumed0(0); std::atomic consumed1(0); std::vector send_time(n, -1); std::vector recv_time0(n, -1); std::vector recv_time1(n, -1); std::thread consumer0([&]() { auto token = fabric.get_empty_borrowed_ref<0>(); int expected = 1; while (expected <= n && !failed.load()) { if (fabric.recv<0>(token) != THWeaver::FanOutFabricRecvResult::Ok) { continue; } if (token.get() != static_cast(expected)) { std::cerr << "fan-out mismatch on consumer 0: " << "expected " << expected << " got " << token.get() << "\n"; failed.store(true); break; } auto recv_time = steady_clock::now().time_since_epoch(); auto recv_ns = std::chrono::duration_cast( recv_time) .count(); recv_time0[expected - 1] = recv_ns; token.release(); consumed0.store(expected); ++expected; } }); std::thread consumer1([&]() { auto token = fabric.get_empty_borrowed_ref<1>(); int expected = 1; while (expected <= n && !failed.load()) { if (fabric.recv<1>(token) != THWeaver::FanOutFabricRecvResult::Ok) { continue; } if (token.get() != static_cast(expected)) { std::cerr << "fan-out mismatch on consumer 1: " << "expected " << expected << " got " << token.get() << "\n"; failed.store(true); break; } auto recv_time = steady_clock::now().time_since_epoch(); auto recv_ns = std::chrono::duration_cast( recv_time) .count(); recv_time1[expected - 1] = recv_ns; token.release(); consumed1.store(expected); ++expected; } }); constexpr uint64_t policy = (1ULL << 0) | (1ULL << 1); for (int i = 1; i <= n && !failed.load(); ++i) { while (!failed.load()) { auto now = steady_clock::now().time_since_epoch(); auto ns = 
std::chrono::duration_cast(now) .count(); auto res = fabric.send(static_cast(i), policy, kBufferSize); if (res.result == THWeaver::FanOutFabricSendResult::Ok) { send_time[i - 1] = ns; break; } if (res.result == THWeaver::FanOutFabricSendResult< uint64_t>::ErrPartialSuccess) { std::cerr << "fan-out partial success for " << "message " << i << "\n"; failed.store(true); break; } } while (!failed.load() && (consumed0.load() < i || consumed1.load() < i)) { } } consumer0.join(); consumer1.join(); if (failed.load()) { return 1; } for (int i = 1; i <= n; ++i) { int64_t send_ns = send_time[i - 1]; int64_t recv_ns0 = recv_time0[i - 1]; int64_t recv_ns1 = recv_time1[i - 1]; std::cout << i << "," << (recv_ns0 - send_ns) << "\n"; std::cout << i << "," << (recv_ns1 - send_ns) << "\n"; } return 0; }