package org.springframework.messaging.rsocket.annotation.support;

import io.rsocket.ConnectionSetupPayload;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.frame.FrameType;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.reactivestreams.Publisher;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.NettyDataBuffer;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.ReactiveMessageHandler;
import org.springframework.messaging.handler.DestinationPatternsMessageCondition;
import org.springframework.messaging.handler.invocation.reactive.HandlerMethodReturnValueHandler;
import org.springframework.messaging.rsocket.MetadataExtractor;
import org.springframework.messaging.rsocket.PayloadUtils;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.util.Assert;
import org.springframework.util.MimeType;
import org.springframework.util.RouteMatcher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:datasets/datasets-service.jar:BOOT-INF/lib/spring-messaging-5.3.2.jar:org/springframework/messaging/rsocket/annotation/support/MessagingRSocket.class */
/**
 * {@link RSocket} implementation that turns every incoming RSocket interaction
 * (setup, fire-and-forget, request-response, request-stream, request-channel,
 * metadata-push) into a {@link Message} and hands it to a
 * {@link ReactiveMessageHandler}.
 *
 * <p>For interactions that produce a reply, an {@link AtomicReference} is placed
 * in the message headers; after the handler completes, whatever {@link Flux} was
 * stored in that reference is returned as the response.
 */
class MessagingRSocket implements RSocket {
    /**
     * Key of the route entry in the map returned by the {@link MetadataExtractor}.
     * NOTE(review): presumably identical to the route key constant the extractor
     * itself publishes - confirm and reference that constant if available.
     */
    private static final String ROUTE_KEY = "route";

    private final MimeType dataMimeType;
    private final MimeType metadataMimeType;
    private final MetadataExtractor metadataExtractor;
    private final ReactiveMessageHandler messageHandler;
    private final RouteMatcher routeMatcher;
    private final RSocketRequester requester;
    private final RSocketStrategies strategies;

    /**
     * Create an instance; every argument is required.
     *
     * <p>Note: the decompiler reports that this constructor was originally
     * package-private; the access modifier is kept as found.
     *
     * @param mimeType the MIME type set as the content type of every message
     * @param mimeType2 the MIME type passed to the metadata extractor
     * @param metadataExtractor extracts header values from payload metadata
     * @param rSocketRequester the requester exposed to handlers through a header
     * @param reactiveMessageHandler the handler that messages are delegated to
     * @param routeMatcher parses the extracted route string into a route
     * @param rSocketStrategies source of the {@code DataBufferFactory}
     * @throws IllegalArgumentException if any argument is {@code null}
     */
    public MessagingRSocket(MimeType mimeType, MimeType mimeType2, MetadataExtractor metadataExtractor, RSocketRequester rSocketRequester, ReactiveMessageHandler reactiveMessageHandler, RouteMatcher routeMatcher, RSocketStrategies rSocketStrategies) {
        Assert.notNull(mimeType, "'dataMimeType' is required");
        Assert.notNull(mimeType2, "'metadataMimeType' is required");
        Assert.notNull(metadataExtractor, "MetadataExtractor is required");
        Assert.notNull(rSocketRequester, "RSocketRequester is required");
        Assert.notNull(reactiveMessageHandler, "ReactiveMessageHandler is required");
        Assert.notNull(routeMatcher, "RouteMatcher is required");
        Assert.notNull(rSocketStrategies, "RSocketStrategies is required");
        this.dataMimeType = mimeType;
        this.metadataMimeType = mimeType2;
        this.metadataExtractor = metadataExtractor;
        this.requester = rSocketRequester;
        this.messageHandler = reactiveMessageHandler;
        this.routeMatcher = routeMatcher;
        this.strategies = rSocketStrategies;
    }

    /**
     * Handle the connection setup payload as a message with frame type
     * {@link FrameType#SETUP}.
     *
     * @param connectionSetupPayload the setup payload
     * @return completion of the handling
     */
    public Mono<Void> handleConnectionSetupPayload(ConnectionSetupPayload connectionSetupPayload) {
        // Retained up front because handle() goes through retainDataAndReleasePayload(),
        // which (per its name) releases the payload - presumably this keeps the
        // reference count balanced for the caller; verify against PayloadUtils.
        connectionSetupPayload.retain();
        return handle(connectionSetupPayload, FrameType.SETUP);
    }

    /** Handle a fire-and-forget request; no response is produced. */
    public Mono<Void> fireAndForget(Payload payload) {
        return handle(payload, FrameType.REQUEST_FNF);
    }

    /** Handle a request-response interaction; only the first reply element is emitted. */
    public Mono<Payload> requestResponse(Payload payload) {
        return handleAndReply(payload, FrameType.REQUEST_RESPONSE, Flux.just(payload)).next();
    }

    /** Handle a request-stream interaction; all reply elements are emitted. */
    public Flux<Payload> requestStream(Payload payload) {
        return handleAndReply(payload, FrameType.REQUEST_STREAM, Flux.just(payload));
    }

    /**
     * Handle a request-channel interaction. The first payload of the inbound
     * stream is used to build the message headers; the complete inbound stream
     * (including that first payload) becomes the message body.
     */
    public Flux<Payload> requestChannel(Publisher<Payload> publisher) {
        return Flux.from(publisher).switchOnFirst((signal, inbound) -> {
            Payload first = signal.get();
            // No value on the first signal (completion or error): pass the stream through untouched.
            if (first == null) {
                return inbound;
            }
            return handleAndReply(first, FrameType.REQUEST_CHANNEL, inbound);
        });
    }

