aboutsummaryrefslogtreecommitdiff
path: root/src/server.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/server.rs')
-rw-r--r--src/server.rs291
1 files changed, 291 insertions, 0 deletions
diff --git a/src/server.rs b/src/server.rs
new file mode 100644
index 0000000..c23d74a
--- /dev/null
+++ b/src/server.rs
@@ -0,0 +1,291 @@
+mod conn;
+mod parser;
+
+use crate::common::db::Database;
+use crate::common::expr::Error as ExprError;
+use crate::common::query::Error as QueryError;
+use conn::Conn;
+use conn::Error as ConnError;
+use log::debug;
+use log::error;
+use log::info;
+use log::trace;
+use log::warn;
+use mio::event::Event;
+use mio::event::Events;
+use mio::net::TcpListener;
+use mio::Interest;
+use mio::Poll;
+use mio::Token;
+use std::collections::HashMap;
+use std::fmt;
+use std::io::ErrorKind;
+use std::io::Read;
+use std::net::SocketAddr;
+use std::process;
+use std::process::ExitCode;
+use std::time::Duration;
+
+pub const USAGE: &str = "
+Usage: blom start
+
+ Starts the blom server.
+
+Options
+
+ -b, --bind Bind to the given ADDRESS:PORT. Default is
+ 0.0.0.0:4902
+
+
+";
+
+pub const HEADER: &str = "
+ ___ ___ ___ ___
+ /\\ \\ /\\__\\ /\\ \\ /\\__\\
+ /..\\ \\ /./ / /..\\ \\ /..L_L_
+ /..\\.\\__\\ /./__/ /./\\.\\__\\ /./L.\\__\\
+ \\.\\../ / \\.\\ \\ \\.\\/./ / \\/_/./ /
+ \\../ / \\.\\__\\ \\../ / /./ /
+ \\/__/ \\/__/ \\/__/ \\/__/
+";
+
+const SERVER_TOKEN: Token = Token(0);
+const SERVER_BUFLEN: usize = 512;
+const CONN_BUFLEN: usize = 2048;
+const MAX_CONN: usize = 1000;
+
+#[derive(Debug)]
+enum Error {
+ IO(std::io::Error),
+ Query(QueryError),
+ Expr(ExprError),
+}
+
+struct Server {
+ addr: SocketAddr,
+ pid: u32,
+ poll: Poll,
+ pool: Vec<Token>,
+ listener: TcpListener,
+ conns: HashMap<Token, Conn>,
+ buffer: [u8; SERVER_BUFLEN],
+ db: Database,
+}
+
+pub fn cmd(args: &[String]) -> ExitCode {
+ println!("{}\n", HEADER);
+
+ let addr = match args.get(0).map(|x| x.as_str()) {
+ Some("--bind") | Some("-b") => {
+ Some(args.get(1).expect("Missing address for --bind.").as_str())
+ }
+ _ => None,
+ }
+ .unwrap_or("0.0.0.0:4902");
+
+ let mut server = match Server::bind(addr.parse().unwrap(), MAX_CONN) {
+ Some(server) => server,
+ None => return ExitCode::FAILURE,
+ };
+ println!(
+ "PID: {} | Listening on {}; press Ctrl-C to exit...\n\n",
+ server.pid, server.addr
+ );
+
+ match server.start() {
+ Ok(()) => {
+ info!("Received signal X, terminating.");
+ ExitCode::SUCCESS
+ }
+ Err(e) => {
+ error!("Error: {e:?}");
+ ExitCode::FAILURE
+ }
+ }
+}
+
+impl Server {
+ fn bind(addr: SocketAddr, max_conns: usize) -> Option<Self> {
+ assert!(max_conns > 0);
+
+ let mut listener = match TcpListener::bind(addr) {
+ Ok(listener) => listener,
+ Err(ref e) if e.kind() == ErrorKind::AddrInUse => {
+ println!("{addr} already in use.");
+ return None;
+ }
+ Err(_) => {
+ println!("Couldn't bind to {addr}.");
+ return None;
+ }
+ };
+
+ let poll = Poll::new().unwrap();
+
+ poll.registry()
+ .register(&mut listener, SERVER_TOKEN, Interest::READABLE)
+ .unwrap();
+
+ Some(Server {
+ addr,
+ pid: process::id(),
+ poll,
+ listener,
+ conns: HashMap::new(),
+ buffer: [0; SERVER_BUFLEN],
+ pool: (1..max_conns + 1).map(Token).collect(),
+ db: Database::new(),
+ })
+ }
+
+ fn start(&mut self) -> Result<(), Error> {
+ let mut events = Events::with_capacity(512);
+
+ loop {
+ match self.poll.poll(&mut events, Some(Duration::from_secs(30))) {
+ Ok(()) => (),
+ Err(ref e) if e.kind() == ErrorKind::Interrupted => break,
+ Err(e) => return Err(Error::IO(e)),
+ };
+
+ if events.is_empty() {
+ trace!("Pulse");
+ }
+
+ for event in &events {
+ match event.token() {
+ SERVER_TOKEN => self.handle_new(),
+ token => self.handle_conn(token, event),
+ }
+ }
+ }
+
+ Ok(())
+ }
+
+ fn handle_new(&mut self) {
+ loop {
+ match self.listener.accept() {
+ Ok((mut socket, _)) => match self.pool.pop() {
+ Some(token) => {
+ self.poll
+ .registry()
+ .register(&mut socket, token, Interest::READABLE | Interest::WRITABLE)
+ .unwrap();
+
+ let conn = Conn::new(socket, token);
+
+ debug!(
+ "[{}] Connected. {} clients connected.",
+ conn,
+ MAX_CONN - self.pool.len()
+ );
+
+ self.conns.insert(token, conn);
+ }
+ None => {
+ warn!(
+ "[{}] Connection refused: Too many clients.",
+ socket.peer_addr().unwrap()
+ );
+ break;
+ }
+ },
+ Err(ref e) if e.kind() == ErrorKind::WouldBlock => break,
+ Err(_) => panic!("FATAL | Unexpected error"),
+ }
+ }
+ }
+
+ fn handle_conn(&mut self, token: Token, event: &Event) {
+ let mut did_work = true;
+ while did_work {
+ did_work = false;
+
+ let conn = self.conns.get_mut(&token).unwrap();
+
+ if conn.pending_exec() {
+ did_work = true;
+ if let Err(ConnError::Fatal) = conn.handle_pending(&mut self.db) {
+ self.close_conn(token);
+ break;
+ }
+ }
+
+ if event.is_readable() && conn.read_ready() {
+ did_work = true;
+
+ match conn.read(&mut self.buffer) {
+ Ok(0) => {
+ self.close_conn(token);
+ break;
+ }
+ Ok(len) => {
+ if let Err(ConnError::Fatal) =
+ conn.handle_msg(&mut self.db, &self.buffer[0..len])
+ {
+ self.close_conn(token);
+ break;
+ }
+ }
+ Err(ref e) if e.kind() == ErrorKind::WouldBlock => break,
+ Err(ref e) if e.kind() == ErrorKind::ConnectionReset => {
+ self.close_conn(token);
+ break;
+ }
+ Err(e) => {
+ error!("!!! [{conn}] Fatal error: {e:?}.");
+ self.close_conn(token);
+ break;
+ }
+ }
+ }
+
+ if event.is_writable() && conn.write_ready() {
+ did_work = true;
+ if let Err(e) = conn.reply() {
+ error!("!!! [{conn}] Fatal error: {e:?}.");
+ self.close_conn(token);
+ break;
+ };
+ }
+ }
+ }
+
+ fn close_conn(&mut self, token: Token) {
+ let conn = self.conns.remove(&token).unwrap();
+ self.pool.push(token);
+ debug!(
+ "[{conn}] Closed. {} clients connected.",
+ MAX_CONN - self.pool.len()
+ );
+ }
+}
+
+// Display
+
+impl fmt::Display for Error {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ let res = match self {
+ Error::IO(err) => err.to_string(),
+ Error::Query(err) => err.to_string(),
+ Error::Expr(err) => err.to_string(),
+ };
+
+ write!(f, "{}", res)
+ }
+}
+
+// Errors
+
+impl From<QueryError> for Error {
+ fn from(err: QueryError) -> Self {
+ Error::Query(err)
+ }
+}
+
+impl From<ExprError> for Error {
+ fn from(err: ExprError) -> Self {
+ Error::Expr(err)
+ }
+}