package org.kairosdb.datastore.cassandra;

import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.Host;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.utils.UUIDs;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import javax.annotation.Nullable;
import javax.inject.Named;
import jnr.constants.platform.darwin.RLIM;
import org.agileclick.genorm.Genormous;
import org.apache.commons.lang3.StringUtils;
import org.kairosdb.core.DataPoint;
import org.kairosdb.core.DataPointSet;
import org.kairosdb.core.KairosDataPointFactory;
import org.kairosdb.core.datapoints.DataPointFactory;
import org.kairosdb.core.datapoints.LegacyDataPointFactory;
import org.kairosdb.core.datapoints.LegacyDoubleDataPoint;
import org.kairosdb.core.datapoints.LegacyLongDataPoint;
import org.kairosdb.core.datastore.DataPointRow;
import org.kairosdb.core.datastore.Datastore;
import org.kairosdb.core.datastore.DatastoreMetricQuery;
import org.kairosdb.core.datastore.Order;
import org.kairosdb.core.datastore.QueryCallback;
import org.kairosdb.core.datastore.QueryMetric;
import org.kairosdb.core.datastore.QueryPlugin;
import org.kairosdb.core.datastore.ServiceKeyStore;
import org.kairosdb.core.datastore.ServiceKeyValue;
import org.kairosdb.core.datastore.TagSet;
import org.kairosdb.core.datastore.TagSetImpl;
import org.kairosdb.core.exception.DatastoreException;
import org.kairosdb.core.queue.EventCompletionCallBack;
import org.kairosdb.core.queue.ProcessorHandler;
import org.kairosdb.core.queue.QueueProcessor;
import org.kairosdb.core.reporting.KairosMetricReporter;
import org.kairosdb.core.reporting.ThreadReporter;
import org.kairosdb.datastore.cassandra.CassandraModule;
import org.kairosdb.eventbus.Subscribe;
import org.kairosdb.events.DataPointEvent;
import org.kairosdb.util.IngestExecutorService;
import org.kairosdb.util.KDataInput;
import org.kairosdb.util.MemoryMonitor;
import org.kairosdb.util.SimpleStatsReporter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:importkairosdb_130.jar:org/kairosdb/datastore/cassandra/CassandraDatastore.class */
public class CassandraDatastore implements Datastore, ProcessorHandler, KairosMetricReporter, ServiceKeyStore {
    public static final int LONG_FLAG = 0;
    public static final int FLOAT_FLAG = 1;
    public static final long MAX_CQL_BATCH_SIZE = 10000;
    public static final String KEY_QUERY_TIME = "kairosdb.datastore.cassandra.key_query_time";
    public static final String ROW_KEY_COUNT = "kairosdb.datastore.cassandra.row_key_count";
    public static final String RAW_ROW_KEY_COUNT = "kairosdb.datastore.cassandra.raw_row_key_count";
    public static final String ROW_KEY_METRIC_NAMES = "metric_names";
    public static final String ROW_KEY_TAG_NAMES = "tag_names";
    public static final String ROW_KEY_TAG_VALUES = "tag_values";
    private final ClusterConnection m_writeCluster;
    private final ClusterConnection m_metaCluster;
    private final List<ClusterConnection> m_readClusters;
    private final CassandraModule.CQLBatchFactory m_cqlBatchFactory;
    private final Map<String, ClusterConnection> m_clusterMap;
    private final KairosDataPointFactory m_kairosDataPointFactory;
    private final QueueProcessor m_queueProcessor;
    private final IngestExecutorService m_congestionExecutor;
    private final CassandraModule.BatchHandlerFactory m_batchHandlerFactory;
    private final CassandraModule.DeleteBatchHandlerFactory m_deleteBatchHandlerFactory;
    private final CassandraModule.CQLFilteredRowKeyIteratorFactory m_rowKeyFilterFactory;
    private CassandraConfiguration m_cassandraConfiguration;

    @Inject
    @Named(QueueProcessor.BATCH_SIZE)
    private int m_batchSize;
    public static final Logger logger = LoggerFactory.getLogger((Class<?>) CassandraDatastore.class);
    public static final DataPointsRowKeySerializer DATA_POINTS_ROW_KEY_SERIALIZER = new DataPointsRowKeySerializer();
    private static final Charset UTF_8 = Charset.forName("UTF-8");
    private static final IDontCareCallBack s_dontCareCallBack = new IDontCareCallBack();

    @Inject
    private final BatchStats m_batchStats = new BatchStats();

    @Inject
    private DataCache<DataPointsRowKey> m_rowKeyCache = new DataCache<>(1024);

    @Inject
    private DataCache<TimedString> m_metricNameCache = new DataCache<>(1024);

    @Inject
    private SimpleStatsReporter m_simpleStatsReporter = new SimpleStatsReporter();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:importkairosdb_130.jar:org/kairosdb/datastore/cassandra/CassandraDatastore$ClusterCallback.class */
    public interface ClusterCallback {
        ResultSetFuture query(ClusterConnection clusterConnection) throws DatastoreException;
    }

