use crate::server::*; use crate::{resp_parser::*, shared_cache::*}; use regex::Regex; use std::time::{SystemTime, UNIX_EPOCH}; #[derive(Debug, Clone)] pub enum SetCondition { /// NX - only set if key doesn't exists NotExists, /// XX - only set if key already exists Exists, } #[derive(Debug, Clone)] pub enum ExpiryOption { /// EX seconds - expire in N seconds Seconds(u64), /// PX milliseconds - expire in N milliseconds Milliseconds(u64), /// EXAT timestamp-seconds - expire at Unix timestamp (seconds) ExpiresAtSeconds(u64), /// PXAT timestamp-milliseconds - expire at Unix timestamp (milliseconds) ExpiresAtMilliseconds(u64), /// KEEPTTL - retain existing TTL KeepTtl, } /// Link: https://redis.io/docs/latest/commands/set/ /// Syntax: /// ------- /// SET key value [NX | XX] [GET] [EX seconds | PX milliseconds | /// EXAT unix-time-seconds | PXAT unix-time-milliseconds | KEEPTTL] /// /// Options: /// -------- /// EX seconds -- Set the specified expire time, in seconds (a positive integer). /// PX milliseconds -- Set the specified expire time, in milliseconds (a positive integer). /// EXAT timestamp-seconds -- Set the specified Unix time at which the key will expire, in seconds (a positive integer). /// PXAT timestamp-milliseconds -- Set the specified Unix time at which the key will expire, in milliseconds (a positive integer). /// NX -- Only set the key if it does not already exist. /// XX -- Only set the key if it already exists. /// KEEPTTL -- Retain the time to live associated with the key. /// GET -- Return the old string stored at key, or nil if key did not exist. An error is returned and SET aborted if the value stored at key is not a string. #[derive(Debug, Clone)] pub struct SetCommand { pub key: String, pub value: String, pub condition: Option, pub expiry: Option, pub get_old_value: bool, } impl SetCommand { pub fn new(key: String, value: String) -> Self { Self { key, value, condition: None, expiry: None, get_old_value: false, } } pub fn with_condition(mut self, condition: Option) -> Self { self.condition = condition; self } pub fn with_expiry(mut self, expiry: Option) -> Self { self.expiry = expiry; self } pub fn with_get(mut self, value: bool) -> Self { self.get_old_value = value; self } /// Calculate the absolute expiry time in milliseconds since Unix epoch pub fn calculate_expiry_time(&self) -> Option { match &self.expiry { Some(ExpiryOption::Seconds(secs)) => { let now = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() .as_millis() as u64; Some(now + (secs * 1000)) } Some(ExpiryOption::Milliseconds(ms)) => { let now = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() .as_millis() as u64; Some(now + ms) } Some(ExpiryOption::ExpiresAtSeconds(timestamp)) => Some(timestamp * 1000), Some(ExpiryOption::ExpiresAtMilliseconds(timestamp)) => Some(*timestamp), Some(ExpiryOption::KeepTtl) => None, // Handled specially None => None, } } } // Helper function to extract string from BulkString fn extract_string(resp: &RespType) -> Option { match resp { RespType::BulkString(bytes) => str::from_utf8(bytes).ok().map(|s| s.to_owned()), _ => None, } } pub enum RedisCommand { Ping, Echo(String), Get(String), Set(SetCommand), ConfigGet(String), Keys(String), Info(String), ReplConf((String, String)), Psync((String, String)), Invalid, } impl RedisCommand { pub fn execute<'a>( self, cache: SharedMut, config: Shared, server_state: ServerState, broadcaster: Option>, ) -> Vec { use RedisCommand as RC; match self { RC::Ping => resp_bytes!("PONG"), RC::Echo(echo_string) => resp_bytes!(echo_string), RC::Get(key) => { let mut cache = cache.lock().unwrap(); match cache.get(&key).cloned() { Some(entry) => { if entry.is_expired() { cache.remove(&key); // Clean up expired key resp_bytes!(null) } else { resp_bytes!(bulk entry.value) } } None => resp_bytes!(null), } } RC::Set(command) => { let mut cache = cache.lock().unwrap(); // Check conditions (NX/XX) let key_exists = cache.contains_key(&command.key); match command.condition { Some(SetCondition::NotExists) if key_exists => { return resp_bytes!(null); // Key exists, NX failed } Some(SetCondition::Exists) if !key_exists => { return resp_bytes!(null); // Key doesn't exist, XX failed } _ => {} // No condition or condition met } let mut get_value: Option = None; // Handle GET option if command.get_old_value { match cache.get(&command.key) { Some(val) => get_value = Some(val.value.clone()), None => {} } } else { } // Calculate expiry let expires_at = if let Some(ExpiryOption::KeepTtl) = command.expiry { // Keep existing TTL cache.get(&command.key).and_then(|e| e.expires_at) } else { command.calculate_expiry_time() }; // Set the value cache.insert( command.key.clone(), CacheEntry { value: command.value.clone(), expires_at, }, ); println!( "My role is {:?} and I just inserted {} to {}", server_state.role, &command.key, command.value ); // Broadcast to replicas if this is a master if let Some(broadcaster) = broadcaster { // Broadcast SET to replicas after mutating local state let broadcast_cmd = resp_bytes!(array => [ resp!(bulk "SET"), resp!(bulk command.key), resp!(bulk command.value) ]); broadcaster .lock() .unwrap() .broadcast_command_to_replicas(&broadcast_cmd); } if !command.get_old_value { return resp_bytes!("OK"); } match get_value { Some(val) => return resp_bytes!(bulk val), None => return resp_bytes!(null), } } RC::ConfigGet(s) => { use RespType as RT; if let (Some(dir), Some(dbfilename)) = (config.dir.clone(), config.dbfilename.clone()) { match s.as_str() { "dir" => RT::Array(vec![ RT::BulkString(s.as_bytes().to_vec()), RT::BulkString(dir.as_bytes().to_vec()), ]) .to_resp_bytes(), "dbfilename" => RT::Array(vec![ RT::BulkString(s.as_bytes().to_vec()), RT::BulkString(dbfilename.as_bytes().to_vec()), ]) .to_resp_bytes(), _ => unreachable!(), } } else { unreachable!() } } RC::Keys(query) => { use RespType as RT; let query = query.replace('*', ".*"); let cache = cache.lock().unwrap(); let regex = Regex::new(&query).unwrap(); let matching_keys: Vec = cache .keys() .filter_map(|key| { regex .is_match(key) .then(|| RT::BulkString(key.as_bytes().to_vec())) }) .collect(); RT::Array(matching_keys).to_resp_bytes() } RC::Info(_sub_command) => { use RespType as RT; let role_str = match server_state.role { ServerRole::Master => "master", ServerRole::Slave => "slave", }; let response = format!( "# Replication\r\nrole:{}\r\nmaster_replid:{}\r\nmaster_repl_offset:{}", role_str, server_state.repl_id, server_state.repl_offset, ) .into_bytes(); RT::BulkString(response).to_resp_bytes() } RC::ReplConf((op1, op2)) => match (op1.to_uppercase().as_str(), op2.as_str()) { ("GETACK", "*") => { println!("Did i even get here?"); resp_bytes!(array => [resp!(bulk "REPLCONF"), resp!(bulk "ACK"), resp!(bulk server_state.repl_offset.to_string())]) } _ => resp_bytes!("OK"), }, RC::Psync((_, _)) => { // This should only be called on masters match server_state.role { ServerRole::Master => { let response = format!("FULLRESYNC {} 0", server_state.repl_id); resp_bytes!(response) } // This shouldn't happen, but handle gracefully ServerRole::Slave => { // TODO: I actually forgot that you could send error messages with redis do // more of this across the whole codebase when it makes sense resp_bytes!(error "ERR PSYNC not supported on slave") } } } RC::Invalid => resp_bytes!(error "ERR Invalid Command"), } } } // Parser for SET command options struct SetOptionParser { command: SetCommand, } impl SetOptionParser { fn new(key: String, value: String) -> Self { Self { command: SetCommand::new(key, value), } } fn parse_option(&mut self, option: &str, next_arg: Option<&str>) -> Result { match option.to_ascii_uppercase().as_str() { "GET" => { self.command = self.command.clone().with_get(true); Ok(false) // doesn't consume next argument } "NX" => { self.command = self .command .clone() .with_condition(Some(SetCondition::NotExists)); Ok(false) } "XX" => { self.command = self .command .clone() .with_condition(Some(SetCondition::Exists)); Ok(false) } "KEEPTTL" => { self.command = self .command .clone() .with_expiry(Some(ExpiryOption::KeepTtl)); Ok(false) } "EX" => { let seconds = next_arg .ok_or("EX requires a value")? .parse::() .map_err(|_| "Invalid EX value")?; self.command = self .command .clone() .with_expiry(Some(ExpiryOption::Seconds(seconds))); Ok(true) // consumes next argument } "PX" => { let ms = next_arg .ok_or("PX requires a value")? .parse::() .map_err(|_| "Invalid PX value")?; self.command = self .command .clone() .with_expiry(Some(ExpiryOption::Milliseconds(ms))); Ok(true) } "EXAT" => { let timestamp = next_arg .ok_or("EXAT requires a value")? .parse::() .map_err(|_| "Invalid EXAT value")?; self.command = self .command .clone() .with_expiry(Some(ExpiryOption::ExpiresAtSeconds(timestamp))); Ok(true) } "PXAT" => { let timestamp = next_arg .ok_or("PXAT requires a value")? .parse::() .map_err(|_| "Invalid PXAT value")?; self.command = self .command .clone() .with_expiry(Some(ExpiryOption::ExpiresAtMilliseconds(timestamp))); Ok(true) } _ => Err("Unknown SET option"), } } fn parse_options(mut self, options: &[String]) -> Result { let mut i = 0; while i < options.len() { let option = &options[i]; let next_arg = options.get(i + 1).map(|s| s.as_str()); let consumes_next = self.parse_option(option, next_arg)?; i += if consumes_next { 2 } else { 1 }; } Ok(self.command) } } impl From for RedisCommand { fn from(value: RespType) -> Self { // Alternative approach using a more functional style with iterators let RespType::Array(command) = value else { return Self::Invalid; }; let mut args = command.iter().filter_map(extract_string); let Some(cmd_name) = args.next() else { return Self::Invalid; }; match cmd_name.to_ascii_uppercase().as_str() { "PING" => { if args.next().is_none() { Self::Ping } else { Self::Invalid } } "ECHO" => match (args.next(), args.next()) { (Some(echo_string), None) => Self::Echo(echo_string), _ => Self::Invalid, }, "GET" => match (args.next(), args.next()) { (Some(key), None) => Self::Get(key), _ => Self::Invalid, }, "SET" => { let Some(key) = args.next() else { return Self::Invalid; }; let Some(value) = args.next() else { return Self::Invalid; }; let options: Vec = args.collect(); if options.is_empty() { Self::Set(SetCommand::new(key, value)) } else { let parser = SetOptionParser::new(key, value); match parser.parse_options(&options) { Ok(set_command) => Self::Set(set_command), Err(_) => Self::Invalid, } } } "KEYS" => { let Some(query) = args.next() else { return Self::Invalid; }; Self::Keys(query) } "CONFIG" => { let Some(sub_command) = args.next() else { return Self::Invalid; }; let Some(key) = args.next() else { return Self::Invalid; }; if &sub_command.to_uppercase() == &"GET" { return Self::ConfigGet(key); } Self::Invalid } "INFO" => { let Some(sub_command) = args.next() else { return Self::Invalid; }; if &sub_command.to_uppercase() == &"REPLICATION" { return Self::Info(sub_command); } Self::Invalid } "REPLCONF" => { let Some(op1) = args.next() else { return Self::Invalid; }; let Some(op2) = args.next() else { return Self::Invalid; }; println!("Hello from here"); Self::ReplConf((op1, op2)) } "PSYNC" => { let Some(repl_id) = args.next() else { return Self::Invalid; }; let Some(repl_offset) = args.next() else { return Self::Invalid; }; Self::Psync((repl_id, repl_offset)) } _ => Self::Invalid, } } }