From 43e1a12b5bce11b4a28a53acca243e35c2be6d3e Mon Sep 17 00:00:00 2001 From: evuez Date: Wed, 3 Apr 2024 22:43:16 +0200 Subject: Initial commit --- src/server.rs | 139 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 139 insertions(+) create mode 100644 src/server.rs (limited to 'src/server.rs') diff --git a/src/server.rs b/src/server.rs new file mode 100644 index 0000000..72f848a --- /dev/null +++ b/src/server.rs @@ -0,0 +1,139 @@ +pub mod attrs; +pub mod blobref; +mod index; +mod object; +mod storage; + +use self::{attrs::Attr, blobref::BlobRef, index::Index, object::Object, storage::Storage}; +use log::{error, warn}; +use serde::{Deserialize, Serialize}; +use std::{cmp, io::{Read, BufReader}, ops::Range, fs::File}; + +#[derive(Debug)] +pub enum Error { + Storage, + Index, +} + +type Result = std::result::Result; + +#[derive(Serialize, Deserialize)] +pub struct Config { + storage: storage::Config, + index: index::Config, +} + +pub struct Server { + storage: Storage, + index: Index, +} + +pub struct Stream; + +impl Server { + pub fn new(config: &Config) -> Result { + let index = Index::new(&config.index).map_err(|_| Error::Index)?; + let storage = Storage::new(&config.storage); + + Ok(Self { storage, index }) + } + + pub fn read(&self, blobref: &BlobRef, range: Range) -> Result> { + let object = self.index.get(blobref).map_err(|_| Error::Index)?; + + let mut bytes: Vec = Vec::with_capacity(range.len()); + let mut initial_chunk_offset = None; + + for chunk in &object.chunks { + let chunk_end = chunk.offset + chunk.size; + + if chunk_end < range.start { + continue; + } + if chunk.offset > range.end { + break; + } + + if initial_chunk_offset.is_none() { + initial_chunk_offset = Some(chunk.offset); + } + + let chunk_bytes = self + .storage + .get(&chunk.blobref) + .map_err(|_| Error::Storage)?; + bytes.extend_from_slice(&chunk_bytes); + } + + let start = range.start - initial_chunk_offset.unwrap_or(0); + let end = cmp::min(start + range.len(), bytes.len()); + Ok(bytes[start..end].to_vec()) + } + + pub fn copy(&self, data: &BufReader) -> Result { + // From fs: a. Write to temp file (via BufWriter?), then on flush + // create a BufReader for temp file and pass it to this + // method. + // b. When writing to an existing file, first copy the data + // in a temp file, then proceed with a. + Err(Error::Storage) + } + + pub fn put(&self, data: T) -> Result { + let (anchor, chunks) = self.storage.put(data).map_err(|e| { + error!("storage error: {e:?}"); + Error::Storage + })?; + + self.index + .put(&anchor, &chunks) + .map_err(|e| { + warn!("index error: {e:?}"); + }) + .ok(); + + Ok(anchor) + } + + pub fn put_with_attrs(&mut self, data: T, attrs: &[Attr]) -> Result { + let anchor = self.put(data)?; + self.set_attrs(&anchor, attrs)?; + + Ok(anchor) + } + + fn extend(&self, anchor: BlobRef, data: T) -> Result { + self.storage + .extend(anchor, data) + .map_err(|e| { + error!("storage error: {e:?}"); + Error::Storage + }) + .map(|r| r.0) + // TODO self.index.extend + } + + pub fn set_attrs(&mut self, anchor: &BlobRef, attrs: &[Attr]) -> Result { + let patch = self.storage.set_attrs(anchor.clone(), attrs).map_err(|e| { + error!("storage error: {e:?}"); + Error::Storage + })?; + + self.index + .set_attrs(anchor, attrs) + .map_err(|e| { + warn!("index error: {e:?}"); + }) + .ok(); + + Ok(patch) + } + + pub fn list(&self, range: Range, filter: Option) -> Result> { + let limit = range.len(); + + self.index + .list(limit, range.start, filter) + .map_err(|_| Error::Index) + } +} -- cgit v1.2.3