added networking components, hosts are next

This commit is contained in:
2025-09-07 11:47:15 -04:00
commit 3411089908
59 changed files with 4777 additions and 0 deletions

56
src/core/CMakeLists.txt Normal file
View File

@@ -0,0 +1,56 @@
# Core static library. Sources are attached further down, either by glob or
# from the explicit list, depending on DOFS_GLOB_SOURCES.
add_library(dofs_core STATIC)
# Propagate include dir via dofs_config to anyone linking dofs_core
target_link_libraries(dofs_core PUBLIC dofs_config)
# Project headers are included as "core/<name>.h", so the build-tree include
# root is <project>/src; an installed package exposes "include" instead.
target_include_directories(dofs_core
PUBLIC
$<BUILD_INTERFACE:${PROJECT_SOURCE_DIR}/src>
$<INSTALL_INTERFACE:include>
)
# yaml-cpp is linked only if a target with that name already exists when this
# file is processed; otherwise the dependency is silently skipped.
if(TARGET yaml-cpp)
target_link_libraries(dofs_core PUBLIC yaml-cpp)
endif()
# Source selection: glob every *.cc in this directory, or use the curated list.
if(DOFS_GLOB_SOURCES)
file(GLOB CORE_CC CONFIGURE_DEPENDS "${CMAKE_CURRENT_SOURCE_DIR}/*.cc")
target_sources(dofs_core PRIVATE ${CORE_CC})
else()
# Prefer explicit lists for reproducibility
# NOTE(review): headers are listed PUBLIC with relative paths; how those paths
# are resolved for consumers depends on the CMake version/policy in use — confirm.
target_sources(dofs_core
PUBLIC
time.h
timer.h
types.h
error.h
rng.h
logger.h
simulator.h
node.h
host.h
PRIVATE
error.cc
logger.cc
simulator.cc
node.cc
host.cc
)
endif()
# Namespaced alias so consumers can link against dofs::core.
add_library(dofs::core ALIAS dofs_core)
# --- Tooling-only unity TU for headers (core) ---
# Generate per-header stubs with the SAME basename (time.h -> time.cpp)
# Each stub is a one-line translation unit that includes "core/<base>.h", so
# every header in this directory gets its own compile command.
file(GLOB CORE_HEADERS CONFIGURE_DEPENDS "${CMAKE_CURRENT_SOURCE_DIR}/*.h")
set(CORE_STUBS)
foreach(h ${CORE_HEADERS})
# NAME_WE strips the directory and the extension, leaving the bare header name.
get_filename_component(_base "${h}" NAME_WE)
set(_stub "${CMAKE_CURRENT_BINARY_DIR}/${_base}.cpp")
# The stub is written into the binary dir at configure time.
file(WRITE "${_stub}" "// tooling stub for ${_base}.h\n#include \"core/${_base}.h\"\n")
list(APPEND CORE_STUBS "${_stub}")
endforeach()
add_library(dofs_core_headers_tooling STATIC ${CORE_STUBS})
# Linking dofs_core gives the stubs the same include paths and usage requirements.
target_link_libraries(dofs_core_headers_tooling PRIVATE dofs_core)
# This target exists purely for compile_commands; no need to link it elsewhere.

34
src/core/error.cc Normal file
View File

@@ -0,0 +1,34 @@
#include "error.h"
#include <iostream>
#include <mutex>
#include <string>
namespace dofs {

namespace {

// Single mutex guarding every std::cerr write issued through this module.
std::mutex g_error_mutex;

// Emits one line to std::cerr while holding g_error_mutex:
//   "[type]: message"        when no timestamp is given
//   "[type][ts]: message"    when a timestamp is given
void emit_error_line(std::string_view type,
                     std::optional<uint64_t> ts,
                     std::string_view message) noexcept {
    const std::lock_guard<std::mutex> guard(g_error_mutex);
    std::cerr << '[' << type << ']';
    if (ts) {
        std::cerr << '[' << *ts << ']';
    }
    std::cerr << ": " << message << '\n';
}

} // namespace

// Exposes the module mutex so header-only helpers can serialize with log_error.
std::mutex &error_mutex() noexcept {
    return g_error_mutex;
}

// Public entry point: forwards to emit_error_line (note the argument order swap).
void log_error(std::string_view type,
               std::string_view message,
               std::optional<uint64_t> timestamp) noexcept {
    emit_error_line(type, timestamp, message);
}

} // namespace dofs

54
src/core/error.h Normal file
View File

