package org.apache.kafka.streams;

import io.reactivex.netty.protocol.http.client.HttpClientResponse;
import java.lang.Thread;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidator;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.StreamsMetadata;
import org.apache.kafka.streams.state.internals.GlobalStateStoreProvider;
import org.apache.kafka.streams.state.internals.QueryableStoreProvider;
import org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider;
import org.slf4j.Logger;

@InterfaceStability.Evolving
/* loaded from: input_file:datasets/datasets-service.jar:BOOT-INF/lib/kafka-streams-2.3.1.jar:org/apache/kafka/streams/KafkaStreams.class */
public class KafkaStreams implements AutoCloseable {
    private static final String JMX_PREFIX = "kafka.streams";
    private final Time time;
    private final Logger log;
    private final String clientId;
    private final Metrics metrics;
    private final StreamsConfig config;
    protected final StreamThread[] threads;
    private final StateDirectory stateDirectory;
    private final StreamsMetadataState streamsMetadataState;
    private final ScheduledExecutorService stateDirCleaner;
    private final QueryableStoreProvider queryableStoreProvider;
    private final AdminClient adminClient;
    private GlobalStreamThread globalStreamThread;
    private StateListener stateListener;
    private StateRestoreListener globalStateRestoreListener;
    private final Object stateLock;
    protected volatile State state;

    /* loaded from: input_file:datasets/datasets-service.jar:BOOT-INF/lib/kafka-streams-2.3.1.jar:org/apache/kafka/streams/KafkaStreams$DelegatingStateRestoreListener.class */
    final class DelegatingStateRestoreListener implements StateRestoreListener {
        DelegatingStateRestoreListener() {
        }

        private void throwOnFatalException(Exception exc, TopicPartition topicPartition, String str) {
            throw new StreamsException(String.format("Fatal user code error in store restore listener for store %s, partition %s.", str, topicPartition), exc);
        }

        @Override // org.apache.kafka.streams.processor.StateRestoreListener
        public void onRestoreStart(TopicPartition topicPartition, String str, long j, long j2) {
            if (KafkaStreams.this.globalStateRestoreListener != null) {
                try {
                    KafkaStreams.this.globalStateRestoreListener.onRestoreStart(topicPartition, str, j, j2);
                } catch (Exception e) {
                    throwOnFatalException(e, topicPartition, str);
                }
            }
        }

        @Override // org.apache.kafka.streams.processor.StateRestoreListener
        public void onBatchRestored(TopicPartition topicPartition, String str, long j, long j2) {
            if (KafkaStreams.this.globalStateRestoreListener != null) {
                try {
                    KafkaStreams.this.globalStateRestoreListener.onBatchRestored(topicPartition, str, j, j2);
                } catch (Exception e) {
                    throwOnFatalException(e, topicPartition, str);
                }
            }
        }

        @Override // org.apache.kafka.streams.processor.StateRestoreListener
        public void onRestoreEnd(TopicPartition topicPartition, String str, long j) {
            if (KafkaStreams.this.globalStateRestoreListener != null) {
                try {
                    KafkaStreams.this.globalStateRestoreListener.onRestoreEnd(topicPartition, str, j);
                } catch (Exception e) {
                    throwOnFatalException(e, topicPartition, str);
                }
            }
        }
    }

    /* loaded from: input_file:datasets/datasets-service.jar:BOOT-INF/lib/kafka-streams-2.3.1.jar:org/apache/kafka/streams/KafkaStreams$State.class */
    public enum State {
        CREATED(1, 3),
        REBALANCING(2, 3, 5),
        RUNNING(1, 3, 5),
        PENDING_SHUTDOWN(4),
        NOT_RUNNING(new Integer[0]),
        ERROR(3);

        private final Set<Integer> validTransitions = new HashSet();

        State(Integer... numArr) {
            this.validTransitions.addAll(Arrays.asList(numArr));
        }

        public boolean isRunning() {
            return equals(RUNNING) || equals(REBALANCING);
        }

        public boolean isValidTransition(State state) {
            return this.validTransitions.contains(Integer.valueOf(state.ordinal()));
        }
    }

    /* loaded from: input_file:datasets/datasets-service.jar:BOOT-INF/lib/kafka-streams-2.3.1.jar:org/apache/kafka/streams/KafkaStreams$StateListener.class */
    public interface StateListener {
        void onChange(State state, State state2);
    }

