March 24, 2019

How-To: Create RSocket-Java Receiver/Connector

Acknowledgment

This is the first post in a series on how to get started with RSocket, in which I share the most straightforward tips on running, configuring, and debugging RSocket.

This series does not cover the motivation behind RSocket itself; that will be addressed in a separate series.

Declaring Core Dependency

The first step in setting up any project is declaring the required dependencies. In this project we use Gradle, but the same can be done with Maven:

dependencies {
  implementation 'io.rsocket:rsocket-core:0.12.1-RC3-SNAPSHOT'
}

Note

At the time of writing, the latest version of RSocket-Java was 0.12.1-RC3-SNAPSHOT.
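
For Maven users, the Gradle declaration above maps to roughly the following pom.xml fragment. This is a sketch derived from the same coordinates. Because the version is a SNAPSHOT, it assumes that a snapshot repository hosting RSocket-Java builds is already configured, which this post does not cover:

```xml
<dependencies>
  <dependency>
    <groupId>io.rsocket</groupId>
    <artifactId>rsocket-core</artifactId>
    <version>0.12.1-RC3-SNAPSHOT</version>
  </dependency>
</dependencies>
```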

Constructing RSocket Receiver

In RSocket-Java, constructing any connector or receiver starts with the static factory class io.rsocket.RSocketFactory. RSocketFactory has two central builder methods, one of which declares a receiver:

package com.example.rsocket;

import io.rsocket.RSocketFactory;


public class RSocketPlayground {
  public static void main(String[] args) {
    RSocketFactory.ServerRSocketFactory serverRSocketFactory =
      RSocketFactory.receive();
  }
}

The example above shows the first step of building an RSocket receiver. The call to RSocketFactory.receive() returns an RSocketFactory.ServerRSocketFactory instance, which offers a wide range of customization options for the receiver. For now, we use only io.rsocket.RSocketFactory.ServerRSocketFactory#acceptor, which lets us specify a handler for incoming connections:

package com.example.rsocket;

import io.rsocket.AbstractRSocket;
import io.rsocket.ConnectionSetupPayload;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.SocketAcceptor;
import reactor.core.publisher.Mono;


public class RSocketPlayground {
  public static void main(String[] args) {
    RSocketFactory.ServerRSocketFactory serverRSocketFactory =
      RSocketFactory.receive();

    RSocketFactory.ServerTransportAcceptor serverTransportAcceptor =
      serverRSocketFactory.acceptor(new SocketAcceptor() {             // (1)
        @Override
        public Mono<RSocket> accept(                                   // (2)
            ConnectionSetupPayload payload,                            // (3)
            RSocket connectorRSocket                                   // (4)
        ) {
          return Mono.just(new AbstractRSocket() {});                  // (5)
        }
      });
  }
}

  1. Creates an anonymous implementation of the io.rsocket.SocketAcceptor interface and passes it to the acceptor method;

  2. Declares the accept handler method, which accepts two parameters and asynchronously returns the receiver-side RSocket;

  3. Represents the information that the connector can send to the receiver at the connection stage;

  4. The RSocket that gives the receiver access to the connector's functionality;

  5. Constructs an RSocket instance that gives the connector access to the receiver's functionality;

The sample above continues the receiver construction and shows how to declare a no-op socket acceptor. Calling acceptor in turn returns an io.rsocket.RSocketFactory.ServerTransportAcceptor, which lets us finalize the setup by declaring a transport.

Declaring Transport Dependency

Since RSocket is an application-layer protocol, it can work on top of any lower-level transport. The core dependency provides only the transport interfaces, so we have to add a dependency on a concrete implementation. For simplicity, today we are going to use the local transport, which connects peers inside the same JVM:

dependencies {
  implementation 'io.rsocket:rsocket-transport-local:0.12.1-RC3-SNAPSHOT'
}

Constructing RSocket Receiver Transport

Finally, we can run the RSocket receiver on io.rsocket.transport.local.LocalServerTransport:

package com.example.rsocket;

import io.rsocket.AbstractRSocket;
import io.rsocket.Closeable;
import io.rsocket.ConnectionSetupPayload;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.SocketAcceptor;
import io.rsocket.transport.local.LocalServerTransport;
import reactor.core.publisher.Mono;


public class RSocketPlayground {
  public static void main(String[] args) {
    RSocketFactory.ServerRSocketFactory serverRSocketFactory =
      RSocketFactory.receive();

    RSocketFactory.ServerTransportAcceptor serverTransportAcceptor =
      serverRSocketFactory.acceptor(new SocketAcceptor() {
        @Override
        public Mono<RSocket> accept(
            ConnectionSetupPayload payload,
            RSocket connectorRSocket
        ) {
          return Mono.just(new AbstractRSocket() {});
        }
      });

    Closeable closeableServer = serverTransportAcceptor
      .transport(LocalServerTransport.create("RSocket-Receiver"))    // (1)
      .start()                                                       // (2)
      .block();                                                      // (3)
  }
}

  1. Creates an instance of LocalServerTransport and passes it to the builder;

  2. Starts the RSocket receiver on the specified transport;

  3. Blocks the calling thread until the startup process completes;

The code above shows the finished RSocket receiver running on the local (in-process) transport implementation.

Constructing RSocket Connector

Following the samples above, we can just as easily create an instance of the RSocket connector. The only differences are that we use the io.rsocket.RSocketFactory#connect factory method and io.rsocket.transport.local.LocalClientTransport, created with the same name as the server transport:

RSocket rSocket = RSocketFactory
  .connect()
  .transport(LocalClientTransport.create("RSocket-Receiver"))
  .start()
  .block();
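
Note that the connector finds the receiver purely by the name passed to create, with no host or port involved. One way to picture this is a name-keyed table shared inside a single JVM. The following is a conceptual sketch in plain Java only, not rsocket-java's actual implementation; the class InProcessRegistrySketch and its methods are hypothetical names introduced for illustration:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Conceptual model only: NOT rsocket-java code. All names here are hypothetical.
final class InProcessRegistrySketch {

  // JVM-wide table of "servers", keyed by the name they were started under.
  private static final Map<String, Function<String, String>> SERVERS =
      new ConcurrentHashMap<>();

  // "Server side": publish a handler under a name; fail if the name is taken.
  static void startServer(String name, Function<String, String> handler) {
    if (SERVERS.putIfAbsent(name, handler) != null) {
      throw new IllegalStateException("Name already in use: " + name);
    }
  }

  // "Client side": look the handler up by the same name and call it directly.
  static String connectAndSend(String name, String message) {
    Function<String, String> handler = SERVERS.get(name);
    if (handler == null) {
      throw new IllegalArgumentException("No server registered under: " + name);
    }
    return handler.apply(message);
  }

  public static void main(String[] args) {
    startServer("RSocket-Receiver", msg -> "received: " + msg);
    // prints "received: hello"
    System.out.println(connectAndSend("RSocket-Receiver", "hello"));
  }
}
```

In this model, a connector that uses a name nobody registered fails immediately, which is a useful thing to check first if a local connection cannot be established.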

Complete Sample

Finally, let's add some logging to the receiver's acceptor and put everything together in a complete sample:

package com.example.rsocket;

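import io.rsocket.AbstractRSocket;
import io.rsocket.Closeable;
import io.rsocket.ConnectionSetupPayload;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.SocketAcceptor;
import io.rsocket.transport.local.LocalClientTransport;
import io.rsocket.transport.local.LocalServerTransport;
import reactor.core.publisher.Mono;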


public class RSocketPlayground {
  public static void main(String[] args) {
    RSocketFactory.ServerRSocketFactory serverRSocketFactory =
      RSocketFactory.receive();

    RSocketFactory.ServerTransportAcceptor serverTransportAcceptor =
      serverRSocketFactory.acceptor(new SocketAcceptor() {
        @Override
        public Mono<RSocket> accept(
            ConnectionSetupPayload payload,
            RSocket connectorRSocket
        ) {
          System.out.println(
            "Received Connection. Data: [" + payload.getDataUtf8() + "]. " +
            "Metadata: [" + payload.getMetadataUtf8() + "]"
          );

          return Mono.just(new AbstractRSocket() {});
        }
      });

    Closeable closeableServer = serverTransportAcceptor
      .transport(LocalServerTransport.create("RSocket-Receiver"))
      .start()
      .block();

    RSocket rSocket = RSocketFactory
      .connect()
      .transport(LocalClientTransport.create("RSocket-Receiver"))
      .start()
      .block();
  }
}

Running the final code prints the following message to the console:

Received Connection. Data: []. Metadata: []

This means that our connector has successfully reached the receiver. Both values are empty because the connector did not send any setup data or metadata.

Summary

In this blog post, we covered the most straightforward steps required to run the RSocket receiver and connector.

To recap:

  1. Everything you need starts with io.rsocket.RSocketFactory.

  2. RSocket is an application-layer protocol, so it can run on top of any lower-level transport.

  3. The rsocket-core module does not provide a particular transport implementation, so an additional module is required.

  4. The simplest way to get started is the local (in-process) transport implementation, which does not require running a real network server.

What is next?

In the next blog post, we will focus on how to start sending data between peers using RSocket-Java.

© Oleh Dokuka 2019
