162 lines
6.4 KiB
C++
162 lines
6.4 KiB
C++
#include <atomic>
|
|
#include <chrono>
|
|
#include <cstdint>
|
|
#include <iostream>
|
|
#include <thread>
|
|
#include <vector>
|
|
|
|
#include <weaver.h>
|
|
|
|
using std::chrono::nanoseconds;
|
|
|
|
constexpr std::size_t kThreads = 2;
|
|
|
|
uint64_t pack_message(uint32_t producer, uint32_t seq) {
|
|
return (static_cast<uint64_t>(producer) << 32) |
|
|
static_cast<uint64_t>(seq);
|
|
}
|
|
|
|
void unpack_message(uint64_t value, uint32_t &producer, uint32_t &seq) {
|
|
producer = static_cast<uint32_t>(value >> 32);
|
|
seq = static_cast<uint32_t>(value & 0xffffffffu);
|
|
}
|
|
|
|
using std::chrono::nanoseconds;
|
|
using std::chrono::steady_clock;
|
|
|
|
int main() {
|
|
int n = 0;
|
|
if (!(std::cin >> n) || n <= 0) {
|
|
std::cerr << "n must be positive\n";
|
|
return 1;
|
|
}
|
|
|
|
using Fabric = THWeaver::FanInFabric<uint64_t, 10, kThreads>;
|
|
Fabric fabric;
|
|
fabric.init();
|
|
|
|
std::atomic<bool> failed(false);
|
|
|
|
std::vector<int64_t> send_time0(n, -1);
|
|
std::vector<int64_t> send_time1(n, -1);
|
|
std::vector<int64_t> recv_time0(n, -1);
|
|
std::vector<int64_t> recv_time1(n, -1);
|
|
|
|
std::thread producer0([&]() {
|
|
for (int i = 1; i <= n && !failed.load(); ++i) {
|
|
while (true) {
|
|
auto now =
|
|
std::chrono::steady_clock::now()
|
|
.time_since_epoch();
|
|
auto ns =
|
|
std::chrono::duration_cast<nanoseconds>(
|
|
now)
|
|
.count();
|
|
if (fabric.send<0>(pack_message(0, i)) ==
|
|
THWeaver::FanInFabricSendResult::Ok) {
|
|
send_time0[i - 1] = ns;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
});
|
|
|
|
std::thread producer1([&]() {
|
|
for (int i = 1; i <= n && !failed.load(); ++i) {
|
|
while (true) {
|
|
auto now =
|
|
std::chrono::steady_clock::now()
|
|
.time_since_epoch();
|
|
auto ns =
|
|
std::chrono::duration_cast<nanoseconds>(
|
|
now)
|
|
.count();
|
|
if (fabric.send<1>(pack_message(1, i)) ==
|
|
THWeaver::FanInFabricSendResult::Ok) {
|
|
send_time1[i - 1] = ns;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
});
|
|
|
|
std::thread consumer([&]() {
|
|
std::vector<uint32_t> expected(kThreads, 1);
|
|
std::size_t received = 0;
|
|
const std::size_t total =
|
|
static_cast<std::size_t>(n) * kThreads;
|
|
|
|
while (received < total && !failed.load()) {
|
|
uint64_t value = 0;
|
|
auto res = fabric.recv_try(value);
|
|
if (res == THWeaver::FanInFabricRecvTryResult::Ok) {
|
|
uint32_t producer = 0;
|
|
uint32_t seq = 0;
|
|
unpack_message(value, producer, seq);
|
|
|
|
if (producer >= kThreads ||
|
|
seq != expected[producer]) {
|
|
std::cerr
|
|
<< "fan-in mismatch: producer "
|
|
<< producer << " expected "
|
|
<< expected[producer] << " got "
|
|
<< seq << "\n";
|
|
failed.store(true);
|
|
break;
|
|
}
|
|
|
|
if (producer == 0) {
|
|
auto recv_time =
|
|
std::chrono::steady_clock::now()
|
|
.time_since_epoch();
|
|
auto recv_ns =
|
|
std::chrono::duration_cast<
|
|
nanoseconds>(recv_time)
|
|
.count();
|
|
recv_time0[seq - 1] = recv_ns;
|
|
} else {
|
|
auto recv_time =
|
|
std::chrono::steady_clock::now()
|
|
.time_since_epoch();
|
|
auto recv_ns =
|
|
std::chrono::duration_cast<
|
|
nanoseconds>(recv_time)
|
|
.count();
|
|
recv_time1[seq - 1] = recv_ns;
|
|
}
|
|
|
|
++expected[producer];
|
|
++received;
|
|
continue;
|
|
}
|
|
|
|
Fabric::BitmapType bitmap = 0;
|
|
fabric.full_scan(bitmap);
|
|
}
|
|
});
|
|
|
|
producer0.join();
|
|
producer1.join();
|
|
consumer.join();
|
|
|
|
if (failed.load()) {
|
|
return 1;
|
|
}
|
|
|
|
for (int i = 1; i <= n; ++i) {
|
|
uint64_t payload0 = pack_message(0, i);
|
|
uint64_t payload1 = pack_message(1, i);
|
|
|
|
int64_t send_ns0 = send_time0[i - 1];
|
|
int64_t recv_ns0 = recv_time0[i - 1];
|
|
int64_t send_ns1 = send_time1[i - 1];
|
|
int64_t recv_ns1 = recv_time1[i - 1];
|
|
|
|
std::cout << payload0 << ","
|
|
<< (recv_ns0 - send_ns0) << "\n";
|
|
std::cout << payload1 << ","
|
|
<< (recv_ns1 - send_ns1) << "\n";
|
|
}
|
|
return 0;
|
|
}
|