Building RSocket based springboot application

Avatar of Rakesh Mothukuri

Rakesh Mothukuri

| Reading Time : 10 minutes

Avatar of Rakesh Mothukuri Avatar of Rakesh Mothukuri Avatar of Rakesh Mothukuri Avatar of Rakesh Mothukuri

Introduction

RSocket translates to Reactive socket is a messaging protocol that works over TCP or Websockets. Communication modes this protocol provides are fire-and-forget, request-response and streaming. Since RSocket is fully reactive, its ideal for high-throughput applications.

In this post we will explore three communications modes which are fire-and-forget, request-response and streaming and test with RSocket Client CLI (RSC) a postman type application but to test application over socket.

Setup

  1. Install Java preferably >8.

  2. Spring boot skeleton project

    Navigate to start.spring.io, select RSocket as dependency and a stable version of spring boot version at the point of time and clicking on Generate should give a zip file with skeleton project which good to get going.

Initial project structure looks as

  1. Application configuration

    Next, setting up the port on which this application runs by modifying the file application.properties

spring.rsocket.server.port=7000
spring.main.lazy-initialization=true
  1. Client to test the server application:

    Here i am using RSC client created by Toshiaki Maki . Set-up instructions are on his github page.

Data model:

We create a new POJO class representing the data being exchanged by client and server. This class will consists of two member variables for now which are message created

package com.example.rsocket;

import java.time.Instant;

public class Message {

  private String message; 
  private long created = Instant.now().getEpochSecond(); 

  public Message(String message) {
    this.message = message;
  }

  public String getMessage() {
    return message;
  }

  public void setMessage(String message) {
    this.message = message;
  }

  public long getCreated() {
    return created;
  }

  public void setCreated(long created) {
    this.created = created;
  }
}

This should get the base project set-up’d and ready for writing code and expose via Socket.

Communication mode 1: Request-Response

Development

