diff options
| author | omagdy <omar.professional8777@gmail.com> | 2025-07-23 22:39:16 +0300 |
|---|---|---|
| committer | omagdy <omar.professional8777@gmail.com> | 2025-07-23 22:39:16 +0300 |
| commit | 561fb8d783cc000b7b9cc204e10618464c092e18 (patch) | |
| tree | 2d5cd5528ad5e6bdec3736bfedb74189e2757829 /src/server.rs | |
| parent | 91d8ff4f006d669de8b65f6ee32ce352d0598719 (diff) | |
| download | redis-rust-561fb8d783cc000b7b9cc204e10618464c092e18.tar.xz redis-rust-561fb8d783cc000b7b9cc204e10618464c092e18.zip | |
refactor: did some better design choices
Diffstat (limited to 'src/server.rs')
| -rw-r--r-- | src/server.rs | 68 |
1 files changed, 57 insertions, 11 deletions
diff --git a/src/server.rs b/src/server.rs index 5636234..6da29eb 100644 --- a/src/server.rs +++ b/src/server.rs @@ -4,6 +4,7 @@ use std::collections::HashMap; use std::io::{Read, Write}; use std::net::{SocketAddr, TcpStream}; use std::sync::{Arc, Mutex}; +use std::time::Duration; use std::{env, thread}; use crate::shared_cache::SharedCache; @@ -12,9 +13,9 @@ use crate::shared_cache::SharedCache; pub struct MasterServer { pub dir: Option<String>, pub dbfilename: Option<String>, - pub replid: Option<String>, - pub current_offset: Option<String>, pub port: String, + pub replid: Option<String>, + pub current_offset: Arc<Mutex<usize>>, pub cache: SharedCache, replicas: Vec<SlaveServer>, } @@ -26,7 +27,7 @@ impl MasterServer { dbfilename: None, port: "6379".to_string(), replid: Some("8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb".to_string()), - current_offset: Some("0".to_string()), + current_offset: Arc::new(Mutex::new(0)), cache: Arc::new(Mutex::new(HashMap::new())), replicas: vec![], } @@ -60,7 +61,7 @@ pub struct SlaveServer { pub dbfilename: Option<String>, pub port: String, pub master_replid: Option<String>, - pub master_repl_offset: Option<String>, + pub master_repl_offset: Arc<Mutex<usize>>, pub master_host: String, pub master_port: String, pub connection: Option<Arc<Mutex<TcpStream>>>, @@ -79,7 +80,7 @@ impl SlaveServer { dbfilename: None, port, master_replid: Some("8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb".to_string()), - master_repl_offset: Some("0".to_string()), + master_repl_offset: Arc::new(Mutex::new(0)), master_host, master_port, connection, @@ -95,7 +96,7 @@ impl SlaveServer { fn handshake(&mut self) -> Result<(), String> { match self.connect() { Ok(mut stream) => { - let mut buffer = [0; 512]; + let mut buffer = [0; 1024]; let mut send_command = |command: &[u8]| -> Result<(), String> { stream @@ -135,14 +136,16 @@ impl SlaveServer { resp!(bulk "-1") ]))?; + thread::sleep(Duration::new(1, 0)); + // Store the persistent connection let shared_stream = Arc::new(Mutex::new(stream)); self.connection = Some(shared_stream.clone()); // Spawn the background listener thread let cache_clone = self.cache.clone(); + let master_repl_offset = self.master_repl_offset.clone(); thread::spawn(move || { - let mut buffer = [0u8; 1024]; loop { let bytes_read = { let mut stream = shared_stream.lock().unwrap(); @@ -158,12 +161,22 @@ impl SlaveServer { } } }; // stream lock is dropped here - // Parse and execute all commands in the buffer + + dbg!(&bytes_read); + + // Parse and execute all commands in the buffer let mut remaining_bytes = &buffer[..bytes_read]; - while !remaining_bytes.is_empty() { + dbg!(&remaining_bytes); + + let mut flag = true; + + while !remaining_bytes.is_empty() || flag { + flag = false; match parse(remaining_bytes) { Ok((parsed_command, leftover)) => { + dbg!(&parsed_command, leftover); + // Create a temporary slave server for command execution let temp_slave = RedisServer::Slave(SlaveServer::new( "0".to_string(), // dummy port @@ -174,14 +187,26 @@ impl SlaveServer { // Set the cache to our actual cache let mut temp_slave = temp_slave; + temp_slave.set_repl_offset(&master_repl_offset); temp_slave.set_cache(&cache_clone); let server_arc = Arc::new(Mutex::new(temp_slave)); - let _ = RedisCommands::from(parsed_command.clone()) - .execute(server_arc); + let response = RedisCommands::from(parsed_command.clone()) + .execute(server_arc.clone()); + + let mut shared_stream = shared_stream.lock().unwrap(); + + if let RespType::Array(arr) = &parsed_command { + if arr[0].to_resp_bytes() == b"REPLCONF".to_vec() { + shared_stream.write(&response).unwrap(); + } + } // Update remaining bytes for next iteration remaining_bytes = leftover; + server_arc.lock().unwrap().repl_offset_increment( + parsed_command.to_resp_bytes().len(), + ); } Err(_) => { // If parsing fails, break out of the loop @@ -263,6 +288,13 @@ impl RedisServer { } } + pub fn repl_offset(&self) -> usize { + match self { + RedisServer::Master(master) => *master.current_offset.lock().unwrap(), + RedisServer::Slave(slave) => *slave.master_repl_offset.lock().unwrap(), + } + } + pub fn set_cache(&mut self, cache: &SharedCache) { match self { RedisServer::Master(master) => master.cache = cache.clone(), @@ -270,6 +302,20 @@ impl RedisServer { } } + pub fn set_repl_offset(&mut self, repl_offset: &Arc<Mutex<usize>>) { + match self { + RedisServer::Master(master) => master.current_offset = repl_offset.clone(), + RedisServer::Slave(slave) => slave.master_repl_offset = repl_offset.clone(), + } + } + + pub fn repl_offset_increment(&mut self, amount: usize) { + match self { + RedisServer::Master(master) => *master.current_offset.lock().unwrap() += amount, + RedisServer::Slave(slave) => *slave.master_repl_offset.lock().unwrap() += amount, + } + } + pub fn role(&self) -> &str { match self { RedisServer::Master(_) => "master", |
