mirror of
https://github.com/getnora-io/nora.git
synced 2026-04-12 12:40:31 +00:00
Fix Docker push/pull: add PATCH endpoint for chunked uploads
- Add PATCH handler for /v2/{name}/blobs/uploads/{uuid} to support
chunked blob uploads (Docker sends data chunks via PATCH)
- Include Range header in PATCH response to indicate bytes received
- Add Docker-Content-Digest header to GET manifest responses
- Store manifests by both tag and digest for proper pull support
- Add parking_lot dependency for upload session state management
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -1212,6 +1212,7 @@ dependencies = [
|
|||||||
"httpdate",
|
"httpdate",
|
||||||
"indicatif",
|
"indicatif",
|
||||||
"lazy_static",
|
"lazy_static",
|
||||||
|
"parking_lot",
|
||||||
"prometheus",
|
"prometheus",
|
||||||
"reqwest",
|
"reqwest",
|
||||||
"serde",
|
"serde",
|
||||||
|
|||||||
@@ -41,6 +41,7 @@ chrono = { version = "0.4", features = ["serde"] }
|
|||||||
thiserror = "2"
|
thiserror = "2"
|
||||||
tower_governor = "0.8"
|
tower_governor = "0.8"
|
||||||
governor = "0.10"
|
governor = "0.10"
|
||||||
|
parking_lot = "0.12"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
tempfile = "3"
|
tempfile = "3"
|
||||||
|
|||||||
@@ -5,12 +5,19 @@ use axum::{
|
|||||||
extract::{Path, State},
|
extract::{Path, State},
|
||||||
http::{header, HeaderName, StatusCode},
|
http::{header, HeaderName, StatusCode},
|
||||||
response::{IntoResponse, Response},
|
response::{IntoResponse, Response},
|
||||||
routing::{get, head, put},
|
routing::{get, head, patch, put},
|
||||||
Json, Router,
|
Json, Router,
|
||||||
};
|
};
|
||||||
|
use parking_lot::RwLock;
|
||||||
use serde_json::{json, Value};
|
use serde_json::{json, Value};
|
||||||
|
use std::collections::HashMap;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
/// In-progress upload sessions for chunked uploads
|
||||||
|
/// Maps UUID -> accumulated data
|
||||||
|
static UPLOAD_SESSIONS: std::sync::LazyLock<RwLock<HashMap<String, Vec<u8>>>> =
|
||||||
|
std::sync::LazyLock::new(|| RwLock::new(HashMap::new()));
|
||||||
|
|
||||||
pub fn routes() -> Router<Arc<AppState>> {
|
pub fn routes() -> Router<Arc<AppState>> {
|
||||||
Router::new()
|
Router::new()
|
||||||
.route("/v2/", get(check))
|
.route("/v2/", get(check))
|
||||||
@@ -20,7 +27,10 @@ pub fn routes() -> Router<Arc<AppState>> {
|
|||||||
"/v2/{name}/blobs/uploads/",
|
"/v2/{name}/blobs/uploads/",
|
||||||
axum::routing::post(start_upload),
|
axum::routing::post(start_upload),
|
||||||
)
|
)
|
||||||
.route("/v2/{name}/blobs/uploads/{uuid}", put(upload_blob))
|
.route(
|
||||||
|
"/v2/{name}/blobs/uploads/{uuid}",
|
||||||
|
patch(patch_blob).put(upload_blob),
|
||||||
|
)
|
||||||
.route("/v2/{name}/manifests/{reference}", get(get_manifest))
|
.route("/v2/{name}/manifests/{reference}", get(get_manifest))
|
||||||
.route("/v2/{name}/manifests/{reference}", put(put_manifest))
|
.route("/v2/{name}/manifests/{reference}", put(put_manifest))
|
||||||
.route("/v2/{name}/tags/list", get(list_tags))
|
.route("/v2/{name}/tags/list", get(list_tags))
|
||||||
@@ -92,9 +102,46 @@ async fn start_upload(Path(name): Path<String>) -> Response {
|
|||||||
.into_response()
|
.into_response()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// PATCH handler for chunked blob uploads
|
||||||
|
/// Docker client sends data chunks via PATCH, then finalizes with PUT
|
||||||
|
async fn patch_blob(Path((name, uuid)): Path<(String, String)>, body: Bytes) -> Response {
|
||||||
|
if let Err(e) = validate_docker_name(&name) {
|
||||||
|
return (StatusCode::BAD_REQUEST, e.to_string()).into_response();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Append data to the upload session and get total size
|
||||||
|
let total_size = {
|
||||||
|
let mut sessions = UPLOAD_SESSIONS.write();
|
||||||
|
let session = sessions.entry(uuid.clone()).or_insert_with(Vec::new);
|
||||||
|
session.extend_from_slice(&body);
|
||||||
|
session.len()
|
||||||
|
};
|
||||||
|
|
||||||
|
let location = format!("/v2/{}/blobs/uploads/{}", name, uuid);
|
||||||
|
// Range header indicates bytes 0 to (total_size - 1) have been received
|
||||||
|
let range = if total_size > 0 {
|
||||||
|
format!("0-{}", total_size - 1)
|
||||||
|
} else {
|
||||||
|
"0-0".to_string()
|
||||||
|
};
|
||||||
|
|
||||||
|
(
|
||||||
|
StatusCode::ACCEPTED,
|
||||||
|
[
|
||||||
|
(header::LOCATION, location),
|
||||||
|
(header::RANGE, range),
|
||||||
|
(HeaderName::from_static("docker-upload-uuid"), uuid),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
.into_response()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// PUT handler for completing blob uploads
|
||||||
|
/// Handles both monolithic uploads (body contains all data) and
|
||||||
|
/// chunked upload finalization (body may be empty, data in session)
|
||||||
async fn upload_blob(
|
async fn upload_blob(
|
||||||
State(state): State<Arc<AppState>>,
|
State(state): State<Arc<AppState>>,
|
||||||
Path((name, _uuid)): Path<(String, String)>,
|
Path((name, uuid)): Path<(String, String)>,
|
||||||
axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
|
axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
|
||||||
body: Bytes,
|
body: Bytes,
|
||||||
) -> Response {
|
) -> Response {
|
||||||
@@ -111,8 +158,23 @@ async fn upload_blob(
|
|||||||
return (StatusCode::BAD_REQUEST, e.to_string()).into_response();
|
return (StatusCode::BAD_REQUEST, e.to_string()).into_response();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Get data from chunked session if exists, otherwise use body directly
|
||||||
|
let data = {
|
||||||
|
let mut sessions = UPLOAD_SESSIONS.write();
|
||||||
|
if let Some(mut session_data) = sessions.remove(&uuid) {
|
||||||
|
// Chunked upload: append any final body data and use session
|
||||||
|
if !body.is_empty() {
|
||||||
|
session_data.extend_from_slice(&body);
|
||||||
|
}
|
||||||
|
session_data
|
||||||
|
} else {
|
||||||
|
// Monolithic upload: use body directly
|
||||||
|
body.to_vec()
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
let key = format!("docker/{}/blobs/{}", name, digest);
|
let key = format!("docker/{}/blobs/{}", name, digest);
|
||||||
match state.storage.put(&key, &body).await {
|
match state.storage.put(&key, &data).await {
|
||||||
Ok(()) => {
|
Ok(()) => {
|
||||||
let location = format!("/v2/{}/blobs/{}", name, digest);
|
let location = format!("/v2/{}/blobs/{}", name, digest);
|
||||||
(StatusCode::CREATED, [(header::LOCATION, location)]).into_response()
|
(StatusCode::CREATED, [(header::LOCATION, location)]).into_response()
|
||||||
@@ -134,15 +196,23 @@ async fn get_manifest(
|
|||||||
|
|
||||||
let key = format!("docker/{}/manifests/{}.json", name, reference);
|
let key = format!("docker/{}/manifests/{}.json", name, reference);
|
||||||
match state.storage.get(&key).await {
|
match state.storage.get(&key).await {
|
||||||
Ok(data) => (
|
Ok(data) => {
|
||||||
|
// Calculate digest for Docker-Content-Digest header
|
||||||
|
use sha2::Digest;
|
||||||
|
let digest = format!("sha256:{:x}", sha2::Sha256::digest(&data));
|
||||||
|
(
|
||||||
StatusCode::OK,
|
StatusCode::OK,
|
||||||
[(
|
[
|
||||||
|
(
|
||||||
header::CONTENT_TYPE,
|
header::CONTENT_TYPE,
|
||||||
"application/vnd.docker.distribution.manifest.v2+json",
|
"application/vnd.docker.distribution.manifest.v2+json".to_string(),
|
||||||
)],
|
),
|
||||||
|
(HeaderName::from_static("docker-content-digest"), digest),
|
||||||
|
],
|
||||||
data,
|
data,
|
||||||
)
|
)
|
||||||
.into_response(),
|
.into_response()
|
||||||
|
}
|
||||||
Err(_) => StatusCode::NOT_FOUND.into_response(),
|
Err(_) => StatusCode::NOT_FOUND.into_response(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -159,11 +229,22 @@ async fn put_manifest(
|
|||||||
return (StatusCode::BAD_REQUEST, e.to_string()).into_response();
|
return (StatusCode::BAD_REQUEST, e.to_string()).into_response();
|
||||||
}
|
}
|
||||||
|
|
||||||
let key = format!("docker/{}/manifests/{}.json", name, reference);
|
// Calculate digest
|
||||||
match state.storage.put(&key, &body).await {
|
|
||||||
Ok(()) => {
|
|
||||||
use sha2::Digest;
|
use sha2::Digest;
|
||||||
let digest = format!("sha256:{:x}", sha2::Sha256::digest(&body));
|
let digest = format!("sha256:{:x}", sha2::Sha256::digest(&body));
|
||||||
|
|
||||||
|
// Store by tag/reference
|
||||||
|
let key = format!("docker/{}/manifests/{}.json", name, reference);
|
||||||
|
if let Err(_) = state.storage.put(&key, &body).await {
|
||||||
|
return StatusCode::INTERNAL_SERVER_ERROR.into_response();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Also store by digest for direct digest lookups
|
||||||
|
let digest_key = format!("docker/{}/manifests/{}.json", name, digest);
|
||||||
|
if let Err(_) = state.storage.put(&digest_key, &body).await {
|
||||||
|
return StatusCode::INTERNAL_SERVER_ERROR.into_response();
|
||||||
|
}
|
||||||
|
|
||||||
let location = format!("/v2/{}/manifests/{}", name, reference);
|
let location = format!("/v2/{}/manifests/{}", name, reference);
|
||||||
(
|
(
|
||||||
StatusCode::CREATED,
|
StatusCode::CREATED,
|
||||||
@@ -174,9 +255,6 @@ async fn put_manifest(
|
|||||||
)
|
)
|
||||||
.into_response()
|
.into_response()
|
||||||
}
|
}
|
||||||
Err(_) => StatusCode::INTERNAL_SERVER_ERROR.into_response(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn list_tags(State(state): State<Arc<AppState>>, Path(name): Path<String>) -> Response {
|
async fn list_tags(State(state): State<Arc<AppState>>, Path(name): Path<String>) -> Response {
|
||||||
if let Err(e) = validate_docker_name(&name) {
|
if let Err(e) = validate_docker_name(&name) {
|
||||||
|
|||||||
Reference in New Issue
Block a user