aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authoromagdy <omar.professional8777@gmail.com>2025-07-17 01:56:57 +0300
committeromagdy <omar.professional8777@gmail.com>2025-07-17 01:56:57 +0300
commitf13620eb1aa03ed31cb79c999f40d1af048b2656 (patch)
tree13b4db7054f9df401290d2b769fa42a11f7a1d18 /src
parente814746602adf42369abf91882d03b3e16c7c7f0 (diff)
downloadredis-rust-f13620eb1aa03ed31cb79c999f40d1af048b2656.tar.xz
redis-rust-f13620eb1aa03ed31cb79c999f40d1af048b2656.zip
feat: Added expiry options to SET command
Diffstat (limited to 'src')
-rw-r--r--src/main.rs32
-rw-r--r--src/resp_commands.rs190
-rw-r--r--src/resp_parser.rs3
3 files changed, 187 insertions, 38 deletions
diff --git a/src/main.rs b/src/main.rs
index a382f15..1dddb46 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -5,6 +5,7 @@ use std::{
net::{TcpListener, TcpStream},
sync::{Arc, Mutex},
thread,
+ time::{SystemTime, UNIX_EPOCH},
};
mod resp_commands;
@@ -13,12 +14,32 @@ mod resp_parser;
use resp_commands::RedisCommands;
use resp_parser::{parse, RespType};
-pub type SharedCache = Arc<Mutex<HashMap<String, String>>>;
+#[derive(Debug, Clone)]
+pub struct CacheEntry {
+ pub value: String,
+ pub expires_at: Option<u64>, // Unix timestamp in milliseconds
+}
+
+impl CacheEntry {
+ pub fn is_expired(&self) -> bool {
+ if let Some(expiry) = self.expires_at {
+ let now = SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .unwrap()
+ .as_millis() as u64;
+ now > expiry
+ } else {
+ false
+ }
+ }
+}
+
+pub type SharedCache = Arc<Mutex<HashMap<String, CacheEntry>>>;
fn handle_client(mut stream: TcpStream, cache: SharedCache) {
let mut buffer = [0; 512];
loop {
- let bytes_read = match stream.read(&mut buffer) {
+ let _ = match stream.read(&mut buffer) {
Ok(0) => return, // connection closed
Ok(n) => n,
Err(_) => return, // error occurred
@@ -27,13 +48,8 @@ fn handle_client(mut stream: TcpStream, cache: SharedCache) {
let parsed_resp = parse(&buffer).unwrap();
let response = RedisCommands::from(parsed_resp.0).execute(cache.clone());
- // Hardcode PONG response for now
+ // write respose back to the client
stream.write(&response).unwrap();
-
- // Echo the message back
- // if let Err(_) = stream.write_all(&buffer[..bytes_read]) {
- // return; // writing failed
- // }
}
}
diff --git a/src/resp_commands.rs b/src/resp_commands.rs
index e4861a8..d7fe780 100644
--- a/src/resp_commands.rs
+++ b/src/resp_commands.rs
@@ -1,3 +1,4 @@
+use crate::CacheEntry;
use crate::{resp_parser::*, SharedCache};
use std::collections::{HashMap, HashSet};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
@@ -137,18 +138,6 @@ pub struct SetCommand {
get_old_value: bool,
}
-#[derive(Debug, Clone)]
-pub enum SetResult {
- /// Key was set successfully
- Ok,
- /// Key was set and old value returned (when GET option used)
- OkWithOldValue(String),
- /// Operation aborted due to condition (NX/XX conflict)
- Aborted,
- /// GET option used but key didn't exist
- AbortedNoOldValue,
-}
-
impl SetCommand {
pub fn new(key: String, value: String) -> Self {
Self {
@@ -160,18 +149,18 @@ impl SetCommand {
}
}
- pub fn with_condition(mut self, condition: SetCondition) -> Self {
- self.condition = Some(condition);
+ pub fn with_condition(mut self, condition: Option<SetCondition>) -> Self {
+ self.condition = condition;
self
}
- pub fn with_expiry(mut self, expiry: ExpiryOption) -> Self {
- self.expiry = Some(expiry);
+ pub fn with_expiry(mut self, expiry: Option<ExpiryOption>) -> Self {
+ self.expiry = expiry;
self
}
- pub fn with_get(mut self) -> Self {
- self.get_old_value = true;
+ pub fn with_get(mut self, value: bool) -> Self {
+ self.get_old_value = value;
self
}
@@ -214,16 +203,63 @@ impl RedisCommands {
RedisCommands::PING => resp!("PONG"),
RedisCommands::ECHO(echo_string) => resp!(echo_string),
RedisCommands::GET(key) => {
- let cache = cache.lock().unwrap();
+ let mut cache = cache.lock().unwrap();
match cache.get(&key).cloned() {
- Some(val) => resp!(val),
+ Some(entry) => {
+ if entry.is_expired() {
+ cache.remove(&key); // Clean up expired key
+ resp!(null)
+ } else {
+ resp!(entry.value)
+ }
+ }
None => resp!(null),
}
}
RedisCommands::SET(command) => {
let mut cache = cache.lock().unwrap();
- cache.insert(command.key.clone(), command.value.clone());
- resp!("OK")
+
+ // Check conditions (NX/XX)
+ let key_exists = cache.contains_key(&command.key);
+
+ match command.condition {
+ Some(SetCondition::NotExists) if key_exists => {
+ return resp!(null); // Key exists, NX failed
+ }
+ Some(SetCondition::Exists) if !key_exists => {
+ return resp!(null); // Key doesn't exist, XX failed
+ }
+ _ => {} // No condition or condition met
+ }
+
+ // Handle GET option
+ let old_value = if command.get_old_value {
+ cache.get(&command.key).map(|e| e.value.clone())
+ } else {
+ None
+ };
+
+ // Calculate expiry
+ let expires_at = if let Some(ExpiryOption::KeepTtl) = command.expiry {
+ // Keep existing TTL
+ cache.get(&command.key).and_then(|e| e.expires_at)
+ } else {
+ command.calculate_expiry_time()
+ };
+
+ // Set the value
+ cache.insert(
+ command.key.clone(),
+ CacheEntry {
+ value: command.value.clone(),
+ expires_at,
+ },
+ );
+
+ match old_value {
+ Some(val) => resp!(val),
+ None => resp!("OK"),
+ }
}
RedisCommands::Invalid => todo!(),
}
@@ -239,7 +275,7 @@ impl From<RespType> for RedisCommands {
// Probably PING
1 => {
if let RespType::BulkString(command_name) = command[0].clone() {
- if command_name == b"PING" {
+ if command_name.to_ascii_uppercase() == b"PING" {
return Self::PING;
} else {
// TODO: Handle the case where it's another command with
@@ -254,9 +290,9 @@ impl From<RespType> for RedisCommands {
if let (RespType::BulkString(command_name), RespType::BulkString(key)) =
(command[0].clone(), command[1].clone())
{
- if command_name == b"GET" {
+ if command_name.to_ascii_uppercase() == b"GET" {
return Self::GET(str::from_utf8(&key).unwrap().to_owned());
- } else if command_name == b"ECHO" {
+ } else if command_name.to_ascii_uppercase() == b"ECHO" {
return Self::ECHO(str::from_utf8(&key).unwrap().to_owned());
} else {
// TODO: Handle the case where it's another command with
@@ -274,7 +310,7 @@ impl From<RespType> for RedisCommands {
RespType::BulkString(value),
) = (command[0].clone(), command[1].clone(), command[2].clone())
{
- if command_name == b"SET" {
+ if command_name.to_ascii_uppercase() == b"SET" {
let set_command = SetCommand::new(
str::from_utf8(&key).unwrap().to_owned(),
str::from_utf8(&value).unwrap().to_owned(),
@@ -288,13 +324,109 @@ impl From<RespType> for RedisCommands {
}
return Self::Invalid;
}
- // Probably SET wit key and value and [NX | XX]
+ // Probably SET wit key and value and [NX | XX] [GET] [EX seconds | PX milliseconds]
4 => {
- todo!()
+ if let (
+ RespType::BulkString(command_name),
+ RespType::BulkString(key),
+ RespType::BulkString(value),
+ RespType::BulkString(option_1),
+ ) = (
+ command[0].clone(),
+ command[1].clone(),
+ command[2].clone(),
+ command[3].clone(),
+ ) {
+ if command_name.to_ascii_uppercase() == b"SET" {
+ let mut get_old_value = false;
+ let mut set_condition: Option<SetCondition> = None;
+ let mut expiry_option: Option<ExpiryOption> = None;
+ match option_1.to_ascii_uppercase().as_slice() {
+ b"GET" => get_old_value = true,
+ b"NX" => set_condition = Some(SetCondition::NotExists),
+ b"XX" => set_condition = Some(SetCondition::Exists),
+ b"KEEPTTL" => expiry_option = Some(ExpiryOption::KeepTtl),
+ _ => unreachable!("If I am here the user provided a non existing command and I should probably make this into an error but I am lazy")
+ }
+ let set_command = SetCommand::new(
+ str::from_utf8(&key).unwrap().to_owned(),
+ str::from_utf8(&value).unwrap().to_owned(),
+ )
+ .with_get(get_old_value)
+ .with_condition(set_condition)
+ .with_expiry(expiry_option);
+ return Self::SET(set_command);
+ } else {
+ // TODO: Handle the case where it's another command with
+ // insufficient arugments
+ return Self::Invalid;
+ }
+ }
+ return Self::Invalid;
}
// Probably SET wit key and value and [NX | XX] and possibly [GET]
5 => {
- todo!()
+ if let (
+ RespType::BulkString(command_name),
+ RespType::BulkString(key),
+ RespType::BulkString(value),
+ RespType::BulkString(option_1),
+ RespType::BulkString(option_2),
+ ) = (
+ command[0].clone(),
+ command[1].clone(),
+ command[2].clone(),
+ command[3].clone(),
+ command[4].clone(),
+ ) {
+ if command_name == b"SET" {
+ let mut get_old_value = false;
+ let mut set_condition: Option<SetCondition> = None;
+ let mut expiry_option: Option<ExpiryOption> = None;
+ let option_2_clone = option_2.clone();
+ match option_1.to_ascii_uppercase().as_slice() {
+ b"NX" => set_condition = Some(SetCondition::NotExists),
+ b"XX" => set_condition = Some(SetCondition::Exists),
+ b"GET" => get_old_value = true,
+ b"EX" => expiry_option = Some(ExpiryOption::Seconds(str::from_utf8(&option_2_clone).unwrap().parse::<u64>().unwrap())),
+ b"PX" => expiry_option = Some(ExpiryOption::Milliseconds(str::from_utf8(&option_2_clone).unwrap().parse::<u64>().unwrap())),
+ b"EXAT" => expiry_option = Some(ExpiryOption::ExpiresAtSeconds(str::from_utf8(&option_2_clone).unwrap().parse::<u64>().unwrap())),
+ b"PXAT" => expiry_option = Some(ExpiryOption::ExpiresAtSeconds(str::from_utf8(&option_2_clone).unwrap().parse::<u64>().unwrap())),
+ b"KEEPTTL" => expiry_option = Some(ExpiryOption::KeepTtl),
+ _ => unreachable!("If I am here the user provided a non existing command and I should probably make this into an error but I am lazy") // TODO: Implement that
+ }
+
+ if set_condition.is_some() {
+ match option_2.to_ascii_uppercase().as_slice() {
+ b"GET" => get_old_value = true,
+ b"KEEPTTL" => expiry_option = Some(ExpiryOption::KeepTtl),
+ _ => unreachable!("If I am here the user provided a non existing command and I should probably make this into an error but I am lazy")
+ }
+ }
+ if get_old_value == true {
+ match option_2.to_ascii_uppercase().as_slice() {
+ b"NX" => set_condition = Some(SetCondition::NotExists),
+ b"XX" => set_condition = Some(SetCondition::Exists),
+ b"KEEPTTL" => expiry_option = Some(ExpiryOption::KeepTtl),
+ _ => unreachable!("If I am here the user provided a non existing command and I should probably make this into an error but I am lazy")
+ }
+ }
+
+ let set_command = SetCommand::new(
+ str::from_utf8(&key).unwrap().to_owned(),
+ str::from_utf8(&value).unwrap().to_owned(),
+ )
+ .with_get(get_old_value)
+ .with_condition(set_condition)
+ .with_expiry(expiry_option);
+ return Self::SET(set_command);
+ } else {
+ // TODO: Handle the case where it's another command with
+ // insufficient arugments
+ return Self::Invalid;
+ }
+ }
+ return Self::Invalid;
}
// Probably SET wit key and value and [NX | XX] and possibly [GET] and that
// other plethora of expiry options
diff --git a/src/resp_parser.rs b/src/resp_parser.rs
index bbea0ea..0313679 100644
--- a/src/resp_parser.rs
+++ b/src/resp_parser.rs
@@ -535,7 +535,8 @@ impl RespType {
// work
format!("*{:?}\r\n{:?}", len, elements).into_bytes()
}
- RespType::Null() => b"_\r\n".into(),
+ // this is just a hack because the platform uses RESP2 in RESP3 it should be "_\r\n"
+ RespType::Null() => b"$-1\r\n".into(),
RespType::Boolean(b) => format!("#{}\r\n", if *b { "t" } else { "f" }).into_bytes(),
RespType::Doubles(d) => format!(",{}\r\n", d).into_bytes(),
RespType::BigNumbers(n) => format!("({}\r\n", n).into_bytes(),