Hi,
I have been on Blogger for a long time, but have found that WordPress is better.
So if you want to keep following my posts, find me at:
https://chaimturkel.wordpress.com/
https://chaimturkel.wordpress.com/category/data-platform/
Java Android and rest
Tuesday, November 1, 2016
Tuesday, September 13, 2016
MVC RXJava
Use Case
My backend server exposes an API that calls another server to get information. I would like to use the reactive pattern as much as possible so that I do not block threads. The standard HttpClient blocks a thread while it waits for the result. HttpClient does support async callback methods, but to keep our code simple we do not want to use the callbacks directly and would rather wrap all of them in the Rx framework.
RXJava
If you don’t know yet what RxJava is, go ahead and read up on it first.
ObservableHttpResponse
There are a few different options to choose from, so have a look at what is out there. I chose RxApacheHttp for its simplicity. Have a look at: https://github.com/ReactiveX/RxApacheHttp
Simple GET example:
Of course, you first need to define your basic connection parameters and options:
final RequestConfig requestConfig = RequestConfig.custom()
        .setSocketTimeout(3000)
        .setConnectTimeout(500).build();
final CloseableHttpAsyncClient httpclient = HttpAsyncClients.custom()
        .setDefaultRequestConfig(requestConfig)
        .setMaxConnPerRoute(20)
        .setMaxConnTotal(50)
        .build();
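Then for a simple GET (Groovy closure syntax):
httpclient.start(); // the async client must be started before it can execute requests
ObservableHttp.createRequest(HttpAsyncMethods.createGet("http://www.wikipedia.com"), httpclient)
    .toObservable()
    .flatMap({ ObservableHttpResponse response ->
        // the body arrives as an Observable of byte arrays; turn each chunk into a String
        return response.getContent().map({ byte[] bb ->
            return new String(bb);
        });
    })
    .toBlockingObservable()
    .forEach({ String resp ->
        // this will be invoked once with the response
        println(resp);
    });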
Post Request
My use case was to send a POST request with headers and a JSON payload.
public Observable<ObservableHttpResponse> sendSuspenedRequest(long id){
    // Build the JSON payload
    JSONObject object = new JSONObject();
    object.put("id",id);
    object.put("date",wooFormat.print(today, Locale.ENGLISH));
    final CloseableHttpAsyncClient httpClient = buildClient();
    httpClient.start();
    // createPost wraps the JSON string in an entity with the given content type
    final HttpAsyncRequestProducer post = HttpAsyncMethods.createPost(url + suspendedUrlPath, object.toString(), ContentType.APPLICATION_JSON);
    // The headers are set on the request held by the producer
    ((BasicAsyncRequestProducer)post).generateRequest().setHeader(HttpHeaders.AUTHORIZATION, "Token " + authorizationKey);
    ((BasicAsyncRequestProducer)post).generateRequest().setHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE);
    // Nothing is sent here; the request only runs once the Observable is subscribed to
    return ObservableHttp.createRequest(post,httpClient).toObservable();
}
As you can see, the headers were a bit tricky: they have to be set on the request obtained from the producer.
What is important to understand is that this method does not execute the request. It just returns an Observable. Until you subscribe to the Observable the request will not be sent.
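The "nothing happens until you subscribe" behaviour can be illustrated without any HTTP library. The sketch below is not RxJava: it uses a plain JDK Supplier as a stand-in for the Observable, and LazyRequestDemo, buildRequest and the sent counter are hypothetical names made up for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class LazyRequestDemo {
    // Counts how many times the (simulated) request was actually sent.
    static final AtomicInteger sent = new AtomicInteger();

    // Describes the work but does not start it, like a method that only returns an Observable.
    static Supplier<CompletableFuture<String>> buildRequest(long id) {
        return () -> CompletableFuture.supplyAsync(() -> {
            sent.incrementAndGet();          // the "request" goes out here
            return "suspended " + id;
        });
    }

    public static void main(String[] args) {
        Supplier<CompletableFuture<String>> request = buildRequest(42L);
        System.out.println("sent before subscribe: " + sent.get());

        // Calling get() plays the role of subscribe(): only now does the work start.
        String body = request.get().join();
        System.out.println("response: " + body);
        System.out.println("sent after subscribe: " + sent.get());
    }
}
```

Building the request leaves the counter untouched; it only goes up after get() is called, which is the same reason sendSuspenedRequest on its own never reaches the remote server.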
Subscribe
So what do we want to do on subscription? Since there is only one request, we can listen for completion and error (no need for onNext). Once we get a result from the external system we will update another system with the relevant information, and at the end we will send the result to the client.
service.sendSuspenedRequest(Long.parseLong(id))
    .doOnCompleted(() -> {
        logger.debug("Account Suspended - {}", accountId);
        //update salesforce
        Account account = new Account();
        account.setMonthly_Payment_Done__c(false);
        account.setStatus__c(Account.FIELD_Status_INACTIVE);
        salesforceService.updateAccount(Long.valueOf(id), account)
            .onSuccess(o -> {
                //send result to client
            })
            .onFailure(e -> {
                logger.error(e.getMessage());
                //send result to client
            });
    })
    .doOnError(e -> {
        String message = AngelSenseException.toString(e);
        logger.error(message);
        //send result to client
    })
    .subscribe();
MVC Reactive
As you can see above, we now have a new problem. The observable returns immediately and the request runs in the background. So how do we return the response object to the client?
For that, Spring has an object called DeferredResult. We return the DeferredResult instance right away, but only set its value once we actually have one.
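The idea of handing back a placeholder now and filling it in later from another thread can be sketched with the JDK's CompletableFuture. This is only an analogy, not Spring's DeferredResult API; DeferredDemo, handle and the fail flag are hypothetical names used for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class DeferredDemo {
    // Returns immediately with an empty placeholder; a background task fills it in later.
    static CompletableFuture<String> handle(ExecutorService background, boolean fail) {
        CompletableFuture<String> result = new CompletableFuture<>();
        background.submit(() -> {
            try {
                if (fail) {
                    throw new IllegalStateException("remote call failed");
                }
                result.complete("OK");
            } catch (RuntimeException e) {
                // As in the post, an error is also delivered to the caller as a normal result value.
                result.complete("ERROR: " + e.getMessage());
            }
        });
        return result; // the calling thread is free again at this point
    }

    public static void main(String[] args) {
        ExecutorService background = Executors.newSingleThreadExecutor();
        try {
            System.out.println(handle(background, false).join()); // prints "OK"
            System.out.println(handle(background, true).join());  // prints "ERROR: remote call failed"
        } finally {
            background.shutdown();
        }
    }
}
```

In the controller, result.setResult(...) plays the role of complete(...) here: whichever callback fires first supplies the value that is finally written to the HTTP response.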
The full code will now look like:
@RequestMapping(value = "/account/suspend", method = RequestMethod.POST)
public @ResponseBody DeferredResult<ResponseEntity<StatusDto>> accountSuspended(@RequestParam(value = "data", required = true) JSONObject data) {
    DeferredResult<ResponseEntity<StatusDto>> result = new DeferredResult<>();
    logger.trace("Account Suspended: " + data.toString());
    String accountId = data.getString("accountId");
    String id = data.getString("id");
    wooCommerceService.sendSuspenedRequest(Long.parseLong(id))
        .doOnCompleted(() -> {
            logger.debug("Account Suspended - {}", accountId);
            //update salesforce
            Account account = new Account();
            account.setMonthly_Payment_Done__c(false);
            account.setStatus__c(Account.FIELD_Status_INACTIVE);
            salesforceService.updateAccount(Long.valueOf(id), account)
                .onSuccess(o -> result.setResult(getResponse(new StatusDto())))
                .onFailure(e -> {
                    logger.error(e.getMessage());
                    result.setResult(getResponse(new StatusDto(e.getMessage())));
                });
        })
        .doOnError(e -> {
            logger.error(e.getMessage());
            result.setResult(getResponse(new StatusDto(e.getMessage())));
        })
        .subscribe();
    return result;
}