chore(unstable/publish): ordered publish of packages in workspace (#21550)

Co-authored-by: Luca Casonato <hello@lcas.dev>
Author: David Sherret · 2023-12-14 10:55:56 +01:00 · committed by GitHub
parent 5b96f7bf21
commit 4b6fc64646
3 changed files with 573 additions and 136 deletions

cli/tools/registry/mod.rs

@@ -1,9 +1,8 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use std::collections::HashMap;
use std::fmt::Write;
use std::io::IsTerminal;
use std::path::Path;
use std::path::PathBuf;
use std::rc::Rc;
use std::sync::Arc;
@@ -16,6 +15,8 @@ use deno_core::anyhow::Context;
use deno_core::error::AnyError;
use deno_core::serde_json;
use deno_core::serde_json::json;
use deno_core::unsync::JoinHandle;
use deno_core::unsync::JoinSet;
use deno_runtime::colors;
use deno_runtime::deno_fetch::reqwest;
use http::header::AUTHORIZATION;
@@ -34,6 +35,9 @@ use crate::factory::CliFactory;
use crate::http_util::HttpClient;
use crate::util::import_map::ImportMapUnfurler;
use self::publish_order::PublishOrderGraph;
mod publish_order;
mod tar;
enum AuthMethod {
@@ -75,31 +79,22 @@ static SUGGESTED_ENTRYPOINTS: [&str; 4] =
["mod.ts", "mod.js", "index.ts", "index.js"];
async fn prepare_publish(
initial_cwd: &Path,
directory: PathBuf,
import_map: &ImportMap,
deno_json: &ConfigFile,
import_map: Arc<ImportMap>,
) -> Result<PreparedPublishPackage, AnyError> {
let directory_path = initial_cwd.join(directory);
// TODO: doesn't handle jsonc
let deno_json_path = directory_path.join("deno.json");
let deno_json = ConfigFile::read(&deno_json_path).with_context(|| {
format!(
"Failed to read deno configuration file at {}",
deno_json_path.display()
)
})?;
let config_path = deno_json.specifier.to_file_path().unwrap();
let dir_path = config_path.parent().unwrap().to_path_buf();
let Some(version) = deno_json.json.version.clone() else {
bail!("{} is missing 'version' field", deno_json_path.display());
bail!("{} is missing 'version' field", deno_json.specifier);
};
let Some(name) = deno_json.json.name.clone() else {
bail!("{} is missing 'name' field", deno_json_path.display());
bail!("{} is missing 'name' field", deno_json.specifier);
};
if deno_json.json.exports.is_none() {
let mut suggested_entrypoint = None;
for entrypoint in SUGGESTED_ENTRYPOINTS {
if directory_path.join(entrypoint).exists() {
if dir_path.join(entrypoint).exists() {
suggested_entrypoint = Some(entrypoint);
break;
}
@@ -119,7 +114,7 @@ async fn prepare_publish(
bail!(
"You did not specify an entrypoint to \"{}\" package in {}. Add `exports` mapping in the configuration file, eg:\n{}",
name,
deno_json_path.display(),
deno_json.specifier,
exports_content
);
}
@@ -130,11 +125,12 @@ async fn prepare_publish(
bail!("Invalid package name, use '@<scope_name>/<package_name> format");
};
let unfurler = ImportMapUnfurler::new(import_map);
let (tarball, diagnostics) =
tar::create_gzipped_tarball(directory_path, unfurler)
.context("Failed to create a tarball")?;
let (tarball, diagnostics) = deno_core::unsync::spawn_blocking(move || {
let unfurler = ImportMapUnfurler::new(&import_map);
tar::create_gzipped_tarball(&dir_path, unfurler)
.context("Failed to create a tarball")
})
.await??;
let tarball_hash_bytes: Vec<u8> =
sha2::Sha256::digest(&tarball).iter().cloned().collect();
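
The hunk above moves tarball creation onto a blocking thread so the CPU-bound gzip work no longer stalls the async executor; that is also why `prepare_publish` now takes `Arc<ImportMap>` and resolves an owned directory path, since the `move` closure must own its inputs. A minimal sketch of the pattern, assuming plain tokio (`deno_core::unsync` wraps the same runtime) and with `compress_dir` as an illustrative stand-in for the real tar module:

```rust
use std::io;
use std::path::PathBuf;

// Stand-in for tar::create_gzipped_tarball: any CPU-bound compression work.
fn compress_dir(dir: &PathBuf) -> io::Result<Vec<u8>> {
    let entries = std::fs::read_dir(dir)?.count();
    Ok(entries.to_le_bytes().to_vec())
}

#[tokio::main]
async fn main() -> io::Result<()> {
    let dir = PathBuf::from(".");
    // Run the CPU-bound work on the blocking thread pool; the `move`
    // closure takes ownership of everything it captures.
    let tarball = tokio::task::spawn_blocking(move || compress_dir(&dir))
        .await
        .expect("blocking task panicked")?;
    println!("tarball: {} bytes", tarball.len());
    Ok(())
}
```
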
@@ -298,12 +294,14 @@ fn print_diagnostics(diagnostics: Vec<String>) {
async fn perform_publish(
http_client: &Arc<HttpClient>,
packages: Vec<PreparedPublishPackage>,
mut publish_order_graph: PublishOrderGraph,
mut prepared_package_by_name: HashMap<String, PreparedPublishPackage>,
auth_method: AuthMethod,
) -> Result<(), AnyError> {
let client = http_client.client()?;
let registry_url = deno_registry_api_url().to_string();
let packages = prepared_package_by_name.values().collect::<Vec<_>>();
let diagnostics = packages
.iter()
.flat_map(|p| p.diagnostics.clone())
@@ -320,7 +318,9 @@ async fn perform_publish(
})
.collect::<Vec<_>>();
let authorizations = match auth_method {
let mut authorizations = HashMap::with_capacity(packages.len());
match auth_method {
AuthMethod::Interactive => {
let verifier = uuid::Uuid::new_v4().to_string();
let challenge = BASE64_STANDARD.encode(sha2::Sha256::digest(&verifier));
@@ -376,11 +376,13 @@ async fn perform_publish(
colors::cyan(res.user.name)
);
let authorization: Rc<str> = format!("Bearer {}", res.token).into();
let mut authorizations = Vec::new();
for _ in &packages {
authorizations.push(authorization.clone());
for pkg in &packages {
authorizations.insert(
(pkg.scope.clone(), pkg.package.clone(), pkg.version.clone()),
authorization.clone(),
);
}
break authorizations;
break;
}
Err(err) => {
if err.code == "authorizationPending" {
@@ -394,14 +396,15 @@ async fn perform_publish(
}
AuthMethod::Token(token) => {
let authorization: Rc<str> = format!("Bearer {}", token).into();
let mut authorizations = Vec::new();
for _ in &packages {
authorizations.push(authorization.clone());
for pkg in &packages {
authorizations.insert(
(pkg.scope.clone(), pkg.package.clone(), pkg.version.clone()),
authorization.clone(),
);
}
authorizations
}
AuthMethod::Oidc(oidc_config) => {
let mut authorizations = Vec::new();
let mut chunked_packages = packages.chunks(16);
for permissions in permissions.chunks(16) {
let audience = json!({ "permissions": permissions }).to_string();
let url = format!(
@@ -439,110 +442,151 @@ async fn perform_publish(
})?;
let authorization: Rc<str> = format!("githuboidc {}", value).into();
for _ in permissions {
authorizations.push(authorization.clone());
for pkg in chunked_packages.next().unwrap() {
authorizations.insert(
(pkg.scope.clone(), pkg.package.clone(), pkg.version.clone()),
authorization.clone(),
);
}
}
authorizations
}
};
assert_eq!(packages.len(), authorizations.len());
for (package, authorization) in
packages.into_iter().zip(authorizations.into_iter())
{
println!(
"{} @{}/{}@{} ...",
colors::intense_blue("Publishing"),
package.scope,
package.package,
package.version
);
assert_eq!(prepared_package_by_name.len(), authorizations.len());
let mut futures: JoinSet<Result<String, AnyError>> = JoinSet::default();
loop {
let next_batch = publish_order_graph.next();
let url = format!(
"{}scopes/{}/packages/{}/versions/{}",
registry_url, package.scope, package.package, package.version
);
for package_name in next_batch {
let package = prepared_package_by_name.remove(&package_name).unwrap();
let authorization = authorizations
.remove(&(
package.scope.clone(),
package.package.clone(),
package.version.clone(),
))
.unwrap();
let registry_url = registry_url.clone();
let http_client = http_client.clone();
futures.spawn(async move {
let display_name =
format!("@{}/{}@{}", package.scope, package.package, package.version);
publish_package(&http_client, package, &registry_url, &authorization)
.await
.with_context(|| format!("Failed to publish {}", display_name))?;
Ok(package_name)
});
}
let response = client
.post(url)
.header(AUTHORIZATION, &*authorization)
.header(CONTENT_ENCODING, "gzip")
.body(package.tarball)
.send()
.await?;
let res = parse_response::<PublishingTask>(response).await;
let mut task = match res {
Ok(task) => task,
Err(err) if err.code == "duplicateVersionPublish" => {
println!(
"{} @{}/{}@{}",
colors::yellow("Skipping, already published"),
package.scope,
package.package,
package.version
);
continue;
}
Err(err) => {
return Err(err).with_context(|| {
format!(
"Failed to publish @{}/{} at {}",
package.scope, package.package, package.version
)
})
}
let Some(result) = futures.join_next().await else {
// done, ensure no circular dependency
publish_order_graph.ensure_no_pending()?;
break;
};
let interval = std::time::Duration::from_secs(2);
while task.status != "success" && task.status != "failure" {
tokio::time::sleep(interval).await;
let resp = client
.get(format!("{}publish_status/{}", registry_url, task.id))
.send()
.await
.with_context(|| {
format!(
"Failed to get publishing status for @{}/{} at {}",
package.scope, package.package, package.version
)
})?;
task =
parse_response::<PublishingTask>(resp)
.await
.with_context(|| {
format!(
"Failed to get publishing status for @{}/{} at {}",
package.scope, package.package, package.version
)
})?;
}
let package_name = result??;
publish_order_graph.finish_package(&package_name);
}
if let Some(error) = task.error {
bail!(
"{} @{}/{} at {}: {}",
colors::red("Failed to publish"),
Ok(())
}
async fn publish_package(
http_client: &HttpClient,
package: PreparedPublishPackage,
registry_url: &str,
authorization: &str,
) -> Result<(), AnyError> {
let client = http_client.client()?;
println!(
"{} @{}/{}@{} ...",
colors::intense_blue("Publishing"),
package.scope,
package.package,
package.version
);
let url = format!(
"{}scopes/{}/packages/{}/versions/{}",
registry_url, package.scope, package.package, package.version
);
let response = client
.post(url)
.header(AUTHORIZATION, authorization)
.header(CONTENT_ENCODING, "gzip")
.body(package.tarball)
.send()
.await?;
let res = parse_response::<PublishingTask>(response).await;
let mut task = match res {
Ok(task) => task,
Err(err) if err.code == "duplicateVersionPublish" => {
println!(
"{} @{}/{}@{}",
colors::yellow("Skipping, already published"),
package.scope,
package.package,
package.version,
error.message
package.version
);
return Ok(());
}
Err(err) => {
return Err(err).with_context(|| {
format!(
"Failed to publish @{}/{} at {}",
package.scope, package.package, package.version
)
})
}
};
println!(
"{} @{}/{}@{}",
colors::green("Successfully published"),
let interval = std::time::Duration::from_secs(2);
while task.status != "success" && task.status != "failure" {
tokio::time::sleep(interval).await;
let resp = client
.get(format!("{}publish_status/{}", registry_url, task.id))
.send()
.await
.with_context(|| {
format!(
"Failed to get publishing status for @{}/{} at {}",
package.scope, package.package, package.version
)
})?;
task = parse_response::<PublishingTask>(resp)
.await
.with_context(|| {
format!(
"Failed to get publishing status for @{}/{} at {}",
package.scope, package.package, package.version
)
})?;
}
if let Some(error) = task.error {
bail!(
"{} @{}/{} at {}: {}",
colors::red("Failed to publish"),
package.scope,
package.package,
package.version
);
println!(
"{}@{}/{}/{}_meta.json",
registry_url, package.scope, package.package, package.version
package.version,
error.message
);
}
println!(
"{} @{}/{}@{}",
colors::green("Successfully published"),
package.scope,
package.package,
package.version
);
println!(
"{}@{}/{}/{}_meta.json",
registry_url, package.scope, package.package, package.version
);
Ok(())
}
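
Two structural changes land in this hunk. Authorizations are now keyed by `(scope, package, version)` in a `HashMap` instead of being matched to packages by position in a `Vec`, because packages no longer publish in their original order. And `perform_publish` becomes a scheduling loop: it repeatedly asks the `PublishOrderGraph` for the next batch of dependency-free packages, spawns each into a `JoinSet`, and reports each completion back via `finish_package` so dependents become eligible. A stripped-down sketch of that loop with a toy graph (the real graph is in publish_order.rs, shown below):

```rust
use tokio::task::JoinSet;

// Toy stand-in for PublishOrderGraph: everything is ready immediately.
struct Graph {
    pending: Vec<String>,
}

impl Graph {
    fn next(&mut self) -> Vec<String> {
        self.pending.drain(..).collect()
    }
    fn finish_package(&mut self, _name: &str) {
        // the real graph decrements dependents' in-degrees here
    }
}

async fn publish_one(name: String) -> String {
    name // stand-in for the HTTP publish + status polling
}

#[tokio::main]
async fn main() {
    let mut graph = Graph { pending: vec!["a".into(), "b".into()] };
    let mut futures: JoinSet<String> = JoinSet::new();
    loop {
        // spawn everything whose dependencies have already been published
        for name in graph.next() {
            futures.spawn(publish_one(name));
        }
        // once the set drains with no new batches, publishing is complete
        let Some(result) = futures.join_next().await else { break };
        // a finished package may unblock more work on the next iteration
        graph.finish_package(&result.expect("publish task panicked"));
    }
}
```
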
@@ -601,25 +645,62 @@ pub async fn publish(
)
})?;
let mut packages =
Vec::with_capacity(std::cmp::max(1, deno_json.json.workspaces.len()));
let workspace_config = deno_json.to_workspace_config()?;
let members = &deno_json.json.workspaces;
if members.is_empty() {
packages
.push(prepare_publish(&initial_cwd, directory_path, &import_map).await?);
} else {
println!("Publishing a workspace...");
for member in members {
let member_dir = directory_path.join(member);
packages
.push(prepare_publish(&initial_cwd, member_dir, &import_map).await?);
let (publish_order_graph, prepared_package_by_name) = match workspace_config {
Some(workspace_config) => {
println!("Publishing a workspace...");
let mut prepared_package_by_name =
HashMap::with_capacity(workspace_config.members.len());
let publish_order_graph = publish_order::build_publish_graph(
&workspace_config,
cli_factory.module_graph_builder().await?.as_ref(),
)
.await?;
let results = workspace_config
.members
.iter()
.cloned()
.map(|member| {
let import_map = import_map.clone();
deno_core::unsync::spawn(async move {
let package = prepare_publish(&member.config_file, import_map)
.await
.with_context(|| {
format!("Failed preparing '{}'.", member.package_name)
})?;
Ok((member.package_name, package))
})
})
.collect::<Vec<JoinHandle<Result<(String, PreparedPublishPackage), AnyError>>>>();
let results = deno_core::futures::future::join_all(results).await;
for result in results {
let (package_name, package) = result??;
prepared_package_by_name.insert(package_name, package);
}
(publish_order_graph, prepared_package_by_name)
}
}
None => {
let mut prepared_package_by_name = HashMap::with_capacity(1);
let package = prepare_publish(&deno_json, import_map).await?;
let package_name = package.package.clone();
let publish_order_graph =
PublishOrderGraph::new_single(package_name.clone());
prepared_package_by_name.insert(package_name, package);
(publish_order_graph, prepared_package_by_name)
}
};
if packages.is_empty() {
if prepared_package_by_name.is_empty() {
bail!("No packages to publish");
}
perform_publish(cli_factory.http_client(), packages, auth_method).await
perform_publish(
cli_factory.http_client(),
publish_order_graph,
prepared_package_by_name,
auth_method,
)
.await
}
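
In the workspace branch of `publish`, members are now prepared concurrently: one spawned task per member, with results joined back into a map keyed by package name. A reduced sketch of that fan-out/fan-in shape, where `prepare` is an illustrative stand-in for `prepare_publish`:

```rust
use std::collections::HashMap;

// Stand-in for prepare_publish: returns (package_name, prepared_package).
async fn prepare(name: String) -> (String, usize) {
    let size = name.len();
    (name, size)
}

#[tokio::main]
async fn main() {
    let members = vec!["@scope/a".to_string(), "@scope/b".to_string()];
    // fan out: one task per workspace member
    let handles: Vec<_> = members
        .into_iter()
        .map(|m| tokio::spawn(prepare(m)))
        .collect();
    // fan in: collect results into a map keyed by package name
    let mut by_name = HashMap::new();
    for handle in handles {
        let (name, pkg) = handle.await.expect("prepare task panicked");
        by_name.insert(name, pkg);
    }
    assert_eq!(by_name.len(), 2);
}
```
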

cli/tools/registry/publish_order.rs (new file)

@@ -0,0 +1,356 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use std::collections::HashMap;
use std::collections::HashSet;
use std::collections::VecDeque;
use deno_ast::ModuleSpecifier;
use deno_config::WorkspaceConfig;
use deno_core::anyhow::bail;
use deno_core::anyhow::Context;
use deno_core::error::AnyError;
use crate::graph_util::ModuleGraphBuilder;
pub struct PublishOrderGraph {
packages: HashMap<String, HashSet<String>>,
in_degree: HashMap<String, usize>,
reverse_map: HashMap<String, Vec<String>>,
}
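
The struct keeps three views of the same dependency graph: `packages` maps each package to its direct dependencies, `in_degree` counts how many of those dependencies are still unpublished, and `reverse_map` is the inverted edge list used to decrement those counts when a dependency finishes. A worked example for a chain where `a` depends on `b` and `b` on `c` (the same shape as `test_graph_single_dep` below):

```rust
use std::collections::{HashMap, HashSet};

fn main() {
    let packages: HashMap<String, HashSet<String>> = HashMap::from([
        ("a".to_string(), HashSet::from(["b".to_string()])),
        ("b".to_string(), HashSet::from(["c".to_string()])),
        ("c".to_string(), HashSet::new()),
    ]);
    // in_degree: how many direct dependencies are still unpublished
    let in_degree: HashMap<String, usize> = packages
        .iter()
        .map(|(name, deps)| (name.clone(), deps.len()))
        .collect();
    assert_eq!(in_degree["c"], 0); // only c can publish in the first batch
    // reverse_map would be { "b" => ["a"], "c" => ["b"] }: when c finishes,
    // b's in-degree drops to 0 and b becomes publishable.
}
```
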
impl PublishOrderGraph {
pub fn new_single(package_name: String) -> Self {
Self {
packages: HashMap::from([(package_name.clone(), HashSet::new())]),
in_degree: HashMap::from([(package_name.clone(), 0)]),
reverse_map: HashMap::from([(package_name, Vec::new())]),
}
}
pub fn next(&mut self) -> Vec<String> {
let mut package_names_with_depth = self
.in_degree
.iter()
.filter_map(|(name, &degree)| if degree == 0 { Some(name) } else { None })
.map(|item| (item.clone(), self.compute_depth(item, HashSet::new())))
.collect::<Vec<_>>();
// sort by depth in order to prioritize those packages
package_names_with_depth.sort_by(|a, b| match b.1.cmp(&a.1) {
std::cmp::Ordering::Equal => a.0.cmp(&b.0),
other => other,
});
let sorted_package_names = package_names_with_depth
.into_iter()
.map(|(name, _)| name)
.collect::<Vec<_>>();
for name in &sorted_package_names {
self.in_degree.remove(name);
}
sorted_package_names
}
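
Note the ordering inside a batch: every in-degree-zero package is returned at once, but sorted so that packages with the longest chain of dependents come first (ties broken alphabetically for determinism), since finishing those unblocks the most downstream work. In `test_graph_multiple_dep` below, `c` has depth 2 (`b` and then `a` wait on it), `f` has depth 1, and `d` has depth 0, so the first batch comes back as `["c", "f", "d"]`. Removing the returned names from `in_degree` ensures each package is handed out only once.
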
pub fn finish_package(&mut self, name: &str) {
if let Some(package_names) = self.reverse_map.remove(name) {
for name in package_names {
*self.in_degree.get_mut(&name).unwrap() -= 1;
}
}
}
/// There could be pending packages if there's a circular dependency.
pub fn ensure_no_pending(&self) -> Result<(), AnyError> {
// this is inefficient, but that's ok because it's simple and will
// only ever happen when there's an error
fn identify_cycle<'a>(
current_name: &'a String,
mut visited: HashSet<&'a String>,
packages: &HashMap<String, HashSet<String>>,
) -> Option<Vec<String>> {
if visited.insert(current_name) {
let deps = packages.get(current_name).unwrap();
for dep in deps {
if let Some(mut cycle) =
identify_cycle(dep, visited.clone(), packages)
{
cycle.push(current_name.to_string());
return Some(cycle);
}
}
None
} else {
Some(vec![current_name.to_string()])
}
}
if self.in_degree.is_empty() {
Ok(())
} else {
let mut pkg_names = self.in_degree.keys().collect::<Vec<_>>();
pkg_names.sort(); // determinism
let mut cycle =
identify_cycle(pkg_names[0], HashSet::new(), &self.packages).unwrap();
cycle.reverse();
bail!(
"Circular package dependency detected: {}",
cycle.join(" -> ")
);
}
}
fn compute_depth(
&self,
package_name: &String,
mut visited: HashSet<String>,
) -> usize {
if visited.contains(package_name) {
return 0; // cycle
}
visited.insert(package_name.clone());
let Some(parents) = self.reverse_map.get(package_name) else {
return 0;
};
let max_depth = parents
.iter()
.map(|child| self.compute_depth(child, visited.clone()))
.max()
.unwrap_or(0);
1 + max_depth
}
}
pub async fn build_publish_graph(
workspace_config: &WorkspaceConfig,
module_graph_builder: &ModuleGraphBuilder,
) -> Result<PublishOrderGraph, AnyError> {
let roots = get_workspace_roots(workspace_config)?;
let graph = module_graph_builder
.create_graph(
deno_graph::GraphKind::All,
roots.iter().flat_map(|r| r.exports.clone()).collect(),
)
.await?;
graph.valid()?;
let packages = build_pkg_deps(graph, roots);
Ok(build_graph(packages))
}
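
`build_publish_graph` roots the module graph at every export of every workspace member, so the traversal covers exactly the modules each package will ship, and `graph.valid()?` fails fast on resolution errors before any package is sent to the registry.
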
#[derive(Debug)]
struct MemberRoots {
name: String,
dir_url: ModuleSpecifier,
exports: Vec<ModuleSpecifier>,
}
fn get_workspace_roots(
config: &WorkspaceConfig,
) -> Result<Vec<MemberRoots>, AnyError> {
let mut members = Vec::with_capacity(config.members.len());
let mut seen_names = HashSet::with_capacity(config.members.len());
for member in &config.members {
let exports_config = member
.config_file
.to_exports_config()
.with_context(|| {
format!(
"Failed to parse exports at {}",
member.config_file.specifier
)
})?
.into_map();
if !seen_names.insert(&member.package_name) {
bail!(
"Cannot have two workspace packages with the same name ('{}' at {})",
member.package_name,
member.path.display(),
);
}
let mut member_root = MemberRoots {
name: member.package_name.clone(),
dir_url: member.config_file.specifier.join("./").unwrap().clone(),
exports: Vec::with_capacity(exports_config.len()),
};
for (_, value) in exports_config {
let entry_point =
member.config_file.specifier.join(&value).with_context(|| {
format!(
"Failed to join {} with {}",
member.config_file.specifier, value
)
})?;
member_root.exports.push(entry_point);
}
members.push(member_root);
}
Ok(members)
}
fn build_pkg_deps(
graph: deno_graph::ModuleGraph,
roots: Vec<MemberRoots>,
) -> HashMap<String, HashSet<String>> {
let mut members = HashMap::with_capacity(roots.len());
let mut seen_modules = HashSet::with_capacity(graph.modules().count());
for root in &roots {
let mut deps = HashSet::new();
let mut pending = VecDeque::new();
pending.extend(root.exports.clone());
while let Some(specifier) = pending.pop_front() {
let Some(module) = graph.get(&specifier).and_then(|m| m.esm()) else {
continue;
};
let mut dep_specifiers =
Vec::with_capacity(module.dependencies.len() + 1);
if let Some(types_dep) = &module.maybe_types_dependency {
if let Some(specifier) = types_dep.dependency.maybe_specifier() {
dep_specifiers.push(specifier);
}
}
for (_, dep) in &module.dependencies {
if let Some(specifier) = dep.maybe_code.maybe_specifier() {
dep_specifiers.push(specifier);
}
if let Some(specifier) = dep.maybe_type.maybe_specifier() {
dep_specifiers.push(specifier);
}
}
for specifier in dep_specifiers {
let specifier = graph.resolve(specifier);
if specifier.scheme() != "file" {
continue;
}
if specifier.as_str().starts_with(root.dir_url.as_str()) {
if seen_modules.insert(specifier.clone()) {
pending.push_back(specifier.clone());
}
} else {
let found_root = roots
.iter()
.find(|root| specifier.as_str().starts_with(root.dir_url.as_str()));
if let Some(root) = found_root {
deps.insert(root.name.clone());
}
}
}
}
members.insert(root.name.clone(), deps);
}
members
}
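
`build_pkg_deps` does a breadth-first walk from each member's exports and classifies every resolved `file:` specifier by URL prefix: modules under the member's own directory are queued for further traversal, while modules under a sibling member's directory become package-level dependencies. A tiny illustration of that prefix test (paths are made up; deno_core re-exports this same `url` crate type):

```rust
use url::Url;

fn main() {
    let member_a = Url::parse("file:///work/pkg_a/").unwrap();
    let inside = Url::parse("file:///work/pkg_a/src/mod.ts").unwrap();
    let sibling = Url::parse("file:///work/pkg_b/mod.ts").unwrap();
    // same member: keep walking this module's dependencies
    assert!(inside.as_str().starts_with(member_a.as_str()));
    // different member: record a package dependency instead
    assert!(!sibling.as_str().starts_with(member_a.as_str()));
}
```
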
fn build_graph(
packages: HashMap<String, HashSet<String>>,
) -> PublishOrderGraph {
let mut in_degree = HashMap::new();
let mut reverse_map: HashMap<String, Vec<String>> = HashMap::new();
// build the graph, in-degree map, and set of all nodes
for (pkg_name, deps) in &packages {
in_degree.insert(pkg_name.clone(), deps.len());
for dep in deps {
reverse_map
.entry(dep.clone())
.or_default()
.push(pkg_name.clone());
}
}
PublishOrderGraph {
packages: packages.clone(),
in_degree,
reverse_map,
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn test_graph_no_deps() {
let mut graph = build_graph(HashMap::from([
("a".to_string(), HashSet::new()),
("b".to_string(), HashSet::new()),
("c".to_string(), HashSet::new()),
]));
assert_eq!(
graph.next(),
vec!["a".to_string(), "b".to_string(), "c".to_string()],
);
graph.finish_package("a");
assert!(graph.next().is_empty());
graph.finish_package("b");
assert!(graph.next().is_empty());
graph.finish_package("c");
assert!(graph.next().is_empty());
graph.ensure_no_pending().unwrap();
}
#[test]
fn test_graph_single_dep() {
let mut graph = build_graph(HashMap::from([
("a".to_string(), HashSet::from(["b".to_string()])),
("b".to_string(), HashSet::from(["c".to_string()])),
("c".to_string(), HashSet::new()),
]));
assert_eq!(graph.next(), vec!["c".to_string()]);
graph.finish_package("c");
assert_eq!(graph.next(), vec!["b".to_string()]);
graph.finish_package("b");
assert_eq!(graph.next(), vec!["a".to_string()]);
graph.finish_package("a");
assert!(graph.next().is_empty());
graph.ensure_no_pending().unwrap();
}
#[test]
fn test_graph_multiple_dep() {
let mut graph = build_graph(HashMap::from([
(
"a".to_string(),
HashSet::from(["b".to_string(), "c".to_string()]),
),
("b".to_string(), HashSet::from(["c".to_string()])),
("c".to_string(), HashSet::new()),
("d".to_string(), HashSet::new()),
("e".to_string(), HashSet::from(["f".to_string()])),
("f".to_string(), HashSet::new()),
]));
assert_eq!(
graph.next(),
vec!["c".to_string(), "f".to_string(), "d".to_string()]
);
graph.finish_package("f");
assert_eq!(graph.next(), vec!["e".to_string()]);
graph.finish_package("e");
assert!(graph.next().is_empty());
graph.finish_package("d");
assert!(graph.next().is_empty());
graph.finish_package("c");
assert_eq!(graph.next(), vec!["b".to_string()]);
graph.finish_package("b");
assert_eq!(graph.next(), vec!["a".to_string()]);
graph.finish_package("a");
assert!(graph.next().is_empty());
graph.ensure_no_pending().unwrap();
}
#[test]
fn test_graph_circular_dep() {
let mut graph = build_graph(HashMap::from([
("a".to_string(), HashSet::from(["b".to_string()])),
("b".to_string(), HashSet::from(["c".to_string()])),
("c".to_string(), HashSet::from(["a".to_string()])),
]));
assert!(graph.next().is_empty());
assert_eq!(
graph.ensure_no_pending().unwrap_err().to_string(),
"Circular package dependency detected: a -> b -> c -> a"
);
}
}

cli/tools/registry/tar.rs

@@ -6,13 +6,13 @@ use deno_core::error::AnyError;
use deno_core::url::Url;
use hyper::body::Bytes;
use std::io::Write;
use std::path::PathBuf;
use std::path::Path;
use tar::Header;
use crate::util::import_map::ImportMapUnfurler;
pub fn create_gzipped_tarball(
dir: PathBuf,
dir: &Path,
// TODO(bartlomieju): this is too specific, factor it out into a callback that
// returns data
unfurler: ImportMapUnfurler,
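
The tar.rs change is the borrow-side counterpart of the spawn_blocking refactor above: `create_gzipped_tarball` now takes `&Path` instead of an owned `PathBuf`, the idiomatic choice for a function that only reads the path. A small illustration of why `&Path` is the more flexible parameter type:

```rust
use std::path::{Path, PathBuf};

// A function that only reads the path should borrow it.
fn dir_name_len(dir: &Path) -> usize {
    dir.as_os_str().len()
}

fn main() {
    let owned = PathBuf::from("/tmp/pkg");
    // works with a borrowed PathBuf (deref coercion)...
    assert_eq!(dir_name_len(&owned), 8);
    // ...and with a plain &Path, no allocation or ownership transfer
    assert_eq!(dir_name_len(Path::new("/tmp/pkg")), 8);
}
```
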