From d7d2377772e31fafb56f8107a6a22df4a26846d9 Mon Sep 17 00:00:00 2001 From: omagdy Date: Wed, 23 Jul 2025 07:53:14 +0300 Subject: refactor+feat: Did overhaul refactoring for how I model each server data and also add a feat to propagate write commands to replicas --- src/server.rs | 354 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 354 insertions(+) create mode 100644 src/server.rs (limited to 'src/server.rs') diff --git a/src/server.rs b/src/server.rs new file mode 100644 index 0000000..2d7cca7 --- /dev/null +++ b/src/server.rs @@ -0,0 +1,354 @@ +use crate::resp_parser::RespType; +use std::collections::HashMap; +use std::io::{Read, Write}; +use std::net::{SocketAddr, TcpStream}; +use std::sync::{Arc, Mutex}; +use std::{env, thread}; + +use crate::shared_cache::SharedCache; + +#[derive(Debug, Clone)] +pub struct MasterServer { + pub dir: Option, + pub dbfilename: Option, + pub replid: Option, + pub current_offset: Option, + pub port: String, + pub cache: SharedCache, + replicas: Vec, +} + +impl MasterServer { + fn new() -> Self { + Self { + dir: None, + dbfilename: None, + port: "6379".to_string(), + replid: Some("8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb".to_string()), + current_offset: Some("0".to_string()), + cache: Arc::new(Mutex::new(HashMap::new())), + replicas: vec![], + } + } + + fn port(&self) -> &str { + &self.port + } + + pub fn broadcast_command(&mut self, command: &[u8]) { + println!("Hello from brodcast"); + self.replicas.retain(|replica| { + if let Some(conn) = &replica.connection { + let mut conn = conn.lock().unwrap(); + if let Err(e) = conn.write_all(command) { + eprintln!("Failed to send to replica {}: {}", replica.port, e); + false // Drop dead connections + } else { + true + } + } else { + false + } + }); + } +} + +#[derive(Debug, Clone)] +pub struct SlaveServer { + pub dir: Option, + pub dbfilename: Option, + pub port: String, + pub master_replid: Option, + pub master_repl_offset: Option, + pub master_host: String, + pub master_port: String, + pub connection: Option>>, + pub cache: SharedCache, +} + +impl SlaveServer { + fn new( + port: String, + master_host: String, + master_port: String, + connection: Option>>, + ) -> Self { + Self { + dir: None, + dbfilename: None, + port, + master_replid: Some("8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb".to_string()), + master_repl_offset: Some("0".to_string()), + master_host, + master_port, + connection, + cache: Arc::new(Mutex::new(HashMap::new())), + } + } + + fn connect(&self) -> Result { + let master_address = format!("{}:{}", self.master_host, self.master_port); + return TcpStream::connect(master_address); + } + + fn handshake(&mut self) -> Result<(), String> { + match self.connect() { + Ok(mut stream) => { + let mut buffer = [0; 512]; + + let mut send_command = |command: &[u8]| -> Result<(), String> { + stream + .write_all(command) + .map_err(|e| format!("Failed to send: {}", e))?; + + match stream.read(&mut buffer) { + Ok(0) | Err(_) => return Ok(()), // connection closed or error + Ok(_) => Ok(()), + } + }; + + // PING + send_command(&resp_bytes!(array => [resp!(bulk "PING")]))?; + + // REPLCONF listening-port + send_command(&resp_bytes!(array => [ + resp!(bulk "REPLCONF"), + resp!(bulk "listening-port"), + resp!(bulk self.port.clone()) + ]))?; + + // REPLCONF capa psync2 + send_command(&resp_bytes!(array => [ + resp!(bulk "REPLCONF"), + resp!(bulk "capa"), + resp!(bulk "psync2") + ]))?; + + // PSYNC + send_command(&resp_bytes!(array => [ + resp!(bulk "PSYNC"), + resp!(bulk "?"), + resp!(bulk "-1") + ]))?; + + // Store the persistent connection + let shared_stream = Arc::new(Mutex::new(stream)); + self.connection = Some(shared_stream.clone()); + + // Spawn the background listener thread + thread::spawn(move || { + let mut buffer = [0u8; 1024]; + loop { + let mut stream = shared_stream.lock().unwrap(); + match stream.read(&mut buffer) { + Ok(0) => { + println!("Master disconnected"); + break; + } + Ok(n) => { + println!( + "REPLICA received: {}", + String::from_utf8_lossy(&buffer[..n]) + ); + } + Err(e) => { + eprintln!("Error reading from master: {}", e); + break; + } + } + } + }); + + Ok(()) + } + Err(e) => Err(format!("Master node doesn't exist: {}", e)), + } + } +} + +#[derive(Debug, Clone)] +pub enum RedisServer { + Master(MasterServer), + Slave(SlaveServer), +} + +impl RedisServer { + pub fn master() -> Self { + RedisServer::Master(MasterServer::new()) + } + + pub fn slave(port: String, master_host: String, master_port: String) -> Self { + RedisServer::Slave(SlaveServer::new(port, master_host, master_port, None)) + } + + // Helper methods to access common fields regardless of variant + pub fn port(&self) -> &str { + match self { + RedisServer::Master(master) => &master.port, + RedisServer::Slave(slave) => &slave.port, + } + } + + pub fn set_port(&mut self, port: String) { + match self { + RedisServer::Master(master) => master.port = port, + RedisServer::Slave(slave) => slave.port = port, + } + } + + pub fn dir(&self) -> &Option { + match self { + RedisServer::Master(master) => &master.dir, + RedisServer::Slave(slave) => &slave.dir, + } + } + + pub fn set_dir(&mut self, dir: Option) { + match self { + RedisServer::Master(master) => master.dir = dir, + RedisServer::Slave(slave) => slave.dir = dir, + } + } + + pub fn dbfilename(&self) -> &Option { + match self { + RedisServer::Master(master) => &master.dbfilename, + RedisServer::Slave(slave) => &slave.dbfilename, + } + } + + pub fn set_dbfilename(&mut self, dbfilename: Option) { + match self { + RedisServer::Master(master) => master.dbfilename = dbfilename, + RedisServer::Slave(slave) => slave.dbfilename = dbfilename, + } + } + + pub fn cache(&self) -> &SharedCache { + match self { + RedisServer::Master(master) => &master.cache, + RedisServer::Slave(slave) => &slave.cache, + } + } + + pub fn set_cache(&mut self, cache: &SharedCache) { + match self { + RedisServer::Master(master) => master.cache = cache.clone(), + RedisServer::Slave(slave) => slave.cache = cache.clone(), + } + } + + pub fn role(&self) -> &str { + match self { + RedisServer::Master(_) => "master", + RedisServer::Slave(_) => "slave", + } + } + + pub fn add_replica(&mut self, replica_adr: SocketAddr, connection: Arc>) { + match self { + // TODO: Should probably add host to MasterServer and SlaveServer as member field + RedisServer::Master(master) => master.replicas.push(SlaveServer::new( + replica_adr.port().to_string(), + "localhost".to_owned(), + master.port().to_owned(), + Some(connection), + )), + RedisServer::Slave(_) => { + unreachable!("Slaves don't have replicas") + } + } + } + + pub fn broadcast_command(&mut self, command: &[u8]) { + if let RedisServer::Master(master) = self { + master.broadcast_command(command); + } + } + + pub fn is_master(&self) -> bool { + matches!(self, RedisServer::Master(_)) + } + + pub fn is_slave(&self) -> bool { + matches!(self, RedisServer::Slave(_)) + } +} + +impl RedisServer { + pub fn new() -> Result, String> { + let args: Vec = env::args().collect(); + + if args.len() == 1 { + return Ok(None); + } + + let mut redis_server = RedisServer::master(); + let mut dir = None; + let mut dbfilename = None; + + let mut i = 1; // Skip program name + while i < args.len() { + match args[i].as_str() { + "--dir" => { + if i + 1 >= args.len() { + return Err("--dir requires a value".to_string()); + } + dir = Some(args[i + 1].clone()); + i += 2; + } + "--dbfilename" => { + if i + 1 >= args.len() { + return Err("--dbfilename requires a value".to_string()); + } + dbfilename = Some(args[i + 1].clone()); + i += 2; + } + "--port" => { + if i + 1 >= args.len() { + return Err("--port requires a value".to_string()); + } + redis_server.set_port(args[i + 1].clone()); + i += 2; + } + "--replicaof" => { + if i + 1 >= args.len() { + return Err("--replicaof requires a value".to_string()); + } + + // TODO: Find a better name for this variable info + let info = args[i + 1].clone(); + let (master_host, master_port) = info.split_once(' ').ok_or_else(|| { + "Invalid --replicaof format. Expected 'host port'".to_string() + })?; + + // Get current port or use default + let current_port = redis_server.port().to_string(); + + // Create new slave server + redis_server = RedisServer::slave( + current_port, + master_host.to_string(), + master_port.to_string(), + ); + + // Perform handshake + if let RedisServer::Slave(mut slave) = redis_server.clone() { + slave.handshake()?; + } + + i += 2; + } + _ => { + return Err(format!("Unknown argument: {}", args[i])); + } + } + } + + // Set dir and dbfilename after server is finalized + redis_server.set_dir(dir); + redis_server.set_dbfilename(dbfilename); + + Ok(Some(redis_server)) + } +} -- cgit v1.2.3