aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authoromagdy <omar.professional8777@gmail.com>2025-07-23 08:48:54 +0300
committeromagdy <omar.professional8777@gmail.com>2025-07-23 08:48:54 +0300
commit91d8ff4f006d669de8b65f6ee32ce352d0598719 (patch)
treee40bb67575785ea60c77a6fceb4a7490997f0ce9 /src
parentd7d2377772e31fafb56f8107a6a22df4a26846d9 (diff)
downloadredis-rust-91d8ff4f006d669de8b65f6ee32ce352d0598719.tar.xz
redis-rust-91d8ff4f006d669de8b65f6ee32ce352d0598719.zip
feat: Propagate write commands such as SET to replicas and actually execute them on the replicas
Diffstat (limited to 'src')
-rw-r--r--src/server.rs66
1 files changed, 49 insertions, 17 deletions
diff --git a/src/server.rs b/src/server.rs
index 2d7cca7..5636234 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -1,4 +1,5 @@
-use crate::resp_parser::RespType;
+use crate::resp_commands::RedisCommands;
+use crate::resp_parser::{parse, RespType};
use std::collections::HashMap;
use std::io::{Read, Write};
use std::net::{SocketAddr, TcpStream};
@@ -103,7 +104,10 @@ impl SlaveServer {
match stream.read(&mut buffer) {
Ok(0) | Err(_) => return Ok(()), // connection closed or error
- Ok(_) => Ok(()),
+ Ok(_) => {
+ println!("Recieved some bytes here!");
+ Ok(())
+ }
}
};
@@ -136,29 +140,57 @@ impl SlaveServer {
self.connection = Some(shared_stream.clone());
// Spawn the background listener thread
+ let cache_clone = self.cache.clone();
thread::spawn(move || {
let mut buffer = [0u8; 1024];
loop {
- let mut stream = shared_stream.lock().unwrap();
- match stream.read(&mut buffer) {
- Ok(0) => {
- println!("Master disconnected");
- break;
- }
- Ok(n) => {
- println!(
- "REPLICA received: {}",
- String::from_utf8_lossy(&buffer[..n])
- );
+ let bytes_read = {
+ let mut stream = shared_stream.lock().unwrap();
+ match stream.read(&mut buffer) {
+ Ok(0) => {
+ println!("Master disconnected");
+ break;
+ }
+ Ok(n) => n,
+ Err(e) => {
+ eprintln!("Error reading from master: {}", e);
+ break;
+ }
}
- Err(e) => {
- eprintln!("Error reading from master: {}", e);
- break;
+ }; // stream lock is dropped here
+ // Parse and execute all commands in the buffer
+ let mut remaining_bytes = &buffer[..bytes_read];
+
+ while !remaining_bytes.is_empty() {
+ match parse(remaining_bytes) {
+ Ok((parsed_command, leftover)) => {
+ // Create a temporary slave server for command execution
+ let temp_slave = RedisServer::Slave(SlaveServer::new(
+ "0".to_string(), // dummy port
+ "localhost".to_string(), // dummy host
+ "6379".to_string(), // dummy master port
+ None, // no connection needed for execution
+ ));
+
+ // Set the cache to our actual cache
+ let mut temp_slave = temp_slave;
+ temp_slave.set_cache(&cache_clone);
+ let server_arc = Arc::new(Mutex::new(temp_slave));
+
+ let _ = RedisCommands::from(parsed_command.clone())
+ .execute(server_arc);
+
+ // Update remaining bytes for next iteration
+ remaining_bytes = leftover;
+ }
+ Err(_) => {
+ // If parsing fails, break out of the loop
+ break;
+ }
}
}
}
});
-
Ok(())
}
Err(e) => Err(format!("Master node doesn't exist: {}", e)),