1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
//! # OpenVet Sync
//!
//! This module is responsible for synchronizing the crate index of the registry
//! (generally <https://crates.io>) with the OpenVet storage system. It does so
//! by polling the crates Git index.

use anyhow::Result;
use crates_index::{Crate, GitIndex};
use futures::stream::{StreamExt, TryStreamExt};
use openvet_common::{
    rust::{CrateName, VersionInfo},
    storage::StorageClient,
};
use reqwest::Client;
use std::{collections::BTreeSet, net::SocketAddr, path::Path, sync::Arc};
use tarpc::{client::Config, context, tokio_serde::formats::Json};
use tokio::{
    sync::{
        mpsc::{channel, Receiver, Sender},
        Mutex,
    },
    task::{spawn_blocking, JoinHandle},
};
use tokio_stream::wrappers::ReceiverStream;
use tracing::{error, info};
use url::Url;

const CRATE_VERSION_CHANNEL_LIMIT: usize = 1024;

pub mod download_sources;
pub mod dump;
pub mod update_crates;

pub struct Sync {
    client: Client,
    index: Arc<Mutex<GitIndex>>,
    crate_channel: Sender<Arc<Crate>>,
    crate_handler: JoinHandle<Result<()>>,
    storage: StorageClient,
}

impl Sync {
    pub async fn new(path: &Path, url: &Url, storage: SocketAddr) -> Result<Self> {
        // connect to storage
        let mut transport = tarpc::serde_transport::tcp::connect(storage, Json::default);
        transport.config_mut().max_frame_length(usize::MAX);
        let storage = StorageClient::new(Config::default(), transport.await?).spawn();

        let handshake = storage
            .ping(context::current(), "Hello".to_string())
            .await?;

        // setup crates index
        let index = GitIndex::with_path(path, url.to_string())?;
        let (sender, receiver) = channel(CRATE_VERSION_CHANNEL_LIMIT);
        let handle = tokio::spawn(update_crates::handle_crate_stream(
            storage.clone(),
            receiver,
        ));

        Ok(Self {
            client: Client::default(),
            index: Arc::new(Mutex::new(index)),
            crate_channel: sender,
            crate_handler: handle,
            storage,
        })
    }

    pub async fn launch(&self) -> Result<()> {
        self.update_crates().await?;

        Ok(())
    }

    pub async fn shutdown(self) -> Result<()> {
        let Self {
            index,
            crate_channel,
            crate_handler,
            storage,
            client,
        } = self;
        drop(index);
        drop(crate_channel);
        drop(storage);
        drop(client);
        crate_handler.await?;
        Ok(())
    }
}