From d7d2377772e31fafb56f8107a6a22df4a26846d9 Mon Sep 17 00:00:00 2001 From: omagdy Date: Wed, 23 Jul 2025 07:53:14 +0300 Subject: refactor+feat: Did overhaul refactoring for how I model each server data and also add a feat to propagate write commands to replicas --- src/main.rs | 101 +++++++++++++++++++++++++++++++----------------------------- 1 file changed, 53 insertions(+), 48 deletions(-) (limited to 'src/main.rs') diff --git a/src/main.rs b/src/main.rs index 78b9c24..25bc6c9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -16,10 +16,10 @@ use codecrafters_redis::{ resp_bytes, shared_cache::*, }; -use codecrafters_redis::{resp_commands::RedisCommands, Config}; +use codecrafters_redis::{resp_commands::RedisCommands, server::RedisServer}; use codecrafters_redis::{ resp_parser::{parse, RespType}, - SharedConfig, + server::SlaveServer, }; fn spawn_cleanup_thread(cache: SharedCache) { @@ -57,7 +57,7 @@ fn write_rdb_to_stream(writer: &mut W) -> Result<(), Box>) { let mut buffer = [0; 512]; loop { @@ -68,8 +68,8 @@ fn handle_client(mut stream: TcpStream, cache: SharedCache, config: SharedConfig }; let request = parse(&buffer).unwrap(); - let response = - RedisCommands::from(request.0.clone()).execute(cache.clone(), config.clone()); + let server_clone = Arc::clone(&server); + let response = RedisCommands::from(request.0.clone()).execute(server_clone); let mut request_command = "".to_string(); @@ -83,12 +83,23 @@ fn handle_client(mut stream: TcpStream, cache: SharedCache, config: SharedConfig _ => {} } + // Store the persistent connection + let shared_stream = Arc::new(Mutex::new( + stream.try_clone().expect("What could go wrong? :)"), + )); + // if this true immediately write and send back rdb file after response // HACK: This just feels wrong I feel this shouldn't be handled here and should be handled // in the exexute command if request_command.starts_with("PSYNC") { stream.write(&response).unwrap(); let _ = write_rdb_to_stream(&mut stream); + // handshake completed and I should add the server sending me the handshake to my replicas + let replica_addr = stream + .peer_addr() + .expect("This shouldn't fail right? right?? :)"); + let mut server = server.lock().unwrap(); + server.add_replica(replica_addr, shared_stream); } else { // write respose back to the client stream.write(&response).unwrap(); @@ -97,62 +108,56 @@ fn handle_client(mut stream: TcpStream, cache: SharedCache, config: SharedConfig } fn main() -> std::io::Result<()> { - let cache: SharedCache = Arc::new(Mutex::new(HashMap::new())); - let mut config: SharedConfig = Arc::new(None); - let mut port = "6379".to_string(); - - match Config::new() { - Ok(conf) => { - if let Some(conf) = conf { - let mut cache = cache.lock().unwrap(); - let dir = conf.dir.clone().unwrap_or("".to_string()); - let dbfilename = conf.dbfilename.clone().unwrap_or("".to_string()); - let redis_server = conf.server.clone(); - port = redis_server.port.clone(); - if let Ok(rdb_file) = RDBFile::read(dir, dbfilename) { - if let Some(rdb) = rdb_file { - let hash_table = &rdb.databases.get(&0).unwrap().hash_table; - - for (key, db_entry) in hash_table.iter() { - let value = match &db_entry.value { - RedisValue::String(data) => { - String::from_utf8(data.clone()).unwrap() - } - RedisValue::Integer(data) => data.to_string(), - _ => { - unreachable!() - } - }; - let expires_at = if let Some(key_expiry) = &db_entry.expiry { - Some(key_expiry.timestamp) - } else { - None - }; - let cache_entry = CacheEntry { value, expires_at }; - cache.insert(String::from_utf8(key.clone()).unwrap(), cache_entry); - } - } - } - config = Arc::new(Some(conf)); - } - } + let server = match RedisServer::new() { + Ok(Some(server)) => server, + Ok(None) => RedisServer::master(), // Default to master if no args Err(e) => { eprintln!("Error: {}", e); std::process::exit(1); } + }; + + // Load RDB file if dir and dbfilename are provided + if let (Some(dir), Some(dbfilename)) = (server.dir().clone(), server.dbfilename().clone()) { + if let Ok(rdb_file) = RDBFile::read(dir, dbfilename) { + if let Some(rdb) = rdb_file { + let mut cache = server.cache().lock().unwrap(); + let hash_table = &rdb.databases.get(&0).unwrap().hash_table; + + for (key, db_entry) in hash_table.iter() { + let value = match &db_entry.value { + RedisValue::String(data) => String::from_utf8(data.clone()).unwrap(), + RedisValue::Integer(data) => data.to_string(), + _ => { + unreachable!() + } + }; + let expires_at = if let Some(key_expiry) = &db_entry.expiry { + Some(key_expiry.timestamp) + } else { + None + }; + let cache_entry = CacheEntry { value, expires_at }; + cache.insert(String::from_utf8(key.clone()).unwrap(), cache_entry); + } + } + } } + let port = server.port().to_string(); let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap(); - spawn_cleanup_thread(cache.clone()); + spawn_cleanup_thread(server.cache().clone()); + + let server = Arc::new(Mutex::new(server)); for stream in listener.incoming() { match stream { Ok(stream) => { - let cache_clone = cache.clone(); - let config_clone = Arc::clone(&config); + let server_clone = Arc::clone(&server); + // TODO: Use tokio instead of multi threads to handle multiple clients thread::spawn(|| { - handle_client(stream, cache_clone, config_clone); + handle_client(stream, server_clone); }); } Err(e) => { -- cgit v1.2.3