    /** Handle a metadata-push frame; no response is produced. */
    public Mono<Void> metadataPush(Payload payload) {
        return handle(payload, FrameType.METADATA_PUSH);
    }

    /**
     * Delegate a single payload to the message handler without expecting a reply.
     *
     * @param payload the incoming payload
     * @param frameType the frame type recorded in the message headers
     * @return completion of the handling
     */
    private Mono<Void> handle(Payload payload, FrameType frameType) {
        // Order matters: headers are extracted from the payload before the
        // payload is handed to retainDataAndReleasePayload().
        MessageHeaders headers = createHeaders(payload, frameType, null);
        DataBuffer dataBuffer = retainDataAndReleasePayload(payload);
        int initialRefCount = refCount(dataBuffer);
        Message<?> message = MessageBuilder.createMessage(dataBuffer, headers);
        return Mono.defer(() -> this.messageHandler.handleMessage(message))
                .doFinally(signalType -> {
                    // Release only when the reference count is unchanged - presumably
                    // meaning nothing downstream consumed (and released) the buffer.
                    if (refCount(dataBuffer) == initialRefCount) {
                        DataBufferUtils.release(dataBuffer);
                    }
                });
    }

    /**
     * Return the reference count of the native buffer for a
     * {@link NettyDataBuffer}, or {@code 1} for any other buffer type.
     */
    private int refCount(DataBuffer dataBuffer) {
        if (dataBuffer instanceof NettyDataBuffer) {
            return ((NettyDataBuffer) dataBuffer).getNativeBuffer().refCnt();
        }
        return 1;
    }

    /**
     * Delegate to the message handler and then emit the response that was
     * stored in the reference carried by the message headers.
     *
     * @param firstPayload the payload the headers are extracted from
     * @param frameType the frame type recorded in the message headers
     * @param payloads the full inbound payload stream used as the message body
     * @return the response stream, or an {@link IllegalStateException} error
     * if the reference is still empty once handling completes
     */
    private Flux<Payload> handleAndReply(Payload firstPayload, FrameType frameType, Flux<Payload> payloads) {
        AtomicReference<Flux<Payload>> responseRef = new AtomicReference<>();
        MessageHeaders headers = createHeaders(firstPayload, frameType, responseRef);

        // Tracks whether anything subscribed to the message body.
        AtomicBoolean read = new AtomicBoolean();
        Flux<DataBuffer> buffers = payloads
                .map(this::retainDataAndReleasePayload)
                .doOnSubscribe(subscription -> read.set(true));
        Message<Flux<DataBuffer>> message = MessageBuilder.createMessage(buffers, headers);

        return Mono.defer(() -> this.messageHandler.handleMessage(message))
                .doFinally(signalType -> {
                    // If the body was never subscribed to, the map step above never ran
                    // for the first payload, so it is released here instead.
                    if (!read.get()) {
                        firstPayload.release();
                    }
                })
                .thenMany(Flux.defer(() -> {
                    // Read the reference once so the null check and the returned value agree.
                    Flux<Payload> response = responseRef.get();
                    if (response != null) {
                        return response;
                    }
                    return Mono.error(new IllegalStateException("Expected response"));
                }));
    }

    /** Delegate to {@link PayloadUtils} using the strategies' data buffer factory. */
    private DataBuffer retainDataAndReleasePayload(Payload payload) {
        return PayloadUtils.retainDataAndReleasePayload(payload, this.strategies.dataBufferFactory());
    }

    /**
     * Build the (mutable) message headers for a payload: extracted metadata
     * values, the parsed route as lookup destination, content type, frame type,
     * requester, optional response reference and the data buffer factory.
     *
     * @param payload the payload whose metadata is extracted
     * @param frameType the RSocket frame type of the interaction
     * @param responseRef holder for the response stream, or {@code null} when
     * the interaction has no reply
     * @return the populated message headers
     */
    private MessageHeaders createHeaders(Payload payload, FrameType frameType, @Nullable AtomicReference<Flux<Payload>> responseRef) {
        MessageHeaderAccessor accessor = new MessageHeaderAccessor();
        accessor.setLeaveMutable(true);

        Map<String, Object> metadataValues = this.metadataExtractor.extract(payload, this.metadataMimeType);
        // Guarantee a route entry so a lookup destination header is always set.
        metadataValues.putIfAbsent(ROUTE_KEY, "");
        for (Map.Entry<String, Object> entry : metadataValues.entrySet()) {
            String key = entry.getKey();
            Object value = entry.getValue();
            if (ROUTE_KEY.equals(key)) {
                accessor.setHeader(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER, this.routeMatcher.parseRoute((String) value));
            } else {
                accessor.setHeader(key, value);
            }
        }

        accessor.setContentType(this.dataMimeType);
        accessor.setHeader(RSocketFrameTypeMessageCondition.FRAME_TYPE_HEADER, frameType);
        accessor.setHeader(RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER, this.requester);
        if (responseRef != null) {
            accessor.setHeader(RSocketPayloadReturnValueHandler.RESPONSE_HEADER, responseRef);
        }
        accessor.setHeader(HandlerMethodReturnValueHandler.DATA_BUFFER_FACTORY_HEADER, this.strategies.dataBufferFactory());
        return accessor.getMessageHeaders();
    }
}
