aboutsummaryrefslogtreecommitdiff
path: root/src/lib.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib.rs')
-rw-r--r--src/lib.rs76
1 files changed, 32 insertions, 44 deletions
diff --git a/src/lib.rs b/src/lib.rs
index 30e2d9f..34a958a 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -49,61 +49,49 @@ pub struct Config {
fn handshake_process(slave: &RedisServer) -> Result<(), String> {
let master_address = format!("{}:{}", slave.master_host, slave.master_port);
- // PING
match TcpStream::connect(master_address) {
Ok(mut stream) => {
let mut buffer = [0; 512];
- // PING
- if let Err(e) = stream.write_all(&resp_bytes!(array => [resp!(bulk "PING")])) {
- return Err(format!("Failed to send: {}", e));
- }
-
- let _ = match stream.read(&mut buffer) {
- Ok(0) => return Ok(()), // connection closed
- Ok(n) => n,
- Err(_) => return Ok(()), // error occurred
- };
-
- // REPLCONF
- if let Err(e) = stream.write_all(&resp_bytes!(array => [resp!(bulk "REPLCONF"), resp!(bulk "listening-port"), resp!(bulk slave.port.clone())])) {
- return Err(format!("Failed to send: {}", e));
- }
-
- let _ = match stream.read(&mut buffer) {
- Ok(0) => return Ok(()), // connection closed
- Ok(n) => n,
- Err(_) => return Ok(()), // error occurred
- };
+ let mut send_command = |command: &[u8]| -> Result<(), String> {
+ stream
+ .write_all(command)
+ .map_err(|e| format!("Failed to send: {}", e))?;
- if let Err(e) = stream.write_all(&resp_bytes!(array => [resp!(bulk "REPLCONF"), resp!(bulk "capa"), resp!(bulk "psync2")])) {
- return Err(format!("Failed to send: {}", e));
- }
-
- let _ = match stream.read(&mut buffer) {
- Ok(0) => return Ok(()), // connection closed
- Ok(n) => n,
- Err(_) => return Ok(()), // error occurred
+ match stream.read(&mut buffer) {
+ Ok(0) | Err(_) => return Ok(()), // connection closed or error
+ Ok(_) => Ok(()),
+ }
};
- if let Err(e) = stream.write_all(
- &resp_bytes!(array => [resp!(bulk "PSYNC"), resp!(bulk "?"), resp!(bulk "-1")]),
- ) {
- return Err(format!("Failed to send: {}", e));
- }
-
- let _ = match stream.read(&mut buffer) {
- Ok(0) => return Ok(()), // connection closed
- Ok(n) => n,
- Err(_) => return Ok(()), // error occurred
- };
+ // PING
+ send_command(&resp_bytes!(array => [resp!(bulk "PING")]))?;
+
+ // REPLCONF listening-port
+ send_command(&resp_bytes!(array => [
+ resp!(bulk "REPLCONF"),
+ resp!(bulk "listening-port"),
+ resp!(bulk slave.port.clone())
+ ]))?;
+
+ // REPLCONF capa
+ send_command(&resp_bytes!(array => [
+ resp!(bulk "REPLCONF"),
+ resp!(bulk "capa"),
+ resp!(bulk "psync2")
+ ]))?;
+
+ // PSYNC
+ send_command(&resp_bytes!(array => [
+ resp!(bulk "PSYNC"),
+ resp!(bulk "?"),
+ resp!(bulk "-1")
+ ]))?;
Ok(())
}
- Err(e) => Err(format!("Master node doesn't exists: {}", e)),
+ Err(e) => Err(format!("Master node doesn't exist: {}", e)),
}
-
- // PSYNC
}
pub type SharedConfig = Arc<Option<Config>>;