diff --git a/Cargo.lock b/Cargo.lock index de4bf94..b92b4ab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -38,6 +38,17 @@ version = "1.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3d036a3c4ab069c7b410a2ce876bd74808d2d0888a82667669f8e783a898bf1" +[[package]] +name = "async-lock" +version = "3.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "290f7f2596bd5b78a9fec8088ccd89180d7f9f55b94b0576823bbbdc72ee8311" +dependencies = [ + "event-listener", + "event-listener-strategy", + "pin-project-lite", +] + [[package]] name = "async-trait" version = "0.1.89" @@ -165,6 +176,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "concurrent-queue" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "console" version = "0.16.3" @@ -371,6 +391,15 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam-channel" +version = "0.5.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-deque" version = "0.8.6" @@ -512,6 +541,27 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "event-listener" +version = "5.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e13b66accf52311f30a0db42147dadea9850cb48cd070028831ae5f5d4b856ab" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8be9f3dfaaffdae2972880079a491a1a8bb7cbed0b8dd7a347f668b4150a3b93" +dependencies = [ + "event-listener", + "pin-project-lite", +] + [[package]] name = "fastrand" version = "2.4.1" @@ -571,6 +621,7 @@ checksum = "8b147ee9d1f6d097cef9ce628cd2ee62288d963e16fb287bd9286455b241382d" dependencies = [ "futures-channel", "futures-core", + "futures-executor", "futures-io", "futures-sink", "futures-task", @@ -593,12 +644,34 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d" +[[package]] +name = "futures-executor" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf29c38818342a3b26b5b923639e7b1f4a61fc5e76102d4b1981c6dc7a7579d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + [[package]] name = "futures-io" version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cecba35d7ad927e23624b22ad55235f2239cfa44fd10428eecbeba6d6a717718" +[[package]] +name = "futures-macro" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e835b70203e41293343137df5c0664546da5745f82ec9b84d40be8336958447b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "futures-sink" version = "0.3.32" @@ -620,6 +693,7 @@ dependencies = [ "futures-channel", "futures-core", "futures-io", + "futures-macro", "futures-sink", "futures-task", "memchr", @@ -1212,6 +1286,26 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "moka" +version = "0.12.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "957228ad12042ee839f93c8f257b62b4c0ab5eaae1d4fa60de53b27c9d7c5046" +dependencies = [ + "async-lock", + "crossbeam-channel", + "crossbeam-epoch", + "crossbeam-utils", + "equivalent", + "event-listener", + "futures-util", + "parking_lot", + "portable-atomic", + "smallvec", + "tagptr", + "uuid", +] + [[package]] name = "object" version = "0.39.1" @@ -1236,6 +1330,12 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c87def4c32ab89d880effc9e097653c8da5d6ef28e6b539d313baaacfbafcbe" +[[package]] +name = "parking" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" + [[package]] name = "parking_lot" version = "0.12.5" @@ -1287,6 +1387,12 @@ version = "0.3.33" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19f132c84eca552bf34cab8ec81f1c1dcc229b811638f9d283dceabe58c5569e" +[[package]] +name = "portable-atomic" +version = "1.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c33a9471896f1c69cecef8d20cbe2f7accd12527ce60845ff44c153bb2a21b49" + [[package]] name = "postcard" version = "1.1.3" @@ -1962,6 +2068,12 @@ dependencies = [ "libc", ] +[[package]] +name = "tagptr" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" + [[package]] name = "target-lexicon" version = "0.13.5" @@ -2197,9 +2309,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100" dependencies = [ "pin-project-lite", + "tracing-attributes", "tracing-core", ] +[[package]] +name = "tracing-attributes" +version = "0.1.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7490cfa5ec963746568740651ac6781f701c9c5ea257c58e057f3ba8cf69e8da" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "tracing-core" version = "0.1.36" @@ -2269,6 +2393,7 @@ version = "1.23.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d258b83ceec21034727ecee8c382cfa6c3e133699b0742c64571814fb420c9f7" dependencies = [ + "getrandom 0.4.2", "js-sys", "wasm-bindgen", ] @@ -2841,11 +2966,14 @@ name = "whoareyou-server" version = "0.1.0" dependencies = [ "async-trait", + "futures", + "moka", "reqwest", "serde", "serde_json", "thiserror 2.0.18", "tokio", + "tracing", "wasmtime", ] diff --git a/crates/server/Cargo.toml b/crates/server/Cargo.toml index 3a78c3e..f5bda30 100644 --- a/crates/server/Cargo.toml +++ b/crates/server/Cargo.toml @@ -6,10 +6,13 @@ authors.workspace = true [dependencies] async-trait = "0.1" +futures = "0.3" +moka = { version = "0.12", features = ["future"] } reqwest = "0.13" serde = { version = "1", features = ["derive"] } thiserror = "2" tokio = { version = "1", features = ["full"] } +tracing = "0.1" wasmtime = { version = "45", features = ["component-model"] } [dev-dependencies] diff --git a/crates/server/src/lib.rs b/crates/server/src/lib.rs index 3849a7b..81f6d19 100644 --- a/crates/server/src/lib.rs +++ b/crates/server/src/lib.rs @@ -1,2 +1,4 @@ pub mod error; +pub mod fetch; pub mod model; +pub mod service; diff --git a/crates/server/src/service.rs b/crates/server/src/service.rs new file mode 100644 index 0000000..42c1cbc --- /dev/null +++ b/crates/server/src/service.rs @@ -0,0 +1,322 @@ +use std::collections::BTreeMap; +use std::sync::Arc; +use std::time::Duration; + +use async_trait::async_trait; +use moka::future::Cache; +use tracing::warn; + +use crate::error::{FetchError, HostError}; +use crate::model::{FetchedResponse, ParseOutcome, ProviderResult}; + +/// A loaded provider. Implemented by `wasm::WasmProvider`; faked in tests. +/// Methods are sync — WASM calls are CPU-bound; the service wraps them in +/// `spawn_blocking`. +pub trait ProviderHandle: Send + Sync { + fn name(&self) -> &str; + fn requests(&self, number: &str) -> Result, HostError>; + fn parse(&self, number: &str, responses: &[FetchedResponse]) -> ParseOutcome; +} + +#[async_trait] +pub trait Fetch: Send + Sync { + async fn fetch(&self, url: &str) -> Result; +} + +pub struct LookupService { + providers: Vec>, + fetcher: Arc, + cache: Cache, +} + +impl LookupService { + pub fn new( + providers: Vec>, + fetcher: Arc, + cache_ttl: Duration, + ) -> Self { + Self { + providers, + fetcher, + cache: Cache::builder().time_to_live(cache_ttl).build(), + } + } + + pub fn provider_names(&self) -> Vec<&str> { + self.providers.iter().map(|p| p.name()).collect() + } + + /// Run all providers concurrently; one result per provider name. + pub async fn lookup(&self, number: &str) -> BTreeMap { + let tasks = self.providers.iter().map(|provider| { + let provider = provider.clone(); + let fetcher = self.fetcher.clone(); + let cache = self.cache.clone(); + let number = number.to_string(); + + async move { + let name = provider.name().to_string(); + let key = format!("{name}:{number}"); + + if let Some(hit) = cache.get(&key).await { + return (name, hit); + } + + let result = run_provider(provider, &number, fetcher).await; + + // Transient failures must not poison the cache. + if result != ProviderResult::FetchFailed { + cache.insert(key, result.clone()).await; + } + + (name, result) + } + }); + + futures::future::join_all(tasks).await.into_iter().collect() + } +} + +async fn run_provider( + provider: Arc, + number: &str, + fetcher: Arc, +) -> ProviderResult { + let name = provider.name().to_string(); + + let urls = { + let provider = provider.clone(); + let number = number.to_string(); + + match tokio::task::spawn_blocking(move || provider.requests(&number)).await { + Ok(Ok(urls)) => urls, + Ok(Err(error)) => { + warn!(provider = %name, %error, "requests() failed"); + return ProviderResult::ParseFailed; + } + Err(error) => { + warn!(provider = %name, %error, "requests() panicked"); + return ProviderResult::ParseFailed; + } + } + }; + + let fetched = futures::future::join_all(urls.iter().map(|url| fetcher.fetch(url))).await; + + let mut responses = Vec::with_capacity(fetched.len()); + + for result in fetched { + match result { + Ok(response) => responses.push(response), + Err(error) => { + warn!(provider = %name, %error, "fetch failed"); + return ProviderResult::FetchFailed; + } + } + } + + let outcome = { + let provider = provider.clone(); + let number = number.to_string(); + + tokio::task::spawn_blocking(move || provider.parse(&number, &responses)).await + }; + + match outcome { + Ok(ParseOutcome::Ok(entry)) => ProviderResult::Ok { entry }, + Ok(ParseOutcome::NoData) => ProviderResult::NoData, + Ok(ParseOutcome::Failed(message)) => { + warn!(provider = %name, %message, "parse failed — scraper rot?"); + ProviderResult::ParseFailed + } + Err(error) => { + warn!(provider = %name, %error, "parse() panicked"); + ProviderResult::ParseFailed + } + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::time::Duration; + + use async_trait::async_trait; + + use super::*; + use crate::error::{FetchError, HostError}; + use crate::model::{Comment, Entry, FetchedResponse, ParseOutcome, ProviderResult}; + + fn entry() -> Entry { + Entry { + messages: vec![], + history: vec!["history".to_string()], + comments: vec![Comment { + timestamp: Some(1547746162), + title: None, + message: "spam".to_string(), + }], + } + } + + /// Provider whose parse outcome is scripted per call. + struct FakeProvider { + name: &'static str, + outcome: fn() -> ParseOutcome, + } + + impl ProviderHandle for FakeProvider { + fn name(&self) -> &str { + self.name + } + + fn requests(&self, number: &str) -> Result, HostError> { + Ok(vec![format!("https://example.test/{number}")]) + } + + fn parse(&self, _number: &str, _responses: &[FetchedResponse]) -> ParseOutcome { + (self.outcome)() + } + } + + /// Fetcher that counts calls and can be told to fail. + struct FakeFetcher { + calls: AtomicUsize, + fail: bool, + } + + impl FakeFetcher { + fn new(fail: bool) -> Self { + Self { + calls: AtomicUsize::new(0), + fail, + } + } + } + + #[async_trait] + impl Fetch for FakeFetcher { + async fn fetch(&self, _url: &str) -> Result { + self.calls.fetch_add(1, Ordering::SeqCst); + + if self.fail { + // reqwest::Error cannot be constructed directly; produce a real + // one via an immediately-refused local connection (port 1). + let err = reqwest::Client::new() + .get("http://127.0.0.1:1/unreachable") + .send() + .await + .unwrap_err(); + return Err(FetchError::Request(err)); + } + + Ok(FetchedResponse { + status: 200, + body: "body".to_string(), + }) + } + } + + fn service(providers: Vec>, fetcher: Arc) -> LookupService { + LookupService::new(providers, fetcher, Duration::from_secs(60)) + } + + #[tokio::test] + async fn ok_result_is_returned_and_cached() { + let provider = Arc::new(FakeProvider { + name: "fake.se", + outcome: || ParseOutcome::Ok(entry()), + }); + let fetcher = Arc::new(FakeFetcher::new(false)); + let svc = service(vec![provider], fetcher.clone()); + + let results = svc.lookup("0700000000").await; + assert_eq!(results["fake.se"], ProviderResult::Ok { entry: entry() }); + + // second lookup served from cache — fetcher not called again + let results = svc.lookup("0700000000").await; + assert_eq!(results["fake.se"], ProviderResult::Ok { entry: entry() }); + assert_eq!(fetcher.calls.load(Ordering::SeqCst), 1); + } + + #[tokio::test] + async fn no_data_is_cached() { + let provider = Arc::new(FakeProvider { + name: "fake.se", + outcome: || ParseOutcome::NoData, + }); + let fetcher = Arc::new(FakeFetcher::new(false)); + let svc = service(vec![provider], fetcher.clone()); + + assert_eq!( + svc.lookup("0700000000").await["fake.se"], + ProviderResult::NoData + ); + assert_eq!( + svc.lookup("0700000000").await["fake.se"], + ProviderResult::NoData + ); + assert_eq!(fetcher.calls.load(Ordering::SeqCst), 1); + } + + #[tokio::test] + async fn parse_failure_maps_and_is_cached() { + let provider = Arc::new(FakeProvider { + name: "fake.se", + outcome: || ParseOutcome::Failed("rot".to_string()), + }); + let fetcher = Arc::new(FakeFetcher::new(false)); + let svc = service(vec![provider], fetcher.clone()); + + assert_eq!( + svc.lookup("0700000000").await["fake.se"], + ProviderResult::ParseFailed + ); + assert_eq!( + svc.lookup("0700000000").await["fake.se"], + ProviderResult::ParseFailed + ); + assert_eq!(fetcher.calls.load(Ordering::SeqCst), 1); + } + + #[tokio::test] + async fn fetch_failure_is_not_cached() { + let provider = Arc::new(FakeProvider { + name: "fake.se", + outcome: || ParseOutcome::NoData, + }); + let fetcher = Arc::new(FakeFetcher::new(true)); + let svc = service(vec![provider], fetcher.clone()); + + assert_eq!( + svc.lookup("0700000000").await["fake.se"], + ProviderResult::FetchFailed + ); + assert_eq!( + svc.lookup("0700000000").await["fake.se"], + ProviderResult::FetchFailed + ); + // NOT cached: fetcher tried twice + assert_eq!(fetcher.calls.load(Ordering::SeqCst), 2); + } + + #[tokio::test] + async fn multiple_providers_keyed_by_name() { + let a = Arc::new(FakeProvider { + name: "a.se", + outcome: || ParseOutcome::NoData, + }); + let b = Arc::new(FakeProvider { + name: "b.se", + outcome: || ParseOutcome::Ok(entry()), + }); + let fetcher = Arc::new(FakeFetcher::new(false)); + let svc = service(vec![a, b], fetcher); + + let results = svc.lookup("0700000000").await; + assert_eq!(results.len(), 2); + assert_eq!(results["a.se"], ProviderResult::NoData); + assert!(matches!(results["b.se"], ProviderResult::Ok { .. })); + } +}