Parallelize downloads with HTTP/2

This commit implements parallel downloads using `libcurl` powered by
`libnghttp2` over HTTP/2. Using all of the previous refactorings this actually
implements usage of `Multi` to download crates in parallel. This achieves some
large wins locally, taking download times from 30s to 2s in the best case.

The standard output of Cargo is also changed as a result of this commit. It's
no longer useful for Cargo to print "Downloading ..." for each crate, since
they all start instantaneously. Instead Cargo now no longer prints `Downloading`
by default (unless attached to a pipe) and instead only has one progress bar for
all downloads. Currently this progress bar is discrete and based on the total
number of downloads, no longer specifying how much of one particular download
has happened. This provides a less granular view into what Cargo is doing but
it's hoped that it looks reasonable from an outside perspective as there's
still a progress bar indicating what's happening.
This commit is contained in:
Alex Crichton 2018-09-10 17:28:18 -07:00
parent 44a7ee7db8
commit 468f243e0e
8 changed files with 315 additions and 87 deletions

View file

@ -21,7 +21,7 @@ atty = "0.2"
crates-io = { path = "src/crates-io", version = "0.20" }
crossbeam-utils = "0.5"
crypto-hash = "0.3.1"
curl = "0.4.13"
curl = { version = "0.4.15", features = ['http2'] }
env_logger = "0.5.11"
failure = "0.1.2"
filetime = "0.2"

View file

