diff --git a/CHANGES.md b/CHANGES.md index 0dee0de88f40f59e6215143558261250e5669d2d..85a0c753d20c41e7350309d99b8b93c4b85d5c21 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -46,6 +46,11 @@ be documented here either. containing the configuration for a custom network, or a URL from which such a file can be downloaded. +- Some RPCs now send their response in chunked transfer encoding. + Additionally, the implementation allows for more concurrency internally: it + allows RPC requests to be treated even if a request is currently being + treated. This leads to some improved response times on some RPC requests. + ## Client - Fixed the return code of errors in the client calls to be non-zero. diff --git a/src/lib_protocol_environment/environment_V0.ml b/src/lib_protocol_environment/environment_V0.ml index 225402a248e25d93b694ea64cbfbb956e2be78bc..0422700c62c2d29999788306f0fa7a0a2c00b9e8 100644 --- a/src/lib_protocol_environment/environment_V0.ml +++ b/src/lib_protocol_environment/environment_V0.ml @@ -433,6 +433,8 @@ struct let return x = Lwt.return (`Ok x) + let return_chunked x = Lwt.return (`OkChunk x) + let return_stream x = Lwt.return (`OkStream x) let not_found = Lwt.return (`Not_found None) @@ -448,7 +450,7 @@ struct handler p q i >>= function | `Ok o -> - RPC_answer.return o + RPC_answer.return_chunked o | `OkStream s -> RPC_answer.return_stream s | `Created s -> diff --git a/src/lib_protocol_environment/environment_V1.ml b/src/lib_protocol_environment/environment_V1.ml index 3db4d284e096b99d079bb07a65dea40d37dd7260..fd8d723c73438bb66e51dfa6f60ab6d4b33888cf 100644 --- a/src/lib_protocol_environment/environment_V1.ml +++ b/src/lib_protocol_environment/environment_V1.ml @@ -620,6 +620,8 @@ struct let return x = Lwt.return (`Ok x) + let return_chunked x = Lwt.return (`OkChunk x) + let return_stream x = Lwt.return (`OkStream x) let not_found = Lwt.return (`Not_found None) @@ -635,7 +637,7 @@ struct handler p q i >>= function | `Ok o -> - RPC_answer.return o + RPC_answer.return_chunked o | `OkStream s -> RPC_answer.return_stream s | `Created s -> diff --git a/src/lib_protocol_environment/environment_V2.ml b/src/lib_protocol_environment/environment_V2.ml index 927c87a95510ed1c1e6e151da2e1602f51753aac..128916cc132842d6bb28df95b60e91156cb9fb5d 100644 --- a/src/lib_protocol_environment/environment_V2.ml +++ b/src/lib_protocol_environment/environment_V2.ml @@ -633,6 +633,8 @@ struct let return x = Lwt.return (`Ok x) + let return_chunked x = Lwt.return (`OkChunk x) + let return_stream x = Lwt.return (`OkStream x) let not_found = Lwt.return (`Not_found None) @@ -648,7 +650,7 @@ struct handler p q i >>= function | `Ok o -> - RPC_answer.return o + RPC_answer.return_chunked o | `OkStream s -> RPC_answer.return_stream s | `Created s -> diff --git a/src/lib_rpc/RPC_answer.ml b/src/lib_rpc/RPC_answer.ml index 06b5044d93539cfab2fd502a3ef92188e0007472..9d1dc54078773045f0cdd7c117b48dcdb6072f34 100644 --- a/src/lib_rpc/RPC_answer.ml +++ b/src/lib_rpc/RPC_answer.ml @@ -26,6 +26,7 @@ (** Return type for service handler *) type 'o t = [ `Ok of 'o (* 200 *) + | `OkChunk of 'o (* 200 *) | `OkStream of 'o stream (* 200 *) | `Created of string option (* 201 *) | `No_content (* 204 *) @@ -43,6 +44,8 @@ and 'a stream = 'a Resto_directory.Answer.stream = { let return x = Lwt.return (`Ok x) +let return_chunked x = Lwt.return (`OkChunk x) + let return_unit = Lwt.return (`Ok ()) let return_stream x = Lwt.return (`OkStream x) diff --git a/src/lib_rpc/RPC_answer.mli b/src/lib_rpc/RPC_answer.mli index b82e4289d8b8b804aafb413f0972fc667ffaeff8..c4c451adad69291ab0942e1cb91aa1ad87def4af 100644 --- a/src/lib_rpc/RPC_answer.mli +++ b/src/lib_rpc/RPC_answer.mli @@ -26,6 +26,7 @@ (** Return type for service handler *) type 'o t = [ `Ok of 'o (* 200 *) + | `OkChunk of 'o (* 200 *) | `OkStream of 'o stream (* 200 *) | `Created of string option (* 201 *) | `No_content (* 204 *) @@ -43,6 +44,8 @@ and 'a stream = 'a Resto_directory.Answer.stream = { val return : 'o -> 'o t Lwt.t +val return_chunked : 'o -> 'o t Lwt.t + val return_unit : unit t Lwt.t val return_stream : 'o stream -> 'o t Lwt.t diff --git a/src/lib_rpc/RPC_directory.ml b/src/lib_rpc/RPC_directory.ml index bba7b8611228653544d1cb184d463fbea629d3b5..38e060275cfa89fcb57ca415587eac8dba1a4d03 100644 --- a/src/lib_rpc/RPC_directory.ml +++ b/src/lib_rpc/RPC_directory.ml @@ -43,6 +43,12 @@ let register dir service handler = handler p q i >>= function Ok o -> RPC_answer.return o | Error e -> RPC_answer.fail e) +let register_chunked dir service handler = + gen_register dir service (fun p q i -> + handler p q i + >>= function + | Ok o -> RPC_answer.return_chunked o | Error e -> RPC_answer.fail e) + let opt_register dir service handler = gen_register dir service (fun p q i -> handler p q i @@ -62,6 +68,8 @@ open Curry let register0 root s f = register root s (curry Z f) +let register0_chunked root s f = register_chunked root s (curry Z f) + let register1 root s f = register root s (curry (S Z) f) let register2 root s f = register root s (curry (S (S Z)) f) diff --git a/src/lib_rpc/RPC_directory.mli b/src/lib_rpc/RPC_directory.mli index 8ae3d1e6f187e6cf119eae81f37469639d66a1da..4be09bf5b4898f0e96a8de7568fe9a2a6fd39002 100644 --- a/src/lib_rpc/RPC_directory.mli +++ b/src/lib_rpc/RPC_directory.mli @@ -36,6 +36,12 @@ val register : ('p -> 'q -> 'i -> 'o tzresult Lwt.t) -> 'prefix directory +val register_chunked : + 'prefix directory -> + ([< Resto.meth], 'prefix, 'p, 'q, 'i, 'o) RPC_service.t -> + ('p -> 'q -> 'i -> 'o tzresult Lwt.t) -> + 'prefix directory + val opt_register : 'prefix directory -> ([< Resto.meth], 'prefix, 'p, 'q, 'i, 'o) RPC_service.t -> @@ -62,6 +68,12 @@ val register0 : ('q -> 'i -> 'o tzresult Lwt.t) -> unit directory +val register0_chunked : + unit directory -> + ('m, unit, unit, 'q, 'i, 'o) RPC_service.t -> + ('q -> 'i -> 'o tzresult Lwt.t) -> + unit directory + val register1 : 'prefix directory -> ('m, 'prefix, unit * 'a, 'q, 'i, 'o) RPC_service.t -> diff --git a/src/lib_rpc_http/media_type.ml b/src/lib_rpc_http/media_type.ml index 4a00501cd1a42d8df5d1485296f3b9dff2254f36..325d8cddc66184278f21b89f15b002805d618967 100644 --- a/src/lib_rpc_http/media_type.ml +++ b/src/lib_rpc_http/media_type.ml @@ -25,6 +25,9 @@ include Resto_cohttp.Media_type.Make (RPC_encoding) +(* emits chunks of size approx chunk_size_hint but occasionally a bit bigger *) +let chunk_size_hint = 4096 + let json = { name = Cohttp.Accept.MediaType ("application", "json"); @@ -47,7 +50,7 @@ let json = @@ Data_encoding.Json.construct enc v); construct_seq = (fun enc v -> - let buffer = Bytes.create 4096 in + let buffer = Bytes.create chunk_size_hint in Data_encoding.Json.blit_instructions_seq_of_jsonm_lexeme_seq ~newline:true ~buffer @@ -68,6 +71,10 @@ let json = } let bson = + let construct enc v = + Bytes.unsafe_to_string @@ Json_repr_bson.bson_to_bytes + @@ Data_encoding.Bson.construct enc v + in { name = Cohttp.Accept.MediaType ("application", "bson"); q = Some 100; @@ -89,16 +96,11 @@ let bson = bson in Data_encoding.Json.pp ppf json); - construct = - (fun enc v -> - Bytes.unsafe_to_string @@ Json_repr_bson.bson_to_bytes - @@ Data_encoding.Bson.construct enc v); + construct; construct_seq = (fun enc v -> - let bytes = - Json_repr_bson.bson_to_bytes @@ Data_encoding.Bson.construct enc v - in - Seq.return (bytes, 0, Bytes.length bytes)); + let s = construct enc v in + Seq.return (Bytes.unsafe_of_string s, 0, String.length s)); destruct = (fun enc body -> match @@ -120,6 +122,9 @@ let bson = } let octet_stream = + let construct enc v = + Bytes.to_string @@ Data_encoding.Binary.to_bytes_exn enc v + in { name = Cohttp.Accept.MediaType ("application", "octet-stream"); q = Some 200; @@ -138,11 +143,11 @@ let octet_stream = ";; binary equivalent of the following json@.%a" Data_encoding.Json.pp (Data_encoding.Json.construct enc v)); - construct = (fun enc v -> Data_encoding.Binary.to_string_exn enc v); + construct; construct_seq = (fun enc v -> - let bytes = Data_encoding.Binary.to_bytes_exn enc v in - Seq.return (bytes, 0, Bytes.length bytes)); + let s = construct enc v in + Seq.return (Bytes.unsafe_of_string s, 0, String.length s)); destruct = (fun enc s -> match Data_encoding.Binary.of_bytes enc (Bytes.of_string s) with