Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RSocketFacrory.start() returns Mono<Rsocket> which is reconnect on each usage #730

Closed
vdshb opened this issue Dec 11, 2019 · 2 comments
Closed
Labels

Comments

@vdshb
Copy link

vdshb commented Dec 11, 2019

RSocketFacrory.start() returns Mono which returns new RSocket instance with new connection on each usage.

Shouldn't it keep established connection and return it whenever possible until connection closed?

E.g. I have simple spring client with rSocket and controller:

@Bean
    fun rSocket(): Mono<RSocket> {
        return RSocketFactory
                .connect()
                .dataMimeType(APPLICATION_JSON_VALUE)
                .metadataMimeType(MESSAGE_RSOCKET_COMPOSITE_METADATA.toString())
                .resume()
                .resumeSessionDuration(Duration.ofHours(2))
                .resumeStreamTimeout(Duration.ofHours(2))
                .frameDecoder(PayloadDecoder.ZERO_COPY)
                .transport(WebsocketClientTransport.create(7000))
                .start()
    }
@RestController
class MarketDataRestController(val rSocketMono: Mono<RSocket>, val rSocketStrategies: RSocketStrategies) {

@GetMapping(path = ["/feed/{stock}"], produces = [TEXT_EVENT_STREAM_VALUE])
    fun feed(@PathVariable("stock") stock: String): Flux<MarketData> {
        
         return rSocketMono.flatMapMany {
            RSocketRequester.wrap(it, MimeTypeUtils.APPLICATION_JSON, MimeType.valueOf(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.toString()), rSocketStrategies)
                    .route("feedMarketData")
                    .data(MarketDataRequest(stock))
                    .retrieveFlux(MarketData::class.java)
        }
    }
}

With this approach rSocket creates new connection for each request and doesn't uses multiple streams inside single connection.

@OlegDokuka
Copy link
Member

Personally, I would recommend caching the created RSocket instance like the following:

@Bean
fun rSocket(): Mono<RSocket> {
    return RSocketFactory
                .connect()
                .dataMimeType(APPLICATION_JSON_VALUE)
                .metadataMimeType(MESSAGE_RSOCKET_COMPOSITE_METADATA.toString())
                .resume()
                .resumeSessionDuration(Duration.ofHours(2))
                .resumeStreamTimeout(Duration.ofHours(2))
                .frameDecoder(PayloadDecoder.ZERO_COPY)
                .transport(WebsocketClientTransport.create(7000))
                .start()
                .cache()
}

However, such usage has its own downside. For example, if your connection gets closed for any reason, then you will not be able to reconnect using the given Mono anymore.

Right now I'm working on a version of ReaconnectingRSocket which will take care of such things (#685), but it is not here yet, and implementation mentioned in the PR has a couple of bugs

@vdshb
Copy link
Author

vdshb commented Dec 11, 2019

Thank you for your response!

With this info I've manage to make a hacky but sustainable Spring configuration which is reconnected only when previous connection is lost.

Maybe someone finds it useful until official solution released:

@Configuration
class RSocketClientConfiguration {

    private val connectedRSocketMonoReference = AtomicReference<Mono<RSocket>>()

    @Bean
    @Qualifier("factory")
    fun rSocketMonoFactory(): Mono<RSocket> {
        return RSocketFactory
                .connect()
                .dataMimeType(APPLICATION_JSON_VALUE)
                .metadataMimeType(MESSAGE_RSOCKET_COMPOSITE_METADATA.toString())
                .resume()
                .resumeSessionDuration(Duration.ofHours(2))
                .resumeStreamTimeout(Duration.ofHours(2))
                .frameDecoder(PayloadDecoder.ZERO_COPY)
                .transport(WebsocketClientTransport.create(7000))
                .start()
    }

    @Bean
    @Primary
    fun rSocketMono(@Qualifier("factory") rSocketMono: Mono<RSocket>): Mono<RSocket> {
        return Mono.just(1)
                .flatMap {
                    var resultMono: Mono<RSocket>
                    val connectedRSocketMono = connectedRSocketMonoReference.get()
                    if (connectedRSocketMono == null) {
                        resultMono = rSocketMono.cache()
                        connectedRSocketMonoReference.set(resultMono)
                    } else {
                        resultMono = connectedRSocketMono.flatMap { connectedRSocket ->
                            var result: Mono<RSocket> = connectedRSocketMono
                            if (connectedRSocket.isDisposed) {
                                result = rSocketMono.cache()
                                connectedRSocketMonoReference.set(result)
                            }
                            result
                        }
                    }
                    resultMono
                }
    }
}

@vdshb vdshb closed this as completed Dec 11, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

2 participants