    /* loaded from: input_file:datasets/datasets-service.jar:BOOT-INF/lib/kafka-streams-2.3.1.jar:org/apache/kafka/streams/KafkaStreams$StreamStateListener.class */
    final class StreamStateListener implements StreamThread.StateListener {
        private final Map<Long, StreamThread.State> threadState;
        private GlobalStreamThread.State globalThreadState;
        private final Object threadStatesLock = new Object();

        StreamStateListener(Map<Long, StreamThread.State> map, GlobalStreamThread.State state) {
            this.threadState = map;
            this.globalThreadState = state;
        }

        private void maybeSetError() {
            Iterator<StreamThread.State> it = this.threadState.values().iterator();
            while (it.hasNext()) {
                if (it.next() != StreamThread.State.DEAD) {
                    return;
                }
            }
            if (KafkaStreams.this.setState(State.ERROR)) {
                KafkaStreams.this.log.error("All stream threads have died. The instance will be in error state and should be closed.");
            }
        }

        private void maybeSetRunning() {
            for (StreamThread.State state : this.threadState.values()) {
                if (state != StreamThread.State.RUNNING && state != StreamThread.State.DEAD) {
                    return;
                }
            }
            if (this.globalThreadState == null || this.globalThreadState == GlobalStreamThread.State.RUNNING) {
                KafkaStreams.this.setState(State.RUNNING);
            }
        }

        @Override // org.apache.kafka.streams.processor.internals.StreamThread.StateListener
        public synchronized void onChange(Thread thread, ThreadStateTransitionValidator threadStateTransitionValidator, ThreadStateTransitionValidator threadStateTransitionValidator2) {
            synchronized (this.threadStatesLock) {
                if (thread instanceof StreamThread) {
                    StreamThread.State state = (StreamThread.State) threadStateTransitionValidator;
                    this.threadState.put(Long.valueOf(thread.getId()), state);
                    if (state == StreamThread.State.PARTITIONS_REVOKED) {
                        KafkaStreams.this.setState(State.REBALANCING);
                    } else if (state == StreamThread.State.RUNNING) {
                        maybeSetRunning();
                    } else if (state == StreamThread.State.DEAD) {
                        maybeSetError();
                    }
                } else if (thread instanceof GlobalStreamThread) {
                    GlobalStreamThread.State state2 = (GlobalStreamThread.State) threadStateTransitionValidator;
                    this.globalThreadState = state2;
                    if (state2 == GlobalStreamThread.State.DEAD && KafkaStreams.this.setState(State.ERROR)) {
                        KafkaStreams.this.log.error("Global thread has died. The instance will be in error state and should be closed.");
                    }
                }
            }
        }
    }

