From 41a398386b6878a5d1bc9f4f57f76b558dfa2260 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Proust?= Date: Tue, 15 Mar 2022 16:46:25 +0100 Subject: [PATCH 1/8] Store: minor clean-up of code --- src/lib_store/block_store.ml | 5 +++-- src/lib_store/test/alpha_utils.ml | 9 +++------ src/lib_store/test/test_store.ml | 11 +---------- 3 files changed, 7 insertions(+), 18 deletions(-) diff --git a/src/lib_store/block_store.ml b/src/lib_store/block_store.ml index 5c4e2d4bd5dc..3282faac66d6 100644 --- a/src/lib_store/block_store.ml +++ b/src/lib_store/block_store.ml @@ -1012,8 +1012,9 @@ let move_all_floating_stores block_store ~new_ro_store = return_unit) let check_store_consistency block_store ~cementing_highwatermark = - Cemented_block_store.get_highest_cemented_level block_store.cemented_store - |> function + match + Cemented_block_store.get_highest_cemented_level block_store.cemented_store + with | None -> (* First merge or Rolling 0 *) return_unit diff --git a/src/lib_store/test/alpha_utils.ml b/src/lib_store/test/alpha_utils.ml index 7a2f8e7bd4bc..10a22a5e33ad 100644 --- a/src/lib_store/test/alpha_utils.ml +++ b/src/lib_store/test/alpha_utils.ml @@ -89,12 +89,9 @@ module Account = struct let activator_account = new_account () let find pkh = - try - return - (Signature.Public_key_hash.Table.find known_accounts pkh - |> WithExceptions.Option.to_exn ~none:Not_found) - with Not_found -> - failwith "Missing account: %a" Signature.Public_key_hash.pp pkh + match Signature.Public_key_hash.Table.find known_accounts pkh with + | Some v -> return v + | None -> failwith "Missing account: %a" Signature.Public_key_hash.pp pkh let find_alternate pkh = let exception Found of t in diff --git a/src/lib_store/test/test_store.ml b/src/lib_store/test/test_store.ml index 5a24b05b5edf..39e742ff7a06 100644 --- a/src/lib_store/test/test_store.ml +++ b/src/lib_store/test/test_store.ml @@ -26,8 +26,6 @@ open Test_utils -let test_init _ = return_unit - let test_cycles store = let open Lwt_result_syntax in let chain_store = Store.main_chain_store store in @@ -41,14 +39,7 @@ let test_cycles store = in assert_presence_in_store chain_store blocks -let test_cases = - let wrap_test (s, f) = - let f _ = f in - wrap_test (s, f) - in - List.map - wrap_test - [("initialisation", test_init); ("store cycles", test_cycles)] +let test_cases = [wrap_test ("store cycles", fun _ store -> test_cycles store)] open Example_tree -- GitLab From 49ff894bdf872a0a8956de4114ecb29f5b4e077f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Proust?= Date: Tue, 15 Mar 2022 16:47:15 +0100 Subject: [PATCH 2/8] Store: Use local (rather than global) return/fail --- src/lib_store/block_store.ml | 1 + src/lib_store/cemented_block_store.ml | 1 + src/lib_store/consistency.ml | 4 +++- src/lib_store/reconstruction.ml | 1 + src/lib_store/snapshots.ml | 5 ++++- src/lib_store/store.ml | 2 ++ src/lib_store/stored_data.ml | 2 +- src/lib_store/test/alpha_utils.ml | 1 + 8 files changed, 14 insertions(+), 3 deletions(-) diff --git a/src/lib_store/block_store.ml b/src/lib_store/block_store.ml index 3282faac66d6..fcbb8d81a38e 100644 --- a/src/lib_store/block_store.ml +++ b/src/lib_store/block_store.ml @@ -1012,6 +1012,7 @@ let move_all_floating_stores block_store ~new_ro_store = return_unit) let check_store_consistency block_store ~cementing_highwatermark = + let open Lwt_tzresult_syntax in match Cemented_block_store.get_highest_cemented_level block_store.cemented_store with diff --git a/src/lib_store/cemented_block_store.ml b/src/lib_store/cemented_block_store.ml index bbe259865074..7360213496a6 100644 --- a/src/lib_store/cemented_block_store.ml +++ b/src/lib_store/cemented_block_store.ml @@ -518,6 +518,7 @@ let read_block_metadata cemented_store block_level = read_block_metadata cemented_store block_level let get_cemented_block_by_hash ~read_metadata (cemented_store : t) hash = + let open Lwt_tzresult_syntax in match get_cemented_block_level cemented_store hash with | None -> return_none | Some level -> diff --git a/src/lib_store/consistency.ml b/src/lib_store/consistency.ml index f6b3a0413607..dffbaf21952c 100644 --- a/src/lib_store/consistency.ml +++ b/src/lib_store/consistency.ml @@ -47,6 +47,7 @@ open Store_errors the cementing_highwatermark is consistent with the cemented store. *) let check_cementing_highwatermark ~cementing_highwatermark block_store = + let open Lwt_tzresult_syntax in let cemented_store = Block_store.cemented_block_store block_store in let highest_cemented_level = Cemented_block_store.get_highest_cemented_level cemented_store @@ -376,7 +377,8 @@ let lowest_metadata_entry metadata_file = | [] -> (* A metadata file is never empty *) assert false - | {Zip.filename; _} :: _ -> return_some (Int32.of_string filename) + | {Zip.filename; _} :: _ -> + Lwt_tzresult_syntax.return_some (Int32.of_string filename) with exn -> Lwt.fail exn (* Returns the lowest block level, from the cemented store, which is diff --git a/src/lib_store/reconstruction.ml b/src/lib_store/reconstruction.ml index b1c396bd3989..6c24d664e81e 100644 --- a/src/lib_store/reconstruction.ml +++ b/src/lib_store/reconstruction.ml @@ -684,6 +684,7 @@ let reconstruct_floating chain_store context_index ~user_activated_upgrades (* Only Full modes with any offset can be reconstructed *) let check_history_mode_compatibility chain_store savepoint genesis_block = + let open Lwt_tzresult_syntax in match Store.Chain.history_mode chain_store with | History_mode.(Full _) -> fail_when diff --git a/src/lib_store/snapshots.ml b/src/lib_store/snapshots.ml index 813560b688cb..0e9ac2e6a9ad 100644 --- a/src/lib_store/snapshots.ml +++ b/src/lib_store/snapshots.ml @@ -722,7 +722,9 @@ let ensure_valid_tmp_snapshot_path snapshot_tmp_dir = (Sys.file_exists (Naming.dir_path snapshot_tmp_dir)) (Cannot_create_tmp_export_directory (Naming.dir_path snapshot_tmp_dir)) -let ensure_valid_export_path = function +let ensure_valid_export_path = + let open Lwt_tzresult_syntax in + function | Some path -> fail_when (Sys.file_exists path) (Invalid_export_path path) | None -> return_unit @@ -2167,6 +2169,7 @@ module Make_snapshot_exporter (Exporter : EXPORTER) : Snapshot_exporter = struct (* Ensures that the history mode requested to export is compatible with the current storage. *) let check_history_mode chain_store ~rolling = + let open Lwt_tzresult_syntax in match (Store.Chain.history_mode chain_store : History_mode.t) with | Archive | Full _ -> return_unit | Rolling _ when rolling -> return_unit diff --git a/src/lib_store/store.ml b/src/lib_store/store.ml index d6de8e3540af..f44bb5c983f3 100644 --- a/src/lib_store/store.ml +++ b/src/lib_store/store.ml @@ -334,6 +334,7 @@ module Block = struct | None -> fail @@ Block_not_found {hash; distance = 0} let locked_read_block_by_level chain_store head level = + let open Lwt_tzresult_syntax in let distance = Int32.(to_int (sub (Block_repr.level head) level)) in if distance < 0 then fail @@ -2728,6 +2729,7 @@ let may_switch_history_mode ~store_dir ~context_dir genesis ~new_history_mode = let get_chain_store store chain_id = let chain_store = main_chain_store store in let rec loop chain_store = + let open Lwt_tzresult_syntax in if Chain_id.equal (Chain.chain_id chain_store) chain_id then return chain_store else diff --git a/src/lib_store/stored_data.ml b/src/lib_store/stored_data.ml index 5a28e67df037..d815700322d8 100644 --- a/src/lib_store/stored_data.ml +++ b/src/lib_store/stored_data.ml @@ -90,7 +90,7 @@ let write_file encoded_file data = let write (Stored_data v) data = Lwt_idle_waiter.force_idle v.scheduler (fun () -> - if v.cache = data then return_unit + if v.cache = data then Lwt_tzresult_syntax.return_unit else ( v.cache <- data ; write_file v.file data)) diff --git a/src/lib_store/test/alpha_utils.ml b/src/lib_store/test/alpha_utils.ml index 10a22a5e33ad..bc35ff0da9de 100644 --- a/src/lib_store/test/alpha_utils.ml +++ b/src/lib_store/test/alpha_utils.ml @@ -89,6 +89,7 @@ module Account = struct let activator_account = new_account () let find pkh = + let open Lwt_tzresult_syntax in match Signature.Public_key_hash.Table.find known_accounts pkh with | Some v -> return v | None -> failwith "Missing account: %a" Signature.Public_key_hash.pp pkh -- GitLab From fdf5b6c9173d3ee85e40e516468824dc58dbba70 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Proust?= Date: Tue, 15 Mar 2022 16:47:15 +0100 Subject: [PATCH 3/8] Shell: Use local (rather than global) return/fail --- src/lib_shell/block_directory.ml | 1 + src/lib_shell/block_validator.ml | 21 +- src/lib_shell/block_validator_process.ml | 2 +- src/lib_shell/config_directory.ml | 1 + src/lib_shell/node.ml | 2 +- src/lib_shell/prevalidator.ml | 504 +++++++++--------- src/lib_shell/prevalidator_filters.ml | 6 +- src/lib_shell/test/shell_test_helpers.ml | 2 +- src/lib_shell/test/test_prevalidation_t.ml | 5 +- .../test_prevalidator_pending_operations.ml | 2 +- 10 files changed, 276 insertions(+), 270 deletions(-) diff --git a/src/lib_shell/block_directory.ml b/src/lib_shell/block_directory.ml index 2437cd777f41..13211e7111cc 100644 --- a/src/lib_shell/block_directory.ml +++ b/src/lib_shell/block_directory.ml @@ -58,6 +58,7 @@ let read_partial_context = | Some v -> raw_context_insert (k, Key v) acc) let build_raw_header_rpc_directory (module Proto : Block_services.PROTO) = + let open Lwt_tzresult_syntax in let dir : (Store.chain_store * Block_hash.t * Block_header.t) RPC_directory.t ref = ref RPC_directory.empty diff --git a/src/lib_shell/block_validator.ml b/src/lib_shell/block_validator.ml index 29d8f4b9e7e6..cc427e051354 100644 --- a/src/lib_shell/block_validator.ml +++ b/src/lib_shell/block_validator.ml @@ -117,16 +117,19 @@ module Worker = Worker.Make (Name) (Event) (Request) (Types) (Logger) type t = Worker.infinite Worker.queue Worker.t let check_chain_liveness chain_db hash (header : Block_header.t) = + let open Lwt_tzresult_syntax in let chain_store = Distributed_db.chain_store chain_db in match Store.Chain.expiration chain_store with | Some eol when Time.Protocol.(eol <= header.shell.timestamp) -> - fail @@ invalid_block hash - @@ Expired_chain - { - chain_id = Store.Chain.chain_id chain_store; - expiration = eol; - timestamp = header.shell.timestamp; - } + let error = + Expired_chain + { + chain_id = Store.Chain.chain_id chain_store; + expiration = eol; + timestamp = header.shell.timestamp; + } + in + fail (invalid_block hash error) | None | Some _ -> return_unit let is_already_validated chain_store hash = @@ -322,7 +325,7 @@ let metrics = Shell_metrics.Block_validator.init Name.base let on_launch _ _ (limits, start_testchain, db, validation_process) = let protocol_validator = Protocol_validator.create db in let invalid_blocks_after_precheck = Block_hash_ring.create 50 in - return + Lwt.return_ok { Types.protocol_validator; validation_process; @@ -427,7 +430,7 @@ let create limits db validation_process ~start_testchain = let on_completion = on_completion - let on_no_request _ = return_unit + let on_no_request _ = Lwt_tzresult_syntax.return_unit end in Worker.launch table diff --git a/src/lib_shell/block_validator_process.ml b/src/lib_shell/block_validator_process.ml index d1f2fbd0d4ee..2e1e4c4a0e36 100644 --- a/src/lib_shell/block_validator_process.ml +++ b/src/lib_shell/block_validator_process.ml @@ -328,7 +328,7 @@ module Internal_validator_process = struct let* context = Store.Block.context validator.chain_store forking_block in Block_validation.init_test_chain context forked_header - let reconfigure_event_logging _ _ = return_unit + let reconfigure_event_logging _ _ = Lwt_tzresult_syntax.return_unit end (** Block validation using an external process *) diff --git a/src/lib_shell/config_directory.ml b/src/lib_shell/config_directory.ml index 38d28b977f50..5a0cd4a82c6e 100644 --- a/src/lib_shell/config_directory.ml +++ b/src/lib_shell/config_directory.ml @@ -25,6 +25,7 @@ let build_rpc_directory ~user_activated_upgrades ~user_activated_protocol_overrides ~mainchain_validator store = + let open Lwt_tzresult_syntax in let register endpoint f directory = RPC_directory.register directory endpoint f in diff --git a/src/lib_shell/node.ml b/src/lib_shell/node.ml index d30a1b0cef5c..4be28075a774 100644 --- a/src/lib_shell/node.ml +++ b/src/lib_shell/node.ml @@ -368,5 +368,5 @@ let build_rpc_directory node = node.store) ; merge (Version_directory.rpc_directory node.p2p) ; register0 RPC_service.error_service (fun () () -> - return (Data_encoding.Json.schema Error_monad.error_encoding)) ; + Lwt.return_ok (Data_encoding.Json.schema Error_monad.error_encoding)) ; !dir diff --git a/src/lib_shell/prevalidator.ml b/src/lib_shell/prevalidator.ml index 024a4d932636..f99c25f8ea96 100644 --- a/src/lib_shell/prevalidator.ml +++ b/src/lib_shell/prevalidator.ml @@ -1303,258 +1303,256 @@ module Make let build_rpc_directory w = lazy - (let dir : state RPC_directory.t ref = ref RPC_directory.empty in - let module Proto_services = - Block_services.Make (Filter.Proto) (Filter.Proto) - in - dir := - RPC_directory.register - !dir - (Proto_services.S.Mempool.get_filter RPC_path.open_root) - (fun pv params () -> - return - (get_filter_config_json - ~include_default:params#include_default - pv)) ; - dir := - RPC_directory.register - !dir - (Proto_services.S.Mempool.set_filter RPC_path.open_root) - (fun pv () obj -> - let open Lwt_syntax in - let* () = - try - let config = - Data_encoding.Json.destruct - Filter.Mempool.config_encoding - obj - in - pv.filter_config <- config ; - Lwt.return_unit - with _ -> Event.(emit invalid_mempool_filter_configuration) () - in - return_ok (get_filter_config_json pv)) ; - (* Ban an operation (from its given hash): remove it from the - mempool if present. Add it to the set pv.banned_operations - to prevent it from being fetched/processed/injected in the - future. - Note: If the baker has already received the operation, then - it's necessary to restart it manually to flush the operation - from it. *) - dir := - RPC_directory.register - !dir - (Proto_services.S.Mempool.ban_operation RPC_path.open_root) - (fun _pv () oph -> - Worker.Queue.push_request_and_wait w (Request.Ban oph)) ; - (* Unban an operation (from its given hash): remove it from the - set pv.banned_operations (nothing happens if it was not banned). *) - dir := - RPC_directory.register - !dir - (Proto_services.S.Mempool.unban_operation RPC_path.open_root) - (fun pv () oph -> - pv.shell.banned_operations <- - Operation_hash.Set.remove oph pv.shell.banned_operations ; - return_unit) ; - (* Unban all operations: clear the set pv.banned_operations. *) - dir := - RPC_directory.register - !dir - (Proto_services.S.Mempool.unban_all_operations RPC_path.open_root) - (fun pv () () -> - pv.shell.banned_operations <- Operation_hash.Set.empty ; - return_unit) ; - dir := - RPC_directory.gen_register - !dir - (Proto_services.S.Mempool.pending_operations RPC_path.open_root) - (fun pv params () -> - let map_op_error oph (op, error) acc = - op.Prevalidation.protocol |> fun res -> - Operation_hash.Map.add oph (res, error) acc - in - let applied = - if params#applied then - List.rev_map - (fun op -> - (op.Prevalidation.hash, op.Prevalidation.protocol)) - pv.shell.classification.applied_rev - else [] - in - let filter f map = - Operation_hash.Map.fold f map Operation_hash.Map.empty - in - let refused = - if params#refused then - filter - map_op_error - (Classification.map pv.shell.classification.refused) - else Operation_hash.Map.empty - in - let outdated = - if params#outdated then - filter - map_op_error - (Classification.map pv.shell.classification.outdated) - else Operation_hash.Map.empty - in - let branch_refused = - if params#branch_refused then - filter - map_op_error - (Classification.map pv.shell.classification.branch_refused) - else Operation_hash.Map.empty - in - let branch_delayed = - if params#branch_delayed then - filter - map_op_error - (Classification.map pv.shell.classification.branch_delayed) - else Operation_hash.Map.empty - in - let unprocessed = - Pending_ops.fold - (fun _prio oph op acc -> - Operation_hash.Map.add oph op.protocol acc) - pv.shell.pending - Operation_hash.Map.empty - in - (* FIXME https://gitlab.com/tezos/tezos/-/issues/2250 - - We merge prechecked operation with applied operation - so that the encoding of the RPC does not need to be - changed. Once prechecking will be done by the protocol - and not the plugin, we will change the encoding to - reflect that. *) - let prechecked_with_applied = - if params#applied then - (Operation_hash.Map.bindings pv.shell.classification.prechecked - |> List.rev_map (fun (oph, op) -> - (oph, op.Prevalidation.protocol))) - @ applied - else applied - in - let pending_operations = - { - Proto_services.Mempool.applied = prechecked_with_applied; - refused; - outdated; - branch_refused; - branch_delayed; - unprocessed; - } - in - Proto_services.Mempool.pending_operations_version_dispatcher - ~version:params#version - pending_operations) ; - dir := - RPC_directory.register - !dir - (Proto_services.S.Mempool.request_operations RPC_path.open_root) - (fun pv t () -> - pv.shell.parameters.tools.send_get_current_head ?peer:t#peer_id () ; - return_unit) ; - dir := - RPC_directory.gen_register - !dir - (Proto_services.S.Mempool.monitor_operations RPC_path.open_root) - (fun pv params () -> - Lwt_mutex.with_lock pv.lock @@ fun () -> - let (op_stream, stopper) = - Lwt_watcher.create_stream pv.operation_stream - in - (* Convert ops *) - let fold_op hash (Prevalidation.{protocol; _}, error) acc = - (hash, protocol, error) :: acc - in - (* First call : retrieve the current set of op from the mempool *) - let applied = - if params#applied then - List.map - (fun op -> (op.Prevalidation.hash, op.protocol, [])) - pv.shell.classification.applied_rev - else [] - in - (* FIXME https://gitlab.com/tezos/tezos/-/issues/2250 - - For the moment, applied and prechecked operations are - handled the same way for the user point of view. *) - let prechecked = - if params#applied then - Operation_hash.Map.fold - (fun hash op acc -> - (hash, op.Prevalidation.protocol, []) :: acc) - pv.shell.classification.prechecked - [] - else [] - in - let refused = - if params#refused then - Operation_hash.Map.fold - fold_op - (Classification.map pv.shell.classification.refused) - [] - else [] - in - let branch_refused = - if params#branch_refused then - Operation_hash.Map.fold - fold_op - (Classification.map pv.shell.classification.branch_refused) - [] - else [] - in - let branch_delayed = - if params#branch_delayed then - Operation_hash.Map.fold - fold_op - (Classification.map pv.shell.classification.branch_delayed) - [] - else [] - in - let current_mempool = - List.concat_map - (List.map (function - | (hash, op, []) -> ((hash, op), None) - | (hash, op, errors) -> ((hash, op), Some errors))) - [applied; prechecked; refused; branch_refused; branch_delayed] - in - let current_mempool = ref (Some current_mempool) in - let filter_result = function - | `Prechecked | `Applied -> params#applied - | `Refused _ -> params#refused - | `Outdated _ -> params#outdated - | `Branch_refused _ -> params#branch_refused - | `Branch_delayed _ -> params#branch_delayed - in - let rec next () = - let open Lwt_syntax in - match !current_mempool with - | Some mempool -> - current_mempool := None ; - Lwt.return_some mempool - | None -> ( - let* o = Lwt_stream.get op_stream in - match o with - | Some (kind, op) when filter_result kind -> - let errors = - match kind with - | `Prechecked | `Applied -> None - | `Branch_delayed errors - | `Branch_refused errors - | `Refused errors - | `Outdated errors -> - Some errors - in - Lwt.return_some - [(Prevalidation.(op.hash, op.protocol), errors)] - | Some _ -> next () - | None -> Lwt.return_none) - in - let shutdown () = Lwt_watcher.shutdown stopper in - RPC_answer.return_stream {next; shutdown}) ; - !dir) + (let open Lwt_tzresult_syntax in + let dir : state RPC_directory.t ref = ref RPC_directory.empty in + let module Proto_services = + Block_services.Make (Filter.Proto) (Filter.Proto) + in + dir := + RPC_directory.register + !dir + (Proto_services.S.Mempool.get_filter RPC_path.open_root) + (fun pv params () -> + return + (get_filter_config_json + ~include_default:params#include_default + pv)) ; + dir := + RPC_directory.register + !dir + (Proto_services.S.Mempool.set_filter RPC_path.open_root) + (fun pv () obj -> + let open Lwt_syntax in + let* () = + try + let config = + Data_encoding.Json.destruct Filter.Mempool.config_encoding obj + in + pv.filter_config <- config ; + Lwt.return_unit + with _ -> Event.(emit invalid_mempool_filter_configuration) () + in + return_ok (get_filter_config_json pv)) ; + (* Ban an operation (from its given hash): remove it from the + mempool if present. Add it to the set pv.banned_operations + to prevent it from being fetched/processed/injected in the + future. + Note: If the baker has already received the operation, then + it's necessary to restart it manually to flush the operation + from it. *) + dir := + RPC_directory.register + !dir + (Proto_services.S.Mempool.ban_operation RPC_path.open_root) + (fun _pv () oph -> + Worker.Queue.push_request_and_wait w (Request.Ban oph)) ; + (* Unban an operation (from its given hash): remove it from the + set pv.banned_operations (nothing happens if it was not banned). *) + dir := + RPC_directory.register + !dir + (Proto_services.S.Mempool.unban_operation RPC_path.open_root) + (fun pv () oph -> + pv.shell.banned_operations <- + Operation_hash.Set.remove oph pv.shell.banned_operations ; + return_unit) ; + (* Unban all operations: clear the set pv.banned_operations. *) + dir := + RPC_directory.register + !dir + (Proto_services.S.Mempool.unban_all_operations RPC_path.open_root) + (fun pv () () -> + pv.shell.banned_operations <- Operation_hash.Set.empty ; + return_unit) ; + dir := + RPC_directory.gen_register + !dir + (Proto_services.S.Mempool.pending_operations RPC_path.open_root) + (fun pv params () -> + let map_op_error oph (op, error) acc = + op.Prevalidation.protocol |> fun res -> + Operation_hash.Map.add oph (res, error) acc + in + let applied = + if params#applied then + List.rev_map + (fun op -> (op.Prevalidation.hash, op.Prevalidation.protocol)) + pv.shell.classification.applied_rev + else [] + in + let filter f map = + Operation_hash.Map.fold f map Operation_hash.Map.empty + in + let refused = + if params#refused then + filter + map_op_error + (Classification.map pv.shell.classification.refused) + else Operation_hash.Map.empty + in + let outdated = + if params#outdated then + filter + map_op_error + (Classification.map pv.shell.classification.outdated) + else Operation_hash.Map.empty + in + let branch_refused = + if params#branch_refused then + filter + map_op_error + (Classification.map pv.shell.classification.branch_refused) + else Operation_hash.Map.empty + in + let branch_delayed = + if params#branch_delayed then + filter + map_op_error + (Classification.map pv.shell.classification.branch_delayed) + else Operation_hash.Map.empty + in + let unprocessed = + Pending_ops.fold + (fun _prio oph op acc -> + Operation_hash.Map.add oph op.protocol acc) + pv.shell.pending + Operation_hash.Map.empty + in + (* FIXME https://gitlab.com/tezos/tezos/-/issues/2250 + + We merge prechecked operation with applied operation + so that the encoding of the RPC does not need to be + changed. Once prechecking will be done by the protocol + and not the plugin, we will change the encoding to + reflect that. *) + let prechecked_with_applied = + if params#applied then + (Operation_hash.Map.bindings pv.shell.classification.prechecked + |> List.rev_map (fun (oph, op) -> + (oph, op.Prevalidation.protocol))) + @ applied + else applied + in + let pending_operations = + { + Proto_services.Mempool.applied = prechecked_with_applied; + refused; + outdated; + branch_refused; + branch_delayed; + unprocessed; + } + in + Proto_services.Mempool.pending_operations_version_dispatcher + ~version:params#version + pending_operations) ; + dir := + RPC_directory.register + !dir + (Proto_services.S.Mempool.request_operations RPC_path.open_root) + (fun pv t () -> + pv.shell.parameters.tools.send_get_current_head ?peer:t#peer_id () ; + return_unit) ; + dir := + RPC_directory.gen_register + !dir + (Proto_services.S.Mempool.monitor_operations RPC_path.open_root) + (fun pv params () -> + Lwt_mutex.with_lock pv.lock @@ fun () -> + let (op_stream, stopper) = + Lwt_watcher.create_stream pv.operation_stream + in + (* Convert ops *) + let fold_op hash (Prevalidation.{protocol; _}, error) acc = + (hash, protocol, error) :: acc + in + (* First call : retrieve the current set of op from the mempool *) + let applied = + if params#applied then + List.map + (fun op -> (op.Prevalidation.hash, op.protocol, [])) + pv.shell.classification.applied_rev + else [] + in + (* FIXME https://gitlab.com/tezos/tezos/-/issues/2250 + + For the moment, applied and prechecked operations are + handled the same way for the user point of view. *) + let prechecked = + if params#applied then + Operation_hash.Map.fold + (fun hash op acc -> + (hash, op.Prevalidation.protocol, []) :: acc) + pv.shell.classification.prechecked + [] + else [] + in + let refused = + if params#refused then + Operation_hash.Map.fold + fold_op + (Classification.map pv.shell.classification.refused) + [] + else [] + in + let branch_refused = + if params#branch_refused then + Operation_hash.Map.fold + fold_op + (Classification.map pv.shell.classification.branch_refused) + [] + else [] + in + let branch_delayed = + if params#branch_delayed then + Operation_hash.Map.fold + fold_op + (Classification.map pv.shell.classification.branch_delayed) + [] + else [] + in + let current_mempool = + List.concat_map + (List.map (function + | (hash, op, []) -> ((hash, op), None) + | (hash, op, errors) -> ((hash, op), Some errors))) + [applied; prechecked; refused; branch_refused; branch_delayed] + in + let current_mempool = ref (Some current_mempool) in + let filter_result = function + | `Prechecked | `Applied -> params#applied + | `Refused _ -> params#refused + | `Outdated _ -> params#outdated + | `Branch_refused _ -> params#branch_refused + | `Branch_delayed _ -> params#branch_delayed + in + let rec next () = + let open Lwt_syntax in + match !current_mempool with + | Some mempool -> + current_mempool := None ; + Lwt.return_some mempool + | None -> ( + let* o = Lwt_stream.get op_stream in + match o with + | Some (kind, op) when filter_result kind -> + let errors = + match kind with + | `Prechecked | `Applied -> None + | `Branch_delayed errors + | `Branch_refused errors + | `Refused errors + | `Outdated errors -> + Some errors + in + Lwt.return_some + [(Prevalidation.(op.hash, op.protocol), errors)] + | Some _ -> next () + | None -> Lwt.return_none) + in + let shutdown () = Lwt_watcher.shutdown stopper in + RPC_answer.return_stream {next; shutdown}) ; + !dir) (** Module implementing the events at the {!Worker} level. Contrary to {!Requests}, these functions depend on [Worker]. *) @@ -1749,7 +1747,7 @@ module Make | View (Notify _) | View Leftover | View (Arrived _) | View Advertise -> Event.(emit request_completed_debug) (Request.view r, st) - let on_no_request _ = return_unit + let on_no_request _ = Lwt_tzresult_syntax.return_unit end let table = Worker.create_table Queue diff --git a/src/lib_shell/prevalidator_filters.ml b/src/lib_shell/prevalidator_filters.ml index e895ea42a7b2..0d982d957804 100644 --- a/src/lib_shell/prevalidator_filters.ml +++ b/src/lib_shell/prevalidator_filters.ml @@ -104,11 +104,13 @@ module No_filter (Proto : Registered_protocol.T) = struct type state = unit - let init _ ?validation_state:_ ~predecessor:_ () = return_unit + let init _ ?validation_state:_ ~predecessor:_ () = + Lwt_tzresult_syntax.return_unit let remove ~filter_state _ = filter_state - let on_flush _ _ ?validation_state:_ ~predecessor:_ () = return_unit + let on_flush _ _ ?validation_state:_ ~predecessor:_ () = + Lwt_tzresult_syntax.return_unit let precheck _ ~filter_state:_ ~validation_state:_ _ _ ~nb_successful_prechecks:_ = diff --git a/src/lib_shell/test/shell_test_helpers.ml b/src/lib_shell/test/shell_test_helpers.ml index 2a6b9fd870d3..87aaddaa159f 100644 --- a/src/lib_shell/test/shell_test_helpers.ml +++ b/src/lib_shell/test/shell_test_helpers.ml @@ -104,4 +104,4 @@ let init_mock_p2p chain_name = in let message_cfg = Distributed_db_message.cfg chain_name in let c_meta = {disable_mempool = true; private_node = true} in - return (P2p.faked_network message_cfg peer_metadata_cfg c_meta) + Lwt.return_ok (P2p.faked_network message_cfg peer_metadata_cfg c_meta) diff --git a/src/lib_shell/test/test_prevalidation_t.ml b/src/lib_shell/test/test_prevalidation_t.ml index f61b0e1880c0..0d5b3964e764 100644 --- a/src/lib_shell/test/test_prevalidation_t.ml +++ b/src/lib_shell/test/test_prevalidation_t.ml @@ -43,7 +43,7 @@ module Mock_protocol : (* We need to override this function (so that it's not [assert false]), because Prevalidation.create calls this function, so we need it to work in all tests below. *) - return () + Lwt_tzresult_syntax.return_unit end module Internal_for_tests = Tezos_shell.Prevalidation.Internal_for_tests @@ -98,7 +98,8 @@ let create_prevalidation Internal_for_tests.CHAIN_STORE with type chain_store = unit = struct type chain_store = unit - let context () _block : Tezos_context.Context.t tzresult Lwt.t = return ctxt + let context () _block : Tezos_context.Context.t tzresult Lwt.t = + Lwt_tzresult_syntax.return ctxt let chain_id () = Init.chain_id end in diff --git a/src/lib_shell/test/test_prevalidator_pending_operations.ml b/src/lib_shell/test/test_prevalidator_pending_operations.ml index d46ffbd49c8a..ff8d0ff7b9a9 100644 --- a/src/lib_shell/test/test_prevalidator_pending_operations.ml +++ b/src/lib_shell/test/test_prevalidator_pending_operations.ml @@ -88,7 +88,7 @@ let test_fold_es_ordering = test_iterators_ordering ~name:"fold_es" ~iterator:Pending_ops.fold_es - return_unit + Lwt_tzresult_syntax.return_unit (* 2. Test partial iteration with fold_es *) -- GitLab From c1eca56dffbcb9c9a5cf685b49387be56008d00d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Proust?= Date: Tue, 15 Mar 2022 16:47:15 +0100 Subject: [PATCH 4/8] Context: Use local (rather than global) return/fail --- src/lib_context/context.ml | 1 + src/lib_context/context_dump.ml | 10 +++++----- src/lib_context/memory/context.ml | 1 + 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/src/lib_context/context.ml b/src/lib_context/context.ml index 98d9045ee668..5216827b2e1f 100644 --- a/src/lib_context/context.ml +++ b/src/lib_context/context.ml @@ -311,6 +311,7 @@ module Make (Encoding : module type of Tezos_context_encoding.Context) = struct let get_hash_version _c = Context_hash.Version.of_int 0 let set_hash_version c v = + let open Lwt_tzresult_syntax in if Context_hash.Version.(of_int 0 = v) then return c else fail (Tezos_context_helpers.Context.Unsupported_context_hash_version v) diff --git a/src/lib_context/context_dump.ml b/src/lib_context/context_dump.ml index cc921e7bf873..a957d319af2a 100644 --- a/src/lib_context/context_dump.ml +++ b/src/lib_context/context_dump.ml @@ -323,7 +323,7 @@ module Make (I : Dump_interface) = struct notify ()) let dump_context_fd idx context_hash ~context_fd = - let open Lwt_syntax in + let open Lwt_tzresult_syntax in (* Dumping *) let buf = Buffer.create 1_000_000 in let written = ref 0 in @@ -338,7 +338,7 @@ module Make (I : Dump_interface) = struct in Lwt.catch (fun () -> - let* o = I.checkout idx context_hash in + let*! o = I.checkout idx context_hash in match o with | None -> (* FIXME: dirty *) @@ -356,14 +356,14 @@ module Make (I : Dump_interface) = struct else Format.asprintf "%dKiB" (!written / 1_024))) (fun notify -> set_root buf ; - let* elements = + let*! elements = I.context_tree ctxt |> serialize_tree ~notify ~maybe_flush buf in let parents = I.context_parents ctxt in set_eoc buf (I.context_info ctxt) parents ; set_end buf ; - let* () = flush () in - return_ok elements)) + let*! () = flush () in + return elements)) (function | Unix.Unix_error (e, _, _) -> fail @@ System_write_error (Unix.error_message e) diff --git a/src/lib_context/memory/context.ml b/src/lib_context/memory/context.ml index b80cd99a549d..f60b8176110f 100644 --- a/src/lib_context/memory/context.ml +++ b/src/lib_context/memory/context.ml @@ -210,6 +210,7 @@ module Make (Encoding : module type of Tezos_context_encoding.Context) = struct let get_hash_version _c = Context_hash.Version.of_int 0 let set_hash_version c v = + let open Lwt_tzresult_syntax in if Context_hash.Version.(of_int 0 = v) then return c else fail (Tezos_context_helpers.Context.Unsupported_context_hash_version v) -- GitLab From f61c243fe53afabc658368a34755579a4b5202e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Proust?= Date: Tue, 15 Mar 2022 16:47:15 +0100 Subject: [PATCH 5/8] Node: Use local (rather than global) return/fail --- src/bin_node/node_config_validation.ml | 77 +++++++++++++------------- 1 file changed, 39 insertions(+), 38 deletions(-) diff --git a/src/bin_node/node_config_validation.ml b/src/bin_node/node_config_validation.ml index a56cd130d5c7..d40f73b56f06 100644 --- a/src/bin_node/node_config_validation.ml +++ b/src/bin_node/node_config_validation.ml @@ -213,13 +213,13 @@ let invalid_pow = "p2p.expected-proof-of-work") ("proof-of-work", Data_encoding.float) -let validate_expected_pow (config : Node_config_file.t) : t = +let validate_expected_pow (config : Node_config_file.t) : + (t, 'error) result Lwt.t = unless (0. <= config.p2p.expected_pow && config.p2p.expected_pow <= 256.) ~event:invalid_pow ~payload:config.p2p.expected_pow - -let validate_expected_pow config = Lwt.return_ok (validate_expected_pow config) + |> Lwt.return_ok (* Validate addresses. *) @@ -406,41 +406,42 @@ let target_number_of_known_points_lower_than_maximum_conn = ("maximum", Data_encoding.int16) let validate_connections (config : Node_config_file.t) = - let limits = config.p2p.limits in - when_ - (limits.min_connections > limits.expected_connections) - ~event:connections_min_expected - ~payload:(limits.min_connections, limits.expected_connections) - @ when_ - (limits.expected_connections > limits.max_connections) - ~event:connections_expected_max - ~payload:(limits.expected_connections, limits.max_connections) - @ Option.fold - limits.max_known_peer_ids - ~none:[] - ~some:(fun (max_known_peer_ids, target_known_peer_ids) -> - when_ - (target_known_peer_ids > max_known_peer_ids) - ~event:target_number_of_known_peers_greater_than_maximum - ~payload:(target_known_peer_ids, max_known_peer_ids) - @ when_ - (limits.max_connections > target_known_peer_ids) - ~event:target_number_of_known_peers_lower_than_maximum_conn - ~payload:(target_known_peer_ids, limits.max_connections)) - @ Option.fold - limits.max_known_points - ~none:[] - ~some:(fun (max_known_points, target_known_points) -> - when_ - (target_known_points > max_known_points) - ~event:target_number_of_known_points_greater_than_maximum - ~payload:(max_known_points, target_known_points) - @ when_ - (limits.max_connections > target_known_points) - ~event:target_number_of_known_points_lower_than_maximum_conn - ~payload:(target_known_points, limits.max_connections)) - -let validate_connections config = Lwt.return_ok (validate_connections config) + let validated_connections = + let limits = config.p2p.limits in + when_ + (limits.min_connections > limits.expected_connections) + ~event:connections_min_expected + ~payload:(limits.min_connections, limits.expected_connections) + @ when_ + (limits.expected_connections > limits.max_connections) + ~event:connections_expected_max + ~payload:(limits.expected_connections, limits.max_connections) + @ Option.fold + limits.max_known_peer_ids + ~none:[] + ~some:(fun (max_known_peer_ids, target_known_peer_ids) -> + when_ + (target_known_peer_ids > max_known_peer_ids) + ~event:target_number_of_known_peers_greater_than_maximum + ~payload:(target_known_peer_ids, max_known_peer_ids) + @ when_ + (limits.max_connections > target_known_peer_ids) + ~event:target_number_of_known_peers_lower_than_maximum_conn + ~payload:(target_known_peer_ids, limits.max_connections)) + @ Option.fold + limits.max_known_points + ~none:[] + ~some:(fun (max_known_points, target_known_points) -> + when_ + (target_known_points > max_known_points) + ~event:target_number_of_known_points_greater_than_maximum + ~payload:(max_known_points, target_known_points) + @ when_ + (limits.max_connections > target_known_points) + ~event:target_number_of_known_points_lower_than_maximum_conn + ~payload:(target_known_points, limits.max_connections)) + in + Lwt.return_ok validated_connections (* Main validation passes. *) -- GitLab From 4274b828335bf53c241ec59ad32bb0b2812a4375 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Proust?= Date: Tue, 15 Mar 2022 16:47:15 +0100 Subject: [PATCH 6/8] Shell-services: Use local (rather than global) return/fail --- src/lib_shell_services/block_services.ml | 4 +++- src/lib_shell_services/chain_services.ml | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/lib_shell_services/block_services.ml b/src/lib_shell_services/block_services.ml index ff9c1ab7fd63..6447b71d49c9 100644 --- a/src/lib_shell_services/block_services.ml +++ b/src/lib_shell_services/block_services.ml @@ -1346,7 +1346,9 @@ module Make (Proto : PROTO) (Next_proto : PROTO) = struct let hash ctxt = let f = make_call0 S.hash ctxt in fun ?(chain = `Main) ?(block = `Head 0) () -> - match block with `Hash (h, 0) -> return h | _ -> f chain block () () + match block with + | `Hash (h, 0) -> Lwt.return_ok h + | _ -> f chain block () () let header ctxt = let f = make_call0 S.header ctxt in diff --git a/src/lib_shell_services/chain_services.ml b/src/lib_shell_services/chain_services.ml index 41c31d22321d..056d8db96496 100644 --- a/src/lib_shell_services/chain_services.ml +++ b/src/lib_shell_services/chain_services.ml @@ -218,7 +218,7 @@ let make_call1 s ctxt chain a q p = let chain_id ctxt = let f = make_call0 S.chain_id ctxt in fun ?(chain = `Main) () -> - match chain with `Hash h -> return h | _ -> f chain () () + match chain with `Hash h -> Lwt.return_ok h | _ -> f chain () () let checkpoint ctxt ?(chain = `Main) () = make_call0 S.checkpoint ctxt chain () () -- GitLab From 2099134805ebf221b700ce2df0133cf3b887c4db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Proust?= Date: Tue, 15 Mar 2022 16:47:15 +0100 Subject: [PATCH 7/8] Validation: Use local (rather than global) return/fail --- src/lib_validation/block_validation.ml | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/src/lib_validation/block_validation.ml b/src/lib_validation/block_validation.ml index 15ad14d60e99..290cd3f3b3d6 100644 --- a/src/lib_validation/block_validation.ml +++ b/src/lib_validation/block_validation.ml @@ -289,6 +289,7 @@ module Make (Proto : Registered_protocol.T) = struct return_unit let parse_block_header block_hash (block_header : Block_header.t) = + let open Lwt_tzresult_syntax in match Data_encoding.Binary.of_bytes_opt Proto.block_header_data_encoding @@ -349,6 +350,7 @@ module Make (Proto : Registered_protocol.T) = struct let invalid_block = invalid_block block_hash in List.mapi_es (fun pass -> + let open Lwt_tzresult_syntax in List.map_es (fun op -> let op_hash = Operation.hash op in match @@ -358,7 +360,6 @@ module Make (Proto : Registered_protocol.T) = struct with | None -> fail (invalid_block (Cannot_parse_operation op_hash)) | Some protocol_data -> - let open Lwt_tzresult_syntax in let op = {Proto.shell = op.shell; protocol_data} in let allowed_pass = Proto.acceptable_passes op in let* () = @@ -1002,9 +1003,9 @@ module Make (Proto : Registered_protocol.T) = struct let precheck chain_id ~(predecessor_block_header : Block_header.t) ~predecessor_block_hash ~predecessor_context ~cache ~(block_header : Block_header.t) operations = - let open Lwt_syntax in + let open Lwt_tzresult_syntax in let block_hash = Block_header.hash block_header in - let* r = + let*! r = precheck block_hash chain_id @@ -1016,10 +1017,8 @@ module Make (Proto : Registered_protocol.T) = struct operations in match r with - | Error err -> - Error_monad.fail - (invalid_block block_hash (Economic_protocol_error err)) - | Ok () -> return_ok_unit + | Error err -> fail (invalid_block block_hash (Economic_protocol_error err)) + | Ok () -> return_unit end let assert_no_duplicate_operations block_hash live_operations operations = @@ -1147,8 +1146,7 @@ let apply ?cached_result c ~cache block_header operations = in match r with | Error (Exn (Unix.Unix_error (errno, fn, msg)) :: _) -> - Error_monad.fail - (System_error {errno = Unix.error_message errno; fn; msg}) + fail (System_error {errno = Unix.error_message errno; fn; msg}) | (Ok _ | Error _) as res -> Lwt.return res let precheck ~chain_id ~predecessor_block_header ~predecessor_block_hash @@ -1226,8 +1224,8 @@ let preapply ~chain_id ~user_activated_upgrades ~live_operations ~predecessor_context ~predecessor_shell_header ~predecessor_hash ~predecessor_max_operations_ttl ~predecessor_block_metadata_hash ~predecessor_ops_metadata_hash operations = - let open Lwt_syntax in - let* r = + let open Lwt_tzresult_syntax in + let*! r = preapply ~chain_id ~user_activated_upgrades @@ -1246,6 +1244,5 @@ let preapply ~chain_id ~user_activated_upgrades in match r with | Error (Exn (Unix.Unix_error (errno, fn, msg)) :: _) -> - Error_monad.fail - (System_error {errno = Unix.error_message errno; fn; msg}) + fail (System_error {errno = Unix.error_message errno; fn; msg}) | (Ok _ | Error _) as res -> Lwt.return res -- GitLab From 4689dc2ec8fd61033f5b1e8e87a75950f9c31631 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Proust?= Date: Thu, 17 Mar 2022 09:48:41 +0100 Subject: [PATCH 8/8] Store: minor code simplification --- src/lib_store/consistency.ml | 76 ++++++++++++++++++------------------ 1 file changed, 37 insertions(+), 39 deletions(-) diff --git a/src/lib_store/consistency.ml b/src/lib_store/consistency.ml index dffbaf21952c..70426005c0ec 100644 --- a/src/lib_store/consistency.ml +++ b/src/lib_store/consistency.ml @@ -363,23 +363,25 @@ let lowest_cemented_block cemented_block_files = (* Returns the lowest block level of a cemented metadata file. *) let lowest_metadata_entry metadata_file = - try - let metadata_file_path = Naming.file_path metadata_file in - let in_file = Zip.open_in metadata_file_path in - let entries = Zip.entries in_file in - let asc_entries = - List.sort - (fun {Zip.filename = a; _} {filename = b; _} -> - Int.compare (int_of_string a) (int_of_string b)) - entries - in - match asc_entries with - | [] -> - (* A metadata file is never empty *) - assert false - | {Zip.filename; _} :: _ -> - Lwt_tzresult_syntax.return_some (Int32.of_string filename) - with exn -> Lwt.fail exn + let metadata_file_path = Naming.file_path metadata_file in + let in_file = Zip.open_in metadata_file_path in + let entries = Zip.entries in_file in + match entries with + | [] -> + (* A metadata file is never empty *) + assert false + | entry :: entries -> + let lowest_entry = + List.fold_left + (fun lowest entry -> + let entry = entry.Zip.filename in + if Compare.Int.(int_of_string lowest <= int_of_string entry) then + lowest + else entry) + entry.Zip.filename + entries + in + Int32.of_string lowest_entry (* Returns the lowest block level, from the cemented store, which is associated to some block metadata *) @@ -388,29 +390,25 @@ let lowest_cemented_metadata cemented_dir = let* metadata_files = Cemented_block_store.load_metadata_table cemented_dir in match metadata_files with | Some metadata_files -> - let rec aux = function - | [] -> return_none - | {Cemented_block_store.metadata_file; start_level; end_level} :: tl - -> ( - let* v = - Lwt.catch - (fun () -> - let* v = lowest_metadata_entry metadata_file in - return_some v) - (function - | _ -> - (* Can be the case if the metadata file is - corrupted. Raise a warning and continue the - search in the next metadata file. *) - let*! () = - Store_events.( - emit warning_missing_metadata (start_level, end_level)) - in - return_none) - in - match v with Some v -> return v | None -> aux tl) + let*! m = + Seq_s.of_seq (Array.to_seq metadata_files) + |> Seq_s.filter_map_s + (fun {Cemented_block_store.metadata_file; start_level; end_level} + -> + match lowest_metadata_entry metadata_file with + | v -> Lwt.return_some v + | exception _ -> + (* Can be the case if the metadata file is + corrupted. Raise a warning and continue the + search in the next metadata file. *) + let*! () = + Store_events.( + emit warning_missing_metadata (start_level, end_level)) + in + Lwt.return_none) + |> Seq_s.first in - aux (Array.to_list metadata_files) + return m | None -> return_none (* Returns both the lowest block and the lowest block with metadata -- GitLab