Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
-- This file should undo anything in `up.sql`
CREATE OR REPLACE FUNCTION update_chunk_metadata_counts()
RETURNS TRIGGER AS $$
DECLARE
d_id UUID;
new_count INT;
BEGIN
SELECT dataset_id INTO d_id FROM modified WHERE dataset_id IS NOT NULL LIMIT 1;
IF d_id IS NULL THEN
RETURN NULL;
END IF;
SELECT COUNT(modified.id) INTO new_count FROM modified;

IF TG_OP = 'INSERT' THEN
INSERT INTO dataset_usage_counts (dataset_id, chunk_count)
VALUES (d_id, new_count)
ON CONFLICT (dataset_id) DO UPDATE
SET chunk_count = dataset_usage_counts.chunk_count + new_count;
ELSIF TG_OP = 'DELETE' THEN
UPDATE dataset_usage_counts
SET chunk_count = dataset_usage_counts.chunk_count - new_count
WHERE dataset_id = d_id;
END IF;

RETURN NULL;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER increase_chunk_metadata_counts_trigger
AFTER INSERT ON chunk_metadata
REFERENCING NEW TABLE modified
FOR EACH STATEMENT
EXECUTE FUNCTION update_chunk_metadata_counts();

CREATE TRIGGER delete_chunk_metadata_counts_trigger
AFTER DELETE ON chunk_metadata
REFERENCING OLD TABLE modified
FOR EACH STATEMENT
EXECUTE FUNCTION update_chunk_metadata_counts();
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- Your SQL goes here
DROP TRIGGER increase_chunk_metadata_counts_trigger ON chunk_metadata;
DROP TRIGGER delete_chunk_metadata_counts_trigger ON chunk_metadata;
DROP FUNCTION update_chunk_metadata_counts;
22 changes: 10 additions & 12 deletions server/src/bin/ingestion-worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -818,18 +818,16 @@ pub async fn bulk_upload_chunks(
return Err(err);
}

if qdrant_only {
log::info!(
"Updating dataset chunk count by {}",
inserted_chunk_metadata_ids.len()
);
update_dataset_chunk_count(
payload.dataset_id,
inserted_chunk_metadata_ids.len() as i32,
web_pool.clone(),
)
.await?;
}
log::info!(
"Updating dataset chunk count by {}",
inserted_chunk_metadata_ids.len()
);
update_dataset_chunk_count(
payload.dataset_id,
inserted_chunk_metadata_ids.len() as i32,
web_pool.clone(),
)
.await?;

Ok(inserted_chunk_metadata_ids)
}
Expand Down
12 changes: 5 additions & 7 deletions server/src/operators/chunk_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -625,7 +625,9 @@ pub async fn bulk_delete_chunks_query(

match deleted_point_ids {
Ok(point_ids) => {
delete_points_from_qdrant(point_ids, qdrant_collection.clone()).await?;
delete_points_from_qdrant(point_ids.clone(), qdrant_collection.clone()).await?;
update_dataset_chunk_count(dataset_id, -(point_ids.len() as i32), pool.clone())
.await?;
}
Err(e) => {
log::error!("Failed to delete chunks: {:?}", e);
Expand All @@ -636,12 +638,8 @@ pub async fn bulk_delete_chunks_query(
}
} else {
delete_points_from_qdrant(qdrant_point_ids.clone(), qdrant_collection.clone()).await?;
update_dataset_chunk_count(
dataset_id,
-(qdrant_point_ids.clone().len() as i32),
pool.clone(),
)
.await?;
update_dataset_chunk_count(dataset_id, -(qdrant_point_ids.len() as i32), pool.clone())
.await?;
}

offset = offset_id;
Expand Down
3 changes: 2 additions & 1 deletion server/src/operators/dataset_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::data::models::{
};
use crate::handlers::chunk_handler::ChunkFilter;
use crate::handlers::dataset_handler::{GetDatasetsPagination, TagsWithCount};
use crate::operators::chunk_operator::bulk_delete_chunks_query;
use crate::operators::chunk_operator::{bulk_delete_chunks_query, update_dataset_chunk_count};
use crate::operators::clickhouse_operator::ClickHouseEvent;
use crate::operators::qdrant_operator::{
delete_points_from_qdrant, get_qdrant_collection_from_dataset_config,
Expand Down Expand Up @@ -529,6 +529,7 @@ pub async fn clear_dataset_query(
))
.await;

update_dataset_chunk_count(id, -(chunk_ids.len() as i32), pool.clone()).await?;
log::info!("Deleted {} chunks from {}", chunk_ids.len(), id);

last_offset_id = *chunk_ids.last().unwrap();
Expand Down