aboutsummaryrefslogtreecommitdiff
path: root/src/resp_commands.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/resp_commands.rs')
-rw-r--r--src/resp_commands.rs122
1 files changed, 60 insertions, 62 deletions
diff --git a/src/resp_commands.rs b/src/resp_commands.rs
index 453024c..981f7aa 100644
--- a/src/resp_commands.rs
+++ b/src/resp_commands.rs
@@ -1,7 +1,6 @@
use crate::server::*;
use crate::{resp_parser::*, shared_cache::*};
use regex::Regex;
-use std::sync::{Arc, Mutex};
use std::time::{SystemTime, UNIX_EPOCH};
#[derive(Debug, Clone)]
@@ -110,7 +109,7 @@ fn extract_string(resp: &RespType) -> Option<String> {
}
}
-pub enum RedisCommands {
+pub enum RedisCommand {
Ping,
Echo(String),
Get(String),
@@ -123,15 +122,20 @@ pub enum RedisCommands {
Invalid,
}
-impl RedisCommands {
- pub fn execute(self, server: Arc<Mutex<RedisServer>>) -> Vec<u8> {
- use RedisCommands as RC;
+impl RedisCommand {
+ pub fn execute<'a>(
+ self,
+ cache: SharedMut<Cache>,
+ config: Shared<ServerConfig>,
+ server_state: ServerState,
+ broadcaster: Option<SharedMut<&mut dyn CanBroadcast>>,
+ ) -> Vec<u8> {
+ use RedisCommand as RC;
match self {
RC::Ping => resp_bytes!("PONG"),
RC::Echo(echo_string) => resp_bytes!(echo_string),
RC::Get(key) => {
- let server = server.lock().unwrap();
- let mut cache = server.cache().lock().unwrap();
+ let mut cache = cache.lock().unwrap();
match cache.get(&key).cloned() {
Some(entry) => {
@@ -146,8 +150,7 @@ impl RedisCommands {
}
}
RC::Set(command) => {
- let mut server = server.lock().unwrap();
- let mut cache = server.cache().lock().unwrap();
+ let mut cache = cache.lock().unwrap();
// Check conditions (NX/XX)
let key_exists = cache.contains_key(&command.key);
@@ -190,17 +193,24 @@ impl RedisCommands {
},
);
- // Broadcast SET to replicas after mutating local state
- let broadcast_cmd = resp_bytes!(array => [
- resp!(bulk "SET"),
- resp!(bulk command.key),
- resp!(bulk command.value)
- ]);
-
- // Unlock the mutex so that I can access to broadcast the messaage
- drop(cache);
+ println!(
+ "My role is {:?} and I just inserted {} to {}",
+ server_state.role, &command.key, command.value
+ );
- server.broadcast_command(&broadcast_cmd);
+ // Broadcast to replicas if this is a master
+ if let Some(broadcaster) = broadcaster {
+ // Broadcast SET to replicas after mutating local state
+ let broadcast_cmd = resp_bytes!(array => [
+ resp!(bulk "SET"),
+ resp!(bulk command.key),
+ resp!(bulk command.value)
+ ]);
+ broadcaster
+ .lock()
+ .unwrap()
+ .broadcast_command_to_replicas(&broadcast_cmd);
+ }
if !command.get_old_value {
return resp_bytes!("OK");
@@ -213,8 +223,9 @@ impl RedisCommands {
}
RC::ConfigGet(s) => {
use RespType as RT;
- let server = server.lock().unwrap();
- if let (Some(dir), Some(dbfilename)) = (server.dir(), server.dbfilename()) {
+ if let (Some(dir), Some(dbfilename)) =
+ (config.dir.clone(), config.dbfilename.clone())
+ {
match s.as_str() {
"dir" => RT::Array(vec![
RT::BulkString(s.as_bytes().to_vec()),
@@ -236,9 +247,7 @@ impl RedisCommands {
use RespType as RT;
let query = query.replace('*', ".*");
-
- let server = server.lock().unwrap();
- let cache = server.cache().lock().unwrap();
+ let cache = cache.lock().unwrap();
let regex = Regex::new(&query).unwrap();
let matching_keys: Vec<RT> = cache
.keys()
@@ -252,53 +261,42 @@ impl RedisCommands {
}
RC::Info(_sub_command) => {
use RespType as RT;
- let server = server.lock().unwrap();
- match &*server {
- RedisServer::Master(master) => {
- let response = format!(
- "# Replication\r\nrole:{}master_replid:{}master_repl_offset:{}",
- server.role(),
- master.replid.clone().unwrap_or("".to_string()),
- master.current_offset.lock().unwrap(),
- )
- .as_bytes()
- .to_vec();
- RT::BulkString(response).to_resp_bytes()
- }
- RedisServer::Slave(slave) => {
- let response = format!(
- "# Replication\r\nrole:{}master_replid:{}master_repl_offset:{}",
- server.role(),
- slave.master_replid.clone().unwrap_or("".to_string()),
- slave.master_repl_offset.lock().unwrap()
- )
- .as_bytes()
- .to_vec();
- RT::BulkString(response).to_resp_bytes()
- }
- }
+ let role_str = match server_state.role {
+ ServerRole::Master => "master",
+ ServerRole::Slave => "slave",
+ };
+
+ let response = format!(
+ "# Replication\r\nrole:{}\r\nmaster_replid:{}\r\nmaster_repl_offset:{}",
+ role_str, server_state.repl_id, server_state.repl_offset,
+ )
+ .into_bytes();
+
+ RT::BulkString(response).to_resp_bytes()
}
RC::ReplConf((op1, op2)) => match (op1.to_uppercase().as_str(), op2.as_str()) {
("GETACK", "*") => {
println!("Did i even get here?");
- resp_bytes!(array => [resp!(bulk "REPLCONF"), resp!(bulk "ACK"), resp!(bulk server.lock().unwrap().repl_offset().to_string())])
+ resp_bytes!(array => [resp!(bulk "REPLCONF"), resp!(bulk "ACK"), resp!(bulk server_state.repl_offset.to_string())])
}
_ => resp_bytes!("OK"),
},
RC::Psync((_, _)) => {
- let server = server.lock().unwrap();
- if let RedisServer::Master(master) = &*server {
- let response = format!(
- "FULLRESYNC {} 0",
- master.replid.clone().unwrap_or("".to_string()),
- );
- resp_bytes!(response)
- } else {
- // TODO: Find a way to report this error back up the program trace
- unreachable!("I should never come here")
+ // This should only be called on masters
+ match server_state.role {
+ ServerRole::Master => {
+ let response = format!("FULLRESYNC {} 0", server_state.repl_id);
+ resp_bytes!(response)
+ }
+ // This shouldn't happen, but handle gracefully
+ ServerRole::Slave => {
+ // TODO: I actually forgot that you could send error messages with redis do
+ // more of this across the whole codebase when it makes sense
+ resp_bytes!(error "ERR PSYNC not supported on slave")
+ }
}
}
- RC::Invalid => todo!(),
+ RC::Invalid => resp_bytes!(error "ERR Invalid Command"),
}
}
}
@@ -403,7 +401,7 @@ impl SetOptionParser {
}
}
-impl From<RespType> for RedisCommands {
+impl From<RespType> for RedisCommand {
fn from(value: RespType) -> Self {
// Alternative approach using a more functional style with iterators
let RespType::Array(command) = value else {