Tuesday, November 1, 2016

Moving to WordPress

Hi,
  I have been on the Blogger web site for a long time, but have found that WordPress is better.
So if you want to continue to follow my posts, see me at:

https://chaimturkel.wordpress.com/
https://chaimturkel.wordpress.com/category/data-platform/

Tuesday, September 13, 2016

MVC RXJava

Use Case

My backend server exposes an API that calls another server to get information. I would like to use the reactive pattern as much as possible so that I do not block threads. The standard (synchronous) HttpClient blocks the calling thread until the result arrives. The async HttpClient does support callback methods, but to keep our code simple we do not want to use the callbacks directly; instead we would like to wrap all of them in the RX framework.
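
To make the idea of wrapping callbacks concrete, here is a minimal sketch that uses only the JDK, with CompletableFuture standing in for the Observable. The class CallbackWrapDemo and the fetchAsync method are hypothetical stand-ins for a callback-style client; they are not part of HttpClient or RxJava.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical stand-in for an async client that reports results through callbacks.
class CallbackWrapDemo {

    /** Callback-style API: the caller passes one handler for success and one for failure. */
    static void fetchAsync(String url, Consumer<String> onSuccess, Consumer<Throwable> onFailure) {
        // Run the simulated request on another thread so the caller is never blocked.
        new Thread(() -> {
            if (url.startsWith("http")) {
                onSuccess.accept("response from " + url);
            } else {
                onFailure.accept(new IllegalArgumentException("bad url: " + url));
            }
        }).start();
    }

    /** Wrap the callbacks once, so callers can compose the result instead of nesting handlers. */
    static CompletableFuture<String> fetch(String url) {
        CompletableFuture<String> future = new CompletableFuture<>();
        fetchAsync(url, future::complete, future::completeExceptionally);
        return future;
    }

    public static void main(String[] args) {
        String result = fetch("http://example.com")
                .thenApply(String::toUpperCase)        // transform without blocking
                .exceptionally(Throwable::getMessage)  // turn a failure into a message
                .join();                               // block only here, for the demo
        System.out.println(result);
    }
}
```

Once the callbacks are hidden behind a single return type, the calling code reads as one pipeline, which is the same benefit the RX wrapper gives in the rest of this post.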

RXJava

If you don’t know yet what RXJava is, then go ahead and have a look at:

ObservableHttpResponse

There are a few different options to choose from, so go have a look at what is out there. I chose RxApacheHttp for its simplicity. Have a look at: https://github.com/ReactiveX/RxApacheHttp

Simple get example:
Of course, you need to define your basic connection parameters and options:

final RequestConfig requestConfig = RequestConfig.custom()
       .setSocketTimeout(3000)
       .setConnectTimeout(500).build();
final CloseableHttpAsyncClient httpclient = HttpAsyncClients.custom()
       .setDefaultRequestConfig(requestConfig)
       .setMaxConnPerRoute(20)
       .setMaxConnTotal(50)
       .build();


Then for a simple get (the client has to be started with httpclient.start() first):
ObservableHttp.createRequest(HttpAsyncMethods.createGet("http://www.wikipedia.com"), httpclient)
       .toObservable()
       // the content arrives as byte[]; decode it into a String
       .flatMap(response -> response.getContent()
               .map(bytes -> new String(bytes, StandardCharsets.UTF_8)))
       // blocking is only for this demo, so that we can print the result
       .toBlocking()
       .forEach(resp -> {
               // this will be invoked once with the response
               System.out.println(resp);
       });


Post Request

My case was to send a POST request with headers and a JSON payload.

public Observable<ObservableHttpResponse> sendSuspenedRequest(long id) {
    // build the JSON payload
    JSONObject object = new JSONObject();
    object.put("id", id);
    object.put("date", wooFormat.print(today, Locale.ENGLISH));

    final CloseableHttpAsyncClient httpClient = buildClient();
    httpClient.start();

    // createPost builds the entity from the string, so no separate StringEntity is needed
    final HttpAsyncRequestProducer post = HttpAsyncMethods.createPost(url + suspendedUrlPath, object.toString(), ContentType.APPLICATION_JSON);
    // this relies on generateRequest() returning the request instance that will actually be sent
    final HttpRequest request = ((BasicAsyncRequestProducer) post).generateRequest();
    request.setHeader(HttpHeaders.AUTHORIZATION, "Token " + authorizationKey);
    request.setHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE);
    return ObservableHttp.createRequest(post, httpClient).toObservable();
}


As you can see, the headers were a bit tricky.
What is important to understand is that this method does not execute the request. It just returns an Observable. Until you subscribe to the Observable the request will not be sent.
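
The same "nothing happens until you ask for it" behaviour can be sketched with the JDK alone. In this sketch a Supplier of CompletableFuture plays the role of the Observable, and calling get() plays the role of subscribe(). The names LazyRequestDemo, buildRequest and sent are hypothetical and exist only for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class LazyRequestDemo {

    // Counts how many times the simulated request was actually sent.
    static final AtomicInteger sent = new AtomicInteger();

    /** Describes the request; nothing is sent until get() is called on the returned Supplier. */
    static Supplier<CompletableFuture<String>> buildRequest(long id) {
        return () -> CompletableFuture.supplyAsync(() -> {
            sent.incrementAndGet();
            return "suspended " + id;
        });
    }

    public static void main(String[] args) {
        Supplier<CompletableFuture<String>> request = buildRequest(42L);
        System.out.println("sent after build: " + sent.get());

        // Calling get() plays the role of subscribe(): only now does the work start.
        String body = request.get().join();
        System.out.println("sent after get: " + sent.get() + ", body = " + body);
    }
}
```

Forgetting to subscribe is therefore a silent failure: the method returns normally, but the external system never sees the request.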

Subscribe

So what do we want to do on subscription? Since there is only one request, we can listen for the completed and error events (there is no need for onNext). Once we get a result from the external system we will update another system with the relevant information, and at the end we will send the result to the client.

service.sendSuspenedRequest(Long.parseLong(id))
    .doOnCompleted(() -> {
       logger.debug("Account Suspended - {}", accountId);
       //update salesforce
       Account account = new Account();
       account.setMonthly_Payment_Done__c(false);
       account.setStatus__c(Account.FIELD_Status_INACTIVE);
       salesforceService.updateAccount(Long.valueOf(id), account)
             .onSuccess(o -> {
                //send result to client
             })
             .onFailure(e -> {
                logger.error(e.getMessage());
                //send result to client
             });
    })
    .doOnError(e -> {
       String message = AngelSenseException.toString(e);
       logger.error(message);
       //send result to client
    })
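    // the error is already handled in doOnError; the empty onError handler keeps
    // subscribe() from raising OnErrorNotImplementedException
    .subscribe(response -> { }, e -> { });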

MVC Reactive

As you can see above, we now have a new problem. The observable returns immediately and the request runs in the background. So how do we return the response object to the client?
For that Spring has an object called DeferredResult. With this object we return the instance right away, but only set its value once we have a value.
The full code will now look like:

@RequestMapping(value = "/account/suspend", method = RequestMethod.POST)
public @ResponseBody DeferredResult<ResponseEntity<StatusDto>> accountSuspended(@RequestParam(value = "data", required = true) JSONObject data) {
 DeferredResult<ResponseEntity<StatusDto>> result = new DeferredResult<>();
 logger.trace("Account Suspended: " + data.toString());
 String accountId = data.getString("accountId");
 String id = data.getString("id");

 wooCommerceService.sendSuspenedRequest(Long.parseLong(id))
       .doOnCompleted(() -> {
          logger.debug("Account Suspended - {}", accountId);
          //update salesforce
          Account account = new Account();
          account.setMonthly_Payment_Done__c(false);
          account.setStatus__c(Account.FIELD_Status_INACTIVE);
          salesforceService.updateAccount(Long.valueOf(id), account)
                .onSuccess(o -> result.setResult(getResponse(new StatusDto())))
                .onFailure(e -> {
                   logger.error(e.getMessage());
                   result.setResult(getResponse(new StatusDto(e.getMessage())));
                });
       })
       .doOnError(e -> {
          logger.error(e.getMessage());
          result.setResult(getResponse(new StatusDto(e.getMessage())));
       })
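       // the error is already handled in doOnError; the empty onError handler keeps
       // subscribe() from raising OnErrorNotImplementedException
       .subscribe(response -> { }, e -> { });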
 return result;
}
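
To see the "return the holder now, fill it in later" idea in isolation, here is a small sketch that runs without Spring. A CompletableFuture stands in for DeferredResult, and the class DeferredHolderDemo and the method handleSuspend are hypothetical names used only for this illustration; this is not Spring's API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class DeferredHolderDemo {

    /** Plays the role of the controller method: returns the holder right away. */
    static CompletableFuture<String> handleSuspend(long id) {
        CompletableFuture<String> result = new CompletableFuture<>();
        // Simulated backend call that finishes later on another thread.
        CompletableFuture
                .supplyAsync(() -> {
                    if (id < 0) {
                        throw new IllegalArgumentException("invalid id " + id);
                    }
                    return "OK " + id;
                })
                .whenComplete((value, error) -> {
                    // error is non-null when the call failed; either way the holder gets a result
                    if (error != null) {
                        result.complete("ERROR");
                    } else {
                        result.complete(value);
                    }
                });
        return result; // returned before the backend call has necessarily finished
    }

    public static void main(String[] args) throws Exception {
        System.out.println(handleSuspend(7).get(5, TimeUnit.SECONDS));
        System.out.println(handleSuspend(-1).get(5, TimeUnit.SECONDS));
    }
}
```

The important detail, in the sketch as in the controller above, is that every path sets a result, success and failure alike. If one path forgets to, the client is left waiting until the request times out.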