package org.springframework.integration.aggregator;

import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.locks.Lock;
import java.util.function.Supplier;
import org.aopalliance.aop.Advice;
import org.springframework.aop.framework.ProxyFactory;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.context.Lifecycle;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.channel.NullChannel;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.expression.ExpressionUtils;
import org.springframework.integration.handler.AbstractMessageProducingHandler;
import org.springframework.integration.handler.DiscardingMessageHandler;
import org.springframework.integration.store.MessageGroup;
import org.springframework.integration.store.MessageGroupStore;
import org.springframework.integration.store.SimpleMessageGroup;
import org.springframework.integration.store.SimpleMessageStore;
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
import org.springframework.integration.support.locks.DefaultLockRegistry;
import org.springframework.integration.support.locks.LockRegistry;
import org.springframework.integration.util.UUIDConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.messaging.core.DestinationResolutionException;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

/* loaded from: input_file:datasets/datasets-service.jar:BOOT-INF/lib/spring-integration-core-5.2.3.RELEASE.jar:org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.class */
public abstract class AbstractCorrelatingMessageHandler extends AbstractMessageProducingHandler implements DiscardingMessageHandler, ApplicationEventPublisherAware, Lifecycle {
    private final Comparator<Message<?>> sequenceNumberComparator;
    private final Map<UUID, ScheduledFuture<?>> expireGroupScheduledFutures;
    private MessageGroupProcessor outputProcessor;
    private MessageGroupStore messageStore;
    private CorrelationStrategy correlationStrategy;
    private ReleaseStrategy releaseStrategy;
    private boolean releaseStrategySet;
    private MessageChannel discardChannel;
    private String discardChannelName;
    private boolean sendPartialResultOnExpiry;
    private boolean sequenceAware;
    private LockRegistry lockRegistry;
    private boolean lockRegistrySet;
    private long minimumTimeoutForEmptyGroups;
    private boolean releasePartialSequences;
    private Expression groupTimeoutExpression;
    private List<Advice> forceReleaseAdviceChain;
    private MessageGroupProcessor forceReleaseProcessor;
    private EvaluationContext evaluationContext;
    private ApplicationEventPublisher applicationEventPublisher;
    private boolean expireGroupsUponTimeout;
    private boolean popSequence;
    private boolean releaseLockBeforeSend;
    private volatile boolean running;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:datasets/datasets-service.jar:BOOT-INF/lib/spring-integration-core-5.2.3.RELEASE.jar:org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler$ForceReleaseMessageGroupProcessor.class */
    public class ForceReleaseMessageGroupProcessor implements MessageGroupProcessor {
        ForceReleaseMessageGroupProcessor() {
        }

        @Override // org.springframework.integration.aggregator.MessageGroupProcessor
        public Object processMessageGroup(MessageGroup messageGroup) {
            AbstractCorrelatingMessageHandler.this.forceComplete(messageGroup);
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:datasets/datasets-service.jar:BOOT-INF/lib/spring-integration-core-5.2.3.RELEASE.jar:org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler$SequenceAwareMessageGroup.class */
    public static class SequenceAwareMessageGroup extends SimpleMessageGroup {
        private final SimpleMessageGroup sourceGroup;

        public SequenceAwareMessageGroup(MessageGroup messageGroup) {
            super(messageGroup.getMessages(), null, messageGroup.getGroupId(), messageGroup.getTimestamp(), messageGroup.isComplete(), true);
            if (messageGroup instanceof SimpleMessageGroup) {
                this.sourceGroup = (SimpleMessageGroup) messageGroup;
            } else {
                this.sourceGroup = null;
            }
        }

        @Override // org.springframework.integration.store.SimpleMessageGroup, org.springframework.integration.store.MessageGroup
        public boolean canAdd(Message<?> message) {
            Integer num;
            if (size() == 0 || (num = (Integer) message.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER, Integer.class)) == null || num.intValue() <= 0) {
                return true;
            }
            Integer num2 = (Integer) message.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE, Integer.class);
            if (num2 == null) {
                num2 = 0;
            }
            return num2.equals(Integer.valueOf(getSequenceSize())) && (this.sourceGroup == null ? !containsSequenceNumber(getMessages(), num) : !this.sourceGroup.containsSequence(num));
        }

