March 26, 2019

How-To: Send first messages in RSocket-Java

Acknowledgment

This is the second post in the "How to get started with RSocket" series, in which I share the most straightforward tips on running, configuring, and debugging RSocket.

In this blog post, I’m going to share some necessary information about the existing communication models in RSocket and how to start using them. This post continues the first one, so I assume you already have the skeleton of the project in place.

Working With Payload

Before we dig deeper into the communication models in RSocket, we have to learn the central data representation that RSocket uses for its messaging.

The central data representation in RSocket is io.rsocket.Payload. In RSocket, Payload consists of two parts:

  • data - usually the message body we send. This is the part of the payload that our business logic works with.

  • metadata - meta information about the message. It allows sending contextual information along with the data.

Both parts of the Payload interface are represented as binary data, so converting a logical element to binary and back is our responsibility.
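As a minimal sketch of that conversion, using only the JDK and not the RSocket API itself, the following shows one way to turn a String into a binary buffer and back. The class name PayloadCodecSketch and its encode/decode methods are hypothetical helpers introduced for illustration; UTF-8 is an assumed encoding.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of RSocket: illustrates String <-> binary conversion.
public class PayloadCodecSketch {

  // Encodes a logical value (here, a String) into a read-only binary buffer.
  public static ByteBuffer encode(String value) {
    return ByteBuffer.wrap(value.getBytes(StandardCharsets.UTF_8)).asReadOnlyBuffer();
  }

  // Decodes a binary buffer back into a String.
  // Works on a duplicate so the caller's buffer position is left untouched.
  public static String decode(ByteBuffer buffer) {
    return StandardCharsets.UTF_8.decode(buffer.duplicate()).toString();
  }

  public static void main(String[] args) {
    ByteBuffer data = encode("Hello World Payload");
    ByteBuffer metadata = encode("Hello World Metadata");

    System.out.println(
        "Data: [" + decode(data) + "]. Metadata: [" + decode(metadata) + "]");
  }
}
```

For anything richer than text (JSON, Protobuf, and so on), the same two steps apply: serialize to bytes before sending and deserialize after receiving.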

Building communication with RSocket

There are five central interaction models in RSocket:

  • requestResponse - the most common interaction type: one request, one response.

  • fireAndForget - a variation of requestResponse in which the sender does not wait for the response at all.

  • requestStream - a single request answered with a stream of Payload.

  • requestChannel - duplex streaming, where the sender can send a stream of Payload and the responder can respond with a stream of Payload.

  • metadataPush - a special interaction type which allows pushing meta information to a responder. The main difference from fireAndForget is that metadataPush awaits successful processing of the pushed data and completes when the responder sends a complete signal.

The fantastic part of RSocket is that all those methods are incorporated within the io.rsocket.RSocket interface. Moreover, RSocket plays both roles: the sender role, which means we can use an RSocket to make a call, and the responder role, which means we return an implementation of this interface to handle incoming calls.

Building Request-Response interaction

In this section we are going to reuse the complete sample from the previous blog post. As the code snippet shows, we return an instance of the io.rsocket.AbstractRSocket class, which is an abstract, no-op implementation of RSocket. To add the requestResponse interaction on the receiver side, we have to override the existing implementation:

package com.example.rsocket;

import io.rsocket.AbstractRSocket;
import io.rsocket.ConnectionSetupPayload;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.SocketAcceptor;
import reactor.core.publisher.Mono;
import io.rsocket.Payload;


public class RSocketPlayground {
  public static void main(String[] args) {
    RSocketFactory.ServerRSocketFactory serverRSocketFactory =
      RSocketFactory.receive();

    RSocketFactory.ServerTransportAcceptor serverTransportAcceptor =
      serverRSocketFactory.acceptor(new SocketAcceptor() {
        @Override
        public Mono<RSocket> accept(
            ConnectionSetupPayload payload,
            RSocket connectorRSocket
        ) {
          ...

          return Mono.just(new AbstractRSocket() {
            @Override
            public Mono<Payload> requestResponse(Payload payload) {    (1)
              System.out.println(
                  "Received Request-Response. " +
                  "Data: [" + payload.getDataUtf8() + "]. " +          (2)
                  "Metadata: [" + payload.getMetadataUtf8() + "]"      (3)
              );
              return Mono.just(payload);                               (4)
            }
          });
        }
      });

    ...
  }
}
  1. Declares the requestResponse handler method, which accepts a Payload;

  2. Gets the data and converts it to a UTF-8 string;

  3. Gets the metadata and converts it to a UTF-8 string;

  4. Returns the same payload back.

The example above shows how we can create the most straightforward handler for the requestResponse interaction type.

To initiate the interaction, we have to call the requestResponse method on the connector RSocket:

...

Payload response = rSocket
  .requestResponse(             (1)
    DefaultPayload.create(      (2)
      "Hello World Payload",    (3)
      "Hello World Metadata"    (4)
    )
  )
  .block();                     (5)
  1. Executes the requestResponse interaction;

  2. Creates an instance of io.rsocket.util.DefaultPayload, which is one of the available implementations of the Payload interface;

  3. Passes the Payload data as a String;

  4. Passes the Payload metadata as a String;

  5. Blocks execution until the server responds, either successfully or with an error.

As the sample above shows, we can easily use the RSocket API to execute a plain request-response interaction. In turn, the RSocket core module provides a straightforward API for constructing a Payload instance from either a byte buffer or a String.

By running the code above we observe the following output in the console:

Received Request-Response. Data: [Hello World Payload]. Metadata: [Hello World Metadata]

Implementing Fire and Forget

In order to implement the Fire and Forget handler we have to override the following method in the AbstractRSocket instance we return on the receiver side:

@Override
public Mono<Void> fireAndForget(Payload payload) {
  System.out.println(
      "Received Fire-And-Forget. " +
          "Data: [" + payload.getDataUtf8() + "]. " +
          "Metadata: [" + payload.getMetadataUtf8() + "]"
  );

  return Mono.delay(Duration.ofDays(1))                      (1)
             .then();                                        (2)
}
  1. Returns a Mono that delays its completion for one day;

  2. Ignores the result and just propagates the onComplete signal when it appears.

The above sample listens for an incoming payload and responds after a long-running delay. With a plain request-response interaction, such a call would take the whole specified duration. However, let’s see how long it takes in the fire-and-forget case:

...

System.out.println(                                               (1)
  "FireAndForget Called At: [" + Instant.now().toString() + "]"
);
rSocket
  .fireAndForget(                                                 (2)
    DefaultPayload.create("Hello FireAndForget")
  )
  .doOnSuccess(__ ->
    System.out.println(                                           (3)
      "FireAndForget Done At: [" + Instant.now().toString() + "]"
    )
  )
  .block();
  1. Prints execution start time;

  2. Executes fire and forget call;

  3. Handles the completion response;

Now, if we run that code, we observe something like the following in the console logs:

FireAndForget Called At: [2019-03-26T16:59:51.056697Z]
Received Fire-And-Forget. Data: [Hello FireAndForget]. Metadata: []
FireAndForget Done At: [2019-03-26T16:59:51.070513Z]

As we can see from the logs above, the time between the call and its completion is far less than one day. This confirms the fire-and-forget behavior described earlier: the sender does not wait for the handler to finish.
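To make the timing effect concrete without any RSocket dependency, here is a plain-JDK analogy. It is only a sketch under stated assumptions: FireAndForgetSketch and sendAndMeasureMillis are hypothetical names, a background thread stands in for the remote handler, and nothing here reflects how RSocket-Java is implemented internally. It only illustrates that the caller's wait is independent of how long the handler takes.

```java
// Plain-JDK analogy only (hypothetical names): the caller hands the message off
// and returns immediately, while the handler keeps working on its own thread.
public class FireAndForgetSketch {

  // Starts a slow "handler" in the background and returns
  // how long the caller itself had to wait, in milliseconds.
  public static long sendAndMeasureMillis(String message, long handlerDelayMillis) {
    long start = System.nanoTime();

    Thread handler = new Thread(() -> {
      System.out.println("Received Fire-And-Forget. Data: [" + message + "]");
      try {
        Thread.sleep(handlerDelayMillis); // stands in for the long delay on the receiver
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    handler.setDaemon(true); // do not keep the JVM alive for the slow handler
    handler.start();         // hand off; the caller never joins the handler thread

    return (System.nanoTime() - start) / 1_000_000;
  }

  public static void main(String[] args) {
    long waited = sendAndMeasureMillis("Hello FireAndForget", 60_000);
    System.out.println("Caller waited about " + waited + " ms");
  }
}
```

The caller's measured wait stays in the millisecond range regardless of the handler delay passed in, which is the same effect the timestamps in the logs above demonstrate.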

Implementing Request Stream and Request Channel

Last but not least is streaming communication. The following sample shows how we can start using streaming on the RSocket receiver side:

@Override
public Flux<Payload> requestStream(Payload payload) {                   (1)
  System.out.println(
    "Received Request Stream. " +
    "Data: [" + payload.getDataUtf8() + "]. " +
    "Metadata: [" + payload.getMetadataUtf8() + "]"
  );

  return Flux.range(0, 2)
    .map(i -> DefaultPayload.create("Stream Response: " + i));          (2)
}

@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {      (3)
  System.out.println("Received Request Channel.");

  return Flux
    .from(payloads)                                                     (4)
    .map(incomingPayload ->
      DefaultPayload
        .create("Channel Response: " + incomingPayload.getDataUtf8())   (5)
    );
}
  1. Declares the requestStream method, which accepts only a single Payload;

  2. Responds with a Flux.range and maps each element to a Payload;

  3. Declares the requestChannel method, which in this case accepts a Publisher of Payload;

  4. Adapts the Publisher to Reactor’s Flux;

  5. Maps each incoming Payload from the sender to an echo response Payload.

Now we can combine requestStream and requestChannel to try both at once:

Flux<Payload> requestStreamResponseFlux = rSocket
  .requestStream(                                     (1)
    DefaultPayload.create("Hello Stream-Channel")
  );

rSocket.requestChannel(requestStreamResponseFlux)     (2)
  .doOnNext(p -> System.out.println(                  (3)
    "Received Back: " + p.getDataUtf8()
  ))
  .blockLast();
  1. Executes a requestStream call and stores the resulting Flux in a variable;

  2. Executes a requestChannel call and passes the result of the requestStream call as a parameter;

  3. Prints every incoming payload.

The code above uses the requestStream and requestChannel calls together: the result of requestStream is sent to requestChannel, which creates a chain of execution. If we run this code, we observe the following output:

Received Request Stream. Data: [Hello Stream-Channel]. Metadata: []
Received Request Channel.
Received Back: Channel Response: Stream Response: 0
Received Back: Channel Response: Stream Response: 1

As we can conclude from the logs, the result stream from requestStream was sent to requestChannel, which in turn applied its own transformation to the incoming stream.
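The shape of that chain can be reproduced with plain JDK collections, which may help to see why each line carries both prefixes. This is an analogy only: StreamChannelChainSketch and its methods are hypothetical names, and Lists stand in for the Flux<Payload> streams, so there is no networking or backpressure involved.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Plain-JDK analogy only: Lists stand in for the Flux<Payload> streams.
public class StreamChannelChainSketch {

  // Mirrors the requestStream handler: emits two responses, for i = 0 and i = 1.
  public static List<String> streamResponses() {
    return IntStream.range(0, 2)
        .mapToObj(i -> "Stream Response: " + i)
        .collect(Collectors.toList());
  }

  // Mirrors the requestChannel handler: echoes each incoming element with a prefix.
  public static List<String> channelResponses(List<String> incoming) {
    return incoming.stream()
        .map(data -> "Channel Response: " + data)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    // The output of the first stage becomes the input of the second stage.
    channelResponses(streamResponses())
        .forEach(data -> System.out.println("Received Back: " + data));
  }
}
```

Running main prints the same two "Received Back" lines as the RSocket logs above, since each stage simply maps over the elements produced by the previous one.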

Summary

In this blog post, we covered the most straightforward steps to start sending messages from a connector to a receiver.

To recap:

  1. The central data representation in RSocket is a Payload interface.

  2. One of the available implementations of Payload is the DefaultPayload class.

  3. There are five central interaction types in RSocket.

  4. Request-Response is the most common interaction between client and server.

  5. Fire and Forget is a variation of Request-Response which allows sending a message without waiting for the response.

  6. There are two interactions which allow handling data streaming: Request Stream and Request Channel.

  7. There is an additional metadataPush method that allows sending service/meta information to a recipient.

What is next?

In the next blog post, we will focus on how to create a fully peer-to-peer interaction between connector and receiver using RSocket-Java.

© Oleh Dokuka 2019
