Building Cloud Storage from Scratch

With Python (30 - 40 mins read time)

Welcome back to The Lambda Scheme. In our last post we reflected on the long history of classical computing and explored the mysterious new world of quantum algorithms. Now that we’ve amply flexed our philosopher muscles, I thought we could spend this post doing something much more practical. We’re going to build some good ol’ cloud infrastructure from scratch. With Python! In particular, we’re going to build a cloud-ready block storage service, like a simplified version of Amazon EBS or OpenStack Cinder.

Imagine you’re tasked with building Cinder. Where would you start? Building your own version of a system you use regularly is a great way to deepen your understanding, both of the system itself and of general systems design principles. For those of you who are interviewing at a big tech co. like any FAANG, this post follows a flow very similar to the “systems design” section of the interview, so it’s good practice.

In Part 1 we will learn:

In Part 2 we will uplevel our service for cloud scale by:

Finally in Part 3 we will see how our system actually performs by:

Follow along with the source code on GitHub. Let’s begin!

Part 1: A Single Node Service

The What and Why of Block Devices

Block devices are the parts of any computer system that make it durable. They store all the state that a computer has when it boots: its operating system, its data, and its programs. Many things can be block devices: rotating hard disk drives, solid state drives, older magnetic tapes, or whatever. The abstract idea of a “block device” captures the common features across all these devices, namely: they’re cheap, they’re big, and they’re non-volatile. Cheap and big basically mean the same thing; the cost per bit of a block device is much lower than that of RAM or CPU registers, so we can buy a lot more bits.

This cheapness and bigness comes at a cost. For hardware reasons we won’t go into, the more storage a device has the longer it takes to retrieve any one piece of data. For block devices this cost is so high that you only want to pay it to retrieve a large amount of data, called a block (hence the name), at a time. Because the whole block is collocated on your storage device you pay the same overhead you would for a single byte but amortized over the size of the block, typically 4KiB.
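To make the amortization argument concrete, here is a small back-of-the-envelope sketch. The function name and the latency figures in the usage note are made up for illustration; they are not measurements of any real drive.

```python
def cost_per_byte_us(fixed_overhead_us, transfer_us_per_byte, nbytes):
    """Average cost per byte when one request fetches `nbytes` at once.

    fixed_overhead_us: per-request cost that is paid no matter how much
                       data is fetched (illustrative, not measured)
    transfer_us_per_byte: incremental cost of each extra byte
    """
    total = fixed_overhead_us + transfer_us_per_byte * nbytes
    return total / nbytes


# Hypothetical numbers: 10ms of fixed overhead, 0.01us per byte transferred
single_byte = cost_per_byte_us(10000.0, 0.01, 1)
whole_block = cost_per_byte_us(10000.0, 0.01, 4096)
```

With these assumed numbers the per-byte cost of a 4KiB read comes out thousands of times lower than that of a one-byte read, which is the whole reason for reading in blocks.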

One of the jobs of your operating system is to manage these blocks for you so that you, the application programmer, don’t need to think about them. Your OS will have a myriad techniques for managing blocks efficiently. It may cache them in memory when it anticipates you’ll need them again soon, or it may cleverly do disk reads in sequence when it knows the blocks are close to each other on disk. When it comes time to actually retrieve or store your blocks it’s your OS, or more precisely the OS kernel with its privileged hardware access, that issues the request to your hard drive.

A common interface between your kernel and the device here is the SCSI (pronounced “scuzzie”) protocol which we’ll meet again when we want to provide blocks as a network service. In short, say your program needs to fetch a block from your hard-drive, then your kernel will issue the drive a SCSI command like “read me block 42” and then copy the block into main memory for your program.
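As a rough sketch of what “read me block 42” boils down to, the snippet below treats an ordinary file as a stand-in for the drive and turns a block number into a byte offset. The helper names and the 4KiB block size are assumptions for illustration; a real kernel talks to the device through its driver, not through Python file calls.

```python
import tempfile

BLOCK_SIZE = 4096  # assumed block size, matching the 4KiB figure above


def read_block(f, block_number, block_size=BLOCK_SIZE):
    """Return the contents of one fixed-size block from an open binary file."""
    f.seek(block_number * block_size)
    return f.read(block_size)


def write_block(f, block_number, data, block_size=BLOCK_SIZE):
    """Overwrite one whole block in place."""
    if len(data) != block_size:
        raise ValueError("a block write must be exactly one block long")
    f.seek(block_number * block_size)
    f.write(data)


def demo():
    # An ordinary temp file stands in for the drive (64 zero-filled blocks)
    with tempfile.TemporaryFile() as f:
        f.truncate(64 * BLOCK_SIZE)
        write_block(f, 42, b"\x2a" * BLOCK_SIZE)
        return read_block(f, 42), read_block(f, 41)
```

The only addressing the device understands is the block number; everything friendlier is layered on top.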

Blocks are a pain to deal with. They are fixed in size and use otherwise meaningless numbers like 0x34586209 as addresses. So from the perspective of an application program a kernel provides a more abstract and more friendly “file system” interface.

With a file system we get a tree of directories and files which get to have meaningful names and can be of arbitrary size. Your kernel then translates operations on these files into storage and retrieval of blocks. As you can imagine, there are infinitely many ways the OS can do this mapping, but over time a handful have emerged as the most common: Ext4 on Linux, NTFS on Windows, and APFS on Mac. Let’s look at a simple view of the disk layout for Ext4.

What Linux does is store all key attributes of the file system itself in the first block of the drive, called the superblock. At the other end of the drive we have all our file blocks. These contain the raw contents of individual files and directories. Sandwiched in between the superblock and the file blocks is a collection of inode blocks.

Each inode stores metadata about an individual file or directory and can tell you which data blocks make up its contents. Inodes are how the kernel can actually index and assemble file blocks into a single file of arbitrary size. When we zoom into ext4 this process gets quite hairy, but we don’t have to think about that. File systems are built on block devices, so if we implement blocks as a network service we get ext4 for free.
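Here is a toy model of that idea, assuming a made-up inode that is just a size plus an ordered list of block numbers. Real ext4 inodes are far richer (they use extent trees, permissions, timestamps and so on), so treat this purely as an illustration of how fixed-size blocks are stitched into a file of arbitrary length.

```python
BLOCK_SIZE = 4  # tiny on purpose so the example is easy to follow

# A toy "disk": a list of fixed-size blocks addressed by number
disk = [b"\x00" * BLOCK_SIZE for _ in range(8)]

# A toy inode: file size in bytes plus the ordered data block numbers
inode = {"size": 11, "blocks": [5, 2, 7]}

# The file's contents are scattered across non-adjacent blocks
disk[5] = b"hell"
disk[2] = b"o wo"
disk[7] = b"rld\x00"  # the last block is only partly used


def read_file(disk, inode):
    """Concatenate the inode's blocks in order, then trim to the file size."""
    raw = b"".join(disk[n] for n in inode["blocks"])
    return raw[:inode["size"]]
```

Note that the file system never asks the device for anything but whole blocks by number, which is why a network block service is enough to host it.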

Finally, before starting to write our service, let’s briefly understand why we would want network block devices instead of, say, a Network Filesystem or blob storage like Amazon S3. The truth is our needs depend on our application. For applications like storing photos, which would have lots of independent and variably sized files coming in, we would in fact want S3 or NFS.

Other applications like storing relational data in a database, which we’ll do in Part 3, or even writing NFS itself rely on block storage as a primitive. Databases and file-systems will do lots of in-place block updates, log writing with contiguous blocks, and use ordering guarantees that are really only correct and performant on a block device. You should always take implicit constraints and tradeoffs like these and make them explicit when designing a system. So with our motivation clear let’s start writing our service.

Network Block Devices

We’ve seen how your kernel might send SCSI commands over a SATA cable when it’s communicating with a physical hard drive. It turns out SCSI is a generic enough protocol that we can use it over the internet as well. In particular if you’ve ever heard of iSCSI (“eye scuzzie”), it’s a way to issue SCSI commands over TCP/IP. The proliferation of SCSI for disk drives is what makes iSCSI so great as a protocol for Storage Area Networks.

Linux also supports another protocol called NBD (the “no big deal” protocol, obviously). It’s simpler than iSCSI so we can implement a server in about 70 lines of Python but it won’t be as performant. Ah well, worth the tradeoff 🤷 They both in essence do the same thing, store blocks and retrieve blocks. Let’s take a close look at the protocol for NBD and see what behavior we need to support.

We can break a given NBD session into three parts

There is the handshake where we establish that the server is an NBD server and the client is an NBD client, the options negotiation where both sides say what optional behavior they support (none for now), and finally the actual transmission phase in which our client will store and fetch blocks. Simple enough. Our protocol also makes use of a handful of magic numbers so I’m gonna start by putting all of these under one class for easy reference

import collections
import logging
import socket
import time

DEFAULT_DEVICE_SIZE = (2**19)  # 512 KiB


class MagicValues(object):
    MinimalClientFlags = b"\x00\x00\x00\x01"  # set high NBD_FLAG_C_FIXED_NEWSTYLE
    OptionRequestPrefix = b"IHAVEOPT"
    OptionResponsePrefix = b"\x00\x03\xe8\x89\x04\x55\x65\xa9"
    OptionsExportName = b"\x00\x00\x00\x01"
    OptionUnsupported = b"\x80\x00\x00\x01"
    RequestPrefix = b"\x25\x60\x95\x13"
    RequestKindRead = b"\x00\x00"
    RequestKindWrite = b"\x00\x01"
    RequestKindClose = b"\x00\x02"
    ResponsePrefix = b"\x67\x44\x66\x98"

Next we’ll be clean and break up our code into two tiers. The upper tier will work at the logical level of NBD requests and responses so it can be concerned with the abstract behavior of our service. The lower tier can translate those requests and responses into actual byte sequences over the TCP connection. Such a separation of concerns is typical of network stacks like the OSI model. Let’s define the logical types our lower tier will return as named tuples; if you’re unfamiliar with these they’re basically Python’s version of C structs.

Option = collections.namedtuple("Option", ("kind", "data"))
TransmissionRequest = collections.namedtuple(
    "TransmissionRequest",
    ("kind", "handle", "offset", "length", "data"),
)

and write a class for our lower tier that will interpret between a raw TCP connection and our logical types

class NBDInterpreter(object):
    def __init__(self, cxn):
        self._cxn = cxn
        self._handshake()

    def _handshake(self):
        self._cxn.sendall(b"NBDMAGICIHAVEOPT")
        self._cxn.sendall(b"\x00\x01")  # minimal set of handshake flags
        client_flags = self._cxn.recv(4)
        if client_flags != MagicValues.MinimalClientFlags:
            raise ValueError("Unknown client flags: {}".format(client_flags))

    def get_client_options(self):
        # options
        while True:
            prefix = self._cxn.recv(8)
            if prefix != MagicValues.OptionRequestPrefix:
                raise ValueError(
                    "Unknown prefix in client block: {}".format(prefix))
            option = self._cxn.recv(4)
            data_len = int.from_bytes(self._cxn.recv(4), byteorder="big")
            data = self._cxn.recv(data_len)
            if option == MagicValues.OptionsExportName:  # signals transition to transmission phase
                break
            yield Option(option, data)

    def send_option_unsupported(self, option):
        self._cxn.sendall(MagicValues.OptionResponsePrefix)
        self._cxn.sendall(option.kind)
        self._cxn.sendall(MagicValues.OptionUnsupported)
        self._cxn.sendall(b"\x00" * 4)

    def send_export_response(self):
        # export size (2**19 bytes = 512 KiB)
        self._cxn.sendall((2**19).to_bytes(byteorder="big", length=8))
        # transmission flags
        self._cxn.sendall(b"\x00\x01")
        # zero padding
        self._cxn.sendall(b"\x00" * 124)

    def get_transmission_requests(self):
        while True:
            prefix = self._cxn.recv(4)
            if prefix != MagicValues.RequestPrefix:
                raise ValueError("Unknown block prefix: {}".format(prefix))
            flags = self._cxn.recv(2)
            if flags != b"\x00\x00":
                raise ValueError(
                    "Didn't expect any flags for command but got: {}".format(
                        flags))
            req_type = self._cxn.recv(2)
            handle = self._cxn.recv(8)
            offset = int.from_bytes(self._cxn.recv(8), byteorder="big")
            length = int.from_bytes(self._cxn.recv(4), byteorder="big")
            data = None
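            if req_type == MagicValues.RequestKindWrite and length > 0:
                # MSG_WAITALL asks the kernel to block until the whole payload
                # has arrived; a plain recv() may return only part of it
                data = self._cxn.recv(length, socket.MSG_WAITALL)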
            yield TransmissionRequest(req_type, handle, offset, length, data)

    def send_transmission_response(self, handle, data=None):
        self._cxn.sendall(MagicValues.ResponsePrefix)
        self._cxn.sendall(b"\x00\x00\x00\x00")
        self._cxn.sendall(handle)
        if data:
            self._cxn.sendall(data)
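One caveat worth spelling out: a plain recv(n) is allowed to return fewer than n bytes, so a parser that assumes full reads can desynchronize on a busy or slow connection. Below is a minimal sketch of a helper that loops until the requested byte count has arrived, plus a struct layout for the 28-byte request header parsed above. The names recv_exact, read_request_header and REQUEST_HEADER are my own and are not part of the original source.

```python
import socket
import struct

# Request header fields in wire order: magic, flags, type, handle, offset,
# length. ">" means big-endian with no padding, 28 bytes in total.
REQUEST_HEADER = struct.Struct(">4s2s2s8sQI")


def recv_exact(cxn, n):
    """Keep calling recv() until exactly n bytes have arrived."""
    chunks = []
    remaining = n
    while remaining > 0:
        chunk = cxn.recv(remaining)
        if not chunk:
            raise ConnectionError("peer closed the connection mid-message")
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


def read_request_header(cxn):
    """Read and unpack one full request header from the connection."""
    return REQUEST_HEADER.unpack(recv_exact(cxn, REQUEST_HEADER.size))
```

Swapping something like recv_exact in for the bare recv calls would be one way to harden the interpreter without changing its structure.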

Finally we’ll write a simple server to listen for incoming connections and process their requests.

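def handle_cxn(cxn, state, volumes=None):
    # `volumes` is passed in by main() but is not used by this simplified,
    # single-device handler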
    iptr = NBDInterpreter(cxn)
    for opt in iptr.get_client_options():
        # we don't support any extra options
        print("Ignoring client option: {}".format(opt.kind))
        iptr.send_option_unsupported(opt)

    iptr.send_export_response()
    print("Entering transmission phase")
    for req in iptr.get_transmission_requests():
        if req.kind == MagicValues.RequestKindRead:
            print("Reading bytes {} - {}".format(req.offset,
                                                 req.offset + req.length))
            data = b"".join(state[req.offset:req.offset + req.length])
            iptr.send_transmission_response(req.handle, data)
        elif req.kind == MagicValues.RequestKindWrite:
            print("Writing bytes {} - {}".format(req.offset,
                                                 req.offset + req.length))
            state[req.offset:req.offset + req.length] = [
                b.to_bytes(byteorder="big", length=1) for b in req.data
            ]
            iptr.send_transmission_response(req.handle)
        elif req.kind == MagicValues.RequestKindClose:
            cxn.shutdown(socket.SHUT_RDWR)
            cxn.close()
            break
        else:
            raise ValueError("Unknown request type: {}".format(req.kind))


def main():
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.bind(('0.0.0.0', 2000))
    sock.setblocking(True)
    sock.listen(1)

    # contains all blocks for all devices as a contiguous list of bytes
    blocks = []
    # a list of all devices so we know the starting offset of a given device in `blocks`
    # (all devices are fixed size)
    volumes = []

    # Prototype will listen to one client at a time
    # -- can be made concurrent without much extra work
    logging.info("NBD Server Starting...")
    while True:
        cxn, client = sock.accept()
        logging.info("Connection accepted from client {}".format(client))
        handle_cxn(cxn, blocks, volumes)
        logging.info(
            "Connection closed by client {} -- listening for next client".
            format(client))

if __name__ == "__main__":
    main()

As a first pass the service can write blocks to a list in memory. Obviously for the durability guarantees we want we could swap in a file handle and use f.seek() and f.write() instead (exercise for you). So far so easy. Let’s put it all together and test it.

Testing the Service

I’m going to want to run this service with multiple isolated clients so I’m gonna Dockerize it early and run the server and clients as containers. To be an nbd client you need to load the NBD kernel module, so sorry to folks using Docker-for-mac, this will not work out of the box for you.

Let’s write our Dockerfile

FROM python:3

RUN apt-get update && apt-get install -y vim kmod nbd-server nbd-client

RUN pip3 install PySyncObj

RUN mkdir /export && touch /export/blkdev && dd if=/dev/zero of=/export/blkdev bs=1k count=500

ADD nbd.conf /etc/nbd-server/config
ADD nbd_allow.conf /etc/nbd-server/allow

and build with docker build -t lambdascheme/nbd . – next we’ll create a network for our containers. If you’re running a swarm manager you can make this an overlay network to test across multiple hosts.

$ docker network create --attachable -d overlay mycloud

Now let’s run our server

$ docker run --name nbd-srv --network mycloud -it --rm lambdascheme/nbd
NBD Server Starting...
█

Our client will be a little bit trickier because it needs access to some things on the host. So we’ll run privileged and bind mount in some special files.

$ docker run --network mycloud --entrypoint bash --privileged -v /lib/modules:/lib/modules -v /dev:/dev -it --rm lambdascheme/nbd
root@2ce8e968baf7:/#

From inside our container we’ll tell our kernel to load the nbd module and then run the nbd-client. This will make our kernel pretend that the network block device is actually a physical device connected to our machine.

# modprobe nbd
# lsmod | grep nbd
nbd                    40960  0
# nbd-client nbd-srv 2000 /dev/nbd0 -nonetlink
This method now uses the newstyle protocol with a default export
Negotiation: ..size = 4MB
bs=512, sz=4194304 bytes

What does this mean in practice? We now have a special file called /dev/nbd0 that will act as the interface to our block device. Linux has one of these device files for every device attached to the machine. It’s part of the universal i/o philosophy of Unix that ensures that all tools speak the same language. If you want a program to interact with the device at the block level it doesn’t need to do anything special. The program can read and write bytes just as it would to a file, but what Linux is doing behind the scenes is converting those to block-level operations like SCSI commands or NBD requests.

We can now use one of these programs to initialize a filesystem on our network device. For ext4 we’ll run

# mkfs.ext4 /dev/nbd0

Now we can take that filesystem and mount it under our own root filesystem

# mkdir /mnt/permanent-storage
# mount -t ext4 -o norecovery /dev/nbd0 /mnt/permanent-storage

so that whatever files we write to /mnt/permanent-storage will actually be stored over the network. Pretty cool, right? Now our container can spontaneously die and we’ll be able to recover state by re-attaching its virtual device to another container. Just to be sure it works, we’ll write a bunch of files, disconnect, re-attach, re-mount and…

# nbd-client nbd-srv 2000 /dev/nbd0 -nonetlink
Warning: the oldstyle protocol is no longer supported.
This method now uses the newstyle protocol with a default export
Negotiation: ..size = 4MB
bs=512, sz=4194304 bytes
# mkdir /mnt/permanent-storage
# mount -t ext4 -o norecovery /dev/nbd0 /mnt/permanent-storage
# tree /mnt/permanent-storage/
  /mnt/permanent-storage/
  ├── baz
  │   └── ryan_was_here
  ├── foobar
  └── lost+found

Voila! Our files remain intact even if our container dies and we start it up on another machine. Thanks to NBD we can start to treat our machines more like cattle and less like pets because any state they store is actually on our NBD server. Many of you will rightfully say here, “but Ryan we just kicked the can down the road; our NBD servers are still pets” – you are correct. They are indeed pets, but we have made our job simpler. Let’s see if we can make our NBD servers into cattle as well.

Part 2: A Multi-Node Service

High Availability

Availability is one of those things you measure in units of “nines” – maybe you want four nines (99.99%) of uptime or four nines of successful responses. These are, from the perspective of the client, the sorts of guarantees you make and often codify in an SLA. To actually achieve these guarantees you have to think in terms of things that can go wrong or “faults”, how likely they are and how your system will respond to them. We’re going to make our system resilient to two common classes of faults, node failures (power loss, spontaneous combustion, &c) as well as network partitions (legit because someone pulled some wrong cable somewhere). For a typical cloud provider this should be enough to get you four nines of uptime.
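To get a feel for what a number of nines allows, here is a quick arithmetic sketch. The function name is hypothetical and the calculation simply assumes availability is measured as a fraction of wall-clock time over the period.

```python
def allowed_downtime_minutes(nines, period_days=365):
    """Minutes of downtime permitted per period at the given number of nines."""
    unavailability = 10 ** -nines  # e.g. 4 nines -> 0.0001
    return period_days * 24 * 60 * unavailability


# Compare the yearly downtime budget for two through five nines
budgets = {n: allowed_downtime_minutes(n) for n in (2, 3, 4, 5)}
```

Four nines works out to a bit under an hour of downtime per year, which leaves very little room for a human to notice a dead node and fix it by hand.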

The same strategy will actually handle both faults for us, namely we will replicate our block writes across multiple nodes. This shouldn’t be a surprise. Almost every database there is supports some form of replication for exactly this reason. Where these databases differ is in their consistency model. You see, if you have two or more copies of a piece of data they are always at risk of being out of sync or “inconsistent”. This is inevitable because of how causality works, go figure. So the question is what guarantees CAN we provide about keeping our replicas in sync that will still make our service usable?

We’re gonna be pretty conservative with our guarantees here and say our system needs to be at least read-after-write consistent, meaning a client that updates a block will never again see an out-of-date version of that block. A block device without this guarantee is pretty useless, not just because you’d be surprised if you wrote one value and then saw a different value, but because order of writes matters a lot for our clients. If your filesystem is journaled or your database uses a write-ahead-log then it preemptively stores a stream of all writes before it actually updates them in place. These methods allow your client to prevent data corruption and recover if there is a failure partway through a write, but they only work if the log entries become visible no later than the in-place updates they describe. A weaker guarantee like eventual consistency has lots of applications, but this ain’t one of 'em.

To get this guarantee we’ll use a majority consensus. Let’s say we have three total nodes, then we’ll have every request, reads and writes, go through a single leader. Otherwise we’ll risk reads being stale. The leader will then have to confirm every write request with at least one other replica before the request succeeds. This way any one node can become unavailable and we’re guaranteed one of the other two has all its data.
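The majority arithmetic generalizes beyond three nodes. The sketch below (helper names are my own) computes how many nodes must acknowledge a write and how many can be lost while a majority remains.

```python
def quorum_size(n_nodes):
    """Smallest number of nodes that forms a strict majority."""
    return n_nodes // 2 + 1


def tolerated_failures(n_nodes):
    """How many nodes can be unavailable while a majority still exists."""
    return n_nodes - quorum_size(n_nodes)


# (cluster size, quorum, failures tolerated) for a few cluster sizes
table = [(n, quorum_size(n), tolerated_failures(n)) for n in (3, 4, 5)]
```

Note that going from three nodes to four does not buy any extra fault tolerance, which is why replica groups are usually sized with odd numbers.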

Now if we just confirmed requests with the naive majority above we would have a bad time. We might run into a scenario where two writes to the same block happen at the same time but are seen by different replicas in different orders. Then which one’s right?

The leader’s version is, because the leader is the authoritative source of truth, but the replicas have no way of knowing that. So rather than getting a consensus on individual writes, what our leader will actually do is distribute and confirm writes to a log, like the journals and write-ahead-logs we saw before. This log ensures every replica updates its state in the same order. As another benefit, if a node becomes unavailable for a while but comes back to life it can get the diff of writes since it was last updated.
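A toy illustration of why the log matters, assuming writes are recorded as (offset, data) pairs and the device is a small zeroed byte array. This is not how Raft or PySyncObj store their logs; it only shows that replaying the same entries in the same order yields the same state, while a different order may not.

```python
def apply_log(log, device_size=8):
    """Replay (offset, data) writes in log order onto a zeroed device."""
    state = bytearray(device_size)
    for offset, data in log:
        state[offset:offset + len(data)] = data
    return bytes(state)


# Three overlapping writes, in the order the leader committed them
log = [(0, b"AAAA"), (2, b"BB"), (0, b"C")]

replica_1 = apply_log(log)
replica_2 = apply_log(log)
# A replica that saw the first two writes in the opposite order diverges
reordered = apply_log([log[1], log[0], log[2]])
```

A replica that was offline and has applied only the first k entries can catch up by replaying log[k:], which is the “diff of writes” mentioned above.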

Some of you will look at this diagram and say, “Hey! That looks just like Raft, that distributed consensus algorithm that’s supposed to be easier to understand than Paxos.” To which I will respond, “Yeah! Stop interrupting. I’m trying to explain this stuff.” As I was saying, we’re going to be using Raft, a distributed consensus algorithm that works by single-leader log replication and is easier to understand than Paxos. I don’t have the space in this article to explain or to implement Raft, but I highly recommend the visual explanation from thesecretlivesofdata. We will use Filipp Ozinov’s excellent PySyncObj library as our implementation.

The library comes with a batteries-included replicated list which we’ll augment to support slices

class ReplBlocks(ReplList):
    @replicated
    def setslicesubset(self, i, j, sequence):
        self.rawData()[i:j] = sequence

    # Break up the write, otherwise it may pickle beyond the size of one packet
    # and PySyncObj isn't set up for that
    def setslice(self, i, j, sequence):
        size = 2**12
        asyncs = []
        offset = i
        while offset < j:
            end = min(offset + size, j)
            asc = AsyncResult()
            self.setslicesubset(offset, end, sequence[offset - i: end - i], callback=asc.onResult)
            asyncs.append(asc)
            offset += size

        for asc in asyncs:
            asc.event.wait(None)

    def __setitem__(self, k, v):
        if isinstance(k, slice):
            self.setslice(k.start, k.stop, v)
        else:
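            # super() objects don't support item assignment, so call the
            # parent's method explicitly
            super().__setitem__(k, v)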

then throw that in our connection handler and we are good to go.

def main():
    # log everything to stderr because compose containers for some reason aren't logging stdout
    logging.basicConfig(level=logging.DEBUG, filename='/proc/self/fd/2', filemode='w')

    peers = None if "NBDD_PEERS" not in os.environ else os.environ["NBDD_PEERS"].split(",")
    blocks = []
    volumes = []
    if peers:
        blocks = ReplBlocks()
        volumes = ReplList()
        self_address = "{}:2001".format(os.environ["NBDD_HOSTNAME"])
        peer_addresses = ["{}:2001".format(peer) for peer in peers]
        syncObj = SyncObj(self_address, peer_addresses, consumers=[blocks, volumes])
 

    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.bind(('0.0.0.0', 2000))
    sock.setblocking(True)
    sock.listen(1)

    # Prototype will listen to one client at a time
    # -- can be made concurrent without much extra work
    logging.info("NBD Server Starting with peers {}...".format(peers))
    while True:
        cxn, client = sock.accept()
        logging.info("Connection accepted from client {}".format(client))
        handle_cxn(cxn, blocks, volumes)
        logging.info("Connection closed by client {} -- listening for next client".
              format(client))
        logging.info("block len is {}".format(len(blocks)))
        for i in range(DEFAULT_DEVICE_SIZE):
            if len(blocks[i]) != 1 or not isinstance(blocks[i], bytes):
                raise Exception("blocks {} is {}".format(i, repr(blocks[i])))

if __name__ == "__main__":
    main()

Easy!

Adding Control Endpoints

Thanks to our nifty Raft library we get fault tolerance without much of a code change. Let’s say our bad replica sticks around though, so it can’t talk to the other replicas but still takes in client requests.

This would be bad, m’kay? Every third request a client made would go to a server that will fail to do any operations. In response we’ll add a health check to each of our replicas. Its job will be to do a basic operation, randomly add or subtract from a counter, to verify that it is still part of a quorum. If the node fails enough times then we can indicate to a higher level control system like Swarm that this replica is bad and should be replaced. Let’s start by running a control HTTP service side-by-side with our NBD service.

class HealthHandler(BaseHTTPRequestHandler):
    counter = None

    def do_GET(s):
        if not HealthHandler.counter:
            s.send_response(200)
            s.end_headers()
            s.wfile.write(b"OK")
            return
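        inc_value = random.choice([1, -1])
        try:
            # sync=True blocks until the write is committed by a quorum and
            # raises if it is not; the 5 second timeout is an arbitrary choice
            HealthHandler.counter.add(inc_value, sync=True, timeout=5)
            s.send_response(200)
            s.end_headers()
            s.wfile.write(b"OK")
        except Exception:
            s.send_response(500)
            s.end_headers()
            s.wfile.write(b"Error writing to distributed log")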

Initialize a distributed counter when we initialize our distributed block store

    if peers:
        blocks = ReplBlocks()
        volumes = ReplList()
        health_counter = ReplCounter()
        HealthHandler.counter = health_counter
        self_address = "{}:2001".format(os.environ["NBDD_HOSTNAME"])
        peer_addresses = ["{}:2001".format(peer) for peer in peers]
        syncObj = SyncObj(self_address,
                          peer_addresses,
                          consumers=[blocks, volumes, health_counter])

And finally start the health daemon in a separate thread

httpd = HTTPServer(('localhost', 8080), HealthHandler)
_thread.start_new_thread(httpd.serve_forever, ())

Start it up with docker-compose up and we have a service with health checks. Yay!!

High Throughput

What’s next for us? Well this article did say cloud scale, right? And I like to think I keep my promises. We definitely won’t get cloud scale with three measly replica nodes, but a three-replica group does make a nice unit. What we’ll want to do is start horizontally spinning up more of these units and make each unit responsible for some subset of our data, a technique called sharding. This process should be opaque to the client because the client expects to simply connect to your service by name and get an NBD volume, without knowing much about service internals. So we start to see an architecture forming, with smart load balancers in front that will proxy our requests to the appropriate shard.

Since volume names are permanent we can use them as our shard keys. The load balancer will take a hash of the key to assign it to a shard and then proxy all block data back and forth between the replica and the client.

import json
import logging
import os
import random
import socket
import _thread

from nbd.iptr import NBDInterpreter, MagicValues

PROXY_BUFFER_SIZE = 1024


class NBDLoadBalancer(object):
    def __init__(self, shards, socket_descriptor=('0.0.0.0', 2000)):
        self.shards = shards
        self.socket_descriptor = socket_descriptor

    def listen_forever(self):
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.bind(self.socket_descriptor)
        sock.setblocking(True)
        sock.listen(1)

        logging.info("NBD Load Balancer Starting...")
        while True:
            cxn, client = sock.accept()
            logging.info("Connection accepted from client {}".format(client))
            _thread.start_new_thread(self.process_cxn, (cxn, ))

    def process_cxn(self, cxn):
        iptr = NBDInterpreter(cxn)
        volume = None
        for opt in iptr.get_client_options():
            if opt.kind == MagicValues.OptionsExportName:
                volume = opt.data
            else:
                # we don't support any extra options
                logging.info("Ignoring client option: {}".format(opt.kind))
                iptr.send_option_unsupported(opt)
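        # NOTE: Python salts hash() for bytes per process unless PYTHONHASHSEED
        # is fixed, so each load balancer replica needs the same seed (or a
        # stable hash such as zlib.crc32) to agree on the shard
        shard_ix = (hash(volume) % len(self.shards))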
        random.seed()
        replica = random.choice(self.shards[shard_ix])
        logging.info(
            "Sending client to replica {} shard {} for volume {}".format(
                replica, shard_ix, volume))
        repl_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        repl_sock.connect((replica, 2000))
        repl_iptr = NBDInterpreter(repl_sock, client=True)
        repl_iptr.start_session(volume)
        _thread.start_new_thread(self.proxy, (cxn, repl_sock))
        self.proxy(repl_sock, cxn)

    def proxy(self, src_sock, dest_sock):
        while True:
            payload = src_sock.recv(PROXY_BUFFER_SIZE)
            if not payload:
                dest_sock.close()
                break
            dest_sock.sendall(payload)


def main():
    # log everything to stderr because compose containers for some reason aren't logging stdout
    logging.basicConfig(level=logging.DEBUG,
                        filename='/proc/self/fd/2',
                        filemode='w')
    shards = json.loads(os.environ["NBD_SHARDS"])
    lb = NBDLoadBalancer(shards)
    lb.listen_forever()


if __name__ == "__main__":
    main()
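One thing to watch with hash-based sharding in Python: the built-in hash() is salted per process for str and bytes by default, so several load balancer processes can disagree about which shard owns a volume. Here is a minimal sketch of a process-independent alternative using zlib.crc32 from the standard library. The function name shard_index is my own, and CRC32 is just one convenient stable hash, not something the original source prescribes.

```python
import zlib


def shard_index(volume_name, n_shards):
    """Map a volume name (bytes) to a shard with a process-independent hash."""
    if n_shards <= 0:
        raise ValueError("need at least one shard")
    return zlib.crc32(volume_name) % n_shards


# Every process computes the same assignment for the same name
assignments = {name: shard_index(name, 2) for name in (b"vol-a", b"vol-b", b"vol-c")}
```

Plain modulo sharding like this reshuffles most volumes when the number of shards changes, so it fits best when the shard count is fixed up front.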

define our services with a docker-compose yaml

version: "3.0"
services:
  nbd-lb:
    deploy:
      replicas: 3
    image: lambdascheme/nbd
    entrypoint: ["python3", "-m", "nbd.lb"]
    environment:
      NBD_SHARDS: '[["nbd-1-1", "nbd-1-2", "nbd-1-3"], ["nbd-2-1", "nbd-2-2", "nbd-2-3"]]'
  # NOTE we could use 3 replicas of one service here with the magic {{.Task.Slot}}
  # template value, but that will only work with Swarm, not standalone compose
  nbd-1-1:
    image: lambdascheme/nbd
    environment:
      NBDD_HOSTNAME: nbd-1-1
      NBDD_PEERS: nbd-1-2,nbd-1-3

  nbd-1-2:
    image: lambdascheme/nbd
    environment:
      NBDD_HOSTNAME: nbd-1-2
      NBDD_PEERS: nbd-1-1,nbd-1-3

  nbd-1-3:
    image: lambdascheme/nbd
    environment:
      NBDD_HOSTNAME: nbd-1-3
      NBDD_PEERS: nbd-1-1,nbd-1-2

  nbd-2-1:
    image: lambdascheme/nbd
    environment:
      NBDD_HOSTNAME: nbd-2-1
      NBDD_PEERS: nbd-2-2,nbd-2-3

  nbd-2-2:
    image: lambdascheme/nbd
    environment:
      NBDD_HOSTNAME: nbd-2-2
      NBDD_PEERS: nbd-2-1,nbd-2-3

  nbd-2-3:
    image: lambdascheme/nbd
    environment:
      NBDD_HOSTNAME: nbd-2-3
      NBDD_PEERS: nbd-2-1,nbd-2-2

networks:
  default:
    external:
      name: mycloud

and finally spin it all up

$ docker-compose up
nbd-lb_1   | INFO:root:NBD Load Balancer Starting...
nbd-lb_2   | INFO:root:NBD Load Balancer Starting...
nbd-lb_3   | INFO:root:NBD Load Balancer Starting...
nbd-2-3_1  | INFO:root:NBD Server 'nbd-2-3' Starting with peers ['nbd-2-1', 'nbd-2-2']...
nbd-1-2_1  | INFO:root:NBD Server 'nbd-1-2' Starting with peers ['nbd-1-1', 'nbd-1-3']...
nbd-1-3_1  | INFO:root:NBD Server 'nbd-1-3' Starting with peers ['nbd-1-1', 'nbd-1-2']...
nbd-2-2_1  | INFO:root:NBD Server 'nbd-2-2' Starting with peers ['nbd-2-1', 'nbd-2-3']...
nbd-2-1_1  | INFO:root:NBD Server 'nbd-2-1' Starting with peers ['nbd-2-2', 'nbd-2-3']...
nbd-1-1_1  | INFO:root:NBD Server 'nbd-1-1' Starting with peers ['nbd-1-2', 'nbd-1-3']...

Spin up a couple of containers, and attach some volumes

$ docker run --network mycloud --entrypoint bash --privileged -v /lib/modules:/lib/modules -v /dev:/dev -it --rm lambdascheme/nbd
root@2ce8e968baf7:/# nbd-client nbd-lb 2000 /dev/nbd0 -nonetlink
This method now uses the newstyle protocol with a default export
Negotiation: ..size = 4MB
bs=512, sz=4194304 bytes

Hooray! It all works. We should see log messages from both of our shards, which tells us we are sharding properly.

Part 3: Measuring Performance

Improving Capacity

As mentioned at the beginning we want to run a production Postgres workload on our newly distributed block device. As the rubber hits the road we realize that 5MiB of in-memory storage per drive won’t cut it. Postgres alone has a minimum requirement of 500MiB! If we naively raise our DEFAULT_BLOCK_COUNT to give us that capacity, our containers start to exit with code 137 (killed for running out of memory). So, sooner than planned, we’ll have to start writing blocks to disk. As a side effect we’ll get durability in the event a replica restarts.

Swapping out our block array for a file handle should be easy enough. We’ll just change from slice writes to an atomic seek-and-write; similarly a slice read can be an atomic seek-and-read. We can’t just add a file handle as an attribute to our ReplFile and call it a day because of how PySyncObj does snapshotting. It serializes the entire object with pickle and file handles are obviously unpicklable. So instead we’ll define our replica’s state with a global object.

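import threading

class LocalState(object):
    # Process-wide state kept outside the replicated object so that
    # PySyncObj never tries to pickle it.
    f = None
    lock = None

...
def main():
    ...
    # 'r+b' requires that the 'blocks' file already exists
    LocalState.f = open('blocks', 'r+b')
    LocalState.lock = threading.Lock()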

and define a replicated file class that can read and write to the local state

class ReplFile(SyncObjConsumer):
    @replicated
    def write(self, offset, contents):
        with LocalState.lock:
            LocalState.f.seek(offset)
            return LocalState.f.write(contents)

    def read(self, offset, length):
        with LocalState.lock:
            LocalState.f.seek(offset)
            return LocalState.f.read(length)
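
To make the pickling constraint concrete, here is a minimal sketch using only the standard library. The helper names make_backing_file and is_picklable are hypothetical and not part of the post's source. The backing file is created up front on the assumption that nothing else has created it, since mode 'r+b' will not create a missing file.

```python
import os
import pickle
import tempfile


def make_backing_file(path, size):
    """Create a zero-filled file of `size` bytes (hypothetical helper).

    open(path, 'r+b') fails if the file is missing, so create it first.
    """
    with open(path, 'wb') as f:
        f.truncate(size)


def is_picklable(obj):
    """Return True if pickle can serialize obj (hypothetical helper)."""
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, pickle.PicklingError):
        return False


path = os.path.join(tempfile.mkdtemp(), 'blocks')
make_backing_file(path, 4096)

with open(path, 'r+b') as handle:
    # Seek-and-write followed by seek-and-read, as in ReplFile
    handle.seek(512)
    handle.write(b'hello')
    handle.seek(512)
    roundtrip = handle.read(5)

    # Plain bytes pickle fine; an open file handle does not
    bytes_ok = is_picklable(b'\x00' * 16)
    handle_ok = is_picklable(handle)

print(roundtrip, bytes_ok, handle_ok)
```

Because the handle lives on a class attribute rather than on the replicated instance, a snapshot of the instance never has to touch it.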

Easy enough. While we’re at it let’s collect and send OpenTracing data from our function calls so we can home in on the bottlenecks in our service. Set up a Jaeger UI to collect the traces

  jaeger:
    image: jaegertracing/all-in-one:0.8.0
    ports:
      - 16686:16686 

Set the env var JAEGER_AGENT_HOST=jaeger for every replica and configure the tracer

    tracer = jaeger_client.Config(
        config={
            'sampler': {
                'type': 'const',
                'param': 1,
            },
            'logging': True,
        },
        service_name='nbd',
    ).initialize_tracer()

Finally we can add spans to our write functions

            with tracer.start_span('write-all-replicas'):
                blocks.write(start, req.data)
...
    @replicated
    def write(self, offset, contents):
        with LocalState.lock:
            with LocalState.tracer.start_span('write-local-replica'):
                LocalState.f.seek(offset)
                return LocalState.f.write(contents)

and do a write-heavy operation like the mkfs.ext4 from earlier. Going to our Jaeger UI we can inspect the resulting traces.

Improving Throughput

From gathering our data we notice that our replicated write operation has quite a high turnaround (in the hundreds of microseconds) whereas any individual local write is not that expensive. That, in combination with the rising memory usage over time, tells us that adding our writes to our Raft log is becoming a bottleneck.

We shouldn’t be too surprised here. Appending the contents of every single disk write to a log is bound to be resource intensive. So we start to have another idea: what if we still use Raft to order our writes but keep the blocks themselves separate?

The basic idea here is that each replica holds onto some number of recently written blocks that it is able to share. When we replicate a write operation we actually reference the contents by id so each replica fetches the contents and writes them locally.

The approach we’ll go with is a basic pull model in which each replica receives a write operation, asks each of its peers for the corresponding contents, and writes them locally. So we start by writing a simple round-robin cache that will keep new writes in memory long enough for the peers to ask for them.

import threading

class LoglessCache(object):
    def __init__(self, capacity=100):
        self._roundrobin = [b''] * capacity
        self._uuid2write = {}
        self._lock = threading.Lock()
        self._ix = 0

    def set(self, newuuid, val):
        with self._lock:
            curuuid = self._roundrobin[self._ix]
            if curuuid in self._uuid2write:
                del self._uuid2write[curuuid]
            self._roundrobin[self._ix] = newuuid
            self._uuid2write[newuuid] = val
            self._ix = (self._ix + 1) % len(self._roundrobin)

    def get(self, write_uuid):
        with self._lock:
            return self._uuid2write.get(write_uuid, b'')
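
To see the eviction behaviour in isolation, here is a minimal alternative sketch of the same first-in, first-out idea built on collections.OrderedDict instead of a ring of IDs. The class name FifoWriteCache is hypothetical; it is not the implementation used in this post, only a compact way to watch old writes fall out of the cache.

```python
import threading
from collections import OrderedDict


class FifoWriteCache(object):
    """Hypothetical FIFO cache: the oldest write is evicted when full."""

    def __init__(self, capacity=100):
        self._capacity = capacity
        self._writes = OrderedDict()
        self._lock = threading.Lock()

    def set(self, write_uuid, val):
        with self._lock:
            is_new = write_uuid not in self._writes
            if is_new and len(self._writes) >= self._capacity:
                # Drop the entry that was inserted first
                self._writes.popitem(last=False)
            self._writes[write_uuid] = val

    def get(self, write_uuid):
        with self._lock:
            # Same miss convention as LoglessCache: empty bytes
            return self._writes.get(write_uuid, b'')


cache = FifoWriteCache(capacity=2)
cache.set(b'id-1', b'first')
cache.set(b'id-2', b'second')
cache.set(b'id-3', b'third')  # cache is full, so id-1 is evicted

evicted = cache.get(b'id-1')
kept = (cache.get(b'id-2'), cache.get(b'id-3'))
print(evicted, kept)
```

Either way, the capacity has to be large enough that the slowest peer can still fetch a write before it is evicted.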

Now we’ll write a class that will fetch a given write by ID, either locally or from our peers using a very simple key fetch protocol on top of TCP.

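import socket
import threading
import _thread

# next_n_bytes(sock, n) is assumed to read exactly n bytes from the socket
class WriteSharer(object):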
    def __init__(self, peers, cache):
        self.peers = peers
        self.locks = {peer: threading.Lock() for peer in self.peers}
        self.cache = cache
        self.clients = {}

    def listen_for_asks(self):
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.bind(("0.0.0.0", 2002))
        sock.setblocking(True)
        sock.listen(1)
        while True:
            cxn, _client = sock.accept()
            _thread.start_new_thread(self._handle_asks, (cxn, ))

    def _handle_asks(self, cxn):
        while True:
            write_uuid_len = int.from_bytes(next_n_bytes(cxn, 4),
                                            byteorder="big")
            write_uuid = next_n_bytes(cxn, write_uuid_len)
            write = self.cache.get(write_uuid)
            cxn.sendall(len(write).to_bytes(byteorder="big", length=4))
            cxn.sendall(write)

    def get_write(self, write_uuid, check_first):
        peers = list(self.peers)
        if check_first in peers:
            peers.remove(check_first)
            peers.insert(0, check_first)
        write = self.cache.get(write_uuid)
        if write != b'':
            return write
        for peer in peers:
            with self.locks[peer]:
                client = self.clients.get(peer)
                if client is None:
                    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                    client.connect((peer, 2002))
                    self.clients[peer] = client
                client.sendall(
                    len(write_uuid).to_bytes(byteorder="big", length=4))
                client.sendall(write_uuid)
                write_len = int.from_bytes(next_n_bytes(client, 4),
                                           byteorder="big")
                if write_len != 0:
                    return next_n_bytes(client, write_len)
        raise ValueError("Write not found", write_uuid)
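
The wire format here is just a 4-byte big-endian length followed by the payload, in both directions, with a zero length standing for a miss. Below is a minimal sketch of that framing on its own, exercised over a local socket pair. The helper names recv_exact, send_frame and recv_frame are hypothetical; recv_exact stands in for what next_n_bytes is assumed to do.

```python
import socket


def recv_exact(sock, n):
    """Read exactly n bytes, or raise if the peer closes early."""
    buf = b''
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise ConnectionError("socket closed mid-frame")
        buf += chunk
    return buf


def send_frame(sock, payload):
    """Send a 4-byte big-endian length prefix, then the payload."""
    sock.sendall(len(payload).to_bytes(4, byteorder="big") + payload)


def recv_frame(sock):
    """Read one length-prefixed frame; a zero length yields b''."""
    length = int.from_bytes(recv_exact(sock, 4), byteorder="big")
    return recv_exact(sock, length) if length else b''


# One replica asks, the other answers
asker, holder = socket.socketpair()

send_frame(asker, b'some-write-id')      # ask for a write by ID
request = recv_frame(holder)

send_frame(holder, b'block contents')    # hit: non-empty payload
hit = recv_frame(asker)

send_frame(holder, b'')                  # miss: zero-length payload
miss = recv_frame(asker)

asker.close()
holder.close()
print(request, hit, miss)
```

One consequence of using a zero length as the miss signal is that an empty write can never be shared this way, which is fine for block writes since they always carry data.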

Finally we’ll wrap our replicated block write in a method that first puts a given write in the local cache and then references it by ID to all the replicas.

class ReplFile(SyncObjConsumer):
    ...
    def lead_write(self, offset, data):
        sync = False
        with LocalState.lock:
            LocalState.write_count += 1
            if LocalState.write_count >= 20:
                LocalState.write_count = 0
                sync = True
        write_uuid = uuid.uuid1().bytes
        LocalState.write_sharer.cache.set(write_uuid, data)
        self.write(LocalState.hostname, offset, write_uuid, sync=sync)
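
The replicated write itself is not shown here, so the following is only a guess at its shape: a sketch of what each replica might do when Raft hands it a write-by-ID entry. The names apply_write and FakeSharer are hypothetical, FakeSharer is a dictionary-backed stand-in for WriteSharer, and an in-memory buffer stands in for the blocks file.

```python
import io
import threading
import uuid


class FakeSharer(object):
    """Hypothetical stand-in for WriteSharer: looks writes up in a dict."""

    def __init__(self, writes):
        self.writes = writes

    def get_write(self, write_uuid, check_first):
        # check_first is ignored here; the real class asks that peer first
        try:
            return self.writes[write_uuid]
        except KeyError:
            raise ValueError("Write not found", write_uuid)


def apply_write(f, lock, sharer, leader, offset, write_uuid):
    """Fetch the contents for write_uuid, then seek-and-write locally."""
    contents = sharer.get_write(write_uuid, check_first=leader)
    with lock:
        f.seek(offset)
        return f.write(contents)


# The leader caches the data under a fresh ID ...
write_id = uuid.uuid1().bytes
sharer = FakeSharer({write_id: b'abcd'})

# ... and every replica applies (leader, offset, ID) in log order
blocks = io.BytesIO(bytes(16))
written = apply_write(blocks, threading.Lock(), sharer, 'nbd-1-1', 4, write_id)
print(written, blocks.getvalue())
```

The Raft log entry now carries only a hostname, an offset and a 16-byte ID, so its size no longer depends on how much data the write contains.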

And there we have it. You may be wondering what Raft actually gets us here, since we’re writing a lot of the data plane and replication logic ourselves. Remember that Raft still gives us a consistent order in which we apply our writes, a consistent commit point across all replicas in the event of a leader fault, and a single leader in the event of a network partition. In short, Raft still gives us consistency. Now we re-run our mkfs.ext4 once more and see, to our delight, that block writes have been brought down to ~140 microseconds.

Measuring the Result

Finally we will run our service through a series of benchmarks to see in aggregate how it performs. First a basic write performance test using dd to write 128MiB.

# time bash -c "sync; dd if=/dev/zero of=/dev/nbd0 bs=1K count=131072; sync"
131072+0 records in
131072+0 records out
134217728 bytes (134 MB, 128 MiB) copied, 0.316046 s, 425 MB/s

real    0m58.020s
user    0m0.080s
sys     0m0.428s
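
The two timings in that output tell very different stories, and a little arithmetic makes the gap explicit. The sketch below only reuses the numbers printed above. The reading that dd's own figure mostly measures writes being buffered in memory, while the wall-clock time includes flushing them to the device, is an assumption based on the surrounding sync calls.

```python
# Numbers copied from the dd and time output above
BYTES_WRITTEN = 134217728
DD_SECONDS = 0.316046      # what dd itself measured
WALL_SECONDS = 58.020      # "real" time, including both sync calls


def mb_per_sec(nbytes, seconds):
    """Throughput in decimal megabytes per second, as dd reports it."""
    return nbytes / seconds / 1e6


reported = mb_per_sec(BYTES_WRITTEN, DD_SECONDS)
effective = mb_per_sec(BYTES_WRITTEN, WALL_SECONDS)

print("dd reported: %.0f MB/s" % reported)
print("end to end:  %.2f MB/s" % effective)
```

The end-to-end number is the more conservative of the two, since the wall-clock time also covers the initial sync and process startup.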

Next a formal read benchmark using hdparm

# hdparm -tT /dev/nbd0

/dev/nbd0:
 Timing cached reads:     2 MB in  5.25 seconds = 389.93 kB/sec
 HDIO_DRIVE_CMD(identify) failed: Invalid argument
 Timing buffered disk reads:   2 MB in  5.65 seconds = 362.26 kB/sec

Finally we’re going to actually initialize a postgres database and run pgbench. Start by mounting our device to /mnt/pg and making sure the postgres user owns this directory. Then switch users and initialize the database

# sudo -i -u postgres
# /usr/lib/postgresql/11/bin/initdb -D /mnt/pg/pgdata

Start the database

# /usr/lib/postgresql/11/bin/postgres -D /mnt/pg/pgdata

and run the benchmark

postgres@ab1e5d60ceb3:~$ /usr/lib/postgresql/11/bin/pgbench -i
...
postgres@ab1e5d60ceb3:~$ /usr/lib/postgresql/11/bin/pgbench -c 10
starting vacuum...end.
transaction type: <builtin: TPC-B (sort of)>
scaling factor: 1
query mode: simple
number of clients: 10
number of threads: 1
number of transactions per client: 10
number of transactions actually processed: 100/100
latency average = 737.976 ms
tps = 13.550585 (including connections establishing)
tps = 13.555556 (excluding connections establishing)

and there we have our benchmarks.

Conclusions

To summarize we can lay out the throughput of our NBD service against a bare cloud instance disk

Benchmark                 NBD Service   Bare Cloud Instance
dd copy throughput        425 MB/s      312 MB/s
dd sync time (128 MiB)    58 s          0.8 s
hdparm cached reads       390 kB/s      9190 MB/s
hdparm buffered reads     360 kB/s      170 MB/s
pgbench tps               14            490

and we’ll see our service is roughly 1% as performant on most measures. The one figure that flatters us, dd copy throughput, most likely reflects writes being buffered in memory before they reach the device; the 58s sync time tells the real story. Unsurprisingly it’s not ready for the big leagues yet, but it’s not bad for less than 400 lines of Python. The next big hurdle for performance would be to allow multiple concurrent writes, since the need for a serial request-response on every block really kills our throughput. Maybe we’ll address that in a future post. Regardless I hope you’ve learned plenty here about block storage and the design of distributed systems. I sure have. Until next time!

Further Reading