// field notes 2019·06·10 archive
Getting started with RSocket in Spring Boot
A walkthrough of building a Producer/Consumer pair with the early RSocket support in Spring Boot — request/response and streaming over a single connection.
Originally published at dev.to .
Introduction
RSocket is a binary protocol for use on byte stream transports such as TCP, WebSockets, and Aeron.
It enables the following symmetric interaction models via async message passing over a single connection:
- request/response (stream of 1)
- request/stream (finite stream of many)
- fire-and-forget (no response)
- event subscription (infinite stream of many)
It supports session resumption, to allow resuming long-lived streams across different transport connections. This is particularly useful for mobile⬄server communication when network connections drop, switch, and reconnect frequently.
In this tutorial, we will implement RSocket using the Java programming language.
Why Spring Boot
Although I could simply implement RSocket with a plain Java application, I chose Spring Boot because it’s a huge project on the JVM ecosystem. Spring Boot doesn’t have a stable version for RSocket yet, but that shouldn’t stop us experimenting with it.
Structure
Our project will consist of two sub-projects: the Consumer, who will do the requests, and the Producer, who will provide the Consumer with data.
Getting Started
If you want to easily get started with a Spring Boot project I recommend always using the Spring Initializr.
Configuring Gradle
First we have to configure our build.gradle and include the RSocket starter dependency from Spring Boot. For the Producer:
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-rsocket'
// ...
}
For the Consumer we should also include the reactive-web-starter dependency, because we want to display the data we receive from the Producer:
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-rsocket'
implementation 'org.springframework.boot:spring-boot-starter-webflux'
// ...
}
Configuration
Now let’s get our hands dirty and write some code.
Producer
First we have to configure on which port our Producer will receive new connections:
spring.rsocket.server.port=7000
spring.main.lazy-initialization=true
The spring.rsocket.server.port=7000 property tells RSocket to start listening on port 7000, and spring.main.lazy-initialization=true makes sure that Spring Boot initializes its beans lazily.
Consumer
Here we configure the RSocket client by creating two beans of type RSocket and RSocketRequester. The RSocket bean is used to create the connection to the Producer. The RSocketRequester is like our WebClient (if we are using Reactive) or RestTemplate (if we are using MVC) — it does the requests to the Producer.
@Bean
@SneakyThrows
RSocket rSocket() {
return RSocketFactory.connect()
.dataMimeType(MimeTypeUtils.APPLICATION_JSON_VALUE)
.frameDecoder(PayloadDecoder.ZERO_COPY)
.transport(TcpClientTransport.create(new InetSocketAddress("127.0.0.1", 7000)))
.start()
.block();
}
@Bean
RSocketRequester requester(RSocketStrategies strategies) {
return RSocketRequester.wrap(rSocket(), MimeTypeUtils.APPLICATION_JSON, strategies);
}
The Actual Code
Our two applications will use the following domain objects for their communication:
@Data
@NoArgsConstructor
@AllArgsConstructor
class GreetingsRequest {
private String name;
}
@Data
@NoArgsConstructor
@AllArgsConstructor
class GreetingsResponse {
private String greeting;
}
Simple Communication
Let’s start with an easy example: a single Request, a single Response.
Producer
@MessageMapping("greet")
Mono<GreetingsResponse> greet(GreetingsRequest request) {
return Mono.just(new GreetingsResponse("Hello " + request.getName() + " @ " + Instant.now()));
}
If you’ve used WebSockets with Spring Boot, the @MessageMapping annotation should be familiar. Here, with @MessageMapping("greet") we specify the route "greet", which receives a GreetingsRequest and returns a single GreetingsResponse.
Consumer
@RequiredArgsConstructor
@RestController
class GreetingsRestController {
private final RSocketRequester requester;
@GetMapping("/greet/{name}")
public Publisher<GreetingsResponse> greet(@PathVariable String name) {
return requester
.route("greet")
.data(new GreetingsRequest(name))
.retrieveMono(GreetingsResponse.class);
}
}
A simple @RestController injects the RSocketRequester we configured earlier and uses it to do the request. We tell the requester to call the "greet" route configured on the Producer.
Communication with Stream
Delivering a stream of data with RSocket comes naturally and with not that much effort.
Producer
@MessageMapping("greet-stream")
Flux<GreetingsResponse> greetStream(GreetingsRequest request) {
return Flux.fromStream(Stream.generate(
() -> new GreetingsResponse("Hello " + request.getName() + " @ " + Instant.now())
)).delayElements(Duration.ofSeconds(1));
}
We configured our @MessageMapping as before, but now we return a Flux (don’t know what Mono and Flux are? Check this StackOverflow post).
We also use .delayElements() to simulate a delay between responses.
Consumer
@GetMapping(value = "/greet-stream/{name}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Publisher<GreetingsResponse> greetStream(@PathVariable String name) {
return requester
.route("greet-stream")
.data(new GreetingsRequest(name))
.retrieveFlux(GreetingsResponse.class);
}
Not much has changed from the other request to the Producer, clearly the .route("greet-stream"). But notice that our @GetMapping now produces text/event-stream instead of the default application/json.
Closing thoughts
That was it — I hope I gave you a clear explanation on how to use the new RSocket protocol.
You can find the project on GitHub: Petros0/springboot-rsocket.
Thanks for reading and have fun.