Add retry support to sparse registries

This commit is contained in:
Arlo Siemsen 2022-09-09 15:58:58 -05:00 committed by Arlo Siemsen
parent 37f39f6993
commit dcc512b317
10 changed files with 196 additions and 103 deletions

View file

@ -72,7 +72,7 @@ pub struct RegistryBuilder {
/// Write the registry in configuration.
configure_registry: bool,
/// API responders.
custom_responders: HashMap<&'static str, Box<dyn Send + Fn(&Request) -> Response>>,
custom_responders: HashMap<&'static str, Box<dyn Send + Fn(&Request, &HttpServer) -> Response>>,
}
pub struct TestRegistry {
@ -117,7 +117,7 @@ impl RegistryBuilder {
/// Adds a custom HTTP response for a specific url
#[must_use]
pub fn add_responder<R: 'static + Send + Fn(&Request) -> Response>(
pub fn add_responder<R: 'static + Send + Fn(&Request, &HttpServer) -> Response>(
mut self,
url: &'static str,
responder: R,
@ -497,12 +497,12 @@ pub struct Response {
pub body: Vec<u8>,
}
struct HttpServer {
pub struct HttpServer {
listener: TcpListener,
registry_path: PathBuf,
dl_path: PathBuf,
token: Option<String>,
custom_responders: HashMap<&'static str, Box<dyn Send + Fn(&Request) -> Response>>,
custom_responders: HashMap<&'static str, Box<dyn Send + Fn(&Request, &HttpServer) -> Response>>,
}
impl HttpServer {
@ -510,7 +510,10 @@ impl HttpServer {
registry_path: PathBuf,
dl_path: PathBuf,
token: Option<String>,
api_responders: HashMap<&'static str, Box<dyn Send + Fn(&Request) -> Response>>,
api_responders: HashMap<
&'static str,
Box<dyn Send + Fn(&Request, &HttpServer) -> Response>,
>,
) -> HttpServerHandle {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = listener.local_addr().unwrap();
@ -620,7 +623,7 @@ impl HttpServer {
// Check for custom responder
if let Some(responder) = self.custom_responders.get(req.url.path()) {
return responder(&req);
return responder(&req, self);
}
let path: Vec<_> = req.url.path()[1..].split('/').collect();
match (req.method.as_str(), path.as_slice()) {
@ -668,7 +671,7 @@ impl HttpServer {
}
/// Unauthorized response
fn unauthorized(&self, _req: &Request) -> Response {
pub fn unauthorized(&self, _req: &Request) -> Response {
Response {
code: 401,
headers: vec![],
@ -677,7 +680,7 @@ impl HttpServer {
}
/// Not found response
fn not_found(&self, _req: &Request) -> Response {
pub fn not_found(&self, _req: &Request) -> Response {
Response {
code: 404,
headers: vec![],
@ -686,7 +689,7 @@ impl HttpServer {
}
/// Respond OK without doing anything
fn ok(&self, _req: &Request) -> Response {
pub fn ok(&self, _req: &Request) -> Response {
Response {
code: 200,
headers: vec![],
@ -694,8 +697,17 @@ impl HttpServer {
}
}
/// Return an internal server error (HTTP 500)
pub fn internal_server_error(&self, _req: &Request) -> Response {
Response {
code: 500,
headers: vec![],
body: br#"internal server error"#.to_vec(),
}
}
/// Serve the download endpoint
fn dl(&self, req: &Request) -> Response {
pub fn dl(&self, req: &Request) -> Response {
let file = self
.dl_path
.join(req.url.path().strip_prefix("/dl/").unwrap());
@ -711,7 +723,7 @@ impl HttpServer {
}
/// Serve the registry index
fn index(&self, req: &Request) -> Response {
pub fn index(&self, req: &Request) -> Response {
let file = self
.registry_path
.join(req.url.path().strip_prefix("/index/").unwrap());
@ -761,7 +773,7 @@ impl HttpServer {
}
}
fn publish(&self, req: &Request) -> Response {
pub fn publish(&self, req: &Request) -> Response {
if let Some(body) = &req.body {
// Get the metadata of the package
let (len, remaining) = body.split_at(4);

View file

@ -27,7 +27,7 @@ use crate::core::{Dependency, Manifest, PackageId, SourceId, Target};
use crate::core::{SourceMap, Summary, Workspace};
use crate::ops;
use crate::util::config::PackageCacheLock;
use crate::util::errors::{CargoResult, HttpNot200};
use crate::util::errors::{CargoResult, HttpNotSuccessful};
use crate::util::interning::InternedString;
use crate::util::network::Retry;
use crate::util::{self, internal, Config, Progress, ProgressStyle};
@ -868,18 +868,19 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
let code = handle.response_code()?;
if code != 200 && code != 0 {
let url = handle.effective_url()?.unwrap_or(url);
return Err(HttpNot200 {
return Err(HttpNotSuccessful {
code,
url: url.to_string(),
body: data,
}
.into());
}
Ok(())
Ok(data)
})
.with_context(|| format!("failed to download from `{}`", dl.url))?
};
match ret {
Some(()) => break (dl, data),
Some(data) => break (dl, data),
None => {
self.pending_ids.insert(dl.id);
self.enqueue(dl, handle)?

View file

@ -28,7 +28,7 @@ use crate::sources::{RegistrySource, SourceConfigMap, CRATES_IO_DOMAIN, CRATES_I
use crate::util::config::{self, Config, SslVersionConfig, SslVersionConfigRange};
use crate::util::errors::CargoResult;
use crate::util::important_paths::find_root_manifest_for_wd;
use crate::util::IntoUrl;
use crate::util::{truncate_with_ellipsis, IntoUrl};
use crate::{drop_print, drop_println, version};
mod auth;
@ -963,18 +963,6 @@ pub fn search(
limit: u32,
reg: Option<String>,
) -> CargoResult<()> {
fn truncate_with_ellipsis(s: &str, max_width: usize) -> String {
// We should truncate at grapheme-boundary and compute character-widths,
// yet the dependencies on unicode-segmentation and unicode-width are
// not worth it.
let mut chars = s.chars();
let mut prefix = (&mut chars).take(max_width - 1).collect::<String>();
if chars.next().is_some() {
prefix.push('…');
}
prefix
}
let (mut registry, _, source_id) =
registry(config, None, index.as_deref(), reg.as_deref(), false, false)?;
let (crates, total_crates) = registry.search(query, limit).with_context(|| {

View file

@ -7,8 +7,9 @@ use crate::ops;
use crate::sources::registry::download;
use crate::sources::registry::MaybeLock;
use crate::sources::registry::{LoadResponse, RegistryConfig, RegistryData};
use crate::util::errors::CargoResult;
use crate::util::{Config, Filesystem, IntoUrl, Progress, ProgressStyle};
use crate::util::errors::{CargoResult, HttpNotSuccessful};
use crate::util::network::Retry;
use crate::util::{internal, Config, Filesystem, IntoUrl, Progress, ProgressStyle};
use anyhow::Context;
use cargo_util::paths;
use curl::easy::{HttpVersion, List};
@ -83,15 +84,12 @@ pub struct Downloads<'cfg> {
/// When a download is started, it is added to this map. The key is a
/// "token" (see `Download::token`). It is removed once the download is
/// finished.
pending: HashMap<usize, (Download, EasyHandle)>,
/// Set of paths currently being downloaded, mapped to their tokens.
pending: HashMap<usize, (Download<'cfg>, EasyHandle)>,
/// Set of paths currently being downloaded.
/// This should stay in sync with `pending`.
pending_ids: HashMap<PathBuf, usize>,
/// The final result of each download. A pair `(token, result)`. This is a
/// temporary holding area, needed because curl can report multiple
/// downloads at once, but the main loop (`wait`) is written to only
/// handle one at a time.
results: HashMap<PathBuf, Result<CompletedDownload, curl::Error>>,
pending_paths: HashSet<PathBuf>,
/// The final result of each download.
results: HashMap<PathBuf, CargoResult<CompletedDownload>>,
/// The next ID to use for creating a token (see `Download::token`).
next: usize,
/// Progress bar.
@ -103,7 +101,7 @@ pub struct Downloads<'cfg> {
blocking_calls: usize,
}
struct Download {
struct Download<'cfg> {
/// The token for this download, used as the key of the `Downloads::pending` map
/// and stored in `EasyHandle` as well.
token: usize,
@ -116,6 +114,9 @@ struct Download {
/// ETag or Last-Modified header received from the server (if any).
index_version: RefCell<Option<String>>,
/// Logic used to track retrying this download if it's a spurious failure.
retry: Retry<'cfg>,
}
struct CompletedDownload {
@ -154,7 +155,7 @@ impl<'cfg> HttpRegistry<'cfg> {
downloads: Downloads {
next: 0,
pending: HashMap::new(),
pending_ids: HashMap::new(),
pending_paths: HashSet::new(),
results: HashMap::new(),
progress: RefCell::new(Some(Progress::with_style(
"Fetch",
@ -213,41 +214,64 @@ impl<'cfg> HttpRegistry<'cfg> {
fn handle_completed_downloads(&mut self) -> CargoResult<()> {
assert_eq!(
self.downloads.pending.len(),
self.downloads.pending_ids.len()
self.downloads.pending_paths.len()
);
// Collect the results from the Multi handle.
let pending = &mut self.downloads.pending;
self.multi.messages(|msg| {
let token = msg.token().expect("failed to read token");
let (_, handle) = &pending[&token];
let result = match msg.result_for(handle) {
Some(result) => result,
None => return, // transfer is not yet complete.
};
let (download, mut handle) = pending.remove(&token).unwrap();
self.downloads.pending_ids.remove(&download.path).unwrap();
let result = match result {
Ok(()) => {
self.downloads.downloads_finished += 1;
match handle.response_code() {
Ok(code) => Ok(CompletedDownload {
response_code: code,
data: download.data.take(),
index_version: download
.index_version
.take()
.unwrap_or_else(|| UNKNOWN.to_string()),
}),
Err(e) => Err(e),
let results = {
let mut results = Vec::new();
let pending = &mut self.downloads.pending;
self.multi.messages(|msg| {
let token = msg.token().expect("failed to read token");
let (_, handle) = &pending[&token];
if let Some(result) = msg.result_for(handle) {
results.push((token, result));
};
});
results
};
for (token, result) in results {
let (mut download, handle) = self.downloads.pending.remove(&token).unwrap();
let mut handle = self.multi.remove(handle)?;
let data = download.data.take();
let url = self.full_url(&download.path);
let result = match download.retry.r#try(|| {
result.with_context(|| format!("failed to download from `{}`", url))?;
let code = handle.response_code()?;
// Keep this list of expected status codes in sync with the codes handled in `load`
if !matches!(code, 200 | 304 | 401 | 404 | 451) {
let url = handle.effective_url()?.unwrap_or(&url);
return Err(HttpNotSuccessful {
code,
url: url.to_owned(),
body: data,
}
.into());
}
Ok(data)
}) {
Ok(Some(data)) => Ok(CompletedDownload {
response_code: handle.response_code()?,
data,
index_version: download
.index_version
.take()
.unwrap_or_else(|| UNKNOWN.to_string()),
}),
Ok(None) => {
// retry the operation
let handle = self.multi.add(handle)?;
self.downloads.pending.insert(token, (download, handle));
continue;
}
Err(e) => Err(e),
};
assert!(self.downloads.pending_paths.remove(&download.path));
self.downloads.results.insert(download.path, result);
});
self.downloads.downloads_finished += 1;
}
self.downloads.tick()?;
Ok(())
@ -305,7 +329,7 @@ impl<'cfg> RegistryData for HttpRegistry<'cfg> {
index_version: Option<&str>,
) -> Poll<CargoResult<LoadResponse>> {
trace!("load: {}", path.display());
if let Some(_token) = self.downloads.pending_ids.get(path) {
if let Some(_token) = self.downloads.pending_paths.get(path) {
debug!("dependency is still pending: {}", path.display());
return Poll::Pending;
}
@ -339,6 +363,8 @@ impl<'cfg> RegistryData for HttpRegistry<'cfg> {
debug!("downloaded the index file `{}` twice", path.display())
}
// The status handled here need to be kept in sync with the codes handled
// in `handle_completed_downloads`
match result.response_code {
200 => {}
304 => {
@ -355,13 +381,7 @@ impl<'cfg> RegistryData for HttpRegistry<'cfg> {
return Poll::Ready(Ok(LoadResponse::NotFound));
}
code => {
return Err(anyhow::anyhow!(
"server returned unexpected HTTP status code {} for {}\nbody: {}",
code,
self.full_url(path),
str::from_utf8(&result.data).unwrap_or("<invalid utf8>"),
))
.into();
return Err(internal(format!("unexpected HTTP status code {code}"))).into();
}
}
@ -371,13 +391,6 @@ impl<'cfg> RegistryData for HttpRegistry<'cfg> {
}));
}
if self.config.offline() {
return Poll::Ready(Err(anyhow::anyhow!(
"can't download index file from '{}': you are in offline mode (--offline)",
self.url
)));
}
// Looks like we're going to have to do a network request.
self.start_fetch()?;
@ -430,9 +443,8 @@ impl<'cfg> RegistryData for HttpRegistry<'cfg> {
let token = self.downloads.next;
self.downloads.next += 1;
debug!("downloading {} as {}", path.display(), token);
assert_eq!(
self.downloads.pending_ids.insert(path.to_path_buf(), token),
None,
assert!(
self.downloads.pending_paths.insert(path.to_path_buf()),
"path queued for download more than once"
);
@ -482,6 +494,7 @@ impl<'cfg> RegistryData for HttpRegistry<'cfg> {
data: RefCell::new(Vec::new()),
path: path.to_path_buf(),
index_version: RefCell::new(None),
retry: Retry::new(self.config)?,
};
// Finally add the request we've lined up to the pool of requests that cURL manages.
@ -598,7 +611,7 @@ impl<'cfg> RegistryData for HttpRegistry<'cfg> {
let timeout = self
.multi
.get_timeout()?
.unwrap_or_else(|| Duration::new(5, 0));
.unwrap_or_else(|| Duration::new(1, 0));
self.multi
.wait(&mut [], timeout)
.with_context(|| "failed to wait on curl `Multi`")?;

View file

@ -4,25 +4,32 @@ use anyhow::Error;
use std::fmt;
use std::path::PathBuf;
use super::truncate_with_ellipsis;
pub type CargoResult<T> = anyhow::Result<T>;
#[derive(Debug)]
pub struct HttpNot200 {
pub struct HttpNotSuccessful {
pub code: u32,
pub url: String,
pub body: Vec<u8>,
}
impl fmt::Display for HttpNot200 {
impl fmt::Display for HttpNotSuccessful {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let body = std::str::from_utf8(&self.body)
.map(|s| truncate_with_ellipsis(s, 512))
.unwrap_or_else(|_| format!("[{} non-utf8 bytes]", self.body.len()));
write!(
f,
"failed to get 200 response from `{}`, got {}",
"failed to get successful HTTP response from `{}`, got {}\nbody:\n{body}",
self.url, self.code
)
}
}
impl std::error::Error for HttpNot200 {}
impl std::error::Error for HttpNotSuccessful {}
// =============================================================================
// Verbose error

View file

@ -110,3 +110,15 @@ pub fn indented_lines(text: &str) -> String {
})
.collect()
}
pub fn truncate_with_ellipsis(s: &str, max_width: usize) -> String {
// We should truncate at grapheme-boundary and compute character-widths,
// yet the dependencies on unicode-segmentation and unicode-width are
// not worth it.
let mut chars = s.chars();
let mut prefix = (&mut chars).take(max_width - 1).collect::<String>();
if chars.next().is_some() {
prefix.push('…');
}
prefix
}

View file

@ -1,6 +1,6 @@
use anyhow::Error;
use crate::util::errors::{CargoResult, HttpNot200};
use crate::util::errors::{CargoResult, HttpNotSuccessful};
use crate::util::Config;
use std::task::Poll;
@ -31,6 +31,7 @@ impl<'a> Retry<'a> {
})
}
/// Returns `Ok(None)` for operations that should be re-tried.
pub fn r#try<T>(&mut self, f: impl FnOnce() -> CargoResult<T>) -> CargoResult<Option<T>> {
match f() {
Err(ref e) if maybe_spurious(e) && self.remaining > 0 => {
@ -73,7 +74,7 @@ fn maybe_spurious(err: &Error) -> bool {
return true;
}
}
if let Some(not_200) = err.downcast_ref::<HttpNot200>() {
if let Some(not_200) = err.downcast_ref::<HttpNotSuccessful>() {
if 500 <= not_200.code && not_200.code < 600 {
return true;
}
@ -114,14 +115,16 @@ fn with_retry_repeats_the_call_then_works() {
use crate::core::Shell;
//Error HTTP codes (5xx) are considered maybe_spurious and will prompt retry
let error1 = HttpNot200 {
let error1 = HttpNotSuccessful {
code: 501,
url: "Uri".to_string(),
body: Vec::new(),
}
.into();
let error2 = HttpNot200 {
let error2 = HttpNotSuccessful {
code: 502,
url: "Uri".to_string(),
body: Vec::new(),
}
.into();
let mut results: Vec<CargoResult<()>> = vec![Ok(()), Err(error1), Err(error2)];
@ -137,14 +140,16 @@ fn with_retry_finds_nested_spurious_errors() {
//Error HTTP codes (5xx) are considered maybe_spurious and will prompt retry
//String error messages are not considered spurious
let error1 = anyhow::Error::from(HttpNot200 {
let error1 = anyhow::Error::from(HttpNotSuccessful {
code: 501,
url: "Uri".to_string(),
body: Vec::new(),
});
let error1 = anyhow::Error::from(error1.context("A non-spurious wrapping err"));
let error2 = anyhow::Error::from(HttpNot200 {
let error2 = anyhow::Error::from(HttpNotSuccessful {
code: 502,
url: "Uri".to_string(),
body: Vec::new(),
});
let error2 = anyhow::Error::from(error2.context("A second chained error"));
let mut results: Vec<CargoResult<()>> = vec![Ok(()), Err(error1), Err(error2)];

View file

@ -1443,7 +1443,7 @@ fn api_error_json() {
let _registry = registry::RegistryBuilder::new()
.alternative()
.http_api()
.add_responder("/api/v1/crates/new", |_| Response {
.add_responder("/api/v1/crates/new", |_, _| Response {
body: br#"{"errors": [{"detail": "you must be logged in"}]}"#.to_vec(),
code: 403,
headers: vec![],
@ -1490,7 +1490,7 @@ fn api_error_200() {
let _registry = registry::RegistryBuilder::new()
.alternative()
.http_api()
.add_responder("/api/v1/crates/new", |_| Response {
.add_responder("/api/v1/crates/new", |_, _| Response {
body: br#"{"errors": [{"detail": "max upload size is 123"}]}"#.to_vec(),
code: 200,
headers: vec![],
@ -1537,7 +1537,7 @@ fn api_error_code() {
let _registry = registry::RegistryBuilder::new()
.alternative()
.http_api()
.add_responder("/api/v1/crates/new", |_| Response {
.add_responder("/api/v1/crates/new", |_, _| Response {
body: br#"go away"#.to_vec(),
code: 400,
headers: vec![],
@ -1590,7 +1590,7 @@ fn api_curl_error() {
let _registry = registry::RegistryBuilder::new()
.alternative()
.http_api()
.add_responder("/api/v1/crates/new", |_| {
.add_responder("/api/v1/crates/new", |_, _| {
panic!("broke");
})
.build();
@ -1639,7 +1639,7 @@ fn api_other_error() {
let _registry = registry::RegistryBuilder::new()
.alternative()
.http_api()
.add_responder("/api/v1/crates/new", |_| Response {
.add_responder("/api/v1/crates/new", |_, _| Response {
body: b"\xff".to_vec(),
code: 200,
headers: vec![],

View file

@ -11,6 +11,7 @@ use cargo_test_support::{git, install::cargo_home, t};
use cargo_util::paths::remove_dir_all;
use std::fs::{self, File};
use std::path::Path;
use std::sync::Mutex;
fn cargo_http(p: &Project, s: &str) -> Execs {
let mut e = p.cargo(s);
@ -2741,3 +2742,57 @@ Caused by:
)
.run();
}
#[cargo_test]
fn sparse_retry() {
let fail_count = Mutex::new(0);
let _registry = RegistryBuilder::new()
.http_index()
.add_responder("/index/3/b/bar", move |req, server| {
let mut fail_count = fail_count.lock().unwrap();
if *fail_count < 2 {
*fail_count += 1;
server.internal_server_error(req)
} else {
server.index(req)
}
})
.build();
let p = project()
.file(
"Cargo.toml",
r#"
[package]
name = "foo"
version = "0.0.1"
authors = []
[dependencies]
bar = ">= 0.0.0"
"#,
)
.file("src/main.rs", "fn main() {}")
.build();
Package::new("bar", "0.0.1").publish();
cargo_http(&p, "build")
.with_stderr(
"\
[UPDATING] `dummy-registry` index
warning: spurious network error (2 tries remaining): failed to get successful HTTP response from `[..]`, got 500
body:
internal server error
warning: spurious network error (1 tries remaining): failed to get successful HTTP response from `[..]`, got 500
body:
internal server error
[DOWNLOADING] crates ...
[DOWNLOADED] bar v0.0.1 (registry `dummy-registry`)
[COMPILING] bar v0.0.1
[COMPILING] foo v0.0.1 ([CWD])
[FINISHED] dev [unoptimized + debuginfo] target(s) in [..]s
",
)
.run();
}

View file

@ -78,7 +78,7 @@ postgres = \"0.17.3\" # A native, synchronous PostgreSQL client
fn setup() -> RegistryBuilder {
RegistryBuilder::new()
.http_api()
.add_responder("/api/v1/crates", |_| Response {
.add_responder("/api/v1/crates", |_, _| Response {
code: 200,
headers: vec![],
body: SEARCH_API_RESPONSE.to_vec(),