1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
use super::*;
use openvet_common::rust::{CrateInfo, CrateMetadata, VersionInfo};

const CRATE_HANDLE_PARALLELISM: usize = 32;

// - for each crate, sync with data base
// - make API request to crates.io to find out publish date for versions
// - update crate version info (yanked, download count, publish date)
// - download crate and publish files
async fn handle_crate(client: StorageClient, krate: Arc<Crate>) -> Result<()> {
    let name = CrateName::new(krate.name())?;
    let info = client.crate_info(context::current(), name.clone()).await?;

    let mut crate_info = CrateInfo {
        metadata: CrateMetadata { name: name.clone() },
        versions: Default::default(),
    };

    for version in krate.versions() {
        info!("Handling {} v{}", version.name(), version.version());
        let version_info = VersionInfo {
            krate: name.clone(),
            version: version.version().parse()?,
            yanked: version.is_yanked(),
            checksum: version.checksum().into(),
        };

        crate_info
            .versions
            .insert(version_info.version.clone(), version_info);
    }

    client.crate_write(context::current(), crate_info).await?;

    Ok(())
}

pub async fn handle_crate_stream(
    client: StorageClient,
    receiver: Receiver<Arc<Crate>>,
) -> Result<()> {
    let stream = ReceiverStream::new(receiver);
    let mut results = stream
        .map(|krate| handle_crate(client.clone(), krate))
        .buffer_unordered(CRATE_HANDLE_PARALLELISM);

    while let Some(result) = results.next().await {
        result?;
    }

    Ok(())
}

impl Sync {
    pub async fn update_crates(&self) -> Result<()> {
        let mut index = self.index.clone().lock_owned().await;
        let channel = self.crate_channel.clone();
        spawn_blocking(move || {
            index.update()?;

            let mut seen_crates = BTreeSet::default();
            for krate in index.crates() {
                let krate = Arc::new(krate);
                let name = CrateName::new(krate.name())?;
                if !seen_crates.insert(name) {
                    error!("Crate {} appeared twice", krate.name());
                }
                channel.blocking_send(krate)?;
            }

            Ok(seen_crates) as Result<BTreeSet<CrateName>>
        })
        .await??;
        Ok(())
    }
}