use crate::{database::Database, handler::StorageHandler, objects::Objects};
use anyhow::Result;
use futures::{future, prelude::*};
use openvet_common::storage::Storage;
use std::{net::SocketAddr, path::Path};
use tarpc::{
server::{self, incoming::Incoming, Channel},
tokio_serde::formats::Json,
};
use tracing::*;
/// Storage service state: a database handle paired with an object store.
///
/// The type is `Clone`; `listen` clones it to hand a copy to the RPC handler.
#[derive(Debug, Clone)]
pub struct StorageService {
    /// Database handle, exposed through `database()`.
    database: Database,
    /// Object store handle, exposed through `objects()`.
    objects: Objects,
}
/// Spawns `fut` as a detached Tokio task.
///
/// The join handle returned by `tokio::spawn` is dropped right away, so the
/// task keeps running in the background and its completion is never awaited
/// here. This is an `async fn` so that it can be passed directly to stream
/// combinators such as `for_each`, which expect a future-returning callback.
///
/// # Panics
///
/// `tokio::spawn` panics when called outside of a Tokio runtime context.
async fn spawn(fut: impl Future<Output = ()> + Send + 'static) {
    // Dropping the handle detaches the task; it does not cancel it.
    drop(tokio::spawn(fut));
}
impl StorageService {
pub async fn new(path: &Path) -> Result<Self> {
let database = Database::open(path).await?;
let objects = Objects::memory().await?;
Ok(Self { database, objects })
}
pub async fn memory() -> Result<Self> {
let database = Database::memory().await?;
let objects = Objects::memory().await?;
Ok(Self { database, objects })
}
pub async fn listen(&self, socket: SocketAddr) -> Result<()> {
let mut listener = tarpc::serde_transport::tcp::listen(&socket, Json::default).await?;
info!("Listening on {}", listener.local_addr());
listener.config_mut().max_frame_length(usize::MAX);
let handler = StorageHandler::new(self.clone());
listener
.filter_map(|r| future::ready(r.ok()))
.map(server::BaseChannel::with_defaults)
.max_channels_per_key(4, |t| t.transport().peer_addr().unwrap().ip())
.map(|channel| channel.execute(handler.clone().serve()).for_each(spawn))
.buffer_unordered(10)
.for_each(|_| async {})
.await;
Ok(())
}
pub fn database(&self) -> &Database {
&self.database
}
pub fn objects(&self) -> &Objects {
&self.objects
}
}