diff options
| author | omagdy <omar.professional8777@gmail.com> | 2025-07-16 22:08:45 +0300 |
|---|---|---|
| committer | omagdy <omar.professional8777@gmail.com> | 2025-07-16 22:08:45 +0300 |
| commit | e814746602adf42369abf91882d03b3e16c7c7f0 (patch) | |
| tree | d9e1c35e80dcbb41d98e3c7e21ec88f9805a5a6f | |
| parent | 18b7911c656b531fc5d7fe15245e765951f3e65e (diff) | |
| download | redis-rust-e814746602adf42369abf91882d03b3e16c7c7f0.tar.xz redis-rust-e814746602adf42369abf91882d03b3e16c7c7f0.zip | |
feat: Added support for simple commands like PING, ECHO, GET and SET
- Implemented a simple parsing logic for parsing commmands now
- Created a HashMap that will act as our storage and shared it across
threads using `Arc<Mutex<()>>` magic
- Wrote some macros to make instantiating RESP types eaiser for myself
| -rw-r--r-- | src/main.rs | 16 | ||||
| -rw-r--r-- | src/resp_commands.rs | 328 | ||||
| -rw-r--r-- | src/resp_parser.rs | 49 |
3 files changed, 351 insertions, 42 deletions
diff --git a/src/main.rs b/src/main.rs index 7ddd478..a382f15 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,17 +1,21 @@ #![allow(unused_imports)] use std::{ + collections::HashMap, io::{Read, Write}, net::{TcpListener, TcpStream}, + sync::{Arc, Mutex}, thread, }; mod resp_commands; mod resp_parser; -use resp_commands::RespCommands; -use resp_parser::parse; +use resp_commands::RedisCommands; +use resp_parser::{parse, RespType}; -fn handle_client(mut stream: TcpStream) { +pub type SharedCache = Arc<Mutex<HashMap<String, String>>>; + +fn handle_client(mut stream: TcpStream, cache: SharedCache) { let mut buffer = [0; 512]; loop { let bytes_read = match stream.read(&mut buffer) { @@ -21,7 +25,7 @@ fn handle_client(mut stream: TcpStream) { }; let parsed_resp = parse(&buffer).unwrap(); - let response = RespCommands::from(parsed_resp.0).execute(); + let response = RedisCommands::from(parsed_resp.0).execute(cache.clone()); // Hardcode PONG response for now stream.write(&response).unwrap(); @@ -35,12 +39,14 @@ fn handle_client(mut stream: TcpStream) { fn main() -> std::io::Result<()> { let listener = TcpListener::bind("127.0.0.1:6379").unwrap(); + let cache: SharedCache = Arc::new(Mutex::new(HashMap::new())); for stream in listener.incoming() { match stream { Ok(stream) => { + let cache_clone = cache.clone(); thread::spawn(|| { - handle_client(stream); + handle_client(stream, cache_clone); }); } Err(e) => { diff --git a/src/resp_commands.rs b/src/resp_commands.rs index 82c0079..e4861a8 100644 --- a/src/resp_commands.rs +++ b/src/resp_commands.rs @@ -1,58 +1,314 @@ -use crate::resp_parser::*; +use crate::{resp_parser::*, SharedCache}; +use std::collections::{HashMap, HashSet}; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; -pub enum RespCommands { +#[macro_export] +macro_rules! resp { + // Null: resp!(null) + (null) => { + $crate::RespType::Null().to_resp_bytes() + }; + + // Simple String: resp!("PONG") or resp!(simple "PONG") + (simple $s:expr) => { + $crate::RespType::SimpleString($s.to_string()).to_resp_bytes() + }; + ($s:expr) => { + $crate::RespType::SimpleString($s.to_string()).to_resp_bytes() + }; + + // Simple Error: resp!(error "ERR message") + (error $s:expr) => { + $crate::RespType::SimpleError($s.to_string()).to_resp_bytes() + }; + + // Integer: resp!(int 123) + (int $i:expr) => { + $crate::RespType::Integer($i).to_resp_bytes() + }; + + // Bulk String: resp!(bulk "hello") or resp!(bulk vec![104, 101, 108, 108, 111]) + (bulk $s:expr) => { + $crate::RespType::BulkString($s.into()).to_resp_bytes() + }; + + // Array: resp!(array [resp!("one"), resp!(int 2)]) + (array [$($elem:expr),*]) => { + $crate::RespType::Array(vec![$($elem),*]).to_resp_bytes() + }; + + // Boolean: resp!(bool true) + (bool $b:expr) => { + $crate::RespType::Boolean($b).to_resp_bytes() + }; + + // Double: resp!(double 3.14) + (double $d:expr) => { + $crate::RespType::Doubles($d).to_resp_bytes() + }; + + // Big Number: resp!(bignumber "123456789") + (bignumber $n:expr) => { + $crate::RespType::BigNumbers($n.to_string()).to_resp_bytes() + }; + + // Bulk Error: resp!(bulkerror [resp!("err1"), resp!("err2")]) + (bulkerror [$($elem:expr),*]) => { + $crate::RespType::BulkErrors(vec![$($elem),*]).to_resp_bytes() + }; + + // Verbatim String: resp!(verbatim [resp!("txt"), resp!("example")]) + (verbatim [$($elem:expr),*]) => { + $crate::RespType::VerbatimStrings(vec![$($elem),*]).to_resp_bytes() + }; + + // Map: resp!(map {resp!("key") => resp!("value")}) + (map {$($key:expr => $value:expr),*}) => { + $crate::RespType::Maps({ + let mut map = HashMap::new(); + $(map.insert($key, $value);)* + map + }).to_resp_bytes() + }; + + // Attributes: resp!(attributes [resp!("key"), resp!("value")]) + (attributes [$($elem:expr),*]) => { + $crate::RespType::Attributes(vec![$($elem),*]).to_resp_bytes() + }; + + // Set: resp!(set [resp!("one"), resp!("two")]) + (set [$($elem:expr),*]) => { + $crate::RespType::Sets({ + let mut set = HashSet::new(); + $(set.insert($elem);)* + set + }).to_resp_bytes() + }; + + // Push: resp!(push [resp!("event"), resp!("data")]) + (push [$($elem:expr),*]) => { + $crate::RespType::Pushes(vec![$($elem),*]).to_resp_bytes() + }; +} + +enum SetCondition { + /// NX - only set if key doesn't exists + NotExists, + /// XX - only set if key already exists + Exists, +} + +#[derive(Debug, Clone)] +pub enum ExpiryOption { + /// EX seconds - expire in N seconds + Seconds(u64), + /// PX milliseconds - expire in N milliseconds + Milliseconds(u64), + /// EXAT timestamp-seconds - expire at Unix timestamp (seconds) + ExpiresAtSeconds(u64), + /// PXAT timestamp-milliseconds - expire at Unix timestamp (milliseconds) + ExpiresAtMilliseconds(u64), + /// KEEPTTL - retain existing TTL + KeepTtl, +} + +/// Link: https://redis.io/docs/latest/commands/set/ +/// Syntax: +/// ------- +/// SET key value [NX | XX] [GET] [EX seconds | PX milliseconds | +/// EXAT unix-time-seconds | PXAT unix-time-milliseconds | KEEPTTL] +/// +/// Options: +/// -------- +/// EX seconds -- Set the specified expire time, in seconds (a positive integer). +/// PX milliseconds -- Set the specified expire time, in milliseconds (a positive integer). +/// EXAT timestamp-seconds -- Set the specified Unix time at which the key will expire, in seconds (a positive integer). +/// PXAT timestamp-milliseconds -- Set the specified Unix time at which the key will expire, in milliseconds (a positive integer). +/// NX -- Only set the key if it does not already exist. +/// XX -- Only set the key if it already exists. +/// KEEPTTL -- Retain the time to live associated with the key. +/// GET -- Return the old string stored at key, or nil if key did not exist. An error is returned and SET aborted if the value stored at key is not a string. + +pub struct SetCommand { + key: String, + value: String, + condition: Option<SetCondition>, + expiry: Option<ExpiryOption>, + get_old_value: bool, +} + +#[derive(Debug, Clone)] +pub enum SetResult { + /// Key was set successfully + Ok, + /// Key was set and old value returned (when GET option used) + OkWithOldValue(String), + /// Operation aborted due to condition (NX/XX conflict) + Aborted, + /// GET option used but key didn't exist + AbortedNoOldValue, +} + +impl SetCommand { + pub fn new(key: String, value: String) -> Self { + Self { + key, + value, + condition: None, + expiry: None, + get_old_value: false, + } + } + + pub fn with_condition(mut self, condition: SetCondition) -> Self { + self.condition = Some(condition); + self + } + + pub fn with_expiry(mut self, expiry: ExpiryOption) -> Self { + self.expiry = Some(expiry); + self + } + + pub fn with_get(mut self) -> Self { + self.get_old_value = true; + self + } + + /// Calculate the absolute expiry time in milliseconds since Unix epoch + pub fn calculate_expiry_time(&self) -> Option<u64> { + match &self.expiry { + Some(ExpiryOption::Seconds(secs)) => { + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() as u64; + Some(now + (secs * 1000)) + } + Some(ExpiryOption::Milliseconds(ms)) => { + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() as u64; + Some(now + ms) + } + Some(ExpiryOption::ExpiresAtSeconds(timestamp)) => Some(timestamp * 1000), + Some(ExpiryOption::ExpiresAtMilliseconds(timestamp)) => Some(*timestamp), + Some(ExpiryOption::KeepTtl) => None, // Handled specially + None => None, + } + } +} + +pub enum RedisCommands { PING, ECHO(String), GET(String), - SET(String), + SET(SetCommand), Invalid, } -impl RespCommands { - pub fn execute(self) -> Vec<u8> { +impl RedisCommands { + pub fn execute(self, cache: SharedCache) -> Vec<u8> { match self { - RespCommands::PING => b"+PONG\r\n".to_vec(), - RespCommands::ECHO(echo_string) => echo_string.into_bytes(), - RespCommands::GET(_) => todo!(), - RespCommands::SET(_) => todo!(), - RespCommands::Invalid => todo!(), + RedisCommands::PING => resp!("PONG"), + RedisCommands::ECHO(echo_string) => resp!(echo_string), + RedisCommands::GET(key) => { + let cache = cache.lock().unwrap(); + match cache.get(&key).cloned() { + Some(val) => resp!(val), + None => resp!(null), + } + } + RedisCommands::SET(command) => { + let mut cache = cache.lock().unwrap(); + cache.insert(command.key.clone(), command.value.clone()); + resp!("OK") + } + RedisCommands::Invalid => todo!(), } } } -impl From<RespType> for RespCommands { +impl From<RespType> for RedisCommands { fn from(value: RespType) -> Self { match value { - RespType::Array(vec) if vec.len() > 1 => match (&vec[0], &vec[1]) { - (RespType::BulkString(command), RespType::BulkString(argument)) => { - if let Ok(command) = str::from_utf8(&command) { - match command { - "PING" => Self::PING, - "ECHO" => Self::ECHO(format!( - "+{}\r\n", - String::from_utf8(argument.clone()).unwrap() - )), - _ => Self::Invalid, + RespType::Array(command) => { + let length = command.len(); + match length { + // Probably PING + 1 => { + if let RespType::BulkString(command_name) = command[0].clone() { + if command_name == b"PING" { + return Self::PING; + } else { + // TODO: Handle the case where it's another command with + // insufficient arugments + return Self::Invalid; + } } - } else { - Self::Invalid + return Self::Invalid; } - } - _ => todo!(), - }, - RespType::Array(vec) => match &vec[0] { - RespType::BulkString(command) => { - if let Ok(command) = str::from_utf8(&command) { - match command { - "PING" => Self::PING, - _ => Self::Invalid, + // Probably GET or ECHO + 2 => { + if let (RespType::BulkString(command_name), RespType::BulkString(key)) = + (command[0].clone(), command[1].clone()) + { + if command_name == b"GET" { + return Self::GET(str::from_utf8(&key).unwrap().to_owned()); + } else if command_name == b"ECHO" { + return Self::ECHO(str::from_utf8(&key).unwrap().to_owned()); + } else { + // TODO: Handle the case where it's another command with + // insufficient arugments + return Self::Invalid; + } + } + return Self::Invalid; + } + // Probably SET wit key and value + 3 => { + if let ( + RespType::BulkString(command_name), + RespType::BulkString(key), + RespType::BulkString(value), + ) = (command[0].clone(), command[1].clone(), command[2].clone()) + { + if command_name == b"SET" { + let set_command = SetCommand::new( + str::from_utf8(&key).unwrap().to_owned(), + str::from_utf8(&value).unwrap().to_owned(), + ); + return Self::SET(set_command); + } else { + // TODO: Handle the case where it's another command with + // insufficient arugments + return Self::Invalid; + } } - } else { - Self::Invalid + return Self::Invalid; + } + // Probably SET wit key and value and [NX | XX] + 4 => { + todo!() + } + // Probably SET wit key and value and [NX | XX] and possibly [GET] + 5 => { + todo!() + } + // Probably SET wit key and value and [NX | XX] and possibly [GET] and that + // other plethora of expiry options + 6 => { + todo!() + } + 7 => { + todo!() + } + _ => { + todo!() } } - _ => Self::Invalid, - }, + } _ => todo!(), } } diff --git a/src/resp_parser.rs b/src/resp_parser.rs index a8400d3..bbea0ea 100644 --- a/src/resp_parser.rs +++ b/src/resp_parser.rs @@ -495,7 +495,7 @@ fn parse_pushes() { todo!() } -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum RespType { SimpleString(String), // + SimpleError(String), // - @@ -514,6 +514,53 @@ pub enum RespType { Pushes(Vec<RespType>), // > } +impl RespType { + pub fn to_resp_bytes(&self) -> Vec<u8> { + match self { + RespType::SimpleString(s) => format!("+{}\r\n", s).into_bytes(), + RespType::SimpleError(s) => format!("-{}\r\n", s).into_bytes(), + RespType::Integer(i) => format!(":{}\r\n", i).into_bytes(), + RespType::BulkString(bytes) => { + let len = bytes.len(); + let s = String::from_utf8_lossy(bytes); + format!("${}\r\n{}\r\n", len, s).into_bytes() + } + RespType::Array(arr) => { + let len = arr.len(); + let elements = arr + .iter() + .map(|e| e.to_resp_bytes()) + .collect::<Vec<Vec<u8>>>(); + // TODO: Implement proper Display for elements because this will definitely not + // work + format!("*{:?}\r\n{:?}", len, elements).into_bytes() + } + RespType::Null() => b"_\r\n".into(), + RespType::Boolean(b) => format!("#{}\r\n", if *b { "t" } else { "f" }).into_bytes(), + RespType::Doubles(d) => format!(",{}\r\n", d).into_bytes(), + RespType::BigNumbers(n) => format!("({}\r\n", n).into_bytes(), + RespType::BulkErrors(errors) => { + todo!() + } + RespType::VerbatimStrings(strings) => { + todo!() + } + RespType::Maps(map) => { + todo!() + } + RespType::Attributes(attrs) => { + todo!() + } + RespType::Sets(set) => { + todo!() + } + RespType::Pushes(pushes) => { + todo!() + } + } + } +} + impl PartialEq for RespType { fn eq(&self, other: &Self) -> bool { match (self, other) { |