    /* loaded from: input_file:importkairosdb_130.jar:org/kairosdb/datastore/cassandra/CassandraDatastore$ClusterCallbackList.class */
    private interface ClusterCallbackList {
        List<ResultSetFuture> query(ClusterConnection clusterConnection) throws DatastoreException;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:importkairosdb_130.jar:org/kairosdb/datastore/cassandra/CassandraDatastore$DeletingCallback.class */
    public class DeletingCallback implements QueryCallback {
        private String m_metricName;
        RowSpec m_rowSpec;

        /* loaded from: input_file:importkairosdb_130.jar:org/kairosdb/datastore/cassandra/CassandraDatastore$DeletingCallback$DeleteDatePointWriter.class */
        private class DeleteDatePointWriter implements QueryCallback.DataPointWriter {
            private String m_dataType;
            private SortedMap<String, String> m_tags;
            private List<DataPoint> m_dataPoints = new ArrayList();

            public DeleteDatePointWriter(String str, SortedMap<String, String> sortedMap) {
                this.m_dataType = str;
                this.m_tags = sortedMap;
            }

            @Override // org.kairosdb.core.datastore.QueryCallback.DataPointWriter
            public void addDataPoint(DataPoint dataPoint) throws IOException {
                this.m_dataPoints.add(dataPoint);
                if (this.m_dataPoints.size() > CassandraDatastore.this.m_batchSize) {
                    List<DataPoint> list = this.m_dataPoints;
                    this.m_dataPoints = new ArrayList();
                    CassandraDatastore.this.m_congestionExecutor.submit(CassandraDatastore.this.m_deleteBatchHandlerFactory.create(DeletingCallback.this.m_metricName, this.m_tags, list, CassandraDatastore.s_dontCareCallBack, DeletingCallback.this.m_rowSpec));
                }
            }

            @Override // org.kairosdb.core.datastore.QueryCallback.DataPointWriter, java.lang.AutoCloseable
            public void close() throws IOException {
                if (this.m_dataPoints.size() != 0) {
                    CassandraDatastore.this.m_congestionExecutor.submit(CassandraDatastore.this.m_deleteBatchHandlerFactory.create(DeletingCallback.this.m_metricName, this.m_tags, this.m_dataPoints, CassandraDatastore.s_dontCareCallBack, DeletingCallback.this.m_rowSpec));
                }
            }
        }

        public DeletingCallback(String str, RowSpec rowSpec) {
            this.m_metricName = str;
            this.m_rowSpec = rowSpec;
        }

        @Override // org.kairosdb.core.datastore.QueryCallback
        public QueryCallback.DataPointWriter startDataPointSet(String str, SortedMap<String, String> sortedMap) throws IOException {
            return new DeleteDatePointWriter(str, sortedMap);
        }
    }

    /* loaded from: input_file:importkairosdb_130.jar:org/kairosdb/datastore/cassandra/CassandraDatastore$IDontCareCallBack.class */
    private static class IDontCareCallBack implements EventCompletionCallBack {
        private IDontCareCallBack() {
        }

