diff options
Diffstat (limited to 'src/server/storage.rs')
-rw-r--r-- | src/server/storage.rs | 112 |
1 files changed, 112 insertions, 0 deletions
diff --git a/src/server/storage.rs b/src/server/storage.rs new file mode 100644 index 0000000..2b81ce1 --- /dev/null +++ b/src/server/storage.rs @@ -0,0 +1,112 @@ +pub mod backend; +mod record; + +use self::{ + backend::StorageBackend, + record::{Anchor, Change, Patch}, +}; +use super::{attrs::Attr, blobref::BlobRef, object::Chunk}; +use crate::common::{hash, json}; +use fastcdc::v2020::StreamCDC; +use serde::{Deserialize, Serialize}; +use std::io::Read; + +#[derive(Debug)] +pub enum Error { + Io, + InvalidHash(hash::Error), +} + +type Result<T> = std::result::Result<T, Error>; + +type Backend = backend::disk::Disk; + +const ANCHOR_BUCKET: &str = "anchor"; +const PATCH_BUCKET: &str = "patch"; +const PLAIN_BUCKET: &str = "plain"; + +pub struct Storage { + anchors: Backend, + patches: Backend, + plains: Backend, +} + +#[derive(Serialize, Deserialize)] +pub struct Config { + pub backend: <Backend as StorageBackend>::Config, +} + +impl Storage { + pub fn new(config: &Config) -> Self { + let backend_factory = backend::factory(&config.backend); + + let anchors = backend_factory(ANCHOR_BUCKET); + let patches = backend_factory(PATCH_BUCKET); + let plains = backend_factory(PLAIN_BUCKET); + + Self { + anchors, + patches, + plains, + } + } + + pub fn get(&self, blobref: &BlobRef) -> Result<Vec<u8>> { + self.plains.get(blobref).and_then(|x| x.ok_or(Error::Io)) + } + + pub fn put<T: Read>(&self, data: T) -> Result<(BlobRef, Vec<Chunk>)> { + let anchor = self.anchors.put(&json::serialize(Anchor::new())).unwrap(); + + self.extend(anchor, data) + } + + pub fn extend<T: Read>(&self, anchor: BlobRef, data: T) -> Result<(BlobRef, Vec<Chunk>)> { + let chunker = StreamCDC::new(data, 4096, 4_194_304, 16_777_216); + let mut chunks = Vec::new(); + + for chunk in chunker { + let chunk = chunk.map_err(|_| Error::Io)?; + let chunk_blobref = self.plains.put(&chunk.data).unwrap(); + + let mut patch = Patch::new(anchor.clone()); + + patch.add_change(Change::AddChunk { + blobref: chunk_blobref.clone(), + offset: chunk.offset, + size: chunk.length, + }); + chunks.push(Chunk { + blobref: chunk_blobref, + offset: chunk.offset as usize, + size: chunk.length, + }); + + self.patches.put(&json::serialize(patch)).unwrap(); + } + + Ok((anchor, chunks)) + } + + pub fn set_attr(&self, anchor: BlobRef, attr: Attr) -> Result<BlobRef> { + let mut patch = Patch::new(anchor); + patch.add_change(Change::SetAttr(attr)); + self.patches.put(&json::serialize(patch)) + } + + pub fn set_attrs(&self, anchor: BlobRef, attrs: &[Attr]) -> Result<BlobRef> { + let mut patch = Patch::new(anchor); + + for attr in attrs { + patch.add_change(Change::SetAttr(attr.clone())); + } + + self.patches.put(&json::serialize(patch)) + } + + // pub fn get_anchor(&self, blobref: &BlobRef) -> Result<Option<Anchor>> {} + + // pub fn get_patch(&self, blobref: &BlobRef) -> Result<Option<Patch>> {} + + // pub fn get_plain(&self, blobref: &BlobRef) -> Result<Option<Vec<u8>>> {} +} |