diff options
Diffstat (limited to 'src/server')
-rw-r--r-- | src/server/attrs.rs | 47 | ||||
-rw-r--r-- | src/server/blobref.rs | 62 | ||||
-rw-r--r-- | src/server/index.rs | 260 | ||||
-rw-r--r-- | src/server/object.rs | 51 | ||||
-rw-r--r-- | src/server/storage.rs | 112 | ||||
-rw-r--r-- | src/server/storage/backend.rs | 26 | ||||
-rw-r--r-- | src/server/storage/backend/disk.rs | 65 | ||||
-rw-r--r-- | src/server/storage/record.rs | 56 |
8 files changed, 679 insertions, 0 deletions
diff --git a/src/server/attrs.rs b/src/server/attrs.rs new file mode 100644 index 0000000..eeb273a --- /dev/null +++ b/src/server/attrs.rs @@ -0,0 +1,47 @@ +use super::blobref::BlobRef; +use crate::common::{json, mime::MimeType}; +use rusqlite::ToSql; +use serde::{Deserialize, Serialize}; +use time::OffsetDateTime; + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "kebab-case")] +pub enum Attr { + Name(String), + Group(BlobRef), + Mime(MimeType), + CreatedAt(#[serde(with = "time::serde::rfc3339")] OffsetDateTime), + UpdatedAt(#[serde(with = "time::serde::rfc3339")] OffsetDateTime), + DeletedAt(#[serde(with = "time::serde::rfc3339")] OffsetDateTime), + Tags(Vec<String>), + Note(String), +} + +pub fn key(attr: &Attr) -> String { + match attr { + Attr::Name(_) => "name", + Attr::Group(_) => "group", + Attr::Mime(_) => "mime", + Attr::CreatedAt(_) => "created_at", + Attr::UpdatedAt(_) => "updated_at", + Attr::DeletedAt(_) => "deleted_at", + Attr::Tags(_) => "tags", + Attr::Note(_) => "note", + } + .into() +} + +impl ToSql for Attr { + fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> { + match self { + Attr::Name(name) => name.to_sql(), + Attr::Group(group) => group.to_sql(), + Attr::Mime(mime) => mime.to_sql(), + Attr::CreatedAt(created_at) => created_at.to_sql(), + Attr::UpdatedAt(updated_at) => updated_at.to_sql(), + Attr::DeletedAt(deleted_at) => deleted_at.to_sql(), + Attr::Tags(tags) => Ok(json::serialize(tags).into()), + Attr::Note(note) => note.to_sql(), + } + } +} diff --git a/src/server/blobref.rs b/src/server/blobref.rs new file mode 100644 index 0000000..7d9fa66 --- /dev/null +++ b/src/server/blobref.rs @@ -0,0 +1,62 @@ +use crate::common::hash::{self, Hasher}; +use rusqlite::types::{FromSql, FromSqlError}; +use rusqlite::{ + types::{FromSqlResult, ToSqlOutput}, + ToSql, +}; +use serde::{Deserialize, Serialize}; +use std::fmt::Display; +use std::path::PathBuf; +use std::str::FromStr; + +#[derive(Debug)] +pub enum Error { + InvalidHash(hash::Error), +} + +type Result<T> = std::result::Result<T, Error>; + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Hash, Eq)] +pub struct BlobRef(String); + +impl BlobRef { + pub fn from_str(s: &str) -> Result<Self> { + hash::Hash::from_str(s).map_err(Error::InvalidHash)?; + Ok(Self(s.to_string())) + } + + pub fn for_bytes(bytes: &[u8]) -> Self { + let mut hasher = Hasher::default(); + hasher.update(bytes); + BlobRef(hasher.finish().to_base32()) + } + + pub(super) fn path(&self) -> PathBuf { + let mut buf = PathBuf::new(); + buf.push(&self.0[0..4]); + buf.push(&self.0[4..]); + + buf + } +} + +impl Display for BlobRef { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "blobref:{}", self.0) + } +} + +impl ToSql for BlobRef { + fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> { + Ok(ToSqlOutput::Borrowed(rusqlite::types::ValueRef::Text( + self.0.as_bytes(), + ))) + } +} + +impl FromSql for BlobRef { + fn column_result(value: rusqlite::types::ValueRef<'_>) -> FromSqlResult<Self> { + let v: String = FromSql::column_result(value)?; + BlobRef::from_str(&v).map_err(|_| FromSqlError::InvalidType) + } +} diff --git a/src/server/index.rs b/src/server/index.rs new file mode 100644 index 0000000..042c4aa --- /dev/null +++ b/src/server/index.rs @@ -0,0 +1,260 @@ +use super::attrs::{self, Attr}; +use super::blobref::BlobRef; +use super::object::Chunk; +use super::object::{Attrs, Object}; +use crate::common::{json, sqlite}; +use log::error; +use rusqlite::{params, Connection, Row}; +use serde::{Deserialize, Serialize}; +use std::path::PathBuf; +use time::OffsetDateTime; + +#[derive(Serialize, Deserialize)] +pub struct Config { + pub root: PathBuf, +} + +const DB_NAME: &str = "index.sqlite3"; +const SQL_INIT: &str = " + create table objects ( + anchor text primary key, + size integer not null, + chunks text not null, -- JSON array + created_at datetime not null, + updated_at datetime not null, + x_name text, + x_group text, + x_mime text, + x_created_at text, + x_updated_at text, + x_deleted_at text, + x_tags text, + x_note text + ); +"; + +#[derive(Debug)] +pub enum Error { + Internal(rusqlite::Error), +} + +type Result<T> = std::result::Result<T, Error>; + +pub struct Index { + db: Connection, +} + +impl Index { + pub fn new(config: &Config) -> Result<Self> { + let db = match Connection::open(config.root.join(DB_NAME)) { + Ok(conn) => conn, + Err(e) => { + error!("Couldn't open connection to the database: {e:?}"); + return Err(Error::Internal(e)); + } + }; + + Ok(Self { db }) + } + + pub fn put(&self, anchor: &BlobRef, chunks: &[Chunk]) -> Result<()> { + // Calculate the overall size + let size: usize = chunks.iter().map(|c| c.size).sum(); + + // Insert or update the object for `anchor` + let query = " + insert into objects (anchor, size, chunks, created_at, updated_at) + values (?, ?, ?, ?, ?) + on conflict do update set + size = excluded.size, + chunks = excluded.chunks, + updated_at = excluded.updated_at + "; + + let now = OffsetDateTime::now_utc(); + let mut stmt = self.db.prepare(query).map_err(Error::Internal)?; + stmt.execute(params![anchor, size, json::serialize(chunks), now, now]) + .map_err(Error::Internal)?; + + Ok(()) + } + + pub fn set_attrs(&mut self, anchor: &BlobRef, attrs: &[Attr]) -> Result<()> { + let now = OffsetDateTime::now_utc(); + let tx = self.db.transaction().map_err(Error::Internal)?; + + for attr in attrs { + let query = format!( + r#"update objects set "x_{}" = ? where anchor = ?"#, + attrs::key(attr) + ); + let mut stmt = tx.prepare(&query).map_err(Error::Internal)?; + + stmt.execute(params![attr, anchor]) + .map_err(Error::Internal)?; + } + + { + let mut stmt = tx + .prepare("update objects set updated_at = ? where anchor = ?") + .map_err(Error::Internal)?; + stmt.execute(params![now, anchor]) + .map_err(Error::Internal)?; + } + + tx.commit().map_err(Error::Internal)?; + + Ok(()) + } + + pub fn get(&self, anchor: &BlobRef) -> Result<Object> { + let query = "select + anchor, + size, + chunks, + created_at, + updated_at, + x_name, + x_group, + x_mime, + x_created_at, + x_updated_at, + x_deleted_at, + x_tags, + x_note + from objects + where anchor = ?"; + + sqlite::get(&self.db, query, (anchor,), |r| Object::try_from(r)).map_err(Error::Internal) + } + + pub fn list(&self, limit: usize, offset: usize, filter: Option<Attr>) -> Result<Vec<Object>> { + let where_clause = if let Some(ref filter) = filter { + format!(r#""x_{}" = ?"#, attrs::key(filter)) + } else { + "1=1".into() + }; + + let query = format!( + r#"select + anchor, + size, + chunks, + created_at, + updated_at, + x_name, + x_group, + x_mime, + x_created_at, + x_updated_at, + x_deleted_at, + x_tags, + x_note + from objects + where {where_clause} + order by rowid + limit {limit} offset {offset} + "# + ); + + if let Some(filter) = filter { + sqlite::list(&self.db, &query, (filter,), |r| Object::try_from(r)) + .map_err(Error::Internal) + } else { + sqlite::list(&self.db, &query, (), |r| Object::try_from(r)).map_err(Error::Internal) + } + } +} +// +// impl Object { +// fn new(blobref: BlobRef) -> Self { +// Object { +// blobref, +// group: None, +// is_group: false, +// name: None, +// kind: None, +// size: None, +// created_at: None, +// updated_at: None, +// deleted_at: None, +// tags: Vec::new(), +// } +// } +// +// pub fn get_name(&self) -> String { +// self.name.clone().unwrap_or(self.blobref.to_string()) +// } +// +// pub fn get_size(&self) -> u64 { +// self.size.unwrap_or(0) +// } +// +// pub fn is_group(&self) -> bool { +// self.is_group +// } +// +// fn apply_patch(&mut self, patch: Patch) { +// for change in patch.changes { +// self.apply_change(change); +// } +// } +// +// fn apply_change(&mut self, change: Change) { +// match change { +// Change::MarkAsGroup => self.is_group = true, +// Change::SetName { name } => self.name = Some(name), +// Change::SetGroup { group } => self.group = Some(group), +// Change::SetSize { size } => self.size = Some(size), +// Change::SetCreatedAt { created_at } => self.created_at = Some(created_at), +// Change::SetUpdatedAt { updated_at } => self.updated_at = Some(updated_at), +// Change::SetDeletedAt { deleted_at } => self.deleted_at = Some(deleted_at), +// Change::SetTags { tags } => self.tags = tags, +// }; +// } +// } +// +// fn row_to_object(row: &Row) -> rusqlite::Result<Object> { +// Ok(Object { +// blobref: row.get(0)?, +// group: row.get(1)?, +// is_group: row.get(2)?, +// name: row.get(3)?, +// size: row.get(4)?, +// kind: row.get(5)?, +// created_at: row.get(6)?, +// updated_at: row.get(7)?, +// deleted_at: row.get(8)?, +// tags: vec![], +// }) +// } + +impl TryFrom<&Row<'_>> for Object { + type Error = rusqlite::Error; + + fn try_from(row: &Row) -> std::result::Result<Object, Self::Error> { + let chunks_string: Vec<u8> = row.get(2)?; + let chunks = serde_json::from_slice::<Vec<Chunk>>(&chunks_string) + .map_err(|_| rusqlite::Error::InvalidQuery)?; + + let attrs = Attrs { + name: row.get(5)?, + group: row.get(6)?, + mime: row.get(7)?, + created_at: row.get(8)?, + updated_at: row.get(9)?, + deleted_at: row.get(10)?, + tags: Vec::new(), + note: row.get(12)?, + }; + + Ok(Object { + blobref: row.get(0)?, + size: row.get(1)?, + chunks, + attrs, + created_at: row.get(3)?, + updated_at: row.get(4)?, + }) + } +} diff --git a/src/server/object.rs b/src/server/object.rs new file mode 100644 index 0000000..9bad6c9 --- /dev/null +++ b/src/server/object.rs @@ -0,0 +1,51 @@ +use super::blobref::BlobRef; +use crate::common::mime::MimeType; +use serde::{Deserialize, Serialize}; +use time::OffsetDateTime; + +#[derive(Debug)] +pub(super) struct Attrs { + pub name: Option<String>, + pub group: Option<BlobRef>, + pub mime: Option<MimeType>, + pub created_at: Option<OffsetDateTime>, + pub updated_at: Option<OffsetDateTime>, + pub deleted_at: Option<OffsetDateTime>, + pub tags: Vec<String>, + pub note: Option<String>, +} + +#[derive(Debug, Serialize, Deserialize)] +pub(super) struct Chunk { + pub size: usize, + pub offset: usize, + pub blobref: BlobRef, +} + +#[derive(Debug)] +pub struct Object { + pub(super) blobref: BlobRef, + pub(super) size: usize, + pub(super) chunks: Vec<Chunk>, + pub(super) attrs: Attrs, + pub(super) created_at: Option<OffsetDateTime>, + pub(super) updated_at: Option<OffsetDateTime>, +} + +impl Object { + pub fn blobref(&self) -> &BlobRef { + &self.blobref + } + + pub fn size(&self) -> usize { + self.size + } + + pub fn is_group(&self) -> bool { + matches!(self.attrs.mime, Some(MimeType::ApplicationXGroup)) + } + + pub fn get_name(&self) -> Option<String> { + self.attrs.name.clone() + } +} diff --git a/src/server/storage.rs b/src/server/storage.rs new file mode 100644 index 0000000..2b81ce1 --- /dev/null +++ b/src/server/storage.rs @@ -0,0 +1,112 @@ +pub mod backend; +mod record; + +use self::{ + backend::StorageBackend, + record::{Anchor, Change, Patch}, +}; +use super::{attrs::Attr, blobref::BlobRef, object::Chunk}; +use crate::common::{hash, json}; +use fastcdc::v2020::StreamCDC; +use serde::{Deserialize, Serialize}; +use std::io::Read; + +#[derive(Debug)] +pub enum Error { + Io, + InvalidHash(hash::Error), +} + +type Result<T> = std::result::Result<T, Error>; + +type Backend = backend::disk::Disk; + +const ANCHOR_BUCKET: &str = "anchor"; +const PATCH_BUCKET: &str = "patch"; +const PLAIN_BUCKET: &str = "plain"; + +pub struct Storage { + anchors: Backend, + patches: Backend, + plains: Backend, +} + +#[derive(Serialize, Deserialize)] +pub struct Config { + pub backend: <Backend as StorageBackend>::Config, +} + +impl Storage { + pub fn new(config: &Config) -> Self { + let backend_factory = backend::factory(&config.backend); + + let anchors = backend_factory(ANCHOR_BUCKET); + let patches = backend_factory(PATCH_BUCKET); + let plains = backend_factory(PLAIN_BUCKET); + + Self { + anchors, + patches, + plains, + } + } + + pub fn get(&self, blobref: &BlobRef) -> Result<Vec<u8>> { + self.plains.get(blobref).and_then(|x| x.ok_or(Error::Io)) + } + + pub fn put<T: Read>(&self, data: T) -> Result<(BlobRef, Vec<Chunk>)> { + let anchor = self.anchors.put(&json::serialize(Anchor::new())).unwrap(); + + self.extend(anchor, data) + } + + pub fn extend<T: Read>(&self, anchor: BlobRef, data: T) -> Result<(BlobRef, Vec<Chunk>)> { + let chunker = StreamCDC::new(data, 4096, 4_194_304, 16_777_216); + let mut chunks = Vec::new(); + + for chunk in chunker { + let chunk = chunk.map_err(|_| Error::Io)?; + let chunk_blobref = self.plains.put(&chunk.data).unwrap(); + + let mut patch = Patch::new(anchor.clone()); + + patch.add_change(Change::AddChunk { + blobref: chunk_blobref.clone(), + offset: chunk.offset, + size: chunk.length, + }); + chunks.push(Chunk { + blobref: chunk_blobref, + offset: chunk.offset as usize, + size: chunk.length, + }); + + self.patches.put(&json::serialize(patch)).unwrap(); + } + + Ok((anchor, chunks)) + } + + pub fn set_attr(&self, anchor: BlobRef, attr: Attr) -> Result<BlobRef> { + let mut patch = Patch::new(anchor); + patch.add_change(Change::SetAttr(attr)); + self.patches.put(&json::serialize(patch)) + } + + pub fn set_attrs(&self, anchor: BlobRef, attrs: &[Attr]) -> Result<BlobRef> { + let mut patch = Patch::new(anchor); + + for attr in attrs { + patch.add_change(Change::SetAttr(attr.clone())); + } + + self.patches.put(&json::serialize(patch)) + } + + // pub fn get_anchor(&self, blobref: &BlobRef) -> Result<Option<Anchor>> {} + + // pub fn get_patch(&self, blobref: &BlobRef) -> Result<Option<Patch>> {} + + // pub fn get_plain(&self, blobref: &BlobRef) -> Result<Option<Vec<u8>>> {} +} diff --git a/src/server/storage/backend.rs b/src/server/storage/backend.rs new file mode 100644 index 0000000..8ae62b4 --- /dev/null +++ b/src/server/storage/backend.rs @@ -0,0 +1,26 @@ +pub mod disk; + +use super::Result; +use crate::server::blobref::BlobRef; + +pub fn factory<'a, T: StorageBackend>(config: &T::Config) -> impl Fn(&'a str) -> T + '_ { + |bucket: &str| T::new(bucket, config) +} + +pub trait StorageBackend: Iterator { + type Config; + + fn new(bucket: &str, config: &Self::Config) -> Self; + + fn put(&self, data: &[u8]) -> Result<BlobRef>; + + fn get(&self, blobref: &BlobRef) -> Result<Option<Vec<u8>>>; + + fn exists(&self, blobref: &BlobRef) -> bool { + if let Ok(Some(_)) = self.get(blobref) { + return true; + } + + false + } +} diff --git a/src/server/storage/backend/disk.rs b/src/server/storage/backend/disk.rs new file mode 100644 index 0000000..3137376 --- /dev/null +++ b/src/server/storage/backend/disk.rs @@ -0,0 +1,65 @@ +use super::StorageBackend; +use crate::server::blobref::BlobRef; +use crate::server::storage::{Error, Result}; +use log::debug; +use serde::{Deserialize, Serialize}; +use std::{fs, io::ErrorKind, path::PathBuf}; + +pub struct Disk { + bucket: String, + pub root: PathBuf, +} + +#[derive(Serialize, Deserialize)] +pub struct Config { + pub root: PathBuf, +} + +impl StorageBackend for Disk { + type Config = Config; + + fn new(bucket: &str, config: &Self::Config) -> Self { + Self { + bucket: bucket.to_string(), + root: config.root.clone().join(bucket), + } + } + + fn put(&self, data: &[u8]) -> Result<BlobRef> { + let blobref = BlobRef::for_bytes(data); + debug!("Preparing {blobref}"); + + if !self.exists(&blobref) { + let blobpath = self.root.join(blobref.path()); + let blobdir = blobpath.parent().expect("blobpath should have a parent"); + debug!("Writing blob to {blobpath:?}"); + + fs::create_dir_all(blobdir).map_err(|_| Error::Io)?; + fs::write(blobpath, data).map_err(|_| Error::Io)?; + } + + Ok(blobref) + } + + fn get(&self, blobref: &BlobRef) -> Result<Option<Vec<u8>>> { + let blobpath = self.root.join(blobref.path()); + + match fs::read(blobpath) { + Ok(contents) => Ok(Some(contents)), + Err(e) if e.kind() == ErrorKind::NotFound => Ok(None), + Err(e) => Err(Error::Io), + } + } + + fn exists(&self, blobref: &BlobRef) -> bool { + self.root.join(blobref.path()).exists() + } +} + +impl Iterator for Disk { + type Item = BlobRef; + + fn next(&mut self) -> Option<Self::Item> { + todo!() + } +} diff --git a/src/server/storage/record.rs b/src/server/storage/record.rs new file mode 100644 index 0000000..ab5c977 --- /dev/null +++ b/src/server/storage/record.rs @@ -0,0 +1,56 @@ +use crate::server::{attrs::Attr, blobref::BlobRef}; +use rand::{thread_rng, Rng}; +use serde::{Deserialize, Serialize}; +use time::OffsetDateTime; + +#[derive(Serialize, Deserialize)] +#[serde(rename_all = "kebab-case")] +pub(super) struct Anchor { + rand: u64, + #[serde(with = "time::serde::rfc3339")] + created_at: OffsetDateTime, +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "kebab-case")] +pub(super) struct Patch { + pub anchor: BlobRef, + pub changes: Vec<Change>, + #[serde(with = "time::serde::rfc3339")] + created_at: OffsetDateTime, +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "kebab-case")] +// #[allow(clippy::enum_variant_names)] +pub(super) enum Change { + AddChunk { + blobref: BlobRef, + offset: u64, + size: usize, + }, + SetAttr(Attr), +} + +impl Anchor { + pub(super) fn new() -> Self { + Self { + rand: thread_rng().gen(), + created_at: OffsetDateTime::now_utc(), + } + } +} + +impl Patch { + pub(super) fn new(anchor: BlobRef) -> Self { + Self { + anchor, + changes: Vec::new(), + created_at: OffsetDateTime::now_utc(), + } + } + + pub(super) fn add_change(&mut self, change: Change) { + self.changes.push(change); + } +} |