diff options
author | evuez <julien@mulga.net> | 2024-04-03 22:43:16 +0200 |
---|---|---|
committer | evuez <julien@mulga.net> | 2024-04-03 22:43:16 +0200 |
commit | 43e1a12b5bce11b4a28a53acca243e35c2be6d3e (patch) | |
tree | 07d64823718bfee063ab7b3d5721ac1e950ae17c /src/server.rs | |
download | carton-43e1a12b5bce11b4a28a53acca243e35c2be6d3e.tar.gz |
Initial commit
Diffstat (limited to 'src/server.rs')
-rw-r--r-- | src/server.rs | 139 |
1 files changed, 139 insertions, 0 deletions
diff --git a/src/server.rs b/src/server.rs new file mode 100644 index 0000000..72f848a --- /dev/null +++ b/src/server.rs @@ -0,0 +1,139 @@ +pub mod attrs; +pub mod blobref; +mod index; +mod object; +mod storage; + +use self::{attrs::Attr, blobref::BlobRef, index::Index, object::Object, storage::Storage}; +use log::{error, warn}; +use serde::{Deserialize, Serialize}; +use std::{cmp, io::{Read, BufReader}, ops::Range, fs::File}; + +#[derive(Debug)] +pub enum Error { + Storage, + Index, +} + +type Result<T> = std::result::Result<T, Error>; + +#[derive(Serialize, Deserialize)] +pub struct Config { + storage: storage::Config, + index: index::Config, +} + +pub struct Server { + storage: Storage, + index: Index, +} + +pub struct Stream; + +impl Server { + pub fn new(config: &Config) -> Result<Self> { + let index = Index::new(&config.index).map_err(|_| Error::Index)?; + let storage = Storage::new(&config.storage); + + Ok(Self { storage, index }) + } + + pub fn read(&self, blobref: &BlobRef, range: Range<usize>) -> Result<Vec<u8>> { + let object = self.index.get(blobref).map_err(|_| Error::Index)?; + + let mut bytes: Vec<u8> = Vec::with_capacity(range.len()); + let mut initial_chunk_offset = None; + + for chunk in &object.chunks { + let chunk_end = chunk.offset + chunk.size; + + if chunk_end < range.start { + continue; + } + if chunk.offset > range.end { + break; + } + + if initial_chunk_offset.is_none() { + initial_chunk_offset = Some(chunk.offset); + } + + let chunk_bytes = self + .storage + .get(&chunk.blobref) + .map_err(|_| Error::Storage)?; + bytes.extend_from_slice(&chunk_bytes); + } + + let start = range.start - initial_chunk_offset.unwrap_or(0); + let end = cmp::min(start + range.len(), bytes.len()); + Ok(bytes[start..end].to_vec()) + } + + pub fn copy(&self, data: &BufReader<File>) -> Result<BlobRef> { + // From fs: a. Write to temp file (via BufWriter?), then on flush + // create a BufReader for temp file and pass it to this + // method. + // b. When writing to an existing file, first copy the data + // in a temp file, then proceed with a. + Err(Error::Storage) + } + + pub fn put<T: Read>(&self, data: T) -> Result<BlobRef> { + let (anchor, chunks) = self.storage.put(data).map_err(|e| { + error!("storage error: {e:?}"); + Error::Storage + })?; + + self.index + .put(&anchor, &chunks) + .map_err(|e| { + warn!("index error: {e:?}"); + }) + .ok(); + + Ok(anchor) + } + + pub fn put_with_attrs<T: Read>(&mut self, data: T, attrs: &[Attr]) -> Result<BlobRef> { + let anchor = self.put(data)?; + self.set_attrs(&anchor, attrs)?; + + Ok(anchor) + } + + fn extend<T: Read>(&self, anchor: BlobRef, data: T) -> Result<BlobRef> { + self.storage + .extend(anchor, data) + .map_err(|e| { + error!("storage error: {e:?}"); + Error::Storage + }) + .map(|r| r.0) + // TODO self.index.extend + } + + pub fn set_attrs(&mut self, anchor: &BlobRef, attrs: &[Attr]) -> Result<BlobRef> { + let patch = self.storage.set_attrs(anchor.clone(), attrs).map_err(|e| { + error!("storage error: {e:?}"); + Error::Storage + })?; + + self.index + .set_attrs(anchor, attrs) + .map_err(|e| { + warn!("index error: {e:?}"); + }) + .ok(); + + Ok(patch) + } + + pub fn list(&self, range: Range<usize>, filter: Option<Attr>) -> Result<Vec<Object>> { + let limit = range.len(); + + self.index + .list(limit, range.start, filter) + .map_err(|_| Error::Index) + } +} |