commit 0a73580a80faaebfd80fae1c0b41a955df95672e Author: Peisong Xiao Date: Thu Jan 1 05:17:56 2026 -0500 [UNTESTED] Basic queue and fan-in fabric implementation without full template generalization diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..2fe0b45 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +build/ +compile_commands.json +.cache/ diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..aff3d42 --- /dev/null +++ b/Makefile @@ -0,0 +1,70 @@ +LIB_NAME ?= weaver +BUILD_DIR ?= build +SRC_DIR ?= src +INCLUDE_DIRS := include src/include + +MODE ?= debug + +RELEASE_OPT ?= 2 +CLINE_SIZE ?= 64 + +CXX ?= clang++ +AR ?= ar +ARFLAGS := rcs + +SRCS := $(shell find $(SRC_DIR) -name '*.cc') +OBJS := $(patsubst $(SRC_DIR)/%.cc,$(BUILD_DIR)/%.o,$(SRCS)) +DEPS := $(OBJS:.o=.d) + +INCLUDES := $(addprefix -I,$(INCLUDE_DIRS)) + +WARN_FLAGS := \ + -Wall \ + -Wextra \ + -Wpedantic \ + -Werror + +COMMON_CXXFLAGS := \ + -std=c++20 \ + -MMD -MP \ + -flto \ + --param=destructive-interference-size=$(CLINE_SIZE) \ + $(WARN_FLAGS) \ + $(INCLUDES) + +DEBUG_CXXFLAGS := \ + -O0 \ + -g \ + -fsanitize=thread \ + -fno-omit-frame-pointer + +RELEASE_CXXFLAGS := \ + -O$(RELEASE_OPT) \ + -DNDEBUG + +ifeq ($(MODE),debug) + CXXFLAGS := $(COMMON_CXXFLAGS) $(DEBUG_CXXFLAGS) +else ifeq ($(MODE),release) + CXXFLAGS := $(COMMON_CXXFLAGS) $(RELEASE_CXXFLAGS) +else + $(error Unknown MODE '$(MODE)'; use MODE=debug or MODE=release) +endif + +LIB := $(BUILD_DIR)/lib$(LIB_NAME).a + +.PHONY: all clean + +all: $(LIB) + +$(LIB): $(OBJS) + @mkdir -p $(dir $@/bin) + $(AR) $(ARFLAGS) $@/bin $^ + +$(BUILD_DIR)/%.o: $(SRC_DIR)/%.cc + @mkdir -p $(dir $@) + $(CXX) $(CXXFLAGS) -c $< -o $@ + +-include $(DEPS) + +clean: + rm -rf $(BUILD_DIR) diff --git a/include/weaver/atomic.h b/include/weaver/atomic.h new file mode 100644 index 0000000..1434dc7 --- /dev/null +++ b/include/weaver/atomic.h @@ -0,0 +1,88 @@ +#ifndef THWEAVER_ATOMIC_H +#define THWEAVER_ATOMIC_H + +#include +#include +#include +#include + +template +concept Additive = std::is_integral_v && !std::is_same_v; + +template +concept AtomicAdditive = +requires(A a, T v) { + { + a.load() + } + -> std::same_as; + { + a.store(v) + }; + { + a.fetch_add(v) + } + -> std::same_as; + { + a.fetch_sub(v) + } + -> std::same_as; +}; + +template +concept Bitwise = + (std::is_integral_v || std::is_enum_v) && + !std::is_same_v; + +template +concept AtomicBitwise = +requires(A a, T v) { + { + a.load() + } + -> std::same_as; + { + a.store(v) + }; + { + a.fetch_or(v) + } + -> std::same_as; + { + a.fetch_and(v) + } + -> std::same_as; + { + a.fetch_xor(v) + } + -> std::same_as; +}; + +// Uses the full bitmap +template +requires Bitwise && AtomicBitwise && +Additive && std::is_unsigned_v +class AtomicBitmap { +public: + static_assert(NumBits <= std::numeric_limits::digits); + + AtomicBitmap() noexcept + : bits(T {}), rotation(0) {} + + void reset(const T &bits, const Index rotation) noexcept; + + bool get_bit(const Index n) const noexcept; + void set_bit(const Index n) noexcept; + void clear_bit(const Index n) noexcept; + + void rotate(const Index n) noexcept; + Index get_rotation() const noexcept; + + Index get_low_bit() const noexcept; + Index rotate_to_low_bit(const Index offset = 0) noexcept; +private: + Atomic bits; + Index rotation; +}; + +#endif diff --git a/include/weaver/bitwise.h b/include/weaver/bitwise.h new file mode 100644 index 0000000..8a0325b --- /dev/null +++ b/include/weaver/bitwise.h @@ -0,0 +1,35 @@ +#ifndef THWEAVER_BITWISE_H +#define THWEAVER_BITWISE_H + +#include + +template +concept Bitwise = +requires(T a, T b) { + { + a & b + } + -> std::same_as; + { + a | b + } + -> std::same_as; + { + a ^ b + } + -> std::same_as; + { + ~a + } + -> std::same_as; + { + a << 1 + } + -> std::same_as; + { + a >> 1 + } + -> std::same_as; +}; + +#endif diff --git a/include/weaver/fabric.h b/include/weaver/fabric.h new file mode 100644 index 0000000..648bcd8 --- /dev/null +++ b/include/weaver/fabric.h @@ -0,0 +1,114 @@ +#ifndef THWEAVER_FABRIC_H +#define THWEAVER_FABRIC_H + +#include +#include +#include +#include +#include +#include + +namespace THWeaver { + +template class EndpointQueue { +public: + static constexpr size_t CLS = + std::hardware_destructive_interference_size; + static constexpr std::size_t QSIZE = 1 << qsize_expt; + static constexpr std::size_t QSIZE_MASK = QSIZE - 1; + + EndpointQueue(const EndpointQueue &) = delete; + EndpointQueue & + operator=(const EndpointQueue &) = delete; + EndpointQueue(EndpointQueue &&) = delete; + EndpointQueue & + operator=(EndpointQueue &&) = delete; + + void init() noexcept; + void flush() noexcept; + + std::size_t size() const noexcept; + bool is_empty() const noexcept; + bool is_full() const noexcept; + + std::size_t get_head() const noexcept; + std::size_t get_tail() const noexcept; + + EndpointQueueSendResult send(MessageType &&message) noexcept; + EndpointQueueRecvResult recv(MessageType &buffer) noexcept; + void recv_unsafe(MessageType &buffer, std::size_t old_head) noexcept; + +private: + alignas(CLS) std::array buffer; + alignas(CLS) std::atomic head; + alignas(CLS) std::atomic tail; +}; + +template +class FanInFabric { +public: + using BitmapType = uint64_t; + + static constexpr size_t CLS = + std::hardware_destructive_interference_size; + static constexpr std::size_t QSIZE = 1 << qsize_expt; + static constexpr std::size_t QSIZE_MASK = QSIZE - 1; + static constexpr std::size_t BITMAP_SIZE = sizeof(BitmapType) * 8; + static constexpr BitmapType THREAD_BITMAP_MASK = + IN_THREAD_CNT == BITMAP_SIZE + ? ~BitmapType(0) + : (BitmapType(1) << IN_THREAD_CNT) - 1; + + static_assert(IN_THREAD_CNT <= BITMAP_SIZE, "Producer limit exceeded"); + + FanInFabric(const FanInFabric + &) = delete; + FanInFabric & + operator=(const FanInFabric &) = + delete; + FanInFabric(FanInFabric &&) = + delete; + FanInFabric & + operator=(FanInFabric &&) = + delete; + + void init() noexcept; + template void flush() noexcept; + void flush_all() noexcept; + + template + FanInFabricSendResult send(MessageType &&message) noexcept; + + FanInFabricRecvBitmapResult + recv_bitmap_only(MessageType &buffer) noexcept; + FanInFabricRecvTryResult recv_try(MessageType &buffer) noexcept; + FanInFabricRecvAggrResult recv_aggr(MessageType &buffer) noexcept; + + void full_scan(BitmapType &bitmap) noexcept; + + template std::size_t size() const noexcept; + template bool is_empty() const noexcept; + template bool is_full() const noexcept; + +private: + std::array, IN_THREAD_CNT> + in_queues; + + alignas(CLS) std::atomic hint; + struct alignas(CLS) DiscoveryList { + std::array next; + std::array prev; + uint8_t first; + uint8_t last; + void init() noexcept; + void move_first_to_last() noexcept; + void move_to_last(uint8_t index) noexcept; + } dl; + std::size_t rotation; + BitmapType rotate(BitmapType bitmap) const noexcept; +}; + +} // namespace THWeaver + +#endif diff --git a/include/weaver/results.h b/include/weaver/results.h new file mode 100644 index 0000000..4b9fd2c --- /dev/null +++ b/include/weaver/results.h @@ -0,0 +1,21 @@ +#ifndef THWEAVER_RESULTS_H +#define THWEAVER_RESULTS_H + +#include +namespace THWeaver { + +enum class EndpointQueueSendResult : uint32_t { Ok = 0, ErrFull = 1 }; + +enum class EndpointQueueRecvResult { Ok, ErrEmpty }; + +enum class FanInFabricSendResult : uint32_t { Ok = 0, ErrFull = 1 }; + +enum class FanInFabricRecvBitmapResult { Ok, ErrBitmapEmpty, ErrBitmapFailed }; + +enum class FanInFabricRecvTryResult { Ok, ErrTryFailed }; + +enum class FanInFabricRecvAggrResult { Ok, ErrAggrFailed }; + +} // namespace THWeaver + +#endif diff --git a/src/fan_in.cc b/src/fan_in.cc new file mode 100644 index 0000000..6b99ec0 --- /dev/null +++ b/src/fan_in.cc @@ -0,0 +1,267 @@ +#include +#include +#include +#include +#include +#include + +namespace THWeaver { + +template +void FanInFabric::init() noexcept { + for (int i = 0; i < IN_THREAD_CNT; ++i) + in_queues[i].init(); + hint.store(0); + dl.init(); + rotation = 0; +} + +template +template +void FanInFabric::flush() noexcept { + static_assert(thread_id < IN_THREAD_CNT, "Thread ID out of bounds"); + + in_queues[thread_id].flush(); + hint.fetch_and(~(1 << thread_id), std::memory_order_relaxed); +} + +template +void FanInFabric::flush_all() noexcept { + for (int i = 0; i < IN_THREAD_CNT; ++i) + in_queues[i].flush(); + hint.store(0, std::memory_order_relaxed); + dl.init(); + rotation = 0; +} + +template +template +FanInFabricSendResult FanInFabric::send( + MessageType &&message) noexcept { + static_assert(thread_id < IN_THREAD_CNT, "Thread ID out of bounds"); + + return static_cast( + static_cast(in_queues[thread_id].send(message))); +} + +template +template +std::size_t +FanInFabric::size() const noexcept { + static_assert(thread_id < IN_THREAD_CNT, "Thread ID out of bounds"); + + return in_queues[thread_id].size(); +} + +template +template +bool FanInFabric::is_empty() + const noexcept { + static_assert(thread_id < IN_THREAD_CNT, "Thread ID out of bounds"); + + return in_queues[thread_id].is_empty(); +} + +template +template +bool FanInFabric::is_full() + const noexcept { + static_assert(thread_id < IN_THREAD_CNT, "Thread ID out of bounds"); + + return in_queues[thread_id].is_full(); +} + +template +FanInFabricRecvBitmapResult +FanInFabric::recv_bitmap_only( + MessageType &buffer) noexcept { + BitmapType bitmap = hint.load(std::memory_order_relaxed); + + if (!bitmap) + return FanInFabricRecvBitmapResult::ErrBitmapEmpty; + + bitmap = rotate(bitmap); + int hinted_id = (std::countr_zero(bitmap) + rotation) % IN_THREAD_CNT; + rotation = (hinted_id + 1) % IN_THREAD_CNT; + + std::size_t head = in_queues[hinted_id].get_head(); + std::size_t tail = in_queues[hinted_id].get_tail(); + std::size_t qsize = ((tail + QSIZE_MASK) - head) & QSIZE_MASK; + + if (qsize <= 1) + hint.fetch_add(~(1 << hinted_id), std::memory_order_relaxed); + + if (qsize) { + in_queues[hinted_id].recv_unsafe(buffer, head); + dl.move_to_last(hinted_id); + return FanInFabricRecvBitmapResult::Ok; + } + + return FanInFabricRecvBitmapResult::ErrBitmapFailed; +} + +template +FanInFabricRecvTryResult +FanInFabric::recv_try( + MessageType &buffer) noexcept { + BitmapType bitmap = hint.load(std::memory_order_relaxed); + + if (bitmap) { + bitmap = rotate(bitmap); + int hinted_id = + (std::countr_zero(bitmap) + rotation) % IN_THREAD_CNT; + rotation = (hinted_id + 1) % IN_THREAD_CNT; + + std::size_t head = in_queues[hinted_id].get_head(); + std::size_t tail = in_queues[hinted_id].get_tail(); + std::size_t qsize = ((tail + QSIZE_MASK) - head) & QSIZE_MASK; + + if (qsize <= 1) + hint.fetch_add(~(1 << hinted_id), + std::memory_order_relaxed); + + if (qsize) { + in_queues[hinted_id].recv_unsafe(buffer, head); + dl.move_to_last(hinted_id); + return FanInFabricRecvTryResult::Ok; + } + } + + std::size_t head = in_queues[dl.first].get_head(); + std::size_t tail = in_queues[dl.first].get_tail(); + + if (head == tail) + return FanInFabricRecvTryResult::ErrTryFailed; + + in_queues[dl.first].recv_unsafe(buffer, head); + dl.move_first_to_last(); + return FanInFabricRecvTryResult::Ok; +} + +template +FanInFabricRecvAggrResult +FanInFabric::recv_aggr( + MessageType &buffer) noexcept { + BitmapType bitmap = hint.load(std::memory_order_relaxed); + + if (bitmap) { + bitmap = rotate(bitmap); + int hinted_id = + (std::countr_zero(bitmap) + rotation) % IN_THREAD_CNT; + rotation = (hinted_id + 1) % IN_THREAD_CNT; + + std::size_t head = in_queues[hinted_id].get_head(); + std::size_t tail = in_queues[hinted_id].get_tail(); + std::size_t qsize = ((tail + QSIZE_MASK) - head) & QSIZE_MASK; + + if (qsize <= 1) + hint.fetch_add(~(1 << hinted_id), + std::memory_order_relaxed); + + if (qsize) { + in_queues[hinted_id].recv_unsafe(buffer, head); + dl.move_to_last(hinted_id); + return FanInFabricRecvAggrResult::Ok; + } + } + + uint8_t last = dl.last; + + do { + std::size_t head = in_queues[dl.first].get_head(); + std::size_t tail = in_queues[dl.first].get_tail(); + dl.move_first_to_last(); + + if (head != tail) { + in_queues[dl.first].recv_unsafe(buffer, head); + return FanInFabricRecvAggrResult::Ok; + } + } while (dl.last != last); + + return FanInFabricRecvAggrResult::ErrAggrFailed; +} + +template +void FanInFabric::full_scan( + BitmapType &bitmap) noexcept { + bitmap = 0; + + uint8_t curr = dl.first; + + if (in_queues[curr].size()) + bitmap |= 1 << curr; + + do { + curr = dl.next[curr]; + if (in_queues[curr].size()) + bitmap |= 1 << curr; + } while (curr != dl.last); + + hint.fetch_or(bitmap, std::memory_order_release); +} + +template +void FanInFabric::DiscoveryList::init() noexcept { + for (int i = 0; i < IN_THREAD_CNT; ++i) { + next[i] = (i + 1) % IN_THREAD_CNT; + prev[i] = (i + IN_THREAD_CNT - 1) % IN_THREAD_CNT; + } + first = 0; + last = IN_THREAD_CNT - 1; +} + +template +void FanInFabric::DiscoveryList::move_first_to_last() noexcept { + prev[first] = last; + next[last] = first; + last = next[last]; + first = next[last]; +} + +template +void FanInFabric::DiscoveryList:: + move_to_last(uint8_t index) noexcept { + if (index == last) + return; + else if (index == first) { + move_first_to_last(); + } else { + prev[next[index]] = prev[index]; + next[prev[index]] = next[index]; + next[last] = index; + prev[index] = last; + last = index; + } +} + +template +FanInFabric::BitmapType +FanInFabric::rotate( + BitmapType bitmap) const noexcept { + bitmap &= THREAD_BITMAP_MASK; + + if (rotation == 0) + return bitmap; + + return ((bitmap >> rotation) | (bitmap << (IN_THREAD_CNT - rotation))) & + THREAD_BITMAP_MASK; +} + +} // namespace THWeaver diff --git a/src/queue.cc b/src/queue.cc new file mode 100644 index 0000000..025988d --- /dev/null +++ b/src/queue.cc @@ -0,0 +1,89 @@ +#include +#include + +#include +#include + +namespace THWeaver { + +template +void EndpointQueue::init() noexcept { + head.store(0); + tail.store(0); +} + +template +void EndpointQueue::flush() noexcept { + head.store(0); + tail.store(0); +} + +template +EndpointQueueSendResult +EndpointQueue::send(MessageType &&message) noexcept { + std::size_t h = head.load(std::memory_order_acquire); + std::size_t t = tail.load(std::memory_order_acquire); + + if (h == ((t + 1) & QSIZE_MASK)) + return EndpointQueueSendResult::ErrFull; + + buffer[t] = std::move(message); + + t = (t + 1) & QSIZE_MASK; + tail.store(t); + + return EndpointQueueSendResult::Ok; +} + +template +EndpointQueueRecvResult +EndpointQueue::recv(MessageType &buffer) noexcept { + std::size_t h = head.load(); + std::size_t t = tail.load(); + + if (h == t) + return EndpointQueueRecvResult::ErrEmpty; + + buffer = std::move(this->buffer[h]); + + h = (h + 1) & QSIZE_MASK; + head.store(h); + + return EndpointQueueRecvResult::Ok; +} + +template +void EndpointQueue::recv_unsafe( + MessageType &buffer, std::size_t old_head) noexcept { + + buffer = std::move(this->buffer[old_head]); + + head.store((old_head + 1) & QSIZE_MASK); +} + +template +std::size_t EndpointQueue::size() const noexcept { + return ((tail.load() + QSIZE_MASK) - head.load()) & QSIZE_MASK; +} + +template +bool EndpointQueue::is_empty() const noexcept { + return head.load() == tail.load(); +} + +template +bool EndpointQueue::is_full() const noexcept { + return head.load() != ((tail.load() + 1) & QSIZE_MASK); +} + +template +std::size_t EndpointQueue::get_head() const noexcept { + return head.load(); +} + +template +std::size_t EndpointQueue::get_tail() const noexcept { + return tail.load(); +} + +} // namespace THWeaver