use super::*;
use openvet_common::rust::{CrateInfo, CrateMetadata, VersionInfo};
/// Maximum number of `handle_crate` futures that `handle_crate_stream` keeps
/// in flight at once (it is the argument given to `buffer_unordered`).
const CRATE_HANDLE_PARALLELISM: usize = 32;
/// Converts one index entry into a [`CrateInfo`] and writes it through `client`.
///
/// Every version listed by `krate` is turned into a [`VersionInfo`] and stored
/// in the crate's version map under its parsed version.
///
/// # Errors
///
/// Fails if the crate name is rejected by `CrateName::new`, if a version
/// string cannot be parsed, or if either storage call returns an error.
async fn handle_crate(client: StorageClient, krate: Arc<Crate>) -> Result<()> {
    let name = CrateName::new(krate.name())?;

    // NOTE(review): the value fetched here is never read; only a failure of
    // the call itself is propagated. Confirm whether it is meant to be used.
    let _existing = client.crate_info(context::current(), name.clone()).await?;

    let metadata = CrateMetadata { name: name.clone() };
    let mut record = CrateInfo {
        metadata,
        versions: Default::default(),
    };

    for release in krate.versions() {
        info!("Handling {} v{}", release.name(), release.version());

        let entry = VersionInfo {
            krate: name.clone(),
            version: release.version().parse()?,
            yanked: release.is_yanked(),
            checksum: release.checksum().into(),
        };
        // The map key is a copy of the version stored inside the entry.
        let key = entry.version.clone();
        record.versions.insert(key, entry);
    }

    client.crate_write(context::current(), record).await?;
    Ok(())
}
/// Runs `handle_crate` for every crate received on `receiver`.
///
/// At most `CRATE_HANDLE_PARALLELISM` crates are processed concurrently, and
/// results are consumed in completion order rather than arrival order.
///
/// # Errors
///
/// Returns the first error produced by any `handle_crate` call; the function
/// stops draining the stream at that point.
pub async fn handle_crate_stream(
    client: StorageClient,
    receiver: Receiver<Arc<Crate>>,
) -> Result<()> {
    let mut pending = ReceiverStream::new(receiver)
        .map(|krate| handle_crate(client.clone(), krate))
        .buffer_unordered(CRATE_HANDLE_PARALLELISM);

    loop {
        match pending.next().await {
            // Bail out on the first failed crate.
            Some(outcome) => outcome?,
            // Stream exhausted: every crate was handled successfully.
            None => return Ok(()),
        }
    }
}
impl Sync {
pub async fn update_crates(&self) -> Result<()> {
let mut index = self.index.clone().lock_owned().await;
let channel = self.crate_channel.clone();
spawn_blocking(move || {
index.update()?;
let mut seen_crates = BTreeSet::default();
for krate in index.crates() {
let krate = Arc::new(krate);
let name = CrateName::new(krate.name())?;
if !seen_crates.insert(name) {
error!("Crate {} appeared twice", krate.name());
}
channel.blocking_send(krate)?;
}
Ok(seen_crates) as Result<BTreeSet<CrateName>>
})
.await??;
Ok(())
}
}