1 /**
	Core functionalities of the RPC framework.
3 
4 	Copyright: © 2018 Eliott Dumeix
5 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
6 */
7 module rpc.core;
8 
9 import std.traits : hasUDA;
10 import vibe.internal.meta.uda : onlyAsUda;
11 
12 
13 // ////////////////////////////////////////////////////////////////////////////
14 // Attributes																 //
15 // ////////////////////////////////////////////////////////////////////////////
/// Marker UDA type: `IsRPCMethod` evaluates to `false` for symbols carrying it.
package struct NoRPCMethodAttribute {}
19 
/// Methods marked with this attribute will not be treated as rpc endpoints.
///
/// Intended to be evaluated at compile time only (as a UDA); a run-time call
/// ends in `assert(false)` with the message supplied by `onlyAsUda`.
NoRPCMethodAttribute noRpcMethod() @safe
{
	if (__ctfe)
		return NoRPCMethodAttribute();

	// Reached only at run time, i.e. when not used as an attribute.
	assert(false, onlyAsUda!__FUNCTION__);
}
27 
28 
/// UDA payload produced by `rpcMethod`; stores the RPC method name.
package struct RPCMethodAttribute
{
	/// RPC method name.
	string method;
}
33 
/// Methods marked with this attribute will be treated as rpc endpoints.
///
/// Intended to be evaluated at compile time only (as a UDA); a run-time call
/// ends in `assert(false)` with the message supplied by `onlyAsUda`.
/// Params:
///     method = RPC method name
RPCMethodAttribute rpcMethod(string method) @safe
{
	if (__ctfe)
		return RPCMethodAttribute(method);

	// Reached only at run time, i.e. when not used as an attribute.
	assert(false, onlyAsUda!__FUNCTION__);
}
43 
/// Allows specifying the id type used by some rpc protocols (like json-rpc 2.0).
/// Only `int` and `string` are accepted as id types.
package struct RPCIdTypeAttribute(T)
	if (is(T == string) || is(T == int))
{
	/// The selected id type.
	alias idType = T;
}

/// Alias of `RPCIdTypeAttribute!T`.
template rpcIdType(T)
{
	alias rpcIdType = RPCIdTypeAttribute!T;
}
50 
/// Attribute helper: `true` unless `M` carries a `NoRPCMethodAttribute` UDA.
private template IsRPCMethod(alias M)
{
	enum IsRPCMethod = !hasUDA!(M, NoRPCMethodAttribute);
}
53 
/// On a rpc method whose params are rendered as an object, this attribute is
/// used to customize the name rendered for each arg in the params object.
package struct RPCMethodObjectParams
{
	/// Custom names; presumably keyed by argument name — TODO confirm against the code reading this UDA.
	string[string] names;
}
60 
/// Builds a `RPCMethodObjectParams` attribute holding the given custom names.
///
/// Intended to be evaluated at compile time only (as a UDA); a run-time call
/// ends in `assert(false)` with the message supplied by `onlyAsUda`.
/// Params:
///     names = Custom names stored in the attribute.
RPCMethodObjectParams rpcObjectParams(string[string] names) @safe
{
	if (__ctfe)
		return RPCMethodObjectParams(names);

	// Reached only at run time, i.e. when not used as an attribute.
	assert(false, onlyAsUda!__FUNCTION__);
}
67 
/// Builds a `RPCMethodObjectParams` attribute without any custom name.
///
/// Intended to be evaluated at compile time only (as a UDA); a run-time call
/// ends in `assert(false)` with the message supplied by `onlyAsUda`.
RPCMethodObjectParams rpcObjectParams() @safe
{
	if (__ctfe)
		return RPCMethodObjectParams();

	// Reached only at run time, i.e. when not used as an attribute.
	assert(false, onlyAsUda!__FUNCTION__);
}
74 
75 
/** Settings applied to an RPC interface.
*/
class RPCInterfaceSettings
{
	import core.time : Duration, msecs;

	/** Ignores a trailing underscore in method and function names.
		With this setting set to $(D true) (the default), it's possible to use
		names in the interface that are reserved words in D.
	*/
	bool stripTrailingUnderscore = true;

	/// Response timeout; defaults to 500 milliseconds.
	Duration responseTimeout = msecs(500);