        private boolean containsSequenceNumber(Collection<Message<?>> collection, Integer num) {
            Iterator<Message<?>> it = collection.iterator();
            while (it.hasNext()) {
                if (num.equals(it.next().getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER, Integer.class))) {
                    return true;
                }
            }
            return false;
        }
    }

    public AbstractCorrelatingMessageHandler(MessageGroupProcessor messageGroupProcessor, MessageGroupStore messageGroupStore, CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy) {
        this.sequenceNumberComparator = new MessageSequenceComparator();
        this.expireGroupScheduledFutures = new ConcurrentHashMap();
        this.lockRegistry = new DefaultLockRegistry();
        this.lockRegistrySet = false;
        this.forceReleaseProcessor = new ForceReleaseMessageGroupProcessor();
        this.expireGroupsUponTimeout = true;
        this.popSequence = true;
        Assert.notNull(messageGroupProcessor, "'processor' must not be null");
        Assert.notNull(messageGroupStore, "'store' must not be null");
        setMessageStore(messageGroupStore);
        this.outputProcessor = messageGroupProcessor;
        this.correlationStrategy = correlationStrategy == null ? new HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID) : correlationStrategy;
        this.releaseStrategy = releaseStrategy == null ? new SimpleSequenceSizeReleaseStrategy() : releaseStrategy;
        this.releaseStrategySet = releaseStrategy != null;
        this.sequenceAware = this.releaseStrategy instanceof SequenceSizeReleaseStrategy;
    }

    public AbstractCorrelatingMessageHandler(MessageGroupProcessor messageGroupProcessor, MessageGroupStore messageGroupStore) {
        this(messageGroupProcessor, messageGroupStore, null, null);
    }

    public AbstractCorrelatingMessageHandler(MessageGroupProcessor messageGroupProcessor) {
        this(messageGroupProcessor, new SimpleMessageStore(0), null, null);
    }

    public void setLockRegistry(LockRegistry lockRegistry) {
        Assert.isTrue(!this.lockRegistrySet, "'this.lockRegistry' can not be reset once its been set");
        Assert.notNull(lockRegistry, "'lockRegistry' must not be null");
        this.lockRegistry = lockRegistry;
        this.lockRegistrySet = true;
    }

    public final void setMessageStore(MessageGroupStore messageGroupStore) {
        this.messageStore = messageGroupStore;
        messageGroupStore.registerMessageGroupExpiryCallback((messageGroupStore2, messageGroup) -> {
            this.forceReleaseProcessor.processMessageGroup(messageGroup);
        });
    }

    public void setCorrelationStrategy(CorrelationStrategy correlationStrategy) {
        Assert.notNull(correlationStrategy, "'correlationStrategy' must not be null");
        this.correlationStrategy = correlationStrategy;
    }

    public void setReleaseStrategy(ReleaseStrategy releaseStrategy) {
        Assert.notNull(releaseStrategy, "'releaseStrategy' must not be null");
        this.releaseStrategy = releaseStrategy;
        this.sequenceAware = this.releaseStrategy instanceof SequenceSizeReleaseStrategy;
        this.releaseStrategySet = true;
    }

    public void setGroupTimeoutExpression(Expression expression) {
        this.groupTimeoutExpression = expression;
    }

    public void setForceReleaseAdviceChain(List<Advice> list) {
        Assert.notNull(list, "'forceReleaseAdviceChain' must not be null");
        this.forceReleaseAdviceChain = list;
    }

    public void setOutputProcessor(MessageGroupProcessor messageGroupProcessor) {
        Assert.notNull(messageGroupProcessor, "'processor' must not be null");
        this.outputProcessor = messageGroupProcessor;
    }

    public MessageGroupProcessor getOutputProcessor() {
        return this.outputProcessor;
    }

    public void setDiscardChannel(MessageChannel messageChannel) {
        Assert.notNull(messageChannel, "'discardChannel' cannot be null");
        this.discardChannel = messageChannel;
    }

    public void setDiscardChannelName(String str) {
        Assert.hasText(str, "'discardChannelName' must not be empty");
        this.discardChannelName = str;
    }

    public void setSendPartialResultOnExpiry(boolean z) {
        this.sendPartialResultOnExpiry = z;
    }

    public void setMinimumTimeoutForEmptyGroups(long j) {
        this.minimumTimeoutForEmptyGroups = j;
    }

    public void setReleasePartialSequences(boolean z) {
        if (!this.releaseStrategySet && z) {
            setReleaseStrategy(new SequenceSizeReleaseStrategy());
        }
        this.releasePartialSequences = z;
    }

    public void setExpireGroupsUponTimeout(boolean z) {
        this.expireGroupsUponTimeout = z;
    }

    public void setPopSequence(boolean z) {
        this.popSequence = z;
    }

    protected boolean isReleaseLockBeforeSend() {
        return this.releaseLockBeforeSend;
    }

    public void setReleaseLockBeforeSend(boolean z) {
        this.releaseLockBeforeSend = z;
    }

    @Override // org.springframework.context.ApplicationEventPublisherAware
    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.applicationEventPublisher = applicationEventPublisher;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.springframework.integration.handler.AbstractMessageProducingHandler, org.springframework.integration.handler.AbstractMessageHandler, org.springframework.integration.context.IntegrationObjectSupport
    public void onInit() {
        super.onInit();
        Assert.state(this.discardChannelName == null || this.discardChannel == null, "'discardChannelName' and 'discardChannel' are mutually exclusive.");
        BeanFactory beanFactory = getBeanFactory();
        if (beanFactory != null) {
            if (this.outputProcessor instanceof BeanFactoryAware) {
                ((BeanFactoryAware) this.outputProcessor).setBeanFactory(beanFactory);
            }
            if (this.correlationStrategy instanceof BeanFactoryAware) {
                ((BeanFactoryAware) this.correlationStrategy).setBeanFactory(beanFactory);
            }
            if (this.releaseStrategy instanceof BeanFactoryAware) {
                ((BeanFactoryAware) this.releaseStrategy).setBeanFactory(beanFactory);
            }
        }
        if (this.releasePartialSequences) {
            Assert.isInstanceOf((Class<?>) SequenceSizeReleaseStrategy.class, this.releaseStrategy, (Supplier<String>) () -> {
                return "Release strategy of type [" + this.releaseStrategy.getClass().getSimpleName() + "] cannot release partial sequences. Use a SequenceSizeReleaseStrategy instead.";
            });
            ((SequenceSizeReleaseStrategy) this.releaseStrategy).setReleasePartialSequences(this.releasePartialSequences);
        }
        if (this.evaluationContext == null) {
            this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory());
        }
        if (this.sequenceAware) {
            this.logger.warn("Using a SequenceSizeReleaseStrategy with large groups may not perform well, consider using a SimpleSequenceSizeReleaseStrategy");
        }
        this.lockRegistrySet = true;
        this.forceReleaseProcessor = createGroupTimeoutProcessor();
    }

    private MessageGroupProcessor createGroupTimeoutProcessor() {
        ForceReleaseMessageGroupProcessor forceReleaseMessageGroupProcessor = new ForceReleaseMessageGroupProcessor();
        if (this.groupTimeoutExpression == null || CollectionUtils.isEmpty(this.forceReleaseAdviceChain)) {
            return forceReleaseMessageGroupProcessor;
        }
        ProxyFactory proxyFactory = new ProxyFactory(forceReleaseMessageGroupProcessor);
        List<Advice> list = this.forceReleaseAdviceChain;
        proxyFactory.getClass();
        list.forEach(proxyFactory::addAdvice);
        return (MessageGroupProcessor) proxyFactory.getProxy(getApplicationContext().getClassLoader());
    }

    @Override // org.springframework.integration.handler.AbstractMessageHandler, org.springframework.integration.context.IntegrationObjectSupport, org.springframework.integration.support.context.NamedComponent
    public String getComponentType() {
        return "aggregator";
    }

    public MessageGroupStore getMessageStore() {
        return this.messageStore;
    }

    protected Map<UUID, ScheduledFuture<?>> getExpireGroupScheduledFutures() {
        return this.expireGroupScheduledFutures;
    }

    protected CorrelationStrategy getCorrelationStrategy() {
        return this.correlationStrategy;
    }

    protected ReleaseStrategy getReleaseStrategy() {
        return this.releaseStrategy;
    }

    @Override // org.springframework.integration.handler.DiscardingMessageHandler
    public MessageChannel getDiscardChannel() {
        String str = this.discardChannelName;
        if (str == null && this.discardChannel == null) {
            str = IntegrationContextUtils.NULL_CHANNEL_BEAN_NAME;
        }
        if (str != null) {
            try {
                this.discardChannel = getChannelResolver().resolveDestination(str);
            } catch (DestinationResolutionException e) {
                if (!str.equals(IntegrationContextUtils.NULL_CHANNEL_BEAN_NAME)) {
                    throw e;
                }
                this.discardChannel = new NullChannel();
            }
            this.discardChannelName = null;
        }
        return this.discardChannel;
    }

    protected String getDiscardChannelName() {
        return this.discardChannelName;
    }

    protected boolean isSendPartialResultOnExpiry() {
        return this.sendPartialResultOnExpiry;
    }

    protected boolean isSequenceAware() {
        return this.sequenceAware;
    }

    protected LockRegistry getLockRegistry() {
        return this.lockRegistry;
    }

    protected boolean isLockRegistrySet() {
        return this.lockRegistrySet;
    }

    protected long getMinimumTimeoutForEmptyGroups() {
        return this.minimumTimeoutForEmptyGroups;
    }

    protected boolean isReleasePartialSequences() {
        return this.releasePartialSequences;
    }

    protected Expression getGroupTimeoutExpression() {
        return this.groupTimeoutExpression;
    }

    protected EvaluationContext getEvaluationContext() {
        return this.evaluationContext;
    }

    @Override // org.springframework.integration.handler.AbstractMessageHandler
    protected void handleMessageInternal(Message<?> message) {
        Object correlationKey = this.correlationStrategy.getCorrelationKey(message);
        Assert.state(correlationKey != null, "Null correlation not allowed.  Maybe the CorrelationStrategy is failing?");
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Handling message with correlationKey [" + correlationKey + "]: " + message);
        }
        UUID uuid = UUIDConverter.getUUID(correlationKey);
        Lock obtain = this.lockRegistry.obtain(uuid.toString());
        boolean z = true;
        try {
            obtain.lockInterruptibly();
            try {
                z = processMessageForGroup(message, correlationKey, uuid, obtain);
                if (z || !this.releaseLockBeforeSend) {
                    obtain.unlock();
                }
            } catch (Throwable th) {
                if (z || !this.releaseLockBeforeSend) {
                    obtain.unlock();
                }
                throw th;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new MessageHandlingException(message, "Interrupted getting lock in the [" + this + ']', e);
        }
    }

    private boolean processMessageForGroup(Message<?> message, Object obj, UUID uuid, Lock lock) {
        boolean z = true;
        cancelScheduledFutureIfAny(obj, uuid, true);
        MessageGroup messageGroup = this.messageStore.getMessageGroup(obj);
        if (this.sequenceAware) {
            messageGroup = new SequenceAwareMessageGroup(messageGroup);
        }
        if (messageGroup.isComplete() || !messageGroup.canAdd(message)) {
            z = false;
            discardMessage(message, lock);
        } else {
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Adding message to group [ " + messageGroup + "]");
            }
            MessageGroup store = store(obj, message);
            if (this.releaseStrategy.canRelease(store)) {
                Collection<Message<?>> collection = null;
                try {
                    z = false;
                    collection = completeGroup(message, obj, store, lock);
                    afterRelease(store, collection);
                    if (!isExpireGroupsUponCompletion() && this.minimumTimeoutForEmptyGroups > 0) {
                        removeEmptyGroupAfterTimeout(store, this.minimumTimeoutForEmptyGroups);
                    }
                } catch (Throwable th) {
                    afterRelease(store, collection);
                    throw th;
                }
            } else {
                scheduleGroupToForceComplete(store);
            }
        }
        return z;
    }

    private void cancelScheduledFutureIfAny(Object obj, UUID uuid, boolean z) {
        ScheduledFuture<?> remove = this.expireGroupScheduledFutures.remove(uuid);
        if (remove != null && remove.cancel(z) && this.logger.isDebugEnabled()) {
            this.logger.debug("Cancel 'ScheduledFuture' for MessageGroup with Correlation Key [ " + obj + "].");
        }
    }

    protected boolean isExpireGroupsUponCompletion() {
        return false;
    }

    private void removeEmptyGroupAfterTimeout(MessageGroup messageGroup, long j) {
        Object groupId = messageGroup.getGroupId();
        UUID uuid = UUIDConverter.getUUID(groupId);
        ScheduledFuture<?> schedule = getTaskScheduler().schedule(() -> {
            Lock obtain = this.lockRegistry.obtain(uuid.toString());
            try {
                obtain.lockInterruptibly();
                try {
                    this.expireGroupScheduledFutures.remove(uuid);
                    MessageGroup messageGroup2 = this.messageStore.getMessageGroup(uuid);
                    if (messageGroup2.size() == 0 && messageGroup2.getLastModified() <= System.currentTimeMillis() - this.minimumTimeoutForEmptyGroups) {
                        if (this.logger.isDebugEnabled()) {
                            this.logger.debug("Removing empty group: " + uuid);
                        }
                        remove(messageGroup);
                    }
                    obtain.unlock();
                } catch (Throwable th) {
                    obtain.unlock();
                    throw th;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Thread was interrupted while trying to obtain lock.Rescheduling empty MessageGroup [ " + groupId + "] for removal.");
                }
                removeEmptyGroupAfterTimeout(messageGroup, j);
            }
        }, new Date(System.currentTimeMillis() + j));
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Schedule empty MessageGroup [ " + groupId + "] for removal.");
        }
        this.expireGroupScheduledFutures.put(uuid, schedule);
    }

    private void scheduleGroupToForceComplete(MessageGroup messageGroup) {
        Long obtainGroupTimeout = obtainGroupTimeout(messageGroup);
        if (obtainGroupTimeout != null) {
            if (obtainGroupTimeout.longValue() <= 0) {
                this.forceReleaseProcessor.processMessageGroup(messageGroup);
                return;
            }
            Object groupId = messageGroup.getGroupId();
            long timestamp = messageGroup.getTimestamp();
            long lastModified = messageGroup.getLastModified();
            ScheduledFuture<?> schedule = getTaskScheduler().schedule(() -> {
                try {
                    processForceRelease(groupId, timestamp, lastModified);
                } catch (MessageDeliveryException e) {
                    if (this.logger.isWarnEnabled()) {
                        this.logger.warn("The MessageGroup [" + groupId + "] is rescheduled by the reason of:", e);
                    }
                    scheduleGroupToForceComplete(groupId);
                }
            }, new Date(System.currentTimeMillis() + obtainGroupTimeout.longValue()));
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Schedule MessageGroup [ " + messageGroup + "] to 'forceComplete'.");
            }
            this.expireGroupScheduledFutures.put(UUIDConverter.getUUID(groupId), schedule);
        }
    }

    private void scheduleGroupToForceComplete(Object obj) {
        scheduleGroupToForceComplete(this.messageStore.getMessageGroup(obj));
    }

    private void processForceRelease(Object obj, long j, long j2) {
        MessageGroup messageGroup = this.messageStore.getMessageGroup(obj);
        if (messageGroup.getTimestamp() == j && messageGroup.getLastModified() == j2) {
            this.forceReleaseProcessor.processMessageGroup(messageGroup);
        }
    }

    private void discardMessage(Message<?> message, Lock lock) {
        if (this.releaseLockBeforeSend) {
            lock.unlock();
        }
        discardMessage(message);
    }

    private void discardMessage(Message<?> message) {
        MessageChannel discardChannel = getDiscardChannel();
        if (discardChannel != null) {
            this.messagingTemplate.send((MessagingTemplate) discardChannel, message);
        }
    }

    protected abstract void afterRelease(MessageGroup messageGroup, Collection<Message<?>> collection);

    protected void afterRelease(MessageGroup messageGroup, Collection<Message<?>> collection, boolean z) {
        afterRelease(messageGroup, collection);
    }

    /* JADX WARN: Finally extract failed */
    protected void forceComplete(MessageGroup messageGroup) {
        Object groupId = messageGroup.getGroupId();
        UUID uuid = UUIDConverter.getUUID(groupId);
        Lock obtain = this.lockRegistry.obtain(uuid.toString());
        boolean z = true;
        boolean z2 = true;
        try {
            obtain.lockInterruptibly();
            try {
                try {
                    cancelScheduledFutureIfAny(groupId, uuid, false);
                    MessageGroup messageGroup2 = messageGroup;
                    if (!messageGroup.isComplete()) {
                        messageGroup2 = this.messageStore.getMessageGroup(groupId);
                    }
                    long lastModified = messageGroup2.getLastModified();
                    int size = messageGroup2.size();
                    if ((messageGroup2.isComplete() && size != 0) || messageGroup.getLastModified() != lastModified || messageGroup.getTimestamp() != messageGroup2.getTimestamp()) {
                        z = false;
                        if (this.logger.isDebugEnabled()) {
                            this.logger.debug("Group expiry candidate (" + groupId + ") has changed - it may be reconsidered for a future expiration");
                        }
                    } else if (size > 0) {
                        z2 = false;
                        if (this.releaseStrategy.canRelease(messageGroup2)) {
                            completeGroup(groupId, messageGroup2, obtain);
                        } else {
                            expireGroup(groupId, messageGroup2, obtain);
                        }
                        if (!this.expireGroupsUponTimeout) {
                            afterRelease(messageGroup2, messageGroup2.getMessages(), true);
                            z = false;
                        }
                    } else {
                        z = lastModified <= System.currentTimeMillis() - this.minimumTimeoutForEmptyGroups;
                        if (z && this.logger.isDebugEnabled()) {
                            this.logger.debug("Removing empty group: " + groupId);
                        }
                    }
                    if (z) {
                        try {
                            remove(messageGroup);
                        } catch (Throwable th) {
                            z2 = z2;
                            throw th;
                        }
                    }
                    if (z2 || !this.releaseLockBeforeSend) {
                        obtain.unlock();
                    }
                } catch (Throwable th2) {
                    if (1 != 0) {
                        try {
                            remove(messageGroup);
                        } finally {
                            if (1 != 0 || !this.releaseLockBeforeSend) {
                                obtain.unlock();
                            }
                        }
                    }
                    if (1 != 0 || !this.releaseLockBeforeSend) {
                        obtain.unlock();
                    }
                    throw th2;
                }
            } catch (MessageDeliveryException e) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Group expiry candidate (" + groupId + ") has been affected by MessageDeliveryException - it may be reconsidered for a future expiration one more time");
                }
                throw e;
            }
        } catch (InterruptedException e2) {
            Thread.currentThread().interrupt();
            this.logger.debug("Thread was interrupted while trying to obtain lock");
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void remove(MessageGroup messageGroup) {
        this.messageStore.removeMessageGroup(messageGroup.getGroupId());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public int findLastReleasedSequenceNumber(Object obj, Collection<Message<?>> collection) {
        return new IntegrationMessageHeaderAccessor((Message) Collections.max(collection, this.sequenceNumberComparator)).getSequenceNumber();
    }

    protected MessageGroup store(Object obj, Message<?> message) {
        return this.messageStore.addMessageToGroup(obj, message);
    }

    protected void expireGroup(Object obj, MessageGroup messageGroup, Lock lock) {
        if (this.logger.isInfoEnabled()) {
            this.logger.info("Expiring MessageGroup with correlationKey[" + obj + "]");
        }
        if (this.sendPartialResultOnExpiry) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Prematurely releasing partially complete group with key [" + obj + "] to: " + getOutputChannel());
            }
            completeGroup(obj, messageGroup, lock);
        } else {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Discarding messages of partially complete group with key [" + obj + "] to: " + (this.discardChannelName != null ? this.discardChannelName : this.discardChannel));
            }
            if (this.releaseLockBeforeSend) {
                lock.unlock();
            }
            messageGroup.getMessages().forEach(this::discardMessage);
        }
        if (this.applicationEventPublisher != null) {
            this.applicationEventPublisher.publishEvent((ApplicationEvent) new MessageGroupExpiredEvent(this, obj, messageGroup.size(), new Date(messageGroup.getLastModified()), new Date(), !this.sendPartialResultOnExpiry));
        }
    }

    protected void completeGroup(Object obj, MessageGroup messageGroup, Lock lock) {
        Message<?> message = null;
        if (messageGroup != null) {
            message = messageGroup.getOne();
        }
        completeGroup(message, obj, messageGroup, lock);
    }

    protected Collection<Message<?>> completeGroup(Message<?> message, Object obj, MessageGroup messageGroup, Lock lock) {
        Collection<Message<?>> collection = null;
        try {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Completing group with correlationKey [" + obj + "]");
            }
            Object processMessageGroup = this.outputProcessor.processMessageGroup(messageGroup);
            if (processMessageGroup instanceof Collection) {
                verifyResultCollectionConsistsOfMessages((Collection) processMessageGroup);
                collection = (Collection) processMessageGroup;
            }
            if (this.popSequence && collection == null && !(processMessageGroup instanceof Message)) {
                processMessageGroup = (processMessageGroup instanceof AbstractIntegrationMessageBuilder ? (AbstractIntegrationMessageBuilder) processMessageGroup : getMessageBuilderFactory().withPayload(processMessageGroup).copyHeaders(message.getHeaders())).popSequenceDetails();
            }
            sendOutputs(processMessageGroup, message);
            return collection;
        } finally {
            if (this.releaseLockBeforeSend) {
                lock.unlock();
            }
        }
    }

    protected void verifyResultCollectionConsistsOfMessages(Collection<?> collection) {
        Class<?> findCommonElementType = CollectionUtils.findCommonElementType(collection);
        Assert.isAssignable((Class<?>) Message.class, findCommonElementType, "The expected collection of Messages contains non-Message element: " + findCommonElementType);
    }

    protected Long obtainGroupTimeout(MessageGroup messageGroup) {
        if (this.groupTimeoutExpression != null) {
            return (Long) this.groupTimeoutExpression.getValue(this.evaluationContext, messageGroup, Long.class);
        }
        return null;
    }

    @Override // org.springframework.integration.handler.AbstractMessageHandler, org.springframework.integration.support.management.IntegrationManagement, org.springframework.beans.factory.DisposableBean
    public void destroy() {
        this.expireGroupScheduledFutures.values().forEach(scheduledFuture -> {
            scheduledFuture.cancel(true);
        });
    }

    @Override // org.springframework.context.Lifecycle
    public void start() {
        if (this.running) {
            return;
        }
        this.running = true;
        if (this.outputProcessor instanceof Lifecycle) {
            ((Lifecycle) this.outputProcessor).start();
        }
        if (this.releaseStrategy instanceof Lifecycle) {
            ((Lifecycle) this.releaseStrategy).start();
        }
    }

    @Override // org.springframework.context.Lifecycle
    public void stop() {
        if (this.running) {
            this.running = false;
            if (this.outputProcessor instanceof Lifecycle) {
                ((Lifecycle) this.outputProcessor).stop();
            }
            if (this.releaseStrategy instanceof Lifecycle) {
                ((Lifecycle) this.releaseStrategy).stop();
            }
        }
    }

    @Override // org.springframework.context.Lifecycle
    public boolean isRunning() {
        return this.running;
    }
}
