Introduction

In the previous post, We designed a communication protocol to be used between the client and the key-value store server. In this post, we will start accepting requests, and test if our protocol works as expected.

Accepting requests

To start accepting commands, we will need some form of transport to carry the bytes representing our commands, for that we will start a TCP server powered by the tokio library.

The server will handle accepting requests, and spawning a separate tokio task per client connection to handle command parsing and execution.

pub async fn run(listener: TcpListener) {
    loop {
        let (stream, _) = listener.accept().await.unwrap();

        // start a new tokio worker task to handle client connections.
        tokio::spawn(async move {
            if let Err(msg) = process(stream).await {
                println!("An error happened while processing the request: {:?}", msg);
            }
        });
    }
}

The code for the run function is straight forward, given a TcpListener, which we will create at startup based on some parameters given by the user, start accepting clients connections, and handle each connection in it’s own separate tokio task, this will allow us to not block the main server thread, and scale our workload across multiple CPU cores thanks to the tokio runtime task scheduler.

async fn process(stream: TcpStream) -> Result<()> {
    let mut handler = ConnectionHandler::new(stream);
    loop {
        match handler.read_command().await {
            Ok(val) => match val {
                Some(cmd) => {
                    handler.execute(cmd).await?;
                }
                None => {
                    break;
                }
            },
            Err(msg) => {
                return Err(msg);
            }
        }
    }
    Ok(())
}

The process function takes ownership of the TcpStream, and will delegate the command parsing and execution to another component that is created per connection: ConnectionHandler.

The function will try to continuously read commands from the client and execute them as they come, and it will stop once the connection is closed from the client, or an error happens.

pub struct ConnectionHandler {
    stream: BufWriter<TcpStream>,

    buf: BytesMut,
}

The ConnectionHandler type keeps a reference of a buffered TcpStream, and an internal buffer to read bytes coming from the TCP stream.

Buffering the TcpStream is necessary to not cause individual write calls to issue individual syscalls which is expensive and unnecessary. instead, we want the stream to only issue the write syscalls once the buffer is full or we explicitly flush it.

The read buffer heald by the ConnectionHandler is a type from the bytes crate, and you can look at it as a [u8] but with a lot of convenience methods, it is created with a fixed capacity (defaulted to 1024), i.e we suppose that each command shouldn’t take more than a given size, and if that isn’t the case, we will deal with it later on.

Reading the command from the stream is as simple as calling protocol::Parser::parse from the previous blog post, but before doing so we must ensure that the buffer is full or has some data, this is done before every call to read_command by calling the ensure_filled function.

pub async fn read_command(&mut self) -> Result<Option<Command>> {
    if let None = self.ensure_filled().await? {
        // `None` is returned when the connection is closed and no bytes can be read.
        return Ok(None);
    }

    let mut cursor = Cursor::new(&self.buf[..]);
    let command = Parser::parse(&mut cursor)?;
    let length = cursor.position() as usize;
    self.buf.advance(length);
    Ok(Some(command))
}

When trying to parse a command from a buffer, we are always sure that the buffer has some data and the cursor to the buffer is always moved after a parse succeeds.

Note: in a future post we will add a pre-scanning for the contents of the buffer, to make sure that a command parse attempt will most likely succeed, and if we detect invalid or unexpected bytes, we error-out before doing any more unnecessary work which could improve performance.)

After executing a command the response is written by calling the protocol::Writer::write from the previous post as well.

For this post, we only have one single command, which is a test command to verify that the server is responsive, the ping command.

pub async fn execute(&mut self, cmd: Command) -> Result<()> {
    match cmd {
        Command::Ping(key) => {
            return self.handle_ping(key).await;
        }
        _ => {
            // TODO: future posts.
            unimplemented!();
        }
    }
}

async fn handle_ping(&mut self, key: String) -> Result<()> {
    if key.is_empty() {
        // Default to a ping.
        self.write_response(&Response::Ok("PONG".into())).await?;
    } else {
        self.write_response(&Response::Ok(key)).await?;
    }
    Ok(())
}

A ping command will just return PONG (the way Redis does), or it will return the passed string.

To make sure that our whole setup works, we will create an integration test, where we will start a server on a random port, and we will execute the ping command and make sure that the response is what we expect.

To be able to do this, we should have a simple client to make interacting with the server easier, a simple client could look like this:

pub struct Client {
    handler: ConnectionHandler,
}

impl Client {
    pub async fn ping(&mut self, key: String) -> Result<Option<Response>> {
        let command = Command::Ping(key);
        self.handler.write_command(&command).await?;
        return self.handler.read_response().await;
    }
}

Now our tests will live under the kvstore/tests directory, and the ping test will look something like:

async fn start_server() -> Result<SocketAddr> {
    let listener = TcpListener::bind("0.0.0.0:0").await.unwrap();
    let addr = listener.local_addr().unwrap();
    tokio::spawn(async move { kvstore::server::run(listener).await });
    Ok(addr)
}

#[tokio::test]
async fn test_ping() {
    let addr = start_server().await.unwrap();
    let mut client = kvstore::client::create(addr).await.unwrap();
    let res = client.ping(String::from("")).await.unwrap();
    assert_eq!(res, Some(Response::Ok(String::from("PONG"))));
}

#[tokio::test]
async fn test_ping_with_value() {
    let addr = start_server().await.unwrap();
    let mut client = kvstore::client::create(addr).await.unwrap();
    let res = client.ping(String::from("Value")).await.unwrap();
    assert_eq!(res, Some(Response::Ok(String::from("Value"))));
}

start_server will start a kvstore server on a random unused port, and will return the address to the created server, which we will use to connect our client and start issuing commands by creating the corresponding objects from the protocol module and trying to assert on the returned values.

Conclusion

In this post, we successfully managed to create a TCP server and started accepting connections and executing ping commands, in the following posts we will dig into the data model, and how we will persist the data in disk/in-memory and we will implement get and set commands.

The full source code could be found here.

See part 3 of the series, stay tuned.