mirror of
https://github.com/getnora-io/nora.git
synced 2026-04-12 10:20:32 +00:00
refactor: use shared reqwest::Client across all registry handlers
Add http_client field to AppState, initialized once at startup. Replace per-request Client::builder() calls in npm, maven, pypi, and docker registry handlers with the shared instance. This reuses the connection pool across requests instead of creating a new client on every proxy fetch. Bump version to 0.2.20.
This commit is contained in:
6
Cargo.lock
generated
6
Cargo.lock
generated
@@ -1201,7 +1201,7 @@ checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "nora-cli"
|
name = "nora-cli"
|
||||||
version = "0.2.18"
|
version = "0.2.20"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"clap",
|
"clap",
|
||||||
"flate2",
|
"flate2",
|
||||||
@@ -1215,7 +1215,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "nora-registry"
|
name = "nora-registry"
|
||||||
version = "0.2.18"
|
version = "0.2.20"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"async-trait",
|
"async-trait",
|
||||||
"axum",
|
"axum",
|
||||||
@@ -1253,7 +1253,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "nora-storage"
|
name = "nora-storage"
|
||||||
version = "0.2.18"
|
version = "0.2.20"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"axum",
|
"axum",
|
||||||
"base64",
|
"base64",
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ members = [
|
|||||||
]
|
]
|
||||||
|
|
||||||
[workspace.package]
|
[workspace.package]
|
||||||
version = "0.2.19"
|
version = "0.2.20"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
license = "MIT"
|
license = "MIT"
|
||||||
authors = ["DevITWay <devitway@gmail.com>"]
|
authors = ["DevITWay <devitway@gmail.com>"]
|
||||||
|
|||||||
@@ -85,6 +85,7 @@ pub struct AppState {
|
|||||||
pub activity: ActivityLog,
|
pub activity: ActivityLog,
|
||||||
pub docker_auth: registry::DockerAuth,
|
pub docker_auth: registry::DockerAuth,
|
||||||
pub repo_index: RepoIndex,
|
pub repo_index: RepoIndex,
|
||||||
|
pub http_client: reqwest::Client,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
@@ -271,6 +272,8 @@ async fn run_server(config: Config, storage: Storage) {
|
|||||||
// Initialize Docker auth with proxy timeout
|
// Initialize Docker auth with proxy timeout
|
||||||
let docker_auth = registry::DockerAuth::new(config.docker.proxy_timeout);
|
let docker_auth = registry::DockerAuth::new(config.docker.proxy_timeout);
|
||||||
|
|
||||||
|
let http_client = reqwest::Client::new();
|
||||||
|
|
||||||
let state = Arc::new(AppState {
|
let state = Arc::new(AppState {
|
||||||
storage,
|
storage,
|
||||||
config,
|
config,
|
||||||
@@ -281,6 +284,7 @@ async fn run_server(config: Config, storage: Storage) {
|
|||||||
activity: ActivityLog::new(50),
|
activity: ActivityLog::new(50),
|
||||||
docker_auth,
|
docker_auth,
|
||||||
repo_index: RepoIndex::new(),
|
repo_index: RepoIndex::new(),
|
||||||
|
http_client,
|
||||||
});
|
});
|
||||||
|
|
||||||
// Token routes with strict rate limiting (brute-force protection)
|
// Token routes with strict rate limiting (brute-force protection)
|
||||||
|
|||||||
@@ -167,6 +167,7 @@ async fn download_blob(
|
|||||||
// Try upstream proxies
|
// Try upstream proxies
|
||||||
for upstream in &state.config.docker.upstreams {
|
for upstream in &state.config.docker.upstreams {
|
||||||
if let Ok(data) = fetch_blob_from_upstream(
|
if let Ok(data) = fetch_blob_from_upstream(
|
||||||
|
&state.http_client,
|
||||||
&upstream.url,
|
&upstream.url,
|
||||||
&name,
|
&name,
|
||||||
&digest,
|
&digest,
|
||||||
@@ -367,6 +368,7 @@ async fn get_manifest(
|
|||||||
for upstream in &state.config.docker.upstreams {
|
for upstream in &state.config.docker.upstreams {
|
||||||
tracing::debug!(upstream_url = %upstream.url, "Trying upstream");
|
tracing::debug!(upstream_url = %upstream.url, "Trying upstream");
|
||||||
if let Ok((data, content_type)) = fetch_manifest_from_upstream(
|
if let Ok((data, content_type)) = fetch_manifest_from_upstream(
|
||||||
|
&state.http_client,
|
||||||
&upstream.url,
|
&upstream.url,
|
||||||
&name,
|
&name,
|
||||||
&reference,
|
&reference,
|
||||||
@@ -581,6 +583,7 @@ async fn list_tags_ns(
|
|||||||
|
|
||||||
/// Fetch a blob from an upstream Docker registry
|
/// Fetch a blob from an upstream Docker registry
|
||||||
async fn fetch_blob_from_upstream(
|
async fn fetch_blob_from_upstream(
|
||||||
|
client: &reqwest::Client,
|
||||||
upstream_url: &str,
|
upstream_url: &str,
|
||||||
name: &str,
|
name: &str,
|
||||||
digest: &str,
|
digest: &str,
|
||||||
@@ -594,13 +597,13 @@ async fn fetch_blob_from_upstream(
|
|||||||
digest
|
digest
|
||||||
);
|
);
|
||||||
|
|
||||||
let client = reqwest::Client::builder()
|
|
||||||
.timeout(Duration::from_secs(timeout))
|
|
||||||
.build()
|
|
||||||
.map_err(|_| ())?;
|
|
||||||
|
|
||||||
// First try without auth
|
// First try without auth
|
||||||
let response = client.get(&url).send().await.map_err(|_| ())?;
|
let response = client
|
||||||
|
.get(&url)
|
||||||
|
.timeout(Duration::from_secs(timeout))
|
||||||
|
.send()
|
||||||
|
.await
|
||||||
|
.map_err(|_| ())?;
|
||||||
|
|
||||||
let response = if response.status() == reqwest::StatusCode::UNAUTHORIZED {
|
let response = if response.status() == reqwest::StatusCode::UNAUTHORIZED {
|
||||||
// Get Www-Authenticate header and fetch token
|
// Get Www-Authenticate header and fetch token
|
||||||
@@ -637,6 +640,7 @@ async fn fetch_blob_from_upstream(
|
|||||||
/// Fetch a manifest from an upstream Docker registry
|
/// Fetch a manifest from an upstream Docker registry
|
||||||
/// Returns (manifest_bytes, content_type)
|
/// Returns (manifest_bytes, content_type)
|
||||||
async fn fetch_manifest_from_upstream(
|
async fn fetch_manifest_from_upstream(
|
||||||
|
client: &reqwest::Client,
|
||||||
upstream_url: &str,
|
upstream_url: &str,
|
||||||
name: &str,
|
name: &str,
|
||||||
reference: &str,
|
reference: &str,
|
||||||
@@ -652,13 +656,6 @@ async fn fetch_manifest_from_upstream(
|
|||||||
|
|
||||||
tracing::debug!(url = %url, "Fetching manifest from upstream");
|
tracing::debug!(url = %url, "Fetching manifest from upstream");
|
||||||
|
|
||||||
let client = reqwest::Client::builder()
|
|
||||||
.timeout(Duration::from_secs(timeout))
|
|
||||||
.build()
|
|
||||||
.map_err(|e| {
|
|
||||||
tracing::error!(error = %e, "Failed to build HTTP client");
|
|
||||||
})?;
|
|
||||||
|
|
||||||
// Request with Accept header for manifest types
|
// Request with Accept header for manifest types
|
||||||
let accept_header = "application/vnd.docker.distribution.manifest.v2+json, \
|
let accept_header = "application/vnd.docker.distribution.manifest.v2+json, \
|
||||||
application/vnd.docker.distribution.manifest.list.v2+json, \
|
application/vnd.docker.distribution.manifest.list.v2+json, \
|
||||||
@@ -668,6 +665,7 @@ async fn fetch_manifest_from_upstream(
|
|||||||
// First try without auth
|
// First try without auth
|
||||||
let response = client
|
let response = client
|
||||||
.get(&url)
|
.get(&url)
|
||||||
|
.timeout(Duration::from_secs(timeout))
|
||||||
.header("Accept", accept_header)
|
.header("Accept", accept_header)
|
||||||
.send()
|
.send()
|
||||||
.await
|
.await
|
||||||
|
|||||||
@@ -23,7 +23,6 @@ pub fn routes() -> Router<Arc<AppState>> {
|
|||||||
async fn download(State(state): State<Arc<AppState>>, Path(path): Path<String>) -> Response {
|
async fn download(State(state): State<Arc<AppState>>, Path(path): Path<String>) -> Response {
|
||||||
let key = format!("maven/{}", path);
|
let key = format!("maven/{}", path);
|
||||||
|
|
||||||
// Extract artifact name for logging (last 2-3 path components)
|
|
||||||
let artifact_name = path
|
let artifact_name = path
|
||||||
.split('/')
|
.split('/')
|
||||||
.rev()
|
.rev()
|
||||||
@@ -34,7 +33,6 @@ async fn download(State(state): State<Arc<AppState>>, Path(path): Path<String>)
|
|||||||
.collect::<Vec<_>>()
|
.collect::<Vec<_>>()
|
||||||
.join("/");
|
.join("/");
|
||||||
|
|
||||||
// Try local storage first
|
|
||||||
if let Ok(data) = state.storage.get(&key).await {
|
if let Ok(data) = state.storage.get(&key).await {
|
||||||
state.metrics.record_download("maven");
|
state.metrics.record_download("maven");
|
||||||
state.metrics.record_cache_hit();
|
state.metrics.record_cache_hit();
|
||||||
@@ -47,11 +45,10 @@ async fn download(State(state): State<Arc<AppState>>, Path(path): Path<String>)
|
|||||||
return with_content_type(&path, data).into_response();
|
return with_content_type(&path, data).into_response();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Try proxy servers
|
|
||||||
for proxy_url in &state.config.maven.proxies {
|
for proxy_url in &state.config.maven.proxies {
|
||||||
let url = format!("{}/{}", proxy_url.trim_end_matches('/'), path);
|
let url = format!("{}/{}", proxy_url.trim_end_matches('/'), path);
|
||||||
|
|
||||||
match fetch_from_proxy(&url, state.config.maven.proxy_timeout).await {
|
match fetch_from_proxy(&state.http_client, &url, state.config.maven.proxy_timeout).await {
|
||||||
Ok(data) => {
|
Ok(data) => {
|
||||||
state.metrics.record_download("maven");
|
state.metrics.record_download("maven");
|
||||||
state.metrics.record_cache_miss();
|
state.metrics.record_cache_miss();
|
||||||
@@ -62,7 +59,6 @@ async fn download(State(state): State<Arc<AppState>>, Path(path): Path<String>)
|
|||||||
"PROXY",
|
"PROXY",
|
||||||
));
|
));
|
||||||
|
|
||||||
// Cache in local storage (fire and forget)
|
|
||||||
let storage = state.storage.clone();
|
let storage = state.storage.clone();
|
||||||
let key_clone = key.clone();
|
let key_clone = key.clone();
|
||||||
let data_clone = data.clone();
|
let data_clone = data.clone();
|
||||||
@@ -88,7 +84,6 @@ async fn upload(
|
|||||||
) -> StatusCode {
|
) -> StatusCode {
|
||||||
let key = format!("maven/{}", path);
|
let key = format!("maven/{}", path);
|
||||||
|
|
||||||
// Extract artifact name for logging
|
|
||||||
let artifact_name = path
|
let artifact_name = path
|
||||||
.split('/')
|
.split('/')
|
||||||
.rev()
|
.rev()
|
||||||
@@ -115,14 +110,14 @@ async fn upload(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn fetch_from_proxy(url: &str, timeout_secs: u64) -> Result<Vec<u8>, ()> {
|
async fn fetch_from_proxy(client: &reqwest::Client, url: &str, timeout_secs: u64) -> Result<Vec<u8>, ()> {
|
||||||
let client = reqwest::Client::builder()
|
let response = client
|
||||||
|
.get(url)
|
||||||
.timeout(Duration::from_secs(timeout_secs))
|
.timeout(Duration::from_secs(timeout_secs))
|
||||||
.build()
|
.send()
|
||||||
|
.await
|
||||||
.map_err(|_| ())?;
|
.map_err(|_| ())?;
|
||||||
|
|
||||||
let response = client.get(url).send().await.map_err(|_| ())?;
|
|
||||||
|
|
||||||
if !response.status().is_success() {
|
if !response.status().is_success() {
|
||||||
return Err(());
|
return Err(());
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -19,7 +19,6 @@ pub fn routes() -> Router<Arc<AppState>> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_request(State(state): State<Arc<AppState>>, Path(path): Path<String>) -> Response {
|
async fn handle_request(State(state): State<Arc<AppState>>, Path(path): Path<String>) -> Response {
|
||||||
// Determine if this is a tarball request or metadata request
|
|
||||||
let is_tarball = path.contains("/-/");
|
let is_tarball = path.contains("/-/");
|
||||||
|
|
||||||
let key = if is_tarball {
|
let key = if is_tarball {
|
||||||
@@ -33,14 +32,12 @@ async fn handle_request(State(state): State<Arc<AppState>>, Path(path): Path<Str
|
|||||||
format!("npm/{}/metadata.json", path)
|
format!("npm/{}/metadata.json", path)
|
||||||
};
|
};
|
||||||
|
|
||||||
// Extract package name for logging
|
|
||||||
let package_name = if is_tarball {
|
let package_name = if is_tarball {
|
||||||
path.split("/-/").next().unwrap_or(&path).to_string()
|
path.split("/-/").next().unwrap_or(&path).to_string()
|
||||||
} else {
|
} else {
|
||||||
path.clone()
|
path.clone()
|
||||||
};
|
};
|
||||||
|
|
||||||
// Try local storage first
|
|
||||||
if let Ok(data) = state.storage.get(&key).await {
|
if let Ok(data) = state.storage.get(&key).await {
|
||||||
if is_tarball {
|
if is_tarball {
|
||||||
state.metrics.record_download("npm");
|
state.metrics.record_download("npm");
|
||||||
@@ -55,17 +52,10 @@ async fn handle_request(State(state): State<Arc<AppState>>, Path(path): Path<Str
|
|||||||
return with_content_type(is_tarball, data).into_response();
|
return with_content_type(is_tarball, data).into_response();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Try proxy if configured
|
|
||||||
if let Some(proxy_url) = &state.config.npm.proxy {
|
if let Some(proxy_url) = &state.config.npm.proxy {
|
||||||
let url = if is_tarball {
|
let url = format!("{}/{}", proxy_url.trim_end_matches('/'), path);
|
||||||
// Tarball URL: https://registry.npmjs.org/package/-/package-version.tgz
|
|
||||||
format!("{}/{}", proxy_url.trim_end_matches('/'), path)
|
|
||||||
} else {
|
|
||||||
// Metadata URL: https://registry.npmjs.org/package
|
|
||||||
format!("{}/{}", proxy_url.trim_end_matches('/'), path)
|
|
||||||
};
|
|
||||||
|
|
||||||
if let Ok(data) = fetch_from_proxy(&url, state.config.npm.proxy_timeout).await {
|
if let Ok(data) = fetch_from_proxy(&state.http_client, &url, state.config.npm.proxy_timeout).await {
|
||||||
if is_tarball {
|
if is_tarball {
|
||||||
state.metrics.record_download("npm");
|
state.metrics.record_download("npm");
|
||||||
state.metrics.record_cache_miss();
|
state.metrics.record_cache_miss();
|
||||||
@@ -77,7 +67,6 @@ async fn handle_request(State(state): State<Arc<AppState>>, Path(path): Path<Str
|
|||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Cache in local storage (fire and forget)
|
|
||||||
let storage = state.storage.clone();
|
let storage = state.storage.clone();
|
||||||
let key_clone = key.clone();
|
let key_clone = key.clone();
|
||||||
let data_clone = data.clone();
|
let data_clone = data.clone();
|
||||||
@@ -85,7 +74,6 @@ async fn handle_request(State(state): State<Arc<AppState>>, Path(path): Path<Str
|
|||||||
let _ = storage.put(&key_clone, &data_clone).await;
|
let _ = storage.put(&key_clone, &data_clone).await;
|
||||||
});
|
});
|
||||||
|
|
||||||
// Invalidate index when caching new tarball
|
|
||||||
if is_tarball {
|
if is_tarball {
|
||||||
state.repo_index.invalidate("npm");
|
state.repo_index.invalidate("npm");
|
||||||
}
|
}
|
||||||
@@ -97,14 +85,14 @@ async fn handle_request(State(state): State<Arc<AppState>>, Path(path): Path<Str
|
|||||||
StatusCode::NOT_FOUND.into_response()
|
StatusCode::NOT_FOUND.into_response()
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn fetch_from_proxy(url: &str, timeout_secs: u64) -> Result<Vec<u8>, ()> {
|
async fn fetch_from_proxy(client: &reqwest::Client, url: &str, timeout_secs: u64) -> Result<Vec<u8>, ()> {
|
||||||
let client = reqwest::Client::builder()
|
let response = client
|
||||||
|
.get(url)
|
||||||
.timeout(Duration::from_secs(timeout_secs))
|
.timeout(Duration::from_secs(timeout_secs))
|
||||||
.build()
|
.send()
|
||||||
|
.await
|
||||||
.map_err(|_| ())?;
|
.map_err(|_| ())?;
|
||||||
|
|
||||||
let response = client.get(url).send().await.map_err(|_| ())?;
|
|
||||||
|
|
||||||
if !response.status().is_success() {
|
if !response.status().is_success() {
|
||||||
return Err(());
|
return Err(());
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -85,7 +85,7 @@ async fn package_versions(
|
|||||||
if let Some(proxy_url) = &state.config.pypi.proxy {
|
if let Some(proxy_url) = &state.config.pypi.proxy {
|
||||||
let url = format!("{}/{}/", proxy_url.trim_end_matches('/'), normalized);
|
let url = format!("{}/{}/", proxy_url.trim_end_matches('/'), normalized);
|
||||||
|
|
||||||
if let Ok(html) = fetch_package_page(&url, state.config.pypi.proxy_timeout).await {
|
if let Ok(html) = fetch_package_page(&state.http_client, &url, state.config.pypi.proxy_timeout).await {
|
||||||
// Rewrite URLs in the HTML to point to our registry
|
// Rewrite URLs in the HTML to point to our registry
|
||||||
let rewritten = rewrite_pypi_links(&html, &normalized);
|
let rewritten = rewrite_pypi_links(&html, &normalized);
|
||||||
return (StatusCode::OK, Html(rewritten)).into_response();
|
return (StatusCode::OK, Html(rewritten)).into_response();
|
||||||
@@ -130,10 +130,10 @@ async fn download_file(
|
|||||||
// First, fetch the package page to find the actual download URL
|
// First, fetch the package page to find the actual download URL
|
||||||
let page_url = format!("{}/{}/", proxy_url.trim_end_matches('/'), normalized);
|
let page_url = format!("{}/{}/", proxy_url.trim_end_matches('/'), normalized);
|
||||||
|
|
||||||
if let Ok(html) = fetch_package_page(&page_url, state.config.pypi.proxy_timeout).await {
|
if let Ok(html) = fetch_package_page(&state.http_client, &page_url, state.config.pypi.proxy_timeout).await {
|
||||||
// Find the URL for this specific file
|
// Find the URL for this specific file
|
||||||
if let Some(file_url) = find_file_url(&html, &filename) {
|
if let Some(file_url) = find_file_url(&html, &filename) {
|
||||||
if let Ok(data) = fetch_file(&file_url, state.config.pypi.proxy_timeout).await {
|
if let Ok(data) = fetch_file(&state.http_client, &file_url, state.config.pypi.proxy_timeout).await {
|
||||||
state.metrics.record_download("pypi");
|
state.metrics.record_download("pypi");
|
||||||
state.metrics.record_cache_miss();
|
state.metrics.record_cache_miss();
|
||||||
state.activity.push(ActivityEntry::new(
|
state.activity.push(ActivityEntry::new(
|
||||||
@@ -177,14 +177,10 @@ fn normalize_name(name: &str) -> String {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Fetch package page from upstream
|
/// Fetch package page from upstream
|
||||||
async fn fetch_package_page(url: &str, timeout_secs: u64) -> Result<String, ()> {
|
async fn fetch_package_page(client: &reqwest::Client, url: &str, timeout_secs: u64) -> Result<String, ()> {
|
||||||
let client = reqwest::Client::builder()
|
|
||||||
.timeout(Duration::from_secs(timeout_secs))
|
|
||||||
.build()
|
|
||||||
.map_err(|_| ())?;
|
|
||||||
|
|
||||||
let response = client
|
let response = client
|
||||||
.get(url)
|
.get(url)
|
||||||
|
.timeout(Duration::from_secs(timeout_secs))
|
||||||
.header("Accept", "text/html")
|
.header("Accept", "text/html")
|
||||||
.send()
|
.send()
|
||||||
.await
|
.await
|
||||||
@@ -198,14 +194,14 @@ async fn fetch_package_page(url: &str, timeout_secs: u64) -> Result<String, ()>
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Fetch file from upstream
|
/// Fetch file from upstream
|
||||||
async fn fetch_file(url: &str, timeout_secs: u64) -> Result<Vec<u8>, ()> {
|
async fn fetch_file(client: &reqwest::Client, url: &str, timeout_secs: u64) -> Result<Vec<u8>, ()> {
|
||||||
let client = reqwest::Client::builder()
|
let response = client
|
||||||
|
.get(url)
|
||||||
.timeout(Duration::from_secs(timeout_secs))
|
.timeout(Duration::from_secs(timeout_secs))
|
||||||
.build()
|
.send()
|
||||||
|
.await
|
||||||
.map_err(|_| ())?;
|
.map_err(|_| ())?;
|
||||||
|
|
||||||
let response = client.get(url).send().await.map_err(|_| ())?;
|
|
||||||
|
|
||||||
if !response.status().is_success() {
|
if !response.status().is_success() {
|
||||||
return Err(());
|
return Err(());
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user