refactor(docker): move upload sessions from global static to AppState

Upload sessions were stored in a global LazyLock<RwLock<HashMap>>,
making them impossible to test in isolation and invisible to other
parts of the system. Multi-instance deployments would also lose
sessions started on a different node.

Changes:
- Move upload_sessions into AppState as Arc<RwLock<HashMap>>
- Add State extractor to start_upload, patch_blob and their _ns wrappers
- Expire sessions in the existing 30s background task (alongside metrics)
- Make UploadSession and cleanup_expired_sessions pub for AppState access
This commit is contained in:
2026-04-02 12:12:00 +00:00
parent 1d47e92d3b
commit 848f5f5571
2 changed files with 31 additions and 25 deletions

View File

@@ -43,6 +43,9 @@ use repo_index::RepoIndex;
pub use storage::Storage; pub use storage::Storage;
use tokens::TokenStore; use tokens::TokenStore;
use parking_lot::RwLock;
use std::collections::HashMap;
#[derive(Parser)] #[derive(Parser)]
#[command(name = "nora", version, about = "Multi-protocol artifact registry")] #[command(name = "nora", version, about = "Multi-protocol artifact registry")]
struct Cli { struct Cli {
@@ -109,6 +112,7 @@ pub struct AppState {
pub docker_auth: registry::DockerAuth, pub docker_auth: registry::DockerAuth,
pub repo_index: RepoIndex, pub repo_index: RepoIndex,
pub http_client: reqwest::Client, pub http_client: reqwest::Client,
pub upload_sessions: Arc<RwLock<HashMap<String, registry::docker::UploadSession>>>,
} }
#[tokio::main] #[tokio::main]
@@ -369,6 +373,7 @@ async fn run_server(config: Config, storage: Storage) {
docker_auth, docker_auth,
repo_index: RepoIndex::new(), repo_index: RepoIndex::new(),
http_client, http_client,
upload_sessions: Arc::new(RwLock::new(HashMap::new())),
}); });
let app = Router::new() let app = Router::new()
@@ -440,6 +445,7 @@ async fn run_server(config: Config, storage: Storage) {
if let Some(ref token_store) = metrics_state.tokens { if let Some(ref token_store) = metrics_state.tokens {
token_store.flush_last_used(); token_store.flush_last_used();
} }
registry::docker::cleanup_expired_sessions(&metrics_state.upload_sessions);
} }
}); });

View File

