package org.kairosdb.core.queue;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import javax.inject.Inject;
import org.kairosdb.core.DataPointSet;
import org.kairosdb.core.exception.DatastoreException;
import org.kairosdb.core.reporting.KairosMetricReporter;
import org.kairosdb.eventbus.Subscribe;
import org.kairosdb.events.BatchReductionEvent;
import org.kairosdb.events.DataPointEvent;
import org.kairosdb.util.SimpleStats;
import org.kairosdb.util.SimpleStatsReporter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:importkairosdb_130.jar:org/kairosdb/core/queue/QueueProcessor.class */
/**
 * Base class for queue processors that hand batches of {@link DataPointEvent}s to a
 * {@link ProcessorHandler} from a background delivery task.
 *
 * <p>Subclasses supply the actual queue through {@link #put(DataPointEvent)},
 * {@link #get(int)}, {@link #getAvailableDataPointEvents()} and
 * {@link #getCompletionCallBack()}. The delivery task is submitted to the supplied
 * {@link ExecutorService} from the constructor and idles until a handler is registered with
 * {@link #setProcessorHandler(ProcessorHandler)}.
 *
 * <p>Thread-safety: the stop flags and the current batch size are shared between the delivery
 * task and the threads calling {@link #shutdown()} / {@link #reduceBatch(BatchReductionEvent)},
 * so they are {@code volatile} and batch-size updates are serialized on a private lock.
 */
public abstract class QueueProcessor implements KairosMetricReporter {
    public static final Logger logger = LoggerFactory.getLogger(QueueProcessor.class);
    public static final String QUEUE_PROCESSOR_CLASS = "kairosdb.queue_processor.class";
    public static final String QUEUE_PROCESSOR = "queue_processor";
    public static final String BATCH_SIZE = "kairosdb.queue_processor.batch_size";
    public static final String MEMORY_QUEUE_SIZE = "kairosdb.queue_processor.memory_queue_size";
    public static final String MINIMUM_BATCH_SIZE = "kairosdb.queue_processor.min_batch_size";
    public static final String MINIMUM_BATCH_WAIT = "kairosdb.queue_processor.min_batch_wait";

    private final ExecutorService m_executor;

    /** Guards read-modify-write updates of {@link #m_batchSize}. */
    private final Object m_batchSizeLock = new Object();

    /** Current batch size; lowered by {@link #reduceBatch} and grown back by the delivery task. */
    private volatile int m_batchSize;
    private final int m_initialBatchSize;
    private final int m_minimumBatchSize;
    /** Wait, in milliseconds, applied when fewer than the minimum number of events are queued. */
    private final int m_minBatchWait;
    private volatile ProcessorHandler m_processorHandler;
    private final SimpleStats m_batchStats = new SimpleStats();

    @Inject
    private SimpleStatsReporter m_simpleStatsReporter = new SimpleStatsReporter();
    private final DeliveryThread m_deliveryThread = new DeliveryThread();

    /**
     * Background task that repeatedly pulls a batch from the queue and passes it to the
     * registered {@link ProcessorHandler}.
     */
    public class DeliveryThread implements Runnable {
        // Both flags are written by other threads (shutdown, tests) and read by the delivery
        // loop, so they must be volatile for the writes to become visible.
        private volatile boolean m_running = true;
        private volatile boolean m_runOnce = false;

        public DeliveryThread() {
        }

        /** Asks the delivery loop to stop after the iteration currently in progress. */
        public void shutdown() {
            this.m_running = false;
        }

        /**
         * Sets the flag that keeps the delivery loop going.
         *
         * @param running {@code false} to stop the loop, {@code true} to allow it to run
         */
        public void setRunning(boolean running) {
            this.m_running = running;
        }

        /**
         * Makes the delivery loop perform a single iteration and then stop.
         *
         * @param runOnce {@code true} to stop after one iteration
         */
        public void setRunOnce(boolean runOnce) {
            this.m_runOnce = runOnce;
        }

        /**
         * Waits for a processor handler to be registered, then delivers batches until the
         * running flag is cleared or the thread is interrupted. Any other exception raised by
         * an iteration is logged and the loop continues.
         */
        @Override // java.lang.Runnable
        public void run() {
            if (!awaitProcessorHandler()) {
                return;
            }
            while (this.m_running) {
                if (this.m_runOnce) {
                    this.m_running = false;
                }
                try {
                    deliverBatch();
                } catch (InterruptedException e) {
                    // Restore the interrupt status and stop: swallowing the interrupt would
                    // keep the task alive after the executor asked it to terminate.
                    Thread.currentThread().interrupt();
                    QueueProcessor.logger.info("DeliveryThread interrupted, stopping");
                    this.m_running = false;
                } catch (Exception e) {
                    QueueProcessor.logger.error("DeliveryThread Exception", e);
                }
            }
        }

        /**
         * Polls every 100 ms until a processor handler has been registered.
         *
         * @return {@code true} once a handler is available; {@code false} if the task was shut
         *         down or interrupted while waiting
         */
        private boolean awaitProcessorHandler() {
            while (QueueProcessor.this.m_processorHandler == null) {
                if (!this.m_running) {
                    // Shut down before a handler was ever set; without this check the task
                    // would poll forever and the executor could never terminate.
                    return false;
                }
                try {
                    Thread.sleep(100L);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    QueueProcessor.logger.warn(
                            "DeliveryThread interrupted while waiting for a processor handler");
                    return false;
                }
            }
            return true;
        }