	/** Optional handler used to render custom replies in case of errors.
	*/
	RPCErrorHandler errorHandler;
}
95 
/// Signature of the error handler stored in `RPCInterfaceSettings.errorHandler`.
alias RPCErrorHandler = void delegate(Exception) @safe;
97 
/** Contract of an id generator.

	Template_Params:
		TId = The type used to identify rpc request.
*/
interface IIdGenerator(TId)
{
	/// Returns the next id.
	TId getNextId() @safe;
}
107 
/** An int id generator.

	Each call increments a default-initialized counter and returns the new
	value, so an `IdGenerator!int` yields 1, 2, 3, ...
*/
class IdGenerator(TId : int) : IIdGenerator!TId
{
	private TId _id;

	TId getNextId() @safe
	{
		return ++_id;
	}
}
120 
/** A string id generator.

	The current id starts at "0"; each call replaces it with its
	`std.string.succ` successor and returns the new value.
*/
class IdGenerator(TId : string) : IIdGenerator!TId
{
	import std.string : succ;

	private TId _id = "0";

	TId getNextId() @safe
	{
		return _id = _id.succ;
	}
}
135 
/** An RPC request that exposes the id identifying it.

	Template_Params:
		TId = The type used to identify the RPC request.
*/
interface IRPCRequest(TId)
{
	/// The id of this request.
	@property TId requestId();
}
145 
/// An RPC response (marker interface, declares no members).
interface IRPCResponse {}
150 
/**
	An RPC client exchanging TRequest and TResponse objects.

	Template_Params:
		TId = The type used to identify rpc request.
		TRequest = Request type, must be an IRPCRequest.
		TResponse = Response type, must be an IRPCResponse.
*/
interface IRPCClient(TId, TRequest, TResponse)
	if (is(TResponse : IRPCResponse) && is(TRequest : IRPCRequest!TId))
{
	import core.time : Duration;

	/// Returns true if the client is connected.
	@property bool connected() @safe nothrow;

	/// Tries to connect the client.
	bool connect() @safe nothrow;

	/**
		Sends a request and waits for a response, up to the specified timeout.

		Params:
			request = The request to send.
			timeout = How much to wait for a response.

		Throws:
			Any of RPCException sub-classes.
	*/
	TResponse sendRequestAndWait(TRequest request, Duration timeout = Duration.max) @safe;

	/// Tells the client to process the input stream once.
	void tick() @safe;
}
185 
/**
	A raw rpc client sending TRequest and receiving TResponse objects through
	an Input/Output stream pair.

	`connected` and `connect` are marked `@disable` here.

	Template_Params:
		TId = The type used to identify rpc request.
		TRequest = Request type, must be an IRPCRequest.
		TResponse = Response type, must be an IRPCResponse.
*/
abstract class RawRPCClient(TId, TRequest, TResponse) : IRPCClient!(TId, TRequest, TResponse)
{
	import vibe.core.stream : InputStream, OutputStream;

	protected
	{
		OutputStream _ostream;
		InputStream _istream;
	}

	/// Stores the output and input streams for use by subclasses.
	this(OutputStream ostream, InputStream istream) @safe
	{
		this._ostream = ostream;
		this._istream = istream;
	}

	@disable @property bool connected() @safe nothrow { return true; }
	@disable bool connect() @safe nothrow { return true; }
}
211 
/**
	Base implementation of an Http RPC client.

	Each request is serialized to JSON and POSTed to the configured url; the
	body of a 200 reply is deserialized into a TResponse.

	Template_Params:
		TId = The type used to identify rpc request.
		TRequest = Request type, must be an IRPCRequest.
		TResponse = Response type, must be an IRPCResponse.
*/
class HttpRPCClient(TId, TRequest, TResponse) : IRPCClient!(TId, TRequest, TResponse)
{
	import std.conv : to;
	import vibe.core.log;
	import vibe.data.json;
	import vibe.http.client;
	import vibe.stream.operations;

	private
	{
		string _url;
		IIdGenerator!TId _idGenerator;
		TResponse[TId] _pendingResponse;
	}

	/// Creates a client that posts its requests to `url`.
	this(string url)
	{
		_idGenerator = new IdGenerator!TId();
		_url = url;
	}

