[Java] Implementacja asynchronicznego dostępu do synchronicznych usług

Czasami chcemy umożliwić konsumpcję synchronicznej usługi tak jak gdyby była asynchroniczna (np. cały nasze kod jest asynchroniczny). Na szczęście w większości przypadków możemy to osiągnąć w dość prosty sposób.
Rozważmy następującą usługę

public interface ISearchService {
	SearchResult searchByCriteria(Criteria criteria) throws SearchException;
}

Musimy oczywiście zaproponować nowe, asynchroniczne API dostępu do niej. Mamy tutaj zasadniczo dwa możliwe podejścia.

Pierwsze z nich polega na przyjęciu obiektu zwrotnego który zostanie wywołany gdy wyszukiwanie zakończy się i dostępne będzie jego rezultat lub przyczyna błędu. Wypada też zwrócić obiekt umożliwiający przerwanie operacji wyszukiwania. Nasz nowe API będzie w efekcie wyglądać następująco

public interface IAsyncSearchService {
   CancellationToken searchByCriteria(Criteria criteria, SearchResultCallback searchCallback) 
      throws SearchException;
}

public interface SearchResultCallback{
   void onSearchResult(Future<SearchResult> searchResult);
}

Możemy też zwrócić obiekt Future reprezentujący asynchroniczny rezultat operacji wyszukiwania. Wówczas deklaracja usługi będzie wyglądać tak

public interface IAsyncSearchService {
   Future<SearchResult> searchByCriteria(Criteria criteria)
         throws SearchException;
}

Decyzja o zwracaniu obiektu Future nie była do końca oczywista. Z jednej strony w mojej opinie ten interfejs fundamentalnie wadliwy ponieważ nie oferuje możliwości określenia kontynuacji przepływu sterowania. Z drugiej strony zwracanie obiektu ListenableFuture jest kwestionowalne z co najmniej dwóch powodów. Po pierwsze, nie popieram zanieczyszczania publicznego API obiektami z bibliotek zewnętrznych – nawet jeśli pochodzą od Google. Po drugie, wspomniany interfejs gwarantuje kontrakt który w praktyce może być trudny do spełnienia przy innej niż domyślna implementacji. W praktyce zapewne zdecydowałbym się na zwracanie obiektu

public interface AsyncResult<V> extends Future<V>{
   AsyncResult<V> whenDone(AsyncCallback<V> asyncCallback);
}

public interface AsyncCallback<V>{
   void onDone(AsyncResult<V> result);
}

W implementacji dla wygody wykorzystam bibliotekę Google Guava co znacznie uprości kod (czy może raczej ukryje jego złożoność).
Oczywiście, Nie istnieje coś takiego jak darmowe obiady , więc blokujące wywołanie metody musi zablokować wątek w którym jest wykonywane – nie musi to jednak być wątek klienta. W efekcie otrzymujemy API asynchroniczne z punktu widzenia klienta kosztem wykorzystania dodatkowy wątków.

Implementacja pierwszego podejścia jest dość prosta

public class AsyncSearchService implements IAsyncSearchService {

   ISearchService searchService;

   ListeningExecutorService executorService;

   public AsyncSearchService(ISearchService searchService) {
      this.searchService = searchService;
      createExecutorService();
   }

   private void createExecutorService() {
      executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
   }
   
   @Override
   public CancellationToken searchByCriteria(final Criteria criteria,
         SearchResultCallback searchCallback) {
      ListenableFuture<SearchResult> futureResult = invokeSearchByCriteria(criteria);
      notifyWhenDone(futureResult, searchCallback);
      return CancellationTokenImpl.fromFuture(futureResult);
   }

   private ListenableFuture<SearchResult> invokeSearchByCriteria(
         final Criteria criteria) {
      return executorService.submit(wrapServiceCall(criteria));
   }

   private Callable<SearchResult> wrapServiceCall(final Criteria criteria) {
      return new Callable<SearchResult>() {
         @Override
         public SearchResult call() throws Exception {
            return searchService.searchByCriteria(criteria);
         }
      };
   }

   private void notifyWhenDone(
         ListenableFuture<SearchResult> futureResult,
         SearchResultCallback searchCallback) {
      Futures.addCallback(futureResult, makeFutureCallbackFrom(searchCallback));
   }

   private FutureCallback<? super SearchResult> makeFutureCallbackFrom(
         final SearchResultCallback searchCallback) {
      return new FutureCallback<SearchResult>() {

         @Override
         public void onSuccess(SearchResult searchResult) {
            searchCallback.onSearchResult(CompletedFuture.withResult(searchResult));
         }

         @Override
         public void onFailure(Throwable t) {
            searchCallback.onSearchResult(CompletedFuture.<SearchResult> withFailure(t));
         }

      };
   }
}

drugiego jest bardzo prosta

public class AsyncSearchService implements IAsyncSearchService {

   ISearchService searchService;

   ListeningExecutorService executorService;

   public AsyncSearchService(ISearchService searchService) {
      this.searchService = searchService;
      createExecutorService();
   }
   
   @Override
   public Future<SearchResult> searchByCriteria(final Criteria criteria) {
      return invokeSearchByCriteria(criteria);
   }
   
   private ListenableFuture<SearchResult> invokeSearchByCriteria(
         final Criteria criteria) {
      return executorService.submit(wrapServiceCall(criteria));
   }

   private Callable<SearchResult> wrapServiceCall(final Criteria criteria) {
      return new Callable<SearchResult>() {
         @Override
         public SearchResult call() throws Exception {
            return searchService.searchByCriteria(criteria);
         }
      };
   }
}

Na zakończenie chciałby zauważyć, że oba podejścia są w praktyce równoważne. Jeśli np. mamy do czynienia z API opartym o wywołania zwrotne możemy z łatwością napisać adapter do API drugiego typu. Pomijam obsługę delegacji przerywania bo jest dość zdradliwa a nie jest szczególnie istotna w tym przykładowym kodzie. W praktyce jeśli nie chcemy obsługiwać delegacji przerywania powinniśmy chociaż zadbać o to by zwracany obiekt Future nie przeszedł w stan CANCELLED – ale to już inna historia.

public Future<SearchResult> searchByCriteria(Criteria criteria) {
   final SettableFuture<SearchResult> futureResult = SettableFuture.create();
   SearchResultCallback searchCallback = new SearchResultCallback() {

      @Override
      public void onSearchResult(Future<SearchResult> searchResult) {
          try {
            futureResult.set(searchResult.get());
         } catch (ExecutionException | InterruptedException e) {
           futureResult.setException(e);
         }
      }
   };

   CancellationToken cancellationToken = searchService.searchByCriteria(
         criteria, searchCallback);
   return futureResult;

}

Skomentuj

Wprowadź swoje dane lub kliknij jedną z tych ikon, aby się zalogować:

Logo WordPress.com

Komentujesz korzystając z konta WordPress.com. Log Out / Zmień )

Zdjęcie z Twittera

Komentujesz korzystając z konta Twitter. Log Out / Zmień )

Facebook photo

Komentujesz korzystając z konta Facebook. Log Out / Zmień )

Google+ photo

Komentujesz korzystając z konta Google+. Log Out / Zmień )

Connecting to %s