Project documentation<\/li><\/ol>\n\n\n\nFurthermore, this release fixes a number of bugs that were present in previous releases. The final section outlines the vision for the future of Reowolf.<\/p>\n\n\n\n
Select statements<\/h2>\n\n\n\n
We have reworked the component synchronization mechanism, and the underlying consensus algorithm supporting components.<\/p>\n\n\n\n
Imagine we instantiate a data_producer<\/code> a number of times (say a<\/em>, b<\/em> and c<\/em>), and link them up with a data_receiver<\/code>. The data receiver takes a datum from each of the producers in turn, one by one.<\/p>\n\n\n\nIn the old synchronization mechanism, all data producers had to indicate they were ready to synchronize, even<\/em> when only one producer actually had data for the receiver to process. Under that mechanism the following example inadvertently synchronizes all participating components, which makes all producing components wait on each other:<\/p>\n\n\n\ncomp<\/strong> data_producer(out<\/strong><u64<\/strong>> tx, u64<\/strong> min_val, u64<\/strong> max_val) {\n while<\/strong> (true<\/strong>) {\n sync<\/strong> {\n auto<\/strong> value = lots_of_work(min_val, max_val);\n put<\/strong>(tx, value);\n }\n }\n}\n\ncomp<\/strong> data_receiver_v1(in<\/strong><u64<\/strong>> rx_a, in<\/strong><u64<\/strong>> rx_b, in<\/strong><u64<\/strong>> rx_c, u32<\/strong> num_rounds) {\n u32<\/strong> counter = 0;\n auto<\/strong> rxs = { rx_a, rx_b, rx_c };\n while<\/strong> (counter < num_rounds) {\n auto<\/strong> num_peers = length<\/strong>(rxs);\n auto<\/strong> peer_index = 0;\n while<\/strong> (peer_index < num_peers) {\n sync<\/strong> {\n auto<\/strong> result = get<\/strong>(rxs[peer_index]);\n peer_index += 1;\n }\n }\n counter += 1;\n }\n}<\/pre>\n\n\n\nThe reason was that a synchronous interaction checked all<\/em> ports for a valid interaction. The round-robin receiver communicates with only one peer per round, but it still required the other peers to agree that they didn’t send anything at all! Note that this already implies that all running components need to synchronize. 
We could fix this by writing:<\/p>\n\n\n\ncomp<\/strong> data_receiver_v2(in<\/strong><u64<\/strong>> rx_a, in<\/strong><u64<\/strong>> rx_b, in<\/strong><u64<\/strong>> rx_c, u32<\/strong> num_rounds) {\n u32<\/strong> counter = 0;\n auto<\/strong> rxs = { rx_a, rx_b, rx_c };\n while<\/strong> (counter < num_rounds) {\n auto<\/strong> num_peers = length<\/strong>(rxs);\n auto<\/strong> peer_index = 0;\n sync<\/strong> {\n while<\/strong> (peer_index < num_peers) {\n auto<\/strong> result = get<\/strong>(rxs[peer_index]);\n peer_index += 1;\n }\n }\n counter += 1;\n }\n}<\/pre>\n\n\n\nBut this is not the intended behavior: all producers still take part in one and the same synchronous round. We want the producer components to be able to run independently of one another. This requires a change in the semantics of the language! Peers are no longer automatically dragged into the synchronous round. Instead, only after the first message of a peer is received through a get<\/code> call do we merge the synchronous rounds of the two components.<\/p>\n\n\n\nWith this change to the runtime, the first version (written above) produces the intended behavior: the consumer accepts one value and synchronizes with its sender. Then it goes to the next round and synchronizes with the next sender.<\/p>\n\n\n\n
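To try the round-robin receiver, the producers and the receiver still have to be wired together. The following is a minimal sketch of such an entry point, not taken from the release itself. It assumes the channel and new syntax used in the other examples of this document. The value ranges and the round count are made up for illustration, and lots_of_work is assumed to be defined elsewhere.

```
comp main() {
    // One channel per producer: the producer gets the sending end,
    // the receiver gets the three receiving ends.
    channel tx_a -> rx_a;
    channel tx_b -> rx_b;
    channel tx_c -> rx_c;

    // Hypothetical value ranges, chosen only for illustration.
    new data_producer(tx_a, 0, 100);
    new data_producer(tx_b, 100, 200);
    new data_producer(tx_c, 200, 300);

    // Three rounds: in each round the receiver visits a, b and c in turn,
    // with a separate sync block for every single get.
    new data_receiver_v1(rx_a, rx_b, rx_c, 3);
}
```

Under the new semantics described above, each of those sync blocks only involves the producer whose value is actually received, so the other two producers are free to continue their own work.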
But what we would really like to do is to synchronize with whichever peer happens to have its work ready for consumption. For this purpose the select statement is introduced into the language. This statement describes a set of possible behaviors we could execute. Each behavior has an associated set of ports. When the ports in that set have a message ready to be read, the corresponding behavior executes. So to complete the example above, we have:<\/p>\n\n\n\n
comp<\/strong> data_receiver_v3(in<\/strong><u64<\/strong>> rx_a, in<\/strong><u64<\/strong>> rx_b, in<\/strong><u64<\/strong>> rx_c, u32<\/strong> num_rounds) {\n u32<\/strong> counter = 0;\n auto<\/strong> rxs = { rx_a, rx_b, rx_c };\n\n u32<\/strong> received_from_a = 0;\n u32<\/strong> received_from_b_or_c = 0;\n u32<\/strong> received_from_a_or_c = 0;\n u64<\/strong> sum_received_from_c = 0;\n\n while<\/strong> (counter < num_rounds*3) {\n sync<\/strong> {\n select<\/strong> {\n auto<\/strong> value = get<\/strong>(rx_a) -> {\n received_from_a += 1;\n received_from_a_or_c += 1;\n }\n auto<\/strong> value = get<\/strong>(rx_b) -> {\n received_from_b_or_c += 1;\n }\n auto<\/strong> value = get<\/strong>(rx_c) -> {\n received_from_a_or_c += 1;\n received_from_b_or_c += 1;\n sum_received_from_c += value;\n }\n }\n }\n counter += 1;\n }\n}<\/pre>\n\n\n\nRun-time error handling<\/h2>\n\n\n\n
We have an initial implementation for error handling and reporting. Roughly speaking: if a component has failed, it can no longer complete any current or future synchronous rounds. Hence, apart from some edge cases, any message that a peer receives (or attempts to receive) from the failed component should cause a failure at that peer as well. The example below walks through the various places where a component can crash, and how its neighboring peer handles receiving messages: sometimes the crash of the first component propagates, and sometimes it is blocked.<\/p>\n\n\n\n
enum<\/strong> ErrorLocation {\n BeforeSync,\n DuringSyncBeforeFirstInteraction,\n DuringSyncBeforeSecondInteraction,\n DuringSyncAfterInteractions,\n AfterSync,\n}\n\nfunc<\/strong> crash() -> u8<\/strong> {\n return {}[0]; \/\/ access index 0 of an empty array\n}\n\ncomp<\/strong> sender_and_crasher(out<\/strong><u32<\/strong>> value, ErrorLocation loc) {\n if<\/strong> (loc == ErrorLocation::BeforeSync) { crash(); }\n sync<\/strong> {\n if<\/strong> (loc == ErrorLocation::DuringSyncBeforeFirstInteraction) { crash(); }\n put<\/strong>(value, 0);\n if<\/strong> (loc == ErrorLocation::DuringSyncBeforeSecondInteraction) { crash(); }\n put<\/strong>(value, 1);\n if<\/strong> (loc == ErrorLocation::DuringSyncAfterInteractions) { crash(); }\n }\n if<\/strong> (loc == ErrorLocation::AfterSync) { crash(); }\n}\n\ncomp<\/strong> receiver(in<\/strong><u32<\/strong>> value) {\n sync<\/strong> {\n auto<\/strong> a = get<\/strong>(value);\n auto<\/strong> b = get<\/strong>(value);\n }\n}\n\ncomp<\/strong> main() {\n channel<\/strong> tx -> rx;\n\n new<\/strong> sender_and_crasher(tx, ErrorLocation::AfterSync);\n new<\/strong> receiver(rx);\n}<\/pre>\n\n\n\nNote that when we run the example with the error location before sync, or during sync, the receiver always crashes. However, the point at which it crashes is somewhat random! Due to the asynchronous nature of the runtime, a sender of messages will always just put<\/code> the value onto the port and continue execution. So even though the sender component might already be done with its sync round, the receiver officially still has to receive its first message. In any case, a neat error message is displayed in the console (or in some other place where such diagnostics are reported).
Note in particular that, given the asynchronous nature of the runtime, the receiver must be able to figure out that the peer component has crashed while still being able to finish the current synchronous round. This is needed when the peer component crashes just<\/em> after the synchronous round: the receiver may then receive the information that the peer crashed before<\/em> it receives the information that the synchronous round has succeeded.<\/p>\n\n\n\nTransmitting ports through ports<\/h2>\n\n\n\n
As of this release, ports can be transmitted through ports. In fact, we can send ports that may send ports that may send ports, etc. But don’t be fooled by the apparent complexity. The inner type T<\/code> of a port like in<T><\/code> simply states the type of the messages sent over that port. Should the type T<\/code> contain one or more ports, then we kick off a bit of code that takes care of transferring those ports. Should a port inside of T<\/code> itself, after being received, be used to send a port, then we simply kick off that same procedure again.
In the simplest case, one component transmits the receiving end of a channel to another component, which then uses that receiving end to receive a value. The example below shows this:<\/p>\n\n\n\ncomp<\/strong> port_sender(out<\/strong><in<\/strong><u32<\/strong>>> tx, in<\/strong><u32> to_transmit) {\n sync<\/strong> put<\/strong>(tx, to_transmit);\n}\n\ncomp<\/strong> port_receiver_and_value_getter(in<\/strong><in<\/strong><u32<\/strong>>> rx, u32<\/strong> expected_value) {\n u32<\/strong> got_value = 0;\n sync<\/strong> {\n auto<\/strong> port = get<\/strong>(rx);\n got_value = get<\/strong>(port);\n }\n if<\/strong> (expected_value == got_value) {\n print(\"got the expected value :)\");\n } else<\/strong> {\n print(\"got a different value :(\");\n }\n}\n\ncomp<\/strong> value_sender(out<\/strong><u32<\/strong>> tx, u32<\/strong> to_send) {\n sync<\/strong> put<\/strong>(tx, to_send);\n}\n\ncomp<\/strong> main() {\n u32<\/strong> value = 1337_2392;\n\n channel<\/strong> port_tx -> port_rx;\n channel<\/strong> value_tx -> value_rx;\n new<\/strong> port_sender(port_tx, value_rx);\n new<\/strong> port_receiver_and_value_getter(port_rx, value);\n new<\/strong> value_sender(value_tx, value);\n}<\/pre>\n\n\n\nOf course we may do something a little more complicated than this. Suppose that we don’t just send one port, but send a series of ports. That is, 
we use an Option<\/code> union type to turn an array of ports that we’re going to transmit into a series of messages containing ports, each sent to a specific component.<\/p>\n\n\n\nunion<\/strong> Option<T> {\n Some(T),\n None,\n}\n\ncomp<\/strong> port_sender(out<\/strong><Option<in<\/strong><u32<\/strong>>>>[] txs, in<\/strong><u32<\/strong>>[] to_transmit) {\n auto<\/strong> num_peers = length<\/strong>(txs);\n auto<\/strong> num_ports = length<\/strong>(to_transmit);\n\n auto<\/strong> num_per_peer = num_ports \/ num_peers;\n auto<\/strong> num_remaining = num_ports - (num_per_peer * num_peers);\n\n auto<\/strong> peer_index = 0;\n auto<\/strong> port_index = 0;\n while<\/strong> (peer_index < num_peers) {\n auto<\/strong> peer_port = txs[peer_index];\n auto<\/strong> counter = 0;\n\n \/\/ Distribute part of the ports to one of the peers.\n sync<\/strong> {\n \/\/ Sending the main batch of ports for the peer\n while<\/strong> (counter < num_per_peer) {\n put<\/strong>(peer_port, Option::Some(to_transmit[port_index]));\n port_index += 1;\n counter += 1;\n }\n\n \/\/ Sending the remainder of ports, one per peer until they're gone\n if<\/strong> (num_remaining > 0) {\n put<\/strong>(peer_port, Option::Some(to_transmit[port_index]));\n port_index += 1;\n num_remaining -= 1;\n }\n\n \/\/ Finish the custom protocol by sending nothing, which indicates to\n \/\/ the peer that it has received all the ports we have to hand out.\n put<\/strong>(peer_port, Option::None);\n }\n\n peer_index += 1;\n }\n}<\/pre>\n\n\n\nAnd here we have the component which will receive on that port. We can design the synchronous regions any way we want. 
In this case, while we receive ports we synchronize only with port_sender<\/code>, but the moment we receive values we synchronize with all of the value senders.<\/p>\n\n\n\ncomp<\/strong> port_receiver(in<\/strong><Option<in<\/strong><u32<\/strong>>>> port_rxs, out<\/strong><u32<\/strong>> sum_tx) {\n \/\/ Receive all ports\n auto<\/strong> value_rxs = {};\n\n sync<\/strong> {\n while<\/strong> (true<\/strong>) {\n auto<\/strong> maybe_port = get<\/strong>(port_rxs);\n if<\/strong> (let<\/strong> Option::Some(certainly_a_port) = maybe_port) {\n value_rxs @= { certainly_a_port };\n } else<\/strong> {\n break<\/strong>;\n }\n }\n }\n\n \/\/ Receive all values\n auto<\/strong> received_sum = 0;\n\n sync<\/strong> {\n auto<\/strong> port_index = 0;\n auto<\/strong> num_ports = length<\/strong>(value_rxs);\n while<\/strong> (port_index < num_ports) {\n auto<\/strong> value = get<\/strong>(value_rxs[port_index]);\n received_sum += value;\n port_index += 1;\n }\n }\n\n \/\/ And send the sum\n sync<\/strong> put<\/strong>(sum_tx, received_sum);\n}<\/pre>\n\n\n\nNow we need something to send the values. We’ll make something incredibly simple, namely:<\/p>\n\n\n\n
comp<\/strong> value_sender(out<\/strong><u32<\/strong>> tx, u32<\/strong> value_to_send) {\n sync<\/strong> put<\/strong>(tx, value_to_send);\n}\n\ncomp<\/strong> sum_collector(in<\/strong><u32<\/strong>>[] partial_sum_rx, out<\/strong><u32<\/strong>> total_sum_tx) {\n auto<\/strong> sum = 0;\n auto<\/strong> index = 0;\n while<\/strong> (index < length<\/strong>(partial_sum_rx)) {\n sync<\/strong> sum += get<\/strong>(partial_sum_rx[index]);\n index += 1;\n }\n sync<\/strong> put<\/strong>(total_sum_tx, sum);\n}<\/pre>\n\n\n\nFinally, we need a component that sets up this entire system of components, so we write the following entry point.<\/p>\n\n\n\n
comp<\/strong> main() {\n auto<\/strong> num_value_ports = 32;\n auto<\/strong> num_receivers = 3;\n\n \/\/ Construct the senders of values\n auto<\/strong> value_port_index = 1;\n auto<\/strong> value_rx_ports = {};\n while<\/strong> (value_port_index <= num_value_ports) {\n channel<\/strong> value_tx -> value_rx;\n new<\/strong> value_sender(value_tx, value_port_index);\n value_rx_ports @= { value_rx };\n value_port_index += 1;\n }\n\n \/\/ Construct the components that will receive groups of value-receiving\n \/\/ ports\n auto<\/strong> receiver_index = 0;\n auto<\/strong> sum_combine_rx_ports = {};\n auto<\/strong> port_tx_ports = {};\n\n while<\/strong> (receiver_index < num_receivers) {\n channel<\/strong> sum_tx -> sum_rx;\n channel<\/strong> port_tx -> port_rx;\n new<\/strong> port_receiver(port_rx, sum_tx);\n\n sum_combine_rx_ports @= { sum_rx };\n port_tx_ports @= { port_tx };\n receiver_index += 1;\n }\n\n \/\/ Construct the component that redistributes the total number of input\n \/\/ ports.\n new<\/strong> port_sender(port_tx_ports, value_rx_ports);\n\n \/\/ Construct the component that computes the sum of all sent values\n channel<\/strong> total_value_tx -> total_value_rx;\n new<\/strong> sum_collector(sum_combine_rx_ports, total_value_tx);\n\n