diff --git a/manifest/product_octez.ml b/manifest/product_octez.ml index da4c45fc390eb50abf05f1f61748c78d3f3e5dc9..7cce8cdc3280284594cfd95ac0ae9699935477de 100644 --- a/manifest/product_octez.ml +++ b/manifest/product_octez.ml @@ -2671,6 +2671,7 @@ let _octez_bees_tests = "test_bees_unit"; "test_bees_unit_eio"; "test_bees_task_worker"; + "test_bees_lwt_async"; ] ~path:"src/lib_bees/test" ~opam:"octez-libs" diff --git a/src/lib_bees/hive.ml b/src/lib_bees/hive.ml index 22cd0c5499585b4b9b2845456dfecea88887ee76..636d565b49d85c6bd0a197aa101116c42a6214b6 100644 --- a/src/lib_bees/hive.ml +++ b/src/lib_bees/hive.ml @@ -45,6 +45,22 @@ let hive = let async_lwt = Eio.Stream.add hive.lwt_tasks_stream +(* Initialize the [lwt_scheduler_loop] by running it in its own domain in the + main Eio switch *) +let () = + let lwt_scheduler_loop () = + let rec loop () : [`Stop_daemon] = + let lwt_closure = Eio.Stream.take hive.lwt_tasks_stream in + (* The loop will run in the [Event_loop] main domain, so [Eio.run_lwt] is + fine. *) + Lwt_eio.run_lwt lwt_closure ; + loop () + in + loop () + in + Tezos_base_unix.Event_loop.on_main_run (fun _env switch -> + Eio.Fiber.fork_daemon ~sw:switch lwt_scheduler_loop) + exception Unknown_worker of string let launch_worker (type worker) ?switch (worker : worker) ~bee_name ~domains diff --git a/src/lib_bees/test/dune b/src/lib_bees/test/dune index 57594127eef1c09de0fb49e2ad8533e067e0ac85..2c0fef0d07c33d07a197a8641fbf0ddb4bd3cf94 100644 --- a/src/lib_bees/test/dune +++ b/src/lib_bees/test/dune @@ -33,7 +33,8 @@ mocked_worker test_bees_unit test_bees_unit_eio - test_bees_task_worker)) + test_bees_task_worker + test_bees_lwt_async)) (executable (name main) diff --git a/src/lib_bees/test/test_bees_lwt_async.ml b/src/lib_bees/test/test_bees_lwt_async.ml new file mode 100644 index 0000000000000000000000000000000000000000..e099b5a46597eb0126685c71cd5704b3112cd3fc --- /dev/null +++ b/src/lib_bees/test/test_bees_lwt_async.ml @@ -0,0 +1,47 @@ +(*****************************************************************************) +(* *) +(* SPDX-License-Identifier: MIT *) +(* SPDX-FileCopyrightText: 2025 Nomadic Labs *) +(* *) +(*****************************************************************************) + +(** Testing + ------- + Component: Lib_bees async_lwt + Invocation: dune exec src/lib_bees/test/main.exe \ + -- --file test_bees_lwt_async.ml + Subject: Unit tests for [Lib_bees] async_lwt promise processing +*) + +let test_async_lwt_promise_processing () = + let open Result_syntax in + (* Test that async_lwt processes promises. + This test schedules multiple async Lwt tasks and verifies they all complete. *) + let results = ref 0 in + let num_tasks = 5 in + (* Schedule multiple Lwt tasks *) + for _ = 1 to num_tasks do + Tezos_bees.Hive.async_lwt (fun () -> + incr results ; + Lwt.return_unit) + done ; + (* Give the lwt scheduler time to process all tasks *) + let env = Tezos_base_unix.Event_loop.env_exn () in + Eio.Time.sleep env#clock 1. ; + if !results = num_tasks then return_unit + else + Alcotest.failf + "async_lwt task results mismatch: got %d, expected %d" + !results + num_tasks + +let tests_async_lwt = + ( "Async Lwt", + [ + Alcotezt_process.test_case + "Promise processing (eio handlers)" + `Quick + test_async_lwt_promise_processing; + ] ) + +let () = Alcotezt_process.run ~__FILE__ "Bees async_lwt" [tests_async_lwt]