package org.springframework.cloud.stream.function;

import java.lang.reflect.Field;
import java.lang.reflect.GenericArrayType;
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.BeanDefinitionCustomizer;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.function.cloudevent.CloudEventMessageUtils;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.FunctionProperties;
import org.springframework.cloud.function.context.FunctionRegistry;
import org.springframework.cloud.function.context.PollableBean;
import org.springframework.cloud.function.context.catalog.FunctionInspector;
import org.springframework.cloud.function.context.catalog.FunctionTypeUtils;
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry;
import org.springframework.cloud.function.context.config.ContextFunctionCatalogAutoConfiguration;
import org.springframework.cloud.function.context.config.FunctionContextUtils;
import org.springframework.cloud.function.context.config.RoutingFunction;
import org.springframework.cloud.function.context.message.MessageUtils;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.binder.BinderFactory;
import org.springframework.cloud.stream.binder.BindingCreatedEvent;
import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binder.ProducerProperties;
import org.springframework.cloud.stream.binding.BindableProxyFactory;
import org.springframework.cloud.stream.binding.BinderAwareChannelResolver;
import org.springframework.cloud.stream.config.BinderFactoryAutoConfiguration;
import org.springframework.cloud.stream.config.BindingBeansRegistrar;
import org.springframework.cloud.stream.config.BindingProperties;
import org.springframework.cloud.stream.config.BindingServiceConfiguration;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.cloud.stream.messaging.DirectWithAttributesChannel;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.cloud.stream.micrometer.MetersPublisherBinding;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.EnvironmentAware;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.core.env.Environment;
import org.springframework.core.type.MethodMetadata;
import org.springframework.integration.channel.AbstractSubscribableChannel;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlowBuilder;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.integration.util.IntegrationReactiveUtils;
import org.springframework.jdbc.datasource.init.ScriptUtils;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.util.function.Tuples;

@EnableConfigurationProperties({StreamFunctionProperties.class})
@AutoConfigureBefore({BindingServiceConfiguration.class})
@Configuration
@AutoConfigureAfter({ContextFunctionCatalogAutoConfiguration.class})
@ConditionalOnBean({FunctionRegistry.class})
@Import({BindingBeansRegistrar.class, BinderFactoryAutoConfiguration.class})
/* loaded from: input_file:datasets/datasets-service.jar:BOOT-INF/lib/spring-cloud-stream-3.1.0.jar:org/springframework/cloud/stream/function/FunctionConfiguration.class */
public class FunctionConfiguration {
    /**
     * Name of the environment property listing source (output-only) binding names.
     * The registrar below splits its value on {@code ScriptUtils.DEFAULT_STATEMENT_SEPARATOR}
     * and, for every name that has no matching function in the catalog, registers a
     * binding with zero inputs and one output. (The identifier is misspelled; it is
     * kept as-is because it is referenced by name elsewhere in this file.)
     */
    private static final String SOURCE_PROPERY = "spring.cloud.stream.source";

    /* loaded from: input_file:datasets/datasets-service.jar:BOOT-INF/lib/spring-cloud-stream-3.1.0.jar:org/springframework/cloud/stream/function/FunctionConfiguration$FunctionBindingRegistrar.class */
    private static class FunctionBindingRegistrar implements InitializingBean, ApplicationContextAware, EnvironmentAware {
        /** Logger named after the runtime class of this registrar. */
        protected final Log logger = LogFactory.getLog(getClass());
        /** Catalog used to look up the functions named in the definition. */
        private final FunctionCatalog functionCatalog;
        /** Holds the function definition; read and (when unset) written by this registrar. */
        private final StreamFunctionProperties streamFunctionProperties;
        /** Application context; assigned in {@code setApplicationContext}. */
        private ConfigurableApplicationContext applicationContext;
        /** Environment; assigned in {@code setEnvironment}. */
        private Environment environment;
        /** Input count of the function most recently processed by {@code afterPropertiesSet}. */
        private int inputCount;
        /** Output count of the function most recently processed by {@code afterPropertiesSet}. */
        private int outputCount;

        /**
         * Creates a registrar working against the given catalog and properties.
         *
         * @param functionCatalog catalog the function definitions are looked up in
         * @param streamFunctionProperties properties carrying the function definition
         */
        FunctionBindingRegistrar(FunctionCatalog functionCatalog, StreamFunctionProperties streamFunctionProperties) {
            this.streamFunctionProperties = streamFunctionProperties;
            this.functionCatalog = functionCatalog;
        }

        /**
         * Registers the {@link BindableFunctionProxyFactory} bean definitions that back
         * functional bindings.
         *
         * <p>Nothing is registered when any bean carries {@code @EnableBinding}. Otherwise the
         * function definition is resolved first, then one {@code <name>_binding} bean definition
         * is registered per eligible function definition, and finally one output-only binding
         * (0 inputs, 1 output) per name in the source property that has no function in the catalog.
         *
         * @throws Exception declared by {@link InitializingBean}; a failure to register a
         *         source binding propagates, a failure to register a function binding is logged
         */
        @Override // org.springframework.beans.factory.InitializingBean
        public void afterPropertiesSet() throws Exception {
            if (!ObjectUtils.isEmpty((Object[]) this.applicationContext.getBeanNamesForAnnotation(EnableBinding.class))) {
                this.logger.info("Functional binding is disabled due to the presense of @EnableBinding annotation in your configuration");
                return;
            }
            determineFunctionName(this.functionCatalog, this.environment);
            BeanDefinitionRegistry registry = (BeanDefinitionRegistry) this.applicationContext.getBeanFactory();
            if (StringUtils.hasText(this.streamFunctionProperties.getDefinition())) {
                for (String functionDefinition : filterEligibleFunctionDefinitions()) {
                    registerFunctionBinding(registry, functionDefinition);
                }
            }
            // Read the property once instead of once for the check and once for the split.
            String sourceNames = this.environment.getProperty(FunctionConfiguration.SOURCE_PROPERY);
            if (StringUtils.hasText(sourceNames)) {
                for (String sourceName : sourceNames.split(ScriptUtils.DEFAULT_STATEMENT_SEPARATOR)) {
                    // Only names without a function behind them get a plain output binding.
                    if (this.functionCatalog.lookup(sourceName) == null) {
                        registry.registerBeanDefinition(sourceName + "_binding", newBindingDefinition(sourceName, 0, 1));
                    }
                }
            }
        }

        /**
         * Looks up one function definition, derives its input/output counts and registers the
         * matching binding bean definition. Unknown definitions only produce a warning.
         */
        private void registerFunctionBinding(BeanDefinitionRegistry registry, String functionDefinition) {
            SimpleFunctionRegistry.FunctionInvocationWrapper function =
                    (SimpleFunctionRegistry.FunctionInvocationWrapper) this.functionCatalog.lookup(functionDefinition);
            if (function == null) {
                // Name the entry that actually failed, not the whole ';'-separated definition.
                this.logger.warn("The function definition '" + functionDefinition + "' is not valid. The referenced function bean or one of its components does not exist");
                return;
            }
            Type functionType = function.getFunctionType();
            if (function.isSupplier()) {
                this.inputCount = 0;
                this.outputCount = getOutputCount(functionType, true);
            } else if (function.isConsumer()) {
                this.inputCount = FunctionTypeUtils.getInputCount(functionType);
                this.outputCount = 0;
            } else {
                this.inputCount = FunctionTypeUtils.getInputCount(functionType);
                this.outputCount = getOutputCount(functionType, false);
            }
            RootBeanDefinition bindingDefinition = newBindingDefinition(functionDefinition, this.inputCount, this.outputCount);
            try {
                registry.registerBeanDefinition(functionDefinition + "_binding", bindingDefinition);
            } catch (RuntimeException e) {
                // Registration stays best-effort, but the failure goes to the log with its cause
                // instead of being printed to stderr.
                this.logger.error("Failed to register bean definition '" + functionDefinition + "_binding'", e);
            }
        }

