aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoromagdy <omar.professional8777@gmail.com>2025-07-23 07:53:14 +0300
committeromagdy <omar.professional8777@gmail.com>2025-07-23 07:53:14 +0300
commitd7d2377772e31fafb56f8107a6a22df4a26846d9 (patch)
treea718c6d258a758b2711dd8e6ffc45ae0f5b31905
parent1f2f3a241c59f467df5bf16fbde872f5083a174f (diff)
downloadredis-rust-d7d2377772e31fafb56f8107a6a22df4a26846d9.tar.xz
redis-rust-d7d2377772e31fafb56f8107a6a22df4a26846d9.zip
refactor+feat: Did overhaul refactoring for how I model each server data and also add a feat to propagate write commands to replicas
-rw-r--r--src/lib.rs165
-rw-r--r--src/main.rs101
-rw-r--r--src/resp_commands.rs89
-rw-r--r--src/server.rs354
-rw-r--r--tests/test_commands.rs9
5 files changed, 471 insertions, 247 deletions
diff --git a/src/lib.rs b/src/lib.rs
index 4f3f322..4fccf3f 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,170 +1,7 @@
-use resp_parser::RespType;
-use std::{env, sync::Arc};
-use std::{
- io::{Read, Write},
- net::{TcpListener, TcpStream},
-};
-
-use crate::resp_parser::parse;
-
#[macro_use]
pub mod macros;
pub mod rdb;
pub mod resp_commands;
pub mod resp_parser;
+pub mod server;
pub mod shared_cache;
-
-// TODO: Model this in a better way there could be an enum where a Slave Server is distinguised
-// from master servers in 2 different structs
-#[derive(Debug, Clone, Default)]
-pub struct RedisServer {
- pub role: String,
- pub port: String,
- pub master_host: String,
- pub master_port: String,
- pub master_replid: Option<String>,
- pub master_repl_offset: Option<String>,
-}
-
-impl RedisServer {
- fn new() -> Self {
- Self {
- role: "master".to_string(),
- port: "6379".to_string(),
- master_host: "".to_string(),
- master_port: "".to_string(),
- // HACK: Hardcoded for now
- master_replid: Some("8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb".to_string()),
- master_repl_offset: Some("0".to_string()),
- }
- }
-}
-
-#[derive(Debug, Default)]
-pub struct Config {
- pub dir: Option<String>,
- pub dbfilename: Option<String>,
- pub server: RedisServer,
-}
-
-fn handshake_process(slave: &RedisServer) -> Result<(), String> {
- let master_address = format!("{}:{}", slave.master_host, slave.master_port);
- match TcpStream::connect(master_address) {
- Ok(mut stream) => {
- let mut buffer = [0; 512];
-
- let mut send_command = |command: &[u8]| -> Result<(), String> {
- stream
- .write_all(command)
- .map_err(|e| format!("Failed to send: {}", e))?;
-
- match stream.read(&mut buffer) {
- Ok(0) | Err(_) => return Ok(()), // connection closed or error
- Ok(_) => Ok(()),
- }
- };
-
- // PING
- send_command(&resp_bytes!(array => [resp!(bulk "PING")]))?;
-
- // REPLCONF listening-port <PORT>
- send_command(&resp_bytes!(array => [
- resp!(bulk "REPLCONF"),
- resp!(bulk "listening-port"),
- resp!(bulk slave.port.clone())
- ]))?;
-
- // REPLCONF capa psync2
- send_command(&resp_bytes!(array => [
- resp!(bulk "REPLCONF"),
- resp!(bulk "capa"),
- resp!(bulk "psync2")
- ]))?;
-
- // PSYNC <REPL_ID> <REPL_OFFSSET>
- send_command(&resp_bytes!(array => [
- resp!(bulk "PSYNC"),
- resp!(bulk "?"),
- resp!(bulk "-1")
- ]))?;
-
- Ok(())
- }
- Err(e) => Err(format!("Master node doesn't exist: {}", e)),
- }
-}
-
-pub type SharedConfig = Arc<Option<Config>>;
-
-impl Config {
- pub fn new() -> Result<Option<Config>, String> {
- let args: Vec<String> = env::args().collect();
-
- if args.len() == 1 {
- return Ok(None);
- }
-
- let mut dir = None;
- let mut dbfilename = None;
- let mut redis_server = RedisServer::new();
-
- let mut i = 1; // Skip program name
- while i < args.len() {
- match args[i].as_str() {
- "--dir" => {
- if i + 1 >= args.len() {
- return Err("--dir requires a value".to_string());
- }
- dir = Some(args[i + 1].clone());
- i += 2;
- }
- "--dbfilename" => {
- if i + 1 >= args.len() {
- return Err("--dbfilename requires a value".to_string());
- }
- dbfilename = Some(args[i + 1].clone());
- i += 2;
- }
- "--port" => {
- if i + 1 >= args.len() {
- return Err("--port requires a value".to_string());
- }
- redis_server.port = args[i + 1].clone();
- i += 2;
- }
- "--replicaof" => {
- if i + 1 >= args.len() {
- return Err("--replicaof requires a value".to_string());
- }
-
- // TODO: Find a better name for this variable
- let info = args[i + 1].clone();
-
- let (master_host, master_port) = info.split_once(' ').unwrap();
-
- redis_server.role = "slave".to_string();
-
- // slaves don't have master attributes!!
- redis_server.master_replid = None;
- redis_server.master_repl_offset = None;
-
- redis_server.master_host = master_host.to_string();
- redis_server.master_port = master_port.to_string();
-
- handshake_process(&redis_server)?;
-
- i += 2;
- }
- _ => {
- return Err(format!("Unknown argument: {}", args[i]));
- }
- }
- }
-
- Ok(Some(Config {
- dir,
- dbfilename,
- server: redis_server,
- }))
- }
-}
diff --git a/src/main.rs b/src/main.rs
index 78b9c24..25bc6c9 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -16,10 +16,10 @@ use codecrafters_redis::{
resp_bytes,
shared_cache::*,
};
-use codecrafters_redis::{resp_commands::RedisCommands, Config};
+use codecrafters_redis::{resp_commands::RedisCommands, server::RedisServer};
use codecrafters_redis::{
resp_parser::{parse, RespType},
- SharedConfig,
+ server::SlaveServer,
};
fn spawn_cleanup_thread(cache: SharedCache) {
@@ -57,7 +57,7 @@ fn write_rdb_to_stream<W: Write>(writer: &mut W) -> Result<(), Box<dyn std::erro
}
// TODO: This should return a Result to handle the plethora of different errors
-fn handle_client(mut stream: TcpStream, cache: SharedCache, config: SharedConfig) {
+fn handle_client(mut stream: TcpStream, server: Arc<Mutex<RedisServer>>) {
let mut buffer = [0; 512];
loop {
@@ -68,8 +68,8 @@ fn handle_client(mut stream: TcpStream, cache: SharedCache, config: SharedConfig
};
let request = parse(&buffer).unwrap();
- let response =
- RedisCommands::from(request.0.clone()).execute(cache.clone(), config.clone());
+ let server_clone = Arc::clone(&server);
+ let response = RedisCommands::from(request.0.clone()).execute(server_clone);
let mut request_command = "".to_string();
@@ -83,12 +83,23 @@ fn handle_client(mut stream: TcpStream, cache: SharedCache, config: SharedConfig
_ => {}
}
+ // Store the persistent connection
+ let shared_stream = Arc::new(Mutex::new(
+ stream.try_clone().expect("What could go wrong? :)"),
+ ));
+
// if this true immediately write and send back rdb file after response
// HACK: This just feels wrong I feel this shouldn't be handled here and should be handled
// in the exexute command
if request_command.starts_with("PSYNC") {
stream.write(&response).unwrap();
let _ = write_rdb_to_stream(&mut stream);
+ // handshake completed and I should add the server sending me the handshake to my replicas
+ let replica_addr = stream
+ .peer_addr()
+ .expect("This shouldn't fail right? right?? :)");
+ let mut server = server.lock().unwrap();
+ server.add_replica(replica_addr, shared_stream);
} else {
// write respose back to the client
stream.write(&response).unwrap();
@@ -97,62 +108,56 @@ fn handle_client(mut stream: TcpStream, cache: SharedCache, config: SharedConfig
}
fn main() -> std::io::Result<()> {
- let cache: SharedCache = Arc::new(Mutex::new(HashMap::new()));
- let mut config: SharedConfig = Arc::new(None);
- let mut port = "6379".to_string();
-
- match Config::new() {
- Ok(conf) => {
- if let Some(conf) = conf {
- let mut cache = cache.lock().unwrap();
- let dir = conf.dir.clone().unwrap_or("".to_string());
- let dbfilename = conf.dbfilename.clone().unwrap_or("".to_string());
- let redis_server = conf.server.clone();
- port = redis_server.port.clone();
- if let Ok(rdb_file) = RDBFile::read(dir, dbfilename) {
- if let Some(rdb) = rdb_file {
- let hash_table = &rdb.databases.get(&0).unwrap().hash_table;
-
- for (key, db_entry) in hash_table.iter() {
- let value = match &db_entry.value {
- RedisValue::String(data) => {
- String::from_utf8(data.clone()).unwrap()
- }
- RedisValue::Integer(data) => data.to_string(),
- _ => {
- unreachable!()
- }
- };
- let expires_at = if let Some(key_expiry) = &db_entry.expiry {
- Some(key_expiry.timestamp)
- } else {
- None
- };
- let cache_entry = CacheEntry { value, expires_at };
- cache.insert(String::from_utf8(key.clone()).unwrap(), cache_entry);
- }
- }
- }
- config = Arc::new(Some(conf));
- }
- }
+ let server = match RedisServer::new() {
+ Ok(Some(server)) => server,
+ Ok(None) => RedisServer::master(), // Default to master if no args
Err(e) => {
eprintln!("Error: {}", e);
std::process::exit(1);
}
+ };
+
+ // Load RDB file if dir and dbfilename are provided
+ if let (Some(dir), Some(dbfilename)) = (server.dir().clone(), server.dbfilename().clone()) {
+ if let Ok(rdb_file) = RDBFile::read(dir, dbfilename) {
+ if let Some(rdb) = rdb_file {
+ let mut cache = server.cache().lock().unwrap();
+ let hash_table = &rdb.databases.get(&0).unwrap().hash_table;
+
+ for (key, db_entry) in hash_table.iter() {
+ let value = match &db_entry.value {
+ RedisValue::String(data) => String::from_utf8(data.clone()).unwrap(),
+ RedisValue::Integer(data) => data.to_string(),
+ _ => {
+ unreachable!()
+ }
+ };
+ let expires_at = if let Some(key_expiry) = &db_entry.expiry {
+ Some(key_expiry.timestamp)
+ } else {
+ None
+ };
+ let cache_entry = CacheEntry { value, expires_at };
+ cache.insert(String::from_utf8(key.clone()).unwrap(), cache_entry);
+ }
+ }
+ }
}
+ let port = server.port().to_string();
let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap();
- spawn_cleanup_thread(cache.clone());
+ spawn_cleanup_thread(server.cache().clone());
+
+ let server = Arc::new(Mutex::new(server));
for stream in listener.incoming() {
match stream {
Ok(stream) => {
- let cache_clone = cache.clone();
- let config_clone = Arc::clone(&config);
+ let server_clone = Arc::clone(&server);
+ // TODO: Use tokio instead of multi threads to handle multiple clients
thread::spawn(|| {
- handle_client(stream, cache_clone, config_clone);
+ handle_client(stream, server_clone);
});
}
Err(e) => {
diff --git a/src/resp_commands.rs b/src/resp_commands.rs
index 3c18b07..03ec9dc 100644
--- a/src/resp_commands.rs
+++ b/src/resp_commands.rs
@@ -1,6 +1,7 @@
+use crate::server::*;
use crate::{resp_parser::*, shared_cache::*};
-use crate::{RedisServer, SharedConfig};
use regex::Regex;
+use std::sync::{Arc, Mutex};
use std::time::{SystemTime, UNIX_EPOCH};
#[derive(Debug, Clone)]
@@ -123,13 +124,14 @@ pub enum RedisCommands {
}
impl RedisCommands {
- pub fn execute(self, cache: SharedCache, config: SharedConfig) -> Vec<u8> {
+ pub fn execute(self, server: Arc<Mutex<RedisServer>>) -> Vec<u8> {
use RedisCommands as RC;
match self {
RC::Ping => resp_bytes!("PONG"),
RC::Echo(echo_string) => resp_bytes!(echo_string),
RC::Get(key) => {
- let mut cache = cache.lock().unwrap();
+ let server = server.lock().unwrap();
+ let mut cache = server.cache().lock().unwrap();
match cache.get(&key).cloned() {
Some(entry) => {
@@ -144,7 +146,8 @@ impl RedisCommands {
}
}
RC::Set(command) => {
- let mut cache = cache.lock().unwrap();
+ let mut server = server.lock().unwrap();
+ let mut cache = server.cache().lock().unwrap();
// Check conditions (NX/XX)
let key_exists = cache.contains_key(&command.key);
@@ -187,6 +190,18 @@ impl RedisCommands {
},
);
+ // Broadcast SET to replicas after mutating local state
+ let broadcast_cmd = resp_bytes!(array => [
+ resp!(bulk "SET"),
+ resp!(bulk command.key),
+ resp!(bulk command.value)
+ ]);
+
+ // Unlock the mutex so that I can access to broadcast the messaage
+ drop(cache);
+
+ server.broadcast_command(&broadcast_cmd);
+
if !command.get_old_value {
return resp_bytes!("OK");
}
@@ -198,10 +213,8 @@ impl RedisCommands {
}
RC::ConfigGet(s) => {
use RespType as RT;
- let config = config.clone();
- if let Some(conf) = config.as_ref() {
- let dir = conf.dir.clone().unwrap();
- let dbfilename = conf.dbfilename.clone().unwrap();
+ let server = server.lock().unwrap();
+ if let (Some(dir), Some(dbfilename)) = (server.dir(), server.dbfilename()) {
match s.as_str() {
"dir" => RT::Array(vec![
RT::BulkString(s.as_bytes().to_vec()),
@@ -224,7 +237,8 @@ impl RedisCommands {
let query = query.replace('*', ".*");
- let cache = cache.lock().unwrap();
+ let server = server.lock().unwrap();
+ let cache = server.cache().lock().unwrap();
let regex = Regex::new(&query).unwrap();
let matching_keys: Vec<RT> = cache
.keys()
@@ -238,36 +252,47 @@ impl RedisCommands {
}
RC::Info(_sub_command) => {
use RespType as RT;
- let config = config.clone();
- let mut server = RedisServer::new();
- if let Some(conf) = config.as_ref() {
- server = conf.server.clone();
+ let server = server.lock().unwrap();
+ match &*server {
+ RedisServer::Master(master) => {
+ let response = format!(
+ "# Replication\r\nrole:{}master_replid:{}master_repl_offset:{}",
+ server.role(),
+ master.replid.clone().unwrap_or("".to_string()),
+ master.current_offset.clone().unwrap_or("".to_string())
+ )
+ .as_bytes()
+ .to_vec();
+ RT::BulkString(response).to_resp_bytes()
+ }
+ RedisServer::Slave(slave) => {
+ let response = format!(
+ "# Replication\r\nrole:{}master_replid:{}master_repl_offset:{}",
+ server.role(),
+ slave.master_replid.clone().unwrap_or("".to_string()),
+ slave.master_repl_offset.clone().unwrap_or("".to_string())
+ )
+ .as_bytes()
+ .to_vec();
+ RT::BulkString(response).to_resp_bytes()
+ }
}
- let response = format!(
- "# Replication\r\nrole:{}master_replid:{}master_repl_offset:{}",
- server.role,
- server.master_replid.unwrap_or("".to_string()),
- server.master_repl_offset.unwrap_or("".to_string())
- )
- .as_bytes()
- .to_vec();
- RT::BulkString(response).to_resp_bytes()
}
RC::ReplConf((_, _)) => {
resp_bytes!("OK")
}
RC::Psync((_, _)) => {
- let config = config.clone();
- let mut server = RedisServer::new();
- if let Some(conf) = config.as_ref() {
- server = conf.server.clone();
+ let server = server.lock().unwrap();
+ if let RedisServer::Master(master) = &*server {
+ let response = format!(
+ "FULLRESYNC {} 0",
+ master.replid.clone().unwrap_or("".to_string()),
+ );
+ resp_bytes!(response)
+ } else {
+ // TODO: Find a way to report this error back up the program trace
+ unreachable!("I should never come here")
}
- let response = format!(
- "FULLRESYNC {} 0",
- server.master_replid.unwrap_or("".to_string()),
- );
-
- resp_bytes!(response)
}
RC::Invalid => todo!(),
}
diff --git a/src/server.rs b/src/server.rs
new file mode 100644
index 0000000..2d7cca7
--- /dev/null
+++ b/src/server.rs
@@ -0,0 +1,354 @@
+use crate::resp_parser::RespType;
+use std::collections::HashMap;
+use std::io::{Read, Write};
+use std::net::{SocketAddr, TcpStream};
+use std::sync::{Arc, Mutex};
+use std::{env, thread};
+
+use crate::shared_cache::SharedCache;
+
+#[derive(Debug, Clone)]
+pub struct MasterServer {
+ pub dir: Option<String>,
+ pub dbfilename: Option<String>,
+ pub replid: Option<String>,
+ pub current_offset: Option<String>,
+ pub port: String,
+ pub cache: SharedCache,
+ replicas: Vec<SlaveServer>,
+}
+
+impl MasterServer {
+ fn new() -> Self {
+ Self {
+ dir: None,
+ dbfilename: None,
+ port: "6379".to_string(),
+ replid: Some("8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb".to_string()),
+ current_offset: Some("0".to_string()),
+ cache: Arc::new(Mutex::new(HashMap::new())),
+ replicas: vec![],
+ }
+ }
+
+ fn port(&self) -> &str {
+ &self.port
+ }
+
+ pub fn broadcast_command(&mut self, command: &[u8]) {
+ println!("Hello from brodcast");
+ self.replicas.retain(|replica| {
+ if let Some(conn) = &replica.connection {
+ let mut conn = conn.lock().unwrap();
+ if let Err(e) = conn.write_all(command) {
+ eprintln!("Failed to send to replica {}: {}", replica.port, e);
+ false // Drop dead connections
+ } else {
+ true
+ }
+ } else {
+ false
+ }
+ });
+ }
+}
+
+#[derive(Debug, Clone)]
+pub struct SlaveServer {
+ pub dir: Option<String>,
+ pub dbfilename: Option<String>,
+ pub port: String,
+ pub master_replid: Option<String>,
+ pub master_repl_offset: Option<String>,
+ pub master_host: String,
+ pub master_port: String,
+ pub connection: Option<Arc<Mutex<TcpStream>>>,
+ pub cache: SharedCache,
+}
+
+impl SlaveServer {
+ fn new(
+ port: String,
+ master_host: String,
+ master_port: String,
+ connection: Option<Arc<Mutex<TcpStream>>>,
+ ) -> Self {
+ Self {
+ dir: None,
+ dbfilename: None,
+ port,
+ master_replid: Some("8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb".to_string()),
+ master_repl_offset: Some("0".to_string()),
+ master_host,
+ master_port,
+ connection,
+ cache: Arc::new(Mutex::new(HashMap::new())),
+ }
+ }
+
+ fn connect(&self) -> Result<TcpStream, std::io::Error> {
+ let master_address = format!("{}:{}", self.master_host, self.master_port);
+ return TcpStream::connect(master_address);
+ }
+
+ fn handshake(&mut self) -> Result<(), String> {
+ match self.connect() {
+ Ok(mut stream) => {
+ let mut buffer = [0; 512];
+
+ let mut send_command = |command: &[u8]| -> Result<(), String> {
+ stream
+ .write_all(command)
+ .map_err(|e| format!("Failed to send: {}", e))?;
+
+ match stream.read(&mut buffer) {
+ Ok(0) | Err(_) => return Ok(()), // connection closed or error
+ Ok(_) => Ok(()),
+ }
+ };
+
+ // PING
+ send_command(&resp_bytes!(array => [resp!(bulk "PING")]))?;
+
+ // REPLCONF listening-port <PORT>
+ send_command(&resp_bytes!(array => [
+ resp!(bulk "REPLCONF"),
+ resp!(bulk "listening-port"),
+ resp!(bulk self.port.clone())
+ ]))?;
+
+ // REPLCONF capa psync2
+ send_command(&resp_bytes!(array => [
+ resp!(bulk "REPLCONF"),
+ resp!(bulk "capa"),
+ resp!(bulk "psync2")
+ ]))?;
+
+ // PSYNC <REPL_ID> <REPL_OFFSSET>
+ send_command(&resp_bytes!(array => [
+ resp!(bulk "PSYNC"),
+ resp!(bulk "?"),
+ resp!(bulk "-1")
+ ]))?;
+
+ // Store the persistent connection
+ let shared_stream = Arc::new(Mutex::new(stream));
+ self.connection = Some(shared_stream.clone());
+
+ // Spawn the background listener thread
+ thread::spawn(move || {
+ let mut buffer = [0u8; 1024];
+ loop {
+ let mut stream = shared_stream.lock().unwrap();
+ match stream.read(&mut buffer) {
+ Ok(0) => {
+ println!("Master disconnected");
+ break;
+ }
+ Ok(n) => {
+ println!(
+ "REPLICA received: {}",
+ String::from_utf8_lossy(&buffer[..n])
+ );
+ }
+ Err(e) => {
+ eprintln!("Error reading from master: {}", e);
+ break;
+ }
+ }
+ }
+ });
+
+ Ok(())
+ }
+ Err(e) => Err(format!("Master node doesn't exist: {}", e)),
+ }
+ }
+}
+
+#[derive(Debug, Clone)]
+pub enum RedisServer {
+ Master(MasterServer),
+ Slave(SlaveServer),
+}
+
+impl RedisServer {
+ pub fn master() -> Self {
+ RedisServer::Master(MasterServer::new())
+ }
+
+ pub fn slave(port: String, master_host: String, master_port: String) -> Self {
+ RedisServer::Slave(SlaveServer::new(port, master_host, master_port, None))
+ }
+
+ // Helper methods to access common fields regardless of variant
+ pub fn port(&self) -> &str {
+ match self {
+ RedisServer::Master(master) => &master.port,
+ RedisServer::Slave(slave) => &slave.port,
+ }
+ }
+
+ pub fn set_port(&mut self, port: String) {
+ match self {
+ RedisServer::Master(master) => master.port = port,
+ RedisServer::Slave(slave) => slave.port = port,
+ }
+ }
+
+ pub fn dir(&self) -> &Option<String> {
+ match self {
+ RedisServer::Master(master) => &master.dir,
+ RedisServer::Slave(slave) => &slave.dir,
+ }
+ }
+
+ pub fn set_dir(&mut self, dir: Option<String>) {
+ match self {
+ RedisServer::Master(master) => master.dir = dir,
+ RedisServer::Slave(slave) => slave.dir = dir,
+ }
+ }
+
+ pub fn dbfilename(&self) -> &Option<String> {
+ match self {
+ RedisServer::Master(master) => &master.dbfilename,
+ RedisServer::Slave(slave) => &slave.dbfilename,
+ }
+ }
+
+ pub fn set_dbfilename(&mut self, dbfilename: Option<String>) {
+ match self {
+ RedisServer::Master(master) => master.dbfilename = dbfilename,
+ RedisServer::Slave(slave) => slave.dbfilename = dbfilename,
+ }
+ }
+
+ pub fn cache(&self) -> &SharedCache {
+ match self {
+ RedisServer::Master(master) => &master.cache,
+ RedisServer::Slave(slave) => &slave.cache,
+ }
+ }
+
+ pub fn set_cache(&mut self, cache: &SharedCache) {
+ match self {
+ RedisServer::Master(master) => master.cache = cache.clone(),
+ RedisServer::Slave(slave) => slave.cache = cache.clone(),
+ }
+ }
+
+ pub fn role(&self) -> &str {
+ match self {
+ RedisServer::Master(_) => "master",
+ RedisServer::Slave(_) => "slave",
+ }
+ }
+
+ pub fn add_replica(&mut self, replica_adr: SocketAddr, connection: Arc<Mutex<TcpStream>>) {
+ match self {
+ // TODO: Should probably add host to MasterServer and SlaveServer as member field
+ RedisServer::Master(master) => master.replicas.push(SlaveServer::new(
+ replica_adr.port().to_string(),
+ "localhost".to_owned(),
+ master.port().to_owned(),
+ Some(connection),
+ )),
+ RedisServer::Slave(_) => {
+ unreachable!("Slaves don't have replicas")
+ }
+ }
+ }
+
+ pub fn broadcast_command(&mut self, command: &[u8]) {
+ if let RedisServer::Master(master) = self {
+ master.broadcast_command(command);
+ }
+ }
+
+ pub fn is_master(&self) -> bool {
+ matches!(self, RedisServer::Master(_))
+ }
+
+ pub fn is_slave(&self) -> bool {
+ matches!(self, RedisServer::Slave(_))
+ }
+}
+
+impl RedisServer {
+ pub fn new() -> Result<Option<RedisServer>, String> {
+ let args: Vec<String> = env::args().collect();
+
+ if args.len() == 1 {
+ return Ok(None);
+ }
+
+ let mut redis_server = RedisServer::master();
+ let mut dir = None;
+ let mut dbfilename = None;
+
+ let mut i = 1; // Skip program name
+ while i < args.len() {
+ match args[i].as_str() {
+ "--dir" => {
+ if i + 1 >= args.len() {
+ return Err("--dir requires a value".to_string());
+ }
+ dir = Some(args[i + 1].clone());
+ i += 2;
+ }
+ "--dbfilename" => {
+ if i + 1 >= args.len() {
+ return Err("--dbfilename requires a value".to_string());
+ }
+ dbfilename = Some(args[i + 1].clone());
+ i += 2;
+ }
+ "--port" => {
+ if i + 1 >= args.len() {
+ return Err("--port requires a value".to_string());
+ }
+ redis_server.set_port(args[i + 1].clone());
+ i += 2;
+ }
+ "--replicaof" => {
+ if i + 1 >= args.len() {
+ return Err("--replicaof requires a value".to_string());
+ }
+
+ // TODO: Find a better name for this variable info
+ let info = args[i + 1].clone();
+ let (master_host, master_port) = info.split_once(' ').ok_or_else(|| {
+ "Invalid --replicaof format. Expected 'host port'".to_string()
+ })?;
+
+ // Get current port or use default
+ let current_port = redis_server.port().to_string();
+
+ // Create new slave server
+ redis_server = RedisServer::slave(
+ current_port,
+ master_host.to_string(),
+ master_port.to_string(),
+ );
+
+ // Perform handshake
+ if let RedisServer::Slave(mut slave) = redis_server.clone() {
+ slave.handshake()?;
+ }
+
+ i += 2;
+ }
+ _ => {
+ return Err(format!("Unknown argument: {}", args[i]));
+ }
+ }
+ }
+
+ // Set dir and dbfilename after server is finalized
+ redis_server.set_dir(dir);
+ redis_server.set_dbfilename(dbfilename);
+
+ Ok(Some(redis_server))
+ }
+}
diff --git a/tests/test_commands.rs b/tests/test_commands.rs
index e71db38..86b9cbc 100644
--- a/tests/test_commands.rs
+++ b/tests/test_commands.rs
@@ -141,7 +141,8 @@ mod command_parser_tests {
/// Tests for the command execution logic in `RedisCommands::execute`.
mod command_execution_tests {
- use codecrafters_redis::{resp_commands::RedisCommands, Config};
+ use codecrafters_redis::resp_commands::RedisCommands;
+ use codecrafters_redis::server::RedisServer;
use std::time::Duration;
use super::*;
@@ -149,8 +150,10 @@ mod command_execution_tests {
/// Helper to parse and execute a command against a cache.
fn run_command(cache: &SharedCache, args: &[&str]) -> Vec<u8> {
let command = RedisCommands::from(build_command_from_str_slice(args));
- let config = Arc::new(Some(Config::default()));
- command.execute(Arc::clone(cache), config)
+ let mut server = RedisServer::master();
+ server.set_cache(cache);
+
+ command.execute(Arc::new(Mutex::new(server)))
}
#[test]