package org.kairosdb.core.queue;

import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableSortedMap;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.inject.Inject;
import javax.inject.Named;
import jnr.constants.platform.darwin.RLIM;
import org.apache.commons.collections4.queue.CircularFifoQueue;
import org.kairosdb.bigqueue.IBigArray;
import org.kairosdb.core.DataPointSet;
import org.kairosdb.core.datapoints.LongDataPointFactory;
import org.kairosdb.core.datapoints.LongDataPointFactoryImpl;
import org.kairosdb.core.exception.DatastoreException;
import org.kairosdb.events.DataPointEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:importkairosdb_130.jar:org/kairosdb/core/queue/FileQueueProcessor.class */
public class FileQueueProcessor extends QueueProcessor {
    public static final Logger logger = LoggerFactory.getLogger((Class<?>) FileQueueProcessor.class);
    public static final String SECONDS_TILL_CHECKPOINT = "kairosdb.queue_processor.seconds_till_checkpoint";
    private final Object m_lock;
    private final IBigArray m_bigArray;
    private final CircularFifoQueue<IndexedEvent> m_memoryQueue;
    private final DataPointEventSerializer m_eventSerializer;
    private final List<DataPointEvent> m_internalMetrics;
    private AtomicInteger m_readFromFileCount;
    private AtomicInteger m_readFromQueueCount;
    private Stopwatch m_stopwatch;
    private CompletionCallBack m_lastCallback;
    private final int m_secondsTillCheckpoint;
    private ImmutableSortedMap<String, String> m_reportTags;
    private volatile boolean m_shuttingDown;
    private long m_nextIndex;
    private String m_hostName;

    @Inject
    private LongDataPointFactory m_dataPointFactory;

    /* loaded from: input_file:importkairosdb_130.jar:org/kairosdb/core/queue/FileQueueProcessor$CompletionCallBack.class */
    private class CompletionCallBack implements EventCompletionCallBack {
        private long m_completionIndex;
        private final AtomicInteger m_counter;
        private volatile boolean m_finalized;
        private CompletionCallBack m_childCallBack;

        private CompletionCallBack() {
            this.m_counter = new AtomicInteger(0);
            this.m_finalized = false;
        }

        public void setChildCallBack(CompletionCallBack completionCallBack) {
            this.m_childCallBack = completionCallBack;
            this.m_childCallBack.increment();
        }

        public void setCompletionIndex(long j) {
            this.m_completionIndex = j;
        }

        public void increment() {
            this.m_counter.incrementAndGet();
        }

        public void setFinalized() {
            this.m_finalized = true;
        }