        /**
         * Builds the {@link BindableFunctionProxyFactory} bean definition with the constructor
         * arguments (name, input count, output count, stream function properties).
         */
        private RootBeanDefinition newBindingDefinition(String name, int inputs, int outputs) {
            RootBeanDefinition bindingDefinition = new RootBeanDefinition((Class<?>) BindableFunctionProxyFactory.class);
            bindingDefinition.getConstructorArgumentValues().addGenericArgumentValue(name);
            bindingDefinition.getConstructorArgumentValues().addGenericArgumentValue(Integer.valueOf(inputs));
            bindingDefinition.getConstructorArgumentValues().addGenericArgumentValue(Integer.valueOf(outputs));
            bindingDefinition.getConstructorArgumentValues().addGenericArgumentValue(this.streamFunctionProperties);
            return bindingDefinition;
        }

        /**
         * Stores the application context. The context is cast to
         * {@link ConfigurableApplicationContext}, so any other kind fails with a
         * {@link ClassCastException}.
         *
         * @param applicationContext the context this registrar runs in
         */
        @Override // org.springframework.context.ApplicationContextAware
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            ConfigurableApplicationContext configurableContext = (ConfigurableApplicationContext) applicationContext;
            this.applicationContext = configurableContext;
        }

        /**
         * Stores the environment that this registrar later reads the function-definition,
         * routing and source properties from.
         *
         * @param environment the environment to read properties from
         */
        @Override // org.springframework.context.EnvironmentAware
        public void setEnvironment(Environment environment) {
            this.environment = environment;
        }

        /**
         * Determines how many outputs a function type produces.
         *
         * @param type the function type to inspect
         * @param z {@code true} when the function is a supplier, in which case the count
         *        reported by {@link FunctionTypeUtils#getOutputCount} is returned unchanged
         * @return {@code 0} when a non-supplier parameterized type declares {@code Void} or
         *         {@code Mono<Void>} as its second type argument, otherwise the reported count
         */
        private int getOutputCount(Type type, boolean z) {
            int reportedCount = FunctionTypeUtils.getOutputCount(type);
            if (z || !(type instanceof ParameterizedType)) {
                return reportedCount;
            }
            Type outputType = ((ParameterizedType) type).getActualTypeArguments()[1];
            boolean monoOfVoid = FunctionTypeUtils.isMono(outputType)
                    && (outputType instanceof ParameterizedType)
                    && FunctionTypeUtils.getRawType(((ParameterizedType) outputType).getActualTypeArguments()[0]).equals(Void.class);
            // Short-circuit keeps the plain-Void check from running once Mono<Void> matched.
            boolean producesNothing = monoOfVoid || FunctionTypeUtils.getRawType(outputType).equals(Void.class);
            return producesNothing ? 0 : reportedCount;
        }

        /**
         * Resolves the function definition and stores it in the stream function properties.
         *
         * <p>Resolution order: the definition already present in the properties, then the
         * {@code FunctionProperties.FUNCTION_DEFINITION} environment property, then the routing
         * function name when routing is enabled or a routing expression is configured, and
         * finally the name the catalog reports for {@code lookup("")}.
         *
         * @param functionCatalog not read here; the catalog held by this instance is used
         * @param environment environment the fallback properties are read from
         * @return whether the stored definition has text after resolution
         */
        private boolean determineFunctionName(FunctionCatalog functionCatalog, Environment environment) {
            String resolved = this.streamFunctionProperties.getDefinition();
            if (!StringUtils.hasText(resolved)) {
                resolved = environment.getProperty(FunctionProperties.FUNCTION_DEFINITION);
            }
            if (!StringUtils.hasText(resolved)) {
                boolean routingEnabled = Boolean.parseBoolean(environment.getProperty("spring.cloud.stream.function.routing.enabled", "false"));
                if (routingEnabled || environment.containsProperty("spring.cloud.function.routing-expression")) {
                    resolved = RoutingFunction.FUNCTION_NAME;
                } else {
                    resolved = ((FunctionInspector) this.functionCatalog).getName(this.functionCatalog.lookup(""));
                }
            }
            this.streamFunctionProperties.setDefinition(resolved);
            return StringUtils.hasText(this.streamFunctionProperties.getDefinition());
        }

        /**
         * Splits the configured definition on {@code ScriptUtils.DEFAULT_STATEMENT_SEPARATOR}
         * and keeps the entries this registrar should bind.
         *
         * @return the retained definitions, in their original order and spelling
         */
        private String[] filterEligibleFunctionDefinitions() {
            ArrayList<String> eligible = new ArrayList<>();
            String[] definitions = this.streamFunctionProperties.getDefinition().split(ScriptUtils.DEFAULT_STATEMENT_SEPARATOR);
            for (String definition : definitions) {
                if (hasNoKafkaStreamsComponent(definition)) {
                    eligible.add(definition);
                }
            }
            return eligible.toArray(new String[0]);
        }

