/**
    Core functionalities of the RPC framework.

    Copyright: © 2018 Eliott Dumeix
    License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
*/
module rpc.core;

import std.traits : hasUDA;
import vibe.internal.meta.uda : onlyAsUda;


// ////////////////////////////////////////////////////////////////////////////
// Attributes                                                                 //
// ////////////////////////////////////////////////////////////////////////////

/// Marker value produced by `noRpcMethod`.
package struct NoRPCMethodAttribute
{
}

/// Methods marked with this attribute will not be treated as RPC endpoints.
///
/// Only usable as a UDA: outside of CTFE the call fails with the
/// `onlyAsUda` message.
NoRPCMethodAttribute noRpcMethod() @safe
{
    if (__ctfe)
        return NoRPCMethodAttribute();

    // Reached only at run time; `assert(false)` is kept even in release builds.
    assert(false, onlyAsUda!__FUNCTION__);
}


/// Value produced by `rpcMethod`; carries the RPC method name.
package struct RPCMethodAttribute
{
    string method;
}

/// Methods marked with this attribute will be treated as RPC endpoints.
///
/// Only usable as a UDA: outside of CTFE the call fails with the
/// `onlyAsUda` message.
///
/// Params:
///     method = RPC method name
RPCMethodAttribute rpcMethod(string method) @safe
{
    if (__ctfe)
        return RPCMethodAttribute(method);

    // Reached only at run time; `assert(false)` is kept even in release builds.
    assert(false, onlyAsUda!__FUNCTION__);
}

/// Allows specifying the id type used by some RPC protocols (like json-rpc 2.0).
/// Only `int` and `string` are accepted.
package struct RPCIdTypeAttribute(T)
    if (is(T == int) || is(T == string))
{
    alias idType = T;
}

/// Shorthand for `RPCIdTypeAttribute!T`.
alias rpcIdType(T) = RPCIdTypeAttribute!T;

/// Attribute helper: true when `M` does not carry `NoRPCMethodAttribute`.
private enum IsRPCMethod(alias M) = !hasUDA!(M, NoRPCMethodAttribute);

/// On an RPC method, when RPCMethodObjectParams.asObject is selected, this
/// attribute is used to customize the name rendered for each argument in the params object.
package struct RPCMethodObjectParams
{
    string[string] names;
}

/// Builds an `RPCMethodObjectParams` holding the given `names` table.
///
/// Only usable as a UDA: outside of CTFE the call fails with the
/// `onlyAsUda` message.
RPCMethodObjectParams rpcObjectParams(string[string] names) @safe
{
    if (__ctfe)
        return RPCMethodObjectParams(names);

    // Reached only at run time; `assert(false)` is kept even in release builds.
    assert(false, onlyAsUda!__FUNCTION__);
}

/// Builds a default-initialized `RPCMethodObjectParams`.
///
/// Only usable as a UDA: outside of CTFE the call fails with the
/// `onlyAsUda` message.
RPCMethodObjectParams rpcObjectParams() @safe
{
    if (__ctfe)
        return RPCMethodObjectParams();

    // Reached only at run time; `assert(false)` is kept even in release builds.
    assert(false, onlyAsUda!__FUNCTION__);
}


/** RPC interface settings.
*/
class RPCInterfaceSettings
{
    import core.time;

public:
    /** Ignores a trailing underscore in method and function names.

        With this setting set to $(D true), it's possible to use names in the
        RPC interface that are reserved words in D.
    */
    bool stripTrailingUnderscore = true;

    /// Response timeout; 500 milliseconds by default.
    Duration responseTimeout = 500.msecs;

    /** Optional handler used to render custom replies in case of errors.
    */
    RPCErrorHandler errorHandler;
}

/// Signature of the error handler stored in `RPCInterfaceSettings.errorHandler`.
alias RPCErrorHandler = void delegate(Exception e) @safe;

/** Defines an id generator.

    Template_Params:
        TId = The type used to identify RPC requests.
*/
interface IIdGenerator(TId)
{
    /// Returns the next id.
    TId getNextId() @safe;
}

/** An int id generator: every call increments the counter and returns it.
*/
class IdGenerator(TId: int): IIdGenerator!TId
{
    private TId _id;

