package org.springframework.cloud.stream.binder.kafka.streams;

import io.micrometer.core.instrument.MeterRegistry;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.context.properties.bind.Bindable;
import org.springframework.boot.context.properties.bind.Binder;
import org.springframework.boot.context.properties.bind.PropertySourcesPlaceholdersResolver;
import org.springframework.boot.context.properties.source.ConfigurationPropertySources;
import org.springframework.cloud.stream.binder.BinderConfiguration;
import org.springframework.cloud.stream.binder.kafka.streams.function.FunctionDetectorCondition;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
import org.springframework.cloud.stream.binder.kafka.streams.serde.CompositeNonNativeSerde;
import org.springframework.cloud.stream.binder.kafka.streams.serde.MessageConverterDelegateSerde;
import org.springframework.cloud.stream.binding.BindingService;
import org.springframework.cloud.stream.binding.StreamListenerResultAdapter;
import org.springframework.cloud.stream.config.BinderProperties;
import org.springframework.cloud.stream.config.BindingServiceConfiguration;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.cloud.stream.function.StreamFunctionProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.Environment;
import org.springframework.core.env.MapPropertySource;
import org.springframework.integration.support.utils.IntegrationUtils;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.config.StreamsBuilderFactoryBeanCustomizer;
import org.springframework.kafka.core.CleanupConfig;
import org.springframework.kafka.streams.RecoveringDeserializationExceptionHandler;
import org.springframework.lang.Nullable;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.util.ObjectUtils;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;

@EnableConfigurationProperties({KafkaStreamsExtendedBindingProperties.class})
@Configuration
@AutoConfigureAfter({BindingServiceConfiguration.class})
@ConditionalOnBean({BindingService.class})
/* loaded from: input_file:datasets/datasets-service.jar:BOOT-INF/lib/spring-cloud-stream-binder-kafka-streams-3.0.1.RELEASE.jar:org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsBinderSupportAutoConfiguration.class */
public class KafkaStreamsBinderSupportAutoConfiguration {
    private static final String KSTREAM_BINDER_TYPE = "kstream";
    private static final String KTABLE_BINDER_TYPE = "ktable";
    private static final String GLOBALKTABLE_BINDER_TYPE = "globalktable";

    /**
     * Nested configuration that contributes a {@link KafkaStreamsBinderMetrics} bean built
     * from a {@link MeterRegistry} bean.
     *
     * <p>Guarded by a class-presence check on {@code io.micrometer.core.instrument.MeterRegistry}
     * and by a missing-bean condition on {@link KafkaStreamsBinderMetrics} / the bean name
     * {@code outerContext}.
     */
    @ConditionalOnMissingBean(value = KafkaStreamsBinderMetrics.class, name = "outerContext")
    @Configuration
    @ConditionalOnClass(name = "io.micrometer.core.instrument.MeterRegistry")
    protected class KafkaStreamsBinderMetricsConfiguration {

        /**
         * Creates the binder metrics bean backed by the given registry.
         *
         * @param meterRegistry the registry handed to {@link KafkaStreamsBinderMetrics}
         * @return a new {@link KafkaStreamsBinderMetrics}
         */
        @ConditionalOnMissingBean(KafkaStreamsBinderMetrics.class)
        @ConditionalOnBean(MeterRegistry.class)
        @Bean
        public KafkaStreamsBinderMetrics kafkaStreamsBinderMetrics(MeterRegistry meterRegistry) {
            return new KafkaStreamsBinderMetrics(meterRegistry);
        }
    }

    /**
     * Nested configuration that contributes a {@link KafkaStreamsBinderMetrics} bean when a
     * bean named {@code outerContext} exists; the {@link MeterRegistry} is looked up in that
     * outer application context instead of being injected directly.
     */
    @Configuration
    @ConditionalOnClass(name = "io.micrometer.core.instrument.MeterRegistry")
    @ConditionalOnMissingBean(KafkaStreamsBinderMetrics.class)
    @ConditionalOnBean(name = "outerContext")
    protected class KafkaStreamsBinderMetricsConfigurationWithMultiBinder {

