Consensus Algorithm (version 2.x)

Introduction

For the reasons described in the document on the previous consensus algorithm, speculative execution has been removed from the runtime. This greatly simplifies the consensus algorithm; in fact, apart from some edge cases, there is not much need for a consensus algorithm anymore. Hence the fork statement is gone from the language.

Most of this document describes two topics that were not discussed in the document on the previous consensus algorithm: the introduction of a select statement, and the way in which components are admitted into a synchronous region.

Reducing the Previous Consensus Algorithm

Without speculative execution, there is no more execution tree: there is only one control flow path a component can take through its PDL code. That execution may not form a complete trace (i.e. reach the end of the sync block) because it encounters a blocking operation. But to reiterate: if there is a complete trace, then it can only have taken one control flow path through its code.

However, there are still reasons for incorporating parts of the old consensus algorithm. That is: there is still a port mapping, put operations update this port mapping, and get operations (which no longer fork, but simply block until a message is present) do the same.

There are two reasons for this. Firstly, there is the design choice of enforcing a strict ordering on the way channels are used between components. If there are two channels between two components, then we may have the following code:

comp sender_variant_a(out<u32> tx_a, out<u32> tx_b) {
    sync {
        put(tx_a, 1);
        put(tx_b, 2);
    }
}

comp sender_variant_b(out<u32> tx_a, out<u32> tx_b) {
    sync {
        put(tx_b, 2);
        put(tx_a, 1);
    }
}

comp receiver(in<u32> rx_a, in<u32> rx_b) {
    sync {
        auto va = get(rx_a);
        auto vb = get(rx_b);
    }
}

If we would not send along the port mapping, then both sender_variant_a and sender_variant_b would be valid peers for the receiver component. This is because, as long as put operations are asynchronous (that is: they send the message and continue executing, without waiting for the corresponding get to complete), both messages will arrive at the receiver component, and the receiver component can retrieve these messages in any order. However, if we do send along the port mapping, then only sender_variant_a can be a valid peer of receiver. Note that this is a design choice.

We could relax this design choice by only sending along the port mapping of the port over which the message is sent. This would imply that messages over the same channel have to be received in the correct order, but messages over different channels can be received in any desired order. We have kept the strict ordering in place by sending along the full port mapping.
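
To make this concrete, the following Rust sketch replays both sender variants against the receiver. It is purely illustrative: the mapping representation and the exact consistency rule are assumptions made for the sake of the example, not the runtime's actual code.

use std::collections::BTreeMap;

// A port mapping: port name -> operation counter of the last put on that port.
type PortMapping = BTreeMap<&'static str, u32>;

// Assumed rule: apart from the port the message arrived on, every port that the
// attached mapping mentions must agree with what the receiver observed locally.
fn accept(local: &mut PortMapping, arrival_port: &'static str, attached: &PortMapping) -> bool {
    for (port, counter) in attached {
        if *port == arrival_port { continue; }
        if local.get(port) != Some(counter) { return false; }
    }
    local.insert(arrival_port, attached[arrival_port]);
    true
}

fn main() {
    // sender_variant_a: put(tx_a, 1) then put(tx_b, 2). The message on channel a
    // carries the mapping {a: 1}, the message on channel b carries {a: 1, b: 2}.
    let msg_a = PortMapping::from([("a", 1)]);
    let msg_b = PortMapping::from([("a", 1), ("b", 2)]);
    let mut local = PortMapping::new();
    assert!(accept(&mut local, "a", &msg_a)); // get(rx_a) succeeds
    assert!(accept(&mut local, "b", &msg_b)); // get(rx_b) succeeds

    // sender_variant_b: puts on tx_b first, so the message on channel a carries
    // the mapping {b: 1, a: 2}, claiming channel b already carried a message.
    let msg_a = PortMapping::from([("b", 1), ("a", 2)]);
    let mut local = PortMapping::new();
    assert!(!accept(&mut local, "a", &msg_a)); // mismatch: the receiver has not
                                               // received anything on channel b
}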

The second reason for still including the port mapping has to do with the fact that, by design, put operations are not acknowledged. That is to say: the put operation is asynchronous, and does not prevent the sending component from continuing its execution. For a put operation to be valid, the message must be received by the peer. Without any kind of port mapping we have no way of detecting the following mismatch:

comp sender(out<u32> tx) {
    sync {
        put(tx, 0);
        put(tx, 1);
    }
}

comp receiver(in<u32> rx) {
    sync get(rx);
}

However, there are much simpler ways of designing a consensus algorithm without speculative execution: one may annotate the port with the number of messages sent, or one may (as stated above) only send along the port mapping for the single port over which the message is sent, instead of for all shared channels, etc.
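
As an illustration of the first alternative, the bookkeeping could be as simple as the following Rust sketch (an assumption-laden illustration, not the implemented algorithm): each side counts messages per channel, and at the end of the round the counts simply have to agree.

// Alternative consensus bookkeeping: a plain per-channel message counter.
struct ChannelCounter {
    sent: u32,     // incremented by the sending side on every put
    received: u32, // incremented by the receiving side on every get
}

impl ChannelCounter {
    // At the end of the sync round both sides exchange their counts; the round
    // can only succeed if every message that was put was also got.
    fn agrees(&self) -> bool {
        self.sent == self.received
    }
}

fn main() {
    // The mismatch from the example above: two puts, but only one get.
    let channel = ChannelCounter { sent: 2, received: 1 };
    assert!(!channel.agrees());
}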

That being said, the current implementation still uses branch IDs (interpreted differently in this case: as a simple operation counter). There is still a port mapping from port ID to branch ID, and finding the global solution still proceeds in the same way (except that there is now only one complete trace per component).
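
As a rough picture of this bookkeeping, the per-component state could be imagined as follows (a Rust sketch with invented names; the real runtime's data structures differ):

use std::collections::HashMap;

type PortId = u32;
// The reinterpreted "branch ID": a simple per-component operation counter.
type BranchId = u32;

// Consensus state a component keeps during a single sync round.
struct ConsensusState {
    // Incremented on every put and every get call.
    operation_counter: BranchId,
    // The port mapping: which operation last used which port.
    port_mapping: HashMap<PortId, BranchId>,
}

// The local solution submitted at the end of the sync block is essentially the
// final port mapping (there is only one complete trace per component now).
struct LocalSolution {
    port_mapping: HashMap<PortId, BranchId>,
}

fn main() {
    // After a single put on port 5, the state would look like this:
    let state = ConsensusState {
        operation_counter: 1,
        port_mapping: HashMap::from([(5, 1)]),
    };
    assert_eq!(state.operation_counter, 1);
    let _solution = LocalSolution { port_mapping: state.port_mapping };
}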

Synchronous Regions

There were some aspects of the consensus algorithm that were specifically left out of the previous document. Among them is the question of when to consider a component part of the synchronous region. In version 1.0 of the Reowolf runtime, each peer of each component was considered part of the synchronous region. The idea behind this decision was that a channel remaining unused in a synchronous round should itself be subject to consensus: if a component did not get on a port, then the peer must agree that it did not put on that port.

So when a complete trace was found (that is: a component reached the end of its sync block), the component went over all of its ports and sent a special message over the unused/silent channels, asking the peers to join the synchronous round with their local solutions, where those local solutions were required not to have sent/received anything over those silent channels.

The problem with this approach may be illustrated with a simple example: suppose a set of requesting servers is connected to a database server of some sort. It would be perfectly normal for multiple servers to be storing database resources at the same time. These requests can all be handled in sequence by the database server, but the requesting servers do not necessarily have to wait for one another. Some pseudocode for this example:

union Request { 
    StoreData(u8[]),
    /* more things here in a more complicated example */ 
}

comp requester(out<Request> tx_cmd) {
    // Setup code here
    // Perhaps some code that requires retrieving database resources
    while (!should_exit) {
        u8[] data = { 1, 2, 3 }; /* something more complicated */
        sync {
            put(tx_cmd, Request::StoreData(data));
        }
    }
}

comp database(in<Request>[] rx_cmds) {
    // Note the array of input ports: there are multiple requesters
    // Lot of complicated stuff
    while (!should_exit && length(rx_cmds) > 0) {
        sync {
            // Here is the conundrum (and a second one, mentioned later in this document):
            auto command = get(rx_cmds[0]); // take a message from the first requester
            if (let Request::StoreData(to_store) = command) {
                store_the_data(to_store);
            } else {
                // other commands, in a more complicated example
            }
        }
    }
}

In this case the code seems reasonable, but it will always fail if more than one requester is connected to the database component. Once the database reaches the end of its sync block, it will have a mapping for rx_cmds[0], but the remaining ports were all silent. So the consensus algorithm asks the peer requester components whether their channels were silent. But they are not! The sync block of each requester always sends data, so its channel is never silent.

So one might be inclined (in fact, it is the only way to solve this in the unmodified language) to write the requester as:

comp requester(out<Request> tx_cmd) {
    // Setup code here
    // Perhaps some code that requires retrieving database resources
    while (!should_exit) {
        sync {
            fork {
                // We either do nothing
            } or fork {
                // Or we communicate with the database server
                u8[] data = { 1, 2, 3 }; /* something more complicated */
                put(tx_cmd, Request::StoreData(data));
            }
        }
        
        some_code_that_may_take_a_really_long_time();
    }
}

In some sense the problem is solved: only one requester component will have its request go through, the other ones have silent channels, and the database component is capable of receiving that message. But there are two further problems here. Firstly, there is no way to give any guarantees about fairness: the component that can communicate with the database component the fastest will probably have its trace completed first, hence will be the first one submitted to the consensus algorithm, hence will likely be the one picked. If this happens over the internet, then the slow connections will almost never be able to submit their information. Secondly, none of the components can run at their own pace. Each component always has to wait until all of the other ones have joined the synchronous region. There is no way in which a single requester can do some other work on its own, unfettered by the execution speeds of its peers.

And so the rule for joining a synchronous region was changed. Now it is simply: if a component performs a get and receives a message from another component, then they both become part of the same synchronous region (and if the two components were already part of larger, separate synchronous regions, those regions are merged). If a message is in the inbox of the component, but no get operation is performed on it, then the message is kept around, perhaps for a later synchronous round. Note that this works together with the consensus algorithm in such a way that joining a synchronous region happens upon the first accepted message on a channel; once joined, the consensus algorithm requires that the later messages within that synchronous round arrive as well.
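
A rough sketch of this rule in Rust (names and data structures invented for illustration; the real runtime's bookkeeping is more involved) could look as follows:

use std::collections::{HashMap, VecDeque};

type ComponentId = u32;
type PortId = u32;

struct Message {
    sender: ComponentId,
    payload: u32,
}

struct Component {
    // Messages that have arrived but have not yet been accepted by a get.
    inbox: HashMap<PortId, VecDeque<Message>>,
    // Components that are currently part of this component's synchronous region.
    region: Vec<ComponentId>,
}

impl Component {
    // A get call: accept a queued message (a real implementation would block if
    // none is present) and pull its sender into the synchronous region.
    fn get(&mut self, port: PortId) -> Option<Message> {
        let message = self.inbox.get_mut(&port)?.pop_front()?;
        if !self.region.contains(&message.sender) {
            // First accepted message from this peer in this round: merge regions.
            self.region.push(message.sender);
        }
        Some(message)
    }
    // A message that merely sits in the inbox, without a matching get, does not
    // pull its sender into the region: it stays queued for a later round.
}

fn main() {
    let mut c = Component { inbox: HashMap::new(), region: Vec::new() };
    c.inbox.entry(0).or_default().push_back(Message { sender: 7, payload: 1 });
    c.inbox.entry(1).or_default().push_back(Message { sender: 8, payload: 2 });
    let m = c.get(0).unwrap();        // accepting the message on port 0...
    assert_eq!(m.payload, 1);
    assert_eq!(c.region, vec![7]);    // ...pulls component 7 into the region,
    assert_eq!(c.inbox[&1].len(), 1); // while component 8's message just waits.
}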

Select Statement

The example above hints at a second problem (figured out a long time ago by our unix overlords): there is no way within PDL code to say that we can perform any one of a variety of behaviours, triggered by the arrival of messages. For our database component above, we see that it can have multiple requesters connected to it, but there is no way to indicate that we have valid behaviour for any one of them. So we introduce the select statement. It is a statement with a set of arms. Each arm has a guard, in which we indicate which ports need to have incoming messages, and a block of code that is executed when all of the indicated ports have actually received a message.

The select statement may only appear within a sync block. An arm's guard is formulated in terms of an expression or a variable declaration (which may contain get calls, but not put calls). The corresponding block of code is executed when all of the get calls in the guard have a message ready for them. In case multiple arm guards are satisfied, one of the arms is chosen at random by the runtime for execution.

So the select statement takes the form:

select {
    auto value = get(rx_a) + get(rx_b) 
        -> do_something_with_the_value(value),
    auto value = get(rx_a)
        -> do_something_with_the_value(value),
}

This code is transformed by the compiler into something akin to:

while (still_waiting_for_a_message) {
    if (value_present(rx_a) && value_present(rx_b)) {
        auto value = get(rx_a) + get(rx_b);
        do_something_with_the_value(value);
        
        break;
    } else if (value_present(rx_a)) {
        auto value = get(rx_a);
        do_something_with_the_value(value);
        
        break;
    }
    block_until_there_is_a_new_message();
}

The combination of the select statement and the way we introduce components into the synchronous region permits components to run independently of one another when their protocol admits it.

The rule where components only enter a synchronous region when they get a message that is present in the inbox still applies here. If, in the example above, the arm requiring only a message on channel a executes, then only the peer component of that channel joins the synchronous region. A message that arrived over channel b will still be present in the inbox for later interactions or sync blocks.

The Current Consensus Algorithm

Because the current consensus algorithm is a scaled-down version of the previous one, we will be a bit more concise. Each component has a local counter; this counter produces numbers we (still) call branch numbers. The counter is incremented each time a get or a put call is performed. We use this counter to annotate the port in the port mapping each time a put call is performed. The port mapping is sent along with messages sent through put calls. Such a message will arrive in the inbox of the receiver. When the peer performs a get call on the receiving end of the channel, it will check whether the received port mapping matches the local port mapping. If it does not, we can immediately let the component crash. If it does, then the message is received and the component continues execution. Once a get call has completed, the sender is incorporated into the synchronous region, if it was not already.
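
The description above can be condensed into the following Rust sketch. It is illustrative only: the names, the exact comparison rule, and what precisely gets recorded on a get are assumptions made for the sake of the example.

use std::collections::BTreeMap;

type PortId = u32;
type BranchId = u32; // the reinterpreted branch ID: a per-component operation counter

#[derive(Default)]
struct SyncRound {
    counter: BranchId,
    mapping: BTreeMap<PortId, BranchId>, // the local port mapping
}

// What travels with every message sent through a put call.
struct Annotated<T> {
    mapping: BTreeMap<PortId, BranchId>,
    payload: T,
}

impl SyncRound {
    // put: bump the counter, annotate the port, attach the full port mapping.
    fn put<T>(&mut self, port: PortId, payload: T) -> Annotated<T> {
        self.counter += 1;
        self.mapping.insert(port, self.counter);
        Annotated { mapping: self.mapping.clone(), payload }
    }

    // get: bump the counter, then check the received mapping against the local
    // one; a mismatch makes the component crash (modelled here as None).
    fn get<T>(&mut self, port: PortId, message: Annotated<T>) -> Option<T> {
        self.counter += 1;
        for (p, b) in &message.mapping {
            if *p == port { continue; } // this entry is the one we adopt below
            if self.mapping.get(p) != Some(b) { return None; } // inconsistent
        }
        // Adopt the sender's annotation, so both ends of the channel agree on it
        // when the local solutions are compared at the end of the round.
        self.mapping.insert(port, message.mapping[&port]);
        // Here the sender would also be pulled into the synchronous region.
        Some(message.payload)
    }
}

fn main() {
    let (mut sender, mut receiver) = (SyncRound::default(), SyncRound::default());
    let first = sender.put(0, "first");
    let second = sender.put(0, "second");
    assert_eq!(receiver.get(0, first), Some("first"));
    assert_eq!(receiver.get(0, second), Some("second")); // in-order: accepted
}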

Once a component has reached the end of its sync block, it will submit its local solution (i.e. its port mapping) for validation by the consensus algorithm. If all components in the synchronous region have submitted a local solution, and those port mappings are pairwise consistent with one another, then we have found a global solution. With this global solution the components in the synchronous region are ordered to continue the execution of their PDL code.
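
This final step can be sketched as follows (again illustrative Rust; in particular, representing a local solution as one entry per channel, with None meaning the channel stayed silent, is an assumption):

use std::collections::BTreeMap;

type ChannelId = u32;
type BranchId = u32;

// A component's local solution: for every channel it owns a port of, either the
// branch ID of the last operation on it, or None if the channel stayed silent.
type LocalSolution = BTreeMap<ChannelId, Option<BranchId>>;

// Two local solutions are consistent if they agree on every channel they share:
// the same last branch ID, or silent on both sides.
fn pairwise_consistent(a: &LocalSolution, b: &LocalSolution) -> bool {
    a.iter().all(|(channel, usage)| match b.get(channel) {
        Some(peer_usage) => peer_usage == usage, // shared channel: must agree
        None => true,                            // channel not shared by b
    })
}

// A global solution exists when every pair of submitted local solutions is
// consistent; the components are then told to continue past their sync block.
fn global_solution(solutions: &[LocalSolution]) -> bool {
    solutions.iter().enumerate().all(|(i, a)| {
        solutions[i + 1..].iter().all(|b| pairwise_consistent(a, b))
    })
}

fn main() {
    // The sender/receiver mismatch from earlier: two puts, but only one get.
    let sender = LocalSolution::from([(0, Some(2))]);
    let receiver = LocalSolution::from([(0, Some(1))]);
    assert!(!global_solution(&[sender, receiver])); // no global solution
}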

As a small side note: with speculative execution, the consensus algorithm amounted to finding, for each component, a complete trace whose interactions are consistent with those of all of its peers. Without speculative execution we have the multi-party equivalent of an Ack message in the TCP protocol. (Yes, this statement skips over a lot of details of the TCP protocol, but) in TCP both parties perform a best-effort attempt to ensure that a sent message has arrived, by having the receiver acknowledge that the message has been received. In our case the consensus algorithm performs this function: it ensures that the sent messages have all arrived at their peers in a single multi-party interaction.