        @Override // org.kairosdb.core.queue.EventCompletionCallBack
        public void complete() {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:importkairosdb_130.jar:org/kairosdb/datastore/cassandra/CassandraDatastore$QueryListener.class */
    public class QueryListener implements FutureCallback<ResultSet> {
        private final DataPointsRowKey m_rowKey;
        private final QueryCallback m_callback;
        private final Semaphore m_semaphore;
        private final QueryMonitor m_queryMonitor;
        private final RowSpec m_rowSpec;

        public QueryListener(DataPointsRowKey dataPointsRowKey, QueryCallback queryCallback, Semaphore semaphore, QueryMonitor queryMonitor, RowSpec rowSpec) {
            this.m_rowKey = dataPointsRowKey;
            this.m_callback = queryCallback;
            this.m_semaphore = semaphore;
            this.m_queryMonitor = queryMonitor;
            this.m_rowSpec = rowSpec;
        }

        @Override // com.google.common.util.concurrent.FutureCallback
        public void onSuccess(@Nullable ResultSet resultSet) {
            try {
                try {
                    if (resultSet.isExhausted()) {
                        this.m_semaphore.release();
                        return;
                    }
                    QueryCallback.DataPointWriter startDataPointSet = this.m_callback.startDataPointSet(this.m_rowKey.getDataType(), this.m_rowKey.getTags());
                    Throwable th = null;
                    try {
                        try {
                            DataPointFactory factoryForDataStoreType = CassandraDatastore.this.m_kairosDataPointFactory.getFactoryForDataStoreType(this.m_rowKey.getDataType());
                            while (!resultSet.isExhausted()) {
                                Row one = resultSet.one();
                                int i = one.getBytes(0).getInt();
                                ByteBuffer bytes = one.getBytes(1);
                                long columnTimestamp = this.m_rowSpec.getColumnTimestamp(this.m_rowKey.getTimestamp(), i);
                                if (this.m_rowKey.getDataType() != LegacyDataPointFactory.DATASTORE_TYPE) {
                                    startDataPointSet.addDataPoint(factoryForDataStoreType.getDataPoint(columnTimestamp, KDataInput.createInput(bytes)));
                                } else if (CassandraDatastore.isLongValue(i)) {
                                    startDataPointSet.addDataPoint(new LegacyLongDataPoint(columnTimestamp, ValueSerializer.getLongFromByteBuffer(bytes)));
                                } else {
                                    startDataPointSet.addDataPoint(new LegacyDoubleDataPoint(columnTimestamp, ValueSerializer.getDoubleFromByteBuffer(bytes)));
                                }
                                this.m_queryMonitor.incrementCounter();
                            }
                            if (startDataPointSet != null) {
                                if (0 != 0) {
                                    try {
                                        startDataPointSet.close();
                                    } catch (Throwable th2) {
                                        th.addSuppressed(th2);
                                    }
                                } else {
                                    startDataPointSet.close();
                                }
                            }
                            this.m_semaphore.release();
                        } catch (Throwable th3) {
                            th = th3;
                            throw th3;
                        }
                    } catch (Throwable th4) {
                        if (startDataPointSet != null) {
                            if (th != null) {
                                try {
                                    startDataPointSet.close();
                                } catch (Throwable th5) {
                                    th.addSuppressed(th5);
                                }
                            } else {
                                startDataPointSet.close();
                            }
                        }
                        throw th4;
                    }
                } catch (Exception e) {
                    CassandraDatastore.logger.error("QueryListener failure on cluster " + this.m_rowKey.getClusterName(), (Throwable) e);
                    this.m_queryMonitor.failQuery(e);
                    this.m_semaphore.release();
                }
            } catch (Throwable th6) {
                this.m_semaphore.release();
                throw th6;
            }
        }

        @Override // com.google.common.util.concurrent.FutureCallback
        public void onFailure(Throwable th) {
            CassandraDatastore.logger.error("Async query failure on cluster " + this.m_rowKey.getClusterName(), th);
            this.m_queryMonitor.failQuery(th);
            this.m_semaphore.release();
        }
    }

    @Inject
    public CassandraDatastore(CassandraConfiguration cassandraConfiguration, @Named("write_cluster") ClusterConnection clusterConnection, @Named("meta_cluster") ClusterConnection clusterConnection2, List<ClusterConnection> list, KairosDataPointFactory kairosDataPointFactory, QueueProcessor queueProcessor, IngestExecutorService ingestExecutorService, CassandraModule.BatchHandlerFactory batchHandlerFactory, CassandraModule.DeleteBatchHandlerFactory deleteBatchHandlerFactory, CassandraModule.CQLFilteredRowKeyIteratorFactory cQLFilteredRowKeyIteratorFactory, CassandraModule.CQLBatchFactory cQLBatchFactory) throws DatastoreException {
        this.m_kairosDataPointFactory = kairosDataPointFactory;
        this.m_queueProcessor = queueProcessor;
        this.m_congestionExecutor = ingestExecutorService;
        this.m_batchHandlerFactory = batchHandlerFactory;
        this.m_deleteBatchHandlerFactory = deleteBatchHandlerFactory;
        this.m_rowKeyFilterFactory = cQLFilteredRowKeyIteratorFactory;
        this.m_writeCluster = clusterConnection;
        this.m_metaCluster = clusterConnection2;
        this.m_readClusters = list;
        this.m_cqlBatchFactory = cQLBatchFactory;
        ImmutableMap.Builder builder = ImmutableMap.builder();
        builder.put(this.m_writeCluster.getClusterName(), this.m_writeCluster);
        for (ClusterConnection clusterConnection3 : list) {
            builder.put(clusterConnection3.getClusterName(), clusterConnection3);
        }
        this.m_clusterMap = builder.build();
        this.m_cassandraConfiguration = cassandraConfiguration;
        this.m_queueProcessor.setProcessorHandler(this);
    }

    private static ByteBuffer serializeEndString(String str) {
        byte[] bytes = str.getBytes(UTF_8);
        int length = bytes.length - 1;
        bytes[length] = (byte) (bytes[length] + 1);
        return ByteBuffer.wrap(bytes);
    }

    public static ByteBuffer serializeString(String str) {
        return ByteBuffer.wrap(str.getBytes(UTF_8));
    }

    public void cleanRowKeyCache() {
        long calculateRowTime = this.m_writeCluster.getRowSpec().calculateRowTime(System.currentTimeMillis());
        for (DataPointsRowKey dataPointsRowKey : this.m_rowKeyCache.getCachedKeys()) {
            if (dataPointsRowKey.getTimestamp() != calculateRowTime) {
                this.m_rowKeyCache.removeKey(dataPointsRowKey);
            }
        }
    }

    @Override // org.kairosdb.core.datastore.Datastore
    public void close() throws InterruptedException {
        this.m_queueProcessor.shutdown();
        this.m_writeCluster.close();
        Iterator<ClusterConnection> it = this.m_readClusters.iterator();
        while (it.hasNext()) {
            it.next().close();
        }
    }

    @Subscribe
    public void putDataPoint(DataPointEvent dataPointEvent) throws DatastoreException {
        Objects.requireNonNull(dataPointEvent.getDataPoint().getDataStoreDataType());
        this.m_queueProcessor.put(dataPointEvent);
    }

    @Override // org.kairosdb.core.queue.ProcessorHandler
    public void handleEvents(List<DataPointEvent> list, EventCompletionCallBack eventCompletionCallBack, boolean z) {
        this.m_congestionExecutor.submit(this.m_batchHandlerFactory.create(list, eventCompletionCallBack, z, this.m_writeCluster.getRowSpec()));
    }

    private List<ResultSetFuture> queryClusters(ClusterCallback clusterCallback) throws DatastoreException {
        ArrayList arrayList = new ArrayList();
        arrayList.add(clusterCallback.query(this.m_writeCluster));
        Iterator<ClusterConnection> it = this.m_readClusters.iterator();
        while (it.hasNext()) {
            ResultSetFuture query = clusterCallback.query(it.next());
            if (query != null) {
                arrayList.add(query);
            }
        }
        return arrayList;
    }

    private List<ResultSetFuture> queryClustersList(ClusterCallbackList clusterCallbackList) throws DatastoreException {
        ArrayList arrayList = new ArrayList();
        List<ResultSetFuture> query = clusterCallbackList.query(this.m_writeCluster);
        if (query != null) {
            arrayList.addAll(query);
        }
        Iterator<ClusterConnection> it = this.m_readClusters.iterator();
        while (it.hasNext()) {
            List<ResultSetFuture> query2 = clusterCallbackList.query(it.next());
            if (query2 != null) {
                arrayList.addAll(query2);
            }
        }
        return arrayList;
    }

    private Iterable<String> queryStringIndex(String str, String str2) throws DatastoreException {
        ListenableFuture allAsList = Futures.allAsList(queryClusters(clusterConnection -> {
            BoundStatement boundStatement = new BoundStatement(clusterConnection.psStringIndexPrefixQuery);
            boundStatement.setBytesUnsafe(0, serializeString(str));
            boundStatement.setBytesUnsafe(1, serializeString(str2));
            boundStatement.setBytesUnsafe(2, serializeEndString(str2));
            boundStatement.setConsistencyLevel(clusterConnection.getReadConsistencyLevel());
            return clusterConnection.executeAsync(boundStatement);
        }));
        HashSet hashSet = new HashSet();
        try {
            for (ResultSet resultSet : (List) allAsList.get()) {
                while (!resultSet.isExhausted()) {
                    hashSet.add(resultSet.one().getString(0));
                }
            }
            return hashSet;
        } catch (Exception e) {
            throw new DatastoreException("CQL Query failure", e);
        }
    }

    private Iterable<String> queryStringIndex(String str) throws DatastoreException {
        ListenableFuture allAsList = Futures.allAsList(queryClusters(clusterConnection -> {
            BoundStatement boundStatement = new BoundStatement(clusterConnection.psStringIndexQuery);
            boundStatement.setBytesUnsafe(0, serializeString(str));
            boundStatement.setConsistencyLevel(clusterConnection.getReadConsistencyLevel());
            return clusterConnection.executeAsync(boundStatement);
        }));
        HashSet hashSet = new HashSet();
        try {
            for (ResultSet resultSet : (List) allAsList.get()) {
                while (!resultSet.isExhausted()) {
                    hashSet.add(resultSet.one().getString(0));
                }
            }
            return hashSet;
        } catch (Exception e) {
            throw new DatastoreException("CQL Query failure", e);
        }
    }

    @Override // org.kairosdb.core.datastore.Datastore
    public Iterable<String> getMetricNames(String str) throws DatastoreException {
        return str == null ? queryStringIndex("metric_names") : queryStringIndex("metric_names", str);
    }

    @Override // org.kairosdb.core.datastore.Datastore
    public Iterable<String> getTagNames() throws DatastoreException {
        return queryStringIndex("tag_names");
    }

    @Override // org.kairosdb.core.datastore.Datastore
    public Iterable<String> getTagValues() throws DatastoreException {
        return queryStringIndex("tag_values");
    }

    @Override // org.kairosdb.core.datastore.Datastore
    public TagSet queryMetricTags(DatastoreMetricQuery datastoreMetricQuery) throws DatastoreException {
        TagSetImpl tagSetImpl = new TagSetImpl();
        Iterator<DataPointsRowKey> keysForQueryIterator = getKeysForQueryIterator(datastoreMetricQuery);
        MemoryMonitor memoryMonitor = new MemoryMonitor(20);
        while (keysForQueryIterator.hasNext()) {
            for (Map.Entry<String, String> entry : keysForQueryIterator.next().getTags().entrySet()) {
                tagSetImpl.addTag(entry.getKey(), entry.getValue());
                memoryMonitor.checkMemoryAndThrowException();
            }
        }
        return tagSetImpl;
    }

    @Override // org.kairosdb.core.datastore.Datastore
    public void indexMetricTags(DatastoreMetricQuery datastoreMetricQuery) throws DatastoreException {
        CQLBatch create = this.m_cqlBatchFactory.create();
        Iterator<DataPointsRowKey> keysForQueryIterator = getKeysForQueryIterator(datastoreMetricQuery);
        MemoryMonitor memoryMonitor = new MemoryMonitor(20);
        long j = 0;
        while (keysForQueryIterator.hasNext()) {
            DataPointsRowKey next = keysForQueryIterator.next();
            create.indexRowKey(next, next.getTtl());
            memoryMonitor.checkMemoryAndThrowException();
            j++;
            if (j % 10000 == 0) {
                create.submitBatch();
                create = this.m_cqlBatchFactory.create();
            }
        }
        create.submitBatch();
    }

    @Override // org.kairosdb.core.datastore.Datastore
    public long getMinTimeValue() {
        return Long.MIN_VALUE;
    }

    @Override // org.kairosdb.core.datastore.Datastore
    public long getMaxTimeValue() {
        return RLIM.MAX_VALUE;
    }

    @Override // org.kairosdb.core.datastore.ServiceKeyStore
    public void setValue(String str, String str2, String str3, String str4) throws DatastoreException {
        BoundStatement boundStatement = new BoundStatement(this.m_metaCluster.psServiceIndexInsert);
        boundStatement.setString(0, str);
        boundStatement.setString(1, str2);
        boundStatement.setString(2, str3);
        boundStatement.setString(3, str4);
        boundStatement.setConsistencyLevel(this.m_metaCluster.getWriteConsistencyLevel());
        this.m_metaCluster.execute(boundStatement);
    }

    @Override // org.kairosdb.core.datastore.ServiceKeyStore
    public ServiceKeyValue getValue(String str, String str2, String str3) throws DatastoreException {
        BoundStatement boundStatement = new BoundStatement(this.m_metaCluster.psServiceIndexGet);
        boundStatement.setString(0, str);
        boundStatement.setString(1, str2);
        boundStatement.setString(2, str3);
        boundStatement.setConsistencyLevel(this.m_metaCluster.getReadConsistencyLevel());
        Row one = this.m_metaCluster.execute(boundStatement).one();
        if (one != null) {
            return new ServiceKeyValue(one.getString(0), new Date(one.getTime(1)));
        }
        return null;
    }

    @Override // org.kairosdb.core.datastore.ServiceKeyStore
    public Iterable<String> listServiceKeys(String str) throws DatastoreException {
        ArrayList arrayList = new ArrayList();
        if (this.m_metaCluster.psServiceIndexListServiceKeys == null) {
            throw new DatastoreException("List Service Keys is not available on this version of Cassandra.");
        }
        BoundStatement boundStatement = new BoundStatement(this.m_metaCluster.psServiceIndexListServiceKeys);
        boundStatement.setString(0, str);
        boundStatement.setConsistencyLevel(this.m_metaCluster.getReadConsistencyLevel());
        ResultSet execute = this.m_metaCluster.execute(boundStatement);
        while (!execute.isExhausted()) {
            arrayList.add(execute.one().getString(0));
        }
        return arrayList;
    }

    @Override // org.kairosdb.core.datastore.ServiceKeyStore
    public Iterable<String> listKeys(String str, String str2) throws DatastoreException {
        ArrayList arrayList = new ArrayList();
        BoundStatement boundStatement = new BoundStatement(this.m_metaCluster.psServiceIndexListKeys);
        boundStatement.setString(0, str);
        boundStatement.setString(1, str2);
        boundStatement.setConsistencyLevel(this.m_metaCluster.getReadConsistencyLevel());
        ResultSet execute = this.m_metaCluster.execute(boundStatement);
        while (!execute.isExhausted()) {
            String string = execute.one().getString(0);
            if (string != null) {
                arrayList.add(string);
            }
        }
        return arrayList;
    }

    @Override // org.kairosdb.core.datastore.ServiceKeyStore
    public Iterable<String> listKeys(String str, String str2, String str3) throws DatastoreException {
        ArrayList arrayList = new ArrayList();
        BoundStatement boundStatement = new BoundStatement(this.m_metaCluster.psServiceIndexListKeysPrefix);
        boundStatement.setString(0, str);
        boundStatement.setString(1, str2);
        boundStatement.setString(2, str3);
        boundStatement.setString(3, str3 + (char) 65535);
        boundStatement.setConsistencyLevel(this.m_metaCluster.getReadConsistencyLevel());
        ResultSet execute = this.m_metaCluster.execute(boundStatement);
        while (!execute.isExhausted()) {
            String string = execute.one().getString(0);
            if (string != null) {
                arrayList.add(string);
            }
        }
        return arrayList;
    }

    @Override // org.kairosdb.core.datastore.ServiceKeyStore
    public void deleteKey(String str, String str2, String str3) throws DatastoreException {
        BoundStatement boundStatement = new BoundStatement(this.m_metaCluster.psServiceIndexDeleteKey);
        boundStatement.setString(0, str);
        boundStatement.setString(1, str2);
        boundStatement.setString(2, str3);
        boundStatement.setConsistencyLevel(this.m_metaCluster.getWriteConsistencyLevel());
        this.m_metaCluster.execute(boundStatement);
        BoundStatement boundStatement2 = new BoundStatement(this.m_metaCluster.psServiceIndexInsertModifiedTime);
        boundStatement2.setString(0, str);
        boundStatement2.setString(1, str2);
        this.m_metaCluster.execute(boundStatement2);
    }

    @Override // org.kairosdb.core.datastore.ServiceKeyStore
    public Date getServiceKeyLastModifiedTime(String str, String str2) throws DatastoreException {
        BoundStatement boundStatement = new BoundStatement(this.m_metaCluster.psServiceIndexModificationTime);
        boundStatement.setString(0, str);
        boundStatement.setString(1, str2);
        Row one = this.m_metaCluster.execute(boundStatement).one();
        return one != null ? new Date(UUIDs.unixTimestamp(one.getUUID(0))) : new Date(0L);
    }

    @Override // org.kairosdb.core.datastore.Datastore
    public void queryDatabase(DatastoreMetricQuery datastoreMetricQuery, QueryCallback queryCallback) throws DatastoreException {
        cqlQueryWithRowKeys(datastoreMetricQuery, queryCallback, getKeysForQueryIterator(datastoreMetricQuery));
    }

    @Override // org.kairosdb.core.reporting.KairosMetricReporter
    public List<DataPointSet> getMetrics(long j) {
        ArrayList arrayList = new ArrayList();
        this.m_simpleStatsReporter.reportStats(this.m_batchStats.getNameStats(), j, "kairosdb.datastore.cassandra.write_batch_size", Genormous.TABLE, ClusterConnection.STRING_INDEX_TABLE_NAME, arrayList);
        this.m_simpleStatsReporter.reportStats(this.m_batchStats.getDataPointStats(), j, "kairosdb.datastore.cassandra.write_batch_size", Genormous.TABLE, ClusterConnection.DATA_POINTS_TABLE_NAME, arrayList);
        this.m_simpleStatsReporter.reportStats(this.m_batchStats.getRowKeyStats(), j, "kairosdb.datastore.cassandra.write_batch_size", Genormous.TABLE, ClusterConnection.ROW_KEYS_NAME, arrayList);
        this.m_simpleStatsReporter.reportStats(this.m_batchStats.getRowKeyTimeStats(), j, "kairosdb.datastore.cassandra.write_batch_size", Genormous.TABLE, ClusterConnection.ROW_KEY_TIME_INDEX_NAME, arrayList);
        this.m_simpleStatsReporter.reportStats(this.m_batchStats.getTagIndexedStats(), j, "kairosdb.datastore.cassandra.write_batch_size", Genormous.TABLE, ClusterConnection.TAG_INDEXED_ROW_KEYS_NAME, arrayList);
        return arrayList;
    }

    private void cqlQueryWithRowKeys(final DatastoreMetricQuery datastoreMetricQuery, QueryCallback queryCallback, Iterator<DataPointsRowKey> it) throws DatastoreException {
        ArrayList arrayList = new ArrayList();
        int i = 0;
        long startTime = datastoreMetricQuery.getStartTime();
        long endTime = datastoreMetricQuery.getEndTime();
        boolean z = datastoreMetricQuery.getLimit() != 0;
        QueryMonitor queryMonitor = new QueryMonitor(this.m_cassandraConfiguration.getQueryLimit(), this.m_cassandraConfiguration.getQueryTimeLimit());
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(this.m_cassandraConfiguration.getQueryReaderThreads(), new ThreadFactory() { // from class: org.kairosdb.datastore.cassandra.CassandraDatastore.1
            private int m_count = 0;

            @Override // java.util.concurrent.ThreadFactory
            public Thread newThread(Runnable runnable) {
                this.m_count++;
                return new Thread(runnable, "query_" + datastoreMetricQuery.getName() + "-" + this.m_count);
            }
        });
        Semaphore semaphore = new Semaphore(this.m_cassandraConfiguration.getSimultaneousQueries());
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            i++;
            DataPointsRowKey next = it.next();
            ClusterConnection clusterConnection = this.m_clusterMap.get(next.getClusterName());
            RowSpec rowSpec = clusterConnection.getRowSpec();
            long rowWidthInMillis = rowSpec.getRowWidthInMillis();
            long timestamp = next.getTimestamp();
            int columnName = startTime < timestamp ? 0 : rowSpec.getColumnName(timestamp, startTime);
            int columnName2 = endTime > timestamp + rowWidthInMillis ? rowSpec.getColumnName(timestamp, timestamp + rowWidthInMillis) + 1 : rowSpec.getColumnName(timestamp, endTime) + 1;
            ByteBuffer allocate = ByteBuffer.allocate(4);
            allocate.putInt(columnName);
            allocate.rewind();
            ByteBuffer allocate2 = ByteBuffer.allocate(4);
            allocate2.putInt(columnName2);
            allocate2.rewind();
            BoundStatement boundStatement = z ? datastoreMetricQuery.getOrder() == Order.ASC ? new BoundStatement(clusterConnection.psDataPointsQueryAscLimit) : new BoundStatement(clusterConnection.psDataPointsQueryDescLimit) : datastoreMetricQuery.getOrder() == Order.ASC ? new BoundStatement(clusterConnection.psDataPointsQueryAsc) : new BoundStatement(clusterConnection.psDataPointsQueryDesc);
            boundStatement.setBytesUnsafe(0, DATA_POINTS_ROW_KEY_SERIALIZER.toByteBuffer(next));
            boundStatement.setBytesUnsafe(1, allocate);
            boundStatement.setBytesUnsafe(2, allocate2);
            if (z) {
                boundStatement.setInt(3, datastoreMetricQuery.getLimit());
            }
            boundStatement.setConsistencyLevel(clusterConnection.getReadConsistencyLevel());
            try {
                semaphore.acquire();
            } catch (InterruptedException e) {
                queryMonitor.failQuery(e);
            }
            if (queryMonitor.keepRunning()) {
                ResultSetFuture executeAsync = clusterConnection.executeAsync(boundStatement);
                arrayList.add(executeAsync);
                Futures.addCallback(executeAsync, new QueryListener(next, queryCallback, semaphore, queryMonitor, rowSpec), newFixedThreadPool);
            } else {
                Iterator it2 = arrayList.iterator();
                while (it2.hasNext()) {
                    ((ResultSetFuture) it2.next()).cancel(true);
                }
            }
        }
        ThreadReporter.addDataPoint(ROW_KEY_COUNT, i);
        try {
            if (queryMonitor.getException() == null) {
                semaphore.acquire(this.m_cassandraConfiguration.getSimultaneousQueries());
            }
            newFixedThreadPool.shutdown();
        } catch (InterruptedException e2) {
            logger.error("Query interrupted", (Throwable) e2);
        }
        if (queryMonitor.getException() != null) {
            throw new DatastoreException(queryMonitor.getException());
        }
    }

