diff options
| -rw-r--r-- | src/lib.rs | 76 |
1 files changed, 32 insertions, 44 deletions
@@ -49,61 +49,49 @@ pub struct Config { fn handshake_process(slave: &RedisServer) -> Result<(), String> { let master_address = format!("{}:{}", slave.master_host, slave.master_port); - // PING match TcpStream::connect(master_address) { Ok(mut stream) => { let mut buffer = [0; 512]; - // PING - if let Err(e) = stream.write_all(&resp_bytes!(array => [resp!(bulk "PING")])) { - return Err(format!("Failed to send: {}", e)); - } - - let _ = match stream.read(&mut buffer) { - Ok(0) => return Ok(()), // connection closed - Ok(n) => n, - Err(_) => return Ok(()), // error occurred - }; - - // REPLCONF - if let Err(e) = stream.write_all(&resp_bytes!(array => [resp!(bulk "REPLCONF"), resp!(bulk "listening-port"), resp!(bulk slave.port.clone())])) { - return Err(format!("Failed to send: {}", e)); - } - - let _ = match stream.read(&mut buffer) { - Ok(0) => return Ok(()), // connection closed - Ok(n) => n, - Err(_) => return Ok(()), // error occurred - }; + let mut send_command = |command: &[u8]| -> Result<(), String> { + stream + .write_all(command) + .map_err(|e| format!("Failed to send: {}", e))?; - if let Err(e) = stream.write_all(&resp_bytes!(array => [resp!(bulk "REPLCONF"), resp!(bulk "capa"), resp!(bulk "psync2")])) { - return Err(format!("Failed to send: {}", e)); - } - - let _ = match stream.read(&mut buffer) { - Ok(0) => return Ok(()), // connection closed - Ok(n) => n, - Err(_) => return Ok(()), // error occurred + match stream.read(&mut buffer) { + Ok(0) | Err(_) => return Ok(()), // connection closed or error + Ok(_) => Ok(()), + } }; - if let Err(e) = stream.write_all( - &resp_bytes!(array => [resp!(bulk "PSYNC"), resp!(bulk "?"), resp!(bulk "-1")]), - ) { - return Err(format!("Failed to send: {}", e)); - } - - let _ = match stream.read(&mut buffer) { - Ok(0) => return Ok(()), // connection closed - Ok(n) => n, - Err(_) => return Ok(()), // error occurred - }; + // PING + send_command(&resp_bytes!(array => [resp!(bulk "PING")]))?; + + // REPLCONF listening-port + send_command(&resp_bytes!(array => [ + resp!(bulk "REPLCONF"), + resp!(bulk "listening-port"), + resp!(bulk slave.port.clone()) + ]))?; + + // REPLCONF capa + send_command(&resp_bytes!(array => [ + resp!(bulk "REPLCONF"), + resp!(bulk "capa"), + resp!(bulk "psync2") + ]))?; + + // PSYNC + send_command(&resp_bytes!(array => [ + resp!(bulk "PSYNC"), + resp!(bulk "?"), + resp!(bulk "-1") + ]))?; Ok(()) } - Err(e) => Err(format!("Master node doesn't exists: {}", e)), + Err(e) => Err(format!("Master node doesn't exist: {}", e)), } - - // PSYNC } pub type SharedConfig = Arc<Option<Config>>; |
