fix(io): replace blocking I/O with async in hot paths

Three subsystems were using std::fs (blocking) inside async context,
which stalls the tokio runtime thread during I/O:

- DashboardMetrics::save(): now uses tokio::fs::write + rename
- TokenStore::flush_last_used(): now uses tokio::fs for batch updates
- AuditLog::log(): moved file write to spawn_blocking (fire-and-forget)

The background task and shutdown handler now properly .await the
async save/flush methods. AuditLog writer wrapped in Arc for
cross-thread access from spawn_blocking.
This commit is contained in:
2026-04-02 12:17:47 +00:00
parent 848f5f5571
commit be7e882391
4 changed files with 38 additions and 30 deletions

View File

@@ -12,6 +12,7 @@ use serde::Serialize;
use std::fs::{self, OpenOptions};
use std::io::Write;
use std::path::PathBuf;
use std::sync::Arc;
use tracing::{info, warn};
#[derive(Debug, Clone, Serialize)]
@@ -39,7 +40,7 @@ impl AuditEntry {
pub struct AuditLog {
path: PathBuf,
writer: Mutex<Option<fs::File>>,
writer: Arc<Mutex<Option<fs::File>>>,
}
impl AuditLog {
@@ -48,23 +49,26 @@ impl AuditLog {
let writer = match OpenOptions::new().create(true).append(true).open(&path) {
Ok(f) => {
info!(path = %path.display(), "Audit log initialized");
Mutex::new(Some(f))
Arc::new(Mutex::new(Some(f)))
}
Err(e) => {
warn!(path = %path.display(), error = %e, "Failed to open audit log, auditing disabled");
Mutex::new(None)
Arc::new(Mutex::new(None))
}
};
Self { path, writer }
}
pub fn log(&self, entry: AuditEntry) {
if let Some(ref mut file) = *self.writer.lock() {
let writer = Arc::clone(&self.writer);
tokio::task::spawn_blocking(move || {
if let Some(ref mut file) = *writer.lock() {
if let Ok(json) = serde_json::to_string(&entry) {
let _ = writeln!(file, "{}", json);
let _ = file.flush();
}
}
});
}
pub fn path(&self) -> &PathBuf {
@@ -100,23 +104,25 @@ mod tests {
assert!(log.path().ends_with("audit.jsonl"));
}
#[test]
fn test_audit_log_write_entry() {
#[tokio::test]
async fn test_audit_log_write_entry() {
let tmp = TempDir::new().unwrap();
let log = AuditLog::new(tmp.path().to_str().unwrap());
let entry = AuditEntry::new("pull", "user1", "lodash", "npm", "downloaded");
log.log(entry);
// Verify file contains the entry
// spawn_blocking is fire-and-forget; give it time to flush
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let content = std::fs::read_to_string(log.path()).unwrap();
assert!(content.contains(r#""action":"pull""#));
assert!(content.contains(r#""actor":"user1""#));
assert!(content.contains(r#""artifact":"lodash""#));
}
#[test]
fn test_audit_log_multiple_entries() {
#[tokio::test]
async fn test_audit_log_multiple_entries() {
let tmp = TempDir::new().unwrap();
let log = AuditLog::new(tmp.path().to_str().unwrap());
@@ -124,6 +130,8 @@ mod tests {
log.log(AuditEntry::new("pull", "user", "b", "npm", ""));
log.log(AuditEntry::new("delete", "admin", "c", "maven", ""));
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let content = std::fs::read_to_string(log.path()).unwrap();
let lines: Vec<&str> = content.lines().collect();
assert_eq!(lines.len(), 3);

View File

@@ -111,8 +111,8 @@ impl DashboardMetrics {
metrics
}
/// Save current metrics to disk
pub fn save(&self) {
/// Save current metrics to disk (async to avoid blocking the runtime)
pub async fn save(&self) {
let Some(path) = &self.persist_path else {
return;
};
@@ -134,8 +134,8 @@ impl DashboardMetrics {
// Atomic write: write to tmp then rename
let tmp = path.with_extension("json.tmp");
if let Ok(data) = serde_json::to_string_pretty(&snap) {
if std::fs::write(&tmp, &data).is_ok() {
let _ = std::fs::rename(&tmp, path);
if tokio::fs::write(&tmp, &data).await.is_ok() {
let _ = tokio::fs::rename(&tmp, path).await;
}
}
}
@@ -317,8 +317,8 @@ mod tests {
assert_eq!(m.get_registry_uploads("unknown"), 0);
}
#[test]
fn test_persistence_save_and_load() {
#[tokio::test]
async fn test_persistence_save_and_load() {
let tmp = TempDir::new().unwrap();
let path = tmp.path().to_str().unwrap();
@@ -329,7 +329,7 @@ mod tests {
m.record_download("docker");
m.record_upload("maven");
m.record_cache_hit();
m.save();
m.save().await;
}
// Load in new instance

View File

@@ -441,9 +441,9 @@ async fn run_server(config: Config, storage: Storage) {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
loop {
interval.tick().await;
metrics_state.metrics.save();
metrics_state.metrics.save().await;
if let Some(ref token_store) = metrics_state.tokens {
token_store.flush_last_used();
token_store.flush_last_used().await;
}
registry::docker::cleanup_expired_sessions(&metrics_state.upload_sessions);
}
@@ -459,7 +459,7 @@ async fn run_server(config: Config, storage: Storage) {
.expect("Server error");
// Save metrics on shutdown
state.metrics.save();
state.metrics.save().await;
info!(
uptime_seconds = state.start_time.elapsed().as_secs(),

View File

@@ -270,9 +270,9 @@ impl TokenStore {
tokens
}
/// Flush pending last_used timestamps to disk.
/// Flush pending last_used timestamps to disk (async to avoid blocking runtime).
/// Called periodically by background task (every 30s).
pub fn flush_last_used(&self) {
pub async fn flush_last_used(&self) {
let pending: HashMap<String, u64> = {
let mut map = self.pending_last_used.write();
std::mem::take(&mut *map)
@@ -284,7 +284,7 @@ impl TokenStore {
for (file_prefix, timestamp) in &pending {
let file_path = self.storage_path.join(format!("{}.json", file_prefix));
let content = match fs::read_to_string(&file_path) {
let content = match tokio::fs::read_to_string(&file_path).await {
Ok(c) => c,
Err(_) => continue,
};
@@ -294,7 +294,7 @@ impl TokenStore {
};
info.last_used = Some(*timestamp);
if let Ok(json) = serde_json::to_string_pretty(&info) {
let _ = fs::write(&file_path, &json);
let _ = tokio::fs::write(&file_path, &json).await;
set_file_permissions_600(&file_path);
}
}
@@ -597,8 +597,8 @@ mod tests {
assert_eq!(store.list_tokens("user2").len(), 1);
}
#[test]
fn test_token_updates_last_used() {
#[tokio::test]
async fn test_token_updates_last_used() {
let temp_dir = TempDir::new().unwrap();
let store = TokenStore::new(temp_dir.path());
@@ -609,7 +609,7 @@ mod tests {
store.verify_token(&token).unwrap();
// last_used is deferred — flush to persist
store.flush_last_used();
store.flush_last_used().await;
let tokens = store.list_tokens("testuser");
assert!(tokens[0].last_used.is_some());