1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
|
pub mod backend;
mod record;
use self::{
backend::StorageBackend,
record::{Anchor, Change, Patch},
};
use super::{attrs::Attr, blobref::BlobRef, object::Chunk};
use crate::common::{hash, json};
use fastcdc::v2020::StreamCDC;
use serde::{Deserialize, Serialize};
use std::io::Read;
#[derive(Debug)]
pub enum Error {
Io,
InvalidHash(hash::Error),
}
type Result<T> = std::result::Result<T, Error>;
type Backend = backend::disk::Disk;
const ANCHOR_BUCKET: &str = "anchor";
const PATCH_BUCKET: &str = "patch";
const PLAIN_BUCKET: &str = "plain";
pub struct Storage {
anchors: Backend,
patches: Backend,
plains: Backend,
}
#[derive(Serialize, Deserialize)]
pub struct Config {
pub backend: <Backend as StorageBackend>::Config,
}
impl Storage {
pub fn new(config: &Config) -> Self {
let backend_factory = backend::factory(&config.backend);
let anchors = backend_factory(ANCHOR_BUCKET);
let patches = backend_factory(PATCH_BUCKET);
let plains = backend_factory(PLAIN_BUCKET);
Self {
anchors,
patches,
plains,
}
}
pub fn get(&self, blobref: &BlobRef) -> Result<Vec<u8>> {
self.plains.get(blobref).and_then(|x| x.ok_or(Error::Io))
}
pub fn put<T: Read>(&self, data: T) -> Result<(BlobRef, Vec<Chunk>)> {
let anchor = self.anchors.put(&json::serialize(Anchor::new())).unwrap();
self.extend(anchor, data)
}
pub fn extend<T: Read>(&self, anchor: BlobRef, data: T) -> Result<(BlobRef, Vec<Chunk>)> {
let chunker = StreamCDC::new(data, 4096, 4_194_304, 16_777_216);
let mut chunks = Vec::new();
for chunk in chunker {
let chunk = chunk.map_err(|_| Error::Io)?;
let chunk_blobref = self.plains.put(&chunk.data).unwrap();
let mut patch = Patch::new(anchor.clone());
patch.add_change(Change::AddChunk {
blobref: chunk_blobref.clone(),
offset: chunk.offset,
size: chunk.length,
});
chunks.push(Chunk {
blobref: chunk_blobref,
offset: chunk.offset as usize,
size: chunk.length,
});
self.patches.put(&json::serialize(patch)).unwrap();
}
Ok((anchor, chunks))
}
pub fn set_attr(&self, anchor: BlobRef, attr: Attr) -> Result<BlobRef> {
let mut patch = Patch::new(anchor);
patch.add_change(Change::SetAttr(attr));
self.patches.put(&json::serialize(patch))
}
pub fn set_attrs(&self, anchor: BlobRef, attrs: &[Attr]) -> Result<BlobRef> {
let mut patch = Patch::new(anchor);
for attr in attrs {
patch.add_change(Change::SetAttr(attr.clone()));
}
self.patches.put(&json::serialize(patch))
}
// pub fn get_anchor(&self, blobref: &BlobRef) -> Result<Option<Anchor>> {}
// pub fn get_patch(&self, blobref: &BlobRef) -> Result<Option<Patch>> {}
// pub fn get_plain(&self, blobref: &BlobRef) -> Result<Option<Vec<u8>>> {}
}
|