    private void deletePartialRow(DataPointsRowKey dataPointsRowKey, long j, long j2, ClusterConnection clusterConnection) throws DatastoreException {
        RowSpec rowSpec = clusterConnection.getRowSpec();
        if (clusterConnection.psDataPointsDeleteRange == null) {
            QueryMetric queryMetric = new QueryMetric(j, j2, 0, dataPointsRowKey.getMetricName());
            cqlQueryWithRowKeys(queryMetric, new DeletingCallback(queryMetric.getName(), rowSpec), Collections.singletonList(dataPointsRowKey).iterator());
            return;
        }
        BoundStatement boundStatement = new BoundStatement(clusterConnection.psDataPointsDeleteRange);
        boundStatement.setBytesUnsafe(0, DATA_POINTS_ROW_KEY_SERIALIZER.toByteBuffer(dataPointsRowKey));
        ByteBuffer allocate = ByteBuffer.allocate(4);
        allocate.putInt(rowSpec.getColumnName(dataPointsRowKey.getTimestamp(), j));
        allocate.rewind();
        boundStatement.setBytesUnsafe(1, allocate);
        ByteBuffer allocate2 = ByteBuffer.allocate(4);
        allocate2.putInt(rowSpec.getColumnName(dataPointsRowKey.getTimestamp(), j2));
        allocate2.rewind();
        boundStatement.setBytesUnsafe(2, allocate2);
        boundStatement.setConsistencyLevel(clusterConnection.getReadConsistencyLevel());
        clusterConnection.executeAsync(boundStatement);
    }

