feat: add gc command and fix Docker-Content-Digest for Helm OCI

- Add nora gc --dry-run command for orphaned blob cleanup
- Fix Docker-Content-Digest header in blob upload response (enables Helm OCI push)
- Mark-and-sweep GC: list blobs, parse manifests, find/delete orphans

DevITWay
This commit is contained in:
2026-03-03 10:28:39 +00:00
parent e34032d08f
commit f560e5f76b
3 changed files with 144 additions and 1 deletions

118
nora-registry/src/gc.rs Normal file
View File

@@ -0,0 +1,118 @@
//! Garbage Collection for orphaned blobs
//!
//! Mark-and-sweep approach:
//! 1. List all blobs under the `docker/` storage prefix
//! 2. Parse all manifests to find referenced blobs
//! 3. Blobs not referenced by any manifest = orphans
//! 4. Delete orphans (with --dry-run support)
use std::collections::HashSet;

use tracing::{info, warn};

use crate::storage::Storage;
/// Summary of a single garbage-collection pass, as returned by [`run_gc`].
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct GcResult {
    /// Number of blob keys found in storage.
    pub total_blobs: usize,
    /// Number of distinct digests referenced by the manifests that were parsed
    /// (config, layer, and manifest-list entries).
    pub referenced_blobs: usize,
    /// Number of blob keys whose digest is not referenced by any parsed manifest.
    pub orphaned_blobs: usize,
    /// Number of orphaned blobs that were successfully deleted.
    /// Always `0` for a dry run.
    pub deleted_blobs: usize,
    /// Storage keys of every orphaned blob, whether or not it was deleted.
    pub orphan_keys: Vec<String>,
}
pub async fn run_gc(storage: &Storage, dry_run: bool) -> GcResult {
info!("Starting garbage collection (dry_run={})", dry_run);
// 1. Collect all blob keys
let all_blobs = collect_all_blobs(storage).await;
info!("Found {} total blobs", all_blobs.len());
// 2. Collect all referenced digests from manifests
let referenced = collect_referenced_digests(storage).await;
info!("Found {} referenced digests from manifests", referenced.len());
// 3. Find orphans
let mut orphan_keys: Vec<String> = Vec::new();
for key in &all_blobs {
if let Some(digest) = key.rsplit('/').next() {
if !referenced.contains(digest) {
orphan_keys.push(key.clone());
}
}
}
info!("Found {} orphaned blobs", orphan_keys.len());
let mut deleted = 0;
if !dry_run {
for key in &orphan_keys {
if storage.delete(key).await.is_ok() {
deleted += 1;
info!("Deleted: {}", key);
}
}
info!("Deleted {} orphaned blobs", deleted);
} else {
for key in &orphan_keys {
info!("[dry-run] Would delete: {}", key);
}
}
GcResult {
total_blobs: all_blobs.len(),
referenced_blobs: referenced.len(),
orphaned_blobs: orphan_keys.len(),
deleted_blobs: deleted,
orphan_keys,
}
}
/// Lists every blob key stored under the `docker/` prefix.
///
/// A key counts as a blob only when its parent path segment is exactly
/// `blobs`, i.e. it has the shape `…/blobs/<digest>`. A plain substring test
/// for `/blobs/` would also match keys of a repository that merely has a path
/// component named `blobs` (for example `docker/blobs/manifests/latest.json`);
/// such keys never match a referenced digest and would be deleted as orphans.
async fn collect_all_blobs(storage: &Storage) -> Vec<String> {
    storage
        .list("docker/")
        .await
        .into_iter()
        // rsplit yields the last segment first, so nth(1) is the parent segment.
        .filter(|key| key.rsplit('/').nth(1) == Some("blobs"))
        .collect()
}
async fn collect_referenced_digests(storage: &Storage) -> HashSet<String> {
let mut referenced = HashSet::new();
let all_keys = storage.list("docker/").await;
for key in &all_keys {
if !key.contains("/manifests/") || !key.ends_with(".json") || key.ends_with(".meta.json") {
continue;
}
if let Ok(data) = storage.get(key).await {
if let Ok(json) = serde_json::from_slice::<serde_json::Value>(&data) {
if let Some(config) = json.get("config") {
if let Some(digest) = config.get("digest").and_then(|v| v.as_str()) {
referenced.insert(digest.to_string());
}
}
if let Some(layers) = json.get("layers").and_then(|v| v.as_array()) {
for layer in layers {
if let Some(digest) = layer.get("digest").and_then(|v| v.as_str()) {
referenced.insert(digest.to_string());
}
}
}
if let Some(manifests) = json.get("manifests").and_then(|v| v.as_array()) {
for m in manifests {
if let Some(digest) = m.get("digest").and_then(|v| v.as_str()) {
referenced.insert(digest.to_string());
}
}
}
}
}
}
referenced
}

View File

@@ -11,6 +11,7 @@ mod health;
mod metrics; mod metrics;
mod migrate; mod migrate;
mod openapi; mod openapi;
mod gc;
mod rate_limit; mod rate_limit;
mod registry; mod registry;
mod repo_index; mod repo_index;
@@ -61,6 +62,12 @@ enum Commands {
#[arg(short, long)] #[arg(short, long)]
input: PathBuf, input: PathBuf,
}, },
/// Garbage collect orphaned blobs
Gc {
/// Dry run - show what would be deleted without deleting
#[arg(long, default_value = "false")]
dry_run: bool,
},
/// Migrate artifacts between storage backends /// Migrate artifacts between storage backends
Migrate { Migrate {
/// Source storage: local or s3 /// Source storage: local or s3
@@ -143,6 +150,17 @@ async fn main() {
std::process::exit(1); std::process::exit(1);
} }
} }
Some(Commands::Gc { dry_run }) => {
let result = gc::run_gc(&storage, dry_run).await;
println!("GC Summary:");
println!(" Total blobs: {}", result.total_blobs);
println!(" Referenced: {}", result.referenced_blobs);
println!(" Orphaned: {}", result.orphaned_blobs);
println!(" Deleted: {}", result.deleted_blobs);
if dry_run && !result.orphan_keys.is_empty() {
println!("\nRun without --dry-run to delete orphaned blobs.");
}
}
Some(Commands::Migrate { from, to, dry_run }) => { Some(Commands::Migrate { from, to, dry_run }) => {
let source = match from.as_str() { let source = match from.as_str() {
"local" => Storage::new_local(&config.storage.path), "local" => Storage::new_local(&config.storage.path),

View File

@@ -307,7 +307,14 @@ async fn upload_blob(
)); ));
state.repo_index.invalidate("docker"); state.repo_index.invalidate("docker");
let location = format!("/v2/{}/blobs/{}", name, digest); let location = format!("/v2/{}/blobs/{}", name, digest);
(StatusCode::CREATED, [(header::LOCATION, location)]).into_response() (
StatusCode::CREATED,
[
(header::LOCATION, location),
(HeaderName::from_static("docker-content-digest"), digest.to_string()),
],
)
.into_response()
} }
Err(_) => StatusCode::INTERNAL_SERVER_ERROR.into_response(), Err(_) => StatusCode::INTERNAL_SERVER_ERROR.into_response(),
} }