From 30e6b478d7cd286b68da21d7a5aa5426c588cd02 Mon Sep 17 00:00:00 2001 From: omagdy Date: Thu, 24 Jul 2025 06:04:32 +0300 Subject: refactor: Refactor how I model the state and config and cache of the server with sepraration of concerns --- src/main.rs | 44 +++- src/rdb.rs | 14 + src/resp_commands.rs | 122 +++++---- src/resp_parser.rs | 17 +- src/server.rs | 718 ++++++++++++++++++++++++++++++++++++++------------- src/shared_cache.rs | 3 +- 6 files changed, 654 insertions(+), 264 deletions(-) (limited to 'src') diff --git a/src/main.rs b/src/main.rs index 25bc6c9..3fe5559 100644 --- a/src/main.rs +++ b/src/main.rs @@ -14,15 +14,16 @@ use std::{ use codecrafters_redis::{ rdb::{KeyExpiry, ParseError, RDBFile, RedisValue}, resp_bytes, + server::SharedMut, shared_cache::*, }; -use codecrafters_redis::{resp_commands::RedisCommands, server::RedisServer}; +use codecrafters_redis::{resp_commands::RedisCommand, server::RedisServer}; use codecrafters_redis::{ resp_parser::{parse, RespType}, server::SlaveServer, }; -fn spawn_cleanup_thread(cache: SharedCache) { +fn spawn_cleanup_thread(cache: SharedMut) { let cache_clone = cache.clone(); std::thread::spawn(move || { loop { @@ -68,12 +69,25 @@ fn handle_client(mut stream: TcpStream, server: Arc>) { }; let request = parse(&buffer).unwrap(); - let server_clone = Arc::clone(&server); - let response = RedisCommands::from(request.0.clone()).execute(server_clone); + + let mut server = server.lock().unwrap(); + + // Big State vars + let cache = server.cache().clone(); + let server_state = server.get_server_state().clone(); + let config = server.config(); + let brodcaster = server.as_broadcaster(); + + let response = RedisCommand::from(request.0.clone()).execute( + cache.clone(), + config, + server_state, + brodcaster, + ); let mut request_command = "".to_string(); - // FIXME: Find a solution for this mess!! + // FIXME: Find a solution for this mess!! (Design better API) match &request.0 { RespType::Array(arr) => { if let RespType::BulkString(s) = arr[0].clone() { @@ -84,22 +98,28 @@ fn handle_client(mut stream: TcpStream, server: Arc>) { } // Store the persistent connection - let shared_stream = Arc::new(Mutex::new( - stream.try_clone().expect("What could go wrong? :)"), - )); + // let shared_stream = Arc::new(Mutex::new( + // stream.try_clone().expect("What could go wrong? :)"), + // )); // if this true immediately write and send back rdb file after response // HACK: This just feels wrong I feel this shouldn't be handled here and should be handled // in the exexute command if request_command.starts_with("PSYNC") { stream.write(&response).unwrap(); - let _ = write_rdb_to_stream(&mut stream); - // handshake completed and I should add the server sending me the handshake to my replicas let replica_addr = stream .peer_addr() .expect("This shouldn't fail right? right?? :)"); - let mut server = server.lock().unwrap(); - server.add_replica(replica_addr, shared_stream); + + server.add_replica( + replica_addr, + Arc::new(Mutex::new( + stream.try_clone().expect("What could go wrong? :)"), + )), + ); + + let _ = write_rdb_to_stream(&mut stream); + // handshake completed and I should add the server sending me the handshake to my replicas } else { // write respose back to the client stream.write(&response).unwrap(); diff --git a/src/rdb.rs b/src/rdb.rs index 0195cb9..272d168 100644 --- a/src/rdb.rs +++ b/src/rdb.rs @@ -600,6 +600,20 @@ impl FromBytes for RDBFile { let mut total_consumed = 0; let mut databases = HashMap::new(); + // special case when rdb file is sent by the master to replicas in the following format + // $/r/n + if bytes[0] == '$' as u8 { + // consume up to the CRLF + let (consumed, rest) = bytes + .windows(2) + .position(|window| window == b"\r\n") + .map(|pos| (&bytes[..pos], &bytes[pos + 2..])) + .ok_or(ParseError::UnexpectedEof)?; + println!("Consumed {:?}", consumed); + remaining = rest; + total_consumed += consumed.len(); + } + // 1. Parse the RDB header ("REDIS" + version) let (header, consumed) = RDBHeader::from_bytes(remaining)?; total_consumed += consumed; diff --git a/src/resp_commands.rs b/src/resp_commands.rs index 453024c..981f7aa 100644 --- a/src/resp_commands.rs +++ b/src/resp_commands.rs @@ -1,7 +1,6 @@ use crate::server::*; use crate::{resp_parser::*, shared_cache::*}; use regex::Regex; -use std::sync::{Arc, Mutex}; use std::time::{SystemTime, UNIX_EPOCH}; #[derive(Debug, Clone)] @@ -110,7 +109,7 @@ fn extract_string(resp: &RespType) -> Option { } } -pub enum RedisCommands { +pub enum RedisCommand { Ping, Echo(String), Get(String), @@ -123,15 +122,20 @@ pub enum RedisCommands { Invalid, } -impl RedisCommands { - pub fn execute(self, server: Arc>) -> Vec { - use RedisCommands as RC; +impl RedisCommand { + pub fn execute<'a>( + self, + cache: SharedMut, + config: Shared, + server_state: ServerState, + broadcaster: Option>, + ) -> Vec { + use RedisCommand as RC; match self { RC::Ping => resp_bytes!("PONG"), RC::Echo(echo_string) => resp_bytes!(echo_string), RC::Get(key) => { - let server = server.lock().unwrap(); - let mut cache = server.cache().lock().unwrap(); + let mut cache = cache.lock().unwrap(); match cache.get(&key).cloned() { Some(entry) => { @@ -146,8 +150,7 @@ impl RedisCommands { } } RC::Set(command) => { - let mut server = server.lock().unwrap(); - let mut cache = server.cache().lock().unwrap(); + let mut cache = cache.lock().unwrap(); // Check conditions (NX/XX) let key_exists = cache.contains_key(&command.key); @@ -190,17 +193,24 @@ impl RedisCommands { }, ); - // Broadcast SET to replicas after mutating local state - let broadcast_cmd = resp_bytes!(array => [ - resp!(bulk "SET"), - resp!(bulk command.key), - resp!(bulk command.value) - ]); - - // Unlock the mutex so that I can access to broadcast the messaage - drop(cache); + println!( + "My role is {:?} and I just inserted {} to {}", + server_state.role, &command.key, command.value + ); - server.broadcast_command(&broadcast_cmd); + // Broadcast to replicas if this is a master + if let Some(broadcaster) = broadcaster { + // Broadcast SET to replicas after mutating local state + let broadcast_cmd = resp_bytes!(array => [ + resp!(bulk "SET"), + resp!(bulk command.key), + resp!(bulk command.value) + ]); + broadcaster + .lock() + .unwrap() + .broadcast_command_to_replicas(&broadcast_cmd); + } if !command.get_old_value { return resp_bytes!("OK"); @@ -213,8 +223,9 @@ impl RedisCommands { } RC::ConfigGet(s) => { use RespType as RT; - let server = server.lock().unwrap(); - if let (Some(dir), Some(dbfilename)) = (server.dir(), server.dbfilename()) { + if let (Some(dir), Some(dbfilename)) = + (config.dir.clone(), config.dbfilename.clone()) + { match s.as_str() { "dir" => RT::Array(vec![ RT::BulkString(s.as_bytes().to_vec()), @@ -236,9 +247,7 @@ impl RedisCommands { use RespType as RT; let query = query.replace('*', ".*"); - - let server = server.lock().unwrap(); - let cache = server.cache().lock().unwrap(); + let cache = cache.lock().unwrap(); let regex = Regex::new(&query).unwrap(); let matching_keys: Vec = cache .keys() @@ -252,53 +261,42 @@ impl RedisCommands { } RC::Info(_sub_command) => { use RespType as RT; - let server = server.lock().unwrap(); - match &*server { - RedisServer::Master(master) => { - let response = format!( - "# Replication\r\nrole:{}master_replid:{}master_repl_offset:{}", - server.role(), - master.replid.clone().unwrap_or("".to_string()), - master.current_offset.lock().unwrap(), - ) - .as_bytes() - .to_vec(); - RT::BulkString(response).to_resp_bytes() - } - RedisServer::Slave(slave) => { - let response = format!( - "# Replication\r\nrole:{}master_replid:{}master_repl_offset:{}", - server.role(), - slave.master_replid.clone().unwrap_or("".to_string()), - slave.master_repl_offset.lock().unwrap() - ) - .as_bytes() - .to_vec(); - RT::BulkString(response).to_resp_bytes() - } - } + let role_str = match server_state.role { + ServerRole::Master => "master", + ServerRole::Slave => "slave", + }; + + let response = format!( + "# Replication\r\nrole:{}\r\nmaster_replid:{}\r\nmaster_repl_offset:{}", + role_str, server_state.repl_id, server_state.repl_offset, + ) + .into_bytes(); + + RT::BulkString(response).to_resp_bytes() } RC::ReplConf((op1, op2)) => match (op1.to_uppercase().as_str(), op2.as_str()) { ("GETACK", "*") => { println!("Did i even get here?"); - resp_bytes!(array => [resp!(bulk "REPLCONF"), resp!(bulk "ACK"), resp!(bulk server.lock().unwrap().repl_offset().to_string())]) + resp_bytes!(array => [resp!(bulk "REPLCONF"), resp!(bulk "ACK"), resp!(bulk server_state.repl_offset.to_string())]) } _ => resp_bytes!("OK"), }, RC::Psync((_, _)) => { - let server = server.lock().unwrap(); - if let RedisServer::Master(master) = &*server { - let response = format!( - "FULLRESYNC {} 0", - master.replid.clone().unwrap_or("".to_string()), - ); - resp_bytes!(response) - } else { - // TODO: Find a way to report this error back up the program trace - unreachable!("I should never come here") + // This should only be called on masters + match server_state.role { + ServerRole::Master => { + let response = format!("FULLRESYNC {} 0", server_state.repl_id); + resp_bytes!(response) + } + // This shouldn't happen, but handle gracefully + ServerRole::Slave => { + // TODO: I actually forgot that you could send error messages with redis do + // more of this across the whole codebase when it makes sense + resp_bytes!(error "ERR PSYNC not supported on slave") + } } } - RC::Invalid => todo!(), + RC::Invalid => resp_bytes!(error "ERR Invalid Command"), } } } @@ -403,7 +401,7 @@ impl SetOptionParser { } } -impl From for RedisCommands { +impl From for RedisCommand { fn from(value: RespType) -> Self { // Alternative approach using a more functional style with iterators let RespType::Array(command) = value else { diff --git a/src/resp_parser.rs b/src/resp_parser.rs index 952176c..646d2b1 100644 --- a/src/resp_parser.rs +++ b/src/resp_parser.rs @@ -380,18 +380,17 @@ pub fn parse_bulk_strings(bytes: &[u8]) -> Result<(RespType, &[u8]), RespError> return Err(RespError::UnexpectedEnd); } - let mut bulk_string: Vec = Vec::with_capacity(length as usize); + let bulk_string = remained[..length as usize].to_vec(); + let remaining_after_string = &remained[length as usize..]; - for i in 0..length { - bulk_string.push(remained[i as usize]); - } - - let consumed = RespType::BulkString(bulk_string); - - if !(&remained[length as usize..]).starts_with(b"\r\n") { + if !remaining_after_string.starts_with(b"\r\n") { return Err(RespError::UnexpectedEnd); } - return Ok((consumed, &remained[length as usize + 2..])); + + Ok(( + RespType::BulkString(bulk_string), + &remaining_after_string[2..], + )) } [] => Err(RespError::Custom(String::from("Empty data"))), } diff --git a/src/server.rs b/src/server.rs index 6da29eb..8c09f73 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,96 +1,279 @@ -use crate::resp_commands::RedisCommands; +use crate::rdb::{FromBytes, RDBFile}; +use crate::resp_commands::RedisCommand; use crate::resp_parser::{parse, RespType}; use std::collections::HashMap; use std::io::{Read, Write}; use std::net::{SocketAddr, TcpStream}; use std::sync::{Arc, Mutex}; -use std::time::Duration; use std::{env, thread}; -use crate::shared_cache::SharedCache; +use crate::shared_cache::Cache; + +// TODO: add functions to access member variables instead of accessing them directly #[derive(Debug, Clone)] -pub struct MasterServer { +pub struct ServerConfig { pub dir: Option, pub dbfilename: Option, pub port: String, - pub replid: Option, - pub current_offset: Arc>, - pub cache: SharedCache, - replicas: Vec, +} + +// Server state that commands might need +#[derive(Debug, Clone)] +pub struct ServerState { + pub role: ServerRole, + pub repl_id: String, + pub repl_offset: usize, +} + +#[derive(Debug, Clone)] +pub enum ServerRole { + Master, + Slave, +} + +// Trait for broadcasting - only masters can do this +pub trait CanBroadcast: Send { + fn broadcast_command_to_replicas(&mut self, command: &[u8]); +} + +// Implementation for Master +impl CanBroadcast for MasterServer { + fn broadcast_command_to_replicas(&mut self, command: &[u8]) { + self.broadcast_command(command); + } +} + +// Helper methods to extract server state +impl MasterServer { + pub fn get_server_state(&self) -> ServerState { + ServerState { + role: ServerRole::Master, + repl_id: self.get_replid(), + repl_offset: self.get_repl_offset(), + } + } +} + +impl SlaveServer { + pub fn get_server_state(&self) -> ServerState { + let state = self.state.lock().unwrap(); + ServerState { + role: ServerRole::Slave, + repl_id: state.master_replid.clone(), + repl_offset: state.master_repl_offset, + } + } +} + +impl RedisServer { + pub fn get_server_state(&self) -> ServerState { + match self { + RedisServer::Master(master) => master.get_server_state(), + RedisServer::Slave(slave) => slave.get_server_state(), + } + } + + pub fn as_broadcaster(&mut self) -> Option> { + match self { + RedisServer::Master(master) => { + Some(Arc::new(Mutex::new(master as &mut dyn CanBroadcast))) + } + RedisServer::Slave(_) => None, + } + } +} + +#[derive(Debug)] +pub struct MasterState { + pub replid: String, + pub current_offset: usize, + pub replicas: Vec, +} + +// Slave-specific state +#[derive(Debug)] +pub struct SlaveState { + pub master_replid: String, + pub master_repl_offset: usize, + pub master_host: String, + pub master_port: String, + pub connection: Option, +} + +#[derive(Debug)] +pub struct ReplicaConnection { + pub port: String, + pub connection: Arc>, +} + +pub type SharedMut = Arc>; +pub type Shared = Arc; + +#[derive(Debug, Clone)] +pub struct MasterServer { + config: Shared, + state: SharedMut, + cache: SharedMut, } impl MasterServer { fn new() -> Self { - Self { + let config = Arc::new(ServerConfig { dir: None, dbfilename: None, port: "6379".to_string(), - replid: Some("8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb".to_string()), - current_offset: Arc::new(Mutex::new(0)), - cache: Arc::new(Mutex::new(HashMap::new())), + }); + + let state = Arc::new(Mutex::new(MasterState { + replid: "8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb".to_string(), + current_offset: 0, replicas: vec![], + })); + + let cache = Arc::new(Mutex::new(HashMap::new())); + + Self { + config, + state, + cache, } } fn port(&self) -> &str { - &self.port + &self.config.port } pub fn broadcast_command(&mut self, command: &[u8]) { - println!("Hello from brodcast"); - self.replicas.retain(|replica| { - if let Some(conn) = &replica.connection { - let mut conn = conn.lock().unwrap(); - if let Err(e) = conn.write_all(command) { - eprintln!("Failed to send to replica {}: {}", replica.port, e); - false // Drop dead connections - } else { - true - } + let mut state = self.state.lock().unwrap(); + + state.replicas.retain(|replica| { + let mut conn = replica.connection.lock().unwrap(); + if let Err(e) = conn.write_all(command) { + eprintln!("Failed to send to replica {}: {}", replica.port, e); + false // Drop dead connections } else { - false + true } - }); + }) + } + + pub fn add_replica(&self, replica_addr: SocketAddr, connection: Arc>) { + let replica = ReplicaConnection { + port: replica_addr.port().to_string(), + connection, + }; + + self.state.lock().unwrap().replicas.push(replica); + } + + pub fn get_repl_offset(&self) -> usize { + self.state.lock().unwrap().current_offset + } + + pub fn increment_repl_offset(&self, amount: usize) { + self.state.lock().unwrap().current_offset += amount; + } + + pub fn get_replid(&self) -> String { + self.state.lock().unwrap().replid.clone() } } #[derive(Debug, Clone)] pub struct SlaveServer { - pub dir: Option, - pub dbfilename: Option, - pub port: String, - pub master_replid: Option, - pub master_repl_offset: Arc>, - pub master_host: String, - pub master_port: String, - pub connection: Option>>, - pub cache: SharedCache, + config: Shared, + state: SharedMut, + cache: SharedMut, +} + +fn read_rdb_from_stream(reader: &mut R) -> Result, String> { + let mut buffer = [0u8; 1024]; + + // Read until we get the length prefix ($\r\n) + let mut length_bytes = Vec::new(); + + loop { + let bytes_read = reader + .read(&mut buffer) + .map_err(|e| format!("Failed to read: {}", e))?; + if bytes_read == 0 { + return Err("Connection closed while reading RDB length".to_string()); + } + + length_bytes.extend_from_slice(&buffer[..bytes_read]); + + if length_bytes.len() >= 2 && &length_bytes[length_bytes.len() - 2..] == b"\r\n" { + break; + } + } + + // Parse the length prefix ($\r\n) + let (resp, remaining) = + parse(&length_bytes).map_err(|e| format!("Failed to parse RDB length: {:?}", e))?; + let length = match resp { + RespType::BulkString(_) => { + let len_str = String::from_utf8_lossy(&length_bytes[1..length_bytes.len() - 2]); + len_str + .parse::() + .map_err(|e| format!("Invalid RDB length: {}", e))? + } + _ => return Err("Expected bulk string for RDB length".to_string()), + }; + + // Read the exact number of bytes for the RDB file + let mut rdb_bytes = vec![0u8; length]; + let mut total_read = remaining.len(); + rdb_bytes[..remaining.len()].copy_from_slice(remaining); + + while total_read < length { + let bytes_read = reader + .read(&mut buffer) + .map_err(|e| format!("Failed to read RDB: {}", e))?; + if bytes_read == 0 { + return Err("Connection closed while reading RDB file".to_string()); + } + let end = (total_read + bytes_read).min(length); + rdb_bytes[total_read..end].copy_from_slice(&buffer[..(end - total_read)]); + total_read += bytes_read; + } + + Ok(rdb_bytes) } impl SlaveServer { - fn new( - port: String, - master_host: String, - master_port: String, - connection: Option>>, - ) -> Self { - Self { + fn new(port: String, master_host: String, master_port: String) -> Self { + let config = Arc::new(ServerConfig { dir: None, dbfilename: None, port, - master_replid: Some("8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb".to_string()), - master_repl_offset: Arc::new(Mutex::new(0)), + }); + + let state = Arc::new(Mutex::new(SlaveState { + master_replid: "8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb".to_string(), + master_repl_offset: 0, master_host, master_port, - connection, - cache: Arc::new(Mutex::new(HashMap::new())), + connection: None, + })); + + let cache = Arc::new(Mutex::new(HashMap::new())); + + Self { + config, + state, + cache, } } + pub fn increment_repl_offset(&mut self, amount: usize) { + self.state.lock().unwrap().master_repl_offset += amount; + } + fn connect(&self) -> Result { - let master_address = format!("{}:{}", self.master_host, self.master_port); - return TcpStream::connect(master_address); + let state = self.state.lock().unwrap(); + let master_address = format!("{}:{}", state.master_host, state.master_port); + TcpStream::connect(master_address) } fn handshake(&mut self) -> Result<(), String> { @@ -98,128 +281,254 @@ impl SlaveServer { Ok(mut stream) => { let mut buffer = [0; 1024]; - let mut send_command = |command: &[u8]| -> Result<(), String> { + let mut send_command = |command: &[u8], read: bool| -> Result<(), String> { stream .write_all(command) .map_err(|e| format!("Failed to send: {}", e))?; - match stream.read(&mut buffer) { - Ok(0) | Err(_) => return Ok(()), // connection closed or error - Ok(_) => { - println!("Recieved some bytes here!"); - Ok(()) + if read { + match stream.read(&mut buffer) { + Ok(0) | Err(_) => return Ok(()), // connection closed or error + Ok(_) => { + println!("Recieved some bytes here!"); + Ok(()) + } } + } else { + Ok(()) } }; - // PING - send_command(&resp_bytes!(array => [resp!(bulk "PING")]))?; - - // REPLCONF listening-port - send_command(&resp_bytes!(array => [ - resp!(bulk "REPLCONF"), - resp!(bulk "listening-port"), - resp!(bulk self.port.clone()) - ]))?; + // Step1: PING + send_command(&resp_bytes!(array => [resp!(bulk "PING")]), true)?; + + let port = self.config.port.clone(); + // Step2: REPLCONF listening-port + send_command( + &resp_bytes!(array => [ + resp!(bulk "REPLCONF"), + resp!(bulk "listening-port"), + resp!(bulk port) + ]), + true, + )?; + + // Step3: REPLCONF capa psync2 + send_command( + &resp_bytes!(array => [ + resp!(bulk "REPLCONF"), + resp!(bulk "capa"), + resp!(bulk "psync2") + ]), + true, + )?; + + // Step 4: PSYNC + send_command( + &resp_bytes!(array => [ + resp!(bulk "PSYNC"), + resp!(bulk "?"), + resp!(bulk "-1") + ]), + false, + )?; + + // Step 5: Read FULLRESYNC response + let bytes_read = stream + .read(&mut buffer) + .map_err(|e| format!("Failed to read FULLRESYNC: {}", e))?; + let (parsed, mut rest) = parse(&buffer[..bytes_read]) + .map_err(|e| format!("Failed to parse FULLRESYNC: {:?}", e))?; + match parsed { + RespType::SimpleString(s) if s.starts_with("FULLRESYNC") => { + // Expected response + } + _ => return Err("Invalid FULLRESYNC response".to_string()), + } - // REPLCONF capa psync2 - send_command(&resp_bytes!(array => [ - resp!(bulk "REPLCONF"), - resp!(bulk "capa"), - resp!(bulk "psync2") - ]))?; + println!("rest: {:?}", rest); - // PSYNC - send_command(&resp_bytes!(array => [ - resp!(bulk "PSYNC"), - resp!(bulk "?"), - resp!(bulk "-1") - ]))?; + println!("FULLRESYNC response bytes read: {}", bytes_read); - thread::sleep(Duration::new(1, 0)); + // So there is an interesting behaviour where the FULLRESYNC + RDB and if you are + // really lucky the REPLCONF would all get sent in one TCP segment so I should + // assume I would get nice segments refelecting each command + if !rest.is_empty() { + // TODO: Sync the rdb_file with the slave's cache + // TODO: Find a way to propagate the error up the stack by using anyhow or something + let (rdb_file, bytes_consumed) = RDBFile::from_bytes(rest).unwrap(); + rest = &rest[bytes_consumed..]; + println!("rdb bytes: {}", bytes_consumed); + println!("remaining btyes after rdb: {}", rest.len()); + } // Store the persistent connection - let shared_stream = Arc::new(Mutex::new(stream)); - self.connection = Some(shared_stream.clone()); - - // Spawn the background listener thread - let cache_clone = self.cache.clone(); - let master_repl_offset = self.master_repl_offset.clone(); - thread::spawn(move || { - loop { - let bytes_read = { - let mut stream = shared_stream.lock().unwrap(); - match stream.read(&mut buffer) { - Ok(0) => { - println!("Master disconnected"); - break; - } - Ok(n) => n, - Err(e) => { - eprintln!("Error reading from master: {}", e); - break; - } - } - }; // stream lock is dropped here - - dbg!(&bytes_read); - - // Parse and execute all commands in the buffer - let mut remaining_bytes = &buffer[..bytes_read]; - - dbg!(&remaining_bytes); - - let mut flag = true; - - while !remaining_bytes.is_empty() || flag { - flag = false; - match parse(remaining_bytes) { - Ok((parsed_command, leftover)) => { - dbg!(&parsed_command, leftover); - - // Create a temporary slave server for command execution - let temp_slave = RedisServer::Slave(SlaveServer::new( - "0".to_string(), // dummy port - "localhost".to_string(), // dummy host - "6379".to_string(), // dummy master port - None, // no connection needed for execution - )); - - // Set the cache to our actual cache - let mut temp_slave = temp_slave; - temp_slave.set_repl_offset(&master_repl_offset); - temp_slave.set_cache(&cache_clone); - let server_arc = Arc::new(Mutex::new(temp_slave)); - - let response = RedisCommands::from(parsed_command.clone()) - .execute(server_arc.clone()); - - let mut shared_stream = shared_stream.lock().unwrap(); - - if let RespType::Array(arr) = &parsed_command { - if arr[0].to_resp_bytes() == b"REPLCONF".to_vec() { - shared_stream.write(&response).unwrap(); - } - } - - // Update remaining bytes for next iteration - remaining_bytes = leftover; - server_arc.lock().unwrap().repl_offset_increment( - parsed_command.to_resp_bytes().len(), - ); - } - Err(_) => { - // If parsing fails, break out of the loop - break; - } + self.state.lock().unwrap().connection = Some(stream); + self.start_replication_listener(&mut rest); + + Ok(()) + } + Err(e) => Err(format!("Master node doesn't exist: {}", e)), + } + } + + // TODO: This should return a Result + fn start_replication_listener<'a>(&'a self, rest: &mut &[u8]) { + let state = self.state.clone(); + let cache = self.cache.clone(); + let config = self.config.clone(); + let server_state = self.get_server_state(); + let broadcaster = None::>>; + + // if it's not empty then there is probably a REPLCONF command sent and I should handle it + // before reading anymore bytes + if !rest.is_empty() { + // TODO: Sync the rdb_file with the slave's cache + // TODO: Find a way to propagate the error up the stack by using anyhow or something + if rest[0] == '$' as u8 { + // this means that the rdb segment got in here some how so I have to deal with it here + let (rdb_file, bytes_consumed) = RDBFile::from_bytes(rest).unwrap(); + *rest = &rest[bytes_consumed..]; + + println!("rdb bytes: {}", bytes_consumed); + println!("remaining btyes after rdb: {}", rest.len()); + if rest.len() > 0 { + match parse(rest) { + Ok((resp, leftover)) => { + dbg!(&resp, leftover); + + // Update replication offset + let command_size = resp.to_resp_bytes().len(); + let mut state_guard = state.lock().unwrap(); + + let command = RedisCommand::from(resp); + + let response = command.execute( + cache.clone(), + config.clone(), + server_state.clone(), + broadcaster.clone(), + ); + + if let Some(ref mut stream) = state_guard.connection { + let _ = stream.write_all(&response); + let _ = stream.flush(); } + + state_guard.master_repl_offset += command_size; } + Err(_) => {} } - }); - Ok(()) + } + } else { + match parse(rest) { + Ok((resp, leftover)) => { + dbg!(&resp, leftover); + + // Update replication offset + let command_size = resp.to_resp_bytes().len(); + let mut state_guard = state.lock().unwrap(); + + let command = RedisCommand::from(resp); + + let response = command.execute( + cache.clone(), + config.clone(), + server_state.clone(), + broadcaster.clone(), + ); + + if let Some(ref mut stream) = state_guard.connection { + let _ = stream.write_all(&response); + let _ = stream.flush(); + } + + state_guard.master_repl_offset += command_size; + } + Err(_) => {} + } } - Err(e) => Err(format!("Master node doesn't exist: {}", e)), } + + // Spawn the background listener thread + thread::spawn(move || { + let mut buffer = [0u8; 1024]; + loop { + let bytes_read = { + let mut state_guard = state.lock().unwrap(); + if let Some(ref mut stream) = state_guard.connection { + match stream.read(&mut buffer) { + Ok(0) => { + println!("Master disconnected"); + break; + } + Ok(n) => n, + Err(e) => { + eprintln!("Error reading from master: {}", e); + break; + } + } + } else { + break; + } + }; + + println!("After handshake: {}", bytes_read); + + // Parse and execute all commands in the buffer + let mut remaining_bytes = &buffer[..bytes_read]; + + println!("remaining_bytes: {:?}", &remaining_bytes); + + // TODO: Sync the rdb_file with the slave's cache + // TODO: Find a way to propagate the error up the stack by using anyhow or something + if remaining_bytes.len() > 0 && remaining_bytes[0] == '$' as u8 { + // this means that the rdb segment got in here some how so I have to deal with it here + let (rdb_file, bytes_consumed) = RDBFile::from_bytes(remaining_bytes).unwrap(); + println!("rdb bytes: {}", bytes_consumed); + remaining_bytes = &remaining_bytes[bytes_consumed..]; + println!( + "remaining btyes length after rdb: {}", + remaining_bytes.len() + ); + println!("remaining btyes after rdb: {:?}", remaining_bytes); + } + + while !remaining_bytes.is_empty() { + match parse(remaining_bytes) { + Ok((resp, leftover)) => { + dbg!(&resp, leftover); + + // Update replication offset + let command_size = resp.to_resp_bytes().len(); + let mut state_guard = state.lock().unwrap(); + + let command = RedisCommand::from(resp); + + let response = command.execute( + cache.clone(), + config.clone(), + server_state.clone(), + broadcaster.clone(), + ); + + if let Some(ref mut stream) = state_guard.connection { + let _ = stream.write_all(&response); + let _ = stream.flush(); + } + + state_guard.master_repl_offset += command_size; + + remaining_bytes = leftover + } + Err(_) => { + // If parsing fails, break out of the loop + break; + } + } + } + } + }); } } @@ -235,53 +544,103 @@ impl RedisServer { } pub fn slave(port: String, master_host: String, master_port: String) -> Self { - RedisServer::Slave(SlaveServer::new(port, master_host, master_port, None)) + RedisServer::Slave(SlaveServer::new(port, master_host, master_port)) } // Helper methods to access common fields regardless of variant pub fn port(&self) -> &str { match self { - RedisServer::Master(master) => &master.port, - RedisServer::Slave(slave) => &slave.port, + RedisServer::Master(master) => &master.config.port, + RedisServer::Slave(slave) => &slave.config.port, + } + } + + pub fn config(&self) -> Arc { + match self { + RedisServer::Master(master) => master.config.clone(), + RedisServer::Slave(slave) => slave.config.clone(), } } pub fn set_port(&mut self, port: String) { match self { - RedisServer::Master(master) => master.port = port, - RedisServer::Slave(slave) => slave.port = port, + RedisServer::Master(master) => { + // Create new config with updated port + let new_config = Arc::new(ServerConfig { + port, + dir: master.config.dir.clone(), + dbfilename: master.config.dbfilename.clone(), + }); + master.config = new_config; + } + RedisServer::Slave(slave) => { + let new_config = Arc::new(ServerConfig { + port, + dir: slave.config.dir.clone(), + dbfilename: slave.config.dbfilename.clone(), + }); + slave.config = new_config; + } } } pub fn dir(&self) -> &Option { match self { - RedisServer::Master(master) => &master.dir, - RedisServer::Slave(slave) => &slave.dir, + RedisServer::Master(master) => &master.config.dir, + RedisServer::Slave(slave) => &slave.config.dir, } } pub fn set_dir(&mut self, dir: Option) { match self { - RedisServer::Master(master) => master.dir = dir, - RedisServer::Slave(slave) => slave.dir = dir, + RedisServer::Master(master) => { + let new_config = Arc::new(ServerConfig { + dir, + port: master.config.port.clone(), + dbfilename: master.config.dbfilename.clone(), + }); + master.config = new_config; + } + RedisServer::Slave(slave) => { + let new_config = Arc::new(ServerConfig { + dir, + port: slave.config.port.clone(), + dbfilename: slave.config.dbfilename.clone(), + }); + slave.config = new_config; + } } } pub fn dbfilename(&self) -> &Option { match self { - RedisServer::Master(master) => &master.dbfilename, - RedisServer::Slave(slave) => &slave.dbfilename, + RedisServer::Master(master) => &master.config.dbfilename, + RedisServer::Slave(slave) => &slave.config.dbfilename, } } pub fn set_dbfilename(&mut self, dbfilename: Option) { match self { - RedisServer::Master(master) => master.dbfilename = dbfilename, - RedisServer::Slave(slave) => slave.dbfilename = dbfilename, + RedisServer::Master(master) => { + let new_config = Arc::new(ServerConfig { + dbfilename, + port: master.config.port.clone(), + dir: master.config.dir.clone(), + }); + master.config = new_config; + } + RedisServer::Slave(slave) => { + let new_config = Arc::new(ServerConfig { + dbfilename, + port: slave.config.port.clone(), + dir: slave.config.dir.clone(), + }); + slave.config = new_config; + } } } - pub fn cache(&self) -> &SharedCache { + pub fn cache(&self) -> &SharedMut { match self { RedisServer::Master(master) => &master.cache, RedisServer::Slave(slave) => &slave.cache, @@ -290,29 +649,33 @@ impl RedisServer { pub fn repl_offset(&self) -> usize { match self { - RedisServer::Master(master) => *master.current_offset.lock().unwrap(), - RedisServer::Slave(slave) => *slave.master_repl_offset.lock().unwrap(), + RedisServer::Master(master) => master.state.lock().unwrap().current_offset, + RedisServer::Slave(slave) => slave.state.lock().unwrap().master_repl_offset, } } - pub fn set_cache(&mut self, cache: &SharedCache) { + pub fn set_cache(&mut self, cache: &SharedMut) { match self { RedisServer::Master(master) => master.cache = cache.clone(), RedisServer::Slave(slave) => slave.cache = cache.clone(), } } - pub fn set_repl_offset(&mut self, repl_offset: &Arc>) { + pub fn set_repl_offset(&mut self, repl_offset: usize) { match self { - RedisServer::Master(master) => master.current_offset = repl_offset.clone(), - RedisServer::Slave(slave) => slave.master_repl_offset = repl_offset.clone(), + RedisServer::Master(master) => { + master.state.lock().unwrap().current_offset = repl_offset; + } + RedisServer::Slave(slave) => { + slave.state.lock().unwrap().master_repl_offset = repl_offset + } } } pub fn repl_offset_increment(&mut self, amount: usize) { match self { - RedisServer::Master(master) => *master.current_offset.lock().unwrap() += amount, - RedisServer::Slave(slave) => *slave.master_repl_offset.lock().unwrap() += amount, + RedisServer::Master(master) => master.state.lock().unwrap().current_offset += amount, + RedisServer::Slave(slave) => slave.state.lock().unwrap().master_repl_offset += amount, } } @@ -326,12 +689,9 @@ impl RedisServer { pub fn add_replica(&mut self, replica_adr: SocketAddr, connection: Arc>) { match self { // TODO: Should probably add host to MasterServer and SlaveServer as member field - RedisServer::Master(master) => master.replicas.push(SlaveServer::new( - replica_adr.port().to_string(), - "localhost".to_owned(), - master.port().to_owned(), - Some(connection), - )), + RedisServer::Master(master) => { + master.add_replica(replica_adr, connection); + } RedisServer::Slave(_) => { unreachable!("Slaves don't have replicas") } diff --git a/src/shared_cache.rs b/src/shared_cache.rs index 4d6aec2..f7672f4 100644 --- a/src/shared_cache.rs +++ b/src/shared_cache.rs @@ -1,6 +1,5 @@ use std::{ collections::HashMap, - sync::{Arc, Mutex}, time::{SystemTime, UNIX_EPOCH}, }; @@ -24,4 +23,4 @@ impl CacheEntry { } } -pub type SharedCache = Arc>>; +pub type Cache = HashMap; -- cgit v1.2.3