diff --git a/Makefile b/Makefile index aff3d42..414ed01 100644 --- a/Makefile +++ b/Makefile @@ -1,22 +1,23 @@ -LIB_NAME ?= weaver BUILD_DIR ?= build -SRC_DIR ?= src -INCLUDE_DIRS := include src/include +DEPS_DIR ?= $(BUILD_DIR)/deps +TEST_DIR ?= tests +INCLUDE_DIR := include MODE ?= debug -RELEASE_OPT ?= 2 +RELEASE_OPT ?= 3 CLINE_SIZE ?= 64 CXX ?= clang++ AR ?= ar ARFLAGS := rcs -SRCS := $(shell find $(SRC_DIR) -name '*.cc') -OBJS := $(patsubst $(SRC_DIR)/%.cc,$(BUILD_DIR)/%.o,$(SRCS)) -DEPS := $(OBJS:.o=.d) +TESTS := $(shell find $(TEST_DIR) -name '*.cc') -INCLUDES := $(addprefix -I,$(INCLUDE_DIRS)) +TEST_BINS := $(patsubst $(TEST_DIR)/%.cc,$(BUILD_DIR)/tests/%,$(TESTS)) +TEST_DEPS := $(patsubst $(TEST_DIR)/%.cc,$(DEPS_DIR)/tests/%.d,$(TESTS)) + +INCLUDES := $(addprefix -I,$(INCLUDE_DIR)) WARN_FLAGS := \ -Wall \ @@ -26,7 +27,6 @@ WARN_FLAGS := \ COMMON_CXXFLAGS := \ -std=c++20 \ - -MMD -MP \ -flto \ --param=destructive-interference-size=$(CLINE_SIZE) \ $(WARN_FLAGS) \ @@ -50,21 +50,17 @@ else $(error Unknown MODE '$(MODE)'; use MODE=debug or MODE=release) endif -LIB := $(BUILD_DIR)/lib$(LIB_NAME).a +.PHONY: all clean tests -.PHONY: all clean +all: tests +tests: $(TEST_BINS) -all: $(LIB) - -$(LIB): $(OBJS) - @mkdir -p $(dir $@/bin) - $(AR) $(ARFLAGS) $@/bin $^ - -$(BUILD_DIR)/%.o: $(SRC_DIR)/%.cc +$(BUILD_DIR)/tests/%: $(TEST_DIR)/%.cc @mkdir -p $(dir $@) - $(CXX) $(CXXFLAGS) -c $< -o $@ - --include $(DEPS) + @mkdir -p $(DEPS_DIR)/tests/$(dir $*) + $(CXX) $(CXXFLAGS) -MMD -MP -MF $(DEPS_DIR)/tests/$*.d $< -o $@ clean: rm -rf $(BUILD_DIR) + +-include $(TEST_DEPS) diff --git a/include/weaver.h b/include/weaver.h new file mode 100644 index 0000000..41fa753 --- /dev/null +++ b/include/weaver.h @@ -0,0 +1,365 @@ +#ifndef THWEAVER_FABRIC_H +#define THWEAVER_FABRIC_H + +#include +#include +#include +#include +#include + +namespace THWeaver { + +static constexpr size_t CLS = std::hardware_destructive_interference_size; + +enum class EndpointQueueSendResult : uint32_t { Ok = 0, ErrFull = 1 }; + +enum class EndpointQueueRecvResult { Ok, ErrEmpty }; + +enum class FanInFabricSendResult : uint32_t { Ok = 0, ErrFull = 1 }; + +enum class FanInFabricRecvBitmapResult { Ok, ErrBitmapEmpty, ErrBitmapFailed }; + +enum class FanInFabricRecvTryResult { Ok, ErrTryFailed }; + +enum class FanInFabricRecvAggrResult { Ok, ErrAggrFailed }; + +template class EndpointQueue { +public: + static constexpr std::size_t QSIZE = 1 << qsize_expt; + static constexpr std::size_t QSIZE_MASK = QSIZE - 1; + + EndpointQueue() noexcept {}; + EndpointQueue(const EndpointQueue &) = delete; + EndpointQueue & + operator=(const EndpointQueue &) = delete; + EndpointQueue(EndpointQueue &&) = delete; + EndpointQueue & + operator=(EndpointQueue &&) = delete; + + void init() noexcept { + head.store(0); + tail.store(0); + } + + void flush() noexcept { + head.store(0); + tail.store(0); + } + + std::size_t size() const noexcept { + return ((tail.load() + QSIZE_MASK) - head.load()) & QSIZE_MASK; + } + + bool is_empty() const noexcept { return head.load() == tail.load(); } + bool is_full() const noexcept { + return head.load() == ((tail.load() + 1) & QSIZE_MASK); + } + + std::size_t get_head() const noexcept { return head.load(); } + std::size_t get_tail() const noexcept { return tail.load(); } + + EndpointQueueSendResult send(MessageType &&message) noexcept { + std::size_t h = head.load(); + std::size_t t = tail.load(); + + if (h == ((t + 1) & QSIZE_MASK)) + return EndpointQueueSendResult::ErrFull; + + buffer[t] = std::move(message); + + t = (t + 1) & QSIZE_MASK; + tail.store(t); + + return EndpointQueueSendResult::Ok; + } + + EndpointQueueRecvResult recv(MessageType &buffer) noexcept { + std::size_t h = head.load(); + std::size_t t = tail.load(); + + if (h == t) + return EndpointQueueRecvResult::ErrEmpty; + + buffer = std::move(this->buffer[h]); + + h = (h + 1) & QSIZE_MASK; + head.store(h); + + return EndpointQueueRecvResult::Ok; + } + + void recv_unsafe(MessageType &buffer, std::size_t old_head) noexcept { + buffer = std::move(this->buffer[old_head]); + + head.store((old_head + 1) & QSIZE_MASK); + } + +private: + alignas(CLS) std::array buffer; + alignas(CLS) std::atomic head; + alignas(CLS) std::atomic tail; +}; + +template +class FanInFabric { +public: + using BitmapType = uint64_t; + + static constexpr std::size_t QSIZE = 1 << qsize_expt; + static constexpr std::size_t QSIZE_MASK = QSIZE - 1; + static constexpr std::size_t BITMAP_SIZE = sizeof(BitmapType) * 8; + static constexpr BitmapType THREAD_BITMAP_MASK = + IN_THREAD_CNT == BITMAP_SIZE + ? ~BitmapType(0) + : (BitmapType(1) << IN_THREAD_CNT) - 1; + + static_assert(IN_THREAD_CNT <= BITMAP_SIZE, "Producer limit exceeded"); + + FanInFabric() noexcept {} + FanInFabric(const FanInFabric + &) = delete; + FanInFabric & + operator=(const FanInFabric &) = + delete; + FanInFabric(FanInFabric &&) = + delete; + FanInFabric & + operator=(FanInFabric &&) = + delete; + + void init() noexcept { + for (int i = 0; i < IN_THREAD_CNT; ++i) + in_queues[i].init(); + hint.store(0); + dl.init(); + rotation = 0; + } + + template void flush() noexcept { + static_assert(thread_id < IN_THREAD_CNT, + "Thread ID out of bounds"); + + in_queues[thread_id].flush(); + hint.fetch_and(~(1 << thread_id), std::memory_order_relaxed); + } + + void flush_all() noexcept { + for (int i = 0; i < IN_THREAD_CNT; ++i) + in_queues[i].flush(); + hint.store(0, std::memory_order_relaxed); + dl.init(); + rotation = 0; + } + + template + FanInFabricSendResult send(MessageType &&message) noexcept { + static_assert(thread_id < IN_THREAD_CNT, + "Thread ID out of bounds"); + + return static_cast(static_cast( + in_queues[thread_id].send(message))); + } + + FanInFabricRecvBitmapResult + recv_bitmap_only(MessageType &buffer) noexcept { + BitmapType bitmap = hint.load(std::memory_order_relaxed); + + if (!bitmap) + return FanInFabricRecvBitmapResult::ErrBitmapEmpty; + + bitmap = rotate(bitmap); + int hinted_id = + (std::countr_zero(bitmap) + rotation) % IN_THREAD_CNT; + rotation = (hinted_id + 1) % IN_THREAD_CNT; + + std::size_t head = in_queues[hinted_id].get_head(); + std::size_t tail = in_queues[hinted_id].get_tail(); + std::size_t qsize = ((tail + QSIZE_MASK) - head) & QSIZE_MASK; + + if (qsize <= 1) + hint.fetch_and(~(1 << hinted_id), + std::memory_order_relaxed); + + if (qsize) { + in_queues[hinted_id].recv_unsafe(buffer, head); + dl.move_to_last(hinted_id); + return FanInFabricRecvBitmapResult::Ok; + } + + return FanInFabricRecvBitmapResult::ErrBitmapFailed; + } + + FanInFabricRecvTryResult recv_try(MessageType &buffer) noexcept { + BitmapType bitmap = hint.load(std::memory_order_relaxed); + + if (bitmap) { + bitmap = rotate(bitmap); + int hinted_id = (std::countr_zero(bitmap) + rotation) % + IN_THREAD_CNT; + rotation = (hinted_id + 1) % IN_THREAD_CNT; + + std::size_t head = in_queues[hinted_id].get_head(); + std::size_t tail = in_queues[hinted_id].get_tail(); + std::size_t qsize = + ((tail + QSIZE_MASK) - head) & QSIZE_MASK; + + if (qsize <= 1) + hint.fetch_and(~(1 << hinted_id), + std::memory_order_relaxed); + + if (qsize) { + in_queues[hinted_id].recv_unsafe(buffer, head); + dl.move_to_last(hinted_id); + return FanInFabricRecvTryResult::Ok; + } + } + + std::size_t head = in_queues[dl.first].get_head(); + std::size_t tail = in_queues[dl.first].get_tail(); + + if (head == tail) + return FanInFabricRecvTryResult::ErrTryFailed; + + in_queues[dl.first].recv_unsafe(buffer, head); + dl.move_first_to_last(); + return FanInFabricRecvTryResult::Ok; + } + + FanInFabricRecvAggrResult recv_aggr(MessageType &buffer) noexcept { + BitmapType bitmap = hint.load(std::memory_order_relaxed); + + if (bitmap) { + bitmap = rotate(bitmap); + int hinted_id = (std::countr_zero(bitmap) + rotation) % + IN_THREAD_CNT; + rotation = (hinted_id + 1) % IN_THREAD_CNT; + + std::size_t head = in_queues[hinted_id].get_head(); + std::size_t tail = in_queues[hinted_id].get_tail(); + std::size_t qsize = + ((tail + QSIZE_MASK) - head) & QSIZE_MASK; + + if (qsize <= 1) + hint.fetch_and(~(1 << hinted_id), + std::memory_order_relaxed); + + if (qsize) { + in_queues[hinted_id].recv_unsafe(buffer, head); + dl.move_to_last(hinted_id); + return FanInFabricRecvAggrResult::Ok; + } + } + + uint8_t last = dl.last; + + do { + uint8_t first = dl.first; + std::size_t head = in_queues[dl.first].get_head(); + std::size_t tail = in_queues[dl.first].get_tail(); + dl.move_first_to_last(); + + if (head != tail) { + in_queues[first].recv_unsafe(buffer, head); + return FanInFabricRecvAggrResult::Ok; + } + } while (dl.last != last); + + return FanInFabricRecvAggrResult::ErrAggrFailed; + } + + void full_scan(BitmapType &bitmap) noexcept { + bitmap = 0; + + uint8_t curr = dl.first; + + if (in_queues[curr].size()) + bitmap |= 1 << curr; + + do { + curr = dl.next[curr]; + if (in_queues[curr].size()) + bitmap |= 1 << curr; + } while (curr != dl.last); + + hint.fetch_or(bitmap, std::memory_order_release); + } + + template std::size_t size() const noexcept { + static_assert(thread_id < IN_THREAD_CNT, + "Thread ID out of bounds"); + + return in_queues[thread_id].size(); + } + + template bool is_empty() const noexcept { + static_assert(thread_id < IN_THREAD_CNT, + "Thread ID out of bounds"); + + return in_queues[thread_id].is_empty(); + } + + template bool is_full() const noexcept { + static_assert(thread_id < IN_THREAD_CNT, + "Thread ID out of bounds"); + + return in_queues[thread_id].is_full(); + } + +private: + std::array, IN_THREAD_CNT> + in_queues; + + alignas(CLS) std::atomic hint; + struct alignas(CLS) DiscoveryList { + std::array next; + std::array prev; + uint8_t first; + uint8_t last; + void init() noexcept { + for (int i = 0; i < IN_THREAD_CNT; ++i) { + next[i] = (i + 1) % IN_THREAD_CNT; + prev[i] = + (i + IN_THREAD_CNT - 1) % IN_THREAD_CNT; + } + first = 0; + last = IN_THREAD_CNT - 1; + } + + void move_first_to_last() noexcept { + prev[first] = last; + next[last] = first; + last = next[last]; + first = next[last]; + } + + void move_to_last(uint8_t index) noexcept { + if (index == last) + return; + else if (index == first) { + move_first_to_last(); + } else { + prev[next[index]] = prev[index]; + next[prev[index]] = next[index]; + next[last] = index; + prev[index] = last; + last = index; + } + } + } dl; + std::size_t rotation; + BitmapType rotate(BitmapType bitmap) const noexcept { + bitmap &= THREAD_BITMAP_MASK; + + if (rotation == 0) + return bitmap; + + return ((bitmap >> rotation) | + (bitmap << (IN_THREAD_CNT - rotation))) & + THREAD_BITMAP_MASK; + } +}; + +} // namespace THWeaver + +#endif diff --git a/include/weaver/atomic.h b/include/weaver/atomic.h deleted file mode 100644 index 1434dc7..0000000 --- a/include/weaver/atomic.h +++ /dev/null @@ -1,88 +0,0 @@ -#ifndef THWEAVER_ATOMIC_H -#define THWEAVER_ATOMIC_H - -#include -#include -#include -#include - -template -concept Additive = std::is_integral_v && !std::is_same_v; - -template -concept AtomicAdditive = -requires(A a, T v) { - { - a.load() - } - -> std::same_as; - { - a.store(v) - }; - { - a.fetch_add(v) - } - -> std::same_as; - { - a.fetch_sub(v) - } - -> std::same_as; -}; - -template -concept Bitwise = - (std::is_integral_v || std::is_enum_v) && - !std::is_same_v; - -template -concept AtomicBitwise = -requires(A a, T v) { - { - a.load() - } - -> std::same_as; - { - a.store(v) - }; - { - a.fetch_or(v) - } - -> std::same_as; - { - a.fetch_and(v) - } - -> std::same_as; - { - a.fetch_xor(v) - } - -> std::same_as; -}; - -// Uses the full bitmap -template -requires Bitwise && AtomicBitwise && -Additive && std::is_unsigned_v -class AtomicBitmap { -public: - static_assert(NumBits <= std::numeric_limits::digits); - - AtomicBitmap() noexcept - : bits(T {}), rotation(0) {} - - void reset(const T &bits, const Index rotation) noexcept; - - bool get_bit(const Index n) const noexcept; - void set_bit(const Index n) noexcept; - void clear_bit(const Index n) noexcept; - - void rotate(const Index n) noexcept; - Index get_rotation() const noexcept; - - Index get_low_bit() const noexcept; - Index rotate_to_low_bit(const Index offset = 0) noexcept; -private: - Atomic bits; - Index rotation; -}; - -#endif diff --git a/include/weaver/bitwise.h b/include/weaver/bitwise.h deleted file mode 100644 index 8a0325b..0000000 --- a/include/weaver/bitwise.h +++ /dev/null @@ -1,35 +0,0 @@ -#ifndef THWEAVER_BITWISE_H -#define THWEAVER_BITWISE_H - -#include - -template -concept Bitwise = -requires(T a, T b) { - { - a & b - } - -> std::same_as; - { - a | b - } - -> std::same_as; - { - a ^ b - } - -> std::same_as; - { - ~a - } - -> std::same_as; - { - a << 1 - } - -> std::same_as; - { - a >> 1 - } - -> std::same_as; -}; - -#endif diff --git a/include/weaver/fabric.h b/include/weaver/fabric.h deleted file mode 100644 index 648bcd8..0000000 --- a/include/weaver/fabric.h +++ /dev/null @@ -1,114 +0,0 @@ -#ifndef THWEAVER_FABRIC_H -#define THWEAVER_FABRIC_H - -#include -#include -#include -#include -#include -#include - -namespace THWeaver { - -template class EndpointQueue { -public: - static constexpr size_t CLS = - std::hardware_destructive_interference_size; - static constexpr std::size_t QSIZE = 1 << qsize_expt; - static constexpr std::size_t QSIZE_MASK = QSIZE - 1; - - EndpointQueue(const EndpointQueue &) = delete; - EndpointQueue & - operator=(const EndpointQueue &) = delete; - EndpointQueue(EndpointQueue &&) = delete; - EndpointQueue & - operator=(EndpointQueue &&) = delete; - - void init() noexcept; - void flush() noexcept; - - std::size_t size() const noexcept; - bool is_empty() const noexcept; - bool is_full() const noexcept; - - std::size_t get_head() const noexcept; - std::size_t get_tail() const noexcept; - - EndpointQueueSendResult send(MessageType &&message) noexcept; - EndpointQueueRecvResult recv(MessageType &buffer) noexcept; - void recv_unsafe(MessageType &buffer, std::size_t old_head) noexcept; - -private: - alignas(CLS) std::array buffer; - alignas(CLS) std::atomic head; - alignas(CLS) std::atomic tail; -}; - -template -class FanInFabric { -public: - using BitmapType = uint64_t; - - static constexpr size_t CLS = - std::hardware_destructive_interference_size; - static constexpr std::size_t QSIZE = 1 << qsize_expt; - static constexpr std::size_t QSIZE_MASK = QSIZE - 1; - static constexpr std::size_t BITMAP_SIZE = sizeof(BitmapType) * 8; - static constexpr BitmapType THREAD_BITMAP_MASK = - IN_THREAD_CNT == BITMAP_SIZE - ? ~BitmapType(0) - : (BitmapType(1) << IN_THREAD_CNT) - 1; - - static_assert(IN_THREAD_CNT <= BITMAP_SIZE, "Producer limit exceeded"); - - FanInFabric(const FanInFabric - &) = delete; - FanInFabric & - operator=(const FanInFabric &) = - delete; - FanInFabric(FanInFabric &&) = - delete; - FanInFabric & - operator=(FanInFabric &&) = - delete; - - void init() noexcept; - template void flush() noexcept; - void flush_all() noexcept; - - template - FanInFabricSendResult send(MessageType &&message) noexcept; - - FanInFabricRecvBitmapResult - recv_bitmap_only(MessageType &buffer) noexcept; - FanInFabricRecvTryResult recv_try(MessageType &buffer) noexcept; - FanInFabricRecvAggrResult recv_aggr(MessageType &buffer) noexcept; - - void full_scan(BitmapType &bitmap) noexcept; - - template std::size_t size() const noexcept; - template bool is_empty() const noexcept; - template bool is_full() const noexcept; - -private: - std::array, IN_THREAD_CNT> - in_queues; - - alignas(CLS) std::atomic hint; - struct alignas(CLS) DiscoveryList { - std::array next; - std::array prev; - uint8_t first; - uint8_t last; - void init() noexcept; - void move_first_to_last() noexcept; - void move_to_last(uint8_t index) noexcept; - } dl; - std::size_t rotation; - BitmapType rotate(BitmapType bitmap) const noexcept; -}; - -} // namespace THWeaver - -#endif diff --git a/include/weaver/results.h b/include/weaver/results.h deleted file mode 100644 index 4b9fd2c..0000000 --- a/include/weaver/results.h +++ /dev/null @@ -1,21 +0,0 @@ -#ifndef THWEAVER_RESULTS_H -#define THWEAVER_RESULTS_H - -#include -namespace THWeaver { - -enum class EndpointQueueSendResult : uint32_t { Ok = 0, ErrFull = 1 }; - -enum class EndpointQueueRecvResult { Ok, ErrEmpty }; - -enum class FanInFabricSendResult : uint32_t { Ok = 0, ErrFull = 1 }; - -enum class FanInFabricRecvBitmapResult { Ok, ErrBitmapEmpty, ErrBitmapFailed }; - -enum class FanInFabricRecvTryResult { Ok, ErrTryFailed }; - -enum class FanInFabricRecvAggrResult { Ok, ErrAggrFailed }; - -} // namespace THWeaver - -#endif diff --git a/src/fan_in.cc b/src/fan_in.cc deleted file mode 100644 index 6b99ec0..0000000 --- a/src/fan_in.cc +++ /dev/null @@ -1,267 +0,0 @@ -#include -#include -#include -#include -#include -#include - -namespace THWeaver { - -template -void FanInFabric::init() noexcept { - for (int i = 0; i < IN_THREAD_CNT; ++i) - in_queues[i].init(); - hint.store(0); - dl.init(); - rotation = 0; -} - -template -template -void FanInFabric::flush() noexcept { - static_assert(thread_id < IN_THREAD_CNT, "Thread ID out of bounds"); - - in_queues[thread_id].flush(); - hint.fetch_and(~(1 << thread_id), std::memory_order_relaxed); -} - -template -void FanInFabric::flush_all() noexcept { - for (int i = 0; i < IN_THREAD_CNT; ++i) - in_queues[i].flush(); - hint.store(0, std::memory_order_relaxed); - dl.init(); - rotation = 0; -} - -template -template -FanInFabricSendResult FanInFabric::send( - MessageType &&message) noexcept { - static_assert(thread_id < IN_THREAD_CNT, "Thread ID out of bounds"); - - return static_cast( - static_cast(in_queues[thread_id].send(message))); -} - -template -template -std::size_t -FanInFabric::size() const noexcept { - static_assert(thread_id < IN_THREAD_CNT, "Thread ID out of bounds"); - - return in_queues[thread_id].size(); -} - -template -template -bool FanInFabric::is_empty() - const noexcept { - static_assert(thread_id < IN_THREAD_CNT, "Thread ID out of bounds"); - - return in_queues[thread_id].is_empty(); -} - -template -template -bool FanInFabric::is_full() - const noexcept { - static_assert(thread_id < IN_THREAD_CNT, "Thread ID out of bounds"); - - return in_queues[thread_id].is_full(); -} - -template -FanInFabricRecvBitmapResult -FanInFabric::recv_bitmap_only( - MessageType &buffer) noexcept { - BitmapType bitmap = hint.load(std::memory_order_relaxed); - - if (!bitmap) - return FanInFabricRecvBitmapResult::ErrBitmapEmpty; - - bitmap = rotate(bitmap); - int hinted_id = (std::countr_zero(bitmap) + rotation) % IN_THREAD_CNT; - rotation = (hinted_id + 1) % IN_THREAD_CNT; - - std::size_t head = in_queues[hinted_id].get_head(); - std::size_t tail = in_queues[hinted_id].get_tail(); - std::size_t qsize = ((tail + QSIZE_MASK) - head) & QSIZE_MASK; - - if (qsize <= 1) - hint.fetch_add(~(1 << hinted_id), std::memory_order_relaxed); - - if (qsize) { - in_queues[hinted_id].recv_unsafe(buffer, head); - dl.move_to_last(hinted_id); - return FanInFabricRecvBitmapResult::Ok; - } - - return FanInFabricRecvBitmapResult::ErrBitmapFailed; -} - -template -FanInFabricRecvTryResult -FanInFabric::recv_try( - MessageType &buffer) noexcept { - BitmapType bitmap = hint.load(std::memory_order_relaxed); - - if (bitmap) { - bitmap = rotate(bitmap); - int hinted_id = - (std::countr_zero(bitmap) + rotation) % IN_THREAD_CNT; - rotation = (hinted_id + 1) % IN_THREAD_CNT; - - std::size_t head = in_queues[hinted_id].get_head(); - std::size_t tail = in_queues[hinted_id].get_tail(); - std::size_t qsize = ((tail + QSIZE_MASK) - head) & QSIZE_MASK; - - if (qsize <= 1) - hint.fetch_add(~(1 << hinted_id), - std::memory_order_relaxed); - - if (qsize) { - in_queues[hinted_id].recv_unsafe(buffer, head); - dl.move_to_last(hinted_id); - return FanInFabricRecvTryResult::Ok; - } - } - - std::size_t head = in_queues[dl.first].get_head(); - std::size_t tail = in_queues[dl.first].get_tail(); - - if (head == tail) - return FanInFabricRecvTryResult::ErrTryFailed; - - in_queues[dl.first].recv_unsafe(buffer, head); - dl.move_first_to_last(); - return FanInFabricRecvTryResult::Ok; -} - -template -FanInFabricRecvAggrResult -FanInFabric::recv_aggr( - MessageType &buffer) noexcept { - BitmapType bitmap = hint.load(std::memory_order_relaxed); - - if (bitmap) { - bitmap = rotate(bitmap); - int hinted_id = - (std::countr_zero(bitmap) + rotation) % IN_THREAD_CNT; - rotation = (hinted_id + 1) % IN_THREAD_CNT; - - std::size_t head = in_queues[hinted_id].get_head(); - std::size_t tail = in_queues[hinted_id].get_tail(); - std::size_t qsize = ((tail + QSIZE_MASK) - head) & QSIZE_MASK; - - if (qsize <= 1) - hint.fetch_add(~(1 << hinted_id), - std::memory_order_relaxed); - - if (qsize) { - in_queues[hinted_id].recv_unsafe(buffer, head); - dl.move_to_last(hinted_id); - return FanInFabricRecvAggrResult::Ok; - } - } - - uint8_t last = dl.last; - - do { - std::size_t head = in_queues[dl.first].get_head(); - std::size_t tail = in_queues[dl.first].get_tail(); - dl.move_first_to_last(); - - if (head != tail) { - in_queues[dl.first].recv_unsafe(buffer, head); - return FanInFabricRecvAggrResult::Ok; - } - } while (dl.last != last); - - return FanInFabricRecvAggrResult::ErrAggrFailed; -} - -template -void FanInFabric::full_scan( - BitmapType &bitmap) noexcept { - bitmap = 0; - - uint8_t curr = dl.first; - - if (in_queues[curr].size()) - bitmap |= 1 << curr; - - do { - curr = dl.next[curr]; - if (in_queues[curr].size()) - bitmap |= 1 << curr; - } while (curr != dl.last); - - hint.fetch_or(bitmap, std::memory_order_release); -} - -template -void FanInFabric::DiscoveryList::init() noexcept { - for (int i = 0; i < IN_THREAD_CNT; ++i) { - next[i] = (i + 1) % IN_THREAD_CNT; - prev[i] = (i + IN_THREAD_CNT - 1) % IN_THREAD_CNT; - } - first = 0; - last = IN_THREAD_CNT - 1; -} - -template -void FanInFabric::DiscoveryList::move_first_to_last() noexcept { - prev[first] = last; - next[last] = first; - last = next[last]; - first = next[last]; -} - -template -void FanInFabric::DiscoveryList:: - move_to_last(uint8_t index) noexcept { - if (index == last) - return; - else if (index == first) { - move_first_to_last(); - } else { - prev[next[index]] = prev[index]; - next[prev[index]] = next[index]; - next[last] = index; - prev[index] = last; - last = index; - } -} - -template -FanInFabric::BitmapType -FanInFabric::rotate( - BitmapType bitmap) const noexcept { - bitmap &= THREAD_BITMAP_MASK; - - if (rotation == 0) - return bitmap; - - return ((bitmap >> rotation) | (bitmap << (IN_THREAD_CNT - rotation))) & - THREAD_BITMAP_MASK; -} - -} // namespace THWeaver diff --git a/src/queue.cc b/src/queue.cc deleted file mode 100644 index 025988d..0000000 --- a/src/queue.cc +++ /dev/null @@ -1,89 +0,0 @@ -#include -#include - -#include -#include - -namespace THWeaver { - -template -void EndpointQueue::init() noexcept { - head.store(0); - tail.store(0); -} - -template -void EndpointQueue::flush() noexcept { - head.store(0); - tail.store(0); -} - -template -EndpointQueueSendResult -EndpointQueue::send(MessageType &&message) noexcept { - std::size_t h = head.load(std::memory_order_acquire); - std::size_t t = tail.load(std::memory_order_acquire); - - if (h == ((t + 1) & QSIZE_MASK)) - return EndpointQueueSendResult::ErrFull; - - buffer[t] = std::move(message); - - t = (t + 1) & QSIZE_MASK; - tail.store(t); - - return EndpointQueueSendResult::Ok; -} - -template -EndpointQueueRecvResult -EndpointQueue::recv(MessageType &buffer) noexcept { - std::size_t h = head.load(); - std::size_t t = tail.load(); - - if (h == t) - return EndpointQueueRecvResult::ErrEmpty; - - buffer = std::move(this->buffer[h]); - - h = (h + 1) & QSIZE_MASK; - head.store(h); - - return EndpointQueueRecvResult::Ok; -} - -template -void EndpointQueue::recv_unsafe( - MessageType &buffer, std::size_t old_head) noexcept { - - buffer = std::move(this->buffer[old_head]); - - head.store((old_head + 1) & QSIZE_MASK); -} - -template -std::size_t EndpointQueue::size() const noexcept { - return ((tail.load() + QSIZE_MASK) - head.load()) & QSIZE_MASK; -} - -template -bool EndpointQueue::is_empty() const noexcept { - return head.load() == tail.load(); -} - -template -bool EndpointQueue::is_full() const noexcept { - return head.load() != ((tail.load() + 1) & QSIZE_MASK); -} - -template -std::size_t EndpointQueue::get_head() const noexcept { - return head.load(); -} - -template -std::size_t EndpointQueue::get_tail() const noexcept { - return tail.load(); -} - -} // namespace THWeaver diff --git a/tests/endpoint_queue/basic_test.cc b/tests/endpoint_queue/basic_test.cc new file mode 100644 index 0000000..e1df456 --- /dev/null +++ b/tests/endpoint_queue/basic_test.cc @@ -0,0 +1,75 @@ +#include +#include +#include +#include +#include +#include + +using std::chrono::nanoseconds; +using std::chrono::steady_clock; + +int init() { + int n = 0; + std::cin >> n; + return n; +} + +int main() { + std::vector send_time; + std::vector recv_time; + std::vector received; + + int n = init(); + if (n <= 0) { + std::cerr << "n must be positive\n"; + return 1; + } + + received.resize(n); + send_time.resize(n); + recv_time.resize(n); + + THWeaver::EndpointQueue queue; + queue.init(); + + std::thread producer([&]() { + for (int i = 0; i < n; ++i) { + while (queue.send(static_cast(i + 1)) != + THWeaver::EndpointQueueSendResult::Ok) { + } + send_time[i] = steady_clock::now(); + } + }); + + std::thread consumer([&]() { + for (int i = 0; i < n; ++i) { + uint64_t value = 0; + while (queue.recv(value) != + THWeaver::EndpointQueueRecvResult::Ok) { + } + recv_time[i] = steady_clock::now(); + received[i] = value; + } + }); + + producer.join(); + consumer.join(); + + for (int i = 0; i < n; ++i) { + uint64_t expected = static_cast(i + 1); + if (received[i] != expected) { + std::cerr << "mismatch at " << i << ": expected " + << expected << ", got " << received[i] + << "\n"; + return 1; + } + } + + for (int i = 0; i < n; ++i) { + nanoseconds duration = std::chrono::duration_cast( + recv_time[i] - send_time[i]); + std::cout << (i + 1) << "," << duration.count() << "\n"; + } + + return 0; +}