package org.springframework.integration.gateway;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import org.reactivestreams.Subscriber;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.core.AttributeAccessor;
import org.springframework.integration.MessageTimeoutException;
import org.springframework.integration.channel.ReactiveStreamsSubscribableChannel;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.endpoint.AbstractEndpoint;
import org.springframework.integration.endpoint.EventDrivenConsumer;
import org.springframework.integration.endpoint.PollingConsumer;
import org.springframework.integration.endpoint.ReactiveStreamsConsumer;
import org.springframework.integration.handler.BridgeHandler;
import org.springframework.integration.history.HistoryWritingMessagePostProcessor;
import org.springframework.integration.mapping.InboundMessageMapper;
import org.springframework.integration.mapping.MessageMappingException;
import org.springframework.integration.mapping.OutboundMessageMapper;
import org.springframework.integration.support.DefaultErrorMessageStrategy;
import org.springframework.integration.support.DefaultMessageBuilderFactory;
import org.springframework.integration.support.ErrorMessageStrategy;
import org.springframework.integration.support.ErrorMessageUtils;
import org.springframework.integration.support.MessageBuilderFactory;
import org.springframework.integration.support.MutableMessageBuilder;
import org.springframework.integration.support.converter.SimpleMessageConverter;
import org.springframework.integration.support.management.IntegrationManagedResource;
import org.springframework.integration.support.management.IntegrationManagement;
import org.springframework.integration.support.management.MessageSourceMetrics;
import org.springframework.integration.support.management.TrackableComponent;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.core.MessagePostProcessor;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import reactor.core.publisher.Mono;

@IntegrationManagedResource
/* loaded from: input_file:datasets/datasets-service-10.0.2-SNAPSHOT.jar:BOOT-INF/lib/spring-integration-core-5.1.6.RELEASE.jar:org/springframework/integration/gateway/MessagingGatewaySupport.class */
public abstract class MessagingGatewaySupport extends AbstractEndpoint implements TrackableComponent, MessageSourceMetrics {
    // Default timeout value (presumably milliseconds — TODO confirm against MessagingTemplate).
    private static final long DEFAULT_TIMEOUT = 1000;
    // Template through which this gateway sends and receives; visible to subclasses.
    protected final MessagingTemplate messagingTemplate;
    // Converter installed on the messaging template; carries the request and reply mappers.
    private final SimpleMessageConverter messageConverter;
    // Post-processor applied to outgoing request messages; tracking is toggled via setShouldTrack.
    private final HistoryWritingMessagePostProcessor historyWritingPostProcessor;
    // Monitor guarding the lazy creation of replyMessageCorrelator.
    private final Object replyMessageCorrelatorMonitor;
    // When true, a missing reply raises a MessageTimeoutException instead of producing null.
    private final boolean errorOnTimeout;
    // Counts requests sent while countsEnabled is true.
    private final AtomicLong messageCount;
    // Records which management settings (logging, counts) were configured explicitly.
    private final IntegrationManagement.ManagementOverrides managementOverrides;
    // Strategy used by buildErrorMessage to create ErrorMessage instances.
    private ErrorMessageStrategy errorMessageStrategy;
    // Request channel, either injected or resolved lazily from requestChannelName.
    private MessageChannel requestChannel;
    private String requestChannelName;
    // Reply channel, either injected or resolved lazily from replyChannelName.
    private MessageChannel replyChannel;
    private String replyChannelName;
    // Error channel, either injected or resolved lazily from errorChannelName.
    private MessageChannel errorChannel;
    private String errorChannelName;
    // Reply timeout; mirrored into the template's receive timeout and the polling reply consumer.
    private long replyTimeout;
    // Maps request objects that are not already messages into messages.
    private InboundMessageMapper<Object> requestMapper;
    // Endpoint consuming the configured reply channel; created lazily, never reset to null in this class.
    private volatile AbstractEndpoint replyMessageCorrelator;
    private String managedType;
    private String managedName;
    // Whether requests are counted in messageCount.
    private boolean countsEnabled;
    private boolean loggingEnabled;
    // Set at the end of onInit; checked by initializeIfNecessary.
    private volatile boolean initialized;

    /* loaded from: input_file:datasets/datasets-service-10.0.2-SNAPSHOT.jar:BOOT-INF/lib/spring-integration-core-5.1.6.RELEASE.jar:org/springframework/integration/gateway/MessagingGatewaySupport$DefaultRequestMapper.class */
    /**
     * Request mapper used when no custom mapper is configured: objects that are already
     * messages pass through unchanged, anything else becomes the payload of a new message.
     */
    private static class DefaultRequestMapper implements InboundMessageMapper<Object> {
        private volatile MessageBuilderFactory messageBuilderFactory = new DefaultMessageBuilderFactory();

        DefaultRequestMapper() {
        }

        /**
         * Replaces the factory used to build messages for non-message request objects.
         *
         * @param messageBuilderFactory the factory to use from now on
         */
        void setMessageBuilderFactory(MessageBuilderFactory messageBuilderFactory) {
            this.messageBuilderFactory = messageBuilderFactory;
        }

