use super::Sync;
use anyhow::Result;
use bytes::Bytes;
use flate2::bufread::GzDecoder;
use futures::StreamExt;
use openvet_common::{
rust::{Checksum, CrateVersion},
tree::{Kind, Node},
};
use std::{
collections::BTreeMap,
io::{BufRead, Read},
};
use tar::{Archive, EntryType};
use tarpc::context;
use tracing::*;
/// File contents extracted from an archive, keyed by the SHA-256 checksum of
/// each file's bytes (so identical files collapse into a single entry).
type Objects = BTreeMap<Checksum, Bytes>;
impl Sync {
pub async fn download_sources(&self) -> Result<()> {
self.download_sources_batch(1024).await?;
Ok(())
}
pub async fn download_sources_batch(&self, count: usize) -> Result<usize> {
let crate_versions = self
.storage
.sources_missing(context::current(), count)
.await?;
let count = crate_versions.len();
info!("Downloading batch of {count} sources");
futures::stream::iter(crate_versions.into_iter())
.map(|crate_version| self.download_crate_version(crate_version))
.buffer_unordered(32)
.for_each(|result| async {
if let Err(error) = result {
error!("error fetching crate: {error}");
}
})
.await;
Ok(count)
}
#[instrument(skip(self))]
async fn download_crate_version(&self, crate_version: CrateVersion) -> Result<()> {
info!("Downloading crate version");
let name = &crate_version.krate;
let version = &crate_version.version;
let url = format!("https://static.crates.io/crates/{name}/{name}-{version}.crate");
let bytes = self.client.get(&url).send().await?.bytes().await?;
let (node, objects) = parse_archive(&bytes[..])?;
futures::stream::iter(objects.into_iter())
.map(|(checksum, bytes)| self.upload_object(checksum, bytes))
.buffer_unordered(4)
.for_each(|result| async {
if let Err(error) = result {
error!("error fetching crate: {error}");
}
})
.await;
self.storage
.crate_tree_write(context::current(), crate_version, node)
.await?;
Ok(())
}
async fn upload_object(&self, checksum: Checksum, bytes: Bytes) -> Result<()> {
info!("Uploading object {checksum}");
if !self
.storage
.object_exists(context::current(), checksum)
.await?
{
let written = self.storage.object_write(context::current(), bytes).await?;
if written != checksum {
anyhow::bail!("mismatched checksum: expected {checksum} got {written}");
}
}
Ok(())
}
}
fn parse_archive(data: impl BufRead) -> Result<(Node, Objects)> {
let mut data = GzDecoder::new(data);
let mut archive = Archive::new(&mut data);
let mut root = Node::root();
let mut objects = Objects::default();
for entry in archive.entries()? {
let mut entry = entry?;
let path = entry.path()?.to_path_buf();
let header = entry.header().clone();
let entry_type = header.entry_type();
let parent = root.mkdir(path.parent().unwrap())?;
let kind = match entry_type {
EntryType::Regular => {
let mut data = vec![];
entry.read_to_end(&mut data)?;
let checksum = Checksum::sha2_256(&data);
objects.insert(checksum, data.into());
Kind::File(checksum)
}
EntryType::Link | EntryType::Symlink => {
Kind::Link(entry.link_name()?.unwrap().to_path_buf())
}
EntryType::Directory => Kind::Directory(Default::default()),
_ => todo!(),
};
let node = Node {
name: path.file_name().unwrap().into(),
mode: header.mode()?,
uid: header.uid().unwrap_or(0),
gid: header.gid().unwrap_or(0),
mtime: header.mtime()?,
kind,
};
parent.insert(node)?;
}
Ok((root, objects))
}
#[cfg(test)]
mod tests {
    use super::*;
    use insta::assert_debug_snapshot;
    use std::{collections::BTreeSet, fs::File, io::BufReader};

    /// Parses the `.crate` fixture at `path`, returning the file tree and the
    /// set of object checksums it produced.
    ///
    /// The snapshot assertion deliberately stays in each test function so
    /// insta keeps deriving the snapshot name from that function.
    fn decompress(path: &str) -> (Node, BTreeSet<Checksum>) {
        let file = File::open(path).unwrap();
        let (node, objects) = parse_archive(BufReader::new(file)).unwrap();
        (node, objects.into_keys().collect())
    }

    #[test]
    fn can_decompress_serde_1_0_209() {
        let (node, keys) = decompress("tests/serde-1.0.209.crate");
        assert_debug_snapshot!((node, keys));
    }

    #[test]
    fn can_decompress_wireguard_keys_0_1_0() {
        let (node, keys) = decompress("tests/wireguard-keys-0.1.0.crate");
        assert_debug_snapshot!((node, keys));
    }

    #[test]
    fn can_decompress_wireguard_keys_0_1_1() {
        let (node, keys) = decompress("tests/wireguard-keys-0.1.1.crate");
        assert_debug_snapshot!((node, keys));
    }

    #[test]
    fn can_decompress_axum_0_1_0() {
        let (node, keys) = decompress("tests/axum-0.1.0.crate");
        assert_debug_snapshot!((node, keys));
    }
}