	/**
		Assigns a fresh id to the request, POSTs it as JSON and returns the
		deserialized reply.

		Params:
			request = The request to send; its `id` is overwritten.
			timeout = NOTE(review): not used by this implementation.

		Throws:
			RPCTimeoutException for any HTTP status other than 200
			(NOTE(review): the exception type does not distinguish timeouts
			from other HTTP errors).
	*/
	TResponse sendRequestAndWait(TRequest request, Duration timeout = Duration.max()) @safe
	{
		request.id = _idGenerator.getNextId();

		TResponse result;

		requestHTTP(_url,
			(scope httpReq) {
				httpReq.method = HTTPMethod.POST;

				httpReq.writeJsonBody(request);
				logTrace("client request: %s", request);
			},
			(scope httpRes) {
				// Anything but a plain 200 is reported as an error.
				if (httpRes.statusCode != 200)
				{
					throw new RPCTimeoutException("HTTP " ~ to!string(httpRes.statusCode) ~ ": " ~ httpRes.statusPhrase);
				}

				string payload = httpRes.bodyReader.readAllUTF8();
				logTrace("client response: %s", payload);
				result = deserializeJson!TResponse(payload);
			}
		);

		return result;
	}

	@disable @property bool connected() { return true; }
	@disable bool connect() @safe nothrow { return true; }
	@disable void tick() @safe nothrow { }
}
275 
/**
	Represents the server to client stream.

	Template_Params:
		TResponse = Response type, must be an IRPCResponse.
*/
interface IRPCServerOutput(TResponse)
{
	/// Sends the given response to the client.
	void sendResponse(TResponse reponse) @safe;
}
286 
/// An RPC request handler: receives the request and the output used to reply.
template RPCRequestHandler(TRequest, TResponse)
{
	alias RPCRequestHandler = void delegate(TRequest req, IRPCServerOutput!TResponse serv) @safe;
}
289 
/** An RPC server on which request handlers can be registered.

	Template_Params:
		TId = The type used to identify RPC request.
		TRequest = Request type, must be an IRPCRequest.
		TResponse = Response type, must be an IRPCResponse.
*/
interface IRPCServer(TId, TRequest, TResponse)
	if (is(TResponse : IRPCResponse) && is(TRequest : IRPCRequest!TId))
{
	/** Registers a delegate to be called on reception of a request matching 'method'.

		Params:
			method = The RPC method to match.
			handler = The delegate to call.
	*/
	void registerRequestHandler(string method, RPCRequestHandler!(TRequest, TResponse) handler);

	/** Auto-registers all methods of an interface.

		Template_Params:
			TImpl = The interface type.
		Params:
			instance = The interface instance.
			settings = Optional RPC settings.
	*/
	void registerInterface(TImpl)(TImpl instance, RPCInterfaceSettings settings = null);

	/// Runs one processing step; the exact work done is defined by implementations.
	void tick() @safe;
}
320 
321 
/**
	Base of an rpc server working on an Input/Output stream pair.

	Template_Params:
		TId = The type used to identify RPC request.
		TRequest = Request type, must be an IRPCRequest.
		TResponse = Response type, must be an IRPCResponse.
*/
abstract class RawRPCServer(TId, TRequest, TResponse) : IRPCServer!(TId, TRequest, TResponse)
{
	import vibe.core.stream : InputStream, OutputStream;

	protected
	{
		OutputStream _ostream;
		InputStream _istream;
	}

	/// Stores the output and input streams for use by subclasses.
	this(OutputStream ostream, InputStream istream) @safe
	{
		this._ostream = ostream;
		this._istream = istream;
	}
}
335 
/** An HTTP RPC server.

	Every POST request received on the registered route is parsed as JSON,
	either a single request object or an array (batch) of them, and each
	request is forwarded to the handler registered for its method.

	Template_Params:
		TId = The type used to identify RPC request.
		TRequest = Request type, must be an IRPCRequest.
		TResponse = Response type, must be an IRPCResponse.
*/
class HttpRPCServer(TId, TRequest, TResponse): IRPCServer!(TId, TRequest, TResponse)
{
	import vibe.core.log;
	import vibe.data.json: Json, parseJson, deserializeJson;
	import vibe.http.router;
	import vibe.stream.operations;

	alias RPCRespHandler = IRPCServerOutput!TResponse;
	alias RequestHandler = RPCRequestHandler!(TRequest, TResponse);

private:
	URLRouter _router;
	RequestHandler[string] _requestHandler;

public:
	/** Registers this server as the POST handler of `path` on `router`.

		Params:
			router = The router receiving the HTTP requests.
			path = The route on which RPC requests are posted.
	*/
	this(URLRouter router, string path)
	{
		_router = router;
		_router.post(path, &onPostRequest);
	}

	@disable void registerInterface(I)(I instance, RPCInterfaceSettings settings = null)
	{
	}

