aboutsummaryrefslogtreecommitdiff
path: root/src/main.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/main.rs')
-rw-r--r--src/main.rs101
1 file changed, 53 insertions, 48 deletions
diff --git a/src/main.rs b/src/main.rs
index 78b9c24..25bc6c9 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -16,10 +16,10 @@ use codecrafters_redis::{
resp_bytes,
shared_cache::*,
};
-use codecrafters_redis::{resp_commands::RedisCommands, Config};
+use codecrafters_redis::{resp_commands::RedisCommands, server::RedisServer};
use codecrafters_redis::{
resp_parser::{parse, RespType},
- SharedConfig,
+ server::SlaveServer,
};
fn spawn_cleanup_thread(cache: SharedCache) {
@@ -57,7 +57,7 @@ fn write_rdb_to_stream<W: Write>(writer: &mut W) -> Result<(), Box<dyn std::erro
}
// TODO: This should return a Result to handle the plethora of different errors
-fn handle_client(mut stream: TcpStream, cache: SharedCache, config: SharedConfig) {
+fn handle_client(mut stream: TcpStream, server: Arc<Mutex<RedisServer>>) {
let mut buffer = [0; 512];
loop {
@@ -68,8 +68,8 @@ fn handle_client(mut stream: TcpStream, cache: SharedCache, config: SharedConfig
};
let request = parse(&buffer).unwrap();
- let response =
- RedisCommands::from(request.0.clone()).execute(cache.clone(), config.clone());
+ let server_clone = Arc::clone(&server);
+ let response = RedisCommands::from(request.0.clone()).execute(server_clone);
let mut request_command = "".to_string();
@@ -83,12 +83,23 @@ fn handle_client(mut stream: TcpStream, cache: SharedCache, config: SharedConfig
_ => {}
}
+ // Store the persistent connection
+ let shared_stream = Arc::new(Mutex::new(
+ stream.try_clone().expect("What could go wrong? :)"),
+ ));
+
// if this true immediately write and send back rdb file after response
// HACK: This feels wrong — it shouldn't be handled here; it should be handled
// in the execute command
if request_command.starts_with("PSYNC") {
stream.write(&response).unwrap();
let _ = write_rdb_to_stream(&mut stream);
+ // handshake completed and I should add the server sending me the handshake to my replicas
+ let replica_addr = stream
+ .peer_addr()
+ .expect("This shouldn't fail right? right?? :)");
+ let mut server = server.lock().unwrap();
+ server.add_replica(replica_addr, shared_stream);
} else {
// write response back to the client
stream.write(&response).unwrap();
@@ -97,62 +108,56 @@ fn handle_client(mut stream: TcpStream, cache: SharedCache, config: SharedConfig
}
fn main() -> std::io::Result<()> {
- let cache: SharedCache = Arc::new(Mutex::new(HashMap::new()));
- let mut config: SharedConfig = Arc::new(None);
- let mut port = "6379".to_string();
-
- match Config::new() {
- Ok(conf) => {
- if let Some(conf) = conf {
- let mut cache = cache.lock().unwrap();
- let dir = conf.dir.clone().unwrap_or("".to_string());
- let dbfilename = conf.dbfilename.clone().unwrap_or("".to_string());
- let redis_server = conf.server.clone();
- port = redis_server.port.clone();
- if let Ok(rdb_file) = RDBFile::read(dir, dbfilename) {
- if let Some(rdb) = rdb_file {
- let hash_table = &rdb.databases.get(&0).unwrap().hash_table;
-
- for (key, db_entry) in hash_table.iter() {
- let value = match &db_entry.value {
- RedisValue::String(data) => {
- String::from_utf8(data.clone()).unwrap()
- }
- RedisValue::Integer(data) => data.to_string(),
- _ => {
- unreachable!()
- }
- };
- let expires_at = if let Some(key_expiry) = &db_entry.expiry {
- Some(key_expiry.timestamp)
- } else {
- None
- };
- let cache_entry = CacheEntry { value, expires_at };
- cache.insert(String::from_utf8(key.clone()).unwrap(), cache_entry);
- }
- }
- }
- config = Arc::new(Some(conf));
- }
- }
+ let server = match RedisServer::new() {
+ Ok(Some(server)) => server,
+ Ok(None) => RedisServer::master(), // Default to master if no args
Err(e) => {
eprintln!("Error: {}", e);
std::process::exit(1);
}
+ };
+
+ // Load RDB file if dir and dbfilename are provided
+ if let (Some(dir), Some(dbfilename)) = (server.dir().clone(), server.dbfilename().clone()) {
+ if let Ok(rdb_file) = RDBFile::read(dir, dbfilename) {
+ if let Some(rdb) = rdb_file {
+ let mut cache = server.cache().lock().unwrap();
+ let hash_table = &rdb.databases.get(&0).unwrap().hash_table;
+
+ for (key, db_entry) in hash_table.iter() {
+ let value = match &db_entry.value {
+ RedisValue::String(data) => String::from_utf8(data.clone()).unwrap(),
+ RedisValue::Integer(data) => data.to_string(),
+ _ => {
+ unreachable!()
+ }
+ };
+ let expires_at = if let Some(key_expiry) = &db_entry.expiry {
+ Some(key_expiry.timestamp)
+ } else {
+ None
+ };
+ let cache_entry = CacheEntry { value, expires_at };
+ cache.insert(String::from_utf8(key.clone()).unwrap(), cache_entry);
+ }
+ }
+ }
}
+ let port = server.port().to_string();
let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap();
- spawn_cleanup_thread(cache.clone());
+ spawn_cleanup_thread(server.cache().clone());
+
+ let server = Arc::new(Mutex::new(server));
for stream in listener.incoming() {
match stream {
Ok(stream) => {
- let cache_clone = cache.clone();
- let config_clone = Arc::clone(&config);
+ let server_clone = Arc::clone(&server);
+ // TODO: Use tokio instead of multi threads to handle multiple clients
thread::spawn(|| {
- handle_client(stream, cache_clone, config_clone);
+ handle_client(stream, server_clone);
});
}
Err(e) => {