mod conn; mod parser; use crate::common::db::Database; use crate::common::expr::Error as ExprError; use crate::common::query::Error as QueryError; use conn::Conn; use conn::Error as ConnError; use log::debug; use log::error; use log::info; use log::trace; use log::warn; use mio::event::Event; use mio::event::Events; use mio::net::TcpListener; use mio::Interest; use mio::Poll; use mio::Token; use std::collections::HashMap; use std::fmt; use std::io::ErrorKind; use std::io::Read; use std::net::SocketAddr; use std::process; use std::process::ExitCode; use std::time::Duration; pub const USAGE: &str = " Usage: blom start Starts the blom server. Options -b, --bind Bind to the given ADDRESS:PORT. Default is 0.0.0.0:4902 "; pub const HEADER: &str = " ___ ___ ___ ___ /\\ \\ /\\__\\ /\\ \\ /\\__\\ /..\\ \\ /./ / /..\\ \\ /..L_L_ /..\\.\\__\\ /./__/ /./\\.\\__\\ /./L.\\__\\ \\.\\../ / \\.\\ \\ \\.\\/./ / \\/_/./ / \\../ / \\.\\__\\ \\../ / /./ / \\/__/ \\/__/ \\/__/ \\/__/ "; const SERVER_TOKEN: Token = Token(0); const SERVER_BUFLEN: usize = 512; const CONN_BUFLEN: usize = 2048; const MAX_CONN: usize = 1000; #[derive(Debug)] enum Error { IO(std::io::Error), Query(QueryError), Expr(ExprError), } struct Server { addr: SocketAddr, pid: u32, poll: Poll, pool: Vec, listener: TcpListener, conns: HashMap, buffer: [u8; SERVER_BUFLEN], db: Database, } pub fn cmd(args: &[String]) -> ExitCode { println!("{}\n", HEADER); let addr = match args.get(0).map(|x| x.as_str()) { Some("--bind") | Some("-b") => { Some(args.get(1).expect("Missing address for --bind.").as_str()) } _ => None, } .unwrap_or("0.0.0.0:4902"); let mut server = match Server::bind(addr.parse().unwrap(), MAX_CONN) { Some(server) => server, None => return ExitCode::FAILURE, }; println!( "PID: {} | Listening on {}; press Ctrl-C to exit...\n\n", server.pid, server.addr ); match server.start() { Ok(()) => { info!("Received signal X, terminating."); ExitCode::SUCCESS } Err(e) => { error!("Error: {e:?}"); ExitCode::FAILURE } } } impl Server { fn bind(addr: SocketAddr, max_conns: usize) -> Option { assert!(max_conns > 0); let mut listener = match TcpListener::bind(addr) { Ok(listener) => listener, Err(ref e) if e.kind() == ErrorKind::AddrInUse => { println!("{addr} already in use."); return None; } Err(_) => { println!("Couldn't bind to {addr}."); return None; } }; let poll = Poll::new().unwrap(); poll.registry() .register(&mut listener, SERVER_TOKEN, Interest::READABLE) .unwrap(); Some(Server { addr, pid: process::id(), poll, listener, conns: HashMap::new(), buffer: [0; SERVER_BUFLEN], pool: (1..max_conns + 1).map(Token).collect(), db: Database::new(), }) } fn start(&mut self) -> Result<(), Error> { let mut events = Events::with_capacity(512); loop { match self.poll.poll(&mut events, Some(Duration::from_secs(30))) { Ok(()) => (), Err(ref e) if e.kind() == ErrorKind::Interrupted => break, Err(e) => return Err(Error::IO(e)), }; if events.is_empty() { trace!("Pulse"); } for event in &events { match event.token() { SERVER_TOKEN => self.handle_new(), token => self.handle_conn(token, event), } } } Ok(()) } fn handle_new(&mut self) { loop { match self.listener.accept() { Ok((mut socket, _)) => match self.pool.pop() { Some(token) => { self.poll .registry() .register(&mut socket, token, Interest::READABLE | Interest::WRITABLE) .unwrap(); let conn = Conn::new(socket, token); debug!( "[{}] Connected. {} clients connected.", conn, MAX_CONN - self.pool.len() ); self.conns.insert(token, conn); } None => { warn!( "[{}] Connection refused: Too many clients.", socket.peer_addr().unwrap() ); break; } }, Err(ref e) if e.kind() == ErrorKind::WouldBlock => break, Err(_) => panic!("FATAL | Unexpected error"), } } } fn handle_conn(&mut self, token: Token, event: &Event) { let mut did_work = true; while did_work { did_work = false; let conn = self.conns.get_mut(&token).unwrap(); if conn.pending_exec() { did_work = true; if let Err(ConnError::Fatal) = conn.handle_pending(&mut self.db) { self.close_conn(token); break; } } if event.is_readable() && conn.read_ready() { did_work = true; match conn.read(&mut self.buffer) { Ok(0) => { self.close_conn(token); break; } Ok(len) => { if let Err(ConnError::Fatal) = conn.handle_msg(&mut self.db, &self.buffer[0..len]) { self.close_conn(token); break; } } Err(ref e) if e.kind() == ErrorKind::WouldBlock => break, Err(ref e) if e.kind() == ErrorKind::ConnectionReset => { self.close_conn(token); break; } Err(e) => { error!("!!! [{conn}] Fatal error: {e:?}."); self.close_conn(token); break; } } } if event.is_writable() && conn.write_ready() { did_work = true; if let Err(e) = conn.reply() { error!("!!! [{conn}] Fatal error: {e:?}."); self.close_conn(token); break; }; } } } fn close_conn(&mut self, token: Token) { let conn = self.conns.remove(&token).unwrap(); self.pool.push(token); debug!( "[{conn}] Closed. {} clients connected.", MAX_CONN - self.pool.len() ); } } // Display impl fmt::Display for Error { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { let res = match self { Error::IO(err) => err.to_string(), Error::Query(err) => err.to_string(), Error::Expr(err) => err.to_string(), }; write!(f, "{}", res) } } // Errors impl From for Error { fn from(err: QueryError) -> Self { Error::Query(err) } } impl From for Error { fn from(err: ExprError) -> Self { Error::Expr(err) } }