From d59c3f14e81e0e7fdf39d3051197dc602b56e455 Mon Sep 17 00:00:00 2001 From: Julien Sagot Date: Fri, 28 Nov 2025 00:26:07 +0100 Subject: [PATCH 1/2] DAL: snapshots: redefine export function as merging function --- src/lib_dal_node/snapshot.ml | 48 +++++++++++++++++++++++++++--------- 1 file changed, 36 insertions(+), 12 deletions(-) diff --git a/src/lib_dal_node/snapshot.ml b/src/lib_dal_node/snapshot.ml index fd889ecbfb88..dcd91272631c 100644 --- a/src/lib_dal_node/snapshot.ml +++ b/src/lib_dal_node/snapshot.ml @@ -69,11 +69,11 @@ let kvs_copy_value src_store dst_store file_layout file key = key value -module Export = struct +module Merge = struct (** Export a filtered subset of the skip_list SQLite database by iterating through levels in the range [min_published_level, max_published_level] and copying the data using Dal_store_sqlite3 functions. *) - let export_skip_list ~src ~dst ~min_published_level ~max_published_level = + let merge_skip_list ~src ~dst ~min_published_level ~max_published_level = let open Lwt_result_syntax in let*! () = Lwt_utils_unix.create_dir dst in (* Initialize destination database with empty schema *) @@ -132,8 +132,8 @@ module Export = struct (** Export slots for all published slots in the given level range. Copies slot files from source to destination directory. *) - let export_slots ~cryptobox ~src ~dst ~min_published_level - ~max_published_level = + let merge_slots ~cryptobox ~src ~dst ~min_published_level ~max_published_level + = let open Lwt_result_syntax in let*! () = Lwt_utils_unix.create_dir dst in let* src_store = @@ -159,7 +159,7 @@ module Export = struct (** Export shards for all slots in the given level range. Copies shard files from source to destination directory. *) - let export_shards ~src ~dst ~min_published_level ~max_published_level = + let merge_shards ~src ~dst ~min_published_level ~max_published_level = let open Lwt_result_syntax in let*! 
() = Lwt_utils_unix.create_dir dst in let* src_store = @@ -210,7 +210,7 @@ module Export = struct let* _ = Key_value_store.close dst_store in return_unit) - let export ~data_dir:src_data_dir ~config_file ~endpoint ~min_published_level + let merge ~data_dir:src_data_dir ~config_file ~endpoint ~min_published_level ~max_published_level dst_data_dir = let open Lwt_result_syntax in let* config = Configuration_file.load ~config_file in @@ -282,7 +282,7 @@ module Export = struct let* () = let src_slot_dir = src_root_dir // Store.Stores_dirs.slot in let dst_slot_dir = dst_root_dir // Store.Stores_dirs.slot in - export_slots + merge_slots ~cryptobox ~src:src_slot_dir ~dst:dst_slot_dir @@ -293,7 +293,7 @@ module Export = struct let* () = let src_shard_dir = src_root_dir // Store.Stores_dirs.shard in let dst_shard_dir = dst_root_dir // Store.Stores_dirs.shard in - export_shards + merge_shards ~src:src_shard_dir ~dst:dst_shard_dir ~min_published_level @@ -307,7 +307,7 @@ module Export = struct let src_skip_list_dir = src_root_dir // Store.Stores_dirs.skip_list_cells in - export_skip_list + merge_skip_list ~src:src_skip_list_dir ~dst:dst_skip_list_dir ~min_published_level @@ -318,16 +318,40 @@ module Export = struct let* first_seen_store = Store.First_seen_level.init ~root_dir:dst_root_dir in + let* existing_first_seen = Store.First_seen_level.load first_seen_store in + let new_first_seen_level = + match existing_first_seen with + | None -> min_published_level + | Some existing -> min existing min_published_level + in let* () = - Store.First_seen_level.save first_seen_store min_published_level + Store.First_seen_level.save first_seen_store new_first_seen_level in let* last_processed_store = Store.Last_processed_level.init ~root_dir:dst_root_dir in + let* existing_last_processed = + Store.Last_processed_level.load last_processed_store + in + let new_last_processed_level = + match existing_last_processed with + | None -> max_published_level + | Some existing -> max existing 
max_published_level + in let* () = - Store.Last_processed_level.save last_processed_store max_published_level + Store.Last_processed_level.save + last_processed_store + new_last_processed_level in return_unit end -let export = Export.export +let export ~data_dir ~config_file ~endpoint ~min_published_level + ~max_published_level dst = + Merge.merge + ~data_dir + ~config_file + ~endpoint + ~min_published_level + ~max_published_level + dst -- GitLab From cee088f2af9b19200584a48b02592eed1516f33d Mon Sep 17 00:00:00 2001 From: Julien Sagot Date: Fri, 28 Nov 2025 00:42:20 +0100 Subject: [PATCH 2/2] DAL: implement snapshot import as reverse export --- src/lib_dal_node/cli.ml | 114 ++++++++++++++++++++++------------ src/lib_dal_node/snapshot.ml | 74 ++++++++++++++-------- src/lib_dal_node/snapshot.mli | 16 +++++ tezt/lib_tezos/dal_node.ml | 14 +++-- tezt/lib_tezos/dal_node.mli | 15 +++++ tezt/tests/dal.ml | 19 ++++-- 6 files changed, 178 insertions(+), 74 deletions(-) diff --git a/src/lib_dal_node/cli.ml b/src/lib_dal_node/cli.ml index 6d366a3cb630..8495cec49f90 100644 --- a/src/lib_dal_node/cli.ml +++ b/src/lib_dal_node/cli.ml @@ -1007,48 +1007,34 @@ module Snapshot = struct `P "Entrypoint for snapshot management commands."; ] + let min_published_level_arg doc = + let open Term in + let parse, pp = positive_int_format "min-published-level" in + make_arg ~doc ~parse ~placeholder:"INT" ~pp "min-published-level" + + let min_published_level doc = + Term.arg_to_cmdliner (min_published_level_arg doc) + + let max_published_level_arg doc = + let open Term in + let parse, pp = positive_int_format "max-published-level" in + make_arg ~doc ~parse ~placeholder:"INT" ~pp "max-published-level" + + let max_published_level doc = + Term.arg_to_cmdliner (max_published_level_arg doc) + + let file_path doc = + let open Cmdliner in + Arg.(required & pos 0 (some string) None & info ~docv:"DIR" ~doc []) + module Export = struct - let man = [`S "DESCRIPTION"; `P "Export DAL node data to a 
snapshot file."] + let man = + [`S "DESCRIPTION"; `P "Export DAL node data to another data directory."] let info = let version = Tezos_version_value.Bin_version.octez_version_string in Cmdliner.Cmd.info ~doc:"Export snapshot" ~man ~version "export" - let min_published_level_arg = - let open Term in - let parse, pp = positive_int_format "min-published-level" in - make_arg - ~doc:"Minimum published level to include in the snapshot." - ~parse - ~placeholder:"INT" - ~pp - "min-published-level" - - let min_published_level = Term.arg_to_cmdliner min_published_level_arg - - let max_published_level_arg = - let open Term in - let parse, pp = positive_int_format "max-published-level" in - make_arg - ~doc:"Maximum published level to include in the snapshot." - ~parse - ~placeholder:"INT" - ~pp - "max-published-level" - - let max_published_level = Term.arg_to_cmdliner max_published_level_arg - - let file_path = - let open Cmdliner in - Arg.( - required - & pos 0 (some string) None - & info - ~docv:"DIR" - ~doc:"Path to the snapshot directory where to export." - ~docs:"OPTIONS" - []) - let action min_published_level max_published_level data_dir endpoint config_file file_path = let data_dir = @@ -1073,8 +1059,58 @@ module Snapshot = struct Cmdliner.Term.( map wrap_action - (const action $ min_published_level $ max_published_level - $ Term.data_dir $ Term.endpoint $ Term.config_file $ file_path)) + (const action + $ min_published_level "Minimum published level to export." + $ max_published_level "Maximum published level to export." + $ Term.data_dir $ Term.endpoint $ Term.config_file + $ file_path "Path to the destination data directory.")) + + let cmd = Cmdliner.Cmd.v info term + end + + module Import = struct + let man = + [`S "DESCRIPTION"; `P "Import DAL node data from another data directory."] + + let no_check = + let open Cmdliner in + let doc = "Skip validation checks during import." 
in + Arg.(value & flag & info ~doc ["no-check"]) + + let info = + let version = Tezos_version_value.Bin_version.octez_version_string in + Cmdliner.Cmd.info ~doc:"Import snapshot" ~man ~version "import" + + let action min_published_level max_published_level data_dir endpoint + config_file no_check file_path = + let data_dir = + Option.value ~default:Configuration_file.default.data_dir data_dir + in + let config_file = + Option.value + ~default:(Configuration_file.default_config_file data_dir) + config_file + in + let min_level = Option.map Int32.of_int min_published_level in + let max_level = Option.map Int32.of_int max_published_level in + Snapshot.import + ~check:(not no_check) + ~data_dir + ~config_file + ~endpoint + ~min_published_level:min_level + ~max_published_level:max_level + file_path + + let term = + Cmdliner.Term.( + map + wrap_action + (const action + $ min_published_level "Minimum published level to import." + $ max_published_level "Maximum published level to import." + $ Term.data_dir $ Term.endpoint $ Term.config_file $ no_check + $ file_path "Path to the source data directory to import from.")) let cmd = Cmdliner.Cmd.v info term end @@ -1085,7 +1121,7 @@ module Snapshot = struct let version = Tezos_version_value.Bin_version.octez_version_string in Cmdliner.Cmd.info ~doc:"Snapshot management" ~man ~version "snapshot" in - Cmdliner.Cmd.group ~default info [Export.cmd] + Cmdliner.Cmd.group ~default info [Export.cmd; Import.cmd] end module Debug = struct diff --git a/src/lib_dal_node/snapshot.ml b/src/lib_dal_node/snapshot.ml index dcd91272631c..f1acc9a2d061 100644 --- a/src/lib_dal_node/snapshot.ml +++ b/src/lib_dal_node/snapshot.ml @@ -210,8 +210,8 @@ module Merge = struct let* _ = Key_value_store.close dst_store in return_unit) - let merge ~data_dir:src_data_dir ~config_file ~endpoint ~min_published_level - ~max_published_level dst_data_dir = + let merge ~frozen_only ~src_root_dir ~config_file ~endpoint + ~min_published_level ~max_published_level 
~dst_root_dir = let open Lwt_result_syntax in let* config = Configuration_file.load ~config_file in let endpoint = Option.value ~default:config.endpoint endpoint in @@ -231,21 +231,6 @@ module Merge = struct (* Set crypto box share size hook. *) Value_size_hooks.set_share_size (Cryptobox.Internal_for_tests.encoded_share_size cryptobox) ; - let store_path data_dir = - (* We only need data_dir field, so we can just use default as base. *) - Configuration_file.store_path {Configuration_file.default with data_dir} - in - let src_root_dir = store_path src_data_dir in - let dst_root_dir = store_path dst_data_dir in - let*! dst_exists = Lwt_unix.file_exists dst_root_dir in - let* () = - if dst_exists then - failwith - "Destination directory %s already exists. Please remove it or choose \ - a different destination." - dst_root_dir - else return_unit - in let* chain_id = read_from_store ~root_dir:src_root_dir (module Store.Chain_id) in @@ -267,12 +252,14 @@ module Merge = struct max_published_level to this value to avoid exporting stale data. *) let max_published_level = let latest_frozen_level = - Int32.( - sub - last_processed_level - (of_int - (Constants.validation_slack + proto_parameters.attestation_lag - + 1))) + if frozen_only then + Int32.( + sub + last_processed_level + (of_int + (Constants.validation_slack + proto_parameters.attestation_lag + + 1))) + else last_processed_level in match max_published_level with | None -> latest_frozen_level @@ -348,10 +335,47 @@ end let export ~data_dir ~config_file ~endpoint ~min_published_level ~max_published_level dst = + let open Lwt_result_syntax in + let store_path data_dir = + Configuration_file.store_path {Configuration_file.default with data_dir} + in + let src_root_dir = store_path data_dir in + let dst_root_dir = store_path dst in + let*! dst_exists = Lwt_unix.file_exists dst_root_dir in + let* () = + if dst_exists then + failwith + "Destination directory %s already exists. 
Please remove it or choose a \ + different destination." + dst_root_dir + else return_unit + in Merge.merge + ~frozen_only:true + ~src_root_dir + ~config_file + ~endpoint + ~min_published_level + ~max_published_level + ~dst_root_dir + +let import ?(check = true) ~data_dir:dst ~config_file ~endpoint + ~min_published_level ~max_published_level src = + if check then + failwith + "Import with checks is not yet implemented. Use --no-check if you want \ + to bypass imported data validation.\n" + else + let store_path data_dir = + Configuration_file.store_path {Configuration_file.default with data_dir} + in + let src_root_dir = store_path src in + let dst_root_dir = store_path dst in + Merge.merge + ~frozen_only:false + ~src_root_dir + ~config_file + ~endpoint + ~min_published_level + ~max_published_level + ~dst_root_dir diff --git a/src/lib_dal_node/snapshot.mli b/src/lib_dal_node/snapshot.mli index e8a9f19a7c7d..d6884604d13c 100644 --- a/src/lib_dal_node/snapshot.mli +++ b/src/lib_dal_node/snapshot.mli @@ -23,3 +23,19 @@ val export : max_published_level:int32 option -> string -> unit tzresult Lwt.t + +(** [import ~data_dir ~config_file ~endpoint ~min_published_level + ~max_published_level path] imports DAL node store data from [path] and + merges it into [data_dir]. + This is equivalent to exporting [path] into [data_dir], and this is + basically implemented by the same underlying function with swapped + argument positions. 
*) +val import : + ?check:bool -> + data_dir:string -> + config_file:string -> + endpoint:Uri.t option -> + min_published_level:int32 option -> + max_published_level:int32 option -> + string -> + unit tzresult Lwt.t diff --git a/tezt/lib_tezos/dal_node.ml b/tezt/lib_tezos/dal_node.ml index 36023b54e48a..60111088e1b4 100644 --- a/tezt/lib_tezos/dal_node.ml +++ b/tezt/lib_tezos/dal_node.ml @@ -567,8 +567,8 @@ let debug_print_store_schemas ?path ?hooks () = let process = Process.spawn ?hooks path @@ args in Process.check process -let snapshot_export dal_node ?endpoint ?min_published_level ?max_published_level - output_file = +let snapshot_aux cmd dal_node ?(extra = []) ?endpoint ?min_published_level + ?max_published_level output_file = let data_dir = data_dir dal_node in let endpoint_args = match endpoint with @@ -586,8 +586,8 @@ let snapshot_export dal_node ?endpoint ?min_published_level ?max_published_level | Some level -> ["--max-published-level"; Int32.to_string level] in let args = - ["snapshot"; "export"] @ ["--data-dir"; data_dir] @ endpoint_args - @ min_level_args @ max_level_args @ [output_file] + ["snapshot"; cmd] @ ["--data-dir"; data_dir] @ endpoint_args + @ min_level_args @ max_level_args @ extra @ [output_file] in let path = if use_baker_to_start_dal_node = Some true then @@ -597,6 +597,12 @@ let snapshot_export dal_node ?endpoint ?min_published_level ?max_published_level let process = Process.spawn path args in Process.check process +let snapshot_export = snapshot_aux "export" ~extra:[] + +let snapshot_import t ?(no_check = false) = + let no_check_args = if no_check then ["--no-check"] else [] in + snapshot_aux ~extra:no_check_args "import" t + module Proxy = struct type answer = [`Response of string | `Stream of Cohttp_lwt.Body.t] diff --git a/tezt/lib_tezos/dal_node.mli b/tezt/lib_tezos/dal_node.mli index 5326d184b217..fd8f1f6dfe1d 100644 --- a/tezt/lib_tezos/dal_node.mli +++ b/tezt/lib_tezos/dal_node.mli @@ -272,6 +272,21 @@ val snapshot_export : 
string -> unit Lwt.t +(** [snapshot_import dal_node ?endpoint ?min_published_level + ?max_published_level input_file] imports a snapshot into the DAL node's + store from [input_file]. + If [endpoint] is provided, it overrides the endpoint in the config file. + [min_published_level] and [max_published_level] are optional level ranges + to import. *) +val snapshot_import : + t -> + ?no_check:bool -> + ?endpoint:Endpoint.t -> + ?min_published_level:int32 -> + ?max_published_level:int32 -> + string -> + unit Lwt.t + (** The Proxy module provides functionality to create a proxy server that can intercept and mock responses for DAL node requests. *) module Proxy : sig diff --git a/tezt/tests/dal.ml b/tezt/tests/dal.ml index 2d0b4ab9c2c2..534499302e74 100644 --- a/tezt/tests/dal.ml +++ b/tezt/tests/dal.ml @@ -2328,8 +2328,8 @@ let test_dal_node_test_get_level_slot_content _protocol parameters _cryptobox %L, got = %R)" ; unit -let test_dal_node_snapshot_export ~operators _protocol parameters cryptobox node - client dal_node = +let test_dal_node_snapshot ~operators _protocol parameters cryptobox node client + dal_node = let* start = Node.get_level node in let expected_exported_levels = 5 in let stop = @@ -2365,8 +2365,15 @@ let test_dal_node_snapshot_export ~operators _protocol parameters cryptobox node else Test.fail "Snapshot export failed: file was not created at %s" file in (* Create a fresh DAL node using the exported snapshot data. *) - let fresh_dal_node = Dal_node.create ~node ~data_dir:file () in + let fresh_dal_node = Dal_node.create ~node () in let* () = Dal_node.init_config ~operator_profiles:operators fresh_dal_node in + let* () = + Dal_node.snapshot_import + ~no_check:true (* Snapshot data validation is not implemented yet *) + ~endpoint:(Node.as_rpc_endpoint node) + fresh_dal_node + file + in let* () = Dal_node.run fresh_dal_node in (* Compare slot statuses between the original node and the fresh one built from snapshot. 
*) let* () = @@ -11545,12 +11552,12 @@ let register ~protocols = test_dal_node_import_l1_snapshot protocols ; scenario_with_layer1_and_dal_nodes - ~tags:["snapshot"; "export"] + ~tags:["snapshot"] ~operator_profiles:[0] ~l1_history_mode:(Custom Node.Archive) ~history_mode:Full - "dal node snapshot export" - (test_dal_node_snapshot_export ~operators:[0]) + "dal node snapshot export/import" + (test_dal_node_snapshot ~operators:[0]) protocols ; (* Tests with layer1 and dal nodes (with p2p/GS) *) -- GitLab