    TId getNextId() @safe
    {
        return ++_id;
    }
}

/** A string id generator: starts from "0" and returns the `succ` of the
    previous id on every call.
*/
class IdGenerator(TId: string): IIdGenerator!TId
{
    import std.string : succ;

    private TId _id = "0";

    TId getNextId() @safe
    {
        return _id = succ(_id);
    }
}

/** An RPC request identified by an id.

    Template_Params:
        TId = The type used to identify the RPC request.
*/
interface IRPCRequest(TId)
{
    /// The id of this request.
    @property TId requestId();
}

/// An RPC response.
interface IRPCResponse
{
}

/**
    An RPC client working with TRequest and TResponse.
153 154 Template_Params: 155 TId = The type used to identify rpc request. 156 TRequest = Request type, must be an IRPCRequest. 157 TResponse = Reponse type, must be an IRPCResponse. 158 */ 159 interface IRPCClient(TId, TRequest, TResponse) 160 if (is(TRequest: IRPCRequest!TId) && is(TResponse: IRPCResponse)) 161 { 162 import core.time : Duration; 163 164 /// Returns true if the client is connected. 165 @property bool connected() @safe nothrow; 166 167 /// Try to connect the client. 168 bool connect() @safe nothrow; 169 170 /** 171 Send a request and wait a response for the specified timeout. 172 173 Params: 174 request = The request to send. 175 timeout = How mush to wait for a response. 176 177 Throws: 178 Any of RPCException sub-classes. 179 */ 180 TResponse sendRequestAndWait(TRequest request, Duration timeout = Duration.max()) @safe; 181 182 /// Tell to process the input stream once. 183 void tick() @safe; 184 } 185 186 /** 187 A raw rpc client sending TRequest and receiving TResponse object through 188 Input/Output stream. 189 190 Template_Params: 191 TId = The type used to identify rpc request. 192 TRequest = Request type, must be an IRPCRequest. 193 TResponse = Reponse type, must be an IRPCResponse. 194 */ 195 abstract class RawRPCClient(TId, TRequest, TResponse): IRPCClient!(TId, TRequest, TResponse) 196 { 197 import vibe.core.stream: InputStream, OutputStream; 198 199 protected OutputStream _ostream; 200 protected InputStream _istream; 201 202 this(OutputStream ostream, InputStream istream) @safe 203 { 204 _ostream = ostream; 205 _istream = istream; 206 } 207 208 @disable @property bool connected() @safe nothrow { return true; } 209 @disable bool connect() @safe nothrow { return true; } 210 } 211 212 /** 213 Base implementation of an Http RPC client. 214 215 Template_Params: 216 TId = The type used to identify rpc request. 217 TRequest = Request type, must be an IRPCRequest. 218 TResponse = Reponse type, must be an IRPCResponse. 
*/
class HttpRPCClient(TId, TRequest, TResponse): IRPCClient!(TId, TRequest, TResponse)
{
    // `Duration` appears in the signature below; import it here rather than
    // relying on it leaking in through one of the other imports.
    import core.time : Duration;
    import std.conv: to;
    import vibe.core.log;
    import vibe.data.json;
    import vibe.http.client;
    import vibe.stream.operations;

private:
    string _url;
    IIdGenerator!TId _idGenerator;
    TResponse[TId] _pendingResponse; // not used by any method of this class

public:
    /** Creates a client posting its requests to `url`.

        Params:
            url = The URL passed to `requestHTTP` for every request.
    */
    this(string url)
    {
        _url = url;
        _idGenerator = new IdGenerator!TId();
    }

    /** Assigns a fresh id to `request`, POSTs it as a JSON body and returns
        the deserialized response.

        Params:
            request = The request to send; its `id` is overwritten.
            timeout = Accepted for interface compatibility; this implementation
                      does not use it.

        Returns:
            The response body deserialized as `TResponse`.

        Throws:
            RPCTimeoutException when the HTTP status code is not 200 (the
            message carries the status code and phrase).
    */
    TResponse sendRequestAndWait(TRequest request, Duration timeout = Duration.max()) @safe
    {
        request.id = _idGenerator.getNextId();

        TResponse response;

        requestHTTP(_url,
            (scope req) {
                req.method = HTTPMethod.POST;

                req.writeJsonBody(request);
                logTrace("client request: %s", request);
            },
            (scope res) {
                if (res.statusCode != 200)
                {
                    // NOTE(review): a non-200 status is reported with the timeout
                    // exception type; kept as-is because callers may catch it.
                    throw new RPCTimeoutException("HTTP " ~ to!string(res.statusCode) ~ ": " ~ res.statusPhrase);
                }

                string json = res.bodyReader.readAllUTF8();
                logTrace("client response: %s", json);
                response = deserializeJson!TResponse(json);
            }
        );

        return response;
    }

    // Connection management and ticking are disabled on this class.
    @disable @property bool connected() @safe nothrow { return true; }
    @disable bool connect() @safe nothrow { return true; }
    @disable void tick() @safe nothrow { }
}

/**
    Represents the server to client stream.

    Template_Params:
        TResponse = Response type, must be an IRPCResponse.
*/
interface IRPCServerOutput(TResponse)
{
    /// Sends `reponse` to the client.
    void sendResponse(TResponse reponse) @safe;
}

/// An RPC request handler: receives the request and the output used to reply.
alias RPCRequestHandler(TRequest, TResponse) = void delegate(TRequest req, IRPCServerOutput!TResponse serv) @safe;

/** An RPC server that can register handlers.

    Template_Params:
        TId = The type used to identify RPC requests.
        TRequest = Request type, must be an IRPCRequest.
        TResponse = Response type, must be an IRPCResponse.
*/
interface IRPCServer(TId, TRequest, TResponse)
    if (is(TRequest: IRPCRequest!TId) && is(TResponse: IRPCResponse))
{
    /** Registers a delegate to be called on reception of a request matching 'method'.

        Params:
            method = The RPC method to match.
            handler = The delegate to call.
    */
    void registerRequestHandler(string method, RPCRequestHandler!(TRequest, TResponse) handler);

    /** Auto-registers all methods of an interface.

        NOTE(review): this is a template declared without a body; presumably
        each implementation is expected to supply its own — confirm.

        Template_Params:
            TImpl = The interface type.
        Params:
            instance = The interface instance.
            settings = Optional RPC settings.
    */
    void registerInterface(TImpl)(TImpl instance, RPCInterfaceSettings settings = null);

    /// Processing hook; takes no argument and returns nothing.
    void tick() @safe;
}


/** Base class for RPC servers working on an Input/Output stream pair.

    Template_Params:
        TId = The type used to identify RPC requests.
        TRequest = Request type, must be an IRPCRequest.
        TResponse = Response type, must be an IRPCResponse.
*/
abstract class RawRPCServer(TId, TRequest, TResponse): IRPCServer!(TId, TRequest, TResponse)
{
    import vibe.core.stream: InputStream, OutputStream;

    protected
    {
        OutputStream _ostream;
        InputStream _istream;
    }

    /// Stores the stream pair used by the server.
    this(OutputStream ostream, InputStream istream) @safe
    {
        this._ostream = ostream;
        this._istream = istream;
    }
}

/** An HTTP RPC server.
*/
class HttpRPCServer(TId, TRequest, TResponse): IRPCServer!(TId, TRequest, TResponse)
{
    import vibe.core.log;
    import vibe.data.json: Json, parseJson, deserializeJson;
    import vibe.http.router;
    import vibe.stream.operations;

    alias RPCRespHandler = IRPCServerOutput!TResponse;
    alias RequestHandler = RPCRequestHandler!(TRequest, TResponse);

private:
    URLRouter _router;
    RequestHandler[string] _requestHandler; // handlers keyed by RPC method name

public:
    /** Registers `onPostRequest` as the POST handler for `path` on `router`.

        Params:
            router = The router receiving the POST route.
            path = The route path.
    */
    this(URLRouter router, string path)
    {
        _router = router;
        _router.post(path, &onPostRequest);
    }

    // Interface auto-registration is disabled on this class.
    @disable void registerInterface(I)(I instance, RPCInterfaceSettings settings = null)
    {
    }

    /** Registers `handler` for `method`, replacing any handler previously
        stored under the same name.

        Params:
            method = The RPC method to match.
            handler = The delegate to call.
    */
    void registerRequestHandler(string method, RequestHandler handler)
    {
        _requestHandler[method] = handler;
    }

protected:
    /** Handles all HTTP POST requests on the RPC route and
        forwards the call to the service.

        The whole body is read as UTF-8 text and handed to `process` together
        with an output that writes the response as the JSON body of `res`.
    */
    void onPostRequest(HTTPServerRequest req, HTTPServerResponse res)
    {
        string json = req.bodyReader.readAllUTF8();
        logTrace("post request received: %s", json);

        this.process(json, new class RPCRespHandler
        {
            void sendResponse(TResponse response) @safe nothrow
            {
                logTrace("post request response: %s", response);
                try {
                    res.writeJsonBody(response.toJson());
                } catch (Exception e) {
                    logCritical("unable to send response: %s", e.msg);
                    // TODO: add a delegate to allow the user to handle the error
                }
            }
        });
    }

    /** Parses `data` as JSON and dispatches the request(s) it contains.

        A JSON array is treated as a batch: each element is dispatched in
        order. Anything else is dispatched as a single request. A request whose
        method has no registered handler is skipped (and logged at debug
        level). Any exception raised while parsing or dispatching is converted
        with `buildResponseFromException` and sent through `respHandler`.

        Params:
            data = The raw JSON text.
            respHandler = The output handed to the request handlers.
    */
    void process(string data, RPCRespHandler respHandler)
    @safe nothrow
    {
        try
        {
            Json json = parseJson(data);

            // Deserializes one request and forwards it to its handler, if any.
            void dispatch(Json jsonObject) @safe
            {
                auto request = deserializeJson!TRequest(jsonObject);

                // `in` yields a pointer to the stored delegate: one lookup only.
                if (auto handler = request.method in _requestHandler)
                    (*handler)(request, respHandler);
                else
                    logDebug("no handler registered for method: %s", request.method);
            }

            // batch of commands
            if (json.type == Json.Type.array)
            {
                foreach (item; json.byValue)
                    dispatch(item);
            }
            else
            {
                dispatch(json);
            }
        }
        catch (Exception e)
        {
            // request parse error, so send a response without id
            auto response = buildResponseFromException(e);
            try {
                respHandler.sendResponse(response);
            } catch (Exception sendError) {
                logCritical("unable to send response: %s", sendError.msg);
                // TODO: add a delegate to allow the user to handle the error
            }
        }
    }

    /// Converts an exception into the response sent back to the client.
    abstract TResponse buildResponseFromException(Exception e) @safe nothrow;
}


/// Base class for RPC exceptions.
class RPCException: Exception {
    /// Optional underlying exception.
    public Exception inner;

    /**
        Params:
            msg = The error message.
            inner = Optional underlying exception.
            file = Source file reported by the exception; defaults to the call site.
            line = Source line reported by the exception; defaults to the call site.
    */
    this(string msg, Exception inner = null, string file = __FILE__, size_t line = __LINE__)
    @safe {
        // Forward file/line so the exception points at the code creating it
        // instead of at this constructor.
        super(msg, file, line);
        this.inner = inner;
    }
}

/// Client not connected exception.
class RPCNotConnectedException: RPCException {
    this(string msg, string file = __FILE__, size_t line = __LINE__)
    @safe {
        super(msg, null, file, line);
    }
}

/// Parsing exception.
class RPCParsingException: RPCException {
    this(string msg, Exception inner = null, string file = __FILE__, size_t line = __LINE__)
    @safe {
        super(msg, inner, file, line);
    }
}

/// Unhandled RPC method on server-side.
class UnhandledRPCMethod: RPCException
{
    this(string msg, string file = __FILE__, size_t line = __LINE__)
    @safe {
        super(msg, null, file, line);
    }
}

/// RPC call timeout on client-side.
class RPCTimeoutException: RPCException
{
    this(string msg, string file = __FILE__, size_t line = __LINE__)
    @safe {
        super(msg, null, file, line);
    }
}