diff --git a/src/bin_dac_node/data_streamer.ml b/src/bin_dac_node/data_streamer.ml index ec2bf0f7db221e59e8e813a56753f4f0b3fd9c44..85c07abcb56065731a7aa19fa89c025950a08c18 100644 --- a/src/bin_dac_node/data_streamer.ml +++ b/src/bin_dac_node/data_streamer.ml @@ -23,32 +23,20 @@ (* *) (*****************************************************************************) -(* Component for streaming published root page hashes to subscribers. *) - -module type S = sig - (* [publish hash] publishes a root page [hash] to all attached subscribers. *) - val publish : Dac_hash.t -> unit tzresult Lwt.t +(** FIXME: https://gitlab.com/tezos/tezos/-/issues/4740 + Implement a useful Root_hash_streamer +*) +module Root_hash_streamer = struct + type t = unit - (* FIXME: https://gitlab.com/tezos/tezos/-/issues/4680 - Access Dac coordinator details via some [Dac_client.cctx]. - *) + type configuration = unit - (* [subscribe coordinator_host coordinator_port] returns an Lwt_stream of hashes and - a function to close the stream. - *) - val subscribe : - coordinator_host:string -> - coordinator_port:int -> - Dac_hash.t Lwt_stream.t * Lwt_watcher.stopper -end + let init (_configuration : configuration) = () -(* TODO: https://gitlab.com/tezos/tezos/-/issues/4510 - Implement useful streaming. -*) -module Make (P : Dac_plugin.T) : S = struct - let publish _hash = Lwt_result_syntax.return_unit + let publish (_streamer : t) (_hash : Dac_hash.t) = + Lwt_result_syntax.return_unit - let subscribe ~coordinator_host ~coordinator_port = - let _unused = (coordinator_host, coordinator_port) in - Lwt_watcher.create_fake_stream () + let make_subscription (_streamer : t) : + (Dac_hash.t Lwt_stream.t * Lwt_watcher.stopper) tzresult Lwt.t = + Lwt_result_syntax.return @@ Lwt_watcher.create_fake_stream () end diff --git a/src/bin_dac_node/data_streamer.mli b/src/bin_dac_node/data_streamer.mli index e36b06163f86807e3224ae4fb0398a6beb374463..acd87a9764c3130be94649796361acdf4ee7d7a5 100644 --- a/src/bin_dac_node/data_streamer.mli +++ b/src/bin_dac_node/data_streamer.mli @@ -23,23 +23,28 @@ (* *) (*****************************************************************************) -(* Component for streaming published root page hashes to subscribers. *) +(** [Root_hash_streamer] manages the pub-sub mechanism for streaming root + page hashes from publishers to subscribers. Root hash refers to the + root hash of the DAC payload Merkle tree. +*) +module Root_hash_streamer : sig + type t -module type S = sig - (* [publish hash] publishes a root page [hash] to all attached subscribers. *) - val publish : Dac_hash.t -> unit tzresult Lwt.t + (* Streamer configuration. *) + type configuration - (* FIXME: https://gitlab.com/tezos/tezos/-/issues/4680 - Access Dac coordinator details via some [Dac_client.cctx]. - *) + (** Initializes a [Root_hash_streamer.t] *) + val init : configuration -> t + + (** [publish streamer root_hash] publishes a [root_hash] to all attached + subscribers in [streamer]. + *) + val publish : t -> Dac_hash.t -> unit tzresult Lwt.t - (* [subscribe coordinator_host coordinator_port] returns an Lwt_stream of hashes and - a function to close the stream. + (** [make_subscription streamer] returns a new stream of hashes for the subscriber to + consume. An [Lwt_watcher.stopper] function is also returned for the + subscriber to close the stream. *) - val subscribe : - coordinator_host:string -> - coordinator_port:int -> - Dac_hash.t Lwt_stream.t * Lwt_watcher.stopper + val make_subscription : + t -> (Dac_hash.t Lwt_stream.t * Lwt_watcher.stopper) tzresult Lwt.t end - -module Make (P : Dac_plugin.T) : S