Acknowledgment
This is the second blog post in the series of How to get started with RSocket related blog posts in which I’m going to share the most straightforward tips on running, configuring, and debugging RSocket.
In this blog post, I’m going to share some necessary information about existing communication models in RSocket and how to start using them. Also, this post continuous the first so I assume you have a backbone of the project.
Working With Payload
Before we dig deeper into the communication models in RSocket, we have to learn the central data representation that RSocket uses for its messaging.
The central data representation in RSocket is io.rsocket.Payload
. In RSocket, Payload
consists of two parts:
-
data
- usually, message body we send. Effective payload’s part that we can use for business logic. -
metadata
- message meta information. Allows sending some contextual information.
Both parts of the Payload
interface are represented as binary data, so the responsibility of conversion of a logical element back and forth lays on us.
Building communication with RSocket
There are 5 central interaction models in RSocket:
-
requestResponse
- the most common interaction type we ever used in our development. -
fireAndForget
- an advancedrequestResponse
which does not wait for the response at all. -
requestStream
- the request with a response as a stream ofPayload
. -
requestChannel
- duplex streaming where the sender can send a stream ofPayload
, as well as the responder, can respond with a stream ofPayload
. -
metadataPush
- the special interaction type which allows pushing meta information to a responder. The main difference withfireAndForget
is thatmetadataPush
awaits successful processing of the pushed data and completes when responder sends a complete signal.
The fantastic part of RSocket is that all those methods are incorporated within the io.rsocket.RSocket
class. Moreover RSocket
plays both roles - a sender role which means we can use RSocket
in order to make a call, and the responder role which means we should return an implementation of this class in order to handle incoming calls.
Building Request-Response interaction
In this listen we are going to reuse the complete sample from the previous blog post. As we can see from the code snippet, we return an instance of the io.rsocket.AbstractRSocket
class which is an abstract, no-ops implementation of the RSocket
. To add the requestResponse
interaction on the receiver part, we have to override existing implementation:
package com.example.rsocket;
import io.rsocket.AbstractRSocket;
import io.rsocket.ConnectionSetupPayload;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.SocketAcceptor;
import reactor.core.publisher.Mono;
import io.rsocket.Payload;
public class RSocketPlayground {
public static void main(String[] args) {
RSocketFactory.ServerRSocketFactory serverRSocketFactory =
RSocketFactory.receive();
RSocketFactory.ServerTransportAcceptor serverTransportAcceptor =
serverRSocketFactory.acceptor(new SocketAcceptor() {
@Override
public Mono<RSocket> accept(
ConnectionSetupPayload payload,
RSocket connectorRSocket
) {
...
return Mono.just(new AbstractRSocket() {
@Override
public Mono<Payload> requestResponse(Payload payload) { (1)
System.out.println(
"Received Request-Response. " +
"Data: [" + payload.getDataUtf8() + "]. " + (2)
"Metadata: [" + payload.getMetadataUtf8() + "]" (3)
);
return Mono.just(payload); (4)
}
});
}
});
...
}
}
-
Declares of the
requestResponse
handler method that accept aPayload
interface; -
Gets
data
and converts it to the UTF8 string -
Gets
metadata
and converts it to the UTF8 string -
Returns the same payload back
The example above shows how we can create the most straightforward handler for requestResponse
interaction type.
In order to initiate interaction we have to call the requestResponse
method on the connector RSocket
:
...
Payload response = rSocket
.requestResponse( (1)
DefaultPayload.create( (2)
"Hello World Payload", (3)
"Hello World Metadata" (4)
)
)
.block(); (5)
-
Executes
requestResponse
interaction; -
Creates instance of the
io.rsocket.util.DefaultPayload
which is one of the available implementations ofPayload
interface; -
Passes
Payload
data asString
; -
Passes
Payload
metadata asString
; -
Blocks execution until the successful/exceptional response from the server;
As we can see from the sample above, we can easily use RSocket
API in order to execute plain request response interaction. In turn, RSocket core module provides us with straightforward API in order to construct a Payload
instance from either byte
buffer or String
.
By running the code above we observe the following output in the console:
Received Request-Response. Data: [Hello World]. Metadata: [Hello Metadata]
Implementing Fire and Forget
In order to implement the Fire and Forget handler we have to override the following method in the AbstractRSocket
instance we return on the receiver side:
@Override
public Mono<Void> fireAndForget(Payload payload) {
System.out.println(
"Received Fire-And-Forget. " +
"Data: [" + payload.getDataUtf8() + "]. " +
"Metadata: [" + payload.getMetadataUtf8() + "]"
);
return Mono.delay(Duration.ofDays(1)) (1)
.then(); (2)
}
-
Return a
Mono
that delay response for one day; -
Ignores the result and just propagate
onComplete
signal when it is appeared.
The above sample listens for incoming a payload and responds with some long-running delay. With plain request-response interaction, such execution lasts a specified duration. However, let’s see what it takes with fire and forget case:
...
System.out.println( (1)
"FireAndForget Called At: [" + Instant.now().toString() + "]"
);
rSocket
.fireAndForget( (2)
DefaultPayload.create("Hello FireAndForget")
)
.doOnSuccess(__ ->
System.out.println( (3)
"FireAndForget Done At: [" + Instant.now().toString() + "]"
)
)
.block();
-
Prints execution start time;
-
Executes fire and forget call;
-
Handles the completion response;
Now, if we run that code, we observer in the console logs something like the following:
FireAndForget Called At: [2019-03-26T16:59:51.056697Z]
Received Fire-And-Forget. Data: [Hello FireAndForget]. Metadata: []
FireAndForget Done At: [2019-03-26T16:59:51.070513Z]
As we can see from the logs above, the difference in time between the call execution and the response is tremendously less than 1 Day. In that way, we ensured the mentioned fire-and-forget behavior.
Implementing Request Stream and Request Channel
The last but not the least is streaming communication. The following sample shows how we can start using streaming in RSocket receiver side:
@Override
public Flux<Payload> requestStream(Payload payload) { (1)
System.out.println(
"Received Request Stream. " +
"Data: [" + payload.getDataUtf8() + "]. " +
"Metadata: [" + payload.getMetadataUtf8() + "]"
);
return Flux.range(0, 2)
.map(i -> DefaultPayload.create("Stream Response: " + i)); (2)
}
@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) { (3)
System.out.println("Received Request Channel.");
return Flux
.from(payloads) (4)
.map(incomingPayload ->
DefaultPayload
.create("Channel Response: " + incomingPayload.getDataUtf8()) (5)
);
}
-
Declares the
requestStream
method which accept only as singlePayload
; -
Responds with
FluxRange
and.map
each element toPayload
; -
Declares the
requestChannel
method which accepts in that case aPublisher
ofPayload
; -
Adopts
Publisher
to Reactor’sFlux
; -
Maps each incoming
Payload
from the sender to some echo responsePayload
;
Now, we can incorporate requestStream
and requestChannel
together in order to taste all at once:
Flux<Payload> requestStreamResponseFlux = rSocket
.requestStream( (1)
DefaultPayload.create("Hello Stream-Channel")
);
rSocket.requestChannel(requestStreamResponseFlux) (2)
.doOnNext(p -> System.out.println( (3)
"Received Back: " + p.getDataUtf8()
))
.blockLast();
-
Executes a request Stream call and stores the result
Flux
into the variable; -
Executes a request Channel call and passes as an parameter the result from
requestStream
call; -
Prints every incoming payload;
The code above shows used together requestStream
and requestChannel
calls where the result of requestStream
is sending to requestChannel
and create some chain of execution in that way. If we run the code mentioned above we observer the following output:
Received Request Stream. Data: [Hello Stream-Channel]. Metadata: []
Received Request Channel.
Received Back: Channel Response: Stream Response: 0
Received Back: Channel Response: Stream Response: 1
As we can conclude from the logs, the result stream from requestStream
has been sent to requestChannel
which finally provided their transformation on the incoming stream.
Summary
In this blog post, we covered the most straightforward steps to start sending messages from a connector to a receiver.
To recap:
-
The central data representation in RSocket is a
Payload
interface. -
One of the available implementations of
Payload
is theDefaultPayload
class. -
There are five central interaction types in RSocket.
-
Request-Response allows you to do most wider interaction between client and server.
-
There is an advanced Request-Response called Fire and Forget which allows sending a message without waiting for the response back.
-
There are two interactions which allow handling data streaming.
-
There is an additional
metadataPush
method that allows sending service/meta information to a recipient
What is next?
In the next blog post, we will focus on how to create fully peer to peer interaction between connector and receiver using RSocket-Java.