        /**
         * Checks the components of one (possibly composed) definition. A component whose
         * discovered function type mentions {@code KTable} or {@code KStream} disqualifies the
         * definition; a component that is not a bean is only reported with a warning.
         */
        private boolean hasNoKafkaStreamsComponent(String definition) {
            String normalized = definition.replaceAll(",", "|").trim();
            for (String functionName : StringUtils.delimitedListToStringArray(normalized, "|")) {
                if (!this.applicationContext.containsBean(functionName)) {
                    this.logger.warn("You have defined function definition that does not exist: " + functionName);
                    continue;
                }
                String functionType = FunctionTypeUtils.discoverFunctionType(this.applicationContext.getBean(functionName), functionName, (GenericApplicationContext) this.applicationContext).toString();
                if (functionType.contains("KTable") || functionType.contains("KStream")) {
                    return false;
                }
            }
            return true;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:datasets/datasets-service.jar:BOOT-INF/lib/spring-cloud-stream-3.1.0.jar:org/springframework/cloud/stream/function/FunctionConfiguration$FunctionToDestinationBinder.class */
    public static class FunctionToDestinationBinder implements InitializingBean, ApplicationContextAware {
        /** Logger named after the runtime class of this binder. */
        protected final Log logger = LogFactory.getLog(getClass());
        /** Application context; assigned in {@code setApplicationContext}. */
        private GenericApplicationContext applicationContext;
        /** All {@link BindableProxyFactory} beans found in the context; filled by {@code afterPropertiesSet}. */
        private BindableProxyFactory[] bindableProxyFactories;
        /** Catalog the bound functions are looked up in. */
        private final FunctionCatalog functionCatalog;
        /** Function definition and compose-to/compose-from flags. */
        private final StreamFunctionProperties functionProperties;
        /** Per-binding properties (content type, consumer, producer, binder name). */
        private final BindingServiceProperties serviceProperties;
        /** Used to resolve the destination named by the send-to header of an output message. */
        private final StreamBridge streamBridge;

        /**
         * Creates a binder wired with its collaborators.
         *
         * @param functionCatalog catalog the bound functions are looked up in
         * @param streamFunctionProperties function definition and composition flags
         * @param bindingServiceProperties per-binding configuration
         * @param streamBridge resolver for destinations named in the send-to header
         */
        FunctionToDestinationBinder(FunctionCatalog functionCatalog, StreamFunctionProperties streamFunctionProperties, BindingServiceProperties bindingServiceProperties, StreamBridge streamBridge) {
            this.streamBridge = streamBridge;
            this.serviceProperties = bindingServiceProperties;
            this.functionProperties = streamFunctionProperties;
            this.functionCatalog = functionCatalog;
        }

        /**
         * Stores the application context. The context is cast to
         * {@link GenericApplicationContext}, so any other kind fails with a
         * {@link ClassCastException}.
         *
         * @param applicationContext the context this binder runs in
         */
        @Override // org.springframework.context.ApplicationContextAware
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            GenericApplicationContext genericContext = (GenericApplicationContext) applicationContext;
            this.applicationContext = genericContext;
        }

        /**
         * Collects every {@link BindableProxyFactory} bean and binds the function behind it
         * to its destinations.
         *
         * <p>A factory is skipped when no function definition applies to it, when it is not a
         * {@link BindableFunctionProxyFactory} and its first output is the application-metrics
         * binding, when the catalog has no such function, or when the function is a supplier.
         */
        @Override // org.springframework.beans.factory.InitializingBean
        public void afterPropertiesSet() throws Exception {
            this.bindableProxyFactories = (BindableProxyFactory[]) this.applicationContext.getBeansOfType(BindableProxyFactory.class).values().toArray(new BindableProxyFactory[0]);
            for (BindableProxyFactory proxyFactory : this.bindableProxyFactories) {
                String definition;
                boolean metricsBinding = false;
                if (proxyFactory instanceof BindableFunctionProxyFactory) {
                    definition = ((BindableFunctionProxyFactory) proxyFactory).getFunctionDefinition();
                } else {
                    definition = this.functionProperties.getDefinition();
                    Set<String> outputNames = proxyFactory.getOutputs();
                    metricsBinding = !CollectionUtils.isEmpty(outputNames) && outputNames.iterator().next().equals(MetersPublisherBinding.APPLICATION_METRICS);
                }
                if (!StringUtils.hasText(definition) || metricsBinding) {
                    continue;
                }
                SimpleFunctionRegistry.FunctionInvocationWrapper function = (SimpleFunctionRegistry.FunctionInvocationWrapper) this.functionCatalog.lookup(definition);
                if (function != null && !function.isSupplier()) {
                    bindFunctionToDestinations(proxyFactory, definition);
                }
            }
        }

        /**
         * Connects the function named {@code str} to the input and output channels exposed by
         * the given proxy factory.
         *
         * <p>For a single-input, non-reactive function a message handler is subscribed to the
         * first input channel. Otherwise every input channel is turned into a {@code Flux}, the
         * function is applied once to the flux (or tuple of fluxes), and each resulting publisher
         * is forwarded to the next output channel.
         *
         * @param bindableProxyFactory factory exposing the input/output binding names
         * @param str the function definition to look up in the catalog
         */
        private void bindFunctionToDestinations(BindableProxyFactory bindableProxyFactory, String str) {
            assertBindingIsPossible(bindableProxyFactory);
            Set<String> inputs = bindableProxyFactory.getInputs();
            Set<String> outputs = bindableProxyFactory.getOutputs();
            // A binding without explicit properties contributes a null content type instead of an NPE.
            String[] outputContentTypes = outputs.stream().map(outputName -> {
                BindingProperties outputProperties = this.serviceProperties.getBindings().get(outputName);
                return outputProperties == null ? null : outputProperties.getContentType();
            }).toArray(String[]::new);
            SimpleFunctionRegistry.FunctionInvocationWrapper functionInvocationWrapper = (SimpleFunctionRegistry.FunctionInvocationWrapper) this.functionCatalog.lookup(str, outputContentTypes);
            Type functionType = functionInvocationWrapper.getFunctionType();
            assertSupportedSignatures(bindableProxyFactory, functionInvocationWrapper);
            if (this.functionProperties.isComposeFrom()) {
                // Replace the existing output channel with "output.extended" and feed the
                // function from the original output instead.
                AbstractSubscribableChannel existingOutput = (AbstractSubscribableChannel) this.applicationContext.getBean(outputs.iterator().next(), AbstractSubscribableChannel.class);
                this.logger.info("Composing at the head of output destination: " + existingOutput.getBeanName());
                String existingOutputName = existingOutput.getBeanName();
                DirectWithAttributesChannel extendedOutput = new DirectWithAttributesChannel();
                extendedOutput.setAttribute("type", Source.OUTPUT);
                extendedOutput.setComponentName("output.extended");
                this.applicationContext.registerBean("output.extended", MessageChannel.class, () -> {
                    return extendedOutput;
                }, new BeanDefinitionCustomizer[0]);
                bindableProxyFactory.replaceOutputChannel(existingOutputName, "output.extended", extendedOutput);
                inputs = Collections.singleton(Source.OUTPUT);
            }
            if (!isReactiveOrMultipleInputOutput(bindableProxyFactory, functionType)) {
                String outputDestinationName = determineOutputDestinationName(0, bindableProxyFactory, functionType);
                String firstInputName = inputs.iterator().next();
                Object inputChannel = this.applicationContext.getBean(firstInputName);
                if (inputChannel instanceof SubscribableChannel) {
                    ((SubscribableChannel) inputChannel).subscribe(createFunctionHandler(functionInvocationWrapper, firstInputName, outputDestinationName));
                }
                return;
            }
            // Typed so that the lambda assigned below has a functional-interface target type.
            AtomicReference<Function<Message<?>, Message<?>>> headerEnhancer = new AtomicReference<>();
            if (!CollectionUtils.isEmpty(outputs)) {
                String targetProtocol = ((BinderFactory) this.applicationContext.getBean(BinderFactory.class)).getBinder(this.serviceProperties.getBinder(outputs.iterator().next()), MessageChannel.class).getClass().getSimpleName().startsWith("Rabbit") ? "amqp" : "kafka";
                // The backing map of MessageHeaders is reached reflectively so that headers
                // can be added to the incoming message in place.
                Field headersField = ReflectionUtils.findField(MessageHeaders.class, "headers");
                headersField.setAccessible(true);
                headerEnhancer.set(message -> {
                    Map headers = (Map) ReflectionUtils.getField(headersField, message.getHeaders());
                    headers.putIfAbsent(MessageUtils.TARGET_PROTOCOL, targetProtocol);
                    if (CloudEventMessageUtils.isCloudEvent(message)) {
                        headers.putIfAbsent(MessageUtils.MESSAGE_TYPE, CloudEventMessageUtils.CLOUDEVENT_VALUE);
                    }
                    return message;
                });
            }
            Object[] inputPublishers = (Publisher[]) inputs.stream().map(inputBindingName -> {
                BindingProperties inputProperties = this.serviceProperties.getBindings().get(inputBindingName);
                ConsumerProperties consumer = inputProperties == null ? null : inputProperties.getConsumer();
                if (consumer != null) {
                    functionInvocationWrapper.setSkipInputConversion(consumer.isUseNativeDecoding());
                    Assert.isTrue(consumer.getConcurrency() <= 1, "Concurrency > 1 is not supported by reactive consumer, given that project reactor maintains its own concurrency mechanism. Was '..." + inputBindingName + ".consumer.concurrency=" + consumer.getConcurrency() + "'");
                }
                return IntegrationReactiveUtils.messageChannelToFlux((SubscribableChannel) this.applicationContext.getBean(inputBindingName, SubscribableChannel.class));
            }).map(flux -> {
                return headerEnhancer.get() != null ? flux.map((Function) headerEnhancer.get()) : flux;
            }).toArray(size -> {
                return new Publisher[size];
            });
            Function function = functionInvocationWrapper;
            if (!CollectionUtils.isEmpty(outputs)) {
                BindingProperties firstOutputProperties = this.serviceProperties.getBindings().get(outputs.iterator().next());
                ProducerProperties producer = firstOutputProperties == null ? null : firstOutputProperties.getProducer();
                if (producer != null) {
                    functionInvocationWrapper.setSkipOutputConversion(producer.isUseNativeEncoding());
                }
                function = new PartitionAwareFunctionWrapper(functionInvocationWrapper, this.applicationContext, producer);
            }
            Object functionResult = function.apply(inputPublishers.length == 1 ? inputPublishers[0] : Tuples.fromArray(inputPublishers));
            if (!(functionResult instanceof Iterable)) {
                functionResult = Collections.singletonList(functionResult);
            }
            // Each resulting publisher consumes the next output binding name, in iteration order.
            Iterator<String> outputNames = outputs.iterator();
            ((Iterable) functionResult).forEach(resultPublisher -> {
                Flux outputFlux = Flux.from((Publisher) resultPublisher);
                if (!CollectionUtils.isEmpty(outputs)) {
                    MessageChannel outputChannel = (MessageChannel) this.applicationContext.getBean((String) outputNames.next(), MessageChannel.class);
                    outputFlux = outputFlux.doOnNext(item -> {
                        if ((item instanceof Message) && ((Message) item).getHeaders().get("spring.cloud.stream.sendto.destination") != null) {
                            // The message names its own destination; item is known to be a Message here.
                            String destination = (String) ((Message) item).getHeaders().get("spring.cloud.stream.sendto.destination");
                            BindingProperties firstOutput = this.serviceProperties.getBindings().get(outputs.iterator().next());
                            ProducerProperties sendToProducer = firstOutput == null ? null : firstOutput.getProducer();
                            SubscribableChannel resolvedDestination = this.streamBridge.resolveDestination(destination, sendToProducer);
                            if (this.logger.isInfoEnabled()) {
                                this.logger.info("Output message is sent to '" + destination + "' destination");
                            }
                            resolvedDestination.send((Message) item);
                            return;
                        }
                        Message outbound = (item instanceof Message) ? (Message) item : MessageBuilder.withPayload(item).build();
                        outputChannel.send(outbound);
                    }).doOnError(failure -> {
                        // Route the failure through the logger, with its stack trace, rather than stderr.
                        this.logger.error("Failure was detected during execution of the reactive function '" + str + "'", (Throwable) failure);
                    });
                }
                if (!functionInvocationWrapper.isConsumer()) {
                    outputFlux.subscribe();
                }
            });
        }

        /**
         * Builds the message handler that invokes the function for every message arriving on
         * the input binding and forwards the result(s).
         *
         * <p>An {@code Iterable} or non-{@code byte[]} array result is sent element by element.
         * A result that is a {@link Message} carrying the
         * {@code spring.cloud.stream.sendto.destination} header goes to the destination resolved
         * through the stream bridge; anything else goes to {@code str2} when that name has text.
         * A {@code null} result (or element) is skipped.
         *
         * @param functionInvocationWrapper the function to invoke
         * @param str input binding name; consumer properties are read for it when it has text
         * @param str2 output binding name, may be {@code null} or empty
         * @return the initialized handler
         */
        private AbstractMessageHandler createFunctionHandler(SimpleFunctionRegistry.FunctionInvocationWrapper functionInvocationWrapper, String str, final String str2) {
            ConsumerProperties consumer = StringUtils.hasText(str) ? this.serviceProperties.getBindingProperties(str).getConsumer() : null;
            final ProducerProperties producer = StringUtils.hasText(str2) ? this.serviceProperties.getBindingProperties(str2).getProducer() : null;
            final FunctionWrapper functionWrapper = new FunctionWrapper(functionInvocationWrapper, consumer, producer, this.applicationContext, determineTargetProtocol(str2));
            final MessagingTemplate messagingTemplate = new MessagingTemplate();
            messagingTemplate.setBeanFactory(this.applicationContext.getBeanFactory());
            AbstractMessageHandler abstractMessageHandler = new AbstractMessageHandler() { // from class: org.springframework.cloud.stream.function.FunctionConfiguration.FunctionToDestinationBinder.1
                @Override // org.springframework.integration.handler.AbstractMessageHandler
                public void handleMessageInternal(Message<?> message) throws MessagingException {
                    Object result = functionWrapper.apply((Message<byte[]>) message);
                    if (result instanceof Iterable) {
                        for (Object element : (Iterable<?>) result) {
                            doSendMessage(element, message);
                        }
                    } else if (ObjectUtils.isArray(result) && !(result instanceof byte[])) {
                        for (Object element : (Object[]) result) {
                            doSendMessage(element, message);
                        }
                    } else {
                        doSendMessage(result, message);
                    }
                }

                private void doSendMessage(Object obj, Message<?> message) {
                    if (obj == null) {
                        // Nothing to send; a null would otherwise be cast and handed to the template.
                        if (this.logger.isDebugEnabled()) {
                            this.logger.debug("Function execution resulted in null. No message will be sent");
                        }
                        return;
                    }
                    if ((obj instanceof Message) && ((Message) obj).getHeaders().get("spring.cloud.stream.sendto.destination") != null) {
                        String destination = (String) ((Message) obj).getHeaders().get("spring.cloud.stream.sendto.destination");
                        SubscribableChannel resolvedDestination = FunctionToDestinationBinder.this.streamBridge.resolveDestination(destination, producer);
                        if (this.logger.isInfoEnabled()) {
                            this.logger.info("Output message is sent to '" + destination + "' destination");
                        }
                        resolvedDestination.send((Message) obj);
                    } else if (StringUtils.hasText(str2)) {
                        messagingTemplate.send(str2, (Message<?>) obj);
                    }
                }
            };
            abstractMessageHandler.setBeanFactory(this.applicationContext);
            abstractMessageHandler.afterPropertiesSet();
            return abstractMessageHandler;
        }

        /**
         * Derives the target protocol from the binder serving the given binding.
         *
         * @param str binding name
         * @return {@code null} when the name has no text; otherwise {@code "amqp"} when the
         *         binder's simple class name starts with {@code Rabbit}, else {@code "kafka"}
         */
        private String determineTargetProtocol(String str) {
            if (!StringUtils.hasText(str)) {
                return null;
            }
            BinderFactory binderFactory = (BinderFactory) this.applicationContext.getBean(BinderFactory.class);
            String binderClassName = binderFactory.getBinder(this.serviceProperties.getBinder(str), MessageChannel.class).getClass().getSimpleName();
            if (binderClassName.startsWith("Rabbit")) {
                return "amqp";
            }
            return "kafka";
        }

        /**
         * Tells whether the function must be bound through the reactive path.
         *
         * @param bindableProxyFactory factory describing the bindings
         * @param type the function type
         * @return {@code true} when the factory is multi-input/output or the function's input
         *         or output type is a publisher
         */
        private boolean isReactiveOrMultipleInputOutput(BindableProxyFactory bindableProxyFactory, Type type) {
            if (isMultipleInputOutput(bindableProxyFactory)) {
                return true;
            }
            if (FunctionTypeUtils.isPublisher(FunctionTypeUtils.getInputType(type))) {
                return true;
            }
            return FunctionTypeUtils.isPublisher(FunctionTypeUtils.getOutputType(type));
        }

        /**
         * Resolves the name of the output destination at the given index.
         *
         * @param i index of the output
         * @param bindableProxyFactory factory describing the bindings
         * @param type the function type
         * @return the name reported by a {@link BindableFunctionProxyFactory}; otherwise
         *         {@code null} for a consumer; otherwise the i-th declared output, where a
         *         factory without outputs is treated as declaring only {@code Source.OUTPUT}
         */
        private String determineOutputDestinationName(int i, BindableProxyFactory bindableProxyFactory, Type type) {
            if (bindableProxyFactory instanceof BindableFunctionProxyFactory) {
                return ((BindableFunctionProxyFactory) bindableProxyFactory).getOutputName(i);
            }
            if (FunctionTypeUtils.isConsumer(type)) {
                return null;
            }
            // One mutable, typed list: the default name is added to it rather than swapping in
            // a List of a different implementation type.
            ArrayList<String> outputNames = new ArrayList<>(bindableProxyFactory.getOutputs());
            if (outputNames.isEmpty()) {
                outputNames.add(Source.OUTPUT);
            }
            return outputNames.get(i);
        }

        /**
         * Rejects compose-to/compose-from when the factory is multi-input/output.
         *
         * @param bindableProxyFactory factory describing the bindings
         */
        private void assertBindingIsPossible(BindableProxyFactory bindableProxyFactory) {
            if (!isMultipleInputOutput(bindableProxyFactory)) {
                return;
            }
            boolean composing = this.functionProperties.isComposeTo() || this.functionProperties.isComposeFrom();
            Assert.isTrue(!composing, "Composing to/from existing Sinks and Sources are not supported for functions with multiple arguments.");
        }

        /**
         * @param bindableProxyFactory factory to inspect
         * @return {@code true} only for a {@link BindableFunctionProxyFactory} that reports
         *         itself as multiple
         */
        private boolean isMultipleInputOutput(BindableProxyFactory bindableProxyFactory) {
            if (!(bindableProxyFactory instanceof BindableFunctionProxyFactory)) {
                return false;
            }
            return ((BindableFunctionProxyFactory) bindableProxyFactory).isMultiple();
        }

        /**
         * @param type type to inspect
         * @return {@code true} when the type is a generic array type or an array class
         */
        private boolean isArray(Type type) {
            if (type instanceof GenericArrayType) {
                return true;
            }
            return (type instanceof Class) && ((Class<?>) type).isArray();
        }

        /**
         * For a multi-input/output factory, asserts that the function is neither a consumer
         * nor a supplier and that neither its input nor its output type is an array.
         *
         * @param bindableProxyFactory factory describing the bindings
         * @param functionInvocationWrapper the function being bound
         */
        private void assertSupportedSignatures(BindableProxyFactory bindableProxyFactory, SimpleFunctionRegistry.FunctionInvocationWrapper functionInvocationWrapper) {
            if (!isMultipleInputOutput(bindableProxyFactory)) {
                return;
            }
            boolean notConsumer = !functionInvocationWrapper.isConsumer();
            Assert.isTrue(notConsumer, "Function '" + this.functionProperties.getDefinition() + "' is a Consumer which is not supported for multi-in/out reactive streams. Only Functions are supported");
            boolean notSupplier = !functionInvocationWrapper.isSupplier();
            Assert.isTrue(notSupplier, "Function '" + this.functionProperties.getDefinition() + "' is a Supplier which is not supported for multi-in/out reactive streams. Only Functions are supported");
            // The output type is only inspected when the input type is not an array.
            boolean hasArity = !isArray(functionInvocationWrapper.getInputType()) && !isArray(functionInvocationWrapper.getOutputType());
            Assert.isTrue(hasArity, "Function '" + this.functionProperties.getDefinition() + "' has the following signature: [" + functionInvocationWrapper.getFunctionType() + "]. Your input and/or outout lacks arity and therefore we can not determine how many input/output destinations are required in the context of function input/output binding.");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:datasets/datasets-service.jar:BOOT-INF/lib/spring-cloud-stream-3.1.0.jar:org/springframework/cloud/stream/function/FunctionConfiguration$FunctionWrapper.class */
    /**
     * Adapter that prepares an incoming {@code Message<byte[]>} and hands it to the
     * wrapped function. Before delegating it writes extra entries directly into the
     * message's underlying header map, which it reaches reflectively through the
     * {@code headers} field of {@code MessageHeaders}.
     */
    public static class FunctionWrapper implements Function<Message<byte[]>, Object> {
        private final Function function;
        private final ConsumerProperties consumerProperties;
        private final ProducerProperties producerProperties;
        private final Field headersField;
        private final ConfigurableApplicationContext applicationContext;
        private final boolean isRoutingFunction;
        private final String targetProtocol;

        /**
         * @param function the target function; must be a {@code SimpleFunctionRegistry.FunctionInvocationWrapper}
         * @param consumerProperties consumer settings, or {@code null}; when present its native-decoding flag
         *        is pushed to the function as "skip input conversion"
         * @param producerProperties producer settings, or {@code null}; when present its native-encoding flag
         *        is pushed to the function as "skip output conversion"
         * @param configurableApplicationContext the context passed on to the partition-aware wrapper
         * @param str the target protocol to stamp on messages (may be empty or {@code null})
         */
        FunctionWrapper(Function function, ConsumerProperties consumerProperties, ProducerProperties producerProperties, ConfigurableApplicationContext configurableApplicationContext, String str) {
            SimpleFunctionRegistry.FunctionInvocationWrapper invocationWrapper = (SimpleFunctionRegistry.FunctionInvocationWrapper) function;
            this.isRoutingFunction = invocationWrapper.getTarget() instanceof RoutingFunction;
            this.applicationContext = configurableApplicationContext;
            this.function = new PartitionAwareFunctionWrapper(invocationWrapper, configurableApplicationContext, producerProperties);

            this.consumerProperties = consumerProperties;
            if (consumerProperties != null) {
                invocationWrapper.setSkipInputConversion(consumerProperties.isUseNativeDecoding());
            }

            this.producerProperties = producerProperties;
            if (producerProperties != null) {
                invocationWrapper.setSkipOutputConversion(producerProperties.isUseNativeEncoding());
            }

            // Reflective handle on MessageHeaders' backing map so apply() can add entries in place.
            Field backingHeaders = ReflectionUtils.findField(MessageHeaders.class, "headers");
            backingHeaders.setAccessible(true);
            this.headersField = backingHeaders;

            this.targetProtocol = str;
        }

        /**
         * Enriches the message headers and invokes the wrapped function.
         *
         * @param message the incoming message; it is dereferenced immediately, so it must not be {@code null}
         * @return whatever the wrapped function returns
         * @throws IllegalStateException if the wrapped target is a routing function and the result is a {@code Publisher}
         */
        @Override
        @SuppressWarnings("unchecked")
        public Object apply(Message<byte[]> message) {
            Map<String, Object> rawHeaders = (Map<String, Object>) ReflectionUtils.getField(this.headersField, message.getHeaders());

            if (StringUtils.hasText(this.targetProtocol)) {
                rawHeaders.putIfAbsent(MessageUtils.TARGET_PROTOCOL, this.targetProtocol);
            }
            if (CloudEventMessageUtils.isCloudEvent(message)) {
                rawHeaders.putIfAbsent(MessageUtils.MESSAGE_TYPE, CloudEventMessageUtils.CLOUDEVENT_VALUE);
            }
            // The message null check is kept from the original; message was already dereferenced above.
            if (this.consumerProperties != null && message != null) {
                rawHeaders.put(FunctionProperties.SKIP_CONVERSION_HEADER, Boolean.valueOf(this.consumerProperties.isUseNativeDecoding()));
            }

            Object result = this.function.apply(message);
            if (this.isRoutingFunction && result instanceof Publisher) {
                throw new IllegalStateException("Routing to functions that return Publisher is not supported in the context of Spring Cloud Stream.");
            }
            return result;
        }
    }

    /* loaded from: input_file:datasets/datasets-service.jar:BOOT-INF/lib/spring-cloud-stream-3.1.0.jar:org/springframework/cloud/stream/function/FunctionConfiguration$PollableSourceRegistrar.class */
    /**
     * Bean factory post-processor that registers one {@code BindableFunctionProxyFactory}
     * bean definition per name listed in the {@code spring.cloud.stream.pollable-source} property.
     */
    private static class PollableSourceRegistrar implements BeanFactoryPostProcessor {
        private final Environment environment;

        PollableSourceRegistrar(Environment environment) {
            this.environment = environment;
        }

        /**
         * Splits the property value on {@code ScriptUtils.DEFAULT_STATEMENT_SEPARATOR} and registers
         * a definition named {@code <name>_binding} for each entry. Does nothing when the property
         * has no text.
         *
         * @param configurableListableBeanFactory the factory to register into; it is cast to
         *        {@code BeanDefinitionRegistry} for each registration
         */
        @Override
        public void postProcessBeanFactory(ConfigurableListableBeanFactory configurableListableBeanFactory) throws BeansException {
            String pollableSources = this.environment.getProperty("spring.cloud.stream.pollable-source");
            if (!StringUtils.hasText(pollableSources)) {
                return;
            }
            String[] sourceNames = pollableSources.split(ScriptUtils.DEFAULT_STATEMENT_SEPARATOR);
            for (String sourceName : sourceNames) {
                RootBeanDefinition proxyFactoryDefinition = new RootBeanDefinition((Class<?>) BindableFunctionProxyFactory.class);
                // Constructor arguments, in order: name, 1, 0, fresh StreamFunctionProperties, true.
                proxyFactoryDefinition.getConstructorArgumentValues().addGenericArgumentValue(sourceName);
                proxyFactoryDefinition.getConstructorArgumentValues().addGenericArgumentValue(Integer.valueOf(1));
                proxyFactoryDefinition.getConstructorArgumentValues().addGenericArgumentValue(Integer.valueOf(0));
                proxyFactoryDefinition.getConstructorArgumentValues().addGenericArgumentValue(new StreamFunctionProperties());
                proxyFactoryDefinition.getConstructorArgumentValues().addGenericArgumentValue(Boolean.TRUE);

                BeanDefinitionRegistry registry = (BeanDefinitionRegistry) configurableListableBeanFactory;
                registry.registerBeanDefinition(sourceName + "_binding", proxyFactoryDefinition);
            }
        }
    }

    /**
     * Declares the {@link StreamBridge} bean, passing every injected collaborator
     * straight through to its constructor.
     *
     * @param newDestinationBindingCallback optional callback; may be {@code null}
     * @return a new {@code StreamBridge}
     */
    @Bean
    public StreamBridge streamBridgeUtils(FunctionCatalog functionCatalog, FunctionRegistry functionRegistry, BindingServiceProperties bindingServiceProperties, ConfigurableApplicationContext configurableApplicationContext, @Nullable BinderAwareChannelResolver.NewDestinationBindingCallback newDestinationBindingCallback) {
        StreamBridge bridge = new StreamBridge(functionCatalog, functionRegistry, bindingServiceProperties, configurableApplicationContext, newDestinationBindingCallback);
        return bridge;
    }

    /**
     * Declares the {@code FunctionBindingRegistrar} bean.
     *
     * @param environment injected but not used by this method
     * @param functionCatalog catalog handed to the registrar
     * @param streamFunctionProperties function properties handed to the registrar
     * @return a new {@code FunctionBindingRegistrar}
     */
    @Bean
    public InitializingBean functionBindingRegistrar(Environment environment, FunctionCatalog functionCatalog, StreamFunctionProperties streamFunctionProperties) {
        FunctionBindingRegistrar registrar = new FunctionBindingRegistrar(functionCatalog, streamFunctionProperties);
        return registrar;
    }

    /**
     * Declares the post-processor that registers pollable-source bindings.
     *
     * @param environment the environment the registrar reads its property from
     * @return a new {@code PollableSourceRegistrar}
     */
    @Bean
    public BeanFactoryPostProcessor po(Environment environment) {
        PollableSourceRegistrar pollableSourceRegistrar = new PollableSourceRegistrar(environment);
        return pollableSourceRegistrar;
    }

    /**
     * Declares the {@code FunctionToDestinationBinder}, unless the context has
     * {@code EnableBinding}-annotated beans and no bean named {@code Source.OUTPUT}.
     *
     * @return the binder, or {@code null} when {@code EnableBinding} beans exist and no
     *         {@code Source.OUTPUT} bean is present
     */
    @Bean
    public InitializingBean functionInitializer(FunctionCatalog functionCatalog, FunctionInspector functionInspector, StreamFunctionProperties streamFunctionProperties, @Nullable BindableProxyFactory[] bindableProxyFactoryArr, BindingServiceProperties bindingServiceProperties, ConfigurableApplicationContext configurableApplicationContext, FunctionBindingRegistrar functionBindingRegistrar, StreamBridge streamBridge) {
        // Same short-circuit order as before: the annotation scan only runs when no Source.OUTPUT bean exists.
        boolean annotationOnlyBinding = !configurableApplicationContext.containsBean(Source.OUTPUT)
                && !ObjectUtils.isEmpty((Object[]) configurableApplicationContext.getBeanNamesForAnnotation(EnableBinding.class));
        if (annotationOnlyBinding) {
            return null;
        }
        return new FunctionToDestinationBinder(functionCatalog, streamFunctionProperties, bindingServiceProperties, streamBridge);
    }

    /**
     * Declares an {@code InitializingBean} that builds and registers an
     * {@code IntegrationFlow} for every bindable function proxy factory whose
     * function definition resolves to a supplier.
     *
     * <p>Returns {@code null} when any bean is annotated with {@code EnableBinding}
     * or when no {@code BindableFunctionProxyFactory} array was injected.
     *
     * <p>For each supplier, messages are routed to the destination named in the
     * {@code spring.cloud.stream.sendto.destination} header when present, otherwise to
     * the factory's single output binding. The flow is registered under the bean name
     * {@code <functionDefinition>_integrationflow}, and only when neither
     * {@code composeFrom} nor {@code composeTo} is set on the stream function properties.
     */
    @Bean
    InitializingBean supplierInitializer(final FunctionCatalog functionCatalog, final StreamFunctionProperties streamFunctionProperties, final GenericApplicationContext genericApplicationContext, final BindingServiceProperties bindingServiceProperties, @Nullable final BindableFunctionProxyFactory[] bindableFunctionProxyFactoryArr, final StreamBridge streamBridge, final TaskScheduler taskScheduler) {
        if (!ObjectUtils.isEmpty((Object[]) genericApplicationContext.getBeanNamesForAnnotation(EnableBinding.class)) || bindableFunctionProxyFactoryArr == null) {
            return null;
        }
        return new InitializingBean() { // from class: org.springframework.cloud.stream.function.FunctionConfiguration.1
            /* JADX WARN: Multi-variable type inference failed */
            @Override // org.springframework.beans.factory.InitializingBean
            public void afterPropertiesSet() throws Exception {
                SimpleFunctionRegistry.FunctionInvocationWrapper functionInvocationWrapper;
                for (BindableFunctionProxyFactory bindableFunctionProxyFactory : bindableFunctionProxyFactoryArr) {
                    // First lookup (no content types) is only used to decide whether this definition is a supplier.
                    SimpleFunctionRegistry.FunctionInvocationWrapper functionInvocationWrapper2 = (SimpleFunctionRegistry.FunctionInvocationWrapper) functionCatalog.lookup(bindableFunctionProxyFactory.getFunctionDefinition());
                    if (functionInvocationWrapper2 != null && functionInvocationWrapper2.isSupplier()) {
                        // Content types passed to the later catalog lookups; holds at most one entry.
                        ArrayList arrayList = new ArrayList();
                        Assert.isTrue(bindableFunctionProxyFactory.getOutputs().size() == 1, "Supplier with multiple outputs is not supported at the moment.");
                        // Name of the single output binding; also the default routing target below.
                        String next = bindableFunctionProxyFactory.getOutputs().iterator().next();
                        BindingProperties bindingProperties = bindingServiceProperties.getBindingProperties(next);
                        ProducerProperties producer = bindingProperties.getProducer();
                        // The binding's content type is requested unless the producer uses native encoding.
                        if (bindingProperties.getProducer() == null || !producer.isUseNativeEncoding()) {
                            arrayList.add(bindingProperties.getContentType());
                        }
                        // Split a composed definition on either ',' or '|' (commas are normalized to pipes first).
                        String[] delimitedListToStringArray = StringUtils.delimitedListToStringArray(bindableFunctionProxyFactory.getFunctionDefinition().replaceAll(",", "|").trim(), "|");
                        // obj: head of a composed definition; function: the remaining segments. Both stay null when not composed.
                        Object obj = null;
                        Function function = null;
                        if (ObjectUtils.isEmpty((Object[]) delimitedListToStringArray) || delimitedListToStringArray.length <= 1) {
                            // Single (non-composed) definition: look it up again, this time with the content types.
                            functionInvocationWrapper = (SimpleFunctionRegistry.FunctionInvocationWrapper) functionCatalog.lookup(bindableFunctionProxyFactory.getFunctionDefinition(), (String[]) arrayList.toArray(new String[0]));
                        } else {
                            String str = delimitedListToStringArray[0];
                            String arrayToCommaDelimitedString = StringUtils.arrayToCommaDelimitedString(Arrays.copyOfRange(delimitedListToStringArray, 1, delimitedListToStringArray.length));
                            obj = (Function) functionCatalog.lookup(str);
                            function = (Function) functionCatalog.lookup(arrayToCommaDelimitedString, (String[]) arrayList.toArray(new String[0]));
                            // The whole composition is looked up as one function unless the head does not output a
                            // Publisher while the tail takes a Publisher input; in that case the wrapper is left null
                            // and the flow below is assembled from head and tail separately.
                            functionInvocationWrapper = (((SimpleFunctionRegistry.FunctionInvocationWrapper) obj).isOutputTypePublisher() || !((SimpleFunctionRegistry.FunctionInvocationWrapper) function).isInputTypePublisher()) ? (SimpleFunctionRegistry.FunctionInvocationWrapper) functionCatalog.lookup(bindableFunctionProxyFactory.getFunctionDefinition(), (String[]) arrayList.toArray(new String[0])) : null;
                        }
                        // Created for every supplier, even if no flow ends up being registered below.
                        Publisher publisher = FunctionConfiguration.this.setupBindingTrigger(genericApplicationContext);
                        if (!streamFunctionProperties.isComposeFrom() && !streamFunctionProperties.isComposeTo()) {
                            // Bean name under which the resulting flow is registered.
                            String str2 = bindableFunctionProxyFactory.getFunctionDefinition() + "_integrationflow";
                            PollableBean extractPollableAnnotation = FunctionConfiguration.this.extractPollableAnnotation(streamFunctionProperties, genericApplicationContext, bindableFunctionProxyFactory);
                            if (functionInvocationWrapper != null) {
                                // Whole definition acts as the supplier of the flow.
                                IntegrationFlowBuilder integrationFlowFromProvidedSupplier = FunctionConfiguration.this.integrationFlowFromProvidedSupplier(new PartitionAwareFunctionWrapper(functionInvocationWrapper, genericApplicationContext, producer), publisher, extractPollableAnnotation, genericApplicationContext, taskScheduler, functionInvocationWrapper.getFunctionType());
                                StreamBridge streamBridge2 = streamBridge;
                                // Route by the sendto header when present, otherwise to the output binding.
                                IntegrationFlow integrationFlow = (IntegrationFlow) genericApplicationContext.getAutowireCapableBeanFactory().applyBeanPostProcessorsBeforeInitialization(((IntegrationFlowBuilder) integrationFlowFromProvidedSupplier.route(Message.class, message -> {
                                    return message.getHeaders().get("spring.cloud.stream.sendto.destination") != null ? streamBridge2.resolveDestination((String) message.getHeaders().get("spring.cloud.stream.sendto.destination"), producer) : next;
                                })).get(), str2);
                                genericApplicationContext.registerBean(str2, IntegrationFlow.class, () -> {
                                    return integrationFlow;
                                }, new BeanDefinitionCustomizer[0]);
                            } else {
                                // Head segment supplies the flow; the tail is applied through a direct channel
                                // followed by fluxTransform.
                                IntegrationFlowBuilder integrationFlowBuilder = (IntegrationFlowBuilder) ((IntegrationFlowBuilder) FunctionConfiguration.this.integrationFlowFromProvidedSupplier(new PartitionAwareFunctionWrapper((SimpleFunctionRegistry.FunctionInvocationWrapper) obj, genericApplicationContext, producer), publisher, extractPollableAnnotation, genericApplicationContext, taskScheduler, ((SimpleFunctionRegistry.FunctionInvocationWrapper) obj).getFunctionType()).channel(channels -> {
                                    return channels.direct();
                                })).fluxTransform(function);
                                StreamBridge streamBridge3 = streamBridge;
                                // Same routing rule as the branch above.
                                IntegrationFlow integrationFlow2 = (IntegrationFlow) genericApplicationContext.getAutowireCapableBeanFactory().applyBeanPostProcessorsBeforeInitialization(((IntegrationFlowBuilder) integrationFlowBuilder.route(Message.class, message2 -> {
                                    return message2.getHeaders().get("spring.cloud.stream.sendto.destination") != null ? streamBridge3.resolveDestination((String) message2.getHeaders().get("spring.cloud.stream.sendto.destination"), producer) : next;
                                })).get(), str2);
                                genericApplicationContext.registerBean(str2, IntegrationFlow.class, () -> {
                                    return integrationFlow2;
                                }, new BeanDefinitionCustomizer[0]);
                            }
                        }
                    }
                }
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds a publisher that completes once a {@code BindingCreatedEvent} is seen by
     * the given application context.
     *
     * <p>The returned {@code Mono} stores its sink in a shared reference when it is created;
     * a listener added to the context calls {@code success()} on that sink for each
     * {@code BindingCreatedEvent}, provided the sink has been stored by then.
     *
     * @param genericApplicationContext the context the listener is added to
     * @return the trigger publisher
     */
    public Publisher<Object> setupBindingTrigger(GenericApplicationContext genericApplicationContext) {
        final AtomicReference<MonoSink<Object>> sinkHolder = new AtomicReference<>();
        Mono<Object> trigger = Mono.create(sinkHolder::set);
        genericApplicationContext.addApplicationListener(event -> {
            MonoSink<Object> sink = sinkHolder.get();
            if (event instanceof BindingCreatedEvent && sink != null) {
                sink.success();
            }
        });
        return trigger;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Creates the starting {@code IntegrationFlowBuilder} for a supplier.
     *
     * <p>A supplier with no {@code PollableBean} annotation and a Publisher output type is
     * invoked once; its publisher is subscribed only after {@code publisher} signals, and each
     * element is wrapped into a {@code Message} when it is not one already. In every other case
     * the supplier itself becomes the flow source, followed by a split step when the annotation
     * is splittable and the output type is a Publisher.
     *
     * @param supplier the supplier backing the flow
     * @param publisher trigger used to delay subscription in the reactive case
     * @param pollableBean the supplier's {@code PollableBean} annotation, or {@code null}
     * @param genericApplicationContext not used by this method
     * @param taskScheduler scheduler that receives a no-op task in the reactive case
     * @param type the function type whose output type decides whether the supplier is reactive
     * @return the flow builder
     */
    public IntegrationFlowBuilder integrationFlowFromProvidedSupplier(Supplier<?> supplier, Publisher<Object> publisher, PollableBean pollableBean, GenericApplicationContext genericApplicationContext, TaskScheduler taskScheduler, Type type) {
        boolean splittable = false;
        if (pollableBean != null) {
            splittable = ((Boolean) AnnotationUtils.getAnnotationAttributes(pollableBean).get("splittable")).booleanValue();
        }
        boolean reactive = FunctionTypeUtils.isPublisher(FunctionTypeUtils.getOutputType(type));

        if (pollableBean != null || !reactive) {
            IntegrationFlowBuilder polledFlow = IntegrationFlows.fromSupplier(supplier);
            if (splittable && reactive) {
                polledFlow = (IntegrationFlowBuilder) polledFlow.split();
            }
            return polledFlow;
        }

        Publisher supplied = (Publisher) supplier.get();
        Publisher messages;
        if (supplied instanceof Mono) {
            messages = ((Mono) supplied).delaySubscription(publisher).map(this::wrapToMessageIfNecessary);
        } else {
            messages = ((Flux) supplied).delaySubscription(publisher).map(this::wrapToMessageIfNecessary);
        }
        IntegrationFlowBuilder reactiveFlow = IntegrationFlows.from((Publisher<? extends Message<?>>) messages);
        // NOTE(review): the scheduled task is intentionally empty; presumably it only forces the
        // scheduler to start a thread - confirm before removing.
        taskScheduler.schedule(() -> {
        }, Instant.now());
        return reactiveFlow;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Looks up the {@code PollableBean} annotation on the factory method of the first
     * function named in the proxy factory's function definition.
     *
     * @param streamFunctionProperties only used for the definition shown in the failure message
     * @param genericApplicationContext the context holding the bean definition
     * @param bindableFunctionProxyFactory provides the (possibly composed) function definition
     * @return the annotation, or {@code null} when the bean definition is not a
     *         {@code RootBeanDefinition}, the return-type check fails, or no annotation is found
     * @throws IllegalArgumentException (raised by {@code Assert.notNull}) if no factory method
     *         could be resolved
     */
    public PollableBean extractPollableAnnotation(StreamFunctionProperties streamFunctionProperties, GenericApplicationContext genericApplicationContext, BindableFunctionProxyFactory bindableFunctionProxyFactory) {
        // Commas are normalized to pipes so both composition separators split the same way.
        String normalizedDefinition = bindableFunctionProxyFactory.getFunctionDefinition().replaceAll(",", "|").trim();
        String headFunctionName = StringUtils.delimitedListToStringArray(normalizedDefinition, "|")[0];

        BeanDefinition beanDefinition = genericApplicationContext.getBeanDefinition(headFunctionName);
        if (!(beanDefinition instanceof RootBeanDefinition)) {
            return null;
        }
        RootBeanDefinition rootDefinition = (RootBeanDefinition) beanDefinition;

        Method factoryMethod = rootDefinition.getResolvedFactoryMethod();
        if (factoryMethod == null) {
            // Fall back to the method metadata recorded as the definition's source.
            Object source = beanDefinition.getSource();
            if (source instanceof MethodMetadata) {
                MethodMetadata methodMetadata = (MethodMetadata) source;
                Class<?> declaringClass = ClassUtils.resolveClassName(methodMetadata.getDeclaringClassName(), null);
                factoryMethod = ReflectionUtils.findMethod(declaringClass, methodMetadata.getMethodName(), FunctionContextUtils.getParamTypesFromBeanDefinitionFactory(declaringClass, rootDefinition));
            }
        }
        Assert.notNull(factoryMethod, "Failed to introspect factory method since it was not discovered for function '" + streamFunctionProperties.getDefinition() + "'");

        // NOTE(review): this accepts return types that Supplier is assignable TO (Supplier or a
        // supertype), not subtypes of Supplier - confirm this direction is intended.
        boolean returnsSupplier = factoryMethod.getReturnType().isAssignableFrom(Supplier.class);
        if (!returnsSupplier) {
            return null;
        }
        return AnnotationUtils.findAnnotation(factoryMethod, PollableBean.class);
    }

    /**
     * Returns the argument unchanged when it is already a {@code Message}; otherwise
     * builds a new message with the argument as payload.
     *
     * @param t the value to pass through or wrap
     * @return {@code t} itself or a message carrying {@code t}
     */
    @SuppressWarnings("unchecked")
    private <T> Message<T> wrapToMessageIfNecessary(T t) {
        if (t instanceof Message) {
            return (Message<T>) t;
        }
        return MessageBuilder.withPayload(t).build();
    }
}