    @Override // org.kairosdb.core.datastore.Datastore
    public void deleteDataPoints(DatastoreMetricQuery datastoreMetricQuery) throws DatastoreException {
        Objects.requireNonNull(datastoreMetricQuery);
        boolean z = false;
        boolean z2 = false;
        if (datastoreMetricQuery.getStartTime() == Long.MIN_VALUE && datastoreMetricQuery.getEndTime() == RLIM.MAX_VALUE) {
            z2 = true;
        }
        Iterator<DataPointsRowKey> keysForQueryIterator = getKeysForQueryIterator(datastoreMetricQuery);
        while (keysForQueryIterator.hasNext()) {
            DataPointsRowKey next = keysForQueryIterator.next();
            ClusterConnection clusterConnection = this.m_clusterMap.get(next.getClusterName());
            long rowWidthInMillis = clusterConnection.getRowSpec().getRowWidthInMillis();
            long timestamp = next.getTimestamp();
            if (datastoreMetricQuery.getStartTime() <= timestamp && datastoreMetricQuery.getEndTime() >= (timestamp + rowWidthInMillis) - 1) {
                clusterConnection.execute(new BoundStatement(clusterConnection.psDataPointsDeleteRow).setBytesUnsafe(0, DATA_POINTS_ROW_KEY_SERIALIZER.toByteBuffer(next)).setConsistencyLevel(clusterConnection.getReadConsistencyLevel()));
                clusterConnection.execute(new BoundStatement(clusterConnection.psRowKeyIndexDelete).setBytesUnsafe(0, serializeString(next.getMetricName())).setBytesUnsafe(1, DATA_POINTS_ROW_KEY_SERIALIZER.toByteBuffer(next)).setConsistencyLevel(clusterConnection.getReadConsistencyLevel()));
                for (Statement statement : clusterConnection.getRowKeyLookupForMetric(next.getMetricName()).createDeleteStatements(next)) {
                    statement.setConsistencyLevel(clusterConnection.getReadConsistencyLevel());
                    clusterConnection.execute(statement);
                }
                if (datastoreMetricQuery.getTags().isEmpty()) {
                    clusterConnection.execute(new BoundStatement(clusterConnection.psRowKeyTimeDelete).setString(0, next.getMetricName()).setString(1, ClusterConnection.DATA_POINTS_TABLE_NAME).setTimestamp(2, new Date(next.getTimestamp())).setConsistencyLevel(clusterConnection.getReadConsistencyLevel()));
                }
                z = true;
            } else if (datastoreMetricQuery.getStartTime() <= timestamp) {
                deletePartialRow(next, timestamp, datastoreMetricQuery.getEndTime(), clusterConnection);
            } else if (datastoreMetricQuery.getEndTime() >= (timestamp + rowWidthInMillis) - 1) {
                deletePartialRow(next, datastoreMetricQuery.getStartTime(), (timestamp + rowWidthInMillis) - 1, clusterConnection);
            } else {
                deletePartialRow(next, datastoreMetricQuery.getStartTime(), datastoreMetricQuery.getEndTime(), clusterConnection);
            }
        }
        if (z2) {
            queryClusters(clusterConnection2 -> {
                BoundStatement boundStatement = new BoundStatement(clusterConnection2.psRowKeyIndexDeleteRow);
                boundStatement.setBytesUnsafe(0, serializeString(datastoreMetricQuery.getName()));
                boundStatement.setConsistencyLevel(clusterConnection2.getReadConsistencyLevel());
                clusterConnection2.executeAsync(boundStatement);
                BoundStatement boundStatement2 = new BoundStatement(clusterConnection2.psStringIndexDelete);
                boundStatement2.setBytesUnsafe(0, serializeString("metric_names"));
                boundStatement2.setBytesUnsafe(1, serializeString(datastoreMetricQuery.getName()));
                boundStatement2.setConsistencyLevel(clusterConnection2.getReadConsistencyLevel());
                clusterConnection2.executeAsync(boundStatement2);
                return null;
            });
            z = true;
            this.m_metricNameCache.clear();
        }
        if (z) {
            this.m_rowKeyCache.clear();
        }
    }

