Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions AnnService/inc/Core/SPANN/Options.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ namespace SPTAG {
VectorFileType m_vectorType;
SizeType m_vectorSize; //Optional on condition
std::string m_vectorDelimiter; //Optional on condition
bool m_mmapVectors; // memory-map the input vector file instead of loading it fully into RAM
std::string m_queryPath;
VectorFileType m_queryType;
SizeType m_querySize; //Optional on condition
Expand Down
1 change: 1 addition & 0 deletions AnnService/inc/Core/SPANN/ParameterDefinitionList.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ DefineBasicParameter(m_vectorPath, std::string, std::string(""), "VectorPath")
DefineBasicParameter(m_vectorType, SPTAG::VectorFileType, SPTAG::VectorFileType::DEFAULT, "VectorType")
DefineBasicParameter(m_vectorSize, SPTAG::SizeType, -1, "VectorSize")
DefineBasicParameter(m_vectorDelimiter, std::string, std::string("|"), "VectorDelimiter")
DefineBasicParameter(m_mmapVectors, bool, false, "MmapVectors")
DefineBasicParameter(m_queryPath, std::string, std::string(""), "QueryPath")
DefineBasicParameter(m_queryType, SPTAG::VectorFileType, SPTAG::VectorFileType::Undefined, "QueryType")
DefineBasicParameter(m_querySize, SPTAG::SizeType, -1, "QuerySize")
Expand Down
54 changes: 45 additions & 9 deletions AnnService/inc/Helper/ConcurrentSet.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <unordered_set>
#include <unordered_map>
#include <queue>
#include <deque>
#endif // TBB
#else
#include <concurrent_unordered_map.h>
Expand Down Expand Up @@ -47,9 +48,10 @@ namespace SPTAG
template <typename T>
class ConcurrentSet
{
public:
typedef typename std::unordered_set<T>::iterator iterator;
typedef typename std::unordered_set<T>::const_iterator const_iterator;

public:
ConcurrentSet() { m_lock.reset(new std::shared_timed_mutex); }

~ConcurrentSet() {}
Expand All @@ -72,6 +74,14 @@ namespace SPTAG
return m_data.insert(key);
}

// Unsafe iteration: caller must ensure no concurrent modification.
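// NOTE: this is weaker than tbb::concurrent_unordered_set, whose begin()/end() may be
// used concurrently with insert(). Here the iterators do not take m_lock (unlike the
// locked accessors above), and a concurrent insert may rehash m_data and invalidate them.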
iterator begin() { return m_data.begin(); }
iterator end() { return m_data.end(); }
const_iterator begin() const { return m_data.begin(); }
const_iterator end() const { return m_data.end(); }

private:
std::unique_ptr<std::shared_timed_mutex> m_lock;
std::unordered_set<T> m_data;
Expand All @@ -80,9 +90,10 @@ namespace SPTAG
template <typename K, typename V>
class ConcurrentMap
{
public:
typedef typename std::unordered_map<K, V>::iterator iterator;
typedef typename std::unordered_map<K, V>::value_type value_type;

public:
ConcurrentMap(int capacity = 8) { m_lock.reset(new std::shared_timed_mutex); m_data.reserve(capacity); }

~ConcurrentMap() {}
Expand Down Expand Up @@ -127,6 +138,8 @@ namespace SPTAG
class ConcurrentQueue
{
public:
typedef typename std::deque<T>::iterator iterator;
typedef typename std::deque<T>::const_iterator const_iterator;

ConcurrentQueue() {}

Expand All @@ -135,7 +148,7 @@ namespace SPTAG
void push(const T& j)
{
std::lock_guard<std::mutex> lock(m_lock);
m_queue.push(j);
m_queue.push_back(j);
}

bool try_pop(T& j)
Expand All @@ -145,13 +158,36 @@ namespace SPTAG
return false;
}
j = m_queue.front();
m_queue.pop();
m_queue.pop_front();
return true;
}

// The TBB concurrent_queue exposes empty() and unsafe_size() as
// best-effort, lock-free queries. Here we take the lock so the
// snapshot is consistent; callers should still treat the result
// as advisory in concurrent contexts.
bool empty() const
{
std::lock_guard<std::mutex> lock(m_lock);
return m_queue.empty();
}

size_t unsafe_size() const
{
std::lock_guard<std::mutex> lock(m_lock);
return m_queue.size();
}

// Unsafe iteration: caller must ensure no concurrent modification,
// matching tbb::concurrent_queue::unsafe_begin/unsafe_end semantics.
iterator unsafe_begin() { return m_queue.begin(); }
iterator unsafe_end() { return m_queue.end(); }
const_iterator unsafe_begin() const { return m_queue.begin(); }
const_iterator unsafe_end() const { return m_queue.end(); }

protected:
std::mutex m_lock;
std::queue<T> m_queue;
mutable std::mutex m_lock;
std::deque<T> m_queue;
};

template <typename T>
Expand All @@ -161,7 +197,7 @@ namespace SPTAG
ConcurrentPriorityQueue() {}
~ConcurrentPriorityQueue() {}

size_type size() const {
size_t size() const {
std::lock_guard<std::mutex> lock(m_lock);
return m_queue.size();
}
Expand All @@ -178,11 +214,11 @@ namespace SPTAG
}
value = m_queue.top();
m_queue.pop();
return true;
return true;
}

private:
std::mutex m_lock;
mutable std::mutex m_lock;
std::priority_queue<T> m_queue;
};
#endif // TBB
Expand Down
9 changes: 8 additions & 1 deletion AnnService/inc/Helper/Logging.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,14 @@ namespace SPTAG

class LoggerHolder
{
#if ((defined(_MSVC_LANG) && _MSVC_LANG >= 202002L) || __cplusplus >= 202002L)
// The C++20 path uses `std::atomic<std::shared_ptr<Logger>>`, which the
// libc++ shipped with ClickHouse rejects (it requires the value type to
// be trivially copyable). The pre-C++20 path with `std::atomic_load` /
// `std::atomic_store` works under both standards, so force it
// unconditionally — same workaround SPTAG-cmake uses for `_sptag` via
// `-std=c++17`, but applied at the header so consumers compiled at
// higher standards (e.g. ClickHouse's `dbms` at C++23) are unaffected.
#if 0
private:
std::atomic<std::shared_ptr<Logger>> m_logger;
public:
Expand Down
140 changes: 140 additions & 0 deletions AnnService/inc/Helper/MemoryMappedFile.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

#ifndef _SPTAG_HELPER_MEMORYMAPPEDFILE_H_
#define _SPTAG_HELPER_MEMORYMAPPEDFILE_H_

#include <cstdint>
#include <string>

#ifdef _MSC_VER
#ifndef WIN32_LEAN_AND_MEAN
#define WIN32_LEAN_AND_MEAN
#endif
#ifndef NOMINMAX
#define NOMINMAX
#endif
#include <windows.h>
#else
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#endif

namespace SPTAG
{
namespace Helper
{

// RAII read-only whole-file memory mapping. The OS demand-pages the file on access and
// reclaims clean file-backed pages under memory pressure, so resident set stays bounded
// instead of holding the whole file in heap. Header-only so it needs no build-list change.
//
// Usage: call Open(path); on success Data() points at Length() readable bytes that stay
// valid until Close() or destruction. The object is not synchronized: Open()/Close()
// must not race with each other or with readers of Data().
class MemoryMappedFile
{
  public:
    // Constructs a closed object (IsOpen() == false).
    MemoryMappedFile() = default;

    // Releases the mapping, if any.
    ~MemoryMappedFile() { Close(); }

    // Non-copyable, non-movable: a single owner unmaps exactly once.
    // (Declaring the copy operations suppresses the implicit move operations.)
    MemoryMappedFile(const MemoryMappedFile&) = delete;
    MemoryMappedFile& operator=(const MemoryMappedFile&) = delete;

    // Maps the entire file read-only. Any mapping previously held by this object is
    // released first. Returns false (and leaves the object closed) on failure, which
    // includes: the file cannot be opened, it is empty, its size does not fit in the
    // address space, or the OS refuses the mapping.
    bool Open(const std::string& p_path)
    {
        Close();
#ifdef _MSC_VER
        HANDLE file = ::CreateFileA(p_path.c_str(), GENERIC_READ, FILE_SHARE_READ, nullptr,
                                    OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, nullptr);
        if (file == INVALID_HANDLE_VALUE) return false;

        // A zero-length file cannot be mapped; treat it as a failure.
        LARGE_INTEGER size;
        if (!::GetFileSizeEx(file, &size) || size.QuadPart <= 0)
        {
            ::CloseHandle(file);
            return false;
        }

        // Size arguments of 0 mean "the whole file" for both calls below.
        HANDLE mapping = ::CreateFileMappingA(file, nullptr, PAGE_READONLY, 0, 0, nullptr);
        if (mapping == nullptr)
        {
            ::CloseHandle(file);
            return false;
        }

        void* view = ::MapViewOfFile(mapping, FILE_MAP_READ, 0, 0, 0);
        if (view == nullptr)
        {
            ::CloseHandle(mapping);
            ::CloseHandle(file);
            return false;
        }

        m_base = reinterpret_cast<std::uint8_t*>(view);
        m_length = static_cast<std::uint64_t>(size.QuadPart);
        m_file = file;
        m_mapping = mapping;
        return true;
#else
        int flags = O_RDONLY;
#ifdef O_CLOEXEC
        // Keep the short-lived descriptor from leaking into a child process if another
        // thread forks between open() and close().
        flags |= O_CLOEXEC;
#endif
        int fd = ::open(p_path.c_str(), flags);
        if (fd < 0) return false;

        // A zero-length file cannot be mapped; treat it as a failure.
        struct stat st;
        if (::fstat(fd, &st) != 0 || st.st_size <= 0)
        {
            ::close(fd);
            return false;
        }

        // Where size_t is narrower than off_t, a large file would be silently truncated
        // by the cast, leaving Length() larger than the region actually mapped. Reject
        // such files instead of handing out an undersized mapping.
        const size_t mapLength = static_cast<size_t>(st.st_size);
        if (static_cast<std::uint64_t>(mapLength) != static_cast<std::uint64_t>(st.st_size))
        {
            ::close(fd);
            return false;
        }

        void* p = ::mmap(nullptr, mapLength, PROT_READ, MAP_SHARED, fd, 0);
        // The mapping keeps its own reference to the file, so the descriptor can be closed now.
        ::close(fd);
        if (p == MAP_FAILED) return false;

#if defined(MADV_RANDOM)
        // BKT build accesses vectors randomly by id; suppress readahead to keep RSS tight.
        // Purely advisory, so the return value is intentionally ignored.
        (void)::madvise(p, mapLength, MADV_RANDOM);
#endif
        m_base = reinterpret_cast<std::uint8_t*>(p);
        m_length = static_cast<std::uint64_t>(mapLength);
        return true;
#endif
    }

    // Unmaps the file and resets the object to the closed state. Safe to call when
    // nothing is mapped. Invalidates every pointer previously obtained from Data().
    void Close()
    {
        if (m_base == nullptr) return;
#ifdef _MSC_VER
        ::UnmapViewOfFile(m_base);
        if (m_mapping != nullptr) ::CloseHandle(m_mapping);
        if (m_file != nullptr) ::CloseHandle(m_file);
        m_mapping = nullptr;
        m_file = nullptr;
#else
        // m_length was validated to fit in size_t when the mapping was created.
        ::munmap(m_base, static_cast<size_t>(m_length));
#endif
        m_base = nullptr;
        m_length = 0;
    }

    // Start of the mapped bytes, or nullptr when closed.
    const std::uint8_t* Data() const { return m_base; }

    // Number of mapped bytes (the file size at Open() time), or 0 when closed.
    std::uint64_t Length() const { return m_length; }

    // True while a mapping is held.
    bool IsOpen() const { return m_base != nullptr; }

  private:
    std::uint8_t* m_base = nullptr; // base address of the mapping; nullptr == closed
    std::uint64_t m_length = 0;     // mapped length in bytes
#ifdef _MSC_VER
    void* m_file = nullptr;    // file HANDLE, held for the lifetime of the view
    void* m_mapping = nullptr; // file-mapping HANDLE, held for the lifetime of the view
#endif
};

} // namespace Helper
} // namespace SPTAG

#endif // _SPTAG_HELPER_MEMORYMAPPEDFILE_H_
4 changes: 4 additions & 0 deletions AnnService/inc/Helper/VectorSetReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ class ReaderOptions : public ArgumentsParser
std::uint32_t m_threadNum;

bool m_normalized;

// When true (and the file type supports it), map the input vector file instead of
// reading it fully into RAM, so the OS demand-pages it and RSS stays bounded.
bool m_useMmap = false;
};

class VectorSetReader
Expand Down
10 changes: 10 additions & 0 deletions AnnService/src/Core/SPANN/SPANNIndex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1246,6 +1246,16 @@ template <typename T> ErrorCode Index<T>::BuildIndex(bool p_normalized)
std::shared_ptr<Helper::ReaderOptions> vectorOptions(
new Helper::ReaderOptions(valueType, dim, m_options.m_vectorType, m_options.m_vectorDelimiter,
m_options.m_iSSDNumberOfThreads, p_normalized));
// A read-only mapping cannot satisfy the in-place Normalize done in SelectHead, so mmap is
// only enabled when no normalization is required (L2, or already-normalized Cosine input).
bool needNormalize = (m_options.m_distCalcMethod == DistCalcMethod::Cosine) && !p_normalized;
vectorOptions->m_useMmap = m_options.m_mmapVectors && !needNormalize;
if (m_options.m_mmapVectors && needNormalize)
{
SPTAGLIB_LOG(Helper::LogLevel::LL_Warning,
"MmapVectors disabled: Cosine with un-normalized input requires in-place "
"Normalize; using in-memory load. Pre-normalize input or use L2 to enable mmap.\n");
}
auto vectorReader = Helper::VectorSetReader::CreateInstance(vectorOptions);
if (m_options.m_vectorPath.empty())
{
Expand Down
2 changes: 1 addition & 1 deletion AnnService/src/Core/VectorIndex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1185,7 +1185,7 @@ void VectorIndex::ApproximateRNG(std::shared_ptr<VectorSet> &fullVectors, std::u
default:
SPTAGLIB_LOG(
Helper::LogLevel::LL_Error, "Unable to get quantizer reconstruct type %s",
Helper::Convert::ConvertToString<VectorValueType>(m_pQuantizer->GetReconstructType()));
Helper::Convert::ConvertToString<VectorValueType>(m_pQuantizer->GetReconstructType()).c_str());
}
}
else
Expand Down
35 changes: 35 additions & 0 deletions AnnService/src/Helper/VectorSetReaders/DefaultReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "inc/Helper/VectorSetReaders/DefaultReader.h"
#include "inc/Core/VectorIndex.h"
#include "inc/Helper/CommonHelper.h"
#include "inc/Helper/MemoryMappedFile.h"

using namespace SPTAG;
using namespace SPTAG::Helper;
Expand Down Expand Up @@ -33,6 +34,40 @@ ErrorCode DefaultVectorReader::LoadFile(const std::string &p_filePaths)

std::shared_ptr<VectorSet> DefaultVectorReader::GetVectorSet(SizeType start, SizeType end) const
{
if (m_options->m_useMmap)
{
auto mmf = std::make_shared<Helper::MemoryMappedFile>();
if (mmf->Open(m_vectorOutput) &&
mmf->Length() >= (std::uint64_t)(sizeof(SizeType) + sizeof(DimensionType)))
{
const std::uint8_t *base = mmf->Data();
SizeType row = *reinterpret_cast<const SizeType *>(base);
DimensionType col = *reinterpret_cast<const DimensionType *>(base + sizeof(SizeType));

if (start > row)
start = row;
if (end < 0 || end > row)
end = row;

std::uint64_t perVec = ((std::uint64_t)GetValueTypeSize(m_options->m_inputValueType)) * col;
std::uint64_t dataOffset = sizeof(SizeType) + sizeof(DimensionType) + ((std::uint64_t)start) * perVec;
std::uint64_t dataLen = ((std::uint64_t)(end - start)) * perVec;

if (dataOffset + dataLen <= mmf->Length())
{
std::uint8_t *dataPtr = const_cast<std::uint8_t *>(base) + dataOffset;
// Aliasing shared_ptr: shares ownership with mmf (so the mapping outlives every
// copy of the ByteArray / BasicVectorSet) but get() returns the data pointer.
std::shared_ptr<std::uint8_t> holder(mmf, dataPtr);
ByteArray vectorSet(dataPtr, dataLen, holder);
SPTAGLIB_LOG(Helper::LogLevel::LL_Info, "Mmap Vector(%d,%d)\n", end - start, col);
return std::make_shared<BasicVectorSet>(vectorSet, m_options->m_inputValueType, col, end - start);
}
}
SPTAGLIB_LOG(Helper::LogLevel::LL_Warning,
"mmap failed for %s, falling back to in-memory read.\n", m_vectorOutput.c_str());
}

auto ptr = f_createIO();
if (ptr == nullptr || !ptr->Initialize(m_vectorOutput.c_str(), std::ios::binary | std::ios::in))
{
Expand Down
4 changes: 3 additions & 1 deletion AnnService/src/Helper/VectorSetReaders/TxtReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
#include "inc/Core/VectorIndex.h"
#include "inc/Helper/CommonHelper.h"
#include "inc/Helper/StringConvert.h"
#include <omp.h>
// <omp.h> was included historically but no OpenMP primitives are actually used
// in this translation unit. Drop it — ClickHouse's contrib build does not
// configure an OpenMP runtime.

using namespace SPTAG;
using namespace SPTAG::Helper;
Expand Down