diff --git a/crates/k_means/src/lib.rs b/crates/k_means/src/lib.rs index 508fb7d9..aab50292 100644 --- a/crates/k_means/src/lib.rs +++ b/crates/k_means/src/lib.rs @@ -16,23 +16,9 @@ use rabitq::bit::block::BlockCode; use rabitq::packing::{any_pack, padding_pack}; use rand::rngs::StdRng; use rand::{Rng, SeedableRng}; -use rayon::iter::{IntoParallelIterator, IntoParallelRefMutIterator, ParallelIterator}; +use rayon::iter::{IntoParallelIterator, ParallelIterator}; use simd::Floating; -pub fn preprocess(num_threads: usize, x: &mut [T], f: impl Fn(&mut T) + Sync) { - rayon::ThreadPoolBuilder::new() - .num_threads(num_threads) - .build_scoped( - |thread| thread.run(), - move |pool| { - pool.install(|| { - x.par_iter_mut().for_each(&f); - }); - }, - ) - .expect("failed to build thread pool") -} - pub fn k_means( num_threads: usize, mut check: impl FnMut(usize), @@ -57,7 +43,7 @@ pub fn k_means( if n >= 1024 && c >= 1024 { rabitq_index(n, c, samples, centroids) } else { - flat_index(dims, n, c, samples, centroids) + flat_index(n, c, samples, centroids) } }; let mut lloyd_k_means = @@ -75,18 +61,6 @@ pub fn k_means( } } -pub fn k_means_lookup(vector: &[f32], centroids: &[Vec]) -> usize { - assert_ne!(centroids.len(), 0); - let mut result = (f32::INFINITY, 0); - for i in 0..centroids.len() { - let dis = f32::reduce_sum_of_d2(vector, ¢roids[i]); - if dis <= result.0 { - result = (dis, i); - } - } - result.1 -} - fn quick_centers( c: usize, dims: usize, @@ -175,13 +149,7 @@ fn rabitq_index(n: usize, c: usize, samples: &[Vec], centroids: &[Vec] .collect::>() } -fn flat_index( - _dims: usize, - n: usize, - c: usize, - samples: &[Vec], - centroids: &[Vec], -) -> Vec { +fn flat_index(n: usize, c: usize, samples: &[Vec], centroids: &[Vec]) -> Vec { (0..n) .into_par_iter() .map(|i| { diff --git a/crates/rabitq/src/bit.rs b/crates/rabitq/src/bit.rs index 3d8fe9e2..27dd5237 100644 --- a/crates/rabitq/src/bit.rs +++ b/crates/rabitq/src/bit.rs @@ -73,15 +73,9 @@ pub fn code(vector: &[f32]) -> Code { CodeMetadata { dis_u_2: sum_of_x_2, factor_cnt: { - let cnt_pos = vector - .iter() - .map(|x| x.is_sign_positive() as i32) - .sum::(); - let cnt_neg = vector - .iter() - .map(|x| x.is_sign_negative() as i32) - .sum::(); - (cnt_pos - cnt_neg) as f32 + let cnt_pos = vector.iter().filter(|x| x.is_sign_positive()).count(); + let cnt_neg = vector.iter().filter(|x| x.is_sign_negative()).count(); + cnt_pos as f32 - cnt_neg as f32 }, factor_ip: sum_of_x_2 / sum_of_abs_x, factor_err: { diff --git a/crates/rabitq/src/packing.rs b/crates/rabitq/src/packing.rs index 008140dd..79c8e2a4 100644 --- a/crates/rabitq/src/packing.rs +++ b/crates/rabitq/src/packing.rs @@ -101,14 +101,15 @@ pub fn any_pack(mut x: impl Iterator) -> [T; 32] { std::array::from_fn(|_| x.next()).map(|x| x.unwrap_or_default()) } -pub fn pack_to_u4(signs: &[bool]) -> Vec { - fn f(x: [bool; 4]) -> u8 { - x[0] as u8 | (x[1] as u8) << 1 | (x[2] as u8) << 2 | (x[3] as u8) << 3 - } - let mut result = Vec::with_capacity(signs.len().div_ceil(4)); - for i in 0..signs.len().div_ceil(4) { - let x = std::array::from_fn(|j| signs.get(i * 4 + j).copied().unwrap_or_default()); - result.push(f(x)); - } - result +pub fn pack_to_u4(input: &[bool]) -> Vec { + let f = |t: &[bool; 4]| t[0] as u8 | (t[1] as u8) << 1 | (t[2] as u8) << 2 | (t[3] as u8) << 3; + let (arrays, remainder) = input.as_chunks::<4>(); + let mut buffer = [false; 4]; + let tailing = if !remainder.is_empty() { + buffer[..remainder.len()].copy_from_slice(remainder); + Some(&buffer) + } else { + None + }; + arrays.iter().chain(tailing).map(f).collect() } diff --git a/crates/vchordrq/src/bulkdelete.rs b/crates/vchordrq/src/bulkdelete.rs index 00a446ff..3f2556fd 100644 --- a/crates/vchordrq/src/bulkdelete.rs +++ b/crates/vchordrq/src/bulkdelete.rs @@ -43,7 +43,7 @@ pub fn bulkdelete( for first in state { tape::read_h1_tape::( by_next(index, first).inspect(|_| check()), - || FunctionalAccessor::new((), id_0(|_, _| ()), id_1(|_, _| [(); 32])), + || FunctionalAccessor::new((), id_0(|_, _| ()), id_1(|_, _| [(); _])), |(), _, _, first, _| results.push(first), ); } diff --git a/crates/vchordrq/src/cache.rs b/crates/vchordrq/src/cache.rs index 0a21f3cd..eecc7275 100644 --- a/crates/vchordrq/src/cache.rs +++ b/crates/vchordrq/src/cache.rs @@ -39,7 +39,7 @@ where for first in state { tape::read_h1_tape::( by_next(index, first).inspect(|guard| trace.push(guard.id())), - || FunctionalAccessor::new((), id_0(|_, _| ()), id_1(|_, _| [(); 32])), + || FunctionalAccessor::new((), id_0(|_, _| ()), id_1(|_, _| [(); _])), |(), _, _, first, _| { results.push(first); }, diff --git a/crates/vchordrq/src/insert.rs b/crates/vchordrq/src/insert.rs index 3d6b337f..77d1c448 100644 --- a/crates/vchordrq/src/insert.rs +++ b/crates/vchordrq/src/insert.rs @@ -77,25 +77,23 @@ pub fn insert<'b, R: RelationRead + RelationWrite, O: Operator>( let epsilon = 1.9; type State = (Reverse, AlwaysEqual, AlwaysEqual); - let mut state: State = if !is_residual { - let first = meta_tuple.first(); - // it's safe to leave it a fake value - ( - Reverse(Distance::ZERO), - AlwaysEqual(0.0), - AlwaysEqual(first), - ) - } else { + let mut state: State = if is_residual { let prefetch = BorrowedIter::from_slice(meta_tuple.centroid_prefetch(), |x| bump.alloc_slice(x)); let head = meta_tuple.centroid_head(); - let norm = meta_tuple.centroid_norm(); - let first = meta_tuple.first(); let distance = vectors::read_for_h1_tuple::( prefetch.map(|id| index.read(id)), head, LAccess::new(O::Vector::unpack(vector), O::DistanceAccessor::default()), ); + let norm = meta_tuple.centroid_norm(); + let first = meta_tuple.first(); + (Reverse(distance), AlwaysEqual(norm), AlwaysEqual(first)) + } else { + // fast path + let distance = Distance::ZERO; + let norm = meta_tuple.centroid_norm(); + let first = meta_tuple.first(); (Reverse(distance), AlwaysEqual(norm), AlwaysEqual(first)) }; diff --git a/crates/vchordrq/src/maintain.rs b/crates/vchordrq/src/maintain.rs index 3d559c50..4b7b1866 100644 --- a/crates/vchordrq/src/maintain.rs +++ b/crates/vchordrq/src/maintain.rs @@ -57,7 +57,7 @@ where for first in state { tape::read_h1_tape::( by_next(index, first).inspect(|_| check()), - || FunctionalAccessor::new((), id_0(|_, _| ()), id_1(|_, _| [(); 32])), + || FunctionalAccessor::new((), id_0(|_, _| ()), id_1(|_, _| [(); _])), |(), _, _, first, _| results.push(first), ); } diff --git a/crates/vchordrq/src/prewarm.rs b/crates/vchordrq/src/prewarm.rs index d8066f69..6b33a165 100644 --- a/crates/vchordrq/src/prewarm.rs +++ b/crates/vchordrq/src/prewarm.rs @@ -64,7 +64,7 @@ where for first in state { tape::read_h1_tape::( by_next(index, first).inspect(|_| counter += 1), - || FunctionalAccessor::new((), id_0(|_, _| ()), id_1(|_, _| [(); 32])), + || FunctionalAccessor::new((), id_0(|_, _| ()), id_1(|_, _| [(); _])), |(), head, _, first, prefetch| { vectors::read_for_h1_tuple::( prefetch.iter().map(|&id| index.read(id)), @@ -97,7 +97,7 @@ where tape::read_directory_tape::(by_next(index, jump_tuple.directory_first())); tape::read_frozen_tape::( by_directory(&mut prefetch_h0_tuples, directory).inspect(|_| counter += 1), - || FunctionalAccessor::new((), id_0(|_, _| ()), id_1(|_, _| [(); 32])), + || FunctionalAccessor::new((), id_0(|_, _| ()), id_1(|_, _| [(); _])), id_2(|_, _, _, _| { results.push(()); }), @@ -105,7 +105,7 @@ where } else { tape::read_frozen_tape::( by_next(index, jump_tuple.frozen_first()).inspect(|_| counter += 1), - || FunctionalAccessor::new((), id_0(|_, _| ()), id_1(|_, _| [(); 32])), + || FunctionalAccessor::new((), id_0(|_, _| ()), id_1(|_, _| [(); _])), id_2(|_, _, _, _| { results.push(()); }), diff --git a/crates/vchordrq/src/search.rs b/crates/vchordrq/src/search.rs index 89e21362..4b3bd2cd 100644 --- a/crates/vchordrq/src/search.rs +++ b/crates/vchordrq/src/search.rs @@ -12,13 +12,13 @@ // // Copyright (c) 2025 TensorChord Inc. -use crate::closure_lifetime_binder::id_2; +use crate::closure_lifetime_binder::{id_0, id_1, id_2}; use crate::linked_vec::LinkedVec; use crate::operator::*; use crate::tape::{by_directory, by_next}; use crate::tuples::*; use crate::{Opaque, Page, tape, vectors}; -use algo::accessor::LAccess; +use algo::accessor::{FunctionalAccessor, LAccess}; use algo::prefetcher::{Prefetcher, PrefetcherHeapFamily, PrefetcherSequenceFamily}; use algo::{BorrowedIter, Bump, PackedRefMut4, PackedRefMut8, RelationRead}; use always_equal::AlwaysEqual; @@ -51,6 +51,7 @@ where let dims = meta_tuple.dims(); let is_residual = meta_tuple.is_residual(); let height_of_root = meta_tuple.height_of_root(); + let cells = meta_tuple.cells().to_vec(); assert_eq!(dims, vector.dims(), "unmatched dimensions"); if height_of_root as usize != 1 + probes.len() { panic!( @@ -59,27 +60,26 @@ where probes.len() ); } + debug_assert_eq!(cells[(height_of_root - 1) as usize], 1); type State = Vec<(Reverse, AlwaysEqual, AlwaysEqual)>; - let mut state: State = if !is_residual { - let first = meta_tuple.first(); - // it's safe to leave it a fake value - vec![( - Reverse(Distance::ZERO), - AlwaysEqual(0.0), - AlwaysEqual(first), - )] - } else { + let mut state: State = if is_residual { let prefetch = BorrowedIter::from_slice(meta_tuple.centroid_prefetch(), |x| bump.alloc_slice(x)); let head = meta_tuple.centroid_head(); - let norm = meta_tuple.centroid_norm(); - let first = meta_tuple.first(); let distance = vectors::read_for_h1_tuple::( prefetch.map(|id| index.read(id)), head, LAccess::new(O::Vector::unpack(vector), O::DistanceAccessor::default()), ); + let norm = meta_tuple.centroid_norm(); + let first = meta_tuple.first(); + vec![(Reverse(distance), AlwaysEqual(norm), AlwaysEqual(first))] + } else { + // fast path + let distance = Distance::ZERO; + let norm = meta_tuple.centroid_norm(); + let first = meta_tuple.first(); vec![(Reverse(distance), AlwaysEqual(norm), AlwaysEqual(first))] }; @@ -124,7 +124,27 @@ where }; for i in 1..height_of_root { - state = step(state).take(probes[i as usize - 1] as _).collect(); + let partial_scan = probes[i as usize - 1] < cells[(height_of_root - 1 - i) as usize]; + if partial_scan || is_residual { + state = step(state).take(probes[i as usize - 1] as _).collect(); + } else { + // fast path + let mut results = LinkedVec::new(); + for (Reverse(_), AlwaysEqual(_), AlwaysEqual(first)) in state { + tape::read_h1_tape::( + by_next(index, first), + || FunctionalAccessor::new((), id_0(|_, _| ()), id_1(|_, _| [(); _])), + |(), _, norm, first, _| { + results.push(( + Reverse(Distance::ZERO), + AlwaysEqual(norm), + AlwaysEqual(first), + )); + }, + ); + } + state = results.into_vec(); + } } let mut results = LinkedVec::<(_, AlwaysEqual<_>)>::new(); @@ -192,6 +212,7 @@ where let dims = meta_tuple.dims(); let is_residual = meta_tuple.is_residual(); let height_of_root = meta_tuple.height_of_root(); + let cells = meta_tuple.cells().to_vec(); assert_eq!(dims, vector.dims(), "unmatched dimensions"); if height_of_root as usize != 1 + probes.len() { panic!( @@ -200,27 +221,26 @@ where probes.len() ); } + debug_assert_eq!(cells[(height_of_root - 1) as usize], 1); type State = Vec<(Reverse, AlwaysEqual, AlwaysEqual)>; - let mut state: State = if !is_residual { - let first = meta_tuple.first(); - // it's safe to leave it a fake value - vec![( - Reverse(Distance::ZERO), - AlwaysEqual(0.0), - AlwaysEqual(first), - )] - } else { + let mut state: State = if is_residual { let prefetch = BorrowedIter::from_slice(meta_tuple.centroid_prefetch(), |x| bump.alloc_slice(x)); let head = meta_tuple.centroid_head(); - let norm = meta_tuple.centroid_norm(); - let first = meta_tuple.first(); let distance = vectors::read_for_h1_tuple::( prefetch.map(|id| index.read(id)), head, LAccess::new(O::Vector::unpack(vector), O::DistanceAccessor::default()), ); + let norm = meta_tuple.centroid_norm(); + let first = meta_tuple.first(); + vec![(Reverse(distance), AlwaysEqual(norm), AlwaysEqual(first))] + } else { + // fast path + let distance = Distance::ZERO; + let norm = meta_tuple.centroid_norm(); + let first = meta_tuple.first(); vec![(Reverse(distance), AlwaysEqual(norm), AlwaysEqual(first))] }; @@ -266,8 +286,29 @@ where let mut it = None; for i in 1..height_of_root { - let it = it.insert(step(state)); - state = it.take(probes[i as usize - 1] as _).collect(); + let partial_scan = probes[i as usize - 1] < cells[(height_of_root - 1 - i) as usize]; + let needs_sort = i + 1 == height_of_root && threshold != 0; + if partial_scan || is_residual || needs_sort { + let it = it.insert(step(state)); + state = it.take(probes[i as usize - 1] as _).collect(); + } else { + // fast path + let mut results = LinkedVec::new(); + for (Reverse(_), AlwaysEqual(_), AlwaysEqual(first)) in state { + tape::read_h1_tape::( + by_next(index, first), + || FunctionalAccessor::new((), id_0(|_, _| ()), id_1(|_, _| [(); _])), + |(), _, norm, first, _| { + results.push(( + Reverse(Distance::ZERO), + AlwaysEqual(norm), + AlwaysEqual(first), + )); + }, + ); + } + state = results.into_vec(); + } } let mut results = LinkedVec::<(_, AlwaysEqual<_>)>::new(); diff --git a/crates/vchordrq/src/types.rs b/crates/vchordrq/src/types.rs index 0f759bd1..9296dfd5 100644 --- a/crates/vchordrq/src/types.rs +++ b/crates/vchordrq/src/types.rs @@ -79,7 +79,7 @@ impl VectorKind { } } -#[derive(Debug, Clone, Serialize, Deserialize, Validate)] +#[derive(Debug, Clone, Copy, Serialize, Deserialize, Validate)] #[serde(deny_unknown_fields)] #[validate(schema(function = "Self::validate_self"))] pub struct VectorOptions { diff --git a/src/index/vchordg/am/am_build.rs b/src/index/vchordg/am/am_build.rs index a8cb518e..fbc63f56 100644 --- a/src/index/vchordg/am/am_build.rs +++ b/src/index/vchordg/am/am_build.rs @@ -197,27 +197,20 @@ pub unsafe extern "C-unwind" fn ambuild( let errors = "alpha not equal to `1.0` are only applicable to l2 and cosine distance."; pgrx::warning!("warning while validating options: {errors}"); } - let opfamily = unsafe { opfamily(index_relation) }; let index = unsafe { PostgresRelation::new(index_relation) }; - let heap = Heap { - heap_relation, - index_relation, - index_info, - opfamily, - scan: std::ptr::null_mut(), - }; let mut reporter = PostgresReporter {}; crate::index::vchordg::algo::build(vector_options, vchordg_options.index, &index); reporter.phase(BuildPhase::from_code(BuildPhaseCode::Inserting)); - let cache = vchordg_cached::VchordgCached::_0 {}; + let cached = vchordg_cached::VchordgCached::_0 {}.serialize(); if let Some(leader) = unsafe { VchordgLeader::enter( heap_relation, index_relation, (*index_info).ii_Concurrent, - cache, + &cached, ) } { + drop(cached); unsafe { parallel_build( index_relation, @@ -249,18 +242,18 @@ pub unsafe extern "C-unwind" fn ambuild( pgrx::pg_sys::ConditionVariableCancelSleep(); } } else { - let mut indtuples = 0; - reporter.tuples_done(indtuples); - heap.traverse(true, |(ctid, store)| { - for (vector, extra) in store { - let key = ctid_to_key(ctid); - let payload = kv_to_pointer((key, extra)); - crate::index::vchordg::algo::insert(opfamily, &index, payload, vector); - } - indtuples += 1; - reporter.tuples_done(indtuples); - }); - reporter.tuples_total(indtuples); + unsafe { + let indtuples = sequential_build( + index_relation, + heap_relation, + index_info, + &cached, + |indtuples| { + reporter.tuples_done(indtuples); + }, + ); + reporter.tuples_total(indtuples); + } } unsafe { pgrx::pgbox::PgBox::::alloc0().into_pg() } } @@ -304,13 +297,8 @@ impl VchordgLeader { heap_relation: pgrx::pg_sys::Relation, index_relation: pgrx::pg_sys::Relation, isconcurrent: bool, - cache: vchordg_cached::VchordgCached, + cached: &[u8], ) -> Option { - let _cache = cache.serialize(); - #[expect(clippy::drop_non_drop)] - drop(cache); - let cache = _cache; - unsafe fn compute_parallel_workers( heap_relation: pgrx::pg_sys::Relation, index_relation: pgrx::pg_sys::Relation, @@ -372,7 +360,7 @@ impl VchordgLeader { estimate_keys(&mut (*pcxt).estimator, 1); estimate_chunk(&mut (*pcxt).estimator, est_tablescandesc); estimate_keys(&mut (*pcxt).estimator, 1); - estimate_chunk(&mut (*pcxt).estimator, 8 + cache.len()); + estimate_chunk(&mut (*pcxt).estimator, 8 + cached.len()); estimate_keys(&mut (*pcxt).estimator, 1); } @@ -414,9 +402,9 @@ impl VchordgLeader { }; let vchordgcached = unsafe { - let x = pgrx::pg_sys::shm_toc_allocate((*pcxt).toc, 8 + cache.len()).cast::(); - (x as *mut u64).write_unaligned(cache.len() as _); - std::ptr::copy(cache.as_ptr(), x.add(8), cache.len()); + let x = pgrx::pg_sys::shm_toc_allocate((*pcxt).toc, 8 + cached.len()).cast::(); + (x as *mut u64).write_unaligned(cached.len() as _); + std::ptr::copy(cached.as_ptr(), x.add(8), cached.len()); x }; @@ -583,6 +571,43 @@ unsafe fn parallel_build( } } +unsafe fn sequential_build( + index_relation: pgrx::pg_sys::Relation, + heap_relation: pgrx::pg_sys::Relation, + index_info: *mut pgrx::pg_sys::IndexInfo, + vchordgcached: &[u8], + mut callback: impl FnMut(u64), +) -> u64 { + use vchordg_cached::VchordgCachedReader; + let cached = VchordgCachedReader::deserialize_ref(vchordgcached); + let index = unsafe { PostgresRelation::new(index_relation) }; + + let opfamily = unsafe { opfamily(index_relation) }; + let heap = Heap { + heap_relation, + index_relation, + index_info, + opfamily, + scan: std::ptr::null_mut(), + }; + + let mut indtuples = 0; + match cached { + VchordgCachedReader::_0(_) => { + heap.traverse(true, |(ctid, store)| { + for (vector, extra) in store { + let key = ctid_to_key(ctid); + let payload = kv_to_pointer((key, extra)); + crate::index::vchordg::algo::insert(opfamily, &index, payload, vector); + } + indtuples += 1; + callback(indtuples); + }); + } + } + indtuples +} + #[pgrx::pg_guard] pub unsafe extern "C-unwind" fn ambuildempty(_index_relation: pgrx::pg_sys::Relation) { pgrx::error!("Unlogged indexes are not supported."); diff --git a/src/index/vchordrq/am/am_build.rs b/src/index/vchordrq/am/am_build.rs index 8bf65eff..df476cb7 100644 --- a/src/index/vchordrq/am/am_build.rs +++ b/src/index/vchordrq/am/am_build.rs @@ -34,11 +34,12 @@ use vector::vect::VectOwned; #[repr(u16)] pub enum BuildPhaseCode { Initializing = 0, - InternalBuild = 1, - ExternalBuild = 2, - Build = 3, - Inserting = 4, - Compacting = 5, + DefaultBuild = 1, + InternalBuild = 2, + ExternalBuild = 3, + Build = 4, + Inserting = 5, + Compacting = 6, } pub struct BuildPhase(BuildPhaseCode, u16); @@ -47,6 +48,7 @@ impl BuildPhase { pub const fn new(code: BuildPhaseCode, k: u16) -> Option { match (code, k) { (BuildPhaseCode::Initializing, 0) => Some(BuildPhase(code, k)), + (BuildPhaseCode::DefaultBuild, 0) => Some(BuildPhase(code, k)), (BuildPhaseCode::InternalBuild, 0..102) => Some(BuildPhase(code, k)), (BuildPhaseCode::ExternalBuild, 0) => Some(BuildPhase(code, k)), (BuildPhaseCode::Build, 0) => Some(BuildPhase(code, k)), @@ -61,6 +63,10 @@ impl BuildPhase { static RAW: [&CStr; 1] = [c"initializing"]; RAW[k as usize] } + BuildPhase(BuildPhaseCode::DefaultBuild, k) => { + static RAW: [&CStr; 1] = [c"initializing index, by default build"]; + RAW[k as usize] + } BuildPhase(BuildPhaseCode::InternalBuild, k) => { static RAWS: [&[&CStr]; 2] = [ &[c"initializing index, by internal build"], @@ -122,6 +128,7 @@ impl BuildPhase { } pub const fn from_value(value: u32) -> Option { const INITIALIZING: u16 = BuildPhaseCode::Initializing as _; + const DEFAULT_BUILD: u16 = BuildPhaseCode::DefaultBuild as _; const INTERNAL_BUILD: u16 = BuildPhaseCode::InternalBuild as _; const EXTERNAL_BUILD: u16 = BuildPhaseCode::ExternalBuild as _; const BUILD: u16 = BuildPhaseCode::Build as _; @@ -130,6 +137,7 @@ impl BuildPhase { let k = value as u16; match (value >> 16) as u16 { INITIALIZING => Self::new(BuildPhaseCode::Initializing, k), + DEFAULT_BUILD => Self::new(BuildPhaseCode::DefaultBuild, k), INTERNAL_BUILD => Self::new(BuildPhaseCode::InternalBuild, k), EXTERNAL_BUILD => Self::new(BuildPhaseCode::ExternalBuild, k), BUILD => Self::new(BuildPhaseCode::Build, k), @@ -276,12 +284,11 @@ pub unsafe extern "C-unwind" fn ambuild( scan: std::ptr::null_mut(), }; let mut reporter = PostgresReporter {}; - let structures = match vchordrq_options.build.source.clone() { - VchordrqBuildSourceOptions::External(external_build) => { - reporter.phase(BuildPhase::from_code(BuildPhaseCode::ExternalBuild)); - let reltuples = unsafe { (*(*index_relation).rd_rel).reltuples }; - reporter.tuples_total(reltuples as u64); - make_external_build(vector_options.clone(), opfamily, external_build.clone()) + reporter.tuples_total(unsafe { (*(*index_relation).rd_rel).reltuples as u64 }); + let mut structures = match vchordrq_options.build.source.clone() { + VchordrqBuildSourceOptions::Default(default_build) => { + reporter.phase(BuildPhase::from_code(BuildPhaseCode::DefaultBuild)); + make_default_build(vector_options, default_build) } VchordrqBuildSourceOptions::Internal(internal_build) => { reporter.phase(BuildPhase::from_code(BuildPhaseCode::InternalBuild)); @@ -321,18 +328,22 @@ pub unsafe extern "C-unwind" fn ambuild( samples }; reporter.tuples_total(tuples_total); - make_internal_build( - vector_options.clone(), - internal_build.clone(), - samples, - &mut reporter, - ) + make_internal_build(vector_options, internal_build, samples, &mut reporter) + } + VchordrqBuildSourceOptions::External(external_build) => { + reporter.phase(BuildPhase::from_code(BuildPhaseCode::ExternalBuild)); + make_external_build(vector_options, opfamily, external_build) } }; + for structure in structures.iter_mut() { + for centroid in structure.centroids.iter_mut() { + *centroid = rabitq::rotate::rotate(centroid); + } + } reporter.phase(BuildPhase::from_code(BuildPhaseCode::Build)); crate::index::vchordrq::algo::build(vector_options, vchordrq_options.index, &index, structures); reporter.phase(BuildPhase::from_code(BuildPhaseCode::Inserting)); - let cache = if vchordrq_options.build.pin { + let cached = if vchordrq_options.build.pin { let mut trace = vchordrq::cache(&index); trace.sort(); trace.dedup(); @@ -349,15 +360,17 @@ pub unsafe extern "C-unwind" fn ambuild( } } else { vchordrq_cached::VchordrqCached::_0 {} - }; + } + .serialize(); if let Some(leader) = unsafe { VchordrqLeader::enter( heap_relation, index_relation, (*index_info).ii_Concurrent, - cache, + &cached, ) } { + drop(cached); unsafe { parallel_build( index_relation, @@ -389,18 +402,18 @@ pub unsafe extern "C-unwind" fn ambuild( pgrx::pg_sys::ConditionVariableCancelSleep(); } } else { - let mut indtuples = 0; - reporter.tuples_done(indtuples); - heap.traverse(true, |(ctid, store)| { - for (vector, extra) in store { - let key = ctid_to_key(ctid); - let payload = kv_to_pointer((key, extra)); - crate::index::vchordrq::algo::insert(opfamily, &index, payload, vector, true); - } - indtuples += 1; - reporter.tuples_done(indtuples); - }); - reporter.tuples_total(indtuples); + unsafe { + let indtuples = sequential_build( + index_relation, + heap_relation, + index_info, + &cached, + |indtuples| { + reporter.tuples_done(indtuples); + }, + ); + reporter.tuples_total(indtuples); + } } let check = || { pgrx::check_for_interrupts!(); @@ -581,12 +594,8 @@ impl VchordrqLeader { heap_relation: pgrx::pg_sys::Relation, index_relation: pgrx::pg_sys::Relation, isconcurrent: bool, - cache: vchordrq_cached::VchordrqCached, + vchordrq_cached: &[u8], ) -> Option { - let _cache = cache.serialize(); - drop(cache); - let cache = _cache; - unsafe fn compute_parallel_workers( heap_relation: pgrx::pg_sys::Relation, index_relation: pgrx::pg_sys::Relation, @@ -648,7 +657,7 @@ impl VchordrqLeader { estimate_keys(&mut (*pcxt).estimator, 1); estimate_chunk(&mut (*pcxt).estimator, est_tablescandesc); estimate_keys(&mut (*pcxt).estimator, 1); - estimate_chunk(&mut (*pcxt).estimator, 8 + cache.len()); + estimate_chunk(&mut (*pcxt).estimator, 8 + vchordrq_cached.len()); estimate_keys(&mut (*pcxt).estimator, 1); } @@ -690,9 +699,10 @@ impl VchordrqLeader { }; let vchordrqcached = unsafe { - let x = pgrx::pg_sys::shm_toc_allocate((*pcxt).toc, 8 + cache.len()).cast::(); - (x as *mut u64).write_unaligned(cache.len() as _); - std::ptr::copy(cache.as_ptr(), x.add(8), cache.len()); + let x = + pgrx::pg_sys::shm_toc_allocate((*pcxt).toc, 8 + vchordrq_cached.len()).cast::(); + (x as *mut u64).write_unaligned(vchordrq_cached.len() as _); + std::ptr::copy(vchordrq_cached.as_ptr(), x.add(8), vchordrq_cached.len()); x }; @@ -882,6 +892,58 @@ unsafe fn parallel_build( } } +unsafe fn sequential_build( + index_relation: pgrx::pg_sys::Relation, + heap_relation: pgrx::pg_sys::Relation, + index_info: *mut pgrx::pg_sys::IndexInfo, + vchordrqcached: &[u8], + mut callback: impl FnMut(u64), +) -> u64 { + use vchordrq_cached::VchordrqCachedReader; + let cached = VchordrqCachedReader::deserialize_ref(vchordrqcached); + let index = unsafe { PostgresRelation::new(index_relation) }; + + let opfamily = unsafe { opfamily(index_relation) }; + let heap = Heap { + heap_relation, + index_relation, + index_info, + opfamily, + scan: std::ptr::null_mut(), + }; + + let mut indtuples = 0; + match cached { + VchordrqCachedReader::_0(_) => { + heap.traverse(true, |(ctid, store)| { + for (vector, extra) in store { + let key = ctid_to_key(ctid); + let payload = kv_to_pointer((key, extra)); + crate::index::vchordrq::algo::insert(opfamily, &index, payload, vector, true); + } + indtuples += 1; + callback(indtuples); + }); + } + VchordrqCachedReader::_1(cached) => { + let index = CachingRelation { + cache: cached, + relation: index, + }; + heap.traverse(true, |(ctid, store)| { + for (vector, extra) in store { + let key = ctid_to_key(ctid); + let payload = kv_to_pointer((key, extra)); + crate::index::vchordrq::algo::insert(opfamily, &index, payload, vector, true); + } + indtuples += 1; + callback(indtuples); + }); + } + } + indtuples +} + #[pgrx::pg_guard] pub unsafe extern "C-unwind" fn ambuildempty(_index_relation: pgrx::pg_sys::Relation) { pgrx::error!("Unlogged indexes are not supported."); @@ -945,16 +1007,23 @@ unsafe fn options( (vector, rabitq) } +fn make_default_build( + vector_options: VectorOptions, + _default_build: VchordrqDefaultBuildOptions, +) -> Vec>> { + vec![Structure::> { + centroids: vec![vec![0.0f32; vector_options.dims as usize]], + children: vec![vec![]], + }] +} + fn make_internal_build( vector_options: VectorOptions, internal_build: VchordrqInternalBuildOptions, - mut samples: Vec>, + samples: Vec>, reporter: &mut PostgresReporter, ) -> Vec>> { use std::iter::once; - k_means::preprocess(internal_build.build_threads as _, &mut samples, |sample| { - *sample = rabitq::rotate::rotate(sample) - }); let mut result = Vec::>>::new(); for w in internal_build.lists.iter().rev().copied().chain(once(1)) { let input = if let Some(structure) = result.last() { @@ -1014,7 +1083,18 @@ fn make_internal_build( if let Some(structure) = result.last() { let mut children = vec![Vec::new(); centroids.len()]; for i in 0..structure.len() as u32 { - let target = k_means::k_means_lookup(&structure.centroids[i as usize], ¢roids); + pub fn k_means_lookup(vector: &[f32], centroids: &[Vec]) -> usize { + assert_ne!(centroids.len(), 0); + let mut result = (f32::INFINITY, 0); + for i in 0..centroids.len() { + let dis = f32::reduce_sum_of_d2(vector, ¢roids[i]); + if dis <= result.0 { + result = (dis, i); + } + } + result.1 + } + let target = k_means_lookup(&structure.centroids[i as usize], ¢roids); children[target].push(i); } let (centroids, children) = std::iter::zip(centroids, children) @@ -1078,7 +1158,7 @@ fn make_external_build( if vector_options.dims != vector.as_borrowed().dims() { pgrx::error!("external build: incorrect dimension, id = {id}"); } - vectors.insert(id, rabitq::rotate::rotate(vector.as_borrowed().slice())); + vectors.insert(id, vector.as_borrowed().slice().to_vec()); } }); if parents.len() >= 2 && parents.values().all(|x| x.is_none()) { diff --git a/src/index/vchordrq/am/mod.rs b/src/index/vchordrq/am/mod.rs index bb24d316..03bb274c 100644 --- a/src/index/vchordrq/am/mod.rs +++ b/src/index/vchordrq/am/mod.rs @@ -228,7 +228,7 @@ pub unsafe extern "C-unwind" fn amcostestimate( let denumerator = r.clone(); let scale = r.skip(1).chain(std::iter::once(tuples)); for (scale, (numerator, denumerator)) in scale.zip(numerator.zip(denumerator)) { - count += (scale as f64) * ((numerator as f64) / (denumerator as f64)); + count += (scale as f64) * 1.0f64.min((numerator as f64) / (denumerator as f64)); } count }; diff --git a/src/index/vchordrq/types.rs b/src/index/vchordrq/types.rs index 1a1b21a0..44fb0ec9 100644 --- a/src/index/vchordrq/types.rs +++ b/src/index/vchordrq/types.rs @@ -16,6 +16,17 @@ use serde::{Deserialize, Serialize}; use validator::{Validate, ValidationError, ValidationErrors}; use vchordrq::types::VchordrqIndexOptions; +#[derive(Debug, Clone, Serialize, Deserialize, Validate)] +#[serde(deny_unknown_fields)] +pub struct VchordrqDefaultBuildOptions {} + +#[allow(clippy::derivable_impls)] +impl Default for VchordrqDefaultBuildOptions { + fn default() -> Self { + Self {} + } +} + #[derive(Debug, Clone, Serialize, Deserialize, Validate)] #[serde(deny_unknown_fields)] pub struct VchordrqInternalBuildOptions { @@ -84,13 +95,14 @@ pub struct VchordrqExternalBuildOptions { #[serde(deny_unknown_fields)] #[serde(rename_all = "snake_case")] pub enum VchordrqBuildSourceOptions { + Default(VchordrqDefaultBuildOptions), Internal(VchordrqInternalBuildOptions), External(VchordrqExternalBuildOptions), } impl Default for VchordrqBuildSourceOptions { fn default() -> Self { - Self::Internal(Default::default()) + Self::Default(Default::default()) } } @@ -98,6 +110,7 @@ impl Validate for VchordrqBuildSourceOptions { fn validate(&self) -> Result<(), ValidationErrors> { use VchordrqBuildSourceOptions::*; match self { + Default(default_build) => default_build.validate(), Internal(internal_build) => internal_build.validate(), External(external_build) => external_build.validate(), } diff --git a/tests/vchordg/sequential_build.slt b/tests/vchordg/sequential_build.slt new file mode 100644 index 00000000..826de42f --- /dev/null +++ b/tests/vchordg/sequential_build.slt @@ -0,0 +1,17 @@ +statement ok +CREATE TABLE t (val vector(3)); + +statement ok +INSERT INTO t (val) SELECT ARRAY[random(), random(), random()]::real[] FROM generate_series(1, 1000); + +statement ok +SET max_parallel_workers = 0; + +statement ok +SET max_parallel_maintenance_workers = 0; + +statement ok +CREATE INDEX ON t USING vchordg (val vector_ip_ops); + +statement ok +DROP TABLE t; diff --git a/tests/vchordrq/sequential_build.slt b/tests/vchordrq/sequential_build.slt new file mode 100644 index 00000000..b214b64f --- /dev/null +++ b/tests/vchordrq/sequential_build.slt @@ -0,0 +1,24 @@ +statement ok +CREATE TABLE t (val vector(3)); + +statement ok +INSERT INTO t (val) SELECT ARRAY[random(), random(), random()]::real[] FROM generate_series(1, 1000); + +statement ok +SET max_parallel_workers = 0; + +statement ok +SET max_parallel_maintenance_workers = 0; + +statement ok +CREATE INDEX ON t USING vchordrq (val vector_ip_ops) +WITH (options = $$ +residual_quantization = false +build.pin = true +build.internal.lists = [32] +build.internal.build_threads = 2 +build.internal.spherical_centroids = true +$$); + +statement ok +DROP TABLE t;