# Publishing and applying messages for DAC in Scoru.
This document follows Yann's proposal 2 for DAC (https://hackmd.io/HI-QphljRNSv3E57duri3g?view), and focuses on the changes that need to be done to enable publishing DAC messages that will be fetched by the rollup node and applied by the Wasm PVM.
### Functional Requirements:
1. DAC Pages can be retrieved by the Data Availability Committee for signing
2. Once a DAC page has been signed by enough members of the data availability committee, its hash is posted on the L1 inbox
3. Rollup nodes can import into the PVM messages whose hash has been requested by the latter.
### Non-functional requirements:
1. Reasonable usage of rollup node - Changes to the rollup node should be minimal
2. Reasonable latency
- The majority of messages should be available to the rollup node by the time the PVM requests them.
- Must be usable to meet the 1 MTPS for the Demo (https://hackmd.io/ApUzqkGMSwifsgb4sFLmEw)
3. Only limited by the amount of memory available
# Proposal
I propose to go for an incremental implementation of the workflow for publishing DAC messages. The workflow follows the assumption that there is no indication from the L1 about what messages should be made available to rollup nodes.
## Phase 1. Single rollup node.
Goals:
1. Messages containing requests to read a DAC page can be pushed into the L1.
2. Enable testing of publishing and importing DAC messages via Tezt.
A standalone service is equipped with a command for publishing data as files into the rollup node. This service can be the dal or rollup node running in a new DAC mode, and it should work as a sidecar for the rollup node. In particular, it should have access to the folder from where the rollup node reads Reveals data. It should have access to the private keys of all (or a majority of) the members of the DAC committee, for computing the aggregated signature that attests the availability of the data.
The sidecar service exposes a single POST endpoint
```
POST reveal-data
```
By calling the endpoint above using `<data>` as the request payload, the following happens:
1. If the size of `<data>` is less than 4Kb, then
1. the contents of `<data>` are are hashed, and we let `<hash>` be the result,
2. the final hash is signed by the DAC committee to obtain `<signature>`.
3. A file with contents `<data'>` and name `<hash>` is written into the folder that the rollup node uses to retrieve DAC messages. Here `<data'>` is obtained from `<data>` using the format described in [Quantitative analysis](https://hackmd.io/PBNbp7gfRbevwdnCjMlmng?view#Quantitative-analysis)
4. A message that a new DAC message with `<hash>` hash and signature `<signature>` is available is injected into the L1, and the procedure terminates.
2. If the size of `<data>` is more than 4Kb, then the file is chunked into blocks of 4Bk each.
1. For each chunk
1. the contents of the chunk are hashed to `<chunk-hash>`
2. A file with contents `<signed-chunk>` and name `<chunk-hash>` is saved in the directory that the rollup node uses to send
2. All the `<chunk-hash>`-es from step 2.1 are grouped together, and the result is processed again from step 1 as if it were a new file.
Chunks contructed in the first iteration of the procedure above contain the contents of the original file, and therefore they will be referred to as _data-chunks_. Chunks constracted in further iterations of the algorithm will contain hashes of other chunks, and therefore are referred to as _pointer-chunks_.
Note that even for large files, the procedure terminates with a single message containing a hash and a signature being posted to L1. Also, it is always possible to reconstruct the original file from the first page whose hash is injected in the layer 1 (provided that all the pages are stored on disk).
Also, by the time a message is posted into the L1, the signed contents of all the chunks that made up the file are available to the rollup node.

:::warning
:warning: The format of all files produced by the service will need to be as expected by the PVM kernel.
:warning: Concurrent accessed to disk must be taken care of. Because one process is responsible for writing data, and another is responsible for reading data, we can use dotlocking (http://wiki.call-cc.org/eggref/5/dot-locking) to prevent concurrent access to data.
:::
### Quantitative analysis
:::danger
Check the details later with Emma Turner/Tx Kernel folks, once the format for DAC pages and DAC messages has been confirmed.
:::
Size of messages published to L1:
Currently the Arith PVM resuses hashes of SC rollup inboxes for hashing the contents of a file. These occupy **20 bytes**.
BLS signatures were proposed for validating hashes from the Data Availability Committee. These occupy **48 bytes**. Assuming one byte is used to ima message request for revealing a data hash should occupy at most **68 bytes**, rounding up to **100** bytes for safety. The transaction list of a block is limited to **500kb**, meaning that if we only post one hash per rollup per level, we can handle around **5120** rollups, rounding down to **5000** for safety.
For the 1MTPS demo, we require **1000 rollups** processing **1000 TPS** per second. Blocks are published every **30 seconds**, hence each hash published as a message on L1 will need to contain **30_000 txs**.
For data chunks, we can use **1 byte** to distinguish the page format (data-chunk vs pointer-chunk), followed by **4 bytes** denoting the length of the rest of the chunk. Thus, each data-chunk of **4096 bytes** can use **4091 bytes** to store the transaction list. Transactions will require (amortized) **12 bytes** of storage each, which means that we can store a total of **340 txs per data chunk**. In order to store **30_000 txs** we will require **ceil(30_000 / 340) = 89 data chunks**, for a total of **(4096 bytes * 89) = 356 Kbytes** per rollup. Each data chunk will need to be reffered in a pointer page. Hashes of data-chunks require 20 bytes each, meaning that each pointer page can contain **(4091/20 = 204)** hashes of data-chunks. We only need **1 pointer page** to store the pointers of all the 89 data-chunks, or equivalently **4 kbytes**, hence the total storage required per rollup is around **360 Kbytes**. For 1000 rollups, the total storage required is **(360 * 1000) / 1024 = 351 MB**
## Phase 1.5: Use rsync to handle multiple rollup nodes.
As a first attempt to deliver messages to multiple rollup nodes, we can deploy the stand-alone service remotely and use rsync to copy the data it produces to the directories monitored by several rollup nodes. However, rsync might be quite slow in coping the data.
## Phase 2: decouple the stand-alone service
Throughout the implementation of phase 1, the following requirements from a fully functional DAC were identified:
### Functional Requirements
1. The user must be able to send an individual message to the DAC system.
2. The DAC manager should store aggregated individual messages and advertise the relative root page hash
3. DAC members should be able to download the payload corresponding to an advertised root page hash
4. DAC members must advertise their signature of root page hashes whose content they have downloaded
5. The DAC member must provide the data corresponding to a root page hash it has signed, upon request.
6. The DAC manager must compute the aggregated signature of a root page hash, and inject it to the L1 if enough signatures are collected
7. The rollup node must be able to request for missing pages from the DAC members
8. The Kernel must be able to retrieve the whole payload of a signed root page hash.
### Non functional requirements
1. Storage of DAC payloads should be replicated.
2. The amount of DAC payloads that are signed by a sufficent amount of DAC members is maximised,
3. Latency should be kept at a minimum. Requests from the rollup node to import data from a DAC member is minimised.
4. Requests to download data is spread evenly across DAC members.
5. Optional -
In order to identify a design that satisfies both functional and non-functional requirements, the following questions need to be answered:
1. Who is responsible for aggregating individual messages into dac payloads?
2. Could the access to the DAC be regulated (only signed messages from allowed accounts can send to DAC)?
3. How is data distributed and signed?
4. Who can send external messages to L1?
5. How often are external messages sent to L1?
6. Do we want the Dal node to handle multiple rollups?
7. How do the kernel and the Dal node sync on the DAC members?
8. How do we ensure that the rollup node can retrieve the data from the DAL node?
9. How do we deploy the infrastructure to cloud services?
The questions above have different answers that lead to different tradeoffs for the DAC. These tradeoffs are considered in a [separate document](https://hackmd.io/kIeBsTewQ5SxMu4dxY33Xg?view). Here we only outline the final design proposed.
### Proposed design
**Design has been updated to account for changes after meeting with NL.**
In the proposed design, a DAL node will be able to operate in one of three modes:
1. A coordinator mode. The coordinator is responsible for the following:
* receiving a payload consisting of sequenced messages,
* split the payload in pages of 4096 bytes each, according to the scheme proposed in Phase 1, and compute the final root page hash
* stream root page hashes to DAC members (described below)
* Receive signatures of root page hashes from DAC members
* aggregating root page hashes signatures that have met the threshold required to attest the data as available,
* Optional - receive updates about the DAC (list of members and threshold of signatures needed to consider the payload of a root page hash available).
2. A Committee member mode. The committee member is responsible for the following
* Receive root page hashes streamed by the coordinator,
* Download and store payloads corresponding to root ,page hashes from the coordinator
* Sign root page hashes whose payload has been downloaded,
* Communicate signatures of root pages to the coordinator,
* Provide payloads corresponding to signed root page hashes, upon request.
:::warning
:warning: in the initial design the coordinator was responsible for aggregating messages from different users into a single payload. Upon meeting with NL engineers, it was agreed that this responsibility should fall to the sequencers of rollups. In the future we might consider extending the Framing payloads meant for different rollups in the coordinator.
:::
3. An observer mode. An observer is responsible for the following:
* Receive root page hashes streamed by the coordinator,
* Download and store payloads corresponding to root page hashes from the coordinator.
* Receive requests for missing data from the rollup node, and forward it to DAC members
Each of these operating modes is described in more detail below. A picture summarising the overall design is also included here:

### The DAC coordinator
The DAC coordinator consists of several component:
* An RPC server to serve requests from users, and to receive signatures from DAC members
* A L1 tracker to track new heads from the blockchain. Only the protocol hash and the current level will need to be available to the DAC node,
* ~~A message aggregator to aggregate messages into payloads and produce the corresponding root page hashes~~
* An endpoint to receive payloads that need to be signed
* A data streamer to stream root page hashes to DAC members
* A page storage manager to write pages to/read pages from an external storage
* A signature storage manager to save and retrieve signatures of root page hashes to an external storage/cache.
* ~~An injector, to send signed root page hashes as external rollup messages to the L1 node.
* An external endpoint where signed messages will be sent
#### RPC Server
The RPC server exposes the following endpoints:
* `PUT /dac/payload`, used by the user to request the DAC to store and sign a payload.
* `GET /dac/pages/{page_hash}` to retrieve the contents of a page hash,
* `PUT /dac/signatures/{root_page_hash}` to collect a signature for a root page hash from a dac member.
The following is an OpenAPI specification of the endpoints that will be provided by the DAC coordinator:
```openapi=
paths:
/preimage:
Put:
summary: Stores the preimage in a sequence of pages
requestBody:
required: true
schema:
type: string
responses:
'200':
description: The payload was split successfully into a sequence of pages
content:
application/json:
schema:
type: object
properties:
root_page_hash:
type: string
required: true
description: the root page hash of the preimage
expiration_level:
type: int
required: true
description: The expiration level for the preimage
get:
summary: Retrieves a page by its page hash and returns its contents. {page_hash} is the b58 encoded page hash
responses:
'200':
description: The page was retrieved successfully
content:
application/json:
schema:
type: string
description: The contents of the page
'404':
description: The page was not found
/signature/{root_page_hash}:
put:
summary: Stores a signature for a given root page hash
requestBody:
required: true
schema:
type: object
properties:
address:
type: string
required: true
description: The tz4 address of the DAC member
signature:
type: string
required: true
description: The bls12-381 signature of the root page hash and expiration level
responses:
'200':
description: The signature is valid and has been stored by the signature storage manager
'400':
description: The signature is not valid, or the signee is not a dac member
content:
application/json:
schema:
type: string
required: true
```
Deprecated API (from original design)
```openapi=
paths:
/send_message:
post:
summary: Adds a message to be included in the current DAC payload
requestBody:
required: true
schema:
type: string
responses:
'200':
description: Message succesfully injected into the current DAC payload
```
#### Entrypoint to the DAC
Payloads are sent to the dac coordinator via the `PUT /dac/preimage` endpoint. Upon receiving a payload, the coordinator will proceed to compute the merkle tree of pages and store each page on disk. It will also compute an expiration by summing the `current_level` of the L1 node that the coordinator uses to track heads, and an `expiration_lag` that will be specified in the node configuration. Root page hashes and expiration levels are then streamed via the Data_streamer, described next.
:::danger
:negative_squared_cross_mark: **Previous notes on aggregator - currently on hold**
Messages received via the `/dac/send_message` endpoint are handled by the `Message_aggregator` module, which builds from the current `Dac_pages_encoding` module (see [here](https://gitlab.com/tezos/tezos/-/blob/master/src/proto_alpha/lib_dal/dac_pages_encoding.ml)) and reuses the `Merkle tree` pagination scheme discussed in [Phase 1](https://hackmd.io/PBNbp7gfRbevwdnCjMlmng#Phase-1-Single-rollup-node). The message aggregator is responsible for adding messages to a DAC payload. It will also interact with a `Page storage manager` to request to store pages once they are full (i.e. they have reached the page limit of 4096 bytes).
The DAL node in coordinator mode keeps track of new L1 heads. Every time a new head is received, a request to finalise the current message to the message aggregator is sent. This will trigger the latter to store any incomplete page and produce the final root page hash of the payload.
Final root page hashes are then streamed via a communication channel to DAC nodes. We give an example of how the aggregation of new messages [here](https://hackmd.io/PBNbp7gfRbevwdnCjMlmng#Inner-working-of-the-message-aggregator).
The message aggregator has some similarity with the [batcher in the rollup node](https://gitlab.com/tezos/tezos/-/blob/master/src/proto_alpha/bin_sc_rollup_node/batcher.ml).
To give context, the batcher receives messages to be injected into the L1 as external messages, and batches them into a sequence of `Sc_rollup_add_messages` manager operations every time the rollup node processes a new head, and forwards them into a new head.
Both the batcher and the message aggregator need to react to the same set of events:
* Receiving messages
* Receiving a request to batch messages
* Receiving a request that a new head is being processed,
However, the way in which the batcher and the message aggregator react to these events is considerably different.
:::
#### ~~Inner working of the message aggregator~~
:::danger
**This section is deprecated as it describes the inner working of the message aggregator, which has been removed in the current design. However, we retain the stack-based implementation described here to produce pages and root page hashes, from a payload.**
The message aggregator keeps track of a stack of pages in memory. Each page can be serialized to a sequence of bytes of at most `4_096` bytes. Following the structure given in `Dac_pages_encoding`, we distinguish between `Content pages` and `Hashes pages`. Content pages contain messages sent by users, each prefixed with the length of the message in bytes. Hashes pages contain a list of hashes of other pages.
From the point of view of a user of the message aggregator, the top page of the stack kept by the message aggregator is always a (possibly empty) `Content page`. Pages not at the top of the stack will always be `Hashes pages`.
When receiving a new message, the message aggregator will perform the following operations:
1. It prepends 4 bytes denoting the message length to the message,
2. It checks whether the message (with the 4 bytes of size length) can be added to the `Content page` at the top of the stack, that is if adding the message to the `Content page` does not exceed the maximum page size of `4_096` bytes,
3. If the message fits into the page, then the page at the top of the stack is updated accordingly and the procedure terminates
4. Otherwise, the page is popped from the stack. The hash of the page is computed, and a request to store the page using the b58 encoding of the hash as its key is forwarded to the `Page storage manager`.
5. Steps 2 to 6 are repeated, using the hash computed in Step 4 instead of the serialized message, and using `Hashes page` instead of `Content pages` (since now the top of the stack will be a content page).
6. A new page containing the (serialized) message is pushed on top of the stack.
When receiving a notification to batch messages, the message aggregator will start emptying the stack. In doing so, it will
1. pop the last page from the stack, compute it hashes and ask the `Page storage manager` to save the page by using the page hash as its key,
2. If the stack is not empty, add the hash to the page on top of the stack, using the procedure described above. Then repeat the procedure from step 1.
3. If the stack is empty, add it to the stream of messages that DAC members will consume, add an empty `Content page` on top of the stack (to restore the stack invariant), and terminate the procedure.
In practice, this stack-based implementation of the message aggregator produces a payload paginated according to the `Merkle tree` pagination scheme described in [Phase 1](https://hackmd.io/PBNbp7gfRbevwdnCjMlmng#Phase-1-Single-rollup-node). Each hash streamed to DAC members is the `root page hash` of one such payload.
:::
#### The data streamer
The data streamer is responsible to provide root page hashes and expiration levels to committee members. To this end, root page hashes notified by the message aggregator will be added to a `Lwt_stream.t`, and a streamed service that returns the next element of the stream can be defined. This follows a pattern already used in the codebase, for example [when streaming new slot headers in the Dal node](https://gitlab.com/tezos/tezos/-/blob/master/src/bin_dal_node/RPC_server_legacy.ml#L85). A function to perform the streamed call can be provided to the users.
The proposed interface for the Data_streamer is as follows:
```ocaml=
module Root_hash_streamer : sig
type t
(* Streamer configuration. *)
type configuration
(** Initializes a [Root_hash_streamer.t] *)
val init : configuration -> t
(** [publish streamer root_hash] publishes a [root_hash] to all attached
subscribers in [streamer].
*)
val publish : t -> (Dac_hash.t * int32) -> unit tzresult Lwt.t
(** [make_subscription streamer] returns a new stream of hashes for the subscriber to
consume. An [Lwt_watcher.stopper] function is also returned for the
subscriber to close the stream.
*)
val make_subscription :
t -> (Dac_hash.t * int32) Lwt_stream.t * Lwt_watcher.stopper) tzresult Lwt.t
end
```
The coordinator will manage a `Data_streamer.t with` streamer. It will also provide a streamed point that will be called by committee members and observers.
Calling the streamed endpoint will trigger the `make_subscription` function, on the coordinator side. On the user side, this will return a Lwt_stream.t and stopper function that monitor the root page hashes that have been streamed.
:::danger
:red_circle: **TODO**: If the coordinator goes down, all the root page hashes that have not been fetched by the consumers of the stream will be lost. In a hardening step,
it could be useful to save on disk a fixed amount of root page hashes that have been streamed. Consumers will also keep track of the last root page hash that they have received.
:::
#### The signature manager
Signatures for a root page hash + expiration level are communicated by dac members via the endpoint `PUT /signature/{root_page_hash}`.
The request body will contain both the signature of the root page hash, as well as the tz4 address of the signee. The signature manager first verifies that the signature is valid. This is to prevent malicious actors to send an incorrect signature: incorrect signatures would cause the signature verification at the kernel side to fail, which in turn would lead to the kernel discarding the whole payload corresponding to the root page hash.
In order to be able to verify that the signature belongs to a member of the DAC committee, the signature manager needs to know the list of the DAC members. We follow the current design and require that these are stored in the DAL node configuration. In the future, we may want to explore new strategies for the DAC node to retrieve the list of the dac members.
Once a signature has been received and verified, it will be stored in the signature storage. This will contain a map from `(root page hash, expiration_level)` to a list of optional values. The value at position `i` of the list corresponds to the signature of the `i`-th dac member, if available. It is `None` otherwise.
When adding a signature, the DAL node checks whether the sufficient amount of signatures for attesting the data as available has been collected for the root page hash. The threshold for this value is also stored in the DAC node configuration. In this case, the signature manager computes the aggregate signature, which in turn combined with the root page hash to produce the external message that would be injected into the L1. This is then posted to an external endpoint which needs to be specified in the DAC node configuration.
The structure of the L1 message follows closely the one that we have in the [current implementaiton of DAC](https://gitlab.com/tezos/tezos/-/blob/master/src/proto_alpha/lib_dal/dac_external_message_manager.ml#L80), with the addition of a `dac_id` field.
```ocaml=
let untagged_encoding =
Data_encoding.(
conv
(function
| Dac_message {root_hash; expiration_level; signature; witnesses} ->
(root_hash, expiration_level, signature, witnesses))
(fun (root_hash, expiration_level, signature, witnesses) ->
Dac_message {root_hash; expiration_level; signature; witnesses})
(obj5
(req "dac_id") Dac.b58_encoding)
(req "root_hash" Hashing_scheme.encoding)
(req "expiration_level" int32)
(req "signature" Tezos_crypto.Aggregate_signature.encoding)
(req "witnesses" Bitset.encoding)))
let dac_message_encoding =
Data_encoding.(
union
~tag_size:`Uint8
[
case
~title:("dac_message_" ^ Encoding_metadata.title)
(Tag Encoding_metadata.tag)
untagged_encoding
(fun msg -> Some msg)
(fun msg -> msg);
])
```
Here `Dac_id.b58_encoding` is undefined, but it might be thought as an encoding that produces a base 58 string prefixed with `dac1`. In practice, a Dac_id can be a sequence of a fixed amount of bytes. The Dac_id is part of the configuration of the dal node.
The presence of the DAC id is needed so that a kernel can easily identify whether it must process an external message
The witnesses field in the encoding above is a `Bitset.t` that keeps track of the DAC members that signed the message. If `i` belongs to the bitset, then the `i`-th dac member has signed the message. This is needed by the kernel to compute the set of DAC member public keys that will be needed to verify the signature.
#### The injector.
:::danger
**Deprecated: A component external to the DAC will be responsible to inject messages into the L1**
The injector follows closely the one implemented int the rollup nodes. To account for possible lags in retrieving the signatures of a root page hash, we allow some flexibility and allow for more than 1 `Sc_rollup_add_messages` operation per level.
:::
### The Committee member
Committee members are relatively simpler than the DAC coordinator.
They will make use of the Data_streamer to discover root page hashes advertised by the DAC coordinator. Once a new root page hash is received, the DAC member will download the data from the DAC coordinator. This amounts to using the [deserialization logic](https://gitlab.com/tezos/tezos/-/blob/master/src/proto_alpha/lib_dal/dac_pages_encoding.ml#L346) already provided in the `Dac_pages_encoding` module, using calls to the `GET /page/{page_hash}` to retrieve pages from the DAC coordinator. Every time a new page is received in this way, its hash is computed and checked against the `{page_hash}` that was requested. If a mismatch in the hash is found, the dowload process terminates and the DAC member does not sign the root page hash with the associated expiration level.
If the DAC payload manages to download and verify the hash of all pages, it procedes to sign the pair `(root page hash, expiration level)` and communicate the signature to the dac node, by calling the `dac/signatures/{root_page_hash}` endpoint of the latter.
Similarly to the DAC coordinator, the DAC member also provides the `GET /dac/page/{get_page_hash}` endpoint, which observers (described next) can use to retrieve missing pages, when requested by the rollup node.
### The Observer
The observer lives in the same host where the rollup node is executing. Each rollup node should be equipped with an observer as a side-car. The role of the rollup node is that of downloading pages from the DAC coordinator and provide save them into its own pages storage, which is also accessible by the rollup node. This follows closely the current design of the DAC (see `Reveals` module in the rollup node [here](https://gitlab.com/tezos/tezos/-/blob/master/src/proto_alpha/bin_sc_rollup_node/reveals.ml#L83)).
The observe is also equipped with an endpoint `HEAD /dac/notify_missing_page/{page_hash}` which immediately returns a `200` response. The observer will then broadcast a request to all the dac members (meaning that it will need to keep a list of the endpoints for such dac members, in its configuration) to retrieve the page. Since the page is meant to be reachable from a root page hash that has been signed by a sufficient amount of DAC members, then one of such requests should be successful.
### The rollup node:
As we mentioned above, the endpoint `/dac/notify_missing_page/{page_hash}` is asynchronous.
This endpoint is meant to be used by the rollup node when the PVM requests a page that the rollup node does not have in its own page storage.
The reason for this endpoint to be asynchronous is that the observer living in the same host of the rollup node may take some time, before retreiving the data into the rollup node. However, the rollup node must be able to continue operating while waiting for the observer to retrieve the page contents. For example, it may still need to serve RPC requests or to play refutation games.