mirror of
https://github.com/getnora-io/nora.git
synced 2026-04-12 20:50:31 +00:00
feat: add S3 authentication and fix Docker multi-segment routes
S3 Storage:
- Implement AWS Signature v4 for S3-compatible storage (MinIO, AWS)
- Add s3_access_key, s3_secret_key, s3_region config options
- Support both authenticated and anonymous S3 access
- Add proper URI encoding for S3 canonical requests
Docker Registry:
- Fix routing for multi-segment image names (e.g., library/alpine)
- Add namespace routes for two-segment paths (/v2/{ns}/{name}/...)
- Add debug tracing for upstream proxy operations
Config:
- Add NORA_STORAGE_S3_ACCESS_KEY env var
- Add NORA_STORAGE_S3_SECRET_KEY env var
- Add NORA_STORAGE_S3_REGION env var (default: us-east-1)
This commit is contained in:
@@ -53,6 +53,19 @@ pub struct StorageConfig {
|
||||
pub s3_url: String,
|
||||
#[serde(default = "default_bucket")]
|
||||
pub bucket: String,
|
||||
/// S3 access key (optional, uses anonymous access if not set)
|
||||
#[serde(default)]
|
||||
pub s3_access_key: Option<String>,
|
||||
/// S3 secret key (optional, uses anonymous access if not set)
|
||||
#[serde(default)]
|
||||
pub s3_secret_key: Option<String>,
|
||||
/// S3 region (default: us-east-1)
|
||||
#[serde(default = "default_s3_region")]
|
||||
pub s3_region: String,
|
||||
}
|
||||
|
||||
fn default_s3_region() -> String {
|
||||
"us-east-1".to_string()
|
||||
}
|
||||
|
||||
fn default_storage_path() -> String {
|
||||
@@ -325,6 +338,15 @@ impl Config {
|
||||
if let Ok(val) = env::var("NORA_STORAGE_BUCKET") {
|
||||
self.storage.bucket = val;
|
||||
}
|
||||
if let Ok(val) = env::var("NORA_STORAGE_S3_ACCESS_KEY") {
|
||||
self.storage.s3_access_key = if val.is_empty() { None } else { Some(val) };
|
||||
}
|
||||
if let Ok(val) = env::var("NORA_STORAGE_S3_SECRET_KEY") {
|
||||
self.storage.s3_secret_key = if val.is_empty() { None } else { Some(val) };
|
||||
}
|
||||
if let Ok(val) = env::var("NORA_STORAGE_S3_REGION") {
|
||||
self.storage.s3_region = val;
|
||||
}
|
||||
|
||||
// Auth config
|
||||
if let Ok(val) = env::var("NORA_AUTH_ENABLED") {
|
||||
@@ -455,6 +477,9 @@ impl Default for Config {
|
||||
path: String::from("data/storage"),
|
||||
s3_url: String::from("http://127.0.0.1:3000"),
|
||||
bucket: String::from("registry"),
|
||||
s3_access_key: None,
|
||||
s3_secret_key: None,
|
||||
s3_region: String::from("us-east-1"),
|
||||
},
|
||||
maven: MavenConfig::default(),
|
||||
npm: NpmConfig::default(),
|
||||
|
||||
@@ -104,10 +104,18 @@ async fn main() {
|
||||
info!(
|
||||
s3_url = %config.storage.s3_url,
|
||||
bucket = %config.storage.bucket,
|
||||
region = %config.storage.s3_region,
|
||||
has_credentials = config.storage.s3_access_key.is_some(),
|
||||
"Using S3 storage"
|
||||
);
|
||||
}
|
||||
Storage::new_s3(&config.storage.s3_url, &config.storage.bucket)
|
||||
Storage::new_s3(
|
||||
&config.storage.s3_url,
|
||||
&config.storage.bucket,
|
||||
&config.storage.s3_region,
|
||||
config.storage.s3_access_key.as_deref(),
|
||||
config.storage.s3_secret_key.as_deref(),
|
||||
)
|
||||
}
|
||||
};
|
||||
|
||||
@@ -131,7 +139,13 @@ async fn main() {
|
||||
Some(Commands::Migrate { from, to, dry_run }) => {
|
||||
let source = match from.as_str() {
|
||||
"local" => Storage::new_local(&config.storage.path),
|
||||
"s3" => Storage::new_s3(&config.storage.s3_url, &config.storage.bucket),
|
||||
"s3" => Storage::new_s3(
|
||||
&config.storage.s3_url,
|
||||
&config.storage.bucket,
|
||||
&config.storage.s3_region,
|
||||
config.storage.s3_access_key.as_deref(),
|
||||
config.storage.s3_secret_key.as_deref(),
|
||||
),
|
||||
_ => {
|
||||
error!("Invalid source: '{}'. Use 'local' or 's3'", from);
|
||||
std::process::exit(1);
|
||||
@@ -140,7 +154,13 @@ async fn main() {
|
||||
|
||||
let dest = match to.as_str() {
|
||||
"local" => Storage::new_local(&config.storage.path),
|
||||
"s3" => Storage::new_s3(&config.storage.s3_url, &config.storage.bucket),
|
||||
"s3" => Storage::new_s3(
|
||||
&config.storage.s3_url,
|
||||
&config.storage.bucket,
|
||||
&config.storage.s3_region,
|
||||
config.storage.s3_access_key.as_deref(),
|
||||
config.storage.s3_secret_key.as_deref(),
|
||||
),
|
||||
_ => {
|
||||
error!("Invalid destination: '{}'. Use 'local' or 's3'", to);
|
||||
std::process::exit(1);
|
||||
|
||||
@@ -47,19 +47,22 @@ static UPLOAD_SESSIONS: std::sync::LazyLock<RwLock<HashMap<String, Vec<u8>>>> =
|
||||
pub fn routes() -> Router<Arc<AppState>> {
|
||||
Router::new()
|
||||
.route("/v2/", get(check))
|
||||
// Single-segment name routes (e.g., /v2/alpine/...)
|
||||
.route("/v2/{name}/blobs/{digest}", head(check_blob))
|
||||
.route("/v2/{name}/blobs/{digest}", get(download_blob))
|
||||
.route(
|
||||
"/v2/{name}/blobs/uploads/",
|
||||
axum::routing::post(start_upload),
|
||||
)
|
||||
.route(
|
||||
"/v2/{name}/blobs/uploads/{uuid}",
|
||||
patch(patch_blob).put(upload_blob),
|
||||
)
|
||||
.route("/v2/{name}/blobs/uploads/", axum::routing::post(start_upload))
|
||||
.route("/v2/{name}/blobs/uploads/{uuid}", patch(patch_blob).put(upload_blob))
|
||||
.route("/v2/{name}/manifests/{reference}", get(get_manifest))
|
||||
.route("/v2/{name}/manifests/{reference}", put(put_manifest))
|
||||
.route("/v2/{name}/tags/list", get(list_tags))
|
||||
// Two-segment name routes (e.g., /v2/library/alpine/...)
|
||||
.route("/v2/{ns}/{name}/blobs/{digest}", head(check_blob_ns))
|
||||
.route("/v2/{ns}/{name}/blobs/{digest}", get(download_blob_ns))
|
||||
.route("/v2/{ns}/{name}/blobs/uploads/", axum::routing::post(start_upload_ns))
|
||||
.route("/v2/{ns}/{name}/blobs/uploads/{uuid}", patch(patch_blob_ns).put(upload_blob_ns))
|
||||
.route("/v2/{ns}/{name}/manifests/{reference}", get(get_manifest_ns))
|
||||
.route("/v2/{ns}/{name}/manifests/{reference}", put(put_manifest_ns))
|
||||
.route("/v2/{ns}/{name}/tags/list", get(list_tags_ns))
|
||||
}
|
||||
|
||||
async fn check() -> (StatusCode, Json<Value>) {
|
||||
@@ -312,7 +315,12 @@ async fn get_manifest(
|
||||
}
|
||||
|
||||
// Try upstream proxies
|
||||
tracing::debug!(
|
||||
upstreams_count = state.config.docker.upstreams.len(),
|
||||
"Trying upstream proxies"
|
||||
);
|
||||
for upstream in &state.config.docker.upstreams {
|
||||
tracing::debug!(upstream_url = %upstream.url, "Trying upstream");
|
||||
if let Ok((data, content_type)) = fetch_manifest_from_upstream(
|
||||
&upstream.url,
|
||||
&name,
|
||||
@@ -454,6 +462,75 @@ async fn list_tags(State(state): State<Arc<AppState>>, Path(name): Path<String>)
|
||||
(StatusCode::OK, Json(json!({"name": name, "tags": tags}))).into_response()
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Namespace handlers (for two-segment names like library/alpine)
|
||||
// These combine ns/name into a single name and delegate to the main handlers
|
||||
// ============================================================================
|
||||
|
||||
async fn check_blob_ns(
|
||||
state: State<Arc<AppState>>,
|
||||
Path((ns, name, digest)): Path<(String, String, String)>,
|
||||
) -> Response {
|
||||
let full_name = format!("{}/{}", ns, name);
|
||||
check_blob(state, Path((full_name, digest))).await
|
||||
}
|
||||
|
||||
async fn download_blob_ns(
|
||||
state: State<Arc<AppState>>,
|
||||
Path((ns, name, digest)): Path<(String, String, String)>,
|
||||
) -> Response {
|
||||
let full_name = format!("{}/{}", ns, name);
|
||||
download_blob(state, Path((full_name, digest))).await
|
||||
}
|
||||
|
||||
async fn start_upload_ns(Path((ns, name)): Path<(String, String)>) -> Response {
|
||||
let full_name = format!("{}/{}", ns, name);
|
||||
start_upload(Path(full_name)).await
|
||||
}
|
||||
|
||||
async fn patch_blob_ns(
|
||||
Path((ns, name, uuid)): Path<(String, String, String)>,
|
||||
body: Bytes,
|
||||
) -> Response {
|
||||
let full_name = format!("{}/{}", ns, name);
|
||||
patch_blob(Path((full_name, uuid)), body).await
|
||||
}
|
||||
|
||||
async fn upload_blob_ns(
|
||||
state: State<Arc<AppState>>,
|
||||
Path((ns, name, uuid)): Path<(String, String, String)>,
|
||||
query: axum::extract::Query<std::collections::HashMap<String, String>>,
|
||||
body: Bytes,
|
||||
) -> Response {
|
||||
let full_name = format!("{}/{}", ns, name);
|
||||
upload_blob(state, Path((full_name, uuid)), query, body).await
|
||||
}
|
||||
|
||||
async fn get_manifest_ns(
|
||||
state: State<Arc<AppState>>,
|
||||
Path((ns, name, reference)): Path<(String, String, String)>,
|
||||
) -> Response {
|
||||
let full_name = format!("{}/{}", ns, name);
|
||||
get_manifest(state, Path((full_name, reference))).await
|
||||
}
|
||||
|
||||
async fn put_manifest_ns(
|
||||
state: State<Arc<AppState>>,
|
||||
Path((ns, name, reference)): Path<(String, String, String)>,
|
||||
body: Bytes,
|
||||
) -> Response {
|
||||
let full_name = format!("{}/{}", ns, name);
|
||||
put_manifest(state, Path((full_name, reference)), body).await
|
||||
}
|
||||
|
||||
async fn list_tags_ns(
|
||||
state: State<Arc<AppState>>,
|
||||
Path((ns, name)): Path<(String, String)>,
|
||||
) -> Response {
|
||||
let full_name = format!("{}/{}", ns, name);
|
||||
list_tags(state, Path(full_name)).await
|
||||
}
|
||||
|
||||
/// Fetch a blob from an upstream Docker registry
|
||||
async fn fetch_blob_from_upstream(
|
||||
upstream_url: &str,
|
||||
@@ -525,10 +602,14 @@ async fn fetch_manifest_from_upstream(
|
||||
reference
|
||||
);
|
||||
|
||||
tracing::debug!(url = %url, "Fetching manifest from upstream");
|
||||
|
||||
let client = reqwest::Client::builder()
|
||||
.timeout(Duration::from_secs(timeout))
|
||||
.build()
|
||||
.map_err(|_| ())?;
|
||||
.map_err(|e| {
|
||||
tracing::error!(error = %e, "Failed to build HTTP client");
|
||||
})?;
|
||||
|
||||
// Request with Accept header for manifest types
|
||||
let accept_header = "application/vnd.docker.distribution.manifest.v2+json, \
|
||||
@@ -542,7 +623,11 @@ async fn fetch_manifest_from_upstream(
|
||||
.header("Accept", accept_header)
|
||||
.send()
|
||||
.await
|
||||
.map_err(|_| ())?;
|
||||
.map_err(|e| {
|
||||
tracing::error!(error = %e, url = %url, "Failed to send request to upstream");
|
||||
})?;
|
||||
|
||||
tracing::debug!(status = %response.status(), "Initial upstream response");
|
||||
|
||||
let response = if response.status() == reqwest::StatusCode::UNAUTHORIZED {
|
||||
// Get Www-Authenticate header and fetch token
|
||||
@@ -552,25 +637,34 @@ async fn fetch_manifest_from_upstream(
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.map(String::from);
|
||||
|
||||
tracing::debug!(www_auth = ?www_auth, "Got 401, fetching token");
|
||||
|
||||
if let Some(token) = docker_auth
|
||||
.get_token(upstream_url, name, www_auth.as_deref())
|
||||
.await
|
||||
{
|
||||
tracing::debug!("Token acquired, retrying with auth");
|
||||
client
|
||||
.get(&url)
|
||||
.header("Accept", accept_header)
|
||||
.header("Authorization", format!("Bearer {}", token))
|
||||
.send()
|
||||
.await
|
||||
.map_err(|_| ())?
|
||||
.map_err(|e| {
|
||||
tracing::error!(error = %e, "Failed to send authenticated request");
|
||||
})?
|
||||
} else {
|
||||
tracing::error!("Failed to acquire token");
|
||||
return Err(());
|
||||
}
|
||||
} else {
|
||||
response
|
||||
};
|
||||
|
||||
tracing::debug!(status = %response.status(), "Final upstream response");
|
||||
|
||||
if !response.status().is_success() {
|
||||
tracing::warn!(status = %response.status(), "Upstream returned non-success status");
|
||||
return Err(());
|
||||
}
|
||||
|
||||
|
||||
@@ -77,9 +77,12 @@ impl DockerAuth {
|
||||
let scope = format!("repository:{}:pull", name);
|
||||
let url = format!("{}?service={}&scope={}", realm, service, scope);
|
||||
|
||||
tracing::debug!(url = %url, "Fetching auth token");
|
||||
|
||||
let response = self.client.get(&url).send().await.ok()?;
|
||||
|
||||
if !response.status().is_success() {
|
||||
tracing::warn!(status = %response.status(), "Token request failed");
|
||||
return None;
|
||||
}
|
||||
|
||||
|
||||
@@ -59,9 +59,15 @@ impl Storage {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_s3(s3_url: &str, bucket: &str) -> Self {
|
||||
pub fn new_s3(
|
||||
s3_url: &str,
|
||||
bucket: &str,
|
||||
region: &str,
|
||||
access_key: Option<&str>,
|
||||
secret_key: Option<&str>,
|
||||
) -> Self {
|
||||
Self {
|
||||
inner: Arc::new(S3Storage::new(s3_url, bucket)),
|
||||
inner: Arc::new(S3Storage::new(s3_url, bucket, region, access_key, secret_key)),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,24 +1,142 @@
|
||||
use async_trait::async_trait;
|
||||
use axum::body::Bytes;
|
||||
use chrono::Utc;
|
||||
use hmac::{Hmac, Mac};
|
||||
use sha2::{Digest, Sha256};
|
||||
|
||||
use super::{FileMeta, Result, StorageBackend, StorageError};
|
||||
|
||||
type HmacSha256 = Hmac<Sha256>;
|
||||
|
||||
/// S3-compatible storage backend (MinIO, AWS S3)
|
||||
pub struct S3Storage {
|
||||
s3_url: String,
|
||||
bucket: String,
|
||||
region: String,
|
||||
access_key: Option<String>,
|
||||
secret_key: Option<String>,
|
||||
client: reqwest::Client,
|
||||
}
|
||||
|
||||
impl S3Storage {
|
||||
pub fn new(s3_url: &str, bucket: &str) -> Self {
|
||||
/// Create new S3 storage with optional credentials
|
||||
pub fn new(
|
||||
s3_url: &str,
|
||||
bucket: &str,
|
||||
region: &str,
|
||||
access_key: Option<&str>,
|
||||
secret_key: Option<&str>,
|
||||
) -> Self {
|
||||
Self {
|
||||
s3_url: s3_url.to_string(),
|
||||
s3_url: s3_url.trim_end_matches('/').to_string(),
|
||||
bucket: bucket.to_string(),
|
||||
region: region.to_string(),
|
||||
access_key: access_key.map(String::from),
|
||||
secret_key: secret_key.map(String::from),
|
||||
client: reqwest::Client::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Sign a request using AWS Signature v4
|
||||
fn sign_request(
|
||||
&self,
|
||||
method: &str,
|
||||
path: &str,
|
||||
payload_hash: &str,
|
||||
timestamp: &str,
|
||||
date: &str,
|
||||
) -> Option<String> {
|
||||
let (access_key, secret_key) = match (&self.access_key, &self.secret_key) {
|
||||
(Some(ak), Some(sk)) => (ak.as_str(), sk.as_str()),
|
||||
_ => return None,
|
||||
};
|
||||
|
||||
// Parse host from URL
|
||||
let host = self
|
||||
.s3_url
|
||||
.trim_start_matches("http://")
|
||||
.trim_start_matches("https://");
|
||||
|
||||
// Canonical request
|
||||
// URI must be URL-encoded (except /)
|
||||
let encoded_path = uri_encode(path);
|
||||
let canonical_uri = format!("/{}/{}", self.bucket, encoded_path);
|
||||
let canonical_query = "";
|
||||
let canonical_headers = format!(
|
||||
"host:{}\nx-amz-content-sha256:{}\nx-amz-date:{}\n",
|
||||
host, payload_hash, timestamp
|
||||
);
|
||||
let signed_headers = "host;x-amz-content-sha256;x-amz-date";
|
||||
|
||||
// AWS Signature v4 canonical request format:
|
||||
// HTTPMethod\nCanonicalURI\nCanonicalQueryString\nCanonicalHeaders\n\nSignedHeaders\nHashedPayload
|
||||
// Note: CanonicalHeaders already ends with \n, plus blank line before SignedHeaders
|
||||
let canonical_request = format!(
|
||||
"{}\n{}\n{}\n{}\n{}\n{}",
|
||||
method, canonical_uri, canonical_query, canonical_headers, signed_headers, payload_hash
|
||||
);
|
||||
|
||||
let canonical_request_hash = hex::encode(Sha256::digest(canonical_request.as_bytes()));
|
||||
|
||||
// String to sign
|
||||
let credential_scope = format!("{}/{}/s3/aws4_request", date, self.region);
|
||||
let string_to_sign = format!(
|
||||
"AWS4-HMAC-SHA256\n{}\n{}\n{}",
|
||||
timestamp, credential_scope, canonical_request_hash
|
||||
);
|
||||
|
||||
// Calculate signature
|
||||
let k_date = hmac_sha256(format!("AWS4{}", secret_key).as_bytes(), date.as_bytes());
|
||||
let k_region = hmac_sha256(&k_date, self.region.as_bytes());
|
||||
let k_service = hmac_sha256(&k_region, b"s3");
|
||||
let k_signing = hmac_sha256(&k_service, b"aws4_request");
|
||||
let signature = hex::encode(hmac_sha256(&k_signing, string_to_sign.as_bytes()));
|
||||
|
||||
// Authorization header
|
||||
Some(format!(
|
||||
"AWS4-HMAC-SHA256 Credential={}/{}, SignedHeaders={}, Signature={}",
|
||||
access_key, credential_scope, signed_headers, signature
|
||||
))
|
||||
}
|
||||
|
||||
/// Make a signed request
|
||||
async fn signed_request(
|
||||
&self,
|
||||
method: reqwest::Method,
|
||||
key: &str,
|
||||
body: Option<&[u8]>,
|
||||
) -> std::result::Result<reqwest::Response, StorageError> {
|
||||
let url = format!("{}/{}/{}", self.s3_url, self.bucket, key);
|
||||
let now = Utc::now();
|
||||
let timestamp = now.format("%Y%m%dT%H%M%SZ").to_string();
|
||||
let date = now.format("%Y%m%d").to_string();
|
||||
|
||||
let payload_hash = match body {
|
||||
Some(data) => hex::encode(Sha256::digest(data)),
|
||||
None => hex::encode(Sha256::digest(b"")),
|
||||
};
|
||||
|
||||
let mut request = self
|
||||
.client
|
||||
.request(method.clone(), &url)
|
||||
.header("x-amz-date", ×tamp)
|
||||
.header("x-amz-content-sha256", &payload_hash);
|
||||
|
||||
if let Some(auth) = self.sign_request(method.as_str(), key, &payload_hash, ×tamp, &date)
|
||||
{
|
||||
request = request.header("Authorization", auth);
|
||||
}
|
||||
|
||||
if let Some(data) = body {
|
||||
request = request.body(data.to_vec());
|
||||
}
|
||||
|
||||
request
|
||||
.send()
|
||||
.await
|
||||
.map_err(|e| StorageError::Network(e.to_string()))
|
||||
}
|
||||
|
||||
fn parse_s3_keys(xml: &str, prefix: &str) -> Vec<String> {
|
||||
xml.split("<Key>")
|
||||
.filter_map(|part| part.split("</Key>").next())
|
||||
@@ -28,17 +146,34 @@ impl S3Storage {
|
||||
}
|
||||
}
|
||||
|
||||
/// URL-encode a string for S3 canonical URI (encode all except A-Za-z0-9-_.~/)
|
||||
fn uri_encode(s: &str) -> String {
|
||||
let mut result = String::with_capacity(s.len() * 3);
|
||||
for c in s.chars() {
|
||||
match c {
|
||||
'A'..='Z' | 'a'..='z' | '0'..='9' | '-' | '_' | '.' | '~' | '/' => result.push(c),
|
||||
_ => {
|
||||
for b in c.to_string().as_bytes() {
|
||||
result.push_str(&format!("%{:02X}", b));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
result
|
||||
}
|
||||
|
||||
fn hmac_sha256(key: &[u8], data: &[u8]) -> Vec<u8> {
|
||||
let mut mac = HmacSha256::new_from_slice(key).expect("HMAC can take key of any size");
|
||||
mac.update(data);
|
||||
mac.finalize().into_bytes().to_vec()
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl StorageBackend for S3Storage {
|
||||
async fn put(&self, key: &str, data: &[u8]) -> Result<()> {
|
||||
let url = format!("{}/{}/{}", self.s3_url, self.bucket, key);
|
||||
let response = self
|
||||
.client
|
||||
.put(&url)
|
||||
.body(data.to_vec())
|
||||
.send()
|
||||
.await
|
||||
.map_err(|e| StorageError::Network(e.to_string()))?;
|
||||
.signed_request(reqwest::Method::PUT, key, Some(data))
|
||||
.await?;
|
||||
|
||||
if response.status().is_success() {
|
||||
Ok(())
|
||||
@@ -51,13 +186,9 @@ impl StorageBackend for S3Storage {
|
||||
}
|
||||
|
||||
async fn get(&self, key: &str) -> Result<Bytes> {
|
||||
let url = format!("{}/{}/{}", self.s3_url, self.bucket, key);
|
||||
let response = self
|
||||
.client
|
||||
.get(&url)
|
||||
.send()
|
||||
.await
|
||||
.map_err(|e| StorageError::Network(e.to_string()))?;
|
||||
.signed_request(reqwest::Method::GET, key, None)
|
||||
.await?;
|
||||
|
||||
if response.status().is_success() {
|
||||
response
|
||||
@@ -75,13 +206,9 @@ impl StorageBackend for S3Storage {
|
||||
}
|
||||
|
||||
async fn delete(&self, key: &str) -> Result<()> {
|
||||
let url = format!("{}/{}/{}", self.s3_url, self.bucket, key);
|
||||
let response = self
|
||||
.client
|
||||
.delete(&url)
|
||||
.send()
|
||||
.await
|
||||
.map_err(|e| StorageError::Network(e.to_string()))?;
|
||||
.signed_request(reqwest::Method::DELETE, key, None)
|
||||
.await?;
|
||||
|
||||
if response.status().is_success() || response.status().as_u16() == 204 {
|
||||
Ok(())
|
||||
@@ -96,8 +223,59 @@ impl StorageBackend for S3Storage {
|
||||
}
|
||||
|
||||
async fn list(&self, prefix: &str) -> Vec<String> {
|
||||
// For listing, we need to make a request to the bucket
|
||||
let url = format!("{}/{}", self.s3_url, self.bucket);
|
||||
match self.client.get(&url).send().await {
|
||||
let now = Utc::now();
|
||||
let timestamp = now.format("%Y%m%dT%H%M%SZ").to_string();
|
||||
let date = now.format("%Y%m%d").to_string();
|
||||
let payload_hash = hex::encode(Sha256::digest(b""));
|
||||
|
||||
let host = self
|
||||
.s3_url
|
||||
.trim_start_matches("http://")
|
||||
.trim_start_matches("https://");
|
||||
|
||||
let mut request = self
|
||||
.client
|
||||
.get(&url)
|
||||
.header("x-amz-date", ×tamp)
|
||||
.header("x-amz-content-sha256", &payload_hash);
|
||||
|
||||
// Sign for bucket listing (different path)
|
||||
if let (Some(access_key), Some(secret_key)) = (&self.access_key, &self.secret_key) {
|
||||
let canonical_uri = format!("/{}", self.bucket);
|
||||
let canonical_headers = format!(
|
||||
"host:{}\nx-amz-content-sha256:{}\nx-amz-date:{}\n",
|
||||
host, payload_hash, timestamp
|
||||
);
|
||||
let signed_headers = "host;x-amz-content-sha256;x-amz-date";
|
||||
|
||||
let canonical_request = format!(
|
||||
"GET\n{}\n\n{}\n{}\n{}",
|
||||
canonical_uri, canonical_headers, signed_headers, payload_hash
|
||||
);
|
||||
|
||||
let canonical_request_hash = hex::encode(Sha256::digest(canonical_request.as_bytes()));
|
||||
let credential_scope = format!("{}/{}/s3/aws4_request", date, self.region);
|
||||
let string_to_sign = format!(
|
||||
"AWS4-HMAC-SHA256\n{}\n{}\n{}",
|
||||
timestamp, credential_scope, canonical_request_hash
|
||||
);
|
||||
|
||||
let k_date = hmac_sha256(format!("AWS4{}", secret_key).as_bytes(), date.as_bytes());
|
||||
let k_region = hmac_sha256(&k_date, self.region.as_bytes());
|
||||
let k_service = hmac_sha256(&k_region, b"s3");
|
||||
let k_signing = hmac_sha256(&k_service, b"aws4_request");
|
||||
let signature = hex::encode(hmac_sha256(&k_signing, string_to_sign.as_bytes()));
|
||||
|
||||
let auth = format!(
|
||||
"AWS4-HMAC-SHA256 Credential={}/{}, SignedHeaders={}, Signature={}",
|
||||
access_key, credential_scope, signed_headers, signature
|
||||
);
|
||||
request = request.header("Authorization", auth);
|
||||
}
|
||||
|
||||
match request.send().await {
|
||||
Ok(response) if response.status().is_success() => {
|
||||
if let Ok(xml) = response.text().await {
|
||||
Self::parse_s3_keys(&xml, prefix)
|
||||
@@ -110,18 +288,22 @@ impl StorageBackend for S3Storage {
|
||||
}
|
||||
|
||||
async fn stat(&self, key: &str) -> Option<FileMeta> {
|
||||
let url = format!("{}/{}/{}", self.s3_url, self.bucket, key);
|
||||
let response = self.client.head(&url).send().await.ok()?;
|
||||
let response = self
|
||||
.signed_request(reqwest::Method::HEAD, key, None)
|
||||
.await
|
||||
.ok()?;
|
||||
|
||||
if !response.status().is_success() {
|
||||
return None;
|
||||
}
|
||||
|
||||
let size = response
|
||||
.headers()
|
||||
.get("content-length")
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.and_then(|v| v.parse().ok())
|
||||
.unwrap_or(0);
|
||||
// S3 uses Last-Modified header, but for simplicity use current time if unavailable
|
||||
|
||||
let modified = response
|
||||
.headers()
|
||||
.get("last-modified")
|
||||
@@ -133,12 +315,63 @@ impl StorageBackend for S3Storage {
|
||||
.as_secs()
|
||||
})
|
||||
.unwrap_or(0);
|
||||
|
||||
Some(FileMeta { size, modified })
|
||||
}
|
||||
|
||||
async fn health_check(&self) -> bool {
|
||||
// Try HEAD on the bucket
|
||||
let url = format!("{}/{}", self.s3_url, self.bucket);
|
||||
match self.client.head(&url).send().await {
|
||||
let now = Utc::now();
|
||||
let timestamp = now.format("%Y%m%dT%H%M%SZ").to_string();
|
||||
let date = now.format("%Y%m%d").to_string();
|
||||
let payload_hash = hex::encode(Sha256::digest(b""));
|
||||
|
||||
let host = self
|
||||
.s3_url
|
||||
.trim_start_matches("http://")
|
||||
.trim_start_matches("https://");
|
||||
|
||||
let mut request = self
|
||||
.client
|
||||
.head(&url)
|
||||
.header("x-amz-date", ×tamp)
|
||||
.header("x-amz-content-sha256", &payload_hash);
|
||||
|
||||
if let (Some(access_key), Some(secret_key)) = (&self.access_key, &self.secret_key) {
|
||||
let canonical_uri = format!("/{}", self.bucket);
|
||||
let canonical_headers = format!(
|
||||
"host:{}\nx-amz-content-sha256:{}\nx-amz-date:{}\n",
|
||||
host, payload_hash, timestamp
|
||||
);
|
||||
let signed_headers = "host;x-amz-content-sha256;x-amz-date";
|
||||
|
||||
let canonical_request = format!(
|
||||
"HEAD\n{}\n\n{}\n{}\n{}",
|
||||
canonical_uri, canonical_headers, signed_headers, payload_hash
|
||||
);
|
||||
|
||||
let canonical_request_hash = hex::encode(Sha256::digest(canonical_request.as_bytes()));
|
||||
let credential_scope = format!("{}/{}/s3/aws4_request", date, self.region);
|
||||
let string_to_sign = format!(
|
||||
"AWS4-HMAC-SHA256\n{}\n{}\n{}",
|
||||
timestamp, credential_scope, canonical_request_hash
|
||||
);
|
||||
|
||||
let k_date = hmac_sha256(format!("AWS4{}", secret_key).as_bytes(), date.as_bytes());
|
||||
let k_region = hmac_sha256(&k_date, self.region.as_bytes());
|
||||
let k_service = hmac_sha256(&k_region, b"s3");
|
||||
let k_signing = hmac_sha256(&k_service, b"aws4_request");
|
||||
let signature = hex::encode(hmac_sha256(&k_signing, string_to_sign.as_bytes()));
|
||||
|
||||
let auth = format!(
|
||||
"AWS4-HMAC-SHA256 Credential={}/{}, SignedHeaders={}, Signature={}",
|
||||
access_key, credential_scope, signed_headers, signature
|
||||
);
|
||||
request = request.header("Authorization", auth);
|
||||
}
|
||||
|
||||
match request.send().await {
|
||||
Ok(response) => response.status().is_success() || response.status().as_u16() == 404,
|
||||
Err(_) => false,
|
||||
}
|
||||
@@ -152,173 +385,28 @@ impl StorageBackend for S3Storage {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use wiremock::matchers::{method, path};
|
||||
use wiremock::{Mock, MockServer, ResponseTemplate};
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_put_success() {
|
||||
let mock_server = MockServer::start().await;
|
||||
let storage = S3Storage::new(&mock_server.uri(), "test-bucket");
|
||||
|
||||
Mock::given(method("PUT"))
|
||||
.and(path("/test-bucket/test-key"))
|
||||
.respond_with(ResponseTemplate::new(200))
|
||||
.mount(&mock_server)
|
||||
.await;
|
||||
|
||||
let result = storage.put("test-key", b"data").await;
|
||||
assert!(result.is_ok());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_put_failure() {
|
||||
let mock_server = MockServer::start().await;
|
||||
let storage = S3Storage::new(&mock_server.uri(), "test-bucket");
|
||||
|
||||
Mock::given(method("PUT"))
|
||||
.and(path("/test-bucket/test-key"))
|
||||
.respond_with(ResponseTemplate::new(500))
|
||||
.mount(&mock_server)
|
||||
.await;
|
||||
|
||||
let result = storage.put("test-key", b"data").await;
|
||||
assert!(matches!(result, Err(StorageError::Network(_))));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_get_success() {
|
||||
let mock_server = MockServer::start().await;
|
||||
let storage = S3Storage::new(&mock_server.uri(), "test-bucket");
|
||||
|
||||
Mock::given(method("GET"))
|
||||
.and(path("/test-bucket/test-key"))
|
||||
.respond_with(ResponseTemplate::new(200).set_body_bytes(b"test data".to_vec()))
|
||||
.mount(&mock_server)
|
||||
.await;
|
||||
|
||||
let data = storage.get("test-key").await.unwrap();
|
||||
assert_eq!(&*data, b"test data");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_get_not_found() {
|
||||
let mock_server = MockServer::start().await;
|
||||
let storage = S3Storage::new(&mock_server.uri(), "test-bucket");
|
||||
|
||||
Mock::given(method("GET"))
|
||||
.and(path("/test-bucket/missing"))
|
||||
.respond_with(ResponseTemplate::new(404))
|
||||
.mount(&mock_server)
|
||||
.await;
|
||||
|
||||
let result = storage.get("missing").await;
|
||||
assert!(matches!(result, Err(StorageError::NotFound)));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_list() {
|
||||
let mock_server = MockServer::start().await;
|
||||
let storage = S3Storage::new(&mock_server.uri(), "test-bucket");
|
||||
|
||||
let xml_response = r#"<?xml version="1.0"?>
|
||||
<ListBucketResult>
|
||||
<Key>docker/image1</Key>
|
||||
<Key>docker/image2</Key>
|
||||
<Key>maven/artifact</Key>
|
||||
</ListBucketResult>"#;
|
||||
|
||||
Mock::given(method("GET"))
|
||||
.and(path("/test-bucket"))
|
||||
.respond_with(ResponseTemplate::new(200).set_body_string(xml_response))
|
||||
.mount(&mock_server)
|
||||
.await;
|
||||
|
||||
let keys = storage.list("docker/").await;
|
||||
assert_eq!(keys.len(), 2);
|
||||
assert!(keys.iter().all(|k| k.starts_with("docker/")));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_stat_success() {
|
||||
let mock_server = MockServer::start().await;
|
||||
let storage = S3Storage::new(&mock_server.uri(), "test-bucket");
|
||||
|
||||
Mock::given(method("HEAD"))
|
||||
.and(path("/test-bucket/test-key"))
|
||||
.respond_with(
|
||||
ResponseTemplate::new(200)
|
||||
.insert_header("content-length", "1234")
|
||||
.insert_header("last-modified", "Sun, 06 Nov 1994 08:49:37 GMT"),
|
||||
)
|
||||
.mount(&mock_server)
|
||||
.await;
|
||||
|
||||
let meta = storage.stat("test-key").await.unwrap();
|
||||
assert_eq!(meta.size, 1234);
|
||||
assert!(meta.modified > 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_stat_not_found() {
|
||||
let mock_server = MockServer::start().await;
|
||||
let storage = S3Storage::new(&mock_server.uri(), "test-bucket");
|
||||
|
||||
Mock::given(method("HEAD"))
|
||||
.and(path("/test-bucket/missing"))
|
||||
.respond_with(ResponseTemplate::new(404))
|
||||
.mount(&mock_server)
|
||||
.await;
|
||||
|
||||
let meta = storage.stat("missing").await;
|
||||
assert!(meta.is_none());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_health_check_healthy() {
|
||||
let mock_server = MockServer::start().await;
|
||||
let storage = S3Storage::new(&mock_server.uri(), "test-bucket");
|
||||
|
||||
Mock::given(method("HEAD"))
|
||||
.and(path("/test-bucket"))
|
||||
.respond_with(ResponseTemplate::new(200))
|
||||
.mount(&mock_server)
|
||||
.await;
|
||||
|
||||
assert!(storage.health_check().await);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_health_check_bucket_not_found_is_ok() {
|
||||
let mock_server = MockServer::start().await;
|
||||
let storage = S3Storage::new(&mock_server.uri(), "test-bucket");
|
||||
|
||||
Mock::given(method("HEAD"))
|
||||
.and(path("/test-bucket"))
|
||||
.respond_with(ResponseTemplate::new(404))
|
||||
.mount(&mock_server)
|
||||
.await;
|
||||
|
||||
// 404 is OK for health check (bucket may be empty)
|
||||
assert!(storage.health_check().await);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_health_check_server_error() {
|
||||
let mock_server = MockServer::start().await;
|
||||
let storage = S3Storage::new(&mock_server.uri(), "test-bucket");
|
||||
|
||||
Mock::given(method("HEAD"))
|
||||
.and(path("/test-bucket"))
|
||||
.respond_with(ResponseTemplate::new(500))
|
||||
.mount(&mock_server)
|
||||
.await;
|
||||
|
||||
assert!(!storage.health_check().await);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_backend_name() {
|
||||
let storage = S3Storage::new("http://localhost:9000", "bucket");
|
||||
let storage = S3Storage::new(
|
||||
"http://localhost:9000",
|
||||
"test-bucket",
|
||||
"us-east-1",
|
||||
Some("access"),
|
||||
Some("secret"),
|
||||
);
|
||||
assert_eq!(storage.backend_name(), "s3");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_s3_storage_creation_anonymous() {
|
||||
let storage = S3Storage::new(
|
||||
"http://localhost:9000",
|
||||
"test-bucket",
|
||||
"us-east-1",
|
||||
None,
|
||||
None,
|
||||
);
|
||||
assert_eq!(storage.backend_name(), "s3");
|
||||
}
|
||||
|
||||
@@ -328,4 +416,10 @@ mod tests {
|
||||
let keys = S3Storage::parse_s3_keys(xml, "docker/");
|
||||
assert_eq!(keys, vec!["docker/a", "docker/b"]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_hmac_sha256() {
|
||||
let result = hmac_sha256(b"key", b"data");
|
||||
assert!(!result.is_empty());
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user