@ -15,7 +15,7 @@
//! (for example, with and without tests), so we actually build a dependency
//! graph of `Unit`s, which capture these properties.
use std::cell::{RefCell, Cell};
use std::cell::RefCell;
use std::collections::{HashMap, HashSet};
use CargoResult;
@ -28,7 +28,7 @@ struct State<'a: 'tmp, 'cfg: 'a, 'tmp> {
bcx: &'tmp BuildContext<'a, 'cfg>,
deps: &'tmp mut HashMap<Unit<'a>, Vec<Unit<'a>>>,
pkgs: RefCell<&'tmp mut HashMap<&'a PackageId, &'a Package>>,
waiting_on_downloads: Cell<bool>,
waiting_on_download: HashSet<&'a PackageId>,
}
pub fn build_unit_dependencies<'a, 'cfg>(
@ -43,7 +43,7 @@ pub fn build_unit_dependencies<'a, 'cfg>(
bcx,
deps,
pkgs: RefCell::new(pkgs),
waiting_on_downloads: Cell::new(false),
waiting_on_download: HashSet::new(),
};
loop {
@ -64,7 +64,7 @@ pub fn build_unit_dependencies<'a, 'cfg>(
deps_of(unit, &mut state, profile_for)?;
}
if state.waiting_on_downloads.replace(false) {
if state.waiting_on_download.len() > 0 {
state.finish_some_downloads()?;
state.deps.clear();
} else {
@ -492,11 +492,14 @@ impl<'a, 'cfg, 'tmp> State<'a, 'cfg, 'tmp> {
if let Some(pkg) = pkgs.get(id) {
return Ok(Some(pkg))
}
if !self.waiting_on_download.insert(id) {
return Ok(None)
}
if let Some(pkg) = self.bcx.packages.start_download(id)? {
pkgs.insert(id, pkg);
self.waiting_on_download.remove(id);
return Ok(Some(pkg))
}
self.waiting_on_downloads.set(true);
Ok(None)
}
@ -512,6 +515,7 @@ impl<'a, 'cfg, 'tmp> State<'a, 'cfg, 'tmp> {
assert!(self.bcx.packages.remaining_downloads() > 0);
loop {
let pkg = self.bcx.packages.wait_for_download()?;
self.waiting_on_download.remove(pkg.package_id());
self.pkgs.borrow_mut().insert(pkg.package_id(), pkg);
// Arbitrarily choose that 5 or more packages concurrently download

View file

@ -1,20 +1,26 @@
use std::cell::{Ref, RefCell};
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::fmt;
use std::hash;
use std::mem;
use std::path::{Path, PathBuf};
use std::time::Duration;
use semver::Version;
use serde::ser;
use toml;
use lazycell::LazyCell;
use curl::easy::{Easy, HttpVersion};
use curl::multi::{Multi, EasyHandle};
use core::{Dependency, Manifest, PackageId, SourceId, Target};
use core::{FeatureMap, SourceMap, Summary};
use core::source::MaybePackage;
use core::interning::InternedString;
use util::{internal, lev_distance, Config};
use util::errors::{CargoResult, CargoResultExt};
use ops;
use util::{internal, lev_distance, Config, Progress, ProgressStyle};
use util::errors::{CargoResult, CargoResultExt, HttpNot200};
use util::network::Retry;
/// Information about a package that is available somewhere in the file system.
///
@ -237,17 +243,30 @@ impl hash::Hash for Package {
}
}
#[derive(Debug)]
pub struct PackageSet<'cfg> {
packages: HashMap<PackageId, LazyCell<Package>>,
sources: RefCell<SourceMap<'cfg>>,
config: &'cfg Config,
downloads: RefCell<Downloads>,
downloads: RefCell<Downloads<'cfg>>,
}
#[derive(Default, Debug)]
struct Downloads {
pending: Vec<(String, String, PackageId)>,
/// State shared by all in-flight crate downloads for a `PackageSet`.
pub(crate) struct Downloads<'cfg> {
    /// In-progress transfers, keyed by the libcurl token assigned at
    /// enqueue time. Each entry pairs our bookkeeping with the curl
    /// `EasyHandle` registered in `multi`.
    pending: HashMap<usize, (Download, EasyHandle)>,
    /// Ids of packages currently being downloaded, used to catch
    /// double-enqueues of the same package.
    pending_ids: HashSet<PackageId>,
    /// Completed-transfer results drained out of `multi.messages`,
    /// waiting to be processed (token plus success/error).
    results: Vec<(usize, CargoResult<()>)>,
    /// Next token to hand out; incremented per download.
    next: usize,
    /// The libcurl multi handle driving all transfers concurrently.
    multi: Multi,
    /// Retry helper for re-enqueueing spurious network failures.
    retry: Retry<'cfg>,
    /// Progress bar showing finished vs. total downloads.
    progress: Progress<'cfg>,
    /// Count of downloads completed so far, fed into `progress`.
    downloads_finished: usize,
}
/// Bookkeeping for a single in-flight crate download.
struct Download {
    /// Token identifying this transfer to libcurl (see `EasyHandle::set_token`).
    token: usize,
    /// The package this download is fetching.
    id: PackageId,
    /// Response body accumulated by the curl write callback.
    /// `RefCell` because the callback only gets shared access via TLS.
    data: RefCell<Vec<u8>>,
    /// URL being fetched; also used in error messages and retries.
    url: String,
    /// Human-readable description printed to the shell when finished.
    descriptor: String,
}
impl<'cfg> PackageSet<'cfg> {
@ -255,30 +274,61 @@ impl<'cfg> PackageSet<'cfg> {
package_ids: &[PackageId],
sources: SourceMap<'cfg>,
config: &'cfg Config,
) -> PackageSet<'cfg> {
PackageSet {
) -> CargoResult<PackageSet<'cfg>> {
// We've enabled the `http2` feature of `curl` in Cargo, so treat
// failures here as fatal as it would indicate a build-time problem.
let mut multi = Multi::new();
multi.pipelining(true, true)
.chain_err(|| "failed to enable multiplexing/pipelining in curl")?;
Ok(PackageSet {
packages: package_ids
.iter()
.map(|id| (id.clone(), LazyCell::new()))
.collect(),
sources: RefCell::new(sources),
config,
downloads: Default::default(),
}
downloads: RefCell::new(Downloads {
multi,
next: 0,
pending: HashMap::new(),
pending_ids: HashSet::new(),
results: Vec::new(),
retry: Retry::new(config)?,
progress: Progress::with_style(
"Downloading",
ProgressStyle::Ratio,
config,
),
downloads_finished: 0,
}),
})
}
/// Returns an iterator over the ids of every package in this set.
pub fn package_ids<'a>(&'a self) -> Box<Iterator<Item = &'a PackageId> + 'a> {
    let ids = self.packages.keys();
    Box::new(ids)
}
/// Starts to download the package for the `id` specified.
///
/// Returns `None` if the package is queued up for download and will
/// eventually be returned from `wait_for_download`. Returns `Some(pkg)` if
/// the package is ready and doesn't need to be downloaded.
pub fn start_download(&self, id: &PackageId) -> CargoResult<Option<&Package>> {
let mut downloads = self.downloads.borrow_mut();
// First up see if we've already cached this package, in which case
// there's nothing to do.
let slot = self.packages
.get(id)
.ok_or_else(|| internal(format!("couldn't find `{}` in package set", id)))?;
if let Some(pkg) = slot.borrow() {
return Ok(Some(pkg));
}
// Ask the original source for this `PackageId` for the corresponding
// package. That may immediately come back and tell us that the package
// is ready, or it could tell us that it needs to be downloaded.
let mut sources = self.sources.borrow_mut();
let source = sources
.get_mut(id.source_id())
@ -286,34 +336,135 @@ impl<'cfg> PackageSet<'cfg> {
let pkg = source
.download(id)
.chain_err(|| format_err!("unable to get packages from source"))?;
match pkg {
let (url, descriptor) = match pkg {
MaybePackage::Ready(pkg) => {
debug!("{} doesn't need a download", id);
assert!(slot.fill(pkg).is_ok());
Ok(Some(slot.borrow().unwrap()))
return Ok(Some(slot.borrow().unwrap()))
}
MaybePackage::Download { url, descriptor } => {
downloads.pending.push((url, descriptor, id.clone()));
Ok(None)
}
}
MaybePackage::Download { url, descriptor } => (url, descriptor),
};
// Ok we're going to download this crate, so let's set up all our
// internal state and hand off an `Easy` handle to our libcurl `Multi`
// handle. This won't actually start the transfer, but later it'll
// happen during `wait_for_download`
let token = downloads.next;
downloads.next += 1;
debug!("downloading {} as {}", id, token);
assert!(downloads.pending_ids.insert(id.clone()));
let mut handle = ops::http_handle(self.config)?;
handle.get(true)?;
handle.url(&url)?;
handle.follow_location(true)?; // follow redirects
// Enable HTTP/2 to be used as it'll allow true multiplexing which makes
// downloads much faster. Currently Cargo requests the `http2` feature
// of the `curl` crate which means it should always be built in, so
// treat it as a fatal error if http/2 support isn't found.
handle.http_version(HttpVersion::V2)
.chain_err(|| "failed to enable HTTP2, is curl not built right?")?;
// This is an option to `libcurl` which indicates that if there's a
// bunch of parallel requests to the same host they all wait until the
// pipelining status of the host is known. This means that we won't
// initiate dozens of connections to crates.io, but rather only one.
// Once the main one is opened we realize that pipelining is possible
// and multiplexing is possible with static.crates.io. All in all this
// reduces the number of connections done to a more manageable state.
handle.pipewait(true)?;
handle.write_function(move |buf| {
debug!("{} - {} bytes of data", token, buf.len());
tls::with(|downloads| {
downloads.pending[&token].0.data.borrow_mut().extend_from_slice(buf);
});
Ok(buf.len())
})?;
let dl = Download {
token,
data: RefCell::new(Vec::new()),
id: id.clone(),
url,
descriptor,
};
downloads.enqueue(dl, handle)?;
Ok(None)
}
/// Returns the number of crates that are still downloading
pub fn remaining_downloads(&self) -> usize {
    self.downloads.borrow().pending.len()
}
/// Blocks the current thread waiting for a package to finish downloading.
///
/// This method will wait for a previously enqueued package to finish
/// downloading and return a reference to it after it's done downloading.
///
/// # Panics
///
/// This function will panic if there are no remaining downloads.
pub fn wait_for_download(&self) -> CargoResult<&Package> {
let mut downloads = self.downloads.borrow_mut();
let (url, descriptor, id) = downloads.pending.pop().unwrap();
self.config.shell().status("Downloading", descriptor)?;
let data = download(self.config, &url)?;
let downloads = &mut *downloads;
downloads.tick_now()?;
let (dl, data) = loop {
assert_eq!(downloads.pending.len(), downloads.pending_ids.len());
let (token, result) = downloads.wait_for_curl()?;
debug!("{} finished with {:?}", token, result);
let (mut dl, handle) = downloads.pending.remove(&token)
.expect("got a token for a non-in-progress transfer");
let data = mem::replace(&mut *dl.data.borrow_mut(), Vec::new());
let mut handle = downloads.multi.remove(handle)?;
// Check if this was a spurious error. If it was a spurious error
// then we want to re-enqueue our request for another attempt and
// then we wait for another request to finish.
let ret = {
downloads.retry.try(|| {
result.chain_err(|| {
format!("failed to download from `{}`", dl.url)
})?;
let code = handle.response_code()?;
if code != 200 && code != 0 {
handle.url(&dl.url)?;
let url = handle.effective_url()?.unwrap_or(&dl.url);
return Err(HttpNot200 {
code,
url: url.to_string(),
}.into())
}
Ok(())
})?
};
if ret.is_some() {
break (dl, data)
}
downloads.enqueue(dl, handle)?;
};
downloads.pending_ids.remove(&dl.id);
if !downloads.progress.is_enabled() {
self.config.shell().status("Downloading", &dl.descriptor)?;
}
downloads.downloads_finished += 1;
downloads.tick_now()?;
// Inform the original source that the download is finished which
// should allow us to actually get the package and fill it in now.
let mut sources = self.sources.borrow_mut();
let source = sources
.get_mut(id.source_id())
.ok_or_else(|| internal(format!("couldn't find source for `{}`", id)))?;
let pkg = source.finish_download(&id, data)?;
let slot = &self.packages[&id];
.get_mut(dl.id.source_id())
.ok_or_else(|| internal(format!("couldn't find source for `{}`", dl.id)))?;
let pkg = source.finish_download(&dl.id, data)?;
let slot = &self.packages[&dl.id];
assert!(slot.fill(pkg).is_ok());
Ok(slot.borrow().unwrap())
}
@ -331,43 +482,91 @@ impl<'cfg> PackageSet<'cfg> {
}
}
fn download(config: &Config, url: &str) -> CargoResult<Vec<u8>> {
use util::network;
use util::Progress;
use util::errors::HttpNot200;
impl<'cfg> Downloads<'cfg> {
/// Registers `handle` with the libcurl multi handle and records the
/// download as pending under its token.
fn enqueue(&mut self, dl: Download, handle: Easy) -> CargoResult<()> {
    let token = dl.token;
    let mut added = self.multi.add(handle)?;
    // Tag the transfer so completion messages can be matched back to `dl`.
    added.set_token(token)?;
    self.pending.insert(token, (dl, added));
    Ok(())
}
let mut handle = config.http()?.borrow_mut();
handle.get(true)?;
handle.url(&url)?;
handle.follow_location(true)?;
let mut body = Vec::new();
network::with_retry(config, || {
body = Vec::new();
let mut pb = Progress::new("Fetch", config);
{
handle.progress(true)?;
let mut handle = handle.transfer();
handle.progress_function(|dl_total, dl_cur, _, _| {
pb.tick(dl_cur as usize, dl_total as usize).is_ok()
})?;
handle.write_function(|buf| {
body.extend_from_slice(buf);
Ok(buf.len())
})?;
handle.perform().chain_err(|| {
format!("failed to download from `{}`", url)
fn wait_for_curl(&mut self) -> CargoResult<(usize, CargoResult<()>)> {
// This is the main workhorse loop. We use libcurl's portable `wait`
// method to actually perform blocking. This isn't necessarily too
// efficient in terms of fd management, but we should only be juggling
// a few anyway.
//
// Here we start off by asking the `multi` handle to do some work via
// the `perform` method. This will actually do I/O work (nonblocking)
// and attempt to make progress. Afterwards we ask about the `messages`
// contained in the handle which will inform us if anything has finished
// transferring.
//
// If we've got a finished transfer after all that work we break out
// and process the finished transfer at the end. Otherwise we need to
// actually block waiting for I/O to happen, which we achieve with the
// `wait` method on `multi`.
loop {
let n = tls::set(self, || {
self.multi.perform()
.chain_err(|| "failed to perform http requests")
})?;
debug!("handles remaining: {}", n);
let results = &mut self.results;
self.multi.messages(|msg| {
let token = msg.token().expect("failed to read token");
if let Some(result) = msg.result() {
results.push((token, result.map_err(|e| e.into())));
} else {
debug!("message without a result (?)");
}
});
if let Some(pair) = results.pop() {
break Ok(pair)
}
assert!(self.pending.len() > 0);
self.multi.wait(&mut [], Duration::new(60, 0))
.chain_err(|| "failed to wait on curl `Multi`")?;
}
let code = handle.response_code()?;
if code != 200 && code != 0 {
let url = handle.effective_url()?.unwrap_or(&url);
Err(HttpNot200 {
code,
url: url.to_string(),
}.into())
} else {
Ok(())
}
})?;
Ok(body)
}
/// Redraws the progress bar with the current finished / total counts.
fn tick_now(&mut self) -> CargoResult<()> {
    let finished = self.downloads_finished;
    // Total is everything finished plus everything still in flight.
    let total = finished + self.pending.len();
    self.progress.tick(finished, total)
}
}
// Thread-local storage used to smuggle a reference to the active
// `Downloads` into libcurl's write callback, which cannot otherwise
// capture a borrow with a suitable lifetime.
mod tls {
    use std::cell::Cell;

    use super::Downloads;

    // Raw pointer to the active `Downloads`, stored as a `usize`.
    // Zero means "no `Downloads` is currently installed on this thread".
    thread_local!(static PTR: Cell<usize> = Cell::new(0));

    /// Runs `f` with the thread's currently installed `Downloads`.
    ///
    /// Panics if called outside of a surrounding `tls::set` (i.e. when
    /// no pointer has been installed for this thread).
    pub(crate) fn with<R>(f: impl FnOnce(&Downloads) -> R) -> R {
        let ptr = PTR.with(|p| p.get());
        assert!(ptr != 0);
        // SAFETY(review): relies on `set` keeping the pointed-to
        // `Downloads` alive for the entire dynamic extent of `f`; the
        // `Reset` guard in `set` restores the previous value on exit.
        unsafe {
            f(&*(ptr as *const Downloads))
        }
    }

    /// Installs `dl` as this thread's current `Downloads` for the
    /// duration of `f`, restoring the previous value afterwards —
    /// including on panic, via the `Reset` drop guard.
    pub(crate) fn set<R>(dl: &Downloads, f: impl FnOnce() -> R) -> R {
        // Guard that restores the saved cell value when dropped.
        struct Reset<'a, T: Copy + 'a>(&'a Cell<T>, T);

        impl<'a, T: Copy> Drop for Reset<'a, T> {
            fn drop(&mut self) {
                self.0.set(self.1);
            }
        }

        PTR.with(|p| {
            // Save the old value so nested/sequential `set` calls unwind
            // correctly, then install the new pointer for `f`.
            let _reset = Reset(p, p.get());
            p.set(dl as *const Downloads as usize);
            f()
        })
    }
}

View file

@ -94,7 +94,7 @@ impl<'cfg> PackageRegistry<'cfg> {
})
}
pub fn get(self, package_ids: &[PackageId]) -> PackageSet<'cfg> {
pub fn get(self, package_ids: &[PackageId]) -> CargoResult<PackageSet<'cfg>> {
trace!("getting packages; sources={}", self.sources.len());
PackageSet::new(package_ids, self.sources, self.config)
}

View file

@ -518,7 +518,7 @@ where
let pkg = {
let mut map = SourceMap::new();
map.insert(Box::new(&mut source));
PackageSet::new(&[pkgid.clone()], map, config)
PackageSet::new(&[pkgid.clone()], map, config)?
.get_one(&pkgid)?
.clone()
};

View file

@ -16,7 +16,7 @@ use util::profile;
pub fn resolve_ws<'a>(ws: &Workspace<'a>) -> CargoResult<(PackageSet<'a>, Resolve)> {
let mut registry = PackageRegistry::new(ws.config())?;
let resolve = resolve_with_registry(ws, &mut registry, true)?;
let packages = get_resolved_packages(&resolve, registry);
let packages = get_resolved_packages(&resolve, registry)?;
Ok((packages, resolve))
}
@ -96,7 +96,7 @@ pub fn resolve_ws_with_method<'a>(
true,
)?;
let packages = get_resolved_packages(&resolved_with_overrides, registry);
let packages = get_resolved_packages(&resolved_with_overrides, registry)?;
Ok((packages, resolved_with_overrides))
}
@ -374,7 +374,7 @@ pub fn add_overrides<'a>(
pub fn get_resolved_packages<'a>(
resolve: &Resolve,
registry: PackageRegistry<'a>,
) -> PackageSet<'a> {
) -> CargoResult<PackageSet<'a>> {
let ids: Vec<PackageId> = resolve.iter().cloned().collect();
registry.get(&ids)
}

