  1. OpenDJ
  2. OPENDJ-2920

Investigate using asynchronous streams instead of promises

    Details

    • Type: Task
    • Status: Done
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.0.0
    • Fix Version/s: 4.0.0
    • Component/s: core apis
    • Labels:
      None

      Description

      There's a problem with using Promises for LDAP: all LDAP operations can yield streams of responses. The most obvious and common case is the LDAP search operation. However, all operations may return intermediate responses, although these are most likely to be used for extended operations. Promises are similar to Futures and only support completion using a single "result" object. In other words, they are intended for asynchronous functions, not asynchronous streams. In the client Connection API we have been forced to use a combination of Promises and call-backs (e.g. org.forgerock.opendj.ldap.SearchResultHandler), which is pretty clunky because we lose the ability to perform nice stream-based processing on the search results.

      We could have a stream based promise that supports processing of intermediate events before completion:

      connection.search(...)                  // Send the search request
          .thenOnSearchResultEntry(...)       // consume entries
          .thenOnSearchResultReference(...)   // consume references
          .thenOnIntermediateResponse(...)    // consume intermediate responses
          .thenOnResult(...);                 // consume final result
      

      However, there are some hidden details:

      • what happens if the client is slow to register any of these call-backs? For example, the client thread is descheduled immediately after the search promise is returned but before any of the call-backs are registered.
      • what happens if the client wants to ignore intermediate responses or is not expecting any, yet very many are returned?
      • what happens if the underlying search implementation is blocking? In this case all of the search results are computed before the promise is returned and will need to be buffered somewhere (this is really just an extreme but common case of the first two points above)
      • can intermediate events (intermediate responses, search entries, etc) be consumed more than once? In other words, should clients be able to register multiple listeners against the same search promise? How long should responses be buffered before deciding that all listeners have been registered?
      • how can we provide an API for transforming (mapping) results as they are received? Mapping is likely to change the type of the stream, yet we still want to be able to register listeners for the other types of intermediate events
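
      The buffering questions above can be made concrete with a small sketch. EagerSearch and all of its methods are hypothetical names invented for illustration, not part of the OpenDJ SDK. The sketch assumes a blocking backend that has produced every entry before the caller gets a chance to register a handler:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical eager ("hot") search: results are produced before any handler exists. */
final class EagerSearch {
    // Unbounded buffer: the price of starting the operation before anyone listens.
    private final List<String> buffered = new ArrayList<>();
    private Consumer<String> entryHandler;

    /** Simulates a blocking backend: every entry is emitted before this method returns. */
    static EagerSearch start(List<String> backendEntries) {
        EagerSearch search = new EagerSearch();
        for (String dn : backendEntries) {
            search.emit(dn);
        }
        return search;
    }

    private void emit(String dn) {
        if (entryHandler != null) {
            entryHandler.accept(dn);
        } else {
            buffered.add(dn); // no handler yet, so the entry must be buffered or dropped
        }
    }

    /** Late registration replays and drains the buffer. */
    EagerSearch thenOnSearchResultEntry(Consumer<String> handler) {
        entryHandler = handler;
        buffered.forEach(handler);
        buffered.clear();
        return this;
    }

    /** Number of entries still waiting for a handler. */
    int bufferedCount() {
        return buffered.size();
    }
}
```

      In this sketch a second call to thenOnSearchResultEntry() finds the buffer already drained, which is one way the "multiple listeners" question shows up in practice. A real implementation would also have to decide when the buffer may be released if no handler is ever registered.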

      The problems arise because a Promise represents the result of an operation that has already been started by the time the client receives it. Asynchronous stream processing libraries, like RxJava, take a different approach, usually only starting the task once the client subscribes to the stream. Using the example search from above, this is how the behavior changes:

      connection.search(...)                  // Nothing sent yet
          .thenOnSearchResultEntry(...)
          .thenOnSearchResultReference(...)
          .thenOnIntermediateResponse(...)
          .thenOnResult(...);                 // Search is sent/performed now
      

      In the above example, registration of a completion handler (thenOnResult()) must be performed exactly once, and corresponds to Rx's subscribe() method. When the completion handler is registered we know all handlers have been registered, so there is no need to buffer intermediate events and we can discard events for which there is no corresponding handler. Another advantage is that the producer is responsible for notifying the subscriber and is therefore able to perform additional tasks after notification, such as enabling SASL transport layers (see OPENDJ-2839), even if the operation was processed synchronously. Finally, an LDAP client API would not know the LDAP request's request ID until the final subscription (thenOnResult()) is performed, because it is only at this point that the request is actually sent over the network. This implies that the call to thenOnResult() should return an object containing the request ID. It probably makes sense to return a Promise from this call.
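
      A minimal sketch of the subscribe-on-thenOnResult() behaviour follows. LazySearch and Subscription are hypothetical names, a list of strings stands in for the server's responses, and CompletableFuture stands in for whatever Promise type the SDK would actually return:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Hypothetical lazy ("cold") search: nothing runs until thenOnResult() subscribes. */
final class LazySearch {
    private static final AtomicInteger NEXT_MESSAGE_ID = new AtomicInteger(1);

    /** Returned by the final subscription: the request ID plus a promise of the result. */
    static final class Subscription {
        final int messageId;
        final CompletableFuture<String> result;

        Subscription(int messageId, CompletableFuture<String> result) {
            this.messageId = messageId;
            this.result = result;
        }
    }

    private final List<String> backendEntries; // stands in for the server's responses
    private Consumer<String> entryHandler;     // null means "discard entries"
    private boolean subscribed;

    LazySearch(List<String> backendEntries) {
        this.backendEntries = backendEntries;
    }

    /** Only records the handler; no request has been sent yet. */
    LazySearch thenOnSearchResultEntry(Consumer<String> handler) {
        entryHandler = handler;
        return this;
    }

    /** Subscribes: allocates the request ID, "sends" the request and delivers all events. */
    Subscription thenOnResult(Consumer<String> resultHandler) {
        if (subscribed) {
            throw new IllegalStateException("thenOnResult() may only be called once");
        }
        subscribed = true;
        int messageId = NEXT_MESSAGE_ID.getAndIncrement(); // known only once the request is sent
        for (String dn : backendEntries) {
            if (entryHandler != null) {
                entryHandler.accept(dn); // with no handler the entry is dropped, never buffered
            }
        }
        resultHandler.accept("success");
        // The producer still has control here and could, for example, install a SASL layer.
        return new Subscription(messageId, CompletableFuture.completedFuture("success"));
    }
}
```

      Because every handler is known by the time thenOnResult() runs, the sketch needs no buffer at all, and a second subscription attempt fails fast rather than silently observing a drained stream.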

      The goal of using a stream/flow API is to avoid client code that contains lots of nested callbacks and to use chaining instead. Therefore it is important that we support some basic operators for filtering and mapping intermediate events. However, I really don't want us to re-invent RxJava! The goal should be to provide the bare minimum of functionality that fits our use cases whilst allowing third-party applications to easily integrate our SDK with frameworks like Rx 1 and 2, Akka, etc. The recommended JDK 9 future-proof approach for library developers is to support Reactive Streams, but even these don't seem to be a great fit for LDAP, since the protocol consists of a sequence of heterogeneous messages (intermediate responses, search entries and references) terminated with an LDAP result, whereas Rx and friends only support streaming of a homogeneous sequence of events. We could work around this by returning a stream of "LDAP messages" and letting client code figure out the type of each message, but this is not very user-friendly and we also lose the protocol's statefulness (that an LDAP result is always the last message).
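
      To illustrate why the "stream of LDAP messages" workaround is awkward, here is a rough sketch. LdapMessage, its subclasses and MessageStreamClient are hypothetical types made up for this example, and a plain List stands in for the reactive stream:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical common supertype so that a homogeneous stream can carry every response. */
abstract class LdapMessage {
    static final class Entry extends LdapMessage {
        final String dn;
        Entry(String dn) { this.dn = dn; }
    }

    static final class Reference extends LdapMessage {
        final String uri;
        Reference(String uri) { this.uri = uri; }
    }

    static final class Result extends LdapMessage {
        final int resultCode;
        Result(int resultCode) { this.resultCode = resultCode; }
    }
}

final class MessageStreamClient {
    /**
     * Client code must discover each message's type itself and can only
     * check the "result is always last" rule at run time.
     */
    static List<String> consume(List<LdapMessage> stream) {
        List<String> log = new ArrayList<>();
        boolean done = false;
        for (LdapMessage m : stream) {
            if (done) {
                throw new IllegalStateException("message received after the LDAP result");
            }
            if (m instanceof LdapMessage.Entry) {
                log.add("entry " + ((LdapMessage.Entry) m).dn);
            } else if (m instanceof LdapMessage.Reference) {
                log.add("reference " + ((LdapMessage.Reference) m).uri);
            } else if (m instanceof LdapMessage.Result) {
                log.add("result " + ((LdapMessage.Result) m).resultCode);
                done = true;
            }
        }
        return log;
    }
}
```

      Nothing in the element type tells the compiler that a Result terminates the sequence, so every consumer ends up repeating the same instanceof chain and ordering check.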

              People

              • Assignee:
                ylecaillez Yannick Lecaillez
                Reporter:
                matthew Matthew Swift
                Dev Assignee:
                Yannick Lecaillez