Diffstat (limited to 'src')
-rw-r--r-- | src/client.rs | 13
-rw-r--r-- | src/client/fs.rs | 634
-rw-r--r-- | src/client/fs/dcache.rs | 107
-rw-r--r-- | src/client/fs/fcache.rs | 31
-rw-r--r-- | src/client/fs/fh.rs | 110
-rw-r--r-- | src/client/fs/file.rs | 78
-rw-r--r-- | src/client/fs/ino.rs | 51
-rw-r--r-- | src/common.rs | 26
-rw-r--r-- | src/common/hash.rs | 109
-rw-r--r-- | src/common/json.rs | 7
-rw-r--r-- | src/common/mime.rs | 56
-rw-r--r-- | src/common/slot_map.rs | 85
-rw-r--r-- | src/common/sqlite.rs | 60
-rw-r--r-- | src/main.rs | 65
-rw-r--r-- | src/server.rs | 139
-rw-r--r-- | src/server/attrs.rs | 47
-rw-r--r-- | src/server/blobref.rs | 62
-rw-r--r-- | src/server/index.rs | 260
-rw-r--r-- | src/server/object.rs | 51
-rw-r--r-- | src/server/storage.rs | 112
-rw-r--r-- | src/server/storage/backend.rs | 26
-rw-r--r-- | src/server/storage/backend/disk.rs | 65
-rw-r--r-- | src/server/storage/record.rs | 56 |
23 files changed, 2250 insertions, 0 deletions
diff --git a/src/client.rs b/src/client.rs new file mode 100644 index 0000000..e4cb652 --- /dev/null +++ b/src/client.rs @@ -0,0 +1,13 @@ +use serde::{Deserialize, Serialize}; + +pub mod fs; + +#[derive(Serialize, Deserialize)] +pub struct Config { + pub fs: FsConfig, +} + +#[derive(Serialize, Deserialize)] +pub struct FsConfig { + pub mountpoint: String, +} diff --git a/src/client/fs.rs b/src/client/fs.rs new file mode 100644 index 0000000..10949f0 --- /dev/null +++ b/src/client/fs.rs @@ -0,0 +1,634 @@ +mod dcache; +mod fcache; +mod fh; +mod file; +mod ino; + +use self::{dcache::Dcache, fcache::Fcache, fh::FileHandle, file::File, ino::Ino}; +use crate::{ + common::{ + mime::{self, MimeType}, + slot_map::SlotMap, + }, + server::{attrs::Attr, blobref::BlobRef, Server}, +}; +use fuser::TimeOrNow; +use libc::{EBADF, EEXIST, EFBIG, EINVAL, EIO, ENFILE, ENOENT, O_RDONLY, O_RDWR, O_WRONLY}; +use log::{debug, error, warn}; +use std::{ + cmp, + ffi::{c_int, OsStr, OsString}, + io, + time::{Duration, SystemTime}, +}; + +const GENERATION: u64 = 0; +const CACHE_TTL: Duration = Duration::from_secs(1); + +pub struct FileSystem { + server: Server, + ino: Ino, + dcache: Dcache, + fcache: Fcache, + // TODO: bcache ino -> blobref + handles: SlotMap<FileHandle>, +} + +impl FileSystem { + pub fn new(server: Server) -> Self { + let ino = Ino::new(); + + let mut fs = FileSystem { + server, + ino, + dcache: Dcache::new(), + fcache: Fcache::new(), + handles: SlotMap::new(), + }; + + fs.setup_root(); + + fs + } + + pub fn mount(self, mountpoint: &str) -> io::Result<()> { + // TODO: Ignore permissions + let opts = &[ + fuser::MountOption::AllowOther, + fuser::MountOption::AutoUnmount, + fuser::MountOption::NoExec, + ]; + + fuser::mount2(self, mountpoint, opts) + } + + fn setup_root(&mut self) { + let root_ino = self.ino; + let parent_ino = self.next_ino(); + + self.dcache.add_dentry(root_ino, parent_ino); + + let mut root = File::new_directory(root_ino, "."); + root.set_parent(parent_ino); + + self.fcache.insert(root_ino, root); + self.fcache + .insert(parent_ino, File::new_directory(parent_ino, "..")); + } + + fn next_ino(&mut self) -> Ino { + *self.ino.next() + } + + fn empty_handle(&mut self) -> std::io::Result<Option<u64>> { + Ok(self.handles.insert(FileHandle::empty()?).map(u64::from)) + } + // + fn release_handle(&mut self, fh: u64) { + let fh = u32::try_from(fh).expect("Invalid file handle"); + self.handles.remove(fh); + } + // + fn file_by_name(&self, parent: Ino, name: &OsStr) -> Option<&File> { + let ino = self.dcache.get_ino(parent, name)?; + self.fcache.get(*ino) + } + // + // fn get_blobref_by_name(&self, parent: u64, name: &OsStr) -> Option<BlobRef> { + // self.get_by_name(parent, name) + // .and_then(|f| f.blobref.clone()) + // } + + fn get_blobref_by_ino(&self, ino: Ino) -> Option<BlobRef> { + self.fcache.get(ino).and_then(|f| f.blobref.clone()) + } + + fn get_parent_blobref(&self, file: &File) -> Option<BlobRef> { + file.parent().and_then(|ino| self.get_blobref_by_ino(ino)) + } + + fn get_parent_blobref_by_ino(&self, ino: Ino) -> Option<BlobRef> { + self.fcache + .get(ino) + .and_then(|f| self.get_parent_blobref(f)) + } + // + // fn remove_from_cache_by_name(&mut self, parent: u64, name: &OsStr) -> Option<u64> { + // let ino = self + // .dcache + // .get_mut(&parent) + // .and_then(|xs| xs.remove(name))?; + // self.dcache.remove(&ino); + // self.fcache.remove(&ino); + // + // Some(ino) + // } +} + +impl fuser::Filesystem for FileSystem { + fn init( + &mut self, + _req: &fuser::Request<'_>, + 
_config: &mut fuser::KernelConfig, + ) -> Result<(), c_int> { + Ok(()) + } + + fn lookup( + &mut self, + _req: &fuser::Request<'_>, + parent: u64, + name: &OsStr, + reply: fuser::ReplyEntry, + ) { + debug!("lookup(parent: {:#x?}, name {:?})", parent, name); + + if let Some(file) = self.file_by_name(parent.into(), name) { + reply.entry(&CACHE_TTL, &file.attr, 0); + } else { + warn!("lookup(parent: {parent:#x?}, name {name:?}): ENOENT"); + reply.error(ENOENT); + } + } + + fn getattr(&mut self, _req: &fuser::Request<'_>, ino: u64, reply: fuser::ReplyAttr) { + debug!("getattr(ino: {:#x?})", ino); + + if let Some(file) = self.fcache.get(ino.into()) { + reply.attr(&CACHE_TTL, &file.attr); + } else { + warn!("getattr(ino: {:#x?}): ENOENT", ino); + reply.error(ENOENT); + } + } + + fn open(&mut self, _req: &fuser::Request<'_>, ino: u64, flags: i32, reply: fuser::ReplyOpen) { + debug!("open(ino: {ino:#x?}, flags: {flags:b})"); + + // For now, we only support read-only, write-only and read-write. + if flags & 0xf & !(O_RDONLY | O_WRONLY | O_RDWR) != 0 { + error!("open(ino: {ino:#x?}): EIO: Unsupported mode"); + reply.error(EIO); + return; + } + + // File should be cached first (via `readdir`). + if self.fcache.get_mut(ino.into()).is_none() { + error!("open(ino: {ino:#x?}): ENOENT"); + reply.error(ENOENT); + return; + }; + + match self.empty_handle() { + Ok(Some(fh)) => reply.opened(fh, u32::try_from(flags).expect("Invalid flags")), + Ok(None) => { + // No file handle available. + error!("open(ino: {ino:#x?}): ENFILE"); + reply.error(ENFILE); + } + Err(e) => { + error!("open(ino: {ino:#x?}): EIO: {e}"); + reply.error(EIO); + } + } + } + + fn mkdir( + &mut self, + _req: &fuser::Request<'_>, + parent: u64, + name: &OsStr, + mode: u32, + umask: u32, + reply: fuser::ReplyEntry, + ) { + debug!("mkdir(parent: {parent:#x?}, name: {name:?}, mode: {mode}, umask: {umask:#x?})"); + + let parent_blobref = self.get_blobref_by_ino(parent.into()); + + let Some(dentry) = self.dcache.get_mut(parent.into()) else { + warn!("mkdir(parent: {parent:#x?}, name: {name:?}): ENOENT"); + reply.error(ENOENT); + return; + }; + + self.ino.next(); + + if dentry.try_insert(name.into(), self.ino).is_err() { + self.ino.prev(); + + warn!("mkdir(parent: {parent:#x?}, name: {name:?}): EEXIST"); + reply.error(EEXIST); + return; + } + + let mut file = File::new_directory(self.ino, name); + file.set_parent(parent.into()); + + let mut attrs = vec![Attr::Mime(MimeType::ApplicationXGroup)]; + if let Some(b) = parent_blobref { + attrs.push(Attr::Group(b)) + }; + + if let Some(name) = name.to_str() { + attrs.push(Attr::Name(name.to_string())); + } else { + warn!("mkdir(parent: {parent:#x?}, name: {name:?}): EINVAL"); + reply.error(EINVAL); + return; + } + + let Ok(blobref) = self.server.put_with_attrs(&[] as &[u8], &attrs) else { + dentry.remove(name); + warn!("mkdir(parent: {parent:#x?}, name: {name:?}): EIO"); + reply.error(EIO); + return; + }; + + file.blobref = Some(blobref); + file.attr.mtime = SystemTime::now(); + + reply.entry(&CACHE_TTL, &file.attr, 0); + + self.dcache.add_dentry(self.ino, parent.into()); + self.fcache.insert(self.ino, file); + } + + fn readdir( + &mut self, + _req: &fuser::Request<'_>, + ino: u64, + fh: u64, + offset: i64, + mut reply: fuser::ReplyDirectory, + ) { + debug!("readdir(ino: {ino:#x?}, fh: {fh}, offset: {offset})"); + + let offset = usize::try_from(offset).expect("Invalid offset"); + + // Load dentry from the index + // TODO: Move to opendir + if let Err(e) = load_dentry( + &self.server, + &mut self.ino, + 
ino.into(), + &mut self.dcache, + &mut self.fcache, + ) { + error!("readdir(ino: {ino:#x?}, fh: {fh}, offset: {offset}): {e:?}"); + reply.error(e); + return; + } + + let Some(dentry) = self.dcache.get(ino.into()) else { + warn!("readdir(ino: {ino:#x?}, fh: {fh}, offset: {offset}): ENOENT"); + reply.error(ENOENT); + return; + }; + + for (i, (name, ino)) in dentry.iter().skip(offset).enumerate() { + let Some(file) = self.fcache.get(*ino) else { + error!("readdir(ino: {ino:#x?}, fh: {fh}, offset: {offset}): EIO"); + reply.error(EIO); + return; + }; + + let curr_offset = i64::try_from(offset + i + 1).expect("Too many files in dentry"); + let full = reply.add(file.attr.ino, curr_offset, file.attr.kind, name); + if full { + break; + } + } + + reply.ok(); + } + + fn create( + &mut self, + _req: &fuser::Request<'_>, + parent: u64, + name: &std::ffi::OsStr, + mode: u32, + umask: u32, + flags: i32, + reply: fuser::ReplyCreate, + ) { + debug!( + "create(parent: {:#x?}, name: {:?}, mode: {}, umask: {:#x?}, flags: {:#x?})", + parent, name, mode, umask, flags + ); + + let ino = self.next_ino(); + + match self.dcache.try_insert_name(parent.into(), name.into(), ino) { + Some(Ok(())) => { + let mut file = File::new_regular_file(ino, name); + file.set_parent(parent.into()); + file.attr.flags = u32::try_from(flags).unwrap_or(0); + + match self.empty_handle() { + Ok(Some(fh)) => { + reply.created(&CACHE_TTL, &file.attr, GENERATION, fh, file.attr.flags); + self.fcache.insert(ino, file); + } + Ok(None) => { + error!("create(ino: {ino:#x?}): ENFILE"); + reply.error(ENFILE); + } + Err(e) => { + error!("create(ino: {ino:#x?}): EIO: {e}"); + reply.error(EIO); + } + } + } + Some(Err(())) => { + warn!( + "create(parent: {:#x?}, name: {:?}, mode: {}, umask: {:#x?}, flags: {:#x?}): EEXIST", + parent, name, mode, umask, flags + ); + + reply.error(EEXIST); + } + None => { + warn!( + "create(parent: {:#x?}, name: {:?}, mode: {}, umask: {:#x?}, flags: {:#x?}): ENOENT", + parent, name, mode, umask, flags + ); + + reply.error(ENOENT); + } + } + } + + fn setattr( + &mut self, + _req: &fuser::Request<'_>, + ino: u64, + mode: Option<u32>, + uid: Option<u32>, + gid: Option<u32>, + size: Option<u64>, + atime: Option<fuser::TimeOrNow>, + mtime: Option<fuser::TimeOrNow>, + ctime: Option<std::time::SystemTime>, + fh: Option<u64>, + _crtime: Option<std::time::SystemTime>, + _chgtime: Option<std::time::SystemTime>, + _bkuptime: Option<std::time::SystemTime>, + flags: Option<u32>, + reply: fuser::ReplyAttr, + ) { + debug!( + "setattr(ino: {:#x?}, mode: {:?}, uid: {:?}, \ + gid: {:?}, size: {:?}, fh: {:?}, flags: {:?})", + ino, mode, uid, gid, size, fh, flags + ); + + if let Some(file) = self.fcache.get_mut(ino.into()) { + if let Some(TimeOrNow::SpecificTime(t)) = atime { + file.attr.atime = t; + } + if let Some(TimeOrNow::SpecificTime(t)) = mtime { + file.attr.mtime = t; + } + file.attr.ctime = ctime.unwrap_or(file.attr.ctime); + file.attr.size = size.unwrap_or(file.attr.size); + file.attr.flags = flags.unwrap_or(file.attr.flags); + + reply.attr(&CACHE_TTL, &file.attr); + } else { + warn!("setattr(ino: {ino:#x?}): ENOENT"); + reply.error(ENOENT); + } + } + + fn flush( + &mut self, + _req: &fuser::Request<'_>, + ino: u64, + fh: u64, + lock_owner: u64, + reply: fuser::ReplyEmpty, + ) { + debug!("flush(ino: {ino:#x?}, fh: {fh}, lock_owner: {lock_owner:?})"); + + let parent_blobref = self.get_parent_blobref_by_ino(ino.into()); + + let Some(file) = self.fcache.get_mut(ino.into()) else { + warn!("flush(ino: {ino:#x?}): ENOENT"); + 
reply.error(ENOENT); + return; + }; + + let fh = u32::try_from(fh).expect("Invalid file handle"); + let Some(handle) = self.handles.get_mut(fh) else { + warn!("flush(ino: {ino:#x?}): EBADF"); + reply.error(EBADF); + return; + }; + + if !handle.is_dirty() { + // Nothing to write + reply.ok(); + return; + } + + file.attr.size = handle.buflen() as u64; + file.attr.mtime = SystemTime::now(); + + let mut attrs = vec![Attr::CreatedAt(file.attr.crtime.into())]; + if let Ok(name) = file.name().into_string() { + if let Some(m) = mime::guess(&name) { + attrs.push(Attr::Mime(m)) + }; + attrs.push(Attr::Name(name)); + } + if let Some(b) = parent_blobref { + attrs.push(Attr::Group(b)) + }; + + // TODO: self.server.append if file has a blobref already + // -- or self.server.replace depending on whether we're + // appending or replacing. + let Ok(blobref) = self + .server + .put_with_attrs(handle.buffer().as_slice(), &attrs) + else { + // NOTE: Should we clear the handle on error too? + // Unsure if we should be able to retry a flush? + error!("flush(ino: {ino:#x?}): EIO"); + reply.error(EIO); + return; + }; + + file.blobref = Some(blobref); + handle.clear(); + reply.ok(); + } + + fn write( + &mut self, + _req: &fuser::Request<'_>, + ino: u64, + fh: u64, + offset: i64, + data: &[u8], + write_flags: u32, + flags: i32, + lock_owner: Option<u64>, + reply: fuser::ReplyWrite, + ) { + debug!( + "write(ino: {ino:#x?}, fh: {fh}, offset: {offset}, size: {}, write_flags: {write_flags:#x?}, flags: {flags:#x?}, lock_owner: {lock_owner:?})", + data.len(), + ); + + let size: u32 = data.len().try_into().unwrap_or(0); + + if size < 1 && !data.is_empty() { + // The block is too big. + error!( + "write(ino: {ino:#x?}, offset: {offset}, size: {}): EFBIG", + data.len() + ); + reply.error(EFBIG); + return; + } + + let fh = u32::try_from(fh).expect("Invalid file handle"); + // TODO: Should auto-flush when necessary + if let Some(ref mut handle) = self.handles.get_mut(fh) { + let offset = usize::try_from(offset).expect("Invalid offset"); + // FIXME: Get written size from handle.write result + if handle.write(data, offset).is_none() { + error!( + "write(ino: {ino:#x?}, offset: {offset}, size: {}): EIO", + data.len() + ); + reply.error(EIO); + return; + } + + reply.written(size); + } else { + warn!("write(ino: {ino:#x?}): EBADF"); + reply.error(EBADF); + } + } + + fn read( + &mut self, + _req: &fuser::Request<'_>, + ino: u64, + fh: u64, + offset: i64, + size: u32, + flags: i32, + lock_owner: Option<u64>, + reply: fuser::ReplyData, + ) { + debug!( + "read(ino: {:#x?}, fh: {}, offset: {}, size: {}, \ + flags: {:#x?}, lock_owner: {:?})", + ino, fh, offset, size, flags, lock_owner + ); + + let Some(file) = self.fcache.get(ino.into()) else { + warn!("read(ino: {ino:#x?}): EBADF"); + reply.error(ENOENT); + return; + }; + + let fh = u32::try_from(fh).expect("Invalid file handle"); + let Some(handle) = self.handles.get_mut(fh) else { + warn!("read(ino: {ino:#x?}): EBADF"); + reply.error(EBADF); + return; + }; + + // TODO: Check if offset > handle.buflen() or file.size() + let offset = usize::try_from(offset).expect("Invalid offset"); + + let Some(ref blobref) = file.blobref else { + // We haven't flushed the handle yet, but we should still be able to read from it + reply.data(&handle.read(offset, size as usize)); + return; + }; + + let Ok(bytes) = self.server.read(blobref, offset..(offset + size as usize)) else { + warn!("read(ino: {ino:#x?}): EIO"); + reply.error(EIO); + return; + }; + + reply.data(&bytes); + } + + fn release( + &mut 
self, + _req: &fuser::Request<'_>, + ino: u64, + fh: u64, + _flags: i32, + _lock_owner: Option<u64>, + _flush: bool, // TODO: flush if true + reply: fuser::ReplyEmpty, + ) { + debug!("release(ino: {ino:#x?}, fh: {fh})"); + self.release_handle(fh); + reply.ok(); + } +} + +fn load_dentry( + server: &Server, + ino: &mut Ino, + parent_ino: Ino, + dcache: &mut Dcache, + fcache: &mut Fcache, +) -> Result<(), c_int> { + let dentry = dcache.get_mut(parent_ino).ok_or(ENOENT)?; + if dentry.loaded { + return Ok(()); + } + + let parent_blobref = fcache + .get(parent_ino) + .and_then(|f| f.blobref.clone()) + .map(Attr::Group); + + // TODO: Pass range to `load_dentry` + let objects = server.list(0..10_000, parent_blobref).map_err(|_| EIO)?; + + let mut dir_inos = Vec::new(); + for object in objects { + let ino = ino.next(); + + let name: OsString = object + .get_name() + .unwrap_or(format!(".carton.x-nameless-{ino}")) + .into(); + + let file = if object.is_group() { + let mut file = File::new_directory(*ino, name.clone()); + file.blobref = Some(object.blobref().clone()); + dir_inos.push(*ino); + file + } else { + let mut file = File::new_regular_file(*ino, name.clone()); + file.blobref = Some(object.blobref().clone()); + file.attr.size = object.size() as u64; + file + }; + + dentry.insert(name, *ino); + fcache.insert(*ino, file); + } + dentry.loaded = true; + + for dir_ino in &dir_inos { + dcache.add_dentry(*dir_ino, parent_ino); + } + + Ok(()) +} diff --git a/src/client/fs/dcache.rs b/src/client/fs/dcache.rs new file mode 100644 index 0000000..b171abc --- /dev/null +++ b/src/client/fs/dcache.rs @@ -0,0 +1,107 @@ +use super::ino::Ino; +use std::{ + collections::{ + btree_map::{self, OccupiedError}, + BTreeMap, HashMap, + }, + ffi::{OsStr, OsString}, +}; + +#[derive(Debug)] +pub(super) struct Dcache { + inner: HashMap<Ino, Dentry>, +} + +impl Dcache { + pub fn new() -> Self { + Self { + inner: HashMap::new(), + } + } + + pub fn get_ino(&self, parent: Ino, name: &OsStr) -> Option<&Ino> { + self.get(parent).and_then(|dentry| dentry.get(name)) + } + + pub fn try_insert_name( + &mut self, + parent: Ino, + name: OsString, + ino: Ino, + ) -> Option<Result<(), ()>> { + match self + .get_mut(parent) + .map(|dentry| dentry.try_insert(name, ino)) + { + Some(Ok(_)) => Some(Ok(())), + Some(Err(_)) => Some(Err(())), + None => None, + } + } + + pub fn add_dentry(&mut self, ino: Ino, parent: Ino) -> Option<Dentry> { + self.insert(ino, Dentry::new(ino, parent)) + } + + // Map-like API + + pub fn insert(&mut self, ino: Ino, dentry: Dentry) -> Option<Dentry> { + self.inner.insert(ino, dentry) + } + + pub fn get(&self, ino: Ino) -> Option<&Dentry> { + self.inner.get(&ino) + } + + pub fn get_mut(&mut self, ino: Ino) -> Option<&mut Dentry> { + self.inner.get_mut(&ino) + } + + pub fn remove(&mut self, ino: Ino) -> Option<Dentry> { + self.inner.remove(&ino) + } +} + +#[derive(Debug)] +pub(super) struct Dentry { + inner: BTreeMap<OsString, Ino>, + pub loaded: bool, +} + +impl Dentry { + pub fn new(ino: Ino, parent: Ino) -> Self { + let mut dentry = Self { + inner: BTreeMap::new(), + loaded: false, + }; + + dentry.insert(".".into(), ino); + dentry.insert("..".into(), parent); + + dentry + } + + pub fn insert(&mut self, k: OsString, v: Ino) -> Option<Ino> { + self.inner.insert(k, v) + } + + pub fn try_insert( + &mut self, + k: OsString, + v: Ino, + ) -> Result<&mut Ino, btree_map::OccupiedError<'_, OsString, Ino>> { + self.inner.try_insert(k, v) + } + + pub fn get(&self, k: &OsStr) -> Option<&Ino> { + self.inner.get(k) + } + + 
pub fn remove(&mut self, k: &OsStr) -> Option<Ino> { + self.inner.remove(k) + } + + pub fn iter(&self) -> impl Iterator<Item = (&OsString, &Ino)> { + self.inner.iter() + } +}
diff --git a/src/client/fs/fcache.rs b/src/client/fs/fcache.rs new file mode 100644 index 0000000..03785c0 --- /dev/null +++ b/src/client/fs/fcache.rs @@ -0,0 +1,31 @@ +use super::{file::File, ino::Ino}; +use std::collections::HashMap; + +#[derive(Debug)] +pub struct Fcache { + inner: HashMap<Ino, File>, +} + +impl Fcache { + pub fn new() -> Self { + Self { + inner: HashMap::new(), + } + } + + pub fn insert(&mut self, ino: Ino, file: File) -> Option<File> { + self.inner.insert(ino, file) + } + + pub fn get(&self, ino: Ino) -> Option<&File> { + self.inner.get(&ino) + } + + pub fn get_mut(&mut self, ino: Ino) -> Option<&mut File> { + self.inner.get_mut(&ino) + } + + pub fn remove(&mut self, ino: Ino) -> Option<File> { + self.inner.remove(&ino) + } +}
diff --git a/src/client/fs/fh.rs b/src/client/fs/fh.rs new file mode 100644 index 0000000..b1e8739 --- /dev/null +++ b/src/client/fs/fh.rs @@ -0,0 +1,110 @@ +use crate::{common::temp_file, server::Stream}; +use log::error; +use std::{ + fs::{self, File}, + io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write}, + ops::Index, + path::PathBuf, + slice::SliceIndex, +}; + +pub struct FileHandle { + buffer: Vec<u8>, + size: usize, + tmp: Option<(PathBuf, BufWriter<File>)>, + reader: Option<BufReader<File>>, + dirty: bool, +} + +impl FileHandle { + pub fn empty() -> std::io::Result<Self> { + let (temp_path, temp_file) = temp_file()?; + + Ok(Self { + buffer: Vec::new(), + size: 0, + tmp: Some((temp_path, BufWriter::new(temp_file))), + reader: None, + dirty: false, + }) + } + + pub fn new_rw(reader: BufReader<File>) -> std::io::Result<Self> { + // TODO: Copy read to tmp + let (temp_path, temp_file) = temp_file()?; + + Ok(Self { + buffer: Vec::new(), + size: 0, + tmp: Some((temp_path, BufWriter::new(temp_file))), + reader: Some(reader), + dirty: false, + }) + } + + pub fn new_ro(reader: BufReader<File>) -> Self { + Self { + buffer: Vec::new(), + size: 0, + tmp: None, + reader: Some(reader), + dirty: false, + } + } + + pub fn is_dirty(&self) -> bool { + self.dirty + } + + // In-memory buffer accessors; `flush` in fs.rs reads the buffered contents + // through these. + pub fn buffer(&self) -> &Vec<u8> { + &self.buffer + } + + pub fn buflen(&self) -> usize { + self.buffer.len() + } + + pub fn write(&mut self, data: &[u8], offset: usize) -> Option<usize> { + let Some((_, ref mut tmp)) = self.tmp else { + error!("Tried to write to a RO file handle"); + return None; + }; + + tmp.seek(SeekFrom::Start(offset as u64)) + .inspect_err(|e| error!("Seek error: {e}")) + .ok()?; + let written = tmp + .write(data) + .inspect_err(|e| error!("Write error: {e}")) + .ok()?; + + self.dirty = true; + + Some(written) + } + + pub fn read(&mut self, offset: usize, size: usize) -> Vec<u8> { + let Some(reader) = self.reader.as_mut() else { + error!("Tried to read from an empty file handle"); + return Vec::new(); + }; + + // `read_exact` fills exactly `buf.len()` bytes, so size the buffer up front. + let mut buf = vec![0; size]; + + // TODO: error handling... + reader.seek(SeekFrom::Start(offset as u64)).unwrap(); + reader.read_exact(&mut buf).unwrap(); + + buf + } + + pub fn clear(&mut self) { + if let Some((_, ref mut w)) = self.tmp { + let _ = w.flush(); + } + self.buffer.clear(); + self.dirty = false; + } +} + +impl Drop for FileHandle { + fn drop(&mut self) { + let Some((ref temp_path, _)) = self.tmp else { + return; + }; + + if let Err(e) = fs::remove_file(temp_path) { + error!("Couldn't delete temp file {temp_path:?}: {e}"); + } + } +}
diff --git a/src/client/fs/file.rs b/src/client/fs/file.rs new file mode 100644 index 0000000..5d51913 --- /dev/null +++ b/src/client/fs/file.rs @@ -0,0 +1,78 @@ +use super::ino::Ino; +use crate::server::blobref::BlobRef; +use fuser::{FileAttr, FileType}; +use std::{ + ffi::{OsStr, OsString}, + time::SystemTime, +}; + +const DEFAULT_PERMISSIONS: u16 = 0o644; + +#[derive(Debug)] +pub struct File { + // Files only have a blobref if they were written to. No blob is created if the + // file is only `touch`ed. This means empty files will disappear on `umount`. + pub blobref: Option<BlobRef>, + parent: Option<Ino>, + pub attr: FileAttr, + name: OsString, +} + +impl File { + fn new(ino: Ino, name: &OsStr, kind: FileType) -> Self { + let now = SystemTime::now(); + + let attr = FileAttr { + ino: ino.into(), + size: 0, + blocks: 0, + atime: now, + mtime: now, + ctime: now, + crtime: now, + kind, + perm: DEFAULT_PERMISSIONS, + nlink: 0, + uid: 0, + gid: 0, + rdev: 0, + flags: 0, + blksize: 0, + }; + + File { + blobref: None, + parent: None, + attr, + name: name.into(), + } + } + + pub fn new_regular_file<T: Into<OsString>>(ino: Ino, name: T) -> Self { + Self::new(ino, &name.into(), FileType::RegularFile) + } + + pub fn new_directory<T: Into<OsString>>(ino: Ino, name: T) -> Self { + Self::new(ino, &name.into(), FileType::Directory) + } + + pub fn set_parent(&mut self, ino: Ino) { + self.parent = Some(ino); + } + + pub fn name(&self) -> OsString { + self.name.clone() + } + + pub fn parent(&self) -> Option<Ino> { + self.parent + } + + pub fn ino(&self) -> Ino { + self.attr.ino.into() + } + + pub fn size(&self) -> usize { + self.attr.size as usize + } +}
diff --git a/src/client/fs/ino.rs b/src/client/fs/ino.rs new file mode 100644 index 0000000..0b7628e --- /dev/null +++ b/src/client/fs/ino.rs @@ -0,0 +1,51 @@ +use std::fmt::Display; + +const ROOT_INO: u64 = 1; + +#[derive(Clone, Debug, Eq, PartialEq, Hash, Copy)] +pub struct Ino(u64); + +impl Ino { + pub fn new() -> Self { + Self(ROOT_INO) + } + + pub fn next(&mut self) -> &Self { + self.0 += 1; + self + } + + pub fn prev(&mut self) { + self.0 -= 1; + } +} + +impl From<Ino> for u64 { + fn from(ino: Ino) -> Self { + ino.0 + } +} + +impl From<u64> for Ino { + fn from(ino: u64) -> Self { + Self(ino) + } +} + +impl From<u32> for Ino { + fn from(ino: u32) -> Self { + Self(u64::from(ino)) + } +} + +impl Display for Ino { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.0.fmt(f) + } +} + +impl Default for Ino { + fn default() -> Self { + Self::new() + } +}
diff --git a/src/common.rs b/src/common.rs new file mode 100644 index 0000000..4a622f5 --- /dev/null +++ b/src/common.rs @@ -0,0 +1,26 @@ +pub mod hash; +pub mod json; +pub mod mime; +pub mod slot_map; +pub mod sqlite; + +use std::{fs::File, path::PathBuf}; + +use data_encoding::Encoding; +use data_encoding_macro::new_encoding; +use rand::{thread_rng, Rng}; + +pub const BASE32: Encoding = new_encoding!
{ + symbols: "ABCDEFGHIJKLMNOPQRSTUVWXYZ234567", + translate_from: "abcdefghijklmnopqrstuvwxyz", + translate_to: "ABCDEFGHIJKLMNOPQRSTUVWXYZ", +}; + +pub fn temp_file() -> std::io::Result<(PathBuf, File)> { + let key: [u8; 16] = thread_rng().gen(); + + let mut path = std::env::temp_dir(); + path.push(format!("carton-{}", BASE32.encode(&key))); + + Ok((path.clone(), File::create_new(path)?)) +}
diff --git a/src/common/hash.rs b/src/common/hash.rs new file mode 100644 index 0000000..0d46da0 --- /dev/null +++ b/src/common/hash.rs @@ -0,0 +1,109 @@ +use super::BASE32; + +#[derive(Debug)] +pub enum Error { + ReadHash(String), + InvalidBytes, +} + +pub const BLAKE3_BYTES: usize = 32; + +#[derive(Clone, Copy, Eq, PartialEq, Hash, PartialOrd, Ord)] +pub enum Hash { + Blake3([u8; BLAKE3_BYTES]), +} + +pub enum Hasher { + Blake3(blake3::Hasher), +} + +impl Default for Hasher { + fn default() -> Self { + Hasher::Blake3(blake3::Hasher::new()) + } +} + +impl Hasher { + pub fn update(&mut self, bytes: &[u8]) { + match self { + Hasher::Blake3(ref mut h) => { + h.update(bytes); + } + } + } + + pub fn finish(&self) -> Hash { + match self { + Hasher::Blake3(ref h) => { + let result = h.finalize(); + let mut hash = [0; BLAKE3_BYTES]; + hash.clone_from_slice(result.as_bytes()); + Hash::Blake3(hash) + } + } + } +} + +impl std::fmt::Debug for Hash { + fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(fmt, "{}", self.to_base32()) + } +} + +#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd)] +enum Algo { + Blake3 = 1, +} + +impl Hash { + pub fn to_bytes(self) -> [u8; 1 + BLAKE3_BYTES] { + match self { + Hash::Blake3(ref s) => { + let mut out = [0; 1 + BLAKE3_BYTES]; + out[0] = Algo::Blake3 as u8; + out[1..].clone_from_slice(s); + out + } + } + } + + pub fn from_bytes(s: &[u8]) -> Option<Self> { + if s.len() == 1 + BLAKE3_BYTES && s[0] == Algo::Blake3 as u8 { + let mut out = [0; BLAKE3_BYTES]; + out.clone_from_slice(&s[1..]); + Some(Hash::Blake3(out)) + } else { + None + } + } + + pub fn validate(s: &[u8]) -> Result<(), Error> { + if s.len() == 1 + BLAKE3_BYTES && s[0] == Algo::Blake3 as u8 { + Ok(()) + } else { + Err(Error::InvalidBytes) + } + } + + pub fn to_base32(self) -> String { + let hash = self.to_bytes(); + BASE32.encode(&hash) + } + + pub fn from_base32(s: &[u8]) -> Option<Self> { + let bytes = BASE32.decode(s).ok()?; + Self::from_bytes(&bytes) + } +} + +impl std::str::FromStr for Hash { + type Err = Error; + + fn from_str(s: &str) -> Result<Self, Self::Err> { + if let Some(b) = Self::from_base32(s.as_bytes()) { + Ok(b) + } else { + Err(Error::ReadHash(s.to_string())) + } + } +}
diff --git a/src/common/json.rs b/src/common/json.rs new file mode 100644 index 0000000..50bd788 --- /dev/null +++ b/src/common/json.rs @@ -0,0 +1,7 @@ +use serde::Serialize; + +pub fn serialize<S: Serialize>(s: S) -> Vec<u8> { + let mut buffer: Vec<u8> = Vec::new(); + serde_json::to_writer(&mut buffer, &s).unwrap(); + buffer +}
diff --git a/src/common/mime.rs b/src/common/mime.rs new file mode 100644 index 0000000..1345721 --- /dev/null +++ b/src/common/mime.rs @@ -0,0 +1,56 @@ +use rusqlite::{ + types::{FromSql, FromSqlError}, + ToSql, +}; +use serde::{Deserialize, Serialize}; +use std::path::Path; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum MimeType { + #[serde(rename = "application/x.group")] + ApplicationXGroup, + #[serde(rename = "application/pdf")] + ApplicationPdf, + #[serde(rename = "application/zip")] + ApplicationZip, + #[serde(rename = "image/png")] + ImagePng, +
#[serde(rename = "image/jpeg")] + ImageJpeg, + #[serde(rename = "text/csv")] + ImageXXcf, + #[serde(rename = "image/x-xcf")] + TextCsv, + #[serde(rename = "text/css")] + TextCss, +} + +pub fn guess(name: &str) -> Option<MimeType> { + match Path::new(name).extension()?.to_str()? { + "pdf" => Some(MimeType::ApplicationPdf), + "zip" => Some(MimeType::ApplicationZip), + "png" => Some(MimeType::ImagePng), + "jpg" | "jpeg" => Some(MimeType::ImageJpeg), + "csv" => Some(MimeType::TextCsv), + "css" => Some(MimeType::TextCss), + "xcf" => Some(MimeType::ImageXXcf), + _ => None, + } +} + +impl ToSql for MimeType { + fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> { + if let Ok(serde_json::Value::String(mime)) = serde_json::to_value(self) { + return Ok(mime.into()); + } + + unreachable!() + } +} + +impl FromSql for MimeType { + fn column_result(value: rusqlite::types::ValueRef<'_>) -> rusqlite::types::FromSqlResult<Self> { + let v: String = FromSql::column_result(value)?; + serde_json::from_str(&format!("\"{}\"", v)).map_err(|_| FromSqlError::InvalidType) + } +} diff --git a/src/common/slot_map.rs b/src/common/slot_map.rs new file mode 100644 index 0000000..110ee59 --- /dev/null +++ b/src/common/slot_map.rs @@ -0,0 +1,85 @@ +use std::collections::HashMap; + +pub struct SlotMap<T> { + bitmap: u64, + map: HashMap<u32, T>, +} + +impl<T> SlotMap<T> { + pub fn new() -> Self { + Self { + bitmap: u64::MAX, + map: HashMap::new(), + } + } + + pub fn insert(&mut self, value: T) -> Option<u32> { + let slot = self.get_free_slot()?; + self.map.insert(slot, value); + self.use_slot(slot); + + Some(slot) + } + + pub fn remove(&mut self, slot: u32) -> Option<T> { + let value = self.map.remove(&slot)?; + self.release_slot(slot); + + Some(value) + } + + pub fn get(&self, slot: u32) -> Option<&T> { + self.map.get(&slot) + } + + pub fn get_mut(&mut self, slot: u32) -> Option<&mut T> { + self.map.get_mut(&slot) + } + + fn get_free_slot(&self) -> Option<u32> { + let leading_zeros = self.bitmap.leading_zeros(); + + if leading_zeros > 63 { + None + } else { + Some(63 - leading_zeros) + } + } + + fn use_slot(&mut self, slot: u32) { + let mask = u64::MAX; + println!("{:0b}", self.bitmap); + self.bitmap &= !(1 << slot) & mask; + } + + fn release_slot(&mut self, slot: u32) { + self.bitmap |= 1 << slot; + } +} + +#[test] +fn releases_a_slot_after_removal() { + let mut slot_map = SlotMap::new(); + + assert_eq!(slot_map.insert(1), Some(63)); + assert_eq!(slot_map.insert(2), Some(62)); + assert_eq!(slot_map.insert(3), Some(61)); + + assert_eq!(slot_map.remove(&62), Some(2)); + + assert_eq!(slot_map.insert(4), Some(62)); + assert_eq!(slot_map.insert(5), Some(60)); +} + +#[test] +fn uses_all_available_slots() { + let mut slot_map = SlotMap::new(); + + for x in 0..64 { + assert_eq!(slot_map.insert(0), Some(63 - x)); + } + + assert_eq!(slot_map.insert(0), None); + assert_eq!(slot_map.remove(&43), Some(0)); + assert_eq!(slot_map.insert(0), Some(43)); +} diff --git a/src/common/sqlite.rs b/src/common/sqlite.rs new file mode 100644 index 0000000..d373218 --- /dev/null +++ b/src/common/sqlite.rs @@ -0,0 +1,60 @@ +use log::error; +use rusqlite::{Connection, Params, Row}; + +pub fn get<F, P, T>(db: &Connection, query: &str, params: P, row_mapper: F) -> rusqlite::Result<T> +where + F: FnMut(&Row<'_>) -> rusqlite::Result<T>, + P: Params, +{ + let mut stmt = match db.prepare(query) { + Ok(stmt) => stmt, + Err(e) => { + error!("Couldn't prepare get statement: {e:?}"); + return Err(e); + } + }; + + stmt.query_row(params, 
row_mapper).inspect_err(|e| { + error!("Couldn't read from database: {e:?}"); + }) +} + +pub fn list<F, P, T>( + db: &Connection, + query: &str, + params: P, + row_mapper: F, +) -> rusqlite::Result<Vec<T>> +where + F: FnMut(&Row<'_>) -> rusqlite::Result<T>, + P: Params, +{ + let mut stmt = match db.prepare(query) { + Ok(stmt) => stmt, + Err(e) => { + error!("Couldn't prepare list statement: {e:?}"); + return Err(e); + } + }; + + let result = stmt.query_map(params, row_mapper); + + match result { + Ok(res) => { + let records: rusqlite::Result<Vec<T>> = res.collect(); + + match records { + Ok(records) => Ok(records), + Err(e) => { + error!("Couldn't read from database: {e:?}"); + Err(e) + } + } + } + Err(rusqlite::Error::QueryReturnedNoRows) => Ok(vec![]), + Err(e) => { + error!("Couldn't read from database: {e:?}"); + Err(e) + } + } +}
diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..3d0eee5 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,65 @@ +#![deny(clippy::all)] +#![warn(clippy::pedantic)] +#![allow(clippy::module_name_repetitions)] +#![feature(file_create_new)] +#![feature(map_try_insert)] +#![feature(result_option_inspect)] + +mod client; +mod common; +mod index; +mod server; + +// use fs::FileSystem; +// use index::Index; +use client::fs::FileSystem; +use serde::{Deserialize, Serialize}; +use server::Server; +use std::env; + +const CONFIG: &str = "carton.toml"; + +#[derive(Serialize, Deserialize)] +struct Config { + server: server::Config, + client: client::Config, +} + +fn main() { + let mut logger = env_logger::Builder::from_default_env(); + logger.filter(None, log::LevelFilter::Debug).init(); + + // Config + let config: Config = toml::from_str( + &std::fs::read_to_string(CONFIG) + .unwrap_or_else(|_| panic!("Missing config file at {CONFIG:?}")), + ) + .unwrap_or_else(|e| panic!("Invalid config file at {CONFIG:?}: {e}")); + + // Args + let args: Vec<String> = env::args().collect(); + + if args.len() < 2 { + println!("Missing command for carton."); + return; + } + + match args[1].as_str() { + // "index-scan" => cmd_index_scan(&args[2..]), + // "reindex" => cmd_index_scan(&args[2..]), + "fs-mount" => cmd_fs_mount(config, &args[2..]), + _ => todo!(), + } +} + +// fn cmd_index_scan(args: &[String]) { +// index::spawn_scan(Path::new(ROOT)); +// } + +fn cmd_fs_mount(config: Config, _args: &[String]) { + let server = Server::new(&config.server).expect("Couldn't initialize server."); + + // let index = Index::new(Path::new(ROOT)).unwrap(); + + let fs = FileSystem::new(server); + fs.mount(&config.client.fs.mountpoint).unwrap(); +}
diff --git a/src/server.rs b/src/server.rs new file mode 100644 index 0000000..72f848a --- /dev/null +++ b/src/server.rs @@ -0,0 +1,139 @@ +pub mod attrs; +pub mod blobref; +mod index; +mod object; +mod storage; + +use self::{attrs::Attr, blobref::BlobRef, index::Index, object::Object, storage::Storage}; +use log::{error, warn}; +use serde::{Deserialize, Serialize}; +use std::{cmp, io::{Read, BufReader}, ops::Range, fs::File}; + +#[derive(Debug)] +pub enum Error { + Storage, + Index, +} + +type Result<T> = std::result::Result<T, Error>; + +#[derive(Serialize, Deserialize)] +pub struct Config { + storage: storage::Config, + index: index::Config, +} + +pub struct Server { + storage: Storage, + index: Index, +} + +pub struct Stream; + +impl Server { + pub fn new(config: &Config) -> Result<Self> { + let index = Index::new(&config.index).map_err(|_| Error::Index)?; + let storage = Storage::new(&config.storage); + + Ok(Self { storage, index }) + } + + pub fn read(&self, blobref: &BlobRef, range:
Range<usize>) -> Result<Vec<u8>> { + let object = self.index.get(blobref).map_err(|_| Error::Index)?; + + let mut bytes: Vec<u8> = Vec::with_capacity(range.len()); + let mut initial_chunk_offset = None; + + for chunk in &object.chunks { + let chunk_end = chunk.offset + chunk.size; + + if chunk_end < range.start { + continue; + } + if chunk.offset > range.end { + break; + } + + if initial_chunk_offset.is_none() { + initial_chunk_offset = Some(chunk.offset); + } + + let chunk_bytes = self + .storage + .get(&chunk.blobref) + .map_err(|_| Error::Storage)?; + bytes.extend_from_slice(&chunk_bytes); + } + + let start = range.start - initial_chunk_offset.unwrap_or(0); + let end = cmp::min(start + range.len(), bytes.len()); + Ok(bytes[start..end].to_vec()) + } + + pub fn copy(&self, data: &BufReader<File>) -> Result<BlobRef> { + // From fs: a. Write to temp file (via BufWriter?), then on flush + // create a BufReader for temp file and pass it to this + // method. + // b. When writing to an existing file, first copy the data + // in a temp file, then proceed with a. + Err(Error::Storage) + } + + pub fn put<T: Read>(&self, data: T) -> Result<BlobRef> { + let (anchor, chunks) = self.storage.put(data).map_err(|e| { + error!("storage error: {e:?}"); + Error::Storage + })?; + + self.index + .put(&anchor, &chunks) + .map_err(|e| { + warn!("index error: {e:?}"); + }) + .ok(); + + Ok(anchor) + } + + pub fn put_with_attrs<T: Read>(&mut self, data: T, attrs: &[Attr]) -> Result<BlobRef> { + let anchor = self.put(data)?; + self.set_attrs(&anchor, attrs)?; + + Ok(anchor) + } + + fn extend<T: Read>(&self, anchor: BlobRef, data: T) -> Result<BlobRef> { + self.storage + .extend(anchor, data) + .map_err(|e| { + error!("storage error: {e:?}"); + Error::Storage + }) + .map(|r| r.0) + // TODO self.index.extend + } + + pub fn set_attrs(&mut self, anchor: &BlobRef, attrs: &[Attr]) -> Result<BlobRef> { + let patch = self.storage.set_attrs(anchor.clone(), attrs).map_err(|e| { + error!("storage error: {e:?}"); + Error::Storage + })?; + + self.index + .set_attrs(anchor, attrs) + .map_err(|e| { + warn!("index error: {e:?}"); + }) + .ok(); + + Ok(patch) + } + + pub fn list(&self, range: Range<usize>, filter: Option<Attr>) -> Result<Vec<Object>> { + let limit = range.len(); + + self.index + .list(limit, range.start, filter) + .map_err(|_| Error::Index) + } +} diff --git a/src/server/attrs.rs b/src/server/attrs.rs new file mode 100644 index 0000000..eeb273a --- /dev/null +++ b/src/server/attrs.rs @@ -0,0 +1,47 @@ +use super::blobref::BlobRef; +use crate::common::{json, mime::MimeType}; +use rusqlite::ToSql; +use serde::{Deserialize, Serialize}; +use time::OffsetDateTime; + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "kebab-case")] +pub enum Attr { + Name(String), + Group(BlobRef), + Mime(MimeType), + CreatedAt(#[serde(with = "time::serde::rfc3339")] OffsetDateTime), + UpdatedAt(#[serde(with = "time::serde::rfc3339")] OffsetDateTime), + DeletedAt(#[serde(with = "time::serde::rfc3339")] OffsetDateTime), + Tags(Vec<String>), + Note(String), +} + +pub fn key(attr: &Attr) -> String { + match attr { + Attr::Name(_) => "name", + Attr::Group(_) => "group", + Attr::Mime(_) => "mime", + Attr::CreatedAt(_) => "created_at", + Attr::UpdatedAt(_) => "updated_at", + Attr::DeletedAt(_) => "deleted_at", + Attr::Tags(_) => "tags", + Attr::Note(_) => "note", + } + .into() +} + +impl ToSql for Attr { + fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> { + match self { + Attr::Name(name) => 
name.to_sql(), + Attr::Group(group) => group.to_sql(), + Attr::Mime(mime) => mime.to_sql(), + Attr::CreatedAt(created_at) => created_at.to_sql(), + Attr::UpdatedAt(updated_at) => updated_at.to_sql(), + Attr::DeletedAt(deleted_at) => deleted_at.to_sql(), + Attr::Tags(tags) => Ok(json::serialize(tags).into()), + Attr::Note(note) => note.to_sql(), + } + } +} diff --git a/src/server/blobref.rs b/src/server/blobref.rs new file mode 100644 index 0000000..7d9fa66 --- /dev/null +++ b/src/server/blobref.rs @@ -0,0 +1,62 @@ +use crate::common::hash::{self, Hasher}; +use rusqlite::types::{FromSql, FromSqlError}; +use rusqlite::{ + types::{FromSqlResult, ToSqlOutput}, + ToSql, +}; +use serde::{Deserialize, Serialize}; +use std::fmt::Display; +use std::path::PathBuf; +use std::str::FromStr; + +#[derive(Debug)] +pub enum Error { + InvalidHash(hash::Error), +} + +type Result<T> = std::result::Result<T, Error>; + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Hash, Eq)] +pub struct BlobRef(String); + +impl BlobRef { + pub fn from_str(s: &str) -> Result<Self> { + hash::Hash::from_str(s).map_err(Error::InvalidHash)?; + Ok(Self(s.to_string())) + } + + pub fn for_bytes(bytes: &[u8]) -> Self { + let mut hasher = Hasher::default(); + hasher.update(bytes); + BlobRef(hasher.finish().to_base32()) + } + + pub(super) fn path(&self) -> PathBuf { + let mut buf = PathBuf::new(); + buf.push(&self.0[0..4]); + buf.push(&self.0[4..]); + + buf + } +} + +impl Display for BlobRef { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "blobref:{}", self.0) + } +} + +impl ToSql for BlobRef { + fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> { + Ok(ToSqlOutput::Borrowed(rusqlite::types::ValueRef::Text( + self.0.as_bytes(), + ))) + } +} + +impl FromSql for BlobRef { + fn column_result(value: rusqlite::types::ValueRef<'_>) -> FromSqlResult<Self> { + let v: String = FromSql::column_result(value)?; + BlobRef::from_str(&v).map_err(|_| FromSqlError::InvalidType) + } +} diff --git a/src/server/index.rs b/src/server/index.rs new file mode 100644 index 0000000..042c4aa --- /dev/null +++ b/src/server/index.rs @@ -0,0 +1,260 @@ +use super::attrs::{self, Attr}; +use super::blobref::BlobRef; +use super::object::Chunk; +use super::object::{Attrs, Object}; +use crate::common::{json, sqlite}; +use log::error; +use rusqlite::{params, Connection, Row}; +use serde::{Deserialize, Serialize}; +use std::path::PathBuf; +use time::OffsetDateTime; + +#[derive(Serialize, Deserialize)] +pub struct Config { + pub root: PathBuf, +} + +const DB_NAME: &str = "index.sqlite3"; +const SQL_INIT: &str = " + create table objects ( + anchor text primary key, + size integer not null, + chunks text not null, -- JSON array + created_at datetime not null, + updated_at datetime not null, + x_name text, + x_group text, + x_mime text, + x_created_at text, + x_updated_at text, + x_deleted_at text, + x_tags text, + x_note text + ); +"; + +#[derive(Debug)] +pub enum Error { + Internal(rusqlite::Error), +} + +type Result<T> = std::result::Result<T, Error>; + +pub struct Index { + db: Connection, +} + +impl Index { + pub fn new(config: &Config) -> Result<Self> { + let db = match Connection::open(config.root.join(DB_NAME)) { + Ok(conn) => conn, + Err(e) => { + error!("Couldn't open connection to the database: {e:?}"); + return Err(Error::Internal(e)); + } + }; + + Ok(Self { db }) + } + + pub fn put(&self, anchor: &BlobRef, chunks: &[Chunk]) -> Result<()> { + // Calculate the overall size + let size: usize = 
chunks.iter().map(|c| c.size).sum(); + + // Insert or update the object for `anchor` + let query = " + insert into objects (anchor, size, chunks, created_at, updated_at) + values (?, ?, ?, ?, ?) + on conflict do update set + size = excluded.size, + chunks = excluded.chunks, + updated_at = excluded.updated_at + "; + + let now = OffsetDateTime::now_utc(); + let mut stmt = self.db.prepare(query).map_err(Error::Internal)?; + stmt.execute(params![anchor, size, json::serialize(chunks), now, now]) + .map_err(Error::Internal)?; + + Ok(()) + } + + pub fn set_attrs(&mut self, anchor: &BlobRef, attrs: &[Attr]) -> Result<()> { + let now = OffsetDateTime::now_utc(); + let tx = self.db.transaction().map_err(Error::Internal)?; + + for attr in attrs { + let query = format!( + r#"update objects set "x_{}" = ? where anchor = ?"#, + attrs::key(attr) + ); + let mut stmt = tx.prepare(&query).map_err(Error::Internal)?; + + stmt.execute(params![attr, anchor]) + .map_err(Error::Internal)?; + } + + { + let mut stmt = tx + .prepare("update objects set updated_at = ? where anchor = ?") + .map_err(Error::Internal)?; + stmt.execute(params![now, anchor]) + .map_err(Error::Internal)?; + } + + tx.commit().map_err(Error::Internal)?; + + Ok(()) + } + + pub fn get(&self, anchor: &BlobRef) -> Result<Object> { + let query = "select + anchor, + size, + chunks, + created_at, + updated_at, + x_name, + x_group, + x_mime, + x_created_at, + x_updated_at, + x_deleted_at, + x_tags, + x_note + from objects + where anchor = ?"; + + sqlite::get(&self.db, query, (anchor,), |r| Object::try_from(r)).map_err(Error::Internal) + } + + pub fn list(&self, limit: usize, offset: usize, filter: Option<Attr>) -> Result<Vec<Object>> { + let where_clause = if let Some(ref filter) = filter { + format!(r#""x_{}" = ?"#, attrs::key(filter)) + } else { + "1=1".into() + }; + + let query = format!( + r#"select + anchor, + size, + chunks, + created_at, + updated_at, + x_name, + x_group, + x_mime, + x_created_at, + x_updated_at, + x_deleted_at, + x_tags, + x_note + from objects + where {where_clause} + order by rowid + limit {limit} offset {offset} + "# + ); + + if let Some(filter) = filter { + sqlite::list(&self.db, &query, (filter,), |r| Object::try_from(r)) + .map_err(Error::Internal) + } else { + sqlite::list(&self.db, &query, (), |r| Object::try_from(r)).map_err(Error::Internal) + } + } +} +// +// impl Object { +// fn new(blobref: BlobRef) -> Self { +// Object { +// blobref, +// group: None, +// is_group: false, +// name: None, +// kind: None, +// size: None, +// created_at: None, +// updated_at: None, +// deleted_at: None, +// tags: Vec::new(), +// } +// } +// +// pub fn get_name(&self) -> String { +// self.name.clone().unwrap_or(self.blobref.to_string()) +// } +// +// pub fn get_size(&self) -> u64 { +// self.size.unwrap_or(0) +// } +// +// pub fn is_group(&self) -> bool { +// self.is_group +// } +// +// fn apply_patch(&mut self, patch: Patch) { +// for change in patch.changes { +// self.apply_change(change); +// } +// } +// +// fn apply_change(&mut self, change: Change) { +// match change { +// Change::MarkAsGroup => self.is_group = true, +// Change::SetName { name } => self.name = Some(name), +// Change::SetGroup { group } => self.group = Some(group), +// Change::SetSize { size } => self.size = Some(size), +// Change::SetCreatedAt { created_at } => self.created_at = Some(created_at), +// Change::SetUpdatedAt { updated_at } => self.updated_at = Some(updated_at), +// Change::SetDeletedAt { deleted_at } => self.deleted_at = Some(deleted_at), +// 
Change::SetTags { tags } => self.tags = tags, +// }; +// } +// } +// +// fn row_to_object(row: &Row) -> rusqlite::Result<Object> { +// Ok(Object { +// blobref: row.get(0)?, +// group: row.get(1)?, +// is_group: row.get(2)?, +// name: row.get(3)?, +// size: row.get(4)?, +// kind: row.get(5)?, +// created_at: row.get(6)?, +// updated_at: row.get(7)?, +// deleted_at: row.get(8)?, +// tags: vec![], +// }) +// } + +impl TryFrom<&Row<'_>> for Object { + type Error = rusqlite::Error; + + fn try_from(row: &Row) -> std::result::Result<Object, Self::Error> { + let chunks_string: Vec<u8> = row.get(2)?; + let chunks = serde_json::from_slice::<Vec<Chunk>>(&chunks_string) + .map_err(|_| rusqlite::Error::InvalidQuery)?; + + let attrs = Attrs { + name: row.get(5)?, + group: row.get(6)?, + mime: row.get(7)?, + created_at: row.get(8)?, + updated_at: row.get(9)?, + deleted_at: row.get(10)?, + tags: Vec::new(), + note: row.get(12)?, + }; + + Ok(Object { + blobref: row.get(0)?, + size: row.get(1)?, + chunks, + attrs, + created_at: row.get(3)?, + updated_at: row.get(4)?, + }) + } +} diff --git a/src/server/object.rs b/src/server/object.rs new file mode 100644 index 0000000..9bad6c9 --- /dev/null +++ b/src/server/object.rs @@ -0,0 +1,51 @@ +use super::blobref::BlobRef; +use crate::common::mime::MimeType; +use serde::{Deserialize, Serialize}; +use time::OffsetDateTime; + +#[derive(Debug)] +pub(super) struct Attrs { + pub name: Option<String>, + pub group: Option<BlobRef>, + pub mime: Option<MimeType>, + pub created_at: Option<OffsetDateTime>, + pub updated_at: Option<OffsetDateTime>, + pub deleted_at: Option<OffsetDateTime>, + pub tags: Vec<String>, + pub note: Option<String>, +} + +#[derive(Debug, Serialize, Deserialize)] +pub(super) struct Chunk { + pub size: usize, + pub offset: usize, + pub blobref: BlobRef, +} + +#[derive(Debug)] +pub struct Object { + pub(super) blobref: BlobRef, + pub(super) size: usize, + pub(super) chunks: Vec<Chunk>, + pub(super) attrs: Attrs, + pub(super) created_at: Option<OffsetDateTime>, + pub(super) updated_at: Option<OffsetDateTime>, +} + +impl Object { + pub fn blobref(&self) -> &BlobRef { + &self.blobref + } + + pub fn size(&self) -> usize { + self.size + } + + pub fn is_group(&self) -> bool { + matches!(self.attrs.mime, Some(MimeType::ApplicationXGroup)) + } + + pub fn get_name(&self) -> Option<String> { + self.attrs.name.clone() + } +} diff --git a/src/server/storage.rs b/src/server/storage.rs new file mode 100644 index 0000000..2b81ce1 --- /dev/null +++ b/src/server/storage.rs @@ -0,0 +1,112 @@ +pub mod backend; +mod record; + +use self::{ + backend::StorageBackend, + record::{Anchor, Change, Patch}, +}; +use super::{attrs::Attr, blobref::BlobRef, object::Chunk}; +use crate::common::{hash, json}; +use fastcdc::v2020::StreamCDC; +use serde::{Deserialize, Serialize}; +use std::io::Read; + +#[derive(Debug)] +pub enum Error { + Io, + InvalidHash(hash::Error), +} + +type Result<T> = std::result::Result<T, Error>; + +type Backend = backend::disk::Disk; + +const ANCHOR_BUCKET: &str = "anchor"; +const PATCH_BUCKET: &str = "patch"; +const PLAIN_BUCKET: &str = "plain"; + +pub struct Storage { + anchors: Backend, + patches: Backend, + plains: Backend, +} + +#[derive(Serialize, Deserialize)] +pub struct Config { + pub backend: <Backend as StorageBackend>::Config, +} + +impl Storage { + pub fn new(config: &Config) -> Self { + let backend_factory = backend::factory(&config.backend); + + let anchors = backend_factory(ANCHOR_BUCKET); + let patches = backend_factory(PATCH_BUCKET); + let 
plains = backend_factory(PLAIN_BUCKET); + + Self { + anchors, + patches, + plains, + } + } + + pub fn get(&self, blobref: &BlobRef) -> Result<Vec<u8>> { + self.plains.get(blobref).and_then(|x| x.ok_or(Error::Io)) + } + + pub fn put<T: Read>(&self, data: T) -> Result<(BlobRef, Vec<Chunk>)> { + let anchor = self.anchors.put(&json::serialize(Anchor::new())).unwrap(); + + self.extend(anchor, data) + } + + pub fn extend<T: Read>(&self, anchor: BlobRef, data: T) -> Result<(BlobRef, Vec<Chunk>)> { + let chunker = StreamCDC::new(data, 4096, 4_194_304, 16_777_216); + let mut chunks = Vec::new(); + + for chunk in chunker { + let chunk = chunk.map_err(|_| Error::Io)?; + let chunk_blobref = self.plains.put(&chunk.data).unwrap(); + + let mut patch = Patch::new(anchor.clone()); + + patch.add_change(Change::AddChunk { + blobref: chunk_blobref.clone(), + offset: chunk.offset, + size: chunk.length, + }); + chunks.push(Chunk { + blobref: chunk_blobref, + offset: chunk.offset as usize, + size: chunk.length, + }); + + self.patches.put(&json::serialize(patch)).unwrap(); + } + + Ok((anchor, chunks)) + } + + pub fn set_attr(&self, anchor: BlobRef, attr: Attr) -> Result<BlobRef> { + let mut patch = Patch::new(anchor); + patch.add_change(Change::SetAttr(attr)); + self.patches.put(&json::serialize(patch)) + } + + pub fn set_attrs(&self, anchor: BlobRef, attrs: &[Attr]) -> Result<BlobRef> { + let mut patch = Patch::new(anchor); + + for attr in attrs { + patch.add_change(Change::SetAttr(attr.clone())); + } + + self.patches.put(&json::serialize(patch)) + } + + // pub fn get_anchor(&self, blobref: &BlobRef) -> Result<Option<Anchor>> {} + + // pub fn get_patch(&self, blobref: &BlobRef) -> Result<Option<Patch>> {} + + // pub fn get_plain(&self, blobref: &BlobRef) -> Result<Option<Vec<u8>>> {} +} diff --git a/src/server/storage/backend.rs b/src/server/storage/backend.rs new file mode 100644 index 0000000..8ae62b4 --- /dev/null +++ b/src/server/storage/backend.rs @@ -0,0 +1,26 @@ +pub mod disk; + +use super::Result; +use crate::server::blobref::BlobRef; + +pub fn factory<'a, T: StorageBackend>(config: &T::Config) -> impl Fn(&'a str) -> T + '_ { + |bucket: &str| T::new(bucket, config) +} + +pub trait StorageBackend: Iterator { + type Config; + + fn new(bucket: &str, config: &Self::Config) -> Self; + + fn put(&self, data: &[u8]) -> Result<BlobRef>; + + fn get(&self, blobref: &BlobRef) -> Result<Option<Vec<u8>>>; + + fn exists(&self, blobref: &BlobRef) -> bool { + if let Ok(Some(_)) = self.get(blobref) { + return true; + } + + false + } +} diff --git a/src/server/storage/backend/disk.rs b/src/server/storage/backend/disk.rs new file mode 100644 index 0000000..3137376 --- /dev/null +++ b/src/server/storage/backend/disk.rs @@ -0,0 +1,65 @@ +use super::StorageBackend; +use crate::server::blobref::BlobRef; +use crate::server::storage::{Error, Result}; +use log::debug; +use serde::{Deserialize, Serialize}; +use std::{fs, io::ErrorKind, path::PathBuf}; + +pub struct Disk { + bucket: String, + pub root: PathBuf, +} + +#[derive(Serialize, Deserialize)] +pub struct Config { + pub root: PathBuf, +} + +impl StorageBackend for Disk { + type Config = Config; + + fn new(bucket: &str, config: &Self::Config) -> Self { + Self { + bucket: bucket.to_string(), + root: config.root.clone().join(bucket), + } + } + + fn put(&self, data: &[u8]) -> Result<BlobRef> { + let blobref = BlobRef::for_bytes(data); + debug!("Preparing {blobref}"); + + if !self.exists(&blobref) { + let blobpath = self.root.join(blobref.path()); + let blobdir = 
blobpath.parent().expect("blobpath should have a parent"); + debug!("Writing blob to {blobpath:?}"); + + fs::create_dir_all(blobdir).map_err(|_| Error::Io)?; + fs::write(blobpath, data).map_err(|_| Error::Io)?; + } + + Ok(blobref) + } + + fn get(&self, blobref: &BlobRef) -> Result<Option<Vec<u8>>> { + let blobpath = self.root.join(blobref.path()); + + match fs::read(blobpath) { + Ok(contents) => Ok(Some(contents)), + Err(e) if e.kind() == ErrorKind::NotFound => Ok(None), + Err(e) => Err(Error::Io), + } + } + + fn exists(&self, blobref: &BlobRef) -> bool { + self.root.join(blobref.path()).exists() + } +} + +impl Iterator for Disk { + type Item = BlobRef; + + fn next(&mut self) -> Option<Self::Item> { + todo!() + } +} diff --git a/src/server/storage/record.rs b/src/server/storage/record.rs new file mode 100644 index 0000000..ab5c977 --- /dev/null +++ b/src/server/storage/record.rs @@ -0,0 +1,56 @@ +use crate::server::{attrs::Attr, blobref::BlobRef}; +use rand::{thread_rng, Rng}; +use serde::{Deserialize, Serialize}; +use time::OffsetDateTime; + +#[derive(Serialize, Deserialize)] +#[serde(rename_all = "kebab-case")] +pub(super) struct Anchor { + rand: u64, + #[serde(with = "time::serde::rfc3339")] + created_at: OffsetDateTime, +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "kebab-case")] +pub(super) struct Patch { + pub anchor: BlobRef, + pub changes: Vec<Change>, + #[serde(with = "time::serde::rfc3339")] + created_at: OffsetDateTime, +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "kebab-case")] +// #[allow(clippy::enum_variant_names)] +pub(super) enum Change { + AddChunk { + blobref: BlobRef, + offset: u64, + size: usize, + }, + SetAttr(Attr), +} + +impl Anchor { + pub(super) fn new() -> Self { + Self { + rand: thread_rng().gen(), + created_at: OffsetDateTime::now_utc(), + } + } +} + +impl Patch { + pub(super) fn new(anchor: BlobRef) -> Self { + Self { + anchor, + changes: Vec::new(), + created_at: OffsetDateTime::now_utc(), + } + } + + pub(super) fn add_change(&mut self, change: Change) { + self.changes.push(change); + } +} |