package org.springframework.integration.scattergather;

import java.util.function.Supplier;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanInitializationException;
import org.springframework.integration.IntegrationPatternType;
import org.springframework.integration.channel.FixedSubscriberChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.channel.ReactiveStreamsSubscribableChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.endpoint.AbstractEndpoint;
import org.springframework.integration.endpoint.EventDrivenConsumer;
import org.springframework.integration.endpoint.PollingConsumer;
import org.springframework.integration.endpoint.ReactiveStreamsConsumer;
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
import org.springframework.integration.support.management.ManageableLifecycle;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.InterceptableChannel;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;

/* loaded from: input_file:datasets/datasets-service.jar:BOOT-INF/lib/spring-integration-core-5.4.2.jar:org/springframework/integration/scattergather/ScatterGatherHandler.class */
public class ScatterGatherHandler extends AbstractReplyProducingMessageHandler implements ManageableLifecycle {
    private static final String GATHER_RESULT_CHANNEL = "gatherResultChannel";
    private static final String ORIGINAL_ERROR_CHANNEL = "originalErrorChannel";
    private final MessageChannel scatterChannel;
    private final MessageHandler gatherer;
    private MessageChannel gatherChannel;
    private String errorChannelName;
    private long gatherTimeout;
    private AbstractEndpoint gatherEndpoint;

    public ScatterGatherHandler(MessageHandler messageHandler, MessageHandler messageHandler2) {
        this(new FixedSubscriberChannel(messageHandler), messageHandler2);
        Assert.notNull(messageHandler, "'scatterer' must not be null");
        checkClass(AopUtils.getTargetClass(messageHandler), "org.springframework.integration.router.RecipientListRouter", "scatterer");
    }

    public ScatterGatherHandler(MessageChannel messageChannel, MessageHandler messageHandler) {
        this.errorChannelName = "errorChannel";
        this.gatherTimeout = -1L;
        Assert.notNull(messageChannel, "'scatterChannel' must not be null");
        Assert.notNull(messageHandler, "'gatherer' must not be null");
        checkClass(AopUtils.getTargetClass(messageHandler), "org.springframework.integration.aggregator.AggregatingMessageHandler", "gatherer");
        this.scatterChannel = messageChannel;
        this.gatherer = messageHandler;
    }

    public void setGatherChannel(MessageChannel messageChannel) {
        this.gatherChannel = messageChannel;
    }

    public void setGatherTimeout(long j) {
        this.gatherTimeout = j;
    }

    public void setErrorChannelName(String str) {
        Assert.hasText(str, "'errorChannelName' must not be empty.");
        this.errorChannelName = str;
    }

    @Override // org.springframework.integration.handler.MessageHandlerSupport, org.springframework.integration.context.IntegrationObjectSupport, org.springframework.integration.support.context.NamedComponent
    public String getComponentType() {
        return "scatter-gather";
    }

    @Override // org.springframework.integration.handler.AbstractReplyProducingMessageHandler, org.springframework.integration.handler.MessageHandlerSupport, org.springframework.integration.IntegrationPattern
    public IntegrationPatternType getIntegrationPatternType() {
        return IntegrationPatternType.scatter_gather;
    }

