diff --git a/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java b/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
index 66d4469bf9..1a94f4d453 100644
--- a/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
+++ b/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
@@ -151,6 +151,8 @@ public class HBaseStorage extends LoadFunc implements StoreFuncInterface, LoadPu
private final long limit_;
private final int caching_;
private final boolean noWAL_;
+ private final long minTimeRange;
+ private final long maxTimeRange;
protected transient byte[] gt_;
protected transient byte[] gte_;
@@ -176,6 +178,8 @@ private static void populateValidOptions() {
validOptions_.addOption("caster", true, "Caster to use for converting values. A class name, " +
"HBaseBinaryConverter, or Utf8StorageConverter. For storage, casters must implement LoadStoreCaster.");
validOptions_.addOption("noWAL", false, "Sets the write ahead to false for faster loading. To be used with extreme caution since this could result in data loss (see http://hbase.apache.org/book.html#perf.hbase.client.putwal).");
+ validOptions_.addOption("minTimeRange", true, "Timestamp most be greater then this value");
+ validOptions_.addOption("maxTimeRange", true, "Timestamp must be less then this value");
}
/**
@@ -214,6 +218,8 @@ public HBaseStorage(String columnList) throws ParseException, IOException {
*
-ignoreWhitespace=(true|false) ignore spaces when parsing column names (default true)
* -caching=numRows number of rows to cache (faster scans, more memory).
* -noWAL=(true|false) Sets the write ahead to false for faster loading.
+ * -minTimeRange= min Scan TimeRagne
+ * -maxTimeRange= max Scan TimeRange
* To be used with extreme caution, since this could result in data loss
* (see http://hbase.apache.org/book.html#perf.hbase.client.putwal).
*
@@ -227,7 +233,7 @@ public HBaseStorage(String columnList, String optString) throws ParseException,
configuredOptions_ = parser_.parse(validOptions_, optsArr);
} catch (ParseException e) {
HelpFormatter formatter = new HelpFormatter();
- formatter.printHelp( "[-loadKey] [-gt] [-gte] [-lt] [-lte] [-columnPrefix] [-caching] [-caster] [-noWAL] [-limit] [-delim] [-ignoreWhitespace]", validOptions_ );
+ formatter.printHelp( "[-loadKey] [-gt] [-gte] [-lt] [-lte] [-columnPrefix] [-caching] [-caster] [-noWAL] [-limit] [-delim] [-ignoreWhitespace] [-minTimeRange] [-maxTimeRange]", validOptions_ );
throw e;
}
@@ -245,6 +251,8 @@ public HBaseStorage(String columnList, String optString) throws ParseException,
ignoreWhitespace_ = false;
}
}
+
+
columnInfo_ = parseColumnList(columnList, delimiter_, ignoreWhitespace_);
@@ -271,6 +279,21 @@ public HBaseStorage(String columnList, String optString) throws ParseException,
caching_ = Integer.valueOf(configuredOptions_.getOptionValue("caching", "100"));
limit_ = Long.valueOf(configuredOptions_.getOptionValue("limit", "-1"));
noWAL_ = configuredOptions_.hasOption("noWAL");
+
+ if (configuredOptions_.hasOption("minTimeRange")){
+ minTimeRange = Long.parseLong(configuredOptions_.getOptionValue("minTimeRange"));
+ }else
+ {
+ minTimeRange = Long.MIN_VALUE;
+ }
+
+ if (configuredOptions_.hasOption("maxTimeRange")){
+ maxTimeRange = Long.parseLong(configuredOptions_.getOptionValue("maxTimeRange"));
+ }else
+ {
+ maxTimeRange = Long.MIN_VALUE;
+ }
+
initScan();
}
@@ -327,7 +350,7 @@ private List parseColumnList(String columnList,
return columnInfo;
}
- private void initScan() {
+ private void initScan() throws IOException {
scan = new Scan();
// Map-reduce jobs should not run with cacheBlocks
@@ -350,6 +373,9 @@ private void initScan() {
lte_ = Bytes.toBytesBinary(Utils.slashisize(configuredOptions_.getOptionValue("lte")));
addRowFilter(CompareOp.LESS_OR_EQUAL, lte_);
}
+ if (configuredOptions_.hasOption("minTimeRange") || configuredOptions_.hasOption("maxTimeRange")){
+ scan.setTimeRange(minTimeRange, maxTimeRange);
+ }
// apply any column filters
FilterList allColumnFilters = null;