        /**
         * Creates the binder metrics bean using the {@link MeterRegistry} obtained from the
         * {@code outerContext} bean.
         *
         * @param configurableApplicationContext the context in which the {@code outerContext}
         *        bean is resolved
         * @return a new {@link KafkaStreamsBinderMetrics}
         */
        @Bean
        public KafkaStreamsBinderMetrics kafkaStreamsBinderMetrics(ConfigurableApplicationContext configurableApplicationContext) {
            ApplicationContext outerContext = configurableApplicationContext.getBean("outerContext", ApplicationContext.class);
            MeterRegistry outerRegistry = outerContext.getBean(MeterRegistry.class);
            return new KafkaStreamsBinderMetrics(outerRegistry);
        }
    }

    /**
     * Creates the default {@link KafkaStreamsBinderConfigurationProperties} bean and, for every
     * configured binder of type {@code kstream}, {@code ktable} or {@code globalktable},
     * registers an additional per-binder properties singleton.
     *
     * <p>For each such binder the method:
     * <ol>
     *   <li>flattens the binder's own properties and adds them as the first property source of
     *       the environment, named {@code <binderName>-kafkaStreamsBinderEnv};</li>
     *   <li>binds {@code spring.cloud.stream.kafka.streams.binder} onto a fresh properties
     *       instance;</li>
     *   <li>registers the bound instance as singleton
     *       {@code <binderName>-KafkaStreamsBinderConfigurationProperties}.</li>
     * </ol>
     *
     * @param kafkaProperties Boot Kafka properties passed to every properties instance created here
     * @param configurableEnvironment environment that receives the per-binder property sources
     * @param bindingServiceProperties source of the configured binders
     * @param configurableApplicationContext context whose bean factory receives the singletons
     * @return a new properties instance built from {@code kafkaProperties}
     * @throws Exception if the properties constructor cannot be resolved or binding fails
     */
    @ConfigurationProperties(prefix = "spring.cloud.stream.kafka.streams.binder")
    @Bean
    public KafkaStreamsBinderConfigurationProperties binderConfigurationProperties(KafkaProperties kafkaProperties, ConfigurableEnvironment configurableEnvironment, BindingServiceProperties bindingServiceProperties, ConfigurableApplicationContext configurableApplicationContext) throws Exception {
        Map<String, BinderConfiguration> binderConfigurations = getBinderConfigurations(bindingServiceProperties);
        for (Map.Entry<String, BinderConfiguration> binderEntry : binderConfigurations.entrySet()) {
            String binderName = binderEntry.getKey();
            BinderConfiguration binderConfiguration = binderEntry.getValue();
            String binderType = binderConfiguration.getBinderType();

            // Constant-first equals keeps the check null-safe without a separate null test.
            boolean kafkaStreamsBinder = KSTREAM_BINDER_TYPE.equals(binderType)
                    || KTABLE_BINDER_TYPE.equals(binderType)
                    || GLOBALKTABLE_BINDER_TYPE.equals(binderType);
            if (!kafkaStreamsBinder) {
                continue;
            }

            // Expose the binder-specific properties ahead of every other property source.
            Map<String, Object> flattenedBinderProperties = new HashMap<>();
            flatten(null, binderConfiguration.getProperties(), flattenedBinderProperties);
            configurableEnvironment.getPropertySources().addFirst(
                    new MapPropertySource(binderName + "-kafkaStreamsBinderEnv", flattenedBinderProperties));

            Binder propertiesBinder = new Binder(
                    ConfigurationPropertySources.get(configurableEnvironment),
                    new PropertySourcesPlaceholdersResolver(configurableEnvironment),
                    IntegrationUtils.getConversionService(configurableApplicationContext.getBeanFactory()),
                    null);
            KafkaStreamsBinderConfigurationProperties bindTarget = BeanUtils.instantiateClass(
                    ReflectionUtils.accessibleConstructor(KafkaStreamsBinderConfigurationProperties.class, KafkaProperties.class),
                    kafkaProperties);
            KafkaStreamsBinderConfigurationProperties boundProperties = propertiesBinder
                    .bind("spring.cloud.stream.kafka.streams.binder", Bindable.ofInstance(bindTarget))
                    .get();

            configurableApplicationContext.getBeanFactory().registerSingleton(
                    binderName + "-KafkaStreamsBinderConfigurationProperties", boundProperties);
        }
        return new KafkaStreamsBinderConfigurationProperties(kafkaProperties);
    }

