From 91d8ff4f006d669de8b65f6ee32ce352d0598719 Mon Sep 17 00:00:00 2001 From: omagdy Date: Wed, 23 Jul 2025 08:48:54 +0300 Subject: feat: Propagate write commands such as SET to replicas and actually execute them on the replicas --- src/server.rs | 66 ++++++++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 49 insertions(+), 17 deletions(-) diff --git a/src/server.rs b/src/server.rs index 2d7cca7..5636234 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,4 +1,5 @@ -use crate::resp_parser::RespType; +use crate::resp_commands::RedisCommands; +use crate::resp_parser::{parse, RespType}; use std::collections::HashMap; use std::io::{Read, Write}; use std::net::{SocketAddr, TcpStream}; @@ -103,7 +104,10 @@ impl SlaveServer { match stream.read(&mut buffer) { Ok(0) | Err(_) => return Ok(()), // connection closed or error - Ok(_) => Ok(()), + Ok(_) => { + println!("Recieved some bytes here!"); + Ok(()) + } } }; @@ -136,29 +140,57 @@ impl SlaveServer { self.connection = Some(shared_stream.clone()); // Spawn the background listener thread + let cache_clone = self.cache.clone(); thread::spawn(move || { let mut buffer = [0u8; 1024]; loop { - let mut stream = shared_stream.lock().unwrap(); - match stream.read(&mut buffer) { - Ok(0) => { - println!("Master disconnected"); - break; - } - Ok(n) => { - println!( - "REPLICA received: {}", - String::from_utf8_lossy(&buffer[..n]) - ); + let bytes_read = { + let mut stream = shared_stream.lock().unwrap(); + match stream.read(&mut buffer) { + Ok(0) => { + println!("Master disconnected"); + break; + } + Ok(n) => n, + Err(e) => { + eprintln!("Error reading from master: {}", e); + break; + } } - Err(e) => { - eprintln!("Error reading from master: {}", e); - break; + }; // stream lock is dropped here + // Parse and execute all commands in the buffer + let mut remaining_bytes = &buffer[..bytes_read]; + + while !remaining_bytes.is_empty() { + match parse(remaining_bytes) { + Ok((parsed_command, leftover)) => { + // Create a temporary slave server for command execution + let temp_slave = RedisServer::Slave(SlaveServer::new( + "0".to_string(), // dummy port + "localhost".to_string(), // dummy host + "6379".to_string(), // dummy master port + None, // no connection needed for execution + )); + + // Set the cache to our actual cache + let mut temp_slave = temp_slave; + temp_slave.set_cache(&cache_clone); + let server_arc = Arc::new(Mutex::new(temp_slave)); + + let _ = RedisCommands::from(parsed_command.clone()) + .execute(server_arc); + + // Update remaining bytes for next iteration + remaining_bytes = leftover; + } + Err(_) => { + // If parsing fails, break out of the loop + break; + } } } } }); - Ok(()) } Err(e) => Err(format!("Master node doesn't exist: {}", e)), -- cgit v1.2.3