1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
|
pub mod attrs;
pub mod blobref;
mod index;
mod object;
mod storage;
use self::{attrs::Attr, blobref::BlobRef, index::Index, object::Object, storage::Storage};
use log::{error, warn};
use serde::{Deserialize, Serialize};
use std::{cmp, io::{Read, BufReader}, ops::Range, fs::File};
/// Errors surfaced by [`Server`] operations.
///
/// Underlying backend error details are logged at the call site; callers
/// only learn which subsystem failed.
#[derive(Debug)]
pub enum Error {
/// The storage backend failed (put, get, extend or attribute write).
Storage,
/// The index failed (lookup, listing, or could not be opened).
Index,
}
type Result<T> = std::result::Result<T, Error>;
/// Top-level server configuration, (de)serializable via serde.
/// Each field is handed to the matching subsystem constructor in
/// [`Server::new`].
#[derive(Serialize, Deserialize)]
pub struct Config {
/// Configuration for the storage backend.
storage: storage::Config,
/// Configuration for the index.
index: index::Config,
}
/// Blob server façade: persists chunked data through [`Storage`] and
/// records object metadata and attributes in [`Index`].
pub struct Server {
storage: Storage,
index: Index,
}
pub struct Stream;
impl Server {
    /// Opens the index and storage backends described by `config`.
    ///
    /// # Errors
    /// Returns [`Error::Index`] when the index cannot be initialized.
    pub fn new(config: &Config) -> Result<Self> {
        let index = Index::new(&config.index).map_err(|_| Error::Index)?;
        let storage = Storage::new(&config.storage);
        Ok(Self { storage, index })
    }

    /// Reads the byte `range` of the object identified by `blobref`.
    ///
    /// Only the chunks overlapping `range` are fetched from storage. The
    /// result is clamped to the bytes actually available, so a range that
    /// extends past the end of the object returns the available prefix
    /// rather than panicking.
    ///
    /// # Errors
    /// Returns [`Error::Index`] when the object lookup fails and
    /// [`Error::Storage`] when a chunk cannot be fetched.
    pub fn read(&self, blobref: &BlobRef, range: Range<usize>) -> Result<Vec<u8>> {
        let object = self.index.get(blobref).map_err(|_| Error::Index)?;
        let mut bytes: Vec<u8> = Vec::with_capacity(range.len());
        let mut initial_chunk_offset = None;
        for chunk in &object.chunks {
            // A chunk covers the half-open span [offset, offset + size).
            let chunk_end = chunk.offset + chunk.size;
            // Skip chunks that end at or before the requested start; the
            // original `<` was an off-by-one that included a chunk ending
            // exactly at range.start, which contributes no bytes but could
            // corrupt initial_chunk_offset below.
            if chunk_end <= range.start {
                continue;
            }
            // Chunks appear to be ordered by offset — TODO confirm against
            // index::Object. A chunk starting at the exclusive end is
            // already out of range (the original `>` was an off-by-one).
            if chunk.offset >= range.end {
                break;
            }
            if initial_chunk_offset.is_none() {
                initial_chunk_offset = Some(chunk.offset);
            }
            let chunk_bytes = self
                .storage
                .get(&chunk.blobref)
                .map_err(|_| Error::Storage)?;
            bytes.extend_from_slice(&chunk_bytes);
        }
        // Offset of the first byte held in `bytes`; when no chunk overlapped
        // the range, fall back to range.start so `start` becomes 0.
        let first_offset = initial_chunk_offset.unwrap_or(range.start);
        // saturating_sub + min guard the panics in the original: an
        // underflow when the first overlapping chunk starts inside the
        // range, and an out-of-bounds slice when nothing was read.
        let start = cmp::min(range.start.saturating_sub(first_offset), bytes.len());
        let end = cmp::min(start + range.len(), bytes.len());
        Ok(bytes[start..end].to_vec())
    }

    /// Ingests the contents of an already-buffered file. Not yet
    /// implemented — always returns [`Error::Storage`].
    ///
    /// Planned flow (from the fs layer):
    /// a. Write to a temp file (via BufWriter?), then on flush create a
    ///    BufReader for the temp file and pass it to this method.
    /// b. When writing to an existing file, first copy the data into a
    ///    temp file, then proceed with a.
    pub fn copy(&self, data: &BufReader<File>) -> Result<BlobRef> {
        let _ = data; // silence unused-parameter warning until implemented
        Err(Error::Storage)
    }

    /// Stores `data` as a new object and returns its anchor blobref.
    ///
    /// Indexing is best-effort: an index failure is logged as a warning
    /// but does not fail the call, since the data is already persisted.
    ///
    /// # Errors
    /// Returns [`Error::Storage`] when the storage backend rejects the write.
    pub fn put<T: Read>(&self, data: T) -> Result<BlobRef> {
        let (anchor, chunks) = self.storage.put(data).map_err(|e| {
            error!("storage error: {e:?}");
            Error::Storage
        })?;
        if let Err(e) = self.index.put(&anchor, &chunks) {
            warn!("index error: {e:?}");
        }
        Ok(anchor)
    }

    /// Stores `data` and attaches `attrs` to the resulting anchor.
    ///
    /// # Errors
    /// Propagates [`Error::Storage`] from either the write or the
    /// attribute update.
    pub fn put_with_attrs<T: Read>(&mut self, data: T, attrs: &[Attr]) -> Result<BlobRef> {
        let anchor = self.put(data)?;
        self.set_attrs(&anchor, attrs)?;
        Ok(anchor)
    }

    /// Appends `data` to the object anchored at `anchor`, returning the
    /// blobref produced by storage.
    ///
    /// # Errors
    /// Returns [`Error::Storage`] on backend failure.
    fn extend<T: Read>(&self, anchor: BlobRef, data: T) -> Result<BlobRef> {
        self.storage
            .extend(anchor, data)
            .map(|r| r.0)
            .map_err(|e| {
                error!("storage error: {e:?}");
                Error::Storage
            })
        // TODO self.index.extend
    }

    /// Records `attrs` against `anchor` and returns the patch blobref
    /// produced by storage.
    ///
    /// As with [`Server::put`], the index update is best-effort and only
    /// logged on failure — the attribute patch is already durable.
    ///
    /// # Errors
    /// Returns [`Error::Storage`] when the storage backend rejects the
    /// attribute write.
    pub fn set_attrs(&mut self, anchor: &BlobRef, attrs: &[Attr]) -> Result<BlobRef> {
        let patch = self.storage.set_attrs(anchor.clone(), attrs).map_err(|e| {
            error!("storage error: {e:?}");
            Error::Storage
        })?;
        if let Err(e) = self.index.set_attrs(anchor, attrs) {
            warn!("index error: {e:?}");
        }
        Ok(patch)
    }

    /// Lists indexed objects in the paging window `range`
    /// (offset = `range.start`, limit = `range.len()`), optionally
    /// filtered by an attribute.
    ///
    /// # Errors
    /// Returns [`Error::Index`] when the query fails.
    pub fn list(&self, range: Range<usize>, filter: Option<Attr>) -> Result<Vec<Object>> {
        let limit = range.len();
        self.index
            .list(limit, range.start, filter)
            .map_err(|_| Error::Index)
    }
}
|