feat: add LookupService with moka cache and provider orchestration
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -6,10 +6,13 @@ authors.workspace = true
|
||||
|
||||
[dependencies]
|
||||
async-trait = "0.1"
|
||||
futures = "0.3"
|
||||
moka = { version = "0.12", features = ["future"] }
|
||||
reqwest = "0.13"
|
||||
serde = { version = "1", features = ["derive"] }
|
||||
thiserror = "2"
|
||||
tokio = { version = "1", features = ["full"] }
|
||||
tracing = "0.1"
|
||||
wasmtime = { version = "45", features = ["component-model"] }
|
||||
|
||||
[dev-dependencies]
|
||||
|
||||
@@ -1,2 +1,4 @@
|
||||
pub mod error;
|
||||
pub mod fetch;
|
||||
pub mod model;
|
||||
pub mod service;
|
||||
|
||||
@@ -0,0 +1,322 @@
|
||||
use std::collections::BTreeMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use moka::future::Cache;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::error::{FetchError, HostError};
|
||||
use crate::model::{FetchedResponse, ParseOutcome, ProviderResult};
|
||||
|
||||
/// A loaded provider. Implemented by `wasm::WasmProvider`; faked in tests.
|
||||
/// Methods are sync — WASM calls are CPU-bound; the service wraps them in
|
||||
/// `spawn_blocking`.
|
||||
pub trait ProviderHandle: Send + Sync {
|
||||
fn name(&self) -> &str;
|
||||
fn requests(&self, number: &str) -> Result<Vec<String>, HostError>;
|
||||
fn parse(&self, number: &str, responses: &[FetchedResponse]) -> ParseOutcome;
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait Fetch: Send + Sync {
|
||||
async fn fetch(&self, url: &str) -> Result<FetchedResponse, FetchError>;
|
||||
}
|
||||
|
||||
pub struct LookupService {
|
||||
providers: Vec<Arc<dyn ProviderHandle>>,
|
||||
fetcher: Arc<dyn Fetch>,
|
||||
cache: Cache<String, ProviderResult>,
|
||||
}
|
||||
|
||||
impl LookupService {
|
||||
pub fn new(
|
||||
providers: Vec<Arc<dyn ProviderHandle>>,
|
||||
fetcher: Arc<dyn Fetch>,
|
||||
cache_ttl: Duration,
|
||||
) -> Self {
|
||||
Self {
|
||||
providers,
|
||||
fetcher,
|
||||
cache: Cache::builder().time_to_live(cache_ttl).build(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn provider_names(&self) -> Vec<&str> {
|
||||
self.providers.iter().map(|p| p.name()).collect()
|
||||
}
|
||||
|
||||
/// Run all providers concurrently; one result per provider name.
|
||||
pub async fn lookup(&self, number: &str) -> BTreeMap<String, ProviderResult> {
|
||||
let tasks = self.providers.iter().map(|provider| {
|
||||
let provider = provider.clone();
|
||||
let fetcher = self.fetcher.clone();
|
||||
let cache = self.cache.clone();
|
||||
let number = number.to_string();
|
||||
|
||||
async move {
|
||||
let name = provider.name().to_string();
|
||||
let key = format!("{name}:{number}");
|
||||
|
||||
if let Some(hit) = cache.get(&key).await {
|
||||
return (name, hit);
|
||||
}
|
||||
|
||||
let result = run_provider(provider, &number, fetcher).await;
|
||||
|
||||
// Transient failures must not poison the cache.
|
||||
if result != ProviderResult::FetchFailed {
|
||||
cache.insert(key, result.clone()).await;
|
||||
}
|
||||
|
||||
(name, result)
|
||||
}
|
||||
});
|
||||
|
||||
futures::future::join_all(tasks).await.into_iter().collect()
|
||||
}
|
||||
}
|
||||
|
||||
async fn run_provider(
|
||||
provider: Arc<dyn ProviderHandle>,
|
||||
number: &str,
|
||||
fetcher: Arc<dyn Fetch>,
|
||||
) -> ProviderResult {
|
||||
let name = provider.name().to_string();
|
||||
|
||||
let urls = {
|
||||
let provider = provider.clone();
|
||||
let number = number.to_string();
|
||||
|
||||
match tokio::task::spawn_blocking(move || provider.requests(&number)).await {
|
||||
Ok(Ok(urls)) => urls,
|
||||
Ok(Err(error)) => {
|
||||
warn!(provider = %name, %error, "requests() failed");
|
||||
return ProviderResult::ParseFailed;
|
||||
}
|
||||
Err(error) => {
|
||||
warn!(provider = %name, %error, "requests() panicked");
|
||||
return ProviderResult::ParseFailed;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let fetched = futures::future::join_all(urls.iter().map(|url| fetcher.fetch(url))).await;
|
||||
|
||||
let mut responses = Vec::with_capacity(fetched.len());
|
||||
|
||||
for result in fetched {
|
||||
match result {
|
||||
Ok(response) => responses.push(response),
|
||||
Err(error) => {
|
||||
warn!(provider = %name, %error, "fetch failed");
|
||||
return ProviderResult::FetchFailed;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let outcome = {
|
||||
let provider = provider.clone();
|
||||
let number = number.to_string();
|
||||
|
||||
tokio::task::spawn_blocking(move || provider.parse(&number, &responses)).await
|
||||
};
|
||||
|
||||
match outcome {
|
||||
Ok(ParseOutcome::Ok(entry)) => ProviderResult::Ok { entry },
|
||||
Ok(ParseOutcome::NoData) => ProviderResult::NoData,
|
||||
Ok(ParseOutcome::Failed(message)) => {
|
||||
warn!(provider = %name, %message, "parse failed — scraper rot?");
|
||||
ProviderResult::ParseFailed
|
||||
}
|
||||
Err(error) => {
|
||||
warn!(provider = %name, %error, "parse() panicked");
|
||||
ProviderResult::ParseFailed
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::Duration;

    use async_trait::async_trait;

    use super::*;
    use crate::error::{FetchError, HostError};
    use crate::model::{Comment, Entry, FetchedResponse, ParseOutcome, ProviderResult};

    /// Number looked up by every test.
    const NUMBER: &str = "0700000000";

    /// Fixed entry returned by providers that report a successful parse.
    fn entry() -> Entry {
        let comment = Comment {
            timestamp: Some(1547746162),
            title: None,
            message: "spam".to_string(),
        };

        Entry {
            messages: vec![],
            history: vec!["history".to_string()],
            comments: vec![comment],
        }
    }

    /// Provider that requests one URL and whose parse outcome comes from a fixed function.
    struct FakeProvider {
        name: &'static str,
        outcome: fn() -> ParseOutcome,
    }

    impl ProviderHandle for FakeProvider {
        fn name(&self) -> &str {
            self.name
        }

        fn requests(&self, number: &str) -> Result<Vec<String>, HostError> {
            let url = format!("https://example.test/{number}");
            Ok(vec![url])
        }

        fn parse(&self, _number: &str, _responses: &[FetchedResponse]) -> ParseOutcome {
            (self.outcome)()
        }
    }

    /// Builds a fake provider already erased to the trait object the service expects.
    fn provider(name: &'static str, outcome: fn() -> ParseOutcome) -> Arc<dyn ProviderHandle> {
        Arc::new(FakeProvider { name, outcome })
    }

    /// Fetcher that records how often it was called and optionally fails every call.
    struct FakeFetcher {
        calls: AtomicUsize,
        fail: bool,
    }

    impl FakeFetcher {
        fn new(fail: bool) -> Self {
            let calls = AtomicUsize::new(0);
            Self { calls, fail }
        }

        /// Number of `fetch` calls observed so far.
        fn call_count(&self) -> usize {
            self.calls.load(Ordering::SeqCst)
        }
    }

    #[async_trait]
    impl Fetch for FakeFetcher {
        async fn fetch(&self, _url: &str) -> Result<FetchedResponse, FetchError> {
            self.calls.fetch_add(1, Ordering::SeqCst);

            if !self.fail {
                return Ok(FetchedResponse {
                    status: 200,
                    body: "body".to_string(),
                });
            }

            // A reqwest::Error has no public constructor, so obtain a genuine one
            // from a request to a local port that refuses connections (port 1).
            let err = reqwest::Client::new()
                .get("http://127.0.0.1:1/unreachable")
                .send()
                .await
                .unwrap_err();

            Err(FetchError::Request(err))
        }
    }

    /// Service under test with a 60 second cache TTL.
    fn service(providers: Vec<Arc<dyn ProviderHandle>>, fetcher: Arc<dyn Fetch>) -> LookupService {
        let ttl = Duration::from_secs(60);
        LookupService::new(providers, fetcher, ttl)
    }

    /// Looks up `NUMBER` and returns the result reported for the provider called `name`.
    async fn lookup_for(svc: &LookupService, name: &str) -> ProviderResult {
        svc.lookup(NUMBER).await[name].clone()
    }

    #[tokio::test]
    async fn ok_result_is_returned_and_cached() {
        let fetcher = Arc::new(FakeFetcher::new(false));
        let svc = service(
            vec![provider("fake.se", || ParseOutcome::Ok(entry()))],
            fetcher.clone(),
        );
        let expected = ProviderResult::Ok { entry: entry() };

        assert_eq!(lookup_for(&svc, "fake.se").await, expected);

        // The repeat lookup must be answered by the cache without another fetch.
        assert_eq!(lookup_for(&svc, "fake.se").await, expected);
        assert_eq!(fetcher.call_count(), 1);
    }

    #[tokio::test]
    async fn no_data_is_cached() {
        let fetcher = Arc::new(FakeFetcher::new(false));
        let svc = service(
            vec![provider("fake.se", || ParseOutcome::NoData)],
            fetcher.clone(),
        );

        for _ in 0..2 {
            assert_eq!(lookup_for(&svc, "fake.se").await, ProviderResult::NoData);
        }
        assert_eq!(fetcher.call_count(), 1);
    }

    #[tokio::test]
    async fn parse_failure_maps_and_is_cached() {
        let fetcher = Arc::new(FakeFetcher::new(false));
        let svc = service(
            vec![provider("fake.se", || {
                ParseOutcome::Failed("rot".to_string())
            })],
            fetcher.clone(),
        );

        for _ in 0..2 {
            assert_eq!(
                lookup_for(&svc, "fake.se").await,
                ProviderResult::ParseFailed
            );
        }
        assert_eq!(fetcher.call_count(), 1);
    }

    #[tokio::test]
    async fn fetch_failure_is_not_cached() {
        let fetcher = Arc::new(FakeFetcher::new(true));
        let svc = service(
            vec![provider("fake.se", || ParseOutcome::NoData)],
            fetcher.clone(),
        );

        for _ in 0..2 {
            assert_eq!(
                lookup_for(&svc, "fake.se").await,
                ProviderResult::FetchFailed
            );
        }

        // Nothing was cached, so each lookup hit the fetcher.
        assert_eq!(fetcher.call_count(), 2);
    }

    #[tokio::test]
    async fn multiple_providers_keyed_by_name() {
        let providers = vec![
            provider("a.se", || ParseOutcome::NoData),
            provider("b.se", || ParseOutcome::Ok(entry())),
        ];
        let fetcher = Arc::new(FakeFetcher::new(false));
        let svc = service(providers, fetcher);

        let results = svc.lookup(NUMBER).await;

        assert_eq!(results.len(), 2);
        assert_eq!(results["a.se"], ProviderResult::NoData);
        assert!(matches!(results["b.se"], ProviderResult::Ok { .. }));
    }
}
|
||||
Reference in New Issue
Block a user