Diffstat (limited to 'src/server.rs')
-rw-r--r--  src/server.rs  139
1 file changed, 139 insertions, 0 deletions
diff --git a/src/server.rs b/src/server.rs
new file mode 100644
index 0000000..72f848a
--- /dev/null
+++ b/src/server.rs
@@ -0,0 +1,139 @@
+pub mod attrs;
+pub mod blobref;
+mod index;
+mod object;
+mod storage;
+
+use self::{attrs::Attr, blobref::BlobRef, index::Index, object::Object, storage::Storage};
+use log::{error, warn};
+use serde::{Deserialize, Serialize};
+use std::{cmp, fs::File, io::{BufReader, Read}, ops::Range};
+
+#[derive(Debug)]
+pub enum Error {
+    Storage,
+    Index,
+}
+
+type Result<T> = std::result::Result<T, Error>;
+
+#[derive(Serialize, Deserialize)]
+pub struct Config {
+    storage: storage::Config,
+    index: index::Config,
+}
+
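+/// Ties the blob `Storage` and the `Index` together: data is written to
+/// storage first and then recorded in the index (see `put` and `set_attrs`).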
+pub struct Server {
+    storage: Storage,
+    index: Index,
+}
+
+pub struct Stream;
+
+impl Server {
+    pub fn new(config: &Config) -> Result<Self> {
+        let index = Index::new(&config.index).map_err(|_| Error::Index)?;
+        let storage = Storage::new(&config.storage);
+
+        Ok(Self { storage, index })
+    }
+
+    pub fn read(&self, blobref: &BlobRef, range: Range<usize>) -> Result<Vec<u8>> {
+        let object = self.index.get(blobref).map_err(|_| Error::Index)?;
+
+        let mut bytes: Vec<u8> = Vec::with_capacity(range.len());
+        let mut initial_chunk_offset = None;
+
+        for chunk in &object.chunks {
+            // Each chunk covers the byte span [offset, offset + size) of the
+            // object: skip chunks that end before the requested range and
+            // stop once we are past its end.
+            let chunk_end = chunk.offset + chunk.size;
+
+            if chunk_end <= range.start {
+                continue;
+            }
+            if chunk.offset >= range.end {
+                break;
+            }
+
+            // Remember where the first fetched chunk starts so the requested
+            // range can be sliced out of the assembled bytes below.
+            if initial_chunk_offset.is_none() {
+                initial_chunk_offset = Some(chunk.offset);
+            }
+
+            let chunk_bytes = self
+                .storage
+                .get(&chunk.blobref)
+                .map_err(|_| Error::Storage)?;
+            bytes.extend_from_slice(&chunk_bytes);
+        }
+
+        // Clamp both bounds so a range that lies past the end of the object
+        // yields a short (possibly empty) result instead of panicking.
+        let start = cmp::min(
+            range.start.saturating_sub(initial_chunk_offset.unwrap_or(range.start)),
+            bytes.len(),
+        );
+        let end = cmp::min(start + range.len(), bytes.len());
+        Ok(bytes[start..end].to_vec())
+    }
+
+    pub fn copy(&self, _data: &BufReader<File>) -> Result<BlobRef> {
+        // Planned flow from the fs layer:
+        //   a. Write to a temp file (via a BufWriter?), then on flush open a
+        //      BufReader on that temp file and pass it to this method.
+        //   b. When writing to an existing file, first copy its data into a
+        //      temp file, then proceed as in a.
+        Err(Error::Storage)
+    }
+
+    pub fn put<T: Read>(&self, data: T) -> Result<BlobRef> {
+        let (anchor, chunks) = self.storage.put(data).map_err(|e| {
+            error!("storage error: {e:?}");
+            Error::Storage
+        })?;
+
+        // At this point the data is already in storage, so an index failure
+        // is only logged and does not fail the write.
+        self.index
+            .put(&anchor, &chunks)
+            .map_err(|e| {
+                warn!("index error: {e:?}");
+            })
+            .ok();
+
+        Ok(anchor)
+    }
+
+    pub fn put_with_attrs<T: Read>(&mut self, data: T, attrs: &[Attr]) -> Result<BlobRef> {
+        let anchor = self.put(data)?;
+        self.set_attrs(&anchor, attrs)?;
+
+        Ok(anchor)
+    }
+
+    fn extend<T: Read>(&self, anchor: BlobRef, data: T) -> Result<BlobRef> {
+        self.storage
+            .extend(anchor, data)
+            .map_err(|e| {
+                error!("storage error: {e:?}");
+                Error::Storage
+            })
+            .map(|r| r.0)
+        // TODO self.index.extend
+    }
+
+    pub fn set_attrs(&mut self, anchor: &BlobRef, attrs: &[Attr]) -> Result<BlobRef> {
+        let patch = self.storage.set_attrs(anchor.clone(), attrs).map_err(|e| {
+            error!("storage error: {e:?}");
+            Error::Storage
+        })?;
+
+        // As in `put`, indexing failures are logged but not treated as fatal.
+        self.index
+            .set_attrs(anchor, attrs)
+            .map_err(|e| {
+                warn!("index error: {e:?}");
+            })
+            .ok();
+
+        Ok(patch)
+    }
+
+    pub fn list(&self, range: Range<usize>, filter: Option<Attr>) -> Result<Vec<Object>> {
+        let limit = range.len();
+
+        self.index
+            .list(limit, range.start, filter)
+            .map_err(|_| Error::Index)
+    }
+}
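
A minimal usage sketch of the API added above (illustrative only, not part of
this commit): it assumes the file is wired into the crate as `mod server` and
that a `Config` has already been obtained elsewhere, presumably deserialized
with serde since both of its fields are private.

fn demo(config: &server::Config) -> Result<(), server::Error> {
    let srv = server::Server::new(config)?;

    // Store a blob from any `Read` source...
    let anchor = srv.put(std::io::Cursor::new(b"hello blobs".to_vec()))?;

    // ...and read back at most its first 16 bytes.
    let head = srv.read(&anchor, 0..16)?;
    assert!(head.len() <= 16);
    Ok(())
}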