diff --git a/src/lib_gossipsub/gossipsub_intf.ml b/src/lib_gossipsub/gossipsub_intf.ml index 4c3998a5b2b4187092754dd61050681adff288c0..c8051826aeb4735029ea02c91c67627ee319b17b 100644 --- a/src/lib_gossipsub/gossipsub_intf.ml +++ b/src/lib_gossipsub/gossipsub_intf.ml @@ -362,6 +362,10 @@ module type AUTOMATON = sig type value = {message : message; access : int Peer.Map.t} type t = {messages : value Message_id.Map.t} + + (** [get_memory_cache_value message_id state] returns the + cached value for [message_id]. *) + val get_value : Message_id.t -> state -> value option end type view = { diff --git a/src/lib_gossipsub/test/test_gossipsub_shared.ml b/src/lib_gossipsub/test/test_gossipsub_shared.ml index 03e5ca0f52e81394d48ba82515cd016366d89c18..f951ed4b02753c21a462187803d072d699898876 100644 --- a/src/lib_gossipsub/test/test_gossipsub_shared.ml +++ b/src/lib_gossipsub/test/test_gossipsub_shared.ml @@ -44,7 +44,7 @@ module Automaton_config : and type Peer.t = int and type Topic.t = string and type Message_id.t = int - and type Message.t = int = struct + and type Message.t = string = struct module Span = struct type t = int @@ -88,7 +88,7 @@ module Automaton_config : module Peer = Int_iterable module Topic = String_iterable module Message_id = Int_iterable - module Message = Int_iterable + module Message = String_iterable end module C = Automaton_config diff --git a/src/lib_gossipsub/test/test_unit.ml b/src/lib_gossipsub/test/test_unit.ml index 42031184e4076a707b697bf2ae28685919124752..e34d5a20a69f734ad6cea32cfecf49df7acd0933 100644 --- a/src/lib_gossipsub/test/test_unit.ml +++ b/src/lib_gossipsub/test/test_unit.ml @@ -46,6 +46,27 @@ let assert_subscribed_topics ~__LOC__ ~peer ~expected_topics state = ~error_msg:"Expected %R, got %L" ~__LOC__) +let assert_fanout_size ~__LOC__ ~topic ~expected_size state = + let view = GS.Introspection.view state in + let fanout_peers = GS.Introspection.get_fanout_peers topic view in + Check.( + (List.length fanout_peers = expected_size) + int + ~error_msg:"Expected %R, got %L" + ~__LOC__) + +let assert_in_memory_cache ~__LOC__ message_id ~expected_message state = + match GS.Introspection.Memory_cache.get_value message_id state with + | None -> + Test.fail "Expected entry in memory cache for message id %d" message_id + | Some {message; access = _} -> + Check.( + (message = expected_message) + string + ~error_msg:"Expected %R, got %L" + ~__LOC__) ; + unit + let many_peers limits = (4 * limits.degree_optimal) + 1 let make_peers ~number = @@ -85,6 +106,10 @@ let init_state ~rng ~limits ~parameters ~peers ~topics ?(outbound : C.Peer.t -> bool = fun _ -> false) ~(to_subscribe : C.Peer.t * C.Topic.t -> bool) () = let state = GS.make rng limits parameters in + (* Add and subscribe the given peers. *) + let state = + add_and_subscribe_peers topics peers ~to_subscribe ~direct ~outbound state + in (* Join to the given topics. *) let state = List.fold_left @@ -96,10 +121,6 @@ let init_state ~rng ~limits ~parameters ~peers ~topics state topics in - (* Add and subscribe the given peers. *) - let state = - add_and_subscribe_peers topics peers ~to_subscribe ~direct ~outbound state - in state (** Test that grafting an unknown topic is ignored. @@ -294,7 +315,7 @@ let test_join_adds_fanout_to_mesh rng limits parameters = We did not join the topic so the peers should be added to the fanout map.*) let state, _ = GS.publish - {sender = None; topic = "topic0"; message_id = 0; message = 0} + {sender = None; topic = "topic0"; message_id = 0; message = "message"} state in (* Check that all [init_peers] have been added to the fanout. *) @@ -356,8 +377,103 @@ let test_join_adds_fanout_to_mesh rng limits parameters = (List.length fanout_peers = 0) int ~error_msg:"Expected %R, got %L" ~__LOC__) ; unit +(** Tests that publishing to a subscribed topic: + - Returns peers to publish to. + - Inserts message into memory cache. + + Ported from: https://github.com/libp2p/rust-libp2p/blob/12b785e94ede1e763dd041a107d3a00d5135a213/protocols/gossipsub/src/behaviour/tests.rs#L629 +*) +let test_publish_without_flood_publishing rng limits parameters = + Tezt_core.Test.register + ~__FILE__ + ~title:"Gossipsub: Test publish without flood publishing" + ~tags:["gossipsub"; "publish"] + @@ fun () -> + let topic = "test_publish" in + let peers = make_peers ~number:(many_peers limits) in + let state = + init_state + ~rng + ~limits + ~parameters + ~peers + ~topics:[topic] + ~to_join:(fun _ -> false) + ~to_subscribe:(fun _ -> true) + () + in + let publish_data = "some_data" in + let message_id = 0 in + (* Publish to a joined topic. *) + let state, Publish_message peers_to_publish = + GS.publish {sender = None; topic; message_id; message = publish_data} state + in + (* Should return [degree_optimal] peers to publish to. *) + Check.( + (C.Peer.Set.cardinal peers_to_publish = limits.degree_optimal) + int + ~error_msg:"Expected %R, got %L" + ~__LOC__) ; + (* [message_id] should be added to the memory cache. *) + assert_in_memory_cache + ~__LOC__ + message_id + ~expected_message:publish_data + state + +(** Tests that publishing to an unsubscribed topic: + - Populate fanout peers. + - Return peers to publish to. + - Inserts message into the memory cache. + + Ported from: https://github.com/libp2p/rust-libp2p/blob/12b785e94ede1e763dd041a107d3a00d5135a213/protocols/gossipsub/src/behaviour/tests.rs#L715 +*) +let test_fanout rng limits parameters = + Tezt_core.Test.register + ~__FILE__ + ~title:"Gossipsub: Test fanout" + ~tags:["gossipsub"; "publish"; "fanout"] + @@ fun () -> + let topic = "topic" in + let peers = make_peers ~number:(many_peers limits) in + let state = + init_state + ~rng + ~limits + ~parameters + ~peers + ~topics:[topic] + ~to_join:(fun _ -> false) + ~to_subscribe:(fun _ -> true) + () + in + (* Leave the topic. *) + let state, _ = GS.leave {topic} state in + (* Publish to the topic we left. *) + let publish_data = "some data" in + let message_id = 0 in + let state, Publish_message peers_to_publish = + GS.publish {sender = None; topic; message_id; message = publish_data} state + in + (* Fanout should contain [degree_optimal] peers. *) + assert_fanout_size ~__LOC__ ~topic ~expected_size:limits.degree_optimal state ; + (* Should return [degree_optimal] peers to publish to. *) + Check.( + (C.Peer.Set.cardinal peers_to_publish = limits.degree_optimal) + int + ~error_msg:"Expected %R, got %L" + ~__LOC__) ; + (* [message_id] should be added to the memory cache. *) + assert_in_memory_cache + ~__LOC__ + message_id + ~expected_message:publish_data + state + let register rng limits parameters = test_ignore_graft_from_unknown_topic rng limits parameters ; test_handle_received_subscriptions rng limits parameters ; test_join_adds_peers_to_mesh rng limits parameters ; - test_join_adds_fanout_to_mesh rng limits parameters + test_join_adds_fanout_to_mesh rng limits parameters ; + test_publish_without_flood_publishing rng limits parameters ; + test_fanout rng limits parameters diff --git a/src/lib_gossipsub/tezos_gossipsub.ml b/src/lib_gossipsub/tezos_gossipsub.ml index 2dba3da8e962651755d3240b372ef59dc2e2f5f0..8ae43172aa7f68c28cc2065c2bde0ceb9a91b9ed 100644 --- a/src/lib_gossipsub/tezos_gossipsub.ml +++ b/src/lib_gossipsub/tezos_gossipsub.ml @@ -811,7 +811,7 @@ module Make (C : AUTOMATON_CONFIG) : let filter peer ({direct; _} as connection) = filter peer connection || direct in - select_peers topic ~filter ~max:Int.max_int + select_peers topic ~filter ~max:degree_optimal | Some fanout -> let* () = set_fanout_topic topic now fanout.peers in return fanout.peers @@ -1453,7 +1453,13 @@ module Make (C : AUTOMATON_CONFIG) : last_published_time : time; } - module Memory_cache = Memory_cache + module Memory_cache = struct + include Memory_cache + + let get_value message_id state = + Message_id.Map.find_opt message_id state.memory_cache.messages + |> Option.map (fun Memory_cache.{message; access} -> {message; access}) + end type view = state = { limits : limits;