diff options
Diffstat (limited to 'src/resp_commands.rs')
| -rw-r--r-- | src/resp_commands.rs | 122 |
1 files changed, 60 insertions, 62 deletions
diff --git a/src/resp_commands.rs b/src/resp_commands.rs index 453024c..981f7aa 100644 --- a/src/resp_commands.rs +++ b/src/resp_commands.rs @@ -1,7 +1,6 @@ use crate::server::*; use crate::{resp_parser::*, shared_cache::*}; use regex::Regex; -use std::sync::{Arc, Mutex}; use std::time::{SystemTime, UNIX_EPOCH}; #[derive(Debug, Clone)] @@ -110,7 +109,7 @@ fn extract_string(resp: &RespType) -> Option<String> { } } -pub enum RedisCommands { +pub enum RedisCommand { Ping, Echo(String), Get(String), @@ -123,15 +122,20 @@ pub enum RedisCommands { Invalid, } -impl RedisCommands { - pub fn execute(self, server: Arc<Mutex<RedisServer>>) -> Vec<u8> { - use RedisCommands as RC; +impl RedisCommand { + pub fn execute<'a>( + self, + cache: SharedMut<Cache>, + config: Shared<ServerConfig>, + server_state: ServerState, + broadcaster: Option<SharedMut<&mut dyn CanBroadcast>>, + ) -> Vec<u8> { + use RedisCommand as RC; match self { RC::Ping => resp_bytes!("PONG"), RC::Echo(echo_string) => resp_bytes!(echo_string), RC::Get(key) => { - let server = server.lock().unwrap(); - let mut cache = server.cache().lock().unwrap(); + let mut cache = cache.lock().unwrap(); match cache.get(&key).cloned() { Some(entry) => { @@ -146,8 +150,7 @@ impl RedisCommands { } } RC::Set(command) => { - let mut server = server.lock().unwrap(); - let mut cache = server.cache().lock().unwrap(); + let mut cache = cache.lock().unwrap(); // Check conditions (NX/XX) let key_exists = cache.contains_key(&command.key); @@ -190,17 +193,24 @@ impl RedisCommands { }, ); - // Broadcast SET to replicas after mutating local state - let broadcast_cmd = resp_bytes!(array => [ - resp!(bulk "SET"), - resp!(bulk command.key), - resp!(bulk command.value) - ]); - - // Unlock the mutex so that I can access to broadcast the messaage - drop(cache); + println!( + "My role is {:?} and I just inserted {} to {}", + server_state.role, &command.key, command.value + ); - server.broadcast_command(&broadcast_cmd); + // Broadcast to replicas if this is a master + if let Some(broadcaster) = broadcaster { + // Broadcast SET to replicas after mutating local state + let broadcast_cmd = resp_bytes!(array => [ + resp!(bulk "SET"), + resp!(bulk command.key), + resp!(bulk command.value) + ]); + broadcaster + .lock() + .unwrap() + .broadcast_command_to_replicas(&broadcast_cmd); + } if !command.get_old_value { return resp_bytes!("OK"); @@ -213,8 +223,9 @@ impl RedisCommands { } RC::ConfigGet(s) => { use RespType as RT; - let server = server.lock().unwrap(); - if let (Some(dir), Some(dbfilename)) = (server.dir(), server.dbfilename()) { + if let (Some(dir), Some(dbfilename)) = + (config.dir.clone(), config.dbfilename.clone()) + { match s.as_str() { "dir" => RT::Array(vec![ RT::BulkString(s.as_bytes().to_vec()), @@ -236,9 +247,7 @@ impl RedisCommands { use RespType as RT; let query = query.replace('*', ".*"); - - let server = server.lock().unwrap(); - let cache = server.cache().lock().unwrap(); + let cache = cache.lock().unwrap(); let regex = Regex::new(&query).unwrap(); let matching_keys: Vec<RT> = cache .keys() @@ -252,53 +261,42 @@ impl RedisCommands { } RC::Info(_sub_command) => { use RespType as RT; - let server = server.lock().unwrap(); - match &*server { - RedisServer::Master(master) => { - let response = format!( - "# Replication\r\nrole:{}master_replid:{}master_repl_offset:{}", - server.role(), - master.replid.clone().unwrap_or("".to_string()), - master.current_offset.lock().unwrap(), - ) - .as_bytes() - .to_vec(); - RT::BulkString(response).to_resp_bytes() - } - RedisServer::Slave(slave) => { - let response = format!( - "# Replication\r\nrole:{}master_replid:{}master_repl_offset:{}", - server.role(), - slave.master_replid.clone().unwrap_or("".to_string()), - slave.master_repl_offset.lock().unwrap() - ) - .as_bytes() - .to_vec(); - RT::BulkString(response).to_resp_bytes() - } - } + let role_str = match server_state.role { + ServerRole::Master => "master", + ServerRole::Slave => "slave", + }; + + let response = format!( + "# Replication\r\nrole:{}\r\nmaster_replid:{}\r\nmaster_repl_offset:{}", + role_str, server_state.repl_id, server_state.repl_offset, + ) + .into_bytes(); + + RT::BulkString(response).to_resp_bytes() } RC::ReplConf((op1, op2)) => match (op1.to_uppercase().as_str(), op2.as_str()) { ("GETACK", "*") => { println!("Did i even get here?"); - resp_bytes!(array => [resp!(bulk "REPLCONF"), resp!(bulk "ACK"), resp!(bulk server.lock().unwrap().repl_offset().to_string())]) + resp_bytes!(array => [resp!(bulk "REPLCONF"), resp!(bulk "ACK"), resp!(bulk server_state.repl_offset.to_string())]) } _ => resp_bytes!("OK"), }, RC::Psync((_, _)) => { - let server = server.lock().unwrap(); - if let RedisServer::Master(master) = &*server { - let response = format!( - "FULLRESYNC {} 0", - master.replid.clone().unwrap_or("".to_string()), - ); - resp_bytes!(response) - } else { - // TODO: Find a way to report this error back up the program trace - unreachable!("I should never come here") + // This should only be called on masters + match server_state.role { + ServerRole::Master => { + let response = format!("FULLRESYNC {} 0", server_state.repl_id); + resp_bytes!(response) + } + // This shouldn't happen, but handle gracefully + ServerRole::Slave => { + // TODO: I actually forgot that you could send error messages with redis do + // more of this across the whole codebase when it makes sense + resp_bytes!(error "ERR PSYNC not supported on slave") + } } } - RC::Invalid => todo!(), + RC::Invalid => resp_bytes!(error "ERR Invalid Command"), } } } @@ -403,7 +401,7 @@ impl SetOptionParser { } } -impl From<RespType> for RedisCommands { +impl From<RespType> for RedisCommand { fn from(value: RespType) -> Self { // Alternative approach using a more functional style with iterators let RespType::Array(command) = value else { |
