diff --git a/src/bin_node/node_config_validation.ml b/src/bin_node/node_config_validation.ml index a56cd130d5c7209c034ebdff2b042076058dbd26..d40f73b56f0651864c8748790ed661574ef43fd5 100644 --- a/src/bin_node/node_config_validation.ml +++ b/src/bin_node/node_config_validation.ml @@ -213,13 +213,13 @@ let invalid_pow = "p2p.expected-proof-of-work") ("proof-of-work", Data_encoding.float) -let validate_expected_pow (config : Node_config_file.t) : t = +let validate_expected_pow (config : Node_config_file.t) : + (t, 'error) result Lwt.t = unless (0. <= config.p2p.expected_pow && config.p2p.expected_pow <= 256.) ~event:invalid_pow ~payload:config.p2p.expected_pow - -let validate_expected_pow config = Lwt.return_ok (validate_expected_pow config) + |> Lwt.return_ok (* Validate addresses. *) @@ -406,41 +406,42 @@ let target_number_of_known_points_lower_than_maximum_conn = ("maximum", Data_encoding.int16) let validate_connections (config : Node_config_file.t) = - let limits = config.p2p.limits in - when_ - (limits.min_connections > limits.expected_connections) - ~event:connections_min_expected - ~payload:(limits.min_connections, limits.expected_connections) - @ when_ - (limits.expected_connections > limits.max_connections) - ~event:connections_expected_max - ~payload:(limits.expected_connections, limits.max_connections) - @ Option.fold - limits.max_known_peer_ids - ~none:[] - ~some:(fun (max_known_peer_ids, target_known_peer_ids) -> - when_ - (target_known_peer_ids > max_known_peer_ids) - ~event:target_number_of_known_peers_greater_than_maximum - ~payload:(target_known_peer_ids, max_known_peer_ids) - @ when_ - (limits.max_connections > target_known_peer_ids) - ~event:target_number_of_known_peers_lower_than_maximum_conn - ~payload:(target_known_peer_ids, limits.max_connections)) - @ Option.fold - limits.max_known_points - ~none:[] - ~some:(fun (max_known_points, target_known_points) -> - when_ - (target_known_points > max_known_points) - ~event:target_number_of_known_points_greater_than_maximum - ~payload:(max_known_points, target_known_points) - @ when_ - (limits.max_connections > target_known_points) - ~event:target_number_of_known_points_lower_than_maximum_conn - ~payload:(target_known_points, limits.max_connections)) - -let validate_connections config = Lwt.return_ok (validate_connections config) + let validated_connections = + let limits = config.p2p.limits in + when_ + (limits.min_connections > limits.expected_connections) + ~event:connections_min_expected + ~payload:(limits.min_connections, limits.expected_connections) + @ when_ + (limits.expected_connections > limits.max_connections) + ~event:connections_expected_max + ~payload:(limits.expected_connections, limits.max_connections) + @ Option.fold + limits.max_known_peer_ids + ~none:[] + ~some:(fun (max_known_peer_ids, target_known_peer_ids) -> + when_ + (target_known_peer_ids > max_known_peer_ids) + ~event:target_number_of_known_peers_greater_than_maximum + ~payload:(target_known_peer_ids, max_known_peer_ids) + @ when_ + (limits.max_connections > target_known_peer_ids) + ~event:target_number_of_known_peers_lower_than_maximum_conn + ~payload:(target_known_peer_ids, limits.max_connections)) + @ Option.fold + limits.max_known_points + ~none:[] + ~some:(fun (max_known_points, target_known_points) -> + when_ + (target_known_points > max_known_points) + ~event:target_number_of_known_points_greater_than_maximum + ~payload:(max_known_points, target_known_points) + @ when_ + (limits.max_connections > target_known_points) + ~event:target_number_of_known_points_lower_than_maximum_conn + ~payload:(target_known_points, limits.max_connections)) + in + Lwt.return_ok validated_connections (* Main validation passes. *) diff --git a/src/lib_context/context.ml b/src/lib_context/context.ml index 98d9045ee668bb33fc77698e65d76ac2ed17b8b7..5216827b2e1ff8a82eccd0ea47378172d8bb970d 100644 --- a/src/lib_context/context.ml +++ b/src/lib_context/context.ml @@ -311,6 +311,7 @@ module Make (Encoding : module type of Tezos_context_encoding.Context) = struct let get_hash_version _c = Context_hash.Version.of_int 0 let set_hash_version c v = + let open Lwt_tzresult_syntax in if Context_hash.Version.(of_int 0 = v) then return c else fail (Tezos_context_helpers.Context.Unsupported_context_hash_version v) diff --git a/src/lib_context/context_dump.ml b/src/lib_context/context_dump.ml index cc921e7bf8732f4acdd55b4ca247fdddb2f454ef..a957d319af2a6c3fe7ca84b027ded80122a199ba 100644 --- a/src/lib_context/context_dump.ml +++ b/src/lib_context/context_dump.ml @@ -323,7 +323,7 @@ module Make (I : Dump_interface) = struct notify ()) let dump_context_fd idx context_hash ~context_fd = - let open Lwt_syntax in + let open Lwt_tzresult_syntax in (* Dumping *) let buf = Buffer.create 1_000_000 in let written = ref 0 in @@ -338,7 +338,7 @@ module Make (I : Dump_interface) = struct in Lwt.catch (fun () -> - let* o = I.checkout idx context_hash in + let*! o = I.checkout idx context_hash in match o with | None -> (* FIXME: dirty *) @@ -356,14 +356,14 @@ module Make (I : Dump_interface) = struct else Format.asprintf "%dKiB" (!written / 1_024))) (fun notify -> set_root buf ; - let* elements = + let*! elements = I.context_tree ctxt |> serialize_tree ~notify ~maybe_flush buf in let parents = I.context_parents ctxt in set_eoc buf (I.context_info ctxt) parents ; set_end buf ; - let* () = flush () in - return_ok elements)) + let*! () = flush () in + return elements)) (function | Unix.Unix_error (e, _, _) -> fail @@ System_write_error (Unix.error_message e) diff --git a/src/lib_context/memory/context.ml b/src/lib_context/memory/context.ml index b80cd99a549d721647d91fa0ce8e8f9588c84726..f60b8176110f7a02fbc76b46358f222439832325 100644 --- a/src/lib_context/memory/context.ml +++ b/src/lib_context/memory/context.ml @@ -210,6 +210,7 @@ module Make (Encoding : module type of Tezos_context_encoding.Context) = struct let get_hash_version _c = Context_hash.Version.of_int 0 let set_hash_version c v = + let open Lwt_tzresult_syntax in if Context_hash.Version.(of_int 0 = v) then return c else fail (Tezos_context_helpers.Context.Unsupported_context_hash_version v) diff --git a/src/lib_shell/block_directory.ml b/src/lib_shell/block_directory.ml index 2437cd777f4132e5b85c3a0d8b4c2f57a09a0954..13211e7111ccde2a242e7a3477994b5b40139c24 100644 --- a/src/lib_shell/block_directory.ml +++ b/src/lib_shell/block_directory.ml @@ -58,6 +58,7 @@ let read_partial_context = | Some v -> raw_context_insert (k, Key v) acc) let build_raw_header_rpc_directory (module Proto : Block_services.PROTO) = + let open Lwt_tzresult_syntax in let dir : (Store.chain_store * Block_hash.t * Block_header.t) RPC_directory.t ref = ref RPC_directory.empty diff --git a/src/lib_shell/block_validator.ml b/src/lib_shell/block_validator.ml index 29d8f4b9e7e612df5e0259e1183a5b7bb565f1f6..cc427e051354977bebbabf5986f1f6d302aef327 100644 --- a/src/lib_shell/block_validator.ml +++ b/src/lib_shell/block_validator.ml @@ -117,16 +117,19 @@ module Worker = Worker.Make (Name) (Event) (Request) (Types) (Logger) type t = Worker.infinite Worker.queue Worker.t let check_chain_liveness chain_db hash (header : Block_header.t) = + let open Lwt_tzresult_syntax in let chain_store = Distributed_db.chain_store chain_db in match Store.Chain.expiration chain_store with | Some eol when Time.Protocol.(eol <= header.shell.timestamp) -> - fail @@ invalid_block hash - @@ Expired_chain - { - chain_id = Store.Chain.chain_id chain_store; - expiration = eol; - timestamp = header.shell.timestamp; - } + let error = + Expired_chain + { + chain_id = Store.Chain.chain_id chain_store; + expiration = eol; + timestamp = header.shell.timestamp; + } + in + fail (invalid_block hash error) | None | Some _ -> return_unit let is_already_validated chain_store hash = @@ -322,7 +325,7 @@ let metrics = Shell_metrics.Block_validator.init Name.base let on_launch _ _ (limits, start_testchain, db, validation_process) = let protocol_validator = Protocol_validator.create db in let invalid_blocks_after_precheck = Block_hash_ring.create 50 in - return + Lwt.return_ok { Types.protocol_validator; validation_process; @@ -427,7 +430,7 @@ let create limits db validation_process ~start_testchain = let on_completion = on_completion - let on_no_request _ = return_unit + let on_no_request _ = Lwt_tzresult_syntax.return_unit end in Worker.launch table diff --git a/src/lib_shell/block_validator_process.ml b/src/lib_shell/block_validator_process.ml index d1f2fbd0d4ee130811e3d9a2434f2b4ba4b88deb..2e1e4c4a0e36409fe059e23b19de105f35490f24 100644 --- a/src/lib_shell/block_validator_process.ml +++ b/src/lib_shell/block_validator_process.ml @@ -328,7 +328,7 @@ module Internal_validator_process = struct let* context = Store.Block.context validator.chain_store forking_block in Block_validation.init_test_chain context forked_header - let reconfigure_event_logging _ _ = return_unit + let reconfigure_event_logging _ _ = Lwt_tzresult_syntax.return_unit end (** Block validation using an external process *) diff --git a/src/lib_shell/config_directory.ml b/src/lib_shell/config_directory.ml index 38d28b977f5046ec982fd473527897f1d5b50420..5a0cd4a82c6ebba231d8ad21686b4ac6afd391e9 100644 --- a/src/lib_shell/config_directory.ml +++ b/src/lib_shell/config_directory.ml @@ -25,6 +25,7 @@ let build_rpc_directory ~user_activated_upgrades ~user_activated_protocol_overrides ~mainchain_validator store = + let open Lwt_tzresult_syntax in let register endpoint f directory = RPC_directory.register directory endpoint f in diff --git a/src/lib_shell/node.ml b/src/lib_shell/node.ml index d30a1b0cef5c39dbc527e261bdbf15e0c38adf26..4be28075a774aa8df3e8a4af32f3e4f45e80852e 100644 --- a/src/lib_shell/node.ml +++ b/src/lib_shell/node.ml @@ -368,5 +368,5 @@ let build_rpc_directory node = node.store) ; merge (Version_directory.rpc_directory node.p2p) ; register0 RPC_service.error_service (fun () () -> - return (Data_encoding.Json.schema Error_monad.error_encoding)) ; + Lwt.return_ok (Data_encoding.Json.schema Error_monad.error_encoding)) ; !dir diff --git a/src/lib_shell/prevalidator.ml b/src/lib_shell/prevalidator.ml index 024a4d932636617ccbcd315ee8f3b89d335b6585..f99c25f8ea968dafd7634d7e7ec31637fa60be60 100644 --- a/src/lib_shell/prevalidator.ml +++ b/src/lib_shell/prevalidator.ml @@ -1303,258 +1303,256 @@ module Make let build_rpc_directory w = lazy - (let dir : state RPC_directory.t ref = ref RPC_directory.empty in - let module Proto_services = - Block_services.Make (Filter.Proto) (Filter.Proto) - in - dir := - RPC_directory.register - !dir - (Proto_services.S.Mempool.get_filter RPC_path.open_root) - (fun pv params () -> - return - (get_filter_config_json - ~include_default:params#include_default - pv)) ; - dir := - RPC_directory.register - !dir - (Proto_services.S.Mempool.set_filter RPC_path.open_root) - (fun pv () obj -> - let open Lwt_syntax in - let* () = - try - let config = - Data_encoding.Json.destruct - Filter.Mempool.config_encoding - obj - in - pv.filter_config <- config ; - Lwt.return_unit - with _ -> Event.(emit invalid_mempool_filter_configuration) () - in - return_ok (get_filter_config_json pv)) ; - (* Ban an operation (from its given hash): remove it from the - mempool if present. Add it to the set pv.banned_operations - to prevent it from being fetched/processed/injected in the - future. - Note: If the baker has already received the operation, then - it's necessary to restart it manually to flush the operation - from it. *) - dir := - RPC_directory.register - !dir - (Proto_services.S.Mempool.ban_operation RPC_path.open_root) - (fun _pv () oph -> - Worker.Queue.push_request_and_wait w (Request.Ban oph)) ; - (* Unban an operation (from its given hash): remove it from the - set pv.banned_operations (nothing happens if it was not banned). *) - dir := - RPC_directory.register - !dir - (Proto_services.S.Mempool.unban_operation RPC_path.open_root) - (fun pv () oph -> - pv.shell.banned_operations <- - Operation_hash.Set.remove oph pv.shell.banned_operations ; - return_unit) ; - (* Unban all operations: clear the set pv.banned_operations. *) - dir := - RPC_directory.register - !dir - (Proto_services.S.Mempool.unban_all_operations RPC_path.open_root) - (fun pv () () -> - pv.shell.banned_operations <- Operation_hash.Set.empty ; - return_unit) ; - dir := - RPC_directory.gen_register - !dir - (Proto_services.S.Mempool.pending_operations RPC_path.open_root) - (fun pv params () -> - let map_op_error oph (op, error) acc = - op.Prevalidation.protocol |> fun res -> - Operation_hash.Map.add oph (res, error) acc - in - let applied = - if params#applied then - List.rev_map - (fun op -> - (op.Prevalidation.hash, op.Prevalidation.protocol)) - pv.shell.classification.applied_rev - else [] - in - let filter f map = - Operation_hash.Map.fold f map Operation_hash.Map.empty - in - let refused = - if params#refused then - filter - map_op_error - (Classification.map pv.shell.classification.refused) - else Operation_hash.Map.empty - in - let outdated = - if params#outdated then - filter - map_op_error - (Classification.map pv.shell.classification.outdated) - else Operation_hash.Map.empty - in - let branch_refused = - if params#branch_refused then - filter - map_op_error - (Classification.map pv.shell.classification.branch_refused) - else Operation_hash.Map.empty - in - let branch_delayed = - if params#branch_delayed then - filter - map_op_error - (Classification.map pv.shell.classification.branch_delayed) - else Operation_hash.Map.empty - in - let unprocessed = - Pending_ops.fold - (fun _prio oph op acc -> - Operation_hash.Map.add oph op.protocol acc) - pv.shell.pending - Operation_hash.Map.empty - in - (* FIXME https://gitlab.com/tezos/tezos/-/issues/2250 - - We merge prechecked operation with applied operation - so that the encoding of the RPC does not need to be - changed. Once prechecking will be done by the protocol - and not the plugin, we will change the encoding to - reflect that. *) - let prechecked_with_applied = - if params#applied then - (Operation_hash.Map.bindings pv.shell.classification.prechecked - |> List.rev_map (fun (oph, op) -> - (oph, op.Prevalidation.protocol))) - @ applied - else applied - in - let pending_operations = - { - Proto_services.Mempool.applied = prechecked_with_applied; - refused; - outdated; - branch_refused; - branch_delayed; - unprocessed; - } - in - Proto_services.Mempool.pending_operations_version_dispatcher - ~version:params#version - pending_operations) ; - dir := - RPC_directory.register - !dir - (Proto_services.S.Mempool.request_operations RPC_path.open_root) - (fun pv t () -> - pv.shell.parameters.tools.send_get_current_head ?peer:t#peer_id () ; - return_unit) ; - dir := - RPC_directory.gen_register - !dir - (Proto_services.S.Mempool.monitor_operations RPC_path.open_root) - (fun pv params () -> - Lwt_mutex.with_lock pv.lock @@ fun () -> - let (op_stream, stopper) = - Lwt_watcher.create_stream pv.operation_stream - in - (* Convert ops *) - let fold_op hash (Prevalidation.{protocol; _}, error) acc = - (hash, protocol, error) :: acc - in - (* First call : retrieve the current set of op from the mempool *) - let applied = - if params#applied then - List.map - (fun op -> (op.Prevalidation.hash, op.protocol, [])) - pv.shell.classification.applied_rev - else [] - in - (* FIXME https://gitlab.com/tezos/tezos/-/issues/2250 - - For the moment, applied and prechecked operations are - handled the same way for the user point of view. *) - let prechecked = - if params#applied then - Operation_hash.Map.fold - (fun hash op acc -> - (hash, op.Prevalidation.protocol, []) :: acc) - pv.shell.classification.prechecked - [] - else [] - in - let refused = - if params#refused then - Operation_hash.Map.fold - fold_op - (Classification.map pv.shell.classification.refused) - [] - else [] - in - let branch_refused = - if params#branch_refused then - Operation_hash.Map.fold - fold_op - (Classification.map pv.shell.classification.branch_refused) - [] - else [] - in - let branch_delayed = - if params#branch_delayed then - Operation_hash.Map.fold - fold_op - (Classification.map pv.shell.classification.branch_delayed) - [] - else [] - in - let current_mempool = - List.concat_map - (List.map (function - | (hash, op, []) -> ((hash, op), None) - | (hash, op, errors) -> ((hash, op), Some errors))) - [applied; prechecked; refused; branch_refused; branch_delayed] - in - let current_mempool = ref (Some current_mempool) in - let filter_result = function - | `Prechecked | `Applied -> params#applied - | `Refused _ -> params#refused - | `Outdated _ -> params#outdated - | `Branch_refused _ -> params#branch_refused - | `Branch_delayed _ -> params#branch_delayed - in - let rec next () = - let open Lwt_syntax in - match !current_mempool with - | Some mempool -> - current_mempool := None ; - Lwt.return_some mempool - | None -> ( - let* o = Lwt_stream.get op_stream in - match o with - | Some (kind, op) when filter_result kind -> - let errors = - match kind with - | `Prechecked | `Applied -> None - | `Branch_delayed errors - | `Branch_refused errors - | `Refused errors - | `Outdated errors -> - Some errors - in - Lwt.return_some - [(Prevalidation.(op.hash, op.protocol), errors)] - | Some _ -> next () - | None -> Lwt.return_none) - in - let shutdown () = Lwt_watcher.shutdown stopper in - RPC_answer.return_stream {next; shutdown}) ; - !dir) + (let open Lwt_tzresult_syntax in + let dir : state RPC_directory.t ref = ref RPC_directory.empty in + let module Proto_services = + Block_services.Make (Filter.Proto) (Filter.Proto) + in + dir := + RPC_directory.register + !dir + (Proto_services.S.Mempool.get_filter RPC_path.open_root) + (fun pv params () -> + return + (get_filter_config_json + ~include_default:params#include_default + pv)) ; + dir := + RPC_directory.register + !dir + (Proto_services.S.Mempool.set_filter RPC_path.open_root) + (fun pv () obj -> + let open Lwt_syntax in + let* () = + try + let config = + Data_encoding.Json.destruct Filter.Mempool.config_encoding obj + in + pv.filter_config <- config ; + Lwt.return_unit + with _ -> Event.(emit invalid_mempool_filter_configuration) () + in + return_ok (get_filter_config_json pv)) ; + (* Ban an operation (from its given hash): remove it from the + mempool if present. Add it to the set pv.banned_operations + to prevent it from being fetched/processed/injected in the + future. + Note: If the baker has already received the operation, then + it's necessary to restart it manually to flush the operation + from it. *) + dir := + RPC_directory.register + !dir + (Proto_services.S.Mempool.ban_operation RPC_path.open_root) + (fun _pv () oph -> + Worker.Queue.push_request_and_wait w (Request.Ban oph)) ; + (* Unban an operation (from its given hash): remove it from the + set pv.banned_operations (nothing happens if it was not banned). *) + dir := + RPC_directory.register + !dir + (Proto_services.S.Mempool.unban_operation RPC_path.open_root) + (fun pv () oph -> + pv.shell.banned_operations <- + Operation_hash.Set.remove oph pv.shell.banned_operations ; + return_unit) ; + (* Unban all operations: clear the set pv.banned_operations. *) + dir := + RPC_directory.register + !dir + (Proto_services.S.Mempool.unban_all_operations RPC_path.open_root) + (fun pv () () -> + pv.shell.banned_operations <- Operation_hash.Set.empty ; + return_unit) ; + dir := + RPC_directory.gen_register + !dir + (Proto_services.S.Mempool.pending_operations RPC_path.open_root) + (fun pv params () -> + let map_op_error oph (op, error) acc = + op.Prevalidation.protocol |> fun res -> + Operation_hash.Map.add oph (res, error) acc + in + let applied = + if params#applied then + List.rev_map + (fun op -> (op.Prevalidation.hash, op.Prevalidation.protocol)) + pv.shell.classification.applied_rev + else [] + in + let filter f map = + Operation_hash.Map.fold f map Operation_hash.Map.empty + in + let refused = + if params#refused then + filter + map_op_error + (Classification.map pv.shell.classification.refused) + else Operation_hash.Map.empty + in + let outdated = + if params#outdated then + filter + map_op_error + (Classification.map pv.shell.classification.outdated) + else Operation_hash.Map.empty + in + let branch_refused = + if params#branch_refused then + filter + map_op_error + (Classification.map pv.shell.classification.branch_refused) + else Operation_hash.Map.empty + in + let branch_delayed = + if params#branch_delayed then + filter + map_op_error + (Classification.map pv.shell.classification.branch_delayed) + else Operation_hash.Map.empty + in + let unprocessed = + Pending_ops.fold + (fun _prio oph op acc -> + Operation_hash.Map.add oph op.protocol acc) + pv.shell.pending + Operation_hash.Map.empty + in + (* FIXME https://gitlab.com/tezos/tezos/-/issues/2250 + + We merge prechecked operation with applied operation + so that the encoding of the RPC does not need to be + changed. Once prechecking will be done by the protocol + and not the plugin, we will change the encoding to + reflect that. *) + let prechecked_with_applied = + if params#applied then + (Operation_hash.Map.bindings pv.shell.classification.prechecked + |> List.rev_map (fun (oph, op) -> + (oph, op.Prevalidation.protocol))) + @ applied + else applied + in + let pending_operations = + { + Proto_services.Mempool.applied = prechecked_with_applied; + refused; + outdated; + branch_refused; + branch_delayed; + unprocessed; + } + in + Proto_services.Mempool.pending_operations_version_dispatcher + ~version:params#version + pending_operations) ; + dir := + RPC_directory.register + !dir + (Proto_services.S.Mempool.request_operations RPC_path.open_root) + (fun pv t () -> + pv.shell.parameters.tools.send_get_current_head ?peer:t#peer_id () ; + return_unit) ; + dir := + RPC_directory.gen_register + !dir + (Proto_services.S.Mempool.monitor_operations RPC_path.open_root) + (fun pv params () -> + Lwt_mutex.with_lock pv.lock @@ fun () -> + let (op_stream, stopper) = + Lwt_watcher.create_stream pv.operation_stream + in + (* Convert ops *) + let fold_op hash (Prevalidation.{protocol; _}, error) acc = + (hash, protocol, error) :: acc + in + (* First call : retrieve the current set of op from the mempool *) + let applied = + if params#applied then + List.map + (fun op -> (op.Prevalidation.hash, op.protocol, [])) + pv.shell.classification.applied_rev + else [] + in + (* FIXME https://gitlab.com/tezos/tezos/-/issues/2250 + + For the moment, applied and prechecked operations are + handled the same way for the user point of view. *) + let prechecked = + if params#applied then + Operation_hash.Map.fold + (fun hash op acc -> + (hash, op.Prevalidation.protocol, []) :: acc) + pv.shell.classification.prechecked + [] + else [] + in + let refused = + if params#refused then + Operation_hash.Map.fold + fold_op + (Classification.map pv.shell.classification.refused) + [] + else [] + in + let branch_refused = + if params#branch_refused then + Operation_hash.Map.fold + fold_op + (Classification.map pv.shell.classification.branch_refused) + [] + else [] + in + let branch_delayed = + if params#branch_delayed then + Operation_hash.Map.fold + fold_op + (Classification.map pv.shell.classification.branch_delayed) + [] + else [] + in + let current_mempool = + List.concat_map + (List.map (function + | (hash, op, []) -> ((hash, op), None) + | (hash, op, errors) -> ((hash, op), Some errors))) + [applied; prechecked; refused; branch_refused; branch_delayed] + in + let current_mempool = ref (Some current_mempool) in + let filter_result = function + | `Prechecked | `Applied -> params#applied + | `Refused _ -> params#refused + | `Outdated _ -> params#outdated + | `Branch_refused _ -> params#branch_refused + | `Branch_delayed _ -> params#branch_delayed + in + let rec next () = + let open Lwt_syntax in + match !current_mempool with + | Some mempool -> + current_mempool := None ; + Lwt.return_some mempool + | None -> ( + let* o = Lwt_stream.get op_stream in + match o with + | Some (kind, op) when filter_result kind -> + let errors = + match kind with + | `Prechecked | `Applied -> None + | `Branch_delayed errors + | `Branch_refused errors + | `Refused errors + | `Outdated errors -> + Some errors + in + Lwt.return_some + [(Prevalidation.(op.hash, op.protocol), errors)] + | Some _ -> next () + | None -> Lwt.return_none) + in + let shutdown () = Lwt_watcher.shutdown stopper in + RPC_answer.return_stream {next; shutdown}) ; + !dir) (** Module implementing the events at the {!Worker} level. Contrary to {!Requests}, these functions depend on [Worker]. *) @@ -1749,7 +1747,7 @@ module Make | View (Notify _) | View Leftover | View (Arrived _) | View Advertise -> Event.(emit request_completed_debug) (Request.view r, st) - let on_no_request _ = return_unit + let on_no_request _ = Lwt_tzresult_syntax.return_unit end let table = Worker.create_table Queue diff --git a/src/lib_shell/prevalidator_filters.ml b/src/lib_shell/prevalidator_filters.ml index e895ea42a7b231688f3d69712f18966d5058ec48..0d982d95780425e6a58a47e5a00dae35bba8a091 100644 --- a/src/lib_shell/prevalidator_filters.ml +++ b/src/lib_shell/prevalidator_filters.ml @@ -104,11 +104,13 @@ module No_filter (Proto : Registered_protocol.T) = struct type state = unit - let init _ ?validation_state:_ ~predecessor:_ () = return_unit + let init _ ?validation_state:_ ~predecessor:_ () = + Lwt_tzresult_syntax.return_unit let remove ~filter_state _ = filter_state - let on_flush _ _ ?validation_state:_ ~predecessor:_ () = return_unit + let on_flush _ _ ?validation_state:_ ~predecessor:_ () = + Lwt_tzresult_syntax.return_unit let precheck _ ~filter_state:_ ~validation_state:_ _ _ ~nb_successful_prechecks:_ = diff --git a/src/lib_shell/test/shell_test_helpers.ml b/src/lib_shell/test/shell_test_helpers.ml index 2a6b9fd870d3d17a866bd9958c1ca1eee6308ba6..87aaddaa159f66d9f2a6640bc93777c713c698e5 100644 --- a/src/lib_shell/test/shell_test_helpers.ml +++ b/src/lib_shell/test/shell_test_helpers.ml @@ -104,4 +104,4 @@ let init_mock_p2p chain_name = in let message_cfg = Distributed_db_message.cfg chain_name in let c_meta = {disable_mempool = true; private_node = true} in - return (P2p.faked_network message_cfg peer_metadata_cfg c_meta) + Lwt.return_ok (P2p.faked_network message_cfg peer_metadata_cfg c_meta) diff --git a/src/lib_shell/test/test_prevalidation_t.ml b/src/lib_shell/test/test_prevalidation_t.ml index f61b0e1880c00656998b9ca0a5a070c76afec734..0d5b3964e7649d13e9d7ea4afbdf4c640b81626f 100644 --- a/src/lib_shell/test/test_prevalidation_t.ml +++ b/src/lib_shell/test/test_prevalidation_t.ml @@ -43,7 +43,7 @@ module Mock_protocol : (* We need to override this function (so that it's not [assert false]), because Prevalidation.create calls this function, so we need it to work in all tests below. *) - return () + Lwt_tzresult_syntax.return_unit end module Internal_for_tests = Tezos_shell.Prevalidation.Internal_for_tests @@ -98,7 +98,8 @@ let create_prevalidation Internal_for_tests.CHAIN_STORE with type chain_store = unit = struct type chain_store = unit - let context () _block : Tezos_context.Context.t tzresult Lwt.t = return ctxt + let context () _block : Tezos_context.Context.t tzresult Lwt.t = + Lwt_tzresult_syntax.return ctxt let chain_id () = Init.chain_id end in diff --git a/src/lib_shell/test/test_prevalidator_pending_operations.ml b/src/lib_shell/test/test_prevalidator_pending_operations.ml index d46ffbd49c8aedc68d6d11d7db6f19532f30efb6..ff8d0ff7b9a90eafff3497f9bec9787e6936a31d 100644 --- a/src/lib_shell/test/test_prevalidator_pending_operations.ml +++ b/src/lib_shell/test/test_prevalidator_pending_operations.ml @@ -88,7 +88,7 @@ let test_fold_es_ordering = test_iterators_ordering ~name:"fold_es" ~iterator:Pending_ops.fold_es - return_unit + Lwt_tzresult_syntax.return_unit (* 2. Test partial iteration with fold_es *) diff --git a/src/lib_shell_services/block_services.ml b/src/lib_shell_services/block_services.ml index ff9c1ab7fd63ba2dc2310a2710d84daef1d5729e..6447b71d49c926f6baf2bd2a9325e9e87e04af08 100644 --- a/src/lib_shell_services/block_services.ml +++ b/src/lib_shell_services/block_services.ml @@ -1346,7 +1346,9 @@ module Make (Proto : PROTO) (Next_proto : PROTO) = struct let hash ctxt = let f = make_call0 S.hash ctxt in fun ?(chain = `Main) ?(block = `Head 0) () -> - match block with `Hash (h, 0) -> return h | _ -> f chain block () () + match block with + | `Hash (h, 0) -> Lwt.return_ok h + | _ -> f chain block () () let header ctxt = let f = make_call0 S.header ctxt in diff --git a/src/lib_shell_services/chain_services.ml b/src/lib_shell_services/chain_services.ml index 41c31d22321d10f789e1bc91c09d617a8441cf14..056d8db964964e3fbc561c4dde54587c13f8e9ff 100644 --- a/src/lib_shell_services/chain_services.ml +++ b/src/lib_shell_services/chain_services.ml @@ -218,7 +218,7 @@ let make_call1 s ctxt chain a q p = let chain_id ctxt = let f = make_call0 S.chain_id ctxt in fun ?(chain = `Main) () -> - match chain with `Hash h -> return h | _ -> f chain () () + match chain with `Hash h -> Lwt.return_ok h | _ -> f chain () () let checkpoint ctxt ?(chain = `Main) () = make_call0 S.checkpoint ctxt chain () () diff --git a/src/lib_store/block_store.ml b/src/lib_store/block_store.ml index 5c4e2d4bd5dcf2eca40807cf31f8e96b7ee6243a..fcbb8d81a38e7338f0d6b6d201799f0879d3f4c2 100644 --- a/src/lib_store/block_store.ml +++ b/src/lib_store/block_store.ml @@ -1012,8 +1012,10 @@ let move_all_floating_stores block_store ~new_ro_store = return_unit) let check_store_consistency block_store ~cementing_highwatermark = - Cemented_block_store.get_highest_cemented_level block_store.cemented_store - |> function + let open Lwt_tzresult_syntax in + match + Cemented_block_store.get_highest_cemented_level block_store.cemented_store + with | None -> (* First merge or Rolling 0 *) return_unit diff --git a/src/lib_store/cemented_block_store.ml b/src/lib_store/cemented_block_store.ml index bbe25986507465a112689ac2ae23c9cc9e82a514..7360213496a657a9eed7cf22890591d79bf2beac 100644 --- a/src/lib_store/cemented_block_store.ml +++ b/src/lib_store/cemented_block_store.ml @@ -518,6 +518,7 @@ let read_block_metadata cemented_store block_level = read_block_metadata cemented_store block_level let get_cemented_block_by_hash ~read_metadata (cemented_store : t) hash = + let open Lwt_tzresult_syntax in match get_cemented_block_level cemented_store hash with | None -> return_none | Some level -> diff --git a/src/lib_store/consistency.ml b/src/lib_store/consistency.ml index f6b3a041360761711d26931e7b55c421caf84802..70426005c0ecf7de745461472b3646bd24d569d8 100644 --- a/src/lib_store/consistency.ml +++ b/src/lib_store/consistency.ml @@ -47,6 +47,7 @@ open Store_errors the cementing_highwatermark is consistent with the cemented store. *) let check_cementing_highwatermark ~cementing_highwatermark block_store = + let open Lwt_tzresult_syntax in let cemented_store = Block_store.cemented_block_store block_store in let highest_cemented_level = Cemented_block_store.get_highest_cemented_level cemented_store @@ -362,22 +363,25 @@ let lowest_cemented_block cemented_block_files = (* Returns the lowest block level of a cemented metadata file. *) let lowest_metadata_entry metadata_file = - try - let metadata_file_path = Naming.file_path metadata_file in - let in_file = Zip.open_in metadata_file_path in - let entries = Zip.entries in_file in - let asc_entries = - List.sort - (fun {Zip.filename = a; _} {filename = b; _} -> - Int.compare (int_of_string a) (int_of_string b)) - entries - in - match asc_entries with - | [] -> - (* A metadata file is never empty *) - assert false - | {Zip.filename; _} :: _ -> return_some (Int32.of_string filename) - with exn -> Lwt.fail exn + let metadata_file_path = Naming.file_path metadata_file in + let in_file = Zip.open_in metadata_file_path in + let entries = Zip.entries in_file in + match entries with + | [] -> + (* A metadata file is never empty *) + assert false + | entry :: entries -> + let lowest_entry = + List.fold_left + (fun lowest entry -> + let entry = entry.Zip.filename in + if Compare.Int.(int_of_string lowest <= int_of_string entry) then + lowest + else entry) + entry.Zip.filename + entries + in + Int32.of_string lowest_entry (* Returns the lowest block level, from the cemented store, which is associated to some block metadata *) @@ -386,29 +390,25 @@ let lowest_cemented_metadata cemented_dir = let* metadata_files = Cemented_block_store.load_metadata_table cemented_dir in match metadata_files with | Some metadata_files -> - let rec aux = function - | [] -> return_none - | {Cemented_block_store.metadata_file; start_level; end_level} :: tl - -> ( - let* v = - Lwt.catch - (fun () -> - let* v = lowest_metadata_entry metadata_file in - return_some v) - (function - | _ -> - (* Can be the case if the metadata file is - corrupted. Raise a warning and continue the - search in the next metadata file. *) - let*! () = - Store_events.( - emit warning_missing_metadata (start_level, end_level)) - in - return_none) - in - match v with Some v -> return v | None -> aux tl) + let*! m = + Seq_s.of_seq (Array.to_seq metadata_files) + |> Seq_s.filter_map_s + (fun {Cemented_block_store.metadata_file; start_level; end_level} + -> + match lowest_metadata_entry metadata_file with + | v -> Lwt.return_some v + | exception _ -> + (* Can be the case if the metadata file is + corrupted. Raise a warning and continue the + search in the next metadata file. *) + let*! () = + Store_events.( + emit warning_missing_metadata (start_level, end_level)) + in + Lwt.return_none) + |> Seq_s.first in - aux (Array.to_list metadata_files) + return m | None -> return_none (* Returns both the lowest block and the lowest block with metadata diff --git a/src/lib_store/reconstruction.ml b/src/lib_store/reconstruction.ml index b1c396bd3989b294187bb668bf80b0e4023a6021..6c24d664e81e08b8bfddb546a61540d42f5fea3e 100644 --- a/src/lib_store/reconstruction.ml +++ b/src/lib_store/reconstruction.ml @@ -684,6 +684,7 @@ let reconstruct_floating chain_store context_index ~user_activated_upgrades (* Only Full modes with any offset can be reconstructed *) let check_history_mode_compatibility chain_store savepoint genesis_block = + let open Lwt_tzresult_syntax in match Store.Chain.history_mode chain_store with | History_mode.(Full _) -> fail_when diff --git a/src/lib_store/snapshots.ml b/src/lib_store/snapshots.ml index 813560b688cb08c7033786651940275989bc7da6..0e9ac2e6a9ada0237a879195709ccc7ec30cc9aa 100644 --- a/src/lib_store/snapshots.ml +++ b/src/lib_store/snapshots.ml @@ -722,7 +722,9 @@ let ensure_valid_tmp_snapshot_path snapshot_tmp_dir = (Sys.file_exists (Naming.dir_path snapshot_tmp_dir)) (Cannot_create_tmp_export_directory (Naming.dir_path snapshot_tmp_dir)) -let ensure_valid_export_path = function +let ensure_valid_export_path = + let open Lwt_tzresult_syntax in + function | Some path -> fail_when (Sys.file_exists path) (Invalid_export_path path) | None -> return_unit @@ -2167,6 +2169,7 @@ module Make_snapshot_exporter (Exporter : EXPORTER) : Snapshot_exporter = struct (* Ensures that the history mode requested to export is compatible with the current storage. *) let check_history_mode chain_store ~rolling = + let open Lwt_tzresult_syntax in match (Store.Chain.history_mode chain_store : History_mode.t) with | Archive | Full _ -> return_unit | Rolling _ when rolling -> return_unit diff --git a/src/lib_store/store.ml b/src/lib_store/store.ml index d6de8e3540af80d44afd1925e8adc9a7bcba2e1e..f44bb5c983f3ae8612c4c2e11a6c651cb1bec876 100644 --- a/src/lib_store/store.ml +++ b/src/lib_store/store.ml @@ -334,6 +334,7 @@ module Block = struct | None -> fail @@ Block_not_found {hash; distance = 0} let locked_read_block_by_level chain_store head level = + let open Lwt_tzresult_syntax in let distance = Int32.(to_int (sub (Block_repr.level head) level)) in if distance < 0 then fail @@ -2728,6 +2729,7 @@ let may_switch_history_mode ~store_dir ~context_dir genesis ~new_history_mode = let get_chain_store store chain_id = let chain_store = main_chain_store store in let rec loop chain_store = + let open Lwt_tzresult_syntax in if Chain_id.equal (Chain.chain_id chain_store) chain_id then return chain_store else diff --git a/src/lib_store/stored_data.ml b/src/lib_store/stored_data.ml index 5a28e67df037eb9431a1ec434d4c2d932ae9ef55..d815700322d88af8cfbabf96524c042cc6e6ac81 100644 --- a/src/lib_store/stored_data.ml +++ b/src/lib_store/stored_data.ml @@ -90,7 +90,7 @@ let write_file encoded_file data = let write (Stored_data v) data = Lwt_idle_waiter.force_idle v.scheduler (fun () -> - if v.cache = data then return_unit + if v.cache = data then Lwt_tzresult_syntax.return_unit else ( v.cache <- data ; write_file v.file data)) diff --git a/src/lib_store/test/alpha_utils.ml b/src/lib_store/test/alpha_utils.ml index 7a2f8e7bd4bc73102e6357f841ca80012b35b0bd..bc35ff0da9de731cf63f130eb68d472b6c563d87 100644 --- a/src/lib_store/test/alpha_utils.ml +++ b/src/lib_store/test/alpha_utils.ml @@ -89,12 +89,10 @@ module Account = struct let activator_account = new_account () let find pkh = - try - return - (Signature.Public_key_hash.Table.find known_accounts pkh - |> WithExceptions.Option.to_exn ~none:Not_found) - with Not_found -> - failwith "Missing account: %a" Signature.Public_key_hash.pp pkh + let open Lwt_tzresult_syntax in + match Signature.Public_key_hash.Table.find known_accounts pkh with + | Some v -> return v + | None -> failwith "Missing account: %a" Signature.Public_key_hash.pp pkh let find_alternate pkh = let exception Found of t in diff --git a/src/lib_store/test/test_store.ml b/src/lib_store/test/test_store.ml index 5a24b05b5edf0d3b7f55dfafe89986221a610f34..39e742ff7a06957ce52d0071ac4dc7b99dbc0ca5 100644 --- a/src/lib_store/test/test_store.ml +++ b/src/lib_store/test/test_store.ml @@ -26,8 +26,6 @@ open Test_utils -let test_init _ = return_unit - let test_cycles store = let open Lwt_result_syntax in let chain_store = Store.main_chain_store store in @@ -41,14 +39,7 @@ let test_cycles store = in assert_presence_in_store chain_store blocks -let test_cases = - let wrap_test (s, f) = - let f _ = f in - wrap_test (s, f) - in - List.map - wrap_test - [("initialisation", test_init); ("store cycles", test_cycles)] +let test_cases = [wrap_test ("store cycles", fun _ store -> test_cycles store)] open Example_tree diff --git a/src/lib_validation/block_validation.ml b/src/lib_validation/block_validation.ml index 15ad14d60e995a8d918869b287c76b1fdddd5867..290cd3f3b3d63ccb693f371158105a41c7a54c8c 100644 --- a/src/lib_validation/block_validation.ml +++ b/src/lib_validation/block_validation.ml @@ -289,6 +289,7 @@ module Make (Proto : Registered_protocol.T) = struct return_unit let parse_block_header block_hash (block_header : Block_header.t) = + let open Lwt_tzresult_syntax in match Data_encoding.Binary.of_bytes_opt Proto.block_header_data_encoding @@ -349,6 +350,7 @@ module Make (Proto : Registered_protocol.T) = struct let invalid_block = invalid_block block_hash in List.mapi_es (fun pass -> + let open Lwt_tzresult_syntax in List.map_es (fun op -> let op_hash = Operation.hash op in match @@ -358,7 +360,6 @@ module Make (Proto : Registered_protocol.T) = struct with | None -> fail (invalid_block (Cannot_parse_operation op_hash)) | Some protocol_data -> - let open Lwt_tzresult_syntax in let op = {Proto.shell = op.shell; protocol_data} in let allowed_pass = Proto.acceptable_passes op in let* () = @@ -1002,9 +1003,9 @@ module Make (Proto : Registered_protocol.T) = struct let precheck chain_id ~(predecessor_block_header : Block_header.t) ~predecessor_block_hash ~predecessor_context ~cache ~(block_header : Block_header.t) operations = - let open Lwt_syntax in + let open Lwt_tzresult_syntax in let block_hash = Block_header.hash block_header in - let* r = + let*! r = precheck block_hash chain_id @@ -1016,10 +1017,8 @@ module Make (Proto : Registered_protocol.T) = struct operations in match r with - | Error err -> - Error_monad.fail - (invalid_block block_hash (Economic_protocol_error err)) - | Ok () -> return_ok_unit + | Error err -> fail (invalid_block block_hash (Economic_protocol_error err)) + | Ok () -> return_unit end let assert_no_duplicate_operations block_hash live_operations operations = @@ -1147,8 +1146,7 @@ let apply ?cached_result c ~cache block_header operations = in match r with | Error (Exn (Unix.Unix_error (errno, fn, msg)) :: _) -> - Error_monad.fail - (System_error {errno = Unix.error_message errno; fn; msg}) + fail (System_error {errno = Unix.error_message errno; fn; msg}) | (Ok _ | Error _) as res -> Lwt.return res let precheck ~chain_id ~predecessor_block_header ~predecessor_block_hash @@ -1226,8 +1224,8 @@ let preapply ~chain_id ~user_activated_upgrades ~live_operations ~predecessor_context ~predecessor_shell_header ~predecessor_hash ~predecessor_max_operations_ttl ~predecessor_block_metadata_hash ~predecessor_ops_metadata_hash operations = - let open Lwt_syntax in - let* r = + let open Lwt_tzresult_syntax in + let*! r = preapply ~chain_id ~user_activated_upgrades @@ -1246,6 +1244,5 @@ let preapply ~chain_id ~user_activated_upgrades in match r with | Error (Exn (Unix.Unix_error (errno, fn, msg)) :: _) -> - Error_monad.fail - (System_error {errno = Unix.error_message errno; fn; msg}) + fail (System_error {errno = Unix.error_message errno; fn; msg}) | (Ok _ | Error _) as res -> Lwt.return res