diff options
| author | omagdy <omar.professional8777@gmail.com> | 2025-07-24 06:04:32 +0300 |
|---|---|---|
| committer | omagdy <omar.professional8777@gmail.com> | 2025-07-24 06:04:32 +0300 |
| commit | 30e6b478d7cd286b68da21d7a5aa5426c588cd02 (patch) | |
| tree | 719096c1bd3975e192bc5d6608f9f27f98e10e16 /src/server.rs | |
| parent | 561fb8d783cc000b7b9cc204e10618464c092e18 (diff) | |
| download | redis-rust-30e6b478d7cd286b68da21d7a5aa5426c588cd02.tar.xz redis-rust-30e6b478d7cd286b68da21d7a5aa5426c588cd02.zip | |
refactor: Refactor how I model the state and config and cache of the server with sepraration of concerns
Diffstat (limited to 'src/server.rs')
| -rw-r--r-- | src/server.rs | 718 |
1 files changed, 539 insertions, 179 deletions
diff --git a/src/server.rs b/src/server.rs index 6da29eb..8c09f73 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,96 +1,279 @@ -use crate::resp_commands::RedisCommands; +use crate::rdb::{FromBytes, RDBFile}; +use crate::resp_commands::RedisCommand; use crate::resp_parser::{parse, RespType}; use std::collections::HashMap; use std::io::{Read, Write}; use std::net::{SocketAddr, TcpStream}; use std::sync::{Arc, Mutex}; -use std::time::Duration; use std::{env, thread}; -use crate::shared_cache::SharedCache; +use crate::shared_cache::Cache; + +// TODO: add functions to access member variables instead of accessing them directly #[derive(Debug, Clone)] -pub struct MasterServer { +pub struct ServerConfig { pub dir: Option<String>, pub dbfilename: Option<String>, pub port: String, - pub replid: Option<String>, - pub current_offset: Arc<Mutex<usize>>, - pub cache: SharedCache, - replicas: Vec<SlaveServer>, +} + +// Server state that commands might need +#[derive(Debug, Clone)] +pub struct ServerState { + pub role: ServerRole, + pub repl_id: String, + pub repl_offset: usize, +} + +#[derive(Debug, Clone)] +pub enum ServerRole { + Master, + Slave, +} + +// Trait for broadcasting - only masters can do this +pub trait CanBroadcast: Send { + fn broadcast_command_to_replicas(&mut self, command: &[u8]); +} + +// Implementation for Master +impl CanBroadcast for MasterServer { + fn broadcast_command_to_replicas(&mut self, command: &[u8]) { + self.broadcast_command(command); + } +} + +// Helper methods to extract server state +impl MasterServer { + pub fn get_server_state(&self) -> ServerState { + ServerState { + role: ServerRole::Master, + repl_id: self.get_replid(), + repl_offset: self.get_repl_offset(), + } + } +} + +impl SlaveServer { + pub fn get_server_state(&self) -> ServerState { + let state = self.state.lock().unwrap(); + ServerState { + role: ServerRole::Slave, + repl_id: state.master_replid.clone(), + repl_offset: state.master_repl_offset, + } + } +} + +impl RedisServer { + pub fn get_server_state(&self) -> ServerState { + match self { + RedisServer::Master(master) => master.get_server_state(), + RedisServer::Slave(slave) => slave.get_server_state(), + } + } + + pub fn as_broadcaster(&mut self) -> Option<SharedMut<&mut dyn CanBroadcast>> { + match self { + RedisServer::Master(master) => { + Some(Arc::new(Mutex::new(master as &mut dyn CanBroadcast))) + } + RedisServer::Slave(_) => None, + } + } +} + +#[derive(Debug)] +pub struct MasterState { + pub replid: String, + pub current_offset: usize, + pub replicas: Vec<ReplicaConnection>, +} + +// Slave-specific state +#[derive(Debug)] +pub struct SlaveState { + pub master_replid: String, + pub master_repl_offset: usize, + pub master_host: String, + pub master_port: String, + pub connection: Option<TcpStream>, +} + +#[derive(Debug)] +pub struct ReplicaConnection { + pub port: String, + pub connection: Arc<Mutex<TcpStream>>, +} + +pub type SharedMut<T> = Arc<Mutex<T>>; +pub type Shared<T> = Arc<T>; + +#[derive(Debug, Clone)] +pub struct MasterServer { + config: Shared<ServerConfig>, + state: SharedMut<MasterState>, + cache: SharedMut<Cache>, } impl MasterServer { fn new() -> Self { - Self { + let config = Arc::new(ServerConfig { dir: None, dbfilename: None, port: "6379".to_string(), - replid: Some("8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb".to_string()), - current_offset: Arc::new(Mutex::new(0)), - cache: Arc::new(Mutex::new(HashMap::new())), + }); + + let state = Arc::new(Mutex::new(MasterState { + replid: "8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb".to_string(), + current_offset: 0, replicas: vec![], + })); + + let cache = Arc::new(Mutex::new(HashMap::new())); + + Self { + config, + state, + cache, } } fn port(&self) -> &str { - &self.port + &self.config.port } pub fn broadcast_command(&mut self, command: &[u8]) { - println!("Hello from brodcast"); - self.replicas.retain(|replica| { - if let Some(conn) = &replica.connection { - let mut conn = conn.lock().unwrap(); - if let Err(e) = conn.write_all(command) { - eprintln!("Failed to send to replica {}: {}", replica.port, e); - false // Drop dead connections - } else { - true - } + let mut state = self.state.lock().unwrap(); + + state.replicas.retain(|replica| { + let mut conn = replica.connection.lock().unwrap(); + if let Err(e) = conn.write_all(command) { + eprintln!("Failed to send to replica {}: {}", replica.port, e); + false // Drop dead connections } else { - false + true } - }); + }) + } + + pub fn add_replica(&self, replica_addr: SocketAddr, connection: Arc<Mutex<TcpStream>>) { + let replica = ReplicaConnection { + port: replica_addr.port().to_string(), + connection, + }; + + self.state.lock().unwrap().replicas.push(replica); + } + + pub fn get_repl_offset(&self) -> usize { + self.state.lock().unwrap().current_offset + } + + pub fn increment_repl_offset(&self, amount: usize) { + self.state.lock().unwrap().current_offset += amount; + } + + pub fn get_replid(&self) -> String { + self.state.lock().unwrap().replid.clone() } } #[derive(Debug, Clone)] pub struct SlaveServer { - pub dir: Option<String>, - pub dbfilename: Option<String>, - pub port: String, - pub master_replid: Option<String>, - pub master_repl_offset: Arc<Mutex<usize>>, - pub master_host: String, - pub master_port: String, - pub connection: Option<Arc<Mutex<TcpStream>>>, - pub cache: SharedCache, + config: Shared<ServerConfig>, + state: SharedMut<SlaveState>, + cache: SharedMut<Cache>, +} + +fn read_rdb_from_stream<R: Read>(reader: &mut R) -> Result<Vec<u8>, String> { + let mut buffer = [0u8; 1024]; + + // Read until we get the length prefix ($<length>\r\n) + let mut length_bytes = Vec::new(); + + loop { + let bytes_read = reader + .read(&mut buffer) + .map_err(|e| format!("Failed to read: {}", e))?; + if bytes_read == 0 { + return Err("Connection closed while reading RDB length".to_string()); + } + + length_bytes.extend_from_slice(&buffer[..bytes_read]); + + if length_bytes.len() >= 2 && &length_bytes[length_bytes.len() - 2..] == b"\r\n" { + break; + } + } + + // Parse the length prefix ($<length>\r\n) + let (resp, remaining) = + parse(&length_bytes).map_err(|e| format!("Failed to parse RDB length: {:?}", e))?; + let length = match resp { + RespType::BulkString(_) => { + let len_str = String::from_utf8_lossy(&length_bytes[1..length_bytes.len() - 2]); + len_str + .parse::<usize>() + .map_err(|e| format!("Invalid RDB length: {}", e))? + } + _ => return Err("Expected bulk string for RDB length".to_string()), + }; + + // Read the exact number of bytes for the RDB file + let mut rdb_bytes = vec![0u8; length]; + let mut total_read = remaining.len(); + rdb_bytes[..remaining.len()].copy_from_slice(remaining); + + while total_read < length { + let bytes_read = reader + .read(&mut buffer) + .map_err(|e| format!("Failed to read RDB: {}", e))?; + if bytes_read == 0 { + return Err("Connection closed while reading RDB file".to_string()); + } + let end = (total_read + bytes_read).min(length); + rdb_bytes[total_read..end].copy_from_slice(&buffer[..(end - total_read)]); + total_read += bytes_read; + } + + Ok(rdb_bytes) } impl SlaveServer { - fn new( - port: String, - master_host: String, - master_port: String, - connection: Option<Arc<Mutex<TcpStream>>>, - ) -> Self { - Self { + fn new(port: String, master_host: String, master_port: String) -> Self { + let config = Arc::new(ServerConfig { dir: None, dbfilename: None, port, - master_replid: Some("8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb".to_string()), - master_repl_offset: Arc::new(Mutex::new(0)), + }); + + let state = Arc::new(Mutex::new(SlaveState { + master_replid: "8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb".to_string(), + master_repl_offset: 0, master_host, master_port, - connection, - cache: Arc::new(Mutex::new(HashMap::new())), + connection: None, + })); + + let cache = Arc::new(Mutex::new(HashMap::new())); + + Self { + config, + state, + cache, } } + pub fn increment_repl_offset(&mut self, amount: usize) { + self.state.lock().unwrap().master_repl_offset += amount; + } + fn connect(&self) -> Result<TcpStream, std::io::Error> { - let master_address = format!("{}:{}", self.master_host, self.master_port); - return TcpStream::connect(master_address); + let state = self.state.lock().unwrap(); + let master_address = format!("{}:{}", state.master_host, state.master_port); + TcpStream::connect(master_address) } fn handshake(&mut self) -> Result<(), String> { @@ -98,128 +281,254 @@ impl SlaveServer { Ok(mut stream) => { let mut buffer = [0; 1024]; - let mut send_command = |command: &[u8]| -> Result<(), String> { + let mut send_command = |command: &[u8], read: bool| -> Result<(), String> { stream .write_all(command) .map_err(|e| format!("Failed to send: {}", e))?; - match stream.read(&mut buffer) { - Ok(0) | Err(_) => return Ok(()), // connection closed or error - Ok(_) => { - println!("Recieved some bytes here!"); - Ok(()) + if read { + match stream.read(&mut buffer) { + Ok(0) | Err(_) => return Ok(()), // connection closed or error + Ok(_) => { + println!("Recieved some bytes here!"); + Ok(()) + } } + } else { + Ok(()) } }; - // PING - send_command(&resp_bytes!(array => [resp!(bulk "PING")]))?; - - // REPLCONF listening-port <PORT> - send_command(&resp_bytes!(array => [ - resp!(bulk "REPLCONF"), - resp!(bulk "listening-port"), - resp!(bulk self.port.clone()) - ]))?; + // Step1: PING + send_command(&resp_bytes!(array => [resp!(bulk "PING")]), true)?; + + let port = self.config.port.clone(); + // Step2: REPLCONF listening-port <PORT> + send_command( + &resp_bytes!(array => [ + resp!(bulk "REPLCONF"), + resp!(bulk "listening-port"), + resp!(bulk port) + ]), + true, + )?; + + // Step3: REPLCONF capa psync2 + send_command( + &resp_bytes!(array => [ + resp!(bulk "REPLCONF"), + resp!(bulk "capa"), + resp!(bulk "psync2") + ]), + true, + )?; + + // Step 4: PSYNC <REPL_ID> <REPL_OFFSSET> + send_command( + &resp_bytes!(array => [ + resp!(bulk "PSYNC"), + resp!(bulk "?"), + resp!(bulk "-1") + ]), + false, + )?; + + // Step 5: Read FULLRESYNC response + let bytes_read = stream + .read(&mut buffer) + .map_err(|e| format!("Failed to read FULLRESYNC: {}", e))?; + let (parsed, mut rest) = parse(&buffer[..bytes_read]) + .map_err(|e| format!("Failed to parse FULLRESYNC: {:?}", e))?; + match parsed { + RespType::SimpleString(s) if s.starts_with("FULLRESYNC") => { + // Expected response + } + _ => return Err("Invalid FULLRESYNC response".to_string()), + } - // REPLCONF capa psync2 - send_command(&resp_bytes!(array => [ - resp!(bulk "REPLCONF"), - resp!(bulk "capa"), - resp!(bulk "psync2") - ]))?; + println!("rest: {:?}", rest); - // PSYNC <REPL_ID> <REPL_OFFSSET> - send_command(&resp_bytes!(array => [ - resp!(bulk "PSYNC"), - resp!(bulk "?"), - resp!(bulk "-1") - ]))?; + println!("FULLRESYNC response bytes read: {}", bytes_read); - thread::sleep(Duration::new(1, 0)); + // So there is an interesting behaviour where the FULLRESYNC + RDB and if you are + // really lucky the REPLCONF would all get sent in one TCP segment so I should + // assume I would get nice segments refelecting each command + if !rest.is_empty() { + // TODO: Sync the rdb_file with the slave's cache + // TODO: Find a way to propagate the error up the stack by using anyhow or something + let (rdb_file, bytes_consumed) = RDBFile::from_bytes(rest).unwrap(); + rest = &rest[bytes_consumed..]; + println!("rdb bytes: {}", bytes_consumed); + println!("remaining btyes after rdb: {}", rest.len()); + } // Store the persistent connection - let shared_stream = Arc::new(Mutex::new(stream)); - self.connection = Some(shared_stream.clone()); - - // Spawn the background listener thread - let cache_clone = self.cache.clone(); - let master_repl_offset = self.master_repl_offset.clone(); - thread::spawn(move || { - loop { - let bytes_read = { - let mut stream = shared_stream.lock().unwrap(); - match stream.read(&mut buffer) { - Ok(0) => { - println!("Master disconnected"); - break; - } - Ok(n) => n, - Err(e) => { - eprintln!("Error reading from master: {}", e); - break; - } - } - }; // stream lock is dropped here - - dbg!(&bytes_read); - - // Parse and execute all commands in the buffer - let mut remaining_bytes = &buffer[..bytes_read]; - - dbg!(&remaining_bytes); - - let mut flag = true; - - while !remaining_bytes.is_empty() || flag { - flag = false; - match parse(remaining_bytes) { - Ok((parsed_command, leftover)) => { - dbg!(&parsed_command, leftover); - - // Create a temporary slave server for command execution - let temp_slave = RedisServer::Slave(SlaveServer::new( - "0".to_string(), // dummy port - "localhost".to_string(), // dummy host - "6379".to_string(), // dummy master port - None, // no connection needed for execution - )); - - // Set the cache to our actual cache - let mut temp_slave = temp_slave; - temp_slave.set_repl_offset(&master_repl_offset); - temp_slave.set_cache(&cache_clone); - let server_arc = Arc::new(Mutex::new(temp_slave)); - - let response = RedisCommands::from(parsed_command.clone()) - .execute(server_arc.clone()); - - let mut shared_stream = shared_stream.lock().unwrap(); - - if let RespType::Array(arr) = &parsed_command { - if arr[0].to_resp_bytes() == b"REPLCONF".to_vec() { - shared_stream.write(&response).unwrap(); - } - } - - // Update remaining bytes for next iteration - remaining_bytes = leftover; - server_arc.lock().unwrap().repl_offset_increment( - parsed_command.to_resp_bytes().len(), - ); - } - Err(_) => { - // If parsing fails, break out of the loop - break; - } + self.state.lock().unwrap().connection = Some(stream); + self.start_replication_listener(&mut rest); + + Ok(()) + } + Err(e) => Err(format!("Master node doesn't exist: {}", e)), + } + } + + // TODO: This should return a Result + fn start_replication_listener<'a>(&'a self, rest: &mut &[u8]) { + let state = self.state.clone(); + let cache = self.cache.clone(); + let config = self.config.clone(); + let server_state = self.get_server_state(); + let broadcaster = None::<Arc<Mutex<&mut dyn CanBroadcast>>>; + + // if it's not empty then there is probably a REPLCONF command sent and I should handle it + // before reading anymore bytes + if !rest.is_empty() { + // TODO: Sync the rdb_file with the slave's cache + // TODO: Find a way to propagate the error up the stack by using anyhow or something + if rest[0] == '$' as u8 { + // this means that the rdb segment got in here some how so I have to deal with it here + let (rdb_file, bytes_consumed) = RDBFile::from_bytes(rest).unwrap(); + *rest = &rest[bytes_consumed..]; + + println!("rdb bytes: {}", bytes_consumed); + println!("remaining btyes after rdb: {}", rest.len()); + if rest.len() > 0 { + match parse(rest) { + Ok((resp, leftover)) => { + dbg!(&resp, leftover); + + // Update replication offset + let command_size = resp.to_resp_bytes().len(); + let mut state_guard = state.lock().unwrap(); + + let command = RedisCommand::from(resp); + + let response = command.execute( + cache.clone(), + config.clone(), + server_state.clone(), + broadcaster.clone(), + ); + + if let Some(ref mut stream) = state_guard.connection { + let _ = stream.write_all(&response); + let _ = stream.flush(); } + + state_guard.master_repl_offset += command_size; } + Err(_) => {} } - }); - Ok(()) + } + } else { + match parse(rest) { + Ok((resp, leftover)) => { + dbg!(&resp, leftover); + + // Update replication offset + let command_size = resp.to_resp_bytes().len(); + let mut state_guard = state.lock().unwrap(); + + let command = RedisCommand::from(resp); + + let response = command.execute( + cache.clone(), + config.clone(), + server_state.clone(), + broadcaster.clone(), + ); + + if let Some(ref mut stream) = state_guard.connection { + let _ = stream.write_all(&response); + let _ = stream.flush(); + } + + state_guard.master_repl_offset += command_size; + } + Err(_) => {} + } } - Err(e) => Err(format!("Master node doesn't exist: {}", e)), } + + // Spawn the background listener thread + thread::spawn(move || { + let mut buffer = [0u8; 1024]; + loop { + let bytes_read = { + let mut state_guard = state.lock().unwrap(); + if let Some(ref mut stream) = state_guard.connection { + match stream.read(&mut buffer) { + Ok(0) => { + println!("Master disconnected"); + break; + } + Ok(n) => n, + Err(e) => { + eprintln!("Error reading from master: {}", e); + break; + } + } + } else { + break; + } + }; + + println!("After handshake: {}", bytes_read); + + // Parse and execute all commands in the buffer + let mut remaining_bytes = &buffer[..bytes_read]; + + println!("remaining_bytes: {:?}", &remaining_bytes); + + // TODO: Sync the rdb_file with the slave's cache + // TODO: Find a way to propagate the error up the stack by using anyhow or something + if remaining_bytes.len() > 0 && remaining_bytes[0] == '$' as u8 { + // this means that the rdb segment got in here some how so I have to deal with it here + let (rdb_file, bytes_consumed) = RDBFile::from_bytes(remaining_bytes).unwrap(); + println!("rdb bytes: {}", bytes_consumed); + remaining_bytes = &remaining_bytes[bytes_consumed..]; + println!( + "remaining btyes length after rdb: {}", + remaining_bytes.len() + ); + println!("remaining btyes after rdb: {:?}", remaining_bytes); + } + + while !remaining_bytes.is_empty() { + match parse(remaining_bytes) { + Ok((resp, leftover)) => { + dbg!(&resp, leftover); + + // Update replication offset + let command_size = resp.to_resp_bytes().len(); + let mut state_guard = state.lock().unwrap(); + + let command = RedisCommand::from(resp); + + let response = command.execute( + cache.clone(), + config.clone(), + server_state.clone(), + broadcaster.clone(), + ); + + if let Some(ref mut stream) = state_guard.connection { + let _ = stream.write_all(&response); + let _ = stream.flush(); + } + + state_guard.master_repl_offset += command_size; + + remaining_bytes = leftover + } + Err(_) => { + // If parsing fails, break out of the loop + break; + } + } + } + } + }); } } @@ -235,53 +544,103 @@ impl RedisServer { } pub fn slave(port: String, master_host: String, master_port: String) -> Self { - RedisServer::Slave(SlaveServer::new(port, master_host, master_port, None)) + RedisServer::Slave(SlaveServer::new(port, master_host, master_port)) } // Helper methods to access common fields regardless of variant pub fn port(&self) -> &str { match self { - RedisServer::Master(master) => &master.port, - RedisServer::Slave(slave) => &slave.port, + RedisServer::Master(master) => &master.config.port, + RedisServer::Slave(slave) => &slave.config.port, + } + } + + pub fn config(&self) -> Arc<ServerConfig> { + match self { + RedisServer::Master(master) => master.config.clone(), + RedisServer::Slave(slave) => slave.config.clone(), } } pub fn set_port(&mut self, port: String) { match self { - RedisServer::Master(master) => master.port = port, - RedisServer::Slave(slave) => slave.port = port, + RedisServer::Master(master) => { + // Create new config with updated port + let new_config = Arc::new(ServerConfig { + port, + dir: master.config.dir.clone(), + dbfilename: master.config.dbfilename.clone(), + }); + master.config = new_config; + } + RedisServer::Slave(slave) => { + let new_config = Arc::new(ServerConfig { + port, + dir: slave.config.dir.clone(), + dbfilename: slave.config.dbfilename.clone(), + }); + slave.config = new_config; + } } } pub fn dir(&self) -> &Option<String> { match self { - RedisServer::Master(master) => &master.dir, - RedisServer::Slave(slave) => &slave.dir, + RedisServer::Master(master) => &master.config.dir, + RedisServer::Slave(slave) => &slave.config.dir, } } pub fn set_dir(&mut self, dir: Option<String>) { match self { - RedisServer::Master(master) => master.dir = dir, - RedisServer::Slave(slave) => slave.dir = dir, + RedisServer::Master(master) => { + let new_config = Arc::new(ServerConfig { + dir, + port: master.config.port.clone(), + dbfilename: master.config.dbfilename.clone(), + }); + master.config = new_config; + } + RedisServer::Slave(slave) => { + let new_config = Arc::new(ServerConfig { + dir, + port: slave.config.port.clone(), + dbfilename: slave.config.dbfilename.clone(), + }); + slave.config = new_config; + } } } pub fn dbfilename(&self) -> &Option<String> { match self { - RedisServer::Master(master) => &master.dbfilename, - RedisServer::Slave(slave) => &slave.dbfilename, + RedisServer::Master(master) => &master.config.dbfilename, + RedisServer::Slave(slave) => &slave.config.dbfilename, } } pub fn set_dbfilename(&mut self, dbfilename: Option<String>) { match self { - RedisServer::Master(master) => master.dbfilename = dbfilename, - RedisServer::Slave(slave) => slave.dbfilename = dbfilename, + RedisServer::Master(master) => { + let new_config = Arc::new(ServerConfig { + dbfilename, + port: master.config.port.clone(), + dir: master.config.dir.clone(), + }); + master.config = new_config; + } + RedisServer::Slave(slave) => { + let new_config = Arc::new(ServerConfig { + dbfilename, + port: slave.config.port.clone(), + dir: slave.config.dir.clone(), + }); + slave.config = new_config; + } } } - pub fn cache(&self) -> &SharedCache { + pub fn cache(&self) -> &SharedMut<Cache> { match self { RedisServer::Master(master) => &master.cache, RedisServer::Slave(slave) => &slave.cache, @@ -290,29 +649,33 @@ impl RedisServer { pub fn repl_offset(&self) -> usize { match self { - RedisServer::Master(master) => *master.current_offset.lock().unwrap(), - RedisServer::Slave(slave) => *slave.master_repl_offset.lock().unwrap(), + RedisServer::Master(master) => master.state.lock().unwrap().current_offset, + RedisServer::Slave(slave) => slave.state.lock().unwrap().master_repl_offset, } } - pub fn set_cache(&mut self, cache: &SharedCache) { + pub fn set_cache(&mut self, cache: &SharedMut<Cache>) { match self { RedisServer::Master(master) => master.cache = cache.clone(), RedisServer::Slave(slave) => slave.cache = cache.clone(), } } - pub fn set_repl_offset(&mut self, repl_offset: &Arc<Mutex<usize>>) { + pub fn set_repl_offset(&mut self, repl_offset: usize) { match self { - RedisServer::Master(master) => master.current_offset = repl_offset.clone(), - RedisServer::Slave(slave) => slave.master_repl_offset = repl_offset.clone(), + RedisServer::Master(master) => { + master.state.lock().unwrap().current_offset = repl_offset; + } + RedisServer::Slave(slave) => { + slave.state.lock().unwrap().master_repl_offset = repl_offset + } } } pub fn repl_offset_increment(&mut self, amount: usize) { match self { - RedisServer::Master(master) => *master.current_offset.lock().unwrap() += amount, - RedisServer::Slave(slave) => *slave.master_repl_offset.lock().unwrap() += amount, + RedisServer::Master(master) => master.state.lock().unwrap().current_offset += amount, + RedisServer::Slave(slave) => slave.state.lock().unwrap().master_repl_offset += amount, } } @@ -326,12 +689,9 @@ impl RedisServer { pub fn add_replica(&mut self, replica_adr: SocketAddr, connection: Arc<Mutex<TcpStream>>) { match self { // TODO: Should probably add host to MasterServer and SlaveServer as member field - RedisServer::Master(master) => master.replicas.push(SlaveServer::new( - replica_adr.port().to_string(), - "localhost".to_owned(), - master.port().to_owned(), - Some(connection), - )), + RedisServer::Master(master) => { + master.add_replica(replica_adr, connection); + } RedisServer::Slave(_) => { unreachable!("Slaves don't have replicas") } |
