diff --git a/crates/search/src/lib.rs b/crates/search/src/lib.rs index 6dc4471..c58f0ad 100644 --- a/crates/search/src/lib.rs +++ b/crates/search/src/lib.rs @@ -2,6 +2,7 @@ use db::Db; use domain::{CatalogueObject, ObjectId}; +use meilisearch_sdk::tasks::Task; use serde::{Deserialize, Serialize}; /// Errors from the search subsystem. @@ -38,6 +39,16 @@ pub struct SearchClient { index_uid: String, } +/// Turn a completed task into an error if Meilisearch rejected it. +fn check_task(task: Task) -> Result<(), SearchError> { + match task { + Task::Failed { content } => Err(SearchError::Meili( + meilisearch_sdk::errors::Error::Meilisearch(content.error), + )), + _ => Ok(()), + } +} + impl SearchClient { pub fn connect(url: &str, api_key: &str, index_uid: &str) -> Result { let client = meilisearch_sdk::client::Client::new(url, Some(api_key))?; @@ -49,41 +60,61 @@ impl SearchClient { } pub async fn ensure_index(&self) -> Result<(), SearchError> { - self.client + let task = self + .client .create_index(&self.index_uid, Some("id")) .await? .wait_for_completion(&self.client, None, None) .await?; - self.client + // Tolerate "index already exists"; surface any other task failure. + if let Task::Failed { content } = &task { + if content.error.error_code != meilisearch_sdk::errors::ErrorCode::IndexAlreadyExists { + return Err(SearchError::Meili( + meilisearch_sdk::errors::Error::Meilisearch(content.error.clone()), + )); + } + } + + // set_filterable_attributes is idempotent on an existing index + let task = self + .client .index(&self.index_uid) .set_filterable_attributes(["visibility"]) .await? .wait_for_completion(&self.client, None, None) .await?; + check_task(task)?; + Ok(()) } pub async fn index_object(&self, doc: &SearchDocument) -> Result<(), SearchError> { - self.client + let task = self + .client .index(&self.index_uid) .add_or_replace(std::slice::from_ref(doc), Some("id")) .await? .wait_for_completion(&self.client, None, None) .await?; + check_task(task)?; + Ok(()) } pub async fn remove_object(&self, id: ObjectId) -> Result<(), SearchError> { - self.client + let task = self + .client .index(&self.index_uid) .delete_document(id.to_string()) .await? .wait_for_completion(&self.client, None, None) .await?; + check_task(task)?; + Ok(()) } @@ -113,12 +144,14 @@ impl SearchClient { pub async fn reindex_all(&self, db: &Db) -> Result<(), SearchError> { let index = self.client.index(&self.index_uid); - index + let task = index .delete_all_documents() .await? .wait_for_completion(&self.client, None, None) .await?; + check_task(task)?; + let objects = db::catalog::list_objects(db.pool()).await?; let mut docs = Vec::with_capacity(objects.len()); @@ -128,11 +161,13 @@ impl SearchClient { } if !docs.is_empty() { - index + let task = index .add_or_replace(&docs, Some("id")) .await? .wait_for_completion(&self.client, None, None) .await?; + + check_task(task)?; } Ok(()) diff --git a/crates/search/tests/search.rs b/crates/search/tests/search.rs index f120037..dcac090 100644 --- a/crates/search/tests/search.rs +++ b/crates/search/tests/search.rs @@ -50,3 +50,21 @@ async fn index_search_and_remove() { client.remove_object(vase).await.unwrap(); assert!(client.search("wood").await.unwrap().is_empty()); } + +#[tokio::test] +async fn ensure_index_is_idempotent() { + let (url, key) = meili(); + let index = unique_index(); + let client = SearchClient::connect(&url, &key, &index).unwrap(); + client.ensure_index().await.unwrap(); + // second call against the now-existing index must succeed + client.ensure_index().await.unwrap(); + + // and the client still works + let id = domain::ObjectId::new(); + client + .index_object(&doc(&id.to_string(), "lamp", &[])) + .await + .unwrap(); + assert_eq!(client.search("lamp").await.unwrap(), vec![id]); +}