@@ -45,7 +45,7 @@ pub struct LayerInfo {
} }
/// In-progress upload session with metadata /// In-progress upload session with metadata
struct UploadSession { pub struct UploadSession {
data: Vec<u8>, data: Vec<u8>,
name: String, name: String,
created_at: std::time::Instant, created_at: std::time::Instant,
@@ -75,21 +75,16 @@ fn max_session_size() -> usize {
mb.saturating_mul(1024 * 1024) mb.saturating_mul(1024 * 1024)
} }
/// In-progress upload sessions for chunked uploads /// Remove expired upload sessions (called by background task)
/// Maps UUID -> UploadSession with limits and TTL pub fn cleanup_expired_sessions(sessions: &RwLock<HashMap<String, UploadSession>>) {
static UPLOAD_SESSIONS: std::sync::LazyLock<RwLock<HashMap<String, UploadSession>>> = let mut guard = sessions.write();
std::sync::LazyLock::new(|| RwLock::new(HashMap::new())); let before = guard.len();
guard.retain(|_, s| s.created_at.elapsed() < SESSION_TTL);
/// Remove expired upload sessions (called periodically) let removed = before - guard.len();
fn cleanup_expired_sessions() {
let mut sessions = UPLOAD_SESSIONS.write();
let before = sessions.len();
sessions.retain(|_, s| s.created_at.elapsed() < SESSION_TTL);
let removed = before - sessions.len();
if removed > 0 { if removed > 0 {
tracing::info!( tracing::info!(
removed = removed, removed = removed,
remaining = sessions.len(), remaining = guard.len(),
"Cleaned up expired upload sessions" "Cleaned up expired upload sessions"
); );
} }
@@ -305,17 +300,14 @@ async fn download_blob(
StatusCode::NOT_FOUND.into_response() StatusCode::NOT_FOUND.into_response()
} }
async fn start_upload(Path(name): Path<String>) -> Response { async fn start_upload(State(state): State<Arc<AppState>>, Path(name): Path<String>) -> Response {
if let Err(e) = validate_docker_name(&name) { if let Err(e) = validate_docker_name(&name) {
return (StatusCode::BAD_REQUEST, e.to_string()).into_response(); return (StatusCode::BAD_REQUEST, e.to_string()).into_response();
} }
// Cleanup expired sessions before checking limits
cleanup_expired_sessions();
// Enforce max concurrent sessions // Enforce max concurrent sessions
{ {
let sessions = UPLOAD_SESSIONS.read(); let sessions = state.upload_sessions.read();
let max_sessions = max_upload_sessions(); let max_sessions = max_upload_sessions();
if sessions.len() >= max_sessions { if sessions.len() >= max_sessions {
tracing::warn!( tracing::warn!(
@@ -331,7 +323,7 @@ async fn start_upload(Path(name): Path<String>) -> Response {
// Create session with metadata // Create session with metadata
{ {
let mut sessions = UPLOAD_SESSIONS.write(); let mut sessions = state.upload_sessions.write();
sessions.insert( sessions.insert(
uuid.clone(), uuid.clone(),
UploadSession { UploadSession {
@@ -355,14 +347,18 @@ async fn start_upload(Path(name): Path<String>) -> Response {
/// PATCH handler for chunked blob uploads /// PATCH handler for chunked blob uploads
/// Docker client sends data chunks via PATCH, then finalizes with PUT /// Docker client sends data chunks via PATCH, then finalizes with PUT
async fn patch_blob(Path((name, uuid)): Path<(String, String)>, body: Bytes) -> Response { async fn patch_blob(
State(state): State<Arc<AppState>>,
Path((name, uuid)): Path<(String, String)>,
body: Bytes,
) -> Response {
if let Err(e) = validate_docker_name(&name) { if let Err(e) = validate_docker_name(&name) {
return (StatusCode::BAD_REQUEST, e.to_string()).into_response(); return (StatusCode::BAD_REQUEST, e.to_string()).into_response();
} }
// Append data to the upload session and get total size // Append data to the upload session and get total size
let total_size = { let total_size = {
let mut sessions = UPLOAD_SESSIONS.write(); let mut sessions = state.upload_sessions.write();
let session = match sessions.get_mut(&uuid) { let session = match sessions.get_mut(&uuid) {
Some(s) => s, Some(s) => s,
None => { None => {
@@ -449,7 +445,7 @@ async fn upload_blob(
// Get data from chunked session if exists, otherwise use body directly // Get data from chunked session if exists, otherwise use body directly
let data = { let data = {
let mut sessions = UPLOAD_SESSIONS.write(); let mut sessions = state.upload_sessions.write();
if let Some(session) = sessions.remove(&uuid) { if let Some(session) = sessions.remove(&uuid) {
// Verify session belongs to this repository // Verify session belongs to this repository
if session.name != name { if session.name != name {
@@ -921,17 +917,21 @@ async fn download_blob_ns(
download_blob(state, Path((full_name, digest))).await download_blob(state, Path((full_name, digest))).await
} }
async fn start_upload_ns(Path((ns, name)): Path<(String, String)>) -> Response { async fn start_upload_ns(
state: State<Arc<AppState>>,
Path((ns, name)): Path<(String, String)>,
) -> Response {
let full_name = format!("{}/{}", ns, name); let full_name = format!("{}/{}", ns, name);
start_upload(Path(full_name)).await start_upload(state, Path(full_name)).await
} }
async fn patch_blob_ns( async fn patch_blob_ns(
state: State<Arc<AppState>>,
Path((ns, name, uuid)): Path<(String, String, String)>, Path((ns, name, uuid)): Path<(String, String, String)>,
body: Bytes, body: Bytes,
) -> Response { ) -> Response {
let full_name = format!("{}/{}", ns, name); let full_name = format!("{}/{}", ns, name);
patch_blob(Path((full_name, uuid)), body).await patch_blob(state, Path((full_name, uuid)), body).await
} }
async fn upload_blob_ns( async fn upload_blob_ns(