mirror of
https://github.com/getnora-io/nora.git
synced 2026-04-12 05:40:31 +00:00
fix: proxy dedup, multi-registry GC, TOCTOU and credential hygiene (#83)
- Deduplicate proxy_fetch/proxy_fetch_text into generic proxy_fetch_core with response extractor closure (removes ~50 lines of copy-paste) - GC now scans all registry prefixes, not just docker/ - Add tracing::warn to fire-and-forget cache writes in docker proxy - Mark S3 credentials as skip_serializing to prevent accidental leaks - Remove TOCTOU race in LocalStorage get/delete (redundant exists check)
This commit is contained in:
@@ -72,10 +72,10 @@ pub struct StorageConfig {
|
||||
#[serde(default = "default_bucket")]
|
||||
pub bucket: String,
|
||||
/// S3 access key (optional, uses anonymous access if not set)
|
||||
#[serde(default)]
|
||||
#[serde(default, skip_serializing)]
|
||||
pub s3_access_key: Option<String>,
|
||||
/// S3 secret key (optional, uses anonymous access if not set)
|
||||
#[serde(default)]
|
||||
#[serde(default, skip_serializing)]
|
||||
pub s3_secret_key: Option<String>,
|
||||
/// S3 region (default: us-east-1)
|
||||
#[serde(default = "default_s3_region")]
|
||||
|
||||
@@ -72,10 +72,15 @@ pub async fn run_gc(storage: &Storage, dry_run: bool) -> GcResult {
|
||||
|
||||
async fn collect_all_blobs(storage: &Storage) -> Vec<String> {
|
||||
let mut blobs = Vec::new();
|
||||
let docker_blobs = storage.list("docker/").await;
|
||||
for key in docker_blobs {
|
||||
if key.contains("/blobs/") {
|
||||
blobs.push(key);
|
||||
// Collect blobs from all registry types, not just Docker
|
||||
for prefix in &[
|
||||
"docker/", "maven/", "npm/", "cargo/", "pypi/", "raw/", "go/",
|
||||
] {
|
||||
let keys = storage.list(prefix).await;
|
||||
for key in keys {
|
||||
if key.contains("/blobs/") || key.contains("/tarballs/") {
|
||||
blobs.push(key);
|
||||
}
|
||||
}
|
||||
}
|
||||
blobs
|
||||
|
||||
@@ -284,7 +284,9 @@ async fn download_blob(
|
||||
let key_clone = key.clone();
|
||||
let data_clone = data.clone();
|
||||
tokio::spawn(async move {
|
||||
let _ = storage.put(&key_clone, &data_clone).await;
|
||||
if let Err(e) = storage.put(&key_clone, &data_clone).await {
|
||||
tracing::warn!(key = %key_clone, error = %e, "Failed to cache blob in storage");
|
||||
}
|
||||
});
|
||||
|
||||
return (
|
||||
@@ -687,7 +689,9 @@ async fn get_manifest(
|
||||
let key_clone = key.clone();
|
||||
let data_clone = data.clone();
|
||||
tokio::spawn(async move {
|
||||
let _ = storage.put(&key_clone, &data_clone).await;
|
||||
if let Err(e) = storage.put(&key_clone, &data_clone).await {
|
||||
tracing::warn!(key = %key_clone, error = %e, "Failed to cache blob in storage");
|
||||
}
|
||||
});
|
||||
|
||||
state.repo_index.invalidate("docker");
|
||||
|
||||
@@ -22,57 +22,6 @@ pub use raw::routes as raw_routes;
|
||||
use crate::config::basic_auth_header;
|
||||
use std::time::Duration;
|
||||
|
||||
/// Fetch from upstream proxy with timeout and 1 retry.
|
||||
///
|
||||
/// On transient errors (timeout, connection reset), retries once after a short delay.
|
||||
/// Non-retryable errors (4xx) fail immediately.
|
||||
pub(crate) async fn proxy_fetch(
|
||||
client: &reqwest::Client,
|
||||
url: &str,
|
||||
timeout_secs: u64,
|
||||
auth: Option<&str>,
|
||||
) -> Result<Vec<u8>, ProxyError> {
|
||||
for attempt in 0..2 {
|
||||
let mut request = client.get(url).timeout(Duration::from_secs(timeout_secs));
|
||||
if let Some(credentials) = auth {
|
||||
request = request.header("Authorization", basic_auth_header(credentials));
|
||||
}
|
||||
|
||||
match request.send().await {
|
||||
Ok(response) => {
|
||||
if response.status().is_success() {
|
||||
return response
|
||||
.bytes()
|
||||
.await
|
||||
.map(|b| b.to_vec())
|
||||
.map_err(|e| ProxyError::Network(e.to_string()));
|
||||
}
|
||||
let status = response.status().as_u16();
|
||||
// Don't retry client errors (4xx)
|
||||
if (400..500).contains(&status) {
|
||||
return Err(ProxyError::NotFound);
|
||||
}
|
||||
// Server error (5xx) — retry
|
||||
if attempt == 0 {
|
||||
tracing::debug!(url, status, "upstream 5xx, retrying in 1s");
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
continue;
|
||||
}
|
||||
return Err(ProxyError::Upstream(status));
|
||||
}
|
||||
Err(e) => {
|
||||
if attempt == 0 {
|
||||
tracing::debug!(url, error = %e, "upstream error, retrying in 1s");
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
continue;
|
||||
}
|
||||
return Err(ProxyError::Network(e.to_string()));
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(ProxyError::Network("max retries exceeded".into()))
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
#[allow(dead_code)]
|
||||
pub(crate) enum ProxyError {
|
||||
@@ -81,15 +30,19 @@ pub(crate) enum ProxyError {
|
||||
Network(String),
|
||||
}
|
||||
|
||||
/// Fetch text content from upstream proxy with timeout and 1 retry.
|
||||
/// Same as proxy_fetch but returns String (for HTML pages like PyPI simple index).
|
||||
pub(crate) async fn proxy_fetch_text(
|
||||
/// Core fetch logic with retry. Callers provide a response extractor.
|
||||
async fn proxy_fetch_core<T, F, Fut>(
|
||||
client: &reqwest::Client,
|
||||
url: &str,
|
||||
timeout_secs: u64,
|
||||
auth: Option<&str>,
|
||||
extra_headers: Option<(&str, &str)>,
|
||||
) -> Result<String, ProxyError> {
|
||||
extract: F,
|
||||
) -> Result<T, ProxyError>
|
||||
where
|
||||
F: Fn(reqwest::Response) -> Fut + Copy,
|
||||
Fut: std::future::Future<Output = Result<T, reqwest::Error>>,
|
||||
{
|
||||
for attempt in 0..2 {
|
||||
let mut request = client.get(url).timeout(Duration::from_secs(timeout_secs));
|
||||
if let Some(credentials) = auth {
|
||||
@@ -102,8 +55,7 @@ pub(crate) async fn proxy_fetch_text(
|
||||
match request.send().await {
|
||||
Ok(response) => {
|
||||
if response.status().is_success() {
|
||||
return response
|
||||
.text()
|
||||
return extract(response)
|
||||
.await
|
||||
.map_err(|e| ProxyError::Network(e.to_string()));
|
||||
}
|
||||
@@ -131,6 +83,30 @@ pub(crate) async fn proxy_fetch_text(
|
||||
Err(ProxyError::Network("max retries exceeded".into()))
|
||||
}
|
||||
|
||||
/// Fetch binary content from upstream proxy with timeout and 1 retry.
|
||||
pub(crate) async fn proxy_fetch(
|
||||
client: &reqwest::Client,
|
||||
url: &str,
|
||||
timeout_secs: u64,
|
||||
auth: Option<&str>,
|
||||
) -> Result<Vec<u8>, ProxyError> {
|
||||
proxy_fetch_core(client, url, timeout_secs, auth, None, |r| async {
|
||||
r.bytes().await.map(|b| b.to_vec())
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
/// Fetch text content from upstream proxy with timeout and 1 retry.
|
||||
pub(crate) async fn proxy_fetch_text(
|
||||
client: &reqwest::Client,
|
||||
url: &str,
|
||||
timeout_secs: u64,
|
||||
auth: Option<&str>,
|
||||
extra_headers: Option<(&str, &str)>,
|
||||
) -> Result<String, ProxyError> {
|
||||
proxy_fetch_core(client, url, timeout_secs, auth, extra_headers, |r| r.text()).await
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
@@ -68,10 +68,6 @@ impl StorageBackend for LocalStorage {
|
||||
async fn get(&self, key: &str) -> Result<Bytes> {
|
||||
let path = self.key_to_path(key);
|
||||
|
||||
if !path.exists() {
|
||||
return Err(StorageError::NotFound);
|
||||
}
|
||||
|
||||
let mut file = fs::File::open(&path).await.map_err(|e| {
|
||||
if e.kind() == std::io::ErrorKind::NotFound {
|
||||
StorageError::NotFound
|
||||
@@ -91,13 +87,13 @@ impl StorageBackend for LocalStorage {
|
||||
async fn delete(&self, key: &str) -> Result<()> {
|
||||
let path = self.key_to_path(key);
|
||||
|
||||
if !path.exists() {
|
||||
return Err(StorageError::NotFound);
|
||||
}
|
||||
|
||||
fs::remove_file(&path)
|
||||
.await
|
||||
.map_err(|e| StorageError::Io(e.to_string()))?;
|
||||
fs::remove_file(&path).await.map_err(|e| {
|
||||
if e.kind() == std::io::ErrorKind::NotFound {
|
||||
StorageError::NotFound
|
||||
} else {
|
||||
StorageError::Io(e.to_string())
|
||||
}
|
||||
})?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user