diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/resp_commands.rs | 15 | ||||
| -rw-r--r-- | src/server.rs | 68 |
2 files changed, 67 insertions, 16 deletions
diff --git a/src/resp_commands.rs b/src/resp_commands.rs index 03ec9dc..453024c 100644 --- a/src/resp_commands.rs +++ b/src/resp_commands.rs @@ -259,7 +259,7 @@ impl RedisCommands { "# Replication\r\nrole:{}master_replid:{}master_repl_offset:{}", server.role(), master.replid.clone().unwrap_or("".to_string()), - master.current_offset.clone().unwrap_or("".to_string()) + master.current_offset.lock().unwrap(), ) .as_bytes() .to_vec(); @@ -270,7 +270,7 @@ impl RedisCommands { "# Replication\r\nrole:{}master_replid:{}master_repl_offset:{}", server.role(), slave.master_replid.clone().unwrap_or("".to_string()), - slave.master_repl_offset.clone().unwrap_or("".to_string()) + slave.master_repl_offset.lock().unwrap() ) .as_bytes() .to_vec(); @@ -278,9 +278,13 @@ impl RedisCommands { } } } - RC::ReplConf((_, _)) => { - resp_bytes!("OK") - } + RC::ReplConf((op1, op2)) => match (op1.to_uppercase().as_str(), op2.as_str()) { + ("GETACK", "*") => { + println!("Did i even get here?"); + resp_bytes!(array => [resp!(bulk "REPLCONF"), resp!(bulk "ACK"), resp!(bulk server.lock().unwrap().repl_offset().to_string())]) + } + _ => resp_bytes!("OK"), + }, RC::Psync((_, _)) => { let server = server.lock().unwrap(); if let RedisServer::Master(master) = &*server { @@ -482,6 +486,7 @@ impl From<RespType> for RedisCommands { let Some(op2) = args.next() else { return Self::Invalid; }; + println!("Hello from here"); Self::ReplConf((op1, op2)) } "PSYNC" => { diff --git a/src/server.rs b/src/server.rs index 5636234..6da29eb 100644 --- a/src/server.rs +++ b/src/server.rs @@ -4,6 +4,7 @@ use std::collections::HashMap; use std::io::{Read, Write}; use std::net::{SocketAddr, TcpStream}; use std::sync::{Arc, Mutex}; +use std::time::Duration; use std::{env, thread}; use crate::shared_cache::SharedCache; @@ -12,9 +13,9 @@ use crate::shared_cache::SharedCache; pub struct MasterServer { pub dir: Option<String>, pub dbfilename: Option<String>, - pub replid: Option<String>, - pub current_offset: Option<String>, pub port: String, + pub replid: Option<String>, + pub current_offset: Arc<Mutex<usize>>, pub cache: SharedCache, replicas: Vec<SlaveServer>, } @@ -26,7 +27,7 @@ impl MasterServer { dbfilename: None, port: "6379".to_string(), replid: Some("8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb".to_string()), - current_offset: Some("0".to_string()), + current_offset: Arc::new(Mutex::new(0)), cache: Arc::new(Mutex::new(HashMap::new())), replicas: vec![], } @@ -60,7 +61,7 @@ pub struct SlaveServer { pub dbfilename: Option<String>, pub port: String, pub master_replid: Option<String>, - pub master_repl_offset: Option<String>, + pub master_repl_offset: Arc<Mutex<usize>>, pub master_host: String, pub master_port: String, pub connection: Option<Arc<Mutex<TcpStream>>>, @@ -79,7 +80,7 @@ impl SlaveServer { dbfilename: None, port, master_replid: Some("8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb".to_string()), - master_repl_offset: Some("0".to_string()), + master_repl_offset: Arc::new(Mutex::new(0)), master_host, master_port, connection, @@ -95,7 +96,7 @@ impl SlaveServer { fn handshake(&mut self) -> Result<(), String> { match self.connect() { Ok(mut stream) => { - let mut buffer = [0; 512]; + let mut buffer = [0; 1024]; let mut send_command = |command: &[u8]| -> Result<(), String> { stream @@ -135,14 +136,16 @@ impl SlaveServer { resp!(bulk "-1") ]))?; + thread::sleep(Duration::new(1, 0)); + // Store the persistent connection let shared_stream = Arc::new(Mutex::new(stream)); self.connection = Some(shared_stream.clone()); // Spawn the background listener thread let cache_clone = self.cache.clone(); + let master_repl_offset = self.master_repl_offset.clone(); thread::spawn(move || { - let mut buffer = [0u8; 1024]; loop { let bytes_read = { let mut stream = shared_stream.lock().unwrap(); @@ -158,12 +161,22 @@ impl SlaveServer { } } }; // stream lock is dropped here - // Parse and execute all commands in the buffer + + dbg!(&bytes_read); + + // Parse and execute all commands in the buffer let mut remaining_bytes = &buffer[..bytes_read]; - while !remaining_bytes.is_empty() { + dbg!(&remaining_bytes); + + let mut flag = true; + + while !remaining_bytes.is_empty() || flag { + flag = false; match parse(remaining_bytes) { Ok((parsed_command, leftover)) => { + dbg!(&parsed_command, leftover); + // Create a temporary slave server for command execution let temp_slave = RedisServer::Slave(SlaveServer::new( "0".to_string(), // dummy port @@ -174,14 +187,26 @@ impl SlaveServer { // Set the cache to our actual cache let mut temp_slave = temp_slave; + temp_slave.set_repl_offset(&master_repl_offset); temp_slave.set_cache(&cache_clone); let server_arc = Arc::new(Mutex::new(temp_slave)); - let _ = RedisCommands::from(parsed_command.clone()) - .execute(server_arc); + let response = RedisCommands::from(parsed_command.clone()) + .execute(server_arc.clone()); + + let mut shared_stream = shared_stream.lock().unwrap(); + + if let RespType::Array(arr) = &parsed_command { + if arr[0].to_resp_bytes() == b"REPLCONF".to_vec() { + shared_stream.write(&response).unwrap(); + } + } // Update remaining bytes for next iteration remaining_bytes = leftover; + server_arc.lock().unwrap().repl_offset_increment( + parsed_command.to_resp_bytes().len(), + ); } Err(_) => { // If parsing fails, break out of the loop @@ -263,6 +288,13 @@ impl RedisServer { } } + pub fn repl_offset(&self) -> usize { + match self { + RedisServer::Master(master) => *master.current_offset.lock().unwrap(), + RedisServer::Slave(slave) => *slave.master_repl_offset.lock().unwrap(), + } + } + pub fn set_cache(&mut self, cache: &SharedCache) { match self { RedisServer::Master(master) => master.cache = cache.clone(), @@ -270,6 +302,20 @@ impl RedisServer { } } + pub fn set_repl_offset(&mut self, repl_offset: &Arc<Mutex<usize>>) { + match self { + RedisServer::Master(master) => master.current_offset = repl_offset.clone(), + RedisServer::Slave(slave) => slave.master_repl_offset = repl_offset.clone(), + } + } + + pub fn repl_offset_increment(&mut self, amount: usize) { + match self { + RedisServer::Master(master) => *master.current_offset.lock().unwrap() += amount, + RedisServer::Slave(slave) => *slave.master_repl_offset.lock().unwrap() += amount, + } + } + pub fn role(&self) -> &str { match self { RedisServer::Master(_) => "master", |