    @Override // org.springframework.integration.handler.AbstractReplyProducingMessageHandler
    protected void doInit() {
        BeanFactory beanFactory = getBeanFactory();
        if (this.gatherChannel == null) {
            this.gatherChannel = new FixedSubscriberChannel(message -> {
                this.gatherer.handleMessage(enhanceScatterReplyMessage(message));
            });
        } else {
            Assert.isInstanceOf((Class<?>) InterceptableChannel.class, this.gatherChannel, (Supplier<String>) () -> {
                return "An injected 'gatherChannel' '" + this.gatherChannel + "' must be an 'InterceptableChannel' instance.";
            });
            ((InterceptableChannel) this.gatherChannel).addInterceptor(0, new ChannelInterceptor() { // from class: org.springframework.integration.scattergather.ScatterGatherHandler.1
                @Override // org.springframework.messaging.support.ChannelInterceptor
                public Message<?> preSend(Message<?> message2, MessageChannel messageChannel) {
                    return ScatterGatherHandler.this.enhanceScatterReplyMessage(message2);
                }
            });
            if (this.gatherChannel instanceof SubscribableChannel) {
                this.gatherEndpoint = new EventDrivenConsumer((SubscribableChannel) this.gatherChannel, this.gatherer);
            } else if (this.gatherChannel instanceof PollableChannel) {
                this.gatherEndpoint = new PollingConsumer((PollableChannel) this.gatherChannel, this.gatherer);
                ((PollingConsumer) this.gatherEndpoint).setReceiveTimeout(this.gatherTimeout);
            } else {
                if (!(this.gatherChannel instanceof ReactiveStreamsSubscribableChannel)) {
                    throw new BeanInitializationException("Unsupported 'gatherChannel' type '" + this.gatherChannel.getClass() + "'. 'SubscribableChannel', 'PollableChannel' or 'ReactiveStreamsSubscribableChannel' types are supported.");
                }
                this.gatherEndpoint = new ReactiveStreamsConsumer(this.gatherChannel, this.gatherer);
            }
            this.gatherEndpoint.setBeanFactory(beanFactory);
            this.gatherEndpoint.afterPropertiesSet();
        }
        ((MessageProducer) this.gatherer).setOutputChannel(new FixedSubscriberChannel(message2 -> {
            MessageChannel messageChannel = (MessageChannel) message2.getHeaders().get(GATHER_RESULT_CHANNEL, MessageChannel.class);
            if (messageChannel == null) {
                throw new MessageDeliveryException((Message<?>) message2, "The 'gatherResultChannel' header is required to deliver the gather result.");
            }
            this.messagingTemplate.send((MessagingTemplate) messageChannel, (Message<?>) message2);
        }));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Message<?> enhanceScatterReplyMessage(Message<?> message) {
        return getMessageBuilderFactory().fromMessage(message).setHeader("errorChannel", message.getHeaders().get(ORIGINAL_ERROR_CHANNEL)).build();
    }

    @Override // org.springframework.integration.handler.AbstractReplyProducingMessageHandler
    protected Object handleRequestMessage(Message<?> message) {
        MessageHeaders headers = message.getHeaders();
        QueueChannel queueChannel = new QueueChannel();
        this.messagingTemplate.send((MessagingTemplate) this.scatterChannel, getMessageBuilderFactory().fromMessage(message).setHeader(GATHER_RESULT_CHANNEL, queueChannel).setHeader(ORIGINAL_ERROR_CHANNEL, headers.getErrorChannel()).setReplyChannel(this.gatherChannel).setErrorChannelName(this.errorChannelName).build());
        Message<?> receive = queueChannel.receive(this.gatherTimeout);
        if (receive != null) {
            return getMessageBuilderFactory().fromMessage(receive).removeHeaders(GATHER_RESULT_CHANNEL, ORIGINAL_ERROR_CHANNEL, MessageHeaders.REPLY_CHANNEL, "errorChannel");
        }
        return null;
    }

    @Override // org.springframework.integration.support.management.ManageableLifecycle, org.springframework.context.Lifecycle
    public void start() {
        if (this.gatherEndpoint != null) {
            this.gatherEndpoint.start();
        }
    }

    @Override // org.springframework.integration.support.management.ManageableLifecycle, org.springframework.context.Lifecycle
    public void stop() {
        if (this.gatherEndpoint != null) {
            this.gatherEndpoint.stop();
        }
    }

    @Override // org.springframework.integration.support.management.ManageableLifecycle, org.springframework.context.Lifecycle
    public boolean isRunning() {
        return this.gatherEndpoint == null || this.gatherEndpoint.isRunning();
    }

    private static void checkClass(Class<?> cls, String str, String str2) throws LinkageError {
        try {
            Assert.isAssignable(ClassUtils.forName(str, ClassUtils.getDefaultClassLoader()), cls, (Supplier<String>) () -> {
                return "the '" + str2 + "' must be an " + str + " instance";
            });
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("The class for '" + str + "' cannot be loaded", e);
        }
    }
}
