diff --git a/etherlink/CHANGES_KERNEL.md b/etherlink/CHANGES_KERNEL.md index e67dcae61d6e9efa12152d1838c7474356407f9f..105a40900ea9159571bf92d9c2ae7eb6d52d5919 100644 --- a/etherlink/CHANGES_KERNEL.md +++ b/etherlink/CHANGES_KERNEL.md @@ -10,6 +10,21 @@ ### Internal +## Version d517020b58afef0e15c768ee0b5acbda1786cdd8 + +### Features + +- Limit the size of blueprints to 128 chunks. (!12631) + +### Bug fixes + +### Breaking changes + +### Internal + +- Improve tick model of the delayed inbox. (!12524) +- Update tick model. (!12631) + ## Version 0a81ce76b3d4f57d8c5194bcb9418f9294fd2be1 ### Features diff --git a/etherlink/CHANGES_NODE.md b/etherlink/CHANGES_NODE.md index 6f1647dd9734ed75838c9185cf1095d897b009fb..57967cff07bf9fdf9f6f7b3c3d3dec92cb32d7fb 100644 --- a/etherlink/CHANGES_NODE.md +++ b/etherlink/CHANGES_NODE.md @@ -10,6 +10,29 @@ ### Internal +## Version for d517020b58afef0e15c768ee0b5acbda1786cdd8 + +### Features + +- Observers now follows a rollup node. (!12547) +- Wait for rollup node to reconnect if it's not available. (!12561) +- Filter out irrelevant events in observer mode. (!12607) +- Forward delayed transactions to observer nodes. (!12606) +- Limit the size of blueprints. (!12666) + +### Bug fixes + +- Store last known level even if there are no events. (!12590) + +### Breaking changes + +### Internal + +- Improve internal storage handling. (!12551,!12516, !12572, !12627) +- Merge publishable and executable blueprints. (!12571) +- Store delayed transactions in the EVM Context. (!12605) +- Reduce verbosity of events. (!12622) + ## Version for 0a81ce76b3d4f57d8c5194bcb9418f9294fd2be1 ### Features diff --git a/etherlink/bin_node/lib_prod/block_producer.ml b/etherlink/bin_node/lib_prod/block_producer.ml index 578ceac4509141afb439845c216ed2798003cce5..7f84cb7f81f16d1a71b5460f4846c0b6f758064c 100644 --- a/etherlink/bin_node/lib_prod/block_producer.ml +++ b/etherlink/bin_node/lib_prod/block_producer.ml @@ -6,12 +6,57 @@ (*****************************************************************************) type parameters = { - ctxt : Evm_context.t; cctxt : Client_context.wallet; smart_rollup_address : string; sequencer_key : Client_keys.sk_uri; + maximum_number_of_chunks : int; } +(* The size of a delayed transaction is overapproximated to the maximum size + of an inbox message, as chunks are not supported in the delayed bridge. *) +let maximum_delayed_transaction_size = 4096 + +(* + The legacy transactions are as follows: + ----------------------------- + | Nonce | Up to 32 bytes | + ----------------------------- + | GasPrice | Up to 32 bytes | + ----------------------------- + | GasLimit | Up to 32 bytes | + ----------------------------- + | To | 20 bytes addr | + ----------------------------- + | Value | Up to 32 bytes | + ----------------------------- + | Data | 0 - unlimited | + ----------------------------- + | V | 1 (usually) | + ----------------------------- + | R | 32 bytes | + ----------------------------- + | S | 32 bytes | + ----------------------------- + + where `up to` start at 0, and encoded as the empty byte for the 0 value + according to RLP specification. +*) +let minimum_ethereum_transaction_size = + Rlp.( + List + [ + Value Bytes.empty; + Value Bytes.empty; + Value Bytes.empty; + Value (Bytes.make 20 '\000'); + Value Bytes.empty; + Value Bytes.empty; + Value Bytes.empty; + Value (Bytes.make 32 '\000'); + Value (Bytes.make 32 '\000'); + ] + |> encode |> Bytes.length) + module Types = struct type nonrec parameters = parameters @@ -65,9 +110,6 @@ type worker = Worker.infinite Worker.queue Worker.t let get_hashes ~transactions ~delayed_transactions = let open Result_syntax in - let delayed_hashes = - List.map Ethereum_types.Delayed_transaction.hash delayed_transactions - in let hashes = List.map (fun transaction -> @@ -76,49 +118,79 @@ let get_hashes ~transactions ~delayed_transactions = Hash Hex.(of_string tx_hash_str |> show |> hex_of_string))) transactions in - return (delayed_hashes @ hashes) + return (delayed_transactions @ hashes) + +let take_delayed_transactions maximum_number_of_chunks = + let open Lwt_result_syntax in + let maximum_cumulative_size = + Sequencer_blueprint.maximum_usable_space_in_blueprint + maximum_number_of_chunks + in + let maximum_delayed_transactions = + maximum_cumulative_size / maximum_delayed_transaction_size + in + let* delayed_transactions = Evm_context.delayed_inbox_hashes () in + let delayed_transactions = + List.take_n maximum_delayed_transactions delayed_transactions + in + let remaining_cumulative_size = + maximum_cumulative_size - (List.length delayed_transactions * 4096) + in + return (delayed_transactions, remaining_cumulative_size) -let produce_block ~(ctxt : Evm_context.t) ~cctxt ~smart_rollup_address - ~sequencer_key ~force ~timestamp = +let produce_block ~cctxt ~smart_rollup_address ~sequencer_key ~force ~timestamp + ~maximum_number_of_chunks = let open Lwt_result_syntax in - let* tx_pool_response = Tx_pool.pop_transactions () in - match tx_pool_response with - | Transactions (transactions, delayed_transactions) -> - let n = List.length transactions + List.length delayed_transactions in - if force || n > 0 then + let* is_locked = Tx_pool.is_locked () in + if is_locked then + let*! () = Block_producer_events.production_locked () in + return 0 + else + let* delayed_transactions, remaining_cumulative_size = + take_delayed_transactions maximum_number_of_chunks + in + let* transactions = + (* Low key optimization to avoid even checking the txpool if there is not + enough space for the smallest transaction. *) + if remaining_cumulative_size <= minimum_ethereum_transaction_size then + return [] + else + Tx_pool.pop_transactions + ~maximum_cumulative_size:remaining_cumulative_size + in + let n = List.length transactions + List.length delayed_transactions in + if force || n > 0 then + let*! head_info = Evm_context.head_info () in + Helpers.with_timing + (Blueprint_events.blueprint_production head_info.next_blueprint_number) + @@ fun () -> + let*? hashes = get_hashes ~transactions ~delayed_transactions in + let* blueprint = Helpers.with_timing - (Blueprint_events.blueprint_production - ctxt.session.next_blueprint_number) + (Blueprint_events.blueprint_proposal head_info.next_blueprint_number) @@ fun () -> - let*? hashes = get_hashes ~transactions ~delayed_transactions in - let* blueprint = - Helpers.with_timing - (Blueprint_events.blueprint_proposal - ctxt.session.next_blueprint_number) - @@ fun () -> - Sequencer_blueprint.create - ~sequencer_key - ~cctxt - ~timestamp - ~smart_rollup_address - ~transactions - ~delayed_transactions - ~parent_hash:ctxt.Evm_context.session.current_block_hash - ~number:ctxt.Evm_context.session.next_blueprint_number - in - let* _ctxt = - Evm_context.apply_and_publish_blueprint ctxt timestamp blueprint - in - let*! () = - List.iter_p - (fun hash -> Block_producer_events.transaction_selected ~hash) - hashes - in - return n - else return 0 - | Locked -> - let*! () = Block_producer_events.production_locked () in - return 0 + Sequencer_blueprint.create + ~sequencer_key + ~cctxt + ~timestamp + ~smart_rollup_address + ~transactions + ~delayed_transactions + ~parent_hash:head_info.current_block_hash + ~number:head_info.next_blueprint_number + in + let* () = + Evm_context.apply_blueprint timestamp blueprint delayed_transactions + in + let (Qty number) = head_info.next_blueprint_number in + let* () = Blueprints_publisher.publish number blueprint in + let*! () = + List.iter_p + (fun hash -> Block_producer_events.transaction_selected ~hash) + hashes + in + return n + else return 0 module Handlers = struct type self = worker @@ -132,14 +204,21 @@ module Handlers = struct match request with | Request.Produce_block (timestamp, force) -> protect @@ fun () -> - let {ctxt; cctxt; smart_rollup_address; sequencer_key} = state in + let { + cctxt; + smart_rollup_address; + sequencer_key; + maximum_number_of_chunks; + } = + state + in produce_block - ~ctxt ~cctxt ~smart_rollup_address ~sequencer_key ~force ~timestamp + ~maximum_number_of_chunks type launch_error = error trace diff --git a/etherlink/bin_node/lib_prod/block_producer.mli b/etherlink/bin_node/lib_prod/block_producer.mli index d68633baa1b4d67add827649d1353a120e71bfc1..457eaf7add09c664acfe25a80fe6c699594fb99b 100644 --- a/etherlink/bin_node/lib_prod/block_producer.mli +++ b/etherlink/bin_node/lib_prod/block_producer.mli @@ -6,10 +6,10 @@ (*****************************************************************************) type parameters = { - ctxt : Evm_context.t; cctxt : Client_context.wallet; smart_rollup_address : string; sequencer_key : Client_keys.sk_uri; + maximum_number_of_chunks : int; } (** [start parameters] starts the events follower. *) diff --git a/etherlink/bin_node/lib_prod/blueprint_events.ml b/etherlink/bin_node/lib_prod/blueprint_events.ml index 799efc3c9eb7a080e17220985b4f4e9668ebe481..cc55d432803e9f1cdffa8f51ef0efc293d8f7a98 100644 --- a/etherlink/bin_node/lib_prod/blueprint_events.ml +++ b/etherlink/bin_node/lib_prod/blueprint_events.ml @@ -89,13 +89,15 @@ let invalid_blueprint = ~level:Error ("level", Data_encoding.n) -let missing_blueprint = - declare_1 +let missing_blueprints = + declare_3 ~section - ~name:"blueprint_blueprint" - ~msg:"Could not fetch the blueprint for level {level}" + ~name:"missing_blueprints" + ~msg:"Store is missing {count} blueprints in the range [{from}; {to_}]" ~level:Error - ("level", Data_encoding.n) + ("count", Data_encoding.int31) + ("from", Data_encoding.n) + ("to_", Data_encoding.n) let publisher_is_ready () = emit publisher_ready () @@ -111,7 +113,9 @@ let invalid_blueprint_produced level = emit invalid_blueprint level let catching_up min max = emit blueprint_catchup (min, max) -let missing_blueprint level = emit missing_blueprint level +let missing_blueprints count Ethereum_types.(Qty from) Ethereum_types.(Qty to_) + = + emit missing_blueprints (count, from, to_) let blueprint_proposal Ethereum_types.(Qty level) time = emit blueprint_proposal (level, time) diff --git a/etherlink/bin_node/lib_prod/blueprint_events.mli b/etherlink/bin_node/lib_prod/blueprint_events.mli index 99a74459607320bf765699ac3849e165520fbae8..d5b73e82df800fcd923948af99dd179ddce583f9 100644 --- a/etherlink/bin_node/lib_prod/blueprint_events.mli +++ b/etherlink/bin_node/lib_prod/blueprint_events.mli @@ -34,10 +34,11 @@ val blueprint_injection_failed : Z.t -> unit Lwt.t Ethereum block. *) val invalid_blueprint_produced : Z.t -> unit Lwt.t -(** [missing_blueprint level] advertizes that a sequencer has tried to fetch - the blueprint for level [level] from its store and failed. This means the +(** [missing_blueprints count from to_] advertizes that a sequencer has detect + it is missing [count] blueprints in the provided range. This means the sequencer store is inconsistent. *) -val missing_blueprint : Z.t -> unit Lwt.t +val missing_blueprints : + int -> Ethereum_types.quantity -> Ethereum_types.quantity -> unit Lwt.t (** [catching_up min max] advertizes that the sequencer is reinjecting blueprints from level [min] to [max] because the rollup node is lagging. *) diff --git a/etherlink/bin_node/lib_prod/blueprint_types.ml b/etherlink/bin_node/lib_prod/blueprint_types.ml index a7c8037ec01bafd5c7801975a07fbb46a9fdc2cd..c38dee573a98c9a0eb645f6ebd35dc17d4ded498 100644 --- a/etherlink/bin_node/lib_prod/blueprint_types.ml +++ b/etherlink/bin_node/lib_prod/blueprint_types.ml @@ -13,6 +13,11 @@ type t = { payload : payload; } +type with_events = { + delayed_transactions : Ethereum_types.Delayed_transaction.t list; + blueprint : t; +} + let payload_encoding = let open Data_encoding in list @@ -30,3 +35,14 @@ let encoding = (req "number" n) (req "timestamp" Time.Protocol.encoding) (req "payload" payload_encoding)) + +let with_events_encoding = + let open Data_encoding in + conv + (fun {delayed_transactions; blueprint} -> (delayed_transactions, blueprint)) + (fun (delayed_transactions, blueprint) -> {delayed_transactions; blueprint}) + (obj2 + (req + "delayed_transactions" + (list Ethereum_types.Delayed_transaction.encoding)) + (req "blueprint" encoding)) diff --git a/etherlink/bin_node/lib_prod/blueprint_types.mli b/etherlink/bin_node/lib_prod/blueprint_types.mli index b54d9cd9e465c8401666f4fbf2f3fe42a8517119..0500e328a4228aa94be2a938779174c4345924f8 100644 --- a/etherlink/bin_node/lib_prod/blueprint_types.mli +++ b/etherlink/bin_node/lib_prod/blueprint_types.mli @@ -14,6 +14,15 @@ type t = { payload : payload; } +(** Blueprint with events contains: *) +type with_events = { + delayed_transactions : Ethereum_types.Delayed_transaction.t list; + (** The delayed transactions to apply before applying the blueprint. *) + blueprint : t; (** The blueprint to execute. *) +} + val encoding : t Data_encoding.t val payload_encoding : payload Data_encoding.t + +val with_events_encoding : with_events Data_encoding.t diff --git a/etherlink/bin_node/lib_prod/blueprints_publisher.ml b/etherlink/bin_node/lib_prod/blueprints_publisher.ml index a3e7bffe41493c0dc6446d09cdf3c99c677f0faf..b055121eb48581e8dd7e0e66ad3c3ea1c142853c 100644 --- a/etherlink/bin_node/lib_prod/blueprints_publisher.ml +++ b/etherlink/bin_node/lib_prod/blueprints_publisher.ml @@ -7,7 +7,6 @@ type parameters = { rollup_node_endpoint : Uri.t; - store : Store.t; max_blueprints_lag : int; max_blueprints_ahead : int; max_blueprints_catchup : int; @@ -16,7 +15,6 @@ type parameters = { } type state = { - store : Store.t; rollup_node_endpoint : Uri.t; max_blueprints_lag : Z.t; max_blueprints_ahead : Z.t; @@ -68,8 +66,6 @@ module Worker = struct let set_latest_level_confirmed worker level = (state worker).latest_level_confirmed <- level - let blueprint_store worker = (state worker).store - let max_blueprints_lag worker = (state worker).max_blueprints_lag let max_level_ahead worker = (state worker).max_blueprints_ahead @@ -132,27 +128,29 @@ module Worker = struct let*! () = Blueprint_events.catching_up lower_bound upper_bound in - let rec catching_up curr = - if Z.Compare.(curr <= upper_bound) then - let* payload = - Store.Publishable_blueprints.find (blueprint_store worker) (Qty curr) - in - match payload with - | Some payload -> - let* () = publish worker payload curr in - catching_up Z.(succ curr) - | None -> - let*! () = Blueprint_events.missing_blueprint curr in - Stdlib.failwith - Format.( - asprintf - "Blueprint for level %a missing from the store" - Z.pp_print - curr) - else return_unit + let* blueprints = + Evm_context.blueprints_range (Qty lower_bound) (Qty upper_bound) + in + + let expected_count = Z.(to_int (sub upper_bound lower_bound)) + 1 in + let actual_count = List.length blueprints in + let* () = + when_ (actual_count < expected_count) (fun () -> + let*! () = + Blueprint_events.missing_blueprints + (expected_count - actual_count) + (Qty lower_bound) + (Qty upper_bound) + in + return_unit) in - let* () = catching_up lower_bound in + let* () = + List.iter_es + (fun (Ethereum_types.Qty current, payload) -> + publish worker payload current) + blueprints + in (* We give ourselves a cooldown window Tezos blocks to inject everything *) set_cooldown worker (catchup_cooldown worker) ; @@ -171,7 +169,6 @@ module Handlers = struct let on_launch _self () ({ rollup_node_endpoint; - store; max_blueprints_lag; max_blueprints_ahead; max_blueprints_catchup; @@ -182,7 +179,6 @@ module Handlers = struct let open Lwt_result_syntax in return { - store; latest_level_confirmed = (* Will be set at the correct value once the next L2 block is received from the rollup node *) @@ -239,7 +235,7 @@ let table = Worker.create_table Queue let worker_promise, worker_waker = Lwt.task () let start ~rollup_node_endpoint ~max_blueprints_lag ~max_blueprints_ahead - ~max_blueprints_catchup ~catchup_cooldown ~latest_level_seen store = + ~max_blueprints_catchup ~catchup_cooldown ~latest_level_seen () = let open Lwt_result_syntax in let* worker = Worker.launch @@ -247,7 +243,6 @@ let start ~rollup_node_endpoint ~max_blueprints_lag ~max_blueprints_ahead () { rollup_node_endpoint; - store; max_blueprints_lag; max_blueprints_ahead; max_blueprints_catchup; @@ -266,12 +261,22 @@ let worker = lazy (match Lwt.state worker_promise with | Lwt.Return worker -> Ok worker - | Lwt.Fail e -> Error (TzTrace.make @@ error_of_exn e) - | Lwt.Sleep -> Error (TzTrace.make No_worker)) + | Lwt.Fail e -> Result_syntax.tzfail (error_of_exn e) + | Lwt.Sleep -> Result_syntax.tzfail No_worker) + +let bind_worker f = + let open Lwt_result_syntax in + let res = Lazy.force worker in + match res with + | Error [No_worker] -> + (* There is no worker, nothing to do *) + return_unit + | Error errs -> fail errs + | Ok w -> f w let worker_add_request ~request = let open Lwt_result_syntax in - let*? w = Lazy.force worker in + bind_worker @@ fun w -> let*! (_pushed : bool) = Worker.Queue.push_request w request in return_unit @@ -282,11 +287,8 @@ let new_l2_head rollup_head = worker_add_request ~request:(New_l2_head {rollup_head}) let shutdown () = - let open Lwt_syntax in - match Lazy.force worker with - | Error _ -> - (* There is no publisher, nothing to do *) - Lwt.return_unit - | Ok w -> - let* () = Blueprint_events.publisher_shutdown () in - Worker.shutdown w + let open Lwt_result_syntax in + bind_worker @@ fun w -> + let*! () = Blueprint_events.publisher_shutdown () in + let*! () = Worker.shutdown w in + return_unit diff --git a/etherlink/bin_node/lib_prod/blueprints_publisher.mli b/etherlink/bin_node/lib_prod/blueprints_publisher.mli index 68eea3ebd6e35cf75a257dc7c1b46b2754e9d467..8441ed5f0e65cded33c2bbc60263a7abc196a9e5 100644 --- a/etherlink/bin_node/lib_prod/blueprints_publisher.mli +++ b/etherlink/bin_node/lib_prod/blueprints_publisher.mli @@ -12,10 +12,10 @@ val start : max_blueprints_catchup:int -> catchup_cooldown:int -> latest_level_seen:Z.t -> - Store.t -> + unit -> unit tzresult Lwt.t -val shutdown : unit -> unit Lwt.t +val shutdown : unit -> unit tzresult Lwt.t (** [publish level payload] sends a request to the publisher worker to forward the chunked blueprint [payload] for level [level] to the diff --git a/etherlink/bin_node/lib_prod/delayed_inbox.ml b/etherlink/bin_node/lib_prod/delayed_inbox.ml deleted file mode 100644 index 7d90e4f3782067726875457fa666aa57344aaf99..0000000000000000000000000000000000000000 --- a/etherlink/bin_node/lib_prod/delayed_inbox.ml +++ /dev/null @@ -1,260 +0,0 @@ -(*****************************************************************************) -(* *) -(* SPDX-License-Identifier: MIT *) -(* Copyright (c) 2024 Nomadic Labs *) -(* Copyright (c) 2024 Marigold *) -(* *) -(*****************************************************************************) - -type parameters = {rollup_node_endpoint : Uri.t; delayed_inbox_interval : int} - -module StringSet = Set.Make (String) - -module Types = struct - type state = { - rollup_node_endpoint : Uri.t; - delayed_inbox_interval : int; - mutable pending_transactions : StringSet.t; - } - - type nonrec parameters = parameters -end - -module Name = struct - (* We only have a single delayed inbox in the evm node *) - type t = unit - - let encoding = Data_encoding.unit - - let base = ["evm_node"; "delayed-inbox"; "prod"; "worker"] - - let pp _ _ = () - - let equal () () = true -end - -module Request = struct - type ('a, 'b) t = New_rollup_node_block : Int32.t -> (unit, error trace) t - - type view = View : _ t -> view - - let view (req : _ t) = View req - - let encoding = - let open Data_encoding in - union - [ - case - (Tag 0) - ~title:"New_rollup_node_block" - (obj2 - (req "request" (constant "new_rollup_node_block")) - (req "rollup_head" int32)) - (function - | View (New_rollup_node_block rollup_head) -> Some ((), rollup_head)) - (fun ((), rollup_head) -> View (New_rollup_node_block rollup_head)); - ] - - let pp ppf (View r) = - match r with - | New_rollup_node_block rollup_head -> - Format.fprintf ppf "New_rollup_node_block (level %ld)" rollup_head -end - -module Worker = Worker.MakeSingle (Name) (Request) (Types) - -type worker = Worker.infinite Worker.queue Worker.t - -let subkeys_from_rollup_node path level rollup_node_endpoint = - let open Rollup_services in - call_service - ~base:rollup_node_endpoint - durable_state_subkeys - ((), Block_id.Level level) - {key = path} - () - -let read_from_rollup_node path level rollup_node_endpoint = - let open Rollup_services in - call_service - ~base:rollup_node_endpoint - durable_state_value - ((), Block_id.Level level) - {key = path} - () - -let delayed_transaction_hashes level rollup_node_endpoint = - let open Lwt_result_syntax in - let* keys = - subkeys_from_rollup_node - Durable_storage_path.Delayed_transaction.hashes - level - rollup_node_endpoint - in - match keys with - | None -> return [] - | Some keys -> - let hashes = - (* Remove the empty, meta keys *) - List.filter (fun key -> not (key = "" || key = "meta")) keys - in - return hashes - -let fetch_delayed_inbox_hashes ~level ~rollup_node_endpoint = - delayed_transaction_hashes level rollup_node_endpoint - -let fetch_delayed_transactions ~hashes ~level ~rollup_node_endpoint = - let open Lwt_syntax in - List.filter_map_p - (fun key -> - let hash = Ethereum_types.hash_of_string key in - let path = Durable_storage_path.Delayed_transaction.transaction hash in - let* bytes = read_from_rollup_node path level rollup_node_endpoint in - match bytes with - | Ok (Some bytes) -> - return (Ethereum_types.Delayed_transaction.of_bytes hash bytes) - | _ -> - let* () = - Delayed_inbox_events.transaction_fetch_failed ~tx_hash:hash ~level - in - return_none) - hashes - -let include_delayed_transaction delayed_transaction = - let open Lwt_syntax in - let* () = Delayed_inbox_events.add_transaction ~delayed_transaction in - let* _sent = Tx_pool.add_delayed delayed_transaction in - return_unit - -let on_new_head - ({delayed_inbox_interval; rollup_node_endpoint; _} as state : Types.state) - level = - let open Lwt_syntax in - (* Fetch the delayed inbox with the given interval *) - if Int32.(rem level (of_int delayed_inbox_interval) <> 0l) then return_unit - else - (* Hashes in the delayed inbox *) - let* delayed_transaction_hashes = - fetch_delayed_inbox_hashes ~level ~rollup_node_endpoint - in - let* () = - match delayed_transaction_hashes with - | Error _err -> Delayed_inbox_events.fetch_failed ~level - | Ok delayed_transaction_hashes -> - (* Compute new hashes to avoid fetching transactions we already - know about. *) - let new_transaction_hashes = - List.filter - (fun tx_hash -> - not (StringSet.mem tx_hash state.pending_transactions)) - delayed_transaction_hashes - in - let* () = - Delayed_inbox_events.fetch_succeeded - ~level - ~nb_txs:(List.length new_transaction_hashes) - in - (* The new pending set is just the fetched delayed inbox. - It's important to drop the transactions from the pending - set that are no longer present in the delayed inbox. - On the contrary, a transaction that is sent twice to the - delayed inbox at different levels would only be added to the - tx-pool once. - *) - let pending = StringSet.of_list delayed_transaction_hashes in - (* Fetch transactions for new hashes *) - let* new_delayed_transactions = - fetch_delayed_transactions - ~hashes:new_transaction_hashes - ~level - ~rollup_node_endpoint - in - state.pending_transactions <- pending ; - (* Add new transactions to the tx-pool *) - let* () = - List.iter_p include_delayed_transaction new_delayed_transactions - in - return_unit - in - return_unit - -module Handlers = struct - type self = worker - - let on_request : - type r request_error. - worker -> (r, request_error) Request.t -> (r, request_error) result Lwt.t - = - fun worker request -> - let open Lwt_result_syntax in - match request with - | Request.New_rollup_node_block rollup_block_lvl -> - protect @@ fun () -> - let*! () = on_new_head (Worker.state worker) rollup_block_lvl in - return_unit - - type launch_error = error trace - - let on_launch _w () - ({rollup_node_endpoint; delayed_inbox_interval} : Types.parameters) = - let state = - Types. - { - rollup_node_endpoint; - delayed_inbox_interval; - pending_transactions = StringSet.empty; - } - in - Lwt_result_syntax.return state - - let on_error (type a b) _w _st (_r : (a, b) Request.t) (_errs : b) : - unit tzresult Lwt.t = - Lwt_result_syntax.return_unit - - let on_completion _ _ _ _ = Lwt.return_unit - - let on_no_request _ = Lwt.return_unit - - let on_close _ = Lwt.return_unit -end - -let table = Worker.create_table Queue - -let worker_promise, worker_waker = Lwt.task () - -type error += No_delayed_inbox - -let worker = - lazy - (match Lwt.state worker_promise with - | Lwt.Return worker -> Ok worker - | Lwt.Fail e -> Error (TzTrace.make @@ error_of_exn e) - | Lwt.Sleep -> Error (TzTrace.make No_delayed_inbox)) - -let start parameters = - let open Lwt_result_syntax in - let*! () = Delayed_inbox_events.started () in - let+ worker = Worker.launch table () parameters (module Handlers) in - Lwt.wakeup worker_waker worker - -let shutdown () = - let open Lwt_syntax in - let w = Lazy.force worker in - match w with - | Error _ -> - (* There is no delayed inbox, nothing to do *) - Lwt.return_unit - | Ok w -> - let* () = Delayed_inbox_events.shutdown () in - Worker.shutdown w - -let worker_add_request ~request : unit tzresult Lwt.t = - let open Lwt_result_syntax in - match Lazy.force worker with - | Ok w -> - let*! (_pushed : bool) = Worker.Queue.push_request w request in - return_unit - | Error e -> Lwt.return (Error e) - -let new_rollup_block rollup_level = - worker_add_request ~request:(New_rollup_node_block rollup_level) diff --git a/etherlink/bin_node/lib_prod/delayed_inbox.mli b/etherlink/bin_node/lib_prod/delayed_inbox.mli deleted file mode 100644 index c8726730c2b5f196a35e8e9b74e247b2618dd481..0000000000000000000000000000000000000000 --- a/etherlink/bin_node/lib_prod/delayed_inbox.mli +++ /dev/null @@ -1,26 +0,0 @@ -(*****************************************************************************) -(* *) -(* SPDX-License-Identifier: MIT *) -(* Copyright (c) 2024 Nomadic Labs *) -(* Copyright (c) 2024 Marigold *) -(* *) -(*****************************************************************************) - -type parameters = { - rollup_node_endpoint : Uri.t; - (** Rollup node endpoint used to monitor the delayed inbox. *) - delayed_inbox_interval : int; - (** Number of levels every which the worker will fetch the - delayed inbox. *) -} - -(** [start parameters] starts the delayed inbox worker. *) -val start : parameters -> unit tzresult Lwt.t - -(** [shutdown ()] stops the delayed inbox worker. *) -val shutdown : unit -> unit Lwt.t - -(** [new_rollup_block rollup_level] tells the worker that a new L2 - head has been published and that the rollup head is now - [rollup_level]. *) -val new_rollup_block : Int32.t -> unit tzresult Lwt.t diff --git a/etherlink/bin_node/lib_prod/delayed_inbox_events.ml b/etherlink/bin_node/lib_prod/delayed_inbox_events.ml deleted file mode 100644 index e0672aea6217e4d7ab824c939e384a3c65cfdf48..0000000000000000000000000000000000000000 --- a/etherlink/bin_node/lib_prod/delayed_inbox_events.ml +++ /dev/null @@ -1,89 +0,0 @@ -(*****************************************************************************) -(* *) -(* SPDX-License-Identifier: MIT *) -(* Copyright (c) 2024 Nomadic Labs *) -(* *) -(*****************************************************************************) - -module Event = struct - open Internal_event.Simple - - let section = Events.section - - let started = - declare_0 - ~section - ~name:"delayed_inbox_started" - ~msg:"Delayed inbox has been started" - ~level:Notice - () - - let add_transaction = - declare_1 - ~section - ~name:"delayed_inbox_add_transaction" - ~msg:"Add delayed transaction {transaction} to the tx-pool" - ~level:Notice - ("transaction", Ethereum_types.Delayed_transaction.encoding) - ~pp1:Ethereum_types.Delayed_transaction.pp_short - - let pp_int32 fmt i = Format.fprintf fmt "%ld" i - - let fetch_succeeded = - declare_2 - ~section - ~name:"delayed_inbox_fetch_succeeded" - ~msg: - "Fetching delayed inbox for level {level} succeeded, {nb} new \ - transactions fetched" - ~level:Notice - ("level", Data_encoding.int32) - ~pp1:pp_int32 - ("nb", Data_encoding.int31) - ~pp2:Format.pp_print_int - - let fetch_failed = - declare_1 - ~section - ~name:"delayed_inbox_fetch_failed" - ~msg:"Fetching delayed inbox for level {level} failed" - ~level:Error - ("level", Data_encoding.int32) - ~pp1:pp_int32 - - let transaction_fetch_failed = - declare_2 - ~section - ~name:"delayed_inbox_transaction_fetch_failed" - ~msg: - "Fetching transaction {tx_hash} from delayed inbox for level {level} \ - failed" - ~level:Error - ("tx_hash", Ethereum_types.hash_encoding) - ~pp1:Ethereum_types.pp_hash - ("level", Data_encoding.int32) - ~pp2:pp_int32 - - let shutdown = - declare_0 - ~section - ~name:"shutting_down_delayed_inbox" - ~msg:"Stopping the delayed inbox" - ~level:Notice - () -end - -let started = Internal_event.Simple.emit Event.started - -let add_transaction ~delayed_transaction = - Internal_event.Simple.emit Event.add_transaction delayed_transaction - -let fetch_succeeded ~level ~nb_txs = - Internal_event.Simple.emit Event.fetch_succeeded (level, nb_txs) - -let fetch_failed ~level = Internal_event.Simple.emit Event.fetch_failed level - -let transaction_fetch_failed ~tx_hash ~level = - Internal_event.Simple.emit Event.transaction_fetch_failed (tx_hash, level) - -let shutdown = Internal_event.Simple.emit Event.shutdown diff --git a/etherlink/bin_node/lib_prod/encodings/ethereum_types.ml b/etherlink/bin_node/lib_prod/encodings/ethereum_types.ml index 68bee98e6e0c8a53ff18fadf279535e9bc716632..62767abe4623e56591301ac2513af0bcb16886bf 100644 --- a/etherlink/bin_node/lib_prod/encodings/ethereum_types.ml +++ b/etherlink/bin_node/lib_prod/encodings/ethereum_types.ml @@ -1220,13 +1220,13 @@ module Delayed_transaction = struct case (Tag 0) ~title:"transaction" - unit + (constant "transaction") (function Transaction -> Some () | _ -> None) (function () -> Transaction); case (Tag 1) ~title:"deposit" - unit + (constant "deposit") (function Deposit -> Some () | _ -> None) (function () -> Deposit); ] @@ -1236,16 +1236,17 @@ module Delayed_transaction = struct conv (fun {kind; hash; raw} -> (kind, hash, raw)) (fun (kind, hash, raw) -> {kind; hash; raw}) - (obj3 - (req "kind" encoding_kind) - (req "hash" hash_encoding) - (req "raw" string)) + (tup3 encoding_kind hash_encoding (string' Hex)) - let of_bytes hash bytes = - match bytes |> Rlp.decode with - | Ok Rlp.(List [List [Value tag; content]; _timestamp; _level]) -> ( + let of_rlp_content hash rlp_content = + match rlp_content with + | Rlp.(List [Value tag; content]) -> ( match (Bytes.to_string tag, content) with - | "\x01", Rlp.Value raw_tx -> + (* This is a bit counter intuitive but what we decode is actually + the TransactionContent, which is Ethereum|Deposit|DelayedTransaction. + Transaction cannot be in the delayed inbox by construction, therefore + we care only about Deposit and DelayedTransaction. *) + | "\x03", Rlp.Value raw_tx -> let hash = raw_tx |> Bytes.to_string |> hash_raw_tx |> Hex.of_string |> Hex.show |> hash_of_string @@ -1257,6 +1258,21 @@ module Delayed_transaction = struct | _ -> None) | _ -> None + let to_rlp {kind; raw; hash} = + let open Rlp in + let tag = + (match kind with Transaction -> "\x03" | Deposit -> "\x02") + |> Bytes.of_string + in + let hash = hash_to_bytes hash |> Bytes.of_string in + let content = + match kind with + | Transaction -> Value (Bytes.of_string raw) + | Deposit -> decode_exn (Bytes.of_string raw) + in + let rlp = List [Value hash; List [Value tag; content]] in + encode rlp + let pp_kind fmt = function | Transaction -> Format.pp_print_string fmt "Transaction" | Deposit -> Format.pp_print_string fmt "Deposit" @@ -1264,7 +1280,8 @@ module Delayed_transaction = struct let pp fmt {raw; kind; _} = Format.fprintf fmt "%a: %a" pp_kind kind Hex.pp (Hex.of_string raw) - let pp_short fmt {hash = Hash (Hex h); _} = Format.pp_print_string fmt h + let pp_short fmt {kind; hash; _} = + Format.fprintf fmt "%a: %a" pp_kind kind pp_hash hash end module Upgrade = struct @@ -1369,6 +1386,7 @@ module Evm_events = struct | Upgrade_event of Upgrade.t | Sequencer_upgrade_event of Sequencer_upgrade.t | Blueprint_applied of Blueprint_applied.t + | New_delayed_transaction of Delayed_transaction.t let of_bytes bytes = match bytes |> Rlp.decode with @@ -1383,6 +1401,15 @@ module Evm_events = struct | "\x03" -> let blueprint_applied = Blueprint_applied.of_rlp rlp_content in Option.map (fun u -> Blueprint_applied u) blueprint_applied + | "\x04" -> ( + match rlp_content with + | List [Value hash; transaction_content] -> + let hash = decode_hash hash in + let transaction = + Delayed_transaction.of_rlp_content hash transaction_content + in + Option.map (fun u -> New_delayed_transaction u) transaction + | _ -> None) | _ -> None) | _ -> None @@ -1412,6 +1439,12 @@ module Evm_events = struct Z.pp_print number hash + | New_delayed_transaction delayed_transaction -> + Format.fprintf + fmt + "New delayed transaction:@ %a" + Delayed_transaction.pp_short + delayed_transaction let encoding = let open Data_encoding in @@ -1444,5 +1477,15 @@ module Evm_events = struct ~event_encoding:Blueprint_applied.encoding ~proj:(function Blueprint_applied info -> Some info | _ -> None) ~inj:(fun info -> Blueprint_applied info); + case + ~kind:"new_delayed_transaction" + ~tag:3 + ~event_encoding:Delayed_transaction.encoding + ~proj:(function + | New_delayed_transaction delayed_transaction -> + Some delayed_transaction + | _ -> None) + ~inj:(fun delayed_transaction -> + New_delayed_transaction delayed_transaction); ] end diff --git a/etherlink/bin_node/lib_prod/events.ml b/etherlink/bin_node/lib_prod/events.ml index 81bdf5ac698459cfd551f2ab3e541fb060a23a56..7ee96e699ddedd4b8e985ab245942fa15a05f387 100644 --- a/etherlink/bin_node/lib_prod/events.ml +++ b/etherlink/bin_node/lib_prod/events.ml @@ -110,6 +110,40 @@ let event_callback_log = ("method", Data_encoding.string) ("body", Data_encoding.string) +type kernel_log_kind = Application | Simulation + +type kernel_log_level = Debug | Info | Error | Fatal + +let kernel_log_kind_to_string = function + | Application -> "application" + | Simulation -> "simulation" + +let event_kernel_log kind level = + Internal_event.Simple.declare_1 + ~section:(section @ ["kernel"; kernel_log_kind_to_string kind]) + ~name: + (Format.sprintf "kernel_log_%s" (Internal_event.Level.to_string level)) + ~msg:"{msg}" + ~pp1:(fun fmt msg -> Format.pp_print_string fmt (String.trim msg)) + ~level + ("msg", Data_encoding.string) + +let event_kernel_log_application_debug = event_kernel_log Application Debug + +let event_kernel_log_simulation_debug = event_kernel_log Simulation Debug + +let event_kernel_log_application_info = event_kernel_log Application Notice + +let event_kernel_log_simulation_info = event_kernel_log Simulation Info + +let event_kernel_log_application_error = event_kernel_log Application Error + +let event_kernel_log_simulation_error = event_kernel_log Simulation Error + +let event_kernel_log_application_fatal = event_kernel_log Application Fatal + +let event_kernel_log_simulation_fatal = event_kernel_log Simulation Fatal + let received_upgrade payload = emit received_upgrade payload let pending_upgrade (upgrade : Ethereum_types.Upgrade.t) = @@ -136,3 +170,14 @@ let shutdown_rpc_server ~private_ = let shutdown_node ~exit_status = emit event_shutdown_node exit_status let callback_log ~uri ~meth ~body = emit event_callback_log (uri, meth, body) + +let event_kernel_log ~level ~kind ~msg = + match (level, kind) with + | Debug, Application -> emit event_kernel_log_application_debug msg + | Debug, Simulation -> emit event_kernel_log_simulation_debug msg + | Info, Application -> emit event_kernel_log_application_info msg + | Info, Simulation -> emit event_kernel_log_simulation_info msg + | Error, Application -> emit event_kernel_log_application_error msg + | Error, Simulation -> emit event_kernel_log_simulation_error msg + | Fatal, Application -> emit event_kernel_log_application_fatal msg + | Fatal, Simulation -> emit event_kernel_log_simulation_fatal msg diff --git a/etherlink/bin_node/lib_prod/events.mli b/etherlink/bin_node/lib_prod/events.mli index 923cb04c8cd59d86e49556a55603e4ca5bf5c8f6..7e6bb865f7a783e11654376ef6b93ad890c0ada6 100644 --- a/etherlink/bin_node/lib_prod/events.mli +++ b/etherlink/bin_node/lib_prod/events.mli @@ -58,3 +58,11 @@ val shutdown_node : exit_status:int -> unit Lwt.t (** [callback_log ~uri ~meth ~body] is used as the debug event used as callback for resto to logs the requests. *) val callback_log : uri:string -> meth:string -> body:string -> unit Lwt.t + +type kernel_log_kind = Application | Simulation + +type kernel_log_level = Debug | Info | Error | Fatal + +(** Logs kernel log [Debug]. *) +val event_kernel_log : + level:kernel_log_level -> kind:kernel_log_kind -> msg:string -> unit Lwt.t diff --git a/etherlink/bin_node/lib_prod/evm_context.ml b/etherlink/bin_node/lib_prod/evm_context.ml index b15f150417da5315d4546a55bddcdd14a0d9175e..75b63a030d832183ee8500e1d7441d0cdebf35c1 100644 --- a/etherlink/bin_node/lib_prod/evm_context.ml +++ b/etherlink/bin_node/lib_prod/evm_context.ml @@ -5,11 +5,28 @@ (* *) (*****************************************************************************) +type init_status = Loaded | Created + +type head = { + current_block_hash : Ethereum_types.block_hash; + next_blueprint_number : Ethereum_types.quantity; + evm_state : Evm_state.t; +} + +type parameters = { + kernel_path : string option; + data_dir : string; + preimages : string; + preimages_endpoint : Uri.t option; + smart_rollup_address : string; +} + type session_state = { mutable context : Irmin_context.rw; mutable next_blueprint_number : Ethereum_types.quantity; mutable current_block_hash : Ethereum_types.block_hash; mutable pending_upgrade : Ethereum_types.Upgrade.t option; + mutable evm_state : Evm_state.t; } type t = { @@ -18,508 +35,922 @@ type t = { preimages : string; preimages_endpoint : Uri.t option; smart_rollup_address : Tezos_crypto.Hashed.Smart_rollup_address.t; - blueprint_watcher : Blueprint_types.t Lwt_watcher.input; - store : Store.t; + store : Evm_store.t; session : session_state; - head_lock : Lwt_mutex.t; - (* TODO: https://gitlab.com/tezos/tezos/-/issues/7073 - [head_lock] is only necessary because several workers can modify the - current HEAD of the chain *) } -let with_store_transaction ctxt k = - Store.with_transaction ctxt.store (fun txn_store -> - k {ctxt with store = txn_store}) - -let store_path ~data_dir = Filename.Infix.(data_dir // "store") - -let load ~data_dir index = - let open Lwt_result_syntax in - let* store = Store.init ~data_dir in - let* latest = Store.Context_hashes.find_latest store in - match latest with - | Some (Qty latest_blueprint_number, checkpoint) -> - let*! context = Irmin_context.checkout_exn index checkpoint in - let*! evm_state = Irmin_context.PVMState.get context in - let+ current_block_hash = Evm_state.current_block_hash evm_state in - ( store, - context, - Ethereum_types.Qty Z.(succ latest_blueprint_number), - current_block_hash, - true ) - | None -> - let context = Irmin_context.empty index in - return +let blueprint_watcher : Blueprint_types.with_events Lwt_watcher.input = + Lwt_watcher.create_input () + +let session_to_head_info session = + { + evm_state = session.evm_state; + next_blueprint_number = session.next_blueprint_number; + current_block_hash = session.current_block_hash; + } + +module Types = struct + type state = t + + type nonrec parameters = parameters +end + +module Name = struct + type t = unit + + let encoding = Data_encoding.unit + + let base = Evm_context_events.section @ ["worker"] + + let pp _fmt () = () + + let equal () () = true +end + +module Request = struct + type (_, _) t = + | Apply_evm_events : { + finalized_level : int32 option; + events : Ethereum_types.Evm_events.t list; + } + -> (unit, tztrace) t + | Apply_blueprint : { + timestamp : Time.Protocol.t; + payload : Blueprint_types.payload; + delayed_transactions : Ethereum_types.hash list; + } + -> (unit, tztrace) t + | Last_produce_blueprint : (Blueprint_types.t, tztrace) t + | Blueprint : { + level : Ethereum_types.quantity; + } + -> (Blueprint_types.with_events option, tztrace) t + | Blueprints_range : { + from : Ethereum_types.quantity; + to_ : Ethereum_types.quantity; + } + -> ((Ethereum_types.quantity * Blueprint_types.payload) list, tztrace) t + | Last_known_L1_level : (int32 option, tztrace) t + | New_last_known_L1_level : int32 -> (unit, tztrace) t + | Delayed_inbox_hashes : (Ethereum_types.hash list, tztrace) t + + type view = View : _ t -> view + + let view req = View req + + let encoding = + let open Data_encoding in + union + [ + case + (Tag 0) + ~title:"Apply_evm_events" + (obj3 + (req "request" (constant "apply_evm_events")) + (opt "finalized_level" int32) + (req "events" (list Ethereum_types.Evm_events.encoding))) + (function + | View (Apply_evm_events {finalized_level; events}) -> + Some ((), finalized_level, events) + | _ -> None) + (fun ((), finalized_level, events) -> + View (Apply_evm_events {finalized_level; events})); + case + (Tag 1) + ~title:"Apply_blueprint" + (obj4 + (req "request" (constant "apply_blueprint")) + (req "timestamp" Time.Protocol.encoding) + (req "payload" Blueprint_types.payload_encoding) + (req "delayed_transactions" (list Ethereum_types.hash_encoding))) + (function + | View (Apply_blueprint {timestamp; payload; delayed_transactions}) + -> + Some ((), timestamp, payload, delayed_transactions) + | _ -> None) + (fun ((), timestamp, payload, delayed_transactions) -> + View (Apply_blueprint {timestamp; payload; delayed_transactions})); + case + (Tag 2) + ~title:"Last_produce_blueprint" + (obj1 (req "request" (constant "last_produce_blueprint"))) + (function View Last_produce_blueprint -> Some () | _ -> None) + (fun () -> View Last_produce_blueprint); + case + (Tag 4) + ~title:"Blueprint" + (obj2 + (req "request" (constant "blueprint")) + (req "level" Ethereum_types.quantity_encoding)) + (function View (Blueprint {level}) -> Some ((), level) | _ -> None) + (fun ((), level) -> View (Blueprint {level})); + case + (Tag 5) + ~title:"Blueprints_range" + (obj3 + (req "request" (constant "Blueprints_range")) + (req "from" Ethereum_types.quantity_encoding) + (req "to" Ethereum_types.quantity_encoding)) + (function + | View (Blueprints_range {from; to_}) -> Some ((), from, to_) + | _ -> None) + (fun ((), from, to_) -> View (Blueprints_range {from; to_})); + case + (Tag 6) + ~title:"Last_known_L1_level" + (obj1 (req "request" (constant "last_known_l1_level"))) + (function View Last_known_L1_level -> Some () | _ -> None) + (fun () -> View Last_known_L1_level); + case + (Tag 7) + ~title:"New_last_known_L1_level" + (obj2 + (req "request" (constant "new_last_known_l1_level")) + (req "value" int32)) + (function + | View (New_last_known_L1_level l) -> Some ((), l) | _ -> None) + (fun ((), l) -> View (New_last_known_L1_level l)); + case + (Tag 8) + ~title:"Delayed_inbox_hashes" + (obj1 (req "request" (constant "Delayed_inbox_hashes"))) + (function View Delayed_inbox_hashes -> Some () | _ -> None) + (fun () -> View Delayed_inbox_hashes); + ] + + let pp ppf view = + Data_encoding.Json.pp ppf @@ Data_encoding.Json.construct encoding view +end + +let head_info, head_info_waker = Lwt.task () + +let init_status, init_status_waker = Lwt.task () + +let execution_config, execution_config_waker = Lwt.task () + +module State = struct + let with_store_transaction ctxt k = + Evm_store.with_transaction ctxt.store (fun txn_store -> + k {ctxt with store = txn_store}) + + let store_path ~data_dir = Filename.Infix.(data_dir // "store") + + let load ~data_dir index = + let open Lwt_result_syntax in + let* store = Evm_store.init ~data_dir in + let* latest = Evm_store.Context_hashes.find_latest store in + match latest with + | Some (Qty latest_blueprint_number, checkpoint) -> + let*! context = Irmin_context.checkout_exn index checkpoint in + let*! evm_state = Irmin_context.PVMState.get context in + let+ current_block_hash = Evm_state.current_block_hash evm_state in ( store, context, - Ethereum_types.Qty Z.zero, - Ethereum_types.genesis_parent_hash, - false ) - -let commit_next_head (ctxt : t) evm_state = - let open Lwt_result_syntax in - let*! context = Irmin_context.PVMState.set ctxt.session.context evm_state in - let*! checkpoint = Irmin_context.commit context in - let* () = - Store.Context_hashes.store + Ethereum_types.Qty Z.(succ latest_blueprint_number), + current_block_hash, + Loaded ) + | None -> + let context = Irmin_context.empty index in + return + ( store, + context, + Ethereum_types.Qty Z.zero, + Ethereum_types.genesis_parent_hash, + Created ) + + let commit store context evm_state number = + let open Lwt_result_syntax in + let*! context = Irmin_context.PVMState.set context evm_state in + let*! checkpoint = Irmin_context.commit context in + let* () = Evm_store.Context_hashes.store store number checkpoint in + return context + + let commit_next_head (ctxt : t) evm_state = + commit ctxt.store + ctxt.session.context + evm_state ctxt.session.next_blueprint_number - checkpoint - in - return context -let replace_current_commit (ctxt : t) evm_state = - let open Lwt_result_syntax in - let (Qty next) = ctxt.session.next_blueprint_number in - let*! context = Irmin_context.PVMState.set ctxt.session.context evm_state in - let*! checkpoint = Irmin_context.commit context in - let* () = - Store.Context_hashes.store ctxt.store (Qty Z.(pred next)) checkpoint - in - return context + let replace_current_commit (ctxt : t) evm_state = + let (Qty next) = ctxt.session.next_blueprint_number in + commit ctxt.store ctxt.session.context evm_state (Qty Z.(pred next)) + + let inspect ctxt path = + let open Lwt_syntax in + let* res = Evm_state.inspect ctxt.session.evm_state path in + return res + + let on_modified_head ctxt evm_state context = + ctxt.session.evm_state <- evm_state ; + ctxt.session.context <- context + + let apply_evm_event_unsafe on_success ctxt evm_state event = + let open Lwt_result_syntax in + let open Ethereum_types in + let*! () = Evm_events_follower_events.new_event event in + match event with + | Evm_events.Upgrade_event upgrade -> + let on_success session = + session.pending_upgrade <- Some upgrade ; + on_success session + in + let payload = + Ethereum_types.Upgrade.to_bytes upgrade |> String.of_bytes + in + let*! evm_state = + Evm_state.modify + ~key:Durable_storage_path.kernel_upgrade + ~value:payload + evm_state + in + let* () = + Evm_store.Kernel_upgrades.store + ctxt.store + ctxt.session.next_blueprint_number + upgrade + in + let*! () = Events.pending_upgrade upgrade in + return (evm_state, on_success) + | Sequencer_upgrade_event sequencer_upgrade -> + let payload = + Sequencer_upgrade.to_bytes sequencer_upgrade |> String.of_bytes + in + let*! evm_state = + Evm_state.modify + ~key:Durable_storage_path.sequencer_upgrade + ~value:payload + evm_state + in + return (evm_state, on_success) + | Blueprint_applied {number = Qty number; hash = expected_block_hash} -> ( + let* block_hash_opt = + let*! bytes = + inspect + ctxt + (Durable_storage_path.Indexes.block_by_number (Nth number)) + in + return (Option.map decode_block_hash bytes) + in + match block_hash_opt with + | Some found_block_hash -> + if found_block_hash = expected_block_hash then + let*! () = + Evm_events_follower_events.upstream_blueprint_applied + (number, expected_block_hash) + in + return (evm_state, on_success) + else + let*! () = + Evm_events_follower_events.diverged + (number, expected_block_hash, found_block_hash) + in + tzfail + (Node_error.Diverged + (number, expected_block_hash, Some found_block_hash)) + | None -> + let*! () = + Evm_events_follower_events.missing_block + (number, expected_block_hash) + in + tzfail (Node_error.Diverged (number, expected_block_hash, None))) + | New_delayed_transaction delayed_transaction -> + let*! data_dir, config = execution_config in + let* evm_state = + Evm_state.execute + ~data_dir + ~config + evm_state + [ + `Input + ("\254" + ^ Bytes.to_string + (Delayed_transaction.to_rlp delayed_transaction)); + ] + in + let* () = + Evm_store.Delayed_transactions.store + ctxt.store + ctxt.session.next_blueprint_number + delayed_transaction + in + return (evm_state, on_success) + + let apply_evm_events ?finalized_level (ctxt : t) events = + let open Lwt_result_syntax in + let* context, evm_state, on_success = + with_store_transaction ctxt @@ fun ctxt -> + let* on_success, ctxt, evm_state = + List.fold_left_es + (fun (on_success, ctxt, evm_state) event -> + let* evm_state, on_success = + apply_evm_event_unsafe on_success ctxt evm_state event + in + return (on_success, ctxt, evm_state)) + (ignore, ctxt, ctxt.session.evm_state) + events + in + let* _ = + Option.map_es + (Evm_store.L1_latest_known_level.store ctxt.store) + finalized_level + in + let* ctxt = replace_current_commit ctxt evm_state in + return (ctxt, evm_state, on_success) + in + on_modified_head ctxt evm_state context ; + on_success ctxt.session ; + return_unit + + type error += Cannot_apply_blueprint of {local_state_level : Z.t} + + let () = + register_error_kind + `Permanent + ~id:"evm_node_prod_cannot_apply_blueprint" + ~title:"Cannot apply a blueprint" + ~description: + "The EVM node could not apply a blueprint on top of its local EVM \ + state." + ~pp:(fun ppf local_state_level -> + Format.fprintf + ppf + "The EVM node could not apply a blueprint on top of its local EVM \ + state at level %a." + Z.pp_print + local_state_level) + Data_encoding.(obj1 (req "current_state_level" n)) + (function + | Cannot_apply_blueprint {local_state_level} -> Some local_state_level + | _ -> None) + (fun local_state_level -> Cannot_apply_blueprint {local_state_level}) + + let check_pending_upgrade ctxt timestamp = + match ctxt.session.pending_upgrade with + | None -> None + | Some upgrade -> + if Time.Protocol.(upgrade.timestamp <= timestamp) then Some upgrade.hash + else None + + let check_upgrade ctxt evm_state = + let open Lwt_result_syntax in + function + | Some root_hash -> + let* () = + Evm_store.Kernel_upgrades.record_apply + ctxt.store + ctxt.session.next_blueprint_number + in -let evm_state ctxt = Irmin_context.PVMState.get ctxt.session.context + let*! bytes = + Evm_state.inspect evm_state Durable_storage_path.kernel_root_hash + in + let new_hash_candidate = + Option.map + (fun bytes -> + let (`Hex hex) = Hex.of_bytes bytes in + Ethereum_types.hash_of_string hex) + bytes + in -let inspect ctxt path = - let open Lwt_syntax in - let* evm_state = evm_state ctxt in - Evm_state.inspect evm_state path + let*! () = + match new_hash_candidate with + | Some current_root_hash when root_hash = current_root_hash -> + Events.applied_upgrade + root_hash + ctxt.session.next_blueprint_number + | _ -> + Events.failed_upgrade root_hash ctxt.session.next_blueprint_number + in -let on_modified_head ctxt context = ctxt.session.context <- context + return_true + | None -> return_false + + (** [apply_blueprint_store_unsafe ctxt payload delayed_transactions] applies + the blueprint [payload] on the head of [ctxt], and commit the resulting + state to Irmin and the node’s store. + + However, it does not modifies [ctxt] to make it aware of the new state. + This is because [apply_blueprint_store_unsafe] is expected to be called + within a SQL transaction to make sure the node’s store is not left in an + inconsistent state in case of error. *) + let apply_blueprint_store_unsafe ctxt timestamp payload delayed_transactions = + let open Lwt_result_syntax in + Evm_store.assert_in_transaction ctxt.store ; + let*! data_dir, config = execution_config in + let (Qty next) = ctxt.session.next_blueprint_number in + + let* try_apply = + Evm_state.apply_blueprint ~data_dir ~config ctxt.session.evm_state payload + in -let apply_evm_event_unsafe on_success ctxt evm_state event = - let open Lwt_result_syntax in - let open Ethereum_types in - let*! () = Evm_events_follower_events.new_event event in - match event with - | Evm_events.Upgrade_event upgrade -> - let on_success session = - session.pending_upgrade <- Some upgrade ; - on_success session - in - let payload = - Ethereum_types.Upgrade.to_bytes upgrade |> String.of_bytes - in - let*! evm_state = - Evm_state.modify - ~key:Durable_storage_path.kernel_upgrade - ~value:payload - evm_state - in - let* () = - Store.Kernel_upgrades.store - ctxt.store - ctxt.session.next_blueprint_number - upgrade - in - let*! () = Events.pending_upgrade upgrade in - return (evm_state, on_success) - | Sequencer_upgrade_event sequencer_upgrade -> - let payload = - Sequencer_upgrade.to_bytes sequencer_upgrade |> String.of_bytes - in - let*! evm_state = - Evm_state.modify - ~key:Durable_storage_path.sequencer_upgrade - ~value:payload - evm_state - in - return (evm_state, on_success) - | Blueprint_applied {number = Qty number; hash = expected_block_hash} -> ( - let* block_hash_opt = - let*! bytes = - inspect - ctxt - (Durable_storage_path.Indexes.block_by_number (Nth number)) + match try_apply with + | Apply_success + (evm_state, Block_height blueprint_number, current_block_hash) + when Z.equal blueprint_number next -> + let* () = + Evm_store.Blueprints.store + ctxt.store + {number = Qty blueprint_number; timestamp; payload} in - return (Option.map decode_block_hash bytes) - in - match block_hash_opt with - | Some found_block_hash -> - if found_block_hash = expected_block_hash then - let*! () = - Evm_events_follower_events.upstream_blueprint_applied - (number, expected_block_hash) - in - return (evm_state, on_success) + + let root_hash_candidate = check_pending_upgrade ctxt timestamp in + let* applied_upgrade = + check_upgrade ctxt evm_state root_hash_candidate + in + + let* delayed_transactions = + List.map_es + (fun hash -> + let* delayed_transaction = + Evm_store.Delayed_transactions.at_hash ctxt.store hash + in + match delayed_transaction with + | None -> + failwith + "Delayed transaction %a is missing from store" + Ethereum_types.pp_hash + hash + | Some delayed_transaction -> return delayed_transaction) + delayed_transactions + in + let* context = commit_next_head ctxt evm_state in + return + ( evm_state, + context, + current_block_hash, + applied_upgrade, + delayed_transactions ) + | Apply_success _ (* Produced a block, but not of the expected height *) + | Apply_failure (* Did not produce a block *) -> + (* TODO: https://gitlab.com/tezos/tezos/-/issues/6826 *) + let*! () = Blueprint_events.invalid_blueprint_produced next in + tzfail (Cannot_apply_blueprint {local_state_level = Z.pred next}) + + let on_new_head ctxt ~applied_upgrade evm_state context block_hash + blueprint_with_events = + let open Lwt_syntax in + let (Qty level) = ctxt.session.next_blueprint_number in + ctxt.session.evm_state <- evm_state ; + ctxt.session.context <- context ; + ctxt.session.next_blueprint_number <- Qty (Z.succ level) ; + ctxt.session.current_block_hash <- block_hash ; + Lwt_watcher.notify blueprint_watcher blueprint_with_events ; + if applied_upgrade then ctxt.session.pending_upgrade <- None ; + let* head_info in + head_info := session_to_head_info ctxt.session ; + Blueprint_events.blueprint_applied (level, block_hash) + + let apply_blueprint ctxt timestamp payload delayed_transactions = + let open Lwt_result_syntax in + let* ( evm_state, + context, + current_block_hash, + applied_upgrade, + delayed_transactions ) = + with_store_transaction ctxt @@ fun ctxt -> + apply_blueprint_store_unsafe ctxt timestamp payload delayed_transactions + in + let*! () = + on_new_head + ctxt + ~applied_upgrade + evm_state + context + current_block_hash + { + delayed_transactions; + blueprint = + {number = ctxt.session.next_blueprint_number; timestamp; payload}; + } + in + return_unit + + let init ?kernel_path ~data_dir ~preimages ~preimages_endpoint + ~smart_rollup_address () = + let open Lwt_result_syntax in + let*! () = + Lwt_utils_unix.create_dir (Evm_state.kernel_logs_directory ~data_dir) + in + let* index = + Irmin_context.load ~cache_size:100_000 Read_write (store_path ~data_dir) + in + let destination = + Tezos_crypto.Hashed.Smart_rollup_address.of_string_exn + smart_rollup_address + in + let* store, context, next_blueprint_number, current_block_hash, init_status + = + load ~data_dir index + in + let* pending_upgrade = + Evm_store.Kernel_upgrades.find_latest_pending store + in + + let* evm_state, context = + match kernel_path with + | Some kernel -> + if init_status = Loaded then + let*! () = Events.ignored_kernel_arg () in + let*! evm_state = Irmin_context.PVMState.get context in + return (evm_state, context) else - let*! () = - Evm_events_follower_events.diverged - (number, expected_block_hash, found_block_hash) - in - tzfail - (Node_error.Diverged - (number, expected_block_hash, Some found_block_hash)) + let* evm_state = Evm_state.init ~kernel in + let (Qty next) = next_blueprint_number in + let* context = commit store context evm_state (Qty Z.(pred next)) in + return (evm_state, context) | None -> - let*! () = - Evm_events_follower_events.missing_block - (number, expected_block_hash) - in - tzfail (Node_error.Diverged (number, expected_block_hash, None))) + if init_status = Loaded then + let*! evm_state = Irmin_context.PVMState.get context in + return (evm_state, context) + else + failwith + "Cannot compute the initial EVM state without the path to the \ + initial kernel" + in -let apply_evm_events ~finalized_level (ctxt : t) events = - let open Lwt_result_syntax in - Lwt_mutex.with_lock ctxt.head_lock @@ fun () -> - let* context, on_success = - with_store_transaction ctxt @@ fun ctxt -> - let*! evm_state = evm_state ctxt in - let* on_success, ctxt, evm_state = - List.fold_left_es - (fun (on_success, ctxt, evm_state) event -> - let* evm_state, on_success = - apply_evm_event_unsafe on_success ctxt evm_state event - in - return (on_success, ctxt, evm_state)) - (ignore, ctxt, evm_state) - events + let ctxt = + { + index; + data_dir; + preimages; + preimages_endpoint; + smart_rollup_address = destination; + session = + { + context; + next_blueprint_number; + current_block_hash; + pending_upgrade; + evm_state; + }; + store; + } in - let* _ = Store.L1_latest_known_level.store ctxt.store finalized_level in - let* ctxt = replace_current_commit ctxt evm_state in - return (ctxt, on_success) - in - on_modified_head ctxt context ; - on_success ctxt.session ; - return_unit -let execution_config ctxt = - Config.config - ~preimage_directory:ctxt.preimages - ?preimage_endpoint:ctxt.preimages_endpoint - ~kernel_debug:true - ~destination:ctxt.smart_rollup_address - () - -type error += Cannot_apply_blueprint of {local_state_level : Z.t} - -let () = - register_error_kind - `Permanent - ~id:"evm_node_prod_cannot_apply_blueprint" - ~title:"Cannot apply a blueprint" - ~description: - "The EVM node could not apply a blueprint on top of its local EVM state." - ~pp:(fun ppf local_state_level -> - Format.fprintf - ppf - "The EVM node could not apply a blueprint on top of its local EVM \ - state at level %a." - Z.pp_print - local_state_level) - Data_encoding.(obj1 (req "current_state_level" n)) - (function - | Cannot_apply_blueprint {local_state_level} -> Some local_state_level - | _ -> None) - (fun local_state_level -> Cannot_apply_blueprint {local_state_level}) - -let check_pending_upgrade ctxt timestamp = - match ctxt.session.pending_upgrade with - | None -> None - | Some upgrade -> - if Time.Protocol.(upgrade.timestamp <= timestamp) then Some upgrade.hash - else None - -let check_upgrade ctxt evm_state = - let open Lwt_result_syntax in - function - | Some root_hash -> - let* () = - Store.Kernel_upgrades.record_apply - ctxt.store - ctxt.session.next_blueprint_number - in + let*! () = + Option.iter_s + (fun upgrade -> Events.pending_upgrade upgrade) + pending_upgrade + in + + return (ctxt, init_status) - let*! bytes = - Evm_state.inspect evm_state Durable_storage_path.kernel_root_hash + let init_from_rollup_node ~data_dir ~rollup_node_data_dir = + let open Lwt_result_syntax in + let* Sc_rollup_block.(_, {context; _}) = + let open Rollup_node_storage in + let* last_finalized_level, levels_to_hashes, l2_blocks = + Rollup_node_storage.load ~rollup_node_data_dir () in - let new_hash_candidate = - Option.map - (fun bytes -> - let (`Hex hex) = Hex.of_bytes bytes in - Ethereum_types.hash_of_string hex) - bytes + let* final_level = Last_finalized_level.read last_finalized_level in + let*? final_level = + Option.to_result + ~none: + [ + error_of_fmt + "Rollup node storage is missing the last finalized level"; + ] + final_level in - - let*! () = - match new_hash_candidate with - | Some current_root_hash when root_hash = current_root_hash -> - Events.applied_upgrade root_hash ctxt.session.next_blueprint_number - | _ -> - Events.failed_upgrade root_hash ctxt.session.next_blueprint_number + let* final_level_hash = + Levels_to_hashes.find levels_to_hashes final_level in + let*? final_level_hash = + Option.to_result + ~none: + [ + error_of_fmt + "Rollup node has no block hash for the l1 level %ld" + final_level; + ] + final_level_hash + in + let* final_l2_block = L2_blocks.read l2_blocks final_level_hash in + Lwt.return + @@ Option.to_result + ~none: + [ + error_of_fmt + "Rollup node has no l2 blocks for the l1 block hash %a" + Block_hash.pp + final_level_hash; + ] + final_l2_block + in + let checkpoint = + Smart_rollup_context_hash.to_bytes context |> Context_hash.of_bytes_exn + in + let rollup_node_context_dir = + Filename.Infix.(rollup_node_data_dir // "context") + in + let* rollup_node_index = + Irmin_context.load ~cache_size:100_000 Read_only rollup_node_context_dir + in + let evm_context_dir = store_path ~data_dir in + let*! () = Lwt_utils_unix.create_dir evm_context_dir in + let* () = + Irmin_context.export_snapshot + rollup_node_index + checkpoint + ~path:evm_context_dir + in + let* evm_node_index = + Irmin_context.load ~cache_size:100_000 Read_write evm_context_dir + in + let*! evm_node_context = + Irmin_context.checkout_exn evm_node_index checkpoint + in + let*! evm_state = Irmin_context.PVMState.get evm_node_context in - return_true - | None -> return_false + (* Tell the kernel that it is executed by an EVM node *) + let*! evm_state = Evm_state.flag_local_exec evm_state in + (* We remove the delayed inbox from the EVM state. Its contents will be + retrieved by the sequencer by inspecting the evm events. *) + let*! evm_state = Evm_state.clear_delayed_inbox evm_state in -(** [apply_blueprint_store_unsafe ctxt payload] applies the blueprint [payload] - on the head of [ctxt], and commit the resulting state to Irmin and the - node’s store. + (* For changes made to [evm_state] to take effect, we commit the result *) + let*! evm_node_context = + Irmin_context.PVMState.set evm_node_context evm_state + in + let*! checkpoint = Irmin_context.commit evm_node_context in - However, it does not modifies [ctxt] to make it aware of the new state. - This is because [apply_blueprint_store_unsafe] is expected to be called - within a SQL transaction to make sure the node’s store is not left in an - inconsistent state in case of error. *) -let apply_blueprint_store_unsafe ctxt timestamp payload = - let open Lwt_result_syntax in - Store.assert_in_transaction ctxt.store ; - let*! evm_state = evm_state ctxt in - let config = execution_config ctxt in - let (Qty next) = ctxt.session.next_blueprint_number in - - let* try_apply = Evm_state.apply_blueprint ~config evm_state payload in - - match try_apply with - | Apply_success (evm_state, Block_height blueprint_number, current_block_hash) - when Z.equal blueprint_number next -> - let* () = - Store.Executable_blueprints.store - ctxt.store - {number = Qty blueprint_number; timestamp; payload} + (* Assert we can read the current blueprint number *) + let* current_blueprint_number = + let*! current_blueprint_number_opt = + Evm_state.inspect evm_state Durable_storage_path.Block.current_number in + match current_blueprint_number_opt with + | Some bytes -> return (Bytes.to_string bytes |> Z.of_bits) + | None -> failwith "The blueprint number was not found" + in - let root_hash_candidate = check_pending_upgrade ctxt timestamp in - let* applied_upgrade = check_upgrade ctxt evm_state root_hash_candidate in - - let* context = commit_next_head ctxt evm_state in - return (context, current_block_hash, applied_upgrade) - | Apply_success _ (* Produced a block, but not of the expected height *) - | Apply_failure (* Did not produce a block *) -> - (* TODO: https://gitlab.com/tezos/tezos/-/issues/6826 *) - let*! () = Blueprint_events.invalid_blueprint_produced next in - tzfail (Cannot_apply_blueprint {local_state_level = Z.pred next}) - -let on_new_head ctxt ~applied_upgrade timestamp context block_hash payload = - let (Qty level) = ctxt.session.next_blueprint_number in - ctxt.session.context <- context ; - ctxt.session.next_blueprint_number <- Qty (Z.succ level) ; - ctxt.session.current_block_hash <- block_hash ; - Lwt_watcher.notify - ctxt.blueprint_watcher - {number = Qty level; timestamp; payload} ; - if applied_upgrade then ctxt.session.pending_upgrade <- None ; - Blueprint_events.blueprint_applied (level, block_hash) - -let apply_and_publish_blueprint (ctxt : t) timestamp - (blueprint : Sequencer_blueprint.t) = - let open Lwt_result_syntax in - Lwt_mutex.with_lock ctxt.head_lock @@ fun () -> - let (Qty level) = ctxt.session.next_blueprint_number in - let* context, current_block_hash, applied_upgrade = - with_store_transaction ctxt @@ fun ctxt -> - let* current_block_hash = - apply_blueprint_store_unsafe ctxt timestamp blueprint.to_execute + (* Assert we can read the current block hash *) + let* () = + let*! current_block_hash_opt = + Evm_state.inspect evm_state Durable_storage_path.Block.current_hash + in + match current_block_hash_opt with + | Some _bytes -> return_unit + | None -> failwith "The block hash was not found" in + (* Init the store *) + let* store = Evm_store.init ~data_dir in let* () = - Store.Publishable_blueprints.store - ctxt.store - (Qty level) - blueprint.to_publish + Evm_store.Context_hashes.store + store + (Qty current_blueprint_number) + checkpoint in - return current_block_hash - in - let*! () = - on_new_head - ctxt - ~applied_upgrade - timestamp - context - current_block_hash - blueprint.to_execute - in - Blueprints_publisher.publish level blueprint.to_publish - -let apply_blueprint ctxt timestamp payload = + return_unit + + let last_produced_blueprint (ctxt : t) = + let open Lwt_result_syntax in + let (Qty next) = ctxt.session.next_blueprint_number in + let current = Ethereum_types.Qty Z.(pred next) in + let* blueprint = Evm_store.Blueprints.find ctxt.store current in + match blueprint with + | Some blueprint -> return blueprint + | None -> failwith "Could not fetch the last produced blueprint" + + let delayed_inbox_hashes evm_state = + let open Lwt_syntax in + let* keys = + Evm_state.subkeys + evm_state + Durable_storage_path.Delayed_transaction.hashes + in + let hashes = + (* Remove the empty, meta keys *) + List.filter_map + (fun key -> + if key = "" || key = "meta" then None + else Some (Ethereum_types.hash_of_string key)) + keys + in + return hashes + + let blueprint_with_events ctxt level = + let open Lwt_result_syntax in + let* blueprint = Evm_store.Blueprints.find ctxt.store level in + match blueprint with + | None -> return None + | Some blueprint -> + let* delayed_transactions = + Evm_store.Delayed_transactions.at_level ctxt.store level + in + return_some Blueprint_types.{delayed_transactions; blueprint} +end + +module Worker = Worker.MakeSingle (Name) (Request) (Types) + +type worker = Worker.infinite Worker.queue Worker.t + +module Handlers = struct + open Request + + type self = worker + + type launch_error = tztrace + + let on_launch _self () + { + kernel_path : string option; + data_dir : string; + preimages : string; + preimages_endpoint : Uri.t option; + smart_rollup_address : string; + } = + let open Lwt_result_syntax in + let* ctxt, status = + State.init + ?kernel_path + ~data_dir + ~preimages + ~preimages_endpoint + ~smart_rollup_address + () + in + Lwt.wakeup execution_config_waker + @@ ( ctxt.data_dir, + Config.config + ~preimage_directory:ctxt.preimages + ?preimage_endpoint:ctxt.preimages_endpoint + ~kernel_debug:true + ~destination:ctxt.smart_rollup_address + () ) ; + Lwt.wakeup init_status_waker status ; + let first_head = ref (session_to_head_info ctxt.session) in + Lwt.wakeup head_info_waker first_head ; + return ctxt + + let on_request : + type r request_error. + self -> (r, request_error) Request.t -> (r, request_error) result Lwt.t = + fun self request -> + let open Lwt_result_syntax in + match request with + | Apply_evm_events {finalized_level; events} -> + let ctxt = Worker.state self in + State.apply_evm_events ?finalized_level ctxt events + | Apply_blueprint {timestamp; payload; delayed_transactions} -> + let ctxt = Worker.state self in + State.apply_blueprint ctxt timestamp payload delayed_transactions + | Last_produce_blueprint -> + let ctxt = Worker.state self in + State.last_produced_blueprint ctxt + | Blueprint {level} -> + let ctxt = Worker.state self in + State.blueprint_with_events ctxt level + | Blueprints_range {from; to_} -> + let ctxt = Worker.state self in + Evm_store.Blueprints.find_range ctxt.store ~from ~to_ + | Last_known_L1_level -> + let ctxt = Worker.state self in + Evm_store.L1_latest_known_level.find ctxt.store + | New_last_known_L1_level l -> + let ctxt = Worker.state self in + Evm_store.L1_latest_known_level.store ctxt.store l + | Delayed_inbox_hashes -> + let ctxt = Worker.state self in + let*! hashes = State.delayed_inbox_hashes ctxt.session.evm_state in + return hashes + + let on_completion (type a err) _self (_r : (a, err) Request.t) (_res : a) _st + = + Lwt_syntax.return_unit + + let on_no_request _self = Lwt.return_unit + + let on_close _self = Lwt.return_unit + + let on_error (type a b) _self _st (req : (a, b) Request.t) (errs : b) : + unit tzresult Lwt.t = + let open Lwt_result_syntax in + match (req, errs) with + | Apply_evm_events _, [Node_error.Diverged _divergence] -> + Lwt_exit.exit_and_raise Node_error.exit_code_when_diverge + | _ -> return_unit +end + +let table = Worker.create_table Queue + +let worker_promise, worker_waker = Lwt.task () + +type error += No_worker + +let worker = + lazy + (match Lwt.state worker_promise with + | Lwt.Return worker -> Ok worker + | Lwt.Fail e -> Error (TzTrace.make @@ error_of_exn e) + | Lwt.Sleep -> Error (TzTrace.make No_worker)) + +let bind_worker f = let open Lwt_result_syntax in - Lwt_mutex.with_lock ctxt.head_lock @@ fun () -> - let* context, current_block_hash, applied_upgrade = - with_store_transaction ctxt @@ fun ctxt -> - apply_blueprint_store_unsafe ctxt timestamp payload - in - let*! () = - on_new_head - ctxt - ~applied_upgrade - timestamp - context - current_block_hash - payload - in + let res = Lazy.force worker in + match res with + | Error [No_worker] -> + (* There is no worker, nothing to do *) + return_unit + | Error errs -> fail errs + | Ok w -> f w + +let worker_add_request ~request = + let open Lwt_result_syntax in + bind_worker @@ fun w -> + let*! (_pushed : bool) = Worker.Queue.push_request w request in return_unit -let init ?kernel_path ~data_dir ~preimages ~preimages_endpoint +let return_ : (_, _ Worker.message_error) result -> _ = + let open Lwt_result_syntax in + function + | Ok res -> return res + | Error (Closed (Some trace)) -> Lwt.return (Error trace) + | Error (Closed None) -> + failwith + "Cannot interact with the EVM context worker because it is closed" + | Error (Request_error err) -> Lwt.return (Error err) + | Error (Any exn) -> fail_with_exn exn + +let worker_wait_for_request req = + let open Lwt_result_syntax in + let*? w = Lazy.force worker in + let*! res = Worker.Queue.push_request_and_wait w req in + return_ res + +let start ?kernel_path ~data_dir ~preimages ~preimages_endpoint ~smart_rollup_address () = let open Lwt_result_syntax in - let* index = - Irmin_context.load ~cache_size:100_000 Read_write (store_path ~data_dir) - in - let destination = - Tezos_crypto.Hashed.Smart_rollup_address.of_string_exn smart_rollup_address - in - let* store, context, next_blueprint_number, current_block_hash, loaded = - load ~data_dir index - in - let* pending_upgrade = Store.Kernel_upgrades.find_latest_pending store in - let ctxt = - { - index; - data_dir; - preimages; - preimages_endpoint; - smart_rollup_address = destination; - session = - {context; next_blueprint_number; current_block_hash; pending_upgrade}; - blueprint_watcher = Lwt_watcher.create_input (); - store; - head_lock = Lwt_mutex.create (); - } + let* worker = + Worker.launch + table + () + { + kernel_path; + data_dir; + preimages; + preimages_endpoint; + smart_rollup_address; + } + (module Handlers) in + let*! () = Blueprint_events.publisher_is_ready () in + Lwt.wakeup worker_waker worker ; + let*! init_status in + let*! () = Evm_context_events.ready () in + return init_status - let* () = - match kernel_path with - | Some kernel -> - if loaded then - let*! () = Events.ignored_kernel_arg () in - return_unit - else - let* evm_state = Evm_state.init ~kernel in - let* context = replace_current_commit ctxt evm_state in - on_modified_head ctxt context ; - return_unit - | None -> - if loaded then return_unit - else - failwith - "Cannot compute the initial EVM state without the path to the \ - initial kernel" - in +let init_from_rollup_node = State.init_from_rollup_node - let*! () = - Option.iter_s - (fun upgrade -> Events.pending_upgrade upgrade) - pending_upgrade - in +let apply_evm_events ?finalized_level events = + worker_add_request ~request:(Apply_evm_events {finalized_level; events}) + +let apply_blueprint timestamp payload delayed_transactions = + worker_wait_for_request + (Apply_blueprint {timestamp; payload; delayed_transactions}) - return (ctxt, loaded) +let last_produced_blueprint () = worker_wait_for_request Last_produce_blueprint + +let head_info () = + let open Lwt_syntax in + let+ head_info in + !head_info -let init_from_rollup_node ~data_dir ~rollup_node_data_dir = +let execute_and_inspect ?wasm_entrypoint input = let open Lwt_result_syntax in - let* Sc_rollup_block.(_, {context; _}) = - let open Rollup_node_storage in - let* last_finalized_level, levels_to_hashes, l2_blocks = - Rollup_node_storage.load ~rollup_node_data_dir () - in - let* final_level = Last_finalized_level.read last_finalized_level in - let*? final_level = - Option.to_result - ~none: - [ - error_of_fmt - "Rollup node storage is missing the last finalized level"; - ] - final_level - in - let* final_level_hash = - Levels_to_hashes.find levels_to_hashes final_level - in - let*? final_level_hash = - Option.to_result - ~none: - [ - error_of_fmt - "Rollup node has no block hash for the l1 level %ld" - final_level; - ] - final_level_hash - in - let* final_l2_block = L2_blocks.read l2_blocks final_level_hash in - Lwt.return - @@ Option.to_result - ~none: - [ - error_of_fmt - "Rollup node has no l2 blocks for the l1 block hash %a" - Block_hash.pp - final_level_hash; - ] - final_l2_block - in - let checkpoint = - Smart_rollup_context_hash.to_bytes context |> Context_hash.of_bytes_exn - in - let rollup_node_context_dir = - Filename.Infix.(rollup_node_data_dir // "context") - in - let* rollup_node_index = - Irmin_context.load ~cache_size:100_000 Read_only rollup_node_context_dir - in - let evm_context_dir = store_path ~data_dir in - let*! () = Lwt_utils_unix.create_dir evm_context_dir in - let* () = - Irmin_context.export_snapshot - rollup_node_index - checkpoint - ~path:evm_context_dir - in - let* evm_node_index = - Irmin_context.load ~cache_size:100_000 Read_write evm_context_dir - in - let*! evm_node_context = - Irmin_context.checkout_exn evm_node_index checkpoint - in - let*! evm_state = Irmin_context.PVMState.get evm_node_context in + let*! {evm_state; _} = head_info () in + let*! data_dir, config = execution_config in + Evm_state.execute_and_inspect + ~data_dir + ?wasm_entrypoint + ~config + ~input + evm_state + +let inspect path = + let open Lwt_result_syntax in + let*! {evm_state; _} = head_info () in + let*! res = Evm_state.inspect evm_state path in + return res - (* Tell the kernel that it is executed by an EVM node *) - let*! evm_state = Evm_state.flag_local_exec evm_state in - (* We remove the delayed inbox from the EVM state. Its contents will be - retrieved by the sequencer by inspecting the rollup node durable storage. +let blueprints_watcher () = Lwt_watcher.create_stream blueprint_watcher - If we do not remove the delayed inbox from the state, the contents will - never be flushed (because of the distinction between executable / - publishable) *) - let*! evm_state = Evm_state.clear_delayed_inbox evm_state in +let blueprint level = worker_wait_for_request (Blueprint {level}) - (* For changes made to [evm_state] to take effect, we commit the result *) - let*! evm_node_context = - Irmin_context.PVMState.set evm_node_context evm_state - in - let*! checkpoint = Irmin_context.commit evm_node_context in +let blueprints_range from to_ = + worker_wait_for_request (Blueprints_range {from; to_}) - (* Assert we can read the current blueprint number *) - let* current_blueprint_number = - let*! current_blueprint_number_opt = - Evm_state.inspect evm_state Durable_storage_path.Block.current_number - in - match current_blueprint_number_opt with - | Some bytes -> return (Bytes.to_string bytes |> Z.of_bits) - | None -> failwith "The blueprint number was not found" - in +let last_known_l1_level () = worker_wait_for_request Last_known_L1_level - (* Assert we can read the current block hash *) - let* () = - let*! current_block_hash_opt = - Evm_state.inspect evm_state Durable_storage_path.Block.current_hash - in - match current_block_hash_opt with - | Some _bytes -> return_unit - | None -> failwith "The block hash was not found" - in - (* Init the store *) - let* store = Store.init ~data_dir in - let* () = - Store.Context_hashes.store store (Qty current_blueprint_number) checkpoint - in - return_unit +let new_last_known_l1_level l = + worker_add_request ~request:(New_last_known_L1_level l) -let execute_and_inspect ?wasm_entrypoint ~input ctxt = - let open Lwt_result_syntax in - let config = execution_config ctxt in - let*! evm_state = evm_state ctxt in - Evm_state.execute_and_inspect ?wasm_entrypoint ~config ~input evm_state +let delayed_inbox_hashes () = worker_wait_for_request Delayed_inbox_hashes -let last_produced_blueprint (ctxt : t) = +let shutdown () = let open Lwt_result_syntax in - let (Qty next) = ctxt.session.next_blueprint_number in - let current = Ethereum_types.Qty Z.(pred next) in - let* blueprint = Store.Executable_blueprints.find ctxt.store current in - match blueprint with - | Some blueprint -> return blueprint - | None -> failwith "Could not fetch the last produced blueprint" + bind_worker @@ fun w -> + let*! () = Evm_context_events.shutdown () in + let*! () = Worker.shutdown w in + return_unit diff --git a/etherlink/bin_node/lib_prod/evm_context.mli b/etherlink/bin_node/lib_prod/evm_context.mli index e13a5c1ce54601ff299f1498bf96af93cc917072..096fd2aa7149fc7305f63466ffc228bfea0a378f 100644 --- a/etherlink/bin_node/lib_prod/evm_context.mli +++ b/etherlink/bin_node/lib_prod/evm_context.mli @@ -5,42 +5,29 @@ (* *) (*****************************************************************************) -type session_state = { - mutable context : Irmin_context.rw; (** Irmin read and write context. *) - mutable next_blueprint_number : Ethereum_types.quantity; - (** Number for the next bluerpint to be produced. *) - mutable current_block_hash : Ethereum_types.block_hash; - (** Hash of the latest processed block *) - mutable pending_upgrade : Ethereum_types.Upgrade.t option; -} +type init_status = Loaded | Created -type t = { - data_dir : string; (** Data dir of the EVM node. *) - index : Irmin_context.rw_index; - preimages : string; (** Path to the preimages directory. *) - preimages_endpoint : Uri.t option; (** URI to fetch missing pre-images. *) - smart_rollup_address : Tezos_crypto.Hashed.Smart_rollup_address.t; - blueprint_watcher : Blueprint_types.t Lwt_watcher.input; - store : Store.t; - session : session_state; - head_lock : Lwt_mutex.t; - (** Lock to acquire to modify the head of the chain *) +type head = { + current_block_hash : Ethereum_types.block_hash; + next_blueprint_number : Ethereum_types.quantity; + evm_state : Evm_state.t; } -(** [init ~data_dir ~preimages ~preimages_endpoint ~smart_rollup_address ()] - creates a context where it initializes the {!type-index}, and use a - checkpoint mechanism to load the latest {!type-store} if any. +(** [start ~data_dir ~preimages ~preimages_endpoint ~smart_rollup_address ()] + creates a new worker to manage a local EVM context where it initializes the + {!type-index}, and use a checkpoint mechanism to load the latest + {!type-store} if any. - Returns an additional boolean telling if the context was loaded from disk - ([true]) or was initialized from scratch ([false]). *) -val init : + Returns a value telling if the context was loaded from disk + ([Loaded]) or was initialized from scratch ([Created]). *) +val start : ?kernel_path:string -> data_dir:string -> preimages:string -> preimages_endpoint:Uri.t option -> smart_rollup_address:string -> unit -> - (t * bool) tzresult Lwt.t + init_status tzresult Lwt.t (** [init_from_rollup_node ~data_dir ~rollup_node_data_dir ~inspect_current_blueprint_number] @@ -50,18 +37,20 @@ val init : val init_from_rollup_node : data_dir:string -> rollup_node_data_dir:string -> unit tzresult Lwt.t -(** [apply_evm_events ~finalized_level ctxt events] applies all the - events [events] on the local context [ctxt]. The events are - performed in a transactional context. *) +(** [apply_evm_events ~finalized_level events] applies all the + events [events] on the local context. The events are performed in a + transactional context. + + Stores [finalized_level] with {!new_last_known_l1_level} if provided. +*) val apply_evm_events : - finalized_level:int32 -> - t -> + ?finalized_level:int32 -> Ethereum_types.Evm_events.t list -> unit tzresult Lwt.t (** [inspect ctxt path] returns the value stored in [path] of the freshest EVM state, if it exists. *) -val inspect : t -> string -> bytes option Lwt.t +val inspect : string -> bytes option tzresult Lwt.t (** [execute_and_inspect ~input ctxt] executes [input] using the freshest EVM state, and returns [input.insights_requests]. @@ -70,21 +59,41 @@ val inspect : t -> string -> bytes option Lwt.t executed. *) val execute_and_inspect : ?wasm_entrypoint:string -> - input:Simulation.Encodings.simulate_input -> - t -> + Simulation.Encodings.simulate_input -> bytes option list tzresult Lwt.t -(** [last_produced_blueprint ctxt] returns the pair of publishable and - executable blueprints used to create the current head of the chain. *) -val last_produced_blueprint : t -> Blueprint_types.t tzresult Lwt.t +(** [last_produced_blueprint ctxt] returns the blueprint used to + create the current head of the chain. *) +val last_produced_blueprint : unit -> Blueprint_types.t tzresult Lwt.t -(** [apply_blueprint ctxt blueprint] applies [blueprint] in the freshest EVM - state stored under [ctxt]. It commits the result if the blueprint produces - the expected block. *) +(** [apply_blueprint timestamp payload delayed_transactions] applies + [payload] in the freshest EVM state stored under [ctxt] at + timestamp [timestamp], forwards the {!Blueprint_types.with_events}. + It commits the result if the blueprint produces the expected block. *) val apply_blueprint : - t -> Time.Protocol.t -> Blueprint_types.payload -> unit tzresult Lwt.t + Time.Protocol.t -> + Blueprint_types.payload -> + Ethereum_types.hash list -> + unit tzresult Lwt.t + +val head_info : unit -> head Lwt.t + +val blueprints_watcher : + unit -> Blueprint_types.with_events Lwt_stream.t * Lwt_watcher.stopper + +val blueprint : + Ethereum_types.quantity -> Blueprint_types.with_events option tzresult Lwt.t + +val blueprints_range : + Ethereum_types.quantity -> + Ethereum_types.quantity -> + (Ethereum_types.quantity * Blueprint_types.payload) list tzresult Lwt.t + +val last_known_l1_level : unit -> int32 option tzresult Lwt.t + +val new_last_known_l1_level : int32 -> unit tzresult Lwt.t + +val shutdown : unit -> unit tzresult Lwt.t -(** Same as {!apply_blueprint}, but additionally publish the blueprint if it is - correct. *) -val apply_and_publish_blueprint : - t -> Time.Protocol.t -> Sequencer_blueprint.t -> unit tzresult Lwt.t +(** [delayed_inbox_hashes ctxt] returns the hashes in the delayed inbox. *) +val delayed_inbox_hashes : unit -> Ethereum_types.hash list tzresult Lwt.t diff --git a/etherlink/bin_node/lib_prod/evm_context_events.ml b/etherlink/bin_node/lib_prod/evm_context_events.ml new file mode 100644 index 0000000000000000000000000000000000000000..270ba3d6badb6974fdd6c03d0616636224edee8f --- /dev/null +++ b/etherlink/bin_node/lib_prod/evm_context_events.ml @@ -0,0 +1,30 @@ +(*****************************************************************************) +(* *) +(* SPDX-License-Identifier: MIT *) +(* Copyright (c) 2024 Nomadic Labs *) +(* *) +(*****************************************************************************) + +include Internal_event.Simple + +let section = Events.section @ ["evm_context"] + +let ready = + declare_0 + ~section + ~name:"evm_context_is_ready" + ~msg:"EVM Context worker is ready" + ~level:Info + () + +let shutdown = + declare_0 + ~section + ~name:"evm_context_shutdown" + ~msg:"EVM Context worker is shutting down" + ~level:Info + () + +let ready () = emit ready () + +let shutdown () = emit shutdown () diff --git a/etherlink/bin_node/lib_prod/evm_events_follower.ml b/etherlink/bin_node/lib_prod/evm_events_follower.ml index cc5312fdb8b4dedf2dcfc97be96b6eb4a05e01af..5db1d3e42a975d810b981aeb0c612d478bee27df 100644 --- a/etherlink/bin_node/lib_prod/evm_events_follower.ml +++ b/etherlink/bin_node/lib_prod/evm_events_follower.ml @@ -5,7 +5,10 @@ (* *) (*****************************************************************************) -type parameters = {rollup_node_endpoint : Uri.t; ctxt : Evm_context.t} +type parameters = { + rollup_node_endpoint : Uri.t; + filter_event : Ethereum_types.Evm_events.t -> bool; +} module StringSet = Set.Make (String) @@ -84,7 +87,7 @@ let fetch_event ({rollup_node_endpoint; _} : Types.state) rollup_block_lvl in return event_opt -let on_new_head ({rollup_node_endpoint; ctxt} as state : Types.state) +let on_new_head ({rollup_node_endpoint; filter_event} as state : Types.state) rollup_block_lvl = let open Lwt_result_syntax in let* nb_of_events_bytes = @@ -94,7 +97,7 @@ let on_new_head ({rollup_node_endpoint; ctxt} as state : Types.state) rollup_node_endpoint in match nb_of_events_bytes with - | None -> return_unit + | None -> Evm_context.new_last_known_l1_level rollup_block_lvl | Some nb_of_events_bytes -> let (Qty nb_of_events) = Ethereum_types.decode_number nb_of_events_bytes @@ -109,8 +112,13 @@ let on_new_head ({rollup_node_endpoint; ctxt} as state : Types.state) nb_of_events (fetch_event state rollup_block_lvl) in - let events = List.filter_map Fun.id events in - Evm_context.apply_evm_events ~finalized_level:rollup_block_lvl ctxt events + let events = + List.filter_map + (function + | Some event when filter_event event -> Some event | _ -> None) + events + in + Evm_context.apply_evm_events ~finalized_level:rollup_block_lvl events module Handlers = struct type self = worker @@ -140,12 +148,9 @@ module Handlers = struct (r, request_error) Request.t -> request_error -> unit tzresult Lwt.t = - fun _w _ req errs -> + fun _w _ _req _errs -> let open Lwt_result_syntax in - match (req, errs) with - | New_rollup_node_block _, [Node_error.Diverged _divergence] -> - Lwt_exit.exit_and_raise Node_error.exit_code_when_diverge - | _ -> return_unit + return_unit let on_completion _ _ _ _ = Lwt.return_unit @@ -164,8 +169,18 @@ let worker = lazy (match Lwt.state worker_promise with | Lwt.Return worker -> Ok worker - | Lwt.Fail e -> Error (TzTrace.make @@ error_of_exn e) - | Lwt.Sleep -> Error (TzTrace.make No_worker)) + | Lwt.Fail e -> Result_syntax.tzfail (error_of_exn e) + | Lwt.Sleep -> Result_syntax.tzfail No_worker) + +let bind_worker f = + let open Lwt_result_syntax in + let res = Lazy.force worker in + match res with + | Error [No_worker] -> + (* There is no worker, nothing to do *) + return_unit + | Error errs -> fail errs + | Ok w -> f w let start parameters = let open Lwt_result_syntax in @@ -174,23 +189,17 @@ let start parameters = Lwt.wakeup worker_waker worker let shutdown () = - let open Lwt_syntax in - let w = Lazy.force worker in - match w with - | Error _ -> - (* There is no events follower, nothing to do *) - Lwt.return_unit - | Ok w -> - let* () = Evm_events_follower_events.shutdown () in - Worker.shutdown w - -let worker_add_request ~request : unit tzresult Lwt.t = let open Lwt_result_syntax in - match Lazy.force worker with - | Ok w -> - let*! (_pushed : bool) = Worker.Queue.push_request w request in - return_unit - | Error e -> Lwt.return (Error e) + bind_worker @@ fun w -> + let*! () = Evm_events_follower_events.shutdown () in + let*! () = Worker.shutdown w in + return_unit + +let worker_add_request ~request = + let open Lwt_result_syntax in + bind_worker @@ fun w -> + let*! (_pushed : bool) = Worker.Queue.push_request w request in + return_unit let new_rollup_block rollup_level = worker_add_request ~request:(New_rollup_node_block rollup_level) diff --git a/etherlink/bin_node/lib_prod/evm_events_follower.mli b/etherlink/bin_node/lib_prod/evm_events_follower.mli index 95db737c8d5ff76a73ea807a0cca99652d6a84a4..bdb86f554ef031b21149bebb61c0c08a4a8c0584 100644 --- a/etherlink/bin_node/lib_prod/evm_events_follower.mli +++ b/etherlink/bin_node/lib_prod/evm_events_follower.mli @@ -8,14 +8,15 @@ type parameters = { rollup_node_endpoint : Uri.t; (** Rollup node endpoint used to monitor kernel events. *) - ctxt : Evm_context.t; + filter_event : Ethereum_types.Evm_events.t -> bool; + (** Filter event the follower applies. *) } (** [start parameters] starts the events follower. *) val start : parameters -> unit tzresult Lwt.t (** [shutdown ()] stops the events follower. *) -val shutdown : unit -> unit Lwt.t +val shutdown : unit -> unit tzresult Lwt.t (** [new_rollup_block rollup_level] tells the worker that a new L2 head has been published and that the rollup head is now diff --git a/etherlink/bin_node/lib_prod/evm_events_follower_events.ml b/etherlink/bin_node/lib_prod/evm_events_follower_events.ml index c024e7731136b72050e2ea24d5196cfa8b8d959f..eed15a4cfa2028bab7cbe57377a97feea1fb7794 100644 --- a/etherlink/bin_node/lib_prod/evm_events_follower_events.ml +++ b/etherlink/bin_node/lib_prod/evm_events_follower_events.ml @@ -32,7 +32,7 @@ module Event = struct ~section ~name:"evm_events_new_event" ~msg:"Evm events follower: applying {event}" - ~level:Notice + ~level:Debug ~pp1:Ethereum_types.Evm_events.pp ("event", Ethereum_types.Evm_events.encoding) diff --git a/etherlink/bin_node/lib_prod/evm_services.ml b/etherlink/bin_node/lib_prod/evm_services.ml index 89a8b9b3465dbd7dbecbb28dfbb2fa5ec1197353..db69f5709beb97ea5fb02e662d29de8da74ee111 100644 --- a/etherlink/bin_node/lib_prod/evm_services.ml +++ b/etherlink/bin_node/lib_prod/evm_services.ml @@ -20,7 +20,7 @@ let get_blueprint_service = Service.get_service ~description:"Fetch the contents of a blueprint" ~query:Query.empty - ~output:Blueprint_types.encoding + ~output:Blueprint_types.with_events_encoding Path.(evm_services_root / "blueprint" /: Arg.uint63) let blueprint_watcher_service = @@ -31,13 +31,13 @@ let blueprint_watcher_service = Service.get_service ~description:"Watch for new blueprints" ~query:level_query - ~output:Blueprint_types.encoding + ~output:Blueprint_types.with_events_encoding Path.(evm_services_root / "blueprints") -let create_blueprint_watcher_service (ctxt : Evm_context.t) from_level = +let create_blueprint_watcher_service from_level = let open Lwt_syntax in (* input source block creating a stream to observe the events *) - let* head_res = Evm_context.last_produced_blueprint ctxt in + let* head_res = Evm_context.last_produced_blueprint () in let (Qty head_level) = match head_res with | Ok head -> @@ -51,9 +51,7 @@ let create_blueprint_watcher_service (ctxt : Evm_context.t) from_level = in (* generate the next asynchronous event *) - let blueprint_stream, stopper = - Lwt_watcher.create_stream ctxt.blueprint_watcher - in + let blueprint_stream, stopper = Evm_context.blueprints_watcher () in let shutdown () = Lwt_watcher.shutdown stopper in let next = let next_level_requested = ref Z.(of_int64 from_level) in @@ -61,9 +59,7 @@ let create_blueprint_watcher_service (ctxt : Evm_context.t) from_level = if Z.Compare.(!next_level_requested <= head_level) then ( let current_request = !next_level_requested in (next_level_requested := Z.(succ current_request)) ; - let* blueprint = - Store.Executable_blueprints.find ctxt.store (Qty current_request) - in + let* blueprint = Evm_context.blueprint (Qty current_request) in match blueprint with | Ok (Some blueprint) -> return_some blueprint | Ok None -> return_none @@ -74,26 +70,25 @@ let create_blueprint_watcher_service (ctxt : Evm_context.t) from_level = in Tezos_rpc.Answer.return_stream {next; shutdown} -let register_get_smart_rollup_address_service ctxt dir = +let register_get_smart_rollup_address_service smart_rollup_address dir = Directory.register0 dir get_smart_rollup_address_service (fun () () -> let open Lwt_syntax in - return_ok ctxt.Evm_context.smart_rollup_address) + return_ok smart_rollup_address) -let register_get_blueprint_service (ctxt : Evm_context.t) dir = +let register_get_blueprint_service dir = Directory.opt_register1 dir get_blueprint_service (fun level () () -> let open Lwt_result_syntax in let number = Ethereum_types.Qty (Z.of_int64 level) in - let* blueprint = Store.Executable_blueprints.find ctxt.store number in + let* blueprint = Evm_context.blueprint number in return blueprint) -let register_blueprint_watcher_service ctxt dir = +let register_blueprint_watcher_service dir = Directory.gen_register0 dir blueprint_watcher_service (fun level () -> - create_blueprint_watcher_service ctxt level) + create_blueprint_watcher_service level) -let register ctxt dir = - register_get_smart_rollup_address_service ctxt dir - |> register_get_blueprint_service ctxt - |> register_blueprint_watcher_service ctxt +let register smart_rollup_address dir = + register_get_smart_rollup_address_service smart_rollup_address dir + |> register_get_blueprint_service |> register_blueprint_watcher_service let get_smart_rollup_address ~evm_node_endpoint = Tezos_rpc_http_client_unix.RPC_client_unix.call_service diff --git a/etherlink/bin_node/lib_prod/evm_services.mli b/etherlink/bin_node/lib_prod/evm_services.mli index 1fcf3090b2aca9e7e80cdca78eb3d483f1b6f0c9..ad7e45bfb764008234e3fbddfabe96450f39163c 100644 --- a/etherlink/bin_node/lib_prod/evm_services.mli +++ b/etherlink/bin_node/lib_prod/evm_services.mli @@ -14,11 +14,14 @@ val get_smart_rollup_address : val get_blueprint : evm_node_endpoint:Uri.t -> Ethereum_types.quantity -> - Blueprint_types.t tzresult Lwt.t + Blueprint_types.with_events tzresult Lwt.t -val register : Evm_context.t -> unit Directory.t -> unit Directory.t +val register : + Tezos_crypto.Hashed.Smart_rollup_address.t -> + unit Directory.t -> + unit Directory.t val monitor_blueprints : evm_node_endpoint:Uri.t -> Ethereum_types.quantity -> - Blueprint_types.t Lwt_stream.t Lwt.t + Blueprint_types.with_events Lwt_stream.t Lwt.t diff --git a/etherlink/bin_node/lib_prod/evm_state.ml b/etherlink/bin_node/lib_prod/evm_state.ml index 97811836fd9f4348aedd7f7ac5ea1aa914cc5ca7..78b530c74b69048c1f506b9da0d18d1c67fa85f2 100644 --- a/etherlink/bin_node/lib_prod/evm_state.ml +++ b/etherlink/bin_node/lib_prod/evm_state.ml @@ -32,13 +32,68 @@ module Wasm_utils = Wasm_utils.Make (Tezos_tree_encoding.Encodings_util.Make (Bare_context)) module Wasm = Wasm_debugger.Make (Wasm_utils) -let execute ?(wasm_entrypoint = Tezos_scoru_wasm.Constants.wasm_entrypoint) - ~config evm_state inbox = +let kernel_logs_directory ~data_dir = Filename.concat data_dir "kernel_logs" + +let level_prefix = function + | Events.Debug -> "[Debug]" + | Info -> "[Info]" + | Error -> "[Error]" + | Fatal -> "[Fatal]" + +let event_kernel_log ~kind ~msg = + let is_level ~level msg = + let prefix = level_prefix level in + String.remove_prefix ~prefix msg |> Option.map (fun msg -> (level, msg)) + in + let level_and_msg = + Option.either_f (is_level ~level:Debug msg) @@ fun () -> + Option.either_f (is_level ~level:Info msg) @@ fun () -> + Option.either_f (is_level ~level:Error msg) @@ fun () -> + is_level ~level:Fatal msg + in + Option.iter_s + (fun (level, msg) -> Events.event_kernel_log ~level ~kind ~msg) + level_and_msg + +let execute ?(kind = Events.Application) ~data_dir ?(log_file = "kernel_log") + ?(wasm_entrypoint = Tezos_scoru_wasm.Constants.wasm_entrypoint) ~config + evm_state inbox = let open Lwt_result_syntax in + let path = Filename.concat (kernel_logs_directory ~data_dir) log_file in let inbox = List.map (function `Input s -> s) inbox in let inbox = List.to_seq [inbox] in + let messages = ref [] in + let write_debug = + Tezos_scoru_wasm.Builtins.Printer + (fun msg -> + messages := msg :: !messages ; + event_kernel_log ~kind ~msg) + in let* evm_state, _, _, _ = - Wasm.Commands.eval ~wasm_entrypoint 0l inbox config Inbox evm_state + Wasm.Commands.eval + ~write_debug + ~wasm_entrypoint + 0l + inbox + config + Inbox + evm_state + in + (* The messages are accumulated during the execution and stored + atomatically at the end to preserve their order. *) + let*! () = + Lwt_io.with_file + ~flags:Unix.[O_WRONLY; O_CREAT; O_APPEND] + ~perm:0o644 + ~mode:Output + path + @@ fun chan -> + Lwt_io.atomic + (fun chan -> + let msgs = List.rev !messages in + let*! () = List.iter_s (Lwt_io.write chan) msgs in + Lwt_io.flush chan) + chan in return evm_state @@ -62,6 +117,13 @@ let inspect evm_state key = let* value = Wasm.Commands.find_key_in_durable evm_state key in Option.map_s Tezos_lazy_containers.Chunked_byte_vector.to_bytes value +let subkeys evm_state key = + let open Lwt_syntax in + let key = Tezos_scoru_wasm.Durable.key_of_string_exn key in + let* durable = Wasm_utils.wrap_as_durable_storage evm_state in + let durable = Tezos_scoru_wasm.Durable.of_storage_exn durable in + Tezos_scoru_wasm.Durable.list durable key + let current_block_height evm_state = let open Lwt_syntax in let* current_block_number = @@ -87,8 +149,10 @@ let current_block_hash evm_state = | Some h -> return (decode_block_hash h) | None -> return genesis_parent_hash -let execute_and_inspect ?wasm_entrypoint ~config - ~input:Simulation.Encodings.{messages; insight_requests; _} ctxt = +let execute_and_inspect ~data_dir ?wasm_entrypoint ~config + ~input: + Simulation.Encodings. + {messages; insight_requests; log_kernel_debug_file; _} ctxt = let open Lwt_result_syntax in let keys = List.map @@ -101,7 +165,16 @@ let execute_and_inspect ?wasm_entrypoint ~config in (* Messages from simulation requests are already valid inputs. *) let messages = List.map (fun s -> `Input s) messages in - let* evm_state = execute ?wasm_entrypoint ~config ctxt messages in + let* evm_state = + execute + ~kind:Simulation + ?log_file:log_kernel_debug_file + ~data_dir + ?wasm_entrypoint + ~config + ctxt + messages + in let*! values = List.map_p (fun key -> inspect evm_state key) keys in return values @@ -109,7 +182,8 @@ type apply_result = | Apply_success of t * block_height * block_hash | Apply_failure -let apply_blueprint ~config evm_state (blueprint : Blueprint_types.payload) = +let apply_blueprint ~data_dir ~config evm_state + (blueprint : Blueprint_types.payload) = let open Lwt_result_syntax in let exec_inputs = List.map @@ -119,6 +193,7 @@ let apply_blueprint ~config evm_state (blueprint : Blueprint_types.payload) = let*! (Block_height before_height) = current_block_height evm_state in let* evm_state = execute + ~data_dir ~wasm_entrypoint:Tezos_scoru_wasm.Constants.wasm_entrypoint ~config evm_state diff --git a/etherlink/bin_node/lib_prod/evm_state.mli b/etherlink/bin_node/lib_prod/evm_state.mli index c09c58564e14e69556ff68c18e34eaebe8b63652..cb2dc6c4504e994f18006a6403107fc5ca0a9e70 100644 --- a/etherlink/bin_node/lib_prod/evm_state.mli +++ b/etherlink/bin_node/lib_prod/evm_state.mli @@ -7,10 +7,22 @@ type t = Irmin_context.PVMState.value -(** [execute ~wasm_entrypoint ~config evm_state messages] executes the - [wasm_entrypoint] function (default to [kernel_run]) with [messages] within - the inbox of [evm_state]. *) +(** Directory where the kernel logs are stored. The function {!execute} below + expect the directory to exist.*) +val kernel_logs_directory : data_dir:string -> string + +(** [execute ?simulation ~data_dir ?log_file ~wasm_entrypoint ~config + evm_state messages] executes the [wasm_entrypoint] function + (default to [kernel_run]) with [messages] within the inbox of + [evm_state]. + + Kernel logs are stored under the {!kernel_logs_directory} in [log_file]. + [simulation] adds a prefix to the event to differenciate the logs. +*) val execute : + ?kind:Events.kernel_log_kind -> + data_dir:string -> + ?log_file:string -> ?wasm_entrypoint:string -> config:Config.config -> t -> @@ -28,10 +40,16 @@ val modify : key:string -> value:string -> t -> t Lwt.t [evm_state], if any. *) val inspect : t -> string -> bytes option Lwt.t -(** [execute_and_inspect ?wasm_entrypoint ~config ~input evm_state] executes - the [wasm_entrypoint] function (default to [kernel_run]) with [input] - within the inbox of [evm_state], and returns [input.insights_requests]. *) +(** [subkeys evm_state key] returns the list of value stored under [key] in + [evm_state]. *) +val subkeys : t -> string -> string trace Lwt.t + +(** [execute_and_inspect ~data_dir ?wasm_entrypoint ~config ~input + evm_state] executes the [wasm_entrypoint] function (default to + [kernel_run]) with [input] within the inbox of [evm_state], and + returns [input.insights_requests]. *) val execute_and_inspect : + data_dir:string -> ?wasm_entrypoint:string -> config:Config.config -> input:Simulation.Encodings.simulate_input -> @@ -49,10 +67,16 @@ type apply_result = | Apply_success of t * Ethereum_types.block_height * Ethereum_types.block_hash | Apply_failure -(** [apply_blueprint ~config state payload] applies the blueprint [payload] on - top of [evm_state]. If the payload produces a block, the new updated EVM - state is returned along with the new block’s height. *) +(** [apply_blueprint ~data-dir ~config state payload] applies the + blueprint [payload] on top of [evm_state]. If the payload produces + a block, the new updated EVM state is returned along with the new + block’s height. + + The [data-dir] is used to store the kernel logs in the + {!kernel_logs_directory}. +*) val apply_blueprint : + data_dir:string -> config:Config.config -> t -> Blueprint_types.payload -> diff --git a/etherlink/bin_node/lib_prod/store.ml b/etherlink/bin_node/lib_prod/evm_store.ml similarity index 81% rename from etherlink/bin_node/lib_prod/store.ml rename to etherlink/bin_node/lib_prod/evm_store.ml index 304eaa1863b1b0b1b1e80969cac0f2bd87af02b4..6e5260fe456563516c1b5dc87b27118d29072b85 100644 --- a/etherlink/bin_node/lib_prod/store.ml +++ b/etherlink/bin_node/lib_prod/evm_store.ml @@ -19,6 +19,9 @@ module Db = struct let start (module Db : Caqti_lwt.CONNECTION) = caqti @@ Db.start () + let collect_list (module Db : Caqti_lwt.CONNECTION) req arg = + caqti @@ Db.collect_list req arg + let commit (module Db : Caqti_lwt.CONNECTION) = caqti @@ Db.commit () let rollback (module Db : Caqti_lwt.CONNECTION) = caqti @@ Db.rollback () @@ -99,6 +102,20 @@ module Q = struct Ok Ethereum_types.Upgrade.{hash; timestamp}) (t2 root_hash timestamp) + let delayed_transaction = + custom + ~encode:(fun payload -> + Ok + (Data_encoding.Binary.to_string_exn + Ethereum_types.Delayed_transaction.encoding + payload)) + ~decode:(fun bytes -> + Option.to_result ~none:"Not a valid blueprint payload" + @@ Data_encoding.Binary.of_string_opt + Ethereum_types.Delayed_transaction.encoding + bytes) + string + let table_exists = (string ->! bool) @@ {| @@ -248,28 +265,60 @@ module Q = struct ] end + module V5 = struct + let name = "create_blueprints_table" + + let up = + [ + migration_step + {| + CREATE TABLE blueprints ( + id SERIAL PRIMARY KEY, + payload BLOB NOT NULL, + timestamp DATETIME NOT NULL + )|}; + migration_step {| + DROP TABLE executable_blueprints +|}; + migration_step {| + DROP TABLE publishable_blueprints +|}; + migration_step + {| + CREATE TABLE delayed_transactions ( + injected_before INT NOT NULL, + hash TEXT NOT NULL, + payload TEXT NOT NULL + ) + |}; + ] + end + let all : migration list = - [(module V0); (module V1); (module V2); (module V3); (module V4)] + [ + (module V0); + (module V1); + (module V2); + (module V3); + (module V4); + (module V5); + ] end - module Executable_blueprints = struct + module Blueprints = struct let insert = (t3 level timestamp payload ->. unit) - @@ {eos|INSERT INTO executable_blueprints (id, timestamp, payload) VALUES (?, ?, ?)|eos} + @@ {eos|INSERT INTO blueprints (id, timestamp, payload) VALUES (?, ?, ?)|eos} let select = (level ->? t2 payload timestamp) - @@ {eos|SELECT payload, timestamp FROM executable_blueprints WHERE id = ?|eos} - end - - module Publishable_blueprints = struct - let insert = - (t2 level payload ->. unit) - @@ {eos|INSERT INTO publishable_blueprints (id, payload) VALUES (?, ?)|eos} + @@ {eos|SELECT payload, timestamp FROM blueprints WHERE id = ?|eos} - let select = - (level ->? payload) - @@ {eos|SELECT payload FROM publishable_blueprints WHERE id = ?|eos} + let select_range = + (t2 level level ->* t2 level payload) + @@ {|SELECT id, payload FROM blueprints + WHERE ? <= id AND id <= ? + ORDER BY id ASC|} end module Context_hashes = struct @@ -306,6 +355,20 @@ module Q = struct |} end + module Delayed_transactions = struct + let insert = + (t3 level root_hash delayed_transaction ->. unit) + @@ {|INSERT INTO delayed_transactions (injected_before, hash, payload) VALUES (?, ?, ?)|} + + let select_at_level = + (level ->* delayed_transaction) + @@ {|SELECT payload FROM delayed_transactions WHERE ? = injected_before|} + + let select_at_hash = + (root_hash ->? delayed_transaction) + @@ {|SELECT payload FROM delayed_transactions WHERE ? = hash|} + end + module L1_latest_level = struct let insert = (l1_level ->. unit) @@ -370,7 +433,7 @@ module Migrations = struct if applied <= known then return (List.drop_n applied all_migrations) else let*! () = - Store_events.migrations_from_the_future ~applied ~known:0 + Evm_store_events.migrations_from_the_future ~applied ~known:0 in failwith "Cannot use a store modified by a more up-to-date version of the \ @@ -382,20 +445,6 @@ module Migrations = struct with_connection store @@ fun conn -> let* () = List.iter_es (fun up -> Db.exec conn up ()) M.up in Db.exec conn Q.Migrations.register_migration (id, M.name) - - let assume_old_store store = - let open Lwt_result_syntax in - with_connection store @@ fun conn -> - let* () = Db.exec conn Q.Migrations.create_table () in - Db.exec conn Q.Migrations.register_migration (0, Q.Migrations.V0.name) - - let check_V0 store = - let open Lwt_result_syntax in - with_connection store @@ fun conn -> - let* publishable = Db.find conn Q.table_exists "publishable_blueprints" in - let* executable = Db.find conn Q.table_exists "executable_blueprints" in - let* context_hashes = Db.find conn Q.table_exists "context_hashes" in - return (publishable && executable && context_hashes) end let init ~data_dir = @@ -409,24 +458,13 @@ let init ~data_dir = let* () = if not exists then let* () = Migrations.create_table store in - let*! () = Store_events.init_store () in + let*! () = Evm_store_events.init_store () in return_unit else let* table_exists = Migrations.table_exists store in let* () = when_ (not table_exists) (fun () -> - (* The database already exists, but the migrations table does not. - This probably means it was created before the introduction of - the migration system. We check that the three initial tables are - there and if so, we assume the first migration is applied moving - forward, with a warning. *) - let* old_db = Migrations.check_V0 store in - if old_db then - let* () = Migrations.assume_old_store store in - let*! () = Store_events.assume_old_store () in - return_unit - else - failwith "A store already exists, but its content is incorrect.") + failwith "A store already exists, but its content is incorrect.") in return_unit in @@ -435,7 +473,7 @@ let init ~data_dir = List.iter_es (fun (i, ((module M : Q.MIGRATION) as mig)) -> let* () = Migrations.apply_migration store i mig in - let*! () = Store_events.applied_migration M.name in + let*! () = Evm_store_events.applied_migration M.name in return_unit) migrations in @@ -443,32 +481,26 @@ let init ~data_dir = in return store -module Executable_blueprints = struct +module Blueprints = struct let store store (blueprint : Blueprint_types.t) = with_connection store @@ fun conn -> Db.exec conn - Q.Executable_blueprints.insert + Q.Blueprints.insert (blueprint.number, blueprint.timestamp, blueprint.payload) let find store number = let open Lwt_result_syntax in with_connection store @@ fun conn -> - let+ opt = Db.find_opt conn Q.Executable_blueprints.select number in + let+ opt = Db.find_opt conn Q.Blueprints.select number in match opt with | Some (payload, timestamp) -> Some Blueprint_types.{payload; timestamp; number} | None -> None -end -module Publishable_blueprints = struct - let store store number blueprint = + let find_range store ~from ~to_ = with_connection store @@ fun conn -> - Db.exec conn Q.Publishable_blueprints.insert (number, blueprint) - - let find store number = - with_connection store @@ fun conn -> - Db.find_opt conn Q.Publishable_blueprints.select number + Db.collect_list conn Q.Blueprints.select_range (from, to_) end module Context_hashes = struct @@ -502,6 +534,24 @@ module Kernel_upgrades = struct Db.exec conn Q.Kernel_upgrades.record_apply level end +module Delayed_transactions = struct + let store store next_blueprint_number + (delayed_transaction : Ethereum_types.Delayed_transaction.t) = + with_connection store @@ fun conn -> + Db.exec + conn + Q.Delayed_transactions.insert + (next_blueprint_number, delayed_transaction.hash, delayed_transaction) + + let at_level store blueprint_number = + with_connection store @@ fun conn -> + Db.collect_list conn Q.Delayed_transactions.select_at_level blueprint_number + + let at_hash store hash = + with_connection store @@ fun conn -> + Db.find_opt conn Q.Delayed_transactions.select_at_hash hash +end + module L1_latest_known_level = struct let store store level = with_connection store @@ fun conn -> diff --git a/etherlink/bin_node/lib_prod/store.mli b/etherlink/bin_node/lib_prod/evm_store.mli similarity index 83% rename from etherlink/bin_node/lib_prod/store.mli rename to etherlink/bin_node/lib_prod/evm_store.mli index 9da69fc8b81b7c5307955d38d754bc69ec4c64f5..a90f8a483b23600698dfe7c956803babbe453daf 100644 --- a/etherlink/bin_node/lib_prod/store.mli +++ b/etherlink/bin_node/lib_prod/evm_store.mli @@ -25,24 +25,17 @@ val with_transaction : t -> (t -> 'a tzresult Lwt.t) -> 'a tzresult Lwt.t @raise Assert_failure *) val assert_in_transaction : t -> unit -module Executable_blueprints : sig +module Blueprints : sig val store : t -> Blueprint_types.t -> unit tzresult Lwt.t val find : t -> Ethereum_types.quantity -> Blueprint_types.t option tzresult Lwt.t -end - -module Publishable_blueprints : sig - val store : - t -> - Ethereum_types.quantity -> - Blueprint_types.payload -> - unit tzresult Lwt.t - val find : + val find_range : t -> - Ethereum_types.quantity -> - Blueprint_types.payload option tzresult Lwt.t + from:Ethereum_types.quantity -> + to_:Ethereum_types.quantity -> + (Ethereum_types.quantity * Blueprint_types.payload) list tzresult Lwt.t end module Context_hashes : sig @@ -68,6 +61,24 @@ module Kernel_upgrades : sig val record_apply : t -> Ethereum_types.quantity -> unit tzresult Lwt.t end +module Delayed_transactions : sig + val store : + t -> + Ethereum_types.quantity -> + Ethereum_types.Delayed_transaction.t -> + unit tzresult Lwt.t + + val at_level : + t -> + Ethereum_types.quantity -> + Ethereum_types.Delayed_transaction.t list tzresult Lwt.t + + val at_hash : + t -> + Ethereum_types.hash -> + Ethereum_types.Delayed_transaction.t option tzresult Lwt.t +end + module L1_latest_known_level : sig val store : t -> int32 -> unit tzresult Lwt.t diff --git a/etherlink/bin_node/lib_prod/store_events.ml b/etherlink/bin_node/lib_prod/evm_store_events.ml similarity index 77% rename from etherlink/bin_node/lib_prod/store_events.ml rename to etherlink/bin_node/lib_prod/evm_store_events.ml index 82687a10de02ea704467781581267f7fbd1e8273..94acf2cc96dbf30f1093caa20b5a988e97b36880 100644 --- a/etherlink/bin_node/lib_prod/store_events.ml +++ b/etherlink/bin_node/lib_prod/evm_store_events.ml @@ -13,20 +13,10 @@ let init_store = declare_0 ~section ~name:"store_init" - ~msg:"Store is being initialized for the first time" + ~msg:"Evm_store is being initialized for the first time" ~level:Notice () -let assume_old_store = - declare_0 - ~section - ~name:"store_assume_old" - ~msg: - "A store already exists, provides the tables created by V0, but is \ - missing the migrations table. We assume it is correct." - ~level:Warning - () - let applied_migration = declare_1 ~section @@ -40,8 +30,8 @@ let migrations_from_the_future = ~section ~name:"migrations_from_the_future" ~msg: - "Store has {applied} migrations applied but the EVM node is only aware \ - of {known}" + "Evm_store has {applied} migrations applied but the EVM node is only \ + aware of {known}" ~level:Error ("applied", Data_encoding.int31) ("known", Data_encoding.int31) @@ -60,8 +50,6 @@ let init_store () = emit init_store () let applied_migration name = emit applied_migration name -let assume_old_store () = emit assume_old_store () - let migrations_from_the_future ~applied ~known = emit migrations_from_the_future (applied, known) diff --git a/etherlink/bin_node/lib_prod/store_events.mli b/etherlink/bin_node/lib_prod/evm_store_events.mli similarity index 81% rename from etherlink/bin_node/lib_prod/store_events.mli rename to etherlink/bin_node/lib_prod/evm_store_events.mli index e1b44fdd56f8289b29978220c9f185a080c75a7b..0aefe815fbfcf001816a997c381b0ec4f658d898 100644 --- a/etherlink/bin_node/lib_prod/store_events.mli +++ b/etherlink/bin_node/lib_prod/evm_store_events.mli @@ -13,12 +13,6 @@ val init_store : unit -> unit Lwt.t on a store that was previously missing it. *) val applied_migration : string -> unit Lwt.t -(** [assume_old_store ()] advertizes that the EVM node is promoting its store - to be compatible with the migrations table. It does that when its store is - missing the migrations table, but has the tables expected from the first - migration. *) -val assume_old_store : unit -> unit Lwt.t - (** [migration_from_the_future ~applied ~known] advertizes that there is more migrations applied to the store than known from the EVM node, which suggests the EVM node is outdated. *) diff --git a/etherlink/bin_node/lib_prod/helpers.ml b/etherlink/bin_node/lib_prod/helpers.ml index 6939b723f12c24cb728e4298b7f48bfd2e391c44..cafb0c4a4a8404a0db57e6596f2c91a87b2d298f 100644 --- a/etherlink/bin_node/lib_prod/helpers.ml +++ b/etherlink/bin_node/lib_prod/helpers.ml @@ -21,3 +21,11 @@ let with_timing event k = let* () = event diff in return res + +let unwrap_error_monad f = + let open Lwt_syntax in + let* res = f () in + match res with + | Ok v -> return v + | Error errs -> + Lwt.fail_with (Format.asprintf "%a" Error_monad.pp_print_trace errs) diff --git a/etherlink/bin_node/lib_prod/helpers.mli b/etherlink/bin_node/lib_prod/helpers.mli index 57776f4fa20acff20ac3b00f379aef89d76e6f7c..fd3394a7b2aada3a3f66fda8f051c19a7e8b014c 100644 --- a/etherlink/bin_node/lib_prod/helpers.mli +++ b/etherlink/bin_node/lib_prod/helpers.mli @@ -11,3 +11,7 @@ val now : unit -> Time.Protocol.t (** [with_timing event k] computes how much time [k ()] takes to be computed and advertises it with [event]. *) val with_timing : (Ptime.span -> unit Lwt.t) -> (unit -> 'a Lwt.t) -> 'a Lwt.t + +(** [unwrap_error_monad f] execute f and fails with a Failure when the + error monad returns an error. *) +val unwrap_error_monad : (unit -> 'a tzresult Lwt.t) -> 'a Lwt.t diff --git a/etherlink/bin_node/lib_prod/observer.ml b/etherlink/bin_node/lib_prod/observer.ml index 18d98067b18d5a2f6b5d3da1b6e1bb8f41e0483a..3f1926ab48a501e71992f67dca76d524c497291a 100644 --- a/etherlink/bin_node/lib_prod/observer.ml +++ b/etherlink/bin_node/lib_prod/observer.ml @@ -8,27 +8,20 @@ open Ethereum_types module MakeBackend (Ctxt : sig - val ctxt : Evm_context.t - val evm_node_endpoint : Uri.t + + val smart_rollup_address : Tezos_crypto.Hashed.Smart_rollup_address.t end) : Services_backend_sig.Backend = struct module Reader = struct - let read path = - let open Lwt_result_syntax in - let*! res = Evm_context.inspect Ctxt.ctxt path in - return res + let read path = Evm_context.inspect path end module TxEncoder = struct - type transactions = { - raw : string list; - delayed : Ethereum_types.Delayed_transaction.t list; - } + type transactions = string list type messages = string list - let encode_transactions ~smart_rollup_address:_ - ~(transactions : transactions) = + let encode_transactions ~smart_rollup_address:_ ~transactions = let open Result_syntax in let hashes = List.map @@ -36,9 +29,9 @@ end) : Services_backend_sig.Backend = struct let tx_hash_str = Ethereum_types.hash_raw_tx transaction in Ethereum_types.( Hash Hex.(of_string tx_hash_str |> show |> hex_of_string))) - transactions.raw + transactions in - return (hashes, transactions.raw) + return (hashes, transactions) end module Publisher = struct @@ -96,49 +89,181 @@ end) : Services_backend_sig.Backend = struct module SimulatorBackend = struct let simulate_and_read ~input = let open Lwt_result_syntax in - let* raw_insights = Evm_context.execute_and_inspect Ctxt.ctxt ~input in + let* raw_insights = Evm_context.execute_and_inspect input in match raw_insights with | [Some bytes] -> return bytes | _ -> Error_monad.failwith "Invalid insights format" end let smart_rollup_address = - Tezos_crypto.Hashed.Smart_rollup_address.to_string - Ctxt.ctxt.smart_rollup_address + Tezos_crypto.Hashed.Smart_rollup_address.to_string Ctxt.smart_rollup_address end -let on_new_blueprint (ctxt : Evm_context.t) (blueprint : Blueprint_types.t) = +let on_new_blueprint next_blueprint_number + ({delayed_transactions; blueprint} : Blueprint_types.with_events) = + let open Lwt_result_syntax in let (Qty level) = blueprint.number in - let (Qty number) = ctxt.session.next_blueprint_number in + let (Qty number) = next_blueprint_number in if Z.(equal level number) then - Evm_context.apply_blueprint ctxt blueprint.timestamp blueprint.payload + let events = + List.map + (fun delayed_transaction -> + Ethereum_types.Evm_events.New_delayed_transaction delayed_transaction) + delayed_transactions + in + let* () = Evm_context.apply_evm_events events in + let delayed_transactions = + List.map + (fun Ethereum_types.Delayed_transaction.{hash; _} -> hash) + delayed_transactions + in + Evm_context.apply_blueprint + blueprint.timestamp + blueprint.payload + delayed_transactions else failwith "Received a blueprint with an unexpected number." -let main (ctxt : Evm_context.t) ~evm_node_endpoint = +module Make (Ctxt : sig + val evm_node_endpoint : Uri.t + + val smart_rollup_address : Tezos_crypto.Hashed.Smart_rollup_address.t +end) : Services_backend_sig.S = + Services_backend_sig.Make (MakeBackend (Ctxt)) + +let callback_log server conn req body = + let open Cohttp in + let open Lwt_syntax in + let uri = req |> Request.uri |> Uri.to_string in + let meth = req |> Request.meth |> Code.string_of_method in + let* body_str = body |> Cohttp_lwt.Body.to_string in + let* () = Events.callback_log ~uri ~meth ~body:body_str in + Tezos_rpc_http_server.RPC_server.resto_callback + server + conn + req + (Cohttp_lwt.Body.of_string body_str) + +let observer_start + ({rpc_addr; rpc_port; cors_origins; cors_headers; max_active_connections; _} : + Configuration.observer Configuration.t) ~directory = let open Lwt_result_syntax in - let rec loop stream = + let open Tezos_rpc_http_server in + let p2p_addr = P2p_addr.of_string_exn rpc_addr in + let host = Ipaddr.V6.to_string p2p_addr in + let node = `TCP (`Port rpc_port) in + let acl = RPC_server.Acl.allow_all in + let cors = + Resto_cohttp.Cors. + {allowed_headers = cors_headers; allowed_origins = cors_origins} + in + let server = + RPC_server.init_server + ~acl + ~cors + ~media_types:Media_type.all_media_types + directory + in + let*! () = + RPC_server.launch + ~max_active_connections + ~host + server + ~callback:(callback_log server) + node + in + let*! () = Events.is_ready ~rpc_addr ~rpc_port in + return server + +let install_finalizer_observer server = + let open Lwt_syntax in + Lwt_exit.register_clean_up_callback ~loc:__LOC__ @@ fun exit_status -> + let* () = Events.shutdown_node ~exit_status in + let* () = Tezos_rpc_http_server.RPC_server.shutdown server in + let* () = Events.shutdown_rpc_server ~private_:false in + Helpers.unwrap_error_monad @@ fun () -> + let open Lwt_result_syntax in + let* () = Tx_pool.shutdown () in + let* () = Evm_events_follower.shutdown () in + Evm_context.shutdown () + +let main_loop ~evm_node_endpoint = + let open Lwt_result_syntax in + let rec loop (Qty next_blueprint_number) stream = let*! candidate = Lwt_stream.get stream in match candidate with | Some blueprint -> - let* () = on_new_blueprint ctxt blueprint in + let* () = on_new_blueprint (Qty next_blueprint_number) blueprint in let* _ = Tx_pool.pop_and_inject_transactions () in - loop stream + loop (Qty (Z.succ next_blueprint_number)) stream | None -> return_unit in + let*! head = Evm_context.head_info () in + (* TODO: https://gitlab.com/tezos/tezos/-/issues/6876 Should be resilient to errors from the EVM node endpoint *) let*! blueprints_stream = Evm_services.monitor_blueprints ~evm_node_endpoint - ctxt.session.next_blueprint_number + head.next_blueprint_number in - loop blueprints_stream + loop head.next_blueprint_number blueprints_stream -module Make (Ctxt : sig - val ctxt : Evm_context.t +let main ?kernel_path ~rollup_node_endpoint ~evm_node_endpoint ~data_dir + ~(config : Configuration.observer Configuration.t) () = + let open Lwt_result_syntax in + let* smart_rollup_address = + Evm_services.get_smart_rollup_address ~evm_node_endpoint + in - val evm_node_endpoint : Uri.t -end) : Services_backend_sig.S = - Services_backend_sig.Make (MakeBackend (Ctxt)) + let* _loaded = + Evm_context.start + ~data_dir + ?kernel_path + ~preimages:config.mode.preimages + ~preimages_endpoint:config.mode.preimages_endpoint + ~smart_rollup_address: + (Tezos_crypto.Hashed.Smart_rollup_address.to_string + smart_rollup_address) + () + in + + let observer_backend = + (module Make (struct + let smart_rollup_address = smart_rollup_address + + let evm_node_endpoint = evm_node_endpoint + end) : Services_backend_sig.S) + in + + let* () = + Tx_pool.start + { + rollup_node = observer_backend; + smart_rollup_address = + Tezos_crypto.Hashed.Smart_rollup_address.to_b58check + smart_rollup_address; + mode = Observer; + } + in + + let directory = + Services.directory config (observer_backend, smart_rollup_address) + in + let directory = directory |> Evm_services.register smart_rollup_address in + + let* server = observer_start config ~directory in + + let (_ : Lwt_exit.clean_up_callback_id) = install_finalizer_observer server in + let* () = + Evm_events_follower.start + { + rollup_node_endpoint; + filter_event = + (function New_delayed_transaction _ -> false | _ -> true); + } + in + let () = Rollup_node_follower.start ~proxy:false ~rollup_node_endpoint in + + main_loop ~evm_node_endpoint diff --git a/etherlink/bin_node/lib_prod/observer.mli b/etherlink/bin_node/lib_prod/observer.mli index 190f5e98422c41981b64342b8b85ff1bff6eb5d9..df46a8d47c8cecb8719060cf8b8b2c938c77e0a1 100644 --- a/etherlink/bin_node/lib_prod/observer.mli +++ b/etherlink/bin_node/lib_prod/observer.mli @@ -5,12 +5,20 @@ (* *) (*****************************************************************************) -(** [main ctxt ~evm_node_endpoint] starts the main event-loop of the Observer, - consuming the blueprints received from [evm_node_endpoint]. *) -val main : Evm_context.t -> evm_node_endpoint:Uri.t -> unit tzresult Lwt.t - module Make (Ctxt : sig - val ctxt : Evm_context.t - val evm_node_endpoint : Uri.t + + val smart_rollup_address : Tezos_crypto.Hashed.Smart_rollup_address.t end) : Services_backend_sig.S + +(** [main ?kernel_path ~rollup_node_endpoint ~evm_node_endpoint + ~data_dir ~config] starts the main event-loop of the Observer, + consuming the blueprints received from [evm_node_endpoint]. *) +val main : + ?kernel_path:string -> + rollup_node_endpoint:Uri.t -> + evm_node_endpoint:Uri.t -> + data_dir:string -> + config:Configuration.observer Configuration.t -> + unit -> + unit tzresult Lwt.t diff --git a/etherlink/bin_node/lib_prod/publisher.ml b/etherlink/bin_node/lib_prod/publisher.ml index f3e6e1f3bc9ae1a34607110a7d7d140bb220c88e..4bcb43c94766f9b292fca786054bf7d33e786552 100644 --- a/etherlink/bin_node/lib_prod/publisher.ml +++ b/etherlink/bin_node/lib_prod/publisher.ml @@ -9,7 +9,7 @@ open Ethereum_types module type TxEncoder = sig (* Transactions to be encoded *) - type transactions = {raw : string list; delayed : Delayed_transaction.t list} + type transactions = string list (* Encoded messages to be injected *) type messages @@ -34,13 +34,10 @@ module Make (TxEncoder : TxEncoder) (Publisher : Publisher with type messages = TxEncoder.messages) = struct - let inject_raw_transactions ~timestamp ~smart_rollup_address ~transactions - ~delayed = + let inject_raw_transactions ~timestamp ~smart_rollup_address ~transactions = let open Lwt_result_syntax in let*? tx_hashes, to_publish = - TxEncoder.encode_transactions - ~smart_rollup_address - ~transactions:{raw = transactions; delayed} + TxEncoder.encode_transactions ~smart_rollup_address ~transactions in let* () = Publisher.publish_messages diff --git a/etherlink/bin_node/lib_prod/rollup_node.ml b/etherlink/bin_node/lib_prod/rollup_node.ml index 38916db7938e0800ef08eab72078a1eb7d4ddd29..796db591997a575226bc1e89c841a3ba281d3a26 100644 --- a/etherlink/bin_node/lib_prod/rollup_node.ml +++ b/etherlink/bin_node/lib_prod/rollup_node.ml @@ -45,15 +45,11 @@ end) : Services_backend_sig.Backend = struct end module TxEncoder = struct - type transactions = { - raw : string list; - delayed : Ethereum_types.Delayed_transaction.t list; - } + type transactions = string list type messages = string list - let encode_transactions ~smart_rollup_address - ~transactions:({raw = transactions; _} : transactions) = + let encode_transactions ~smart_rollup_address ~transactions = let open Result_syntax in let* rev_hashes, messages = List.fold_left_e diff --git a/etherlink/bin_node/lib_prod/rollup_node_follower.ml b/etherlink/bin_node/lib_prod/rollup_node_follower.ml index 39e639326e2d59f94dba5f527c5d52b5dd464823..bf26de51c1085980837217c66a3d8e39cd8e7dbf 100644 --- a/etherlink/bin_node/lib_prod/rollup_node_follower.ml +++ b/etherlink/bin_node/lib_prod/rollup_node_follower.ml @@ -5,86 +5,6 @@ (* *) (*****************************************************************************) -type parameters = {rollup_node_endpoint : Uri.t} - -module Types = struct - type state = Uri.t - - type nonrec parameters = parameters -end - -module Name = struct - (* We only have a single rollup node follower in the evm node *) - type t = unit - - let encoding = Data_encoding.unit - - let base = ["evm_node"; "prod"; "l2_block"; "follower"; "worker"] - - let pp _ _ = () - - let equal () () = true -end - -module Request = struct - type ('a, 'b) t = Unit : (unit, tztrace) t - - type view = View : _ t -> view - - let view (req : _ t) = View req - - let encoding : view Data_encoding.t = - let open Data_encoding in - conv (fun (View _) -> ()) (fun () -> View Unit) unit - - let pp _ppf (View _) = () -end - -module Worker = Worker.MakeSingle (Name) (Request) (Types) - -type worker = Worker.infinite Worker.queue Worker.t - -module Handlers = struct - type self = worker - - let on_request : - type r request_error. - worker -> (r, request_error) Request.t -> (r, request_error) result Lwt.t - = - fun _w request -> - match request with - | Request.Unit -> protect @@ fun () -> Lwt_result_syntax.return_unit - - type launch_error = error trace - - let on_launch _w () ({rollup_node_endpoint; _} : Types.parameters) = - let state = rollup_node_endpoint in - Lwt_result_syntax.return state - - let on_error (type a b) _w _st (_r : (a, b) Request.t) (_errs : b) : - unit tzresult Lwt.t = - Lwt_result_syntax.return_unit - - let on_completion _ _ _ _ = Lwt.return_unit - - let on_no_request _ = Lwt.return_unit - - let on_close _ = Lwt.return_unit -end - -let table = Worker.create_table Queue - -let worker_promise, worker_waker = Lwt.task () - -type error += No_l2_block_follower - -let worker = - lazy - (match Lwt.state worker_promise with - | Lwt.Return worker -> Ok worker - | Lwt.Fail e -> Error (TzTrace.make @@ error_of_exn e) - | Lwt.Sleep -> Error (TzTrace.make No_l2_block_follower)) - let read_from_rollup_node path level rollup_node_endpoint = let open Rollup_services in call_service @@ -95,7 +15,7 @@ let read_from_rollup_node path level rollup_node_endpoint = () let advertize_blueprints_publisher rollup_node_endpoint finalized_level = - let open Lwt_syntax in + let open Lwt_result_syntax in let* finalized_current_number = read_from_rollup_node Durable_storage_path.Block.current_number @@ -103,57 +23,213 @@ let advertize_blueprints_publisher rollup_node_endpoint finalized_level = rollup_node_endpoint in match finalized_current_number with - | Ok (Some bytes) -> + | Some bytes -> let (Qty evm_block_number) = Ethereum_types.decode_number bytes in - let* _ = Blueprints_publisher.new_l2_head evm_block_number in + let* () = Blueprints_publisher.new_l2_head evm_block_number in return_unit - | _ -> return_unit + | None -> return_unit -let process_new_block ~rollup_node_endpoint block = - let open Lwt_syntax in - let finalized_level = Sc_rollup_block.(Int32.(sub block.header.level 2l)) in - let* _ = Delayed_inbox.new_rollup_block finalized_level in - let* _ = Evm_events_follower.new_rollup_block finalized_level in +let process_new_block ~rollup_node_endpoint ~finalized_level = + let open Lwt_result_syntax in + let* () = Evm_events_follower.new_rollup_block finalized_level in let* () = advertize_blueprints_publisher rollup_node_endpoint finalized_level in - return_unit + Tx_pool.pop_and_inject_transactions_lazy () + +type error += Connection_lost | Connection_timeout -let rec process_rollup_node_stream ~stream ~rollup_node_endpoint worker = +(** [process_finalized_level ~oldest_rollup_node_known_l1_level + ~finalized_level ~rollup_node_endpoint] process the rollup node + block level [finalized_level] iff it's known by the rollup node + (i.e. superior to [oldest_rollup_node_known_l1_level]. + + This is necessary for the very beginning of the rollup life, when + the evm node is started at the same moment at the origination of + the rollup, and so `finalized_level` is < origination level. *) +let process_finalized_level ~oldest_rollup_node_known_l1_level ~finalized_level + ~rollup_node_endpoint = + let open Lwt_result_syntax in + if oldest_rollup_node_known_l1_level <= finalized_level then + process_new_block ~finalized_level ~rollup_node_endpoint + else return_unit + +let reconnection_delay = 5.0 + +let min_timeout = 10. + +let timeout_factor = 10. + +type rollup_node_connection = { + close : unit -> unit; (** stream closing function *) + stream : Sc_rollup_block.t Lwt_stream.t; + (** current rollup node block stream *) + rollup_node_endpoint : Uri.t; (** endpoint used to reconnect to the node *) + timeout : Float.t; + (** expected time to receive a l2 block from the rollup node. is + recalculated at each received block. *) +} + +(** [timeout] is updated to reflect reality of how long we should with + the next block or assume the connection is failing/hanging. *) +let update_timeout ~elapsed ~connection = + let new_timeout = elapsed *. timeout_factor in + if new_timeout < min_timeout then connection + else {connection with timeout = new_timeout} + +let sleep_before_reconnection ~factor = let open Lwt_syntax in - let* new_head = Lwt_stream.get stream in - match new_head with - | None -> - let* () = Rollup_node_follower_events.connection_lost () in - Worker.shutdown worker - | Some block -> + if factor = 0 then return_unit + else + (* randomised the sleep time to not DoS the rollup node if + multiple evm node are connected to the same rollup node *) + let fcount = float_of_int (factor - 1) in + (* Randomized exponential backoff capped to 1.5h: 1.5^count * delay ± 50% *) + let delay = reconnection_delay *. (1.5 ** fcount) in + let delay = min delay 3600. in + let randomization_factor = 0.5 (* 50% *) in + let delay = + delay + +. Random.float (delay *. 2. *. randomization_factor) + -. (delay *. randomization_factor) + in + let* () = Rollup_node_follower_events.trying_reconnection delay in + Lwt_unix.sleep delay + +(**[connect_to_stream ?count ~rollup_node_endpoint ()] try to connect + to the stream of rollup node block. If [count] is superior to [0] + then sleep some time with [sleep_before_reconnection] before trying + to reconnect. + + [count] is the number of time we tried to reconnect in a row. *) +let[@tailrec] rec connect_to_stream ?(count = 0) ~rollup_node_endpoint () = + let open Lwt_result_syntax in + let*! () = sleep_before_reconnection ~factor:count in + let*! res = Rollup_services.make_streamed_call ~rollup_node_endpoint in + match res with + | Ok (stream, close) -> + let*! () = Rollup_node_follower_events.connection_acquired () in + return {close; stream; rollup_node_endpoint; timeout = 300.} + | Error _e -> + (connect_to_stream [@tailcall]) + ~count:(count + 1) + ~rollup_node_endpoint + () + +(** [catchup_evm_event ~rollup_node_endpoint ~from ~to_] catchup on + evm events from [from] to [to_] from the rollup node. *) +let[@tailrec] rec catchup_evm_event ~rollup_node_endpoint ~from ~to_ = + let open Lwt_result_syntax in + if from = to_ then (*we are catch up *) return_unit + else if from > to_ then + failwith + "Internal error: The catchup of evm_event went too far, it should be \ + impossible." + else + (* reading event from [from] level then catching up from [from + + 1]. *) + let next_l1_level = Int32.succ from in + let* () = Evm_events_follower.new_rollup_block next_l1_level in + let* () = Evm_context.new_last_known_l1_level next_l1_level in + catchup_evm_event ~rollup_node_endpoint ~from:next_l1_level ~to_ + +(** [catchup_and_next_block ~proxy ~catchup_event ~connection] + returns the next block found in [connection.stream]. + + - If the connection drops then it tries to reconnect the stream + using [connect_to_stream]. + + - If the connection timeout (takes more than [connection.timeout]) + or if the connection fails then reconnect with [connect_to_stream] + and try to fetch [catchup_and_next_block] with that new stream.*) +let[@tailrec] rec catchup_and_next_block ~proxy ~catchup_event ~connection = + let open Lwt_result_syntax in + let get_promise () = + let*! res = Lwt_stream.get connection.stream in + match res with None -> tzfail Connection_lost | Some block -> return block + in + let timeout_promise timeout = + let*! () = Lwt_unix.sleep timeout in + tzfail Connection_timeout + in + let*! get_or_timeout = + Lwt.pick [get_promise (); timeout_promise connection.timeout] + in + match get_or_timeout with + | Ok block -> let* () = - Rollup_node_follower_events.new_block - Sc_rollup_block.(block.header.level) + if catchup_event then + let* latest_known_l1_level = Evm_context.last_known_l1_level () in + match latest_known_l1_level with + | None -> + (* sequencer has no value to start from, it must be the + initial start. *) + let*! () = Evm_store_events.no_l1_latest_level_to_catch_up () in + return_unit + | Some from -> + let to_ = Sc_rollup_block.(Int32.(sub block.header.level 2l)) in + catchup_evm_event + ~rollup_node_endpoint:connection.rollup_node_endpoint + ~from + ~to_ + else return_unit in - let* () = process_new_block ~rollup_node_endpoint block in - process_rollup_node_stream ~stream ~rollup_node_endpoint worker - -let start ({rollup_node_endpoint} as parameters) = + return (block, connection) + | Error [Connection_lost] | Error [Connection_timeout] -> + connection.close () ; + let* connection = + connect_to_stream + ~count:1 + ~rollup_node_endpoint:connection.rollup_node_endpoint + () + in + (catchup_and_next_block [@tailcall]) + ~proxy + ~catchup_event:(not proxy) + (* catchup event if not in proxy mode, proxy does not have + `evm_context` and would fail to fetch some data. Else + catchup possible missed event.*) + ~connection + | Error errs -> fail errs + +(** [loop_on_rollup_node_stream ~proxy + ~oldest_rollup_node_known_l1_level ~connection] main loop to + process the block. + + get the current rollup node block with [catchup_and_next_block], process it + with [process_finalized_level] then loop over. *) +let[@tailrec] rec loop_on_rollup_node_stream ~proxy + ~oldest_rollup_node_known_l1_level ~connection = let open Lwt_result_syntax in - let*! () = Rollup_node_follower_events.started () in - let+ worker = Worker.launch table () parameters (module Handlers) in - let () = - Lwt.dont_wait - (fun () -> - let*! stream = - Rollup_services.make_streamed_call ~rollup_node_endpoint - in - process_rollup_node_stream ~stream ~rollup_node_endpoint worker) - (fun _ -> ()) + let start_time = Unix.gettimeofday () in + let* block, connection = + catchup_and_next_block ~proxy ~catchup_event:false ~connection + in + let elapsed = Unix.gettimeofday () -. start_time in + let connection = update_timeout ~elapsed ~connection in + let finalized_level = Sc_rollup_block.(Int32.(sub block.header.level 2l)) in + let* () = + process_finalized_level + ~oldest_rollup_node_known_l1_level + ~rollup_node_endpoint:connection.rollup_node_endpoint + ~finalized_level in - Lwt.wakeup worker_waker worker + (loop_on_rollup_node_stream [@tailcall]) + ~proxy + ~oldest_rollup_node_known_l1_level + ~connection -let shutdown () = +let start ~proxy ~rollup_node_endpoint = + Lwt.async @@ fun () -> let open Lwt_syntax in - let w = Lazy.force worker in - match w with - | Error _ -> Lwt.return_unit - | Ok w -> - let* () = Rollup_node_follower_events.shutdown () in - Worker.shutdown w + let* () = Rollup_node_follower_events.started () in + Helpers.unwrap_error_monad @@ fun () -> + let open Lwt_result_syntax in + let* oldest_rollup_node_known_l1_level = + Rollup_services.oldest_known_l1_level rollup_node_endpoint + in + let* connection = connect_to_stream ~rollup_node_endpoint () in + loop_on_rollup_node_stream + ~proxy + ~oldest_rollup_node_known_l1_level + ~connection diff --git a/etherlink/bin_node/lib_prod/rollup_node_follower.mli b/etherlink/bin_node/lib_prod/rollup_node_follower.mli index 9fa26081f732c1aa27cdad7ed6f208f07e4aba5e..aef30b12129fa0fe9cf4dff47bf0d310ec852028 100644 --- a/etherlink/bin_node/lib_prod/rollup_node_follower.mli +++ b/etherlink/bin_node/lib_prod/rollup_node_follower.mli @@ -5,14 +5,6 @@ (* *) (*****************************************************************************) -type parameters = { - rollup_node_endpoint : Uri.t; - (** Rollup node endpoint used to monitor the stream of rollup - node block. *) -} - -(** [start parameters] starts the rollup node follower. *) -val start : parameters -> unit tzresult Lwt.t - -(** [shutdown ()] stops the rollup node follower. *) -val shutdown : unit -> unit Lwt.t +(** [start ~proxy ~rollup_node_endpoint] starts the rollup node + follower. In proxy mode does not try to catchup evm event. *) +val start : proxy:bool -> rollup_node_endpoint:Uri.t -> unit diff --git a/etherlink/bin_node/lib_prod/rollup_node_follower_events.ml b/etherlink/bin_node/lib_prod/rollup_node_follower_events.ml index 5f1ef9402c9a844499d2c86f5cfc1c44730a549e..37252a65d355767d6a8250f177a43846c7a94de4 100644 --- a/etherlink/bin_node/lib_prod/rollup_node_follower_events.ml +++ b/etherlink/bin_node/lib_prod/rollup_node_follower_events.ml @@ -34,6 +34,14 @@ module Event = struct ~level:Notice () + let connection_acquired = + declare_0 + ~section + ~name:"rollup_node_follower_connection_acquired" + ~msg:"Rollup node follower connected to the rollup node" + ~level:Info + () + let connection_lost = declare_0 ~section @@ -41,6 +49,15 @@ module Event = struct ~msg:"Connection with the rollup node has been lost" ~level:Error () + + let trying_reconnection = + declare_1 + ~section + ~name:"rollup_node_follower_trying_reconnection" + ~msg: + "Waiting {duration} sec before trying to reconnect to the rollup node" + ~level:Info + ("duration", Data_encoding.float) end let started = Internal_event.Simple.emit Event.started @@ -50,3 +67,8 @@ let new_block level = Internal_event.Simple.emit Event.new_block level let shutdown = Internal_event.Simple.emit Event.shutdown let connection_lost = Internal_event.Simple.emit Event.connection_lost + +let trying_reconnection duration = + Internal_event.Simple.emit Event.trying_reconnection duration + +let connection_acquired = Internal_event.Simple.emit Event.connection_acquired diff --git a/etherlink/bin_node/lib_prod/rollup_services.ml b/etherlink/bin_node/lib_prod/rollup_services.ml index 717a4d1ea237dd9ce4e10083a5c1ae7774603c6f..ef5c2750e2db0b6b40320a8d31581eef8ffa7e9c 100644 --- a/etherlink/bin_node/lib_prod/rollup_services.ml +++ b/etherlink/bin_node/lib_prod/rollup_services.ml @@ -56,6 +56,27 @@ let smart_rollup_address : ~output:(Data_encoding.Fixed.bytes 20) (open_root / "global" / "smart_rollup_address") +let gc_info_encoding = + Data_encoding.( + obj3 + (req "last_gc_level" int32) + (req "first_available_level" int32) + (opt "last_context_split_level" int32)) + +let gc_info : + ( [`GET], + unit, + unit, + unit, + unit, + int32 * int32 * int32 option ) + Service.service = + Service.get_service + ~description:"Smart rollup address" + ~query:Query.empty + ~output:gc_info_encoding + (open_root / "local" / "gc_info") + type state_value_query = {key : string} module Block_id = struct @@ -171,10 +192,10 @@ let call_service ~base ?(media_types = Media_type.all_media_types) a b c d = | Error trace -> fail trace let make_streamed_call ~rollup_node_endpoint = - let open Lwt_syntax in + let open Lwt_result_syntax in let stream, push = Lwt_stream.create () in let on_chunk v = push (Some v) and on_close () = push None in - let* _spill_all = + let* spill_all = Tezos_rpc_http_client_unix.RPC_client_unix.call_streamed_service [Media_type.json] ~base:rollup_node_endpoint @@ -185,7 +206,11 @@ let make_streamed_call ~rollup_node_endpoint = () () in - return stream + let close () = + spill_all () ; + if Lwt_stream.is_closed stream then () else on_close () + in + return (stream, close) let publish : rollup_node_endpoint:Uri.t -> @@ -234,6 +259,16 @@ let smart_rollup_address base = | Ok address -> return (Bytes.to_string address) | Error trace -> fail trace +let oldest_known_l1_level base = + let open Lwt_result_syntax in + let*! answer = + call_service ~base ~media_types:[Media_type.octet_stream] gc_info () () () + in + match answer with + | Ok (_last_gc_level, first_available_level, _last_context_split) -> + return first_available_level + | Error trace -> fail trace + (** [tezos_level base] asks for the smart rollup node's latest l1 level, using the endpoint [base]. *) let tezos_level base = diff --git a/etherlink/bin_node/lib_prod/sequencer.ml b/etherlink/bin_node/lib_prod/sequencer.ml index e21ac3e78a4c8414e6ec523146a43ff07630e5ac..f819c2dc590e725e890711d9993c7a35293824d2 100644 --- a/etherlink/bin_node/lib_prod/sequencer.ml +++ b/etherlink/bin_node/lib_prod/sequencer.ml @@ -6,20 +6,14 @@ (*****************************************************************************) module MakeBackend (Ctxt : sig - val ctxt : Evm_context.t + val smart_rollup_address : Tezos_crypto.Hashed.Smart_rollup_address.t end) : Services_backend_sig.Backend = struct module Reader = struct - let read path = - let open Lwt_result_syntax in - let*! res = Evm_context.inspect Ctxt.ctxt path in - return res + let read path = Evm_context.inspect path end module TxEncoder = struct - type transactions = { - raw : string list; - delayed : Ethereum_types.Delayed_transaction.t list; - } + type transactions = string list type messages = transactions @@ -37,19 +31,18 @@ end) : Services_backend_sig.Backend = struct module SimulatorBackend = struct let simulate_and_read ~input = let open Lwt_result_syntax in - let* raw_insights = Evm_context.execute_and_inspect Ctxt.ctxt ~input in + let* raw_insights = Evm_context.execute_and_inspect input in match raw_insights with | [Some bytes] -> return bytes | _ -> Error_monad.failwith "Invalid insights format" end let smart_rollup_address = - Tezos_crypto.Hashed.Smart_rollup_address.to_string - Ctxt.ctxt.smart_rollup_address + Tezos_crypto.Hashed.Smart_rollup_address.to_string Ctxt.smart_rollup_address end module Make (Ctxt : sig - val ctxt : Evm_context.t + val smart_rollup_address : Tezos_crypto.Hashed.Smart_rollup_address.t end) = Services_backend_sig.Make (MakeBackend (Ctxt)) @@ -66,11 +59,11 @@ let install_finalizer_seq server private_server = Events.shutdown_rpc_server ~private_:true) private_server in + Helpers.unwrap_error_monad @@ fun () -> + let open Lwt_result_syntax in let* () = Tx_pool.shutdown () in - let* () = Rollup_node_follower.shutdown () in let* () = Evm_events_follower.shutdown () in let* () = Blueprints_publisher.shutdown () in - let* () = Delayed_inbox.shutdown () in return_unit let callback_log server conn req body = @@ -187,52 +180,6 @@ let loop_sequencer : let now = Helpers.now () in loop now -let[@tailrec] rec catchup_evm_event ~rollup_node_endpoint store = - let open Lwt_result_syntax in - let* rollup_node_l1_level = - Rollup_services.tezos_level rollup_node_endpoint - in - let* latest_known_l1_level = Store.L1_latest_known_level.find store in - match latest_known_l1_level with - | None -> - (* sequencer has no value to start from, it must be the initial - start or we went from prod to dev. *) - let*! () = Store_events.no_l1_latest_level_to_catch_up () in - return_unit - | Some latest_known_l1_level -> - let finalized_level = Int32.(sub rollup_node_l1_level 2l) in - if latest_known_l1_level = finalized_level then return_unit - else if latest_known_l1_level > finalized_level then - tzfail - (error_of_fmt - "Internal error: The sequencer has processed more l1 level than \ - the rollup node, it should be impossible. ") - else - let*! () = - Events.catching_up_evm_event - ~from:latest_known_l1_level - ~to_:finalized_level - in - catch_evm_event_aux - ~rollup_node_endpoint - store - ~from:latest_known_l1_level - ~to_:finalized_level - -and[@tailrec] catch_evm_event_aux ~rollup_node_endpoint store ~from ~to_ = - let open Lwt_result_syntax in - if from = to_ then catchup_evm_event ~rollup_node_endpoint store - else if from > to_ then - tzfail - (error_of_fmt - "Internal error: The catchup of evm_event went too far, it should be \ - impossible.") - else - let next_l1_level = Int32.succ from in - let* () = Evm_events_follower.new_rollup_block next_l1_level in - let* () = Store.L1_latest_known_level.store store next_l1_level in - catch_evm_event_aux ~rollup_node_endpoint store ~from:next_l1_level ~to_ - let main ~data_dir ~rollup_node_endpoint ~max_blueprints_lag ~max_blueprints_ahead ~max_blueprints_catchup ~catchup_cooldown ?(genesis_timestamp = Helpers.now ()) ~cctxt ~sequencer @@ -242,8 +189,8 @@ let main ~data_dir ~rollup_node_endpoint ~max_blueprints_lag let* smart_rollup_address = Rollup_services.smart_rollup_address rollup_node_endpoint in - let* ctxt, loaded = - Evm_context.init + let* status = + Evm_context.start ?kernel_path:kernel ~data_dir ~preimages:configuration.mode.preimages @@ -251,7 +198,8 @@ let main ~data_dir ~rollup_node_endpoint ~max_blueprints_lag ~smart_rollup_address () in - let (Qty next_blueprint_number) = ctxt.session.next_blueprint_number in + let*! head = Evm_context.head_info () in + let (Qty next_blueprint_number) = head.next_blueprint_number in let* () = Blueprints_publisher.start ~rollup_node_endpoint @@ -260,10 +208,10 @@ let main ~data_dir ~rollup_node_endpoint ~max_blueprints_lag ~max_blueprints_catchup ~catchup_cooldown ~latest_level_seen:(Z.pred next_blueprint_number) - ctxt.store + () in let* () = - if not loaded then + if status = Created then (* Create the first empty block. *) let* genesis = Sequencer_blueprint.create @@ -276,12 +224,17 @@ let main ~data_dir ~rollup_node_endpoint ~max_blueprints_lag ~number:Ethereum_types.(Qty Z.zero) ~parent_hash:Ethereum_types.genesis_parent_hash in - Evm_context.apply_and_publish_blueprint ctxt genesis_timestamp genesis + let* () = Evm_context.apply_blueprint genesis_timestamp genesis [] in + Blueprints_publisher.publish Z.zero genesis else return_unit in + let smart_rollup_address_typed = + Tezos_crypto.Hashed.Smart_rollup_address.of_string_exn smart_rollup_address + in + let module Sequencer = Make (struct - let ctxt = ctxt + let smart_rollup_address = smart_rollup_address_typed end) in let* () = Tx_pool.start @@ -289,18 +242,25 @@ let main ~data_dir ~rollup_node_endpoint ~max_blueprints_lag in let* () = Block_producer.start - {ctxt; cctxt; smart_rollup_address; sequencer_key = sequencer} + { + cctxt; + smart_rollup_address; + sequencer_key = sequencer; + maximum_number_of_chunks = configuration.mode.max_number_of_chunks; + } in let* () = - Delayed_inbox.start {rollup_node_endpoint; delayed_inbox_interval = 1} + Evm_events_follower.start + {rollup_node_endpoint; filter_event = (fun _ -> true)} in - let* () = Evm_events_follower.start {rollup_node_endpoint; ctxt} in - let* () = catchup_evm_event ~rollup_node_endpoint ctxt.store in - let* () = Rollup_node_follower.start {rollup_node_endpoint} in + let () = Rollup_node_follower.start ~proxy:false ~rollup_node_endpoint in + let directory = Services.directory configuration ((module Sequencer), smart_rollup_address) in - let directory = directory |> Evm_services.register ctxt in + let directory = + directory |> Evm_services.register smart_rollup_address_typed + in let private_info = Option.map (fun private_rpc_port -> diff --git a/etherlink/bin_node/lib_prod/sequencer_blueprint.ml b/etherlink/bin_node/lib_prod/sequencer_blueprint.ml index 5decfe173cb137c2b64b3241fa71b9b4f739bcad..be819f0657488d8ffa8403637c2cb60bea85235f 100644 --- a/etherlink/bin_node/lib_prod/sequencer_blueprint.ml +++ b/etherlink/bin_node/lib_prod/sequencer_blueprint.ml @@ -41,73 +41,37 @@ let max_chunk_size = - blueprint_tag_size - blueprint_number_size - nb_chunks_size - chunk_index_size - rlp_tags_size - signature_size -let encode_transaction hash raw = +let maximum_usable_space_in_blueprint chunks_count = + chunks_count * max_chunk_size + +let maximum_chunks_per_l1_level = 512 * 1024 / 4096 + +let encode_transaction raw = let open Rlp in - List - [ - Value (Bytes.of_string hash); - List [Value (Bytes.of_string "\001"); Value (Bytes.of_string raw)]; - ] - -let encode_delayed_transaction Delayed_transaction.{kind; hash; raw} = - match kind with - | Transaction -> - let open Rlp in - let hash = hash_to_bytes hash in - List - [ - Value (Bytes.of_string hash); - List [Value (Bytes.of_string "\003"); Value (Bytes.of_string raw)]; - ] - | Deposit -> - let open Rlp in - let hash = hash_to_bytes hash in - let rlp = decode_exn (Bytes.of_string raw) in - List - [ - Value (hash |> Bytes.of_string); - List [Value (Bytes.of_string "\002"); rlp]; - ] - -let make_blueprint_chunks ~timestamp ~transactions ~delayed_transactions - ~parent_hash = + Value (Bytes.of_string raw) + +let make_blueprint_chunks ~timestamp ~transactions + ~(delayed_transactions : Ethereum_types.hash list) ~parent_hash = let open Rlp in - let delayed_hashes = + let delayed_transactions = List (List.map - (fun tx -> - Value - (Delayed_transaction.hash tx |> hash_to_bytes |> Bytes.of_string)) + (fun hash -> Value (hash_to_bytes hash |> Bytes.of_string)) delayed_transactions) in - let delayed_transactions = - List.map encode_delayed_transaction delayed_transactions - in - let messages, full_messages = - let m = - List.map - (fun transaction -> - let tx_hash_str = Ethereum_types.hash_raw_tx transaction in - encode_transaction tx_hash_str transaction) - transactions - in - (List m, List (delayed_transactions @ m)) + let messages = + let m = List.map encode_transaction transactions in + List m in let timestamp = Value (Ethereum_types.timestamp_to_bytes timestamp) in let parent_hash = Value (block_hash_to_bytes parent_hash |> Bytes.of_string) in - let to_publish = - List [parent_hash; delayed_hashes; messages; timestamp] |> encode - in - let to_execute = - List [parent_hash; List []; full_messages; timestamp] |> encode + let blueprint = + List [parent_hash; delayed_transactions; messages; timestamp] |> encode in - match - ( String.chunk_bytes max_chunk_size to_publish, - String.chunk_bytes max_chunk_size to_execute ) - with - | Ok to_publish, Ok to_execute -> (to_publish, to_execute) + match String.chunk_bytes max_chunk_size blueprint with + | Ok chunks -> chunks | _ -> (* [chunk_bytes] can only return an [Error] if the optional argument [error_on_partial_chunk] is passed. As this is not @@ -119,29 +83,22 @@ let encode_u16_le i = Bytes.set_uint16_le bytes 0 i ; bytes -type t = { - to_publish : Blueprint_types.payload; - to_execute : Blueprint_types.payload; -} +type t = Blueprint_types.payload let create ~cctxt ~sequencer_key ~timestamp ~smart_rollup_address ~number - ~parent_hash ~delayed_transactions ~transactions = + ~parent_hash ~(delayed_transactions : Ethereum_types.hash list) + ~transactions = let open Lwt_result_syntax in let open Rlp in let number = Value (encode_u256_le number) in - let to_publish_chunks, to_execute_chunks = + let chunks = make_blueprint_chunks ~timestamp ~transactions ~delayed_transactions ~parent_hash in - let nb_chunks_publish = - Rlp.Value (encode_u16_le @@ List.length to_publish_chunks) - in - let nb_chunks_execute = - Rlp.Value (encode_u16_le @@ List.length to_execute_chunks) - in + let nb_chunks = Rlp.Value (encode_u16_le @@ List.length chunks) in let message_from_chunk nb_chunks chunk_index chunk = let chunk_index = Rlp.Value (encode_u16_le chunk_index) in let value = Value (Bytes.of_string chunk) in @@ -165,10 +122,4 @@ let create ~cctxt ~sequencer_key ~timestamp ~smart_rollup_address ~number rlp_sequencer_blueprint) |> return in - let* to_publish = - List.mapi_ep (message_from_chunk nb_chunks_publish) to_publish_chunks - in - let* to_execute = - List.mapi_ep (message_from_chunk nb_chunks_execute) to_execute_chunks - in - return {to_publish; to_execute} + List.mapi_ep (message_from_chunk nb_chunks) chunks diff --git a/etherlink/bin_node/lib_prod/sequencer_blueprint.mli b/etherlink/bin_node/lib_prod/sequencer_blueprint.mli index e58842598c7e9ca27a7561911b39c10f48d43c40..78fc1777a3c21fb5d7bfd33b36cc61e6cd3f454f 100644 --- a/etherlink/bin_node/lib_prod/sequencer_blueprint.mli +++ b/etherlink/bin_node/lib_prod/sequencer_blueprint.mli @@ -5,21 +5,7 @@ (* *) (*****************************************************************************) -(** Type representing the messages describing a blueprint. - - The sequencer has to produce two different versions of every blueprint. - The first one is the version published to L1. In this one, only the hashes - of delayed transactions are added, as the kernel already has the full - transactions in the delayed inbox. - The second one is the one the sequencer executes locally. In this case, - instead of replicating the state of the delayed inbox, the full delayed - transactions are added in the blueprint, respecting the same order as - their corresponding hashes. -*) -type t = { - to_publish : Blueprint_types.payload; - to_execute : Blueprint_types.payload; -} +type t = Blueprint_types.payload (** [create ~secret_key ~timestamp ~smart_rollup_address ~number ~parent_hash ~delayed_transactions ~transactions] @@ -34,6 +20,14 @@ val create : smart_rollup_address:string -> number:Ethereum_types.quantity -> parent_hash:Ethereum_types.block_hash -> - delayed_transactions:Ethereum_types.Delayed_transaction.t list -> + delayed_transactions:Ethereum_types.hash list -> transactions:string list -> t tzresult Lwt.t + +(** [maximum_usable_size_in_blueprint chunks_count] returns the available space + for transactions in a blueprint composed of [chunks_count] chunks. *) +val maximum_usable_space_in_blueprint : int -> int + +(* [maximum_chunks_per_l1_level] is the maximum number of chunks a L1 block can + hold at once. *) +val maximum_chunks_per_l1_level : int diff --git a/etherlink/bin_node/lib_prod/services_backend_sig.ml b/etherlink/bin_node/lib_prod/services_backend_sig.ml index 1013c3f038de2814b9eeafb13f9ea57b97035555..d46dbe0acb943d64745db9b24c6222f34d6f5edd 100644 --- a/etherlink/bin_node/lib_prod/services_backend_sig.ml +++ b/etherlink/bin_node/lib_prod/services_backend_sig.ml @@ -26,7 +26,6 @@ module type S = sig timestamp:Time.Protocol.t -> smart_rollup_address:string -> transactions:string list -> - delayed:Ethereum_types.Delayed_transaction.t list -> Ethereum_types.hash list tzresult Lwt.t (** [current_block ~full_transaction_object] returns the most recent diff --git a/etherlink/bin_node/lib_prod/tx_pool.ml b/etherlink/bin_node/lib_prod/tx_pool.ml index 34132e303eb20394b5410abc31ccb47f11d4c722..034c34e6e47be74c35a2b152ef9060cc0c3d4d21 100644 --- a/etherlink/bin_node/lib_prod/tx_pool.ml +++ b/etherlink/bin_node/lib_prod/tx_pool.ml @@ -19,22 +19,16 @@ module Pool = struct } type t = { - delayed_transactions : Ethereum_types.Delayed_transaction.t list; transactions : transaction Nonce_map.t Pkey_map.t; global_index : int64; (* Index to order the transactions. *) } - let empty : t = - { - transactions = Pkey_map.empty; - global_index = Int64.zero; - delayed_transactions = []; - } + let empty : t = {transactions = Pkey_map.empty; global_index = Int64.zero} (** Add a transaction to the pool. *) let add t pkey raw_tx = let open Result_syntax in - let {transactions; global_index; delayed_transactions} = t in + let {transactions; global_index} = t in let* nonce = Ethereum_types.transaction_nonce raw_tx in let* gas_price = Ethereum_types.transaction_gas_price raw_tx in let* gas_limit = Ethereum_types.transaction_gas_limit raw_tx in @@ -60,18 +54,7 @@ module Pool = struct user_transactions)) transactions in - return - { - transactions; - global_index = Int64.(add global_index one); - delayed_transactions; - } - - (** Add a delayed transaction to the pool.*) - let add_delayed t tx = - let open Result_syntax in - let delayed_transactions = tx :: t.delayed_transactions in - return {t with delayed_transactions} + return {transactions; global_index = Int64.(add global_index one)} (** Returns all the addresses of the pool *) let addresses {transactions; _} = @@ -79,8 +62,7 @@ module Pool = struct (** Returns the transaction matching the predicate. And remove them from the pool. *) - let partition pkey predicate - {transactions; global_index; delayed_transactions} = + let partition pkey predicate {transactions; global_index} = (* Get the sequence of transaction *) let selected, remaining = transactions |> Pkey_map.find pkey @@ -94,7 +76,7 @@ module Pool = struct in (* Convert the sequence to a list *) let selected = selected |> Nonce_map.bindings |> List.map snd in - (selected, {transactions; global_index; delayed_transactions}) + (selected, {transactions; global_index}) (** Removes from the pool the transactions matching the predicate for the given pkey. *) @@ -133,32 +115,6 @@ type parameters = { mode : mode; } -type add_transaction = - | Transaction of string - | Delayed of Ethereum_types.Delayed_transaction.t - -let add_transaction_encoding = - let open Data_encoding in - union - [ - case - (Tag 0) - ~title:"transaction" - string - (function Transaction transaction -> Some transaction | _ -> None) - (function transaction -> Transaction transaction); - case - (Tag 1) - ~title:"delayed" - Ethereum_types.Delayed_transaction.encoding - (function Delayed delayed -> Some delayed | _ -> None) - (fun delayed -> Delayed delayed); - ] - -type popped_transactions = - | Locked - | Transactions of string list * Ethereum_types.Delayed_transaction.t list - module Types = struct type state = { rollup_node : (module Services_backend_sig.S); @@ -187,12 +143,13 @@ end module Request = struct type ('a, 'b) t = | Add_transaction : - add_transaction + string -> ((Ethereum_types.hash, string) result, tztrace) t - | Pop_transactions : (popped_transactions, tztrace) t + | Pop_transactions : int -> (string list, tztrace) t | Pop_and_inject_transactions : (unit, tztrace) t | Lock_transactions : (unit, tztrace) t | Unlock_transactions : (unit, tztrace) t + | Is_locked : (bool, tztrace) t type view = View : _ t -> view @@ -207,7 +164,7 @@ module Request = struct ~title:"Add_transaction" (obj2 (req "request" (constant "add_transaction")) - (req "transaction" add_transaction_encoding)) + (req "transaction" string)) (function | View (Add_transaction transaction) -> Some ((), transaction) | _ -> None) @@ -215,9 +172,15 @@ module Request = struct case (Tag 1) ~title:"Pop_transactions" - (obj1 (req "request" (constant "pop_transactions"))) - (function View Pop_transactions -> Some () | _ -> None) - (fun () -> View Pop_transactions); + (obj2 + (req "request" (constant "pop_transactions")) + (req "maximum_cumulatize_size" int31)) + (function + | View (Pop_transactions maximum_cumulative_size) -> + Some ((), maximum_cumulative_size) + | _ -> None) + (fun ((), maximum_cumulative_size) -> + View (Pop_transactions maximum_cumulative_size)); case (Tag 2) ~title:"Pop_and_inject_transactions" @@ -236,28 +199,31 @@ module Request = struct (obj1 (req "request" (constant "unlock_transactions"))) (function View Unlock_transactions -> Some () | _ -> None) (fun () -> View Unlock_transactions); + case + (Tag 5) + ~title:"Is_locked" + (obj1 (req "request" (constant "is_locked"))) + (function View Is_locked -> Some () | _ -> None) + (fun () -> View Is_locked); ] let pp ppf (View r) = match r with - | Add_transaction transaction -> ( - match transaction with - | Transaction tx_raw -> - Format.fprintf - ppf - "Add tx [%s] to tx-pool" - (Hex.of_string tx_raw |> Hex.show) - | Delayed delayed -> - Format.fprintf - ppf - "Add delayed inbox tx [%a] to tx-pool" - Ethereum_types.Delayed_transaction.pp_short - delayed) - | Pop_transactions -> Format.fprintf ppf "Popping transactions" + | Add_transaction tx_raw -> + Format.fprintf + ppf + "Add tx [%s] to tx-pool" + (Hex.of_string tx_raw |> Hex.show) + | Pop_transactions maximum_cumulative_size -> + Format.fprintf + ppf + "Popping transactions of maximum cumulative size %d bytes" + maximum_cumulative_size | Pop_and_inject_transactions -> Format.fprintf ppf "Popping and injecting transactions" | Lock_transactions -> Format.fprintf ppf "Locking the transactions" | Unlock_transactions -> Format.fprintf ppf "Unlocking the transactions" + | Is_locked -> Format.fprintf ppf "Checking if the tx pool is locked" end module Worker = Worker.MakeSingle (Name) (Request) (Types) @@ -291,24 +257,6 @@ let on_normal_transaction state tx_raw = state.pool <- pool ; return (Ok hash) -let on_delayed_transaction state delayed_tx = - let open Lwt_result_syntax in - let open Types in - let {rollup_node = (module Rollup_node); pool; _} = state in - (* Add the tx to the pool*) - let*? pool = Pool.add_delayed pool delayed_tx in - state.pool <- pool ; - return_unit - -let on_transaction state transaction = - let open Lwt_result_syntax in - match transaction with - | Transaction transaction -> on_normal_transaction state transaction - | Delayed delayed_transaction -> - let hash = Ethereum_types.Delayed_transaction.hash delayed_transaction in - let* () = on_delayed_transaction state delayed_transaction in - return (Ok hash) - (** Checks that [balance] is enough to pay up to the maximum [gas_limit] the sender defined parametrized by the [gas_price]. *) let can_prepay ~balance ~gas_price ~gas_limit = @@ -319,7 +267,7 @@ let can_prepay ~balance ~gas_price ~gas_limit = let can_pay_with_current_base_fee ~gas_price ~base_fee_per_gas = gas_price >= base_fee_per_gas -let pop_transactions state = +let pop_transactions state ~maximum_cumulative_size = let open Lwt_result_syntax in let Types. { @@ -330,7 +278,7 @@ let pop_transactions state = } = state in - if locked then return Locked + if locked then return [] else (* Get all the addresses in the tx-pool. *) let addresses = Pool.addresses pool in @@ -360,24 +308,40 @@ let pop_transactions state = pool) pool in - (* Select transaction with nonce equal to user's nonce and that - can be prepaid. + (* Select transaction with nonce equal to user's nonce, that can be prepaid + and selects a sum of transactions that wouldn't go above the size limit + of the blueprint. Also removes the transactions from the pool. *) - let txs, pool = + let txs, pool, _ = addr_with_nonces |> List.fold_left - (fun (txs, pool) (pkey, _, current_nonce) -> + (fun (txs, pool, cumulative_size) (pkey, _, current_nonce) -> + (* This mutable counter is purely local and used only for the + partition. *) + let accumulated_size = ref cumulative_size in let selected, pool = Pool.partition pkey - (fun nonce {gas_price; _} -> - nonce = current_nonce - && can_pay_with_current_base_fee ~gas_price ~base_fee_per_gas) + (fun nonce {gas_price; raw_tx; _} -> + let check_nonce = nonce = current_nonce in + let can_fit = + !accumulated_size + String.length raw_tx + <= maximum_cumulative_size + in + let can_pay = + can_pay_with_current_base_fee ~gas_price ~base_fee_per_gas + in + let selected = check_nonce && can_pay && can_fit in + (* If the transaction is selected, this means it will fit *) + if selected then + accumulated_size := + !accumulated_size + String.length raw_tx ; + selected) pool in let txs = List.append txs selected in - (txs, pool)) - ([], pool) + (txs, pool, !accumulated_size)) + ([], pool, 0) in (* Sorting transactions by index. First tx in the pool is the first tx to be sent to the batcher. *) @@ -387,13 +351,9 @@ let pop_transactions state = Int64.compare index_a index_b) |> List.map (fun Pool.{raw_tx; _} -> raw_tx) in - (* Add delayed transactions and empty the list *) - let delayed, pool = - (pool.delayed_transactions, {pool with delayed_transactions = []}) - in (* update the pool *) state.pool <- pool ; - return (Transactions (txs, delayed)) + return txs let pop_and_inject_transactions state = let open Lwt_result_syntax in @@ -402,41 +362,45 @@ let pop_and_inject_transactions state = | Sequencer -> failwith "Internal error: the sequencer is not supposed to use this function" - | Observer | Proxy _ -> ( - let* res = pop_transactions state in - match res with - | Locked -> return_unit - | Transactions (txs, delayed) -> - if not (List.is_empty txs && List.is_empty delayed) then - let (module Rollup_node : Services_backend_sig.S) = - state.rollup_node - in - let*! hashes = - Rollup_node.inject_raw_transactions - (* The timestamp is ignored in observer and proxy mode, it's just for - compatibility with sequencer mode. *) - ~timestamp:(Helpers.now ()) - ~smart_rollup_address:state.smart_rollup_address - ~transactions:txs - ~delayed + | Observer | Proxy _ -> + (* We over approximate the number of transactions to pop in proxy and + observer mode to the maximum size an L1 block can hold. If the proxy + sends more, they won't be applied at the next level. For the observer, + it prevents spamming the sequencer. *) + let maximum_cumulative_size = + Sequencer_blueprint.maximum_usable_space_in_blueprint + Sequencer_blueprint.maximum_chunks_per_l1_level + in + let* txs = pop_transactions state ~maximum_cumulative_size in + if not (List.is_empty txs) then + let (module Rollup_node : Services_backend_sig.S) = state.rollup_node in + let*! hashes = + Rollup_node.inject_raw_transactions + (* The timestamp is ignored in observer and proxy mode, it's just for + compatibility with sequencer mode. *) + ~timestamp:(Helpers.now ()) + ~smart_rollup_address:state.smart_rollup_address + ~transactions:txs + in + match hashes with + | Error trace -> + let*! () = Tx_pool_events.transaction_injection_failed trace in + return_unit + | Ok hashes -> + let*! () = + List.iter_s + (fun hash -> Tx_pool_events.transaction_injected ~hash) + hashes in - match hashes with - | Error trace -> - let*! () = Tx_pool_events.transaction_injection_failed trace in - return_unit - | Ok hashes -> - let*! () = - List.iter_s - (fun hash -> Tx_pool_events.transaction_injected ~hash) - hashes - in - return_unit - else return_unit) + return_unit + else return_unit let lock_transactions state = state.Types.locked <- true let unlock_transactions state = state.Types.locked <- false +let is_locked state = state.Types.locked + module Handlers = struct type self = worker @@ -461,15 +425,17 @@ module Handlers = struct match request with | Request.Add_transaction transaction -> protect @@ fun () -> - let* res = on_transaction state transaction in + let* res = on_normal_transaction state transaction in let* () = observer_self_inject_request w in return res - | Request.Pop_transactions -> protect @@ fun () -> pop_transactions state + | Request.Pop_transactions maximum_cumulative_size -> + protect @@ fun () -> pop_transactions state ~maximum_cumulative_size | Request.Pop_and_inject_transactions -> protect @@ fun () -> pop_and_inject_transactions state | Request.Lock_transactions -> protect @@ fun () -> return (lock_transactions state) | Request.Unlock_transactions -> return (unlock_transactions state) + | Request.Is_locked -> protect @@ fun () -> return (is_locked state) type launch_error = error trace @@ -502,7 +468,7 @@ let table = Worker.create_table Queue let worker_promise, worker_waker = Lwt.task () -type error += No_tx_pool +type error += No_worker type error += Tx_pool_terminated @@ -510,8 +476,18 @@ let worker = lazy (match Lwt.state worker_promise with | Lwt.Return worker -> Ok worker - | Lwt.Fail e -> Error (TzTrace.make @@ error_of_exn e) - | Lwt.Sleep -> Error (TzTrace.make No_tx_pool)) + | Lwt.Fail e -> Result_syntax.tzfail (error_of_exn e) + | Lwt.Sleep -> Result_syntax.tzfail No_worker) + +let bind_worker f = + let open Lwt_result_syntax in + let res = Lazy.force worker in + match res with + | Error [No_worker] -> + (* There is no worker, nothing to do *) + return_unit + | Error errs -> fail errs + | Ok w -> f w let handle_request_error rq = let open Lwt_syntax in @@ -523,62 +499,22 @@ let handle_request_error rq = | Error (Closed (Some errs)) -> Lwt.return_error errs | Error (Any exn) -> Lwt.return_error [Exn exn] -let rec subscribe_l2_block ~stream_l2 worker = - let open Lwt_syntax in - let* new_head = Lwt_stream.get stream_l2 in - match new_head with - | Some _block -> - let* _pushed = - Worker.Queue.push_request worker Request.Pop_and_inject_transactions - in - subscribe_l2_block ~stream_l2 worker - | None -> - let* () = Tx_pool_events.connection_lost () in - let* () = Lwt_unix.sleep 1. in - subscribe_l2_block ~stream_l2 worker - -let start ({mode; _} as parameters) = +let start parameters = let open Lwt_result_syntax in let+ worker = Worker.launch table () parameters (module Handlers) in - let () = - Lwt.dont_wait - (fun () -> - match mode with - | Proxy {rollup_node_endpoint} -> - let*! stream_l2 = - Rollup_services.make_streamed_call ~rollup_node_endpoint - in - subscribe_l2_block ~stream_l2 worker - | Sequencer | Observer -> Lwt.return_unit) - (fun _ -> ()) - in Lwt.wakeup worker_waker worker let shutdown () = - let open Lwt_syntax in - let w = Lazy.force worker in - match w with - | Error _ -> - (* There is no tx-pool, nothing to do *) - Lwt.return_unit - | Ok w -> - let* () = Tx_pool_events.shutdown () in - Worker.shutdown w - -let add raw_tx = let open Lwt_result_syntax in - let*? w = Lazy.force worker in - Worker.Queue.push_request_and_wait - w - (Request.Add_transaction (Transaction raw_tx)) - |> handle_request_error + bind_worker @@ fun w -> + let*! () = Tx_pool_events.shutdown () in + let*! () = Worker.shutdown w in + return_unit -let add_delayed delayed = +let add raw_tx = let open Lwt_result_syntax in let*? w = Lazy.force worker in - Worker.Queue.push_request_and_wait - w - (Request.Add_transaction (Delayed delayed)) + Worker.Queue.push_request_and_wait w (Request.Add_transaction raw_tx) |> handle_request_error let nonce pkey = @@ -593,10 +529,12 @@ let nonce pkey = in return next_nonce -let pop_transactions () = +let pop_transactions ~maximum_cumulative_size = let open Lwt_result_syntax in let*? worker = Lazy.force worker in - Worker.Queue.push_request_and_wait worker Request.Pop_transactions + Worker.Queue.push_request_and_wait + worker + (Request.Pop_transactions maximum_cumulative_size) |> handle_request_error let pop_and_inject_transactions () = @@ -605,6 +543,14 @@ let pop_and_inject_transactions () = Worker.Queue.push_request_and_wait worker Request.Pop_and_inject_transactions |> handle_request_error +let pop_and_inject_transactions_lazy () = + let open Lwt_result_syntax in + bind_worker @@ fun w -> + let*! (_pushed : bool) = + Worker.Queue.push_request w Request.Pop_and_inject_transactions + in + return_unit + let lock_transactions () = let open Lwt_result_syntax in let*? worker = Lazy.force worker in @@ -616,3 +562,9 @@ let unlock_transactions () = let*? worker = Lazy.force worker in Worker.Queue.push_request_and_wait worker Request.Unlock_transactions |> handle_request_error + +let is_locked () = + let open Lwt_result_syntax in + let*? worker = Lazy.force worker in + Worker.Queue.push_request_and_wait worker Request.Is_locked + |> handle_request_error diff --git a/etherlink/bin_node/lib_prod/tx_pool.mli b/etherlink/bin_node/lib_prod/tx_pool.mli index 73e9a60251df61921adea14287f9328a013434e8..b9ea9394a968d48d357553894a0e62be4d5c030f 100644 --- a/etherlink/bin_node/lib_prod/tx_pool.mli +++ b/etherlink/bin_node/lib_prod/tx_pool.mli @@ -13,38 +13,36 @@ type parameters = { mode : mode; } -type popped_transactions = - | Locked - | Transactions of string list * Ethereum_types.Delayed_transaction.t list - (** [start parameters] starts the tx-pool *) val start : parameters -> unit tzresult Lwt.t (** [shutdown ()] stops the tx-pool, waiting for the ongoing request to be processed. *) -val shutdown : unit -> unit Lwt.t +val shutdown : unit -> unit tzresult Lwt.t (** [add raw_tx] adds a raw eth transaction to the tx-pool. *) val add : string -> (Ethereum_types.hash, string) result tzresult Lwt.t -(** [add_delayed transaction] adds a delayed transaction to the tx-pool. *) -val add_delayed : - Ethereum_types.Delayed_transaction.t -> - (Ethereum_types.hash, string) result tzresult Lwt.t - (** [nonce address] returns the nonce of the user - Returns the first gap in the tx-pool, or the nonce stored on the rollup + Returns the first gap in the tx-pool, or the nonce stored on the rollup if no transactions are in the pool. *) val nonce : Ethereum_types.Address.t -> Ethereum_types.quantity tzresult Lwt.t -(** [pop_transactions ()] pops the valid transactions from the pool. *) -val pop_transactions : unit -> popped_transactions tzresult Lwt.t +(** [pop_transactions maximum_cumulative_size] pops as much valid transactions + as possible from the pool, until their cumulative size exceeds + `maximum_cumulative_size`. Returns no transactions if the pool is locked. *) +val pop_transactions : maximum_cumulative_size:int -> string list tzresult Lwt.t (** [pop_and_inject_transactions ()] pops the valid transactions from - the pool using {!pop_transactions }and injects them using + the pool using {!pop_transactions} and injects them using [inject_raw_transactions] provided by {!parameters.rollup_node}. *) val pop_and_inject_transactions : unit -> unit tzresult Lwt.t +(** [pop_and_inject_transactions_lazy ()] same as + [pop_and_inject_transactions] but don't wait for the request to + complete *) +val pop_and_inject_transactions_lazy : unit -> unit tzresult Lwt.t + (** [lock_transactions] locks the transactions in the pool, new transactions can be added but nothing can be retrieved with {!pop_transactions}. *) val lock_transactions : unit -> unit tzresult Lwt.t @@ -52,3 +50,6 @@ val lock_transactions : unit -> unit tzresult Lwt.t (** [unlock_transactions] unlocks the transactions if it was locked by {!lock_transactions}. *) val unlock_transactions : unit -> unit tzresult Lwt.t + +(** [is_locked] checks if the pools is locked. *) +val is_locked : unit -> bool tzresult Lwt.t diff --git a/etherlink/bin_node/main.ml b/etherlink/bin_node/main.ml index c5ad68e4bbecdcb2c43b12213af84f7ac6cdfad2..dd11c8acc7a3aff7996a868453266f7bde572634 100644 --- a/etherlink/bin_node/main.ml +++ b/etherlink/bin_node/main.ml @@ -64,7 +64,7 @@ module Event = struct ~level:Notice ("exit_status", Data_encoding.int8) - let event_shutdown_tx_pool = + let _event_shutdown_tx_pool = Internal_event.Simple.declare_0 ~section ~name:"shutting_down_tx_pool" @@ -127,8 +127,10 @@ let install_finalizer_prod server = let* () = emit Event.event_shutdown_node exit_status in let* () = Tezos_rpc_http_server.RPC_server.shutdown server in let* () = emit (Event.event_shutdown_rpc_server ~private_:false) () in + Evm_node_lib_prod.Helpers.unwrap_error_monad @@ fun () -> + let open Lwt_result_syntax in let* () = Evm_node_lib_prod.Tx_pool.shutdown () in - emit Event.event_shutdown_tx_pool () + Evm_node_lib_prod.Evm_context.shutdown () let install_finalizer_dev server = let open Lwt_syntax in @@ -611,6 +613,11 @@ let proxy_command = mode = Proxy {rollup_node_endpoint}; } in + let () = + Evm_node_lib_prod.Rollup_node_follower.start + ~proxy:true + ~rollup_node_endpoint + in let* directory = prod_directory config rollup_config in let* server = start config ~directory in let (_ : Lwt_exit.clean_up_callback_id) = @@ -863,8 +870,8 @@ let observer_command = let make_prod_messages ~kind ~smart_rollup_address data = let open Lwt_result_syntax in - let open Evm_node_lib_prod in - let open Evm_node_lib_prod_encoding in + let open Evm_node_lib_dev in + let open Evm_node_lib_dev_encoding in let transactions = List.map (fun s -> Ethereum_types.hex_of_string s |> Ethereum_types.hex_to_bytes) @@ -873,7 +880,7 @@ let make_prod_messages ~kind ~smart_rollup_address data = let* messages = match kind with | `Blueprint (cctxt, sk_uri, timestamp, number, parent_hash) -> - let* Sequencer_blueprint.{to_publish; _} = + let* blueprint = Sequencer_blueprint.create ~cctxt ~sequencer_key:sk_uri @@ -884,7 +891,7 @@ let make_prod_messages ~kind ~smart_rollup_address data = ~transactions ~delayed_transactions:[] in - return @@ List.map (fun (`External s) -> s) to_publish + return @@ List.map (fun (`External s) -> s) blueprint | `Transaction -> let*? chunks = List.map_e diff --git a/etherlink/kernel_evm/kernel/tests/resources/ghostnet_evm_kernel.wasm b/etherlink/kernel_evm/kernel/tests/resources/ghostnet_evm_kernel.wasm index f3eb9b1fa3dd0cd38fa8913f7797195ca4c7d163..b02d91839ca4c7176d508bdd78b17bff0c09e4ce 100644 Binary files a/etherlink/kernel_evm/kernel/tests/resources/ghostnet_evm_kernel.wasm and b/etherlink/kernel_evm/kernel/tests/resources/ghostnet_evm_kernel.wasm differ diff --git a/tezt/lib_tezos/constant.ml b/tezt/lib_tezos/constant.ml index 28cc94271697feae46fe1cf32cbade5326eb242c..b2fa75f02c849e440daa52a91eaee9a819518dea 100644 --- a/tezt/lib_tezos/constant.ml +++ b/tezt/lib_tezos/constant.ml @@ -103,7 +103,7 @@ module WASM = struct ~path: "etherlink/kernel_evm/kernel/tests/resources/ghostnet_evm_kernel.wasm" - let ghostnet_evm_commit = "0a81ce76b3d4f57d8c5194bcb9418f9294fd2be1" + let ghostnet_evm_commit = "d517020b58afef0e15c768ee0b5acbda1786cdd8" let tx_kernel = Uses.make ~tag:"tx_kernel" ~path:"tx_kernel.wasm"