aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/lib.rs31
-rw-r--r--src/macros.rs2
-rw-r--r--src/resp_commands.rs13
3 files changed, 40 insertions, 6 deletions
diff --git a/src/lib.rs b/src/lib.rs
index bb91b2e..4aef994 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -5,6 +5,8 @@ use std::{
net::{TcpListener, TcpStream},
};
+use crate::resp_parser::parse;
+
#[macro_use]
pub mod macros;
pub mod rdb;
@@ -50,17 +52,38 @@ fn handshake_process(slave: &RedisServer) -> Result<(), String> {
// PING
match TcpStream::connect(master_address) {
Ok(mut stream) => {
+ let mut buffer = [0; 512];
+
+ // PING
if let Err(e) = stream.write_all(&resp_bytes!(array => [resp!(bulk "PING")])) {
return Err(format!("Failed to send: {}", e));
- } else {
- Ok(())
}
+
+ let _ = match stream.read(&mut buffer) {
+ Ok(0) => return Ok(()), // connection closed
+ Ok(n) => n,
+ Err(_) => return Ok(()), // error occurred
+ };
+
+ // REPLCONF
+ if let Err(e) = stream.write_all(&resp_bytes!(array => [resp!(bulk "REPLCONF"), resp!(bulk "listening-port"), resp!(bulk slave.port.clone())])) {
+ return Err(format!("Failed to send: {}", e));
+ }
+
+ let _ = match stream.read(&mut buffer) {
+ Ok(0) => return Ok(()), // connection closed
+ Ok(n) => n,
+ Err(_) => return Ok(()), // error occurred
+ };
+
+ if let Err(e) = stream.write_all(&resp_bytes!(array => [resp!(bulk "REPLCONF"), resp!(bulk "capa"), resp!(bulk "psync2")])) {
+ return Err(format!("Failed to send: {}", e));
+ }
+ Ok(())
}
Err(e) => Err(format!("Master node doesn't exists: {}", e)),
}
- // REPLCONF
-
// PSYNC
}
diff --git a/src/macros.rs b/src/macros.rs
index 9dbad5d..10b2f9c 100644
--- a/src/macros.rs
+++ b/src/macros.rs
@@ -29,7 +29,6 @@ macro_rules! resp_bytes {
};
// Array: resp!(array => [resp!(bulk "one"), resp!(int 2)])
- // FIXME: this doesn't work and errors
(array => [$($elem:expr),*]) => {
RespType::Array(vec![$($elem),*]).to_resp_bytes()
};
@@ -118,7 +117,6 @@ macro_rules! resp {
};
// Array: resp!(array => [resp!(bulk "one"), resp!(int 2)])
- // FIXME: this doesn't work and errors
(array => [$($elem:expr),*]) => {
RespType::Array(vec![$($elem),*])
};
diff --git a/src/resp_commands.rs b/src/resp_commands.rs
index 554c296..dcb717f 100644
--- a/src/resp_commands.rs
+++ b/src/resp_commands.rs
@@ -117,6 +117,7 @@ pub enum RedisCommands {
ConfigGet(String),
Keys(String),
Info(String),
+ ReplConf((String, String)),
Invalid,
}
@@ -251,6 +252,9 @@ impl RedisCommands {
.to_vec();
RT::BulkString(response).to_resp_bytes()
}
+ RC::ReplConf((_, _)) => {
+ resp_bytes!("OK")
+ }
RC::Invalid => todo!(),
}
}
@@ -432,6 +436,15 @@ impl From<RespType> for RedisCommands {
}
Self::Invalid
}
+ "REPLCONF" => {
+ let Some(op1) = args.next() else {
+ return Self::Invalid;
+ };
+ let Some(op2) = args.next() else {
+ return Self::Invalid;
+ };
+ Self::ReplConf((op1, op2))
+ }
_ => Self::Invalid,
}
}