#include #include #include #include #include #include #include using std::chrono::nanoseconds; constexpr std::size_t kThreads = 2; uint64_t pack_message(uint32_t producer, uint32_t seq) { return (static_cast(producer) << 32) | static_cast(seq); } void unpack_message(uint64_t value, uint32_t &producer, uint32_t &seq) { producer = static_cast(value >> 32); seq = static_cast(value & 0xffffffffu); } using std::chrono::nanoseconds; using std::chrono::steady_clock; int main() { int n = 0; if (!(std::cin >> n) || n <= 0) { std::cerr << "n must be positive\n"; return 1; } using Fabric = THWeaver::FanInFabric; Fabric fabric; fabric.init(); std::atomic failed(false); std::vector send_time0(n, -1); std::vector send_time1(n, -1); std::vector recv_time0(n, -1); std::vector recv_time1(n, -1); std::thread producer0([&]() { for (int i = 1; i <= n && !failed.load(); ++i) { while (true) { auto now = std::chrono::steady_clock::now() .time_since_epoch(); auto ns = std::chrono::duration_cast( now) .count(); if (fabric.send<0>(pack_message(0, i)) == THWeaver::FanInFabricSendResult::Ok) { send_time0[i - 1] = ns; break; } } } }); std::thread producer1([&]() { for (int i = 1; i <= n && !failed.load(); ++i) { while (true) { auto now = std::chrono::steady_clock::now() .time_since_epoch(); auto ns = std::chrono::duration_cast( now) .count(); if (fabric.send<1>(pack_message(1, i)) == THWeaver::FanInFabricSendResult::Ok) { send_time1[i - 1] = ns; break; } } } }); std::thread consumer([&]() { std::vector expected(kThreads, 1); std::size_t received = 0; const std::size_t total = static_cast(n) * kThreads; while (received < total && !failed.load()) { uint64_t value = 0; auto res = fabric.recv_try(value); if (res == THWeaver::FanInFabricRecvTryResult::Ok) { uint32_t producer = 0; uint32_t seq = 0; unpack_message(value, producer, seq); if (producer >= kThreads || seq != expected[producer]) { std::cerr << "fan-in mismatch: producer " << producer << " expected " << expected[producer] << " got " << seq << "\n"; failed.store(true); break; } if (producer == 0) { auto recv_time = std::chrono::steady_clock::now() .time_since_epoch(); auto recv_ns = std::chrono::duration_cast< nanoseconds>(recv_time) .count(); recv_time0[seq - 1] = recv_ns; } else { auto recv_time = std::chrono::steady_clock::now() .time_since_epoch(); auto recv_ns = std::chrono::duration_cast< nanoseconds>(recv_time) .count(); recv_time1[seq - 1] = recv_ns; } ++expected[producer]; ++received; continue; } Fabric::BitmapType bitmap = 0; fabric.full_scan(bitmap); } }); producer0.join(); producer1.join(); consumer.join(); if (failed.load()) { return 1; } for (int i = 1; i <= n; ++i) { uint64_t payload0 = pack_message(0, i); uint64_t payload1 = pack_message(1, i); int64_t send_ns0 = send_time0[i - 1]; int64_t recv_ns0 = recv_time0[i - 1]; int64_t send_ns1 = send_time1[i - 1]; int64_t recv_ns1 = recv_time1[i - 1]; std::cout << payload0 << "," << (recv_ns0 - send_ns0) << "\n"; std::cout << payload1 << "," << (recv_ns1 - send_ns1) << "\n"; } return 0; }