diff --git a/src/lib_bees/hive.ml b/src/lib_bees/hive.ml index 636d565b49d85c6bd0a197aa101116c42a6214b6..4cf740426ddb3d3214db97ba94c20eac29b95cdc 100644 --- a/src/lib_bees/hive.ml +++ b/src/lib_bees/hive.ml @@ -37,10 +37,16 @@ type t = { lwt_tasks_stream : (unit -> unit Lwt.t) Eio.Stream.t; } +(* This arbitrary value limits the number of closures that can be + accumulated in the [lwt_tasks_stream] buffer. The upper bound is not + expected to be reached, but it is set to a high value so that + [Eio.Stream.add] is unlikely to block on a full stream. *) +let tasks_stream_max_size = 16_384 + let hive = { workers = WorkerTbl.create ~initial_size:64; - lwt_tasks_stream = Eio.Stream.create max_int; + lwt_tasks_stream = Eio.Stream.create tasks_stream_max_size; } let async_lwt = Eio.Stream.add hive.lwt_tasks_stream diff --git a/src/lib_bees/hive.mli b/src/lib_bees/hive.mli index 598f0b2a4de24715eb757db5091dd7fc8088b3b3..a953bf9faa215393f3932ea55112ecae0a1cf14c 100644 --- a/src/lib_bees/hive.mli +++ b/src/lib_bees/hive.mli @@ -35,7 +35,11 @@ val get_error : string -> exn option If the closure raises an exception, it will break the internal lwt loop and prevent any subsequent asynchronous call to be executed. It's consequently advised to handle exceptions directly in the closure and return - a [result]. *) + a [result]. + + Warning: Promises added to this loop are expected to perform only very short + computations (typically just sending events). Longer computations may + congest promise scheduling and cause blocking [async_lwt] calls.*) val async_lwt : (unit -> unit Lwt.t) -> unit (** Schedule [f] to run on the Event_loop main switch and return its result.