aboutsummaryrefslogtreecommitdiff
path: root/src/server/storage.rs
blob: 2b81ce10fb1b2eac8599bc7d320d5197a39cdf6b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
pub mod backend;
mod record;

use self::{
    backend::StorageBackend,
    record::{Anchor, Change, Patch},
};
use super::{attrs::Attr, blobref::BlobRef, object::Chunk};
use crate::common::{hash, json};
use fastcdc::v2020::StreamCDC;
use serde::{Deserialize, Serialize};
use std::io::Read;

/// Errors surfaced by the storage layer.
#[derive(Debug)]
pub enum Error {
    /// A backend read/write failed, or a requested blob was absent
    /// (missing blobs are collapsed into this variant — see `Storage::get`).
    Io,
    /// A blobref carried an invalid hash; wraps the underlying `hash::Error`.
    InvalidHash(hash::Error),
}

/// Local shorthand: every fallible storage operation returns this.
type Result<T> = std::result::Result<T, Error>;

// The concrete backend is fixed at compile time to the on-disk implementation.
type Backend = backend::disk::Disk;

// Bucket names: each record kind is stored in its own backend bucket.
const ANCHOR_BUCKET: &str = "anchor";
const PATCH_BUCKET: &str = "patch";
const PLAIN_BUCKET: &str = "plain";

/// Content-addressed blob store split into three buckets:
/// anchors (identity records), patches (change records), and
/// plains (raw chunk data).
pub struct Storage {
    anchors: Backend,
    patches: Backend,
    plains: Backend,
}

/// Storage configuration; just forwards the backend's own config type.
#[derive(Serialize, Deserialize)]
pub struct Config {
    pub backend: <Backend as StorageBackend>::Config,
}

impl Storage {
    /// Builds a `Storage`, creating one backend bucket per record kind.
    pub fn new(config: &Config) -> Self {
        let backend_factory = backend::factory(&config.backend);

        Self {
            anchors: backend_factory(ANCHOR_BUCKET),
            patches: backend_factory(PATCH_BUCKET),
            plains: backend_factory(PLAIN_BUCKET),
        }
    }

    /// Fetches the raw bytes of a plain blob.
    ///
    /// # Errors
    /// Returns `Error::Io` if the backend fails *or* the blob is absent.
    // NOTE(review): a missing blob and a backend failure are indistinguishable
    // to callers; a dedicated `NotFound` variant would be clearer, but adding
    // one would change the public error surface, so it is left as-is.
    pub fn get(&self, blobref: &BlobRef) -> Result<Vec<u8>> {
        self.plains.get(blobref).and_then(|x| x.ok_or(Error::Io))
    }

    /// Stores `data` under a freshly created anchor, splitting it into
    /// content-defined chunks. Returns the anchor's blobref together with
    /// the list of stored chunks.
    ///
    /// # Errors
    /// Propagates backend and chunker failures.
    pub fn put<T: Read>(&self, data: T) -> Result<(BlobRef, Vec<Chunk>)> {
        // Fixed: was `.unwrap()` — a backend write failure must surface as
        // `Err`, not a panic.
        let anchor = self.anchors.put(&json::serialize(Anchor::new()))?;

        self.extend(anchor, data)
    }

    /// Appends `data` to an existing anchor: each content-defined chunk is
    /// stored as a plain blob and recorded as its own `AddChunk` patch
    /// against the anchor.
    ///
    /// # Errors
    /// Propagates backend failures; chunker errors are mapped to `Error::Io`.
    pub fn extend<T: Read>(&self, anchor: BlobRef, data: T) -> Result<(BlobRef, Vec<Chunk>)> {
        // CDC parameters: 4 KiB minimum, 4 MiB average, 16 MiB maximum
        // chunk size.
        let chunker = StreamCDC::new(data, 4096, 4_194_304, 16_777_216);
        let mut chunks = Vec::new();

        for chunk in chunker {
            let chunk = chunk.map_err(|_| Error::Io)?;
            // Fixed: was `.unwrap()` — propagate the backend error instead
            // of panicking mid-stream.
            let chunk_blobref = self.plains.put(&chunk.data)?;

            // One patch per chunk, each targeting the same anchor.
            let mut patch = Patch::new(anchor.clone());
            patch.add_change(Change::AddChunk {
                blobref: chunk_blobref.clone(),
                offset: chunk.offset,
                size: chunk.length,
            });
            // Fixed: was `.unwrap()` — a lost patch record would silently
            // corrupt the anchor's history.
            self.patches.put(&json::serialize(patch))?;

            chunks.push(Chunk {
                blobref: chunk_blobref,
                offset: chunk.offset as usize,
                size: chunk.length,
            });
        }

        Ok((anchor, chunks))
    }

    /// Records a single attribute change as a patch against `anchor`.
    ///
    /// # Errors
    /// Propagates backend failures from storing the patch.
    pub fn set_attr(&self, anchor: BlobRef, attr: Attr) -> Result<BlobRef> {
        let mut patch = Patch::new(anchor);
        patch.add_change(Change::SetAttr(attr));
        self.patches.put(&json::serialize(patch))
    }

    /// Records several attribute changes in a single patch against `anchor`.
    ///
    /// # Errors
    /// Propagates backend failures from storing the patch.
    pub fn set_attrs(&self, anchor: BlobRef, attrs: &[Attr]) -> Result<BlobRef> {
        let mut patch = Patch::new(anchor);

        for attr in attrs {
            patch.add_change(Change::SetAttr(attr.clone()));
        }

        self.patches.put(&json::serialize(patch))
    }

    // TODO: planned accessors, not yet implemented.
    // pub fn get_anchor(&self, blobref: &BlobRef) -> Result<Option<Anchor>> {}

    // pub fn get_patch(&self, blobref: &BlobRef) -> Result<Option<Patch>> {}

    // pub fn get_plain(&self, blobref: &BlobRef) -> Result<Option<Vec<u8>>> {}
}