From 1f2f3a241c59f467df5bf16fbde872f5083a174f Mon Sep 17 00:00:00 2001 From: omagdy Date: Wed, 23 Jul 2025 04:06:13 +0300 Subject: feat: Added syncing with empty RDB file --- src/lib.rs | 6 +++--- src/main.rs | 48 ++++++++++++++++++++++++++++++++++++++++++++---- src/resp_commands.rs | 1 + 3 files changed, 48 insertions(+), 7 deletions(-) (limited to 'src') diff --git a/src/lib.rs b/src/lib.rs index 34a958a..4f3f322 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -67,21 +67,21 @@ fn handshake_process(slave: &RedisServer) -> Result<(), String> { // PING send_command(&resp_bytes!(array => [resp!(bulk "PING")]))?; - // REPLCONF listening-port + // REPLCONF listening-port send_command(&resp_bytes!(array => [ resp!(bulk "REPLCONF"), resp!(bulk "listening-port"), resp!(bulk slave.port.clone()) ]))?; - // REPLCONF capa + // REPLCONF capa psync2 send_command(&resp_bytes!(array => [ resp!(bulk "REPLCONF"), resp!(bulk "capa"), resp!(bulk "psync2") ]))?; - // PSYNC + // PSYNC send_command(&resp_bytes!(array => [ resp!(bulk "PSYNC"), resp!(bulk "?"), diff --git a/src/main.rs b/src/main.rs index 3186b6c..78b9c24 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,6 +3,7 @@ use core::time; use std::{ collections::HashMap, env, + fmt::format, io::{Read, Write}, net::{TcpListener, TcpStream}, sync::{Arc, Mutex}, @@ -12,6 +13,7 @@ use std::{ use codecrafters_redis::{ rdb::{KeyExpiry, ParseError, RDBFile, RedisValue}, + resp_bytes, shared_cache::*, }; use codecrafters_redis::{resp_commands::RedisCommands, Config}; @@ -38,6 +40,23 @@ fn spawn_cleanup_thread(cache: SharedCache) { }); } +use base64::{engine::general_purpose, Engine as _}; + +fn write_rdb_to_stream(writer: &mut W) -> Result<(), Box> { + let hardcoded_rdb = "UkVESVMwMDEx+glyZWRpcy12ZXIFNy4yLjD6CnJlZGlzLWJpdHPAQPoFY3RpbWXCbQi8ZfoIdXNlZC1tZW3CsMQQAPoIYW9mLWJhc2XAAP/wbjv+wP9aog=="; + + let bytes = general_purpose::STANDARD.decode(hardcoded_rdb)?; + + let mut response = format!("${}\r\n", bytes.len()).into_bytes(); + response.extend_from_slice(&bytes); + + // Write the binary RDB data + writer.write_all(&response)?; + + Ok(()) +} + +// TODO: This should return a Result to handle the plethora of different errors fn handle_client(mut stream: TcpStream, cache: SharedCache, config: SharedConfig) { let mut buffer = [0; 512]; @@ -48,11 +67,32 @@ fn handle_client(mut stream: TcpStream, cache: SharedCache, config: SharedConfig Err(_) => return, // error occurred }; - let parsed_resp = parse(&buffer).unwrap(); - let response = RedisCommands::from(parsed_resp.0).execute(cache.clone(), config.clone()); + let request = parse(&buffer).unwrap(); + let response = + RedisCommands::from(request.0.clone()).execute(cache.clone(), config.clone()); + + let mut request_command = "".to_string(); - // write respose back to the client - stream.write(&response).unwrap(); + // FIXME: Find a solution for this mess!! + match &request.0 { + RespType::Array(arr) => { + if let RespType::BulkString(s) = arr[0].clone() { + request_command = String::from_utf8(s).unwrap(); + } + } + _ => {} + } + + // if this true immediately write and send back rdb file after response + // HACK: This just feels wrong I feel this shouldn't be handled here and should be handled + // in the exexute command + if request_command.starts_with("PSYNC") { + stream.write(&response).unwrap(); + let _ = write_rdb_to_stream(&mut stream); + } else { + // write respose back to the client + stream.write(&response).unwrap(); + } } } diff --git a/src/resp_commands.rs b/src/resp_commands.rs index 04d304c..3c18b07 100644 --- a/src/resp_commands.rs +++ b/src/resp_commands.rs @@ -266,6 +266,7 @@ impl RedisCommands { "FULLRESYNC {} 0", server.master_replid.unwrap_or("".to_string()), ); + resp_bytes!(response) } RC::Invalid => todo!(), -- cgit v1.2.3