use anyhow::Result;
use crates_index::{Crate, GitIndex};
use futures::stream::{StreamExt, TryStreamExt};
use openvet_common::{
rust::{CrateName, VersionInfo},
storage::StorageClient,
};
use reqwest::Client;
use std::{collections::BTreeSet, net::SocketAddr, path::Path, sync::Arc};
use tarpc::{client::Config, context, tokio_serde::formats::Json};
use tokio::{
sync::{
mpsc::{channel, Receiver, Sender},
Mutex,
},
task::{spawn_blocking, JoinHandle},
};
use tokio_stream::wrappers::ReceiverStream;
use tracing::{error, info};
use url::Url;
const CRATE_VERSION_CHANNEL_LIMIT: usize = 1024;
pub mod download_sources;
pub mod dump;
pub mod update_crates;
pub struct Sync {
client: Client,
index: Arc<Mutex<GitIndex>>,
crate_channel: Sender<Arc<Crate>>,
crate_handler: JoinHandle<Result<()>>,
storage: StorageClient,
}
impl Sync {
pub async fn new(path: &Path, url: &Url, storage: SocketAddr) -> Result<Self> {
let mut transport = tarpc::serde_transport::tcp::connect(storage, Json::default);
transport.config_mut().max_frame_length(usize::MAX);
let storage = StorageClient::new(Config::default(), transport.await?).spawn();
let handshake = storage
.ping(context::current(), "Hello".to_string())
.await?;
let index = GitIndex::with_path(path, url.to_string())?;
let (sender, receiver) = channel(CRATE_VERSION_CHANNEL_LIMIT);
let handle = tokio::spawn(update_crates::handle_crate_stream(
storage.clone(),
receiver,
));
Ok(Self {
client: Client::default(),
index: Arc::new(Mutex::new(index)),
crate_channel: sender,
crate_handler: handle,
storage,
})
}
pub async fn launch(&self) -> Result<()> {
self.update_crates().await?;
Ok(())
}
pub async fn shutdown(self) -> Result<()> {
let Self {
index,
crate_channel,
crate_handler,
storage,
client,
} = self;
drop(index);
drop(crate_channel);
drop(storage);
drop(client);
crate_handler.await?;
Ok(())
}
}