Created
May 30, 2015 04:35
-
-
Save jeffjirsa/553612b26630b335725c to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java | |
index 15287bd..9d0b086 100644 | |
--- a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java | |
+++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java | |
@@ -123,7 +123,7 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy | |
private List<SSTableReader> getCompactionCandidates(Iterable<SSTableReader> candidateSSTables, long now, int base) | |
{ | |
- Iterable<SSTableReader> candidates = filterOldSSTables(Lists.newArrayList(candidateSSTables), options.maxSSTableAge, now); | |
+ Iterable<SSTableReader> candidates = filterOldSSTables(Lists.newArrayList(candidateSSTables), options.maxSSTableAge, now, options.useMinTimestamp); | |
List<List<SSTableReader>> buckets = getBuckets(createSSTableAndMinTimestampPairs(candidates), options.baseTime, base, now); | |
logger.debug("Compaction buckets are {}", buckets); | |
@@ -158,23 +158,39 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy | |
* Removes all sstables with max timestamp older than maxSSTableAge. | |
* @param sstables all sstables to consider | |
* @param maxSSTableAge the age in milliseconds when an SSTable stops participating in compactions | |
- * @param now current time. SSTables with max timestamp less than (now - maxSSTableAge) are filtered. | |
+ * @param now current time. SSTables with min/max timestamp less than (now - maxSSTableAge) are filtered. | |
+ * @param useMinTimestamp Toggles decision to use sstables' min vs max timestamp when filtering | |
* @return a list of sstables with the oldest sstables excluded | |
*/ | |
@VisibleForTesting | |
- static Iterable<SSTableReader> filterOldSSTables(List<SSTableReader> sstables, long maxSSTableAge, long now) | |
+ static Iterable<SSTableReader> filterOldSSTables(List<SSTableReader> sstables, long maxSSTableAge, long now, boolean useMinTimestamp) | |
{ | |
if (maxSSTableAge == 0) | |
return sstables; | |
final long cutoff = now - maxSSTableAge; | |
- return Iterables.filter(sstables, new Predicate<SSTableReader>() | |
+ if (useMinTimestamp) | |
{ | |
- @Override | |
- public boolean apply(SSTableReader sstable) | |
+ logger.debug("Filtering old sstables using sstable minimum timestamp greater or equal to {}", cutoff); | |
+ return Iterables.filter(sstables, new Predicate<SSTableReader>() | |
{ | |
- return sstable.getMaxTimestamp() >= cutoff; | |
- } | |
- }); | |
+ @Override | |
+ public boolean apply(SSTableReader sstable) | |
+ { | |
+ return sstable.getMinTimestamp() >= cutoff; | |
+ } | |
+ }); | |
+ } | |
+ else | |
+ { | |
+ return Iterables.filter(sstables, new Predicate<SSTableReader>() | |
+ { | |
+ @Override | |
+ public boolean apply(SSTableReader sstable) | |
+ { | |
+ return sstable.getMaxTimestamp() >= cutoff; | |
+ } | |
+ }); | |
+ } | |
} | |
/** | |
diff --git a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyOptions.java b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyOptions.java | |
index f54c020..5e13be8 100644 | |
--- a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyOptions.java | |
+++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyOptions.java | |
@@ -27,12 +27,15 @@ public final class DateTieredCompactionStrategyOptions | |
protected static final TimeUnit DEFAULT_TIMESTAMP_RESOLUTION = TimeUnit.MICROSECONDS; | |
protected static final double DEFAULT_MAX_SSTABLE_AGE_DAYS = 365; | |
protected static final long DEFAULT_BASE_TIME_SECONDS = 60; | |
+ protected static final boolean DEFAULT_USE_MIN_TIMESTAMP = false; | |
protected static final String TIMESTAMP_RESOLUTION_KEY = "timestamp_resolution"; | |
protected static final String MAX_SSTABLE_AGE_KEY = "max_sstable_age_days"; | |
protected static final String BASE_TIME_KEY = "base_time_seconds"; | |
+ protected static final String USE_MIN_TIMESTAMP_KEY = "use_min_timestamp"; | |
protected final long maxSSTableAge; | |
protected final long baseTime; | |
+ protected final boolean useMinTimestamp; | |
public DateTieredCompactionStrategyOptions(Map<String, String> options) | |
{ | |
@@ -43,12 +46,15 @@ public final class DateTieredCompactionStrategyOptions | |
maxSSTableAge = Math.round(fractionalDays * timestampResolution.convert(1, TimeUnit.DAYS)); | |
optionValue = options.get(BASE_TIME_KEY); | |
baseTime = timestampResolution.convert(optionValue == null ? DEFAULT_BASE_TIME_SECONDS : Long.parseLong(optionValue), TimeUnit.SECONDS); | |
+ optionValue = options.get(USE_MIN_TIMESTAMP_KEY); | |
+ useMinTimestamp = optionValue == null ? DEFAULT_USE_MIN_TIMESTAMP : Boolean.parseBoolean(optionValue); | |
} | |
public DateTieredCompactionStrategyOptions() | |
{ | |
maxSSTableAge = Math.round(DEFAULT_MAX_SSTABLE_AGE_DAYS * DEFAULT_TIMESTAMP_RESOLUTION.convert(1, TimeUnit.DAYS)); | |
baseTime = DEFAULT_TIMESTAMP_RESOLUTION.convert(DEFAULT_BASE_TIME_SECONDS, TimeUnit.SECONDS); | |
+ useMinTimestamp = DEFAULT_USE_MIN_TIMESTAMP; | |
} | |
public static Map<String, String> validateOptions(Map<String, String> options, Map<String, String> uncheckedOptions) throws ConfigurationException | |
@@ -92,9 +98,19 @@ public final class DateTieredCompactionStrategyOptions | |
throw new ConfigurationException(String.format("%s is not a parsable int (base10) for %s", optionValue, BASE_TIME_KEY), e); | |
} | |
+ optionValue = options.get(USE_MIN_TIMESTAMP_KEY); | |
+ if(optionValue != null) | |
+ { | |
+ if (!optionValue.equalsIgnoreCase("true") && !optionValue.equalsIgnoreCase("false")) | |
+ { | |
+ throw new ConfigurationException(String.format("%s should either be 'true' or 'false', not %s", USE_MIN_TIMESTAMP_KEY, optionValue)); | |
+ } | |
+ } | |
+ | |
uncheckedOptions.remove(MAX_SSTABLE_AGE_KEY); | |
uncheckedOptions.remove(BASE_TIME_KEY); | |
uncheckedOptions.remove(TIMESTAMP_RESOLUTION_KEY); | |
+ uncheckedOptions.remove(USE_MIN_TIMESTAMP_KEY); | |
return uncheckedOptions; | |
} | |
diff --git a/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java | |
index 14e22f0..3e4d73b 100644 | |
--- a/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java | |
+++ b/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java | |
@@ -261,16 +261,54 @@ public class DateTieredCompactionStrategyTest extends SchemaLoader | |
Iterable<SSTableReader> filtered; | |
List<SSTableReader> sstrs = new ArrayList<>(cfs.getSSTables()); | |
- filtered = filterOldSSTables(sstrs, 0, 2); | |
+ filtered = filterOldSSTables(sstrs, 0, 2, false); | |
assertEquals("when maxSSTableAge is zero, no sstables should be filtered", sstrs.size(), Iterables.size(filtered)); | |
- filtered = filterOldSSTables(sstrs, 1, 2); | |
+ filtered = filterOldSSTables(sstrs, 1, 2, false); | |
assertEquals("only the newest 2 sstables should remain", 2, Iterables.size(filtered)); | |
- filtered = filterOldSSTables(sstrs, 1, 3); | |
+ filtered = filterOldSSTables(sstrs, 1, 3, false); | |
assertEquals("only the newest sstable should remain", 1, Iterables.size(filtered)); | |
- filtered = filterOldSSTables(sstrs, 1, 4); | |
+ filtered = filterOldSSTables(sstrs, 1, 4, false); | |
+ assertEquals("no sstables should remain when all are too old", 0, Iterables.size(filtered)); | |
+ } | |
+ | |
+ @Test | |
+ public void testFilterOldSSTablesByMinKey() | |
+ { | |
+ Keyspace keyspace = Keyspace.open(KEYSPACE1); | |
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1); | |
+ cfs.truncateBlocking(); | |
+ cfs.disableAutoCompaction(); | |
+ | |
+ ByteBuffer value = ByteBuffer.wrap(new byte[100]); | |
+ | |
+ // create 3 sstables | |
+ int numSSTables = 3; | |
+ for (int r = 0; r < numSSTables; r++) | |
+ { | |
+ DecoratedKey key = Util.dk(String.valueOf(r)); | |
+ Mutation rm = new Mutation(KEYSPACE1, key.getKey()); | |
+ rm.add(CF_STANDARD1, Util.cellname("column"), value, r); | |
+ rm.apply(); | |
+ cfs.forceBlockingFlush(); | |
+ } | |
+ cfs.forceBlockingFlush(); | |
+ | |
+ Iterable<SSTableReader> filtered; | |
+ List<SSTableReader> sstrs = new ArrayList<>(cfs.getSSTables()); | |
+ | |
+ filtered = filterOldSSTables(sstrs, 0, 2, true); | |
+ assertEquals("when maxSSTableAge is zero, no sstables should be filtered", sstrs.size(), Iterables.size(filtered)); | |
+ | |
+ filtered = filterOldSSTables(sstrs, 1, 2, true); | |
+ assertEquals("only the newest 2 sstables should remain", 2, Iterables.size(filtered)); | |
+ | |
+ filtered = filterOldSSTables(sstrs, 1, 3, true); | |
+ assertEquals("only the newest sstable should remain", 1, Iterables.size(filtered)); | |
+ | |
+ filtered = filterOldSSTables(sstrs, 1, 4, true); | |
assertEquals("no sstables should remain when all are too old", 0, Iterables.size(filtered)); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment