diff --git a/nora-registry/src/main.rs b/nora-registry/src/main.rs
index efab92b..d75f8fa 100644
--- a/nora-registry/src/main.rs
+++ b/nora-registry/src/main.rs
@@ -43,6 +43,9 @@ use repo_index::RepoIndex;
 pub use storage::Storage;
 use tokens::TokenStore;
 
+use parking_lot::RwLock;
+use std::collections::HashMap;
+
 #[derive(Parser)]
 #[command(name = "nora", version, about = "Multi-protocol artifact registry")]
 struct Cli {
@@ -109,6 +112,7 @@ pub struct AppState {
     pub docker_auth: registry::DockerAuth,
     pub repo_index: RepoIndex,
    pub http_client: reqwest::Client,
+    pub upload_sessions: Arc<RwLock<HashMap<String, registry::docker::UploadSession>>>,
 }
 
 #[tokio::main]
@@ -369,6 +373,7 @@ async fn run_server(config: Config, storage: Storage) {
         docker_auth,
         repo_index: RepoIndex::new(),
         http_client,
+        upload_sessions: Arc::new(RwLock::new(HashMap::new())),
     });
 
     let app = Router::new()
@@ -440,6 +445,7 @@ async fn run_server(config: Config, storage: Storage) {
             if let Some(ref token_store) = metrics_state.tokens {
                 token_store.flush_last_used();
             }
+            registry::docker::cleanup_expired_sessions(&metrics_state.upload_sessions);
         }
     });
 
diff --git a/nora-registry/src/registry/docker.rs b/nora-registry/src/registry/docker.rs
index 799b0f7..a1d0579 100644
--- a/nora-registry/src/registry/docker.rs
+++ b/nora-registry/src/registry/docker.rs
@@ -45,7 +45,7 @@ pub struct LayerInfo {
 }
 
 /// In-progress upload session with metadata
-struct UploadSession {
+pub struct UploadSession {
     data: Vec<u8>,
     name: String,
     created_at: std::time::Instant,
@@ -75,21 +75,16 @@ fn max_session_size() -> usize {
     mb.saturating_mul(1024 * 1024)
 }
 
-/// In-progress upload sessions for chunked uploads
-/// Maps UUID -> UploadSession with limits and TTL
-static UPLOAD_SESSIONS: std::sync::LazyLock<RwLock<HashMap<String, UploadSession>>> =
-    std::sync::LazyLock::new(|| RwLock::new(HashMap::new()));
-
-/// Remove expired upload sessions (called periodically)
-fn cleanup_expired_sessions() {
-    let mut sessions = UPLOAD_SESSIONS.write();
-    let before = sessions.len();
-    sessions.retain(|_, s| s.created_at.elapsed() < SESSION_TTL);
-    let removed = before - sessions.len();
+/// Remove expired upload sessions (called by background task)
+pub fn cleanup_expired_sessions(sessions: &RwLock<HashMap<String, UploadSession>>) {
+    let mut guard = sessions.write();
+    let before = guard.len();
+    guard.retain(|_, s| s.created_at.elapsed() < SESSION_TTL);
+    let removed = before - guard.len();
     if removed > 0 {
         tracing::info!(
             removed = removed,
-            remaining = sessions.len(),
+            remaining = guard.len(),
             "Cleaned up expired upload sessions"
         );
     }
@@ -305,17 +300,14 @@ async fn download_blob(
     StatusCode::NOT_FOUND.into_response()
 }
 
-async fn start_upload(Path(name): Path<String>) -> Response {
+async fn start_upload(State(state): State<Arc<AppState>>, Path(name): Path<String>) -> Response {
     if let Err(e) = validate_docker_name(&name) {
         return (StatusCode::BAD_REQUEST, e.to_string()).into_response();
     }
 
-    // Cleanup expired sessions before checking limits
-    cleanup_expired_sessions();
-
     // Enforce max concurrent sessions
     {
-        let sessions = UPLOAD_SESSIONS.read();
+        let sessions = state.upload_sessions.read();
         let max_sessions = max_upload_sessions();
         if sessions.len() >= max_sessions {
             tracing::warn!(
@@ -331,7 +323,7 @@ async fn start_upload(Path(name): Path<String>) -> Response {
 
     // Create session with metadata
     {
-        let mut sessions = UPLOAD_SESSIONS.write();
+        let mut sessions = state.upload_sessions.write();
         sessions.insert(
             uuid.clone(),
             UploadSession {
@@ -355,14 +347,18 @@ async fn start_upload(Path(name): Path<String>) -> Response {
 
 /// PATCH handler for chunked blob uploads
 /// Docker client sends data chunks via PATCH, then finalizes with PUT
-async fn patch_blob(Path((name, uuid)): Path<(String, String)>, body: Bytes) -> Response {
+async fn patch_blob(
+    State(state): State<Arc<AppState>>,
+    Path((name, uuid)): Path<(String, String)>,
+    body: Bytes,
+) -> Response {
     if let Err(e) = validate_docker_name(&name) {
         return (StatusCode::BAD_REQUEST, e.to_string()).into_response();
     }
 
     // Append data to the upload session and get total size
     let total_size = {
-        let mut sessions = UPLOAD_SESSIONS.write();
+        let mut sessions = state.upload_sessions.write();
         let session = match sessions.get_mut(&uuid) {
             Some(s) => s,
             None => {
@@ -449,7 +445,7 @@ async fn upload_blob(
 
     // Get data from chunked session if exists, otherwise use body directly
     let data = {
-        let mut sessions = UPLOAD_SESSIONS.write();
+        let mut sessions = state.upload_sessions.write();
         if let Some(session) = sessions.remove(&uuid) {
             // Verify session belongs to this repository
             if session.name != name {
@@ -921,17 +917,21 @@ async fn download_blob_ns(
     download_blob(state, Path((full_name, digest))).await
 }
 
-async fn start_upload_ns(Path((ns, name)): Path<(String, String)>) -> Response {
+async fn start_upload_ns(
+    state: State<Arc<AppState>>,
+    Path((ns, name)): Path<(String, String)>,
+) -> Response {
     let full_name = format!("{}/{}", ns, name);
-    start_upload(Path(full_name)).await
+    start_upload(state, Path(full_name)).await
 }
 
 async fn patch_blob_ns(
+    state: State<Arc<AppState>>,
     Path((ns, name, uuid)): Path<(String, String, String)>,
     body: Bytes,
 ) -> Response {
     let full_name = format!("{}/{}", ns, name);
-    patch_blob(Path((full_name, uuid)), body).await
+    patch_blob(state, Path((full_name, uuid)), body).await
 }
 
 async fn upload_blob_ns(