	/** Registers (or replaces) the handler called for requests matching 'method'.

		Params:
			method = The RPC method to match.
			handler = The delegate to call.
	*/
	void registerRequestHandler(string method, RequestHandler handler)
	{
		_requestHandler[method] = handler;
	}

protected:
	/** Handle all HTTP POST request on the RPC route and
		forward call to the service.
	*/
	void onPostRequest(HTTPServerRequest req, HTTPServerResponse res)
	{
		string json = req.bodyReader.readAllUTF8();
		logTrace("post request received: %s", json);

		// The response handler writes the reply as the JSON body of `res`.
		this.process(json, new class RPCRespHandler
		{
			void sendResponse(TResponse response) @safe nothrow
			{
				logTrace("post request response: %s", response);
				try {
					res.writeJsonBody(response.toJson());
				} catch (Exception e) {
					logCritical("unable to send response: %s", e.msg);
					// TODO: add a delegate to allow the user to handle error
				}
			}
		});
	}

	/** Parses `data` and dispatches the request(s) it contains.

		Any exception raised while parsing, deserializing or running a handler
		is turned into a response with `buildResponseFromException` and sent
		through `respHandler`; a failure to send that response is only logged.

		Params:
			data = The raw JSON text of the request (single object or array).
			respHandler = The output used to send responses.
	*/
	void process(string data, RPCRespHandler respHandler)
	@safe nothrow {
		try
		{
			Json json = parseJson(data);

			// Deserializes one request object and hands it to its handler.
			void dispatch(Json jsonObject)
			@safe {
				auto request = deserializeJson!TRequest(jsonObject);

				// Single associative-array lookup; a request whose method has
				// no registered handler is ignored.
				if (auto handler = request.method in _requestHandler)
				{
					(*handler)(request, respHandler);
				}
			}

			// batch of commands
			if (json.type == Json.Type.array)
			{
				foreach (item; json.byValue)
				{
					dispatch(item);
				}
			}
			else
			{
				dispatch(json);
			}
		}
		catch (Exception e)
		{
			// The failing request is not available here, so the response is
			// built from the exception alone.
			auto response = buildResponseFromException(e);
			try {
				respHandler.sendResponse(response);
			} catch (Exception sendError) {
				// Named differently from the outer `e`: D forbids shadowing a
				// local variable in a nested scope.
				logCritical("unable to send response: %s", sendError.msg);
				// TODO: add a delegate to allow the user to handle error
			}
		}
	}

	/// Builds the response sent back when `e` interrupted the processing of a request.
	abstract TResponse buildResponseFromException(Exception e) @safe nothrow;
}
435 
436 
/// Base class for RPC exceptions.
class RPCException: Exception {
	/// Optional underlying exception that caused this one (may be `null`).
	public Exception inner;

	/**
		Params:
			msg = The error message.
			inner = Optional underlying exception.
			file = Source file reported by the exception (defaults to the construction site).
			line = Source line reported by the exception (defaults to the construction site).
	*/
	this(string msg, Exception inner = null,
		string file = __FILE__, size_t line = __LINE__)
	@safe {
		// Forward file/line so the exception reports where it was created
		// instead of the location of this constructor.
		super(msg, file, line);
		this.inner = inner;
	}
}

/// Client not connected exception
class RPCNotConnectedException: RPCException {
	/**
		Params:
			msg = The error message.
			file = Source file reported by the exception (defaults to the construction site).
			line = Source line reported by the exception (defaults to the construction site).
	*/
	this(string msg, string file = __FILE__, size_t line = __LINE__)
	@safe {
		super(msg, null, file, line);
	}
}

/// Parsing exception.
class RPCParsingException: RPCException {
	/**
		Params:
			msg = The error message.
			inner = Optional underlying exception.
			file = Source file reported by the exception (defaults to the construction site).
			line = Source line reported by the exception (defaults to the construction site).
	*/
	this(string msg, Exception inner = null,
		string file = __FILE__, size_t line = __LINE__)
	@safe {
		super(msg, inner, file, line);
	}
}

/// Unhandled RPC method on server-side.
class UnhandledRPCMethod: RPCException
{
	/**
		Params:
			msg = The error message.
			file = Source file reported by the exception (defaults to the construction site).
			line = Source line reported by the exception (defaults to the construction site).
	*/
	this(string msg, string file = __FILE__, size_t line = __LINE__)
	@safe {
		super(msg, null, file, line);
	}
}

/// RPC call timeout on client-side.
class RPCTimeoutException: RPCException
{
	/**
		Params:
			msg = The error message.
			file = Source file reported by the exception (defaults to the construction site).
			line = Source line reported by the exception (defaults to the construction site).
	*/
	this(string msg, string file = __FILE__, size_t line = __LINE__)
	@safe {
		super(msg, null, file, line);
	}
}