aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authoromagdy <omar.professional8777@gmail.com>2025-07-23 03:20:14 +0300
committeromagdy <omar.professional8777@gmail.com>2025-07-23 03:20:14 +0300
commitb7efe705e2865fa89cd3d3bfd3926fdb9667847d (patch)
tree2ce862e6fefdeade4ea459eae8e74d9a65d3987c /src
parent089ed7f549eaf82fb7bc3c46616b19615c55d72a (diff)
downloadredis-rust-b7efe705e2865fa89cd3d3bfd3926fdb9667847d.tar.xz
redis-rust-b7efe705e2865fa89cd3d3bfd3926fdb9667847d.zip
feat: Implemented last step of handshake process
Diffstat (limited to 'src')
-rw-r--r--src/lib.rs19
-rw-r--r--src/resp_commands.rs22
2 files changed, 41 insertions, 0 deletions
diff --git a/src/lib.rs b/src/lib.rs
index 4aef994..30e2d9f 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -79,6 +79,25 @@ fn handshake_process(slave: &RedisServer) -> Result<(), String> {
if let Err(e) = stream.write_all(&resp_bytes!(array => [resp!(bulk "REPLCONF"), resp!(bulk "capa"), resp!(bulk "psync2")])) {
return Err(format!("Failed to send: {}", e));
}
+
+ let _ = match stream.read(&mut buffer) {
+ Ok(0) => return Ok(()), // connection closed
+ Ok(n) => n,
+ Err(_) => return Ok(()), // error occurred
+ };
+
+ if let Err(e) = stream.write_all(
+ &resp_bytes!(array => [resp!(bulk "PSYNC"), resp!(bulk "?"), resp!(bulk "-1")]),
+ ) {
+ return Err(format!("Failed to send: {}", e));
+ }
+
+ let _ = match stream.read(&mut buffer) {
+ Ok(0) => return Ok(()), // connection closed
+ Ok(n) => n,
+ Err(_) => return Ok(()), // error occurred
+ };
+
Ok(())
}
Err(e) => Err(format!("Master node doesn't exists: {}", e)),
diff --git a/src/resp_commands.rs b/src/resp_commands.rs
index dcb717f..04d304c 100644
--- a/src/resp_commands.rs
+++ b/src/resp_commands.rs
@@ -118,6 +118,7 @@ pub enum RedisCommands {
Keys(String),
Info(String),
ReplConf((String, String)),
+ Psync((String, String)),
Invalid,
}
@@ -255,6 +256,18 @@ impl RedisCommands {
RC::ReplConf((_, _)) => {
resp_bytes!("OK")
}
+ RC::Psync((_, _)) => {
+ let config = config.clone();
+ let mut server = RedisServer::new();
+ if let Some(conf) = config.as_ref() {
+ server = conf.server.clone();
+ }
+ let response = format!(
+ "FULLRESYNC {} 0",
+ server.master_replid.unwrap_or("".to_string()),
+ );
+ resp_bytes!(response)
+ }
RC::Invalid => todo!(),
}
}
@@ -445,6 +458,15 @@ impl From<RespType> for RedisCommands {
};
Self::ReplConf((op1, op2))
}
+ "PSYNC" => {
+ let Some(repl_id) = args.next() else {
+ return Self::Invalid;
+ };
+ let Some(repl_offset) = args.next() else {
+ return Self::Invalid;
+ };
+ Self::Psync((repl_id, repl_offset))
+ }
_ => Self::Invalid,
}
}