View file

@ -6,6 +6,38 @@ use failure::Error;
use util::Config;
use util::errors::{CargoResult, HttpNot200};
/// Helper for retrying an operation that may fail with a spurious
/// network error, bounded by the `net.retry` configuration value.
pub struct Retry<'a> {
    /// Configuration, used for reading `net.retry` and warning output.
    config: &'a Config,
    /// How many more spurious failures will be retried before giving up.
    remaining: u32,
}
impl<'a> Retry<'a> {
    /// Creates a retry helper, reading the retry budget from the
    /// `net.retry` configuration key (2 extra attempts if unset).
    pub fn new(config: &'a Config) -> CargoResult<Retry<'a>> {
        let remaining = config.get::<Option<u32>>("net.retry")?.unwrap_or(2);
        Ok(Retry { config, remaining })
    }

    /// Invokes `f` once. A success or a non-spurious error is passed
    /// straight through as `Ok(Some(..))` / `Err`. A spurious error with
    /// attempts remaining prints a warning, decrements the budget, and
    /// returns `Ok(None)` so the caller can try again.
    pub fn try<T>(&mut self, f: impl FnOnce() -> CargoResult<T>)
        -> CargoResult<Option<T>>
    {
        let result = f();
        if let Err(ref e) = result {
            if maybe_spurious(e) && self.remaining > 0 {
                let msg = format!(
                    "spurious network error ({} tries remaining): {}",
                    self.remaining, e
                );
                self.config.shell().warn(msg)?;
                self.remaining -= 1;
                return Ok(None);
            }
        }
        result.map(Some)
    }
}
fn maybe_spurious(err: &Error) -> bool {
for e in err.iter_chain() {
if let Some(git_err) = e.downcast_ref::<git2::Error>() {
@ -48,21 +80,10 @@ pub fn with_retry<T, F>(config: &Config, mut callback: F) -> CargoResult<T>
where
F: FnMut() -> CargoResult<T>,
{
let mut remaining = config.get::<Option<u32>>("net.retry")?.unwrap_or(2);
let mut retry = Retry::new(config)?;
loop {
match callback() {
Ok(ret) => return Ok(ret),
Err(ref e) if maybe_spurious(e) && remaining > 0 => {
let msg = format!(
"spurious network error ({} tries \
remaining): {}",
remaining, e
);
config.shell().warn(msg)?;
remaining -= 1;
}
//todo impl from
Err(e) => return Err(e),
if let Some(ret) = retry.try(&mut callback)? {
return Ok(ret)
}
}
}

View file

@ -62,6 +62,10 @@ impl<'cfg> Progress<'cfg> {
self.state = None;
}
/// Returns whether this progress bar is active (i.e. it has drawing
/// state and will actually render output).
pub fn is_enabled(&self) -> bool {
    self.state.is_some()
}
pub fn new(name: &str, cfg: &'cfg Config) -> Progress<'cfg> {
Self::with_style(name, ProgressStyle::Percentage, cfg)
}