Request-Response is a classic way of communication where each request made by client will get a response back. This is achieved by including below snippet (full code is available in the github repository : https://github.com/rockey5520/rsocket )

@MessageMapping("request-response")
	Mono<Message> requestResponse(final Message message) {
		System.out.println("Received request-response message: {}"+message);
		return Mono.just(new Message("You said: " + message.getMessage()));
	}

The key part of the above code is @MessageMapping("request-response") where we are letting spring boot know that this method requestResponse should be invoked when client makes a call using request-response communication mode. Here we are using the POJO class Message as the payload for both receiving from client and responding to client.

Since this is a request and response model where server sends response to requests made by client Mono of Spring Reactor reactor.core is perfect where there is only need to send response once for each request.

Testing

using the RSC client installed if you pass below instruction

rsc --debug --request --data "{\"message\":\"Hello\"}" --route request-response --stacktrace tcp://localhost:7000

should it run successfully you would see an ouput as below

Frame => Stream ID: 1 Type: REQUEST_RESPONSE Flags: 0b100000000 Length: 49
Metadata:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| fe 00 00 11 10 72 65 71 75 65 73 74 2d 72 65 73 |.....request-res|
|00000010| 70 6f 6e 73 65                                  |ponse           |
+--------+-------------------------------------------------+----------------+
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 6d 65 73 73 61 67 65 22 3a 22 48 65 6c 6c |{"message":"Hell|
|00000010| 6f 22 7d                                        |o"}             |
+--------+-------------------------------------------------+----------------+
2020-10-11 10:15:29.750 DEBUG --- [actor-tcp-nio-1] i.r.FrameLogger : receiving -> 
Frame => Stream ID: 1 Type: NEXT_COMPLETE Flags: 0b1100000 Length: 56
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 6d 65 73 73 61 67 65 22 3a 22 59 6f 75 20 |{"message":"You |
|00000010| 73 61 69 64 3a 20 48 65 6c 6c 6f 22 2c 22 63 72 |said: Hello","cr|
|00000020| 65 61 74 65 64 22 3a 31 36 30 32 34 31 31 33 32 |eated":160241132|
|00000030| 39 7d                                           |9}              |
+--------+-------------------------------------------------+----------------+
{"message":"You said: Hello","created":1602411329}

This response received is constructed in 3 message frames

Frame 1

First frame is labeled as metadata shows the routing metadata ( request-response) sent to the server

Frame 2

Second frame shows the data we sent to the server ( here in this example it is “Hello”) a JSON string

Frame 3

Third frame shows the server response sent back to the client.

Communication mode 2: Fire-and-Forget

Development

Fire and forget is another way of communication where request made by client but wont get a response back from server. This is achieved by including below snippet (full code is available in the github repository : https://github.com/rockey5520/rsocket )

	@MessageMapping("fire-and-forget")
	public Mono<Void> fireAndForget(final Message message) {
		System.out.println("Received fire-and-forget request: {}"+ message);
		return Mono.empty();
	}

The key part of the above code is @MessageMapping("fire-and-forget") where we are letting spring boot know that this method fireAndForget should be invoked when client makes a call using fire-and-forget communication mode. Here we are using the POJO class Message` as the payload for both receiving from client.

Testing

using the RSC client installed if you pass below instruction

rsc --debug --fnf --data "{\"message\":\"Hello\"}" --route fire-and-forget --stacktrace tcp://localhost:7000

should it run successfully you would see an ouput as below

2020-10-11 10:31:28.398 DEBUG --- [     parallel-2] i.r.FrameLogger : sending -> 
Frame => Stream ID: 1 Type: REQUEST_FNF Flags: 0b100000000 Length: 48
Metadata:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| fe 00 00 10 0f 66 69 72 65 2d 61 6e 64 2d 66 6f |.....fire-and-fo|
|00000010| 72 67 65 74                                     |rget            |
+--------+-------------------------------------------------+----------------+
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 6d 65 73 73 61 67 65 22 3a 22 48 65 6c 6c |{"message":"Hell|
|00000010| 6f 22 7d                                        |o"}             |
+--------+-------------------------------------------------+----------------+

This response received is constructed in 2 message frames

Frame 1

First frame is labeled as metadata shows the routing metadata ( request-response) sent to the server

Frame 2

Second frame shows the data we sent to the server ( here in this example it is “Hello”) a JSON string

Communication mode 3: Request-Stream

Development

This communication mode is for the communication where a client makes a single request and server responds with stream of responses. This is achieved by including below snippet (full code is available in the github repository : https://github.com/rockey5520/rsocket )

	@MessageMapping("request-stream")
	Flux<Message> stream(final Message message) {
		return Flux
				// create a new indexed Flux emitting one element every second
				.interval(Duration.ofSeconds(1))
				// create a Flux of new Messages using the indexed Flux
				.map(index -> new Message("You said: " + message.getMessage() + ". Response #" + index))
				// show what's happening
				.log();
	}

The key part of the above code is @MessageMapping("request-stream") where we are letting spring boot know that this method stream should be invoked when client makes a call using request-stream communication mode. Here we are using the POJO class Message as the payload for both receiving from client and send stream of responses of type Message every 1 second which is achieved by using Flux .

Testing

using the RSC client installed if you pass below instruction

rsc --debug --stream --data "{\"message\":\"Hello\"}" --route request-stream --stacktrace tcp://localhost:7000

should it run successfully you would see an ouput as below

2020-10-11 10:42:40.638 DEBUG --- [     parallel-2] i.r.FrameLogger : sending -> 
Frame => Stream ID: 1 Type: REQUEST_STREAM Flags: 0b100000000 Length: 51 InitialRequestN: 9223372036854775807
Metadata:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| fe 00 00 0f 0e 72 65 71 75 65 73 74 2d 73 74 72 |.....request-str|
|00000010| 65 61 6d                                        |eam             |
+--------+-------------------------------------------------+----------------+
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 6d 65 73 73 61 67 65 22 3a 22 48 65 6c 6c |{"message":"Hell|
|00000010| 6f 22 7d                                        |o"}             |
+--------+-------------------------------------------------+----------------+
2020-10-11 10:42:41.749 DEBUG --- [actor-tcp-nio-1] i.r.FrameLogger : receiving -> 
Frame => Stream ID: 1 Type: NEXT Flags: 0b100000 Length: 69
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 6d 65 73 73 61 67 65 22 3a 22 59 6f 75 20 |{"message":"You |
|00000010| 73 61 69 64 3a 20 48 65 6c 6c 6f 2e 20 52 65 73 |said: Hello. Res|
|00000020| 70 6f 6e 73 65 20 23 30 22 2c 22 63 72 65 61 74 |ponse #0","creat|
|00000030| 65 64 22 3a 31 36 30 32 34 31 32 39 36 31 7d    |ed":1602412961} |
+--------+-------------------------------------------------+----------------+
{"message":"You said: Hello. Response #0","created":1602412961}
2020-10-11 10:42:42.707 DEBUG --- [actor-tcp-nio-1] i.r.FrameLogger : receiving -> 
Frame => Stream ID: 1 Type: NEXT Flags: 0b100000 Length: 69
Data:

This response received is a stream of responses back to rsc client.

Communication mode 4: Channel

Development

This communication mode is for the communication where client and server can send stream of messages, essentially streaming messages in both directions. This is achieved by including below snippet (full code is available in the github repository : https://github.com/rockey5520/rsocket )

	@MessageMapping("stream-stream")
	Flux<Message> channel(final Flux<Integer> input) {
		System.out.println("Received stream-stream (channel) request...");
		return settings
				.doOnNext(input -> System.out.println("Requested interval is {} seconds."+ setting))
				.doOnCancel(() -> System.out.println("The client cancelled the channel."))
				.switchMap(input -> Flux.interval(Duration.ofSeconds(input))
						.map(index -> new Message("Stream Response #" + index)))
				.log();
	}

The key part of the above code is @MessageMapping("stream-stream") where we are letting spring boot know that this method channel should be invoked when client makes a call using stream-stream communication mode.

Here we are returning Flux for each request(input) sent as part of stream creating an new outbound flux using the Duration sent in input payload there by achieving the back pressure where client controls the speed server response stream should be. This is a very good feature for the functionalities such as Video streaming where user controls the speed at which server should stream videos based on the client internet speed

Testing

using the RSC client installed if you pass below instruction

rsc --debug --channel --data - --route stream-stream --stacktrace tcp://localhost:7000

now command line waits for user input for interval in seconds, Given provided 1 server responds stream of messages every 1 seconds and while its streaming you could change to 10 and see server changes its streaming speed from 1 second to 10 seconds which is amazing when you need to build an application where client needs to apply backpressure and let server know the the speed at which it expects the responses.

rockey@ubuntu:~/projects/rsocket$ rsc --debug --channel --data - --route stream-stream --stacktrace tcp://localhost:7000
3
2020-10-11 11:06:39.843 DEBUG --- [     parallel-2] i.r.FrameLogger : sending -> 
Frame => Stream ID: 1 Type: REQUEST_CHANNEL Flags: 0b100000000 Length: 32 InitialRequestN: 9223372036854775807
Metadata:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| fe 00 00 0e 0d 73 74 72 65 61 6d 2d 73 74 72 65 |.....stream-stre|
|00000010| 61 6d                                           |am              |
+--------+-------------------------------------------------+----------------+
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 33                                              |3               |
+--------+-------------------------------------------------+----------------+
2020-10-11 11:06:39.847 DEBUG --- [actor-tcp-nio-1] i.r.FrameLogger : receiving -> 
Frame => Stream ID: 1 Type: REQUEST_N Flags: 0b0 Length: 10 RequestN: 9223372036854775807
Data:

2020-10-11 11:06:42.849 DEBUG --- [actor-tcp-nio-1] i.r.FrameLogger : receiving -> 
Frame => Stream ID: 1 Type: NEXT Flags: 0b100000 Length: 59
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 6d 65 73 73 61 67 65 22 3a 22 53 74 72 65 |{"message":"Stre|
|00000010| 61 6d 20 52 65 73 70 6f 6e 73 65 20 23 30 22 2c |am Response #0",|
|00000020| 22 63 72 65 61 74 65 64 22 3a 31 36 30 32 34 31 |"created":160241|
|00000030| 34 34 30 32 7d                                  |4402}           |
+--------+-------------------------------------------------+----------------+
{"message":"Stream Response #0","created":1602414402}
10
2020-10-11 11:06:45.216 DEBUG --- [oundedElastic-1] i.r.FrameLogger : sending -> 
Frame => Stream ID: 1 Type: NEXT Flags: 0b100100000 Length: 29
Metadata:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| fe 00 00 0e 0d 73 74 72 65 61 6d 2d 73 74 72 65 |.....stream-stre|
|00000010| 61 6d                                           |am              |
+--------+-------------------------------------------------+----------------+
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 31 30                                           |10              |
+--------+-------------------------------------------------+----------------+


This response received is a stream of responses back to rsc client.

Additional Reading material :

https://docs.spring.io/spring-framework/docs/current/spring-framework-reference/web-reactive.html#rsocket-annot-responders

https://github.com/benwilcock/springone-2020-rsocket-talk/tree/master/rsocket-server

https://www.youtube.com/redirect?redir_token=QUFFLUhqa0FGbnJFaklWdDNqc0tydDc3amNVb1Ywd3Nod3xBQ3Jtc0tuYlpZaTlHODVjWmpFSkRhSjJrVjdHWGhJNWFQT0VzZnBiRk5PZUxkM3FucFZBZWoyWENKaXBGMlJ0YmRQUmk0QS1uTnQxNjVRVWpzOFliRXM1R281cTZtOExTVThBUXBCZlE5QmptWVFabEhseWtfcw%3D%3D&q=https%3A%2F%2Fbit.ly%2Frsocket-blog&v=dp1lGH2OCUs&event=video_description