    /**
     * Converts every entry of {@link BindingServiceProperties#getBinders()} into a
     * {@link BinderConfiguration}, keyed by the same binder name.
     *
     * @param bindingServiceProperties the binding service properties to read binders from
     * @return a new mutable map from binder name to its {@link BinderConfiguration}
     */
    private static Map<String, BinderConfiguration> getBinderConfigurations(BindingServiceProperties bindingServiceProperties) {
        Map<String, BinderConfiguration> configurationsByName = new HashMap<>();
        bindingServiceProperties.getBinders().forEach((binderName, binderProperties) ->
                configurationsByName.put(binderName, new BinderConfiguration(
                        binderProperties.getType(),
                        binderProperties.getEnvironment(),
                        binderProperties.isInheritEnvironment(),
                        binderProperties.isDefaultCandidate())));
        return configurationsByName;
    }

    /**
     * Recursively flattens a possibly nested map into dot-separated property names.
     *
     * <p>When {@code obj} is a {@link Map}, each entry is flattened under the name
     * {@code str + "." + key} (or just {@code key} when {@code str} is {@code null}).
     * Any other non-null value is stored in {@code map} under {@code str} using its
     * {@code toString()} form.
     *
     * <p>A {@code null} value is skipped, so the property is simply absent from the result
     * instead of triggering a {@link NullPointerException}.
     *
     * @param str the property name accumulated so far, or {@code null} at the top level
     * @param obj the value to flatten; may be a nested {@link Map}, a leaf value, or {@code null}
     * @param map the target map receiving the flattened {@code name -> string value} entries
     */
    private void flatten(String str, Object obj, Map<String, Object> map) {
        if (obj == null) {
            // A null leaf has no string form; leave the property unset.
            return;
        }
        if (obj instanceof Map) {
            String prefix = (str != null) ? str + "." : "";
            ((Map<?, ?>) obj).forEach((key, nested) -> flatten(prefix + key, nested, map));
        } else {
            map.put(str, obj.toString());
        }
    }

