3 Commits

Author SHA1 Message Date
d0a9459acd Fix Docker push/pull: add PATCH endpoint for chunked uploads
- Add PATCH handler for /v2/{name}/blobs/uploads/{uuid} to support
  chunked blob uploads (Docker sends data chunks via PATCH)
- Include Range header in PATCH response to indicate bytes received
- Add Docker-Content-Digest header to GET manifest responses
- Store manifests by both tag and digest for proper pull support
- Add parking_lot dependency for upload session state management
2026-01-26 12:01:05 +00:00
482a68637e Fix rate limiting: exempt health/metrics, increase upload limits
- Health, metrics, UI, and API docs are now exempt from rate limiting
- Increased upload rate limits to 200 req/s with burst of 500 for Docker compatibility
2026-01-26 11:04:14 +00:00
61f8a39279 Use self-hosted runner for release builds
16-core runner should be 3-4x faster than GitHub's 2-core runners
2026-01-26 10:39:04 +00:00
6 changed files with 127 additions and 39 deletions

View File

@@ -11,7 +11,7 @@ env:
jobs: jobs:
build: build:
name: Build & Push name: Build & Push
runs-on: ubuntu-latest runs-on: self-hosted
permissions: permissions:
contents: read contents: read
packages: write packages: write

1
Cargo.lock generated
View File