        @Override // org.kairosdb.core.queue.EventCompletionCallBack
        public void complete() {
            if (this.m_counter.decrementAndGet() == 0 && this.m_finalized) {
                this.m_childCallBack.complete();
                try {
                    FileQueueProcessor.this.m_bigArray.removeBeforeIndex(this.m_completionIndex);
                } catch (IOException e) {
                    FileQueueProcessor.logger.warn("Unable to cleanup bigqueue", (Throwable) e);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:importkairosdb_130.jar:org/kairosdb/core/queue/FileQueueProcessor$IndexedEvent.class */
    public static class IndexedEvent {
        public final DataPointEvent m_dataPointEvent;
        public final long m_index;

        public IndexedEvent(DataPointEvent dataPointEvent, long j) {
            this.m_dataPointEvent = dataPointEvent;
            this.m_index = j;
        }
    }

    @Inject
    public FileQueueProcessor(DataPointEventSerializer dataPointEventSerializer, IBigArray iBigArray, @Named("queue_processor") ExecutorService executorService, @Named("kairosdb.queue_processor.batch_size") int i, @Named("kairosdb.queue_processor.memory_queue_size") int i2, @Named("kairosdb.queue_processor.seconds_till_checkpoint") int i3, @Named("kairosdb.queue_processor.min_batch_size") int i4, @Named("kairosdb.queue_processor.min_batch_wait") int i5) {
        super(executorService, i, i4, i5);
        this.m_lock = new Object();
        this.m_internalMetrics = new ArrayList();
        this.m_readFromFileCount = new AtomicInteger();
        this.m_readFromQueueCount = new AtomicInteger();
        this.m_stopwatch = Stopwatch.createStarted();
        this.m_lastCallback = new CompletionCallBack();
        this.m_reportTags = ImmutableSortedMap.of();
        this.m_nextIndex = -1L;
        this.m_hostName = "none";
        this.m_dataPointFactory = new LongDataPointFactoryImpl();
        this.m_bigArray = iBigArray;
        this.m_memoryQueue = new CircularFifoQueue<>(i2);
        this.m_eventSerializer = dataPointEventSerializer;
        this.m_nextIndex = this.m_bigArray.getTailIndex();
        this.m_secondsTillCheckpoint = i3;
        this.m_shuttingDown = false;
    }

    @Inject
    public void setHostName(@Named("HOSTNAME") String str) {
        this.m_hostName = str;
        this.m_reportTags = ImmutableSortedMap.of((Comparable) "host", (Object) this.m_hostName);
    }

    @Override // org.kairosdb.core.queue.QueueProcessor
    public void shutdown() {
        this.m_shuttingDown = true;
        this.m_bigArray.flush();
        try {
            this.m_bigArray.close();
        } catch (IOException e) {
            logger.warn("Error while shutting down bigqueue", (Throwable) e);
        }
        super.shutdown();
    }

    private long incrementIndex(long j) {
        if (j == RLIM.MAX_VALUE) {
            return 0L;
        }
        return j + 1;
    }

    private void waitForEvent() {
        boolean z = false;
        synchronized (this.m_lock) {
            if (this.m_memoryQueue.isEmpty()) {
                try {
                    z = true;
                    this.m_lock.wait();
                } catch (InterruptedException e) {
                    logger.info("Queue processor sleep interrupted");
                }
            }
        }
        if (z) {
            try {
                Thread.sleep(50L);
            } catch (InterruptedException e2) {
                e2.printStackTrace();
            }
        }
    }

    @Override // org.kairosdb.core.queue.QueueProcessor
    public void put(DataPointEvent dataPointEvent) throws DatastoreException {
        if (this.m_shuttingDown) {
            throw new DatastoreException("File Queue shutting down");
        }
        byte[] serializeEvent = this.m_eventSerializer.serializeEvent(dataPointEvent);
        try {
            synchronized (this.m_lock) {
                this.m_memoryQueue.add(new IndexedEvent(dataPointEvent, this.m_bigArray.append(serializeEvent)));
                this.m_lock.notify();
            }
        } catch (IOException e) {
            throw new DatastoreException("Failure to write data to bigqueue", e);
        }
    }

    @Override // org.kairosdb.core.queue.QueueProcessor
    protected int getAvailableDataPointEvents() {
        return this.m_memoryQueue.size();
    }

    @Override // org.kairosdb.core.queue.QueueProcessor
    protected List<DataPointEvent> get(int i) {
        if (this.m_memoryQueue.isEmpty()) {
            waitForEvent();
        }
        ArrayList arrayList = new ArrayList();
        long j = 0;
        synchronized (this.m_lock) {
            arrayList.addAll(this.m_internalMetrics);
            this.m_internalMetrics.clear();
            for (int size = arrayList.size(); size < i; size++) {
                IndexedEvent peek = this.m_memoryQueue.peek();
                if (peek == null) {
                    break;
                }
                if (peek.m_index == this.m_nextIndex) {
                    this.m_memoryQueue.remove();
                } else if (this.m_nextIndex != this.m_bigArray.getHeadIndex()) {
                    try {
                        peek = new IndexedEvent(this.m_eventSerializer.deserializeEvent(this.m_bigArray.get(this.m_nextIndex)), this.m_nextIndex);
                        this.m_readFromFileCount.incrementAndGet();
                    } catch (IOException e) {
                        logger.error("Unable to read from bigqueue", (Throwable) e);
                    }
                }
                j = this.m_nextIndex;
                this.m_nextIndex = incrementIndex(this.m_nextIndex);
                if (peek.m_dataPointEvent != null) {
                    arrayList.add(peek.m_dataPointEvent);
                }
            }
        }
        this.m_readFromQueueCount.getAndAdd(arrayList.size());
        this.m_lastCallback.increment();
        this.m_lastCallback.setCompletionIndex(j);
        return arrayList;
    }

    @Override // org.kairosdb.core.queue.QueueProcessor
    protected EventCompletionCallBack getCompletionCallBack() {
        CompletionCallBack completionCallBack = this.m_lastCallback;
        if (this.m_stopwatch.elapsed(TimeUnit.SECONDS) > this.m_secondsTillCheckpoint) {
            completionCallBack.setFinalized();
            this.m_lastCallback = new CompletionCallBack();
            completionCallBack.setChildCallBack(this.m_lastCallback);
            this.m_stopwatch.reset();
            this.m_stopwatch.start();
        }
        return completionCallBack;
    }

    @Override // org.kairosdb.core.queue.QueueProcessor
    public void addReportedMetrics(ArrayList<DataPointSet> arrayList, long j) {
        long headIndex = this.m_bigArray.getHeadIndex() - this.m_nextIndex;
        long andSet = this.m_readFromFileCount.getAndSet(0);
        long andSet2 = this.m_readFromQueueCount.getAndSet(0);
        synchronized (this.m_lock) {
            this.m_internalMetrics.add(new DataPointEvent("kairosdb.queue.file_queue.size", this.m_reportTags, this.m_dataPointFactory.createDataPoint(j, headIndex)));
            this.m_internalMetrics.add(new DataPointEvent("kairosdb.queue.read_from_file", this.m_reportTags, this.m_dataPointFactory.createDataPoint(j, andSet)));
            this.m_internalMetrics.add(new DataPointEvent("kairosdb.queue.process_count", this.m_reportTags, this.m_dataPointFactory.createDataPoint(j, andSet2)));
        }
        DataPointSet dataPointSet = new DataPointSet("kairosdb.queue.file_queue.size");
        dataPointSet.addTag("host", this.m_hostName);
        dataPointSet.addDataPoint(this.m_dataPointFactory.createDataPoint(j, headIndex));
        arrayList.add(dataPointSet);
        DataPointSet dataPointSet2 = new DataPointSet("kairosdb.queue.read_from_file");
        dataPointSet2.addTag("host", this.m_hostName);
        dataPointSet2.addDataPoint(this.m_dataPointFactory.createDataPoint(j, andSet));
        arrayList.add(dataPointSet2);
        DataPointSet dataPointSet3 = new DataPointSet("kairosdb.queue.process_count");
        dataPointSet3.addTag("host", this.m_hostName);
        dataPointSet3.addDataPoint(this.m_dataPointFactory.createDataPoint(j, andSet2));
        arrayList.add(dataPointSet3);
    }
}
