aboutsummaryrefslogtreecommitdiff
path: root/src/server/storage.rs
diff options
context:
space:
mode:
authorevuez <julien@mulga.net>2024-04-03 22:43:16 +0200
committerevuez <julien@mulga.net>2024-04-03 22:43:16 +0200
commit43e1a12b5bce11b4a28a53acca243e35c2be6d3e (patch)
tree07d64823718bfee063ab7b3d5721ac1e950ae17c /src/server/storage.rs
downloadcarton-43e1a12b5bce11b4a28a53acca243e35c2be6d3e.tar.gz
Initial commit
Diffstat (limited to 'src/server/storage.rs')
-rw-r--r--src/server/storage.rs112
1 file changed, 112 insertions, 0 deletions
diff --git a/src/server/storage.rs b/src/server/storage.rs
new file mode 100644
index 0000000..2b81ce1
--- /dev/null
+++ b/src/server/storage.rs
@@ -0,0 +1,112 @@
+pub mod backend;
+mod record;
+
+use self::{
+ backend::StorageBackend,
+ record::{Anchor, Change, Patch},
+};
+use super::{attrs::Attr, blobref::BlobRef, object::Chunk};
+use crate::common::{hash, json};
+use fastcdc::v2020::StreamCDC;
+use serde::{Deserialize, Serialize};
+use std::io::Read;
+
+#[derive(Debug)]
+pub enum Error {
+ Io,
+ InvalidHash(hash::Error),
+}
+
// Local alias: every fallible storage operation fails with this module's `Error`.
type Result<T> = std::result::Result<T, Error>;

// Concrete backend used for all buckets; currently the on-disk backend.
type Backend = backend::disk::Disk;

// One backend bucket per blob kind: anchors, patches, and plain (chunk) data.
const ANCHOR_BUCKET: &str = "anchor";
const PATCH_BUCKET: &str = "patch";
const PLAIN_BUCKET: &str = "plain";
+
/// Blob store backed by three backend buckets, one per blob kind:
/// anchor blobs, patch blobs, and plain (chunk) data blobs.
pub struct Storage {
    anchors: Backend,
    patches: Backend,
    plains: Backend,
}
+
/// Storage configuration; wraps the configuration of the chosen backend.
#[derive(Serialize, Deserialize)]
pub struct Config {
    /// Backend-specific configuration (see [`StorageBackend::Config`]).
    pub backend: <Backend as StorageBackend>::Config,
}
+
+impl Storage {
+ pub fn new(config: &Config) -> Self {
+ let backend_factory = backend::factory(&config.backend);
+
+ let anchors = backend_factory(ANCHOR_BUCKET);
+ let patches = backend_factory(PATCH_BUCKET);
+ let plains = backend_factory(PLAIN_BUCKET);
+
+ Self {
+ anchors,
+ patches,
+ plains,
+ }
+ }
+
+ pub fn get(&self, blobref: &BlobRef) -> Result<Vec<u8>> {
+ self.plains.get(blobref).and_then(|x| x.ok_or(Error::Io))
+ }
+
+ pub fn put<T: Read>(&self, data: T) -> Result<(BlobRef, Vec<Chunk>)> {
+ let anchor = self.anchors.put(&json::serialize(Anchor::new())).unwrap();
+
+ self.extend(anchor, data)
+ }
+
+ pub fn extend<T: Read>(&self, anchor: BlobRef, data: T) -> Result<(BlobRef, Vec<Chunk>)> {
+ let chunker = StreamCDC::new(data, 4096, 4_194_304, 16_777_216);
+ let mut chunks = Vec::new();
+
+ for chunk in chunker {
+ let chunk = chunk.map_err(|_| Error::Io)?;
+ let chunk_blobref = self.plains.put(&chunk.data).unwrap();
+
+ let mut patch = Patch::new(anchor.clone());
+
+ patch.add_change(Change::AddChunk {
+ blobref: chunk_blobref.clone(),
+ offset: chunk.offset,
+ size: chunk.length,
+ });
+ chunks.push(Chunk {
+ blobref: chunk_blobref,
+ offset: chunk.offset as usize,
+ size: chunk.length,
+ });
+
+ self.patches.put(&json::serialize(patch)).unwrap();
+ }
+
+ Ok((anchor, chunks))
+ }
+
+ pub fn set_attr(&self, anchor: BlobRef, attr: Attr) -> Result<BlobRef> {
+ let mut patch = Patch::new(anchor);
+ patch.add_change(Change::SetAttr(attr));
+ self.patches.put(&json::serialize(patch))
+ }
+
+ pub fn set_attrs(&self, anchor: BlobRef, attrs: &[Attr]) -> Result<BlobRef> {
+ let mut patch = Patch::new(anchor);
+
+ for attr in attrs {
+ patch.add_change(Change::SetAttr(attr.clone()));
+ }
+
+ self.patches.put(&json::serialize(patch))
+ }
+
+ // pub fn get_anchor(&self, blobref: &BlobRef) -> Result<Option<Anchor>> {}
+
+ // pub fn get_patch(&self, blobref: &BlobRef) -> Result<Option<Patch>> {}
+
+ // pub fn get_plain(&self, blobref: &BlobRef) -> Result<Option<Vec<u8>>> {}
+}