diff options
author | evuez <julien@mulga.net> | 2022-11-26 15:38:06 -0500 |
---|---|---|
committer | evuez <julien@mulga.net> | 2024-04-03 22:44:12 +0200 |
commit | 86098797034cbc7eb6db0cee54e17f8dcaedbc5d (patch) | |
tree | 29b6225ead843eb9022296a54657bbadfa1c4da0 /src/server.rs | |
download | blom-main.tar.gz |
Diffstat (limited to 'src/server.rs')
-rw-r--r-- | src/server.rs | 291 |
1 files changed, 291 insertions, 0 deletions
diff --git a/src/server.rs b/src/server.rs new file mode 100644 index 0000000..c23d74a --- /dev/null +++ b/src/server.rs @@ -0,0 +1,291 @@ +mod conn; +mod parser; + +use crate::common::db::Database; +use crate::common::expr::Error as ExprError; +use crate::common::query::Error as QueryError; +use conn::Conn; +use conn::Error as ConnError; +use log::debug; +use log::error; +use log::info; +use log::trace; +use log::warn; +use mio::event::Event; +use mio::event::Events; +use mio::net::TcpListener; +use mio::Interest; +use mio::Poll; +use mio::Token; +use std::collections::HashMap; +use std::fmt; +use std::io::ErrorKind; +use std::io::Read; +use std::net::SocketAddr; +use std::process; +use std::process::ExitCode; +use std::time::Duration; + +pub const USAGE: &str = " +Usage: blom start + + Starts the blom server. + +Options + + -b, --bind Bind to the given ADDRESS:PORT. Default is + 0.0.0.0:4902 + + +"; + +pub const HEADER: &str = " + ___ ___ ___ ___ + /\\ \\ /\\__\\ /\\ \\ /\\__\\ + /..\\ \\ /./ / /..\\ \\ /..L_L_ + /..\\.\\__\\ /./__/ /./\\.\\__\\ /./L.\\__\\ + \\.\\../ / \\.\\ \\ \\.\\/./ / \\/_/./ / + \\../ / \\.\\__\\ \\../ / /./ / + \\/__/ \\/__/ \\/__/ \\/__/ +"; + +const SERVER_TOKEN: Token = Token(0); +const SERVER_BUFLEN: usize = 512; +const CONN_BUFLEN: usize = 2048; +const MAX_CONN: usize = 1000; + +#[derive(Debug)] +enum Error { + IO(std::io::Error), + Query(QueryError), + Expr(ExprError), +} + +struct Server { + addr: SocketAddr, + pid: u32, + poll: Poll, + pool: Vec<Token>, + listener: TcpListener, + conns: HashMap<Token, Conn>, + buffer: [u8; SERVER_BUFLEN], + db: Database, +} + +pub fn cmd(args: &[String]) -> ExitCode { + println!("{}\n", HEADER); + + let addr = match args.get(0).map(|x| x.as_str()) { + Some("--bind") | Some("-b") => { + Some(args.get(1).expect("Missing address for --bind.").as_str()) + } + _ => None, + } + .unwrap_or("0.0.0.0:4902"); + + let mut server = match Server::bind(addr.parse().unwrap(), MAX_CONN) { + Some(server) => server, + None => return ExitCode::FAILURE, + }; + println!( + "PID: {} | Listening on {}; press Ctrl-C to exit...\n\n", + server.pid, server.addr + ); + + match server.start() { + Ok(()) => { + info!("Received signal X, terminating."); + ExitCode::SUCCESS + } + Err(e) => { + error!("Error: {e:?}"); + ExitCode::FAILURE + } + } +} + +impl Server { + fn bind(addr: SocketAddr, max_conns: usize) -> Option<Self> { + assert!(max_conns > 0); + + let mut listener = match TcpListener::bind(addr) { + Ok(listener) => listener, + Err(ref e) if e.kind() == ErrorKind::AddrInUse => { + println!("{addr} already in use."); + return None; + } + Err(_) => { + println!("Couldn't bind to {addr}."); + return None; + } + }; + + let poll = Poll::new().unwrap(); + + poll.registry() + .register(&mut listener, SERVER_TOKEN, Interest::READABLE) + .unwrap(); + + Some(Server { + addr, + pid: process::id(), + poll, + listener, + conns: HashMap::new(), + buffer: [0; SERVER_BUFLEN], + pool: (1..max_conns + 1).map(Token).collect(), + db: Database::new(), + }) + } + + fn start(&mut self) -> Result<(), Error> { + let mut events = Events::with_capacity(512); + + loop { + match self.poll.poll(&mut events, Some(Duration::from_secs(30))) { + Ok(()) => (), + Err(ref e) if e.kind() == ErrorKind::Interrupted => break, + Err(e) => return Err(Error::IO(e)), + }; + + if events.is_empty() { + trace!("Pulse"); + } + + for event in &events { + match event.token() { + SERVER_TOKEN => self.handle_new(), + token => self.handle_conn(token, event), + } + } + } + + Ok(()) + } + + fn handle_new(&mut self) { + loop { + match self.listener.accept() { + Ok((mut socket, _)) => match self.pool.pop() { + Some(token) => { + self.poll + .registry() + .register(&mut socket, token, Interest::READABLE | Interest::WRITABLE) + .unwrap(); + + let conn = Conn::new(socket, token); + + debug!( + "[{}] Connected. {} clients connected.", + conn, + MAX_CONN - self.pool.len() + ); + + self.conns.insert(token, conn); + } + None => { + warn!( + "[{}] Connection refused: Too many clients.", + socket.peer_addr().unwrap() + ); + break; + } + }, + Err(ref e) if e.kind() == ErrorKind::WouldBlock => break, + Err(_) => panic!("FATAL | Unexpected error"), + } + } + } + + fn handle_conn(&mut self, token: Token, event: &Event) { + let mut did_work = true; + while did_work { + did_work = false; + + let conn = self.conns.get_mut(&token).unwrap(); + + if conn.pending_exec() { + did_work = true; + if let Err(ConnError::Fatal) = conn.handle_pending(&mut self.db) { + self.close_conn(token); + break; + } + } + + if event.is_readable() && conn.read_ready() { + did_work = true; + + match conn.read(&mut self.buffer) { + Ok(0) => { + self.close_conn(token); + break; + } + Ok(len) => { + if let Err(ConnError::Fatal) = + conn.handle_msg(&mut self.db, &self.buffer[0..len]) + { + self.close_conn(token); + break; + } + } + Err(ref e) if e.kind() == ErrorKind::WouldBlock => break, + Err(ref e) if e.kind() == ErrorKind::ConnectionReset => { + self.close_conn(token); + break; + } + Err(e) => { + error!("!!! [{conn}] Fatal error: {e:?}."); + self.close_conn(token); + break; + } + } + } + + if event.is_writable() && conn.write_ready() { + did_work = true; + if let Err(e) = conn.reply() { + error!("!!! [{conn}] Fatal error: {e:?}."); + self.close_conn(token); + break; + }; + } + } + } + + fn close_conn(&mut self, token: Token) { + let conn = self.conns.remove(&token).unwrap(); + self.pool.push(token); + debug!( + "[{conn}] Closed. {} clients connected.", + MAX_CONN - self.pool.len() + ); + } +} + +// Display + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + let res = match self { + Error::IO(err) => err.to_string(), + Error::Query(err) => err.to_string(), + Error::Expr(err) => err.to_string(), + }; + + write!(f, "{}", res) + } +} + +// Errors + +impl From<QueryError> for Error { + fn from(err: QueryError) -> Self { + Error::Query(err) + } +} + +impl From<ExprError> for Error { + fn from(err: ExprError) -> Self { + Error::Expr(err) + } +} |