Consensus Algorithm (version 1.x)

Introduction

The previous consensus algorithm (the one within Reowolf 1.0 and 1.1) had support for speculative execution. This means that the user may (directly or indirectly) fork the execution of a component. That particular execution (which may itself already be a forked execution) then becomes multiple executions. At some point the component will have to choose which particular execution is committed to memory. This is one reason for the existence of a sync block: a block of code wherein one may perform forking, and at the end of which a component will have to choose the execution that is committed to memory.

With speculative execution we may have multiple components that are all forking their execution and sending/receiving messages. So we do not end up with one component that has to choose its final execution, but all components choosing their final execution. Note that one component’s execution may apply restrictions on the validity of another component’s execution. As an example, suppose the following components and their executions:

  • Component A: Has two executions:
    • Execution A1: Component A has sent a message to component B.
    • Execution A2: Component A has received a message from component B.
  • Component B: Has three executions:
    • Execution B1: Component B has received a message from component A, then sends a message back to component A.
    • Execution B2: Component B has received a message from component A.
    • Execution B3: Component B has sent two messages to component A.

Without delving into too much detail, one may see that the only valid solution to this problem is the combination of A1 and B2. So the components cannot just pick any execution, but must pick an execution that is agreeable with the chosen executions of the components it has interacted with.
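
The reasoning behind this example can be checked mechanically. The Python sketch below is only an illustration: it assumes that an execution can be summarised by the list of send/receive events on the link between A and B, and the names EXECUTIONS_A, EXECUTIONS_B and compatible are hypothetical, not part of Reowolf.

```python
from itertools import product

# Each execution is summarised as the ordered list of events on the A<->B link.
EXECUTIONS_A = {
    "A1": ["send"],
    "A2": ["recv"],
}
EXECUTIONS_B = {
    "B1": ["recv", "send"],
    "B2": ["recv"],
    "B3": ["send", "send"],
}

def compatible(events_a, events_b):
    """Every send on one side needs a matching receive on the other side."""
    return (events_a.count("send") == events_b.count("recv")
            and events_a.count("recv") == events_b.count("send"))

valid_pairs = [
    (name_a, name_b)
    for (name_a, ev_a), (name_b, ev_b)
    in product(EXECUTIONS_A.items(), EXECUTIONS_B.items())
    if compatible(ev_a, ev_b)
]
print(valid_pairs)  # [('A1', 'B2')]
```

Counting events is enough for this small example; the remainder of this document explains why a real algorithm needs more information than that.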

Component Execution Tree, and Execution Traces

Components execute PDL code, which may contain calls to fork, put, and get. A fork explicitly forks the execution of the code. A put sends a message to a particular component, and a get receives a message from a component and forks (as explained later).

As the component enters a sync block, it has only one possible execution. But as stated above there are reasons for the execution to split up. These individual executions may themselves split up later, thereby forming a so-called “execution tree”:

                             +-----+       +------+
                             | put |------>| sync |
+-------+      +------+----->+-----+       | end  |
| sync  |      | fork |                    +------+
| start |----->+------+----->+-----+
+-------+                    | get |------>+------+
                             +-----+       | sync |
                                   |       | end  |
                                   |       +------+
                                   |
                                   +------>+------+
                                   |       | sync |
                                   |       | end  |
                                   |       +------+
                                   |
                                   +--> ...

This tree was formed by executing the following PDL code:

primitive some_component(out<u32> tx, in<u32> rx) {
  sync {
    fork {
      put(tx, 5);
    } or fork {
      get(rx);
    }
  }
}

We can see the reason for calling the execution tree a “tree”. There are several things to note about the execution tree: Firstly that some executions have been completed and form a complete trace, that is: starting from the “sync start” a complete trace may be represented by the path running to the “sync end”. Conversely, there is one trace that is incomplete: there is a trace waiting at the get for a message. We’ll call a place where the execution splits into multiple branches/executions a “branching point”.
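
To make the terminology concrete, here is a minimal Python sketch of such a tree. It is an assumption-laden illustration rather than the runtime’s data structure: the Node class and the complete_traces function are hypothetical names, and the tree is frozen at a moment where the get has received exactly one message.

```python
from dataclasses import dataclass, field

@dataclass
class Node:
    label: str                                    # "sync start", "fork", "put", "get" or "sync end"
    children: list = field(default_factory=list)  # more than one child marks a branching point

def complete_traces(node, prefix=()):
    """Yield every path from the root that reaches a "sync end" node."""
    path = prefix + (node.label,)
    if node.label == "sync end":
        yield path
    for child in node.children:
        yield from complete_traces(child, path)

# The tree from the figure. The get node gains one child per received message;
# a get node without children is a trace that is still waiting.
tree = Node("sync start", [
    Node("fork", [
        Node("put", [Node("sync end")]),
        Node("get", [Node("sync end")]),
    ]),
])

traces = list(complete_traces(tree))
for trace in traces:
    print(" -> ".join(trace))
```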

Note that the branching points can in the general case only be discovered at runtime. Any code may have control flow points like if statements, or while loops. Consider the following code:

primitive some_component(out<u32> tx, bool which_way) {
  sync {
    if (which_way) {
      fork {
        put(tx, 1);
      } or fork {
        put(tx, 2);
      }
    } else {
      put(tx, 3);
    }
  }
}

Depending on the value of which_way we produce one of two different execution trees (for each of which we can determine all traces). The compiler cannot decide at compile time which execution tree will be generated.

Note that the get branching points have an arbitrary number of forked executions arising from them. We’ll call them “waiting points”. In the general case we cannot figure out how many forked executions arise from a get branching point. The reason may be illustrated by the following simple example:

primitive sender(out<u32> tx, u32 num_forks) {
  sync {
    auto fork_counter = 1;
    while (fork_counter < num_forks) {
      fork {
        put(tx, fork_counter);
      } or fork { } // empty case
      fork_counter += 1; // advance, otherwise the loop never terminates
    }
    put(tx, num_forks);
  }
}

primitive receiver(in<u32> rx) {
  u32[] values = {};
  sync {
    bool keep_going = true;
    while (keep_going) {
      auto new_value = get(rx);
      values @= { new_value }; // append
      fork {
        keep_going = false; 
      } or fork { }
    }
  }
}

If the sender is connected to the receiver, then the sender will send anywhere between 1 and num_forks messages (distributed over num_forks forks), depending on a user-supplied parameter (which we cannot figure out at compile-time). The isolated receiver can generate an infinite number of forked executions. We can analyze that the receiver will at most have num_forks + 1 forked executions arising from its get branching point (the num_forks branches that do receive, and one final fork that is infinitely waiting on another message), but the compiler cannot.

For this reason a get branching point needs to be kept around for the entire duration of the sync block. The runtime will always need to have a copy of the component’s memory and execution state the moment it encountered a get instruction, because it might just be that another component (in perhaps a new fork, which we cannot predict) will send it another message, such that it needs to produce a new forked execution.
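
A rough Python sketch of what keeping a get branching point around means is given below. It assumes that a component’s memory can be modelled as a dictionary; the WaitingPoint class and its deliver method are hypothetical names chosen for this illustration.

```python
import copy

class WaitingPoint:
    """Snapshot of a branch that is blocked on a get (hypothetical helper)."""

    def __init__(self, memory, variable):
        self.snapshot = copy.deepcopy(memory)  # kept for the whole sync block
        self.variable = variable               # variable the get assigns to
        self.forks = []                        # one entry per accepted message

    def deliver(self, value):
        """Fork a fresh execution from the snapshot for one received value."""
        memory = copy.deepcopy(self.snapshot)
        memory[self.variable] = value
        self.forks.append(memory)
        return memory

point = WaitingPoint({"values": []}, variable="new_value")
first = point.deliver(10)
first["values"].append(10)  # the fork keeps running and mutates its own copy
second = point.deliver(20)  # a later message still starts from the pristine snapshot
print(point.snapshot, first, second)
```

The deep copy per delivered message is exactly the memory overhead this design incurs: the snapshot can never be released before the sync block ends.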

A get operation is also a “blocking operation”: in the general case the component needs to know the value produced by the get operation in order to continue its execution (perhaps more specifically: the first time a read operation is performed on the variable that will store the transmitted message). Consider the simple case where the received message contains a boolean that is used in the test expression of an if statement: we’ll need to have actually received that boolean before we can decide which control flow path to take. Speculating on the contents of messages is too computationally expensive to be taken seriously. A put operation is not a blocking operation: the message is sent and the component continues executing its code.

We’ve touched upon control flow points multiple times. We’ll discuss some aspects of control flow here, to more easily introduce the algorithm for finding consensus later. A component is fully described by its memory (i.e. all of the memory locations it has access to through its variables) and execution state (i.e. its current position in its code). So once a component encounters a control flow point, it can only take one control flow path, and which path it takes may be computed from its memory state. The calling of certain impure functions (e.g. retrieving a cryptographically secure random number) does not change this fact. Note that receiving values from other components might change a component’s memory state, and hence influence the control flow path it takes in the subsequent forked execution. Conversely, a component sending a value might influence another component’s memory state.

So before treading into more detail, here we’ve found that in the general case:

  • Speculative execution may only occur inside sync blocks.
  • Speculative execution implies that we end up with an execution tree.
  • A path through the execution tree that reaches the end of the sync block is called a complete trace, and represents a valid execution of the sync block for the component (but perhaps not compatible with a particular complete trace of a peer it interacted with).
  • The set of traces produced by a component in its sync block can practically only be discovered at runtime.
  • A get operation is necessarily a blocking operation that always incurs a branching point. A put operation is a nonblocking operation that will not fork into multiple executions.
  • The control flow path of a trace of a component may be influenced by the messages it has received.

Towards a Consensus Algorithm

The key to the consensus problem is somehow discovering the ways in which the components have influenced the memory state of their peers. If we have a complete trace for each component, for which all peer components agree on the way they have influenced that complete trace, then we’ve found a solution to the consensus problem. Hence we can subdivide the consensus problem into four parts:

  1. Keeping track of the messages that influence the memory state of components.
  2. Keeping track of the peers that influence the memory state of components.
  3. Finding a set of interactions between components on which all involved components agree, i.e. each put should have a corresponding get at the peer.
  4. Somehow having a communication protocol that finds these agreeable interactions.

We’ll not consider the last point, as this is essentially a gossip protocol, and the appropriate gossip protocol varies with the requirements of the user (e.g. robust to failure, memory efficient, runtime efficiency, message complexity). We define some terms to make the following discussion easier:

  • “component graph”: A graph where each node is a component, and each channel between components forms an edge in that graph.
  • “sync region”: The group of components that have interacted with one another and should agree on the global consensus solution.
  • “local solution”: A complete trace of a component. For the component this is a valid local solution, but might not be part of a global solution.
  • “global solution”: A set of traces, one for each of the components in the sync region, that all agree on the interactions that took place between the components in the sync region.

We’ll now work incrementally to the final consensus algorithm, making it a bit of a story in order to explain the reasoning and intuition behind the consensus algorithm.

Suppose a component can somehow predict exactly which messages it is going to receive during the execution of its code, and assume that each received message has the appropriate get call associated with it. In this case we’re able to produce the set of complete traces that a component produces by symbolically executing its code: we start out with the initial memory state, might perhaps do some explicit forking, know exactly which messages we receive and how they influence the control flow, and arrive at the end of the sync block. Hence each component can figure out independently which complete trace is the solution to its consensus problem.

However, as we’ve outlined above, we cannot know exactly which messages we’re going to receive. We’ll have to discover these messages while executing a component. The next best thing is to keep track of the values of the messages that we’ve received in a complete trace. Once we have complete traces for all of the interacting components, we can check that each received value corresponds to a sent value. For example:

primitive sender(out<u32> tx) {
  sync {
    fork {
      put(tx, 1);   
    } or fork {
      put(tx, 2);
    }
  }
}

primitive receiver(in<u32> rx) {
  u32 value = 0; 
  sync {
    value = get(rx);
  }
}

Where tx is part of the same channel as rx. In this case we’ll have two traces for each of the components, resulting in two valid global consensus solutions. In one solution the message 1 was transferred, in another the message 2 was transferred. There are two problems with this solution: firstly it doesn’t take the identity of the channel into account. And secondly it doesn’t take the effects of previous messages into account.

To illustrate the first problem, consider:

primitive sender(out<u32> tx_a, out<u32> tx_b) {
  u32 some_variable_in_memory = 0;
  sync {
    fork {
      put(tx_a, 1);
      some_variable_in_memory = 1;
    } or fork {
      put(tx_b, 1);
      some_variable_in_memory = 2;
    }
  }
}

primitive receiver(in<u32> rx_a, in<u32> rx_b) {
  u32 value = 0; 
  sync {
    value = get(rx_a);
  }
}

Here the fact that the sender has the solutions 1 and 1 does not help the receiver figure out which of those corresponds to its own solution of 1.

To illustrate the second problem, consider:

primitive sender(out<u32> tx) {
  sync {
    fork {
      put(tx, 1);
      put(tx, 2);
    } or fork {
      put(tx, 2);
    }
  }
}

primitive receiver(in<u32> rx) {
  u32 value = 0; 
  sync {
    value = get(rx);
  }
}

Now the sender contributes two solutions: one in which it sent 1 followed by 2, and one in which it sent only 2. The receiver, however, will generate three solutions: 1, 2 and 2. The reason there are three solutions for the receiver is that it cannot figure out that the message 2 from the sender’s first fork depended on the first message 1 from the sender having arrived.
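
The ambiguity can be reproduced with a few lines of Python. The sketch assumes, purely for illustration, that a sender trace is summarised by the list of values it put on tx and that the receiver’s single get forks once for every message that arrives; the variable names are hypothetical.

```python
# Values put on tx, one list per forked execution of the sender.
sender_traces = [[1, 2], [2]]

# Value-only matching: the receiver accepts every arriving message at its one
# get, without knowing what the sender did before sending that message.
receiver_solutions = [value for trace in sender_traces for value in trace]
print(receiver_solutions)  # [1, 2, 2]

# The receiver performs exactly one get, so only a sender trace with exactly
# one put can have every put matched by a get at the peer.
consistent_sender_traces = [t for t in sender_traces if len(t) == 1]
print(consistent_sender_traces)  # [[2]]
```

Two of the three receiver solutions carry the value 2, and values alone cannot tell the receiver which of them belongs to the sender execution that sent nothing beforehand.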

So we need to change the algorithm. Instead of just tracking which messages were sent, each component needs to have a mapping from port identities to sent messages. (Internally the runtime will generate port/channel IDs, but for the sake of this discussion we’ll use the postfixes to the port names in the PDL code to indicate to which channel they belong, e.g. the tx_a out-port is part of the same channel a as the rx_a in-port.) Secondly, if we send a message, we need to transmit in which way it depends on previously received messages by sending along the sender’s port mapping. The receiver, upon getting a message, checks the port mapping to see if any of its own executions can accept that message.

We’re already calling this information the “port mapping” (because we’ll truly turn it into a mapping later), but for now the sent mapping is a list of pairs containing (port ID, sent value).

This changes the way we can interpret the execution tree: each node is not only associated with the performed operation (fork, put or get), but also with a particular port mapping that indicates the influence of other components that allowed it to reach that execution node. We modify the port mapping per node in the following way:

  • For a fork: we fork the execution as many times as needed, and for those forks we copy the port mapping of the ancestor node in the execution tree.
  • For a put: we transmit the current port mapping, and the transmitted value in the message. We then update the mapping from the sending port: that particular port ID now maps to the recently transmitted value.
  • For a get: we receive the transmitted port mapping and value. Note that this get might be a particular statement executed by multiple different forked executions (each with their own port mapping). And so for a get to succeed, we need the shared channels between the sender and the receiver to agree on their port mapping. If such a get succeeds, then it forks into a new execution where the receiving port now maps to the received value.
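
The three rules above can be sketched in Python as follows. This is a simplified illustration under stated assumptions: there are only two components, so every channel is shared and whole mappings can be compared directly, and the message is assumed to arrive on the port the get is reading from. The function names and the "seen" field are hypothetical.

```python
def fork(mapping, count):
    """Each forked branch starts with a copy of its ancestor's port mapping."""
    return [list(mapping) for _ in range(count)]

def put(mapping, port, value):
    """Return (message, new_mapping) for a put on `port`."""
    message = {"seen": list(mapping), "port": port, "value": value}
    return message, mapping + [(port, value)]

def try_get(mapping, message):
    """Return the new branch's mapping, or None if the message does not fit."""
    if message["seen"] != mapping:
        return None
    return mapping + [(message["port"], message["value"])]

# A sender puts the value 1 twice on channel "a".
first_msg, sender = put([], "a", 1)
second_msg, sender = put(sender, "a", 1)

# A receiver with an empty mapping accepts only the first message ...
rejected = try_get([], second_msg)
receiver = try_get([], first_msg)
# ... and only the resulting branch can accept the second one.
receiver = try_get(receiver, second_msg)

print(rejected)            # None
print(sender == receiver)  # True
```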

So, for a slightly more complicated example, combining the two previous examples:

primitive initiator(out<u32> tx_a, out<u32> tx_b, in<u32> rx_c) {
  u32 value = 0;
  sync {
    put(tx_a, 1); // operation i1
    fork {
      put(tx_a, 1); // operation i2
      value = get(rx_c); // operation i3
    } or fork {
      put(tx_b, 2); // operation i4
      value = get(rx_c); // operation i5
    }
  }
}

primitive responder(in<u32> rx_a, in<u32> rx_b, out<u32> tx_c) {
  sync {
    auto value_1 = get(rx_a); // operation r1
    auto value_2 = 0;
    fork {
      value_2 = get(rx_a); // operation r2
    } or fork {
      value_2 = get(rx_b); // operation r3
    }
    put(tx_c, value_1 + value_2); // operation r4
  }
}

Here, once the components have brought as many forked executions to completion as possible, we’ll have the following execution trees (and mappings). The square bracketed terms denote port mapping. The parenthesized terms correspond to the operations in the code, and the curly bracketed terms are the names for the traces (so we can refer to them in this document).

For the initiator:

sync  --> put (i1) --> fork +--> put (i2) -----> get (i3) -+-----> sync end {A}
start     [(a,1)]           |    [(a,1),(a,1)]   [(a,1),(a,1),(c,2)]
                            |                              |
                            |                              |
                            |                              +-> ...
                            |
                            +--> put (i4) -----> get (i5) -+-----> sync end {B}
                                 [(a,1),(b,2)]   [(a,1),(b,2),(c,3)]
                                                           |
                                                           |
                                                           +-> ...

For the responder:

sync  --> get (r1) -+--> fork +--> get (r2) -----> put (r4) ----> sync end {C}
start     [(a,1)]   |         |    [(a,1),(a,1)]   [(a,1),(a,1),(c,2)]
                    |         |
                    +-> ...   +--> get (r3) -----> put (r4) ----> sync end {D}
                                   [(a,1),(b,2)]   [(a,1),(b,2),(c,3)]

We can see that the put operation at i2 does not end up being received at r1. The reason is that at r1 the responder expects to not have received anything on rx_a yet. The message that the initiator sends contains the annotation [(a,1),(a,1)], meaning: I have previously sent [(a,1)], and am now sending [(a,1)]. The only operation that can receive this message is r2, because that expects the mapping [(a,1)]!

Similarly: when the responder puts at r4, this happens in two branches. The branch that ends up being trace C expects the initiator to be in the state [(a,1),(a,1)] (hence can only be received at operation i3, resulting in trace A of the initiator). The branch that ends up being trace D expects the initiator to be in the state [(a,1),(b,2)] (hence can only be received at operation i5, resulting in trace B of the initiator).

And of course, when both components are finished, they can compare the mappings in both of them and conclude that the traces A and C are compatible, since their port mappings are compatible. Similarly traces B and D are compatible. So there are two global solutions to the consensus problem.

For the sake of simplicity, we’ve only considered two components. But there may be many more components involved in a single synchronous region. In this case we’ll have to clarify when receiving a message is valid. When a message is sent to another component, the receiving component first filters the port mapping (both for the mapping stored in the execution tree and the mapping sent over the channel) such that only the channels shared between the two components are left. If those two mappings are equal, then the message can be received in that branch.
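
A minimal Python sketch of this filtering step follows. It assumes a port mapping is a list of (channel, value) pairs and that the set of channels shared by two components is known; the helper names and the example channels "x" and "y" are hypothetical.

```python
def filter_on_shared_channels(mapping, shared_channels):
    """Keep only the (channel, value) pairs for channels both components share."""
    return [(ch, value) for ch, value in mapping if ch in shared_channels]

def can_receive(branch_mapping, message_mapping, shared_channels):
    """A branch may accept a message if both filtered mappings are equal."""
    return (filter_on_shared_channels(branch_mapping, shared_channels)
            == filter_on_shared_channels(message_mapping, shared_channels))

# Sender and receiver share only channel "a". The sender also talked to a third
# component over "x", and the receiver to a fourth component over "y".
shared = {"a"}
message_mapping = [("x", 7), ("a", 1)]
accepting_branch = [("a", 1), ("y", 3)]
other_branch = [("y", 3)]

print(can_receive(accepting_branch, message_mapping, shared))  # True
print(can_receive(other_branch, message_mapping, shared))      # False
```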

The same process needs to be applied when we seek a global solution. The simplest way to explain this process is with the following, rather silly, pseudocode that seeks the global solution:

all_components = [ component_1, component_2, ..., component_N ]
global_solutions = []

// Nested loop through all components
for all complete traces in component_1:
  for all complete traces in component_2:
    ...
      ...
        for all complete traces in component_N:
          let success = true;
          let all_traces = [trace_of_1, trace_of_2, ..., trace_of_N];
          
          // Looping through every pair of traces
          for index_a in 0..N:
            for index_b in index_a + 1..N:
              let component_a = all_components[index_a]
              let trace_a = all_traces[index_a]
              let component_b = all_components[index_b]
              let trace_b = all_traces[index_b]
              
              trace_a = filter_on_shared_channels(trace_a, component_a, component_b)
              trace_b = filter_on_shared_channels(trace_b, component_a, component_b) 
            
              if trace_a != trace_b:
                success = false;
                break
            
            if !success:
              break
              
          if success:
            global_solutions = append(global_solutions, all_traces)
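
For readers who prefer something executable, the pseudocode can be approximated in Python. This is a sketch under assumptions: a trace is represented by its final port mapping as a list of (channel, value) pairs, the channels of each component are given as a set, and filter_on_shared_channels takes the set of shared channels rather than the two components. The nested loops are replaced by itertools.product.

```python
from itertools import combinations, product

def filter_on_shared_channels(trace, shared):
    return [(ch, value) for ch, value in trace if ch in shared]

def global_solutions(traces_per_component, channels_per_component):
    names = list(traces_per_component)
    solutions = []
    # One candidate = one complete trace picked for every component.
    for candidate in product(*(traces_per_component[n] for n in names)):
        success = True
        # Every pair of components must agree on their shared channels.
        for (i, a), (j, b) in combinations(enumerate(names), 2):
            shared = channels_per_component[a] & channels_per_component[b]
            if (filter_on_shared_channels(candidate[i], shared)
                    != filter_on_shared_channels(candidate[j], shared)):
                success = False
                break
        if success:
            solutions.append(dict(zip(names, candidate)))
    return solutions

# The four traces of the initiator/responder example.
trace_A = [("a", 1), ("a", 1), ("c", 2)]
trace_B = [("a", 1), ("b", 2), ("c", 3)]
trace_C = [("a", 1), ("a", 1), ("c", 2)]
trace_D = [("a", 1), ("b", 2), ("c", 3)]

traces = {"initiator": [trace_A, trace_B], "responder": [trace_C, trace_D]}
channels = {"initiator": {"a", "b", "c"}, "responder": {"a", "b", "c"}}
solutions = global_solutions(traces, channels)
print(len(solutions))  # 2
```

The two solutions found are the pairs (A, C) and (B, D), matching the conclusion drawn from the execution trees.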

We’ll now apply the last bit of trickery to this algorithm. Firstly, keeping track of the sent message values may be prohibitively expensive. Consider some kind of streaming data processor that receives gigabytes of data in a single synchronous round. It would be an unwise design decision to store all of that data in the port mapping. So what we’ll do instead is assign each node in the execution tree a unique number (unique only for that execution tree, different execution trees might contain the same number). With that unique number we’ll redefine the port mapping to consist of a list of (port ID, branch number) pairs.

The reason this is a valid trick is because of the arguments made earlier regarding control flow being influenced by received messages. If we know how a component was influenced by external influences, then the control flow path it takes is deterministic, hence the content of the sent messages will be deterministic. Locally a component A may only describe the way it was influenced by its peer B, but B also records how it was influenced by its peers C and D. So transitively A will also know the indirect mutual influences between it and C and D.

Lastly, we can turn the list of (port ID, branch number) pairs into a true mapping {port ID -> branch number}: we do not actually need to keep the entire history around. The reason behind this is the fact that the get operation is blocking and requires the sent port mapping to be compatible with its execution node’s port mapping. So when a get operation succeeds, it agrees that the executions of both sender and receiver are compatible up until that point. Continued execution only has to check that the subsequent interactions are compatible as well. Consider the following cases:

  • A get operation never receives a message: in that case we keep blocking indefinitely, we’ll never get a complete trace, hence are prevented from finding a local solution.
  • A get operation receives a message containing an incompatible port mapping: in that case we have roughly the same case as above. The message is not accepted and we keep blocking indefinitely. The interpretation is different: the sender and the receiver did not agree on the control flow paths they took.
  • A get operation receives a message containing a compatible port mapping: in this case both components agree on the order of operations that took place for the message to be transferred.
  • A put operation is never received: again, we’re fine. The putting component might submit a local solution for a complete trace. But the mapping for that never-received message will also never appear in the supposed receiver’s port mapping. Hence the two components will never agree on the control flow paths they took.

The Consensus Algorithm

With the story above, we may describe the complete consensus finding algorithm as follows.

Each component will locally construct an execution tree. Branches appear whenever it encounters a fork (producing a set of new branches), a put operation (this will create a single new branch) or a get operation (which will create new branches if the sent message’s port mapping is compatible with the port mapping stored at that get’s branch). Whenever a new branch is created it is assigned a locally unique identifier.

The port mapping at each branch consists of a mapping {port ID -> branch number}. This port mapping starts out empty at the start of a sync block. When a component puts a value through a channel, it will:

  • Generate the put branch and assign it a unique ID.
  • Send the message, annotated with the ancestor branch’s port mapping together with the (sending port’s ID, newly assigned branch ID) pair.
  • Update the port mapping of the put branch: it will copy the ancestor branch’s port mapping, and update it with the (send port's ID, newly assigned branch ID) pair. In case a previous entry is present for that specific port ID, then it is overwritten.

When a component encounters a fork, it will simply copy the ancestor branch’s port mapping for each new branch. Each new branch will receive a new unique ID. When a component performs a get, it will block until it receives a message annotated with a port mapping. If that happens, then it will:

  • Compare the port mapping in the message with the branch’s port mapping. If any of the shared channels does not agree on the port mapping, then the message is not valid for that get operation. Note that the message must be kept around, because in the future there may be a get operation that is valid for that port mapping.
  • If the port mappings for the shared channels do agree, then we:
    • Generate a new fork originating from the get branch and assign it a unique ID.
    • Update the port mapping of the new branch: copy the ancestor (get) branch’s port mapping, and update it with the (sending port ID, peer’s branch number) pair.
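
The bookkeeping described in this section can be sketched in Python. This is a hypothetical illustration and not the Reowolf implementation: the Component class, its method names and the message layout are assumptions, both ends of a channel are assumed to share one port ID, and storing rejected messages is left to the caller.

```python
import itertools

class Component:
    """Execution-tree bookkeeping for one component (hypothetical sketch)."""

    def __init__(self):
        self._ids = itertools.count(1)
        self.mappings = {0: {}}  # branch ID -> port mapping; branch 0 is the sync start

    def _branch(self, parent, update=None):
        branch = next(self._ids)
        self.mappings[branch] = {**self.mappings[parent], **(update or {})}
        return branch

    def fork(self, parent, count):
        return [self._branch(parent) for _ in range(count)]

    def put(self, parent, port):
        branch = self._branch(parent)
        message = {"mapping": dict(self.mappings[parent]), "port": port, "branch": branch}
        self.mappings[branch][port] = branch  # overwrites an older entry for this port
        return branch, message

    def get(self, parent, message, shared_ports):
        own = self.mappings[parent]
        if any(own.get(p) != message["mapping"].get(p) for p in shared_ports):
            return None  # incompatible; the caller keeps the message around
        return self._branch(parent, {message["port"]: message["branch"]})

shared = {"a"}
sender, receiver = Component(), Component()
s1, msg1 = sender.put(0, "a")   # first put on port a
s2, msg2 = sender.put(s1, "a")  # second put on the same port

too_early = receiver.get(0, msg2, shared)  # rejected: receiver has seen nothing on a yet
r1 = receiver.get(0, msg1, shared)
r2 = receiver.get(r1, msg2, shared)
print(too_early, receiver.mappings[r2], sender.mappings[s2])
```

After both messages are accepted, the sender and the receiver hold the same entry for port a, which is what allows their complete traces to be matched later.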

Reasons for not implementing this Consensus Algorithm

There are a lot of practical issues with this consensus algorithm:

  1. The fact that a get operation never knows when it will receive a new message requires us to keep a complete copy of the component’s memory and execution state at that point. Hence for each get operation we’re incurring a rather large memory overhead.
  2. The fact that we never know whether a received message can be discarded. Even if none of the current get operations in the component’s code can receive it, there may be another message coming in that causes a fork with a get operation that can receive this message. Hence we need to keep around all of the messages received in a synchronous round.
  3. The incredible computational complexity of finding a global solution to the consensus algorithm. We need to check for each component all of its completed traces. For all of those N components, each supplying T traces (to simplify the math), we need to check each pair of traces. So the maximum computational complexity is (N*T)*(N*(T-1))/2. In reality this is a bit less, because we can very likely quickly eliminate certain traces.
  4. Considering the previous points: the simplicity (especially when running over the internet) for a nefarious actor to incur computational overhead for the receiver. All it has to do is to keep sending messages to the receiver with an acceptable port mapping, but to never offer the consensus algorithm a valid local solution. Each accepted message will spawn a new fork at the receiver.