        /**
         * Performs one delivery iteration: optionally waits for more events, then fetches a
         * batch and hands it to the processor handler.
         *
         * <p>Declared with {@code throws Exception} because the queue and handler
         * implementations live outside this class; {@link #run()} is the single place where
         * failures are logged.
         */
        private void deliverBatch() throws Exception {
            if (QueueProcessor.this.getAvailableDataPointEvents() < QueueProcessor.this.m_minimumBatchSize) {
                Thread.sleep(QueueProcessor.this.m_minBatchWait);
            }
            if (QueueProcessor.this.getAvailableDataPointEvents() == 0) {
                return;
            }

            // Snapshot the batch size so the size requested from get() and the size used for
            // the "full batch" comparison are the same even if reduceBatch() runs concurrently.
            final int requestedSize = QueueProcessor.this.m_batchSize;
            List<DataPointEvent> batch = QueueProcessor.this.get(requestedSize);
            EventCompletionCallBack callBack = QueueProcessor.this.getCompletionCallBack();
            QueueProcessor.this.m_batchStats.addValue(batch.size());

            boolean fullBatch = batch.size() == requestedSize;
            if (fullBatch) {
                QueueProcessor.this.growBatchSize();
            }
            QueueProcessor.this.m_processorHandler.handleEvents(batch, callBack, fullBatch);
        }
    }

    /**
     * Creates the processor and submits its delivery task to the executor.
     *
     * @param executorService  executor that runs the delivery task; shut down by {@link #shutdown()}
     * @param batchSize        initial batch size, also the threshold below which a reduced
     *                         batch size is grown back
     * @param minimumBatchSize number of queued events below which the delivery task waits
     *                         before fetching a batch
     * @param minBatchWait     how long that wait lasts, in milliseconds
     */
    public QueueProcessor(ExecutorService executorService, int batchSize, int minimumBatchSize, int minBatchWait) {
        this.m_batchSize = batchSize;
        this.m_initialBatchSize = batchSize;
        this.m_minimumBatchSize = minimumBatchSize;
        this.m_minBatchWait = minBatchWait;
        this.m_executor = executorService;
        // Submit the task only after every field above has been assigned.
        executorService.execute(this.m_deliveryThread);
        logger.info("Starting QueueProcessor {}", getClass().getName());
    }

    /**
     * Lowers the current batch size to the size carried by the event, if that is smaller.
     *
     * @param batchReductionEvent event carrying the requested batch size
     */
    @Subscribe
    public void reduceBatch(BatchReductionEvent batchReductionEvent) {
        final int reducedSize;
        synchronized (this.m_batchSizeLock) {
            reducedSize = Math.min(this.m_batchSize, batchReductionEvent.getBatchSize());
            this.m_batchSize = reducedSize;
        }
        logger.info("Reducing queue batch size to {}", reducedSize);
    }

    /**
     * Grows the batch size by 5 while it is below the initial batch size. Runs under the same
     * lock as {@link #reduceBatch} so a concurrent reduction is never overwritten.
     */
    private void growBatchSize() {
        synchronized (this.m_batchSizeLock) {
            if (this.m_batchSize < this.m_initialBatchSize) {
                this.m_batchSize += 5;
            }
        }
    }

    /**
     * Registers the handler that receives batches; the delivery task starts processing once
     * this is non-null.
     *
     * @param processorHandler handler to deliver batches to
     */
    public void setProcessorHandler(ProcessorHandler processorHandler) {
        this.m_processorHandler = processorHandler;
    }

    /** Stops the delivery task and shuts down the executor it runs on. */
    public void shutdown() {
        this.m_deliveryThread.shutdown();
        this.m_executor.shutdown();
    }

    /**
     * Adds an event to the queue.
     *
     * @param dataPointEvent event to enqueue
     * @throws DatastoreException if the event cannot be queued
     */
    public abstract void put(DataPointEvent dataPointEvent) throws DatastoreException;

    /**
     * Fetches the next batch of events from the queue.
     *
     * @param batchSize batch size requested by the delivery task
     * @return the events to deliver
     */
    protected abstract List<DataPointEvent> get(int batchSize);

    /**
     * @return the number of events currently available for delivery
     */
    protected abstract int getAvailableDataPointEvents();

    /**
     * @return the completion callback passed to the handler together with a batch
     */
    protected abstract EventCompletionCallBack getCompletionCallBack();

    /**
     * Lets the subclass contribute its own metrics to the report.
     *
     * @param metrics list to append metrics to
     * @param now     timestamp passed to {@link #getMetrics(long)}
     */
    protected abstract void addReportedMetrics(ArrayList<DataPointSet> metrics, long now);

    /**
     * Collects the subclass metrics plus the batch-size statistics gathered since the previous
     * call (the statistics are cleared as they are read).
     *
     * @param now timestamp to report the metrics with
     * @return the collected metrics
     */
    @Override // org.kairosdb.core.reporting.KairosMetricReporter
    public List<DataPointSet> getMetrics(long now) {
        ArrayList<DataPointSet> metrics = new ArrayList<>();
        addReportedMetrics(metrics, now);
        this.m_simpleStatsReporter.reportStats(this.m_batchStats.getAndClear(), now, "kairosdb.queue.batch_stats", metrics);
        return metrics;
    }
}