@@ -1212,6 +1212,7 @@ dependencies = [
"httpdate", "httpdate",
"indicatif", "indicatif",
"lazy_static", "lazy_static",
"parking_lot",
"prometheus", "prometheus",
"reqwest", "reqwest",
"serde", "serde",

View File

@@ -41,6 +41,7 @@ chrono = { version = "0.4", features = ["serde"] }
thiserror = "2" thiserror = "2"
tower_governor = "0.8" tower_governor = "0.8"
governor = "0.10" governor = "0.10"
parking_lot = "0.12"
[dev-dependencies] [dev-dependencies]
tempfile = "3" tempfile = "3"

View File

@@ -219,14 +219,22 @@ async fn run_server(config: Config, storage: Storage) {
.merge(registry::pypi_routes()) .merge(registry::pypi_routes())
.layer(rate_limit::upload_rate_limiter()); .layer(rate_limit::upload_rate_limiter());
let app = Router::new() // Routes WITHOUT rate limiting (health, metrics, UI)
let public_routes = Router::new()
.merge(health::routes()) .merge(health::routes())
.merge(metrics::routes()) .merge(metrics::routes())
.merge(ui::routes()) .merge(ui::routes())
.merge(openapi::routes()) .merge(openapi::routes());
// Routes WITH rate limiting
let rate_limited_routes = Router::new()
.merge(auth_routes) .merge(auth_routes)
.merge(registry_routes) .merge(registry_routes)
.layer(rate_limit::general_rate_limiter()) // General rate limit for all routes .layer(rate_limit::general_rate_limiter());
let app = Router::new()
.merge(public_routes)
.merge(rate_limited_routes)
.layer(DefaultBodyLimit::max(100 * 1024 * 1024)) // 100MB default body limit .layer(DefaultBodyLimit::max(100 * 1024 * 1024)) // 100MB default body limit
.layer(middleware::from_fn(request_id::request_id_middleware)) .layer(middleware::from_fn(request_id::request_id_middleware))
.layer(middleware::from_fn(metrics::metrics_middleware)) .layer(middleware::from_fn(metrics::metrics_middleware))

View File

@@ -30,8 +30,8 @@ impl Default for RateLimitConfig {
Self { Self {
auth_rps: 1, // 1 req/sec for auth (strict) auth_rps: 1, // 1 req/sec for auth (strict)
auth_burst: 5, // Allow burst of 5 auth_burst: 5, // Allow burst of 5
upload_rps: 50, // 50 req/sec for uploads (Docker needs parallel) upload_rps: 200, // 200 req/sec for uploads (Docker needs high parallelism)
upload_burst: 100, // Allow burst of 100 upload_burst: 500, // Allow burst of 500
general_rps: 100, // 100 req/sec general general_rps: 100, // 100 req/sec general
general_burst: 200, // Allow burst of 200 general_burst: 200, // Allow burst of 200
} }
@@ -58,16 +58,16 @@ pub fn auth_rate_limiter() -> tower_governor::GovernorLayer<
/// Create rate limiter layer for upload endpoints /// Create rate limiter layer for upload endpoints
/// ///
/// Default: 50 requests per second, burst of 100 /// Default: 200 requests per second, burst of 500
/// Higher limits to accommodate Docker client's parallel layer uploads /// High limits to accommodate Docker client's aggressive parallel layer uploads
pub fn upload_rate_limiter() -> tower_governor::GovernorLayer< pub fn upload_rate_limiter() -> tower_governor::GovernorLayer<
tower_governor::key_extractor::PeerIpKeyExtractor, tower_governor::key_extractor::PeerIpKeyExtractor,
governor::middleware::StateInformationMiddleware, governor::middleware::StateInformationMiddleware,
axum::body::Body, axum::body::Body,
> { > {
let config = GovernorConfigBuilder::default() let config = GovernorConfigBuilder::default()
.per_second(50) .per_second(200)
.burst_size(100) .burst_size(500)
.use_headers() .use_headers()
.finish() .finish()
.unwrap(); .unwrap();
@@ -102,7 +102,7 @@ mod tests {
let config = RateLimitConfig::default(); let config = RateLimitConfig::default();
assert_eq!(config.auth_rps, 1); assert_eq!(config.auth_rps, 1);
assert_eq!(config.auth_burst, 5); assert_eq!(config.auth_burst, 5);
assert_eq!(config.upload_rps, 50); assert_eq!(config.upload_rps, 200);
assert_eq!(config.general_rps, 100); assert_eq!(config.general_rps, 100);
} }

View File

@@ -5,12 +5,19 @@ use axum::{
extract::{Path, State}, extract::{Path, State},
http::{header, HeaderName, StatusCode}, http::{header, HeaderName, StatusCode},
response::{IntoResponse, Response}, response::{IntoResponse, Response},
routing::{get, head, put}, routing::{get, head, patch, put},
Json, Router, Json, Router,
}; };
use parking_lot::RwLock;
use serde_json::{json, Value}; use serde_json::{json, Value};
use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
/// In-progress upload sessions for chunked (PATCH-based) blob uploads.
///
/// Maps upload UUID -> accumulated blob bytes. Entries are created/extended
/// by `patch_blob` and removed when `upload_blob` (PUT) finalizes the upload.
///
/// NOTE(review): sessions abandoned before the final PUT are never evicted,
/// so this map can grow without bound under failed/cancelled pushes —
/// consider adding a TTL or size cap; confirm against expected client behavior.
static UPLOAD_SESSIONS: std::sync::LazyLock<RwLock<HashMap<String, Vec<u8>>>> =
    std::sync::LazyLock::new(|| RwLock::new(HashMap::new()));
pub fn routes() -> Router<Arc<AppState>> { pub fn routes() -> Router<Arc<AppState>> {
Router::new() Router::new()
.route("/v2/", get(check)) .route("/v2/", get(check))
@@ -20,7 +27,10 @@ pub fn routes() -> Router<Arc<AppState>> {
"/v2/{name}/blobs/uploads/", "/v2/{name}/blobs/uploads/",
axum::routing::post(start_upload), axum::routing::post(start_upload),
) )
.route("/v2/{name}/blobs/uploads/{uuid}", put(upload_blob)) .route(
"/v2/{name}/blobs/uploads/{uuid}",
patch(patch_blob).put(upload_blob),
)
.route("/v2/{name}/manifests/{reference}", get(get_manifest)) .route("/v2/{name}/manifests/{reference}", get(get_manifest))
.route("/v2/{name}/manifests/{reference}", put(put_manifest)) .route("/v2/{name}/manifests/{reference}", put(put_manifest))
.route("/v2/{name}/tags/list", get(list_tags)) .route("/v2/{name}/tags/list", get(list_tags))
@@ -92,9 +102,46 @@ async fn start_upload(Path(name): Path<String>) -> Response {
.into_response() .into_response()
} }
/// PATCH handler for chunked blob uploads.
///
/// The Docker client streams layer data in one or more PATCH requests; each
/// chunk is appended to the in-memory session keyed by `uuid`, and the upload
/// is finalized later with a PUT to the same URL. Responds `202 Accepted`
/// with a `Range: 0-<last byte>` header so the client knows how many bytes
/// the registry has received so far.
async fn patch_blob(Path((name, uuid)): Path<(String, String)>, body: Bytes) -> Response {
    // Reject invalid repository names up front.
    if let Err(e) = validate_docker_name(&name) {
        return (StatusCode::BAD_REQUEST, e.to_string()).into_response();
    }

    // Accumulate this chunk into the session, creating it on first PATCH.
    // The write lock is scoped to this block so it is released before the
    // response is built.
    let received = {
        let mut sessions = UPLOAD_SESSIONS.write();
        let buf = sessions.entry(uuid.clone()).or_default();
        buf.extend_from_slice(&body);
        buf.len()
    };

    let location = format!("/v2/{}/blobs/uploads/{}", name, uuid);
    // Inclusive byte range of what we hold: "0-(len-1)"; an empty session
    // reports "0-0" (the docker/distribution convention for zero bytes).
    let range = format!("0-{}", received.saturating_sub(1));

    (
        StatusCode::ACCEPTED,
        [
            (header::LOCATION, location),
            (header::RANGE, range),
            (HeaderName::from_static("docker-upload-uuid"), uuid),
        ],
    )
        .into_response()
}
/// PUT handler for completing blob uploads
/// Handles both monolithic uploads (body contains all data) and
/// chunked upload finalization (body may be empty, data in session)
async fn upload_blob( async fn upload_blob(
State(state): State<Arc<AppState>>, State(state): State<Arc<AppState>>,
Path((name, _uuid)): Path<(String, String)>, Path((name, uuid)): Path<(String, String)>,
axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>, axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
body: Bytes, body: Bytes,
) -> Response { ) -> Response {
@@ -111,8 +158,23 @@ async fn upload_blob(
return (StatusCode::BAD_REQUEST, e.to_string()).into_response(); return (StatusCode::BAD_REQUEST, e.to_string()).into_response();
} }
// Get data from chunked session if exists, otherwise use body directly
let data = {
let mut sessions = UPLOAD_SESSIONS.write();
if let Some(mut session_data) = sessions.remove(&uuid) {
// Chunked upload: append any final body data and use session
if !body.is_empty() {
session_data.extend_from_slice(&body);
}
session_data
} else {
// Monolithic upload: use body directly
body.to_vec()
}
};
let key = format!("docker/{}/blobs/{}", name, digest); let key = format!("docker/{}/blobs/{}", name, digest);
match state.storage.put(&key, &body).await { match state.storage.put(&key, &data).await {
Ok(()) => { Ok(()) => {
let location = format!("/v2/{}/blobs/{}", name, digest); let location = format!("/v2/{}/blobs/{}", name, digest);
(StatusCode::CREATED, [(header::LOCATION, location)]).into_response() (StatusCode::CREATED, [(header::LOCATION, location)]).into_response()
@@ -134,15 +196,23 @@ async fn get_manifest(
let key = format!("docker/{}/manifests/{}.json", name, reference); let key = format!("docker/{}/manifests/{}.json", name, reference);
match state.storage.get(&key).await { match state.storage.get(&key).await {
Ok(data) => ( Ok(data) => {
StatusCode::OK, // Calculate digest for Docker-Content-Digest header
[( use sha2::Digest;
header::CONTENT_TYPE, let digest = format!("sha256:{:x}", sha2::Sha256::digest(&data));
"application/vnd.docker.distribution.manifest.v2+json", (
)], StatusCode::OK,
data, [
) (
.into_response(), header::CONTENT_TYPE,
"application/vnd.docker.distribution.manifest.v2+json".to_string(),
),
(HeaderName::from_static("docker-content-digest"), digest),
],
data,
)
.into_response()
}
Err(_) => StatusCode::NOT_FOUND.into_response(), Err(_) => StatusCode::NOT_FOUND.into_response(),
} }
} }
@@ -159,23 +229,31 @@ async fn put_manifest(
return (StatusCode::BAD_REQUEST, e.to_string()).into_response(); return (StatusCode::BAD_REQUEST, e.to_string()).into_response();
} }
// Calculate digest
use sha2::Digest;
let digest = format!("sha256:{:x}", sha2::Sha256::digest(&body));
// Store by tag/reference
let key = format!("docker/{}/manifests/{}.json", name, reference); let key = format!("docker/{}/manifests/{}.json", name, reference);
match state.storage.put(&key, &body).await { if let Err(_) = state.storage.put(&key, &body).await {
Ok(()) => { return StatusCode::INTERNAL_SERVER_ERROR.into_response();
use sha2::Digest;
let digest = format!("sha256:{:x}", sha2::Sha256::digest(&body));
let location = format!("/v2/{}/manifests/{}", name, reference);
(
StatusCode::CREATED,
[
(header::LOCATION, location),
(HeaderName::from_static("docker-content-digest"), digest),
],
)
.into_response()
}
Err(_) => StatusCode::INTERNAL_SERVER_ERROR.into_response(),
} }
// Also store by digest for direct digest lookups
let digest_key = format!("docker/{}/manifests/{}.json", name, digest);
if let Err(_) = state.storage.put(&digest_key, &body).await {
return StatusCode::INTERNAL_SERVER_ERROR.into_response();
}
let location = format!("/v2/{}/manifests/{}", name, reference);
(
StatusCode::CREATED,
[
(header::LOCATION, location),
(HeaderName::from_static("docker-content-digest"), digest),
],
)
.into_response()
} }
async fn list_tags(State(state): State<Arc<AppState>>, Path(name): Path<String>) -> Response { async fn list_tags(State(state): State<Arc<AppState>>, Path(name): Path<String>) -> Response {