diff --git a/src/api/api_handlers.ml b/src/api/api_handlers.ml index ab1a1c22fa971d0ccb9482f1e0914e87515d4eb8..ecf8a469d44151bb04dcd61787de9e3b6b5e16a5 100644 --- a/src/api/api_handlers.ml +++ b/src/api/api_handlers.ml @@ -94,16 +94,16 @@ let get_latest_block_number_db_eth _ _ = log_func_lwt ~category:"service" "get_latest_block_number_db_eth" "" @@ fun () -> let> block_number = - Db_psql_eth.Eth_db_psql_store_information - .get_latest_block_trace_latest_stored () in + Db_psql_eth.Eth_db_psql_store_information.get_latest_block_trace_stored () + in EzAPIServer.return_ok block_number [@@service Api_services.get_latest_block_number_db_eth] let get_latest_block_db_eth _ _ = log_func_lwt ~category:"service" "get_latest_block_db_eth" "" @@ fun () -> let> block_number_opt = - Db_psql_eth.Eth_db_psql_store_information - .get_latest_block_stats_latest_stored () in + Db_psql_eth.Eth_db_psql_store_information.get_latest_block_stats_stored () + in match block_number_opt with | None -> EzAPIServer.return ~code:400 diff --git a/src/ethereum_indexer/eth_db_psql/eth_db_psql_common.ml b/src/ethereum_indexer/eth_db_psql/eth_db_psql_common.ml index db7c9db4cf9c704336f288d2d3a8266a5f80add8..b756f5e78c66c2c8971921cb3f4be9492b73b215 100644 --- a/src/ethereum_indexer/eth_db_psql/eth_db_psql_common.ml +++ b/src/ethereum_indexer/eth_db_psql/eth_db_psql_common.ml @@ -105,7 +105,9 @@ CREATE INDEX IF NOT EXISTS idx_address ON addresses(address); block_gas_limit BIGINT NOT NULL, block_gas_used BIGINT NOT NULL, block_timestamp BIGINT NOT NULL, - block_base_fee_per_gas NUMERIC DEFAULT 0 NOT NULL + block_base_fee_per_gas NUMERIC DEFAULT 0 NOT NULL, + block_is_stored BOOLEAN NOT NULL DEFAULT FALSE, + block_is_trace_stored BOOLEAN NOT NULL DEFAULT FALSE )|}; {|CREATE TABLE IF NOT EXISTS block_uncles ( @@ -241,6 +243,7 @@ ON transactions_receipts_logs_topics ( stat_block_eth_transfer_total_gas_used BIGINT NOT NULL, stat_block_eth_transfer_operation_number BIGINT NOT NULL, stat_block_eth_transfer_operation_total_gas_used BIGINT NOT NULL, + stat_block_is_stored BOOLEAN NOT NULL DEFAULT FALSE, FOREIGN KEY (stat_block_number) REFERENCES blocks(block_number) ON DELETE CASCADE )|}; {|CREATE @@ -396,48 +399,6 @@ CREATE TABLE IF NOT EXISTS block_validator ( block_number BIGINT PRIMARY KEY, FOREIGN KEY (block_number) REFERENCES blocks(block_number) ); -|}; - {| -CREATE TABLE IF NOT EXISTS store_block_information ( - store_block_information_block_number BIGINT PRIMARY KEY, - store_block_information_pending BOOLEAN NOT NULL DEFAULT FALSE, - store_block_information_stored BOOLEAN NOT NULL DEFAULT FALSE -); -|}; - {| -CREATE TABLE IF NOT EXISTS store_block_information_trace ( - store_block_information_trace_block_number BIGINT PRIMARY KEY, - store_block_information_trace_pending BOOLEAN NOT NULL DEFAULT FALSE, - store_block_information_trace_stored BOOLEAN NOT NULL DEFAULT FALSE -); -|}; - {| -CREATE TABLE IF NOT EXISTS store_block_information_latest ( - store_block_information_latest_block_number BIGINT PRIMARY KEY, - store_block_information_latest_pending BOOLEAN NOT NULL DEFAULT FALSE, - store_block_information_latest_stored BOOLEAN NOT NULL DEFAULT FALSE -); -|}; - {| -CREATE TABLE IF NOT EXISTS store_block_information_trace_latest ( - store_block_information_trace_latest_block_number BIGINT PRIMARY KEY, - store_block_information_trace_latest_pending BOOLEAN NOT NULL DEFAULT FALSE, - store_block_information_trace_latest_stored BOOLEAN NOT NULL DEFAULT FALSE -); -|}; - {| -CREATE TABLE IF NOT EXISTS store_block_information_stats_latest ( - store_block_information_stats_latest_block_number BIGINT PRIMARY KEY, - store_block_information_stats_latest_pending BOOLEAN NOT NULL DEFAULT FALSE, - store_block_information_stats_latest_stored BOOLEAN NOT NULL DEFAULT FALSE -); -|}; - {| -CREATE TABLE IF NOT EXISTS store_block_information_stats ( - store_block_information_stats_block_number BIGINT PRIMARY KEY, - store_block_information_stats_pending BOOLEAN NOT NULL DEFAULT FALSE, - store_block_information_stats_stored BOOLEAN NOT NULL DEFAULT FALSE -); |}; {| CREATE TABLE IF NOT EXISTS sandwich_token_swaped ( diff --git a/src/ethereum_indexer/eth_db_psql/eth_db_psql_helpers.ml b/src/ethereum_indexer/eth_db_psql/eth_db_psql_helpers.ml index ea15e80a8ae5e997f25c0943e8fea9a97ff1025f..ccc14bd33a6771ca410e49a7ad4bc814bce6fcfd 100644 --- a/src/ethereum_indexer/eth_db_psql/eth_db_psql_helpers.ml +++ b/src/ethereum_indexer/eth_db_psql/eth_db_psql_helpers.ml @@ -34,7 +34,8 @@ let row_block ; block_state_root : string ; block_timestamp : bint64 ; block_total_difficulty : string option - ; block_transactions_root : string >) miner = + ; block_transactions_root : string + ; .. >) miner = { number = Int64.to_int r#block_number; hash = Option.map b r#block_hash; @@ -231,7 +232,8 @@ let row_block_stat ; stat_block_eth_transfer_total_gas_used : bint64 ; stat_block_gas_limit : bint64 ; stat_block_gas_used : bint64 - ; stat_block_number : bint64 >) = + ; stat_block_number : bint64 + ; .. >) = { sb_gas_used = Int64.to_int r#stat_block_gas_used; sb_gas_limit = Int64.to_int r#stat_block_gas_limit; diff --git a/src/ethereum_indexer/eth_db_psql/eth_db_psql_helpers.mli b/src/ethereum_indexer/eth_db_psql/eth_db_psql_helpers.mli index faa639d28e1e8a474f8511fc99c1391f434405dd..a2f93b0b0053e3112e5aec1a9ee36948a7a5a7be 100644 --- a/src/ethereum_indexer/eth_db_psql/eth_db_psql_helpers.mli +++ b/src/ethereum_indexer/eth_db_psql/eth_db_psql_helpers.mli @@ -23,7 +23,8 @@ val row_block : ; block_state_root : string ; block_timestamp : Eth.bint64 ; block_total_difficulty : string option - ; block_transactions_root : string > -> + ; block_transactions_root : string + ; .. > -> string option -> 'a Eth.block @@ -120,7 +121,8 @@ val row_block_stat : ; stat_block_eth_transfer_total_gas_used : Eth.bint64 ; stat_block_gas_limit : Eth.bint64 ; stat_block_gas_used : Eth.bint64 - ; stat_block_number : Eth.bint64 > -> + ; stat_block_number : Eth.bint64 + ; .. > -> Common.Types.Ethereum_indexer.stat_block val row_db_contract_information : diff --git a/src/ethereum_indexer/eth_db_psql/eth_db_psql_raw.ml b/src/ethereum_indexer/eth_db_psql/eth_db_psql_raw.ml index 2189f51a12d2ba51beb9e496f7d2d4c60cb81fd9..afa08bc1537e9b23ad54fc0b081c92a922c9b5bb 100644 --- a/src/ethereum_indexer/eth_db_psql/eth_db_psql_raw.ml +++ b/src/ethereum_indexer/eth_db_psql/eth_db_psql_raw.ml @@ -1177,3 +1177,119 @@ let get_block_traces ?dbh ~block_number () = let> r = get_trace_of_transaction_aux ~dbh x in Lwt.return r) l + +let delete_block_transaction ?dbh ~block_number () = + Log.log_func_lwt ~category:"delete_db" "delete_block_transaction" "" + @@ fun () -> + use "delete_block_transaction" dbh @@ fun dbh -> + let> _ = + [%pgsql + dbh + {|DELETE FROM transaction_blob_versioned_hashes WHERE transaction_blob_tx_id IN (SELECT transaction_id FROM transactions WHERE transaction_block_number = ${block_number});|}] + in + let> _ = + [%pgsql + dbh + {| + DELETE FROM transaction_access_list WHERE transaction_access_list_tx_id IN (SELECT transaction_id FROM transactions WHERE transaction_block_number = ${block_number}); + |}] + in + let> _ = + [%pgsql + dbh + {| + DELETE FROM tx_hashes WHERE tx_id IN (SELECT transaction_id FROM transactions WHERE transaction_block_number = ${block_number}); + |}] + in + let> _ = + [%pgsql + dbh + {|DELETE FROM transactions WHERE transaction_block_number = ${block_number};|}] + in + let> _ = + [%pgsql + dbh + {| +DELETE FROM transactions_receipts_logs_topics WHERE transactions_receipts_logs_topics_transaction_id IN (SELECT transaction_id FROM transactions WHERE transaction_block_number = ${block_number}); + |}] + in + let> _ = + [%pgsql + dbh + {| +DELETE FROM transactions_receipts_logs WHERE transactions_receipts_logs_block_number = ${block_number}; + |}] + in + let> _ = + [%pgsql + dbh + {| +DELETE FROM transactions_receipts WHERE transactions_receipts_block_number = ${block_number}; + |}] + in + let> _ = + [%pgsql + dbh + {| +DELETE FROM block_uncles WHERE block_uncles_block_number = ${block_number}; + |}] + in + let> _ = + [%pgsql + dbh + {| +DELETE FROM blocks WHERE block_number = ${block_number}; + |}] + in + Lwt.return_unit + +let delete_block_trace ?dbh ~block_number () = + Log.log_func_lwt ~category:"delete_db" "delete_block_transaction" "" + @@ fun () -> + use "delete_block_transaction" dbh @@ fun dbh -> + let> _ = + [%pgsql + dbh + {| +WITH + + traces AS ( + SELECT transaction_trace_id + FROM transaction_trace + WHERE transaction_trace_block_number= ${block_number} + ), + calls AS ( + SELECT transaction_trace_calls_id + FROM transaction_trace_calls + WHERE transaction_trace_calls_trace_id IN (SELECT transaction_trace_id FROM traces) + ), + logs AS ( + SELECT transaction_trace_log_id + FROM transaction_trace_log + WHERE transaction_trace_log_trace_calls_id IN (SELECT transaction_trace_calls_id FROM calls) + ), + + del_topics AS ( + DELETE FROM transaction_trace_log_topics + WHERE transaction_trace_log_topics_log_id IN (SELECT transaction_trace_log_id FROM logs) + ), + + del_logs AS ( + DELETE FROM transaction_trace_log + WHERE transaction_trace_log_trace_calls_id IN (SELECT transaction_trace_calls_id FROM calls) + ), + + del_calls AS ( + DELETE FROM transaction_trace_calls + WHERE transaction_trace_calls_trace_id IN (SELECT transaction_trace_id FROM traces) + ), + + del_traces AS ( + DELETE FROM transaction_trace + WHERE transaction_trace_block_number= ${block_number} + ) + +SELECT 1; +|}] + in + Lwt.return_unit diff --git a/src/ethereum_indexer/eth_db_psql/eth_db_psql_raw.mli b/src/ethereum_indexer/eth_db_psql/eth_db_psql_raw.mli index 0da4e9615b9a29b8d58c36e5738acd35bfca9882..dd6ac9cb3d33e70e5fa79e4c81d70159b4c96fe4 100644 --- a/src/ethereum_indexer/eth_db_psql/eth_db_psql_raw.mli +++ b/src/ethereum_indexer/eth_db_psql/eth_db_psql_raw.mli @@ -181,3 +181,15 @@ val get_block_traces : block_number:int64 -> unit -> Common.Types.Ethereum_indexer.debug_trace list Lwt.t + +val delete_block_transaction : + ?dbh:EzPG_lwt.PGOCaml.pa_pg_data EzPG_lwt.PGOCaml.t -> + block_number:int64 -> + unit -> + unit Lwt.t + +val delete_block_trace : + ?dbh:EzPG_lwt.PGOCaml.pa_pg_data EzPG_lwt.PGOCaml.t -> + block_number:int64 -> + unit -> + unit Lwt.t diff --git a/src/ethereum_indexer/eth_db_psql/eth_db_psql_stats.ml b/src/ethereum_indexer/eth_db_psql/eth_db_psql_stats.ml index 0ce57a044fce7a47126e3cbef71a74a464e2c20d..0ea462c3f750d103a2f7360fe50c0d49ebbc6573 100644 --- a/src/ethereum_indexer/eth_db_psql/eth_db_psql_stats.ml +++ b/src/ethereum_indexer/eth_db_psql/eth_db_psql_stats.ml @@ -492,3 +492,11 @@ $pagination let row_address = row_db_stat_address transaction_receipts_address address dsu_delta in Lwt.return_some { row_address with dsu_blocks } + +let delete_block_stats ?dbh ~block_number () = + Log.log_func_lwt ~category:"delete_db" "delete_block_stats" + (Int64.to_string block_number) + @@ fun () -> + use "delete_block_stats" dbh @@ fun dbh -> + [%pgsql + dbh {|DELETE FROM stat_blocks WHERE stat_block_number = ${block_number};|}] diff --git a/src/ethereum_indexer/eth_db_psql/eth_db_psql_stats.mli b/src/ethereum_indexer/eth_db_psql/eth_db_psql_stats.mli index 5fdf6f04deadcd8485fff06d6bbb32b70785634f..2f7175ebde92d27ce0e3a6f8e6b476844bacedfe 100644 --- a/src/ethereum_indexer/eth_db_psql/eth_db_psql_stats.mli +++ b/src/ethereum_indexer/eth_db_psql/eth_db_psql_stats.mli @@ -18,3 +18,9 @@ val get_address_stats_by_address : unit -> Eth.transaction_receipt Common.Types.Ethereum_indexer.db_address_stat option Lwt.t + +val delete_block_stats : + ?dbh:EzPG_lwt.PGOCaml.pa_pg_data EzPG_lwt.PGOCaml.t -> + block_number:int64 -> + unit -> + unit Lwt.t diff --git a/src/ethereum_indexer/eth_db_psql/eth_db_psql_store_information.ml b/src/ethereum_indexer/eth_db_psql/eth_db_psql_store_information.ml index c249ba36e0d80abada4b1e33a3721a17ba1d66ef..d544339a7fa98c3d8c6d21983fbc42eed8105bb4 100644 --- a/src/ethereum_indexer/eth_db_psql/eth_db_psql_store_information.ml +++ b/src/ethereum_indexer/eth_db_psql/eth_db_psql_store_information.ml @@ -3,7 +3,7 @@ open Eth open Common open Eth_db_psql_common -[%%pg { loose }] +[%%pg { loose; database = "ethereum" }] let block_count_gauge_name = "block_count" @@ -13,49 +13,37 @@ let block_count_trace_gauge_name = "block_trace_count" let block_count_trace_stats = ref None -let block_count_latest_gauge_name = "block_count" - -let block_count_latest_stats = ref None - -let block_count_trace_latest_gauge_name = "block_trace_count" - -let block_count_trace_latest_stats = ref None - let block_count_stats_gauge_name = "block_count" let block_count_stats_stats = ref None -let block_count_stats_latest_gauge_name = "block_stats_count" - -let block_count_stats_latest_stats = ref None - -let set_store_block_pending ?dbh ~block_number () = - Log.log_func_lwt ~category:"insert_db" "set_store_block_pending" - (Int64.to_string block_number) - @@ fun () -> - use "set_store_block_pending" dbh @@ fun dbh -> - [%pgsql - dbh - {| -INSERT INTO store_block_information -(store_block_information_block_number, store_block_information_pending,store_block_information_stored) -VALUES (${block_number},TRUE,FALSE) -|}] - -let get_latest_block_stored ?dbh () = +let get_latest_block_stored ?dbh ?old () = Log.log_func_lwt ~category:"fetch_db" "get_latest_block_stored" "" @@ fun () -> use "get_latest_block_stored" dbh @@ fun dbh -> - let> l = - [%pgsql.object - dbh - {| - select MAX(store_block_information_block_number) from store_block_information where store_block_information_stored = true; + match old with + | None -> ( + let> l = + [%pgsql.object + dbh + {| + select MAX(block_number) from blocks where block_is_stored = true; |}] - in - match l with - | [] -> Lwt.return_none - | x :: _ -> Lwt.return x#max + in + match l with + | [] -> Lwt.return_none + | x :: _ -> Lwt.return x#max) + | Some () -> ( + let> l = + [%pgsql.object + dbh + {| + select MIN(block_number) from blocks where block_is_stored = true; + |}] + in + match l with + | [] -> Lwt.return_none + | x :: _ -> Lwt.return x#min) let set_store_block_finish ?dbh ~block_number () = Log.log_func_lwt ~category:"update_db" "set_store_block_finish" @@ -66,10 +54,10 @@ let set_store_block_finish ?dbh ~block_number () = [%pgsql dbh {| -UPDATE store_block_information SET -store_block_information_pending = FALSE , store_block_information_stored = TRUE +UPDATE blocks SET +block_is_stored = TRUE WHERE -store_block_information_block_number = $block_number +block_number = $block_number |}] in let> count = @@ -85,219 +73,33 @@ store_block_information_block_number = $block_number let _ = Statistics.set_or_update_gauge block_count_gauge_name count in Lwt.return_unit -let reset_db_block_aux block_ids = - Log.log_func_lwt ~category:"reset_db" "reset_db_block_aux" "" @@ fun () -> - use "reset_db_block_aux" None @@ fun dbh -> - Lwt_list.iter_s - (fun x -> - let> _ = - [%pgsql - dbh - {|DELETE FROM transaction_blob_versioned_hashes WHERE transaction_blob_tx_id IN (SELECT transaction_id FROM transactions WHERE transaction_block_number = ${x});|}] - in - let> _ = - [%pgsql - dbh - {| - DELETE FROM transaction_access_list WHERE transaction_access_list_tx_id IN (SELECT transaction_id FROM transactions WHERE transaction_block_number = ${x}); - |}] - in - let> _ = - [%pgsql - dbh - {| - DELETE FROM tx_hashes WHERE tx_id IN (SELECT transaction_id FROM transactions WHERE transaction_block_number = ${x}); - |}] - in - let> _ = - [%pgsql - dbh - {|DELETE FROM transactions WHERE transaction_block_number = ${x};|}] - in - let> _ = - [%pgsql - dbh - {| -DELETE FROM transactions_receipts_logs_topics WHERE transactions_receipts_logs_topics_transaction_id IN (SELECT transaction_id FROM transactions WHERE transaction_block_number = ${x}); - |}] - in - let> _ = - [%pgsql - dbh - {| -DELETE FROM transactions_receipts_logs WHERE transactions_receipts_logs_block_number = ${x}; - |}] - in - let> _ = - [%pgsql - dbh - {| -DELETE FROM transactions_receipts WHERE transactions_receipts_block_number = ${x}; - |}] - in - let> _ = - [%pgsql - dbh - {| -DELETE FROM block_uncles WHERE block_uncles_block_number = ${x}; - |}] - in - let> _ = - [%pgsql - dbh {| -DELETE FROM blocks WHERE block_number = ${x}; - |}] - in - Lwt.return_unit) - block_ids - -let reset_db_block () = - Log.log_func_lwt ~category:"reset_db" "reset_db_to_block" "" @@ fun () -> - use "reset_db_to_block" None @@ fun dbh -> - let> block_ids = - [%pgsql.object - dbh - {| - SELECT store_block_information_block_number - FROM store_block_information - WHERE store_block_information_pending = TRUE - |}] - in - let block_ids = - List.map (fun x -> x#store_block_information_block_number) block_ids in - let> _ = reset_db_block_aux block_ids in - Lwt_list.iter_s - (fun x -> - [%pgsql +let get_latest_block_trace_stored ?dbh ?old () = + Log.log_func_lwt ~category:"fetch_db" "get_latest_block_trace_stored" "" + @@ fun () -> + use "get_latest_block_trace_stored" dbh @@ fun dbh -> + match old with + | None -> ( + let> l = + [%pgsql.object dbh {| - DELETE FROM store_block_information WHERE store_block_information_block_number = ${x}; - |}]) - block_ids - -let set_store_block_latest_pending ?dbh ~block_number () = - Log.log_func_lwt ~category:"insert_db" "set_store_block_latest_pending" - (Int64.to_string block_number) - @@ fun () -> - use "set_store_block_latest_pending" dbh @@ fun dbh -> - [%pgsql - dbh - {| - INSERT INTO store_block_information_latest - (store_block_information_latest_block_number, store_block_information_latest_pending,store_block_information_latest_stored) - VALUES (${block_number},TRUE,FALSE) - |}] - -let get_latest_block_latest_stored ?dbh () = - Log.log_func_lwt ~category:"fetch_db" "get_latest_block_latest_stored" "" - @@ fun () -> - use "get_latest_block_latest_stored" dbh @@ fun dbh -> - let> l = - [%pgsql.object - dbh - {| - select MAX(store_block_information_latest_block_number) from store_block_information_latest where store_block_information_latest_stored = true; + select MAX(block_number) from blocks where block_is_trace_stored = true; |}] - in - match l with - | [] -> Lwt.return_none - | x :: _ -> Lwt.return x#max - -let get_oldest_block_latest_stored ?dbh () = - Log.log_func_lwt ~category:"fetch_db" "get_oldest_block_latest_stored" "" - @@ fun () -> - use "get_oldest_block_latest_stored" dbh @@ fun dbh -> - let> l = - [%pgsql.object - dbh - {| - select MIN(store_block_information_trace_block_number) from store_block_information_trace where store_block_information_trace_stored = true; - |}] - in - match l with - | [] -> Lwt.return_none - | x :: _ -> Lwt.return x#min - -let set_store_block_latest_finish ?dbh ~block_number () = - Log.log_func_lwt ~category:"update_db" "set_store_block_latest_finish" - (Int64.to_string block_number) - @@ fun () -> - use "set_store_block_latest_finish" dbh @@ fun dbh -> - let> _ = - [%pgsql - dbh - {| - UPDATE store_block_information_latest SET - store_block_information_latest_pending = FALSE , store_block_information_latest_stored = TRUE - WHERE - store_block_information_latest_block_number = $block_number - |}] - in - let> count = - match !block_count_latest_stats with - | None -> ( - let> r = get_latest_block_latest_stored () in - match r with - | None -> Lwt.fail_with "LAST BLOCK STORED EMPTY" - | Some value -> Lwt.return @@ Int64.to_float value) - | Some block_count_stats_tmp -> Lwt.return @@ (1. +. block_count_stats_tmp) - in - block_count_latest_stats := Some count ; - let _ = Statistics.set_or_update_gauge block_count_latest_gauge_name count in - Lwt.return_unit - -let reset_db_block_latest () = - Log.log_func_lwt ~category:"reset_db" "reset_db_block_latest" "" @@ fun () -> - use "reset_db_block_latest" None @@ fun dbh -> - let> block_ids = - [%pgsql.object - dbh - {| - SELECT store_block_information_latest_block_number - FROM store_block_information_latest - WHERE store_block_information_latest_pending = TRUE - |}] - in - let block_ids = - List.map (fun x -> x#store_block_information_latest_block_number) block_ids - in - let> _ = reset_db_block_aux block_ids in - Lwt_list.iter_s - (fun x -> - [%pgsql + in + match l with + | [] -> Lwt.return_none + | x :: _ -> Lwt.return x#max) + | Some () -> ( + let> l = + [%pgsql.object dbh {| - DELETE FROM store_block_information_latest WHERE store_block_information_latest_block_number = ${x}; - |}]) - block_ids - -let set_store_block_trace_pending ?dbh ~block_number () = - Log.log_func_lwt ~category:"insert_db" "set_store_block_trace_pending" - (Int64.to_string block_number) - @@ fun () -> - use "set_store_block_trace_pending" dbh @@ fun dbh -> - [%pgsql - dbh - {| - INSERT INTO store_block_information_trace - (store_block_information_trace_block_number, store_block_information_trace_pending,store_block_information_trace_stored) - VALUES (${block_number},TRUE,FALSE) + select MIN(block_number) from blocks where block_is_trace_stored = true; |}] - -let get_latest_block_trace_stored ?dbh () = - Log.log_func_lwt ~category:"fetch_db" "get_latest_block_trace_stored" "" - @@ fun () -> - use "get_latest_block_trace_stored" dbh @@ fun dbh -> - let> l = - [%pgsql.object - dbh - {| - select MAX(store_block_information_trace_block_number) from store_block_information_trace where store_block_information_trace_stored = true; - |}] - in - match l with - | [] -> Lwt.return_none - | x :: _ -> Lwt.return x#max + in + match l with + | [] -> Lwt.return_none + | x :: _ -> Lwt.return x#min) let set_store_block_trace_finish ?dbh ~block_number () = Log.log_func_lwt ~category:"update_db" "set_store_block_trace_finish" @@ -308,10 +110,10 @@ let set_store_block_trace_finish ?dbh ~block_number () = [%pgsql dbh {| - UPDATE store_block_information_trace SET - store_block_information_trace_pending = FALSE , store_block_information_trace_stored = TRUE + UPDATE blocks SET + block_is_trace_stored = TRUE WHERE - store_block_information_trace_block_number = $block_number + block_number = $block_number |}] in let> count = @@ -327,237 +129,29 @@ let set_store_block_trace_finish ?dbh ~block_number () = let _ = Statistics.set_or_update_gauge block_count_trace_gauge_name count in Lwt.return_unit -let reset_db_block_trace_aux block_ids = - Log.log_func_lwt ~category:"reset_db" "reset_db_block_trace_aux" "" - @@ fun () -> - use "reset_db_block_trace_aux" None @@ fun dbh -> - Lwt_list.iter_s - (fun x -> - let> _ = - [%pgsql - dbh - {|DELETE FROM transaction_trace_log_topics WHERE transaction_trace_log_topics_log_id IN - (SELECT transaction_trace_log_id FROM transaction_trace_log WHERE transaction_trace_log_trace_calls_id IN - (SELECT transaction_trace_calls_id FROM transaction_trace_calls WHERE transaction_trace_calls_trace_id IN - (SELECT transaction_trace_id FROM transaction_trace WHERE transaction_trace_transaction_id IN - (SELECT transaction_id FROM transactions WHERE transaction_block_number = ${x}))));|}] - in - let> _ = - [%pgsql - dbh - {|DELETE FROM transaction_trace_log WHERE transaction_trace_log_trace_calls_id IN - (SELECT transaction_trace_calls_id FROM transaction_trace_calls WHERE transaction_trace_calls_trace_id IN - (SELECT transaction_trace_id FROM transaction_trace WHERE transaction_trace_transaction_id IN - (SELECT transaction_id FROM transactions WHERE transaction_block_number = ${x})));|}] - in - let> _ = - [%pgsql - dbh - {|DELETE FROM transaction_trace_calls WHERE transaction_trace_calls_trace_id IN - (SELECT transaction_trace_id FROM transaction_trace WHERE transaction_trace_transaction_id IN - (SELECT transaction_id FROM transactions WHERE transaction_block_number = ${x}));|}] - in - let> _ = - [%pgsql - dbh - {|DELETE FROM transaction_trace WHERE transaction_trace_transaction_id IN - (SELECT transaction_id FROM transactions WHERE transaction_block_number = ${x});|}] - in - Lwt.return_unit) - block_ids - -let reset_db_block_trace () = - Log.log_func_lwt ~category:"reset_db" "reset_db_block_trace" "" @@ fun () -> - use "reset_db_block_trace" None @@ fun dbh -> - let> block_ids = - [%pgsql.object - dbh - {| - SELECT store_block_information_trace_block_number - FROM store_block_information_trace - WHERE store_block_information_trace_pending = TRUE - |}] - in - let block_ids = - List.map (fun x -> x#store_block_information_trace_block_number) block_ids - in - let> _ = reset_db_block_trace_aux block_ids in - Lwt_list.iter_s - (fun x -> - [%pgsql - dbh - {| - DELETE FROM store_block_information_trace WHERE store_block_information_trace_block_number = ${x}; - |}]) - block_ids - -let set_store_block_trace_latest_pending ?dbh ~block_number () = - Log.log_func_lwt ~category:"insert_db" "set_store_block_trace_latest_pending" - (Int64.to_string block_number) - @@ fun () -> - use "set_store_block_trace_latest_pending" dbh @@ fun dbh -> - [%pgsql - dbh - {| - INSERT INTO store_block_information_trace_latest - (store_block_information_trace_latest_block_number, store_block_information_trace_latest_pending,store_block_information_trace_latest_stored) - VALUES (${block_number},TRUE,FALSE) - |}] - -let get_latest_block_trace_latest_stored ?dbh () = - Log.log_func_lwt ~category:"fetch_db" "get_latest_block_trace_stored" "" - @@ fun () -> - use "get_latest_block_trace_stored" dbh @@ fun dbh -> - let> l = - [%pgsql.object - dbh - {| - select MAX(store_block_information_trace_latest_block_number) from store_block_information_trace_latest where store_block_information_trace_latest_stored = true; - |}] - in - match l with - | [] -> Lwt.return_none - | x :: _ -> Lwt.return x#max - -let get_oldest_block_trace_latest_stored ?dbh () = - Log.log_func_lwt ~category:"fetch_db" "get_oldest_block_trace_latest_stored" - "" - @@ fun () -> - use "get_oldest_block_trace_latest_stored" dbh @@ fun dbh -> - let> l = - [%pgsql.object - dbh - {| - select MIN(store_block_information_trace_latest_block_number) from store_block_information_trace_latest where store_block_information_trace_latest_stored = true; - |}] - in - match l with - | [] -> Lwt.return_none - | x :: _ -> Lwt.return x#min - -let set_store_block_trace_latest_finish ?dbh ~block_number () = - Log.log_func_lwt ~category:"update_db" "set_store_block_trace_finish" - (Int64.to_string block_number) - @@ fun () -> - use "set_store_block_trace_finish" dbh @@ fun dbh -> - let> _ = - [%pgsql - dbh - {| - UPDATE store_block_information_trace_latest SET - store_block_information_trace_latest_pending = FALSE , store_block_information_trace_latest_stored = TRUE - WHERE - store_block_information_trace_latest_block_number = $block_number - |}] - in - let> count = - match !block_count_trace_latest_stats with - | None -> ( - let> r = get_latest_block_trace_latest_stored () in - match r with - | None -> Lwt.fail_with "LAST BLOCK TRACE STORED EMPTY" - | Some value -> Lwt.return @@ Int64.to_float value) - | Some block_count_stats_tmp -> Lwt.return @@ (1. +. block_count_stats_tmp) - in - block_count_trace_latest_stats := Some count ; - let _ = - Statistics.set_or_update_gauge block_count_trace_latest_gauge_name count - in - Lwt.return_unit - -let reset_db_block_trace_latest () = - Log.log_func_lwt ~category:"reset_db" "reset_db_block_latest_trace" "" - @@ fun () -> - use "reset_db_block_latest_trace" None @@ fun dbh -> - let> block_ids = - [%pgsql.object - dbh - {| - SELECT store_block_information_trace_latest_block_number - FROM store_block_information_trace_latest - WHERE store_block_information_trace_latest_pending = TRUE - |}] - in - let block_ids = - List.map - (fun x -> x#store_block_information_trace_latest_block_number) - block_ids in - let> _ = reset_db_block_trace_aux block_ids in - Lwt_list.iter_s - (fun x -> - [%pgsql - dbh - {| - DELETE FROM store_block_information_trace_latest WHERE store_block_information_trace_latest_block_number = ${x}; - |}]) - block_ids - -let reset_db_block_stats_aux block_ids = - Log.log_func_lwt ~category:"reset_db" "reset_db_block_stats_aux" "" - @@ fun () -> - use "reset_db_block_stats_aux" None @@ fun dbh -> - Lwt_list.iter_s - (fun x -> - let> _ = - [%pgsql - dbh - {| - DELETE FROM stat_operations WHERE stat_operations_blocks_number = ${x}; - |}] - in - let> _ = - [%pgsql - dbh - {| - DELETE FROM stat_blocks WHERE stat_block_number = ${x}; - |}] - in - - Lwt.return_unit) - block_ids - -let set_store_block_stats_pending ?dbh ~block_number () = - Log.log_func_lwt ~category:"insert_db" "set_store_block_stats_pending" - (Int64.to_string block_number) - @@ fun () -> - use "set_store_block_stats_pending" dbh @@ fun dbh -> - [%pgsql - dbh - {| - INSERT INTO store_block_information_stats - (store_block_information_stats_block_number, store_block_information_stats_pending,store_block_information_stats_stored) - VALUES (${block_number},TRUE,FALSE) - |}] - -let get_latest_block_stats_stored ?dbh () = +let get_latest_block_stats_stored ?dbh ?old () = Log.log_func_lwt ~category:"fetch_db" "get_latest_block_stats_stored" "" @@ fun () -> use "get_latest_block_stats_stored" dbh @@ fun dbh -> - let> l = - [%pgsql.object - dbh - {| - select MAX(store_block_information_stats_block_number) from store_block_information_stats where store_block_information_stats_stored = true; - |}] - in - match l with - | [] -> Lwt.return_none - | x :: _ -> Lwt.return x#max - -let get_oldest_block_stats_stored ?dbh () = - Log.log_func_lwt ~category:"fetch_db" "get_oldest_block_stats_stored" "" - @@ fun () -> - use "get_oldest_block_stats_stored" dbh @@ fun dbh -> - let> l = - [%pgsql.object - dbh - {| - select MIN(store_block_information_stats_block_number) from store_block_information_stats where store_block_information_stats_stored = true; - |}] - in - match l with - | [] -> Lwt.return_none - | x :: _ -> Lwt.return x#min + match old with + | None -> ( + let> l = + [%pgsql.object + dbh + {|select MAX(stat_block_number) from stat_blocks where stat_block_is_stored = true;|}] + in + match l with + | [] -> Lwt.return_none + | x :: _ -> Lwt.return x#max) + | Some () -> ( + let> l = + [%pgsql.object + dbh + {|select MIN(stat_block_number) from stat_blocks where stat_block_is_stored = true;|}] + in + match l with + | [] -> Lwt.return_none + | x :: _ -> Lwt.return x#min) let set_store_block_stats_finish ?dbh ~block_number () = Log.log_func_lwt ~category:"update_db" "set_store_block_trace_finish" @@ -568,10 +162,10 @@ let set_store_block_stats_finish ?dbh ~block_number () = [%pgsql dbh {| - UPDATE store_block_information_stats SET - store_block_information_stats_pending = FALSE , store_block_information_stats_stored = TRUE + UPDATE stat_blocks SET + stat_block_is_stored = TRUE WHERE - store_block_information_stats_block_number = $block_number + stat_block_number = $block_number |}] in let> count = @@ -586,131 +180,3 @@ let set_store_block_stats_finish ?dbh ~block_number () = block_count_stats_stats := Some count ; let _ = Statistics.set_or_update_gauge block_count_stats_gauge_name count in Lwt.return_unit - -let reset_db_block_stats () = - Log.log_func_lwt ~category:"reset_db" "reset_db_block_latest_trace" "" - @@ fun () -> - use "reset_db_block_latest_trace" None @@ fun dbh -> - let> block_ids = - [%pgsql.object - dbh - {| - SELECT store_block_information_stats_block_number - FROM store_block_information_stats - WHERE store_block_information_stats_pending = TRUE - |}] - in - - let block_ids = - List.map (fun x -> x#store_block_information_stats_block_number) block_ids - in - let> _ = reset_db_block_stats_aux block_ids in - Lwt_list.iter_s - (fun x -> - [%pgsql - dbh - {| - DELETE FROM store_block_information_stats WHERE store_block_information_stats_block_number = ${x}; - |}]) - block_ids - -let set_store_block_stats_latest_pending ?dbh ~block_number () = - Log.log_func_lwt ~category:"insert_db" "set_store_block_stats_latest_pending" - (Int64.to_string block_number) - @@ fun () -> - use "set_store_block_stats_latest_pending" dbh @@ fun dbh -> - [%pgsql - dbh - {| - INSERT INTO store_block_information_stats_latest - (store_block_information_stats_latest_block_number, store_block_information_stats_latest_pending,store_block_information_stats_latest_stored) - VALUES (${block_number},TRUE,FALSE) - |}] - -let get_latest_block_stats_latest_stored ?dbh () = - Log.log_func_lwt ~category:"fetch_db" "get_latest_block_stats_stored" "" - @@ fun () -> - use "get_latest_block_stats_stored" dbh @@ fun dbh -> - let> l = - [%pgsql.object - dbh - {| - select MAX(store_block_information_stats_latest_block_number) from store_block_information_stats_latest where store_block_information_stats_latest_stored = true; - |}] - in - match l with - | [] -> Lwt.return_none - | x :: _ -> Lwt.return x#max - -let get_oldest_block_stats_latest_stored ?dbh () = - Log.log_func_lwt ~category:"fetch_db" "get_oldest_block_stats_latest_stored" - "" - @@ fun () -> - use "get_oldest_block_stats_latest_stored" dbh @@ fun dbh -> - let> l = - [%pgsql.object - dbh - {| - select MIN(store_block_information_stats_latest_block_number) from store_block_information_stats_latest where store_block_information_stats_latest_stored = true; - |}] - in - match l with - | [] -> Lwt.return_none - | x :: _ -> Lwt.return x#min - -let set_store_block_stats_latest_finish ?dbh ~block_number () = - Log.log_func_lwt ~category:"update_db" "set_store_block_stats_finish" - (Int64.to_string block_number) - @@ fun () -> - use "set_store_block_stats_finish" dbh @@ fun dbh -> - let> _ = - [%pgsql - dbh - {| - UPDATE store_block_information_stats_latest SET - store_block_information_stats_latest_pending = FALSE , store_block_information_stats_latest_stored = TRUE - WHERE - store_block_information_stats_latest_block_number = $block_number - |}] - in - let> count = - match !block_count_stats_latest_stats with - | None -> ( - let> r = get_latest_block_stats_latest_stored () in - match r with - | None -> Lwt.fail_with "LAST BLOCK TRACE STORED EMPTY" - | Some value -> Lwt.return @@ Int64.to_float value) - | Some block_count_stats_tmp -> Lwt.return @@ (1. +. block_count_stats_tmp) - in - block_count_stats_latest_stats := Some count ; - let _ = - Statistics.set_or_update_gauge block_count_stats_latest_gauge_name count - in - Lwt.return_unit - -let reset_db_block_stats_latest () = - Log.log_func_lwt ~category:"reset_db" "reset_db_block_latest_stats" "" - @@ fun () -> - use "reset_db_block_latest_stats" None @@ fun dbh -> - let> block_ids = - [%pgsql.object - dbh - {| - SELECT store_block_information_stats_latest_block_number - FROM store_block_information_stats_latest - WHERE store_block_information_stats_latest_pending = TRUE - |}] - in - let block_ids = - List.map - (fun x -> x#store_block_information_stats_latest_block_number) - block_ids in - let> _ = reset_db_block_stats_aux block_ids in - Lwt_list.iter_s - (fun x -> - [%pgsql - dbh - {| - DELETE FROM store_block_information_stats_latest WHERE store_block_information_stats_latest_block_number = ${x}; - |}]) - block_ids diff --git a/src/ethereum_indexer/eth_db_psql/eth_db_psql_store_information.mli b/src/ethereum_indexer/eth_db_psql/eth_db_psql_store_information.mli index e990b8df74034641dcc3489ddb2ab82a16a0730e..7029ce0f2a943155344b9d1298c9333400de6690 100644 --- a/src/ethereum_indexer/eth_db_psql/eth_db_psql_store_information.mli +++ b/src/ethereum_indexer/eth_db_psql/eth_db_psql_store_information.mli @@ -1,11 +1,6 @@ -val set_store_block_pending : - ?dbh:EzPG_lwt.PGOCaml.pa_pg_data EzPG_lwt.PGOCaml.t -> - block_number:int64 -> - unit -> - unit Lwt.t - val get_latest_block_stored : ?dbh:EzPG_lwt.PGOCaml.pa_pg_data EzPG_lwt.PGOCaml.t -> + ?old:unit -> unit -> int64 option Lwt.t @@ -15,40 +10,9 @@ val set_store_block_finish : unit -> unit Lwt.t -val reset_db_block : unit -> unit Lwt.t - -val set_store_block_latest_pending : - ?dbh:EzPG_lwt.PGOCaml.pa_pg_data EzPG_lwt.PGOCaml.t -> - block_number:int64 -> - unit -> - unit Lwt.t - -val get_latest_block_latest_stored : - ?dbh:EzPG_lwt.PGOCaml.pa_pg_data EzPG_lwt.PGOCaml.t -> - unit -> - int64 option Lwt.t - -val get_oldest_block_latest_stored : - ?dbh:EzPG_lwt.PGOCaml.pa_pg_data EzPG_lwt.PGOCaml.t -> - unit -> - int64 option Lwt.t - -val set_store_block_latest_finish : - ?dbh:EzPG_lwt.PGOCaml.pa_pg_data EzPG_lwt.PGOCaml.t -> - block_number:int64 -> - unit -> - unit Lwt.t - -val reset_db_block_latest : unit -> unit Lwt.t - -val set_store_block_trace_pending : - ?dbh:EzPG_lwt.PGOCaml.pa_pg_data EzPG_lwt.PGOCaml.t -> - block_number:int64 -> - unit -> - unit Lwt.t - val get_latest_block_trace_stored : ?dbh:EzPG_lwt.PGOCaml.pa_pg_data EzPG_lwt.PGOCaml.t -> + ?old:unit -> unit -> int64 option Lwt.t @@ -58,45 +22,9 @@ val set_store_block_trace_finish : unit -> unit Lwt.t -val reset_db_block_trace : unit -> unit Lwt.t - -val set_store_block_trace_latest_pending : - ?dbh:EzPG_lwt.PGOCaml.pa_pg_data EzPG_lwt.PGOCaml.t -> - block_number:int64 -> - unit -> - unit Lwt.t - -val get_latest_block_trace_latest_stored : - ?dbh:EzPG_lwt.PGOCaml.pa_pg_data EzPG_lwt.PGOCaml.t -> - unit -> - int64 option Lwt.t - -val get_oldest_block_trace_latest_stored : - ?dbh:EzPG_lwt.PGOCaml.pa_pg_data EzPG_lwt.PGOCaml.t -> - unit -> - int64 option Lwt.t - -val set_store_block_trace_latest_finish : - ?dbh:EzPG_lwt.PGOCaml.pa_pg_data EzPG_lwt.PGOCaml.t -> - block_number:int64 -> - unit -> - unit Lwt.t - -val reset_db_block_trace_latest : unit -> unit Lwt.t - -val set_store_block_stats_pending : - ?dbh:EzPG_lwt.PGOCaml.pa_pg_data EzPG_lwt.PGOCaml.t -> - block_number:int64 -> - unit -> - unit Lwt.t - val get_latest_block_stats_stored : ?dbh:EzPG_lwt.PGOCaml.pa_pg_data EzPG_lwt.PGOCaml.t -> - unit -> - int64 option Lwt.t - -val get_oldest_block_stats_stored : - ?dbh:EzPG_lwt.PGOCaml.pa_pg_data EzPG_lwt.PGOCaml.t -> + ?old:unit -> unit -> int64 option Lwt.t @@ -105,29 +33,3 @@ val set_store_block_stats_finish : block_number:int64 -> unit -> unit Lwt.t - -val reset_db_block_stats : unit -> unit Lwt.t - -val set_store_block_stats_latest_pending : - ?dbh:EzPG_lwt.PGOCaml.pa_pg_data EzPG_lwt.PGOCaml.t -> - block_number:int64 -> - unit -> - unit Lwt.t - -val get_latest_block_stats_latest_stored : - ?dbh:EzPG_lwt.PGOCaml.pa_pg_data EzPG_lwt.PGOCaml.t -> - unit -> - int64 option Lwt.t - -val get_oldest_block_stats_latest_stored : - ?dbh:EzPG_lwt.PGOCaml.pa_pg_data EzPG_lwt.PGOCaml.t -> - unit -> - int64 option Lwt.t - -val set_store_block_stats_latest_finish : - ?dbh:EzPG_lwt.PGOCaml.pa_pg_data EzPG_lwt.PGOCaml.t -> - block_number:int64 -> - unit -> - unit Lwt.t - -val reset_db_block_stats_latest : unit -> unit Lwt.t diff --git a/src/ethereum_indexer/eth_main_real_time_store.ml b/src/ethereum_indexer/eth_main_real_time_store.ml index 895c981e4d166640fabf797ad86b816f3d8f1115..fa90c6add079f1d54efdda224f71286f4fada909 100644 --- a/src/ethereum_indexer/eth_main_real_time_store.ml +++ b/src/ethereum_indexer/eth_main_real_time_store.ml @@ -13,17 +13,12 @@ let _ = @@ Format.sprintf "#############\n%s\n###########" @@ Printexc.to_string exn -let store_information_block block is_latest = +let store_information_block block = Log.log_medium ~color:Green @@ Format.sprintf "Start store %d" block.number ; let> result, time = Tools.time_counter_lwt (fun () -> let block_number = block.number in let block_number_64 = Int64.of_int block_number in - let> _ = - if is_latest then - set_store_block_latest_pending ~block_number:block_number_64 () - else - set_store_block_pending ~block_number:block_number_64 () in let> block_receipt_result = Node.get_block_receipts_by_number block_number in match block_receipt_result with @@ -38,37 +33,30 @@ let store_information_block block is_latest = let> _ = add_receipts ~block_receipts () in Log.log_medium @@ Format.sprintf "Block receipts %d stored" block_number ; - let|> _ = - if is_latest then - set_store_block_latest_finish ~block_number:block_number_64 () - else - set_store_block_finish ~block_number:block_number_64 () in + let|> _ = set_store_block_finish ~block_number:block_number_64 () in ()) in Log.log ~color:Green @@ Format.sprintf "Finish store %d execution time %f s" block.number time ; Lwt.return result -let rec store_aux block_number to_block_number is_latest = +let rec store_aux block_number direction = if Handlers.is_execution_stopped () then Lwt.return_unit - else if block_number > to_block_number then - let _ = - Log.log ~color:Blue - @@ Format.sprintf "Wait main chain, block %d for 2 seconds" block_number - in - let> _ = Lwt_unix.sleep 2. in - let> latest_block_number = Node.get_latest_block_number () in - match latest_block_number with - | Error e -> Lwt.fail_with @@ EzEncoding.construct error_enc e - | Ok latest_block_number -> - store_aux block_number latest_block_number is_latest else let> block_r = Node.get_block_by_number block_number in match block_r with - | Error e -> Lwt.fail_with @@ EzEncoding.construct error_enc e + | Error e -> + let _ = Log.log ~color:Red @@ EzEncoding.construct error_enc e in + let _ = + Log.log ~color:Blue + @@ Format.sprintf "Wait fetch from node the block %d for 2 seconds" + block_number in + let> _ = Lwt_unix.sleep 2. in + store_aux block_number direction | Ok block -> - let> _ = store_information_block block is_latest in - store_aux (block_number + 1) to_block_number is_latest + let> _ = store_information_block block in + let next_block_number = block_number + direction in + store_aux next_block_number direction type command = { config : string; @@ -81,7 +69,9 @@ type command = { }|}] [@req] verbose : Common.Types.verbose option; [@conv Log.verbose_of_string] - base_block_number : int option; [@descr "Number of the first block if latest"] + base_block_number : int option; + [@descr "Number of the first block if the database is empty"] + decreasing : bool; statistics : string option; verbose_file : string option; } @@ -111,7 +101,7 @@ let main command = | None -> Lwt.return_unit in match set_node_result with | Error e -> Lwt.fail_with @@ EzEncoding.construct error_enc e - | Ok _ -> ( + | Ok _ -> if Handlers.is_execution_stopped () then Lwt.return_unit else @@ -120,33 +110,37 @@ let main command = let _ = Option.map (fun _ -> Log.close_file ()) command.verbose_file in Handlers.set_stop_execution true) in - let is_latest = Option.is_some command.base_block_number in - let> _ = - if is_latest then - reset_db_block_latest () + let old = + if command.decreasing then + Some () else - reset_db_block () in - let> latest_block_stored = - if is_latest then - get_latest_block_latest_stored () + None in + let direction = + if command.decreasing then + -1 else - get_latest_block_stored () in + 1 in + let> latest_block_stored = get_latest_block_stored ?old () in let base_block_number = Option.value ~default:0 command.base_block_number in let block_number = match latest_block_stored with | None -> base_block_number | Some block_number -> - let block_number = Int64.to_int block_number + 1 in + let block_number = Int64.to_int block_number + direction in Log.log ~color:Blue - @@ Format.sprintf "Latest block stored is %d and start with %d" - block_number base_block_number ; + @@ Format.sprintf + "Latest block stored is %d and start with %d, decreasing %b" + block_number base_block_number command.decreasing ; Int.max base_block_number block_number in - let> latest_block_number = Node.get_latest_block_number () in - match latest_block_number with - | Error e -> Lwt.fail_with @@ EzEncoding.construct error_enc e - | Ok latest_block_number -> - store_aux block_number latest_block_number is_latest) + Log.log ~color:Blue + @@ Format.sprintf "Delete block %d before storing" block_number ; + + let> _ = + Eth_db_psql.Eth_db_psql_raw.delete_block_transaction + ~block_number:(Int64.of_int block_number) + () in + store_aux block_number direction let () = let _ = Lwt_main.run @@ main @@ parse_command () in diff --git a/src/ethereum_indexer/eth_main_real_time_store_stats.ml b/src/ethereum_indexer/eth_main_real_time_store_stats.ml index aefb1b69fb00d48c7661fd04f782434578234cea..10a937989c6829920552bcd536b5493b41fa993a 100644 --- a/src/ethereum_indexer/eth_main_real_time_store_stats.ml +++ b/src/ethereum_indexer/eth_main_real_time_store_stats.ml @@ -15,18 +15,12 @@ let _ = @@ Format.sprintf "#############\n%s\n###########" @@ Printexc.to_string exn -let store_information_block block is_latest = +let store_information_block block = Log.log_medium ~color:Green @@ Format.sprintf "Start store %d" block.number ; let> result, time = Tools.time_counter_lwt (fun () -> let block_number = block.number in let block_number_64 = Int64.of_int block_number in - let> _ = - if is_latest then - set_store_block_stats_latest_pending ~block_number:block_number_64 - () - else - set_store_block_stats_pending ~block_number:block_number_64 () in Log.log_medium @@ Format.sprintf "Get and store transactions decode of %d block" block.number ; @@ -87,37 +81,37 @@ let store_information_block block is_latest = in Log.log_medium @@ Format.sprintf "Stats of %d : stored" block.number ; let|> _ = - if is_latest then - set_store_block_stats_latest_finish ~block_number:block_number_64 - () - else - set_store_block_stats_finish ~block_number:block_number_64 () - in + set_store_block_stats_finish ~block_number:block_number_64 () in ()) in Log.log ~color:Green @@ Format.sprintf "Finish store %d execution time %f s" block.number time ; Lwt.return result -let rec store_aux block_number is_latest = +let rec store_aux block_number latest_block_trace_stored direction old = if Handlers.is_execution_stopped () then Lwt.return_unit else - let> last_trace_block_number_stored = - get_latest_block_trace_latest_stored () in - match last_trace_block_number_stored with - | None -> - Log.log ~color:Red - @@ Format.sprintf "Wait 2 seconde for %s traces " - (Int64.to_string block_number) ; - let open Lwt in - Lwt_unix.sleep 2. >>= fun _ -> store_aux block_number is_latest - | Some lbn when lbn < block_number -> - Log.log ~color:Red - @@ Format.sprintf "Wait 2 seconde for %s traces" - (Int64.to_string block_number) ; - let open Lwt in - Lwt_unix.sleep 2. >>= fun _ -> store_aux block_number is_latest - | _ -> ( + let condition_to_wait lbs = + if direction < Int64.zero then + block_number < lbs + else + block_number > lbs in + if condition_to_wait latest_block_trace_stored then + let> latest_block_trace_stored_opt = + get_latest_block_trace_stored ?old () in + match latest_block_trace_stored_opt with + | None -> Lwt.fail_with "No block stored" + | Some latest_block_trace_stored_2 -> + if condition_to_wait latest_block_trace_stored_2 then + let _ = + Log.log ~color:Red + @@ Format.sprintf "Wait 2 seconde for %s " + (Int64.to_string block_number) in + let> _ = Lwt_unix.sleep 2. in + store_aux block_number latest_block_trace_stored_2 direction old + else + store_aux block_number latest_block_trace_stored_2 direction old + else let> block = get_block_transaction_by_number ~block_number () in match block with | None -> @@ -125,8 +119,10 @@ let rec store_aux block_number is_latest = @@ Format.sprintf "Block %s not stored but traces stored" (Int64.to_string block_number) | Some block -> - let> _ = store_information_block block is_latest in - store_aux (Int64.add block_number Int64.one) is_latest) + let> _ = store_information_block block in + store_aux + (Int64.add block_number direction) + latest_block_trace_stored direction old type command = { config : string; @@ -141,10 +137,11 @@ type command = { verbose : Common.Types.verbose option; [@conv Log.verbose_of_string] base_block_number : int option; [@descr "Number of the first block"] statistics : string option; + decreasing : bool; verbose_file : string option; abis : string option; } -[@@deriving arg { exe = "main_real_time_store_trace_stats_decode.exe" }] +[@@deriving arg { exe = "main_real_time_store_trace_stats.exe" }] let main command = let ch = open_in command.config in @@ -170,8 +167,7 @@ let main command = | None -> Lwt.return_unit in match set_node_result with | Error e -> Lwt.fail_with @@ EzEncoding.construct error_enc e - | Ok _ -> - let is_latest = Option.is_some command.base_block_number in + | Ok _ -> ( let _ = match command.abis with | None -> () @@ -184,22 +180,42 @@ let main command = let _ = Option.map (fun _ -> Log.close_file ()) command.verbose_file in Handlers.set_stop_execution true) in - let> _ = - if is_latest then - reset_db_block_stats_latest () + let old = + if command.decreasing then + Some () else - reset_db_block_stats () in - let> latest_block_stored = - if is_latest then - get_latest_block_stats_latest_stored () + None in + let direction = + if command.decreasing then + Int64.minus_one else - get_latest_block_stats_stored () in - let block_number = - match latest_block_stored with - | None -> - Int64.of_int @@ Option.value ~default:0 command.base_block_number - | Some block_number -> Int64.add block_number Int64.one in - store_aux block_number is_latest + Int64.one in + let> latest_block_trace_stored_opt = + get_latest_block_trace_stored ?old () in + match latest_block_trace_stored_opt with + | None -> Lwt.fail_with "No block stored" + | Some latest_block_trace_stored -> + let> latest_block_stats_stored_opt = + get_latest_block_stats_stored ?old () in + let base_block_number = + Option.value ~default:0 command.base_block_number in + let next_block_number = + match latest_block_stats_stored_opt with + | None -> Int64.of_int base_block_number + | Some latest_block_stats_stored -> + Int64.add latest_block_stats_stored direction in + Log.log ~color:Blue + @@ Format.sprintf + "Latest block stored is %d and start with %d, decreasing %b" + (Int64.to_int next_block_number) + base_block_number command.decreasing ; + Log.log ~color:Blue + @@ Format.sprintf "Delete block %d before storing" + (Int64.to_int next_block_number) ; + let> _ = + Eth_db_psql.Eth_db_psql_stats.delete_block_stats + ~block_number:next_block_number () in + store_aux next_block_number latest_block_trace_stored direction old) let () = let _ = Lwt_main.run @@ main @@ parse_command () in diff --git a/src/ethereum_indexer/eth_main_real_time_store_trace.ml b/src/ethereum_indexer/eth_main_real_time_store_trace.ml index f98bd82a5e1991a297b0230d8e232ed700854d2e..b98404979df459a624ffd58c5a112fab6a91438d 100644 --- a/src/ethereum_indexer/eth_main_real_time_store_trace.ml +++ b/src/ethereum_indexer/eth_main_real_time_store_trace.ml @@ -9,61 +9,59 @@ open Eth_db_psql.Eth_db_psql_store_information let _ = Lwt.async_exception_hook := fun exn -> - Log.log + Format.printf "%s" @@ Format.sprintf "#############\n%s\n###########" @@ Printexc.to_string exn -let store_information_block block is_latest = - Log.log_medium ~color:Green @@ Format.sprintf "Start store %d" block.number ; +let store_information_block block_number = + Log.log_medium ~color:Green @@ Format.sprintf "Start store %d" block_number ; let> result, time = Tools.time_counter_lwt (fun () -> - let block_number = block.number in let block_number_64 = Int64.of_int block_number in - let> _ = - if is_latest then - set_store_block_trace_latest_pending ~block_number:block_number_64 - () - else - set_store_block_trace_pending ~block_number:block_number_64 () in - Log.log_medium - @@ Format.sprintf "Get and store transactions decode of %d block" - block.number ; Log.log_medium - @@ Format.sprintf "Get and store Trace of %d block" block.number ; - let> traces_r = Node.get_traces_of_block block.number in + @@ Format.sprintf "Get and store Trace of %d block" block_number ; + let> traces_r = Node.get_traces_of_block block_number in match traces_r with | Error e -> Lwt.fail_with (EzEncoding.construct Eth.error_enc e) | Ok traces -> let> _ = add_block_traces ~traces ~block_number:block_number_64 () in Log.log_medium @@ Format.sprintf "Get and store Trace of %d block : finished" - block.number ; + block_number ; let|> _ = - if is_latest then - set_store_block_trace_latest_finish ~block_number:block_number_64 - () - else - set_store_block_trace_finish ~block_number:block_number_64 () - in + set_store_block_trace_finish ~block_number:block_number_64 () in ()) in Log.log ~color:Green - @@ Format.sprintf "Finish store %d execution time %f s" block.number time ; + @@ Format.sprintf "Finish store %d execution time %f s" block_number time ; Lwt.return result -let rec store_aux block_number is_latest = +let rec store_aux block_number latest_block_stored direction old = if Handlers.is_execution_stopped () then Lwt.return_unit else - let> block = get_block_transaction_by_number ~block_number () in - match block with - | None -> - Log.log ~color:Red - @@ Format.sprintf "Wait 2 seconde for %s " (Int64.to_string block_number) ; - let open Lwt in - Lwt_unix.sleep 2. >>= fun _ -> store_aux block_number is_latest - | Some block -> - let> _ = store_information_block block is_latest in - store_aux (Int64.add block_number Int64.one) is_latest + let condition_to_wait lbs = + if direction < Int64.zero then + block_number < lbs + else + block_number > lbs in + if condition_to_wait latest_block_stored then + let> latest_block_stored_opt = get_latest_block_stored ?old () in + match latest_block_stored_opt with + | None -> Lwt.fail_with "No block stored" + | Some latest_block_stored_2 -> + if condition_to_wait latest_block_stored_2 then + let _ = + Log.log ~color:Red + @@ Format.sprintf "Wait 2 seconde for %s " + (Int64.to_string block_number) in + let> _ = Lwt_unix.sleep 2. in + store_aux block_number latest_block_stored_2 direction old + else + store_aux block_number latest_block_stored_2 direction old + else + let> _ = store_information_block (Int64.to_int block_number) in + let next_block_number = Int64.add block_number direction in + store_aux next_block_number latest_block_stored direction old type command = { config : string; @@ -77,10 +75,11 @@ type command = { [@req] verbose : Common.Types.verbose option; [@conv Log.verbose_of_string] base_block_number : int option; [@descr "Number of the first block if latest"] + decreasing : bool; statistics : string option; verbose_file : string option; } -[@@deriving arg { exe = "main_real_time_store_trace_stats_decode.exe" }] +[@@deriving arg { exe = "eth_main_real_time_store_trace.exe" }] let main command = let ch = open_in command.config in @@ -106,7 +105,7 @@ let main command = | None -> Lwt.return_unit in match set_node_result with | Error e -> Lwt.fail_with @@ EzEncoding.construct error_enc e - | Ok _ -> + | Ok _ -> ( if Handlers.is_execution_stopped () then Lwt.return_unit else @@ -115,23 +114,40 @@ let main command = let _ = Option.map (fun _ -> Log.close_file ()) command.verbose_file in Handlers.set_stop_execution true) in - let is_latest = Option.is_some command.base_block_number in - let> _ = - if is_latest then - reset_db_block_trace_latest () + let old = + if command.decreasing then + Some () else - reset_db_block_trace () in - let> latest_block_stored = - if is_latest then - get_latest_block_trace_latest_stored () + None in + let direction = + if command.decreasing then + Int64.minus_one else - get_latest_block_trace_stored () in - let block_number = - match latest_block_stored with - | None -> - Int64.of_int @@ Option.value ~default:0 command.base_block_number - | Some block_number -> Int64.add block_number Int64.one in - store_aux block_number is_latest + Int64.one in + let> latest_block_stored_opt = get_latest_block_stored ?old () in + match latest_block_stored_opt with + | None -> Lwt.fail_with "No block stored" + | Some latest_block_stored -> + let> latest_block_trace_stored_opt = + get_latest_block_trace_stored ?old () in + let base_block_number = + Option.value ~default:0 command.base_block_number in + let block_number = + match latest_block_trace_stored_opt with + | None -> Int64.of_int base_block_number + | Some latest_block_trace_stored -> + Int64.add latest_block_trace_stored direction in + Log.log ~color:Blue + @@ Format.sprintf + "Latest block stored is %d and start with %d, decreasing %b" + (Int64.to_int block_number) + base_block_number command.decreasing ; + Log.log ~color:Blue + @@ Format.sprintf "Delete block %d before storing" + (Int64.to_int block_number) ; + let> _ = + Eth_db_psql.Eth_db_psql_raw.delete_block_trace ~block_number () in + store_aux block_number latest_block_stored direction old) let () = let _ = Lwt_main.run @@ main @@ parse_command () in