aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/main.rs44
-rw-r--r--src/rdb.rs14
-rw-r--r--src/resp_commands.rs122
-rw-r--r--src/resp_parser.rs17
-rw-r--r--src/server.rs718
-rw-r--r--src/shared_cache.rs3
-rw-r--r--tests/test_commands.rs58
-rw-r--r--tests/test_parse_simple_string.rs9
8 files changed, 694 insertions, 291 deletions
diff --git a/src/main.rs b/src/main.rs
index 25bc6c9..3fe5559 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -14,15 +14,16 @@ use std::{
use codecrafters_redis::{
rdb::{KeyExpiry, ParseError, RDBFile, RedisValue},
resp_bytes,
+ server::SharedMut,
shared_cache::*,
};
-use codecrafters_redis::{resp_commands::RedisCommands, server::RedisServer};
+use codecrafters_redis::{resp_commands::RedisCommand, server::RedisServer};
use codecrafters_redis::{
resp_parser::{parse, RespType},
server::SlaveServer,
};
-fn spawn_cleanup_thread(cache: SharedCache) {
+fn spawn_cleanup_thread(cache: SharedMut<Cache>) {
let cache_clone = cache.clone();
std::thread::spawn(move || {
loop {
@@ -68,12 +69,25 @@ fn handle_client(mut stream: TcpStream, server: Arc<Mutex<RedisServer>>) {
};
let request = parse(&buffer).unwrap();
- let server_clone = Arc::clone(&server);
- let response = RedisCommands::from(request.0.clone()).execute(server_clone);
+
+ let mut server = server.lock().unwrap();
+
+ // Big State vars
+ let cache = server.cache().clone();
+ let server_state = server.get_server_state().clone();
+ let config = server.config();
+ let brodcaster = server.as_broadcaster();
+
+ let response = RedisCommand::from(request.0.clone()).execute(
+ cache.clone(),
+ config,
+ server_state,
+ brodcaster,
+ );
let mut request_command = "".to_string();
- // FIXME: Find a solution for this mess!!
+ // FIXME: Find a solution for this mess!! (Design better API)
match &request.0 {
RespType::Array(arr) => {
if let RespType::BulkString(s) = arr[0].clone() {
@@ -84,22 +98,28 @@ fn handle_client(mut stream: TcpStream, server: Arc<Mutex<RedisServer>>) {
}
// Store the persistent connection
- let shared_stream = Arc::new(Mutex::new(
- stream.try_clone().expect("What could go wrong? :)"),
- ));
+ // let shared_stream = Arc::new(Mutex::new(
+ // stream.try_clone().expect("What could go wrong? :)"),
+ // ));
// if this true immediately write and send back rdb file after response
// HACK: This just feels wrong I feel this shouldn't be handled here and should be handled
// in the exexute command
if request_command.starts_with("PSYNC") {
stream.write(&response).unwrap();
- let _ = write_rdb_to_stream(&mut stream);
- // handshake completed and I should add the server sending me the handshake to my replicas
let replica_addr = stream
.peer_addr()
.expect("This shouldn't fail right? right?? :)");
- let mut server = server.lock().unwrap();
- server.add_replica(replica_addr, shared_stream);
+
+ server.add_replica(
+ replica_addr,
+ Arc::new(Mutex::new(
+ stream.try_clone().expect("What could go wrong? :)"),
+ )),
+ );
+
+ let _ = write_rdb_to_stream(&mut stream);
+ // handshake completed and I should add the server sending me the handshake to my replicas
} else {
// write respose back to the client
stream.write(&response).unwrap();
diff --git a/src/rdb.rs b/src/rdb.rs
index 0195cb9..272d168 100644
--- a/src/rdb.rs
+++ b/src/rdb.rs
@@ -600,6 +600,20 @@ impl FromBytes for RDBFile {
let mut total_consumed = 0;
let mut databases = HashMap::new();
+ // special case when rdb file is sent by the master to replicas in the following format
+ // $<length>/r/n<bytes_of_length_length>
+ if bytes[0] == '$' as u8 {
+ // consume up to the CRLF
+ let (consumed, rest) = bytes
+ .windows(2)
+ .position(|window| window == b"\r\n")
+ .map(|pos| (&bytes[..pos], &bytes[pos + 2..]))
+ .ok_or(ParseError::UnexpectedEof)?;
+ println!("Consumed {:?}", consumed);
+ remaining = rest;
+ total_consumed += consumed.len();
+ }
+
// 1. Parse the RDB header ("REDIS" + version)
let (header, consumed) = RDBHeader::from_bytes(remaining)?;
total_consumed += consumed;
diff --git a/src/resp_commands.rs b/src/resp_commands.rs
index 453024c..981f7aa 100644
--- a/src/resp_commands.rs
+++ b/src/resp_commands.rs
@@ -1,7 +1,6 @@
use crate::server::*;
use crate::{resp_parser::*, shared_cache::*};
use regex::Regex;
-use std::sync::{Arc, Mutex};
use std::time::{SystemTime, UNIX_EPOCH};
#[derive(Debug, Clone)]
@@ -110,7 +109,7 @@ fn extract_string(resp: &RespType) -> Option<String> {
}
}
-pub enum RedisCommands {
+pub enum RedisCommand {
Ping,
Echo(String),
Get(String),
@@ -123,15 +122,20 @@ pub enum RedisCommands {
Invalid,
}
-impl RedisCommands {
- pub fn execute(self, server: Arc<Mutex<RedisServer>>) -> Vec<u8> {
- use RedisCommands as RC;
+impl RedisCommand {
+ pub fn execute<'a>(
+ self,
+ cache: SharedMut<Cache>,
+ config: Shared<ServerConfig>,
+ server_state: ServerState,
+ broadcaster: Option<SharedMut<&mut dyn CanBroadcast>>,
+ ) -> Vec<u8> {
+ use RedisCommand as RC;
match self {
RC::Ping => resp_bytes!("PONG"),
RC::Echo(echo_string) => resp_bytes!(echo_string),
RC::Get(key) => {
- let server = server.lock().unwrap();
- let mut cache = server.cache().lock().unwrap();
+ let mut cache = cache.lock().unwrap();
match cache.get(&key).cloned() {
Some(entry) => {
@@ -146,8 +150,7 @@ impl RedisCommands {
}
}
RC::Set(command) => {
- let mut server = server.lock().unwrap();
- let mut cache = server.cache().lock().unwrap();
+ let mut cache = cache.lock().unwrap();
// Check conditions (NX/XX)
let key_exists = cache.contains_key(&command.key);
@@ -190,17 +193,24 @@ impl RedisCommands {
},
);
- // Broadcast SET to replicas after mutating local state
- let broadcast_cmd = resp_bytes!(array => [
- resp!(bulk "SET"),
- resp!(bulk command.key),
- resp!(bulk command.value)
- ]);
-
- // Unlock the mutex so that I can access to broadcast the messaage
- drop(cache);
+ println!(
+ "My role is {:?} and I just inserted {} to {}",
+ server_state.role, &command.key, command.value
+ );
- server.broadcast_command(&broadcast_cmd);
+ // Broadcast to replicas if this is a master
+ if let Some(broadcaster) = broadcaster {
+ // Broadcast SET to replicas after mutating local state
+ let broadcast_cmd = resp_bytes!(array => [
+ resp!(bulk "SET"),
+ resp!(bulk command.key),
+ resp!(bulk command.value)
+ ]);
+ broadcaster
+ .lock()
+ .unwrap()
+ .broadcast_command_to_replicas(&broadcast_cmd);
+ }
if !command.get_old_value {
return resp_bytes!("OK");
@@ -213,8 +223,9 @@ impl RedisCommands {
}
RC::ConfigGet(s) => {
use RespType as RT;
- let server = server.lock().unwrap();
- if let (Some(dir), Some(dbfilename)) = (server.dir(), server.dbfilename()) {
+ if let (Some(dir), Some(dbfilename)) =
+ (config.dir.clone(), config.dbfilename.clone())
+ {
match s.as_str() {
"dir" => RT::Array(vec![
RT::BulkString(s.as_bytes().to_vec()),
@@ -236,9 +247,7 @@ impl RedisCommands {
use RespType as RT;
let query = query.replace('*', ".*");
-
- let server = server.lock().unwrap();
- let cache = server.cache().lock().unwrap();
+ let cache = cache.lock().unwrap();
let regex = Regex::new(&query).unwrap();
let matching_keys: Vec<RT> = cache
.keys()
@@ -252,53 +261,42 @@ impl RedisCommands {
}
RC::Info(_sub_command) => {
use RespType as RT;
- let server = server.lock().unwrap();
- match &*server {
- RedisServer::Master(master) => {
- let response = format!(
- "# Replication\r\nrole:{}master_replid:{}master_repl_offset:{}",
- server.role(),
- master.replid.clone().unwrap_or("".to_string()),
- master.current_offset.lock().unwrap(),
- )
- .as_bytes()
- .to_vec();
- RT::BulkString(response).to_resp_bytes()
- }
- RedisServer::Slave(slave) => {
- let response = format!(
- "# Replication\r\nrole:{}master_replid:{}master_repl_offset:{}",
- server.role(),
- slave.master_replid.clone().unwrap_or("".to_string()),
- slave.master_repl_offset.lock().unwrap()
- )
- .as_bytes()
- .to_vec();
- RT::BulkString(response).to_resp_bytes()
- }
- }
+ let role_str = match server_state.role {
+ ServerRole::Master => "master",
+ ServerRole::Slave => "slave",
+ };
+
+ let response = format!(
+ "# Replication\r\nrole:{}\r\nmaster_replid:{}\r\nmaster_repl_offset:{}",
+ role_str, server_state.repl_id, server_state.repl_offset,
+ )
+ .into_bytes();
+
+ RT::BulkString(response).to_resp_bytes()
}
RC::ReplConf((op1, op2)) => match (op1.to_uppercase().as_str(), op2.as_str()) {
("GETACK", "*") => {
println!("Did i even get here?");
- resp_bytes!(array => [resp!(bulk "REPLCONF"), resp!(bulk "ACK"), resp!(bulk server.lock().unwrap().repl_offset().to_string())])
+ resp_bytes!(array => [resp!(bulk "REPLCONF"), resp!(bulk "ACK"), resp!(bulk server_state.repl_offset.to_string())])
}
_ => resp_bytes!("OK"),
},
RC::Psync((_, _)) => {
- let server = server.lock().unwrap();
- if let RedisServer::Master(master) = &*server {
- let response = format!(
- "FULLRESYNC {} 0",
- master.replid.clone().unwrap_or("".to_string()),
- );
- resp_bytes!(response)
- } else {
- // TODO: Find a way to report this error back up the program trace
- unreachable!("I should never come here")
+ // This should only be called on masters
+ match server_state.role {
+ ServerRole::Master => {
+ let response = format!("FULLRESYNC {} 0", server_state.repl_id);
+ resp_bytes!(response)
+ }
+ // This shouldn't happen, but handle gracefully
+ ServerRole::Slave => {
+ // TODO: I actually forgot that you could send error messages with redis do
+ // more of this across the whole codebase when it makes sense
+ resp_bytes!(error "ERR PSYNC not supported on slave")
+ }
}
}
- RC::Invalid => todo!(),
+ RC::Invalid => resp_bytes!(error "ERR Invalid Command"),
}
}
}
@@ -403,7 +401,7 @@ impl SetOptionParser {
}
}
-impl From<RespType> for RedisCommands {
+impl From<RespType> for RedisCommand {
fn from(value: RespType) -> Self {
// Alternative approach using a more functional style with iterators
let RespType::Array(command) = value else {
diff --git a/src/resp_parser.rs b/src/resp_parser.rs
index 952176c..646d2b1 100644
--- a/src/resp_parser.rs
+++ b/src/resp_parser.rs
@@ -380,18 +380,17 @@ pub fn parse_bulk_strings(bytes: &[u8]) -> Result<(RespType, &[u8]), RespError>
return Err(RespError::UnexpectedEnd);
}
- let mut bulk_string: Vec<u8> = Vec::with_capacity(length as usize);
+ let bulk_string = remained[..length as usize].to_vec();
+ let remaining_after_string = &remained[length as usize..];
- for i in 0..length {
- bulk_string.push(remained[i as usize]);
- }
-
- let consumed = RespType::BulkString(bulk_string);
-
- if !(&remained[length as usize..]).starts_with(b"\r\n") {
+ if !remaining_after_string.starts_with(b"\r\n") {
return Err(RespError::UnexpectedEnd);
}
- return Ok((consumed, &remained[length as usize + 2..]));
+
+ Ok((
+ RespType::BulkString(bulk_string),
+ &remaining_after_string[2..],
+ ))
}
[] => Err(RespError::Custom(String::from("Empty data"))),
}
diff --git a/src/server.rs b/src/server.rs
index 6da29eb..8c09f73 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -1,96 +1,279 @@
-use crate::resp_commands::RedisCommands;
+use crate::rdb::{FromBytes, RDBFile};
+use crate::resp_commands::RedisCommand;
use crate::resp_parser::{parse, RespType};
use std::collections::HashMap;
use std::io::{Read, Write};
use std::net::{SocketAddr, TcpStream};
use std::sync::{Arc, Mutex};
-use std::time::Duration;
use std::{env, thread};
-use crate::shared_cache::SharedCache;
+use crate::shared_cache::Cache;
+
+// TODO: add functions to access member variables instead of accessing them directly
#[derive(Debug, Clone)]
-pub struct MasterServer {
+pub struct ServerConfig {
pub dir: Option<String>,
pub dbfilename: Option<String>,
pub port: String,
- pub replid: Option<String>,
- pub current_offset: Arc<Mutex<usize>>,
- pub cache: SharedCache,
- replicas: Vec<SlaveServer>,
+}
+
+// Server state that commands might need
+#[derive(Debug, Clone)]
+pub struct ServerState {
+ pub role: ServerRole,
+ pub repl_id: String,
+ pub repl_offset: usize,
+}
+
+#[derive(Debug, Clone)]
+pub enum ServerRole {
+ Master,
+ Slave,
+}
+
+// Trait for broadcasting - only masters can do this
+pub trait CanBroadcast: Send {
+ fn broadcast_command_to_replicas(&mut self, command: &[u8]);
+}
+
+// Implementation for Master
+impl CanBroadcast for MasterServer {
+ fn broadcast_command_to_replicas(&mut self, command: &[u8]) {
+ self.broadcast_command(command);
+ }
+}
+
+// Helper methods to extract server state
+impl MasterServer {
+ pub fn get_server_state(&self) -> ServerState {
+ ServerState {
+ role: ServerRole::Master,
+ repl_id: self.get_replid(),
+ repl_offset: self.get_repl_offset(),
+ }
+ }
+}
+
+impl SlaveServer {
+ pub fn get_server_state(&self) -> ServerState {
+ let state = self.state.lock().unwrap();
+ ServerState {
+ role: ServerRole::Slave,
+ repl_id: state.master_replid.clone(),
+ repl_offset: state.master_repl_offset,
+ }
+ }
+}
+
+impl RedisServer {
+ pub fn get_server_state(&self) -> ServerState {
+ match self {
+ RedisServer::Master(master) => master.get_server_state(),
+ RedisServer::Slave(slave) => slave.get_server_state(),
+ }
+ }
+
+ pub fn as_broadcaster(&mut self) -> Option<SharedMut<&mut dyn CanBroadcast>> {
+ match self {
+ RedisServer::Master(master) => {
+ Some(Arc::new(Mutex::new(master as &mut dyn CanBroadcast)))
+ }
+ RedisServer::Slave(_) => None,
+ }
+ }
+}
+
+#[derive(Debug)]
+pub struct MasterState {
+ pub replid: String,
+ pub current_offset: usize,
+ pub replicas: Vec<ReplicaConnection>,
+}
+
+// Slave-specific state
+#[derive(Debug)]
+pub struct SlaveState {
+ pub master_replid: String,
+ pub master_repl_offset: usize,
+ pub master_host: String,
+ pub master_port: String,
+ pub connection: Option<TcpStream>,
+}
+
+#[derive(Debug)]
+pub struct ReplicaConnection {
+ pub port: String,
+ pub connection: Arc<Mutex<TcpStream>>,
+}
+
+pub type SharedMut<T> = Arc<Mutex<T>>;
+pub type Shared<T> = Arc<T>;
+
+#[derive(Debug, Clone)]
+pub struct MasterServer {
+ config: Shared<ServerConfig>,
+ state: SharedMut<MasterState>,
+ cache: SharedMut<Cache>,
}
impl MasterServer {
fn new() -> Self {
- Self {
+ let config = Arc::new(ServerConfig {
dir: None,
dbfilename: None,
port: "6379".to_string(),
- replid: Some("8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb".to_string()),
- current_offset: Arc::new(Mutex::new(0)),
- cache: Arc::new(Mutex::new(HashMap::new())),
+ });
+
+ let state = Arc::new(Mutex::new(MasterState {
+ replid: "8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb".to_string(),
+ current_offset: 0,
replicas: vec![],
+ }));
+
+ let cache = Arc::new(Mutex::new(HashMap::new()));
+
+ Self {
+ config,
+ state,
+ cache,
}
}
fn port(&self) -> &str {
- &self.port
+ &self.config.port
}
pub fn broadcast_command(&mut self, command: &[u8]) {
- println!("Hello from brodcast");
- self.replicas.retain(|replica| {
- if let Some(conn) = &replica.connection {
- let mut conn = conn.lock().unwrap();
- if let Err(e) = conn.write_all(command) {
- eprintln!("Failed to send to replica {}: {}", replica.port, e);
- false // Drop dead connections
- } else {
- true
- }
+ let mut state = self.state.lock().unwrap();
+
+ state.replicas.retain(|replica| {
+ let mut conn = replica.connection.lock().unwrap();
+ if let Err(e) = conn.write_all(command) {
+ eprintln!("Failed to send to replica {}: {}", replica.port, e);
+ false // Drop dead connections
} else {
- false
+ true
}
- });
+ })
+ }
+
+ pub fn add_replica(&self, replica_addr: SocketAddr, connection: Arc<Mutex<TcpStream>>) {
+ let replica = ReplicaConnection {
+ port: replica_addr.port().to_string(),
+ connection,
+ };
+
+ self.state.lock().unwrap().replicas.push(replica);
+ }
+
+ pub fn get_repl_offset(&self) -> usize {
+ self.state.lock().unwrap().current_offset
+ }
+
+ pub fn increment_repl_offset(&self, amount: usize) {
+ self.state.lock().unwrap().current_offset += amount;
+ }
+
+ pub fn get_replid(&self) -> String {
+ self.state.lock().unwrap().replid.clone()
}
}
#[derive(Debug, Clone)]
pub struct SlaveServer {
- pub dir: Option<String>,
- pub dbfilename: Option<String>,
- pub port: String,
- pub master_replid: Option<String>,
- pub master_repl_offset: Arc<Mutex<usize>>,
- pub master_host: String,
- pub master_port: String,
- pub connection: Option<Arc<Mutex<TcpStream>>>,
- pub cache: SharedCache,
+ config: Shared<ServerConfig>,
+ state: SharedMut<SlaveState>,
+ cache: SharedMut<Cache>,
+}
+
+fn read_rdb_from_stream<R: Read>(reader: &mut R) -> Result<Vec<u8>, String> {
+ let mut buffer = [0u8; 1024];
+
+ // Read until we get the length prefix ($<length>\r\n)
+ let mut length_bytes = Vec::new();
+
+ loop {
+ let bytes_read = reader
+ .read(&mut buffer)
+ .map_err(|e| format!("Failed to read: {}", e))?;
+ if bytes_read == 0 {
+ return Err("Connection closed while reading RDB length".to_string());
+ }
+
+ length_bytes.extend_from_slice(&buffer[..bytes_read]);
+
+ if length_bytes.len() >= 2 && &length_bytes[length_bytes.len() - 2..] == b"\r\n" {
+ break;
+ }
+ }
+
+ // Parse the length prefix ($<length>\r\n)
+ let (resp, remaining) =
+ parse(&length_bytes).map_err(|e| format!("Failed to parse RDB length: {:?}", e))?;
+ let length = match resp {
+ RespType::BulkString(_) => {
+ let len_str = String::from_utf8_lossy(&length_bytes[1..length_bytes.len() - 2]);
+ len_str
+ .parse::<usize>()
+ .map_err(|e| format!("Invalid RDB length: {}", e))?
+ }
+ _ => return Err("Expected bulk string for RDB length".to_string()),
+ };
+
+ // Read the exact number of bytes for the RDB file
+ let mut rdb_bytes = vec![0u8; length];
+ let mut total_read = remaining.len();
+ rdb_bytes[..remaining.len()].copy_from_slice(remaining);
+
+ while total_read < length {
+ let bytes_read = reader
+ .read(&mut buffer)
+ .map_err(|e| format!("Failed to read RDB: {}", e))?;
+ if bytes_read == 0 {
+ return Err("Connection closed while reading RDB file".to_string());
+ }
+ let end = (total_read + bytes_read).min(length);
+ rdb_bytes[total_read..end].copy_from_slice(&buffer[..(end - total_read)]);
+ total_read += bytes_read;
+ }
+
+ Ok(rdb_bytes)
}
impl SlaveServer {
- fn new(
- port: String,
- master_host: String,
- master_port: String,
- connection: Option<Arc<Mutex<TcpStream>>>,
- ) -> Self {
- Self {
+ fn new(port: String, master_host: String, master_port: String) -> Self {
+ let config = Arc::new(ServerConfig {
dir: None,
dbfilename: None,
port,
- master_replid: Some("8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb".to_string()),
- master_repl_offset: Arc::new(Mutex::new(0)),
+ });
+
+ let state = Arc::new(Mutex::new(SlaveState {
+ master_replid: "8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb".to_string(),
+ master_repl_offset: 0,
master_host,
master_port,
- connection,
- cache: Arc::new(Mutex::new(HashMap::new())),
+ connection: None,
+ }));
+
+ let cache = Arc::new(Mutex::new(HashMap::new()));
+
+ Self {
+ config,
+ state,
+ cache,
}
}
+ pub fn increment_repl_offset(&mut self, amount: usize) {
+ self.state.lock().unwrap().master_repl_offset += amount;
+ }
+
fn connect(&self) -> Result<TcpStream, std::io::Error> {
- let master_address = format!("{}:{}", self.master_host, self.master_port);
- return TcpStream::connect(master_address);
+ let state = self.state.lock().unwrap();
+ let master_address = format!("{}:{}", state.master_host, state.master_port);
+ TcpStream::connect(master_address)
}
fn handshake(&mut self) -> Result<(), String> {
@@ -98,128 +281,254 @@ impl SlaveServer {
Ok(mut stream) => {
let mut buffer = [0; 1024];
- let mut send_command = |command: &[u8]| -> Result<(), String> {
+ let mut send_command = |command: &[u8], read: bool| -> Result<(), String> {
stream
.write_all(command)
.map_err(|e| format!("Failed to send: {}", e))?;
- match stream.read(&mut buffer) {
- Ok(0) | Err(_) => return Ok(()), // connection closed or error
- Ok(_) => {
- println!("Recieved some bytes here!");
- Ok(())
+ if read {
+ match stream.read(&mut buffer) {
+ Ok(0) | Err(_) => return Ok(()), // connection closed or error
+ Ok(_) => {
+ println!("Recieved some bytes here!");
+ Ok(())
+ }
}
+ } else {
+ Ok(())
}
};
- // PING
- send_command(&resp_bytes!(array => [resp!(bulk "PING")]))?;
-
- // REPLCONF listening-port <PORT>
- send_command(&resp_bytes!(array => [
- resp!(bulk "REPLCONF"),
- resp!(bulk "listening-port"),
- resp!(bulk self.port.clone())
- ]))?;
+ // Step1: PING
+ send_command(&resp_bytes!(array => [resp!(bulk "PING")]), true)?;
+
+ let port = self.config.port.clone();
+ // Step2: REPLCONF listening-port <PORT>
+ send_command(
+ &resp_bytes!(array => [
+ resp!(bulk "REPLCONF"),
+ resp!(bulk "listening-port"),
+ resp!(bulk port)
+ ]),
+ true,
+ )?;
+
+ // Step3: REPLCONF capa psync2
+ send_command(
+ &resp_bytes!(array => [
+ resp!(bulk "REPLCONF"),
+ resp!(bulk "capa"),
+ resp!(bulk "psync2")
+ ]),
+ true,
+ )?;
+
+ // Step 4: PSYNC <REPL_ID> <REPL_OFFSSET>
+ send_command(
+ &resp_bytes!(array => [
+ resp!(bulk "PSYNC"),
+ resp!(bulk "?"),
+ resp!(bulk "-1")
+ ]),
+ false,
+ )?;
+
+ // Step 5: Read FULLRESYNC response
+ let bytes_read = stream
+ .read(&mut buffer)
+ .map_err(|e| format!("Failed to read FULLRESYNC: {}", e))?;
+ let (parsed, mut rest) = parse(&buffer[..bytes_read])
+ .map_err(|e| format!("Failed to parse FULLRESYNC: {:?}", e))?;
+ match parsed {
+ RespType::SimpleString(s) if s.starts_with("FULLRESYNC") => {
+ // Expected response
+ }
+ _ => return Err("Invalid FULLRESYNC response".to_string()),
+ }
- // REPLCONF capa psync2
- send_command(&resp_bytes!(array => [
- resp!(bulk "REPLCONF"),
- resp!(bulk "capa"),
- resp!(bulk "psync2")
- ]))?;
+ println!("rest: {:?}", rest);
- // PSYNC <REPL_ID> <REPL_OFFSSET>
- send_command(&resp_bytes!(array => [
- resp!(bulk "PSYNC"),
- resp!(bulk "?"),
- resp!(bulk "-1")
- ]))?;
+ println!("FULLRESYNC response bytes read: {}", bytes_read);
- thread::sleep(Duration::new(1, 0));
+ // So there is an interesting behaviour where the FULLRESYNC + RDB and if you are
+ // really lucky the REPLCONF would all get sent in one TCP segment so I should
+ // assume I would get nice segments refelecting each command
+ if !rest.is_empty() {
+ // TODO: Sync the rdb_file with the slave's cache
+ // TODO: Find a way to propagate the error up the stack by using anyhow or something
+ let (rdb_file, bytes_consumed) = RDBFile::from_bytes(rest).unwrap();
+ rest = &rest[bytes_consumed..];
+ println!("rdb bytes: {}", bytes_consumed);
+ println!("remaining btyes after rdb: {}", rest.len());
+ }
// Store the persistent connection
- let shared_stream = Arc::new(Mutex::new(stream));
- self.connection = Some(shared_stream.clone());
-
- // Spawn the background listener thread
- let cache_clone = self.cache.clone();
- let master_repl_offset = self.master_repl_offset.clone();
- thread::spawn(move || {
- loop {
- let bytes_read = {
- let mut stream = shared_stream.lock().unwrap();
- match stream.read(&mut buffer) {
- Ok(0) => {
- println!("Master disconnected");
- break;
- }
- Ok(n) => n,
- Err(e) => {
- eprintln!("Error reading from master: {}", e);
- break;
- }
- }
- }; // stream lock is dropped here
-
- dbg!(&bytes_read);
-
- // Parse and execute all commands in the buffer
- let mut remaining_bytes = &buffer[..bytes_read];
-
- dbg!(&remaining_bytes);
-
- let mut flag = true;
-
- while !remaining_bytes.is_empty() || flag {
- flag = false;
- match parse(remaining_bytes) {
- Ok((parsed_command, leftover)) => {
- dbg!(&parsed_command, leftover);
-
- // Create a temporary slave server for command execution
- let temp_slave = RedisServer::Slave(SlaveServer::new(
- "0".to_string(), // dummy port
- "localhost".to_string(), // dummy host
- "6379".to_string(), // dummy master port
- None, // no connection needed for execution
- ));
-
- // Set the cache to our actual cache
- let mut temp_slave = temp_slave;
- temp_slave.set_repl_offset(&master_repl_offset);
- temp_slave.set_cache(&cache_clone);
- let server_arc = Arc::new(Mutex::new(temp_slave));
-
- let response = RedisCommands::from(parsed_command.clone())
- .execute(server_arc.clone());
-
- let mut shared_stream = shared_stream.lock().unwrap();
-
- if let RespType::Array(arr) = &parsed_command {
- if arr[0].to_resp_bytes() == b"REPLCONF".to_vec() {
- shared_stream.write(&response).unwrap();
- }
- }
-
- // Update remaining bytes for next iteration
- remaining_bytes = leftover;
- server_arc.lock().unwrap().repl_offset_increment(
- parsed_command.to_resp_bytes().len(),
- );
- }
- Err(_) => {
- // If parsing fails, break out of the loop
- break;
- }
+ self.state.lock().unwrap().connection = Some(stream);
+ self.start_replication_listener(&mut rest);
+
+ Ok(())
+ }
+ Err(e) => Err(format!("Master node doesn't exist: {}", e)),
+ }
+ }
+
+ // TODO: This should return a Result
+ fn start_replication_listener<'a>(&'a self, rest: &mut &[u8]) {
+ let state = self.state.clone();
+ let cache = self.cache.clone();
+ let config = self.config.clone();
+ let server_state = self.get_server_state();
+ let broadcaster = None::<Arc<Mutex<&mut dyn CanBroadcast>>>;
+
+ // if it's not empty then there is probably a REPLCONF command sent and I should handle it
+ // before reading anymore bytes
+ if !rest.is_empty() {
+ // TODO: Sync the rdb_file with the slave's cache
+ // TODO: Find a way to propagate the error up the stack by using anyhow or something
+ if rest[0] == '$' as u8 {
+ // this means that the rdb segment got in here some how so I have to deal with it here
+ let (rdb_file, bytes_consumed) = RDBFile::from_bytes(rest).unwrap();
+ *rest = &rest[bytes_consumed..];
+
+ println!("rdb bytes: {}", bytes_consumed);
+ println!("remaining btyes after rdb: {}", rest.len());
+ if rest.len() > 0 {
+ match parse(rest) {
+ Ok((resp, leftover)) => {
+ dbg!(&resp, leftover);
+
+ // Update replication offset
+ let command_size = resp.to_resp_bytes().len();
+ let mut state_guard = state.lock().unwrap();
+
+ let command = RedisCommand::from(resp);
+
+ let response = command.execute(
+ cache.clone(),
+ config.clone(),
+ server_state.clone(),
+ broadcaster.clone(),
+ );
+
+ if let Some(ref mut stream) = state_guard.connection {
+ let _ = stream.write_all(&response);
+ let _ = stream.flush();
}
+
+ state_guard.master_repl_offset += command_size;
}
+ Err(_) => {}
}
- });
- Ok(())
+ }
+ } else {
+ match parse(rest) {
+ Ok((resp, leftover)) => {
+ dbg!(&resp, leftover);
+
+ // Update replication offset
+ let command_size = resp.to_resp_bytes().len();
+ let mut state_guard = state.lock().unwrap();
+
+ let command = RedisCommand::from(resp);
+<