aboutsummaryrefslogtreecommitdiff
path: root/src/resp_commands.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/resp_commands.rs')
-rw-r--r--src/resp_commands.rs89
1 files changed, 57 insertions, 32 deletions
diff --git a/src/resp_commands.rs b/src/resp_commands.rs
index 3c18b07..03ec9dc 100644
--- a/src/resp_commands.rs
+++ b/src/resp_commands.rs
@@ -1,6 +1,7 @@
+use crate::server::*;
use crate::{resp_parser::*, shared_cache::*};
-use crate::{RedisServer, SharedConfig};
use regex::Regex;
+use std::sync::{Arc, Mutex};
use std::time::{SystemTime, UNIX_EPOCH};
#[derive(Debug, Clone)]
@@ -123,13 +124,14 @@ pub enum RedisCommands {
}
impl RedisCommands {
- pub fn execute(self, cache: SharedCache, config: SharedConfig) -> Vec<u8> {
+ pub fn execute(self, server: Arc<Mutex<RedisServer>>) -> Vec<u8> {
use RedisCommands as RC;
match self {
RC::Ping => resp_bytes!("PONG"),
RC::Echo(echo_string) => resp_bytes!(echo_string),
RC::Get(key) => {
- let mut cache = cache.lock().unwrap();
+ let server = server.lock().unwrap();
+ let mut cache = server.cache().lock().unwrap();
match cache.get(&key).cloned() {
Some(entry) => {
@@ -144,7 +146,8 @@ impl RedisCommands {
}
}
RC::Set(command) => {
- let mut cache = cache.lock().unwrap();
+ let mut server = server.lock().unwrap();
+ let mut cache = server.cache().lock().unwrap();
// Check conditions (NX/XX)
let key_exists = cache.contains_key(&command.key);
@@ -187,6 +190,18 @@ impl RedisCommands {
},
);
+ // Broadcast SET to replicas after mutating local state
+ let broadcast_cmd = resp_bytes!(array => [
+ resp!(bulk "SET"),
+ resp!(bulk command.key),
+ resp!(bulk command.value)
+ ]);
+
+ // Unlock the mutex so that I can access to broadcast the messaage
+ drop(cache);
+
+ server.broadcast_command(&broadcast_cmd);
+
if !command.get_old_value {
return resp_bytes!("OK");
}
@@ -198,10 +213,8 @@ impl RedisCommands {
}
RC::ConfigGet(s) => {
use RespType as RT;
- let config = config.clone();
- if let Some(conf) = config.as_ref() {
- let dir = conf.dir.clone().unwrap();
- let dbfilename = conf.dbfilename.clone().unwrap();
+ let server = server.lock().unwrap();
+ if let (Some(dir), Some(dbfilename)) = (server.dir(), server.dbfilename()) {
match s.as_str() {
"dir" => RT::Array(vec![
RT::BulkString(s.as_bytes().to_vec()),
@@ -224,7 +237,8 @@ impl RedisCommands {
let query = query.replace('*', ".*");
- let cache = cache.lock().unwrap();
+ let server = server.lock().unwrap();
+ let cache = server.cache().lock().unwrap();
let regex = Regex::new(&query).unwrap();
let matching_keys: Vec<RT> = cache
.keys()
@@ -238,36 +252,47 @@ impl RedisCommands {
}
RC::Info(_sub_command) => {
use RespType as RT;
- let config = config.clone();
- let mut server = RedisServer::new();
- if let Some(conf) = config.as_ref() {
- server = conf.server.clone();
+ let server = server.lock().unwrap();
+ match &*server {
+ RedisServer::Master(master) => {
+ let response = format!(
+ "# Replication\r\nrole:{}master_replid:{}master_repl_offset:{}",
+ server.role(),
+ master.replid.clone().unwrap_or("".to_string()),
+ master.current_offset.clone().unwrap_or("".to_string())
+ )
+ .as_bytes()
+ .to_vec();
+ RT::BulkString(response).to_resp_bytes()
+ }
+ RedisServer::Slave(slave) => {
+ let response = format!(
+ "# Replication\r\nrole:{}master_replid:{}master_repl_offset:{}",
+ server.role(),
+ slave.master_replid.clone().unwrap_or("".to_string()),
+ slave.master_repl_offset.clone().unwrap_or("".to_string())
+ )
+ .as_bytes()
+ .to_vec();
+ RT::BulkString(response).to_resp_bytes()
+ }
}
- let response = format!(
- "# Replication\r\nrole:{}master_replid:{}master_repl_offset:{}",
- server.role,
- server.master_replid.unwrap_or("".to_string()),
- server.master_repl_offset.unwrap_or("".to_string())
- )
- .as_bytes()
- .to_vec();
- RT::BulkString(response).to_resp_bytes()
}
RC::ReplConf((_, _)) => {
resp_bytes!("OK")
}
RC::Psync((_, _)) => {
- let config = config.clone();
- let mut server = RedisServer::new();
- if let Some(conf) = config.as_ref() {
- server = conf.server.clone();
+ let server = server.lock().unwrap();
+ if let RedisServer::Master(master) = &*server {
+ let response = format!(
+ "FULLRESYNC {} 0",
+ master.replid.clone().unwrap_or("".to_string()),
+ );
+ resp_bytes!(response)
+ } else {
+ // TODO: Find a way to report this error back up the program trace
+ unreachable!("I should never come here")
}
- let response = format!(
- "FULLRESYNC {} 0",
- server.master_replid.unwrap_or("".to_string()),
- );
-
- resp_bytes!(response)
}
RC::Invalid => todo!(),
}