(* 超来 -- The domain-specific language of remote computation The implementation of the signature CHR to remotely and perhaps speculatively execute sequences of EDSL expressions compiled into the A-normal form *) (* #load "unix.cma";; #load "config.cmo";; #load "wire.cmo";; #load "future.cmo";; *) open Config;; open Wire;; open Rpc_t;; open Future;; (* The CHR sublanguage is typed, at least on the client side *) (* See the comments in rpc_dsl.mli *) module type CHR = sig type 'a chr val unit : unit chr val int : int -> int chr val bool : bool -> bool chr val string : string -> string chr val app : ('a -> 'b) chr -> 'a chr -> 'b chr val app2 : ('a -> 'b -> 'c) chr -> 'a chr -> 'b chr -> 'c chr val lt : (int -> int -> bool) chr (* less than *) val guard : bool chr -> (unit -> 'a chr) -> bool chr val force : 'a chr -> 'a end;; (* The simplest test: factorial server: compute n! remotely *) module type CHRFact = sig include CHR val fact : (int -> int) chr (* Functional constant *) end;; (* The implementation of the signature: batch CHR *) (* It is a _compiler_ for code expressions; a code expression is represented in the A-normal form. *) (* Build the request, the sequence of expressions from the sequence of essentially let-bindings. The request may be regarded as one expression, in the A-normal form *) let build_req jobq : chr_request = List.map (fun (varname,_,chr_op) -> (varname,chr_op)) jobq (* Send all outstanding requests to the server and distribute the responses, overriding variables with local values. We could, and should have used a persistent connection. However, for simplicity and ease of debugging, we establish a connection anew for each request *) let perform_chr jobqueue = (* Record the responses of the server *) let rec set_responses = function | ([],[]) -> () | ([],_) -> failwith "The server returned more responses than expected" | ((_,vset,_)::jqe,[]) -> vset (Err "The server failed to compute the response"); set_responses (jqe,[]) | ((_,vset,_)::jqe,(r::resp)) -> vset r; set_responses (jqe,resp) in let do_req jqe = let requests = build_req jqe in let (cin,cout) = Unix.open_connection chr_connection in let () = Marshal.to_channel cout requests [Marshal.No_sharing] in let () = flush cout in let () = Printf.printf "Waiting for a response\n" in let resp = (Marshal.from_channel cin : chr_response) in let () = close_in cin in set_responses (jqe,resp) in let jqe = List.rev !jobqueue in let () = jobqueue := [] in if jqe = [] then () else do_req jqe ;; module CHRBatch = struct type 'a chr = 'a future_t ref;; let unit = ref (Local ((), unit_typerep)) let int x = ref (Local (x, int_typerep)) let bool x = ref (Local (x, bool_typerep)) let string x = ref (Local (x, string_typerep)) let jobqueue = ref [] let new_chr typrep chr_op = let (varname,vv) = newvar typrep in let v = ref vv in jobqueue := (varname,future_from_wire v,chr_op) :: !jobqueue; v let app f x = let tfun = match !f with | Var (_,trep) -> trep | Tag (_,trep) -> trep | _ -> failwith "Applications are only to futures or remote things" in let tres = arrow_result tfun in new_chr tres (App (future_to_chr_val !f, future_to_chr_val !x)) (* Two-argument application *) let app2 f x y = let tfun = match !f with | Var (_,trep) -> trep | Tag (_,trep) -> trep | _ -> failwith "Applications are only to futures or remote things" in let tres = arrow_result (arrow_result tfun) in new_chr tres (App2 (future_to_chr_val !f, future_to_chr_val !x, future_to_chr_val !y)) let lt : (int->int->bool) chr = ref (Tag ("<", arr_typerep int_typerep (arr_typerep int_typerep bool_typerep))) let guard test exp = let tv = future_to_chr_val !test in (* Since exp will be communicated as an expression and might be left unevaluated, we build the future value in the `recording mode', on a zero jobqueue *) let jq = !jobqueue in let () = jobqueue := [] in let exp_future = !(exp ()) in let jq_of_exp = List.rev !jobqueue in let () = jobqueue := jq in (* restore the jobqueue *) match exp_future with | Local _ -> test (* The guarding expression was a constant *) | Var _ -> new_chr bool_typerep (Guard (tv,build_req jq_of_exp)) | Tag _ -> failwith "Guarded exp: tag is unexpected" | Exc e -> failwith e let force chr = let rec loop first_time = function | Local (x,_) -> x | Var _ -> if first_time then (perform_chr jobqueue; loop false !chr) else failwith "CHR has not determined the future" | Tag _ -> failwith "Tag of remote value has no local representation" | Exc e -> failwith e in loop true !chr end;; (* With the explicit chr type, so that new constants could be added *) module CHRBatchExt = (CHRBatch : CHR with type 'a chr = 'a future_t ref);; module BatchFact = struct include CHRBatchExt let fact : (int->int) chr = ref (Tag ("Factorial", arr_typerep int_typerep int_typerep)) end;;