diff --git a/server/migrations/2024-12-05-201534_remove-chunk-counts-trigger/down.sql b/server/migrations/2024-12-05-201534_remove-chunk-counts-trigger/down.sql new file mode 100644 index 0000000000..192c967d81 --- /dev/null +++ b/server/migrations/2024-12-05-201534_remove-chunk-counts-trigger/down.sql @@ -0,0 +1,39 @@ +-- This file should undo anything in `up.sql` +CREATE OR REPLACE FUNCTION update_chunk_metadata_counts() +RETURNS TRIGGER AS $$ +DECLARE + d_id UUID; + new_count INT; +BEGIN + SELECT dataset_id INTO d_id FROM modified WHERE dataset_id IS NOT NULL LIMIT 1; + IF d_id IS NULL THEN + RETURN NULL; + END IF; + SELECT COUNT(modified.id) INTO new_count FROM modified; + + IF TG_OP = 'INSERT' THEN + INSERT INTO dataset_usage_counts (dataset_id, chunk_count) + VALUES (d_id, new_count) + ON CONFLICT (dataset_id) DO UPDATE + SET chunk_count = dataset_usage_counts.chunk_count + new_count; + ELSIF TG_OP = 'DELETE' THEN + UPDATE dataset_usage_counts + SET chunk_count = dataset_usage_counts.chunk_count - new_count + WHERE dataset_id = d_id; + END IF; + + RETURN NULL; +END; +$$ LANGUAGE plpgsql; + +CREATE TRIGGER increase_chunk_metadata_counts_trigger +AFTER INSERT ON chunk_metadata +REFERENCING NEW TABLE modified +FOR EACH STATEMENT +EXECUTE FUNCTION update_chunk_metadata_counts(); + +CREATE TRIGGER delete_chunk_metadata_counts_trigger +AFTER DELETE ON chunk_metadata +REFERENCING OLD TABLE modified +FOR EACH STATEMENT +EXECUTE FUNCTION update_chunk_metadata_counts(); diff --git a/server/migrations/2024-12-05-201534_remove-chunk-counts-trigger/up.sql b/server/migrations/2024-12-05-201534_remove-chunk-counts-trigger/up.sql new file mode 100644 index 0000000000..6d35c5862c --- /dev/null +++ b/server/migrations/2024-12-05-201534_remove-chunk-counts-trigger/up.sql @@ -0,0 +1,4 @@ +-- Your SQL goes here +DROP TRIGGER increase_chunk_metadata_counts_trigger ON chunk_metadata; +DROP TRIGGER delete_chunk_metadata_counts_trigger ON chunk_metadata; +DROP FUNCTION update_chunk_metadata_counts; diff --git a/server/src/bin/ingestion-worker.rs b/server/src/bin/ingestion-worker.rs index a88cc8a340..5c673d6317 100644 --- a/server/src/bin/ingestion-worker.rs +++ b/server/src/bin/ingestion-worker.rs @@ -818,18 +818,16 @@ pub async fn bulk_upload_chunks( return Err(err); } - if qdrant_only { - log::info!( - "Updating dataset chunk count by {}", - inserted_chunk_metadata_ids.len() - ); - update_dataset_chunk_count( - payload.dataset_id, - inserted_chunk_metadata_ids.len() as i32, - web_pool.clone(), - ) - .await?; - } + log::info!( + "Updating dataset chunk count by {}", + inserted_chunk_metadata_ids.len() + ); + update_dataset_chunk_count( + payload.dataset_id, + inserted_chunk_metadata_ids.len() as i32, + web_pool.clone(), + ) + .await?; Ok(inserted_chunk_metadata_ids) } diff --git a/server/src/operators/chunk_operator.rs b/server/src/operators/chunk_operator.rs index 1ffb1e8fe6..8a40d79e87 100644 --- a/server/src/operators/chunk_operator.rs +++ b/server/src/operators/chunk_operator.rs @@ -625,7 +625,9 @@ pub async fn bulk_delete_chunks_query( match deleted_point_ids { Ok(point_ids) => { - delete_points_from_qdrant(point_ids, qdrant_collection.clone()).await?; + delete_points_from_qdrant(point_ids.clone(), qdrant_collection.clone()).await?; + update_dataset_chunk_count(dataset_id, -(point_ids.len() as i32), pool.clone()) + .await?; } Err(e) => { log::error!("Failed to delete chunks: {:?}", e); @@ -636,12 +638,8 @@ pub async fn bulk_delete_chunks_query( } } else { delete_points_from_qdrant(qdrant_point_ids.clone(), qdrant_collection.clone()).await?; - update_dataset_chunk_count( - dataset_id, - -(qdrant_point_ids.clone().len() as i32), - pool.clone(), - ) - .await?; + update_dataset_chunk_count(dataset_id, -(qdrant_point_ids.len() as i32), pool.clone()) + .await?; } offset = offset_id; diff --git a/server/src/operators/dataset_operator.rs b/server/src/operators/dataset_operator.rs index caee542278..ec865f6d02 100644 --- a/server/src/operators/dataset_operator.rs +++ b/server/src/operators/dataset_operator.rs @@ -5,7 +5,7 @@ use crate::data::models::{ }; use crate::handlers::chunk_handler::ChunkFilter; use crate::handlers::dataset_handler::{GetDatasetsPagination, TagsWithCount}; -use crate::operators::chunk_operator::bulk_delete_chunks_query; +use crate::operators::chunk_operator::{bulk_delete_chunks_query, update_dataset_chunk_count}; use crate::operators::clickhouse_operator::ClickHouseEvent; use crate::operators::qdrant_operator::{ delete_points_from_qdrant, get_qdrant_collection_from_dataset_config, @@ -529,6 +529,7 @@ pub async fn clear_dataset_query( )) .await; + update_dataset_chunk_count(id, -(chunk_ids.len() as i32), pool.clone()).await?; log::info!("Deleted {} chunks from {}", chunk_ids.len(), id); last_offset_id = *chunk_ids.last().unwrap();