diff --git a/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java b/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java index 66d4469bf9..1a94f4d453 100644 --- a/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java +++ b/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java @@ -151,6 +151,8 @@ public class HBaseStorage extends LoadFunc implements StoreFuncInterface, LoadPu private final long limit_; private final int caching_; private final boolean noWAL_; + private final long minTimeRange; + private final long maxTimeRange; protected transient byte[] gt_; protected transient byte[] gte_; @@ -176,6 +178,8 @@ private static void populateValidOptions() { validOptions_.addOption("caster", true, "Caster to use for converting values. A class name, " + "HBaseBinaryConverter, or Utf8StorageConverter. For storage, casters must implement LoadStoreCaster."); validOptions_.addOption("noWAL", false, "Sets the write ahead to false for faster loading. To be used with extreme caution since this could result in data loss (see http://hbase.apache.org/book.html#perf.hbase.client.putwal)."); + validOptions_.addOption("minTimeRange", true, "Timestamp most be greater then this value"); + validOptions_.addOption("maxTimeRange", true, "Timestamp must be less then this value"); } /** @@ -214,6 +218,8 @@ public HBaseStorage(String columnList) throws ParseException, IOException { *
  • -ignoreWhitespace=(true|false) ignore spaces when parsing column names (default true) *
  • -caching=numRows number of rows to cache (faster scans, more memory). *
  • -noWAL=(true|false) Sets the write ahead to false for faster loading. + *
  • -minTimeRange= min Scan TimeRagne + *
  • -maxTimeRange= max Scan TimeRange * To be used with extreme caution, since this could result in data loss * (see http://hbase.apache.org/book.html#perf.hbase.client.putwal). * @@ -227,7 +233,7 @@ public HBaseStorage(String columnList, String optString) throws ParseException, configuredOptions_ = parser_.parse(validOptions_, optsArr); } catch (ParseException e) { HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp( "[-loadKey] [-gt] [-gte] [-lt] [-lte] [-columnPrefix] [-caching] [-caster] [-noWAL] [-limit] [-delim] [-ignoreWhitespace]", validOptions_ ); + formatter.printHelp( "[-loadKey] [-gt] [-gte] [-lt] [-lte] [-columnPrefix] [-caching] [-caster] [-noWAL] [-limit] [-delim] [-ignoreWhitespace] [-minTimeRange] [-maxTimeRange]", validOptions_ ); throw e; } @@ -245,6 +251,8 @@ public HBaseStorage(String columnList, String optString) throws ParseException, ignoreWhitespace_ = false; } } + + columnInfo_ = parseColumnList(columnList, delimiter_, ignoreWhitespace_); @@ -271,6 +279,21 @@ public HBaseStorage(String columnList, String optString) throws ParseException, caching_ = Integer.valueOf(configuredOptions_.getOptionValue("caching", "100")); limit_ = Long.valueOf(configuredOptions_.getOptionValue("limit", "-1")); noWAL_ = configuredOptions_.hasOption("noWAL"); + + if (configuredOptions_.hasOption("minTimeRange")){ + minTimeRange = Long.parseLong(configuredOptions_.getOptionValue("minTimeRange")); + }else + { + minTimeRange = Long.MIN_VALUE; + } + + if (configuredOptions_.hasOption("maxTimeRange")){ + maxTimeRange = Long.parseLong(configuredOptions_.getOptionValue("maxTimeRange")); + }else + { + maxTimeRange = Long.MIN_VALUE; + } + initScan(); } @@ -327,7 +350,7 @@ private List parseColumnList(String columnList, return columnInfo; } - private void initScan() { + private void initScan() throws IOException { scan = new Scan(); // Map-reduce jobs should not run with cacheBlocks @@ -350,6 +373,9 @@ private void initScan() { lte_ = Bytes.toBytesBinary(Utils.slashisize(configuredOptions_.getOptionValue("lte"))); addRowFilter(CompareOp.LESS_OR_EQUAL, lte_); } + if (configuredOptions_.hasOption("minTimeRange") || configuredOptions_.hasOption("maxTimeRange")){ + scan.setTimeRange(minTimeRange, maxTimeRange); + } // apply any column filters FilterList allColumnFilters = null;