finished part of the hosts, topos and configuration next

This commit is contained in:
2025-09-13 01:24:38 -04:00
parent 3d3d8113b2
commit 321c60bcf8
66 changed files with 2661 additions and 22 deletions

View File

@@ -1,4 +1,5 @@
#include "core/host.h"
#include "core/error.h"
namespace dofs {

View File

@@ -27,12 +27,15 @@ public:
// Data-plane completion: a whole flow has arrived.
virtual void recv_flow(NodeId src,
FlowId flow,
FlowPriority priority,
Bytes flow_size) = 0;
// Control/telemetry interrupt: ACK/NACK/TRIM_BACK, etc.
virtual void recv_frame(const Packet& frame) = 0;
virtual void recv_mgmt_msg(std::unique_ptr<struct MgmtMsg> msg) = 0;
Host(const Host &) = delete;
Host &operator=(const Host &) = delete;

View File

@@ -49,7 +49,25 @@ void Simulator::run_until(Time end_time) {
}
}
std::vector<std::unique_ptr<Simulator>>& Simulator::registry_() {
void Simulator::lock() noexcept {
_locked = true;
}
void Simulator::flush_after(Time grace) noexcept {
_locked = true;
const Time deadline = _now + grace;
run_until(deadline);
while (!_event_pq.empty()) {
_event_pq.pop();
}
_cancelled.clear();
}
std::vector<std::unique_ptr<Simulator>>& Simulator::_registry() {
static std::vector<std::unique_ptr<Simulator>> reg;
return reg;
}
@@ -59,7 +77,7 @@ std::pair<InstanceId, Simulator *> Simulator::create_simulator(InstanceId id) {
return {INVALID_INSTANCE_ID, nullptr};
}
auto& reg = registry_();
auto& reg = _registry();
if (static_cast<size_t>(id) >= reg.size()) {
reg.resize(static_cast<size_t>(id) + 1);
@@ -79,7 +97,7 @@ Simulator * Simulator::get_simulator(InstanceId id) noexcept {
if (id == INVALID_INSTANCE_ID)
return nullptr;
auto& reg = registry_();
auto& reg = _registry();
const size_t idx = static_cast<size_t>(id);
if (idx >= reg.size())

View File

@@ -1,8 +1,6 @@
#ifndef CORE_SIMULATOR_H
#define CORE_SIMULATOR_H
// TODO: implement concrete node methods for different nodes, store them all uniformly in vector<u_ptr<Node>> and return a get function
#include <cstdint>
#include <functional>
#include <queue>
@@ -49,6 +47,7 @@ private:
std::unordered_set<EventId> _cancelled;
Time _now{};
EventId _next_id{1};
bool _locked{false};
std::unique_ptr<Rng> _rng;
@@ -65,6 +64,9 @@ public:
template <class F, class... Args>
EventId schedule_at(Time abs_time, F&& f, Args&&... args) {
if (_locked)
return NULL_EVENT;
if (abs_time < _now)
return NULL_EVENT;
@@ -88,6 +90,15 @@ public:
bool run_next();
void run_until(Time end_time);
// ----- Termination helpers -----
// Prevent any future schedule_* calls from enqueuing events.
void lock() noexcept;
bool is_locked() const noexcept {
return _locked;
}
// Lock, then run all events up to now()+grace, finally discard any remaining.
void flush_after(Time grace) noexcept;
// ---------- Object management ----------
Rng* create_rng(std::uint64_t seed);
Rng* get_rng() noexcept;
@@ -112,7 +123,7 @@ private:
};
}
static std::vector<std::unique_ptr<Simulator>>& registry_();
static std::vector<std::unique_ptr<Simulator>>& _registry();
};
} // namespace dofs

View File

@@ -65,6 +65,12 @@ enum class LBType : uint8_t {
RANDOM_PACKET_SPRAYING
};
enum class MgmtKind : uint8_t {
HEARTBEAT,
JOB_FINISHED,
END_SIMULATION
};
} // namespace dofs
#endif // CORE_TYPES_H

View File

