No compromises I/O

Published: Dec. 26, 2022, 10:22 a.m.

"Mr. h33p, what kind of substance are you on? All I want is 2 simple read/write functions!", said everyone in the room. Well, if you just let me get into the weeds, I'll tell you why that's not enough, and why we can do much, much better.

Preface

Happy holidays, everyone! I/O is at the heart of memflow - it is the path from a read/write invocation on a virtual process all the way down to low-level LeechCore calls. That path may cross several translation steps, it may even cross machines over the network, and every step adds overhead. The question is, how much of it is necessary? That's exactly what I want to solve - making that path as efficient as possible, without sacrificing API simplicity.

Layers of complexity

What is the simplest read function you can write? In what ways can we make it more complex? What wins do these layers of complexity provide?

Failure-free operation

  1. Basic single-byte read
fn read(address: usize) -> u8;

You can't get any simpler than this. The smallest addressable piece of data gets read at the smallest granularity address possible. This is the core upon which everything else builds.

  2. Buffer read (memcpy)
fn read(address: usize, buffer: &mut [u8]);

This is equivalent to memcpy(dest, src, size), except address is src, while dest and size are encoded within buffer. Call it a more rusty version of memcpy.

  3. Multi-buffer read
struct Read<'a> {
    address: usize,
    buffer: &'a mut [u8]
}

fn read(ops: &mut [Read]);

This merely puts all the read instances into one function call. Every function call incurs a latency penalty, and switching to the kernel side incurs an especially heavy one. Having all reads happen in one go minimizes that overhead.

  4. Streamed read
fn read(ops: impl Iterator<Item = Read>);

It's the same idea here, except you don't know how many elements there are; you may or may not choose to split the stream up according to hard limits of the underlying implementation. But the point is that there is an arbitrary number of reads.

Axiom 1: each layer of complexity can be implemented using a different layer.

Axiom 2: a higher layer of complexity, implemented using a lower layer, requires multiple simpler operations to perform.

Axiom 3: multiple invocations of a lower layer can be abstracted by fewer invocations of a higher layer.

Claim: each layer of complexity adds some constant computational overhead, but it is insignificant compared to the reduction in computational complexity achieved (note that layers 3 and 4 are somewhat equivalent, but I chose to split them apart to make the transition not seem as large. From now on, I will only display streamed reads).
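
To make the axioms concrete, here is a minimal sketch (the names read_byte, read_buf and read_stream are illustrative, not part of any real API) of layer 2 implemented on top of layer 1, and layer 4 implemented on top of layer 2 - many invocations of a simpler layer collapse into a single invocation of a richer one:

// Layer 1: the primitive we are given.
fn read_byte(address: usize) -> u8 {
    unimplemented!()
}

// Layer 2 via layer 1: one buffer read costs buffer.len() invocations
// of the simpler layer (Axiom 2).
fn read_buf(address: usize, buffer: &mut [u8]) {
    for (i, byte) in buffer.iter_mut().enumerate() {
        *byte = read_byte(address + i);
    }
}

struct Read<'a> {
    address: usize,
    buffer: &'a mut [u8],
}

// Layer 4 via layer 2: an arbitrary number of buffer reads gets abstracted
// by a single streamed invocation (Axiom 3).
fn read_stream<'a>(ops: impl Iterator<Item = Read<'a>>) {
    for op in ops {
        read_buf(op.address, op.buffer);
    }
}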

Fallible operation

  1. Basic single-byte read
fn read(address: usize) -> Result<u8>;

The read will either succeed or fail with a specific error. There is nothing to improve here.

  2. Buffer read
fn read(address: usize, buffer: &mut [u8]) -> Result<()>;

The read will either succeed fully or fail in an undefined way. The buffer may need to be fragmented; we can improve this by returning partial successes and failures.

  3. Streamed read
fn read(ops: impl Iterator<Item = Read>, out: impl Fn(Result<()>));

The main problem with this design is that there is no way to deal with partial successes or data fragmentation.

Fragmentable operation

Fragmentable operation implies data separation and reordering. We need to be able to tell which part of which read a given piece of output belongs to, hence we attach some metadata.

  1. Basic single-byte read
fn read(address: usize) -> Result<u8>;

No change.

  2. Buffer read
struct Output<'a> {
    address: usize,
    buffer: &'a mut [u8],
    error: Option<Error>
}

fn read(address: usize, buffer: &mut [u8], out: impl Fn(Output));

  3. Streamed read
struct Read<'a> {
    elem_idx: usize,
    address: usize,
    buffer: &'a mut [u8]
}

struct Output<'a> {
    elem_idx: usize,
    address: usize,
    buffer: &'a mut [u8],
    error: Option<Error>
}

fn read(ops: impl Iterator<Item = Read>, out: impl Fn(Output));

Forward-redirectable operation

During fragmentation, the destination address of each buffer may change. To output results efficiently, the source address or offset must be attached as additional metadata (a fragmentation sketch follows at the end of this section).

  1. Basic single-byte read
fn read(address: usize) -> Result<u8>;

No change.

  2. Buffer read
struct Output<'a> {
    src_address: usize,
    dst_address: usize,
    buffer: &'a mut [u8],
    error: Option<Error>
}

fn read(src_address: usize, dst_address: usize, buffer: &mut [u8], out: impl Fn(Output));

src_address may be the starting address of the read (at the root, src_address = dst_address), or 0, in which case the output carries the offset within the buffer.

  3. Streamed read
struct Read<'a> {
    elem_idx: usize,
    src_address: usize,
    dst_address: usize,
    buffer: &'a mut [u8]
}

struct Output<'a> {
    elem_idx: usize,
    src_address: usize,
    dst_address: usize,
    buffer: &'a mut [u8],
    error: Option<Error>
}

fn read(ops: impl Iterator<Item = Read>, out: impl Fn(Output));

There is one thing we did not consider here - buffer availability. Sometimes you may want to allocate on-demand without overallocating regions that will fail during the process. This is especially important for networked transfer.
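
To make the fragmentation idea concrete, here is a minimal sketch of a layer that splits one read at page boundaries (PAGE_SIZE and translate are illustrative stand-ins for a real address translation step); every fragment keeps its src_address, so the caller can still tell which part of the original request each output corresponds to:

struct Read<'a> {
    elem_idx: usize,
    src_address: usize,
    dst_address: usize,
    buffer: &'a mut [u8],
}

const PAGE_SIZE: usize = 0x1000;

// Stand-in for a real translation step that may redirect the destination.
fn translate(dst_address: usize) -> usize {
    dst_address
}

// Split a read at page boundaries; src_address keeps pointing back into the
// original request, while dst_address gets redirected per fragment.
fn fragment<'a>(read: Read<'a>, mut out: impl FnMut(Read<'a>)) {
    let Read { elem_idx, mut src_address, mut dst_address, mut buffer } = read;
    while !buffer.is_empty() {
        let chunk = (PAGE_SIZE - (dst_address % PAGE_SIZE)).min(buffer.len());
        let (head, tail) = core::mem::take(&mut buffer).split_at_mut(chunk);
        out(Read {
            elem_idx,
            src_address,
            dst_address: translate(dst_address),
            buffer: head,
        });
        src_address += chunk;
        dst_address += chunk;
        buffer = tail;
    }
}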

Buffer handle

This design implies buffers that exist only in an abstract sense. Different designs are possible, though, with different levels of abstractness. Which is optimal? Let's find out.

  1. Basic single-byte read
fn read(address: usize) -> Result<u8>;

No change, because abstract buffers are not applicable.

  2. Buffer read
struct Buffer<'a> {
    data: &'a mut (),
    vtbl: &'static BufferVtbl,
}

struct BufferVtbl {
    get_mut: for<'a> fn(&'a mut self) -> &'a mut [u8],
    len: fn(&self) -> usize,
    split_at: fn(self, pos: usize) -> (Buffer, Buffer),
}

impl<'a> From<&'a mut [u8]> for Buffer<'a> { ... }

struct ByteAddressBuffer<'a> {
    data: &'a mut [u8],
    address: usize,
}

impl<'a> From<ByteAddressBuffer<'a>> for Buffer<'a> { ... }

struct Output<'a> {
    buffer: Buffer<'a>,
    error: Option<Error>
}

fn read(address: usize, buffer: Buffer, out: impl Fn(Output));

We can actually drop src_address! This data can be attached to the buffer and forwarded through. Alternatively, it can be swapped for something different for different tracking purposes.

  3. Streamed read
struct ByteIdxAddressBuffer<'a> {
    data: &'a mut [u8],
    address: usize,
    elem: usize,
}

impl<'a> From<ByteIdxAddressBuffer<'a>> for Buffer<'a> { ... }

fn read(ops: impl Iterator<Item = Read>, out: impl Fn(Output));

Should indexing of the output be needed, just have the index stored within the buffer. Otherwise, the interface stays the same as the single buffer read!

Abstract buffers actually bring a surprising improvement in simplicity! As the end user, you pay only for what you need!

Now, there is a bit of a flaw here. You put the data in the buffer, but how do you actually use it once it comes back out? The answer is opaquing the callback.

Opaquing the output callback

To access the buffer within the output, the output callback must be attached to the input data. I.e. either every buffer must be of the same type, or there needs to be an individual callback for each buffer. The callback could be attached to the buffer's vtable. Let's see this in action!

  1. Basic single-byte read
fn read(address: usize) -> Result<u8>;

No change, because abstract buffers are not applicable.

  2. Buffer read
struct BufferVtbl {
    get_mut: for<'a> fn(&'a mut self) -> &'a mut [u8],
    len: fn(&self) -> usize,
    split_at: fn(self, pos: usize) -> (Buffer, Buffer),
    output: fn(&mut self, error: Option<Error>),
}

fn read(address: usize, buffer: Buffer);

  3. Streamed read
fn read(ops: impl Iterator<Item = Read>);

There are some end conditions to consider. One is success - write to the buffer, call output(None). The second is failure - call output(Some(err)). The third is nothing - output was never called. What do we do then? Implicit error? Panic?

Success usage may be simplified with a wrapper, but failure may still need to be handled.
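
Such a wrapper could look roughly like this (a sketch only; Error is the error type used throughout, and the buffer's output entry is treated as an opaque FnOnce for brevity). Success becomes implicit on scope exit, and only failure needs an explicit call, so the "nothing was called" case cannot happen by accident:

struct OutputGuard<F: FnOnce(Option<Error>)> {
    output: Option<F>,
}

impl<F: FnOnce(Option<Error>)> OutputGuard<F> {
    fn new(output: F) -> Self {
        Self { output: Some(output) }
    }

    // Explicit failure path; consumes the guard, so it cannot also report success.
    fn fail(mut self, error: Error) {
        if let Some(output) = self.output.take() {
            output(Some(error));
        }
    }
}

impl<F: FnOnce(Option<Error>)> Drop for OutputGuard<F> {
    fn drop(&mut self) {
        // Implicit success: if no error was reported, report success on scope exit.
        if let Some(output) = self.output.take() {
            output(None);
        }
    }
}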

The final thing to consider is attaching a vtable to each buffer. What if we always use the same data structure? It would be more efficient to have a separate vtable and then pass pointers to the buffers themselves. In addition, each buffer object is merely a pointer to the underlying data store, which requires a heap allocation, or some clever stack usage. Heap would prove prohibitively expensive; perhaps an arena? Who knows, but this detail is making things tricky. Let's think about it.

Solving the alloc problem

We can avoid creating a mutated buffer upon splitting by storing the bounds of said buffer on the abstract side, like so:

struct BufferObj<'a> {
    buffer: *mut (),
    start: usize,
    end: usize,
    phantom: PhantomData<&'a mut u8>,
}

struct Buffer<'a> {
    buffer: BufferObj<'a>,
    vtbl: &'a BufferVtbl,
}

struct BufferVtbl {
    get_mut: for<'a> fn(&'a mut BufferObj) -> &'a mut [u8],
    output: fn(BufferObj, error: Option<Error>),
}

len and split_at become part of impl Buffer, and it becomes the responsibility of each implementation to ensure memory safety guarantees when accessing the data (notice how buffer is now a pointer, not a mutable reference).
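
For illustration, here is a minimal sketch of what those two methods could look like on top of the structs above (reusing PhantomData from the snippet; bounds checking kept to a bare assert). Splitting only adjusts the start/end bounds - no allocation, no vtable call, and both halves share the same underlying pointer, each promising to only touch its own range:

impl<'a> Buffer<'a> {
    fn len(&self) -> usize {
        self.buffer.end - self.buffer.start
    }

    fn split_at(self, pos: usize) -> (Buffer<'a>, Buffer<'a>) {
        let Buffer { buffer, vtbl } = self;
        assert!(pos <= buffer.end - buffer.start);
        let mid = buffer.start + pos;
        // Both halves alias the same raw pointer; each one is only allowed
        // to access data within its own [start, end) bounds.
        let left = BufferObj {
            buffer: buffer.buffer,
            start: buffer.start,
            end: mid,
            phantom: PhantomData,
        };
        let right = BufferObj {
            buffer: buffer.buffer,
            start: mid,
            end: buffer.end,
            phantom: PhantomData,
        };
        (Buffer { buffer: left, vtbl }, Buffer { buffer: right, vtbl })
    }
}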

Now, the buffer pointers still need to be created somehow, don't they? That is an unfortunate limitation of the approach, unless we created some sort of (metadata) stacking system.

Zero-access buffer

What if you never had access to the bytes of the buffer in the first place? Not requiring the buffer to have an underlying byte representation would be incredibly powerful - reading directly into network streams, using GPU objects, DMA, etc. Nothing really changes here, apart from the buffer vtable:

struct BufferVtbl {
    get_mut: for<'a> fn(&'a mut BufferObj) -> &'a mut [u8],
    write: fn(&mut BufferObj, data: &[u8]),
    output: fn(BufferObj, error: Option<Error>),
}

The idea is simple - if you already have a byte slice, call write. It will pass your slice down to be copied without intermediate buffers. If the target is a file, it will call file.write(data); if it's a network stream, something equivalent; if it's another byte buffer, well, then it will fall back to a copy into the get_mut result. But it will be branchless and efficient.

If you don't have a byte buffer, and the APIs you call expect one, then you should use get_mut and pass the resulting slice over to the API. If the underlying buffer is a byte buffer, this costs you nothing! If it's a stream, there will be an allocation, which can be optimized with arenas and whatnot.
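
From the consumer's perspective, the two paths look roughly like this (a sketch using the vtable shape above; api_read_into is a hypothetical external API that wants a byte slice):

// Hypothetical external API that fills a byte slice (e.g. a syscall wrapper).
fn api_read_into(address: usize, out: &mut [u8]) {
    unimplemented!()
}

fn fill(address: usize, mut buf: Buffer, ready_data: Option<&[u8]>) {
    match ready_data {
        // We already have the bytes - hand them straight to the buffer,
        // whether it is a byte slice, a file or a network stream underneath.
        Some(data) => (buf.vtbl.write)(&mut buf.buffer, data),
        // We don't - ask the buffer for a byte slice and let the external
        // API fill it. A plain byte buffer hands out its own storage here,
        // while a stream hands out a temporary one.
        None => {
            let slice = (buf.vtbl.get_mut)(&mut buf.buffer);
            api_read_into(address, slice);
        }
    }
    // Report success either way.
    (buf.vtbl.output)(buf.buffer, None);
}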

Can we reduce this even further? What if we could pass from stream to stream? Okay, that might actually be a bit too close to the sun. Let's stick with the last part.

Improving buffer states

There is a slight problem, however, with this approach. It becomes key to call the output function to correctly synchronize temporary buffers, and that is error prone.

Let's rework the vtable so that it functions more as a state machine (and let's use fully valid syntax for a change):

struct BufferVtbl {
    get_mut: for<'a> fn(BufferObj<'a>) -> AllocedBufferObj<'a>,
    write: for<'a, 'b> fn(BufferObj<'a>, data: &'b [u8]),
    error: for<'a> fn(BufferObj<'a>, error: Error),
}

struct AllocedBufferObj<'a> {
    alloced_buffer: &'a mut [u8],
    buffer: BufferObj<'a>,
    write: for<'b> fn(BufferObj<'b>, data: &'b mut [u8]),
    error: for<'b> fn(BufferObj<'b>, data: &'b mut [u8], error: Error),
}

impl<'a> Drop for AllocedBufferObj<'a> {
    fn drop(&mut self) {
        // Write the temporary buffer back into the underlying object.
        // (In a real implementation these fields would be wrapped in
        // ManuallyDrop or Option so they can be moved out of &mut self here.)
        (self.write)(self.buffer, self.alloced_buffer);
    }
}

impl<'a> Deref for AllocedBufferObj<'a> {
    type Target = [u8];
    // ...
}

impl<'a> DerefMut for AllocedBufferObj<'a> {
    // ...
}

Now the user may not invoke the same state twice. That is good, because there is no need to check whether the buffer was allocated or not before freeing - that state is perfectly known ahead of time. In fact, the buffer within AllocedBufferObj may be a different type of buffer!
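
A brief usage sketch of this state machine (building on the types above; api_read_into is the same hypothetical byte-slice API as in the earlier sketch). Each call consumes its input state, so reporting an error on a buffer that has already been handed out for writing is simply not expressible:

fn fill(address: usize, buf: Buffer, ready_data: Option<&[u8]>) {
    let Buffer { buffer, vtbl } = buf;
    match ready_data {
        // Terminal state: push the bytes straight through.
        Some(data) => (vtbl.write)(buffer, data),
        None => {
            // Transition into the allocated state...
            let mut alloced = (vtbl.get_mut)(buffer);
            api_read_into(address, &mut alloced);
            // ...and the write-back happens automatically when `alloced`
            // is dropped at the end of this scope.
        }
    }
}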

Async

I/O takes time, right? Can we somehow make this whole design asynchronous?

fn read(address: usize, buffer: Buffer) -> impl Stream<Item = Output>;

That's right, we don't need to deal with iterators of buffers anymore. Think about it, you can enqueue multiple reads before flushing them by calling .await. In theory it works like magic and makes me look stupid for dealing with those iterators and callbacks all this time!

To give a better idea, take the following pseudocode:

let fut1 = reader.read_val::<u64>(0xdeadbeef);
let fut2 = reader.read_val::<u32>(0xdeadbefe);
let mut buf = [0u8; 256];
let fut3 = reader.read(0xdeedfeeb, Buffer::new(&mut buf));

let (val64, val32, val_buf) = join!(fut1, fut2, fut3).await;

And here's a hypothetical reader impl:

let mut ops = vec![];

loop {
    while let Some(op) = rx.next().await {
        // flush is sent whenever there is an await happening
        if op.is_flush() {
            break;
        }
        ops.push(op);
        if ops.len() >= 256 {
            break;
        }
    }
    perform_all_reads(&mut ops);
}

Automatic batching, this is literal magic!

However, there still is a million-dollar question that needs solving - how do you actually relate the input Buffer to the output Stream?

Note that you start with one buffer for the output stream (i.e. one buffer to output), but after fragmentation you end up with multiple buffers pointing to one stream. Completed buffers need to be stored somewhere. We could allocate a reference counted list on the stream, but we need to remember that heap allocations are expensive, thus we need to reuse these lists as much as possible. One object that never moves throughout execution of the future is the reader - we can allocate the lists there, and while we're at it, let's allocate the streams themselves on the reader!

Initial design

Before we think too much about micro-optimizing data reuse, let's first write a very naive design:

use flume::{Receiver, RecvStream, Sender, unbounded};

// For now this is just a Receiver::into_stream
type BufferStream = RecvStream<'static, Output>;

struct Reader {
    tx: Sender<(usize, Buffer)>,
    rx: Receiver<(usize, Buffer)>,
}

impl Reader {
    fn new() -> Self {
        let (tx, rx) = unbounded();
        Self { tx, rx }
    }

    async fn run(&self) {
        let mut stream = self.rx.stream();
        while let Some((addr, op)) = stream.next().await {
            // read impl
            op.error(Error::NotImplemented);
        }
    }

    fn read(&self, address: usize, mut buffer: Buffer) -> BufferStream {
        let (tx, rx) = unbounded();

        buffer.set_output(tx);
        self.tx.send((address, buffer)).unwrap();

        rx.into_stream()
    }
}

struct BufferObj<'a> {
    buffer: *mut (),
    start: usize,
    end: usize,
    // new field:
    sender: Option<Sender<Output>>,
    phantom: PhantomData<&'a mut u8>,
}

struct Buffer<'a> {
    buffer: BufferObj<'a>,
    vtbl: &'a BufferVtbl,
}

impl<'a> Buffer<'a> {
    fn set_output(&mut self, sender: Sender<Output>) {
        self.buffer.sender = Some(sender);
    }
}

struct BufferVtbl {
    get_mut: for<'a> fn(BufferObj<'a>) -> AllocedBufferObj<'a>,
}

struct AllocedBufferObj<'a> {
    alloced_buffer: &'a mut [u8],
    buffer: BufferObj<'a>,
}

Buffer write and error are now implemented as part of the Buffer and AllocedBufferObj impls.

It's pretty hard to fit everything in a very small space, but this is an end-to-end implementation. One thing we can improve is the fact that buffers need a stream attached only at the start of the chain:

impl Reader {
    fn alloc_stream(&self, buffer: &mut Buffer) -> BufferStream {
        let (tx, rx) = unbounded();
        buffer.set_output(tx);
        rx.into_stream()
    }

    fn send_read(&self, address: usize, buffer: Buffer) {
        self.tx.send((address, buffer)).unwrap();
    }

    fn read(&self, address: usize, mut buffer: Buffer) -> BufferStream {
        let ret = self.alloc_stream(&mut buffer);
        self.send_read(address, buffer);
        ret
    }
}

Now, read becomes the frontend method, while alloc_stream and send_read become implementation details. To make this sufficiently performant, we need to allocate the BufferStream object on the Reader and recycle it after use, hence the two separate functions for allocating the stream and sending the read.

We could pass the stream as an additional parameter during send, but I think it's very important to make it hard to send the buffer down the wrong path at the API level.
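
One possible shape of that recycling (a sketch under my own assumptions, not the production design): the Reader keeps a pool of channel pairs, hands the receiver half out to back a BufferStream, and takes the pair back once the read completes:

// Hypothetical pool of reusable output channels, kept on the Reader.
struct StreamPool {
    free: std::sync::Mutex<Vec<(Sender<Output>, Receiver<Output>)>>,
}

impl StreamPool {
    fn acquire(&self) -> (Sender<Output>, Receiver<Output>) {
        // Reuse a previously allocated pair if available, otherwise create one.
        self.free.lock().unwrap().pop().unwrap_or_else(unbounded)
    }

    fn release(&self, pair: (Sender<Output>, Receiver<Output>)) {
        // The pair must be fully drained before it is handed out again.
        self.free.lock().unwrap().push(pair);
    }
}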

Inserting the flush

Currently there is a risk of deadlock upon the .await operation, because the Reader may choose to batch up an arbitrary number of reads into one large operation for performance reasons, and there is no way to explicitly tell it when to stop. The final piece needed to make all of this work is the implicit insertion of flushes whenever the user is awaiting a result. This requires a custom BufferStream implementation; I will not write the whole thing down, but the gist of it is as follows:

#[pin_project::pin_project]
struct BufferStream {
    #[pin]
    inner: RecvStream<'static, Output>,
    flush: Arc<AtomicBool>,
}

impl Stream for BufferStream {
    type Item = Output;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();
        match this.inner.poll_next(cx) {
            Poll::Pending => {
                this.flush.store(true, Ordering::SeqCst);
                Poll::Pending
            }
            x => x
        }
    }
}


struct Reader {
    tx: Sender<(usize, Buffer)>,
    rx: Receiver<(usize, Buffer)>,
    flush: Arc<AtomicBool>,
}

impl Reader {
    fn new() -> Self {
        let (tx, rx) = unbounded();
        Self { tx, rx, flush: Default::default() }
    }

    async fn run(&self) {
        let mut stream = self.rx.stream();
        loop {
            // wait for the next element or until self.flush becomes true
            let next = stream_next_or_wait_true(&mut stream, &self.flush);
            // if flush happened, try_read all elements without blocking and perform read
        }
    }
}

That's close to it! The only thing left is allocation optimizations, but that doesn't change the core of it - reads end up processed in one long loop, as opposed to multiple individual function calls.

Achilles' heel

Some of you may have raised an eyebrow seeing multiple tasks working in parallel, thinking "surely this can't be efficient". I, after some talking with ko1N, share the concern. So the question is: is it good enough?

I rigged up a performance benchmark testing various scenarios, and I counted the number of calls performed per second. Here are the results:

Bare function                      | 59269249
Deque sequential                   | 57558620
Concurrent queue sequential        | 38223797
Single channel sequential          | 32853343
Single channel sequential async    | 24920480
Double channel sequential          | 22133939
Double channel sequential async    | 15569659
Bare function with dummy syscall   | 5354162
2 tasks 2 chan on threaded runtime | 2175128
2 async tasks and 2 channels       | 1046951
2 threads and 2 channels           | 377872

Bare function represents the theoretical maximum that can be achieved in direct memory copy scenarios, while the dummy syscall is used as an anchor point for scenarios where a kernel context switch occurs. As you can see, a single context switch lowers the performance by a factor of 10 (on an arm64 macOS machine), so while the spread across everything in between is large, it's worth noting that those methods are still very fast. That being said, it's evident that anything involving parallel processing - the methods listed below the dummy syscall - is simply too slow, potentially due to the complexity involved in synchronizing the 2 sides. This poses a critical problem.

First of all, is my hypothesis correct? I used tokio-metrics to monitor the execution of my async functions. It's as simple as surrounding my tasks with TaskMonitor::instrument and then printing the results afterwards.
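
Roughly like this (a sketch; the benchmark body itself is elided):

async fn instrumented_bench() {
    let monitor = tokio_metrics::TaskMonitor::new();

    let handle = tokio::spawn(monitor.instrument(async {
        // ... benchmarked task ...
    }));
    handle.await.ok();

    // Cumulative metrics for everything this monitor instrumented.
    println!("{:#?}", monitor.cumulative());
}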

Here are the results:

Single channel sequential async:
24630635/s 0.00003ms
TaskMetrics {
    instrumented_count: 1,
    dropped_count: 1,
    first_poll_count: 1,
    total_first_poll_delay: 2.125µs,
    total_idled_count: 0,
    total_idle_duration: 0ns,
    total_scheduled_count: 0,
    total_scheduled_duration: 0ns,
    total_poll_count: 1,
    total_poll_duration: 1.000395041s,
    total_fast_poll_count: 0,
    total_fast_poll_duration: 0ns,
    total_slow_poll_count: 1,
    total_slow_poll_duration: 1.000395041s,
}
Double channel sequential:
22362663/s 0.00004ms
Double channel sequential async:
14271546/s 0.00006ms
TaskMetrics {
    instrumented_count: 1,
    dropped_count: 1,
    first_poll_count: 1,
    total_first_poll_delay: 1.833µs,
    total_idled_count: 0,
    total_idle_duration: 0ns,
    total_scheduled_count: 0,
    total_scheduled_duration: 0ns,
    total_poll_count: 1,
    total_poll_duration: 1.000018625s,
    total_fast_poll_count: 0,
    total_fast_poll_duration: 0ns,
    total_slow_poll_count: 1,
    total_slow_poll_duration: 1.000018625s,
}
2 jobs and channels (MT runtime):
2175128/s 0.000ms
TaskMetrics {
    instrumented_count: 2,
    dropped_count: 2,
    first_poll_count: 2,
    total_first_poll_delay: 13.416µs,
    total_idled_count: 4350256,
    total_idle_duration: 572.511412ms,
    total_scheduled_count: 4350257,
    total_scheduled_duration: 740.525747ms,
    total_poll_count: 4350259,
    total_poll_duration: 638.974828ms,
    total_fast_poll_count: 4350248,
    total_fast_poll_duration: 637.946415ms,
    total_slow_poll_count: 11,
    total_slow_poll_duration: 1.028413ms,
}
2 jobs and channels:
1187762/s 0.001ms
TaskMetrics {
    instrumented_count: 2,
    dropped_count: 2,
    first_poll_count: 2,
    total_first_poll_delay: 6.625µs,
    total_idled_count: 2375524,
    total_idle_duration: 807.33773ms,
    total_scheduled_count: 2375525,
    total_scheduled_duration: 852.521808ms,
    total_poll_count: 2375527,
    total_poll_duration: 311.660277ms,
    total_fast_poll_count: 2375525,
    total_fast_poll_duration: 311.499944ms,
    total_slow_poll_count: 2,
    total_slow_poll_duration: 160.333µs,
}

Notice how the sequential tasks do not idle, while the 2 jobs on the multi-threaded and single-threaded runtimes spent 57% and 81% of their time idling respectively. So the hypothesis is correct - a lot of time is wasted on synchronization. Simply eliminating those idles would make it good enough for dummy syscall situations. In an ideal scenario, the total_poll_duration should approach 1 second. And if we take the single-threaded case of 1187762 calls per second and scale it up so that the polling duration approaches 1 second, 1187762 / (1 - 0.3112) = 1724393, which makes it roughly in line with Double channel sequential async - the maths check out.

With careful design, I think there is a way to work around this problem. The polling implementation of the client could directly poll the server, completely bypassing the scheduler. Think about it - the server waits for the channel to receive elements. Once we create a read stream and attach our buffer to it, we know with certainty that the server's channel is going to have elements available; we just need to poll it.

So, can we pull this off?

We need to create a detached task that can be polled from multiple places. Something like Mutex<Pin<Box<dyn Future>>>. The poll implementation of every read stream then does try_lock on the future and upon success, polls the internal future until Poll::Pending state is reached. This, in theory, should eliminate the scheduling overhead, although there are still some details to work out when it comes to lock ownership (does the last future hold on to the lock or does it release it as soon as possible?). Let's just see how it fares. Here are the key snippets of code:

#[pin_project::pin_project]
struct SharedFuture<F> {
    inner: Pin<Arc<SharedFutureContext<F>>>,
}

struct SharedFutureContext<F> {
    future: Mutex<Option<F>>,
    finished: AtomicBool,
}

#[derive(Debug, Clone, Copy)]
enum SharedFutureOutput<T> {
    AlreadyFinished,
    JustFinished(T),
    Running,
}

impl<F: Future> SharedFuture<F> {
    async fn run_till_finished(self) {
        loop {
            match self.clone().await {
                SharedFutureOutput::AlreadyFinished | SharedFutureOutput::JustFinished(_) => break,
                _ => {
                    tokio::task::yield_now().await
                }
            }
        }
    }

    async fn run_once(&self) {
        self.clone().await;
    }
}

impl<F: Future> Future for SharedFuture<F> {
    type Output = SharedFutureOutput<F::Output>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        let inner: Pin<_> = this.inner.as_ref();
        let inner = inner.get_ref();

        if let Ok(mut future_cont) = inner.future.try_lock() {
            if let Some(future) = future_cont.as_mut() {
                let pin = unsafe { Pin::new_unchecked(&mut *future) };
                match pin.poll(cx) {
                    Poll::Pending => Poll::Ready(SharedFutureOutput::Running),
                    Poll::Ready(val) => {
                        *future_cont = None;
                        inner.finished.store(true, Ordering::Relaxed);
                        Poll::Ready(SharedFutureOutput::JustFinished(val))
                    }
                }
            } else {
                Poll::Ready(SharedFutureOutput::AlreadyFinished)
            }
        } else if inner.finished.load(Ordering::Acquire) {
            Poll::Ready(SharedFutureOutput::AlreadyFinished)
        } else {
            Poll::Ready(SharedFutureOutput::Running)
        }
    }
}

And here are different versions of the 2 job benchmark:

// Original impl
async fn test_latency_async(monitor: TaskMonitor) {
    let (tx1, rx1) = flume::unbounded();
    let (tx2, rx2) = flume::unbounded();

    let handle = tokio::spawn(monitor.instrument(tokio::task::unconstrained(async move {
        while let Ok(item) = rx1.recv_async().await {
            tx2.send_async(item).await.ok();
        }
    })));

    let instant = Instant::now();

    let mut elapsed = 0f64;
    let mut calls = 0;

    let mut tick_cnt = 0;

    while instant.elapsed().as_secs_f64() <= 1.0 {
        let s = tick_count();
        let start = Instant::now();
        tx1.send_async(start).await.unwrap();
        let end = rx2.recv_async().await.unwrap();
        calls += 1;
        elapsed += end.elapsed().as_secs_f64();
        tick_cnt += tick_count() - s;
    }

    tick_cnt /= calls;

    println!(
        "{calls}/s {:.3}ms | {tick_cnt}",
        elapsed / calls as f64 * 1000.0
    );

    std::mem::drop(tx1);
    std::mem::drop(rx2);

    handle.await.ok();
}

// Using SharedFuture
async fn test_latency_async_sfut(monitor: TaskMonitor) {
    let (tx1, rx1) = flume::unbounded();
    let (tx2, rx2) = flume::unbounded();

    let shared_fut = SharedFuture::from(async move {
        while let Ok(item) = rx1.recv_async().await {
            tx2.send_async(item).await.ok();
        }
    });

    let handle = tokio::spawn(monitor.instrument(tokio::task::unconstrained(
        shared_fut.clone().run_till_finished(),
    )));

    let instant = Instant::now();

    let mut elapsed = 0f64;
    let mut calls = 0;

    let mut tick_cnt = 0;

    while instant.elapsed().as_secs_f64() <= 1.0 {
        let s = tick_count();
        let start = Instant::now();
        tx1.send_async(start).await.unwrap();
        shared_fut.run_once().await;
        let end = rx2.recv_async().await.unwrap();
        calls += 1;
        elapsed += end.elapsed().as_secs_f64();
        tick_cnt += tick_count() - s;
    }

    tick_cnt /= calls;

    println!(
        "{calls}/s {:.3}ms | {tick_cnt}",
        elapsed / calls as f64 * 1000.0
    );

    std::mem::drop(tx1);
    std::mem::drop(rx2);

    handle.await.ok();
}

And here are the results:

Bare function                      | 58946058
Deque sequential                   | 57418653
Concurrent queue sequential        | 38125560
Single channel sequential          | 32886669
Single channel sequential async    | 24983136
Double channel sequential          | 22334788
Double channel sequential async    | 14839470
2 tasks 2 chan with SharedFuture   | 6062313
Bare function with dummy syscall   | 5475882
2 tasks 2 chan on threaded runtime | 2215541
2 async tasks and 2 channels       | 1214937
2 threads and 2 channels           | 406485

That's a significant improvement already. However, there is still significant overhead from running the shared future. To save you the trouble: the overhead stems from a redundant poll that occurs when the loop is polled. Simply adding an explicit point that returns Poll::Pending once per loop iteration fixes the issue, i.e. changing the loop as follows:

// Before:
let shared_fut = SharedFuture::from(async move {
    while let Ok(item) = rx1.recv_async().await {
        tx2.send_async(item).await.ok();
    }
});
// After:
let shared_fut = SharedFuture::from(async move {
    let mut cnt = 0;
    while let Ok(item) = rx1.recv_async().await {
        tx2.send_async(item).await.ok();
        core::future::poll_fn(|_| {
            cnt += 1;
            if (cnt % 2) == 0 {
                Poll::Ready(())
            } else {
                Poll::Pending
            }
        }).await;
    }
});

Performance:

Before:            6062313
After:             13418794
Sequential target: 14839470

If we removed all the locking, we'd achieve performance equal to our target (Double channel sequential async). However, it is worth noting that this optimization is too specific to the situation at hand, and in the real world it could very easily lead to rescheduling, something we want to avoid. A better path forward would be a more efficient data structure, which is the approach I'm going to take.

In conclusion, SharedFuture proves to be the perfect solution to the job synchronization overhead, and the production implementation will use a variation of this method.

Final API

Due to this API change, there is no longer a strong need to require a mutable reference to perform I/O - reads can be issued concurrently without much trouble. That is an incredible win for memflow. With this API shift, memflow can become an incredibly simple-to-use system with amazing scalability.

The API itself becomes much simpler - batching becomes automatic, and the smallest number of reads gets issued. Take a look at this example:

let p1 = reader.read_val::<u64>(0xdeadfeeb);

let p2 = async {
    let p = reader.read_val::<usize>(0xdeadbeef).await;
    let p = reader.read_val::<usize>(p).await;
    reader.read_val::<u64>(p + 0x10).await
};

let p3 = async {
    let mut p = 0xfeedbeef;
    for _ in 0..5 {
        p = reader.read_val::<usize>(p).await;
    }
    p
};

let (p1, p2, p3) = join!(p1, p2, p3).await;

How many batches of operations are necessary to perform these reads? The longest chain is 5 reads long, so 5 batches is all it takes! Forget the days of thinking about collecting multiple values into a vector of operations - just let the async executor do the job for you!

Conclusion

This came a long way from a simple single-byte read. We iterated through multiple versions of the sync API, making it as efficient as it can be, and then flipped the design on its head with a simpler async API. One could argue it is less efficient due to the allocation of stream objects, but with smart object reuse there should not be a significant penalty in production. Overall this leads to a monumental shift in memflow I/O, making it scale much better.