diff --git a/AnnService/inc/Core/SPANN/Options.h b/AnnService/inc/Core/SPANN/Options.h index f49621230..fb7c56a3f 100644 --- a/AnnService/inc/Core/SPANN/Options.h +++ b/AnnService/inc/Core/SPANN/Options.h @@ -23,6 +23,7 @@ namespace SPTAG { VectorFileType m_vectorType; SizeType m_vectorSize; //Optional on condition std::string m_vectorDelimiter; //Optional on condition + bool m_mmapVectors; // memory-map the input vector file instead of loading it fully into RAM std::string m_queryPath; VectorFileType m_queryType; SizeType m_querySize; //Optional on condition diff --git a/AnnService/inc/Core/SPANN/ParameterDefinitionList.h b/AnnService/inc/Core/SPANN/ParameterDefinitionList.h index 0a88e3f1d..690f7930e 100644 --- a/AnnService/inc/Core/SPANN/ParameterDefinitionList.h +++ b/AnnService/inc/Core/SPANN/ParameterDefinitionList.h @@ -12,6 +12,7 @@ DefineBasicParameter(m_vectorPath, std::string, std::string(""), "VectorPath") DefineBasicParameter(m_vectorType, SPTAG::VectorFileType, SPTAG::VectorFileType::DEFAULT, "VectorType") DefineBasicParameter(m_vectorSize, SPTAG::SizeType, -1, "VectorSize") DefineBasicParameter(m_vectorDelimiter, std::string, std::string("|"), "VectorDelimiter") +DefineBasicParameter(m_mmapVectors, bool, false, "MmapVectors") DefineBasicParameter(m_queryPath, std::string, std::string(""), "QueryPath") DefineBasicParameter(m_queryType, SPTAG::VectorFileType, SPTAG::VectorFileType::Undefined, "QueryType") DefineBasicParameter(m_querySize, SPTAG::SizeType, -1, "QuerySize") diff --git a/AnnService/inc/Helper/ConcurrentSet.h b/AnnService/inc/Helper/ConcurrentSet.h index 49210a29e..9c9f534f5 100644 --- a/AnnService/inc/Helper/ConcurrentSet.h +++ b/AnnService/inc/Helper/ConcurrentSet.h @@ -16,6 +16,7 @@ #include #include #include +#include #endif // TBB #else #include @@ -47,9 +48,10 @@ namespace SPTAG template class ConcurrentSet { + public: typedef typename std::unordered_set::iterator iterator; + typedef typename std::unordered_set::const_iterator 
const_iterator; - public: ConcurrentSet() { m_lock.reset(new std::shared_timed_mutex); } ~ConcurrentSet() {} @@ -72,6 +74,14 @@ namespace SPTAG return m_data.insert(key); } + // Unsafe iteration: caller must ensure no concurrent modification. + // Mirrors the semantics of tbb::concurrent_unordered_set::unsafe_begin/unsafe_end, + // which (unlike the locked accessors above) do not synchronize. + iterator begin() { return m_data.begin(); } + iterator end() { return m_data.end(); } + const_iterator begin() const { return m_data.begin(); } + const_iterator end() const { return m_data.end(); } + private: std::unique_ptr m_lock; std::unordered_set m_data; @@ -80,9 +90,10 @@ namespace SPTAG template class ConcurrentMap { + public: typedef typename std::unordered_map::iterator iterator; + typedef typename std::unordered_map::value_type value_type; - public: ConcurrentMap(int capacity = 8) { m_lock.reset(new std::shared_timed_mutex); m_data.reserve(capacity); } ~ConcurrentMap() {} @@ -127,6 +138,8 @@ namespace SPTAG class ConcurrentQueue { public: + typedef typename std::deque::iterator iterator; + typedef typename std::deque::const_iterator const_iterator; ConcurrentQueue() {} @@ -135,7 +148,7 @@ namespace SPTAG void push(const T& j) { std::lock_guard lock(m_lock); - m_queue.push(j); + m_queue.push_back(j); } bool try_pop(T& j) @@ -145,13 +158,36 @@ namespace SPTAG return false; } j = m_queue.front(); - m_queue.pop(); + m_queue.pop_front(); return true; } + // The TBB concurrent_queue exposes empty() and unsafe_size() as + // best-effort, lock-free queries. Here we take the lock so the + // snapshot is consistent; callers should still treat the result + // as advisory in concurrent contexts. 
+ bool empty() const + { + std::lock_guard lock(m_lock); + return m_queue.empty(); + } + + size_t unsafe_size() const + { + std::lock_guard lock(m_lock); + return m_queue.size(); + } + + // Unsafe iteration: caller must ensure no concurrent modification, + // matching tbb::concurrent_queue::unsafe_begin/unsafe_end semantics. + iterator unsafe_begin() { return m_queue.begin(); } + iterator unsafe_end() { return m_queue.end(); } + const_iterator unsafe_begin() const { return m_queue.begin(); } + const_iterator unsafe_end() const { return m_queue.end(); } + protected: - std::mutex m_lock; - std::queue m_queue; + mutable std::mutex m_lock; + std::deque m_queue; }; template @@ -161,7 +197,7 @@ namespace SPTAG ConcurrentPriorityQueue() {} ~ConcurrentPriorityQueue() {} - size_type size() const { + size_t size() const { std::lock_guard lock(m_lock); return m_queue.size(); } @@ -178,11 +214,11 @@ namespace SPTAG } value = m_queue.top(); m_queue.pop(); - return true; + return true; } private: - std::mutex m_lock; + mutable std::mutex m_lock; std::priority_queue m_queue; }; #endif // TBB diff --git a/AnnService/inc/Helper/Logging.h b/AnnService/inc/Helper/Logging.h index 221f4dfa5..d5e379481 100644 --- a/AnnService/inc/Helper/Logging.h +++ b/AnnService/inc/Helper/Logging.h @@ -37,7 +37,14 @@ namespace SPTAG class LoggerHolder { -#if ((defined(_MSVC_LANG) && _MSVC_LANG >= 202002L) || __cplusplus >= 202002L) + // The C++20 path uses `std::atomic<std::shared_ptr<Logger>>`, which the + // libc++ shipped with ClickHouse rejects (it requires the value type to + // be trivially copyable). The pre-C++20 path with `std::atomic_load` / + // `std::atomic_store` works under both standards, so force it + // unconditionally — same workaround SPTAG-cmake uses for `_sptag` via + // `-std=c++17`, but applied at the header so consumers compiled at + // higher standards (e.g. ClickHouse's `dbms` at C++23) are unaffected. 
+#if 0 private: std::atomic> m_logger; public: diff --git a/AnnService/inc/Helper/MemoryMappedFile.h b/AnnService/inc/Helper/MemoryMappedFile.h new file mode 100644 index 000000000..a61b07fc3 --- /dev/null +++ b/AnnService/inc/Helper/MemoryMappedFile.h @@ -0,0 +1,140 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +#ifndef _SPTAG_HELPER_MEMORYMAPPEDFILE_H_ +#define _SPTAG_HELPER_MEMORYMAPPEDFILE_H_ + +#include +#include + +#ifdef _MSC_VER +#ifndef WIN32_LEAN_AND_MEAN +#define WIN32_LEAN_AND_MEAN +#endif +#ifndef NOMINMAX +#define NOMINMAX +#endif +#include +#else +#include +#include +#include +#include +#endif + +namespace SPTAG +{ +namespace Helper +{ + +// RAII read-only whole-file memory mapping. The OS demand-pages the file on access and +// reclaims clean file-backed pages under memory pressure, so resident set stays bounded +// instead of holding the whole file in heap. Header-only so it needs no build-list change. +class MemoryMappedFile +{ +public: + MemoryMappedFile() = default; + + ~MemoryMappedFile() { Close(); } + + // Non-copyable, non-movable: a single owner unmaps exactly once. + MemoryMappedFile(const MemoryMappedFile&) = delete; + MemoryMappedFile& operator=(const MemoryMappedFile&) = delete; + + // Maps the entire file read-only. Returns false (and leaves the object closed) on failure. 
+ bool Open(const std::string& p_path) + { + Close(); +#ifdef _MSC_VER + HANDLE file = ::CreateFileA(p_path.c_str(), GENERIC_READ, FILE_SHARE_READ, nullptr, + OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, nullptr); + if (file == INVALID_HANDLE_VALUE) return false; + + LARGE_INTEGER size; + if (!::GetFileSizeEx(file, &size) || size.QuadPart <= 0) + { + ::CloseHandle(file); + return false; + } + + HANDLE mapping = ::CreateFileMappingA(file, nullptr, PAGE_READONLY, 0, 0, nullptr); + if (mapping == nullptr) + { + ::CloseHandle(file); + return false; + } + + void* view = ::MapViewOfFile(mapping, FILE_MAP_READ, 0, 0, 0); + if (view == nullptr) + { + ::CloseHandle(mapping); + ::CloseHandle(file); + return false; + } + + m_base = reinterpret_cast(view); + m_length = static_cast(size.QuadPart); + m_file = file; + m_mapping = mapping; + return true; +#else + int fd = ::open(p_path.c_str(), O_RDONLY); + if (fd < 0) return false; + + struct stat st; + if (::fstat(fd, &st) != 0 || st.st_size <= 0) + { + ::close(fd); + return false; + } + + void* p = ::mmap(nullptr, static_cast(st.st_size), PROT_READ, MAP_SHARED, fd, 0); + // The mapping keeps its own reference to the file, so the descriptor can be closed now. + ::close(fd); + if (p == MAP_FAILED) return false; + +#if defined(MADV_RANDOM) + // BKT build accesses vectors randomly by id; suppress readahead to keep RSS tight. 
+ ::madvise(p, static_cast(st.st_size), MADV_RANDOM); +#endif + m_base = reinterpret_cast(p); + m_length = static_cast(st.st_size); + return true; +#endif + } + + void Close() + { + if (m_base == nullptr) return; +#ifdef _MSC_VER + ::UnmapViewOfFile(m_base); + if (m_mapping != nullptr) ::CloseHandle(m_mapping); + if (m_file != nullptr) ::CloseHandle(m_file); + m_mapping = nullptr; + m_file = nullptr; +#else + ::munmap(m_base, static_cast(m_length)); +#endif + m_base = nullptr; + m_length = 0; + } + + const std::uint8_t* Data() const { return m_base; } + + std::uint64_t Length() const { return m_length; } + + bool IsOpen() const { return m_base != nullptr; } + +private: + std::uint8_t* m_base = nullptr; + std::uint64_t m_length = 0; +#ifdef _MSC_VER + void* m_file = nullptr; + void* m_mapping = nullptr; +#endif +}; + +} // namespace Helper +} // namespace SPTAG + +#endif // _SPTAG_HELPER_MEMORYMAPPEDFILE_H_ diff --git a/AnnService/inc/Helper/VectorSetReader.h b/AnnService/inc/Helper/VectorSetReader.h index ea68419dc..c65537e95 100644 --- a/AnnService/inc/Helper/VectorSetReader.h +++ b/AnnService/inc/Helper/VectorSetReader.h @@ -34,6 +34,10 @@ class ReaderOptions : public ArgumentsParser std::uint32_t m_threadNum; bool m_normalized; + + // When true (and the file type supports it), map the input vector file instead of + // reading it fully into RAM, so the OS demand-pages it and RSS stays bounded. 
+ bool m_useMmap = false; }; class VectorSetReader diff --git a/AnnService/src/Core/SPANN/SPANNIndex.cpp b/AnnService/src/Core/SPANN/SPANNIndex.cpp index 0c34a17b8..9d95fe202 100644 --- a/AnnService/src/Core/SPANN/SPANNIndex.cpp +++ b/AnnService/src/Core/SPANN/SPANNIndex.cpp @@ -1246,6 +1246,16 @@ template ErrorCode Index::BuildIndex(bool p_normalized) std::shared_ptr vectorOptions( new Helper::ReaderOptions(valueType, dim, m_options.m_vectorType, m_options.m_vectorDelimiter, m_options.m_iSSDNumberOfThreads, p_normalized)); + // A read-only mapping cannot satisfy the in-place Normalize done in SelectHead, so mmap is + // only enabled when no normalization is required (L2, or already-normalized Cosine input). + bool needNormalize = (m_options.m_distCalcMethod == DistCalcMethod::Cosine) && !p_normalized; + vectorOptions->m_useMmap = m_options.m_mmapVectors && !needNormalize; + if (m_options.m_mmapVectors && needNormalize) + { + SPTAGLIB_LOG(Helper::LogLevel::LL_Warning, + "MmapVectors disabled: Cosine with un-normalized input requires in-place " + "Normalize; using in-memory load. 
Pre-normalize input or use L2 to enable mmap.\n"); + } auto vectorReader = Helper::VectorSetReader::CreateInstance(vectorOptions); if (m_options.m_vectorPath.empty()) { diff --git a/AnnService/src/Core/VectorIndex.cpp b/AnnService/src/Core/VectorIndex.cpp index 2f8ebfd13..5b7bd5270 100644 --- a/AnnService/src/Core/VectorIndex.cpp +++ b/AnnService/src/Core/VectorIndex.cpp @@ -1185,7 +1185,7 @@ void VectorIndex::ApproximateRNG(std::shared_ptr &fullVectors, std::u default: SPTAGLIB_LOG( Helper::LogLevel::LL_Error, "Unable to get quantizer reconstruct type %s", - Helper::Convert::ConvertToString(m_pQuantizer->GetReconstructType())); + Helper::Convert::ConvertToString(m_pQuantizer->GetReconstructType()).c_str()); } } else diff --git a/AnnService/src/Helper/VectorSetReaders/DefaultReader.cpp b/AnnService/src/Helper/VectorSetReaders/DefaultReader.cpp index eeb89a5a2..9bfa9c61a 100644 --- a/AnnService/src/Helper/VectorSetReaders/DefaultReader.cpp +++ b/AnnService/src/Helper/VectorSetReaders/DefaultReader.cpp @@ -4,6 +4,7 @@ #include "inc/Helper/VectorSetReaders/DefaultReader.h" #include "inc/Core/VectorIndex.h" #include "inc/Helper/CommonHelper.h" +#include "inc/Helper/MemoryMappedFile.h" using namespace SPTAG; using namespace SPTAG::Helper; @@ -33,6 +34,40 @@ ErrorCode DefaultVectorReader::LoadFile(const std::string &p_filePaths) std::shared_ptr DefaultVectorReader::GetVectorSet(SizeType start, SizeType end) const { + if (m_options->m_useMmap) + { + auto mmf = std::make_shared(); + if (mmf->Open(m_vectorOutput) && + mmf->Length() >= (std::uint64_t)(sizeof(SizeType) + sizeof(DimensionType))) + { + const std::uint8_t *base = mmf->Data(); + SizeType row = *reinterpret_cast(base); + DimensionType col = *reinterpret_cast(base + sizeof(SizeType)); + + if (start > row) + start = row; + if (end < 0 || end > row) + end = row; + + std::uint64_t perVec = ((std::uint64_t)GetValueTypeSize(m_options->m_inputValueType)) * col; + std::uint64_t dataOffset = sizeof(SizeType) + 
sizeof(DimensionType) + ((std::uint64_t)start) * perVec; + std::uint64_t dataLen = ((std::uint64_t)(end - start)) * perVec; + + if (dataOffset + dataLen <= mmf->Length()) + { + std::uint8_t *dataPtr = const_cast(base) + dataOffset; + // Aliasing shared_ptr: shares ownership with mmf (so the mapping outlives every + // copy of the ByteArray / BasicVectorSet) but get() returns the data pointer. + std::shared_ptr holder(mmf, dataPtr); + ByteArray vectorSet(dataPtr, dataLen, holder); + SPTAGLIB_LOG(Helper::LogLevel::LL_Info, "Mmap Vector(%d,%d)\n", end - start, col); + return std::make_shared(vectorSet, m_options->m_inputValueType, col, end - start); + } + } + SPTAGLIB_LOG(Helper::LogLevel::LL_Warning, + "mmap failed for %s, falling back to in-memory read.\n", m_vectorOutput.c_str()); + } + auto ptr = f_createIO(); if (ptr == nullptr || !ptr->Initialize(m_vectorOutput.c_str(), std::ios::binary | std::ios::in)) { diff --git a/AnnService/src/Helper/VectorSetReaders/TxtReader.cpp b/AnnService/src/Helper/VectorSetReaders/TxtReader.cpp index 8bf9fb55c..ef1439821 100644 --- a/AnnService/src/Helper/VectorSetReaders/TxtReader.cpp +++ b/AnnService/src/Helper/VectorSetReaders/TxtReader.cpp @@ -5,7 +5,9 @@ #include "inc/Core/VectorIndex.h" #include "inc/Helper/CommonHelper.h" #include "inc/Helper/StringConvert.h" -#include +// <omp.h> was included historically but no OpenMP primitives are actually used +// in this translation unit. Drop it — ClickHouse's contrib build does not +// configure an OpenMP runtime. using namespace SPTAG; using namespace SPTAG::Helper;