    /**
     * Builds the {@link KafkaStreamsConfiguration} bean from the Boot Kafka streams properties.
     *
     * <p>If no streams application id is configured on the {@link KafkaProperties}, the value
     * of {@code spring.application.name} (when present) is used as the application id.
     *
     * @param kafkaStreamsBinderConfigurationProperties binder properties giving access to the
     *        {@link KafkaProperties}
     * @param environment environment used to look up {@code spring.application.name}
     * @return the streams configuration wrapping the resulting property map
     */
    @Bean
    public KafkaStreamsConfiguration kafkaStreamsConfiguration(@Qualifier("binderConfigurationProperties") KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties, Environment environment) {
        KafkaProperties kafkaProperties = kafkaStreamsBinderConfigurationProperties.getKafkaProperties();
        Map<String, Object> streamsProperties = kafkaProperties.buildStreamsProperties();

        boolean applicationIdMissing = kafkaProperties.getStreams().getApplicationId() == null;
        if (applicationIdMissing) {
            String applicationName = environment.getProperty("spring.application.name");
            if (applicationName != null) {
                streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationName);
            }
        }
        return new KafkaStreamsConfiguration(streamsProperties);
    }

    /**
     * Assembles the global Kafka Streams configuration map exposed as the
     * {@code streamConfigGlobalProperties} bean.
     *
     * <p>Starting from {@link KafkaStreamsConfiguration#asProperties()}, the method:
     * <ul>
     *   <li>falls back to {@code spring.cloud.stream.kafka.binder.brokers} when the binder's
     *       connection string is {@code localhost:9092} and
     *       {@code spring.cloud.stream.kafka.streams.binder.brokers} is empty, also pushing
     *       the value back into the binder properties;</li>
     *   <li>sets {@code bootstrap.servers} to that connection string when the existing value
     *       is empty, the string {@code localhost:9092}, or a one-element list holding it;</li>
     *   <li>applies the binder's application id when it has text;</li>
     *   <li>forces {@link Serdes.ByteArraySerde} as default key and value serde;</li>
     *   <li>selects the deserialization exception handler class matching the configured enum
     *       value, adding the recoverer for {@code sendToDlq};</li>
     *   <li>overlays the binder's extra configuration map when it is not empty.</li>
     * </ul>
     *
     * @param kafkaStreamsBinderConfigurationProperties binder-level configuration
     * @param kafkaStreamsConfiguration base streams configuration
     * @param configurableEnvironment environment used for the broker fallback lookup
     * @param sendToDlqAndContinue recoverer registered for the {@code sendToDlq} handler
     * @return the resulting properties as a map keyed by {@code String.valueOf(key)}
     */
    @Bean("streamConfigGlobalProperties")
    public Map<String, Object> streamConfigGlobalProperties(@Qualifier("binderConfigurationProperties") KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties, KafkaStreamsConfiguration kafkaStreamsConfiguration, ConfigurableEnvironment configurableEnvironment, SendToDlqAndContinue sendToDlqAndContinue) {
        Properties streamsProperties = kafkaStreamsConfiguration.asProperties();

        // Resolve the broker list, falling back to the message-channel binder's setting.
        String brokers = kafkaStreamsBinderConfigurationProperties.getKafkaConnectionString();
        if (brokers != null && brokers.equals("localhost:9092")) {
            String streamsBinderBrokers = configurableEnvironment.getProperty("spring.cloud.stream.kafka.streams.binder.brokers");
            if (StringUtils.isEmpty(streamsBinderBrokers)) {
                String kafkaBinderBrokers = configurableEnvironment.getProperty("spring.cloud.stream.kafka.binder.brokers");
                if (!StringUtils.isEmpty(kafkaBinderBrokers)) {
                    brokers = kafkaBinderBrokers;
                    kafkaStreamsBinderConfigurationProperties.setBrokers(brokers);
                }
            }
        }

        // Decide once whether the existing bootstrap.servers value should be replaced.
        Object currentBootstrapServers = streamsProperties.get("bootstrap.servers");
        boolean replaceBootstrapServers;
        if (ObjectUtils.isEmpty(currentBootstrapServers)) {
            replaceBootstrapServers = true;
        } else if (currentBootstrapServers instanceof String) {
            replaceBootstrapServers = currentBootstrapServers.equals("localhost:9092");
        } else if (currentBootstrapServers instanceof List) {
            List<?> serverList = (List<?>) currentBootstrapServers;
            replaceBootstrapServers = serverList.size() == 1 && serverList.get(0).equals("localhost:9092");
        } else {
            replaceBootstrapServers = false;
        }
        if (replaceBootstrapServers) {
            streamsProperties.put("bootstrap.servers", brokers);
        }

        String binderApplicationId = kafkaStreamsBinderConfigurationProperties.getApplicationId();
        if (StringUtils.hasText(binderApplicationId)) {
            streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, binderApplicationId);
        }

        String byteArraySerdeName = Serdes.ByteArraySerde.class.getName();
        streamsProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, byteArraySerdeName);
        streamsProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, byteArraySerdeName);

        DeserializationExceptionHandler configuredHandler = kafkaStreamsBinderConfigurationProperties.getDeserializationExceptionHandler();
        if (configuredHandler == DeserializationExceptionHandler.logAndContinue) {
            streamsProperties.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);
        } else if (configuredHandler == DeserializationExceptionHandler.logAndFail) {
            streamsProperties.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndFailExceptionHandler.class);
        } else if (configuredHandler == DeserializationExceptionHandler.sendToDlq) {
            streamsProperties.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, RecoveringDeserializationExceptionHandler.class);
            streamsProperties.put(RecoveringDeserializationExceptionHandler.KSTREAM_DESERIALIZATION_RECOVERER, sendToDlqAndContinue);
        }

        // Explicit binder configuration entries are applied last, after everything set above.
        Map<?, ?> additionalConfiguration = kafkaStreamsBinderConfigurationProperties.getConfiguration();
        if (!ObjectUtils.isEmpty(additionalConfiguration)) {
            streamsProperties.putAll(additionalConfiguration);
        }

        return streamsProperties.entrySet().stream()
                .collect(Collectors.toMap(entry -> String.valueOf(entry.getKey()), Map.Entry::getValue));
    }

    /**
     * Registers a {@link KStreamStreamListenerResultAdapter} bean.
     *
     * @return a new adapter instance
     */
    @Bean
    public KStreamStreamListenerResultAdapter kstreamStreamListenerResultAdapter() {
        return new KStreamStreamListenerResultAdapter();
    }

    /**
     * Registers a {@link KStreamStreamListenerParameterAdapter} bean.
     *
     * @param kafkaStreamsMessageConversionDelegate conversion delegate passed to the adapter
     * @param kafkaStreamsBindingInformationCatalogue binding catalogue passed to the adapter
     * @return a new adapter built from the two collaborators
     */
    @Bean
    public KStreamStreamListenerParameterAdapter kstreamStreamListenerParameterAdapter(KafkaStreamsMessageConversionDelegate kafkaStreamsMessageConversionDelegate, KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue) {
        return new KStreamStreamListenerParameterAdapter(kafkaStreamsMessageConversionDelegate, kafkaStreamsBindingInformationCatalogue);
    }

    /**
     * Registers the {@link KafkaStreamsStreamListenerSetupMethodOrchestrator} bean.
     *
     * <p>The optional {@link CleanupConfig} and {@link StreamsBuilderFactoryBeanCustomizer} are
     * resolved through {@link ObjectProvider#getIfUnique()} before being passed on.
     *
     * @param bindingServiceProperties binding service properties
     * @param kafkaStreamsExtendedBindingProperties extended binding properties
     * @param keyValueSerdeResolver serde resolver
     * @param kafkaStreamsBindingInformationCatalogue binding catalogue
     * @param kStreamStreamListenerParameterAdapter parameter adapter
     * @param collection the available {@link StreamListenerResultAdapter} beans
     * @param objectProvider provider of the optional {@link CleanupConfig}
     * @param objectProvider2 provider of the optional {@link StreamsBuilderFactoryBeanCustomizer}
     * @param configurableEnvironment the environment
     * @return a new orchestrator built from the given collaborators
     */
    @Bean
    public KafkaStreamsStreamListenerSetupMethodOrchestrator kafkaStreamsStreamListenerSetupMethodOrchestrator(BindingServiceProperties bindingServiceProperties, KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties, KeyValueSerdeResolver keyValueSerdeResolver, KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue, KStreamStreamListenerParameterAdapter kStreamStreamListenerParameterAdapter, Collection<StreamListenerResultAdapter> collection, ObjectProvider<CleanupConfig> objectProvider, ObjectProvider<StreamsBuilderFactoryBeanCustomizer> objectProvider2, ConfigurableEnvironment configurableEnvironment) {
        CleanupConfig cleanupConfig = objectProvider.getIfUnique();
        StreamsBuilderFactoryBeanCustomizer factoryBeanCustomizer = objectProvider2.getIfUnique();
        return new KafkaStreamsStreamListenerSetupMethodOrchestrator(
                bindingServiceProperties,
                kafkaStreamsExtendedBindingProperties,
                keyValueSerdeResolver,
                kafkaStreamsBindingInformationCatalogue,
                kStreamStreamListenerParameterAdapter,
                collection,
                cleanupConfig,
                factoryBeanCustomizer,
                configurableEnvironment);
    }

    /**
     * Registers the {@link KafkaStreamsMessageConversionDelegate} bean.
     *
     * @param compositeMessageConverter the converter bean qualified as
     *        {@code integrationArgumentResolverMessageConverter}
     * @param sendToDlqAndContinue the DLQ recoverer bean
     * @param kafkaStreamsBindingInformationCatalogue binding catalogue
     * @param kafkaStreamsBinderConfigurationProperties the bean qualified as
     *        {@code binderConfigurationProperties}
     * @return a new delegate built from the given collaborators
     */
    @Bean
    public KafkaStreamsMessageConversionDelegate messageConversionDelegate(@Qualifier("integrationArgumentResolverMessageConverter") CompositeMessageConverter compositeMessageConverter, SendToDlqAndContinue sendToDlqAndContinue, KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue, @Qualifier("binderConfigurationProperties") KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties) {
        return new KafkaStreamsMessageConversionDelegate(compositeMessageConverter, sendToDlqAndContinue, kafkaStreamsBindingInformationCatalogue, kafkaStreamsBinderConfigurationProperties);
    }

    /**
     * Registers a {@link MessageConverterDelegateSerde} bean.
     *
     * @param compositeMessageConverter the converter bean qualified as
     *        {@code integrationArgumentResolverMessageConverter}
     * @return a new serde wrapping the given converter
     */
    @Bean
    public MessageConverterDelegateSerde messageConverterDelegateSerde(@Qualifier("integrationArgumentResolverMessageConverter") CompositeMessageConverter compositeMessageConverter) {
        return new MessageConverterDelegateSerde(compositeMessageConverter);
    }

    /**
     * Registers a {@link CompositeNonNativeSerde} bean.
     *
     * @param compositeMessageConverter the converter bean qualified as
     *        {@code integrationArgumentResolverMessageConverter}
     * @return a new serde wrapping the given converter
     */
    @Bean
    public CompositeNonNativeSerde compositeNonNativeSerde(@Qualifier("integrationArgumentResolverMessageConverter") CompositeMessageConverter compositeMessageConverter) {
        return new CompositeNonNativeSerde(compositeMessageConverter);
    }

    /**
     * Registers the {@link KStreamBoundElementFactory} bean.
     *
     * @param bindingServiceProperties binding service properties
     * @param kafkaStreamsBindingInformationCatalogue binding catalogue
     * @param encodingDecodingBindAdviceHandler advice handler passed to the factory
     * @return a new factory built from the given collaborators
     */
    @Bean
    public KStreamBoundElementFactory kStreamBoundElementFactory(BindingServiceProperties bindingServiceProperties, KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue, EncodingDecodingBindAdviceHandler encodingDecodingBindAdviceHandler) {
        return new KStreamBoundElementFactory(bindingServiceProperties, kafkaStreamsBindingInformationCatalogue, encodingDecodingBindAdviceHandler);
    }

    /**
     * Registers the {@link KTableBoundElementFactory} bean.
     *
     * @param bindingServiceProperties binding service properties
     * @param encodingDecodingBindAdviceHandler advice handler passed to the factory
     * @return a new factory built from the given collaborators
     */
    @Bean
    public KTableBoundElementFactory kTableBoundElementFactory(BindingServiceProperties bindingServiceProperties, EncodingDecodingBindAdviceHandler encodingDecodingBindAdviceHandler) {
        return new KTableBoundElementFactory(bindingServiceProperties, encodingDecodingBindAdviceHandler);
    }

    /**
     * Registers the {@link GlobalKTableBoundElementFactory} bean.
     *
     * @param bindingServiceProperties binding service properties
     * @param encodingDecodingBindAdviceHandler advice handler passed to the factory
     * @return a new factory built from the given collaborators
     */
    @Bean
    public GlobalKTableBoundElementFactory globalKTableBoundElementFactory(BindingServiceProperties bindingServiceProperties, EncodingDecodingBindAdviceHandler encodingDecodingBindAdviceHandler) {
        return new GlobalKTableBoundElementFactory(bindingServiceProperties, encodingDecodingBindAdviceHandler);
    }

    /**
     * Registers a {@link SendToDlqAndContinue} bean.
     *
     * @return a new {@link SendToDlqAndContinue} instance
     */
    @Bean
    public SendToDlqAndContinue sendToDlqAndContinue() {
        return new SendToDlqAndContinue();
    }

    /**
     * Registers a {@link KafkaStreamsBindingInformationCatalogue} bean.
     *
     * @return a new catalogue instance
     */
    @Bean
    public KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue() {
        return new KafkaStreamsBindingInformationCatalogue();
    }

    /**
     * Registers the {@link KeyValueSerdeResolver} bean unless one is already defined.
     *
     * @param obj the bean qualified as {@code streamConfigGlobalProperties}; it is injected as
     *        {@link Object} and cast to a map here
     * @param kafkaStreamsBinderConfigurationProperties the bean qualified as
     *        {@code binderConfigurationProperties}
     * @return a new resolver built from the global stream properties and binder properties
     */
    @ConditionalOnMissingBean
    @Bean
    @SuppressWarnings("unchecked")
    public KeyValueSerdeResolver keyValueSerdeResolver(@Qualifier("streamConfigGlobalProperties") Object obj, @Qualifier("binderConfigurationProperties") KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties) {
        Map<String, Object> globalStreamProperties = (Map<String, Object>) obj;
        return new KeyValueSerdeResolver(globalStreamProperties, kafkaStreamsBinderConfigurationProperties);
    }

    /**
     * Registers the {@link InteractiveQueryService} bean.
     *
     * @param kafkaStreamsRegistry the streams registry passed to the service
     * @param kafkaStreamsBinderConfigurationProperties the bean qualified as
     *        {@code binderConfigurationProperties}
     * @return a new service built from the given collaborators
     */
    @Bean
    public InteractiveQueryService interactiveQueryServices(KafkaStreamsRegistry kafkaStreamsRegistry, @Qualifier("binderConfigurationProperties") KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties) {
        return new InteractiveQueryService(kafkaStreamsRegistry, kafkaStreamsBinderConfigurationProperties);
    }

    /**
     * Registers a {@link KafkaStreamsRegistry} bean.
     *
     * @return a new registry instance
     */
    @Bean
    public KafkaStreamsRegistry kafkaStreamsRegistry() {
        return new KafkaStreamsRegistry();
    }

    /**
     * Registers the {@link StreamsBuilderFactoryManager} bean.
     *
     * @param kafkaStreamsBindingInformationCatalogue binding catalogue
     * @param kafkaStreamsRegistry streams registry
     * @param kafkaStreamsBinderMetrics binder metrics; marked {@code @Nullable}, so it may be
     *        {@code null} and is passed through unchanged
     * @return a new manager built from the given collaborators
     */
    @Bean
    public StreamsBuilderFactoryManager streamsBuilderFactoryManager(KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue, KafkaStreamsRegistry kafkaStreamsRegistry, @Nullable KafkaStreamsBinderMetrics kafkaStreamsBinderMetrics) {
        return new StreamsBuilderFactoryManager(kafkaStreamsBindingInformationCatalogue, kafkaStreamsRegistry, kafkaStreamsBinderMetrics);
    }

    /**
     * Registers the {@link KafkaStreamsFunctionProcessor} bean when
     * {@link FunctionDetectorCondition} matches.
     *
     * <p>The optional {@link CleanupConfig} and {@link StreamsBuilderFactoryBeanCustomizer} are
     * resolved through {@link ObjectProvider#getIfUnique()} before being passed on.
     *
     * @param bindingServiceProperties binding service properties
     * @param kafkaStreamsExtendedBindingProperties extended binding properties
     * @param keyValueSerdeResolver serde resolver
     * @param kafkaStreamsBindingInformationCatalogue binding catalogue
     * @param kafkaStreamsMessageConversionDelegate message conversion delegate
     * @param objectProvider provider of the optional {@link CleanupConfig}
     * @param streamFunctionProperties stream function properties
     * @param kafkaStreamsBinderConfigurationProperties the bean qualified as
     *        {@code binderConfigurationProperties}
     * @param objectProvider2 provider of the optional {@link StreamsBuilderFactoryBeanCustomizer}
     * @param configurableEnvironment the environment
     * @return a new function processor built from the given collaborators
     */
    @Conditional(FunctionDetectorCondition.class)
    @Bean
    public KafkaStreamsFunctionProcessor kafkaStreamsFunctionProcessor(BindingServiceProperties bindingServiceProperties, KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties, KeyValueSerdeResolver keyValueSerdeResolver, KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue, KafkaStreamsMessageConversionDelegate kafkaStreamsMessageConversionDelegate, ObjectProvider<CleanupConfig> objectProvider, StreamFunctionProperties streamFunctionProperties, @Qualifier("binderConfigurationProperties") KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties, ObjectProvider<StreamsBuilderFactoryBeanCustomizer> objectProvider2, ConfigurableEnvironment configurableEnvironment) {
        CleanupConfig cleanupConfig = objectProvider.getIfUnique();
        StreamsBuilderFactoryBeanCustomizer factoryBeanCustomizer = objectProvider2.getIfUnique();
        return new KafkaStreamsFunctionProcessor(
                bindingServiceProperties,
                kafkaStreamsExtendedBindingProperties,
                keyValueSerdeResolver,
                kafkaStreamsBindingInformationCatalogue,
                kafkaStreamsMessageConversionDelegate,
                cleanupConfig,
                streamFunctionProperties,
                kafkaStreamsBinderConfigurationProperties,
                factoryBeanCustomizer,
                configurableEnvironment);
    }

    /**
     * Registers an {@link EncodingDecodingBindAdviceHandler} bean.
     *
     * @return a new handler instance
     */
    @Bean
    public EncodingDecodingBindAdviceHandler encodingDecodingBindAdviceHandler() {
        return new EncodingDecodingBindAdviceHandler();
    }
}
