diff --git a/src/lib_dal_node/cli.ml b/src/lib_dal_node/cli.ml
index 17c2c7c9896846b7222b1737c99bbb29d92c7bcd..6d366a3cb630e3f2766068b8984f3d788acc3c5b 100644
--- a/src/lib_dal_node/cli.ml
+++ b/src/lib_dal_node/cli.ml
@@ -1000,6 +1000,94 @@ module Config = struct
     Cmdliner.Cmd.group ~default info [Init.cmd; Update.cmd]
 end
 
+module Snapshot = struct
+  let man =
+    [
+      `S "SNAPSHOT DESCRIPTION";
+      `P "Entrypoint for snapshot management commands.";
+    ]
+
+  module Export = struct
+    let man =
+      [`S "DESCRIPTION"; `P "Export DAL node data to a snapshot directory."]
+
+    let info =
+      let version = Tezos_version_value.Bin_version.octez_version_string in
+      Cmdliner.Cmd.info ~doc:"Export snapshot" ~man ~version "export"
+
+    let min_published_level_arg =
+      let open Term in
+      let parse, pp = positive_int_format "min-published-level" in
+      make_arg
+        ~doc:"Minimum published level to include in the snapshot."
+        ~parse
+        ~placeholder:"INT"
+        ~pp
+        "min-published-level"
+
+    let min_published_level = Term.arg_to_cmdliner min_published_level_arg
+
+    let max_published_level_arg =
+      let open Term in
+      let parse, pp = positive_int_format "max-published-level" in
+      make_arg
+        ~doc:"Maximum published level to include in the snapshot."
+        ~parse
+        ~placeholder:"INT"
+        ~pp
+        "max-published-level"
+
+    let max_published_level = Term.arg_to_cmdliner max_published_level_arg
+
+    let file_path =
+      let open Cmdliner in
+      Arg.(
+        required
+        & pos 0 (some string) None
+        & info
+            ~docv:"DIR"
+            ~doc:"Path to the directory where the snapshot will be exported."
+            ~docs:"OPTIONS"
+            [])
+
+    let action min_published_level max_published_level data_dir endpoint
+        config_file file_path =
+      let data_dir =
+        Option.value ~default:Configuration_file.default.data_dir data_dir
+      in
+      let config_file =
+        Option.value
+          ~default:(Configuration_file.default_config_file data_dir)
+          config_file
+      in
+      let min_level = Option.map Int32.of_int min_published_level in
+      let max_level = Option.map Int32.of_int max_published_level in
+      Snapshot.export
+        ~data_dir
+        ~config_file
+        ~endpoint
+        ~min_published_level:min_level
+        ~max_published_level:max_level
+        file_path
+
+    let term =
+      Cmdliner.Term.(
+        map
+          wrap_action
+          (const action $ min_published_level $ max_published_level
+          $ Term.data_dir $ Term.endpoint $ Term.config_file $ file_path))
+
+    let cmd = Cmdliner.Cmd.v info term
+  end
+
+  let cmd =
+    let default = Cmdliner.Term.(ret (const (`Help (`Pager, None)))) in
+    let info =
+      let version = Tezos_version_value.Bin_version.octez_version_string in
+      Cmdliner.Cmd.info ~doc:"Snapshot management" ~man ~version "snapshot"
+    in
+    Cmdliner.Cmd.group ~default info [Export.cmd]
+end
+
 module Debug = struct
   let man = [`S "DEBUG DESCRIPTION"; `P "Entrypoint for the debug commands."]
 
@@ -1175,4 +1263,7 @@ let commands =
     let version = Tezos_version_value.Bin_version.octez_version_string in
     Cmdliner.Cmd.info ~doc:"The Octez DAL node" ~version "octez-dal-node"
   in
-  Cmdliner.Cmd.group ~default info [Run.cmd; Config.cmd; Debug.cmd]
+  Cmdliner.Cmd.group
+    ~default
+    info
+    [Run.cmd; Config.cmd; Snapshot.cmd; Debug.cmd]
diff --git a/src/lib_dal_node/constants.ml b/src/lib_dal_node/constants.ml
index 1a565a77584235a48b35e8041fa5de765c83e0fa..8b11cf6d07627fe4abad40bdb486887b4bf8f9a9 100644
--- a/src/lib_dal_node/constants.ml
+++ b/src/lib_dal_node/constants.ml
@@ -29,6 +29,11 @@ let number_of_shards = 512
 
 let attestation_lag = 8
 
+(* Slack (in blocks) for outdated message validation. 
Messages older than + [attestation_lag + validation_slack] blocks from the head are considered + too old and rejected. *) +let validation_slack = 4 + let traps_fraction = Q.(1 // 2000) (* Each entry in the cache maintains two open file descriptors (one via diff --git a/src/lib_dal_node/constants.mli b/src/lib_dal_node/constants.mli index 2435ad19650563f20c29a3a4b7ac387e5258edcb..997fa890778e7fd3504b90d1779e03872d997dd1 100644 --- a/src/lib_dal_node/constants.mli +++ b/src/lib_dal_node/constants.mli @@ -23,6 +23,17 @@ (* *) (*****************************************************************************) +(** [number_of_slots] is the default number of DAL slots per level. *) +val number_of_slots : int + +(** [number_of_shards] is the default number of shards per slot. *) +val number_of_shards : int + +(** [validation_slack] is the slack (in blocks) for outdated message validation. + Messages older than [attestation_lag + validation_slack] blocks from the + head are considered too old and rejected. *) +val validation_slack : int + (** [shards_store_lru_size] is the maximum shards store LRU size. See {!Key_value_store.init} and {!Store.Shards.init}. *) val shards_store_lru_size : int diff --git a/src/lib_dal_node/daemon.ml b/src/lib_dal_node/daemon.ml index 9dbd3c909639e2f3e6939c95a49426587afeda59..35c741f7b5ddd1f3a8dfe1e967a354ce1cf790c8 100644 --- a/src/lib_dal_node/daemon.ml +++ b/src/lib_dal_node/daemon.ml @@ -50,37 +50,6 @@ let[@warning "-32"] may_start_profiler data_dir = backends | None -> () -let init_cryptobox config proto_parameters profile = - let open Lwt_result_syntax in - let prover_srs = Profile_manager.is_prover_profile profile in - let* () = - if prover_srs then - let find_srs_files () = Tezos_base.Dal_srs.find_trusted_setup_files () in - Cryptobox.init_prover_dal - ~find_srs_files - ~fetch_trusted_setup:config.Configuration_file.fetch_trusted_setup - () - else return_unit - in - match Cryptobox.make proto_parameters.Types.cryptobox_parameters with - | Ok cryptobox -> - if prover_srs then - match Cryptobox.precompute_shards_proofs cryptobox with - | Ok precomputation -> return (cryptobox, Some precomputation) - | Error (`Invalid_degree_strictly_less_than_expected {given; expected}) - -> - fail - [ - Errors.Cryptobox_initialisation_failed - (Printf.sprintf - "Cryptobox.precompute_shards_proofs: SRS size (= %d) \ - smaller than expected (= %d)" - given - expected); - ] - else return (cryptobox, None) - | Error (`Fail msg) -> fail [Errors.Cryptobox_initialisation_failed msg] - (* Monitor and process finalized heads. *) let on_new_finalized_head ctxt cctxt crawler = let open Lwt_result_syntax in @@ -695,7 +664,7 @@ let run ?(disable_shard_validation = false) ~ignore_pkhs ~data_dir ~config_file Instead of recomputing these parameters, they could be stored (for a given cryptobox). *) let* cryptobox, shards_proofs_precomputation = - init_cryptobox config proto_parameters profile_ctxt + Node_context.init_cryptobox config proto_parameters profile_ctxt in (* Set crypto box share size hook. 
*) Value_size_hooks.set_share_size diff --git a/src/lib_dal_node/message_validation.ml b/src/lib_dal_node/message_validation.ml index 56d0a821de6f05fc1da55c911da0f9e11995c8b8..3f69e940027c41868d196a4417dd4911f31c9855 100644 --- a/src/lib_dal_node/message_validation.ml +++ b/src/lib_dal_node/message_validation.ml @@ -165,55 +165,51 @@ let gossipsub_app_messages_validation ctxt cryptobox ~head_level happens, received data are considered as spam (invalid), and the remote peer might be punished, depending on the Gossipsub implementation. *) `Invalid - else - (* Have some slack for outdated messages. *) - let slack = 4 in - if - Int32.( - sub head_level message_id.Types.Message_id.level - > of_int (proto_parameters.Types.attestation_lag + slack)) - then - (* 2. Nodes don't care about messages whose ids are too old. Gossipsub + else if + Int32.( + sub head_level message_id.Types.Message_id.level + > of_int + (proto_parameters.Types.attestation_lag + Constants.validation_slack)) + then + (* 2. Nodes don't care about messages whose ids are too old. Gossipsub should only be used for the dissemination of fresh data. Old data could be retrieved using another method. *) - `Outdated - else - match - gossipsub_message_id_validation ctxt proto_parameters message_id - with - | `Valid -> - (* 3. Only check for message validity if the message_id is valid. *) - let res = - Option.fold - message - ~none:`Valid - ~some: - (gossipsub_app_message_payload_validation - ~disable_shard_validation: - (Node_context.get_disable_shard_validation ctxt) - cryptobox - message_id) - in - (if res = `Valid then - let store = Node_context.get_store ctxt in - let traps_store = Store.traps store in - (* TODO: https://gitlab.com/tezos/tezos/-/issues/7742 + `Outdated + else + match gossipsub_message_id_validation ctxt proto_parameters message_id with + | `Valid -> + (* 3. Only check for message validity if the message_id is valid. *) + let res = + Option.fold + message + ~none:`Valid + ~some: + (gossipsub_app_message_payload_validation + ~disable_shard_validation: + (Node_context.get_disable_shard_validation ctxt) + cryptobox + message_id) + in + (if res = `Valid then + let store = Node_context.get_store ctxt in + let traps_store = Store.traps store in + (* TODO: https://gitlab.com/tezos/tezos/-/issues/7742 The [proto_parameters] are those for the last known finalized level, which may differ from those of the slot level. This will be an issue when the value of the [traps_fraction] changes. (We cannot use {!Node_context.get_proto_parameters}, as it is not monad-free; we'll need to use mapping from levels to parameters.) *) - Option.iter - (Slot_manager.maybe_register_trap - traps_store - ~traps_fraction:proto_parameters.traps_fraction - message_id) - message) ; - res - | other -> - (* 4. In the case the message_id is not valid. *) - other + Option.iter + (Slot_manager.maybe_register_trap + traps_store + ~traps_fraction:proto_parameters.traps_fraction + message_id) + message) ; + res + | other -> + (* 4. In the case the message_id is not valid. *) + other type batch_identifier = {level : int32; slot_index : int} @@ -254,12 +250,12 @@ let triage ctxt head_level proto_parameters batch = message, _peers ) -> - (* Have some slack for outdated messages. 
*) - let slack = 4 in if Int32.( sub head_level level - > of_int (proto_parameters.Types.attestation_lag + slack)) + > of_int + (proto_parameters.Types.attestation_lag + + Constants.validation_slack)) then (index, `Outdated) :: not_valid else match gossipsub_message_id_validation ctxt proto_parameters id with diff --git a/src/lib_dal_node/node_context.ml b/src/lib_dal_node/node_context.ml index 995a56f5617f9cadf11e56a8e59fbcc56dbcf32d..35365c1eb5c19d78fe8e0e83664861b7650d149e 100644 --- a/src/lib_dal_node/node_context.ml +++ b/src/lib_dal_node/node_context.ml @@ -81,6 +81,37 @@ let init config ~identity ~network_name profile_ctxt cryptobox Attestable_slots_watcher_table.create ~initial_size:5; } +let init_cryptobox config proto_parameters profile = + let open Lwt_result_syntax in + let prover_srs = Profile_manager.is_prover_profile profile in + let* () = + if prover_srs then + let find_srs_files () = Tezos_base.Dal_srs.find_trusted_setup_files () in + Cryptobox.init_prover_dal + ~find_srs_files + ~fetch_trusted_setup:config.Configuration_file.fetch_trusted_setup + () + else return_unit + in + match Cryptobox.make proto_parameters.Types.cryptobox_parameters with + | Ok cryptobox -> + if prover_srs then + match Cryptobox.precompute_shards_proofs cryptobox with + | Ok precomputation -> return (cryptobox, Some precomputation) + | Error (`Invalid_degree_strictly_less_than_expected {given; expected}) + -> + fail + [ + Errors.Cryptobox_initialisation_failed + (Printf.sprintf + "Cryptobox.precompute_shards_proofs: SRS size (= %d) \ + smaller than expected (= %d)" + given + expected); + ] + else return (cryptobox, None) + | Error (`Fail msg) -> fail [Errors.Cryptobox_initialisation_failed msg] + let get_tezos_node_cctxt ctxt = ctxt.tezos_node_cctxt let get_identity ctxt = ctxt.identity diff --git a/src/lib_dal_node/node_context.mli b/src/lib_dal_node/node_context.mli index 7b7de8fb90ce77e5565b42f3d0a815cb2926b6a2..810c33e90ac3b5ec56ab1519115394d2cee6e46c 100644 --- a/src/lib_dal_node/node_context.mli +++ b/src/lib_dal_node/node_context.mli @@ -47,6 +47,16 @@ val init : unit -> t +(** [init_cryptobox config proto_parameters profile] initializes the cryptobox + with the given configuration and protocol parameters. For prover profiles, + it also initializes the SRS and precomputes shard proofs. Returns the + initialized cryptobox and optional precomputation. *) +val init_cryptobox : + Configuration_file.t -> + Types.proto_parameters -> + Profile_manager.t -> + (Cryptobox.t * Cryptobox.shards_proofs_precomputation option) tzresult Lwt.t + (** Returns all the registered plugins *) val get_all_plugins : t -> (module Dal_plugin.T) list diff --git a/src/lib_dal_node/snapshot.ml b/src/lib_dal_node/snapshot.ml new file mode 100644 index 0000000000000000000000000000000000000000..fd889ecbfb88ff50b896ccd9d78a643826a69957 --- /dev/null +++ b/src/lib_dal_node/snapshot.ml @@ -0,0 +1,333 @@ +(*****************************************************************************) +(* *) +(* SPDX-License-Identifier: MIT *) +(* Copyright (c) 2025 Nomadic Labs *) +(* *) +(*****************************************************************************) + +(* + This module uses the [Key_value_store.Read] module to open the store in + readonly mode, ensuring that the export leaves source data untouched, while + still allowing another DAL node to run. +*) + +open Filename.Infix + +(** Read a value from a Single_value_store module in readonly mode. + Fails if the value is not found. 
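+    A usage sketch (assuming [root_dir] is in scope; [Store.Chain_id] is one
+    such single-value store, as used by the export below):
+    {[
+      read_from_store ~root_dir (module Store.Chain_id)
+    ]}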
*) +let read_from_store (type value) ~root_dir + (module Store : Single_value_store.S with type value = value) = + let open Lwt_result_syntax in + let* store = Store.init_readonly ~root_dir in + let* value_opt = Store.load store in + match value_opt with + | Some value -> return value + | None -> failwith "Value not found in store at %s" root_dir + +(** Iterate through all levels in the given range, calling [f] for each level. *) +let iterate_levels ~min_published_level ~max_published_level f = + let open Lwt_result_syntax in + let rec iterate_levels level = + if Compare.Int32.(level > max_published_level) then return_unit + else + let* () = f level in + iterate_levels (Int32.succ level) + in + iterate_levels min_published_level + +(** Iterate through all levels and slot indices in the given range, + calling [f] for each [{slot_level; slot_index}] slot id. *) +let iterate_all_slots ~min_published_level ~max_published_level f = + let open Lwt_result_syntax in + iterate_levels ~min_published_level ~max_published_level @@ fun level -> + let rec iterate_slot_index slot_index = + if slot_index >= Constants.number_of_slots then return_unit + else + let slot_id = Types.Slot_id.{slot_level = level; slot_index} in + let* () = f slot_id in + iterate_slot_index (slot_index + 1) + in + iterate_slot_index 0 + +(** Copy a value from source KVS to destination KVS. + Returns [Ok ()] if the value was copied or if src value is not found. *) +let kvs_copy_value src_store dst_store file_layout file key = + let open Lwt_result_syntax in + let* exists = + Key_value_store.Read.value_exists src_store file_layout file key + in + if not exists then return_unit + else + let* value = + Key_value_store.Read.read_value src_store file_layout file key + in + Key_value_store.write_value + ~override:false + dst_store + file_layout + file + key + value + +module Export = struct + (** Export a filtered subset of the skip_list SQLite database by iterating + through levels in the range [min_published_level, max_published_level] + and copying the data using Dal_store_sqlite3 functions. *) + let export_skip_list ~src ~dst ~min_published_level ~max_published_level = + let open Lwt_result_syntax in + let*! 
() = Lwt_utils_unix.create_dir dst in + (* Initialize destination database with empty schema *) + let* dst_db = + Dal_store_sqlite3.Skip_list_cells.init + ~data_dir:dst + ~perm:Sqlite.Read_write + () + in + (* Open source database in read-only mode *) + let* src_db = + Dal_store_sqlite3.Skip_list_cells.init + ~data_dir:src + ~perm:Sqlite.(Read_only {pool_size = 1}) + () + in + Lwt.finalize + (fun () -> + iterate_levels ~min_published_level ~max_published_level @@ fun level -> + (* Get all skip list cells for this level from source *) + let* cells = + Dal_store_sqlite3.Skip_list_cells.find_by_level + src_db + ~published_level:level + in + (* For each slot in this level, get complete info and insert into destination *) + let* items_with_lag = + List.map_es + (fun (_cell, hash, slot_index) -> + let slot_id = Types.Slot_id.{slot_level = level; slot_index} in + let* result = + Dal_store_sqlite3.Skip_list_cells.find_by_slot_id_opt + src_db + slot_id + in + match result with + | Some (cell, attestation_lag) -> + return (hash, cell, slot_index, attestation_lag) + | None -> failwith "Cell found by level but not by slot_id") + cells + in + match items_with_lag with + | [] -> return_unit + | (_, _, _, attestation_lag) :: _ -> + let attested_level = Int32.(add level (of_int attestation_lag)) in + Dal_store_sqlite3.Skip_list_cells.insert + dst_db + ~attested_level + items_with_lag + Fun.id) + (fun () -> + let open Lwt_syntax in + let* _ = Dal_store_sqlite3.Skip_list_cells.close src_db in + let* _ = Dal_store_sqlite3.Skip_list_cells.close dst_db in + return_unit) + + (** Export slots for all published slots in the given level range. + Copies slot files from source to destination directory. *) + let export_slots ~cryptobox ~src ~dst ~min_published_level + ~max_published_level = + let open Lwt_result_syntax in + let*! () = Lwt_utils_unix.create_dir dst in + let* src_store = + Key_value_store.Read.init ~lru_size:Constants.slots_store_lru_size src + in + let* dst_store = + Key_value_store.init + ~lru_size:Constants.slots_store_lru_size + ~root_dir:dst + in + Lwt.finalize + (fun () -> + iterate_all_slots ~min_published_level ~max_published_level + @@ fun slot_id -> + let Cryptobox.{slot_size; _} = Cryptobox.parameters cryptobox in + let file_layout = Store.Slots.file_layout in + kvs_copy_value src_store dst_store file_layout (slot_id, slot_size) ()) + (fun () -> + let open Lwt_syntax in + let* _ = Key_value_store.Read.close src_store in + let* _ = Key_value_store.close dst_store in + return_unit) + + (** Export shards for all slots in the given level range. + Copies shard files from source to destination directory. *) + let export_shards ~src ~dst ~min_published_level ~max_published_level = + let open Lwt_result_syntax in + let*! () = Lwt_utils_unix.create_dir dst in + let* src_store = + Key_value_store.Read.init ~lru_size:Constants.shards_store_lru_size src + in + let* dst_store = + Key_value_store.init + ~lru_size:Constants.shards_store_lru_size + ~root_dir:dst + in + Lwt.finalize + (fun () -> + (* For each level, we need to check all possible slot indices. + In practice, we should get this from the skip_list data, + but for now we'll scan for existing files. *) + iterate_all_slots ~min_published_level ~max_published_level + @@ fun slot_id -> + let file_layout = Store.Shards_disk.file_layout in + let*! 
count_result = + Key_value_store.Read.count_values src_store file_layout slot_id + in + match count_result with + | Error _ -> + (* Slot file doesn't exist, skip *) + return_unit + | Ok 0 -> + (* No shards for this slot *) + return_unit + | Ok _count -> + (* Copy all shards for this slot *) + let rec copy_shard shard_index = + if shard_index >= Constants.number_of_shards then return_unit + else + let* () = + kvs_copy_value + src_store + dst_store + file_layout + slot_id + shard_index + in + copy_shard (shard_index + 1) + in + copy_shard 0) + (fun () -> + let open Lwt_syntax in + let* _ = Key_value_store.Read.close src_store in + let* _ = Key_value_store.close dst_store in + return_unit) + + let export ~data_dir:src_data_dir ~config_file ~endpoint ~min_published_level + ~max_published_level dst_data_dir = + let open Lwt_result_syntax in + let* config = Configuration_file.load ~config_file in + let endpoint = Option.value ~default:config.endpoint endpoint in + let config = Configuration_file.{config with endpoint} in + let cctxt = Rpc_context.make config.Configuration_file.endpoint in + let* header, proto_plugins = L1_helpers.wait_for_block_with_plugin cctxt in + let*? (module Plugin : Dal_plugin.T), proto_parameters = + Proto_plugins.get_plugin_and_parameters_for_level + proto_plugins + ~level:header.Block_header.shell.level + in + let profile_ctxt = Profile_manager.empty in + (* Initialize crypto as needed by file layouts. *) + let* cryptobox, _ = + Node_context.init_cryptobox config proto_parameters profile_ctxt + in + (* Set crypto box share size hook. *) + Value_size_hooks.set_share_size + (Cryptobox.Internal_for_tests.encoded_share_size cryptobox) ; + let store_path data_dir = + (* We only need data_dir field, so we can just use default as base. *) + Configuration_file.store_path {Configuration_file.default with data_dir} + in + let src_root_dir = store_path src_data_dir in + let dst_root_dir = store_path dst_data_dir in + let*! dst_exists = Lwt_unix.file_exists dst_root_dir in + let* () = + if dst_exists then + failwith + "Destination directory %s already exists. Please remove it or choose \ + a different destination." + dst_root_dir + else return_unit + in + let* chain_id = + read_from_store ~root_dir:src_root_dir (module Store.Chain_id) + in + let* first_seen_level = + read_from_store ~root_dir:src_root_dir (module Store.First_seen_level) + in + let* last_processed_level = + read_from_store ~root_dir:src_root_dir (module Store.Last_processed_level) + in + (* min_published_level: max of first_seen_level and requested min *) + let min_published_level = + match min_published_level with + | None -> first_seen_level + | Some requested -> max first_seen_level requested + in + (* The DAL node stops validating shards published at a level older than + last_processed_level - (slack + attestation_lag), which means that data + before this point won't be updated by the DAL node. We cap the + max_published_level to this value to avoid exporting stale data. 
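+     For example, with last_processed_level = 100 and the default
+     attestation_lag = 8 and validation_slack = 4 above, the export range is
+     capped at published level 100 - (4 + 8 + 1) = 87.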
*)
+  let max_published_level =
+    let latest_frozen_level =
+      Int32.(
+        sub
+          last_processed_level
+          (of_int
+             (Constants.validation_slack + proto_parameters.attestation_lag
+             + 1)))
+    in
+    match max_published_level with
+    | None -> latest_frozen_level
+    | Some requested -> min latest_frozen_level requested
+  in
+  (* Export slots *)
+  let* () =
+    let src_slot_dir = src_root_dir // Store.Stores_dirs.slot in
+    let dst_slot_dir = dst_root_dir // Store.Stores_dirs.slot in
+    export_slots
+      ~cryptobox
+      ~src:src_slot_dir
+      ~dst:dst_slot_dir
+      ~min_published_level
+      ~max_published_level
+  in
+  (* Export shards *)
+  let* () =
+    let src_shard_dir = src_root_dir // Store.Stores_dirs.shard in
+    let dst_shard_dir = dst_root_dir // Store.Stores_dirs.shard in
+    export_shards
+      ~src:src_shard_dir
+      ~dst:dst_shard_dir
+      ~min_published_level
+      ~max_published_level
+  in
+  (* Export skip_list *)
+  let* () =
+    let dst_skip_list_dir =
+      dst_root_dir // Store.Stores_dirs.skip_list_cells
+    in
+    let src_skip_list_dir =
+      src_root_dir // Store.Stores_dirs.skip_list_cells
+    in
+    export_skip_list
+      ~src:src_skip_list_dir
+      ~dst:dst_skip_list_dir
+      ~min_published_level
+      ~max_published_level
+  in
+  let* chain_id_store = Store.Chain_id.init ~root_dir:dst_root_dir in
+  let* () = Store.Chain_id.save chain_id_store chain_id in
+  let* first_seen_store =
+    Store.First_seen_level.init ~root_dir:dst_root_dir
+  in
+  let* () =
+    Store.First_seen_level.save first_seen_store min_published_level
+  in
+  let* last_processed_store =
+    Store.Last_processed_level.init ~root_dir:dst_root_dir
+  in
+  let* () =
+    Store.Last_processed_level.save last_processed_store max_published_level
+  in
+  return_unit
+end
+
+let export = Export.export
diff --git a/src/lib_dal_node/snapshot.mli b/src/lib_dal_node/snapshot.mli
new file mode 100644
index 0000000000000000000000000000000000000000..e8a9f19a7c7d77c35c71d9e9536134134a125e36
--- /dev/null
+++ b/src/lib_dal_node/snapshot.mli
@@ -0,0 +1,25 @@
+(*****************************************************************************)
+(* *)
+(* SPDX-License-Identifier: MIT *)
+(* Copyright (c) 2025 Nomadic Labs *)
+(* *)
+(*****************************************************************************)
+
+(** [export ~data_dir ~config_file ~endpoint ~min_published_level
+    ~max_published_level path] exports the DAL node store data for slots
+    published between [min_published_level] and [max_published_level]
+    (inclusive) to a snapshot at the given [path].
+    If [min_published_level] is [None], the store's [first_seen_level] is
+    used; otherwise the requested level is clamped to be no lower than
+    [first_seen_level].
+    If [max_published_level] is [None], the export extends up to the most
+    recent level whose data can no longer change, namely
+    [last_processed_level - (attestation_lag + validation_slack + 1)];
+    otherwise the requested level is capped at that value.
+    The [endpoint] parameter, if provided, overrides the endpoint specified
+    in the config file. *)
+val export :
+  data_dir:string ->
+  config_file:string ->
+  endpoint:Uri.t option ->
+  min_published_level:int32 option ->
+  max_published_level:int32 option ->
+  string ->
+  unit tzresult Lwt.t
diff --git a/src/lib_dal_node/store.mli b/src/lib_dal_node/store.mli
index c699134a10ab90fa51d0c6d5f2039a109087d56c..f0ecfd8b315987641b3ea4ed537e1862e869e781 100644
--- a/src/lib_dal_node/store.mli
+++ b/src/lib_dal_node/store.mli
@@ -11,6 +11,30 @@
 
 open Cryptobox
 
+(** Store directory names within the DAL node's data directory. *)
+module Stores_dirs : sig
+  val shard : string
+
+  val slot : string
+
+  val skip_list_cells : string
+end
+
+module Shards_disk : sig
+  (** Low-level access to shards stored on disk. 
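+      A usage sketch ([root_dir] and [slot_id] assumed in scope; the share
+      size hook must have been set first, see below):
+      {[
+        let layout = file_layout ~root_dir slot_id
+      ]}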
*) + + (** [file_layout ~root_dir slot_id] returns the Key_value_store layout + for the given [slot_id] in the root directory. + + Beware that cryptographic parameters must be initialized via + {!Node_context.init_cryptobox} or {!Value_size_hooks.set_share_size} before + using this function. *) + val file_layout : + root_dir:string -> + Types.slot_id -> + (int, Cryptobox.share) Key_value_store.layout +end + module Shards : sig (** A shard of some slot id consist of a shard index (a number between 0 and the number_of_shards protocol parameter) and a @@ -60,6 +84,13 @@ module Slots : sig type t + (** [file_layout ~root_dir (slot_id, slot_size)] returns the Key_value_store + layout for the given slot_id and slot_size in the root directory. *) + val file_layout : + root_dir:string -> + Types.slot_id * int -> + (unit, bytes) Key_value_store.layout + (** [add_slot store ~slot_size slot_content slot_id] adds a mapping from the given slot id to the given slot content. *) val add_slot : diff --git a/tezt/lib_tezos/dal_node.ml b/tezt/lib_tezos/dal_node.ml index 46da47d604a19135f0638f35e151376d68210f36..36023b54e48a574b8acf425d73967fc7f85c1d02 100644 --- a/tezt/lib_tezos/dal_node.ml +++ b/tezt/lib_tezos/dal_node.ml @@ -567,6 +567,36 @@ let debug_print_store_schemas ?path ?hooks () = let process = Process.spawn ?hooks path @@ args in Process.check process +let snapshot_export dal_node ?endpoint ?min_published_level ?max_published_level + output_file = + let data_dir = data_dir dal_node in + let endpoint_args = + match endpoint with + | None -> [] + | Some ep -> ["--endpoint"; Endpoint.as_string ep] + in + let min_level_args = + match min_published_level with + | None -> [] + | Some level -> ["--min-published-level"; Int32.to_string level] + in + let max_level_args = + match max_published_level with + | None -> [] + | Some level -> ["--max-published-level"; Int32.to_string level] + in + let args = + ["snapshot"; "export"] @ ["--data-dir"; data_dir] @ endpoint_args + @ min_level_args @ max_level_args @ [output_file] + in + let path = + if use_baker_to_start_dal_node = Some true then + Uses.path Constant.octez_agnostic_baker + else Uses.path Constant.octez_dal_node + in + let process = Process.spawn path args in + Process.check process + module Proxy = struct type answer = [`Response of string | `Stream of Cohttp_lwt.Body.t] diff --git a/tezt/lib_tezos/dal_node.mli b/tezt/lib_tezos/dal_node.mli index d5dcabe1f621df19f9ca53ca869c2a8e34ce5dd9..5326d184b2176a72db3d6c8e37e7217289f7b65b 100644 --- a/tezt/lib_tezos/dal_node.mli +++ b/tezt/lib_tezos/dal_node.mli @@ -258,6 +258,20 @@ val load_last_finalized_processed_level : t -> int option Lwt.t val debug_print_store_schemas : ?path:string -> ?hooks:Process_hooks.t -> unit -> unit Lwt.t +(** [snapshot_export dal_node ?endpoint ?min_published_level + ?max_published_level output_file] exports a snapshot of the DAL node's + store to [output_file]. + If [endpoint] is provided, it overrides the endpoint in the config file. + [min_published_level] and [max_published_level] are optional level ranges + to export. *) +val snapshot_export : + t -> + ?endpoint:Endpoint.t -> + ?min_published_level:int32 -> + ?max_published_level:int32 -> + string -> + unit Lwt.t + (** The Proxy module provides functionality to create a proxy server that can intercept and mock responses for DAL node requests. 
*)
*) module Proxy : sig diff --git a/tezt/tests/dal.ml b/tezt/tests/dal.ml index b33bb10cf1cf524816aa454c85abb38b9137d670..2d0b4ab9c2c2a4e838be46b31b3b7a97ca410ef3 100644 --- a/tezt/tests/dal.ml +++ b/tezt/tests/dal.ml @@ -1095,8 +1095,8 @@ let slot_idx parameters level = The [delegates] parameter is used to specify which delegates should participate in baking. *) -let publish_and_bake ?delegates ~from_level ~to_level parameters cryptobox node - client dal_node = +let publish_and_bake ?slots ?delegates ~from_level ~to_level parameters + cryptobox node client dal_node = let num_bakers = Array.length Account.Bootstrap.keys in let publish source ~index message = let* _op_hash = @@ -1104,35 +1104,48 @@ let publish_and_bake ?delegates ~from_level ~to_level parameters cryptobox node in unit in - let publish_and_store level = - let source = Account.Bootstrap.keys.(level mod num_bakers) in - let index = slot_idx parameters level in - let slot_content = - Format.asprintf "content at level %d index %d" level index - in - let* () = publish source ~index slot_content in - let* _commitment, _proof = - let slot_size = parameters.Dal.Parameters.cryptobox.slot_size in - Helpers.( - store_slot dal_node ~slot_index:index - @@ make_slot ~slot_size slot_content) - in - Log.info "Slot with %d index (normally) published at level %d" index level ; - unit + let publish_and_store = + let slot_idx = + match slots with + | Some s -> + let num_slots = List.length s in + fun level -> List.nth s (slot_idx parameters level mod num_slots) + | None -> slot_idx parameters + in + fun level -> + let source = Account.Bootstrap.keys.(level mod num_bakers) in + let index = slot_idx level in + let slot_content = + Format.asprintf "content at level %d index %d" level index + in + let* () = publish source ~index slot_content in + let* _commitment, _proof = + let slot_size = parameters.Dal.Parameters.cryptobox.slot_size in + Helpers.( + store_slot dal_node ~slot_index:index + @@ make_slot ~slot_size slot_content) + in + Log.info "Slot with %d index (normally) published at level %d" index level ; + return (level, index) in Log.info "Publish (inject and bake) a slot header at each level from %d to %d." from_level to_level ; - let rec iter level = - if level > to_level then return () + let rec iter acc level = + if level > to_level then return acc else - let* () = publish_and_store level in - let* () = bake_for ?delegates client in + let* level, index = publish_and_store level in + let* () = + bake_for + ?delegates + ~dal_node_endpoint:(Dal_node.rpc_endpoint dal_node) + client + in let* _ = Node.wait_for_level node level in - iter (level + 1) + iter ((level, index) :: acc) (level + 1) in - iter from_level + iter [] from_level (* We check that publishing a slot header with a proof for a different slot leads to a proof-checking error. 
*)
@@ -2315,7 +2328,77 @@ let test_dal_node_test_get_level_slot_content _protocol parameters _cryptobox
        %L, got = %R)" ;
   unit
 
-let test_dal_node_import_snapshot _protocol parameters _cryptobox node client
+let test_dal_node_snapshot_export ~operators _protocol parameters cryptobox node
+    client dal_node =
+  let* start = Node.get_level node in
+  let expected_exported_levels = 5 in
+  let stop =
+    start + expected_exported_levels
+    + parameters.Dal_common.Parameters.attestation_lag
+    + Tezos_dal_node_lib.Constants.validation_slack + 1
+  in
+  let* published =
+    publish_and_bake
+      ~slots:operators
+      ~from_level:start
+      ~to_level:stop
+      parameters
+      cryptobox
+      node
+      client
+      dal_node
+  in
+  let to_test =
+    List.filter
+      (fun (level, _index) -> level <= start + expected_exported_levels)
+      published
+  in
+  let file = Temp.file "export" in
+  (* Export with default levels: first_seen_level up to the capped
     last_processed_level. *)
+  let* () =
+    Dal_node.snapshot_export ~endpoint:(Node.as_rpc_endpoint node) dal_node file
+  in
+  (* Verify the snapshot directory was created. *)
+  let* file_exists = Lwt_unix.file_exists file in
+  let* () =
+    if file_exists then unit
+    else Test.fail "Snapshot export failed: no snapshot was created at %s" file
+  in
+  (* Create a fresh DAL node using the exported snapshot data. *)
+  let fresh_dal_node = Dal_node.create ~node ~data_dir:file () in
+  let* () = Dal_node.init_config ~operator_profiles:operators fresh_dal_node in
+  let* () = Dal_node.run fresh_dal_node in
+  (* Compare slot statuses between the original node and the fresh one built from snapshot. *)
+  let* () =
+    Lwt_list.iter_s
+      (fun (slot_level, slot_index) ->
+        let* status_orig =
+          Dal_RPC.(
+            call dal_node @@ get_level_slot_status ~slot_level ~slot_index)
+        in
+        let* status_fresh =
+          Dal_RPC.(
+            call fresh_dal_node @@ get_level_slot_status ~slot_level ~slot_index)
+        in
+        let pp_status s = Format.asprintf "%a" Dal_RPC.pp_slot_id_status s in
+        Check.(status_fresh = status_orig)
+          ~__LOC__
+          Dal.Check.slot_id_status_typ
+          ~error_msg:
+            (Format.sprintf
+               "Snapshot import mismatch for slot (level=%d, index=%d): got \
+                %s, expected %s"
+               slot_level
+               slot_index
+               (pp_status status_fresh)
+               (pp_status status_orig)) ;
+        unit)
+      to_test
+  in
+  Log.info "Snapshot export test passed: snapshot created at %s" file ;
+  unit
+
+let test_dal_node_import_l1_snapshot _protocol parameters _cryptobox node client
   dal_node =
   let* commitment, proof =
     Helpers.(
@@ -2737,7 +2820,7 @@ let test_attester_with_daemon protocol parameters cryptobox node client dal_node
     "attestation_lag (%L) should be higher than [max_level - first_level] \
      (which is %R)" ;
   let wait_block_processing = wait_for_layer1_head dal_node max_level in
-  let* () =
+  let* _ =
    publish_and_bake
      ~from_level:first_level
      ~to_level:max_level
@@ -2859,7 +2942,7 @@ let test_attester_with_bake_for _protocol parameters cryptobox node client
       wait_for_layer1_final_block dal_node (wait_level - 2)
   in
-  let* () =
+  let* _ =
    publish_and_bake
      ~from_level:first_level
      ~to_level:(intermediary_level + lag)
      client
      dal_node
   in
-  let* () =
+  let* _ =
    publish_and_bake
      ~from_level:(intermediary_level + lag + 1)
      ~to_level:last_level
@@ -11456,10 +11539,18 @@ let register ~protocols =
     test_attester_with_daemon
     protocols ;
   scenario_with_layer1_and_dal_nodes
-    ~tags:["snapshot"; "import"]
+    ~tags:["l1_snapshot"; "import"]
     ~operator_profiles:[0]
-    "dal node import snapshot"
-    test_dal_node_import_snapshot
+    "dal node 
import l1 snapshot" + test_dal_node_import_l1_snapshot + protocols ; + scenario_with_layer1_and_dal_nodes + ~tags:["snapshot"; "export"] + ~operator_profiles:[0] + ~l1_history_mode:(Custom Node.Archive) + ~history_mode:Full + "dal node snapshot export" + (test_dal_node_snapshot_export ~operators:[0]) protocols ; (* Tests with layer1 and dal nodes (with p2p/GS) *)
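
A usage sketch of the new tezt helper added above (levels are illustrative;
[node] and [dal_node] as in the tests). It mirrors the CLI invocation
"octez-dal-node snapshot export --min-published-level INT
--max-published-level INT DIR":

  Dal_node.snapshot_export
    ~endpoint:(Node.as_rpc_endpoint node)
    ~min_published_level:2l
    ~max_published_level:10l
    dal_node
    (Temp.file "snapshot")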