mirror of
https://github.com/getnora-io/nora.git
synced 2026-04-12 16:10:31 +00:00
feat: add Docker image metadata support
- Store metadata (.meta.json) alongside manifests with: - push_timestamp, last_pulled, downloads counter - size_bytes, os, arch, variant - layers list with digest and size - Update metadata on manifest pull (increment downloads, update last_pulled) - Extract OS/arch from config blob on push - Extend UI API TagInfo with metadata fields - Add public_url config option for pull commands - Add Docker upstream proxy with auth support - Add raw repository support - Bump version to 0.2.12
This commit is contained in:
@@ -1,4 +1,6 @@
|
||||
use crate::activity_log::{ActionType, ActivityEntry};
|
||||
use crate::registry::docker_auth::DockerAuth;
|
||||
use crate::storage::Storage;
|
||||
use crate::validation::{validate_digest, validate_docker_name, validate_docker_reference};
|
||||
use crate::AppState;
|
||||
use axum::{
|
||||
@@ -10,9 +12,32 @@ use axum::{
|
||||
Json, Router,
|
||||
};
|
||||
use parking_lot::RwLock;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::{json, Value};
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
||||
|
||||
/// Metadata for a Docker image stored alongside manifests
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
|
||||
pub struct ImageMetadata {
|
||||
pub push_timestamp: u64,
|
||||
pub last_pulled: u64,
|
||||
pub downloads: u64,
|
||||
pub size_bytes: u64,
|
||||
pub os: String,
|
||||
pub arch: String,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub variant: Option<String>,
|
||||
pub layers: Vec<LayerInfo>,
|
||||
}
|
||||
|
||||
/// Information about a single layer in a Docker image
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct LayerInfo {
|
||||
pub digest: String,
|
||||
pub size: u64,
|
||||
}
|
||||
|
||||
/// In-progress upload sessions for chunked uploads
|
||||
/// Maps UUID -> accumulated data
|
||||
@@ -75,25 +100,63 @@ async fn download_blob(
|
||||
}
|
||||
|
||||
let key = format!("docker/{}/blobs/{}", name, digest);
|
||||
match state.storage.get(&key).await {
|
||||
Ok(data) => {
|
||||
|
||||
// Try local storage first
|
||||
if let Ok(data) = state.storage.get(&key).await {
|
||||
state.metrics.record_download("docker");
|
||||
state.metrics.record_cache_hit();
|
||||
state.activity.push(ActivityEntry::new(
|
||||
ActionType::Pull,
|
||||
format!("{}@{}", name, &digest[..19.min(digest.len())]),
|
||||
"docker",
|
||||
"LOCAL",
|
||||
));
|
||||
return (
|
||||
StatusCode::OK,
|
||||
[(header::CONTENT_TYPE, "application/octet-stream")],
|
||||
data,
|
||||
)
|
||||
.into_response();
|
||||
}
|
||||
|
||||
// Try upstream proxies
|
||||
for upstream in &state.config.docker.upstreams {
|
||||
if let Ok(data) = fetch_blob_from_upstream(
|
||||
&upstream.url,
|
||||
&name,
|
||||
&digest,
|
||||
&state.docker_auth,
|
||||
state.config.docker.proxy_timeout,
|
||||
)
|
||||
.await
|
||||
{
|
||||
state.metrics.record_download("docker");
|
||||
state.metrics.record_cache_hit();
|
||||
state.metrics.record_cache_miss();
|
||||
state.activity.push(ActivityEntry::new(
|
||||
ActionType::Pull,
|
||||
ActionType::ProxyFetch,
|
||||
format!("{}@{}", name, &digest[..19.min(digest.len())]),
|
||||
"docker",
|
||||
"LOCAL",
|
||||
"PROXY",
|
||||
));
|
||||
(
|
||||
|
||||
// Cache in storage (fire and forget)
|
||||
let storage = state.storage.clone();
|
||||
let key_clone = key.clone();
|
||||
let data_clone = data.clone();
|
||||
tokio::spawn(async move {
|
||||
let _ = storage.put(&key_clone, &data_clone).await;
|
||||
});
|
||||
|
||||
return (
|
||||
StatusCode::OK,
|
||||
[(header::CONTENT_TYPE, "application/octet-stream")],
|
||||
data,
|
||||
Bytes::from(data),
|
||||
)
|
||||
.into_response()
|
||||
.into_response();
|
||||
}
|
||||
Err(_) => StatusCode::NOT_FOUND.into_response(),
|
||||
}
|
||||
|
||||
StatusCode::NOT_FOUND.into_response()
|
||||
}
|
||||
|
||||
async fn start_upload(Path(name): Path<String>) -> Response {
|
||||
@@ -213,35 +276,106 @@ async fn get_manifest(
|
||||
}
|
||||
|
||||
let key = format!("docker/{}/manifests/{}.json", name, reference);
|
||||
match state.storage.get(&key).await {
|
||||
Ok(data) => {
|
||||
|
||||
// Try local storage first
|
||||
if let Ok(data) = state.storage.get(&key).await {
|
||||
state.metrics.record_download("docker");
|
||||
state.metrics.record_cache_hit();
|
||||
state.activity.push(ActivityEntry::new(
|
||||
ActionType::Pull,
|
||||
format!("{}:{}", name, reference),
|
||||
"docker",
|
||||
"LOCAL",
|
||||
));
|
||||
|
||||
// Calculate digest for Docker-Content-Digest header
|
||||
use sha2::Digest;
|
||||
let digest = format!("sha256:{:x}", sha2::Sha256::digest(&data));
|
||||
|
||||
// Detect manifest media type from content
|
||||
let content_type = detect_manifest_media_type(&data);
|
||||
|
||||
// Update metadata (downloads, last_pulled) in background
|
||||
let meta_key = format!("docker/{}/manifests/{}.meta.json", name, reference);
|
||||
let storage_clone = state.storage.clone();
|
||||
tokio::spawn(update_metadata_on_pull(storage_clone, meta_key));
|
||||
|
||||
return (
|
||||
StatusCode::OK,
|
||||
[
|
||||
(header::CONTENT_TYPE, content_type),
|
||||
(HeaderName::from_static("docker-content-digest"), digest),
|
||||
],
|
||||
data,
|
||||
)
|
||||
.into_response();
|
||||
}
|
||||
|
||||
// Try upstream proxies
|
||||
for upstream in &state.config.docker.upstreams {
|
||||
if let Ok((data, content_type)) = fetch_manifest_from_upstream(
|
||||
&upstream.url,
|
||||
&name,
|
||||
&reference,
|
||||
&state.docker_auth,
|
||||
state.config.docker.proxy_timeout,
|
||||
)
|
||||
.await
|
||||
{
|
||||
state.metrics.record_download("docker");
|
||||
state.metrics.record_cache_hit();
|
||||
state.metrics.record_cache_miss();
|
||||
state.activity.push(ActivityEntry::new(
|
||||
ActionType::Pull,
|
||||
ActionType::ProxyFetch,
|
||||
format!("{}:{}", name, reference),
|
||||
"docker",
|
||||
"LOCAL",
|
||||
"PROXY",
|
||||
));
|
||||
|
||||
// Calculate digest for Docker-Content-Digest header
|
||||
use sha2::Digest;
|
||||
let digest = format!("sha256:{:x}", sha2::Sha256::digest(&data));
|
||||
(
|
||||
|
||||
// Cache manifest and create metadata (fire and forget)
|
||||
let storage = state.storage.clone();
|
||||
let key_clone = key.clone();
|
||||
let data_clone = data.clone();
|
||||
let name_clone = name.clone();
|
||||
let reference_clone = reference.clone();
|
||||
let digest_clone = digest.clone();
|
||||
tokio::spawn(async move {
|
||||
// Store manifest by tag and digest
|
||||
let _ = storage.put(&key_clone, &data_clone).await;
|
||||
let digest_key = format!("docker/{}/manifests/{}.json", name_clone, digest_clone);
|
||||
let _ = storage.put(&digest_key, &data_clone).await;
|
||||
|
||||
// Extract and save metadata
|
||||
let metadata = extract_metadata(&data_clone, &storage, &name_clone).await;
|
||||
if let Ok(meta_json) = serde_json::to_vec(&metadata) {
|
||||
let meta_key = format!(
|
||||
"docker/{}/manifests/{}.meta.json",
|
||||
name_clone, reference_clone
|
||||
);
|
||||
let _ = storage.put(&meta_key, &meta_json).await;
|
||||
|
||||
let digest_meta_key =
|
||||
format!("docker/{}/manifests/{}.meta.json", name_clone, digest_clone);
|
||||
let _ = storage.put(&digest_meta_key, &meta_json).await;
|
||||
}
|
||||
});
|
||||
|
||||
return (
|
||||
StatusCode::OK,
|
||||
[
|
||||
(
|
||||
header::CONTENT_TYPE,
|
||||
"application/vnd.docker.distribution.manifest.v2+json".to_string(),
|
||||
),
|
||||
(header::CONTENT_TYPE, content_type),
|
||||
(HeaderName::from_static("docker-content-digest"), digest),
|
||||
],
|
||||
data,
|
||||
Bytes::from(data),
|
||||
)
|
||||
.into_response()
|
||||
.into_response();
|
||||
}
|
||||
Err(_) => StatusCode::NOT_FOUND.into_response(),
|
||||
}
|
||||
|
||||
StatusCode::NOT_FOUND.into_response()
|
||||
}
|
||||
|
||||
async fn put_manifest(
|
||||
@@ -272,6 +406,17 @@ async fn put_manifest(
|
||||
return StatusCode::INTERNAL_SERVER_ERROR.into_response();
|
||||
}
|
||||
|
||||
// Extract and save metadata
|
||||
let metadata = extract_metadata(&body, &state.storage, &name).await;
|
||||
let meta_key = format!("docker/{}/manifests/{}.meta.json", name, reference);
|
||||
if let Ok(meta_json) = serde_json::to_vec(&metadata) {
|
||||
let _ = state.storage.put(&meta_key, &meta_json).await;
|
||||
|
||||
// Also save metadata by digest
|
||||
let digest_meta_key = format!("docker/{}/manifests/{}.meta.json", name, digest);
|
||||
let _ = state.storage.put(&digest_meta_key, &meta_json).await;
|
||||
}
|
||||
|
||||
state.metrics.record_upload("docker");
|
||||
state.activity.push(ActivityEntry::new(
|
||||
ActionType::Push,
|
||||
@@ -308,3 +453,314 @@ async fn list_tags(State(state): State<Arc<AppState>>, Path(name): Path<String>)
|
||||
.collect();
|
||||
(StatusCode::OK, Json(json!({"name": name, "tags": tags}))).into_response()
|
||||
}
|
||||
|
||||
/// Fetch a blob from an upstream Docker registry
|
||||
async fn fetch_blob_from_upstream(
|
||||
upstream_url: &str,
|
||||
name: &str,
|
||||
digest: &str,
|
||||
docker_auth: &DockerAuth,
|
||||
timeout: u64,
|
||||
) -> Result<Vec<u8>, ()> {
|
||||
let url = format!(
|
||||
"{}/v2/{}/blobs/{}",
|
||||
upstream_url.trim_end_matches('/'),
|
||||
name,
|
||||
digest
|
||||
);
|
||||
|
||||
let client = reqwest::Client::builder()
|
||||
.timeout(Duration::from_secs(timeout))
|
||||
.build()
|
||||
.map_err(|_| ())?;
|
||||
|
||||
// First try without auth
|
||||
let response = client.get(&url).send().await.map_err(|_| ())?;
|
||||
|
||||
let response = if response.status() == reqwest::StatusCode::UNAUTHORIZED {
|
||||
// Get Www-Authenticate header and fetch token
|
||||
let www_auth = response
|
||||
.headers()
|
||||
.get("www-authenticate")
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.map(String::from);
|
||||
|
||||
if let Some(token) = docker_auth
|
||||
.get_token(upstream_url, name, www_auth.as_deref())
|
||||
.await
|
||||
{
|
||||
client
|
||||
.get(&url)
|
||||
.header("Authorization", format!("Bearer {}", token))
|
||||
.send()
|
||||
.await
|
||||
.map_err(|_| ())?
|
||||
} else {
|
||||
return Err(());
|
||||
}
|
||||
} else {
|
||||
response
|
||||
};
|
||||
|
||||
if !response.status().is_success() {
|
||||
return Err(());
|
||||
}
|
||||
|
||||
response.bytes().await.map(|b| b.to_vec()).map_err(|_| ())
|
||||
}
|
||||
|
||||
/// Fetch a manifest from an upstream Docker registry
|
||||
/// Returns (manifest_bytes, content_type)
|
||||
async fn fetch_manifest_from_upstream(
|
||||
upstream_url: &str,
|
||||
name: &str,
|
||||
reference: &str,
|
||||
docker_auth: &DockerAuth,
|
||||
timeout: u64,
|
||||
) -> Result<(Vec<u8>, String), ()> {
|
||||
let url = format!(
|
||||
"{}/v2/{}/manifests/{}",
|
||||
upstream_url.trim_end_matches('/'),
|
||||
name,
|
||||
reference
|
||||
);
|
||||
|
||||
let client = reqwest::Client::builder()
|
||||
.timeout(Duration::from_secs(timeout))
|
||||
.build()
|
||||
.map_err(|_| ())?;
|
||||
|
||||
// Request with Accept header for manifest types
|
||||
let accept_header = "application/vnd.docker.distribution.manifest.v2+json, \
|
||||
application/vnd.docker.distribution.manifest.list.v2+json, \
|
||||
application/vnd.oci.image.manifest.v1+json, \
|
||||
application/vnd.oci.image.index.v1+json";
|
||||
|
||||
// First try without auth
|
||||
let response = client
|
||||
.get(&url)
|
||||
.header("Accept", accept_header)
|
||||
.send()
|
||||
.await
|
||||
.map_err(|_| ())?;
|
||||
|
||||
let response = if response.status() == reqwest::StatusCode::UNAUTHORIZED {
|
||||
// Get Www-Authenticate header and fetch token
|
||||
let www_auth = response
|
||||
.headers()
|
||||
.get("www-authenticate")
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.map(String::from);
|
||||
|
||||
if let Some(token) = docker_auth
|
||||
.get_token(upstream_url, name, www_auth.as_deref())
|
||||
.await
|
||||
{
|
||||
client
|
||||
.get(&url)
|
||||
.header("Accept", accept_header)
|
||||
.header("Authorization", format!("Bearer {}", token))
|
||||
.send()
|
||||
.await
|
||||
.map_err(|_| ())?
|
||||
} else {
|
||||
return Err(());
|
||||
}
|
||||
} else {
|
||||
response
|
||||
};
|
||||
|
||||
if !response.status().is_success() {
|
||||
return Err(());
|
||||
}
|
||||
|
||||
let content_type = response
|
||||
.headers()
|
||||
.get("content-type")
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.unwrap_or("application/vnd.docker.distribution.manifest.v2+json")
|
||||
.to_string();
|
||||
|
||||
let bytes = response.bytes().await.map_err(|_| ())?;
|
||||
|
||||
Ok((bytes.to_vec(), content_type))
|
||||
}
|
||||
|
||||
/// Detect manifest media type from its JSON content
|
||||
fn detect_manifest_media_type(data: &[u8]) -> String {
|
||||
// Try to parse as JSON and extract mediaType
|
||||
if let Ok(json) = serde_json::from_slice::<Value>(data) {
|
||||
if let Some(media_type) = json.get("mediaType").and_then(|v| v.as_str()) {
|
||||
return media_type.to_string();
|
||||
}
|
||||
|
||||
// Check schemaVersion for older manifests
|
||||
if let Some(schema_version) = json.get("schemaVersion").and_then(|v| v.as_u64()) {
|
||||
if schema_version == 1 {
|
||||
return "application/vnd.docker.distribution.manifest.v1+json".to_string();
|
||||
}
|
||||
// schemaVersion 2 without mediaType is likely docker manifest v2
|
||||
if json.get("config").is_some() {
|
||||
return "application/vnd.docker.distribution.manifest.v2+json".to_string();
|
||||
}
|
||||
// If it has "manifests" array, it's an index/list
|
||||
if json.get("manifests").is_some() {
|
||||
return "application/vnd.oci.image.index.v1+json".to_string();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Default fallback
|
||||
"application/vnd.docker.distribution.manifest.v2+json".to_string()
|
||||
}
|
||||
|
||||
/// Extract metadata from a Docker manifest
|
||||
/// Handles both single-arch manifests and multi-arch indexes
|
||||
async fn extract_metadata(manifest: &[u8], storage: &Storage, name: &str) -> ImageMetadata {
|
||||
let now = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap_or_default()
|
||||
.as_secs();
|
||||
|
||||
let mut metadata = ImageMetadata {
|
||||
push_timestamp: now,
|
||||
last_pulled: 0,
|
||||
downloads: 0,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let Ok(json) = serde_json::from_slice::<Value>(manifest) else {
|
||||
return metadata;
|
||||
};
|
||||
|
||||
// Check if this is a manifest list/index (multi-arch)
|
||||
if json.get("manifests").is_some() {
|
||||
// For multi-arch, extract info from the first platform manifest
|
||||
if let Some(manifests) = json.get("manifests").and_then(|m| m.as_array()) {
|
||||
// Sum sizes from all platform manifests
|
||||
let total_size: u64 = manifests
|
||||
.iter()
|
||||
.filter_map(|m| m.get("size").and_then(|s| s.as_u64()))
|
||||
.sum();
|
||||
metadata.size_bytes = total_size;
|
||||
|
||||
// Get OS/arch from first platform (usually linux/amd64)
|
||||
if let Some(first) = manifests.first() {
|
||||
if let Some(platform) = first.get("platform") {
|
||||
metadata.os = platform
|
||||
.get("os")
|
||||
.and_then(|v| v.as_str())
|
||||
.unwrap_or("multi-arch")
|
||||
.to_string();
|
||||
metadata.arch = platform
|
||||
.get("architecture")
|
||||
.and_then(|v| v.as_str())
|
||||
.unwrap_or("multi")
|
||||
.to_string();
|
||||
metadata.variant = platform
|
||||
.get("variant")
|
||||
.and_then(|v| v.as_str())
|
||||
.map(String::from);
|
||||
}
|
||||
}
|
||||
}
|
||||
return metadata;
|
||||
}
|
||||
|
||||
// Single-arch manifest - extract layers
|
||||
if let Some(layers) = json.get("layers").and_then(|l| l.as_array()) {
|
||||
let mut total_size: u64 = 0;
|
||||
for layer in layers {
|
||||
let digest = layer
|
||||
.get("digest")
|
||||
.and_then(|d| d.as_str())
|
||||
.unwrap_or("")
|
||||
.to_string();
|
||||
let size = layer.get("size").and_then(|s| s.as_u64()).unwrap_or(0);
|
||||
total_size += size;
|
||||
metadata.layers.push(LayerInfo { digest, size });
|
||||
}
|
||||
metadata.size_bytes = total_size;
|
||||
}
|
||||
|
||||
// Try to get OS/arch from config blob
|
||||
if let Some(config) = json.get("config") {
|
||||
if let Some(config_digest) = config.get("digest").and_then(|d| d.as_str()) {
|
||||
let (os, arch, variant) = get_config_info(storage, name, config_digest).await;
|
||||
metadata.os = os;
|
||||
metadata.arch = arch;
|
||||
metadata.variant = variant;
|
||||
}
|
||||
}
|
||||
|
||||
// If we couldn't get OS/arch, set defaults
|
||||
if metadata.os.is_empty() {
|
||||
metadata.os = "unknown".to_string();
|
||||
}
|
||||
if metadata.arch.is_empty() {
|
||||
metadata.arch = "unknown".to_string();
|
||||
}
|
||||
|
||||
metadata
|
||||
}
|
||||
|
||||
/// Get OS/arch information from a config blob
|
||||
async fn get_config_info(
|
||||
storage: &Storage,
|
||||
name: &str,
|
||||
config_digest: &str,
|
||||
) -> (String, String, Option<String>) {
|
||||
let key = format!("docker/{}/blobs/{}", name, config_digest);
|
||||
|
||||
let Ok(data) = storage.get(&key).await else {
|
||||
return ("unknown".to_string(), "unknown".to_string(), None);
|
||||
};
|
||||
|
||||
let Ok(config) = serde_json::from_slice::<Value>(&data) else {
|
||||
return ("unknown".to_string(), "unknown".to_string(), None);
|
||||
};
|
||||
|
||||
let os = config
|
||||
.get("os")
|
||||
.and_then(|v| v.as_str())
|
||||
.unwrap_or("unknown")
|
||||
.to_string();
|
||||
|
||||
let arch = config
|
||||
.get("architecture")
|
||||
.and_then(|v| v.as_str())
|
||||
.unwrap_or("unknown")
|
||||
.to_string();
|
||||
|
||||
let variant = config
|
||||
.get("variant")
|
||||
.and_then(|v| v.as_str())
|
||||
.map(String::from);
|
||||
|
||||
(os, arch, variant)
|
||||
}
|
||||
|
||||
/// Update metadata when a manifest is pulled
|
||||
/// Increments download counter and updates last_pulled timestamp
|
||||
async fn update_metadata_on_pull(storage: Storage, meta_key: String) {
|
||||
let now = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap_or_default()
|
||||
.as_secs();
|
||||
|
||||
// Try to read existing metadata
|
||||
let mut metadata = if let Ok(data) = storage.get(&meta_key).await {
|
||||
serde_json::from_slice::<ImageMetadata>(&data).unwrap_or_default()
|
||||
} else {
|
||||
ImageMetadata::default()
|
||||
};
|
||||
|
||||
// Update pull stats
|
||||
metadata.downloads += 1;
|
||||
metadata.last_pulled = now;
|
||||
|
||||
// Save back
|
||||
if let Ok(json) = serde_json::to_vec(&metadata) {
|
||||
let _ = storage.put(&meta_key, &json).await;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user