@@ -0,0 +1,54 @@
#ifndef CORE_ERROR_H
#define CORE_ERROR_H
#include <cstdint>
#include <optional>
#include <string_view>
#include <mutex>
#include <utility>
#include <iostream>
#include <format>
namespace dofs {
// Writes one line to std::cerr: "[type]: message", or "[type][timestamp]: message"
// when a timestamp is supplied. Defined in error.cc, where the write is done
// while holding the mutex returned by error_mutex().
void log_error(std::string_view type,
std::string_view message,
std::optional<uint64_t> timestamp = std::nullopt) noexcept;
// Returns the mutex that serializes this module's writes to std::cerr.
std::mutex &error_mutex() noexcept;
// Debug helper: prints "<name>=<value>" plus a newline to std::cerr under the
// error mutex, then returns the argument with its value category preserved so
// the call can be wrapped around an expression in place.
// T must be streamable to std::ostream.
template <class T>
inline T&& show(std::string_view name, T&& value) noexcept {
    {
        const std::lock_guard<std::mutex> guard(error_mutex());
        std::cerr << name << '=' << value << '\n';
    }
    return std::forward<T>(value);
}
// Same contract as show(), but labelled with the text of an expression rather
// than a variable name: prints "<expr>=<value>" and returns the value with its
// value category preserved. Locking and printing are done by show().
template <class T>
inline T&& eval_and_show(std::string_view expr, T&& value) noexcept {
    return show(expr, std::forward<T>(value));
}
// Logs MSG under category TYPE without a timestamp.
#define DOFS_ERROR(TYPE, MSG) \
    ::dofs::log_error((TYPE), (MSG))
// Logs MSG under category TYPE with timestamp TS.
#define DOFS_ERROR_T(TYPE, MSG, TS) \
    ::dofs::log_error((TYPE), (MSG), (TS))
// Logs MSG under category TYPE with timestamp TS, prefixed by its source SRC.
// log_error has no four-argument overload, so the source is folded into the
// message text ("SRC: MSG"). The temporary string lives until the end of the
// full expression, which covers the log_error call.
#define DOFS_ERROR_ST(TYPE, SRC, MSG, TS) \
    ::dofs::log_error((TYPE), std::format("{}: {}", (SRC), (MSG)), (TS))
// Prints "VAR=<value>" to std::cerr and yields VAR unchanged.
#define DOFS_SHOW(VAR) \
    (::dofs::show(#VAR, (VAR)))
// Prints "EXPR=<value>" to std::cerr and yields the value of EXPR.
#define DOFS_EVAL(EXPR) \
    (::dofs::eval_and_show(#EXPR, (EXPR)))
// Builds the std::string "VAR=<value>" without printing it.
#define FORMAT(VAR) \
    (std::format("{}={}", #VAR, (VAR)))
} // namespace dofs
#endif // CORE_ERROR_H

29
src/core/host.cc Normal file
View File

@@ -0,0 +1,29 @@
#include "core/host.h"
#include "core/error.h"
namespace dofs {
Host::Host(Simulator *const sim, NodeId id) noexcept
: Node(sim, id, NodeType::HOST), _nic(nullptr) {
// Hosts start in OK unless caller overrides later.
Node::set_status(NodeStatus::OK);
}
// Records `nic` as this host's NIC. If a different NIC is already attached,
// an error is logged but the new pointer still replaces the old one.
void Host::attach_nic(NetworkNic* nic) noexcept {
    const bool replaces_other_nic = (_nic != nullptr) && (_nic != nic);
    if (replaces_other_nic) {
        log_error("ERROR", "Host::attach_nic called while NIC already attached");
    }
    _nic = nic;
}
// Clears the attached NIC. The request is honoured when no NIC is attached or
// when `nic` is the attached one; a mismatching pointer is logged and ignored.
void Host::detach_nic(NetworkNic* nic) noexcept {
    const bool request_matches = (_nic == nullptr) || (_nic == nic);
    if (!request_matches) {
        log_error("ERROR", "Host::detach_nic called with non-matching NIC pointer");
        return;
    }
    _nic = nullptr;
}
} // namespace dofs

45
src/core/host.h Normal file
View File

@@ -0,0 +1,45 @@
#ifndef CORE_HOST_H
#define CORE_HOST_H
#include <cstdint>
#include "core/node.h"
#include "core/types.h"
#include "network/packet.h"
namespace dofs {
class NetworkNic;
class Host : public Node {
public:
Host(Simulator *const sim, NodeId id) noexcept;
virtual ~Host() = default;
// Access to the attached NIC (may be null if detached)
NetworkNic *nic() const noexcept {
return _nic;
}
// NIC lifecycle hooks (called by the NIC)
void attach_nic(NetworkNic* nic) noexcept;
void detach_nic(NetworkNic* nic) noexcept;
// Data-plane completion: a whole flow has arrived.
virtual void recv_flow(NodeId src,
FlowPriority priority,
Bytes flow_size) = 0;
// Control/telemetry interrupt: ACK/NACK/TRIM_BACK, etc.
virtual void recv_frame(const Packet& frame) = 0;
Host(const Host &) = delete;
Host &operator=(const Host &) = delete;
private:
NetworkNic *_nic{nullptr};
};
} // namespace dofs
#endif // CORE_HOST_H

67
src/core/logger.cc Normal file
View File

@@ -0,0 +1,67 @@
#include "logger.h"
#include <algorithm>
#include <iostream>
namespace dofs {
// Stores the path and immediately tries to open it, appending or truncating
// as requested. Failure is not reported here; callers check is_open().
Logger::Logger(std::string_view path, bool append) noexcept
    : _path(path) {
    if (append) {
        (void)open(std::ios::app);
    } else {
        (void)open(std::ios::trunc);
    }
}
// Opens _path for output with the extra `mode` flags. Stream exceptions are
// disabled so failures show up only in the stream state. Returns whether the
// file is open; a failure is also reported through DOFS_ERROR.
bool Logger::open(std::ios::openmode mode) noexcept {
    _stream.exceptions(std::ios::goodbit);
    _stream.open(_path, mode | std::ios::out);
    _is_open = _stream.is_open();
    if (_is_open) {
        return true;
    }
    DOFS_ERROR("Logger", ("failed to open file: " + _path).c_str());
    return false;
}
// Flushes and closes the file (if still open) via close().
Logger::~Logger() noexcept {
close();
}
// Flushes buffered output under the logger mutex. Does nothing when the file
// is not open; a failed flush is reported through DOFS_ERROR.
void Logger::flush() noexcept {
    const std::lock_guard<std::mutex> guard(_mutex);
    if (!_is_open) {
        return;
    }
    _stream.flush();
    if (_stream.fail()) {
        DOFS_ERROR("Logger", ("flush failed for: " + _path).c_str());
    }
}
// Flushes and closes the file under the logger mutex. Calling it on an
// already-closed logger is a no-op.
void Logger::close() noexcept {
    const std::lock_guard<std::mutex> guard(_mutex);
    if (!_is_open) {
        return;
    }
    _stream.flush();
    _stream.close();
    _is_open = false;
}
// Appends `line` followed by '\n' to the file under the logger mutex.
// Writing to a closed logger, or a failed write, is reported through
// DOFS_ERROR; nothing is thrown.
void Logger::write_line(std::string_view line) noexcept {
    const std::lock_guard<std::mutex> guard(_mutex);
    if (_is_open) {
        _stream << line << '\n';
        if (_stream.fail()) {
            DOFS_ERROR("Logger", ("write failed for: " + _path).c_str());
        }
        return;
    }
    DOFS_ERROR("Logger", ("write on closed file: " + _path).c_str());
}
} // namespace dofs

53
src/core/logger.h Normal file
View File

@@ -0,0 +1,53 @@
#ifndef CORE_LOGGER_H
#define CORE_LOGGER_H
#include <cstdint>
#include <fstream>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <string_view>
#include <unordered_map>
#include <utility>
#include "core/error.h"
#include "core/types.h"
namespace dofs {
// Line-oriented file logger. The file is opened in the constructor and closed
// in the destructor; write_line/flush/close take an internal mutex.
class Logger final {
public:
    // Opens `path`, appending when `append` is true and truncating otherwise.
    Logger(std::string_view path, bool append) noexcept;
    ~Logger() noexcept;

    // Neither copyable nor movable.
    Logger(const Logger &) = delete;
    Logger(Logger &&) = delete;
    Logger &operator=(const Logger &) = delete;
    Logger &operator=(Logger &&) = delete;

    void write_line(std::string_view line) noexcept;
    void flush() noexcept;
    void close() noexcept;

    // Reads the open flag directly; this accessor does not take the mutex.
    bool is_open() const noexcept {
        return _is_open;
    }

    // View of the stored path; valid for as long as this Logger lives.
    std::string_view path() const noexcept {
        return std::string_view(_path);
    }

private:
    bool open(std::ios::openmode mode) noexcept;

    std::string _path;
    std::ofstream _stream;
    bool _is_open{false};
    std::mutex _mutex;
};
} // namespace dofs
#endif // CORE_LOGGER_H

67
src/core/node.cc Normal file
View File

@@ -0,0 +1,67 @@
#include "core/node.h"
#include "core/simulator.h"
#include "core/error.h"
#include <optional>
namespace dofs {
// Stores the (non-owning) simulator pointer, id and type; every node starts in
// NodeStatus::OK.
Node::Node(Simulator *const sim, NodeId id, NodeType type) noexcept
: _sim(sim), _id(id), _status(NodeStatus::OK), _type(type) {}
// Accessors
// Identifier given at construction.
NodeId Node::id() const noexcept {
return _id;
}
// Current lifecycle status.
NodeStatus Node::status() const noexcept {
return _status;
}
// Node kind given at construction.
NodeType Node::type() const noexcept {
return _type;
}
// Overwrites the status unconditionally; no transition checks are made here.
void Node::set_status(NodeStatus s) noexcept {
_status = s;
}
// Schedules an event `delay_ns` from the simulator's current time that sets
// `self` to NodeStatus::OK. Returns false (after logging) when no simulator is
// available, true otherwise.
// NOTE(review): the callback holds a raw Node*; this assumes the node outlives
// the scheduled event — confirm against node ownership.
static inline bool schedule_status_ok_after(Simulator *const sim, Time delay_ns,
                                            Node* self) {
    if (sim == nullptr) {
        log_error("ERROR", "Node: simulator instance not found for boot/reboot");
        return false;
    }

    sim->schedule_after(delay_ns, [self]() {
        self->set_status(NodeStatus::OK);
    });
    return true;
}
// Starts a DOWN node: status becomes BOOTING now and OK after `boottime_ns`.
// Calls made in any other state are logged and ignored.
void Node::boot(Time boottime_ns) {
    if (_status == NodeStatus::DOWN) {
        _status = NodeStatus::BOOTING;
        // If scheduling fails the node is intentionally left BOOTING;
        // caller can handle recovery/logs.
        (void)schedule_status_ok_after(_sim, boottime_ns, this);
        return;
    }
    log_error("ERROR", "boot() ignored: node not in DOWN state");
}
// Restarts a running node (OK or THROTTLING): status becomes BOOTING now and
// OK after `boottime_ns`. Calls made in any other state are logged and ignored.
void Node::reboot(Time boottime_ns) {
    const bool is_running = (_status == NodeStatus::OK) ||
                            (_status == NodeStatus::THROTTLING);
    if (!is_running) {
        log_error("ERROR", "reboot() ignored: node not in OK/THROTTLING state");
        return;
    }

    _status = NodeStatus::BOOTING;
    // If scheduling fails the node is intentionally left BOOTING;
    // caller can handle recovery/logs.
    (void)schedule_status_ok_after(_sim, boottime_ns, this);
}
} // namespace dofs

39
src/core/node.h Normal file
View File

@@ -0,0 +1,39 @@
#ifndef CORE_NODE_H
#define CORE_NODE_H
#include <cstdint>
#include "core/simulator.h"
#include "core/time.h"
#include "core/types.h"
namespace dofs {
// Base class for simulated nodes: carries an id, a type, a lifecycle status and
// a non-owning pointer to the simulator used to schedule boot completion.
class Node {
public:
    Node(Simulator *const sim, NodeId id, NodeType type) noexcept;
    virtual ~Node() = default;

    // Nodes are not copyable.
    Node(const Node &) = delete;
    Node &operator=(const Node &) = delete;

    NodeId id() const noexcept;
    NodeType type() const noexcept;
    NodeStatus status() const noexcept;

    // Sets the status directly, without any transition checks.
    void set_status(NodeStatus s) noexcept;

    // DOWN -> BOOTING now, OK after `boottime_ns`; ignored in other states.
    void boot(Time boottime_ns);
    // OK/THROTTLING -> BOOTING now, OK after `boottime_ns`; ignored otherwise.
    void reboot(Time boottime_ns);

protected:
    Simulator *const _sim;
    NodeId _id;
    NodeStatus _status;
    NodeType _type;
};
} // namespace dofs
#endif // CORE_NODE_H

131
src/core/rng.h Normal file
View File

@@ -0,0 +1,131 @@
#ifndef CORE_RNG_H
#define CORE_RNG_H
#include <cmath>
#include <cstddef>
#include <cstdint>
#include <initializer_list>
#include <iterator>
#include <limits>
#include <random>
#include <type_traits>
#include <utility>
#include <vector>
namespace dofs {
// Seedable pseudo-random source built on std::mt19937_64. Two instances
// constructed with the same seed and driven by the same call sequence produce
// the same values.
class Rng final {
public:
    using engine_type = std::mt19937_64;
    using seed_type = engine_type::result_type;

    // Uses a fixed built-in seed when none is given.
    explicit Rng(seed_type seed = default_seed()) noexcept
        : _eng(seed) {}

    // Re-seeds the engine, restarting its sequence.
    void seed(seed_type s) noexcept {
        _eng.seed(s);
    }

    // Real in [0, 1)
    double uniform01() {
        return std::uniform_real_distribution<double> {}(_eng);
    }

    // Integer in [lo, hi) — lo inclusive, hi exclusive.
    // An empty range (hi <= lo) returns lo without consuming randomness; the
    // underlying distribution requires lo <= hi - 1, and hi - 1 would wrap for
    // unsigned 0.
    template <typename Int,
              typename = std::enable_if_t<std::is_integral<Int>::value>>
    Int uniform_range(Int lo_inclusive, Int hi_exclusive) {
        if (!(lo_inclusive < hi_exclusive)) {
            return lo_inclusive;
        }
        if constexpr (sizeof(Int) < sizeof(short)) {
            // std::uniform_int_distribution is not specified for char-sized
            // types, so draw in int/unsigned and narrow the in-range result.
            using Wide = std::conditional_t<std::is_signed<Int>::value, int, unsigned int>;
            const Wide lo = static_cast<Wide>(lo_inclusive);
            const Wide hi = static_cast<Wide>(hi_exclusive) - 1;
            return static_cast<Int>(std::uniform_int_distribution<Wide>(lo, hi)(_eng));
        } else {
            return std::uniform_int_distribution<Int>(
                       lo_inclusive, static_cast<Int>(hi_exclusive - 1))(_eng);
        }
    }

    // Integer in [0, hi) — convenience overload
    template <typename Int,
              typename = std::enable_if_t<std::is_integral<Int>::value>>
    Int uniform_range(Int hi_exclusive) {
        return uniform_range<Int>(Int{0}, hi_exclusive);
    }

    // Real in [lo, hi), obtained by scaling uniform01().
    double uniform_range(double lo_inclusive, double hi_exclusive) {
        return lo_inclusive + (hi_exclusive - lo_inclusive) * uniform01();
    }

    // Poisson-distributed count with mean `lambda`.
    // A non-positive or NaN mean yields 0 without consuming randomness; the
    // standard distribution requires a strictly positive mean.
    std::uint64_t poisson(double lambda) {
        if (!(lambda > 0.0)) {
            return 0;
        }
        return std::poisson_distribution<std::uint64_t>(lambda)(_eng);
    }

    // Picks one item with probability proportional to its weight (the pair's
    // first member). See choose_weighted_impl for how unusable weights are handled.
    template <typename T>
    T choose_weighted(const std::vector<std::pair<double, T>>& items) {
        return choose_weighted_impl(items.begin(), items.end());
    }

    template <typename T>
    T choose_weighted(std::initializer_list<std::pair<double, T>> items) {
        return choose_weighted_impl(items.begin(), items.end());
    }

private:
    engine_type _eng;

    static constexpr seed_type default_seed() noexcept {
        return 0xC0FFEEULL ^ 0x9E3779B97F4A7C15ULL;
    }

    // Weighted pick over [first, last) of pair<double, T>.
    //  - Only positive, finite weights count; all others are treated as 0.
    //  - If no weight counts, the pick is uniform over all items.
    //  - An empty range returns a value-initialized T.
    template <typename Iter>
    auto choose_weighted_impl(Iter first,
                              Iter last) -> typename std::iterator_traits<Iter>::value_type::second_type {
        using Pair = typename std::iterator_traits<Iter>::value_type;
        using T = typename Pair::second_type;
        using Diff = typename std::iterator_traits<Iter>::difference_type;

        const auto usable = [](double w) {
            return w > 0.0 && std::isfinite(w);
        };

        double total = 0.0;
        for (auto it = first; it != last; ++it) {
            if (usable(it->first))
                total += it->first;
        }

        if (!(total > 0.0)) {
            const auto n = static_cast<std::size_t>(std::distance(first, last));
            if (n == 0)
                return T{};
            const std::uint64_t idx =
                uniform_range<std::uint64_t>(0, static_cast<std::uint64_t>(n));
            return std::next(first, static_cast<Diff>(idx))->second;
        }

        double u = uniform01() * total;
        // Last usable item seen so far; returned if rounding lets `u` survive
        // the whole scan. total > 0 guarantees at least one usable item.
        Iter fallback = last;
        for (auto it = first; it != last; ++it) {
            if (!usable(it->first))
                continue;
            if (u < it->first)
                return it->second;
            u -= it->first;
            fallback = it;
        }
        return fallback->second;
    }
};
} // namespace dofs
#endif // CORE_RNG_H

157
src/core/simulator.cc Normal file
View File

@@ -0,0 +1,157 @@
#include "core/simulator.h"
#include <memory>
#include "core/rng.h"
#include "network/link.h"
namespace dofs {
// Ordering for the event priority queue: returns true when `a` should run
// AFTER `b`. Earlier times come out first; equal times are broken by the
// smaller event id, i.e. scheduling order.
bool Simulator::Cmp::operator()(const Item& a, const Item& b) const noexcept {
    return (a.when == b.when) ? (a.id > b.id) : (a.when > b.when);
}
// Current simulated time: the timestamp of the most recently executed event
// (default-constructed Time before any event has run).
Time Simulator::now() const noexcept {
return _now;
}
// Marks event `id` as cancelled; it is dropped when it reaches the front of
// the queue. Returns true if the id was newly marked.
// NULL_EVENT and ids that were never handed out are rejected, so they neither
// report success nor accumulate in the cancelled set.
// An id whose event has already run cannot be told apart from a pending one
// here and is still accepted.
bool Simulator::cancel(EventId id) {
    if (id == NULL_EVENT || id >= _next_id) {
        return false;
    }
    return _cancelled.insert(id).second;
}
bool Simulator::run_next() {
while (!_event_pq.empty()) {
Item it = _event_pq.top();
_event_pq.pop();
if (_cancelled.erase(it.id) > 0)
continue;
_now = it.when;
it.fn();
return true;
}
return false;
}
// Runs every pending event whose time is <= end_time, in order.
// Cancelled events at the front are removed here BEFORE the time check.
// Otherwise a cancelled event inside the limit would pass the check, and
// run_next() would skip it and run the next live event even when that event
// lies beyond end_time.
// The clock is left at the time of the last executed event.
void Simulator::run_until(Time end_time) {
    while (!_event_pq.empty()) {
        const Item& top = _event_pq.top();
        if (_cancelled.erase(top.id) > 0) {
            _event_pq.pop();
            continue;
        }
        if (end_time < top.when)
            break;
        run_next();
    }
}
// Process-wide table of simulator instances, indexed by InstanceId and created
// on first use. Slots that were never created hold null.
std::vector<std::unique_ptr<Simulator>>& Simulator::registry_() {
static std::vector<std::unique_ptr<Simulator>> reg;
return reg;
}
// Returns the simulator registered under `id`, creating it first if the slot
// is empty. INVALID_INSTANCE_ID yields {INVALID_INSTANCE_ID, nullptr}.
std::pair<InstanceId, Simulator *> Simulator::create_simulator(InstanceId id) {
    if (id == INVALID_INSTANCE_ID) {
        return {INVALID_INSTANCE_ID, nullptr};
    }

    auto& slots = registry_();
    const auto index = static_cast<size_t>(id);
    if (slots.size() <= index) {
        slots.resize(index + 1);
    }

    auto& slot = slots[index];
    if (!slot) {
        slot = std::make_unique<Simulator>();
    }
    return {id, slot.get()};
}
// Looks up a previously created simulator. Returns nullptr for
// INVALID_INSTANCE_ID, for ids beyond the registry, and for empty slots.
Simulator * Simulator::get_simulator(InstanceId id) noexcept {
    if (id == INVALID_INSTANCE_ID) {
        return nullptr;
    }
    auto& slots = registry_();
    const auto index = static_cast<size_t>(id);
    return (index < slots.size()) ? slots[index].get() : nullptr;
}
// Creates this simulator's Rng with the given seed and returns it.
// If an Rng already exists it is left untouched and nullptr is returned.
Rng * Simulator::create_rng(std::uint64_t seed) {
    if (!_rng) {
        _rng = std::make_unique<Rng>(seed);
        return _rng.get();
    }
    return nullptr;
}
// Returns the simulator's Rng, or nullptr if create_rng() has not been called.
Rng * Simulator::get_rng() noexcept {
return _rng.get();
}
// Const overload of the accessor above.
Rng const * Simulator::get_rng() const noexcept {
return _rng.get();
}
// Creates a link between endpoint `a` (port a_port) and endpoint `b` (port
// b_port), stores it in this simulator and returns {id, pointer}. The id is the
// link's index in the internal table.
// Returns {INVALID_LINK_ID, nullptr} when an endpoint is null, the bandwidth is
// not a positive number (NaN included), the latency is negative, or the next id
// would collide with INVALID_LINK_ID.
std::pair<LinkId, Link *> Simulator::create_link(
    NetworkNode* a, PortId a_port,
    NetworkNode* b, PortId b_port,
    Time latency,
    double bandwidth_gbps) {
    if (!a || !b)
        return {INVALID_LINK_ID, nullptr};

    // Negated comparison so that NaN is rejected as well as <= 0.
    if (!(bandwidth_gbps > 0.0))
        return {INVALID_LINK_ID, nullptr};

    // Cannot trigger while Time::rep is unsigned; kept as a guard.
    if (latency < Time{0})
        return {INVALID_LINK_ID, nullptr};

    // The sentinel value must never be handed out as a real id.
    if (_links.size() >= static_cast<size_t>(INVALID_LINK_ID))
        return {INVALID_LINK_ID, nullptr};

    const LinkId id = static_cast<LinkId>(_links.size());
    auto link = std::make_unique<Link>(this, id,
                                       a, a_port, b, b_port,
                                       latency,
                                       bandwidth_gbps);
    Link* raw = link.get();
    _links.push_back(std::move(link));
    return {id, raw};
}
// Returns the link stored under `id`, or nullptr for INVALID_LINK_ID and for
// ids beyond the link table.
Link * Simulator::get_link(LinkId id) noexcept {
    if (id == INVALID_LINK_ID) {
        return nullptr;
    }
    const auto index = static_cast<size_t>(id);
    return (index < _links.size()) ? _links[index].get() : nullptr;
}

// Const overload with the same lookup rules.
Link const * Simulator::get_link(LinkId id) const noexcept {
    if (id == INVALID_LINK_ID) {
        return nullptr;
    }
    const auto index = static_cast<size_t>(id);
    return (index < _links.size()) ? _links[index].get() : nullptr;
}
} // namespace dofs

120
src/core/simulator.h Normal file
View File

@@ -0,0 +1,120 @@
#ifndef CORE_SIMULATOR_H
#define CORE_SIMULATOR_H
// TODO: implement concrete node methods for different nodes, store them all uniformly in vector<u_ptr<Node>> and return a get function
#include <cstdint>
#include <functional>
#include <queue>
#include <vector>
#include <unordered_set>
#include <utility>
#include <tuple>
#include <type_traits>
#include <limits>
#include <memory>
#include "core/time.h"
#include "core/types.h"
namespace dofs {
class Rng;
class Logger;
class Node;
class NetworkNode;
class Link;
// Event id returned when scheduling is refused; real ids start at 1.
constexpr EventId NULL_EVENT = 0;
// Sentinel instance id: the largest value InstanceId can hold.
constexpr InstanceId INVALID_INSTANCE_ID =
std::numeric_limits<InstanceId>::max();
// Sentinel link id: the largest value LinkId can hold.
constexpr LinkId INVALID_LINK_ID =
std::numeric_limits<LinkId>::max();
// Discrete-event simulator: a time-ordered queue of callbacks plus the objects
// (Rng, links) that belong to one simulation instance. Instances are kept in a
// static registry and addressed by InstanceId.
class Simulator final {
private:
    // One pending event: when it fires, its id, and the bound callback.
    struct Item {
        Time when;
        EventId id;
        std::function<void()> fn;
    };

    // Priority-queue ordering; see the definition in simulator.cc.
    struct Cmp {
        bool operator()(const Item& a, const Item& b) const noexcept;
    };

    std::priority_queue<Item, std::vector<Item>, Cmp> _event_pq;
    std::unordered_set<EventId> _cancelled;
    Time _now{};
    EventId _next_id{1};

    std::unique_ptr<Rng> _rng;
    std::vector<std::unique_ptr<Node>> _nodes;
    std::vector<std::unique_ptr<Link>> _links;

public:
    Simulator() = default;

    // ---------- Instance registry ----------
    static std::pair<InstanceId, Simulator *> create_simulator(InstanceId id);
    static Simulator *get_simulator(InstanceId id) noexcept;

    // ---------- Clock and events ----------
    Time now() const noexcept;

    // Schedules f(args...) at absolute time `abs_time`. Callable and arguments
    // are decay-copied into the event. Returns the new event id, or NULL_EVENT
    // when `abs_time` is already in the past.
    template <class F, class... Args>
    EventId schedule_at(Time abs_time, F&& f, Args&&... args) {
        if (abs_time < _now) {
            return NULL_EVENT;
        }

        const EventId event_id = _next_id;
        ++_next_id;
        _event_pq.push(Item{
            abs_time,
            event_id,
            make_callable(std::forward<F>(f), std::forward<Args>(args)...)
        });
        return event_id;
    }

    // Schedules f(args...) `delay` after the current time.
    template <class F, class... Args>
    EventId schedule_after(Time delay, F&& f, Args&&... args) {
        const Time fire_at = _now + delay;
        return schedule_at(fire_at, std::forward<F>(f),
                           std::forward<Args>(args)...);
    }

    bool cancel(EventId id);
    bool run_next();
    void run_until(Time end_time);

    // ---------- Object management ----------
    Rng* create_rng(std::uint64_t seed);
    Rng* get_rng() noexcept;
    Rng const* get_rng() const noexcept;

    std::pair<LinkId, Link *> create_link(NetworkNode* a, PortId a_port,
                                          NetworkNode* b, PortId b_port,
                                          Time latency,
                                          double bandwidth_gbps);
    Link* get_link(LinkId id) noexcept;
    Link const* get_link(LinkId id) const noexcept;

private:
    // Packs a callable and its arguments into a nullary closure. Both are
    // stored by value; the arguments are moved into the call when it runs.
    template <class F, class... Args>
    static auto make_callable(F&& f, Args&&... args) {
        return [callee = std::decay_t<F>(std::forward<F>(f)),
                bound = std::tuple<std::decay_t<Args>...>(std::forward<Args>(args)...)]() mutable {
            std::apply(callee, std::move(bound));
        };
    }

    static std::vector<std::unique_ptr<Simulator>>& registry_();
};
} // namespace dofs
#endif // CORE_SIMULATOR_H

129
src/core/time.h Normal file
View File

@@ -0,0 +1,129 @@
#ifndef CORE_TIME_H
#define CORE_TIME_H
#include <cstdint>
#include <optional>
namespace dofs {
// Simulated time as an unsigned 64-bit nanosecond count.
// Addition and multiplication wrap on overflow; the subtraction variants
// differ in how they treat a negative result (see each one).
class Time final {
public:
    using rep = uint64_t;

    // ----- Constructors -----
    constexpr Time() : _nsec(0) {}
    explicit constexpr Time(rep ns) : _nsec(ns) {}

    // ----- Factories -----
    static constexpr Time from_ns(rep ns) noexcept {
        return Time{ns};
    }
    static constexpr Time from_us(rep us) noexcept {
        return Time{us * 1'000ULL};
    }
    static constexpr Time from_ms(rep ms) noexcept {
        return Time{ms * 1'000'000ULL};
    }
    static constexpr Time from_s (rep s ) noexcept {
        return Time{s * 1'000'000'000ULL};
    }

    // ----- Accessors -----
    // ns() and count() both return the raw nanosecond value.
    constexpr rep ns() const noexcept {
        return _nsec;
    }
    constexpr rep count() const noexcept {
        return _nsec;
    }

    // ----- Converters (plain uint64_t) -----
    static constexpr rep us_to_ns(rep us) noexcept {
        return us * 1'000ULL;
    }
    static constexpr rep ms_to_ns(rep ms) noexcept {
        return ms * 1'000'000ULL;
    }

    // ----- Comparisons -----
    friend constexpr bool operator==(Time a, Time b) noexcept {
        return a._nsec == b._nsec;
    }
    friend constexpr bool operator!=(Time a, Time b) noexcept {
        return a._nsec != b._nsec;
    }
    friend constexpr bool operator< (Time a, Time b) noexcept {
        return a._nsec < b._nsec;
    }
    friend constexpr bool operator> (Time a, Time b) noexcept {
        return a._nsec > b._nsec;
    }
    friend constexpr bool operator<=(Time a, Time b) noexcept {
        return a._nsec <= b._nsec;
    }
    friend constexpr bool operator>=(Time a, Time b) noexcept {
        return a._nsec >= b._nsec;
    }

    // ----- Arithmetic -----
    constexpr Time &operator+=(Time rhs) noexcept {
        _nsec += rhs._nsec;
        return *this;
    }

    // Saturating: clamps at zero instead of wrapping.
    constexpr Time &operator-=(Time rhs) noexcept {
        _nsec = (_nsec < rhs._nsec) ? rep{0} : (_nsec - rhs._nsec);
        return *this;
    }

    friend constexpr Time operator+(Time a, Time b) noexcept {
        return Time{a._nsec + b._nsec};
    }
    friend constexpr Time operator*(Time a, Time b) noexcept {
        return Time{a._nsec * b._nsec};
    }

    // Checked difference: empty when b is later than a.
    friend constexpr std::optional<Time> safe_sub(Time a, Time b) noexcept {
        if (b._nsec <= a._nsec) {
            return Time{a._nsec - b._nsec};
        }
        return std::nullopt;
    }

    // Binary minus is the checked difference, so it yields an optional.
    friend constexpr std::optional<Time> operator-(Time a, Time b) noexcept {
        return safe_sub(a, b);
    }

    // Unchecked difference: wraps around when t is later than *this.
    constexpr Time unsafe_sub(Time t) const noexcept {
        return Time{_nsec - t._nsec};
    }

private:
    rep _nsec;
};
// User-defined literals for Time, e.g. 5_ns, 10_us, 3_ms, 1_s. Each forwards
// to the matching Time::from_* factory.
constexpr Time operator""_ns(unsigned long long v) noexcept {
return Time::from_ns(static_cast<Time::rep>(v));
}
constexpr Time operator""_us(unsigned long long v) noexcept {
return Time::from_us(static_cast<Time::rep>(v));
}
constexpr Time operator""_ms(unsigned long long v) noexcept {
return Time::from_ms(static_cast<Time::rep>(v));
}
constexpr Time operator""_s (unsigned long long v) noexcept {
return Time::from_s (static_cast<Time::rep>(v));
}
} // namespace dofs
#endif // CORE_TIME_H

56
src/core/timer.h Normal file
View File

@@ -0,0 +1,56 @@
#ifndef CORE_TIMER_H
#define CORE_TIMER_H
#include <chrono>
#include "core/time.h"
namespace dofs {
// Wall-clock stopwatch based on std::chrono::steady_clock, reporting readings
// as dofs::Time (nanoseconds).
class Timer final {
public:
    // Starts timing at construction.
    Timer() {
        init();
    }

    // Restarts the stopwatch from the current instant.
    void init() noexcept {
        _start = clock::now();
    }

    // Start instant, expressed as time since the clock's epoch.
    Time start() const noexcept {
        return to_time(_start.time_since_epoch());
    }

    // Current instant, expressed as time since the clock's epoch.
    Time now() const noexcept {
        return to_time(clock::now().time_since_epoch());
    }

    // Time passed since construction or the last init().
    Time elapsed() const noexcept {
        return to_time(clock::now() - _start);
    }

private:
    using clock = std::chrono::steady_clock;

    // Converts a clock duration to a nanosecond-based Time.
    static Time to_time(clock::duration span) noexcept {
        const auto as_ns =
            std::chrono::duration_cast<std::chrono::nanoseconds>(span);
        return Time::from_ns(static_cast<Time::rep>(as_ns.count()));
    }

    clock::time_point _start;
};
} // namespace dofs
#endif // CORE_TIMER_H

70
src/core/types.h Normal file
View File

@@ -0,0 +1,70 @@
#ifndef CORE_TYPES_H
#define CORE_TYPES_H
#include <cstdint>
namespace dofs {
// 128-bit unsigned integer, taken from the compiler-provided __uint128_t
// (a compiler extension, not a standard type).
using uint128_t = __uint128_t;
// NOTE(review): presumably a bit set with one bit per packet group — confirm.
using PacketGroups = uint128_t;
// Identifier of a scheduled simulator event (0 is NULL_EVENT in simulator.h).
using EventId = uint64_t;
// Index of a simulator in the instance registry (max value is the invalid id).
using InstanceId = uint8_t;
// Identifier of a node.
using NodeId = uint16_t;
// Lifecycle state of a node; Node::boot/reboot move a node through BOOTING to OK.
enum class NodeStatus : uint8_t {
OK,
THROTTLING,
DOWN,
BOOTING
};
// Kind of node, fixed at construction.
enum class NodeType : uint8_t {
HOST,
SWITCH,
NIC
};
// Port number on a node.
using PortId = uint16_t;
// 32-bit address; ipv4() builds one from a node id and a port id.
using IPv4Addr = uint32_t;
// Packs a node id and a port id into one 32-bit address:
// node id in the upper 16 bits, port id in the lower 16 bits.
inline IPv4Addr ipv4(NodeId n, PortId p) noexcept {
    const uint32_t node_bits = static_cast<uint32_t>(n) << 16;
    const uint32_t port_bits = static_cast<uint32_t>(p);
    return node_bits | port_bits;
}
// Identifier of a link; Simulator::create_link uses the link's index in its
// table, and the maximum value is reserved as INVALID_LINK_ID.
using LinkId = uint32_t;
// Operational state of a link (see Link::set_status).
enum class LinkStatus : uint8_t { OK, DOWN, THROTTLING };
// Size in bytes (32-bit).
using Bytes = uint32_t;
// Packet sequence number (16-bit).
using PacketSeq = uint16_t;
// Flow identifier (16-bit).
using FlowId = uint16_t;
// Kind of packet carried on the network.
enum class PacketProtocol : uint8_t {
DATA,
ACK,
NACK,
HEADER_TRIM,
HEADER_TRIM_BACK
};
// Buffer organisation of a switch.
enum class SwitchBufferType : uint8_t { SHARED = 0, DEDICATED = 1 };
// Priority class of a flow. AUTO has no explicit value and therefore follows
// ELEPHANT (i.e. 3).
enum class FlowPriority : uint8_t {
CTRL = 0,
MICE = 1,
ELEPHANT = 2,
AUTO
};
// NOTE(review): presumably the congestion-control algorithm selector — confirm.
enum class CCType : uint8_t {
DCQCN,
NSCC
};
// NOTE(review): presumably the load-balancing policy selector — confirm.
enum class LBType : uint8_t {
RANDOM_PACKET_SPRAYING
};
} // namespace dofs
#endif // CORE_TYPES_H

49
src/hosts/CMakeLists.txt Normal file
View File

@@ -0,0 +1,49 @@
# --- hosts core (no switch sources here) ---
# Static library for the hosts module; sources are attached below.
add_library(dofs_hosts STATIC)
# Make both "hosts/…" and "core/…" resolvable from anywhere
target_include_directories(dofs_hosts
PUBLIC
$<BUILD_INTERFACE:${CMAKE_SOURCE_DIR}/src>
$<INSTALL_INTERFACE:include>
)
# Hosts build on top of the core library; PUBLIC forwards core's usage
# requirements to anything linking dofs_hosts.
target_link_libraries(dofs_hosts PUBLIC dofs_core)
# Source selection: glob every *.cc in this directory, or use the explicit list.
if(DOFS_GLOB_SOURCES)
file(GLOB HOSTS_CC CONFIGURE_DEPENDS "${CMAKE_CURRENT_SOURCE_DIR}/*.cc")
# Exclude switch/*.cc from this glob just in case
list(FILTER HOSTS_CC EXCLUDE REGEX ".*/switch/.*")
target_sources(dofs_hosts PRIVATE ${HOSTS_CC})
else()
# Only the placeholder pair is listed here.
target_sources(dofs_hosts
PUBLIC
nodes_dummy.h
PRIVATE
nodes_dummy.cc
)
endif()
# Namespaced alias so consumers can link against dofs::hosts.
add_library(dofs::hosts ALIAS dofs_hosts)
# --- Tooling stubs for headers in this folder only (not switch) ---
# One generated .cpp per header, each containing just #include "hosts/<base>.h",
# so every header gets its own compile command.
file(GLOB HOSTS_HEADERS CONFIGURE_DEPENDS "${CMAKE_CURRENT_SOURCE_DIR}/*.h")
set(HOSTS_STUBS)
foreach(h ${HOSTS_HEADERS})
# NAME_WE strips the directory and the extension, leaving the bare header name.
get_filename_component(_base "${h}" NAME_WE)
set(_stub "${CMAKE_CURRENT_BINARY_DIR}/${_base}.cpp")
# The stub is written into the binary dir at configure time.
file(WRITE "${_stub}" "// tooling stub for ${_base}.h\n#include \"hosts/${_base}.h\"\n")
list(APPEND HOSTS_STUBS "${_stub}")
endforeach()
add_library(dofs_hosts_headers_tooling STATIC ${HOSTS_STUBS})
# inherit include dirs/usage reqs from dofs_hosts
target_link_libraries(dofs_hosts_headers_tooling PRIVATE dofs_hosts dofs_core)
# This ensures the same -I flags even if cmake-ide builds the stubs alone
target_include_directories(dofs_hosts_headers_tooling
PRIVATE
$<TARGET_PROPERTY:dofs_hosts,INTERFACE_INCLUDE_DIRECTORIES>
$<TARGET_PROPERTY:dofs_hosts,INCLUDE_DIRECTORIES>
)

6
src/hosts/nodes_dummy.cc Normal file
View File

@@ -0,0 +1,6 @@
#include "core/error.h"
#include "core/logger.h"
namespace dofs {
// Placeholder definition: gives this translation unit (the only source listed
// for dofs_hosts) one symbol to compile.
// NOTE(review): presumably temporary until real host sources are added — confirm.
int _nodes_tooling_dummy = 0;
}

4
src/hosts/nodes_dummy.h Normal file
View File

@@ -0,0 +1,4 @@
#ifndef NODES_DUMMY
#define NODES_DUMMY
#endif

View File

@@ -0,0 +1,58 @@
# --- Library: dofs_network ---
# Static library for the network module; sources are attached below.
add_library(dofs_network STATIC)
# Expose src/ so "network/..." and "core/..." resolve from anywhere
target_include_directories(dofs_network
PUBLIC
$<BUILD_INTERFACE:${CMAKE_SOURCE_DIR}/src>
$<INSTALL_INTERFACE:include>
)
# PUBLIC forwards core's usage requirements to anything linking dofs_network.
target_link_libraries(dofs_network PUBLIC dofs_core)
# Sources
# Glob every *.cc in this directory, or use the explicit list.
if(DOFS_GLOB_SOURCES)
file(GLOB NETWORK_CC CONFIGURE_DEPENDS "${CMAKE_CURRENT_SOURCE_DIR}/*.cc")
target_sources(dofs_network PRIVATE ${NETWORK_CC})
else()
target_sources(dofs_network
PUBLIC
packet.h
link.h
network_node.h
network_switch.h
network_nic.h
PRIVATE
packet.cc
link.cc
network_node.cc
network_switch.cc
network_nic.cc
)
endif()
# Alias
add_library(dofs::network ALIAS dofs_network)
# --- Tooling: per-header stubs for "network/*.h" ---
# One generated .cpp per header, each containing just #include "network/<base>.h",
# so every header gets its own compile command.
file(GLOB NETWORK_HEADERS CONFIGURE_DEPENDS "${CMAKE_CURRENT_SOURCE_DIR}/*.h")
set(NETWORK_STUBS)
foreach(h ${NETWORK_HEADERS})
# NAME_WE strips the directory and the extension, leaving the bare header name.
get_filename_component(_base "${h}" NAME_WE)
set(_stub "${CMAKE_CURRENT_BINARY_DIR}/${_base}.cpp")
# The stub is written into the binary dir at configure time.
file(WRITE "${_stub}" "// tooling stub for ${_base}.h\n#include \"network/${_base}.h\"\n")
list(APPEND NETWORK_STUBS "${_stub}")
endforeach()
add_library(dofs_network_headers_tooling STATIC ${NETWORK_STUBS})
# Make stubs see the same include roots as the real target
target_include_directories(dofs_network_headers_tooling
PRIVATE
$<BUILD_INTERFACE:${CMAKE_SOURCE_DIR}/src>
)
target_link_libraries(dofs_network_headers_tooling PRIVATE dofs_network dofs_core)
# --- Subdir: switch ---
# The switch and nic subdirectories carry their own CMakeLists.
add_subdirectory(switch)
add_subdirectory(nic)

182
src/network/link.cc Normal file
View File

@@ -0,0 +1,182 @@
#include "network/link.h"
#include <unordered_map>
#include <memory>
#include <cmath>
namespace dofs {
// Time needed to put `bytes` on the wire at `gbps` gigabits per second, rounded
// up to whole nanoseconds (1 Gbps equals 1 bit per nanosecond).
// Returns the largest representable Time as a "can't send" sentinel when:
//  - gbps is not a positive number (<= 0 or NaN), or
//  - the result does not fit in 64 bits (converting such a double to uint64_t
//    would be undefined behaviour).
Time Link::serialization_time(Bytes bytes, double gbps) noexcept {
    constexpr uint64_t kNever = UINT64_C(0xFFFFFFFFFFFFFFFF);

    // Negated comparison so that NaN takes the sentinel path too.
    if (!(gbps > 0.0)) {
        return Time::from_ns(kNever);
    }

    const double ns = std::ceil(static_cast<double>(bytes) * 8.0 / gbps);
    // static_cast<double>(kNever) rounds to 2^64, the first value that no
    // longer fits; the negated form also catches infinity and NaN.
    if (!(ns < static_cast<double>(kNever))) {
        return Time::from_ns(kNever);
    }
    return Time::from_ns(static_cast<uint64_t>(ns));
}
// Builds a link between endpoints `a` and `b`. The endpoint ids are read from
// the nodes here, so both pointers must be non-null. The given latency and
// bandwidth are stored both as the initial values (restored by
// set_status(OK)) and as the current values. The link starts in OK with both
// direction cursors at time 0.
Link::Link(Simulator *const sim,
LinkId id,
NetworkNode *a, PortId a_port,
NetworkNode *b, PortId b_port,
Time latency,
double bandwidth_gbps) noexcept
: _sim(sim),
_id(id),
_a(a), _b(b),
_a_id(a->id()),
_b_id(b->id()),
_a_port(a_port), _b_port(b_port),
_latency_init(latency),
_bandwidth_gbps_init(bandwidth_gbps),
_latency_cur(latency),
_bandwidth_gbps_cur(bandwidth_gbps),
_status(LinkStatus::OK),
_next_available_ab(Time(0)),
_next_available_ba(Time(0)) {}
// Translates a sender id into the direction it transmits in. Endpoint A is
// checked first; an id that matches neither endpoint yields nullopt.
std::optional<Link::Dir> Link::_dir_from(NodeId sender) const noexcept {
    std::optional<Dir> direction;
    if (sender == _a_id) {
        direction = Dir::AtoB;
    } else if (sender == _b_id) {
        direction = Dir::BtoA;
    }
    return direction;
}
// Earliest time the sender's direction is free to start another transmission.
Time Link::next_available(NodeId sender) const noexcept {
    const auto direction = _dir_from(sender);
    if (direction) {
        return (*direction == Dir::AtoB) ? _next_available_ab : _next_available_ba;
    }

    // Unknown sender; be defensive: report "now" so caller won't stall forever.
    // The caller (switch) should treat this as a logic error.
    if (_sim == nullptr) {
        return Time(0);
    }
    return _sim->now();
}
// Books the sender's direction for a transmission of `bytes`.
// The slot starts at the later of "now" and the direction's cursor, lasts one
// serialization time at the current bandwidth, and moves the cursor to its end.
// Returns nullopt when the link is DOWN, no simulator is set, or `sender` is
// not an endpoint (the latter two are logged).
std::optional<Link::Reservation> Link::reserve(Bytes bytes,
                                               NodeId sender) noexcept {
    if (_status == LinkStatus::DOWN) {
        return std::nullopt;
    }

    if (_sim == nullptr) {
        log_error("ERROR", "Link: simulator instance not found");
        return std::nullopt;
    }

    const auto direction = _dir_from(sender);
    if (!direction) {
        log_error("ERROR", "Link::reserve called by non-endpoint node");
        return std::nullopt;
    }

    Time& cursor = (*direction == Dir::AtoB) ? _next_available_ab
                                             : _next_available_ba;

    const Time now = _sim->now();
    const Time start = (now < cursor) ? cursor : now;
    const Time finish = start + serialization_time(bytes, _bandwidth_gbps_cur);

    cursor = finish;
    return Reservation{start, finish};
}
// --- Legacy path: send + schedule internally via the link ---
// This keeps existing callers working while the switch migrates to reserve().
// Reserves a slot for the packet and schedules its arrival at the far endpoint
// after (time until serialization finishes) + (current latency). The packet is
// copied into the scheduled event. Packets are dropped silently when the link
// is DOWN; a missing simulator or a non-endpoint caller is logged.
void Link::send_pkt(Packet &pkt, NodeId caller) {
    if (_status == LinkStatus::DOWN) {
        // Drop silently
        return;
    }

    if (_sim == nullptr) {
        log_error("ERROR", "Link: simulator instance not found");
        return;
    }

    const auto direction = _dir_from(caller);
    if (!direction) {
        log_error("ERROR", "Link::send_pkt called by non-endpoint node");
        return;
    }

    const auto slot = reserve(pkt.total_size(), caller);
    if (!slot) {
        // DOWN or invalid sender; nothing to do.
        return;
    }

    const Time now = _sim->now();
    Time until_finish(0);
    if (slot->finish > now) {
        until_finish = slot->finish.unsafe_sub(now);
    }
    const Time total_delay = until_finish + _latency_cur;

    const bool a_to_b = (*direction == Dir::AtoB);
    NetworkNode* const receiver = a_to_b ? _b : _a;
    const PortId ingress_port = a_to_b ? _b_port : _a_port;

    _sim->schedule_after(total_delay,
    [receiver, ingress_port, p = pkt]() mutable {
        receiver->recv_pkt(p, ingress_port);
    });
}
// Schedules delivery of a copy of `pkt` to the endpoint opposite `caller`,
// exactly `after` from now. No reservation is made and no latency is added
// here; the caller supplies the full delay. Does nothing when the link is DOWN
// or has no simulator; a non-endpoint caller is logged.
void Link::schedule_delivery_after(Packet &pkt, NodeId caller, Time after) {
    if (_status == LinkStatus::DOWN) {
        return;
    }
    if (_sim == nullptr) {
        return;
    }

    const auto direction = _dir_from(caller);
    if (!direction) {
        log_error("ERROR", "Link::schedule_delivery_after called by non-endpoint node");
        return;
    }

    const bool a_to_b = (*direction == Dir::AtoB);
    NetworkNode* const receiver = a_to_b ? _b : _a;
    const PortId ingress_port = a_to_b ? _b_port : _a_port;

    _sim->schedule_after(after,
    [receiver, ingress_port, p = pkt]() mutable {
        receiver->recv_pkt(p, ingress_port);
    });
}
// Changes the link state.
//  - OK:         restores the initial latency and bandwidth.
//  - DOWN:       marks the link inactive; latency/bandwidth are left as they are.
//  - THROTTLING: applies degraded values. The request is logged and ignored if
//                the new latency is below the initial one, the new bandwidth is
//                above the initial one, or the new bandwidth is not a positive
//                number. Zero or NaN bandwidth would make serialization_time
//                return its "can't send" sentinel and corrupt the reservation
//                cursor; use DOWN to stop traffic instead.
// `new_latency` and `new_bandwidth_gbps` are only read for THROTTLING.
void Link::set_status(LinkStatus s, Time new_latency,
                      double new_bandwidth_gbps) {
    switch (s) {
    case LinkStatus::OK:
        _status = LinkStatus::OK;
        _latency_cur = _latency_init;
        _bandwidth_gbps_cur = _bandwidth_gbps_init;
        return;

    case LinkStatus::DOWN:
        _status = LinkStatus::DOWN;
        return;

    case LinkStatus::THROTTLING:
        if (new_latency < _latency_init ||
            new_bandwidth_gbps > _bandwidth_gbps_init ||
            !(new_bandwidth_gbps > 0.0)) {
            log_error("ERROR",
                      "Link::set_status(THROTTLING) ignored: invalid latency/bandwidth");
            return;
        }
        _status = LinkStatus::THROTTLING;
        _latency_cur = new_latency;
        _bandwidth_gbps_cur = new_bandwidth_gbps;
        return;
    }
}
} // namespace dofs

120
src/network/link.h Normal file
View File

@@ -0,0 +1,120 @@
#ifndef NETWORK_LINK_H
#define NETWORK_LINK_H
#include <cstdint>
#include <optional>
#include <functional>
#include "core/time.h"
#include "core/types.h"
#include "core/node.h"
#include "core/simulator.h"
#include "core/error.h"
#include "network/packet.h"
#include "network/network_node.h"
namespace dofs {
// Full-duplex point-to-point link between two NetworkNodes ("A" and "B" in
// construction order). Each direction keeps its own "next free" cursor.
//
// Timing API for callers that schedule deliveries themselves:
//   next_available(sender)     when the sender's direction can start next
//   serialization_time(bytes)  time to serialize `bytes` at current bandwidth
//   propagation_latency()      current one-way latency
//   reserve(bytes, sender)     returns {start, finish} and advances the cursor
//
// Convenience path:
//   send_pkt(pkt, caller)      reserve() + schedule the delivery internally
class Link {
public:
    struct Reservation {
        Time start; // serialization start time on the link
        Time finish; // serialization finish time on the link
    };

    Link(Simulator *const sim,
         LinkId id,
         NetworkNode *a, PortId a_port,
         NetworkNode *b, PortId b_port,
         Time latency,
         double bandwidth_gbps) noexcept;

    // ----- Sending -----
    void send_pkt(Packet &pkt, NodeId caller);
    void schedule_delivery_after(Packet &pkt, NodeId caller, Time after);

    // ----- Timing -----
    Time next_available(NodeId sender) const noexcept;
    std::optional<Reservation> reserve(Bytes bytes, NodeId sender) noexcept;

    // Serialization time at the link's current bandwidth.
    Time serialization_time(Bytes bytes) const noexcept {
        return serialization_time(bytes, _bandwidth_gbps_cur);
    }
    // Serialization time at an arbitrary bandwidth.
    static Time serialization_time(Bytes bytes, double gbps) noexcept;

    Time propagation_latency() const noexcept {
        return _latency_cur;
    }

    // ----- State -----
    LinkId id() const noexcept {
        return _id;
    }
    LinkStatus status() const noexcept {
        return _status;
    }
    double bandwidth_gbps() const noexcept {
        return _bandwidth_gbps_cur;
    }

    // Endpoint accessors (A/B as constructed): "src" is the A end, "dst" the B end.
    NodeId src_id() const noexcept {
        return _a_id;
    }
    NodeId dst_id() const noexcept {
        return _b_id;
    }
    PortId src_port() const noexcept {
        return _a_port;
    }
    PortId dst_port() const noexcept {
        return _b_port;
    }

    // Status changes:
    // - set_status(OK): restore initial latency/bandwidth
    // - set_status(DOWN): link becomes inactive (reservations/sends rejected)
    // - set_status(THROTTLING, new_latency, new_bw_gbps): degrade
    //   (reject if new_latency < initial or new_bw_gbps > initial)
    void set_status(LinkStatus s,
                    Time new_latency = Time(0),
                    double new_bandwidth_gbps = 0.0);

private:
    enum class Dir : uint8_t { AtoB, BtoA };

    std::optional<Dir> _dir_from(NodeId sender) const noexcept;

    Simulator *const _sim;
    LinkId _id;

    // Endpoints (non-owning) with their cached ids and ports.
    NetworkNode *_a;
    NetworkNode *_b;
    NodeId _a_id;
    NodeId _b_id;
    PortId _a_port;
    PortId _b_port;

    // Values given at construction, restored by set_status(OK).
    Time _latency_init;
    double _bandwidth_gbps_init;

    // Values currently in effect.
    Time _latency_cur;
    double _bandwidth_gbps_cur;

    LinkStatus _status;

    // Per-direction cursors: when each direction is next free.
    Time _next_available_ab;
    Time _next_available_ba;
};
} // namespace dofs
#endif // NETWORK_LINK_H

660
src/network/network_nic.cc Normal file
View File

@@ -0,0 +1,660 @@
#include "network/network_nic.h"
#include <algorithm>
#include <utility>
#include "core/host.h"
#include "core/error.h"
namespace dofs {
// Builds a NIC with `total_ports` egress ports.
//
// The node starts in status OK with no port blacklisted. A null `buf` is
// tolerated but reported through log_error. The congestion controller is
// NSCC when requested and DCQCN for every other CCType value; the load
// balancer is always random packet spraying driven by the NIC's own RNG.
// Each port starts with a full scheduling budget taken from `schedw`.
NetworkNic::NetworkNic(Simulator *const sim,
                       NodeId id,
                       uint16_t total_ports,
                       SwitchBuffer *const buf,
                       Time nic_latency,
                       Bytes mice_elephant_threshold,
                       PacketSeq ooo_threshold,
                       CCType cc_type,
                       LBType lb_type,
                       Bytes cc_init_cwnd,
                       Bytes cc_max_cwnd,
                       const NicSchedulingWeights& schedw) noexcept
    : NetworkNode(sim, id, NodeType::NIC),
      _buf(buf),
      _port_cnt(total_ports),
      _nic_latency_init(nic_latency),
      _nic_latency_cur(nic_latency),
      _mice_thresh(mice_elephant_threshold),
      _ooo_thresh(ooo_threshold),
      _cc_type(cc_type),
      _lb_type(lb_type),
      _schedw(schedw),
      _ports(total_ports) {
    Node::set_status(NodeStatus::OK);
    _port_blacklisted.assign(_port_cnt, false);

    if (_buf == nullptr) {
        log_error("ERROR", "NetworkNic constructed with null SwitchBuffer");
    }

    // Congestion control: DCQCN is both an explicit choice and the fallback.
    if (_cc_type == CCType::NSCC) {
        _cc = std::make_unique<NSCC>(cc_init_cwnd, cc_max_cwnd);
    } else {
        _cc = std::make_unique<DCQCN>(cc_init_cwnd, cc_max_cwnd);
    }

    // Load balancing: every LBType currently maps to random packet spraying.
    _lb = std::make_unique<LBRandomPacketSpraying>(&_rng);

    // Give every port a full budget for each scheduling class.
    for (PortQueues& queues : _ports) {
        queues.b_control = _schedw.control;
        queues.b_retrans = _schedw.retrans;
        queues.b_mice = _schedw.mice;
        queues.b_elephant = _schedw.elephant;
    }
}
// Records `host` as the owner of this NIC and, when it is non-null, tells the
// host about the NIC as well. Passing nullptr simply clears the stored host.
void NetworkNic::attach_host(Host* host) noexcept {
    _host = host;

    if (_host == nullptr)
        return;

    _host->attach_nic(this);
}
// Detaches the currently attached host. When a host is attached, `host` must
// be that same host; otherwise an error is logged and nothing changes.
// With no host attached the call is a no-op.
void NetworkNic::detach_host(Host* host) noexcept {
    if (_host != nullptr) {
        if (_host != host) {
            log_error("ERROR", "NetworkNic::detach_host pointer mismatch");
            return;
        }

        _host->detach_nic(this);
    }

    _host = nullptr;
}
// Status control for the NIC.
//  - OK:           status becomes OK and the NIC latency returns to its
//                  initial value.
//  - THROTTLING:   status becomes THROTTLING with `new_latency`; rejected
//                  (logged) when `new_latency` is below the initial latency.
//  - DOWN/BOOTING: not supported through this call; logged and ignored.
void NetworkNic::set_status(NodeStatus s, Time new_latency) noexcept {
    if (s == NodeStatus::OK) {
        Node::set_status(NodeStatus::OK);
        _nic_latency_cur = _nic_latency_init;
        return;
    }

    if (s == NodeStatus::THROTTLING) {
        if (new_latency < _nic_latency_init) {
            log_error("ERROR", "NetworkNic::set_status THROTTLING new latency < initial");
            return;
        }

        Node::set_status(NodeStatus::THROTTLING);
        _nic_latency_cur = new_latency;
        return;
    }

    if (s == NodeStatus::DOWN || s == NodeStatus::BOOTING) {
        log_error("ERROR", "NetworkNic::set_status ignored for DOWN/BOOTING");
    }
}
// A flow counts as an elephant once its size reaches the mice/elephant
// threshold (the threshold itself is already an elephant).
bool NetworkNic::is_elephant(Bytes sz) const noexcept {
    return _mice_thresh <= sz;
}
// Packs (flow, seq) into a single map key: the low 32 bits of the flow id
// are shifted up by 16 and OR-ed with the low 16 bits of the sequence number.
std::uint64_t NetworkNic::txkey(FlowId f, PacketSeq s) noexcept {
    const auto flow_bits =
        static_cast<std::uint64_t>(static_cast<std::uint32_t>(f));
    const auto seq_bits =
        static_cast<std::uint64_t>(static_cast<std::uint16_t>(s));
    return (flow_bits << 16) | seq_bits;
}
void NetworkNic::record_tx_timestamp(const Packet& pkt) noexcept {
if (!_sim)
return;
const auto key = txkey(pkt.flow_id(), pkt.seq());
_tx_sent_at[key] = _sim->now();
}
bool NetworkNic::lookup_rtt_and_erase(const Packet& ack_like, Time now,
Time& out_rtt) noexcept {
const auto key = txkey(ack_like.flow_id(), ack_like.seq());
auto it = _tx_sent_at.find(key);
if (it == _tx_sent_at.end())
return false;
out_rtt = now.unsafe_sub(it->second);
_tx_sent_at.erase(it);
return true;
}
// Entry point for packets arriving at the NIC on port `ingress`.
//
// Packets are dropped silently while the NIC is DOWN or BOOTING. Otherwise a
// private copy of the packet is dispatched on its protocol:
//   DATA             - counted, acknowledged via schedule_ack(); an EOF
//                      packet additionally notifies the host via recv_flow()
//                      using that packet's payload size.
//   ACK              - counted; RTT is looked up (Time(0) when unknown) and
//                      fed to congestion control, then LB and host are told.
//   NACK             - counted; CC (rtt 0), LB and host are told.
//   HEADER_TRIM      - counted; a trim-back response is scheduled, then
//                      CC (rtt 0), LB and host are told.
//   HEADER_TRIM_BACK - counted; CC (rtt 0), LB and host are told.
// Any other protocol is ignored.
void NetworkNic::recv_pkt(Packet &pkt_in, PortId ingress) {
    const NodeStatus st = status();
    if (st == NodeStatus::DOWN || st == NodeStatus::BOOTING) {
        return;
    }

    Packet pkt = pkt_in;
    const Time now = _sim ? _sim->now() : Time(0);

    // Feedback paths shared by every non-DATA protocol.
    const auto feed_cc = [this, &pkt](Time rtt_sample) {
        if (_cc) {
            _cc->update(pkt, rtt_sample);
            _telemetry.cc_updates++;
        }
    };
    const auto feed_lb_then_host = [this, &pkt]() {
        if (_lb) {
            _lb->update(pkt);
            _telemetry.lb_updates++;
        }

        if (_host)
            _host->recv_frame(pkt);
    };

    switch (pkt.protocol()) {
    case PacketProtocol::DATA: {
        _telemetry.rx_data++;
        // Auto-ACK after NIC latency.
        schedule_ack(pkt, ingress);

        // No out-of-order tracking or NACK generation yet; a full
        // implementation would keep per-flow receive state. The flow size
        // reported to the host is the payload of the EOF packet only.
        if (pkt.is_eof() && _host) {
            _host->recv_flow(pkt.src_node(), pkt.priority(), pkt.payload_size());
        }

        break;
    }

    case PacketProtocol::ACK: {
        _telemetry.rx_acks++;
        // RTT sample for CC; falls back to zero when no send time is known.
        Time rtt = Time(0);
        if (!lookup_rtt_and_erase(pkt, now, rtt)) {
            rtt = Time(0);
        }

        feed_cc(rtt);
        feed_lb_then_host();
        break;
    }

    case PacketProtocol::NACK: {
        _telemetry.rx_nacks++;
        feed_cc(Time(0));
        feed_lb_then_host();
        // Retransmission path would re-enqueue missing seq as CONTROL/RETRANS.
        break;
    }

    case PacketProtocol::HEADER_TRIM: {
        _telemetry.rx_trims++;
        schedule_trim_back_response(pkt, ingress);
        feed_cc(Time(0));
        feed_lb_then_host();
        break;
    }

    case PacketProtocol::HEADER_TRIM_BACK: {
        _telemetry.rx_trims++;
        feed_cc(Time(0));
        feed_lb_then_host();
        break;
    }

    default:
        // Unknown proto: ignore
        break;
    }
}
// Host-facing unicast send. With FlowPriority::AUTO the priority is derived
// from the flow size (ELEPHANT at or above the threshold, MICE below);
// any other requested priority is used as given.
void NetworkNic::send_flow(NodeId dst, Bytes size, FlowPriority desired) {
    FlowPriority prio = desired;

    if (prio == FlowPriority::AUTO) {
        prio = is_elephant(size) ? FlowPriority::ELEPHANT : FlowPriority::MICE;
    }

    start_tx_unicast(dst, size, prio);
}
// Host-facing multicast send to the groups in `gmask`. With
// FlowPriority::AUTO the priority is derived from the flow size (ELEPHANT at
// or above the threshold, MICE below); otherwise it is used as given.
void NetworkNic::send_flow(PacketGroups gmask, Bytes size,
                           FlowPriority desired) {
    FlowPriority prio = desired;

    if (prio == FlowPriority::AUTO) {
        prio = is_elephant(size) ? FlowPriority::ELEPHANT : FlowPriority::MICE;
    }

    start_tx_multicast(gmask, size, prio);
}
// Marks `port` as blacklisted (or clears the mark). An out-of-range port is
// logged and ignored.
void NetworkNic::set_port_blacklisted(PortId port, bool blacklisted) noexcept {
    const bool port_exists = port < _port_cnt;

    if (!port_exists) {
        log_error("ERROR", "NetworkNic::set_port_blacklisted invalid port");
        return;
    }

    // Defensive: grow the flag vector if it is shorter than the port count.
    const bool flag_present = port < _port_blacklisted.size();
    if (!flag_present)
        _port_blacklisted.resize(_port_cnt, false);

    _port_blacklisted[port] = blacklisted;
}
// Reports whether `port` is blacklisted. Ports outside the valid range (or
// without a stored flag) are reported as not blacklisted.
bool NetworkNic::is_port_blacklisted(PortId port) const noexcept {
    const bool known = port < _port_cnt && port < _port_blacklisted.size();
    return known && _port_blacklisted[port];
}
// Chooses the egress port for a new flow using round-robin over the ports
// that are not blacklisted. `dst` is currently unused.
//
// Returns port 0 when the NIC has no ports. When every port is blacklisted a
// warning is logged and the plain round-robin port is returned anyway. The
// round-robin cursor always advances past the returned port.
PortId NetworkNic::pick_src_port_for_flow(NodeId dst) noexcept {
    (void)dst;

    if (_port_cnt == 0)
        return static_cast<PortId>(0);

    // Simple RR scheme: probe _port_cnt ports starting at the cursor.
    const uint16_t first = _rr_cursor % _port_cnt;
    uint16_t offset = 0;

    while (offset < _port_cnt) {
        const PortId candidate = static_cast<PortId>((first + offset) % _port_cnt);
        const bool usable = candidate < _port_blacklisted.size() &&
                            !_port_blacklisted[candidate];

        if (usable) {
            _rr_cursor = static_cast<PortId>((candidate + 1) % _port_cnt);
            return candidate;
        }

        ++offset;
    }

    log_error("WARN",
              "NetworkNic: all ports blacklisted; choosing RR port despite blacklist");
    const PortId chosen = static_cast<PortId>(first);
    _rr_cursor = static_cast<PortId>((chosen + 1) % _port_cnt);
    return chosen;
}
// Builds a DATA packet from this NIC (source port `out_port`) to node `dst`
// (destination port 0) and stamps it with load-balancer entropy when a load
// balancer is configured.
Packet NetworkNic::make_data_packet(NodeId dst, PortId out_port,
                                    FlowPriority prio,
                                    FlowId fid, PacketSeq seq, Bytes payload) noexcept {
    Packet data(this->id(), out_port,
                dst, static_cast<PortId>(0),
                PacketProtocol::DATA,
                prio, seq, fid, 0, 0, payload);

    if (!_lb)
        return data;

    const uint16_t entropy = _lb->get_entropy(data);
    data.set_entropy(entropy);
    return data;
}
// Builds the ACK for a received DATA packet: source and destination are
// swapped, seq/flow id/entropy are echoed, the priority is CTRL and the
// payload is empty. `ingress` is currently unused.
Packet NetworkNic::make_ack_packet(const Packet& rx_data,
                                   PortId ingress) noexcept {
    (void)ingress;
    return Packet(rx_data.dst_node(), rx_data.dst_port(),
                  rx_data.src_node(), rx_data.src_port(),
                  PacketProtocol::ACK,
                  FlowPriority::CTRL,
                  rx_data.seq(), rx_data.flow_id(),
                  rx_data.entropy(), 0, Bytes(0));
}
// Builds a NACK from this NIC (source port `ingress`) to `peer` (destination
// port 0) reporting sequence number `miss` of `flow` as missing. The packet
// carries no payload.
Packet NetworkNic::make_nack_packet(NodeId peer, FlowId flow, PacketSeq miss,
                                    FlowPriority prio,
                                    PortId ingress) noexcept {
    return Packet(this->id(), ingress,
                  peer, static_cast<PortId>(0),
                  PacketProtocol::NACK,
                  prio, miss, flow, 0, 0, Bytes(0));
}
// Builds the HEADER_TRIM_BACK reply for a received HEADER_TRIM packet: source
// and destination are swapped, seq/flow id/entropy are echoed, the priority
// is CTRL and the payload is empty. `ingress` is currently unused.
Packet NetworkNic::make_trim_back_response(const Packet& trim,
        PortId ingress) noexcept {
    (void)ingress;
    return Packet(trim.dst_node(), trim.dst_port(),
                  trim.src_node(), trim.src_port(),
                  PacketProtocol::HEADER_TRIM_BACK,
                  FlowPriority::CTRL,
                  trim.seq(), trim.flow_id(),
                  trim.entropy(), 0, Bytes(0));
}
void NetworkNic::schedule_ack(const Packet& rx_data, PortId ingress) noexcept {
if (!_sim)
return;
const Packet ack = make_ack_packet(rx_data, ingress);
const PortId egress = static_cast<PortId> (ingress);
_sim->schedule_after(_nic_latency_cur,
[this, ack, egress]() mutable {
enqueue_packet(egress, QClass::CONTROL, ack);
});
_telemetry.tx_acks++;
}
// Queues a NACK for `missing_seq` of `flow` towards `peer` on the CONTROL
// class of port `ingress`, after the current NIC latency. Counts the NACK in
// telemetry at scheduling time. Does nothing without a simulator.
void NetworkNic::schedule_nack(NodeId peer, FlowId flow, PacketSeq missing_seq,
                               FlowPriority prio, PortId ingress) noexcept {
    if (_sim == nullptr)
        return;

    const Packet reply = make_nack_packet(peer, flow, missing_seq, prio, ingress);
    const PortId out_port = ingress;

    _sim->schedule_after(_nic_latency_cur,
                         [this, reply, out_port]() mutable {
                             enqueue_packet(out_port, QClass::CONTROL, reply);
                         });
    _telemetry.tx_nacks++;
}
void NetworkNic::schedule_trim_back_response(const Packet& trim,
PortId ingress) noexcept {
if (!_sim)
return;
const Packet tb = make_trim_back_response(trim, ingress);
const PortId egress = ingress;
_sim->schedule_after(_nic_latency_cur,
[this, tb, egress]() mutable {
enqueue_packet(egress, QClass::CONTROL, tb);
});
_telemetry.tx_trims++;
}
void NetworkNic::start_tx_unicast(NodeId dst, Bytes size, FlowPriority prio) {
// Minimal segmentation loop: 1 flow_id per call; simple seq starting at 0
static FlowId next_flow = 1;
const FlowId fid = next_flow++;
PortId port = pick_src_port_for_flow(dst);
Bytes remaining = size;
PacketSeq seq = 0;
while (remaining > Bytes(0)) {
const Bytes payload = std::min(remaining, DEFAULT_MSS_BYTES);
Packet pkt = make_data_packet(dst, port, prio, fid, seq, payload);
if (remaining <= DEFAULT_MSS_BYTES) {
pkt.set_eof(true);
}
enqueue_packet(port, (prio == FlowPriority::ELEPHANT) ? QClass::ELEPHANT :
QClass::MICE, pkt);
_telemetry.tx_data++;
remaining -= payload;
seq = static_cast<PacketSeq>(static_cast<std::uint16_t>(seq + 1));
}
}
void NetworkNic::start_tx_multicast(PacketGroups gmask, Bytes size,
FlowPriority prio) {
static FlowId next_flow = 1;
const FlowId fid = next_flow++;
PortId port = static_cast<PortId>(0);
Bytes remaining = size;
PacketSeq seq = 0;
while (remaining > Bytes(0)) {
const Bytes payload = std::min(remaining, DEFAULT_MSS_BYTES);
Packet pkt(this->id(), port, NodeId(0), PortId(0),
PacketProtocol::DATA, prio, seq, fid, 0, 0, payload);
pkt.set_groups(gmask);
if (_lb) {
const uint16_t e = _lb->get_entropy(pkt);
pkt.set_entropy(e);
}
if (remaining <= DEFAULT_MSS_BYTES) {
pkt.set_eof(true);
}
enqueue_packet(port, (prio == FlowPriority::ELEPHANT) ? QClass::ELEPHANT :
QClass::MICE, pkt);
_telemetry.tx_data++;
remaining -= payload;
seq = static_cast<PacketSeq>(static_cast<std::uint16_t>(seq + 1));
}
}
// Appends `pkt` to the queue of scheduling class `cls` on `port` and makes
// sure a drain task is pending for that port. An out-of-range port is logged
// and the packet is discarded.
void NetworkNic::enqueue_packet(PortId port, QClass cls, Packet pkt) noexcept {
    if (port >= _port_cnt) {
        log_error("ERROR", "NetworkNic::enqueue_packet invalid port");
        return;
    }

    PortQueues& queues = _ports[port];
    std::deque<Packet>* target = nullptr;

    if (cls == QClass::CONTROL) {
        target = &queues.control;
    } else if (cls == QClass::RETRANS) {
        target = &queues.retrans;
    } else if (cls == QClass::MICE) {
        target = &queues.mice;
    } else if (cls == QClass::ELEPHANT) {
        target = &queues.elephant;
    }

    if (target != nullptr) {
        target->push_back(std::move(pkt));
    }

    schedule_port_if_needed(port);
}
// Schedules port_drain_task(port) to run with zero delay unless a drain is
// already pending for that port or there is no simulator. The caller is
// responsible for passing a valid port index.
void NetworkNic::schedule_port_if_needed(PortId port) noexcept {
    PortQueues& queues = _ports[port];
    const bool can_schedule = !queues.scheduled && _sim;

    if (can_schedule) {
        queues.scheduled = true;
        _sim->schedule_after(Time(0),
                             std::mem_fn(&NetworkNic::port_drain_task),
                             this, port);
    }
}
bool NetworkNic::pick_next_qclass(const PortQueues& pq,
QClass& out_cls) const noexcept {
if (!pq.control.empty() && pq.b_control > Bytes(0)) {
out_cls = QClass::CONTROL;
return true;
}
if (!pq.retrans.empty() && pq.b_retrans > Bytes(0)) {
out_cls = QClass::RETRANS;
return true;
}
if (!pq.mice.empty() && pq.b_mice > Bytes(0)) {
out_cls = QClass::MICE;
return true;
}
if (!pq.elephant.empty() && pq.b_elephant > Bytes(0)) {
out_cls = QClass::ELEPHANT;
return true;
}
return false;
}
// Tries to transmit the head packet of class `cls` on `port`.
//
// Returns false, leaving the queue untouched, when the class queue is empty,
// its budget is exhausted, or (for MICE/ELEPHANT only) congestion control
// does not allow the packet's bytes on top of the flow's outstanding bytes.
//
// Otherwise the packet is dequeued, its size is added to the flow's
// outstanding bytes (MICE/ELEPHANT only), the class budget is charged with
// the packet's total size (saturating at zero), and the hand-off to the
// switch buffer is scheduled after the current NIC latency. DATA packets get
// their send timestamp recorded at hand-off time. Returns true.
//
// Without a simulator the packet is dequeued and charged but never handed
// off. A missing switch buffer at hand-off time is logged and the packet is
// dropped instead of dereferencing a null pointer.
//
// NOTE(review): nothing in network_nic.cc decrements _tx_outstanding, so a
// flow's outstanding bytes only ever grow; confirm where acknowledged bytes
// are meant to be released.
bool NetworkNic::try_send_one(PortId port, QClass cls) noexcept {
    auto& pq = _ports[port];
    std::deque<Packet>* q = nullptr;
    Bytes* budget = nullptr;

    switch (cls) {
    case QClass::CONTROL:
        q = &pq.control;
        budget = &pq.b_control;
        break;

    case QClass::RETRANS:
        q = &pq.retrans;
        budget = &pq.b_retrans;
        break;

    case QClass::MICE:
        q = &pq.mice;
        budget = &pq.b_mice;
        break;

    case QClass::ELEPHANT:
        q = &pq.elephant;
        budget = &pq.b_elephant;
        break;
    }

    if (!q || q->empty() || *budget <= Bytes(0))
        return false;

    // CONTROL and RETRANS bypass the congestion window.
    const bool is_ctrl = (cls == QClass::CONTROL || cls == QClass::RETRANS);

    if (!is_ctrl && _cc) {
        // Inspect the head in place so a refused packet never leaves the queue.
        Packet& head = q->front();
        const Bytes next_bytes = head.total_size();
        Bytes& out = _tx_outstanding[head.flow_id()];

        if (!_cc->is_allowed_to_send(out, next_bytes))
            return false;

        out += next_bytes;
    }

    Packet pkt = std::move(q->front());
    q->pop_front();
    // Read the size before the packet is moved into the scheduled task.
    const Bytes cost = pkt.total_size();

    if (_sim) {
        _sim->schedule_after(_nic_latency_cur,
                             [this, pkt = std::move(pkt), port]() mutable {
                                 if (!_buf) {
                                     log_error("ERROR",
                                               "NetworkNic::try_send_one dropped packet: null SwitchBuffer");
                                     return;
                                 }

                                 if (pkt.protocol() == PacketProtocol::DATA) {
                                     record_tx_timestamp(pkt);
                                 }

                                 (void)_buf->enqueue_packet(pkt, port, pkt.priority());
                             });
    }

    // Charge the class budget, saturating at zero.
    if (cost >= *budget)
        *budget = Bytes(0);
    else
        *budget = *budget - cost;

    return true;
}
// Drain task for one port: sends packets while some class is eligible
// (non-empty queue with remaining budget).
//
// When no class is eligible but packets are still queued, all class budgets
// are refilled from the configured weights and the loop retries. Refilling
// only when every budget was zero could stall forever: a backlogged class
// with an exhausted budget next to an empty class with leftover budget never
// triggered a refill, and the task kept rescheduling itself at the same
// simulation time.
//
// If a refill still leaves no class eligible (the backlogged classes have a
// zero weight), the task stops without rescheduling; rescheduling could not
// make progress. The next enqueue on the port schedules a new drain.
//
// When try_send_one() refuses a packet, the task stops and, as before,
// reschedules itself with zero delay while packets remain queued.
// NOTE(review): in the congestion-blocked case this is a zero-delay retry;
// confirm the simulator can still advance to the event that unblocks it.
void NetworkNic::port_drain_task(PortId port) noexcept {
    auto& pq = _ports[port];

    const auto has_backlog = [&pq]() noexcept {
        return !(pq.control.empty() && pq.retrans.empty() &&
                 pq.mice.empty() && pq.elephant.empty());
    };

    bool refilled = false; // budgets were refilled and nothing was sent since
    bool starved = false;  // backlog exists but no class can become eligible

    for (;;) {
        QClass cls;

        if (pick_next_qclass(pq, cls)) {
            if (!try_send_one(port, cls)) {
                // Could not send, stop and retry later.
                break;
            }

            refilled = false;
            continue;
        }

        if (!has_backlog())
            break;

        if (refilled) {
            starved = true;
            break;
        }

        // Every backlogged class is out of budget: start a new round.
        pq.b_control = _schedw.control;
        pq.b_retrans = _schedw.retrans;
        pq.b_mice = _schedw.mice;
        pq.b_elephant = _schedw.elephant;
        refilled = true;
    }

    pq.scheduled = false;

    if (!starved && has_backlog()) {
        schedule_port_if_needed(port);
    }
}
} // namespace dofs

154
src/network/network_nic.h Normal file
View File

@@ -0,0 +1,154 @@
#ifndef NETWORK_NETWORK_NIC_H
#define NETWORK_NETWORK_NIC_H
#include <cstdint>
#include <deque>
#include <unordered_map>
#include <vector>
#include "core/time.h"
#include "core/types.h"
#include "core/rng.h"
#include "network/network_node.h"
#include "network/packet.h"
#include "network/switch/switch_buffer.h"
#include "network/nic/nic_telemetry.h"
#include "network/nic/congestion_control.h"
#include "network/nic/load_balance.h"
namespace dofs {
class Host;
// Per-class transmit budgets for one scheduling round on a NIC port.
// The values below are the defaults: 64 KiB for control, 256 KiB for
// retransmissions and 1 MiB each for mice and elephant traffic.
struct NicSchedulingWeights {
// Bytes of budget per round (refilled when all budgets deplete).
Bytes control { 64 * 1024 };
Bytes retrans { 256 * 1024 };
Bytes mice { 1024 * 1024 };
Bytes elephant { 1024 * 1024 };
};
// Network interface card model. It segments host flows into packets, queues
// them per port in four scheduling classes, gates MICE/ELEPHANT traffic with
// a congestion controller, and answers received DATA / HEADER_TRIM packets
// with ACK / HEADER_TRIM_BACK.
// NOTE(review): std::unique_ptr is used below but <memory> is not among the
// headers included directly by this file.
class NetworkNic final : public NetworkNode {
public:
// buf may be null (an error is logged at construction). cc_type selects the
// congestion controller and cc_init_cwnd / cc_max_cwnd are passed to it.
// schedw provides the per-class budgets used for every port.
NetworkNic(Simulator *const sim,
NodeId id,
uint16_t total_ports,
SwitchBuffer *const buf,
Time nic_latency,
Bytes mice_elephant_threshold,
PacketSeq ooo_threshold,
CCType cc_type,
LBType lb_type,
Bytes cc_init_cwnd,
Bytes cc_max_cwnd,
const NicSchedulingWeights& schedw) noexcept;
// Handles a packet arriving on port 'ingress'; dropped while DOWN/BOOTING.
virtual void recv_pkt(Packet &pkt, PortId ingress) override;
// attach_host stores the host and calls host->attach_nic(this) when
// non-null. detach_host requires 'host' to match the attached host.
void attach_host(Host* host) noexcept;
void detach_host(Host* host) noexcept;
// Host API (TX):
// With FlowPriority::AUTO the priority is chosen from the flow size using
// the mice/elephant threshold.
void send_flow(NodeId dst, Bytes size,
FlowPriority desired = FlowPriority::AUTO);
void send_flow(PacketGroups group_mask, Bytes size,
FlowPriority desired = FlowPriority::AUTO);
// OK restores the initial NIC latency; THROTTLING applies new_latency
// (rejected when below the initial latency); DOWN/BOOTING are logged and
// ignored.
void set_status(NodeStatus s, Time new_latency = Time(0)) noexcept;
const NicTelemetry &telemetry() const noexcept {
return _telemetry;
}
// Blacklisted ports are skipped by source-port selection for unicast flows
// unless every port is blacklisted.
void set_port_blacklisted(PortId port, bool blacklisted) noexcept;
bool is_port_blacklisted(PortId port) const noexcept;
private:
// Scheduling classes, served in this strict order.
enum class QClass : uint8_t { CONTROL = 0, RETRANS = 1, MICE = 2, ELEPHANT = 3 };
// Per-port state: one FIFO and one remaining-budget counter (b_*) per
// class, plus a flag telling whether a drain task is already pending.
struct PortQueues {
std::deque<Packet> control;
std::deque<Packet> retrans;
std::deque<Packet> mice;
std::deque<Packet> elephant;
Bytes b_control {0};
Bytes b_retrans {0};
Bytes b_mice {0};
Bytes b_elephant {0};
bool scheduled {false};
};
// Port scheduling: enqueue_packet() appends to a class queue and requests a
// drain; port_drain_task() sends while pick_next_qclass() finds an eligible
// class and try_send_one() succeeds.
void schedule_port_if_needed(PortId port) noexcept;
void port_drain_task(PortId port) noexcept;
bool pick_next_qclass(const PortQueues& pq, QClass& out_cls) const noexcept;
bool try_send_one(PortId port, QClass cls) noexcept;
void enqueue_packet(PortId port, QClass cls, Packet pkt) noexcept;
// Responses are queued on the CONTROL class of the ingress port after the
// current NIC latency.
void schedule_ack(const Packet& rx_data, PortId ingress) noexcept;
void schedule_nack(NodeId peer, FlowId flow, PacketSeq missing_seq,
FlowPriority prio, PortId ingress) noexcept;
void schedule_trim_back_response(const Packet& trim, PortId ingress) noexcept;
// True when sz is at or above the mice/elephant threshold.
bool is_elephant(Bytes sz) const noexcept;
// Key for _tx_sent_at built from the flow id and the sequence number.
static inline std::uint64_t txkey(FlowId f, PacketSeq s) noexcept;
void record_tx_timestamp(const Packet& pkt) noexcept;
// On a hit writes now - send_time to out_rtt, erases the entry, returns true.
bool lookup_rtt_and_erase(const Packet& ack_like, Time now,
Time& out_rtt) noexcept;
// Round-robin over non-blacklisted ports; dst is currently unused.
PortId pick_src_port_for_flow(NodeId dst) noexcept;
// Segment a flow into DATA packets and enqueue them.
void start_tx_unicast(NodeId dst, Bytes size, FlowPriority prio);
void start_tx_multicast(PacketGroups gmask, Bytes size, FlowPriority prio);
// Packet factories.
Packet make_data_packet(NodeId dst, PortId out_port, FlowPriority prio,
FlowId fid, PacketSeq seq, Bytes payload) noexcept;
Packet make_ack_packet(const Packet& rx_data, PortId ingress) noexcept;
Packet make_nack_packet(NodeId peer, FlowId flow, PacketSeq miss,
FlowPriority prio,
PortId ingress) noexcept;
Packet make_trim_back_response(const Packet& trim, PortId ingress) noexcept;
private:
// Neither pointer is created or deleted by the code shown for this class.
Host *_host {nullptr};
SwitchBuffer *_buf {nullptr};
const uint16_t _port_cnt;
// *_init: construction-time latency; *_cur: latency currently in effect.
Time _nic_latency_init;
Time _nic_latency_cur;
const Bytes _mice_thresh;
// Stored at construction; not read by the NIC implementation shown here.
const PacketSeq _ooo_thresh;
CCType _cc_type;
LBType _lb_type;
NicSchedulingWeights _schedw;
NicTelemetry _telemetry;
// Declared before _lb, which is constructed with a pointer to it.
Rng _rng;
std::vector<PortQueues> _ports;
std::vector<bool> _port_blacklisted;
// Next port the round-robin source-port selection starts from.
PortId _rr_cursor {0};
std::unique_ptr<CongestionControl> _cc;
std::unique_ptr<LoadBalance> _lb;
// Outstanding bytes per flow (TX) for cwnd gating
std::unordered_map<FlowId, Bytes> _tx_outstanding;
// TX timestamps for RTT: (flow, seq) -> send_time
std::unordered_map<std::uint64_t, Time> _tx_sent_at;
};
} // namespace dofs
#endif // NETWORK_NETWORK_NIC_H

View File

@@ -0,0 +1,9 @@
#include "network/network_node.h"
namespace dofs {
// Forwards simulator, id and node type to the Node base class; NetworkNode
// itself adds no state.
NetworkNode::NetworkNode(Simulator *const sim, NodeId id,
                         NodeType type) noexcept
    : Node(sim, id, type)
{
}
} // namespace dofs

View File

@@ -0,0 +1,24 @@
#ifndef NETWORK_NETWORK_NODE_H
#define NETWORK_NETWORK_NODE_H
#include <cstdint>
#include "core/node.h"
#include "core/types.h"
#include "network/packet.h"
namespace dofs {
// Abstract base for nodes that can receive packets (NICs and switches derive
// from it).
class NetworkNode : public Node {
public:
explicit NetworkNode(Simulator *const sim, NodeId id, NodeType type) noexcept;
virtual ~NetworkNode() = default;
// Delivers pkt to this node; ingress is the local port it arrived on.
virtual void recv_pkt(Packet &pkt, PortId ingress) = 0;
};
} // namespace dofs
#endif // NETWORK_NETWORK_NODE_H

View File

@@ -0,0 +1,238 @@
#include "network/network_switch.h"
#include <algorithm>
#include <iterator>
#include <set>
#include "network/switch/multicast_table.h"
#include "network/switch/unicast_table.h"
namespace dofs {
// Builds a switch with `total_ports` ports in status OK. The ECN engine,
// buffer and routing tables are borrowed pointers; a null one is tolerated
// but reported through log_error.
NetworkSwitch::NetworkSwitch(Simulator *const sim,
                             NodeId id,
                             uint16_t total_ports,
                             ECNEngine *const ecn,
                             SwitchBuffer *const buf,
                             const RoutingTables *const rt,
                             Time forwarding_latency,
                             Time multicast_dup_delay) noexcept
    : NetworkNode(sim, id, NodeType::SWITCH),
      _ecn(ecn),
      _buf(buf),
      _rt(rt),
      _port_cnt(total_ports),
      _fwd_latency_init(forwarding_latency),
      _fwd_latency_cur(forwarding_latency),
      _dup_delay(multicast_dup_delay) {
    Node::set_status(NodeStatus::OK);

    // Report every missing collaborator, in declaration order.
    if (_ecn == nullptr)
        log_error("ERROR", "NetworkSwitch constructed with null ECN engine");

    if (_buf == nullptr)
        log_error("ERROR", "NetworkSwitch constructed with null SwitchBuffer");

    if (_rt == nullptr)
        log_error("ERROR", "NetworkSwitch constructed with null RoutingTables");
}
// Status control for the switch.
//  - OK:           status becomes OK and the forwarding latency returns to
//                  its initial value.
//  - THROTTLING:   status becomes THROTTLING with `new_forward_latency`;
//                  rejected (logged) when it is below the initial latency.
//  - DOWN/BOOTING: not supported through this call; logged and ignored.
void NetworkSwitch::set_status(NodeStatus s,
                               Time new_forward_latency) noexcept {
    if (s == NodeStatus::OK) {
        Node::set_status(NodeStatus::OK);
        _fwd_latency_cur = _fwd_latency_init;
        return;
    }

    if (s == NodeStatus::THROTTLING) {
        if (new_forward_latency < _fwd_latency_init) {
            log_error("ERROR",
                      "NetworkSwitch::set_status(THROTTLING) new latency < initial");
            return;
        }

        Node::set_status(NodeStatus::THROTTLING);
        _fwd_latency_cur = new_forward_latency;
        return;
    }

    if (s == NodeStatus::DOWN || s == NodeStatus::BOOTING) {
        log_error("ERROR", "NetworkSwitch::set_status ignored for DOWN/BOOTING");
    }
}
// Entry point for packets arriving at the switch on port `ingress`.
// Packets are dropped silently while the switch is DOWN or BOOTING.
// Otherwise a private copy is run through the ECN engine (when both the
// engine and the buffer are present) and forwarding is scheduled after the
// current forwarding latency. Without a simulator nothing is forwarded.
void NetworkSwitch::recv_pkt(Packet &pkt_in, PortId ingress) {
    const NodeStatus st = status();
    if (st == NodeStatus::DOWN || st == NodeStatus::BOOTING)
        return;

    Packet working = pkt_in;

    const bool can_process_ecn = (_ecn != nullptr) && (_buf != nullptr);
    if (can_process_ecn)
        working = _ecn->process_packet(working, _buf);

    if (!_sim)
        return;

    _sim->schedule_after(_fwd_latency_cur,
                         std::mem_fn(&NetworkSwitch::forward_after_delay),
                         this, working, ingress);
}
// Second stage of forwarding, run after the forwarding latency.
//
// Builds the egress port list (multicast when the packet's group mask is
// non-zero, unicast otherwise), drops ports that are out of range or equal
// to the ingress port, and enqueues one copy per remaining port. The i-th
// copy is delayed by i * multicast duplication delay; copies with a zero
// offset (or all copies when there is no simulator) are enqueued
// immediately. Does nothing without a buffer or routing tables.
void NetworkSwitch::forward_after_delay(Packet pkt, PortId ingress) noexcept {
    if (_buf == nullptr || _rt == nullptr)
        return;

    std::vector<PortId> egress_ports;
    egress_ports.reserve(8); // small typical fanout

    // Multicast vs unicast is decided by the groups mask.
    if (pkt.groups() != static_cast<PacketGroups>(0)) {
        build_mc_fanout(pkt, ingress, egress_ports);
    } else {
        build_uc_egress(pkt, egress_ports);
    }

    // Drop invalid ports and never send back out of the ingress port.
    const auto unusable = [this, ingress](PortId p) {
        return (p >= _port_cnt) || (p == ingress);
    };
    egress_ports.erase(std::remove_if(egress_ports.begin(), egress_ports.end(),
                                      unusable),
                       egress_ports.end());

    if (egress_ports.empty())
        return;

    const FlowPriority prio = pkt.priority();
    std::size_t copy_index = 0;

    for (const PortId egress : egress_ports) {
        const Time offset = _dup_delay * Time(copy_index);
        ++copy_index;

        const bool deferred = _sim && offset > Time(0);
        if (!deferred) {
            enqueue_one(pkt, egress, prio);
            continue;
        }

        _sim->schedule_after(offset,
                             std::mem_fn(&NetworkSwitch::enqueue_one),
                             this, pkt, egress, prio);
    }
}
// Hands one packet copy to the switch buffer for `egress`; the buffer's
// return value is deliberately ignored. Assumes the buffer pointer is
// non-null (forward_after_delay checks it before calling or scheduling this).
void NetworkSwitch::enqueue_one(Packet pkt, PortId egress,
                                FlowPriority prio) noexcept {
    static_cast<void>(_buf->enqueue_packet(pkt, egress, prio));
}
void NetworkSwitch::build_uc_egress(const Packet& pkt,
std::vector<PortId> &out) const {
// Unicast: consult table; if multiple candidates, ECMP-pick one by entropy
std::vector<PortId> candidates =
_rt->unicast.get_port_list(pkt.dst_node(), pkt.dst_port());
if (candidates.empty()) {
// Fallback to dst_port if sane
if (pkt.dst_port() < _port_cnt) {
out.clear();
out.push_back(pkt.dst_port());
}
return;
}
if (candidates.size() == 1) {
out = candidates;
return;
}
// Multiple candidates: pick one deterministically via hash_ecmp
const uint32_t differentiator =
(static_cast<uint32_t>(pkt.entropy()) << 16) ^
static_cast<uint32_t>(pkt.dst_port());
const uint16_t idx = hash_ecmp(pkt.src_node(), pkt.src_port(),
pkt.dst_node(), pkt.dst_port(),
differentiator,
static_cast<uint16_t>(candidates.size()));
out.clear();
out.push_back(candidates[idx % candidates.size()]);
}
// Computes the multicast egress ports for `pkt`.
//
// For every group whose bit is set in the packet's group mask, one tree is
// chosen (by hash_ecmp over the flow fields and entropy when the group has
// several trees). A tree with a parent port only replicates packets that
// arrived on that parent port. The child ports of all accepted trees are
// unioned into `fanout`.
void NetworkSwitch::build_mc_fanout(const Packet& pkt, PortId ingress,
                                    std::vector<PortId> &fanout) const {
    if (_rt == nullptr)
        return;

    const PacketGroups mask = pkt.groups();
    const std::size_t group_total = _rt->multicast.group_count();

    for (std::size_t gid = 0; gid < group_total; ++gid) {
        if (!group_bit_set(mask, gid))
            continue;

        const auto* trees = _rt->multicast.trees_of(gid);
        if (trees == nullptr || trees->empty())
            continue;

        const uint16_t tree_total = static_cast<uint16_t>(trees->size());
        uint16_t pick = 0;

        if (tree_total > 1) {
            // Entropy-based multipath spraying: the differentiator mixes
            // 16 bits of entropy with 16 bits of group index.
            const uint32_t entropy_hi = static_cast<uint32_t>(pkt.entropy()) << 16;
            const uint32_t differentiator =
                entropy_hi ^ static_cast<uint32_t>(gid & 0xFFFF);
            pick = hash_ecmp(pkt.src_node(), pkt.src_port(),
                             pkt.dst_node(), pkt.dst_port(),
                             differentiator, tree_total);
            pick %= tree_total;
        }

        const auto& tree = (*trees)[pick];

        // Reverse-path check: ignore packets arriving from the wrong side.
        const bool wrong_direction =
            tree.parent_port.has_value() && ingress != tree.parent_port.value();
        if (wrong_direction)
            continue;

        merge_sorted_unique(fanout, tree.child_ports);
    }
}
void NetworkSwitch::merge_sorted_unique(std::vector<PortId> &base,
const std::vector<PortId> &add) {
if (add.empty())
return;
if (base.empty()) {
base = add;
return;
}
std::vector<PortId> merged;
merged.reserve(base.size() + add.size());
std::set_union(base.begin(), base.end(),
add.begin(), add.end(),
std::back_inserter(merged));
base.swap(merged);
}
// Tests whether bit `gid` is set in the group mask.
// A group index at or beyond the width of PacketGroups cannot be represented
// in the mask, so it is reported as not set. Shifting by that many bits
// would be undefined behaviour.
inline bool NetworkSwitch::group_bit_set(PacketGroups mask,
        std::size_t gid) noexcept {
    // Width of the mask type in bits (8-bit bytes assumed).
    constexpr std::size_t mask_bits = sizeof(PacketGroups) * 8;

    if (gid >= mask_bits)
        return false;

    return (mask >> gid) & static_cast<PacketGroups>(1);
}
} // namespace dofs

View File

@@ -0,0 +1,81 @@
#ifndef NETWORK_NETWORK_SWITCH_H
#define NETWORK_NETWORK_SWITCH_H
#include <cstdint>
#include <vector>
#include "core/time.h"
#include "core/types.h"
#include "core/error.h"
#include "network/network_node.h"
#include "network/packet.h"
#include "network/switch/ecn_engine.h"
#include "network/switch/switch_buffer.h"
#include "network/switch/routing_tables.h"
#include "network/switch/routing_alg.h"
namespace dofs {
// Packet switch model: received packets pass through the ECN engine, wait
// the forwarding latency, and are then enqueued into the switch buffer on
// the unicast egress port or on every port of the multicast fanout.
class NetworkSwitch final : public NetworkNode {
public:
// ecn, buf and rt are borrowed pointers; a null one is logged as an error at
// construction. multicast_dup_delay is the extra delay added per additional
// copy when a packet is replicated to several ports.
NetworkSwitch(Simulator *const sim,
NodeId id,
uint16_t total_ports,
ECNEngine *const ecn,
SwitchBuffer *const buf,
const RoutingTables *const rt,
Time forwarding_latency,
Time multicast_dup_delay) noexcept;
// Handles a packet arriving on port 'ingress'; dropped while DOWN/BOOTING.
virtual void recv_pkt(Packet &pkt, PortId ingress) override;
// Status control:
// - OK: restore forwarding latency to initial
// - THROTTLING(new_latency): set _fwd_latency_cur = new_latency (>= initial)
// - DOWN/BOOTING: ignored with error log
void set_status(NodeStatus s, Time new_forward_latency = Time(0)) noexcept;
NodeStatus get_status() const noexcept {
return status();
}
uint16_t port_cnt() const noexcept {
return _port_cnt;
}
private:
// Runs after the forwarding latency: computes the egress ports, filters out
// invalid ports and the ingress port, and enqueues one copy per port.
void forward_after_delay(Packet pkt, PortId ingress) noexcept;
// Schedules an enqueue of a single copy after delay offset
void enqueue_one(Packet pkt, PortId egress, FlowPriority prio) noexcept;
// Build multicast fanout: pick exactly one tree per active group bit,
// optionally enforcing RPF via parent_port, and union child ports.
void build_mc_fanout(const Packet& pkt, PortId ingress,
std::vector<PortId> &fanout) const;
// Build unicast egress list: if multiple candidates, ECMP-pick one via hash_ecmp.
void build_uc_egress(const Packet& pkt, std::vector<PortId> &out) const;
// Replaces base with the union of base and add (both expected sorted).
static void merge_sorted_unique(std::vector<PortId> &base,
const std::vector<PortId> &add);
// Test if group bit gid is set in PacketGroups
static inline bool group_bit_set(PacketGroups mask, std::size_t gid) noexcept;
private:
// Borrowed collaborators; fixed for the lifetime of the switch.
ECNEngine *const _ecn;
SwitchBuffer *const _buf;
const RoutingTables *const _rt;
const uint16_t _port_cnt;
// *_init: construction-time latency; *_cur: latency currently in effect.
const Time _fwd_latency_init;
Time _fwd_latency_cur;
// Delay step between successive copies of a replicated packet.
const Time _dup_delay;
};
} // namespace dofs
#endif // NETWORK_NETWORK_SWITCH_H

View File

@@ -0,0 +1,50 @@
# --- Library: dofs_nic ---
# Static library holding the NIC helpers (telemetry, congestion control,
# load balancing).
add_library(dofs_nic STATIC)
# Two include roots are exported so that both "network/nic/foo.h" and
# "nic/foo.h" style includes resolve.
target_include_directories(dofs_nic
PUBLIC
$<BUILD_INTERFACE:${CMAKE_SOURCE_DIR}/src> # "network/...", "core/..."
$<BUILD_INTERFACE:${CMAKE_SOURCE_DIR}/src/network> # ** enables "nic/..." **
$<INSTALL_INTERFACE:include>
)
# Sources
# DOFS_GLOB_SOURCES=ON picks up every .cc in this directory; otherwise the
# explicit list below is used.
if(DOFS_GLOB_SOURCES)
file(GLOB NIC_CC CONFIGURE_DEPENDS "${CMAKE_CURRENT_SOURCE_DIR}/*.cc")
target_sources(dofs_nic PRIVATE ${NIC_CC})
else()
target_sources(dofs_nic
PUBLIC
nic_telemetry.h
congestion_control.h
load_balance.h
PRIVATE
congestion_control.cc
load_balance.cc
)
endif()
target_link_libraries(dofs_nic PUBLIC dofs_network dofs_core)
add_library(dofs::nic ALIAS dofs_nic)
# --- Tooling stubs for nic headers ---
# For every header in this directory a one-line .cpp that includes it is
# written into the build directory (prefixed "nic_") and compiled into
# dofs_nic_headers_tooling.
# NOTE: file(WRITE) runs on every configure, so the stubs are rewritten
# each time CMake is re-run.
file(GLOB NIC_HEADERS CONFIGURE_DEPENDS "${CMAKE_CURRENT_SOURCE_DIR}/*.h")
set(NIC_STUBS)
foreach(h ${NIC_HEADERS})
get_filename_component(_base "${h}" NAME_WE)
set(_stub "${CMAKE_CURRENT_BINARY_DIR}/nic_${_base}.cpp")
file(WRITE "${_stub}" "// stub nic/${_base}.h\n#include \"nic/${_base}.h\"\n")
list(APPEND NIC_STUBS "${_stub}")
endforeach()
add_library(dofs_nic_headers_tooling STATIC ${NIC_STUBS})
# Make sure stubs see the SAME include roots
target_include_directories(dofs_nic_headers_tooling
PRIVATE
$<BUILD_INTERFACE:${CMAKE_SOURCE_DIR}/src>
$<BUILD_INTERFACE:${CMAKE_SOURCE_DIR}/src/network> # ** not src/network/nic **
)
target_link_libraries(dofs_nic_headers_tooling PRIVATE dofs_nic dofs_network dofs_core)

View File

@@ -0,0 +1,100 @@
#include "network/nic/congestion_control.h"
namespace dofs {
// Initialises the congestion window. The initial window is capped at
// `max_cwnd`; a resulting window of zero is raised to one byte so the
// window is never empty.
CongestionControl::CongestionControl(Bytes init_cwnd, Bytes max_cwnd) noexcept
    : _cwnd(init_cwnd > max_cwnd ? max_cwnd : init_cwnd),
      _cwnd_max(max_cwnd) {
    if (_cwnd == Bytes(0)) {
        _cwnd = Bytes(1);
    }
}
// Window check: sending `next_bytes` is allowed when the bytes already
// outstanding plus the new bytes still fit into the congestion window.
bool CongestionControl::is_allowed_to_send(Bytes bytes_outstanding,
        Bytes next_bytes) const noexcept {
    const Bytes in_flight_after_send = bytes_outstanding + next_bytes;
    const bool fits_in_window = in_flight_after_send <= _cwnd;
    return fits_in_window;
}
// IMPORTANT: the DCQCN and NSCC implementations below are simplified placeholders, not the real algorithms.
// DCQCN keeps no state of its own; window setup is done by the base class.
DCQCN::DCQCN(Bytes init_cwnd, Bytes max_cwnd) noexcept
    : CongestionControl(init_cwnd, max_cwnd)
{
}
// Placeholder window update (not the real DCQCN algorithm).
//
//  - Congestion signal (HEADER_TRIM_BACK, NACK, or an ECN-marked packet):
//    halve the window, but not below one MSS, and never above the maximum.
//  - Clean ACK: grow the window by one MSS, capped at the maximum.
//  - Anything else: no change.
// `rtt` is accepted for interface compatibility and is not used yet.
void DCQCN::update(const Packet& pkt, Time rtt) noexcept {
    (void)rtt;

    const auto proto = pkt.protocol();
    const bool congestion_signal =
        proto == PacketProtocol::HEADER_TRIM_BACK ||
        proto == PacketProtocol::NACK ||
        pkt.is_ecn();

    if (congestion_signal) {
        // Multiplicative decrease: halve cwnd, floor at 1 MSS.
        Bytes new_cwnd = _cwnd / 2;

        if (new_cwnd < DEFAULT_MSS_BYTES)
            new_cwnd = DEFAULT_MSS_BYTES;

        // The MSS floor may exceed a very small maximum; the maximum wins.
        if (new_cwnd > _cwnd_max)
            new_cwnd = _cwnd_max;

        _cwnd = new_cwnd;
        return;
    }

    // Additive increase on clean ACKs; bound by cwnd_max.
    if (proto == PacketProtocol::ACK) {
        Bytes new_cwnd = _cwnd + DEFAULT_MSS_BYTES; // 1 MSS per ACK sample

        if (new_cwnd > _cwnd_max)
            new_cwnd = _cwnd_max;

        _cwnd = new_cwnd;
    }
}
// NSCC keeps no state of its own; window setup is done by the base class.
NSCC::NSCC(Bytes init_cwnd, Bytes max_cwnd) noexcept
    : CongestionControl(init_cwnd, max_cwnd)
{
}
void NSCC::update(const Packet& pkt, Time rtt) noexcept {
if (pkt.protocol() == PacketProtocol::HEADER_TRIM_BACK ||
pkt.protocol() == PacketProtocol::NACK ||
pkt.is_ecn()) {
Bytes new_cwnd = (_cwnd * 3) / 4; // 25% cut
if (new_cwnd < DEFAULT_MSS_BYTES)
new_cwnd = DEFAULT_MSS_BYTES;
_cwnd = new_cwnd;
if (_cwnd > _cwnd_max)
_cwnd = _cwnd_max;
(void)rtt;
return;
}
if (pkt.protocol() == PacketProtocol::ACK) {
Bytes inc = DEFAULT_MSS_BYTES;
Bytes new_cwnd = _cwnd + inc;
if (new_cwnd > _cwnd_max)
new_cwnd = _cwnd_max;
_cwnd = new_cwnd;
(void)rtt;
}
}
} // namespace dofs

View File

@@ -0,0 +1,47 @@
#ifndef NETWORK_NIC_CONGESTION_CONTROL_H
#define NETWORK_NIC_CONGESTION_CONTROL_H
#include <cstdint>
#include "core/time.h"
#include "core/types.h"
#include "network/packet.h"
namespace dofs {
// Abstract base for window-based congestion controllers.
// Holds the current window (_cwnd) and its upper bound (_cwnd_max); derived
// classes decide how update() moves the window.
class CongestionControl {
public:
    // init_cwnd: starting window; max_cwnd: upper bound for the window.
    explicit CongestionControl(Bytes init_cwnd, Bytes max_cwnd) noexcept;
    virtual ~CongestionControl() = default;

    // Feeds one packet and an RTT sample into the controller.
    virtual void update(const Packet& pkt, Time rtt) noexcept = 0;

    // Asks whether next_bytes may be sent given bytes_outstanding in flight.
    virtual bool is_allowed_to_send(Bytes bytes_outstanding,
                                    Bytes next_bytes) const noexcept;

    // Current congestion window.
    Bytes cwnd() const noexcept { return _cwnd; }

    // Upper bound of the congestion window.
    Bytes cwnd_max() const noexcept { return _cwnd_max; }

protected:
    Bytes _cwnd;
    Bytes _cwnd_max;
};
class DCQCN final : public CongestionControl {
public:
explicit DCQCN(Bytes init_cwnd, Bytes max_cwnd) noexcept;
virtual void update(const Packet& pkt, Time rtt) noexcept override;
};
class NSCC final : public CongestionControl {
public:
explicit NSCC(Bytes init_cwnd, Bytes max_cwnd) noexcept;
virtual void update(const Packet& pkt, Time rtt) noexcept override;
};
} // namespace dofs
#endif // NETWORK_NIC_CONGESTION_CONTROL_H

View File

@@ -0,0 +1,18 @@
#include "network/nic/load_balance.h"
namespace dofs {
// Feedback hook: this policy ignores the packet and does nothing.
void LBRandomPacketSpraying::update(const Packet& /*pkt*/) noexcept {}
// Returns a random 16-bit entropy value, independent of the packet passed in.
// Without an RNG the result is always 0.
// NOTE(review): the draw is requested over (0, 0x10000); if Rng::uniform_range
// treats the upper bound as inclusive, 0x10000 wraps to 0 in the cast below -
// confirm against Rng.
uint16_t LBRandomPacketSpraying::get_entropy(const Packet& /*context*/) noexcept {
    if (_rng == nullptr) {
        return 0;
    }

    const auto draw = _rng->uniform_range<std::uint32_t>(0, 0x10000u);
    return static_cast<uint16_t>(draw);
}
} // namespace dofs

View File

@@ -0,0 +1,36 @@
#ifndef NETWORK_NIC_LOAD_BALANCE_H
#define NETWORK_NIC_LOAD_BALANCE_H
#include <cstdint>
#include "network/packet.h"
#include "core/rng.h"
namespace dofs {
// Base class for NIC load balancing policy.
// Only exposes update() and get_entropy().
// The Rng pointer is stored as given and may be null; this class never deletes it.
class LoadBalance {
public:
explicit LoadBalance(Rng *const rng) noexcept : _rng(rng) {}
virtual ~LoadBalance() = default;
// Feed one packet into the policy so it can adjust its internal state.
virtual void update(const Packet& pkt) noexcept = 0;
// Produce a 16-bit entropy value for the next packet.
virtual uint16_t get_entropy(const Packet& context) noexcept = 0;
protected:
// Random source available to derived policies; the pointer itself is immutable.
Rng *const _rng;
};
// Random per-packet spraying (testing/baseline).
class LBRandomPacketSpraying final : public LoadBalance {
public:
explicit LBRandomPacketSpraying(Rng *const rng) noexcept : LoadBalance(rng) {}
virtual void update(const Packet& pkt) noexcept override;
virtual uint16_t get_entropy(const Packet& context) noexcept override;
};
} // namespace dofs
#endif // NETWORK_NIC_LOAD_BALANCE_H

View File

@@ -0,0 +1,28 @@
#ifndef NETWORK_NIC_NIC_TELEMETRY_H
#define NETWORK_NIC_NIC_TELEMETRY_H
#include <cstdint>
namespace dofs {
// Plain bundle of 64-bit NIC counters; every counter starts at zero.
// The struct only stores values - whatever increments them lives elsewhere.
struct NicTelemetry {
    // Receive-side counters (rx_ prefix).
    std::uint64_t rx_acks = 0;
    std::uint64_t rx_nacks = 0;
    std::uint64_t rx_trims = 0;
    std::uint64_t rx_data = 0;

    // Transmit-side counters (tx_ prefix).
    std::uint64_t tx_acks = 0;
    std::uint64_t tx_nacks = 0;
    std::uint64_t tx_trims = 0;
    std::uint64_t tx_data = 0;

    // Remaining event counters.
    std::uint64_t retrans = 0;
    std::uint64_t timeouts = 0;
    std::uint64_t cc_updates = 0;
    std::uint64_t lb_updates = 0;
};
} // namespace dofs
#endif // NETWORK_NIC_NIC_TELEMETRY_H

180
src/network/packet.cc Normal file
View File

@@ -0,0 +1,180 @@
#include "network/packet.h"
namespace dofs {
// Builds a packet from its addressing, protocol and flow fields.
//  - payload_bytes is stored only for DATA packets; every other protocol
//    starts with a stored payload of zero.
//  - Only the low three bits of `notifications` are kept (the ECN-enabled,
//    ECN-mark and EOF flags); the two low bits of `prio` are packed into the
//    priority field above them.
//  - The group mask starts empty.
Packet::Packet(NodeId src_node,
               PortId src_port,
               NodeId dst_node,
               PortId dst_port,
               PacketProtocol proto,
               FlowPriority prio,
               PacketSeq seq,
               FlowId flow,
               uint16_t entropy,
               uint8_t notifications,
               Bytes payload_bytes) noexcept
    : _src_node(src_node),
      _src_port(src_port),
      _dst_node(dst_node),
      _dst_port(dst_port),
      _protocol(proto),
      _seq(seq),
      _flow(flow),
      _entropy(entropy),
      _payload_size(proto == PacketProtocol::DATA ? payload_bytes : Bytes{0}),
      _groups(0),
      // Flag bits and priority bits do not overlap, so they can be OR-ed
      // together in one step.
      _notifications(static_cast<uint8_t>(
          (notifications & 0x07) |
          ((static_cast<uint8_t>(prio) & 0x03) << PRIORITY_SHIFT))) {}
// --- Read accessors: each returns the stored field unchanged. ---
// Source node id.
NodeId Packet::src_node() const noexcept {
return _src_node;
}
// Source port id.
PortId Packet::src_port() const noexcept {
return _src_port;
}
// Destination node id.
NodeId Packet::dst_node() const noexcept {
return _dst_node;
}
// Destination port id.
PortId Packet::dst_port() const noexcept {
return _dst_port;
}
// Protocol tag of the packet.
PacketProtocol Packet::protocol() const noexcept {
return _protocol;
}
// Sequence number.
PacketSeq Packet::seq() const noexcept {
return _seq;
}
// Flow identifier.
FlowId Packet::flow_id() const noexcept {
return _flow;
}
// Entropy value. The field is stored as uint16_t and widened to uint32_t here.
uint32_t Packet::entropy() const noexcept {
return _entropy;
}
// --- Write accessors: each overwrites one stored field, with no validation. ---
// Replace the source node id.
void Packet::set_src_node(NodeId n) noexcept {
_src_node = n;
}
// Replace the source port id.
void Packet::set_src_port(PortId p) noexcept {
_src_port = p;
}
// Replace the destination node id.
void Packet::set_dst_node(NodeId n) noexcept {
_dst_node = n;
}
// Replace the destination port id.
void Packet::set_dst_port(PortId p) noexcept {
_dst_port = p;
}
// Replace the sequence number.
void Packet::set_seq(PacketSeq s) noexcept {
_seq = s;
}
// Replace the flow identifier.
void Packet::set_flow_id(FlowId f) noexcept {
_flow = f;
}
// Stores the entropy value. The field is only 16 bits wide, so the upper
// 16 bits of `e` are discarded.
void Packet::set_entropy(uint32_t e) noexcept {
    _entropy = static_cast<uint16_t>(e);
}
// Replace the protocol tag. The stored payload size is left as it is.
void Packet::set_protocol(PacketProtocol p) noexcept {
_protocol = p;
}
// Replace the stored payload size, whatever the current protocol is.
// payload_size() and total_size() only take it into account for DATA packets.
void Packet::set_payload_size(Bytes size) noexcept {
_payload_size = size;
}
// Sets or clears the ECN-enabled bit; all other notification bits are kept.
void Packet::set_ecn_enabled(bool v) noexcept {
    _notifications = static_cast<uint8_t>(
        v ? (_notifications | ECN_ENABLED_MASK)
          : (_notifications & ~ECN_ENABLED_MASK));
}

// Sets or clears the ECN-mark bit; all other notification bits are kept.
void Packet::set_ecn_marked(bool v) noexcept {
    _notifications = static_cast<uint8_t>(
        v ? (_notifications | ECN_MARK_MASK)
          : (_notifications & ~ECN_MARK_MASK));
}

// Sets or clears the end-of-flow bit; all other notification bits are kept.
void Packet::set_eof(bool v) noexcept {
    _notifications = static_cast<uint8_t>(
        v ? (_notifications | EOF_MASK)
          : (_notifications & ~EOF_MASK));
}
// True when the ECN-enabled bit is set.
bool Packet::is_ecn_enabled() const noexcept {
    return (_notifications & ECN_ENABLED_MASK) == ECN_ENABLED_MASK;
}

// True when the ECN-mark bit is set.
bool Packet::is_ecn() const noexcept {
    return (_notifications & ECN_MARK_MASK) == ECN_MARK_MASK;
}

// True when the end-of-flow bit is set.
bool Packet::is_eof() const noexcept {
    return (_notifications & EOF_MASK) == EOF_MASK;
}
// Priority decoded from the notification byte, as the FlowPriority enum.
FlowPriority Packet::priority() const noexcept {
    return static_cast<FlowPriority>(priority_raw());
}

// Priority as the raw 2-bit number (0..3) held in the priority field.
uint8_t Packet::priority_raw() const noexcept {
    // Shift first, then keep only the bits that belong to the priority field.
    return static_cast<uint8_t>(
        (_notifications >> PRIORITY_SHIFT) & (PRIORITY_MASK >> PRIORITY_SHIFT));
}
// Header size; the same constant for every packet.
Bytes Packet::header_size() const noexcept {
    return HEADER_SIZE_BYTES;
}

// Payload carried by the packet: the stored size for DATA, zero for
// everything else (ACK, NACK, trimmed headers, unknown values).
Bytes Packet::payload_size() const noexcept {
    if (_protocol == PacketProtocol::DATA) {
        return _payload_size;
    }
    return 0;
}

// Total size of the packet:
//  - DATA: header plus stored payload
//  - ACK / NACK: ACK_SIZE_BYTES
//  - anything else (including trimmed headers): header only
Bytes Packet::total_size() const noexcept {
    if (_protocol == PacketProtocol::DATA) {
        return HEADER_SIZE_BYTES + _payload_size;
    }
    if (_protocol == PacketProtocol::ACK ||
        _protocol == PacketProtocol::NACK) {
        return ACK_SIZE_BYTES;
    }
    return HEADER_SIZE_BYTES;
}
// Returns the stored group mask.
PacketGroups Packet::groups() const noexcept {
return _groups;
}
// Replaces the whole group mask.
void Packet::set_groups(PacketGroups g) noexcept {
_groups = g;
}
// ORs gmask into the group mask; bits that are already set stay set.
void Packet::add_groups(PacketGroups gmask) noexcept {
_groups |= gmask;
}
} // namespace dofs

97
src/network/packet.h Normal file
View File

@@ -0,0 +1,97 @@
#ifndef NETWORK_PACKET_H
#define NETWORK_PACKET_H
#include <cstdint>
#include <memory>
#include "core/types.h"
#include "core/node.h"
namespace dofs {
// Size limits and defaults for the packet model.
// DEFAULT_MTU_BYTES (4160) equals DEFAULT_MSS_BYTES (4096) + HEADER_SIZE_BYTES (64).
// NOTE(review): the four Bytes values below are not const, so any translation
// unit can assign to them. Confirm whether that is intentional before
// tightening them to const/constexpr.
inline const std::size_t MAX_GROUPS = 128;
inline Bytes HEADER_SIZE_BYTES = 64;
inline Bytes ACK_SIZE_BYTES = 64;
inline Bytes DEFAULT_MTU_BYTES = 4160;
inline Bytes DEFAULT_MSS_BYTES = 4096;
// Simulated network packet: addressing, protocol tag, flow metadata, a group
// mask and a small set of notification bits packed into one byte.
class Packet {
public:
// Builds a packet. `notifications` supplies the initial flag bits and `prio`
// is packed into the priority field of the same byte. `payload_bytes` is only
// recorded when `proto` is DATA.
Packet(NodeId src_node,
PortId src_port,
NodeId dst_node,
PortId dst_port,
PacketProtocol proto,
FlowPriority prio,
PacketSeq seq = 0,
FlowId flow = 0,
uint16_t entropy = 0,
uint8_t notifications = 0,
Bytes payload_bytes = DEFAULT_MSS_BYTES) noexcept;
// --- Field getters ---
NodeId src_node() const noexcept;
PortId src_port() const noexcept;
NodeId dst_node() const noexcept;
PortId dst_port() const noexcept;
PacketProtocol protocol() const noexcept;
PacketSeq seq() const noexcept;
FlowId flow_id() const noexcept;
// Returned as uint32_t although the stored field (_entropy) is 16 bits wide.
uint32_t entropy() const noexcept;
// --- Field setters ---
void set_src_node(NodeId n) noexcept;
void set_src_port(PortId p) noexcept;
void set_dst_node(NodeId n) noexcept;
void set_dst_port(PortId p) noexcept;
void set_seq(PacketSeq s) noexcept;
void set_flow_id(FlowId f) noexcept;
// Takes uint32_t but the value is stored in a 16-bit field.
void set_entropy(uint32_t e) noexcept;
void set_protocol(PacketProtocol p) noexcept;
void set_payload_size(Bytes size) noexcept;
// --- Notification flags ---
void set_ecn_enabled(bool v) noexcept;
void set_ecn_marked(bool v) noexcept;
// end of flow
void set_eof(bool v) noexcept;
bool is_ecn_enabled() const noexcept;
// True when the ECN-mark bit is set.
bool is_ecn() const noexcept;
bool is_eof() const noexcept;
// Priority as the enum and as the raw 2-bit number.
FlowPriority priority() const noexcept;
uint8_t priority_raw() const noexcept;
// --- Sizes ---
Bytes header_size() const noexcept;
Bytes payload_size() const noexcept;
Bytes total_size() const noexcept;
// --- Group mask ---
PacketGroups groups() const noexcept;
void set_groups(PacketGroups g) noexcept;
// ORs gmask into the current mask.
void add_groups(PacketGroups gmask) noexcept;
private:
NodeId _src_node;
PortId _src_port;
NodeId _dst_node;
PortId _dst_port;
PacketProtocol _protocol;
PacketSeq _seq;
FlowId _flow;
uint16_t _entropy;
Bytes _payload_size;
PacketGroups _groups;
// Bit layout: bit 0 = ECN enabled, bit 1 = ECN mark, bit 2 = EOF,
// bits 3-4 = priority. Bits 5-7 are not covered by any mask below.
uint8_t _notifications;
static constexpr uint8_t ECN_ENABLED_MASK = 0x01;
static constexpr uint8_t ECN_MARK_MASK = 0x02;
static constexpr uint8_t EOF_MASK = 0x04;
static constexpr uint8_t PRIORITY_MASK = 0x18;
static constexpr uint8_t PRIORITY_SHIFT = 3;
};
} // namespace dofs
#endif // NETWORK_PACKET_H

View File

@@ -0,0 +1,63 @@
# --- Library: dofs_switch ---
# Static library holding the switch-side pieces: routing tables, buffers, ECN engines.
add_library(dofs_switch STATIC)
# NOTE(review): these include roots use CMAKE_SOURCE_DIR, which is the top-level
# source tree. If this project is ever consumed via add_subdirectory() the path
# will point at the parent project; PROJECT_SOURCE_DIR would avoid that - confirm.
target_include_directories(dofs_switch
PUBLIC
$<BUILD_INTERFACE:${CMAKE_SOURCE_DIR}/src> # "network/...", "core/..."
$<BUILD_INTERFACE:${CMAKE_SOURCE_DIR}/src/network> # ** enables "switch/..." **
$<INSTALL_INTERFACE:include>
)
# Sources
# DOFS_GLOB_SOURCES picks between globbing every .cc in this directory and the explicit list below.
if(DOFS_GLOB_SOURCES)
file(GLOB SWITCH_CC CONFIGURE_DEPENDS "${CMAKE_CURRENT_SOURCE_DIR}/*.cc")
target_sources(dofs_switch PRIVATE ${SWITCH_CC})
else()
# Headers are listed PUBLIC, implementation files PRIVATE.
# The header-only entries (routing_tables.h, ecn_engine.h) have no matching .cc in this list.
target_sources(dofs_switch
PUBLIC
routing_alg.h
routing_tables.h
unicast_table.h
multicast_table.h
switch_buffer.h
shared_buffer.h
dedicated_buffer.h
ecn_engine.h
ecn_shared_red.h
ecn_dedicated_red.h
PRIVATE
routing_alg.cc
unicast_table.cc
multicast_table.cc
switch_buffer.cc
shared_buffer.cc
dedicated_buffer.cc
ecn_shared_red.cc
ecn_dedicated_red.cc
)
endif()
# Consumers of dofs_switch also get dofs_network and dofs_core.
target_link_libraries(dofs_switch PUBLIC dofs_network dofs_core)
add_library(dofs::switch ALIAS dofs_switch)
# --- Tooling stubs for switch headers ---
# For every header in this directory, write a one-line .cpp into the build tree
# that includes it as "switch/<name>.h", and compile those stubs into a separate
# static library so each header gets its own compile command.
# NOTE(review): file(WRITE) runs on every configure and rewrites each stub, which
# refreshes its timestamp and can force the stubs to rebuild - confirm this is acceptable.
file(GLOB SWITCH_HEADERS CONFIGURE_DEPENDS "${CMAKE_CURRENT_SOURCE_DIR}/*.h")
set(SWITCH_STUBS)
foreach(h ${SWITCH_HEADERS})
# NAME_WE strips the directory and the extension.
get_filename_component(_base "${h}" NAME_WE)
# The "switch_" prefix is part of the stub file name only; the include path inside the stub does not carry it.
set(_stub "${CMAKE_CURRENT_BINARY_DIR}/switch_${_base}.cpp")
file(WRITE "${_stub}" "// stub switch/${_base}.h\n#include \"switch/${_base}.h\"\n")
list(APPEND SWITCH_STUBS "${_stub}")
endforeach()
add_library(dofs_switch_headers_tooling STATIC ${SWITCH_STUBS})
# Make sure stubs see the SAME include roots
target_include_directories(dofs_switch_headers_tooling
PRIVATE
$<BUILD_INTERFACE:${CMAKE_SOURCE_DIR}/src>
$<BUILD_INTERFACE:${CMAKE_SOURCE_DIR}/src/network> # ** not src/network/switch **
)
target_link_libraries(dofs_switch_headers_tooling PRIVATE dofs_switch dofs_network dofs_core)

View File

@@ -0,0 +1,65 @@
#include "network/switch/dedicated_buffer.h"
namespace dofs {
// Builds a dedicated (per-port capped) buffer.
// The per-port cap is total_bytes / ports, or zero when ports == 0; the same
// cap value applies to every port. One queue set is created per port.
DedicatedBuffer::DedicatedBuffer(Simulator* const sim,
NetworkSwitch* const owner,
Bytes total_bytes,
uint16_t ports)
: SwitchBuffer(sim, owner, SwitchBufferType::DEDICATED, total_bytes, ports),
_per_port_cap(ports ? total_bytes / ports : Bytes(0)),
_per_port_queues(ports)
{}
// Returns the per-priority queue set of port p (mutable overload).
// No bounds check is done here: p indexes _per_port_queues directly.
std::array<std::deque<SwitchBuffer::Queued>, SwitchBuffer::PRI_COUNT>&DedicatedBuffer::queues_for(
PortId p) {
return _per_port_queues[p];
}
// Const overload of the accessor above; same indexing, same lack of bounds check.
const std::array<std::deque<SwitchBuffer::Queued>, SwitchBuffer::PRI_COUNT>&DedicatedBuffer::queues_for(
PortId p) const {
return _per_port_queues[p];
}
// Admission test: the port must exist and adding `sz` bytes must not push
// that port's buffered bytes above the per-port cap.
bool DedicatedBuffer::on_enqueue_cap_check(PortId port, Bytes sz) {
    if (port >= _port_cnt) {
        return false;
    }

    // Guard per-port cap
    const bool over_cap = (_per_port_bytes[port] + sz) > _per_port_cap;
    return !over_cap;
}
// Accounts for an accepted packet: adds sz to the port's buffered byte count.
void DedicatedBuffer::on_enqueue_commit(PortId port, Bytes sz) {
_per_port_bytes[port] += sz;
}
// Accounts for a departed packet: subtracts sz from the port's buffered byte count.
// NOTE(review): nothing here checks that sz is not larger than the stored count;
// correctness relies on dequeues matching earlier enqueues - confirm in the caller.
void DedicatedBuffer::on_dequeue_commit(PortId port, Bytes sz) {
_per_port_bytes[port] -= sz;
}
bool DedicatedBuffer::enqueue_packet(const Packet& pkt, PortId egress,
FlowPriority prio) {
if (egress >= _port_cnt)
return false;
const Bytes sz = pkt.total_size();
if (!on_enqueue_cap_check(egress, sz))
return false;
Queued q{pkt, egress, prio, _sim->now(), sz};
auto& qset = queues_for(egress);
qset[to_idx(prio)].push_back(std::move(q));
on_enqueue_commit(egress, sz);
schedule_drain_if_needed(egress);
return true;
}
// Drains one packet from `port` by delegating to drain_one_common();
// the result of that call is returned unchanged.
bool DedicatedBuffer::drain_one(PortId port) {
return drain_one_common(port);
}
} // namespace dofs

View File

@@ -0,0 +1,38 @@
#ifndef NETWORK_SWITCH_DEDICATED_BUFFER_H
#define NETWORK_SWITCH_DEDICATED_BUFFER_H
#include "network/switch/switch_buffer.h"
namespace dofs {
// DedicatedBuffer: buffer_size is split across ports (equal split by default).
// Each port has a hard cap; no borrowing.
class DedicatedBuffer : public SwitchBuffer {
public:
// total_bytes is the whole buffer; ports is the number of egress ports it is split across.
DedicatedBuffer(Simulator* const sim,
NetworkSwitch* const owner,
Bytes total_bytes,
uint16_t ports);
// Queue a packet on an egress port; returns false when it is not admitted.
bool enqueue_packet(const Packet& pkt, PortId egress,
FlowPriority prio) override;
// Drain one packet from the given port.
bool drain_one(PortId port) override;
protected:
// Per-priority queue set of one port.
std::array<std::deque<Queued>, PRI_COUNT> &queues_for(PortId p) override;
const std::array<std::deque<Queued>, PRI_COUNT> &queues_for(
PortId p) const override;
// Admission check and byte-accounting hooks.
bool on_enqueue_cap_check(PortId port, Bytes sz) override;
void on_enqueue_commit(PortId port, Bytes sz) override;
void on_dequeue_commit(PortId port, Bytes sz) override;
private:
// Hard cap per port: the constructor sets it to total_bytes / ports (zero when
// ports == 0). It is one value shared by every port, so any remainder of the
// division is not handed out to individual ports.
Bytes _per_port_cap;
// One entry per port; each entry holds PRI_COUNT queues, one per priority.
std::vector<std::array<std::deque<Queued>, PRI_COUNT >> _per_port_queues;
};
} // namespace dofs
#endif // NETWORK_SWITCH_DEDICATED_BUFFER_H

View File

@@ -0,0 +1,50 @@
#include "network/switch/ecn_dedicated_red.h"
#include <algorithm>
#include <cmath>
namespace dofs {
// RED decision for one packet, keyed by the packet's dst_port():
//  1. Fold the port's current buffered bytes into its EWMA.
//  2. EWMA <= min_th          : leave the packet untouched.
//  3. EWMA >= max_th          : header-trim the packet.
//  4. otherwise               : ECN-mark with probability
//                               p_max * (avg - min_th) / max(1, max_th - min_th),
//                               but only when an RNG is available.
// A null buffer or an out-of-range port returns the packet unchanged.
Packet &DedicatedREDEngine::process_packet(Packet &pkt,
                                           SwitchBuffer* buf) noexcept {
    if (buf == nullptr) {
        return pkt;
    }

    const PortId port = pkt.dst_port();
    const uint16_t port_total = buf->port_cnt();
    if (port >= port_total) {
        return pkt;
    }

    // NOTE(review): ensure_size() may allocate; a failed allocation inside
    // this noexcept function would terminate the program.
    ensure_size(port_total);

    // Smooth the instantaneous queue length (bytes) of the egress port.
    const double sample = static_cast<double>(buf->port_buffered(port));
    _avg_bytes[port] = ewma(sample, _avg_bytes[port]);
    const double q_avg = _avg_bytes[port];

    const double lo = static_cast<double>(_min_th);
    const double hi = static_cast<double>(_max_th);

    const bool below_min = q_avg <= lo;
    const bool at_or_above_max = q_avg >= hi;

    if (below_min) {
        // Below min_th: no marking
        return pkt;
    }

    if (at_or_above_max) {
        // Above max_th: trim (replace drop)
        header_trim(pkt, _back_to_sender);
        return pkt;
    }

    // Between thresholds: mark ECN with probability p(q)
    const double mark_prob = _p_max * (q_avg - lo) / std::max(1.0, (hi - lo));
    if (_rng && _rng->uniform01() < mark_prob) {
        maybe_mark_ecn(pkt);
    }

    return pkt;
}
} // namespace dofs

View File

@@ -0,0 +1,54 @@
#ifndef NETWORK_SWITCH_ECN_DEDICATED_RED_H
#define NETWORK_SWITCH_ECN_DEDICATED_RED_H
#include <vector>
#include "network/switch/ecn_engine.h"
#include "network/switch/switch_buffer.h"
#include "core/rng.h"
namespace dofs {
// Classic per-port RED over DedicatedBuffer caps.
// Works on a per-port EWMA of buffered bytes: no action at or below min_th,
// probabilistic ECN strictly between min_th and max_th, header trim at or
// above max_th.
class DedicatedREDEngine final : public ECNEngine {
public:
// min_th / max_th: byte thresholds compared against the per-port average.
// p_max: scale factor for the marking probability between the thresholds.
// back_to_sender: passed to header_trim() when a packet is trimmed.
// rng: random source for probabilistic marking; not owned. With a null rng
// no probabilistic marking happens.
DedicatedREDEngine(Bytes min_th,
Bytes max_th,
double p_max,
bool back_to_sender,
Rng *const rng) noexcept
: _min_th(min_th),
_max_th(max_th),
_p_max(p_max),
_back_to_sender(back_to_sender),
_rng(rng) {}
virtual Packet &process_packet(Packet &pkt,
SwitchBuffer *buf) noexcept override;
private:
// EWMA queue estimator per port (bytes)
// When the port count differs from the stored size, every average is reset to 0.
void ensure_size(uint16_t port_cnt) {
if (_avg_bytes.size() != port_cnt)
_avg_bytes.assign(port_cnt, 0.0);
}
// One EWMA step: the new average is 7/8 of the old one plus 1/8 of the sample.
double ewma(double cur, double avg) const noexcept {
// w_q ~ 1/8 for responsiveness without flapping
constexpr double W = 0.125;
return (1.0 - W) * avg + W * cur;
}
private:
Bytes _min_th;
Bytes _max_th;
double _p_max; // in [0,1]
bool _back_to_sender; // whether to trim back to sender
Rng *const _rng; // not owned
std::vector<double> _avg_bytes;
};
} // namespace dofs
#endif // NETWORK_SWITCH_ECN_DEDICATED_RED_H

View File

@@ -0,0 +1,51 @@
#ifndef NETWORK_SWITCH_ECN_ENGINE_H
#define NETWORK_SWITCH_ECN_ENGINE_H
#include <cstdint>
#include "core/types.h"
#include "network/packet.h"
namespace dofs {
class SwitchBuffer;
class ECNEngine {
public:
virtual ~ECNEngine() = default;
// Decide ECN mark / header-trim for a packet about to enter 'buf'.
// Returns the same packet reference, possibly modified in-place.
virtual Packet &process_packet(Packet &pkt, SwitchBuffer* buf) noexcept = 0;
protected:
// Helpers for derived engines
// Now defined directly in the header
static inline void maybe_mark_ecn(Packet &pkt) noexcept {
if (pkt.is_ecn_enabled())
pkt.set_ecn_marked(true);
}
// Trim payload to zero and set protocol accordingly.
// If back_to_sender, flip src/dst like an RX NIC reflection.
static inline void header_trim(Packet &pkt, bool back_to_sender) noexcept {
pkt.set_payload_size(Bytes(0));
if (back_to_sender) {
const NodeId s_n = pkt.src_node();
const PortId s_p = pkt.src_port();
const NodeId d_n = pkt.dst_node();
const PortId d_p = pkt.dst_port();
pkt.set_src_node(d_n);
pkt.set_src_port(d_p);
pkt.set_dst_node(s_n);
pkt.set_dst_port(s_p);
pkt.set_protocol(PacketProtocol::HEADER_TRIM_BACK);
} else {
pkt.set_protocol(PacketProtocol::HEADER_TRIM);
}
}
};
} // namespace dofs
#endif // NETWORK_SWITCH_ECN_ENGINE_H

View File

@@ -0,0 +1,126 @@
#include "network/switch/ecn_shared_red.h"
#include <algorithm>
#include <numeric>
#include <cmath>
namespace dofs {
// Computes class-aware RED thresholds for one packet against a shared pool.
//  - fair share = pool capacity / number of ports with a non-zero backlog
//    (at least 1)
//  - min_th / max_th are fractions (alpha / beta) of that share, chosen by
//    the packet's priority class, then shrunk by up to 20% as the averaged
//    pool usage approaches capacity
//  - max_th is kept at least 1.0 above min_th
// A pool with no capacity yields all-zero thresholds with trimming disabled.
// `buf` is dereferenced without a null check.
SharedREDEngine::Thresholds SharedREDEngine::dynamic_thresholds(
    const Packet &pkt,
    const SwitchBuffer *buf,
    double total_bytes_avg) const {
    const Bytes pool_bytes = buf->buffer_size();
    const double cap = static_cast<double>(pool_bytes);
    if (cap <= 0.0) {
        return Thresholds{0.0, 0.0, 0.0, false};
    }

    // Active ports heuristic: count ports that currently hold any bytes.
    const auto &occupancy = buf->ports_buffered();
    std::size_t active = 0;
    for (const Bytes held : occupancy) {
        if (held > 0) {
            ++active;
        }
    }
    if (active < 1) {
        active = 1;
    }
    const double fair_share = cap / static_cast<double>(active);

    // Class factors: be generous to CTRL, moderate for MICE, strict for ELE.
    double alpha;
    double beta;
    double pmax;
    bool allow_trim;
    if (is_ctrl(pkt)) {
        // almost no marking, never trim
        alpha = 0.80;
        beta = 0.98;
        pmax = 0.02;
        allow_trim = false;
    } else if (is_ele(pkt)) {
        // aggressive on elephants
        alpha = 0.10;
        beta = 0.60;
        pmax = 0.25;
        allow_trim = true;
    } else {
        // mice defaults (also used for any other class)
        alpha = 0.20;
        beta = 0.85;
        pmax = 0.10;
        allow_trim = true;
    }

    // Dynamic bias: if total avg usage is high, tighten thresholds a bit;
    // the shrink factor moves from 1.0 down to 0.8 as the pool fills.
    const double util = std::clamp(total_bytes_avg / cap, 0.0, 1.0);
    const double tighten = 1.0 - 0.2 * util;

    const double min_th = alpha * fair_share * tighten;
    const double max_th = std::max(min_th + 1.0, beta * fair_share * tighten);
    return Thresholds{min_th, max_th, pmax, allow_trim};
}
// RED decision for one packet against a shared pool:
//  1. Refresh the EWMA of total pool usage and the per-port EWMAs.
//  2. Derive class-aware thresholds for this packet.
//  3. Compare the destination port's EWMA with those thresholds:
//       <= min_th : untouched
//       >= max_th : header trim, or an ECN mark when the class forbids trimming
//       otherwise : probabilistic ECN mark; without an RNG, mark when the
//                   average is above the midpoint of the thresholds
// A null buffer returns immediately. An out-of-range dst_port() returns
// after the averages have been refreshed.
Packet &SharedREDEngine::process_packet(Packet &pkt,
                                        SwitchBuffer *buf) noexcept {
    if (buf == nullptr) {
        return pkt;
    }

    const uint16_t port_total = buf->port_cnt();
    // NOTE(review): ensure_size() may allocate; a failed allocation inside
    // this noexcept function would terminate the program.
    ensure_size(port_total);

    // Update EWMA of total bytes in the shared pool.
    const auto &occupancy = buf->ports_buffered();
    double total_now = 0.0;
    for (const Bytes held : occupancy) {
        total_now = total_now + static_cast<double>(held);
    }
    _avg_total_bytes = ewma(total_now, _avg_total_bytes);

    // Keep a per-port EWMA as well; it is the queue measure used below.
    for (std::size_t idx = 0; idx < port_total; ++idx) {
        _avg_port_bytes[idx] = ewma(static_cast<double>(occupancy[idx]),
                                    _avg_port_bytes[idx]);
    }

    const PortId port = pkt.dst_port();
    if (port >= port_total) {
        return pkt;
    }

    // Derive class-aware dynamic thresholds
    const auto thr = dynamic_thresholds(pkt, buf, _avg_total_bytes);

    // Queue measure fed to RED: the per-port EWMA (not the instantaneous value).
    const double q_avg = _avg_port_bytes[port];

    const bool below_min = q_avg <= thr.min_th;
    const bool at_or_above_max = q_avg >= thr.max_th;

    if (below_min) {
        // Well below min_th: no mark
        return pkt;
    }

    if (at_or_above_max) {
        if (thr.allow_trim) {
            // Above max_th: trim (never reflected back to the sender here)
            header_trim(pkt, false);
        } else {
            // CTRL: at worst, mark ECN if allowed
            maybe_mark_ecn(pkt);
        }
        return pkt;
    }

    // Between thresholds: probabilistic ECN
    const double mark_prob = thr.p_max * (q_avg - thr.min_th) / std::max(1.0,
                             (thr.max_th - thr.min_th));

    bool should_mark;
    if (_rng) {
        should_mark = _rng->uniform01() < mark_prob;
    } else {
        // Deterministic fallback: mark if over mid-point
        const double midpoint = 0.5 * (thr.min_th + thr.max_th);
        should_mark = q_avg > midpoint;
    }

    if (should_mark) {
        maybe_mark_ecn(pkt);
    }

    return pkt;
}
} // namespace dofs

View File

@@ -0,0 +1,71 @@
#ifndef NETWORK_SWITCH_ECN_SHARED_RED_H
#define NETWORK_SWITCH_ECN_SHARED_RED_H
#include <vector>
#include "network/switch/ecn_engine.h"
#include "network/switch/switch_buffer.h"
#include "core/rng.h"
namespace dofs {
// Dynamic-threshold RED for a shared pool.
// Intuition:
// - Let active ports = count of ports with non-zero backlog.
// - Per-port fair share ~ buffer_size / max(1, active).
// - Class tweak: CTRL lenient, ELE stricter.
// - min_th = alpha * share; max_th = beta * share; (alpha < beta < 1)
// - ECN prob grows linearly; above max_th → header trim (not drop).
class SharedREDEngine final : public ECNEngine {
public:
// rng is optional and not deleted by this class; the pointer is only stored.
explicit SharedREDEngine(Rng *const rng = nullptr) noexcept
: _rng(rng),
_avg_total_bytes(0.0),
_avg_port_bytes() {}
// Updates the running averages and then marks, trims, or leaves the packet alone.
virtual Packet &process_packet(Packet &pkt,
SwitchBuffer *buf) noexcept override;
private:
// Resizes the per-port averages to port_cnt entries; when the size changes,
// every average is reset to 0.
void ensure_size(uint16_t port_cnt) {
if (_avg_port_bytes.size() != port_cnt)
_avg_port_bytes.assign(port_cnt, 0.0);
}
// Priority-class predicates.
static inline bool is_ctrl(const Packet &p) noexcept {
return p.priority() == FlowPriority::CTRL;
}
static inline bool is_mice(const Packet &p) noexcept {
return p.priority() == FlowPriority::MICE;
}
static inline bool is_ele (const Packet &p) noexcept {
return p.priority() == FlowPriority::ELEPHANT;
}
// One EWMA step: 7/8 of the old average plus 1/8 of the new sample.
double ewma(double cur, double avg) const noexcept {
constexpr double W = 0.125; // same 1/8 smoothing
return (1.0 - W) * avg + W * cur;
}
// Result of dynamic_thresholds(); the thresholds are in bytes, as doubles.
struct Thresholds {
double min_th;
double max_th;
double p_max; // cap for ECN probability
bool allow_trim; // whether we ever trim this class
};
// Compute dynamic thresholds from shared state and class.
Thresholds dynamic_thresholds(const Packet &pkt,
const SwitchBuffer *buf,
double total_bytes_avg) const;
private:
Rng *const _rng;
// EWMA of total bytes (shared pool) and per-port bytes
double _avg_total_bytes;
std::vector<double> _avg_port_bytes;
};
} // namespace dofs
#endif // NETWORK_SWITCH_ECN_SHARED_RED_H

View File

@@ -0,0 +1,208 @@
#include "network/switch/multicast_table.h"
#include <set>
#include <iterator>
namespace dofs {
// Creates a table with MAX_GROUPS groups, each starting with no trees.
MulticastTable::MulticastTable()
: _groups(MAX_GROUPS) {}
// Number of groups the table holds (fixed at construction).
std::size_t MulticastTable::group_count() const noexcept {
return _groups.size();
}
bool MulticastTable::add_tree(std::size_t group_id, uint16_t tree_id) {
if (group_id >= _groups.size())
return false;
auto& trees = _groups[group_id];
auto it = std::find_if(trees.begin(), trees.end(),
[&](const McTree & t) {
return t.tree_id == tree_id;
});
if (it != trees.end())
return false;
McTree t;
t.tree_id = tree_id;
trees.insert(std::lower_bound(trees.begin(), trees.end(), tree_id,
[](const McTree & a, uint16_t id) {
return a.tree_id < id;
}),
t);
return true;
}
// Removes tree `tree_id` from group `group_id`.
// Returns true when a tree was removed; false when the group id is out of
// range or the group has no such tree.
bool MulticastTable::delete_tree(std::size_t group_id, uint16_t tree_id) {
    if (group_id >= _groups.size()) {
        return false;
    }

    auto& trees = _groups[group_id];
    const auto match = std::find_if(trees.begin(), trees.end(),
        [tree_id](const McTree& candidate) {
            return candidate.tree_id == tree_id;
        });

    const bool found = (match != trees.end());
    if (found) {
        trees.erase(match);
    }
    return found;
}
bool MulticastTable::add_child_port(std::size_t group_id, uint16_t tree_id,
PortId out_port) {
if (group_id >= _groups.size())
return false;
auto& trees = _groups[group_id];
auto it = std::find_if(trees.begin(), trees.end(),
[&](const McTree & t) {
return t.tree_id == tree_id;
});
if (it == trees.end())
return false;
return insert_sorted_unique(it->child_ports, out_port);
}
bool MulticastTable::delete_child_port(std::size_t group_id, uint16_t tree_id,
PortId out_port) {
if (group_id >= _groups.size())
return false;
auto& trees = _groups[group_id];
auto it = std::find_if(trees.begin(), trees.end(),
[&](const McTree & t) {
return t.tree_id == tree_id;
});
if (it == trees.end())
return false;
return erase_sorted(it->child_ports, out_port);
}
bool MulticastTable::set_parent(std::size_t group_id, uint16_t tree_id,
PortId parent) {
if (group_id >= _groups.size())
return false;
auto& trees = _groups[group_id];
auto it = std::find_if(trees.begin(), trees.end(),
[&](const McTree & t) {
return t.tree_id == tree_id;
});
if (it == trees.end())
return false;
it->parent_port = parent;
return true;
}
bool MulticastTable::set_weight(std::size_t group_id, uint16_t tree_id,
uint8_t w) {
if (group_id >= _groups.size())
return false;
auto& trees = _groups[group_id];
auto it = std::find_if(trees.begin(), trees.end(),
[&](const McTree & t) {
return t.tree_id == tree_id;
});
if (it == trees.end())
return false;
it->weight = w;
return true;
}
bool MulticastTable::set_epoch(std::size_t group_id, uint16_t tree_id,
uint8_t epoch) {
if (group_id >= _groups.size())
return false;
auto& trees = _groups[group_id];
auto it = std::find_if(trees.begin(), trees.end(),
[&](const McTree & t) {
return t.tree_id == tree_id;
});
if (it == trees.end())
return false;
it->epoch = epoch;
return true;
}
// Read-only view of a group's trees; nullptr when group_id is out of range.
// The pointer refers to storage owned by this table.
const std::vector<McTree> *MulticastTable::trees_of(std::size_t group_id)
const {
    const bool known_group = group_id < _groups.size();
    return known_group ? &_groups[group_id] : nullptr;
}
// Legacy: union of tree_id=0 ports for all set groups
// For every group whose bit is set in `groups`, takes the child ports of
// that group's tree 0 (if it exists and has ports) and returns the sorted,
// duplicate-free union of them.
std::vector<PortId> MulticastTable::get_port_list(PacketGroups groups) const {
    // Shifting an integer by its full bit-width or more is undefined
    // behaviour, so never probe more bits than the mask type can hold, even
    // if the table was built with more groups than that.
    // (8 = bits per byte on every platform this code targets.)
    constexpr std::size_t kMaskBits = sizeof(PacketGroups) * 8;
    const std::size_t gid_limit =
        std::min<std::size_t>(_groups.size(), kMaskBits);

    std::vector<PortId> result;

    for (std::size_t gid = 0; gid < gid_limit; ++gid) {
        const bool bit_set = (groups >> gid) & static_cast<PacketGroups>(1);
        if (!bit_set)
            continue;

        const auto& trees = _groups[gid];
        const auto it = std::find_if(trees.begin(), trees.end(),
            [](const McTree& t) {
                return t.tree_id == 0;
            });
        if (it == trees.end() || it->child_ports.empty())
            continue;

        if (result.empty()) {
            result = it->child_ports;
            continue;
        }

        // Both ranges are sorted, so set_union produces a sorted merge
        // without duplicates.
        std::vector<PortId> merged;
        merged.reserve(result.size() + it->child_ports.size());
        std::set_union(result.begin(), result.end(),
                       it->child_ports.begin(), it->child_ports.end(),
                       std::back_inserter(merged));
        result.swap(merged);
    }

    return result;
}
// Inserts `v` into the sorted vector `vec` at its ordered position.
// Returns false, leaving `vec` unchanged, when `v` is already present.
bool MulticastTable::insert_sorted_unique(std::vector<PortId> &vec, PortId v) {
    const auto pos = std::lower_bound(vec.begin(), vec.end(), v);
    const bool already_there = (pos != vec.end()) && (*pos == v);
    if (already_there) {
        return false;
    }
    vec.insert(pos, v);
    return true;
}
// Removes `v` from the sorted vector `vec`.
// Returns false, leaving `vec` unchanged, when `v` is not present.
bool MulticastTable::erase_sorted(std::vector<PortId> &vec, PortId v) {
    const auto pos = std::lower_bound(vec.begin(), vec.end(), v);
    const bool missing = (pos == vec.end()) || (*pos != v);
    if (missing) {
        return false;
    }
    vec.erase(pos);
    return true;
}
} // namespace dofs

View File

@@ -0,0 +1,52 @@
#ifndef NETWORK_SWITCH_MULTICAST_TABLE_H
#define NETWORK_SWITCH_MULTICAST_TABLE_H
#include <cstdint>
#include <vector>
#include <optional>
#include <algorithm>
#include "core/types.h"
#include "network/packet.h"
namespace dofs {
// One multicast distribution tree as seen from this switch.
struct McTree {
// Output ports towards children; MulticastTable keeps this sorted and free of duplicates.
std::vector<PortId> child_ports;
// Port towards the parent, if one has been set.
std::optional<PortId> parent_port;
// NOTE(review): weight, tier and epoch are stored here but this header does not
// show how they are interpreted - confirm their meaning against the users.
uint8_t weight{1};
uint8_t tier{0};
// Identifier of the tree inside its group; trees are looked up by this value.
uint16_t tree_id{0};
uint8_t epoch{0};
};
// Table of multicast groups; each group holds a list of trees (McTree).
// All mutators return false when the group id is out of range or, where a
// tree is addressed, when that tree does not exist.
class MulticastTable {
public:
// Creates MAX_GROUPS empty groups.
MulticastTable();
// Add / remove a tree in a group. add_tree() returns false for a duplicate tree_id.
bool add_tree(std::size_t group_id, uint16_t tree_id);
bool delete_tree(std::size_t group_id, uint16_t tree_id);
// Add / remove one child port of a tree; false when the port is already present / not present.
bool add_child_port(std::size_t group_id, uint16_t tree_id, PortId out_port);
bool delete_child_port(std::size_t group_id, uint16_t tree_id, PortId out_port);
// Per-tree attribute setters.
bool set_parent(std::size_t group_id, uint16_t tree_id, PortId parent);
bool set_weight(std::size_t group_id, uint16_t tree_id, uint8_t w);
bool set_epoch(std::size_t group_id, uint16_t tree_id, uint8_t epoch);
// Trees of a group, or nullptr when group_id is out of range.
const std::vector<McTree> *trees_of(std::size_t group_id) const;
std::size_t group_count() const noexcept;
// Legacy: return ports from tree_id=0 for each group bit
std::vector<PortId> get_port_list(PacketGroups groups) const;
private:
// Helpers that keep a port vector sorted and unique.
static bool insert_sorted_unique(std::vector<PortId> &vec, PortId v);
static bool erase_sorted(std::vector<PortId> &vec, PortId v);
// Indexed by group id; each entry is that group's trees.
std::vector<std::vector<McTree>> _groups;
};
} // namespace dofs
#endif // NETWORK_SWITCH_MULTICAST_TABLE_H

View File

@@ -0,0 +1,36 @@
#include "network/switch/routing_alg.h"
#include <cstdint>
namespace dofs {
// 32-bit integer mixer: xor-shift, multiply, xor-shift, multiply, xor-shift.
// The constants and shift amounts appear to match the published "lowbias32"
// hash. All arithmetic wraps modulo 2^32.
static inline uint32_t mix32(uint32_t x) noexcept {
    constexpr uint32_t kFirstMul = 0x7feb352dU;
    constexpr uint32_t kSecondMul = 0x846ca68bU;

    x = (x ^ (x >> 16)) * kFirstMul;
    x = (x ^ (x >> 15)) * kSecondMul;
    return x ^ (x >> 16);
}
// Picks an output port index in [0, port_count) by hashing the flow key:
// the source address, the destination address and `differentiator`.
// The same inputs always give the same port.
// With port_count == 0 an error is logged and port 0 is returned.
PortId hash_ecmp(NodeId src_node, PortId src_port,
                 NodeId dst_node, PortId dst_port,
                 uint32_t differentiator,
                 uint16_t port_count) noexcept {
    if (port_count == 0) {
        log_error("ERROR", "hash_ecmp ignored: port_count == 0");
        return static_cast<PortId>(0);
    }

    const uint32_t src_addr = ipv4(src_node, src_port);
    const uint32_t dst_addr = ipv4(dst_node, dst_port);

    // Each component is offset by its own constant before mixing, then the
    // results are XOR-ed together and mixed once more.
    const uint32_t combined = src_addr
                              ^ mix32(dst_addr + 0x9e3779b9U)
                              ^ mix32(differentiator + 0x85ebca6bU);
    const uint32_t hashed = mix32(combined);

    return static_cast<PortId>(static_cast<uint16_t>(hashed % port_count));
}
} // namespace dofs

View File

@@ -0,0 +1,26 @@
#ifndef NETWORK_SWITCH_ROUTING_ALG_H
#define NETWORK_SWITCH_ROUTING_ALG_H
#include <cstdint>
#include "core/types.h"
#include "core/error.h"
namespace dofs {
// Hash-based ECMP port selection.
// Maps (source node/port, destination node/port, differentiator) to a port
// index; port_count is the number of candidate ports.
PortId hash_ecmp(NodeId src_node, PortId src_port,
NodeId dst_node, PortId dst_port,
uint32_t differentiator,
uint16_t port_count) noexcept;
// Convenience overload without ports: forwards to the full version with both
// the source and destination port set to 0.
inline PortId hash_ecmp(NodeId src_node,
NodeId dst_node,
uint32_t differentiator,
uint16_t port_count) noexcept {
return hash_ecmp(src_node, static_cast<PortId>(0),
dst_node, static_cast<PortId>(0),
differentiator, port_count);
}
} // namespace dofs
#endif // NETWORK_SWITCH_ROUTING_ALG_H

View File

@@ -0,0 +1,16 @@
#ifndef NETWORK_SWITCH_ROUTING_TABLES_H
#define NETWORK_SWITCH_ROUTING_TABLES_H
#include "network/switch/unicast_table.h"
#include "network/switch/multicast_table.h"
namespace dofs {
// Bundles the unicast and multicast tables into a single object.
struct RoutingTables {
UnicastTable unicast;
MulticastTable multicast;
};
} // namespace dofs
#endif // NETWORK_SWITCH_ROUTING_TABLES_H

View File

@@ -0,0 +1,63 @@
#include "network/switch/shared_buffer.h"
namespace dofs {
// Builds a shared-pool buffer: one queue set per port and a pool usage counter starting at 0.
SharedBuffer::SharedBuffer(Simulator* const sim,
NetworkSwitch* const owner,
Bytes total_bytes,
uint16_t ports)
: SwitchBuffer(sim, owner, SwitchBufferType::SHARED, total_bytes, ports),
_per_port_queues(ports),
_used_total(0)
{}
// Returns the per-priority queue set of port p (mutable overload).
// No bounds check is done here: p indexes _per_port_queues directly.
std::array<std::deque<SwitchBuffer::Queued>, SwitchBuffer::PRI_COUNT>&SharedBuffer::queues_for(
PortId p) {
return _per_port_queues[p];
}
// Const overload of the accessor above; same indexing, same lack of bounds check.
const std::array<std::deque<SwitchBuffer::Queued>, SwitchBuffer::PRI_COUNT>&SharedBuffer::queues_for(
PortId p) const {
return _per_port_queues[p];
}
// Admission test against the whole pool: accepts when the bytes already in
// use plus `sz` do not exceed the buffer size. The port is not consulted.
bool SharedBuffer::on_enqueue_cap_check(PortId /*port*/, Bytes sz) {
    const bool pool_overflow = (_used_total + sz) > _buffer_bytes;
    return !pool_overflow;
}
// Accounts for an accepted packet: adds sz to both the pool total and the port's byte count.
void SharedBuffer::on_enqueue_commit(PortId port, Bytes sz) {
_used_total += sz;
_per_port_bytes[port] += sz;
}
// Accounts for a departed packet: subtracts sz from both the pool total and the port's byte count.
// NOTE(review): nothing here checks that sz is not larger than the stored counts;
// correctness relies on dequeues matching earlier enqueues - confirm in the caller.
void SharedBuffer::on_dequeue_commit(PortId port, Bytes sz) {
_used_total -= sz;
_per_port_bytes[port] -= sz;
}
bool SharedBuffer::enqueue_packet(const Packet& pkt, PortId egress,
FlowPriority prio) {
if (egress >= _port_cnt)
return false;
const Bytes sz = pkt.total_size();
if (!on_enqueue_cap_check(egress, sz))
return false;
Queued q{pkt, egress, prio, _sim->now(), sz};
auto& qset = queues_for(egress);
qset[to_idx(prio)].push_back(std::move(q));
on_enqueue_commit(egress, sz);
schedule_drain_if_needed(egress);
return true;
}
// Drains one packet from `port` by delegating to drain_one_common();
// the result of that call is returned unchanged.
bool SharedBuffer::drain_one(PortId port) {
return drain_one_common(port);
}
} // namespace dofs

View File

@@ -0,0 +1,39 @@
#ifndef NETWORK_SWITCH_SHARED_BUFFER_H
#define NETWORK_SWITCH_SHARED_BUFFER_H
#include "network/switch/switch_buffer.h"
namespace dofs {
// SharedBuffer: a single global memory pool shared by all ports.
// Only guard is the total pool (_buffer_bytes). Per-port usage tracked for stats.
class SharedBuffer : public SwitchBuffer {
public:
SharedBuffer(Simulator* const sim,
NetworkSwitch* const owner,
Bytes total_bytes,
uint16_t ports);
virtual bool enqueue_packet(const Packet& pkt, PortId egress,
FlowPriority prio) override;
virtual bool drain_one(PortId port) override;
protected:
virtual std::array<std::deque<Queued>, PRI_COUNT> &queues_for(
PortId p) override;
virtual const std::array<std::deque<Queued>, PRI_COUNT> &queues_for(
PortId p) const override;
virtual bool on_enqueue_cap_check(PortId port, Bytes sz) override;
virtual void on_enqueue_commit(PortId port, Bytes sz) override;
virtual void on_dequeue_commit(PortId port, Bytes sz) override;
private:
// Per-port, per-priority queues
std::vector<std::array<std::deque<Queued>, PRI_COUNT >> _per_port_queues;
Bytes _used_total;
};
} // namespace dofs
#endif // NETWORK_SWITCH_SHARED_BUFFER_H

View File

@@ -0,0 +1,244 @@
#include "network/switch/switch_buffer.h"
#include <cmath>
#include <algorithm>
#include <numeric>
#include <functional>
#include "network/network_switch.h"
namespace dofs {
SwitchBuffer::SwitchBuffer(Simulator* const sim,
NetworkSwitch* const owner,
SwitchBufferType t,
Bytes total_bytes,
uint16_t ports)
: _sim(sim),
_owner(owner),
_type(t),
_buffer_bytes(total_bytes),
_port_cnt(ports),
_egress_links(ports, nullptr),
_drain_scheduled(ports, false),
_per_port_bytes(ports, Bytes(0)),
_sched(ports),
_share_ctrl(30), _share_mice(50), _share_ele(20) // sane defaults
{}
// Returns the total buffer capacity passed to the constructor.
Bytes SwitchBuffer::buffer_size() const noexcept {
return _buffer_bytes;
}
// Returns the buffer type tag supplied by the derived class at construction.
SwitchBufferType SwitchBuffer::type() const noexcept {
return _type;
}
// Returns the number of ports this buffer was constructed with.
uint16_t SwitchBuffer::port_cnt() const noexcept {
return _port_cnt;
}
// Returns the number of bytes currently accounted to port `p`, or zero when
// `p` lies outside the per-port table.
Bytes SwitchBuffer::port_buffered(PortId p) const noexcept {
    const bool out_of_range = (p >= _per_port_bytes.size());
    return out_of_range ? Bytes(0) : _per_port_bytes[p];
}
// Returns a reference to the per-port byte counters (index = port).
const std::vector<Bytes> &SwitchBuffer::ports_buffered() const noexcept {
return _per_port_bytes;
}
// Returns the current share (percent) of the control priority.
uint8_t SwitchBuffer::share_ctrl() const noexcept {
return _share_ctrl;
}
// Returns the current share (percent) of the mice priority.
uint8_t SwitchBuffer::share_mice() const noexcept {
return _share_mice;
}
// Returns the current share (percent) of the elephant priority.
uint8_t SwitchBuffer::share_elephant() const noexcept {
return _share_ele;
}
// Returns the simulator pointer given at construction (read-only view).
const Simulator *SwitchBuffer::simulator() const noexcept {
return _sim;
}
// Returns the owning switch pointer given at construction (read-only view).
const NetworkSwitch *SwitchBuffer::owner() const noexcept {
return _owner;
}
// Installs the egress link table (one Link* per port).
// The vector is copied only when it has exactly one entry per port;
// otherwise an error is logged and the current wiring is left untouched.
void SwitchBuffer::set_egress_links(const std::vector<Link*> &links) {
    const bool size_matches = !(links.size() != _port_cnt);
    if (size_matches) {
        _egress_links = links;
        return;
    }
    log_error("ERROR", "SwitchBuffer::set_egress_links size mismatch");
}
// Rescales three percentage shares in place so that they sum to exactly 100.
//
//  - Sum already 100: values are left untouched.
//  - All three zero:  replaced with the near-even split 34/33/33.
//  - Otherwise:       `a` and `b` are scaled by 100/sum and rounded to the
//                     nearest integer; `c` receives whatever remains.
//
// Rounding `a` and `b` independently can overshoot by one (e.g. 1/199/0
// scales to 0.5/99.5 and rounds to 1 + 100). `b` is therefore capped at
// 100 - a so the remainder for `c` is never negative and the total is
// always exactly 100.
static void normalize_triple(uint8_t &a, uint8_t &b, uint8_t &c) {
    const int sum = int(a) + int(b) + int(c);
    if (sum == 100) {
        return;
    }
    if (sum == 0) {
        a = 34;
        b = 33;
        c = 33;
        return;
    }

    // Proportional normalization.
    const double k = 100.0 / double(sum);
    const int na = std::clamp(int(std::round(a * k)), 0, 100);
    const int nb = std::clamp(int(std::round(b * k)), 0, 100 - na);
    const int nc = 100 - na - nb;  // >= 0 because nb <= 100 - na

    a = uint8_t(na);
    b = uint8_t(nb);
    c = uint8_t(nc);
}
// Sets the control-priority share to `pct`, then renormalizes all three
// shares. Because normalization rescales every share, the value later
// returned by share_ctrl() may differ from `pct`.
void SwitchBuffer::set_share_ctrl(uint8_t pct) noexcept {
_share_ctrl = pct;
normalize_triple(_share_ctrl, _share_mice, _share_ele);
}
// Sets the mice-priority share to `pct`, then renormalizes all three shares
// (the stored value may differ from `pct` afterwards).
void SwitchBuffer::set_share_mice(uint8_t pct) noexcept {
_share_mice = pct;
normalize_triple(_share_ctrl, _share_mice, _share_ele);
}
// Sets the elephant-priority share to `pct`, then renormalizes all three
// shares (the stored value may differ from `pct` afterwards).
void SwitchBuffer::set_share_elephant(uint8_t pct) noexcept {
_share_ele = pct;
normalize_triple(_share_ctrl, _share_mice, _share_ele);
}
// Returns the sum of the per-port byte counters, i.e. the number of bytes
// currently accounted across all ports.
Bytes SwitchBuffer::queued_bytes_total() const noexcept {
    return std::accumulate(_per_port_bytes.begin(), _per_port_bytes.end(),
                           Bytes(0));
}
// Bytes accounted to port `p`; forwards to port_buffered(), which yields zero
// for an out-of-range port.
Bytes SwitchBuffer::queued_bytes_port(PortId p) const noexcept {
return port_buffered(p);
}
// Reserves the egress link of `port` for `q` and schedules its delivery.
//
// Returns std::nullopt when no link is wired on the port (an error is logged)
// or when the link refuses the reservation. On success the packet's delivery
// is scheduled after the remaining time until the reservation finishes plus
// the link's propagation latency, and the reservation finish time is
// returned. The queue entry itself is not removed here.
std::optional<Time> SwitchBuffer::try_reserve_and_send(PortId port, Queued& q) {
    const bool has_link =
        !(port >= _egress_links.size()) && _egress_links[port] != nullptr;
    if (!has_link) {
        log_error("ERROR", "SwitchBuffer: egress link missing on port");
        return std::nullopt;
    }

    Link* const egress = _egress_links[port];
    auto reservation = egress->reserve(q.size_bytes, _owner->id());
    if (!reservation.has_value()) {
        // Link down or invalid sender.
        return std::nullopt;
    }

    const Time now = _sim->now();
    // Time left until the reserved slot ends (zero if it already has).
    const Time until_finish = (reservation->finish > now)
                                  ? reservation->finish.unsafe_sub(now)
                                  : Time(0);
    const Time delivery_delay = until_finish + egress->propagation_latency();
    egress->schedule_delivery_after(q.pkt, _owner->id(), delivery_delay);

    return reservation->finish;
}
// Schedules a drain_once() event for `port` unless one is unnecessary.
//
// Nothing is scheduled when the port is out of range, a drain is already
// pending, no bytes are accounted to the port, or no egress link is wired.
// Otherwise the event fires once the link reports itself available (at once
// if that time is not in the future) and the port is marked as pending.
void SwitchBuffer::schedule_drain_if_needed(PortId port) {
    const bool skip = port >= _port_cnt
                      || _drain_scheduled[port]
                      || queued_bytes_port(port) == Bytes(0);
    if (skip) {
        return;
    }

    Link* const egress = _egress_links[port];
    if (egress == nullptr) {
        return;
    }

    const Time now = _sim->now();
    const Time available_at = egress->next_available(_owner->id());
    const Time wait = (available_at > now) ? available_at.unsafe_sub(now)
                                           : Time(0);

    _drain_scheduled[port] = true;
    _sim->schedule_after(wait,
                         std::mem_fn(&SwitchBuffer::drain_once), this, port);
}
// Event-loop callback for a previously scheduled drain on `port`.
//
// Clears the pending flag and tries to send one packet. When a packet was
// sent, drain_one_common() has already arranged the follow-up drain. When
// nothing was sent but some priority queue still holds packets, a retry is
// requested through schedule_drain_if_needed().
void SwitchBuffer::drain_once(PortId port) {
    _drain_scheduled[port] = false;

    const bool sent = drain_one_common(port);
    if (sent) {
        return;
    }

    // No progress: either every queue is empty or nothing could be sent now.
    auto& lanes = queues_for(port);
    const bool backlog = std::any_of(
        lanes.begin(), lanes.end(),
        [](const std::deque<Queued>& lane) { return !lane.empty(); });
    if (backlog) {
        schedule_drain_if_needed(port);
    }
}
// Tries to transmit exactly one packet from `port` using the per-port
// deficit scheduler. Returns true when a packet was reserved on the link,
// removed from its queue and accounted; false otherwise.
//
// Steps:
//  1. Every call credits each priority's deficit with share% * QUANTUM_UNIT.
//     The credit is applied to all three priorities, including those whose
//     queue is currently empty, and deficits are never reset here.
//     NOTE(review): classic deficit round robin zeroes the deficit of an
//     empty queue; confirm whether the unbounded accumulation is intended.
//  2. Starting at `next_pick`, the first priority whose head packet has a
//     non-zero size that fits in its deficit is selected.
//  3. If the link reservation fails the function stops immediately and
//     leaves `next_pick` unchanged.
//  4. On success the packet is popped, the deficit and byte accounting are
//     updated, `next_pick` moves past the served priority, and the follow-up
//     drain is requested via schedule_drain_if_needed(). Using that helper
//     (instead of scheduling unconditionally) avoids queueing a second drain
//     event when one is already pending (e.g. when reached through the public
//     drain_one()), and avoids an idle event when the port just became empty.
//  5. If no priority was eligible, `next_pick` advances by one.
bool SwitchBuffer::drain_one_common(PortId port) {
    if (port >= _port_cnt)
        return false;

    auto& queues = queues_for(port);
    auto& sched = _sched[port];

    // Size of the head packet of a priority, or zero when its queue is empty.
    auto head_size = [&](int pri) -> Bytes {
        if (queues[pri].empty())
            return Bytes(0);
        return queues[pri].front().size_bytes;
    };

    // Credit this round's quantum to every priority.
    sched.deficit_bytes[PRI_CTRL] += int64_t(_share_ctrl) * QUANTUM_UNIT;
    sched.deficit_bytes[PRI_MICE] += int64_t(_share_mice) * QUANTUM_UNIT;
    sched.deficit_bytes[PRI_ELE] += int64_t(_share_ele) * QUANTUM_UNIT;

    for (int k = 0; k < PRI_COUNT; ++k) {
        const int pri = (sched.next_pick + k) % PRI_COUNT;
        const Bytes sz = head_size(pri);
        const bool eligible =
            sz > Bytes(0) && sched.deficit_bytes[pri] >= int64_t(sz);
        if (!eligible)
            continue;

        auto& q = queues[pri].front();
        auto finish_opt = try_reserve_and_send(port, q);
        if (!finish_opt.has_value()) {
            // Could not reserve now (link down). Stop early.
            return false;
        }

        queues[pri].pop_front();
        sched.deficit_bytes[pri] -= int64_t(sz);
        on_dequeue_commit(port, sz);
        sched.next_pick = (pri + 1) % PRI_COUNT;

        // Arrange the next drain; a no-op if one is already pending or the
        // port has nothing left to send.
        schedule_drain_if_needed(port);
        return true;
    }

    sched.next_pick = (sched.next_pick + 1) % PRI_COUNT;
    return false;
}
} // namespace dofs

View File

@@ -0,0 +1,162 @@
#ifndef NETWORK_SWITCH_SWITCH_BUFFER_H
#define NETWORK_SWITCH_SWITCH_BUFFER_H
#include <array>
#include <cstdint>
#include <deque>
#include <optional>
#include <vector>
#include "core/time.h"
#include "core/types.h"
#include "core/simulator.h"
#include "core/error.h"
#include "network/packet.h"
#include "network/link.h"
namespace dofs {
class NetworkSwitch;
// Base class for switch egress buffering with weighted priorities.
// Implementations: SharedBuffer, DedicatedBuffer.
//
// The base class owns the per-port byte accounting, the per-port scheduler
// state and the self-scheduling drain logic. Derived classes supply the
// queue storage and the capacity policy through the pure-virtual hooks
// declared in the protected section.
class SwitchBuffer {
public:
// One buffered packet together with the metadata recorded at enqueue time.
struct Queued {
Packet pkt; // copy of the packet to transmit
PortId egress; // egress port the packet is queued on
FlowPriority prio; // priority class it was enqueued with
Time enq_time; // time stamp taken when the entry was created
Bytes size_bytes; // packet size used for accounting and link reservation
};
virtual ~SwitchBuffer() = default;
// --- API: enqueue & drain ---
// NOTE: ECN/drop decisions are expected to be done *before* enqueue;
// we still guard and return false on capacity violations.
virtual bool enqueue_packet(const Packet& pkt,
PortId egress,
FlowPriority prio) = 0;
// Attempt to send exactly one packet on 'port' using weighted priorities.
// Returns true if a packet was reserved and scheduled.
virtual bool drain_one(PortId port) = 0;
// --- Accessors (implemented in .cc) ---
Bytes buffer_size() const noexcept;
SwitchBufferType type() const noexcept;
uint16_t port_cnt() const noexcept;
// Bytes accounted to port 'p'; zero for an out-of-range port.
Bytes port_buffered(PortId p) const noexcept;
const std::vector<Bytes> &ports_buffered() const noexcept;
// Priority shares: percentages [0..100]. Sum must be 100.
uint8_t share_ctrl() const noexcept;
uint8_t share_mice() const noexcept;
uint8_t share_elephant() const noexcept;
// --- Mutators for priority shares (implemented in .cc) ---
// If the 3 shares don't sum to 100, we normalize proportionally.
// Normalization rescales all three shares, so the value read back from the
// matching getter may differ from the 'pct' that was passed in.
void set_share_ctrl(uint8_t pct) noexcept;
void set_share_mice(uint8_t pct) noexcept;
void set_share_elephant(uint8_t pct) noexcept;
// Owner & env
const Simulator *simulator() const noexcept;
const NetworkSwitch *owner() const noexcept;
// Link wiring (egress side). Size must equal port_cnt().
// A vector of any other size is rejected with a logged error.
void set_egress_links(const std::vector<Link*> &links);
protected:
// Only derived classes construct a SwitchBuffer; 't' tags the concrete kind.
SwitchBuffer(Simulator* const sim,
NetworkSwitch* const owner,
SwitchBufferType t,
Bytes total_bytes,
uint16_t ports);
// Per-priority index and helpers
static constexpr int PRI_COUNT = 3;
static constexpr int PRI_CTRL = 0;
static constexpr int PRI_MICE = 1;
static constexpr int PRI_ELE = 2;
// Maps a FlowPriority to its queue index; unknown values fall back to PRI_ELE.
static int to_idx(FlowPriority p) noexcept {
switch (p) {
case FlowPriority::CTRL:
return PRI_CTRL;
case FlowPriority::MICE:
return PRI_MICE;
case FlowPriority::ELEPHANT:
return PRI_ELE;
}
return PRI_ELE; // defensive
}
// Weighted Deficit Round Robin across priorities (per port):
// Maintain byte-deficits per prio; each drain attempt adds quantum
// proportional to the configured share. Pick first prio whose head fits.
struct PerPortSched {
std::array<int64_t, PRI_COUNT> deficit_bytes {0, 0, 0};
int next_pick = 0; // rotating pointer among priorities
};
// Quantum baseline (bytes) per "share unit". Calibrated to MSS-ish.
// Effective quantum = share_pct * QUANTUM_UNIT.
static constexpr int64_t QUANTUM_UNIT = 128; // bytes per % share
// For derived classes
bool drain_one_common(PortId port);
// Per-port self-scheduling
void schedule_drain_if_needed(PortId port);
void drain_once(PortId port); // event-loop callback
// Derived classes must expose queues and capacity guards:
// Access the 3 per-priority queues for a port.
virtual std::array<std::deque<Queued>, PRI_COUNT> &queues_for(PortId p) = 0;
virtual const std::array<std::deque<Queued>, PRI_COUNT> &queues_for(
PortId p) const = 0;
// Capacity checks and accounting; return false if not enough space.
virtual bool on_enqueue_cap_check(PortId port, Bytes sz) = 0;
virtual void on_enqueue_commit(PortId port, Bytes sz) = 0;
virtual void on_dequeue_commit(PortId port, Bytes sz) = 0;
// Attempt reservation+schedule on the egress link for (port, packet).
// Returns the reservation finish time if successful (for scheduling the next drain).
std::optional<Time> try_reserve_and_send(PortId port, Queued &q);
// Helpers
Bytes queued_bytes_total() const noexcept;
Bytes queued_bytes_port(PortId p) const noexcept;
protected:
// Non-owning pointers supplied at construction.
Simulator *const _sim;
NetworkSwitch *const _owner;
SwitchBufferType _type;
Bytes _buffer_bytes;
uint16_t _port_cnt;
// Egress link per port (owned by switch; not by buffer)
std::vector<Link *> _egress_links;
// Per port: true while a drain_once() event is pending for that port.
std::vector<bool> _drain_scheduled;
// Per-port accounting and sched state
std::vector<Bytes> _per_port_bytes;
std::vector<PerPortSched> _sched; // per-port deficits
// Priority shares (percentages). Maintained to sum to 100.
uint8_t _share_ctrl;
uint8_t _share_mice;
uint8_t _share_ele;
};
} // namespace dofs
#endif // NETWORK_SWITCH_SWITCH_BUFFER_H

View File

@@ -0,0 +1,54 @@
#include "network/switch/unicast_table.h"
namespace dofs {
// Looks up the egress ports registered for destination (dst_node, dst_port).
// Returns a copy of the stored port list, or an empty vector when the
// destination has no entry.
std::vector<PortId> UnicastTable::get_port_list(NodeId dst_node,
                                                PortId dst_port) const {
    const auto entry = _table.find(ipv4(dst_node, dst_port));
    if (entry != _table.end()) {
        return entry->second;
    }
    return {};
}
// Registers `out_port` as an egress port for destination
// (dst_node, dst_port), creating the destination entry if needed.
// Returns true if the port was added, false if it was already present.
bool UnicastTable::add_entry(NodeId dst_node, PortId dst_port,
                             PortId out_port) {
    std::vector<PortId>& ports = _table[ipv4(dst_node, dst_port)];
    const bool inserted = insert_sorted_unique(ports, out_port);
    return inserted;
}
// Removes `out_port` from the egress ports of destination
// (dst_node, dst_port).
// Returns true if the port was registered and has been removed, false if the
// destination or the port was not found.
// When the last port of a destination is removed, the destination key itself
// is erased so that stale empty entries do not accumulate in the table;
// lookups behave the same either way (an empty list is returned).
bool UnicastTable::delete_entry(NodeId dst_node, PortId dst_port,
                                PortId out_port) {
    const auto it = _table.find(ipv4(dst_node, dst_port));
    if (it == _table.end())
        return false;

    if (!erase_sorted(it->second, out_port))
        return false;

    if (it->second.empty())
        _table.erase(it);
    return true;
}
// Inserts `v` into the ascending-sorted vector `vec`, keeping it sorted and
// free of duplicates. Returns true if `v` was inserted, false if it was
// already present.
bool UnicastTable::insert_sorted_unique(std::vector<PortId> &vec, PortId v) {
    const auto pos = std::lower_bound(vec.begin(), vec.end(), v);
    const bool already_present = (pos != vec.end()) && (*pos == v);
    if (!already_present) {
        vec.insert(pos, v);
    }
    return !already_present;
}
// Removes `v` from the ascending-sorted vector `vec` using a binary search.
// Returns true if `v` was found and erased, false otherwise.
bool UnicastTable::erase_sorted(std::vector<PortId> &vec, PortId v) {
    const auto pos = std::lower_bound(vec.begin(), vec.end(), v);
    if (pos != vec.end() && *pos == v) {
        vec.erase(pos);
        return true;
    }
    return false;
}
} // namespace dofs

View File

@@ -0,0 +1,36 @@
#ifndef NETWORK_SWITCH_UNICAST_TABLE_H
#define NETWORK_SWITCH_UNICAST_TABLE_H
#include <cstdint>
#include <unordered_map>
#include <vector>
#include <algorithm>
#include "core/types.h"
namespace dofs {
// UnicastTable: maps a destination, keyed by the IPv4Addr built from
// (dst_node, dst_port), to the list of egress ports registered for it.
// Each port list is kept sorted in ascending order without duplicates.
class UnicastTable {
public:
    UnicastTable() = default;

    // Returns a copy of the egress ports for the destination (empty if none).
    std::vector<PortId> get_port_list(NodeId dst_node, PortId dst_port) const;

    // Adds `out_port` for the destination; false if it was already present.
    bool add_entry(NodeId dst_node, PortId dst_port, PortId out_port);

    // Removes `out_port` from the destination; false if it was not present.
    bool delete_entry(NodeId dst_node, PortId dst_port, PortId out_port);

private:
    // NOTE(review): not referenced by the member functions of this class;
    // presumably related to how the key packs the port — confirm.
    static constexpr uint32_t kPortShiftBits = 8;

    // Helpers that maintain the sorted, duplicate-free port lists.
    static bool insert_sorted_unique(std::vector<PortId> &vec, PortId v);
    static bool erase_sorted(std::vector<PortId> &vec, PortId v);

    // Destination key -> sorted egress port list.
    std::unordered_map<IPv4Addr, std::vector<PortId>> _table;
};
} // namespace dofs
#endif // NETWORK_SWITCH_UNICAST_TABLE_H