fix(search): surface failed Meilisearch tasks; make ensure_index idempotent
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -2,6 +2,7 @@
|
|||||||
|
|
||||||
use db::Db;
|
use db::Db;
|
||||||
use domain::{CatalogueObject, ObjectId};
|
use domain::{CatalogueObject, ObjectId};
|
||||||
|
use meilisearch_sdk::tasks::Task;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
/// Errors from the search subsystem.
|
/// Errors from the search subsystem.
|
||||||
@@ -38,6 +39,16 @@ pub struct SearchClient {
|
|||||||
index_uid: String,
|
index_uid: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Turn a completed task into an error if Meilisearch rejected it.
|
||||||
|
fn check_task(task: Task) -> Result<(), SearchError> {
|
||||||
|
match task {
|
||||||
|
Task::Failed { content } => Err(SearchError::Meili(
|
||||||
|
meilisearch_sdk::errors::Error::Meilisearch(content.error),
|
||||||
|
)),
|
||||||
|
_ => Ok(()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl SearchClient {
|
impl SearchClient {
|
||||||
pub fn connect(url: &str, api_key: &str, index_uid: &str) -> Result<Self, SearchError> {
|
pub fn connect(url: &str, api_key: &str, index_uid: &str) -> Result<Self, SearchError> {
|
||||||
let client = meilisearch_sdk::client::Client::new(url, Some(api_key))?;
|
let client = meilisearch_sdk::client::Client::new(url, Some(api_key))?;
|
||||||
@@ -49,41 +60,61 @@ impl SearchClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub async fn ensure_index(&self) -> Result<(), SearchError> {
|
pub async fn ensure_index(&self) -> Result<(), SearchError> {
|
||||||
self.client
|
let task = self
|
||||||
|
.client
|
||||||
.create_index(&self.index_uid, Some("id"))
|
.create_index(&self.index_uid, Some("id"))
|
||||||
.await?
|
.await?
|
||||||
.wait_for_completion(&self.client, None, None)
|
.wait_for_completion(&self.client, None, None)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
self.client
|
// Tolerate "index already exists"; surface any other task failure.
|
||||||
|
if let Task::Failed { content } = &task {
|
||||||
|
if content.error.error_code != meilisearch_sdk::errors::ErrorCode::IndexAlreadyExists {
|
||||||
|
return Err(SearchError::Meili(
|
||||||
|
meilisearch_sdk::errors::Error::Meilisearch(content.error.clone()),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// set_filterable_attributes is idempotent on an existing index
|
||||||
|
let task = self
|
||||||
|
.client
|
||||||
.index(&self.index_uid)
|
.index(&self.index_uid)
|
||||||
.set_filterable_attributes(["visibility"])
|
.set_filterable_attributes(["visibility"])
|
||||||
.await?
|
.await?
|
||||||
.wait_for_completion(&self.client, None, None)
|
.wait_for_completion(&self.client, None, None)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
|
check_task(task)?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn index_object(&self, doc: &SearchDocument) -> Result<(), SearchError> {
|
pub async fn index_object(&self, doc: &SearchDocument) -> Result<(), SearchError> {
|
||||||
self.client
|
let task = self
|
||||||
|
.client
|
||||||
.index(&self.index_uid)
|
.index(&self.index_uid)
|
||||||
.add_or_replace(std::slice::from_ref(doc), Some("id"))
|
.add_or_replace(std::slice::from_ref(doc), Some("id"))
|
||||||
.await?
|
.await?
|
||||||
.wait_for_completion(&self.client, None, None)
|
.wait_for_completion(&self.client, None, None)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
|
check_task(task)?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn remove_object(&self, id: ObjectId) -> Result<(), SearchError> {
|
pub async fn remove_object(&self, id: ObjectId) -> Result<(), SearchError> {
|
||||||
self.client
|
let task = self
|
||||||
|
.client
|
||||||
.index(&self.index_uid)
|
.index(&self.index_uid)
|
||||||
.delete_document(id.to_string())
|
.delete_document(id.to_string())
|
||||||
.await?
|
.await?
|
||||||
.wait_for_completion(&self.client, None, None)
|
.wait_for_completion(&self.client, None, None)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
|
check_task(task)?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -113,12 +144,14 @@ impl SearchClient {
|
|||||||
pub async fn reindex_all(&self, db: &Db) -> Result<(), SearchError> {
|
pub async fn reindex_all(&self, db: &Db) -> Result<(), SearchError> {
|
||||||
let index = self.client.index(&self.index_uid);
|
let index = self.client.index(&self.index_uid);
|
||||||
|
|
||||||
index
|
let task = index
|
||||||
.delete_all_documents()
|
.delete_all_documents()
|
||||||
.await?
|
.await?
|
||||||
.wait_for_completion(&self.client, None, None)
|
.wait_for_completion(&self.client, None, None)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
|
check_task(task)?;
|
||||||
|
|
||||||
let objects = db::catalog::list_objects(db.pool()).await?;
|
let objects = db::catalog::list_objects(db.pool()).await?;
|
||||||
|
|
||||||
let mut docs = Vec::with_capacity(objects.len());
|
let mut docs = Vec::with_capacity(objects.len());
|
||||||
@@ -128,11 +161,13 @@ impl SearchClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if !docs.is_empty() {
|
if !docs.is_empty() {
|
||||||
index
|
let task = index
|
||||||
.add_or_replace(&docs, Some("id"))
|
.add_or_replace(&docs, Some("id"))
|
||||||
.await?
|
.await?
|
||||||
.wait_for_completion(&self.client, None, None)
|
.wait_for_completion(&self.client, None, None)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
|
check_task(task)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|||||||
@@ -50,3 +50,21 @@ async fn index_search_and_remove() {
|
|||||||
client.remove_object(vase).await.unwrap();
|
client.remove_object(vase).await.unwrap();
|
||||||
assert!(client.search("wood").await.unwrap().is_empty());
|
assert!(client.search("wood").await.unwrap().is_empty());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn ensure_index_is_idempotent() {
|
||||||
|
let (url, key) = meili();
|
||||||
|
let index = unique_index();
|
||||||
|
let client = SearchClient::connect(&url, &key, &index).unwrap();
|
||||||
|
client.ensure_index().await.unwrap();
|
||||||
|
// second call against the now-existing index must succeed
|
||||||
|
client.ensure_index().await.unwrap();
|
||||||
|
|
||||||
|
// and the client still works
|
||||||
|
let id = domain::ObjectId::new();
|
||||||
|
client
|
||||||
|
.index_object(&doc(&id.to_string(), "lamp", &[]))
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
assert_eq!(client.search("lamp").await.unwrap(), vec![id]);
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user