    private SortedMap<String, String> getTags(DataPointRow dataPointRow) {
        TreeMap treeMap = new TreeMap();
        for (String str : dataPointRow.getTagNames()) {
            treeMap.put(str, dataPointRow.getTagValue(str));
        }
        return treeMap;
    }

    public Iterator<DataPointsRowKey> getKeysForQueryIterator(DatastoreMetricQuery datastoreMetricQuery) throws DatastoreException {
        Iterator<DataPointsRowKey> it = null;
        Iterator<QueryPlugin> it2 = datastoreMetricQuery.getPlugins().iterator();
        while (true) {
            if (!it2.hasNext()) {
                break;
            }
            QueryPlugin next = it2.next();
            if (next instanceof CassandraRowKeyPlugin) {
                it = ((CassandraRowKeyPlugin) next).getKeysForQueryIterator(datastoreMetricQuery);
                break;
            }
        }
        if (it != null || datastoreMetricQuery.isExplicitTags()) {
        }
        if (it == null) {
            ArrayList arrayList = new ArrayList();
            if (this.m_writeCluster.containRange(datastoreMetricQuery.getStartTime(), datastoreMetricQuery.getEndTime())) {
                arrayList.add(this.m_rowKeyFilterFactory.create(this.m_writeCluster, datastoreMetricQuery.getName(), datastoreMetricQuery.getStartTime(), datastoreMetricQuery.getEndTime(), datastoreMetricQuery.getTags()));
            }
            for (ClusterConnection clusterConnection : this.m_readClusters) {
                if (clusterConnection.containRange(datastoreMetricQuery.getStartTime(), datastoreMetricQuery.getEndTime())) {
                    arrayList.add(this.m_rowKeyFilterFactory.create(clusterConnection, datastoreMetricQuery.getName(), datastoreMetricQuery.getStartTime(), datastoreMetricQuery.getEndTime(), datastoreMetricQuery.getTags()));
                }
            }
            it = Iterators.concat(arrayList.iterator());
        }
        return it;
    }

    private static int getColumnName(long j, long j2, boolean z) {
        int i = (int) (j2 - j);
        return z ? (i << 1) | 0 : (i << 1) | 1;
    }

    public static boolean isLongValue(int i) {
        return (i & 1) == 0;
    }

    private void printHosts(Iterator<Host> it) {
        StringBuilder sb = new StringBuilder();
        while (it.hasNext()) {
            sb.append(it.next().toString()).append(StringUtils.SPACE);
        }
        System.out.println(sb.toString());
    }
}
