diff --git a/Cargo.lock b/Cargo.lock index 1f64525..e1832ae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1212,6 +1212,7 @@ dependencies = [ "httpdate", "indicatif", "lazy_static", + "parking_lot", "prometheus", "reqwest", "serde", diff --git a/nora-registry/Cargo.toml b/nora-registry/Cargo.toml index 9df3d13..c24930b 100644 --- a/nora-registry/Cargo.toml +++ b/nora-registry/Cargo.toml @@ -41,6 +41,7 @@ chrono = { version = "0.4", features = ["serde"] } thiserror = "2" tower_governor = "0.8" governor = "0.10" +parking_lot = "0.12" [dev-dependencies] tempfile = "3" diff --git a/nora-registry/src/registry/docker.rs b/nora-registry/src/registry/docker.rs index b9da170..dd7607d 100644 --- a/nora-registry/src/registry/docker.rs +++ b/nora-registry/src/registry/docker.rs @@ -5,12 +5,19 @@ use axum::{ extract::{Path, State}, http::{header, HeaderName, StatusCode}, response::{IntoResponse, Response}, - routing::{get, head, put}, + routing::{get, head, patch, put}, Json, Router, }; +use parking_lot::RwLock; use serde_json::{json, Value}; +use std::collections::HashMap; use std::sync::Arc; +/// In-progress upload sessions for chunked uploads +/// Maps UUID -> accumulated data +static UPLOAD_SESSIONS: std::sync::LazyLock<RwLock<HashMap<String, Vec<u8>>>> = + std::sync::LazyLock::new(|| RwLock::new(HashMap::new())); + pub fn routes() -> Router<Arc<AppState>> { Router::new() .route("/v2/", get(check)) @@ -20,7 +27,10 @@ pub fn routes() -> Router<Arc<AppState>> { "/v2/{name}/blobs/uploads/", axum::routing::post(start_upload), ) - .route("/v2/{name}/blobs/uploads/{uuid}", put(upload_blob)) + .route( + "/v2/{name}/blobs/uploads/{uuid}", + patch(patch_blob).put(upload_blob), + ) .route("/v2/{name}/manifests/{reference}", get(get_manifest)) .route("/v2/{name}/manifests/{reference}", put(put_manifest)) .route("/v2/{name}/tags/list", get(list_tags)) @@ -92,9 +102,46 @@ async fn start_upload(Path(name): Path<String>) -> Response { .into_response() } +/// PATCH handler for chunked blob uploads +/// Docker client sends data chunks via PATCH, then 
finalizes with PUT +async fn patch_blob(Path((name, uuid)): Path<(String, String)>, body: Bytes) -> Response { + if let Err(e) = validate_docker_name(&name) { + return (StatusCode::BAD_REQUEST, e.to_string()).into_response(); + } + + // Append data to the upload session and get total size + let total_size = { + let mut sessions = UPLOAD_SESSIONS.write(); + let session = sessions.entry(uuid.clone()).or_insert_with(Vec::new); + session.extend_from_slice(&body); + session.len() + }; + + let location = format!("/v2/{}/blobs/uploads/{}", name, uuid); + // Range header indicates bytes 0 to (total_size - 1) have been received + let range = if total_size > 0 { + format!("0-{}", total_size - 1) + } else { + "0-0".to_string() + }; + + ( + StatusCode::ACCEPTED, + [ + (header::LOCATION, location), + (header::RANGE, range), + (HeaderName::from_static("docker-upload-uuid"), uuid), + ], + ) + .into_response() +} + +/// PUT handler for completing blob uploads +/// Handles both monolithic uploads (body contains all data) and +/// chunked upload finalization (body may be empty, data in session) async fn upload_blob( State(state): State<Arc<AppState>>, - Path((name, _uuid)): Path<(String, String)>, + Path((name, uuid)): Path<(String, String)>, axum::extract::Query(params): axum::extract::Query<HashMap<String, String>>, body: Bytes, ) -> Response { @@ -111,8 +158,23 @@ async fn upload_blob( return (StatusCode::BAD_REQUEST, e.to_string()).into_response(); } + // Get data from chunked session if exists, otherwise use body directly + let data = { + let mut sessions = UPLOAD_SESSIONS.write(); + if let Some(mut session_data) = sessions.remove(&uuid) { + // Chunked upload: append any final body data and use session + if !body.is_empty() { + session_data.extend_from_slice(&body); + } + session_data + } else { + // Monolithic upload: use body directly + body.to_vec() + } + }; + let key = format!("docker/{}/blobs/{}", name, digest); - match state.storage.put(&key, &body).await { + match state.storage.put(&key, &data).await { 
Ok(()) => { let location = format!("/v2/{}/blobs/{}", name, digest); (StatusCode::CREATED, [(header::LOCATION, location)]).into_response() @@ -134,15 +196,23 @@ async fn get_manifest( let key = format!("docker/{}/manifests/{}.json", name, reference); match state.storage.get(&key).await { - Ok(data) => ( - StatusCode::OK, - [( - header::CONTENT_TYPE, - "application/vnd.docker.distribution.manifest.v2+json", - )], - data, - ) - .into_response(), + Ok(data) => { + // Calculate digest for Docker-Content-Digest header + use sha2::Digest; + let digest = format!("sha256:{:x}", sha2::Sha256::digest(&data)); + ( + StatusCode::OK, + [ + ( + header::CONTENT_TYPE, + "application/vnd.docker.distribution.manifest.v2+json".to_string(), + ), + (HeaderName::from_static("docker-content-digest"), digest), + ], + data, + ) + .into_response() + } Err(_) => StatusCode::NOT_FOUND.into_response(), } } @@ -159,23 +229,31 @@ async fn put_manifest( return (StatusCode::BAD_REQUEST, e.to_string()).into_response(); } + // Calculate digest + use sha2::Digest; + let digest = format!("sha256:{:x}", sha2::Sha256::digest(&body)); + + // Store by tag/reference let key = format!("docker/{}/manifests/{}.json", name, reference); - match state.storage.put(&key, &body).await { - Ok(()) => { - use sha2::Digest; - let digest = format!("sha256:{:x}", sha2::Sha256::digest(&body)); - let location = format!("/v2/{}/manifests/{}", name, reference); - ( - StatusCode::CREATED, - [ - (header::LOCATION, location), - (HeaderName::from_static("docker-content-digest"), digest), - ], - ) - .into_response() - } - Err(_) => StatusCode::INTERNAL_SERVER_ERROR.into_response(), + if let Err(_) = state.storage.put(&key, &body).await { + return StatusCode::INTERNAL_SERVER_ERROR.into_response(); } + + // Also store by digest for direct digest lookups + let digest_key = format!("docker/{}/manifests/{}.json", name, digest); + if let Err(_) = state.storage.put(&digest_key, &body).await { + return 
StatusCode::INTERNAL_SERVER_ERROR.into_response(); + } + + let location = format!("/v2/{}/manifests/{}", name, reference); + ( + StatusCode::CREATED, + [ + (header::LOCATION, location), + (HeaderName::from_static("docker-content-digest"), digest), + ], + ) + .into_response() } async fn list_tags(State(state): State<Arc<AppState>>, Path(name): Path<String>) -> Response {