diff --git a/nora-registry/src/audit.rs b/nora-registry/src/audit.rs index 597f29e..01452ec 100644 --- a/nora-registry/src/audit.rs +++ b/nora-registry/src/audit.rs @@ -12,6 +12,7 @@ use serde::Serialize; use std::fs::{self, OpenOptions}; use std::io::Write; use std::path::PathBuf; +use std::sync::Arc; use tracing::{info, warn}; #[derive(Debug, Clone, Serialize)] @@ -39,7 +40,7 @@ impl AuditEntry { pub struct AuditLog { path: PathBuf, - writer: Mutex>, + writer: Arc>>, } impl AuditLog { @@ -48,23 +49,26 @@ impl AuditLog { let writer = match OpenOptions::new().create(true).append(true).open(&path) { Ok(f) => { info!(path = %path.display(), "Audit log initialized"); - Mutex::new(Some(f)) + Arc::new(Mutex::new(Some(f))) } Err(e) => { warn!(path = %path.display(), error = %e, "Failed to open audit log, auditing disabled"); - Mutex::new(None) + Arc::new(Mutex::new(None)) } }; Self { path, writer } } pub fn log(&self, entry: AuditEntry) { - if let Some(ref mut file) = *self.writer.lock() { - if let Ok(json) = serde_json::to_string(&entry) { - let _ = writeln!(file, "{}", json); - let _ = file.flush(); + let writer = Arc::clone(&self.writer); + tokio::task::spawn_blocking(move || { + if let Some(ref mut file) = *writer.lock() { + if let Ok(json) = serde_json::to_string(&entry) { + let _ = writeln!(file, "{}", json); + let _ = file.flush(); + } } - } + }); } pub fn path(&self) -> &PathBuf { @@ -100,23 +104,25 @@ mod tests { assert!(log.path().ends_with("audit.jsonl")); } - #[test] - fn test_audit_log_write_entry() { + #[tokio::test] + async fn test_audit_log_write_entry() { let tmp = TempDir::new().unwrap(); let log = AuditLog::new(tmp.path().to_str().unwrap()); let entry = AuditEntry::new("pull", "user1", "lodash", "npm", "downloaded"); log.log(entry); - // Verify file contains the entry + // spawn_blocking is fire-and-forget; give it time to flush + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + let content = std::fs::read_to_string(log.path()).unwrap(); assert!(content.contains(r#""action":"pull""#)); assert!(content.contains(r#""actor":"user1""#)); assert!(content.contains(r#""artifact":"lodash""#)); } - #[test] - fn test_audit_log_multiple_entries() { + #[tokio::test] + async fn test_audit_log_multiple_entries() { let tmp = TempDir::new().unwrap(); let log = AuditLog::new(tmp.path().to_str().unwrap()); @@ -124,6 +130,8 @@ mod tests { log.log(AuditEntry::new("pull", "user", "b", "npm", "")); log.log(AuditEntry::new("delete", "admin", "c", "maven", "")); + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + let content = std::fs::read_to_string(log.path()).unwrap(); let lines: Vec<&str> = content.lines().collect(); assert_eq!(lines.len(), 3); diff --git a/nora-registry/src/dashboard_metrics.rs b/nora-registry/src/dashboard_metrics.rs index 62d9cba..a9deb58 100644 --- a/nora-registry/src/dashboard_metrics.rs +++ b/nora-registry/src/dashboard_metrics.rs @@ -111,8 +111,8 @@ impl DashboardMetrics { metrics } - /// Save current metrics to disk - pub fn save(&self) { + /// Save current metrics to disk (async to avoid blocking the runtime) + pub async fn save(&self) { let Some(path) = &self.persist_path else { return; }; @@ -134,8 +134,8 @@ impl DashboardMetrics { // Atomic write: write to tmp then rename let tmp = path.with_extension("json.tmp"); if let Ok(data) = serde_json::to_string_pretty(&snap) { - if std::fs::write(&tmp, &data).is_ok() { - let _ = std::fs::rename(&tmp, path); + if tokio::fs::write(&tmp, &data).await.is_ok() { + let _ = tokio::fs::rename(&tmp, path).await; } } } @@ -317,8 +317,8 @@ mod tests { assert_eq!(m.get_registry_uploads("unknown"), 0); } - #[test] - fn test_persistence_save_and_load() { + #[tokio::test] + async fn test_persistence_save_and_load() { let tmp = TempDir::new().unwrap(); let path = tmp.path().to_str().unwrap(); @@ -329,7 +329,7 @@ mod tests { m.record_download("docker"); m.record_upload("maven"); m.record_cache_hit(); - m.save(); + m.save().await; } // Load in new instance diff --git a/nora-registry/src/main.rs b/nora-registry/src/main.rs index d75f8fa..e2bbb32 100644 --- a/nora-registry/src/main.rs +++ b/nora-registry/src/main.rs @@ -441,9 +441,9 @@ async fn run_server(config: Config, storage: Storage) { let mut interval = tokio::time::interval(std::time::Duration::from_secs(30)); loop { interval.tick().await; - metrics_state.metrics.save(); + metrics_state.metrics.save().await; if let Some(ref token_store) = metrics_state.tokens { - token_store.flush_last_used(); + token_store.flush_last_used().await; } registry::docker::cleanup_expired_sessions(&metrics_state.upload_sessions); } @@ -459,7 +459,7 @@ async fn run_server(config: Config, storage: Storage) { .expect("Server error"); // Save metrics on shutdown - state.metrics.save(); + state.metrics.save().await; info!( uptime_seconds = state.start_time.elapsed().as_secs(), diff --git a/nora-registry/src/tokens.rs b/nora-registry/src/tokens.rs index 80d0f6e..2f87bdf 100644 --- a/nora-registry/src/tokens.rs +++ b/nora-registry/src/tokens.rs @@ -270,9 +270,9 @@ impl TokenStore { tokens } - /// Flush pending last_used timestamps to disk. + /// Flush pending last_used timestamps to disk (async to avoid blocking runtime). /// Called periodically by background task (every 30s). - pub fn flush_last_used(&self) { + pub async fn flush_last_used(&self) { let pending: HashMap = { let mut map = self.pending_last_used.write(); std::mem::take(&mut *map) @@ -284,7 +284,7 @@ impl TokenStore { for (file_prefix, timestamp) in &pending { let file_path = self.storage_path.join(format!("{}.json", file_prefix)); - let content = match fs::read_to_string(&file_path) { + let content = match tokio::fs::read_to_string(&file_path).await { Ok(c) => c, Err(_) => continue, }; @@ -294,7 +294,7 @@ impl TokenStore { }; info.last_used = Some(*timestamp); if let Ok(json) = serde_json::to_string_pretty(&info) { - let _ = fs::write(&file_path, &json); + let _ = tokio::fs::write(&file_path, &json).await; set_file_permissions_600(&file_path); } } @@ -597,8 +597,8 @@ mod tests { assert_eq!(store.list_tokens("user2").len(), 1); } - #[test] - fn test_token_updates_last_used() { + #[tokio::test] + async fn test_token_updates_last_used() { let temp_dir = TempDir::new().unwrap(); let store = TokenStore::new(temp_dir.path()); @@ -609,7 +609,7 @@ mod tests { store.verify_token(&token).unwrap(); // last_used is deferred — flush to persist - store.flush_last_used(); + store.flush_last_used().await; let tokens = store.list_tokens("testuser"); assert!(tokens[0].last_used.is_some());