author    evuez <julien@mulga.net>  2024-04-03 22:43:16 +0200
committer evuez <julien@mulga.net>  2024-04-03 22:43:16 +0200
commit    43e1a12b5bce11b4a28a53acca243e35c2be6d3e (patch)
tree      07d64823718bfee063ab7b3d5721ac1e950ae17c /src/server
Initial commit
Diffstat (limited to 'src/server')
-rw-r--r--   src/server/attrs.rs                   47
-rw-r--r--   src/server/blobref.rs                 62
-rw-r--r--   src/server/index.rs                  260
-rw-r--r--   src/server/object.rs                  51
-rw-r--r--   src/server/storage.rs                112
-rw-r--r--   src/server/storage/backend.rs         26
-rw-r--r--   src/server/storage/backend/disk.rs    65
-rw-r--r--   src/server/storage/record.rs          56
8 files changed, 679 insertions, 0 deletions
diff --git a/src/server/attrs.rs b/src/server/attrs.rs
new file mode 100644
index 0000000..eeb273a
--- /dev/null
+++ b/src/server/attrs.rs
@@ -0,0 +1,47 @@
+use super::blobref::BlobRef;
+use crate::common::{json, mime::MimeType};
+use rusqlite::ToSql;
+use serde::{Deserialize, Serialize};
+use time::OffsetDateTime;
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "kebab-case")]
+pub enum Attr {
+ Name(String),
+ Group(BlobRef),
+ Mime(MimeType),
+ CreatedAt(#[serde(with = "time::serde::rfc3339")] OffsetDateTime),
+ UpdatedAt(#[serde(with = "time::serde::rfc3339")] OffsetDateTime),
+ DeletedAt(#[serde(with = "time::serde::rfc3339")] OffsetDateTime),
+ Tags(Vec<String>),
+ Note(String),
+}
+
+pub fn key(attr: &Attr) -> String {
+ match attr {
+ Attr::Name(_) => "name",
+ Attr::Group(_) => "group",
+ Attr::Mime(_) => "mime",
+ Attr::CreatedAt(_) => "created_at",
+ Attr::UpdatedAt(_) => "updated_at",
+ Attr::DeletedAt(_) => "deleted_at",
+ Attr::Tags(_) => "tags",
+ Attr::Note(_) => "note",
+ }
+ .into()
+}
+
+impl ToSql for Attr {
+ fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> {
+ match self {
+ Attr::Name(name) => name.to_sql(),
+ Attr::Group(group) => group.to_sql(),
+ Attr::Mime(mime) => mime.to_sql(),
+ Attr::CreatedAt(created_at) => created_at.to_sql(),
+ Attr::UpdatedAt(updated_at) => updated_at.to_sql(),
+ Attr::DeletedAt(deleted_at) => deleted_at.to_sql(),
+ Attr::Tags(tags) => Ok(json::serialize(tags).into()),
+ Attr::Note(note) => note.to_sql(),
+ }
+ }
+}
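
A test sketch, not part of the commit, pinning down the split naming scheme: serde's kebab-case rename only affects the JSON representation, while key() keeps the snake_case names used for the x_* index columns.

    #[cfg(test)]
    mod tests {
        use super::*;

        #[test]
        fn serde_uses_kebab_case_variant_names() {
            // JSON tag is "created-at", but the column key stays "created_at".
            let attr = Attr::CreatedAt(OffsetDateTime::UNIX_EPOCH);
            let json = serde_json::to_string(&attr).unwrap();
            assert!(json.starts_with(r#"{"created-at":"#));
            assert_eq!(key(&attr), "created_at");
        }
    }
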
diff --git a/src/server/blobref.rs b/src/server/blobref.rs
new file mode 100644
index 0000000..7d9fa66
--- /dev/null
+++ b/src/server/blobref.rs
@@ -0,0 +1,62 @@
+use crate::common::hash::{self, Hasher};
+use rusqlite::types::{FromSql, FromSqlError};
+use rusqlite::{
+ types::{FromSqlResult, ToSqlOutput},
+ ToSql,
+};
+use serde::{Deserialize, Serialize};
+use std::fmt::Display;
+use std::path::PathBuf;
+use std::str::FromStr;
+
+#[derive(Debug)]
+pub enum Error {
+ InvalidHash(hash::Error),
+}
+
+type Result<T> = std::result::Result<T, Error>;
+
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Hash, Eq)]
+pub struct BlobRef(String);
+
+impl BlobRef {
+ pub fn from_str(s: &str) -> Result<Self> {
+ hash::Hash::from_str(s).map_err(Error::InvalidHash)?;
+ Ok(Self(s.to_string()))
+ }
+
+ pub fn for_bytes(bytes: &[u8]) -> Self {
+ let mut hasher = Hasher::default();
+ hasher.update(bytes);
+ BlobRef(hasher.finish().to_base32())
+ }
+
+ pub(super) fn path(&self) -> PathBuf {
+ let mut buf = PathBuf::new();
+ buf.push(&self.0[0..4]);
+ buf.push(&self.0[4..]);
+
+ buf
+ }
+}
+
+impl Display for BlobRef {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(f, "blobref:{}", self.0)
+ }
+}
+
+impl ToSql for BlobRef {
+ fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> {
+ Ok(ToSqlOutput::Borrowed(rusqlite::types::ValueRef::Text(
+ self.0.as_bytes(),
+ )))
+ }
+}
+
+impl FromSql for BlobRef {
+ fn column_result(value: rusqlite::types::ValueRef<'_>) -> FromSqlResult<Self> {
+ let v: String = FromSql::column_result(value)?;
+ BlobRef::from_str(&v).map_err(|_| FromSqlError::InvalidType)
+ }
+}
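
A test sketch, not in the commit, for the sharding behaviour of path(): the first four characters of the hash become a directory prefix, so no single directory ends up holding every blob. It assumes the hasher's base32 output is longer than four characters.

    #[cfg(test)]
    mod tests {
        use super::*;

        #[test]
        fn path_shards_on_a_four_character_prefix() {
            let blobref = BlobRef::for_bytes(b"hello");
            let parts: Vec<_> = blobref.path().iter().collect();
            assert_eq!(parts.len(), 2); // "abcd/rest-of-hash"
            assert_eq!(parts[0].len(), 4);
        }
    }
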
diff --git a/src/server/index.rs b/src/server/index.rs
new file mode 100644
index 0000000..042c4aa
--- /dev/null
+++ b/src/server/index.rs
@@ -0,0 +1,260 @@
+use super::attrs::{self, Attr};
+use super::blobref::BlobRef;
+use super::object::Chunk;
+use super::object::{Attrs, Object};
+use crate::common::{json, sqlite};
+use log::error;
+use rusqlite::{params, Connection, Row};
+use serde::{Deserialize, Serialize};
+use std::path::PathBuf;
+use time::OffsetDateTime;
+
+#[derive(Serialize, Deserialize)]
+pub struct Config {
+ pub root: PathBuf,
+}
+
+const DB_NAME: &str = "index.sqlite3";
+const SQL_INIT: &str = "
+ create table objects (
+ anchor text primary key,
+ size integer not null,
+ chunks text not null, -- JSON array
+ created_at datetime not null,
+ updated_at datetime not null,
+ x_name text,
+ x_group text,
+ x_mime text,
+ x_created_at text,
+ x_updated_at text,
+ x_deleted_at text,
+ x_tags text,
+ x_note text
+ );
+";
+
+#[derive(Debug)]
+pub enum Error {
+ Internal(rusqlite::Error),
+}
+
+type Result<T> = std::result::Result<T, Error>;
+
+pub struct Index {
+ db: Connection,
+}
+
+impl Index {
+ pub fn new(config: &Config) -> Result<Self> {
+ let db = match Connection::open(config.root.join(DB_NAME)) {
+ Ok(conn) => conn,
+ Err(e) => {
+ error!("Couldn't open connection to the database: {e:?}");
+ return Err(Error::Internal(e));
+ }
+ };
+
+ Ok(Self { db })
+ }
+
+ pub fn put(&self, anchor: &BlobRef, chunks: &[Chunk]) -> Result<()> {
+ // Calculate the overall size
+ let size: usize = chunks.iter().map(|c| c.size).sum();
+
+ // Insert or update the object for `anchor`
+ let query = "
+ insert into objects (anchor, size, chunks, created_at, updated_at)
+ values (?, ?, ?, ?, ?)
+ on conflict do update set
+ size = excluded.size,
+ chunks = excluded.chunks,
+ updated_at = excluded.updated_at
+ ";
+
+ let now = OffsetDateTime::now_utc();
+ let mut stmt = self.db.prepare(query).map_err(Error::Internal)?;
+ stmt.execute(params![anchor, size, json::serialize(chunks), now, now])
+ .map_err(Error::Internal)?;
+
+ Ok(())
+ }
+
+ pub fn set_attrs(&mut self, anchor: &BlobRef, attrs: &[Attr]) -> Result<()> {
+ let now = OffsetDateTime::now_utc();
+ let tx = self.db.transaction().map_err(Error::Internal)?;
+
+ for attr in attrs {
+ let query = format!(
+ r#"update objects set "x_{}" = ? where anchor = ?"#,
+ attrs::key(attr)
+ );
+ let mut stmt = tx.prepare(&query).map_err(Error::Internal)?;
+
+ stmt.execute(params![attr, anchor])
+ .map_err(Error::Internal)?;
+ }
+
+ {
+ let mut stmt = tx
+ .prepare("update objects set updated_at = ? where anchor = ?")
+ .map_err(Error::Internal)?;
+ stmt.execute(params![now, anchor])
+ .map_err(Error::Internal)?;
+ }
+
+ tx.commit().map_err(Error::Internal)?;
+
+ Ok(())
+ }
+
+ pub fn get(&self, anchor: &BlobRef) -> Result<Object> {
+ let query = "select
+ anchor,
+ size,
+ chunks,
+ created_at,
+ updated_at,
+ x_name,
+ x_group,
+ x_mime,
+ x_created_at,
+ x_updated_at,
+ x_deleted_at,
+ x_tags,
+ x_note
+ from objects
+ where anchor = ?";
+
+ sqlite::get(&self.db, query, (anchor,), |r| Object::try_from(r)).map_err(Error::Internal)
+ }
+
+ pub fn list(&self, limit: usize, offset: usize, filter: Option<Attr>) -> Result<Vec<Object>> {
+ let where_clause = if let Some(ref filter) = filter {
+ format!(r#""x_{}" = ?"#, attrs::key(filter))
+ } else {
+ "1=1".into()
+ };
+
+ let query = format!(
+ r#"select
+ anchor,
+ size,
+ chunks,
+ created_at,
+ updated_at,
+ x_name,
+ x_group,
+ x_mime,
+ x_created_at,
+ x_updated_at,
+ x_deleted_at,
+ x_tags,
+ x_note
+ from objects
+ where {where_clause}
+ order by rowid
+ limit {limit} offset {offset}
+ "#
+ );
+
+ if let Some(filter) = filter {
+ sqlite::list(&self.db, &query, (filter,), |r| Object::try_from(r))
+ .map_err(Error::Internal)
+ } else {
+ sqlite::list(&self.db, &query, (), |r| Object::try_from(r)).map_err(Error::Internal)
+ }
+ }
+}
+//
+// impl Object {
+// fn new(blobref: BlobRef) -> Self {
+// Object {
+// blobref,
+// group: None,
+// is_group: false,
+// name: None,
+// kind: None,
+// size: None,
+// created_at: None,
+// updated_at: None,
+// deleted_at: None,
+// tags: Vec::new(),
+// }
+// }
+//
+// pub fn get_name(&self) -> String {
+// self.name.clone().unwrap_or(self.blobref.to_string())
+// }
+//
+// pub fn get_size(&self) -> u64 {
+// self.size.unwrap_or(0)
+// }
+//
+// pub fn is_group(&self) -> bool {
+// self.is_group
+// }
+//
+// fn apply_patch(&mut self, patch: Patch) {
+// for change in patch.changes {
+// self.apply_change(change);
+// }
+// }
+//
+// fn apply_change(&mut self, change: Change) {
+// match change {
+// Change::MarkAsGroup => self.is_group = true,
+// Change::SetName { name } => self.name = Some(name),
+// Change::SetGroup { group } => self.group = Some(group),
+// Change::SetSize { size } => self.size = Some(size),
+// Change::SetCreatedAt { created_at } => self.created_at = Some(created_at),
+// Change::SetUpdatedAt { updated_at } => self.updated_at = Some(updated_at),
+// Change::SetDeletedAt { deleted_at } => self.deleted_at = Some(deleted_at),
+// Change::SetTags { tags } => self.tags = tags,
+// };
+// }
+// }
+//
+// fn row_to_object(row: &Row) -> rusqlite::Result<Object> {
+// Ok(Object {
+// blobref: row.get(0)?,
+// group: row.get(1)?,
+// is_group: row.get(2)?,
+// name: row.get(3)?,
+// size: row.get(4)?,
+// kind: row.get(5)?,
+// created_at: row.get(6)?,
+// updated_at: row.get(7)?,
+// deleted_at: row.get(8)?,
+// tags: vec![],
+// })
+// }
+
+impl TryFrom<&Row<'_>> for Object {
+ type Error = rusqlite::Error;
+
+ fn try_from(row: &Row) -> std::result::Result<Object, Self::Error> {
+        let chunks_json: Vec<u8> = row.get(2)?;
+        let chunks = serde_json::from_slice::<Vec<Chunk>>(&chunks_json)
+ .map_err(|_| rusqlite::Error::InvalidQuery)?;
+
+ let attrs = Attrs {
+ name: row.get(5)?,
+ group: row.get(6)?,
+ mime: row.get(7)?,
+ created_at: row.get(8)?,
+ updated_at: row.get(9)?,
+ deleted_at: row.get(10)?,
+            tags: Vec::new(), // x_tags (column 11) holds JSON but is not read back yet
+ note: row.get(12)?,
+ };
+
+ Ok(Object {
+ blobref: row.get(0)?,
+ size: row.get(1)?,
+ chunks,
+ attrs,
+ created_at: row.get(3)?,
+ updated_at: row.get(4)?,
+ })
+ }
+}
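
A usage sketch, not part of the commit, wiring Index together from a sibling of these modules. Note that SQL_INIT is declared but never executed by Index::new, so the objects table is assumed to have been created ahead of time.

    fn index_demo() -> Result<()> {
        let index_config = Config { root: "/tmp/carton".into() };
        let mut index = Index::new(&index_config)?;

        // One logical object made of a single three-byte chunk.
        let anchor = BlobRef::for_bytes(b"anchor");
        let chunks = vec![Chunk {
            size: 3,
            offset: 0,
            blobref: BlobRef::for_bytes(b"abc"),
        }];

        index.put(&anchor, &chunks)?;
        index.set_attrs(&anchor, &[Attr::Name("notes.txt".into())])?;

        assert_eq!(index.get(&anchor)?.size(), 3);
        Ok(())
    }
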
diff --git a/src/server/object.rs b/src/server/object.rs
new file mode 100644
index 0000000..9bad6c9
--- /dev/null
+++ b/src/server/object.rs
@@ -0,0 +1,51 @@
+use super::blobref::BlobRef;
+use crate::common::mime::MimeType;
+use serde::{Deserialize, Serialize};
+use time::OffsetDateTime;
+
+#[derive(Debug)]
+pub(super) struct Attrs {
+ pub name: Option<String>,
+ pub group: Option<BlobRef>,
+ pub mime: Option<MimeType>,
+ pub created_at: Option<OffsetDateTime>,
+ pub updated_at: Option<OffsetDateTime>,
+ pub deleted_at: Option<OffsetDateTime>,
+ pub tags: Vec<String>,
+ pub note: Option<String>,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub(super) struct Chunk {
+ pub size: usize,
+ pub offset: usize,
+ pub blobref: BlobRef,
+}
+
+#[derive(Debug)]
+pub struct Object {
+ pub(super) blobref: BlobRef,
+ pub(super) size: usize,
+ pub(super) chunks: Vec<Chunk>,
+ pub(super) attrs: Attrs,
+ pub(super) created_at: Option<OffsetDateTime>,
+ pub(super) updated_at: Option<OffsetDateTime>,
+}
+
+impl Object {
+ pub fn blobref(&self) -> &BlobRef {
+ &self.blobref
+ }
+
+ pub fn size(&self) -> usize {
+ self.size
+ }
+
+ pub fn is_group(&self) -> bool {
+ matches!(self.attrs.mime, Some(MimeType::ApplicationXGroup))
+ }
+
+ pub fn get_name(&self) -> Option<String> {
+ self.attrs.name.clone()
+ }
+}
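
A small sketch of how a caller might label an object, falling back to the BlobRef Display form ("blobref:...") when no name attribute is set:

    fn describe(object: &Object) -> String {
        let label = object
            .get_name()
            .unwrap_or_else(|| object.blobref().to_string());
        let kind = if object.is_group() { "group" } else { "object" };
        format!("{label} ({kind}, {} bytes)", object.size())
    }
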
diff --git a/src/server/storage.rs b/src/server/storage.rs
new file mode 100644
index 0000000..2b81ce1
--- /dev/null
+++ b/src/server/storage.rs
@@ -0,0 +1,112 @@
+pub mod backend;
+mod record;
+
+use self::{
+ backend::StorageBackend,
+ record::{Anchor, Change, Patch},
+};
+use super::{attrs::Attr, blobref::BlobRef, object::Chunk};
+use crate::common::{hash, json};
+use fastcdc::v2020::StreamCDC;
+use serde::{Deserialize, Serialize};
+use std::io::Read;
+
+#[derive(Debug)]
+pub enum Error {
+ Io,
+ InvalidHash(hash::Error),
+}
+
+type Result<T> = std::result::Result<T, Error>;
+
+type Backend = backend::disk::Disk;
+
+const ANCHOR_BUCKET: &str = "anchor";
+const PATCH_BUCKET: &str = "patch";
+const PLAIN_BUCKET: &str = "plain";
+
+pub struct Storage {
+ anchors: Backend,
+ patches: Backend,
+ plains: Backend,
+}
+
+#[derive(Serialize, Deserialize)]
+pub struct Config {
+ pub backend: <Backend as StorageBackend>::Config,
+}
+
+impl Storage {
+ pub fn new(config: &Config) -> Self {
+ let backend_factory = backend::factory(&config.backend);
+
+ let anchors = backend_factory(ANCHOR_BUCKET);
+ let patches = backend_factory(PATCH_BUCKET);
+ let plains = backend_factory(PLAIN_BUCKET);
+
+ Self {
+ anchors,
+ patches,
+ plains,
+ }
+ }
+
+ pub fn get(&self, blobref: &BlobRef) -> Result<Vec<u8>> {
+ self.plains.get(blobref).and_then(|x| x.ok_or(Error::Io))
+ }
+
+ pub fn put<T: Read>(&self, data: T) -> Result<(BlobRef, Vec<Chunk>)> {
+        let anchor = self.anchors.put(&json::serialize(Anchor::new()))?;
+
+ self.extend(anchor, data)
+ }
+
+ pub fn extend<T: Read>(&self, anchor: BlobRef, data: T) -> Result<(BlobRef, Vec<Chunk>)> {
+ let chunker = StreamCDC::new(data, 4096, 4_194_304, 16_777_216);
+ let mut chunks = Vec::new();
+
+ for chunk in chunker {
+ let chunk = chunk.map_err(|_| Error::Io)?;
+            let chunk_blobref = self.plains.put(&chunk.data)?;
+
+ let mut patch = Patch::new(anchor.clone());
+
+ patch.add_change(Change::AddChunk {
+ blobref: chunk_blobref.clone(),
+ offset: chunk.offset,
+ size: chunk.length,
+ });
+ chunks.push(Chunk {
+ blobref: chunk_blobref,
+ offset: chunk.offset as usize,
+ size: chunk.length,
+ });
+
+            self.patches.put(&json::serialize(patch))?;
+ }
+
+ Ok((anchor, chunks))
+ }
+
+ pub fn set_attr(&self, anchor: BlobRef, attr: Attr) -> Result<BlobRef> {
+ let mut patch = Patch::new(anchor);
+ patch.add_change(Change::SetAttr(attr));
+ self.patches.put(&json::serialize(patch))
+ }
+
+ pub fn set_attrs(&self, anchor: BlobRef, attrs: &[Attr]) -> Result<BlobRef> {
+ let mut patch = Patch::new(anchor);
+
+ for attr in attrs {
+ patch.add_change(Change::SetAttr(attr.clone()));
+ }
+
+ self.patches.put(&json::serialize(patch))
+ }
+
+ // pub fn get_anchor(&self, blobref: &BlobRef) -> Result<Option<Anchor>> {}
+
+ // pub fn get_patch(&self, blobref: &BlobRef) -> Result<Option<Patch>> {}
+
+ // pub fn get_plain(&self, blobref: &BlobRef) -> Result<Option<Vec<u8>>> {}
+}
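
A usage sketch, not in the commit: put() chunks the stream with FastCDC (4 KiB minimum, 4 MiB average, 16 MiB maximum chunk size), writes each chunk to the plain bucket, and records one patch per chunk against a freshly minted anchor.

    use std::fs::File;

    fn storage_demo() -> Result<()> {
        let config = Config {
            backend: backend::disk::Config { root: "/tmp/carton/blobs".into() },
        };
        let storage = Storage::new(&config);

        let file = File::open("notes.txt").map_err(|_| Error::Io)?;
        let (anchor, chunks) = storage.put(file)?;

        // Every chunk is content-addressed and can be read back directly.
        for chunk in &chunks {
            assert_eq!(storage.get(&chunk.blobref)?.len(), chunk.size);
        }

        // Attribute changes are further patches against the same anchor.
        storage.set_attr(anchor, Attr::Name("notes.txt".into()))?;
        Ok(())
    }
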
diff --git a/src/server/storage/backend.rs b/src/server/storage/backend.rs
new file mode 100644
index 0000000..8ae62b4
--- /dev/null
+++ b/src/server/storage/backend.rs
@@ -0,0 +1,26 @@
+pub mod disk;
+
+use super::Result;
+use crate::server::blobref::BlobRef;
+
+pub fn factory<'a, T: StorageBackend>(config: &T::Config) -> impl Fn(&'a str) -> T + '_ {
+ |bucket: &str| T::new(bucket, config)
+}
+
+pub trait StorageBackend: Iterator {
+ type Config;
+
+ fn new(bucket: &str, config: &Self::Config) -> Self;
+
+ fn put(&self, data: &[u8]) -> Result<BlobRef>;
+
+ fn get(&self, blobref: &BlobRef) -> Result<Option<Vec<u8>>>;
+
+ fn exists(&self, blobref: &BlobRef) -> bool {
+ if let Ok(Some(_)) = self.get(blobref) {
+ return true;
+ }
+
+ false
+ }
+}
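
A sketch of how the factory is meant to be used: it partially applies the backend config so call sites only name a bucket. Assumes the disk backend from the module below.

    fn backend_demo(config: &disk::Config) {
        let make = factory(config);
        let anchors: disk::Disk = make("anchor");
        let plains: disk::Disk = make("plain");

        // Buckets share one config but live under distinct subtrees.
        assert_ne!(anchors.root, plains.root);
    }
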
diff --git a/src/server/storage/backend/disk.rs b/src/server/storage/backend/disk.rs
new file mode 100644
index 0000000..3137376
--- /dev/null
+++ b/src/server/storage/backend/disk.rs
@@ -0,0 +1,65 @@
+use super::StorageBackend;
+use crate::server::blobref::BlobRef;
+use crate::server::storage::{Error, Result};
+use log::debug;
+use serde::{Deserialize, Serialize};
+use std::{fs, io::ErrorKind, path::PathBuf};
+
+pub struct Disk {
+ bucket: String,
+ pub root: PathBuf,
+}
+
+#[derive(Serialize, Deserialize)]
+pub struct Config {
+ pub root: PathBuf,
+}
+
+impl StorageBackend for Disk {
+ type Config = Config;
+
+ fn new(bucket: &str, config: &Self::Config) -> Self {
+ Self {
+ bucket: bucket.to_string(),
+ root: config.root.clone().join(bucket),
+ }
+ }
+
+ fn put(&self, data: &[u8]) -> Result<BlobRef> {
+ let blobref = BlobRef::for_bytes(data);
+ debug!("Preparing {blobref}");
+
+ if !self.exists(&blobref) {
+ let blobpath = self.root.join(blobref.path());
+ let blobdir = blobpath.parent().expect("blobpath should have a parent");
+ debug!("Writing blob to {blobpath:?}");
+
+ fs::create_dir_all(blobdir).map_err(|_| Error::Io)?;
+ fs::write(blobpath, data).map_err(|_| Error::Io)?;
+ }
+
+ Ok(blobref)
+ }
+
+ fn get(&self, blobref: &BlobRef) -> Result<Option<Vec<u8>>> {
+ let blobpath = self.root.join(blobref.path());
+
+ match fs::read(blobpath) {
+ Ok(contents) => Ok(Some(contents)),
+ Err(e) if e.kind() == ErrorKind::NotFound => Ok(None),
+            Err(_) => Err(Error::Io),
+ }
+ }
+
+ fn exists(&self, blobref: &BlobRef) -> bool {
+ self.root.join(blobref.path()).exists()
+ }
+}
+
+impl Iterator for Disk {
+ type Item = BlobRef;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ todo!()
+ }
+}
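
A roundtrip test sketch, not part of the commit; it writes under /tmp, so adjust the root to taste.

    #[cfg(test)]
    mod tests {
        use super::*;

        #[test]
        fn put_then_get_roundtrips() {
            let config = Config { root: "/tmp/carton-test".into() };
            let disk = Disk::new("plain", &config);

            let blobref = disk.put(b"hello").unwrap();
            assert!(disk.exists(&blobref));
            assert_eq!(disk.get(&blobref).unwrap(), Some(b"hello".to_vec()));
        }
    }
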
diff --git a/src/server/storage/record.rs b/src/server/storage/record.rs
new file mode 100644
index 0000000..ab5c977
--- /dev/null
+++ b/src/server/storage/record.rs
@@ -0,0 +1,56 @@
+use crate::server::{attrs::Attr, blobref::BlobRef};
+use rand::{thread_rng, Rng};
+use serde::{Deserialize, Serialize};
+use time::OffsetDateTime;
+
+#[derive(Serialize, Deserialize)]
+#[serde(rename_all = "kebab-case")]
+pub(super) struct Anchor {
+ rand: u64,
+ #[serde(with = "time::serde::rfc3339")]
+ created_at: OffsetDateTime,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+#[serde(rename_all = "kebab-case")]
+pub(super) struct Patch {
+ pub anchor: BlobRef,
+ pub changes: Vec<Change>,
+ #[serde(with = "time::serde::rfc3339")]
+ created_at: OffsetDateTime,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+#[serde(rename_all = "kebab-case")]
+// #[allow(clippy::enum_variant_names)]
+pub(super) enum Change {
+ AddChunk {
+ blobref: BlobRef,
+ offset: u64,
+ size: usize,
+ },
+ SetAttr(Attr),
+}
+
+impl Anchor {
+ pub(super) fn new() -> Self {
+ Self {
+ rand: thread_rng().gen(),
+ created_at: OffsetDateTime::now_utc(),
+ }
+ }
+}
+
+impl Patch {
+ pub(super) fn new(anchor: BlobRef) -> Self {
+ Self {
+ anchor,
+ changes: Vec::new(),
+ created_at: OffsetDateTime::now_utc(),
+ }
+ }
+
+ pub(super) fn add_change(&mut self, change: Change) {
+ self.changes.push(change);
+ }
+}
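
A sketch of what ends up in the anchor and patch buckets: one random, timestamped Anchor blob per logical object, then append-only Patches that reference it. Assuming json::serialize is a thin wrapper over serde_json, a serialized patch looks roughly like {"anchor":"...","changes":[{"set-attr":{"name":"notes.txt"}}],"created-at":"..."} given the kebab-case rename.

    fn record_demo(anchor: BlobRef) -> Vec<u8> {
        let mut patch = Patch::new(anchor);
        patch.add_change(Change::SetAttr(Attr::Name("notes.txt".into())));
        serde_json::to_vec(&patch).unwrap() // what storage writes to the patch bucket
    }
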