aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/resp_commands.rs15
-rw-r--r--src/server.rs68
2 files changed, 67 insertions, 16 deletions
diff --git a/src/resp_commands.rs b/src/resp_commands.rs
index 03ec9dc..453024c 100644
--- a/src/resp_commands.rs
+++ b/src/resp_commands.rs
@@ -259,7 +259,7 @@ impl RedisCommands {
"# Replication\r\nrole:{}master_replid:{}master_repl_offset:{}",
server.role(),
master.replid.clone().unwrap_or("".to_string()),
- master.current_offset.clone().unwrap_or("".to_string())
+ master.current_offset.lock().unwrap(),
)
.as_bytes()
.to_vec();
@@ -270,7 +270,7 @@ impl RedisCommands {
"# Replication\r\nrole:{}master_replid:{}master_repl_offset:{}",
server.role(),
slave.master_replid.clone().unwrap_or("".to_string()),
- slave.master_repl_offset.clone().unwrap_or("".to_string())
+ slave.master_repl_offset.lock().unwrap()
)
.as_bytes()
.to_vec();
@@ -278,9 +278,13 @@ impl RedisCommands {
}
}
}
- RC::ReplConf((_, _)) => {
- resp_bytes!("OK")
- }
+ RC::ReplConf((op1, op2)) => match (op1.to_uppercase().as_str(), op2.as_str()) {
+ ("GETACK", "*") => {
+ println!("Did i even get here?");
+ resp_bytes!(array => [resp!(bulk "REPLCONF"), resp!(bulk "ACK"), resp!(bulk server.lock().unwrap().repl_offset().to_string())])
+ }
+ _ => resp_bytes!("OK"),
+ },
RC::Psync((_, _)) => {
let server = server.lock().unwrap();
if let RedisServer::Master(master) = &*server {
@@ -482,6 +486,7 @@ impl From<RespType> for RedisCommands {
let Some(op2) = args.next() else {
return Self::Invalid;
};
+ println!("Hello from here");
Self::ReplConf((op1, op2))
}
"PSYNC" => {
diff --git a/src/server.rs b/src/server.rs
index 5636234..6da29eb 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -4,6 +4,7 @@ use std::collections::HashMap;
use std::io::{Read, Write};
use std::net::{SocketAddr, TcpStream};
use std::sync::{Arc, Mutex};
+use std::time::Duration;
use std::{env, thread};
use crate::shared_cache::SharedCache;
@@ -12,9 +13,9 @@ use crate::shared_cache::SharedCache;
pub struct MasterServer {
pub dir: Option<String>,
pub dbfilename: Option<String>,
- pub replid: Option<String>,
- pub current_offset: Option<String>,
pub port: String,
+ pub replid: Option<String>,
+ pub current_offset: Arc<Mutex<usize>>,
pub cache: SharedCache,
replicas: Vec<SlaveServer>,
}
@@ -26,7 +27,7 @@ impl MasterServer {
dbfilename: None,
port: "6379".to_string(),
replid: Some("8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb".to_string()),
- current_offset: Some("0".to_string()),
+ current_offset: Arc::new(Mutex::new(0)),
cache: Arc::new(Mutex::new(HashMap::new())),
replicas: vec![],
}
@@ -60,7 +61,7 @@ pub struct SlaveServer {
pub dbfilename: Option<String>,
pub port: String,
pub master_replid: Option<String>,
- pub master_repl_offset: Option<String>,
+ pub master_repl_offset: Arc<Mutex<usize>>,
pub master_host: String,
pub master_port: String,
pub connection: Option<Arc<Mutex<TcpStream>>>,
@@ -79,7 +80,7 @@ impl SlaveServer {
dbfilename: None,
port,
master_replid: Some("8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb".to_string()),
- master_repl_offset: Some("0".to_string()),
+ master_repl_offset: Arc::new(Mutex::new(0)),
master_host,
master_port,
connection,
@@ -95,7 +96,7 @@ impl SlaveServer {
fn handshake(&mut self) -> Result<(), String> {
match self.connect() {
Ok(mut stream) => {
- let mut buffer = [0; 512];
+ let mut buffer = [0; 1024];
let mut send_command = |command: &[u8]| -> Result<(), String> {
stream
@@ -135,14 +136,16 @@ impl SlaveServer {
resp!(bulk "-1")
]))?;
+ thread::sleep(Duration::new(1, 0));
+
// Store the persistent connection
let shared_stream = Arc::new(Mutex::new(stream));
self.connection = Some(shared_stream.clone());
// Spawn the background listener thread
let cache_clone = self.cache.clone();
+ let master_repl_offset = self.master_repl_offset.clone();
thread::spawn(move || {
- let mut buffer = [0u8; 1024];
loop {
let bytes_read = {
let mut stream = shared_stream.lock().unwrap();
@@ -158,12 +161,22 @@ impl SlaveServer {
}
}
}; // stream lock is dropped here
- // Parse and execute all commands in the buffer
+
+ dbg!(&bytes_read);
+
+ // Parse and execute all commands in the buffer
let mut remaining_bytes = &buffer[..bytes_read];
- while !remaining_bytes.is_empty() {
+ dbg!(&remaining_bytes);
+
+ let mut flag = true;
+
+ while !remaining_bytes.is_empty() || flag {
+ flag = false;
match parse(remaining_bytes) {
Ok((parsed_command, leftover)) => {
+ dbg!(&parsed_command, leftover);
+
// Create a temporary slave server for command execution
let temp_slave = RedisServer::Slave(SlaveServer::new(
"0".to_string(), // dummy port
@@ -174,14 +187,26 @@ impl SlaveServer {
// Set the cache to our actual cache
let mut temp_slave = temp_slave;
+ temp_slave.set_repl_offset(&master_repl_offset);
temp_slave.set_cache(&cache_clone);
let server_arc = Arc::new(Mutex::new(temp_slave));
- let _ = RedisCommands::from(parsed_command.clone())
- .execute(server_arc);
+ let response = RedisCommands::from(parsed_command.clone())
+ .execute(server_arc.clone());
+
+ let mut shared_stream = shared_stream.lock().unwrap();
+
+ if let RespType::Array(arr) = &parsed_command {
+ if arr[0].to_resp_bytes() == b"REPLCONF".to_vec() {
+ shared_stream.write(&response).unwrap();
+ }
+ }
// Update remaining bytes for next iteration
remaining_bytes = leftover;
+ server_arc.lock().unwrap().repl_offset_increment(
+ parsed_command.to_resp_bytes().len(),
+ );
}
Err(_) => {
// If parsing fails, break out of the loop
@@ -263,6 +288,13 @@ impl RedisServer {
}
}
+ pub fn repl_offset(&self) -> usize {
+ match self {
+ RedisServer::Master(master) => *master.current_offset.lock().unwrap(),
+ RedisServer::Slave(slave) => *slave.master_repl_offset.lock().unwrap(),
+ }
+ }
+
pub fn set_cache(&mut self, cache: &SharedCache) {
match self {
RedisServer::Master(master) => master.cache = cache.clone(),
@@ -270,6 +302,20 @@ impl RedisServer {
}
}
+ pub fn set_repl_offset(&mut self, repl_offset: &Arc<Mutex<usize>>) {
+ match self {
+ RedisServer::Master(master) => master.current_offset = repl_offset.clone(),
+ RedisServer::Slave(slave) => slave.master_repl_offset = repl_offset.clone(),
+ }
+ }
+
+ pub fn repl_offset_increment(&mut self, amount: usize) {
+ match self {
+ RedisServer::Master(master) => *master.current_offset.lock().unwrap() += amount,
+ RedisServer::Slave(slave) => *slave.master_repl_offset.lock().unwrap() += amount,
+ }
+ }
+
pub fn role(&self) -> &str {
match self {
RedisServer::Master(_) => "master",