    private boolean waitOnState(State state, long j) {
        long milliseconds = this.time.milliseconds();
        synchronized (this.stateLock) {
            long j2 = 0;
            while (this.state != state) {
                if (j <= j2) {
                    this.log.debug("Cannot transit to {} within {}ms", state, Long.valueOf(j));
                    return false;
                }
                try {
                    this.stateLock.wait(j - j2);
                } catch (InterruptedException e) {
                }
                j2 = this.time.milliseconds() - milliseconds;
            }
            return true;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean setState(State state) {
        synchronized (this.stateLock) {
            State state2 = this.state;
            if (this.state == State.PENDING_SHUTDOWN && state != State.NOT_RUNNING) {
                return false;
            }
            if (this.state == State.NOT_RUNNING && (state == State.PENDING_SHUTDOWN || state == State.NOT_RUNNING)) {
                return false;
            }
            if (this.state == State.REBALANCING && state == State.REBALANCING) {
                return false;
            }
            if (this.state == State.ERROR && state == State.ERROR) {
                return false;
            }
            if (!this.state.isValidTransition(state)) {
                throw new IllegalStateException("Stream-client " + this.clientId + ": Unexpected state transition from " + state2 + " to " + state);
            }
            this.log.info("State transition from {} to {}", state2, state);
            this.state = state;
            this.stateLock.notifyAll();
            if (this.stateListener == null) {
                return true;
            }
            this.stateListener.onChange(state, state2);
            return true;
        }
    }

    public State state() {
        return this.state;
    }

    private boolean isRunning() {
        boolean isRunning;
        synchronized (this.stateLock) {
            isRunning = this.state.isRunning();
        }
        return isRunning;
    }

    private void validateIsRunning() {
        if (!isRunning()) {
            throw new IllegalStateException("KafkaStreams is not running. State is " + this.state + ".");
        }
    }

    public void setStateListener(StateListener stateListener) {
        synchronized (this.stateLock) {
            if (this.state != State.CREATED) {
                throw new IllegalStateException("Can only set StateListener in CREATED state. Current state is: " + this.state);
            }
            this.stateListener = stateListener;
        }
    }

    public void setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
        synchronized (this.stateLock) {
            if (this.state != State.CREATED) {
                throw new IllegalStateException("Can only set UncaughtExceptionHandler in CREATED state. Current state is: " + this.state);
            }
            for (StreamThread streamThread : this.threads) {
                streamThread.setUncaughtExceptionHandler(uncaughtExceptionHandler);
            }
            if (this.globalStreamThread != null) {
                this.globalStreamThread.setUncaughtExceptionHandler(uncaughtExceptionHandler);
            }
        }
    }

    public void setGlobalStateRestoreListener(StateRestoreListener stateRestoreListener) {
        synchronized (this.stateLock) {
            if (this.state != State.CREATED) {
                throw new IllegalStateException("Can only set GlobalStateRestoreListener in CREATED state. Current state is: " + this.state);
            }
            this.globalStateRestoreListener = stateRestoreListener;
        }
    }

    public Map<MetricName, ? extends Metric> metrics() {
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        for (StreamThread streamThread : this.threads) {
            linkedHashMap.putAll(streamThread.producerMetrics());
            linkedHashMap.putAll(streamThread.consumerMetrics());
            linkedHashMap.putAll(streamThread.adminClientMetrics());
        }
        if (this.globalStreamThread != null) {
            linkedHashMap.putAll(this.globalStreamThread.consumerMetrics());
        }
        linkedHashMap.putAll(this.metrics.metrics());
        return Collections.unmodifiableMap(linkedHashMap);
    }

    public KafkaStreams(Topology topology, Properties properties) {
        this(topology.internalTopologyBuilder, new StreamsConfig(properties), new DefaultKafkaClientSupplier());
    }

    public KafkaStreams(Topology topology, Properties properties, KafkaClientSupplier kafkaClientSupplier) {
        this(topology.internalTopologyBuilder, new StreamsConfig(properties), kafkaClientSupplier, Time.SYSTEM);
    }

    public KafkaStreams(Topology topology, Properties properties, Time time) {
        this(topology.internalTopologyBuilder, new StreamsConfig(properties), new DefaultKafkaClientSupplier(), time);
    }

    public KafkaStreams(Topology topology, Properties properties, KafkaClientSupplier kafkaClientSupplier, Time time) {
        this(topology.internalTopologyBuilder, new StreamsConfig(properties), kafkaClientSupplier, time);
    }

    @Deprecated
    public KafkaStreams(Topology topology, StreamsConfig streamsConfig) {
        this(topology, streamsConfig, new DefaultKafkaClientSupplier());
    }

    @Deprecated
    public KafkaStreams(Topology topology, StreamsConfig streamsConfig, KafkaClientSupplier kafkaClientSupplier) {
        this(topology.internalTopologyBuilder, streamsConfig, kafkaClientSupplier);
    }

    @Deprecated
    public KafkaStreams(Topology topology, StreamsConfig streamsConfig, Time time) {
        this(topology.internalTopologyBuilder, streamsConfig, new DefaultKafkaClientSupplier(), time);
    }

    private KafkaStreams(InternalTopologyBuilder internalTopologyBuilder, StreamsConfig streamsConfig, KafkaClientSupplier kafkaClientSupplier) throws StreamsException {
        this(internalTopologyBuilder, streamsConfig, kafkaClientSupplier, Time.SYSTEM);
    }

    private KafkaStreams(InternalTopologyBuilder internalTopologyBuilder, StreamsConfig streamsConfig, KafkaClientSupplier kafkaClientSupplier, Time time) throws StreamsException {
        this.stateLock = new Object();
        this.state = State.CREATED;
        this.config = streamsConfig;
        this.time = time;
        UUID randomUUID = UUID.randomUUID();
        String string = streamsConfig.getString("client.id");
        String string2 = streamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG);
        if (string.length() <= 0) {
            this.clientId = string2 + "-" + randomUUID;
        } else {
            this.clientId = string;
        }
        this.log = new LogContext(String.format("stream-client [%s] ", this.clientId)).logger(getClass());
        MetricConfig timeWindow = new MetricConfig().samples(streamsConfig.getInt("metrics.num.samples").intValue()).recordLevel(Sensor.RecordingLevel.forName(streamsConfig.getString("metrics.recording.level"))).timeWindow(streamsConfig.getLong("metrics.sample.window.ms").longValue(), TimeUnit.MILLISECONDS);
        List configuredInstances = streamsConfig.getConfiguredInstances("metric.reporters", MetricsReporter.class, Collections.singletonMap("client.id", this.clientId));
        configuredInstances.add(new JmxReporter(JMX_PREFIX));
        this.metrics = new Metrics(timeWindow, configuredInstances, time);
        internalTopologyBuilder.rewriteTopology(streamsConfig);
        ProcessorTopology build = internalTopologyBuilder.build();
        this.streamsMetadataState = new StreamsMetadataState(internalTopologyBuilder, parseHostInfo(streamsConfig.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));
        this.threads = new StreamThread[streamsConfig.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG).intValue()];
        long longValue = streamsConfig.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG).longValue();
        if (longValue < 0) {
            longValue = 0;
            this.log.warn("Negative cache size passed in. Reverting to cache size of 0 bytes.");
        }
        ProcessorTopology buildGlobalStateTopology = internalTopologyBuilder.buildGlobalStateTopology();
        long length = longValue / (this.threads.length + (buildGlobalStateTopology == null ? 0 : 1));
        try {
            this.stateDirectory = new StateDirectory(streamsConfig, time, build.hasPersistentLocalStore() || (buildGlobalStateTopology != null && buildGlobalStateTopology.hasPersistentGlobalStore()));
            DelegatingStateRestoreListener delegatingStateRestoreListener = new DelegatingStateRestoreListener();
            GlobalStreamThread.State state = null;
            if (buildGlobalStateTopology != null) {
                this.globalStreamThread = new GlobalStreamThread(buildGlobalStateTopology, streamsConfig, kafkaClientSupplier.getGlobalConsumer(streamsConfig.getGlobalConsumerConfigs(this.clientId)), this.stateDirectory, length, this.metrics, time, this.clientId + "-GlobalStreamThread", delegatingStateRestoreListener);
                state = this.globalStreamThread.state();
            }
            this.adminClient = kafkaClientSupplier.getAdminClient(streamsConfig.getAdminConfigs(StreamThread.getSharedAdminClientId(this.clientId)));
            HashMap hashMap = new HashMap(this.threads.length);
            ArrayList arrayList = new ArrayList();
            for (int i = 0; i < this.threads.length; i++) {
                this.threads[i] = StreamThread.create(internalTopologyBuilder, streamsConfig, kafkaClientSupplier, this.adminClient, randomUUID, this.clientId, this.metrics, time, this.streamsMetadataState, length, this.stateDirectory, delegatingStateRestoreListener, i + 1);
                hashMap.put(Long.valueOf(this.threads[i].getId()), this.threads[i].state());
                arrayList.add(new StreamThreadStateStoreProvider(this.threads[i]));
            }
            StreamStateListener streamStateListener = new StreamStateListener(hashMap, state);
            if (buildGlobalStateTopology != null) {
                this.globalStreamThread.setStateListener(streamStateListener);
            }
            for (StreamThread streamThread : this.threads) {
                streamThread.setStateListener(streamStateListener);
            }
            this.queryableStoreProvider = new QueryableStoreProvider(arrayList, new GlobalStateStoreProvider(internalTopologyBuilder.globalStateStores()));
            this.stateDirCleaner = Executors.newSingleThreadScheduledExecutor(runnable -> {
                Thread thread = new Thread(runnable, this.clientId + "-CleanupThread");
                thread.setDaemon(true);
                return thread;
            });
        } catch (ProcessorStateException e) {
            throw new StreamsException(e);
        }
    }

    private static HostInfo parseHostInfo(String str) {
        if (str == null || str.trim().isEmpty()) {
            return StreamsMetadataState.UNKNOWN_HOST;
        }
        String host = Utils.getHost(str);
        Integer port = Utils.getPort(str);
        if (host == null || port == null) {
            throw new ConfigException(String.format("Error parsing host address %s. Expected format host:port.", str));
        }
        return new HostInfo(host, port.intValue());
    }

    public synchronized void start() throws IllegalStateException, StreamsException {
        if (!setState(State.REBALANCING)) {
            throw new IllegalStateException("The client is either already started or already stopped, cannot re-start");
        }
        this.log.debug("Starting Streams client");
        if (this.globalStreamThread != null) {
            this.globalStreamThread.start();
        }
        for (StreamThread streamThread : this.threads) {
            streamThread.start();
        }
        Long l = this.config.getLong(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG);
        this.stateDirCleaner.scheduleAtFixedRate(() -> {
            if (this.state == State.RUNNING) {
                this.stateDirectory.cleanRemovedTasks(l.longValue());
            }
        }, l.longValue(), l.longValue(), TimeUnit.MILLISECONDS);
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        close(Long.MAX_VALUE);
    }

    @Deprecated
    public synchronized boolean close(long j, TimeUnit timeUnit) {
        long millis = timeUnit.toMillis(j);
        this.log.debug("Stopping Streams client with timeoutMillis = {} ms. You are using deprecated method. Please, consider update your code.", Long.valueOf(millis));
        if (millis < 0) {
            millis = 0;
        } else if (millis == 0) {
            millis = Long.MAX_VALUE;
        }
        return close(millis);
    }

    private boolean close(long j) {
        if (setState(State.PENDING_SHUTDOWN)) {
            this.stateDirCleaner.shutdownNow();
            Thread thread = new Thread(() -> {
                for (StreamThread streamThread : this.threads) {
                    streamThread.shutdown();
                }
                for (StreamThread streamThread2 : this.threads) {
                    try {
                        if (!streamThread2.isRunning()) {
                            streamThread2.join();
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
                if (this.globalStreamThread != null) {
                    this.globalStreamThread.shutdown();
                }
                if (this.globalStreamThread != null && !this.globalStreamThread.stillRunning()) {
                    try {
                        this.globalStreamThread.join();
                    } catch (InterruptedException e2) {
                        Thread.currentThread().interrupt();
                    }
                    this.globalStreamThread = null;
                }
                this.adminClient.close();
                this.metrics.close();
                setState(State.NOT_RUNNING);
            }, "kafka-streams-close-thread");
            thread.setDaemon(true);
            thread.start();
        } else {
            this.log.info("Already in the pending shutdown state, wait to complete shutdown");
        }
        if (waitOnState(State.NOT_RUNNING, j)) {
            this.log.info("Streams client stopped completely");
            return true;
        }
        this.log.info("Streams client cannot stop completely within the timeout");
        return false;
    }

    public synchronized boolean close(Duration duration) throws IllegalArgumentException {
        long validateMillisecondDuration = ApiUtils.validateMillisecondDuration(duration, ApiUtils.prepareMillisCheckFailMsgPrefix(duration, HttpClientResponse.KEEP_ALIVE_TIMEOUT_HEADER_ATTR));
        if (validateMillisecondDuration < 0) {
            throw new IllegalArgumentException("Timeout can't be negative.");
        }
        this.log.debug("Stopping Streams client with timeoutMillis = {} ms.", Long.valueOf(validateMillisecondDuration));
        return close(validateMillisecondDuration);
    }

    public void cleanUp() {
        if (isRunning()) {
            throw new IllegalStateException("Cannot clean up while running.");
        }
        this.stateDirectory.clean();
    }

    public Collection<StreamsMetadata> allMetadata() {
        validateIsRunning();
        return this.streamsMetadataState.getAllMetadata();
    }

    public Collection<StreamsMetadata> allMetadataForStore(String str) {
        validateIsRunning();
        return this.streamsMetadataState.getAllMetadataForStore(str);
    }

    public <K> StreamsMetadata metadataForKey(String str, K k, Serializer<K> serializer) {
        validateIsRunning();
        return this.streamsMetadataState.getMetadataWithKey(str, (String) k, (Serializer<String>) serializer);
    }

    public <K> StreamsMetadata metadataForKey(String str, K k, StreamPartitioner<? super K, ?> streamPartitioner) {
        validateIsRunning();
        return this.streamsMetadataState.getMetadataWithKey(str, (String) k, (StreamPartitioner<? super String, ?>) streamPartitioner);
    }

    public <T> T store(String str, QueryableStoreType<T> queryableStoreType) {
        validateIsRunning();
        return (T) this.queryableStoreProvider.getStore(str, queryableStoreType);
    }

    public Set<ThreadMetadata> localThreadsMetadata() {
        validateIsRunning();
        HashSet hashSet = new HashSet();
        for (StreamThread streamThread : this.threads) {
            hashSet.add(streamThread.threadMetadata());
        }
        return hashSet;
    }
}