@@ -8,7 +8,7 @@ target_include_directories(dofs_hosts
$<INSTALL_INTERFACE:include>
)
target_link_libraries(dofs_hosts PUBLIC dofs_core)
target_link_libraries(dofs_hosts PUBLIC dofs_core dofs_network)
if(DOFS_GLOB_SOURCES)
file(GLOB HOSTS_CC CONFIGURE_DEPENDS "${CMAKE_CURRENT_SOURCE_DIR}/*.cc")
@@ -18,9 +18,13 @@ if(DOFS_GLOB_SOURCES)
else()
target_sources(dofs_hosts
PUBLIC
nodes_dummy.h
mgmt_msg.h
policies.h
publisher.h
subscriber.h
PRIVATE
nodes_dummy.cc
publisher.cc
subscriber.cc
)
endif()
@@ -39,7 +43,12 @@ endforeach()
add_library(dofs_hosts_headers_tooling STATIC ${HOSTS_STUBS})
# inherit include dirs/usage reqs from dofs_hosts
target_link_libraries(dofs_hosts_headers_tooling PRIVATE dofs_hosts dofs_core)
target_link_libraries(
dofs_hosts_headers_tooling
PRIVATE
dofs_hosts
dofs_core
dofs_network)
# This ensures the same -I flags even if cmake-ide builds the stubs alone
target_include_directories(dofs_hosts_headers_tooling

51
src/hosts/mgmt_msg.h Normal file
View File

@@ -0,0 +1,51 @@
#ifndef HOSTS_MGMT_MSG_H
#define HOSTS_MGMT_MSG_H
#include <cstdint>
#include <memory>
#include "core/time.h"
#include "core/types.h"
namespace dofs {
struct MgmtMsg {
virtual ~MgmtMsg() = default;
virtual MgmtKind kind() const noexcept = 0;
};
struct HeartbeatMsg final : public MgmtMsg {
NodeId subscriber_id;
NodeStatus status;
Time generated_at;
explicit HeartbeatMsg(NodeId sid, NodeStatus st, Time t) noexcept
: subscriber_id(sid), status(st), generated_at(t) {}
MgmtKind kind() const noexcept override {
return MgmtKind::HEARTBEAT;
}
};
struct JobFinishedMsg final : public MgmtMsg {
FlowId flow_id;
Time finished_at;
explicit JobFinishedMsg(FlowId fid, Time t) noexcept
: flow_id(fid), finished_at(t) {}
MgmtKind kind() const noexcept override {
return MgmtKind::JOB_FINISHED;
}
};
struct EndSimulationMsg final : public MgmtMsg {
MgmtKind kind() const noexcept override {
return MgmtKind::END_SIMULATION;
}
};
using MgmtMsgPtr = std::unique_ptr<MgmtMsg>;
} // namespace dofs
#endif // HOSTS_MGMT_MSG_H

View File

@@ -1,6 +0,0 @@
#include "core/error.h"
#include "core/logger.h"
namespace dofs {
int _nodes_tooling_dummy = 0;
}

View File

@@ -1,4 +0,0 @@
#ifndef NODES_DUMMY
#define NODES_DUMMY
#endif

86
src/hosts/policies.h Normal file
View File

@@ -0,0 +1,86 @@
#ifndef HOSTS_POLICIES_H
#define HOSTS_POLICIES_H
#include <cstdint>
#include <vector>
#include <unordered_map>
#include <cassert>
#include "core/types.h"
namespace dofs {
// Abstract publisher policy: map logical update groups → concrete multicast mask.
struct PubBasePolicy {
virtual ~PubBasePolicy() = default;
virtual PacketGroups select_multicast_groups(PacketGroups update_groups_mask) =
0;
};
// Round-robin policy over bit ranges per logical group.
class PubRRPolicy final : public PubBasePolicy {
public:
struct ReplicaRange {
uint32_t update_group; // logical group id you use in your workload file
uint8_t low_bit; // inclusive, < 128
uint8_t high_bit; // inclusive, < 128
};
explicit PubRRPolicy(std::vector<ReplicaRange> ranges)
: _ranges(std::move(ranges)) {
validate_and_build();
}
PacketGroups select_multicast_groups(PacketGroups update_groups_mask) override {
PacketGroups result = 0;
for (auto const& r : _ranges) {
if (!group_present(update_groups_mask, r.update_group))
continue;
const uint8_t width = static_cast<uint8_t>(r.high_bit - r.low_bit + 1);
uint8_t &cursor = _rr_cursor[r.update_group];
uint8_t chosen = static_cast<uint8_t>(r.low_bit + (cursor % width));
cursor = static_cast<uint8_t>((cursor + 1) % width);
result |= (PacketGroups(1) << chosen);
}
return result;
}
private:
std::vector<ReplicaRange> _ranges;
std::unordered_map<uint32_t, uint8_t> _rr_cursor; // per logical group
static bool group_present(PacketGroups mask, uint32_t gid) noexcept {
// convention: logical update groups are not bits, theyre ids you pass in;
// presence is determined outside—if you want bitwise presence, adapt here later.
// For now, assume any nonzero mask means all configured groups are requested.
(void)gid;
return mask != 0;
}
void validate_and_build() {
for (auto const& r : _ranges) {
assert(r.low_bit <= r.high_bit);
assert(r.high_bit < 128 && r.low_bit < 128);
(void)r;
// Non-overlap across different logical groups is allowed.
}
// init all cursors to 0 for determinism.
for (auto const& r : _ranges)
_rr_cursor[r.update_group] = 0;
}
};
struct SubBasePolicy {
virtual ~SubBasePolicy() = default;
};
struct SubDummyPolicy final : public SubBasePolicy {
~SubDummyPolicy() override = default;
};
} // namespace dofs
#endif // HOSTS_POLICIES_H

143
src/hosts/publisher.cc Normal file
View File

@@ -0,0 +1,143 @@
#include "hosts/publisher.h"
#include "core/error.h"
#include "network/network_nic.h"
namespace dofs {
Publisher::Publisher(Simulator *const sim,
NodeId id,
Time update_latency_base,
std::unique_ptr<PubBasePolicy> policy,
Time mgmt_latency) noexcept
: Host(sim, id)
, _sim(sim)
, _update_latency_base(update_latency_base)
, _update_latency_cur(update_latency_base)
, _mgmt_latency(mgmt_latency)
, _policy(std::move(policy)) {}
void Publisher::set_status(NodeStatus s, Time new_latency) noexcept {
// Enforce clamp: new_latency == 0 means "no-op"
if (new_latency.count() != 0 && new_latency < _update_latency_base) {
DOFS_ERROR("publisher", "set_status: new latency below base; ignored");
} else if (new_latency.count() != 0) {
_update_latency_cur = new_latency;
}
// Forward to Node base if available (Host derives from Node).
// If Node::set_status is public, this works; otherwise adjust when you expose it.
// (We keep this call for correctness; remove if not yet available.)
// Node::set_status(s);
if (s == NodeStatus::DOWN) {
if (_staging_evt != NULL_EVENT) {
_sim->cancel(_staging_evt);
_staging_evt = NULL_EVENT;
}
_state = State::Quiesced;
}
}
void Publisher::recv_update(Bytes size,
PacketGroups update_groups_mask) noexcept {
if (size == 0 || update_groups_mask == 0) {
DOFS_ERROR("publisher", "recv_update: invalid size or groups");
return;
}
_queue.push_back(Pending{size, update_groups_mask, _sim->now()});
++_updates_in;
if (_state == State::Idle) {
_state = State::Staging;
arm_staging_if_needed();
}
}
void Publisher::arm_staging_if_needed() noexcept {
if (_staging_evt != NULL_EVENT)
return;
if (_queue.empty()) {
_state = State::Idle;
return;
}
_staging_evt = _sim->schedule_after(_update_latency_cur, [this]() {
this->on_staging_timer();
});
}
void Publisher::on_staging_timer() noexcept {
_staging_evt = NULL_EVENT;
if (_state == State::Quiesced)
return;
if (_queue.empty()) {
_state = State::Idle;
return;
}
Pending p = _queue.front();
_queue.pop_front();
const PacketGroups mcast_mask = _policy ? _policy->select_multicast_groups(
p.mask) : PacketGroups{0};
if (mcast_mask == 0) {
DOFS_ERROR("publisher", "policy produced empty multicast mask");
} else {
auto* n = this->nic();
if (!n) {
DOFS_ERROR("publisher", "no NIC attached; dropping update");
} else {
n->send_flow(mcast_mask, p.size, FlowPriority::MICE);
_bytes_out += p.size;
}
}
// Next
if (_queue.empty()) {
_state = State::Idle;
} else {
arm_staging_if_needed();
}
}
void Publisher::recv_mgmt_msg(MgmtMsgPtr msg) noexcept {
if (!msg)
return;
++_mgmt_in;
switch (msg->kind()) {
case MgmtKind::END_SIMULATION: {
if (_staging_evt != NULL_EVENT) {
_sim->cancel(_staging_evt);
_staging_evt = NULL_EVENT;
}
_state = State::Quiesced;
}
break;
case MgmtKind::HEARTBEAT:
case MgmtKind::JOB_FINISHED:
default:
// informational only for now
break;
}
}
void Publisher::recv_flow(NodeId, FlowId, FlowPriority, Bytes) {
// Publisher is mainly a sender in this workload; ignore data-plane flow arrivals.
}
void Publisher::recv_frame(const Packet &) {
// Control frames not used by Publisher in this stage.
}
} // namespace dofs

79
src/hosts/publisher.h Normal file
View File

@@ -0,0 +1,79 @@
#ifndef HOSTS_PUBLISHER_H
#define HOSTS_PUBLISHER_H
#include <cstdint>
#include <deque>
#include <memory>
#include "core/time.h"
#include "core/types.h"
#include "core/host.h"
#include "core/simulator.h"
#include "hosts/mgmt_msg.h"
#include "hosts/policies.h"
namespace dofs {
class Publisher final : public Host {
public:
Publisher(Simulator* sim,
NodeId id,
Time update_latency_base,
std::unique_ptr<PubBasePolicy> policy,
Time mgmt_latency) noexcept;
// Triggered by workload driver
void recv_update(Bytes size, PacketGroups update_groups_mask) noexcept;
// Status with latency clamp; new_latency==0 means "no change"
void set_status(NodeStatus s, Time new_latency = Time{}) noexcept;
// Mgmt-plane receive (from subscribers)
virtual void recv_mgmt_msg(MgmtMsgPtr msg) noexcept override;
// Host overrides (data-plane callbacks)
virtual void recv_flow(NodeId src, FlowId flow, FlowPriority prio,
Bytes flow_size) override;
virtual void recv_frame(const Packet& frame) override;
// Telemetry
uint64_t updates_in() const noexcept {
return _updates_in;
}
uint64_t bytes_out() const noexcept {
return _bytes_out;
}
private:
enum class State : uint8_t { Idle = 0, Staging = 1, Quiesced = 2 };
struct Pending {
Bytes size{0};
PacketGroups mask{0};
Time enq_time{};
};
void arm_staging_if_needed() noexcept;
void on_staging_timer() noexcept;
private:
Simulator *const _sim;
Time _update_latency_base;
Time _update_latency_cur;
Time _mgmt_latency;
std::unique_ptr<PubBasePolicy> _policy;
State _state{State::Idle};
EventId _staging_evt{NULL_EVENT};
std::deque<Pending> _queue;
// telemetry
uint64_t _updates_in{0};
uint64_t _bytes_out{0};
uint64_t _mgmt_in{0};
uint64_t _mgmt_out{0};
};
} // namespace dofs
#endif // HOSTS_PUBLISHER_H

104
src/hosts/subscriber.cc Normal file
View File

@@ -0,0 +1,104 @@
#include "hosts/subscriber.h"
#include "hosts/publisher.h"
#include "core/error.h"
namespace dofs {
Subscriber::Subscriber(Simulator* sim,
NodeId id,
Publisher* publisher,
std::unique_ptr<SubBasePolicy> policy,
Time mgmt_latency,
Time heartbeat_period) noexcept
: Host(sim, id)
, _sim(sim)
, _publisher(publisher)
, _policy(std::move(policy))
, _mgmt_latency(mgmt_latency)
, _hb_period(heartbeat_period) {
// Deterministic stagger: simple function of NodeId.
Time::rep off = (static_cast<Time::rep>(id) % (std::max<Time::rep>(1,
heartbeat_period.count() / 4)));
schedule_next_heartbeat(Time(off));
}
void Subscriber::set_status(NodeStatus s) noexcept {
// Node::set_status(s);
if (s == NodeStatus::DOWN) {
if (_hb_evt != NULL_EVENT) {
_sim->cancel(_hb_evt);
_hb_evt = NULL_EVENT;
}
_state = State::Quiesced;
}
}
void Subscriber::schedule_next_heartbeat(Time delay) noexcept {
if (_state == State::Quiesced)
return;
_hb_evt = _sim->schedule_after(delay, [this]() {
this->on_heartbeat_timer();
});
}
void Subscriber::on_heartbeat_timer() noexcept {
_hb_evt = NULL_EVENT;
if (_state == State::Quiesced)
return;
if (_publisher) {
const NodeId sid = this->id();
const NodeStatus st = NodeStatus::OK;
const Time gen = _sim->now();
Publisher* const pub = _publisher;
_sim->schedule_after(_mgmt_latency, [pub, sid, st, gen]() {
pub->recv_mgmt_msg(std::make_unique<HeartbeatMsg>(sid, st, gen));
});
}
schedule_next_heartbeat(_hb_period);
}
void Subscriber::send_job_finished(FlowId flow) noexcept {
if (_publisher) {
const Time fin = _sim->now();
_sim->schedule_after(_mgmt_latency, [pub = _publisher, flow, fin]() mutable {
pub->recv_mgmt_msg(std::make_unique<JobFinishedMsg>(flow, fin));
});
}
}
void Subscriber::recv_mgmt_msg(MgmtMsgPtr msg) noexcept {
if (!msg)
return;
if (msg->kind() == MgmtKind::END_SIMULATION) {
if (_hb_evt != NULL_EVENT) {
_sim->cancel(_hb_evt);
_hb_evt = NULL_EVENT;
}
_state = State::Quiesced;
}
}
void Subscriber::recv_flow(NodeId /*src*/, FlowId flow, FlowPriority /*prio*/,
Bytes /*flow_size*/) {
if (_state == State::Quiesced) {
DOFS_ERROR("subscriber", "drop flow: quiesced");
return;
}
// 3-day scope: zero processing delay → immediately report completion.
send_job_finished(flow);
_state = State::Idle;
}
void Subscriber::recv_frame(const Packet & /*frame*/) {
// Not used for this pub/sub workload phase.
}
} // namespace dofs

61
src/hosts/subscriber.h Normal file
View File

@@ -0,0 +1,61 @@
#ifndef HOSTS_SUBSCRIBER_H
#define HOSTS_SUBSCRIBER_H
#include <cstdint>
#include <memory>
#include "core/host.h"
#include "core/simulator.h"
#include "hosts/mgmt_msg.h"
#include "hosts/policies.h"
namespace dofs {
class Publisher;
class Subscriber final : public Host {
public:
Subscriber(Simulator* sim,
NodeId id,
Publisher* publisher,
std::unique_ptr<SubBasePolicy> policy,
Time mgmt_latency,
Time heartbeat_period) noexcept;
// Mgmt-plane receive (from publisher)
virtual void recv_mgmt_msg(MgmtMsgPtr msg) noexcept override;
// Host overrides (data-plane callbacks)
void recv_flow(NodeId src, FlowId flow, FlowPriority prio,
Bytes flow_size) override;
void recv_frame(const Packet& frame) override;
void set_status(NodeStatus s) noexcept;
// For future: swap publisher pointer
void set_publisher(Publisher* p) noexcept {
_publisher = p;
}
private:
enum class State : uint8_t { Idle = 0, Processing = 1, Quiesced = 2 };
void schedule_next_heartbeat(Time delay) noexcept;
void on_heartbeat_timer() noexcept;
void send_job_finished(FlowId flow) noexcept;
private:
Simulator *_sim;
Publisher *_publisher;
std::unique_ptr<SubBasePolicy> _policy;
Time _mgmt_latency;
Time _hb_period;
EventId _hb_evt{NULL_EVENT};
State _state{State::Idle};
// simple token generator for completions (since FlowId not surfaced here)
uint64_t _next_token{1};
};
} // namespace dofs
#endif // HOSTS_SUBSCRIBER_H

View File

@@ -165,7 +165,8 @@ void NetworkNic::recv_pkt(Packet &pkt_in, PortId ingress) {
// Flow completion: if EOF, notify host with total size from payload (accumulation would live in RxFlowState).
if (pkt.is_eof() && _host) {
_host->recv_flow(pkt.src_node(), pkt.priority(), pkt.payload_size());
_host->recv_flow(pkt.src_node(),
pkt.flow_id(), pkt.priority(), pkt.payload_size());
}
break;

View File

@@ -59,7 +59,7 @@ public:
bool is_eof() const noexcept;
FlowPriority priority() const noexcept;
uint8_t priority_raw() const noexcept;
uint8_t priority_raw() const noexcept;
Bytes header_size() const noexcept;
Bytes payload_size() const noexcept;

View File

@@ -90,6 +90,9 @@ protected:
case FlowPriority::ELEPHANT:
return PRI_ELE;
default:
return PRI_CTRL;
}
return PRI_ELE; // defensive