        /**
         * Maps the given object to a message.
         *
         * @param obj the request object; returned as-is when it is already a {@link Message}
         * @param map headers copied onto a newly built message where absent; ignored when
         *            {@code obj} is already a message
         * @return the message representing {@code obj}
         */
        @Override // org.springframework.integration.mapping.InboundMessageMapper
        public Message<?> toMessage(Object obj, @Nullable Map<String, Object> map) {
            if (obj instanceof Message) {
                return (Message<?>) obj;
            }
            return this.messageBuilderFactory.withPayload(obj).copyHeadersIfAbsent(map).build();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:datasets/datasets-service-10.0.2-SNAPSHOT.jar:BOOT-INF/lib/spring-integration-core-5.1.6.RELEASE.jar:org/springframework/integration/gateway/MessagingGatewaySupport$FutureReplyChannel.class */
    /**
     * Channel that captures a single message by completing a {@link CompletableFuture};
     * used as the reply and error channel header of requests sent through the reactive path.
     */
    public static class FutureReplyChannel implements MessageChannel {
        // Completed with the first message sent to this channel.
        private final CompletableFuture<Message<?>> messageFuture = new CompletableFuture<>();

        FutureReplyChannel() {
        }

        /**
         * Completes the future with the given message.
         *
         * @param message the message to capture
         * @param j       the send timeout; not used by this implementation
         * @return the result of {@link CompletableFuture#complete(Object)}, i.e. {@code false}
         *         when the future had already been completed
         */
        @Override // org.springframework.messaging.MessageChannel
        public boolean send(Message<?> message, long j) {
            return this.messageFuture.complete(message);
        }
    }

    /**
     * Creates a gateway that does not raise an error when no reply arrives
     * (delegates to the boolean constructor with {@code false}).
     */
    public MessagingGatewaySupport() {
        this(false);
    }

    /**
     * Creates a gateway with default collaborators and default timeouts.
     *
     * @param z stored as {@code errorOnTimeout}: when {@code true}, a missing reply in the
     *          send-and-receive operations raises a {@link MessageTimeoutException}
     */
    public MessagingGatewaySupport(boolean z) {
        this.errorOnTimeout = z;
        this.replyMessageCorrelatorMonitor = new Object();
        this.messageCount = new AtomicLong();
        this.managementOverrides = new IntegrationManagement.ManagementOverrides();
        this.errorMessageStrategy = new DefaultErrorMessageStrategy();
        this.historyWritingPostProcessor = new HistoryWritingMessagePostProcessor();
        this.requestMapper = new DefaultRequestMapper();
        this.loggingEnabled = true;
        this.replyTimeout = DEFAULT_TIMEOUT;
        this.messageConverter = new SimpleMessageConverter();

        // The template shares this gateway's converter and starts with the default timeouts.
        MessagingTemplate template = new MessagingTemplate();
        template.setMessageConverter(this.messageConverter);
        template.setSendTimeout(DEFAULT_TIMEOUT);
        template.setReceiveTimeout(this.replyTimeout);
        this.messagingTemplate = template;
    }

    /**
     * Sets the channel to which request messages are sent.
     *
     * @param messageChannel the request channel; initialization fails if a request channel
     *                       name has been configured as well
     */
    public void setRequestChannel(MessageChannel messageChannel) {
        this.requestChannel = messageChannel;
    }

    /**
     * Sets the name from which the request channel is resolved lazily on first access.
     *
     * @param str the channel name; asserted to contain text
     */
    public void setRequestChannelName(String str) {
        Assert.hasText(str, "'requestChannelName' must not be empty");
        this.requestChannelName = str;
    }

    /**
     * Sets the channel from which replies are consumed.
     *
     * @param messageChannel the reply channel; initialization fails if a reply channel
     *                       name has been configured as well
     */
    public void setReplyChannel(MessageChannel messageChannel) {
        this.replyChannel = messageChannel;
    }

    /**
     * Sets the name from which the reply channel is resolved lazily on first access.
     *
     * @param str the channel name; asserted to contain text
     */
    public void setReplyChannelName(String str) {
        Assert.hasText(str, "'replyChannelName' must not be empty");
        this.replyChannelName = str;
    }

    /**
     * Sets the channel to which failures of gateway operations are routed.
     *
     * @param messageChannel the error channel; initialization fails if an error channel
     *                       name has been configured as well
     */
    public void setErrorChannel(MessageChannel messageChannel) {
        this.errorChannel = messageChannel;
    }

    /**
     * Sets the name from which the error channel is resolved lazily on first access.
     *
     * @param str the channel name; asserted to contain text
     */
    public void setErrorChannelName(String str) {
        Assert.hasText(str, "'errorChannelName' must not be empty");
        this.errorChannelName = str;
    }

    /**
     * Sets the send timeout of the underlying messaging template.
     *
     * @param j the timeout value passed to the template (presumably milliseconds — TODO confirm)
     */
    public void setRequestTimeout(long j) {
        this.messagingTemplate.setSendTimeout(j);
    }

    /**
     * Sets the reply timeout, both on this gateway (used when a polling reply consumer is
     * created) and as the receive timeout of the underlying messaging template.
     *
     * @param j the timeout value (presumably milliseconds — TODO confirm)
     */
    public void setReplyTimeout(long j) {
        this.replyTimeout = j;
        this.messagingTemplate.setReceiveTimeout(j);
    }

    /**
     * Configures the mapper that turns request objects into messages and registers it with
     * the message converter.
     *
     * @param inboundMessageMapper the mapper to use; {@code null} keeps the currently
     *                             configured mapper, which is still (re-)registered with
     *                             the converter
     */
    public void setRequestMapper(@Nullable InboundMessageMapper<?> inboundMessageMapper) {
        if (inboundMessageMapper != null) {
            // The field is typed for arbitrary request objects; the wildcard mapper must be
            // narrowed explicitly, otherwise the assignment does not compile.
            @SuppressWarnings("unchecked")
            InboundMessageMapper<Object> mapper = (InboundMessageMapper<Object>) inboundMessageMapper;
            this.requestMapper = mapper;
        }
        this.messageConverter.setInboundMessageMapper(this.requestMapper);
    }

    /**
     * Registers the given mapper as the outbound mapper of the message converter.
     *
     * @param outboundMessageMapper the mapper handed to the converter
     */
    public void setReplyMapper(OutboundMessageMapper<?> outboundMessageMapper) {
        this.messageConverter.setOutboundMessageMapper(outboundMessageMapper);
    }

    /**
     * Forwards the tracking flag to the history-writing post-processor.
     *
     * @param z the tracking flag
     */
    @Override // org.springframework.integration.support.management.TrackableComponent
    public void setShouldTrack(boolean z) {
        this.historyWritingPostProcessor.setShouldTrack(z);
    }

    /**
     * Returns the message counter narrowed to {@code int}.
     *
     * @return the counter value; the cast truncates values outside the {@code int} range,
     *         use {@link #getMessageCountLong()} for the full value
     */
    @Override // org.springframework.integration.support.management.MessageSourceMetrics
    public int getMessageCount() {
        return (int) this.messageCount.get();
    }

    /**
     * Returns the current value of the message counter.
     *
     * @return the counter value
     */
    @Override // org.springframework.integration.support.management.MessageSourceMetrics
    public long getMessageCountLong() {
        return this.messageCount.get();
    }

    /**
     * Stores the managed name of this component.
     *
     * @param str the managed name
     */
    @Override // org.springframework.integration.support.management.MessageSourceMetrics
    public void setManagedName(String str) {
        this.managedName = str;
    }

    /**
     * Returns the managed name of this component.
     *
     * @return the stored managed name, {@code null} if none was set
     */
    @Override // org.springframework.integration.support.management.MessageSourceMetrics
    public String getManagedName() {
        return this.managedName;
    }

    /**
     * Stores the managed type of this component.
     *
     * @param str the managed type
     */
    @Override // org.springframework.integration.support.management.MessageSourceMetrics
    public void setManagedType(String str) {
        this.managedType = str;
    }

    /**
     * Returns the managed type of this component.
     *
     * @return the stored managed type, {@code null} if none was set
     */
    @Override // org.springframework.integration.support.management.MessageSourceMetrics
    public String getManagedType() {
        return this.managedType;
    }

    /**
     * Returns the component type of this endpoint.
     *
     * @return the constant {@code "gateway"}
     */
    @Override // org.springframework.integration.context.IntegrationObjectSupport, org.springframework.integration.support.context.NamedComponent
    public String getComponentType() {
        return "gateway";
    }

    /**
     * Sets the logging flag and records in the management overrides that logging was
     * configured explicitly.
     *
     * @param z the logging flag
     */
    @Override // org.springframework.integration.support.management.IntegrationManagement
    public void setLoggingEnabled(boolean z) {
        this.loggingEnabled = z;
        this.managementOverrides.loggingConfigured = true;
    }

    /**
     * Returns the logging flag.
     *
     * @return the stored flag; {@code true} unless changed via {@link #setLoggingEnabled(boolean)}
     */
    @Override // org.springframework.integration.support.management.IntegrationManagement
    public boolean isLoggingEnabled() {
        return this.loggingEnabled;
    }

    /**
     * Enables or disables request counting and records in the management overrides that
     * counting was configured explicitly.
     *
     * @param z whether requests should be counted
     */
    @Override // org.springframework.integration.support.management.IntegrationManagement
    public void setCountsEnabled(boolean z) {
        this.countsEnabled = z;
        this.managementOverrides.countsConfigured = true;
    }

    /**
     * Returns whether requests are counted.
     *
     * @return the stored counting flag
     */
    @Override // org.springframework.integration.support.management.IntegrationManagement
    public boolean isCountsEnabled() {
        return this.countsEnabled;
    }

    /**
     * Sets the strategy used by {@link #buildErrorMessage(Message, Throwable)}.
     *
     * @param errorMessageStrategy the strategy; asserted to be non-null
     */
    public final void setErrorMessageStrategy(ErrorMessageStrategy errorMessageStrategy) {
        Assert.notNull(errorMessageStrategy, "'errorMessageStrategy' cannot be null");
        this.errorMessageStrategy = errorMessageStrategy;
    }

    /**
     * Returns the object recording which management settings were configured explicitly
     * through {@link #setLoggingEnabled(boolean)} and {@link #setCountsEnabled(boolean)}.
     *
     * @return this gateway's management overrides instance
     */
    @Override // org.springframework.integration.support.management.IntegrationManagement
    public IntegrationManagement.ManagementOverrides getOverrides() {
        return this.managementOverrides;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Validates the channel configuration and wires the collaborators.
     *
     * <p>For each of the request, reply and error channels, a channel instance and a channel
     * name must not both be configured. The history post-processor receives this component
     * and the message builder factory; when a bean factory is available it is propagated to
     * the messaging template and the message converter, and the default request mapper is
     * given the message builder factory. Finally the gateway is marked as initialized.
     */
    @Override // org.springframework.integration.endpoint.AbstractEndpoint, org.springframework.integration.context.IntegrationObjectSupport
    public void onInit() {
        boolean requestConfiguredTwice = this.requestChannelName != null && this.requestChannel != null;
        Assert.state(!requestConfiguredTwice, "'requestChannelName' and 'requestChannel' are mutually exclusive.");
        boolean replyConfiguredTwice = this.replyChannelName != null && this.replyChannel != null;
        Assert.state(!replyConfiguredTwice, "'replyChannelName' and 'replyChannel' are mutually exclusive.");
        boolean errorConfiguredTwice = this.errorChannelName != null && this.errorChannel != null;
        Assert.state(!errorConfiguredTwice, "'errorChannelName' and 'errorChannel' are mutually exclusive.");

        this.historyWritingPostProcessor.setTrackableComponent(this);
        MessageBuilderFactory builderFactory = getMessageBuilderFactory();
        this.historyWritingPostProcessor.setMessageBuilderFactory(builderFactory);

        BeanFactory owningFactory = getBeanFactory();
        if (owningFactory != null) {
            this.messagingTemplate.setBeanFactory(owningFactory);
            InboundMessageMapper<Object> mapper = this.requestMapper;
            if (mapper instanceof DefaultRequestMapper) {
                ((DefaultRequestMapper) mapper).setMessageBuilderFactory(builderFactory);
            }
            this.messageConverter.setBeanFactory(owningFactory);
        }

        this.initialized = true;
    }

    /**
     * Runs {@code afterPropertiesSet()} when {@link #onInit()} has not yet marked this
     * gateway as initialized.
     */
    private void initializeIfNecessary() {
        if (!this.initialized) {
            afterPropertiesSet();
        }
    }

    @Nullable
    /**
     * Returns the request channel, resolving it from the configured channel name on first
     * access when no channel instance is present, and caching the result.
     *
     * @return the request channel, or {@code null} when neither a channel nor a name is configured
     */
    public MessageChannel getRequestChannel() {
        boolean nameConfigured = this.requestChannelName != null;
        if (nameConfigured && this.requestChannel == null) {
            this.requestChannel = getChannelResolver().resolveDestination(this.requestChannelName);
        }
        return this.requestChannel;
    }

    @Nullable
    /**
     * Returns the reply channel, resolving it from the configured channel name on first
     * access when no channel instance is present, and caching the result.
     *
     * @return the reply channel, or {@code null} when neither a channel nor a name is configured
     */
    public MessageChannel getReplyChannel() {
        boolean nameConfigured = this.replyChannelName != null;
        if (nameConfigured && this.replyChannel == null) {
            this.replyChannel = getChannelResolver().resolveDestination(this.replyChannelName);
        }
        return this.replyChannel;
    }

    @Nullable
    /**
     * Returns the error channel, resolving it from the configured channel name on first
     * access when no channel instance is present, and caching the result.
     *
     * @return the error channel, or {@code null} when neither a channel nor a name is configured
     */
    public MessageChannel getErrorChannel() {
        boolean nameConfigured = this.errorChannelName != null;
        if (nameConfigured && this.errorChannel == null) {
            this.errorChannel = getChannelResolver().resolveDestination(this.errorChannelName);
        }
        return this.errorChannel;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Converts the given object and sends it to the request channel without waiting for a reply.
     *
     * <p>If sending fails and an error channel is configured, the exception is wrapped in an
     * {@link ErrorMessage} and sent there; otherwise it is rethrown (wrapped in a
     * {@link MessagingException} unless it already is a runtime exception).
     *
     * @param obj the request object; must not be {@code null}
     * @throws IllegalStateException if no request channel is configured
     */
    public void send(Object obj) {
        initializeIfNecessary();
        Assert.notNull(obj, "request must not be null");
        MessageChannel requestChannel = getRequestChannel();
        Assert.state(requestChannel != null, "send is not supported, because no request channel has been configured");
        try {
            if (this.countsEnabled) {
                this.messageCount.incrementAndGet();
            }
            // The channel itself is the destination; casting it to MessagingTemplate (a
            // decompiler artifact) neither compiles against the template API nor could succeed at runtime.
            this.messagingTemplate.convertAndSend(requestChannel, obj, this.historyWritingPostProcessor);
        } catch (Exception e) {
            MessageChannel errorChannel = getErrorChannel();
            if (errorChannel != null) {
                this.messagingTemplate.send(errorChannel, new ErrorMessage(e));
            } else {
                rethrow(e, "failed to send message");
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Receives a message from the reply channel and converts it to an object.
     *
     * @return the converted reply, or {@code null} when the template returns none
     * @throws IllegalStateException if the reply channel is not a {@link PollableChannel}
     */
    @Nullable
    public Object receive() {
        initializeIfNecessary();
        MessageChannel replyChannel = getReplyChannel();
        assertPollableChannel(replyChannel);
        // Pass the channel as the destination; the former cast to MessagingTemplate was a
        // decompiler artifact that cannot compile or succeed at runtime.
        return this.messagingTemplate.receiveAndConvert(replyChannel, Object.class);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Receives a message from the reply channel without converting it.
     *
     * @return the received message, or {@code null} when the template returns none
     * @throws IllegalStateException if the reply channel is not a {@link PollableChannel}
     */
    @Nullable
    public Message<?> receiveMessage() {
        initializeIfNecessary();
        MessageChannel replyChannel = getReplyChannel();
        assertPollableChannel(replyChannel);
        // Pass the channel as the destination; the former cast to MessagingTemplate was a
        // decompiler artifact that cannot compile or succeed at runtime.
        return this.messagingTemplate.receive(replyChannel);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Nullable
    /**
     * Receives a message from the reply channel with an explicit timeout and converts it
     * to an object.
     *
     * @param j the receive timeout passed to the template (presumably milliseconds — TODO confirm)
     * @return the converted reply, or {@code null} when the template returns none
     * @throws IllegalStateException if the reply channel is not a {@link PollableChannel}
     */
    public Object receive(long j) {
        initializeIfNecessary();
        MessageChannel pollableReplyChannel = getReplyChannel();
        assertPollableChannel(pollableReplyChannel);
        // The timed receiveAndConvert overload only accepts a PollableChannel; the
        // assertion above makes this narrowing cast safe.
        return this.messagingTemplate.receiveAndConvert((PollableChannel) pollableReplyChannel, j);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Nullable
    /**
     * Receives a message from the reply channel with an explicit timeout, without converting it.
     *
     * @param j the receive timeout passed to the template (presumably milliseconds — TODO confirm)
     * @return the received message, or {@code null} when the template returns none
     * @throws IllegalStateException if the reply channel is not a {@link PollableChannel}
     */
    public Message<?> receiveMessage(long j) {
        initializeIfNecessary();
        MessageChannel pollableReplyChannel = getReplyChannel();
        assertPollableChannel(pollableReplyChannel);
        Message<?> received = this.messagingTemplate.receive(pollableReplyChannel, j);
        return received;
    }

    /**
     * Asserts that the given channel can be polled; a {@code null} channel fails the
     * assertion as well because it is not an instance of {@link PollableChannel}.
     *
     * @param messageChannel the channel to check, may be {@code null}
     */
    private void assertPollableChannel(@Nullable MessageChannel messageChannel) {
        Assert.state(messageChannel instanceof PollableChannel, "receive is not supported, because no pollable reply channel has been configured");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Sends the request and returns the reply in converted form
     * (delegates to {@code doSendAndReceive} with conversion enabled).
     *
     * @param obj the request object; must not be {@code null}
     * @return the converted reply, or {@code null} when there is none
     */
    @Nullable
    public Object sendAndReceive(Object obj) {
        return doSendAndReceive(obj, true);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Sends the request and returns the reply as a message
     * (delegates to {@code doSendAndReceive} with conversion disabled).
     *
     * @param obj the request object; must not be {@code null}
     * @return the reply message, or {@code null} when there is none
     */
    @Nullable
    public Message<?> sendAndReceiveMessage(Object obj) {
        return (Message) doSendAndReceive(obj, false);
    }

    @Nullable
    /**
     * Sends the request to the request channel and waits for the reply.
     *
     * <p>Any exception raised while sending or receiving, as well as a reply that is a
     * {@link Throwable} or an {@link ErrorMessage}, is handed to
     * {@code handleSendAndReceiveError}. A missing reply raises a
     * {@link MessageTimeoutException} when the gateway was created with error-on-timeout.
     *
     * @param obj the request object; must not be {@code null}
     * @param z   {@code true} to let the template convert request and reply, {@code false}
     *            to map the request with the request mapper and return the reply message itself
     * @return the reply (converted object or message), the result of the error flow, or {@code null}
     * @throws MessagingException if no request channel is available
     */
    private Object doSendAndReceive(Object obj, boolean z) {
        initializeIfNecessary();
        Assert.notNull(obj, "request must not be null");
        MessageChannel requestChannel = getRequestChannel();
        if (requestChannel == null) {
            throw new MessagingException("No request channel available. Cannot send request message.");
        }
        registerReplyMessageCorrelatorIfNecessary();

        Object reply;
        Message<?> requestMessage = null;
        try {
            if (this.countsEnabled) {
                this.messageCount.incrementAndGet();
            }
            if (z) {
                // The channel itself is the destination; the former cast to MessagingTemplate
                // was a decompiler artifact that cannot compile or succeed at runtime.
                reply = this.messagingTemplate.convertSendAndReceive(requestChannel, obj, Object.class, this.historyWritingPostProcessor);
            } else {
                Message<?> mapped = obj instanceof Message ? (Message<?>) obj : this.requestMapper.toMessage(obj);
                Assert.state(mapped != null, () -> "request mapper resulted in no message for " + obj);
                requestMessage = this.historyWritingPostProcessor.postProcessMessage(mapped);
                reply = this.messagingTemplate.sendAndReceive(requestChannel, requestMessage);
            }
            if (reply == null && this.errorOnTimeout) {
                throwMessageTimeoutException(obj, "No reply received within timeout");
            }
        } catch (Exception e) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("failure occurred in gateway sendAndReceive: " + e.getMessage());
            }
            // Treat the failure like an error reply so that it goes through the error flow below.
            reply = e;
        }

        if ((reply instanceof Throwable) || (reply instanceof ErrorMessage)) {
            Throwable failure = reply instanceof ErrorMessage ? ((ErrorMessage) reply).getPayload() : (Throwable) reply;
            return handleSendAndReceiveError(obj, requestMessage, failure, z);
        }
        return reply;
    }

    /**
     * Handles a failure of a send-and-receive operation.
     *
     * <p>Without an error channel the failure is rethrown; a {@link MessagingException} is
     * first wrapped in a {@link MessageHandlingException} when the request message carries an
     * error channel header. With an error channel, an error message is sent there and the
     * reply of that flow becomes the result.
     *
     * @param obj     the original request object (used for the timeout exception)
     * @param message the request message, if one had been built
     * @param th      the failure to handle
     * @param z       {@code true} to return the payload of the error flow's reply instead of the message
     * @return the error flow's reply (message or payload), or {@code null} when there is none
     *         and the gateway does not raise errors on timeout
     */
    @Nullable
    private Object handleSendAndReceiveError(Object obj, @Nullable Message<?> message, Throwable th, boolean z) {
        MessageChannel errorChannel = getErrorChannel();
        if (errorChannel == null) {
            Throwable toThrow = th;
            boolean requestHasErrorChannelHeader = message != null && message.getHeaders().getErrorChannel() != null;
            if ((th instanceof MessagingException) && requestHasErrorChannelHeader) {
                toThrow = new MessageHandlingException(message, th);
            }
            throw wrapExceptionIfNecessary(toThrow, "gateway received checked Exception");
        }

        ErrorMessage errorMessage = buildErrorMessage(message, th);
        Message<?> errorFlowReply = sendErrorMessageAndReceive(errorChannel, errorMessage);
        if (errorFlowReply == null) {
            if (this.errorOnTimeout) {
                throwMessageTimeoutException(obj, "No reply received from error channel within timeout");
            }
            return null;
        }
        if (z) {
            return errorFlowReply.getPayload();
        }
        return errorFlowReply;
    }

    /**
     * Sends the error message to the error channel and waits for the error flow's reply.
     *
     * <p>A reply whose payload is a {@link Throwable} is rethrown; because that happens
     * inside the {@code try}, it is reported like any other failure of the error flow, as a
     * {@link MessagingException} carrying the error message.
     *
     * @param messageChannel the error channel
     * @param errorMessage   the error message to send
     * @return the error flow's reply, or {@code null} when there is none
     * @throws MessagingException if the error flow fails or returns a throwable payload
     */
    @Nullable
    private Message<?> sendErrorMessageAndReceive(MessageChannel messageChannel, ErrorMessage errorMessage) {
        try {
            Message<?> errorFlowReply = this.messagingTemplate.sendAndReceive(messageChannel, errorMessage);
            if (errorFlowReply != null) {
                Object payload = errorFlowReply.getPayload();
                if (payload instanceof Throwable) {
                    rethrow((Throwable) payload, "error flow returned an Error Message");
                }
            }
            return errorFlowReply;
        } catch (Exception e) {
            throw new MessagingException(errorMessage, "failure occurred in error-handling flow", e);
        }
    }

    /**
     * Always throws a {@link MessageTimeoutException}; the request is attached as the failed
     * message when it is a {@link Message}.
     *
     * @param obj the original request object
     * @param str the exception description
     */
    private void throwMessageTimeoutException(Object obj, String str) {
        if (obj instanceof Message) {
            throw new MessageTimeoutException((Message<?>) obj, str);
        }
        throw new MessageTimeoutException(str);
    }

    /**
     * Reactive variant of send-and-receive: returns a {@link Mono} that, once subscribed,
     * sends the request and emits the reply message.
     *
     * @param obj the request object; must not be {@code null}
     * @return the deferred request/reply exchange
     * @throws MessagingException if no request channel is available
     */
    protected Mono<Message<?>> sendAndReceiveMessageReactive(Object obj) {
        initializeIfNecessary();
        Assert.notNull(obj, "request must not be null");
        MessageChannel target = getRequestChannel();
        if (target != null) {
            registerReplyMessageCorrelatorIfNecessary();
            return doSendAndReceiveMessageReactive(target, obj, false);
        }
        throw new MessagingException("No request channel available. Cannot send request message.");
    }

    /**
     * Builds a deferred request/reply exchange over the given channel.
     *
     * <p>On subscription the request is mapped to a message and post-processed, its reply
     * and error channel headers are replaced by a fresh {@link FutureReplyChannel} (the
     * original header values are restored on the reply), the template's send/receive timeout
     * headers are cleared, and the message is sent.
     *
     * <p>Only failures of the mapping/post-processing step are reported as
     * {@link MessageMappingException}; failures while sending or assembling the reply
     * pipeline propagate unchanged instead of being mislabelled as mapping errors.
     *
     * @param messageChannel the channel to send to
     * @param obj            the request object or message
     * @param z              {@code true} when this exchange is itself the error flow
     * @return a {@link Mono} emitting the reply message
     */
    private Mono<Message<?>> doSendAndReceiveMessageReactive(MessageChannel messageChannel, Object obj, boolean z) {
        return Mono.defer(() -> {
            Message<?> postProcessMessage;
            try {
                Message<?> mapped = obj instanceof Message ? (Message<?>) obj : this.requestMapper.toMessage(obj);
                postProcessMessage = this.historyWritingPostProcessor.postProcessMessage(mapped);
            } catch (Exception e) {
                throw new MessageMappingException("Cannot map to message: " + obj, e);
            }

            // Remember the caller's headers so that they can be put back on the reply.
            Object replyChannel = postProcessMessage.getHeaders().getReplyChannel();
            Object errorChannel = postProcessMessage.getHeaders().getErrorChannel();

            FutureReplyChannel futureReplyChannel = new FutureReplyChannel();
            Message<?> build = MutableMessageBuilder.fromMessage(postProcessMessage)
                    .setReplyChannel(futureReplyChannel)
                    .setHeader(this.messagingTemplate.getSendTimeoutHeader(), null)
                    .setHeader(this.messagingTemplate.getReceiveTimeoutHeader(), null)
                    .setErrorChannel(futureReplyChannel)
                    .build();

            sendMessageForReactiveFlow(messageChannel, build);
            return buildReplyMono(build, futureReplyChannel, z, replyChannel, errorChannel);
        });
    }

    /**
     * Delivers the request message for the reactive exchange: a reactive-streams channel is
     * subscribed to a single-element publisher, any other channel receives a regular send
     * using the resolved send timeout (no timeout argument when that value is negative).
     *
     * @param messageChannel the channel to deliver to
     * @param message        the request message
     * @throws MessageDeliveryException if a regular send reports failure
     */
    private void sendMessageForReactiveFlow(MessageChannel messageChannel, Message<?> message) {
        if (messageChannel instanceof ReactiveStreamsSubscribableChannel) {
            ((ReactiveStreamsSubscribableChannel) messageChannel).subscribeTo(Mono.just(message));
        } else {
            long timeout = sendTimeout(message);
            boolean sent;
            if (timeout >= 0) {
                sent = messageChannel.send(message, timeout);
            } else {
                sent = messageChannel.send(message);
            }
            if (!sent) {
                throw new MessageDeliveryException(message, "Failed to send message to channel '" + messageChannel + "' within timeout: " + timeout);
            }
        }
    }

    /**
     * Builds the reply side of a reactive exchange from the future completed by the
     * {@link FutureReplyChannel}.
     *
     * <p>Outside the error flow the request is counted on subscription (when counting is
     * enabled), an {@link ErrorMessage} reply is turned into an exception, and errors are
     * routed to {@code handleSendError}. Within the error flow replies are passed through
     * and errors are propagated as-is. Regular replies get the original reply/error channel
     * header values restored.
     *
     * @param message            the request message that was sent
     * @param futureReplyChannel the channel capturing the reply
     * @param z                  {@code true} when this exchange is itself the error flow
     * @param obj                the original reply channel header value
     * @param obj2               the original error channel header value
     * @return a {@link Mono} emitting the reply message
     */
    private Mono<Message<?>> buildReplyMono(Message<?> message, FutureReplyChannel futureReplyChannel, boolean z, @Nullable Object obj, @Nullable Object obj2) {
        return Mono.fromFuture(futureReplyChannel.messageFuture)
                .doOnSubscribe(subscription -> {
                    if (!z && this.countsEnabled) {
                        this.messageCount.incrementAndGet();
                    }
                })
                .<Message<?>>map(reply -> {
                    boolean failureReply = !z && reply instanceof ErrorMessage;
                    if (failureReply) {
                        Throwable cause = ((ErrorMessage) reply).getPayload();
                        if (cause instanceof MessagingException) {
                            throw (MessagingException) cause;
                        }
                        throw new MessagingException(message, cause);
                    }
                    return MessageBuilder.fromMessage(reply)
                            .setHeader(MessageHeaders.REPLY_CHANNEL, obj)
                            .setHeader("errorChannel", obj2)
                            .build();
                })
                .onErrorResume(th -> z ? Mono.error(th) : handleSendError(message, th));
    }

    /**
     * Handles a failure of the reactive exchange: with an error channel configured, an error
     * message is sent through a nested reactive exchange whose reply becomes the result;
     * otherwise the failure is thrown (wrapped unless it is a runtime exception).
     *
     * @param message the request message
     * @param th      the failure
     * @return the error flow's exchange
     */
    private Mono<Message<?>> handleSendError(Message<?> message, Throwable th) {
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("failure occurred in gateway sendAndReceiveReactive: " + th.getMessage());
        }
        MessageChannel errorChannel = getErrorChannel();
        if (errorChannel != null) {
            ErrorMessage errorMessage = buildErrorMessage(message, th);
            try {
                return doSendAndReceiveMessageReactive(errorChannel, errorMessage, true);
            } catch (Exception e) {
                throw new MessagingException(errorMessage, "failure occurred in error-handling flow", e);
            }
        }
        throw wrapExceptionIfNecessary(th, "gateway received checked Exception");
    }

    /**
     * Resolves the send timeout for the given message: the value of the template's send
     * timeout header when present and convertible, otherwise the template's send timeout.
     *
     * @param message the message about to be sent
     * @return the send timeout to apply
     */
    private long sendTimeout(Message<?> message) {
        String headerName = this.messagingTemplate.getSendTimeoutHeader();
        Long fromHeader = headerToLong(message.getHeaders().get(headerName));
        if (fromHeader != null) {
            return fromHeader.longValue();
        }
        return this.messagingTemplate.getSendTimeout();
    }

    /**
     * Converts a header value to a {@link Long}.
     *
     * @param obj the header value; numbers are narrowed/widened via {@code longValue()},
     *            strings are parsed as decimal longs
     * @return the converted value, or {@code null} for any other type (including {@code null})
     * @throws NumberFormatException if a string value is not a parsable long
     */
    @Nullable
    private Long headerToLong(@Nullable Object obj) {
        Long converted = null;
        if (obj instanceof Number) {
            converted = Long.valueOf(((Number) obj).longValue());
        } else if (obj instanceof String) {
            converted = Long.valueOf(Long.parseLong((String) obj));
        }
        return converted;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Builds an error message for the given failure using the configured error message
     * strategy and the attributes from {@link #getErrorMessageAttributes(Message)}.
     *
     * @param message the message associated with the failure, may be {@code null}
     * @param th      the failure
     * @return the error message created by the strategy
     */
    public final ErrorMessage buildErrorMessage(@Nullable Message<?> message, Throwable th) {
        return this.errorMessageStrategy.buildErrorMessage(th, getErrorMessageAttributes(message));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates the attributes handed to the error message strategy; delegates to
     * {@code ErrorMessageUtils.getAttributeAccessor} with the given message and a
     * {@code null} second argument. Subclasses may override to supply more attributes.
     *
     * @param message the message associated with the failure, may be {@code null}
     * @return the attribute accessor for the error message strategy
     */
    public AttributeAccessor getErrorMessageAttributes(@Nullable Message<?> message) {
        return ErrorMessageUtils.getAttributeAccessor(message, null);
    }

    /**
     * Always throws: the given throwable itself when it is a runtime exception, otherwise a
     * {@link MessagingException} wrapping it.
     *
     * @param th  the failure to rethrow
     * @param str the description used when wrapping is required
     */
    private void rethrow(Throwable th, String str) {
        throw wrapExceptionIfNecessary(th, str);
    }

    /**
     * Returns a runtime exception for the given failure.
     *
     * @param th  the failure
     * @param str the description used when wrapping is required
     * @return {@code th} itself when it is a {@link RuntimeException}, otherwise a new
     *         {@link MessagingException} with {@code th} as its cause
     */
    private RuntimeException wrapExceptionIfNecessary(Throwable th, String str) {
        if (th instanceof RuntimeException) {
            return (RuntimeException) th;
        }
        return new MessagingException(str, th);
    }

    /**
     * Lazily creates the endpoint that consumes the configured reply channel through a
     * {@link BridgeHandler}, choosing the consumer type from the channel type, and starts it
     * when this gateway is already running. Does nothing when no reply channel is configured
     * or the endpoint already exists; creation is guarded by a monitor with a second check
     * inside the synchronized block.
     *
     * @throws MessagingException if the reply channel type is not supported
     */
    protected void registerReplyMessageCorrelatorIfNecessary() {
        MessageChannel replyChannel = getReplyChannel();
        if (replyChannel == null || this.replyMessageCorrelator != null) {
            return;
        }
        synchronized (this.replyMessageCorrelatorMonitor) {
            if (this.replyMessageCorrelator == null) {
                BridgeHandler replyBridge = new BridgeHandler();
                BeanFactory owningFactory = getBeanFactory();
                if (owningFactory != null) {
                    replyBridge.setBeanFactory(owningFactory);
                }
                replyBridge.afterPropertiesSet();

                AbstractEndpoint correlator;
                if (replyChannel instanceof SubscribableChannel) {
                    correlator = new EventDrivenConsumer((SubscribableChannel) replyChannel, replyBridge);
                } else if (replyChannel instanceof PollableChannel) {
                    PollingConsumer poller = new PollingConsumer((PollableChannel) replyChannel, replyBridge);
                    if (owningFactory != null) {
                        poller.setBeanFactory(owningFactory);
                    }
                    poller.setReceiveTimeout(this.replyTimeout);
                    poller.afterPropertiesSet();
                    correlator = poller;
                } else if (replyChannel instanceof ReactiveStreamsSubscribableChannel) {
                    ReactiveStreamsConsumer reactiveConsumer = new ReactiveStreamsConsumer(replyChannel, (Subscriber<Message<?>>) replyBridge);
                    reactiveConsumer.afterPropertiesSet();
                    correlator = reactiveConsumer;
                } else {
                    throw new MessagingException("Unsupported 'replyChannel' type [" + replyChannel.getClass() + "].SubscribableChannel or PollableChannel type are supported.");
                }

                this.replyMessageCorrelator = correlator;
                if (isRunning()) {
                    this.replyMessageCorrelator.start();
                }
            }
        }
    }

    /**
     * Starts the reply consumer endpoint if it has already been created.
     */
    @Override // org.springframework.integration.endpoint.AbstractEndpoint
    protected void doStart() {
        AbstractEndpoint correlator = this.replyMessageCorrelator;
        if (correlator != null) {
            correlator.start();
        }
    }

    /**
     * Stops the reply consumer endpoint if it has already been created.
     */
    @Override // org.springframework.integration.endpoint.AbstractEndpoint
    protected void doStop() {
        AbstractEndpoint correlator = this.replyMessageCorrelator;
        if (correlator != null) {
            correlator.stop();
        }
    }

    /**
     * Resets the message counter to zero.
     */
    @Override // org.springframework.integration.support.management.IntegrationManagement
    public void reset() {
        this.messageCount.set(0L);
    }
}
