Diffstat (limited to 'src')
-rw-r--r--   src/main.rs            44
-rw-r--r--   src/rdb.rs             14
-rw-r--r--   src/resp_commands.rs  122
-rw-r--r--   src/resp_parser.rs     17
-rw-r--r--   src/server.rs         718
-rw-r--r--   src/shared_cache.rs     3
6 files changed, 654 insertions, 264 deletions
diff --git a/src/main.rs b/src/main.rs
index 25bc6c9..3fe5559 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -14,15 +14,16 @@ use std::{
 use codecrafters_redis::{
     rdb::{KeyExpiry, ParseError, RDBFile, RedisValue},
     resp_bytes,
+    server::SharedMut,
     shared_cache::*,
 };
-use codecrafters_redis::{resp_commands::RedisCommands, server::RedisServer};
+use codecrafters_redis::{resp_commands::RedisCommand, server::RedisServer};
 use codecrafters_redis::{
     resp_parser::{parse, RespType},
     server::SlaveServer,
 };
 
-fn spawn_cleanup_thread(cache: SharedCache) {
+fn spawn_cleanup_thread(cache: SharedMut<Cache>) {
     let cache_clone = cache.clone();
     std::thread::spawn(move || {
         loop {
@@ -68,12 +69,25 @@ fn handle_client(mut stream: TcpStream, server: Arc<Mutex<RedisServer>>) {
     };
 
     let request = parse(&buffer).unwrap();
-    let server_clone = Arc::clone(&server);
-    let response = RedisCommands::from(request.0.clone()).execute(server_clone);
+
+    let mut server = server.lock().unwrap();
+
+    // Big state vars
+    let cache = server.cache().clone();
+    let server_state = server.get_server_state().clone();
+    let config = server.config();
+    let broadcaster = server.as_broadcaster();
+
+    let response = RedisCommand::from(request.0.clone()).execute(
+        cache.clone(),
+        config,
+        server_state,
+        broadcaster,
+    );
 
     let mut request_command = "".to_string();
 
-    // FIXME: Find a solution for this mess!!
+    // FIXME: Find a solution for this mess!! (Design a better API)
     match &request.0 {
         RespType::Array(arr) => {
             if let RespType::BulkString(s) = arr[0].clone() {
@@ -84,22 +98,28 @@ fn handle_client(mut stream: TcpStream, server: Arc<Mutex<RedisServer>>) {
     }
 
     // Store the persistent connection
-    let shared_stream = Arc::new(Mutex::new(
-        stream.try_clone().expect("What could go wrong? :)"),
-    ));
+    // let shared_stream = Arc::new(Mutex::new(
+    //     stream.try_clone().expect("What could go wrong? :)"),
+    // ));
 
     // if this is true, immediately write and send back the RDB file after the response
     // HACK: This just feels wrong; I feel this shouldn't be handled here and should be handled
     // in the execute command
     if request_command.starts_with("PSYNC") {
         stream.write(&response).unwrap();
-        let _ = write_rdb_to_stream(&mut stream);
-        // handshake completed and I should add the server sending me the handshake to my replicas
         let replica_addr = stream
             .peer_addr()
             .expect("This shouldn't fail right? right?? :)");
-        let mut server = server.lock().unwrap();
-        server.add_replica(replica_addr, shared_stream);
+
+        server.add_replica(
+            replica_addr,
+            Arc::new(Mutex::new(
+                stream.try_clone().expect("What could go wrong? :)"),
+            )),
+        );
+
+        let _ = write_rdb_to_stream(&mut stream);
+        // handshake completed and I should add the server sending me the handshake to my replicas
     } else {
         // write the response back to the client
         stream.write(&response).unwrap();
diff --git a/src/rdb.rs b/src/rdb.rs
--- a/src/rdb.rs
+++ b/src/rdb.rs
@@ -600,6 +600,20 @@ impl FromBytes for RDBFile {
         let mut total_consumed = 0;
         let mut databases = HashMap::new();
 
+        // special case when the rdb file is sent by the master to a replica in the following format:
+        // $<length>\r\n<length bytes of RDB contents>
+        if bytes[0] == '$' as u8 {
+            // consume up to the CRLF
+            let (consumed, rest) = bytes
+                .windows(2)
+                .position(|window| window == b"\r\n")
+                .map(|pos| (&bytes[..pos], &bytes[pos + 2..]))
+                .ok_or(ParseError::UnexpectedEof)?;
+            println!("Consumed {:?}", consumed);
+            remaining = rest;
+            total_consumed += consumed.len();
+        }
+
         // 1. Parse the RDB header ("REDIS" + version)
         let (header, consumed) = RDBHeader::from_bytes(remaining)?;
         total_consumed += consumed;
diff --git a/src/resp_commands.rs b/src/resp_commands.rs
index 453024c..981f7aa 100644
--- a/src/resp_commands.rs
+++ b/src/resp_commands.rs
@@ -1,7 +1,6 @@
 use crate::server::*;
 use crate::{resp_parser::*, shared_cache::*};
 use regex::Regex;
-use std::sync::{Arc, Mutex};
 use std::time::{SystemTime, UNIX_EPOCH};
 
 #[derive(Debug, Clone)]
@@ -110,7 +109,7 @@ fn extract_string(resp: &RespType) -> Option<String> {
     }
 }
 
-pub enum RedisCommands {
+pub enum RedisCommand {
     Ping,
     Echo(String),
     Get(String),
@@ -123,15 +122,20 @@ pub enum RedisCommands {
     Invalid,
 }
 
-impl RedisCommands {
-    pub fn execute(self, server: Arc<Mutex<RedisServer>>) -> Vec<u8> {
-        use RedisCommands as RC;
+impl RedisCommand {
+    pub fn execute<'a>(
+        self,
+        cache: SharedMut<Cache>,
+        config: Shared<ServerConfig>,
+        server_state: ServerState,
+        broadcaster: Option<SharedMut<&mut dyn CanBroadcast>>,
+    ) -> Vec<u8> {
+        use RedisCommand as RC;
         match self {
             RC::Ping => resp_bytes!("PONG"),
             RC::Echo(echo_string) => resp_bytes!(echo_string),
             RC::Get(key) => {
-                let server = server.lock().unwrap();
-                let mut cache = server.cache().lock().unwrap();
+                let mut cache = cache.lock().unwrap();
 
                 match cache.get(&key).cloned() {
                     Some(entry) => {
@@ -146,8 +150,7 @@ impl RedisCommands {
                 }
             }
             RC::Set(command) => {
-                let mut server = server.lock().unwrap();
-                let mut cache = server.cache().lock().unwrap();
+                let mut cache = cache.lock().unwrap();
 
                 // Check conditions (NX/XX)
                 let key_exists = cache.contains_key(&command.key);
@@ -190,17 +193,24 @@ impl RedisCommands {
                     },
                 );
 
-                // Broadcast SET to replicas after mutating local state
-                let broadcast_cmd = resp_bytes!(array => [
-                    resp!(bulk "SET"),
-                    resp!(bulk command.key),
-                    resp!(bulk command.value)
-                ]);
-
-                // Unlock the mutex so that I can access to broadcast the messaage
-                drop(cache);
+                println!(
+                    "My role is {:?} and I just inserted {} to {}",
+                    server_state.role, &command.key, command.value
+                );
 
-                server.broadcast_command(&broadcast_cmd);
+                // Broadcast to replicas if this is a master
+                if let Some(broadcaster) = broadcaster {
+                    // Broadcast SET to replicas after mutating local state
+                    let broadcast_cmd = resp_bytes!(array => [
+                        resp!(bulk "SET"),
+                        resp!(bulk command.key),
+                        resp!(bulk command.value)
+                    ]);
+                    broadcaster
+                        .lock()
+                        .unwrap()
+                        .broadcast_command_to_replicas(&broadcast_cmd);
+                }
 
                 if !command.get_old_value {
                     return resp_bytes!("OK");
@@ -213,8 +223,9 @@ impl RedisCommands {
             }
             RC::ConfigGet(s) => {
                 use RespType as RT;
-                let server = server.lock().unwrap();
-                if let (Some(dir), Some(dbfilename)) = (server.dir(), server.dbfilename()) {
+                if let (Some(dir), Some(dbfilename)) =
+                    (config.dir.clone(), config.dbfilename.clone())
+                {
                     match s.as_str() {
                         "dir" => RT::Array(vec![
                             RT::BulkString(s.as_bytes().to_vec()),
@@ -236,9 +247,7 @@ impl RedisCommands {
                 use RespType as RT;
 
                 let query = query.replace('*', ".*");
-
-                let server = server.lock().unwrap();
-                let cache = server.cache().lock().unwrap();
+                let cache = cache.lock().unwrap();
                 let regex = Regex::new(&query).unwrap();
                 let matching_keys: Vec<RT> = cache
                     .keys()
@@ -252,53 +261,42 @@ impl RedisCommands {
             }
             RC::Info(_sub_command) => {
                 use RespType as RT;
-                let server = server.lock().unwrap();
-                match &*server {
-                    RedisServer::Master(master) => {
-                        let response = format!(
-                            "# Replication\r\nrole:{}master_replid:{}master_repl_offset:{}",
-                            server.role(),
-                            master.replid.clone().unwrap_or("".to_string()),
-                            master.current_offset.lock().unwrap(),
-                        )
-                        .as_bytes()
-                        .to_vec();
-                        RT::BulkString(response).to_resp_bytes()
-                    }
-                    RedisServer::Slave(slave) => {
-                        let response = format!(
-                            "# Replication\r\nrole:{}master_replid:{}master_repl_offset:{}",
-                            server.role(),
-                            slave.master_replid.clone().unwrap_or("".to_string()),
-                            slave.master_repl_offset.lock().unwrap()
-                        )
-                        .as_bytes()
-                        .to_vec();
-                        RT::BulkString(response).to_resp_bytes()
-                    }
-                }
+                let role_str = match server_state.role {
+                    ServerRole::Master => "master",
+                    ServerRole::Slave => "slave",
+                };
+
+                let response = format!(
+                    "# Replication\r\nrole:{}\r\nmaster_replid:{}\r\nmaster_repl_offset:{}",
+                    role_str, server_state.repl_id, server_state.repl_offset,
+                )
+                .into_bytes();
+
+                RT::BulkString(response).to_resp_bytes()
             }
             RC::ReplConf((op1, op2)) => match (op1.to_uppercase().as_str(), op2.as_str()) {
                 ("GETACK", "*") => {
                     println!("Did I even get here?");
-                    resp_bytes!(array => [resp!(bulk "REPLCONF"), resp!(bulk "ACK"), resp!(bulk server.lock().unwrap().repl_offset().to_string())])
+                    resp_bytes!(array => [resp!(bulk "REPLCONF"), resp!(bulk "ACK"), resp!(bulk server_state.repl_offset.to_string())])
                 }
                 _ => resp_bytes!("OK"),
             },
             RC::Psync((_, _)) => {
-                let server = server.lock().unwrap();
-                if let RedisServer::Master(master) = &*server {
-                    let response = format!(
-                        "FULLRESYNC {} 0",
-                        master.replid.clone().unwrap_or("".to_string()),
-                    );
-                    resp_bytes!(response)
-                } else {
-                    // TODO: Find a way to report this error back up the program trace
-                    unreachable!("I should never come here")
+                // This should only be called on masters
+                match server_state.role {
+                    ServerRole::Master => {
+                        let response = format!("FULLRESYNC {} 0", server_state.repl_id);
+                        resp_bytes!(response)
+                    }
+                    // This shouldn't happen, but handle it gracefully
+                    ServerRole::Slave => {
+                        // TODO: I actually forgot that you could send error messages with Redis; do
+                        // more of this across the whole codebase when it makes sense
+                        resp_bytes!(error "ERR PSYNC not supported on slave")
+                    }
                 }
             }
-            RC::Invalid => todo!(),
+            RC::Invalid => resp_bytes!(error "ERR Invalid Command"),
         }
     }
 }
@@ -403,7 +401,7 @@ impl SetOptionParser {
     }
 }
 
-impl From<RespType> for RedisCommands {
+impl From<RespType> for RedisCommand {
     fn from(value: RespType) -> Self {
         // Alternative approach using a more functional style with iterators
         let RespType::Array(command) = value else {
diff --git a/src/resp_parser.rs b/src/resp_parser.rs
index 952176c..646d2b1 100644
--- a/src/resp_parser.rs
+++ b/src/resp_parser.rs
@@ -380,18 +380,17 @@ pub fn parse_bulk_strings(bytes: &[u8]) -> Result<(RespType, &[u8]), RespError>
                 return Err(RespError::UnexpectedEnd);
             }
 
-            let mut bulk_string: Vec<u8> = Vec::with_capacity(length as usize);
+            let bulk_string = remained[..length as usize].to_vec();
+            let remaining_after_string = &remained[length as usize..];
 
-            for i in 0..length {
-                bulk_string.push(remained[i as usize]);
-            }
-
-            let consumed = RespType::BulkString(bulk_string);
-
-            if !(&remained[length as usize..]).starts_with(b"\r\n") {
+            if !remaining_after_string.starts_with(b"\r\n") {
                 return Err(RespError::UnexpectedEnd);
             }
-            return Ok((consumed, &remained[length as usize + 2..]));
+
+            Ok((
+                RespType::BulkString(bulk_string),
+                &remaining_after_string[2..],
+            ))
         }
         [] => Err(RespError::Custom(String::from("Empty data"))),
     }
 }
diff --git a/src/server.rs b/src/server.rs
index 6da29eb..8c09f73 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -1,96 +1,279 @@
-use crate::resp_commands::RedisCommands;
+use crate::rdb::{FromBytes, RDBFile};
+use crate::resp_commands::RedisCommand;
 use crate::resp_parser::{parse, RespType};
 use std::collections::HashMap;
 use std::io::{Read, Write};
 use std::net::{SocketAddr, TcpStream};
 use std::sync::{Arc, Mutex};
-use std::time::Duration;
 use std::{env, thread};
 
-use crate::shared_cache::SharedCache;
+use crate::shared_cache::Cache;
+
+// TODO: add functions to access member variables instead of accessing them directly
 
 #[derive(Debug, Clone)]
-pub struct MasterServer {
+pub struct ServerConfig {
     pub dir: Option<String>,
     pub dbfilename: Option<String>,
     pub port: String,
-    pub replid: Option<String>,
-    pub current_offset: Arc<Mutex<usize>>,
-    pub cache: SharedCache,
-    replicas: Vec<SlaveServer>,
+}
+
+// Server state that commands might need
+#[derive(Debug, Clone)]
+pub struct ServerState {
+    pub role: ServerRole,
+    pub repl_id: String,
+    pub repl_offset: usize,
+}
+
+#[derive(Debug, Clone)]
+pub enum ServerRole {
+    Master,
+    Slave,
+}
+
+// Trait for broadcasting - only masters can do this
+pub trait CanBroadcast: Send {
+    fn broadcast_command_to_replicas(&mut self, command: &[u8]);
+}
+
+// Implementation for Master
+impl CanBroadcast for MasterServer {
+    fn broadcast_command_to_replicas(&mut self, command: &[u8]) {
+        self.broadcast_command(command);
+    }
+}
+
+// Helper methods to extract server state
+impl MasterServer {
+    pub fn get_server_state(&self) -> ServerState {
+        ServerState {
+            role: ServerRole::Master,
+            repl_id: self.get_replid(),
+            repl_offset: self.get_repl_offset(),
+        }
+    }
+}
+
+impl SlaveServer {
+    pub fn get_server_state(&self) -> ServerState {
+        let state = self.state.lock().unwrap();
+        ServerState {
+            role: ServerRole::Slave,
+            repl_id: state.master_replid.clone(),
+            repl_offset: state.master_repl_offset,
+        }
+    }
+}
+
+impl RedisServer {
+    pub fn get_server_state(&self) -> ServerState {
+        match self {
+            RedisServer::Master(master) => master.get_server_state(),
+            RedisServer::Slave(slave) => slave.get_server_state(),
+        }
+    }
+
+    pub fn as_broadcaster(&mut self) -> Option<SharedMut<&mut dyn CanBroadcast>> {
+        match self {
+            RedisServer::Master(master) => {
+                Some(Arc::new(Mutex::new(master as &mut dyn CanBroadcast)))
+            }
+            RedisServer::Slave(_) => None,
+        }
+    }
+}
+
+#[derive(Debug)]
+pub struct MasterState {
+    pub replid: String,
+    pub current_offset: usize,
+    pub replicas: Vec<ReplicaConnection>,
+}
+
+// Slave-specific state
+#[derive(Debug)]
+pub struct SlaveState {
+    pub master_replid: String,
+    pub master_repl_offset: usize,
+    pub master_host: String,
+    pub master_port: String,
+    pub connection: Option<TcpStream>,
+}
+
+#[derive(Debug)]
+pub struct ReplicaConnection {
+    pub port: String,
+    pub connection: Arc<Mutex<TcpStream>>,
+}
+
+pub type SharedMut<T> = Arc<Mutex<T>>;
+pub type Shared<T> = Arc<T>;
+
+#[derive(Debug, Clone)]
+pub struct MasterServer {
+    config: Shared<ServerConfig>,
+    state: SharedMut<MasterState>,
+    cache: SharedMut<Cache>,
 }
 
 impl MasterServer {
     fn new() -> Self {
-        Self {
+        let config = Arc::new(ServerConfig {
             dir: None,
             dbfilename: None,
             port: "6379".to_string(),
-            replid: Some("8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb".to_string()),
-            current_offset: Arc::new(Mutex::new(0)),
-            cache: Arc::new(Mutex::new(HashMap::new())),
+        });
+
+        let state = Arc::new(Mutex::new(MasterState {
+            replid: "8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb".to_string(),
+            current_offset: 0,
             replicas: vec![],
+        }));
+
+        let cache = Arc::new(Mutex::new(HashMap::new()));
+
+        Self {
+            config,
+            state,
+            cache,
         }
     }
 
     fn port(&self) -> &str {
-        &self.port
+        &self.config.port
     }
 
     pub fn broadcast_command(&mut self, command: &[u8]) {
-        println!("Hello from brodcast");
-        self.replicas.retain(|replica| {
-            if let Some(conn) = &replica.connection {
-                let mut conn = conn.lock().unwrap();
-                if let Err(e) = conn.write_all(command) {
-                    eprintln!("Failed to send to replica {}: {}", replica.port, e);
-                    false // Drop dead connections
-                } else {
-                    true
-                }
+        let mut state = self.state.lock().unwrap();
+
+        state.replicas.retain(|replica| {
+            let mut conn = replica.connection.lock().unwrap();
+            if let Err(e) = conn.write_all(command) {
+                eprintln!("Failed to send to replica {}: {}", replica.port, e);
+                false // Drop dead connections
             } else {
-                false
+                true
             }
-        });
+        })
+    }
+
+    pub fn add_replica(&self, replica_addr: SocketAddr, connection: Arc<Mutex<TcpStream>>) {
+        let replica = ReplicaConnection {
+            port: replica_addr.port().to_string(),
+            connection,
+        };
+
+        self.state.lock().unwrap().replicas.push(replica);
+    }
+
+    pub fn get_repl_offset(&self) -> usize {
+        self.state.lock().unwrap().current_offset
+    }
+
+    pub fn increment_repl_offset(&self, amount: usize) {
+        self.state.lock().unwrap().current_offset += amount;
+    }
+
+    pub fn get_replid(&self) -> String {
+        self.state.lock().unwrap().replid.clone()
     }
 }
 
 #[derive(Debug, Clone)]
 pub struct SlaveServer {
-    pub dir: Option<String>,
-    pub dbfilename: Option<String>,
-    pub port: String,
-    pub master_replid: Option<String>,
-    pub master_repl_offset: Arc<Mutex<usize>>,
-    pub master_host: String,
-    pub master_port: String,
-    pub connection: Option<Arc<Mutex<TcpStream>>>,
-    pub cache: SharedCache,
+    config: Shared<ServerConfig>,
+    state: SharedMut<SlaveState>,
+    cache: SharedMut<Cache>,
+}
+
+fn read_rdb_from_stream<R: Read>(reader: &mut R) -> Result<Vec<u8>, String> {
+    let mut buffer = [0u8; 1024];
+
+    // Read until we get the length prefix ($<length>\r\n)
+    let mut length_bytes = Vec::new();
+
+    loop {
+        let bytes_read = reader
+            .read(&mut buffer)
+            .map_err(|e| format!("Failed to read: {}", e))?;
+        if bytes_read == 0 {
+            return Err("Connection closed while reading RDB length".to_string());
+        }
+
+        length_bytes.extend_from_slice(&buffer[..bytes_read]);
+
+        if length_bytes.len() >= 2 && &length_bytes[length_bytes.len() - 2..] == b"\r\n" {
+            break;
+        }
+    }
+
+    // Parse the length prefix ($<length>\r\n)
+    let (resp, remaining) =
+        parse(&length_bytes).map_err(|e| format!("Failed to parse RDB length: {:?}", e))?;
+    let length = match resp {
+        RespType::BulkString(_) => {
+            let len_str = String::from_utf8_lossy(&length_bytes[1..length_bytes.len() - 2]);
+            len_str
+                .parse::<usize>()
+                .map_err(|e| format!("Invalid RDB length: {}", e))?
+        }
+        _ => return Err("Expected bulk string for RDB length".to_string()),
+    };
+
+    // Read the exact number of bytes for the RDB file
+    let mut rdb_bytes = vec![0u8; length];
+    let mut total_read = remaining.len();
+    rdb_bytes[..remaining.len()].copy_from_slice(remaining);
+
+    while total_read < length {
+        let bytes_read = reader
+            .read(&mut buffer)
+            .map_err(|e| format!("Failed to read RDB: {}", e))?;
+        if bytes_read == 0 {
+            return Err("Connection closed while reading RDB file".to_string());
+        }
+        let end = (total_read + bytes_read).min(length);
+        rdb_bytes[total_read..end].copy_from_slice(&buffer[..(end - total_read)]);
+        total_read += bytes_read;
+    }
+
+    Ok(rdb_bytes)
+}
 
 impl SlaveServer {
-    fn new(
-        port: String,
-        master_host: String,
-        master_port: String,
-        connection: Option<Arc<Mutex<TcpStream>>>,
-    ) -> Self {
-        Self {
+    fn new(port: String, master_host: String, master_port: String) -> Self {
+        let config = Arc::new(ServerConfig {
             dir: None,
             dbfilename: None,
             port,
-            master_replid: Some("8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb".to_string()),
-            master_repl_offset: Arc::new(Mutex::new(0)),
+        });
+
+        let state = Arc::new(Mutex::new(SlaveState {
+            master_replid: "8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb".to_string(),
+            master_repl_offset: 0,
             master_host,
             master_port,
-            connection,
-            cache: Arc::new(Mutex::new(HashMap::new())),
+            connection: None,
+        }));
+
+        let cache = Arc::new(Mutex::new(HashMap::new()));
+
+        Self {
+            config,
+            state,
+            cache,
         }
     }
 
+    pub fn increment_repl_offset(&mut self, amount: usize) {
+        self.state.lock().unwrap().master_repl_offset += amount;
+    }
+
     fn connect(&self) -> Result<TcpStream, std::io::Error> {
-        let master_address = format!("{}:{}", self.master_host, self.master_port);
-        return TcpStream::connect(master_address);
+        let state = self.state.lock().unwrap();
+        let master_address = format!("{}:{}", state.master_host, state.master_port);
+        TcpStream::connect(master_address)
     }
 
     fn handshake(&mut self) -> Result<(), String> {
@@ -98,128 +281,254 @@ impl SlaveServer {
             Ok(mut stream) => {
                 let mut buffer = [0; 1024];
 
-                let mut send_command = |command: &[u8]| -> Result<(), String> {
+                let mut send_command = |command: &[u8], read: bool| -> Result<(), String> {
                     stream
                         .write_all(command)
                         .map_err(|e| format!("Failed to send: {}", e))?;
 
-                    match stream.read(&mut buffer) {
-                        Ok(0) | Err(_) => return Ok(()), // connection closed or error
-                        Ok(_) => {
-                            println!("Recieved some bytes here!");
-                            Ok(())
+                    if read {
+                        match stream.read(&mut buffer) {
+                            Ok(0) | Err(_) => return Ok(()), // connection closed or error
+                            Ok(_) => {
+                                println!("Received some bytes here!");
+                                Ok(())
+                            }
                         }
+                    } else {
+                        Ok(())
                     }
                 };
 
-                // PING
-                send_command(&resp_bytes!(array => [resp!(bulk "PING")]))?;
-
-                // REPLCONF listening-port <PORT>
-                send_command(&resp_bytes!(array => [
-                    resp!(bulk "REPLCONF"),
-                    resp!(bulk "listening-port"),
-                    resp!(bulk self.port.clone())
-                ]))?;
+                // Step 1: PING
+                send_command(&resp_bytes!(array => [resp!(bulk "PING")]), true)?;
+
+                let port = self.config.port.clone();
+                // Step 2: REPLCONF listening-port <PORT>
+                send_command(
+                    &resp_bytes!(array => [
+                        resp!(bulk "REPLCONF"),
+                        resp!(bulk "listening-port"),
+                        resp!(bulk port)
+                    ]),
+                    true,
+                )?;
+
+                // Step 3: REPLCONF capa psync2
+                send_command(
+                    &resp_bytes!(array => [
+                        resp!(bulk "REPLCONF"),
+                        resp!(bulk "capa"),
+                        resp!(bulk "psync2")
+                    ]),
+                    true,
+                )?;
+
+                // Step 4: PSYNC <REPL_ID> <REPL_OFFSET>
+                send_command(
+                    &resp_bytes!(array => [
+                        resp!(bulk "PSYNC"),
+                        resp!(bulk "?"),
+                        resp!(bulk "-1")
+                    ]),
false, + )?; + + // Step 5: Read FULLRESYNC response + let bytes_read = stream + .read(&mut buffer) + .map_err(|e| format!("Failed to read FULLRESYNC: {}", e))?; + let (parsed, mut rest) = parse(&buffer[..bytes_read]) + .map_err(|e| format!("Failed to parse FULLRESYNC: {:?}", e))?; + match parsed { + RespType::SimpleString(s) if s.starts_with("FULLRESYNC") => { + // Expected response + } + _ => return Err("Invalid FULLRESYNC response".to_string()), + } - // REPLCONF capa psync2 - send_command(&resp_bytes!(array => [ - resp!(bulk "REPLCONF"), - resp!(bulk "capa"), - resp!(bulk "psync2") - ]))?; + println!("rest: {:?}", rest); - // PSYNC <REPL_ID> <REPL_OFFSSET> - send_command(&resp_bytes!(array => [ - resp!(bulk "PSYNC"), - resp!(bulk "?"), - resp!(bulk "-1") - ]))?; + println!("FULLRESYNC response bytes read: {}", bytes_read); - thread::sleep(Duration::new(1, 0)); + // So there is an interesting behaviour where the FULLRESYNC + RDB and if you are + // really lucky the REPLCONF would all get sent in one TCP segment so I should + // assume I would get nice segments refelecting each command + if !rest.is_empty() { + // TODO: Sync the rdb_file with the slave's cache + // TODO: Find a way to propagate the error up the stack by using anyhow or something + let (rdb_file, bytes_consumed) = RDBFile::from_bytes(rest).unwrap(); + rest = &rest[bytes_consumed..]; + println!("rdb bytes: {}", bytes_consumed); + println!("remaining btyes after rdb: {}", rest.len()); + } // Store the persistent connection - let shared_stream = Arc::new(Mutex::new(stream)); - self.connection = Some(shared_stream.clone()); - - // Spawn the background listener thread - let cache_clone = self.cache.clone(); - let master_repl_offset = self.master_repl_offset.clone(); - thread::spawn(move || { - loop { - let bytes_read = { - let mut stream = shared_stream.lock().unwrap(); - match stream.read(&mut buffer) { - Ok(0) => { - println!("Master disconnected"); - break; - } - Ok(n) => n, - Err(e) => { - eprintln!("Error reading from master: {}", e); - break; - } - } - }; // stream lock is dropped here - - dbg!(&bytes_read); - - // Parse and execute all commands in the buffer - let mut remaining_bytes = &buffer[..bytes_read]; - - dbg!(&remaining_bytes); - - let mut flag = true; - - while !remaining_bytes.is_empty() || flag { - flag = false; - match parse(remaining_bytes) { - Ok((parsed_command, leftover)) => { - dbg!(&parsed_command, leftover); - - // Create a temporary slave server for command execution - let temp_slave = RedisServer::Slave(SlaveServer::new( - "0".to_string(), // dummy port - "localhost".to_string(), // dummy host - "6379".to_string(), // dummy master port - None, // no connection needed for execution - )); - - // Set the cache to our actual cache - let mut temp_slave = temp_slave; - temp_slave.set_repl_offset(&master_repl_offset); - temp_slave.set_cache(&cache_clone); - let server_arc = Arc::new(Mutex::new(temp_slave)); - - let response = RedisCommands::from(parsed_command.clone()) - .execute(server_arc.clone()); - - let mut shared_stream = shared_stream.lock().unwrap(); - - if let RespType::Array(arr) = &parsed_command { - if arr[0].to_resp_bytes() == b"REPLCONF".to_vec() { - shared_stream.write(&response).unwrap(); - } - } - - // Update remaining bytes for next iteration - remaining_bytes = leftover; - server_arc.lock().unwrap().repl_offset_increment( - parsed_command.to_resp_bytes().len(), - ); - } - Err(_) => { - // If parsing fails, break out of the loop - break; - } + self.state.lock().unwrap().connection = 
+                self.state.lock().unwrap().connection = Some(stream);
+                self.start_replication_listener(&mut rest);
+
+                Ok(())
+            }
+            Err(e) => Err(format!("Master node doesn't exist: {}", e)),
+        }
+    }
+
+    // TODO: This should return a Result
+    fn start_replication_listener<'a>(&'a self, rest: &mut &[u8]) {
+        let state = self.state.clone();
+        let cache = self.cache.clone();
+        let config = self.config.clone();
+        let server_state = self.get_server_state();
+        let broadcaster = None::<Arc<Mutex<&mut dyn CanBroadcast>>>;
+
+        // if it's not empty then there is probably a REPLCONF command sent and I should handle it
+        // before reading any more bytes
+        if !rest.is_empty() {
+            // TODO: Sync the rdb_file with the slave's cache
+            // TODO: Find a way to propagate the error up the stack by using anyhow or something
+            if rest[0] == '$' as u8 {
+                // this means that the rdb segment got in here somehow so I have to deal with it here
+                let (rdb_file, bytes_consumed) = RDBFile::from_bytes(rest).unwrap();
+                *rest = &rest[bytes_consumed..];
+
+                println!("rdb bytes: {}", bytes_consumed);
+                println!("remaining bytes after rdb: {}", rest.len());
+                if rest.len() > 0 {
+                    match parse(rest) {
+                        Ok((resp, leftover)) => {
+                            dbg!(&resp, leftover);
+
+                            // Update replication offset
+                            let command_size = resp.to_resp_bytes().len();
+                            let mut state_guard = state.lock().unwrap();
+
+                            let command = RedisCommand::from(resp);
+
+                            let response = command.execute(
+                                cache.clone(),
+                                config.clone(),
+                                server_state.clone(),
+                                broadcaster.clone(),
+                            );
+
+                            if let Some(ref mut stream) = state_guard.connection {
+                                let _ = stream.write_all(&response);
+                                let _ = stream.flush();
                             }
+
+                            state_guard.master_repl_offset += command_size;
                         }
+                        Err(_) => {}
                     }
-                });
-                Ok(())
+                }
+            } else {
+                match parse(rest) {
+                    Ok((resp, leftover)) => {
+                        dbg!(&resp, leftover);
+
+                        // Update replication offset
+                        let command_size = resp.to_resp_bytes().len();
+                        let

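A note on the `$<length>\r\n<contents>` framing that both the new rdb.rs special case and read_rdb_from_stream deal with: after FULLRESYNC, the master sends the RDB snapshot like a RESP bulk string but without the trailing CRLF, and any propagated commands may follow in the same buffer. The sketch below is illustrative only; split_rdb_payload is a hypothetical standalone helper, not a function from this crate.

// Assumed framing: '$' + decimal length + CRLF + exactly that many raw RDB bytes, no trailing CRLF.
fn split_rdb_payload(input: &[u8]) -> Option<(&[u8], &[u8])> {
    // Expect the '$' marker first.
    if input.first() != Some(&b'$') {
        return None;
    }
    // Locate the CRLF that terminates the length prefix.
    let crlf = input.windows(2).position(|w| w == b"\r\n")?;
    let len: usize = std::str::from_utf8(&input[1..crlf]).ok()?.parse().ok()?;
    let body_start = crlf + 2;
    let body_end = body_start.checked_add(len)?;
    if input.len() < body_end {
        return None; // need more bytes from the socket before we can split
    }
    // Returns (rdb bytes, leftover bytes such as propagated commands).
    Some((&input[body_start..body_end], &input[body_end..]))
}

fn main() {
    // A fake 5-byte "snapshot" followed by a propagated PING command.
    let wire = b"$5\r\nHELLO*1\r\n$4\r\nPING\r\n";
    let (rdb, rest) = split_rdb_payload(wire).unwrap();
    assert_eq!(rdb, &b"HELLO"[..]);
    assert_eq!(rest, &b"*1\r\n$4\r\nPING\r\n"[..]);
    println!("rdb = {} bytes, rest = {} bytes", rdb.len(), rest.len());
}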
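The handshake sequence that handshake() walks through boils down to four requests from the replica: PING, REPLCONF listening-port <port>, REPLCONF capa psync2, and PSYNC ? -1, after which the master answers +FULLRESYNC <replid> <offset> followed by the RDB snapshot. A minimal sketch against std alone is shown below; it hard-codes RESP frames instead of using the crate's resp!/resp_bytes! macros, and the host and ports in main are placeholders.

use std::io::{Read, Write};
use std::net::TcpStream;

fn replica_handshake(master: &str, listening_port: u16) -> std::io::Result<TcpStream> {
    let mut stream = TcpStream::connect(master)?;
    let mut buf = [0u8; 1024];

    // Step 1: PING, expect +PONG.
    stream.write_all(b"*1\r\n$4\r\nPING\r\n")?;
    let _ = stream.read(&mut buf)?;

    // Step 2: REPLCONF listening-port <port>, expect +OK.
    let port = listening_port.to_string();
    let replconf_port = format!(
        "*3\r\n$8\r\nREPLCONF\r\n$14\r\nlistening-port\r\n${}\r\n{}\r\n",
        port.len(),
        port
    );
    stream.write_all(replconf_port.as_bytes())?;
    let _ = stream.read(&mut buf)?;

    // Step 3: REPLCONF capa psync2, expect +OK.
    stream.write_all(b"*3\r\n$8\r\nREPLCONF\r\n$4\r\ncapa\r\n$6\r\npsync2\r\n")?;
    let _ = stream.read(&mut buf)?;

    // Step 4: PSYNC ? -1; the master replies +FULLRESYNC <replid> <offset> followed by
    // the RDB snapshot, possibly all within the same TCP segment.
    stream.write_all(b"*3\r\n$5\r\nPSYNC\r\n$1\r\n?\r\n$2\r\n-1\r\n")?;

    Ok(stream)
}

fn main() {
    // Usage sketch; assumes a master is listening locally on 6379.
    if let Ok(_stream) = replica_handshake("127.0.0.1:6379", 6380) {
        println!("handshake sent");
    }
}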
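Finally, the offset bookkeeping behind `master_repl_offset += command_size`: the replica adds the raw RESP byte length of every command the master propagates and reports the running total when asked with REPLCONF GETACK *. The ReplicationOffset type below is hypothetical and only illustrates that accounting; it is not part of this codebase.

struct ReplicationOffset(usize);

impl ReplicationOffset {
    // Count a propagated command by the size of its raw RESP encoding.
    fn consume(&mut self, raw_command: &[u8]) {
        self.0 += raw_command.len();
    }

    // Build the REPLCONF ACK <offset> frame sent back to the master.
    fn ack_frame(&self) -> Vec<u8> {
        let off = self.0.to_string();
        format!(
            "*3\r\n$8\r\nREPLCONF\r\n$3\r\nACK\r\n${}\r\n{}\r\n",
            off.len(),
            off
        )
        .into_bytes()
    }
}

fn main() {
    let mut offset = ReplicationOffset(0);
    // A propagated `SET foo bar` is 31 bytes of RESP.
    let set = b"*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\nbar\r\n";
    offset.consume(set);
    assert_eq!(offset.0, 31);
    println!("{}", String::from_utf8_lossy(&offset.ack_frame()));
}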