Writing an Alternative TCP with Python

You don’t really understand something until you’ve thoroughly considered its alternative. I’ve always felt that way. The idea may remind you of David Foster Wallace’s parable of the two fish who don’t know what water is, or perhaps of John Stuart Mill’s famous line, “He who knows only his own side of the case knows little of that.” Maybe you know this frame of mind simply as “first principles thinking.”

Whatever this frame makes you think of, today we’re going to put it into practice and consider an alternative to something that has become so ubiquitous in our internet infrastructure that it may as well be the water we don’t know exists. I’m talking about the all-too-standard Transmission Control Protocol (TCP).

In particular, we’re going to replace the protocol’s core sliding window of packets with something I’ll call a “sliding fence”. Our work here is more than just a fun science project: my suspicion is that this new approach outperforms the old one in situations with high rates of randomly dropped packets, as we might expect in a peer-to-peer network like the coming Internet of Things, or perhaps Starlink. We’ll spend parts 1 and 2 going over the design and implementation of this new system, and then in part 3 we’ll put it to the test and see how it fares against the classic sliding window. Part 3 will also be a great opportunity to explore Linux virtual networking primitives, which we’ll use to set up our experiment. As always, the code here (and its unit tests) is available on GitHub.

Part 1: The Design

Sliding Windows and Sliding Fences

Hopefully you’re all familiar with the basic mechanism of TCP, but as a refresher, imagine you want to send a stream of text to someone over the internet. You do this every time you make a search request to Google.

>>> import socket
>>>
>>> sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
>>> sock.connect(("www.google.com", 80))
>>>
>>> sock.sendall(b"GET / HTTP/1.1\r\nHost: www.google.com\r\n\r\n")
>>> print(sock.recv(100).split(b"\n")[0].strip().decode("utf-8"))
HTTP/1.1 200 OK

TCP lets you write this stream of characters and then, behind the scenes, disassembles it into a set of packets that can be transported over the internet. Each packet carries at least 20 bytes of TCP header so that the other end can reassemble the packets in the right order and notice any packets that get dropped. Of course, the socket you’re writing to is full-duplex, so Google can likewise respond with the result of your query by sending a stream of packets that you reassemble.

TCP’s sliding window is what lets each side retransmit any packets that are suspected of being lost in transit.

Basically, each side periodically sends acknowledgements of the most recent contiguous packet it received. In other words, if the client gets an “ACK” for packet 2, the server is saying, “Hey, I got every packet up to and including packet 2.” The client keeps a fixed-size window of, say, 16 contiguous packets that it sends on repeat, and when it receives an ACK it slides that window to start just to the right of the acked packet, establishing a new base. At this point the client can also delete all the old packets, since they’ve been successfully delivered.
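To make the cumulative-ACK bookkeeping concrete, here’s a minimal sketch of the sender side of this simplified sliding window. It follows the article’s toy model rather than real TCP (which acknowledges bytes rather than packets and adapts its window size): a fixed window of 16 packets that is resent every round, and an ACK of n meaning “everything up to and including n arrived”. WindowSender, to_send, and on_ack are hypothetical names used only for illustration.

```python
from typing import List

WINDOW = 16  # assumed window size, matching the example in the text


class WindowSender:
    """Toy sliding-window sender driven by cumulative ACKs (not real TCP)."""

    def __init__(self, payloads: List[bytes]):
        # seq number -> payload for every packet not yet acknowledged
        self.unacked = dict(enumerate(payloads))
        self.base = 0  # lowest sequence number not yet acknowledged

    def to_send(self) -> List[int]:
        # Every round, each unacknowledged packet inside the window is (re)sent,
        # whether or not the receiver already has it.
        return [s for s in range(self.base, self.base + WINDOW) if s in self.unacked]

    def on_ack(self, acked_through: int) -> None:
        # An ACK for n means "I have every packet up to and including n",
        # so everything at or below n can be discarded and the window slides.
        for s in range(self.base, acked_through + 1):
            self.unacked.pop(s, None)
        self.base = max(self.base, acked_through + 1)


sender = WindowSender([b"x"] * 20)
print(sender.to_send())  # packets 0 through 15
sender.on_ack(2)
print(sender.to_send())  # packets 3 through 18
```

Note that the sender only learns the base from an ACK, so anything in the window above a gap keeps getting resent; that is exactly the weakness explored next.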

This approach is so elegant that it’s no wonder it has been used for over 40 years, but can we do better?! Observe that if the network becomes hyper-congested and our routers have to start dropping packets at a high rate, then our client and server will start to send a lot of redundant packets. Let’s imagine for simplicity that the net effect of our congested routers is that exactly half of the packets are dropped by the time they reach the server. We can simulate this packet dropping with the help of Python:

>>> import random
>>> window_l, window_r = 0, 16
>>> sorted(random.sample(range(window_l, window_r), (window_r - window_l) // 2))
[0, 2, 3, 5, 6, 8, 9, 10]

Running this for a few iterations, we get:

| Iteration | Client Sends (Window) | Server Receives | Server Acknowledges |
|---|---|---|---|
| 1 | 0 - 15 | 0,2,3,5,6,8,9,10 | 0 |
| 2 | 1 - 15 | 2,7,9,10,11,12,13 | 0 |
| 3 | 1 - 15 | 1,7,8,9,11,12,15 | 3 |
| 4 | 4 - 15 | 4,5,6,7,9,12,14 | 15 |

If we follow along with packet 15 we’ll see that if any one of its predecessors fails to make it to the other end then we have to retransmit packet 15, even if the server already received it. So we could easily end up transmitting the same packet 4 times! Not only is that super wasteful but it will only compound onto our network congestion and make the problem worse. And we haven’t even accounted for the fact that ACK packets can be dropped as well 😱

So what then is a sliding fence, and how does it address this problem? The idea is that the server responds with the first packet it is still missing (one past the highest contiguous packet it received), along with a boolean array covering the following 32 packets that indicates which of them were received.
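Here’s a sketch of both halves of that exchange, assuming (as the implementation later does) that the ACK carries the first missing sequence number followed by 32 flags. make_fence_ack and prune_outstanding are hypothetical helpers, not functions from the article’s code.

```python
from typing import List, Set, Tuple

FENCE_SIZE = 32  # number of packets covered by the ACK's boolean array


def make_fence_ack(received: Set[int]) -> Tuple[int, List[bool]]:
    """Receiver side: first missing seq number plus flags for the next FENCE_SIZE packets."""
    next_expected = 0
    while next_expected in received:
        next_expected += 1
    flags = [next_expected + i in received for i in range(FENCE_SIZE)]
    return next_expected, flags


def prune_outstanding(outstanding: Set[int], ack: Tuple[int, List[bool]]) -> Set[int]:
    """Sender side: forget packets below the base and any the fence marks as received."""
    next_expected, flags = ack

    def acked(seq: int) -> bool:
        offset = seq - next_expected
        return offset < 0 or (offset < FENCE_SIZE and flags[offset])

    return {seq for seq in outstanding if not acked(seq)}


ack = make_fence_ack({0, 2, 3, 5, 6, 8, 9, 10})
print(ack[0])                                          # 1
print(sorted(prune_outstanding(set(range(16)), ack)))  # [1, 4, 7, 11, 12, 13, 14, 15]
```

Feeding in the packets received during the first iteration of the table below leaves exactly the eight packets the client resends in its second iteration.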

| Iteration | Client Sends | Server Receives | Server Acknowledges |
|---|---|---|---|
| 1 | 0 - 15 | 0,2,3,5,6,8,9,10 | 0,2,3,5,6,8,9,10 |
| 2 | 1,4,7,11,12,13,14,15 | 1,11,14,15 | 0-3,5,6,8-11,14,15 |
| 3 | 4,7,12,13 | 4,13 | 0-6,8-11,13-15 |
| 4 | 7,12 | 7,12 | 0-15 |

This way the client can build a more detailed model of which packets still need to be sent and cut down on the waste. Notice that in this case it still took four iterations to get the full 16-packet window across the network, but it took far fewer packets: 30 in total (16 + 8 + 4 + 2) compared to a whopping 58 (16 + 15 + 15 + 12)!
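The two tables above are single illustrative runs, so a quick Monte Carlo sketch helps show the gap is not a fluke. It keeps the tables’ simplifying assumptions: one 16-packet window (which fits inside a single 32-packet fence), each data packet independently dropped with probability 0.5, one round per iteration, and ACKs that are never lost. packets_sent and mean_sent are hypothetical helpers.

```python
import random
from typing import Optional


def packets_sent(use_fence: bool, n_packets: int = 16, drop: float = 0.5,
                 rng: Optional[random.Random] = None) -> int:
    """Count data packets sent until all n_packets arrive.

    Assumes ACKs are never lost and n_packets fits inside one 32-packet fence.
    """
    rng = rng or random.Random()
    received = set()  # what the receiver holds, buffered out of order
    total = 0
    while len(received) < n_packets:
        base = 0  # first packet the receiver is still missing
        while base in received:
            base += 1
        if use_fence:
            # Fence: the ACK names every missing packet, so only those are resent.
            batch = [s for s in range(base, n_packets) if s not in received]
        else:
            # Window: the sender only knows the cumulative base, so it resends
            # everything from there on, including packets that already arrived.
            batch = list(range(base, n_packets))
        total += len(batch)
        received.update(s for s in batch if rng.random() >= drop)
    return total


def mean_sent(use_fence: bool, trials: int = 2000, seed: int = 0) -> float:
    rng = random.Random(seed)
    return sum(packets_sent(use_fence, rng=rng) for _ in range(trials)) / trials


print(f"window: {mean_sent(False):.1f} packets, fence: {mean_sent(True):.1f} packets")
```

Under these assumptions the fence averages about 32 packets, since at a 50% drop rate each packet needs two attempts on average, while the window keeps resending packets that already arrived and ends up noticeably higher.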

The Protocol

Now that we have an idea of what we want to create, let’s fill in some details. TCP gives us a lot of additional features, and we need to decide what is valuable for our prototype and what is superfluous.

  1. Stream re-assembly / resilience to out-of-order arrival (via sequence numbers)
  2. Packet retransmission / resilience to router fault (via sliding window)
  3. Multi-plexing (via source/dest ports)
    — cut line —
  4. Error detection / data integrity (via message checksum)
  5. Synchronization / resilience to peer reconnect (via random initial sequence number)

I’ve distilled TCP’s guarantees down to the top five and, for our purposes, we don’t care about items 4 and 5 because they are independent of what we’re measuring. So our job is to devise a protocol that meets criteria 1 - 3 and test it. We’ll write our system on top of UDP so that we get a lot for free, including an existing socket abstraction and some multiplexing (property number 3).

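What we end up with is a simplified variant of the TCP header: a 1-byte packet type, a 4-byte big-endian sequence number, and then either a 32-bit acknowledgement bitfield (for ACK packets) or the payload (for data packets), as implemented in the encode method below.

We have just three packet types: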

| Type | Function |
|---|---|
| DAT | Open a new connection or send data (SlidingFenceType.SYN in the code) |
| ACK | Acknowledge receipt of packets |
| FIN | Close an existing connection |

These three types will suffice to establish basic connections, send and acknowledge data, and retransmit lost packets. We’ll make copious use of procedures on timed loops so that, as we’ll see, we can time out dead connections and add a kind of keepalive to our protocol. We may still have some race conditions when it comes to re-establishing connections between the same peers, but we’re happy to ignore those.

The Architecture

We’ll support this protocol using four kinds of components, each with a well-defined boundary of responsibility:

  1. First, a Packet Assembler takes packets from the peer as input, stores them, and assembles them into a stream as soon as it becomes possible to do so. It provides the read interface for application logic by combining the bytes of the longest contiguous block of packets starting from the left. It is also responsible for generating ACK packets, because it keeps track of which packets in the sliding fence have not yet been received.
  2. Next, the Stream Disassembler is basically the opposite of the Packet Assembler. It takes as input the stream of bytes provided by the application layer through the write interface and turns that byte stream into packets. The crux of the Stream Disassembler’s complexity is that it needs to keep track of which packets the peer has seen, so the disassembler is what gets notified when an ACK packet is received.
  3. The RCP Socket simply provides a thin facade on top of the Packet Assembler and Stream Disassembler, gluing their methods together so that the application layer only has to interface with one object, much like a UNIX socket.
  4. Finally, the RCP Router is what interfaces between our components and the raw UDP socket. Think of it like a UNIX socket before we make the initial connect or accept call. When the router is in listen mode, triggered by calling the listen method, it has three core responsibilities:
    • Listen for new client connections, identified by their source address and port, and create a new assembler-disassembler-socket stack for each connection
    • Spin up a read loop, a thread that listens for incoming packets on the UDP socket and routes them to the correct stack
    • Spin up a write loop, a thread that periodically asks each stack whether it has packets ready to send and relays them to the UDP socket; this thread is also responsible for keeping track of how alive each connection is and timing the connection out if it goes dead
      The operation of a Router on the client side is similar but simpler: the Router just needs to create a single stack and a corresponding read loop and write loop.

Note that the mapping from Router to socket stacks is one-to-many. We will always have exactly one Router, one read loop, and one write loop, but they will interface with many Packet Assemblers and Stream Disassemblers, one of each per connection.
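The per-connection routing above leans on something UDP already gives us: every recvfrom call returns the sender’s (address, port) pair, which is what the Router uses as its session key. Here’s a small loopback sketch of that demultiplexing; the two clients, their messages, and demux_demo itself are made up for illustration.

```python
import socket
from typing import Dict, List, Tuple


def demux_demo(n_clients: int = 2) -> Dict[Tuple[str, int], List[bytes]]:
    """Receive one datagram from each client and group them by sender address."""
    server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    server.bind(("127.0.0.1", 0))  # port 0 lets the OS choose a free port
    server.settimeout(2.0)
    clients = [socket.socket(socket.AF_INET, socket.SOCK_DGRAM) for _ in range(n_clients)]
    sessions: Dict[Tuple[str, int], List[bytes]] = {}
    try:
        for i, client in enumerate(clients):
            client.sendto(f"hello from client {i}".encode(), server.getsockname())
        for _ in clients:
            data, peer = server.recvfrom(1024)
            # Like the Router's read loop, key each session on the peer's (address, port).
            sessions.setdefault(peer, []).append(data)
    finally:
        for s in [server, *clients]:
            s.close()
    return sessions


for peer, messages in demux_demo().items():
    print(peer, messages)
```

Each client socket gets its own ephemeral port when it first sends, so the server sees two distinct peers and, in the Router’s case, would create two separate socket stacks.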

Part 2: The Implementation

Packets

With our design set from a bird’s-eye view, let’s start implementing our components. First we need to establish the common medium of data our components will use to talk to each other, namely the packet. A packet should be easy to serialize and send over a socket, but also easy to work with at a higher level through accessible data fields, so we’ll use a Python 3 dataclass for this.



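import logging
import os
import threading  # used by the socket stack classes below
import time  # used by PacketAssembler.read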

from dataclasses import dataclass
from enum import Enum

logger = logging.getLogger()

SEQ_NUMBER_BYTES = 4  # 2**32 possible seq numbers
SLIDING_FENCE_SIZE = 32  # the sliding fence will consist of 32 packets at a time


class SlidingFenceType(Enum):
    SYN = 1  # "DAT" in the packet type table: opens a connection or carries data
    ACK = 2
    FIN = 3


@dataclass
class SlidingFencePacket:
    pckt_type: SlidingFenceType
    seq_number: int
    acks: list[bool]  # only populated for ACK packets
    data: bytes

    def encode(self) -> bytes:
        if self.pckt_type == SlidingFenceType.ACK:
            if len(self.acks) != SLIDING_FENCE_SIZE or len(self.data) != 0:
                raise ValueError(
                    f"ACKS packet should have {SLIDING_FENCE_SIZE} acks and no data"
                )
            return bytes([self.pckt_type.value]) + \
                self.seq_number.to_bytes(SEQ_NUMBER_BYTES, 'big') + \
                sum(2**i for i, v in enumerate(self.acks) if v).to_bytes(SLIDING_FENCE_SIZE // 8, 'big')
        if self.pckt_type == SlidingFenceType.FIN:
            if len(self.acks) > 0 or len(self.data) > 0:
                raise ValueError("FIN packets shouldn't have data or acks")
            return bytes([self.pckt_type.value]) + \
                self.seq_number.to_bytes(SEQ_NUMBER_BYTES, 'big')
        if self.pckt_type == SlidingFenceType.SYN:
            if len(self.acks) > 0:
                raise ValueError("SYN packet shouldn't have acks")
            return bytes([self.pckt_type.value]) + \
                self.seq_number.to_bytes(SEQ_NUMBER_BYTES, 'big') + \
                self.data
        raise ValueError("Unknown packet type")

    @classmethod
    def decode(cls, data: bytes):
        pckt_type = SlidingFenceType(data[0])
        seq_number = int.from_bytes(data[1:5], 'big')
        if pckt_type == SlidingFenceType.SYN:
            return cls(
                pckt_type=pckt_type,
                seq_number=seq_number,
                acks=[],
                data=data[5:],
            )
        if pckt_type == SlidingFenceType.ACK:
            encoded_acks = int.from_bytes(data[5:9], 'big')
            return cls(
                pckt_type=pckt_type,
                seq_number=seq_number,
                acks=[
                    bool(2**i & encoded_acks)
                    for i in range(SLIDING_FENCE_SIZE)
                ],
                data=b'',
            )
        if pckt_type == SlidingFenceType.FIN:
            return cls(
                pckt_type=pckt_type,
                seq_number=seq_number,
                acks=[],
                data=b'',
            )
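As a cross-check on the wire format, here’s the same layout written with Python’s struct module: a 1-byte type, a 4-byte big-endian sequence number, and then either a 32-bit big-endian ACK bitfield or the payload. It is a sketch that produces the same bytes as encode() above for ACK and data packets; the constant names and the encode_dat, encode_ack, and decode_ack helpers are hypothetical.

```python
import struct
from typing import List, Tuple

# Assumed to mirror SlidingFenceType and SLIDING_FENCE_SIZE above.
DAT, ACK, FIN = 1, 2, 3  # DAT is SlidingFenceType.SYN in the code
FENCE_SIZE = 32

HEADER = struct.Struct(">BI")       # type (1 byte) + seq number (4 bytes, big-endian)
ACK_FORMAT = struct.Struct(">BII")  # header + 32-bit ack bitfield


def encode_dat(seq: int, payload: bytes) -> bytes:
    return HEADER.pack(DAT, seq) + payload


def encode_ack(next_expected: int, acks: List[bool]) -> bytes:
    if len(acks) != FENCE_SIZE:
        raise ValueError(f"ACK needs exactly {FENCE_SIZE} flags")
    # Bit i of the bitfield is set when packet next_expected + i was received.
    bitfield = sum(1 << i for i, received in enumerate(acks) if received)
    return ACK_FORMAT.pack(ACK, next_expected, bitfield)


def decode_ack(raw: bytes) -> Tuple[int, List[bool]]:
    pckt_type, seq, bitfield = ACK_FORMAT.unpack(raw)
    if pckt_type != ACK:
        raise ValueError("not an ACK packet")
    return seq, [bool((bitfield >> i) & 1) for i in range(FENCE_SIZE)]


flags = [i in (1, 3) for i in range(FENCE_SIZE)]
print(encode_ack(4, flags).hex())  # 02000000040000000a
```

An ACK is always 9 bytes on the wire, which is where the “4 bytes of extra overhead” relative to a bare header comes from.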


The Socket Stack

Next we’ll implement the three components that make up a socket stack, namely an RCP Socket, a Packet Assembler, and a Stream Disassembler. Remember that the Assembler needs to provide the read interface and a mechanism to create ACK packets. The Disassembler mirrors this: it provides the write interface and a mechanism to discard acked packets.



class StreamDisassembler(object):
    def __init__(self, packet_size):
        self.packet_size = packet_size
        self.buff = b''
        self.packets = {}
        self.next_ix = 0
        self.lock = threading.Lock()
        self.finished = False

    def write(self, bts):
        with self.lock:
            if self.finished:
                raise ValueError("Can't write more bytes after finishing")
            self.buff += bts
            while len(self.buff) >= self.packet_size:
                self.packets[self.next_ix] = SlidingFencePacket(
                    pckt_type=SlidingFenceType.SYN,
                    seq_number=self.next_ix,
                    acks=[],
                    data=self.buff[:self.packet_size],
                )
                self.next_ix += 1
                self.buff = self.buff[self.packet_size:]

    def flush(self, finish=False, allow_empty=False):
        with self.lock:
            if self.buff or allow_empty:
                self.packets[self.next_ix] = SlidingFencePacket(
                    pckt_type=SlidingFenceType.SYN,
                    seq_number=self.next_ix,
                    acks=[],
                    data=self.buff,
                )
                self.buff = b''
                self.next_ix += 1
            if finish and not self.finished:
                self.packets[self.next_ix] = SlidingFencePacket(
                    pckt_type=SlidingFenceType.FIN,
                    seq_number=self.next_ix,
                    acks=[],
                    data=b'',
                )
                self.next_ix += 1
                self.finished = True

    def ready_to_send(self):
        with self.lock:
            if not self.packets:
                return []
            # would be more performant to keep this dict ordered, but for our purposes this is fine
            min_packet = min(self.packets)
            return [
                packet for seq_number, packet in self.packets.items()
                if (seq_number - min_packet) < SLIDING_FENCE_SIZE
            ]

    def acknowledge(self, packet):
        with self.lock:
            for i in range(SLIDING_FENCE_SIZE):
                seq_number = packet.seq_number + i
                if packet.acks[i] and seq_number in self.packets:
                    # NOTE using this env var will turn our sliding fence into a regular old
                    # sliding window which is useful for benchmarking
                    if "SLIDING_FENCE_DISABLED" not in os.environ:
                        del self.packets[seq_number]
            under = [
                seq_number for seq_number in self.packets
                if seq_number < packet.seq_number
            ]
            for u in under:
                del self.packets[u]

    def done(self):
        with self.lock:
            return self.finished and not self.packets and not self.buff


class PacketAssembler(object):
    def __init__(self):
        self.buff = b''
        self.packets = {}
        self.next_ix = 0
        self.lock = threading.Lock()
        self.finished = False

    def read(self, cap=None):
        while True:
            with self.lock:
                readlen = len(self.buff) if cap is None else cap
                toret = self.buff[:readlen]
                self.buff = self.buff[readlen:]
            if not toret and not self.finished:
                # HACK if there is no content to read, sleep a cycle and poll again
                time.sleep(.01)
                continue
            return toret

    def assemble(self, packet):
        with self.lock:
            if packet.seq_number < self.next_ix:
                return
            if self.finished:
                raise ValueError(f"Packet after FIN {packet}")
            self.packets[packet.seq_number] = packet
            while self.next_ix in self.packets:
                packet = self.packets[self.next_ix]
                self.buff += packet.data
                del self.packets[self.next_ix]
                self.next_ix += 1
                if packet.pckt_type == SlidingFenceType.FIN:
                    self.finished = True

    def gen_ack(self):
        with self.lock:
            return SlidingFencePacket(
                pckt_type=SlidingFenceType.ACK,
                seq_number=self.next_ix,
                acks=[
                    self.next_ix + i in self.packets
                    for i in range(SLIDING_FENCE_SIZE)
                ],
                data=b'',
            )

    def done(self):
        with self.lock:
            return self.finished and not self.packets and not self.buff



class RCPSocket(object):
    def __init__(self, packet_size):
        self.packet_size = packet_size
        self.assembler = PacketAssembler()
        self.disassembler = StreamDisassembler(self.packet_size)

    def recv(self, cap=None):
        return self.assembler.read(cap)

    def send(self, bts):
        return self.disassembler.write(bts)

    def close(self):
        self.disassembler.flush(finish=True)

    def read(self):
        buff = b''
        while True:
            msg = self.recv()
            if msg == b'':
                return buff
            buff += msg

    def done(self):
        return self.disassembler.done() and self.assembler.done()


Router

Finally, we’ll implement the router, which bridges our individual socket stacks to the underlying UDP socket. Note that we confine the read and write loops each to a single method that can be spun up in its own thread, and that we provide the standard UNIX listen, accept, and connect methods.



import socket
import threading
import time


class RCPRouter(object):
    packet_size = 256  # 256 bytes is small but makes for a fair comparison with sliding window
    send_loop_ms = 10
    timeout_ms = 10 * send_loop_ms  # time out a session if we don't hear from it in ten iterations - for 50% packets dropped that's ~.1% chance false positive

    def __init__(self, bind_addr):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock.bind(bind_addr)
        self.sessions = {}  # map each peer address to [liveness timestamp, RCPSocket]
        self.accept_queue = []
        self.lock = threading.Lock()

    def connect(self, addr):
        with self.lock:
            if addr in self.sessions:
                raise ValueError("Attempt to create duplicate session")
            sock = RCPSocket(self.packet_size)
            sock.disassembler.flush(allow_empty=True)  # seed the send loop with an initial packet
            self.sessions[addr] = [time.time(), sock]
            self.listen()
            return sock

    def listen(self):
        # TODO would be nice to raise an exception here if we're already listening
        threading.Thread(target=self.write_loop).start()
        threading.Thread(target=self.read_loop).start()

    def accept(self):
        while True:
            with self.lock:
                if self.accept_queue:
                    return self.accept_queue.pop()
            # NOTE the send loop period is a convenient polling interval, but there's no need to couple them
            time.sleep(self.send_loop_ms / 1000)

    def write_loop(self):
        while True:
            before = time.time()
            if self.closed():
                return  # May want to finish remaining work first
            with self.lock:
                snapshot = list(self.sessions.items())

            for peer, session in snapshot:
                last_recv, sock = session
                sock.disassembler.flush()
                try:
                    self.sock.sendto(sock.assembler.gen_ack().encode(), peer)
                    for packet in sock.disassembler.ready_to_send():
                        self.sock.sendto(packet.encode(), peer)
                except OSError:
                    return  # HACK likely means the UDP socket is closed
                timedout = ((time.time() - last_recv) * 1000) >= self.timeout_ms
                if timedout or sock.done():
                    with self.lock:
                        del self.sessions[peer]
            after = time.time()
            # TODO add some jitter here to put the two ends out of phase
            time_left = (self.send_loop_ms / 1000) - (after - before)
            if time_left > 0:
                time.sleep(time_left)

    def read_loop(self):
        while True:
            if self.closed():
                return  # May want to finish remaining work first
            max_size = max(
                1 + SEQ_NUMBER_BYTES + SLIDING_FENCE_SIZE // 8,
                1 + SEQ_NUMBER_BYTES + self.packet_size,
            )
            try:
                data, peer = self.sock.recvfrom(max_size)
            except OSError:
                return  # HACK likely UDP socket closed

            packet = SlidingFencePacket.decode(data)
            with self.lock:
                if peer not in self.sessions:
                    if packet.seq_number >= SLIDING_FENCE_SIZE:
                        logger.info(f"Ignoring packet, likely from old session of peer {peer}")
                        continue
                    sock = RCPSocket(self.packet_size)
                    self.sessions[peer] = [time.time(), sock]
                    self.accept_queue.insert(0, sock)
                else:
                    self.sessions[peer][0] = time.time()
                sock = self.sessions[peer][1]
            # NOTE this algorithm likely doesn't do a great job when the client is re-establishing a connection
            # with a server and the server is still on the old one, but for our purposes we don't really care
            if packet.pckt_type == SlidingFenceType.SYN:
                sock.assembler.assemble(packet)
            elif packet.pckt_type == SlidingFenceType.ACK:
                sock.disassembler.acknowledge(packet)
            elif packet.pckt_type == SlidingFenceType.FIN:
                sock.assembler.assemble(packet)
                sock.close()
            else:
                raise ValueError("Unknown packet type")

    def close(self):
        self.sock.close()

    def closed(self):
        return self.sock.fileno() == -1
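The comment on timeout_ms can be checked with a line of arithmetic. Assuming a live peer sends at least one packet (its ACK) every loop and that drops are independent, a false timeout requires all ten of those packets to be lost, so this is roughly an upper bound. false_timeout_probability is a hypothetical helper.

```python
def false_timeout_probability(drop_rate: float, silent_loops: int = 10) -> float:
    """Chance that every packet from a live peer is lost for `silent_loops` send loops.

    Assumes the peer sends one ACK per loop and each is dropped independently;
    any data packets it also sends would only make a false timeout less likely.
    """
    return drop_rate ** silent_loops


for rate in (0.5, 0.6):
    print(f"{rate:.0%} drop rate: {false_timeout_probability(rate):.3%} chance of a false timeout")
```

At a 50% drop rate this gives about 0.098%, matching the ~.1% in the comment; at the 60% drop rate used in the experiments it rises to roughly 0.6% per 100 ms timeout window.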

Part 3: How it Fares

Setting Up the Experiment

At this point our new and shiny network stack should be ready to test on a “real” (virtual) network. This means that for testing we get everything Linux has to offer out of the box for simulating network congestion. Let’s start by setting up three network stacks, one of which will act as the intermediary “router” into which we’ll introduce some congestion.

[Figure: network namespaces A, B, and C; the veth pair AB/BA links A and B, and the veth pair BC/CB links B and C]

ip netns add A
ip netns add B
ip link add AB type veth peer name BA
ip link set AB netns A
ip link set BA netns B

For those of you unfamiliar with network namespaces and veth pairs, what we’re doing is creating two isolated network stacks, called A and B, on our machine and then setting up a link between them, called AB and BA in the two stacks respectively. If you want to play with this set-up yourself, first bring both ends of the link up (ip -n A link set AB up and ip -n B link set BA up; we do this below anyway), then run the following as root in each of the two namespaces (with ip netns exec A python and ip netns exec B python):

ETH_P_ALL = 3
import socket
s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.htons(ETH_P_ALL))
s.bind(("AB", 0)) # or BA in netns B
s.send(b'Hello, world!!') # needs to be at least 14 bytes to be a valid frame
s.recv(64)

and you should be sending byte frames out into the virtual ether from one netns that will be received in the other netns. Next we’ll create the same set-up between B and a third netns creatively called C.


ip netns add C
ip link add BC type veth peer name CB
ip link set BC netns B
ip link set CB netns C

Next we’ll add IP addresses to all our virtual Ethernet devices so they can support the multi-hop route from A to C.


# Don't assign .0 or .255 as they are reserved (e.g. for broadcasting)
# See RFC 950
ip -n A addr add 10.0.0.1/24 dev AB
ip -n B addr add 10.0.0.2/24 dev BA
ip -n B addr add 10.0.1.1/24 dev BC
ip -n C addr add 10.0.1.2/24 dev CB

ip -n A link set AB up
ip -n B link set BA up
ip -n B link set BC up
ip -n C link set CB up

Finally, we’ll bring up the loopback interface in each netns, which is necessary to get ARP working correctly:


for ns in A B C;
do
    ip -n $ns link set lo up
done

At this point you should be able to ping namespace B from namespace A (ip netns exec A ping 10.0.0.2), but if you try to ping namespace C you’ll hit a snag. What’s going on? We need to explicitly configure namespace B to forward traffic from A to C and vice versa. This way when the kernel sees a packet come into interface BA or BC it will ask itself what the final destination is for the packet and send it on its merry way.


ip netns exec B sysctl -w net.ipv4.ip_forward=1
ip netns exec B iptables -A FORWARD -i BA -o BC -j ACCEPT
ip netns exec B iptables -A FORWARD -i BC -o BA -j ACCEPT

Namespaces A and C don’t know that B is the intermediary between them, so we have to explicitly add routes:


ip -n A route add 0.0.0.0/0 via 10.0.0.2 dev AB
ip -n C route add 0.0.0.0/0 via 10.0.1.1 dev CB

Simulating Congestion

At this point we can confirm the network route from A to C works by creating a TCP session between them with netcat. In one shell run

sudo ip netns exec A nc -l 10.0.0.1 1234

and in another run

sudo ip netns exec C nc 10.0.0.1 1234

and observe the two-way, in-order communication. Next we can simulate traffic congestion by getting our intermediary “router” B to start dropping packets. We’ll run the following to modify our iptables rules so that B drops packets at random:

sudo ip netns exec B iptables -F FORWARD
sudo ip netns exec B iptables -A FORWARD -m statistic --mode random --probability .5 -j DROP

and then we’ll see packets start to get dropped:

 $ sudo ip netns exec C ping 10.0.0.1
PING 10.0.0.1 (10.0.0.1) 56(84) bytes of data.
64 bytes from 10.0.0.1: icmp_seq=2 ttl=63 time=0.067 ms
64 bytes from 10.0.0.1: icmp_seq=3 ttl=63 time=0.055 ms
64 bytes from 10.0.0.1: icmp_seq=6 ttl=63 time=0.066 ms
64 bytes from 10.0.0.1: icmp_seq=10 ttl=63 time=0.066 ms
64 bytes from 10.0.0.1: icmp_seq=15 ttl=63 time=0.065 ms
^C
--- 10.0.0.1 ping statistics ---
16 packets transmitted, 5 received, 68.75% packet loss, time 15358ms
rtt min/avg/max/mdev = 0.055/0.063/0.067/0.004 ms

Note that each packet has a 50% chance of being dropped in each direction, so a ping round trip only succeeds with probability 0.5 × 0.5 = 25%. We would therefore expect our client to lose about 75% of ping request-responses, which is close to the 68.75% we see with 16 samples.
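The 75% figure comes from a one-line calculation: a ping only comes back if the request and the reply each survive an independent 50% drop. Treating the 16 pings as independent trials (an assumption), we can also check that 5 replies is a plausible outcome; round_trip_loss is a hypothetical helper.

```python
import math


def round_trip_loss(drop_rate: float) -> float:
    """A ping survives only if both the request and the reply avoid an independent drop."""
    return 1 - (1 - drop_rate) ** 2


n_pings = 16
p_reply = 1 - round_trip_loss(0.5)                      # 0.25
expected_replies = n_pings * p_reply                     # 4.0
std_dev = math.sqrt(n_pings * p_reply * (1 - p_reply))  # binomial standard deviation
print(f"expected loss {round_trip_loss(0.5):.0%}, replies {expected_replies:.1f} +/- {std_dev:.1f}")
```

That gives an expected 4.0 replies with a standard deviation of about 1.7, so the 5 replies observed above sit well within one standard deviation.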

Collecting the results

Now we have all the primitives to start collecting results. We will run two batches of tests, one with sliding fences enabled and one without, using a simple echo server and a client that sends random data and confirms the data are echoed back. Let’s start by writing the echo server using the simple socket abstraction we have created:

import functools
import logging
import sys
import threading

import rcp

logger = logging.getLogger()


def _serve_sock(sock):
    while msg := sock.recv():
        sock.send(msg)
    sock.close()


def listen_and_serve(router):
    while True:
        sock = router.accept()
        threading.Thread(target=functools.partial(_serve_sock, sock)).start()


def main():
    logging.basicConfig(level=logging.DEBUG)
    addr, port = sys.argv[1:]
    router = rcp.RCPRouter((addr, int(port)))
    logger.info(f"Starting RCP Echo Server at {addr}:{port}")
    router.listen()
    listen_and_serve(router)


if __name__ == "__main__":
    main()

Next we’ll write the client, with some special logic to record and log the number of packets it took to send the data:

import hashlib
import json
import os
import random
import sys
import threading
import time

import rcp


class PacketCountingMiddleware(object):
    def __init__(self, wrapped, n=2):
        self.wrapped = wrapped
        self.lock = threading.Lock()
        self.total_sends = 0
        self.total_recvs = 0
        self.n = n

    def sendto(self, msg, address):
        with self.lock:
            self.total_sends += 1
        return self.wrapped.sendto(msg, address)

    def recvfrom(self, n):
        with self.lock:
            self.total_recvs += 1
        return self.wrapped.recvfrom(n)

    def fileno(self):
        return self.wrapped.fileno()

    def close(self):
        return self.wrapped.close()


def _log_datum(location, datum, condition):
    results = {}
    if os.path.exists(location):
        with open(location) as f:
            results = json.load(f)
    if condition not in results:
        results[condition] = [datum]
    else:
        results[condition].append(datum)
    with open(location, 'w') as f:
        json.dump(results, f, indent=4)


def main():
    src_addr, src_port, dst_addr, dst_port, condition = sys.argv[1:]
    router = rcp.RCPRouter((src_addr, int(src_port)))
    router.sock = PacketCountingMiddleware(router.sock)
    sock = router.connect((dst_addr, int(dst_port)))

    time_start = time.time()
    random.seed()
    sent_hash = hashlib.sha256()
    sent_len = 0
    for i in range(1000):
        to_send = random.randbytes(random.randrange(200, 400))
        sent_hash.update(to_send)
        sent_len += len(to_send)
        sock.send(to_send)
        time.sleep(random.randrange(2, 20) / 1000)

    recv_hash = hashlib.sha256()
    recv_len = 0
    while recv_len != sent_len:
        msg = sock.recv()
        recv_hash.update(msg)
        recv_len += len(msg)

    total_time_s = time.time() - time_start
    sock.close()
    router.close()

    if recv_hash.hexdigest() == sent_hash.hexdigest():
        print(
            f"Sent and received {sent_len} bytes. Confirmed hashes match: {sent_hash.hexdigest()}"
        )
    else:
        raise ValueError(
            f"Hash mismatch between sent bytes and received bytes: {sent_hash.hexdigest()} != {recv_hash.hexdigest()}"
        )
    print(
        f"Total packets sent: {router.sock.total_sends} -- Total packets received: {router.sock.total_recvs}"
    )

    # packets received isn't meaningful here since we don't know how many packets our peer sent
    # so just log the packets sent
    _log_datum("results.json", [router.sock.total_sends, total_time_s], condition)


if __name__ == "__main__":
    main()

Finally, we can bring up our echo server (sudo ip netns exec A python rcp_echo.py 10.0.0.1 1234) and write a script that runs our experiment at various levels of packet dropping.

#! /bin/bash

for rate in 10 20 30 40 50 60;
do
    ip netns exec B iptables -F FORWARD
    ip netns exec B iptables -A FORWARD -m statistic --mode random --probability 0.$rate -j DROP
    for trial in 1 2 3 4 5;
    do
        echo "Running trial $trial of drop rate $rate"
        # For the sliding window baseline, run this line instead:
        # ip netns exec C bash -c "SLIDING_FENCE_DISABLED=1 python3.9 rcp_benchmark.py 10.0.1.2 1234 10.0.0.1 1234 $rate"
        ip netns exec C python3.9 rcp_benchmark.py 10.0.1.2 1234 10.0.0.1 1234 $rate
        sleep 30
    done
done
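The per-rate means in the table below presumably come from averaging the [packets_sent, seconds] pairs that _log_datum appends to results.json. Here’s a minimal sketch of that aggregation. It assumes the sliding-window and sliding-fence batches were written to separate files (for example, by renaming results.json between batches), since the script keys each datum only by drop rate; summarize is a hypothetical helper.

```python
import json
from statistics import mean
from typing import Dict, Tuple


def summarize(path: str) -> Dict[str, Tuple[float, float]]:
    """Map each condition (drop rate) to (mean packets sent, mean seconds)."""
    with open(path) as f:
        results = json.load(f)  # {condition: [[total_sends, total_time_s], ...]}
    return {
        condition: (mean(d[0] for d in data), mean(d[1] for d in data))
        for condition, data in results.items()
    }
```

Calling summarize("results.json") on the file from one batch would yield one (packets, seconds) pair per drop rate.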

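Mean packets sent (n=5)

| Drop Rate | Sliding Window Packets | Sliding Fence Packets | Window Time to Completion (seconds) | Fence Time to Completion (seconds) |
|---|---|---|---|---|
| 10% | 3693 | 3289 | 10.6 | 10.7 |
| 20% | 3884 | 3691 | 10.6 | 10.6 |
| 30% | 4543 | 4105 | 10.7 | 10.6 |
| 40% | 5972 | 4859 | 10.7 | 10.7 |
| 50% | 7735 | 5981 | 10.6 | 10.6 |
| 60% | 10561 | 7421 | 10.6 | 10.7 |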
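To put the packet counts in relative terms, here’s the percentage reduction computed directly from the table above. The numbers are copied from the table; MEAN_PACKETS and reduction are hypothetical names.

```python
# Mean packets sent, copied from the table above: drop rate (%) -> (window, fence).
MEAN_PACKETS = {
    10: (3693, 3289),
    20: (3884, 3691),
    30: (4543, 4105),
    40: (5972, 4859),
    50: (7735, 5981),
    60: (10561, 7421),
}


def reduction(window: float, fence: float) -> float:
    """Percentage fewer packets the sliding fence sent than the sliding window."""
    return 100 * (window - fence) / window


for rate, (window, fence) in MEAN_PACKETS.items():
    print(f"{rate}% drop: {reduction(window, fence):.1f}% fewer packets with the fence")
```

The reduction is not perfectly monotonic (the 20% row is an outlier), but from a 30% drop rate onward it grows steadily with the drop rate.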

Conclusions

So we can see, as expected, that there isn’t much difference in time-to-completion between the two protocols. That makes sense: the client sleeps between 2 and 19 ms after each of its 1000 writes (about 10.5 s in expectation), so its own pacing dominates the runtime. There is, however, a significant difference in total packets sent. The sliding fence sends roughly 5-11% fewer packets at drop rates of 30% and below, growing to about 19% fewer at a 40% drop rate, 23% at 50%, and 30% at 60%, which is well worth the 4 bytes of extra overhead per ACK packet. So for the time being our exploration into sliding fences remains a fun project and a nice way to learn about Linux networking primitives, but in a future world with less reliable links our sliding fence may yet be a game changer.

Until next time!