diff --git a/README.md b/README.md index ac38265..b461661 100644 --- a/README.md +++ b/README.md @@ -263,7 +263,7 @@ This library provides several Lua modules to help OpenResty/ngx_lua users to con limit the traffic, either request rate or request concurrency (or both). -* [resty.limit.req](lib/resty/limit/req.md) provides request rate limiting and adjustment based on the "leaky bucket" method. +* [resty.limit.req](lib/resty/limit/req.md) provides request rate limiting and adjustment based on the "leaky bucket" + "sliding window" method. * [resty.limit.count](lib/resty/limit/count.md) provides rate limiting based on a "fixed window" implementation since OpenResty 1.13.6.1+. * [resty.limit.conn](lib/resty/limit/conn.md) provides request concurrency level limiting and adjustment based on extra delays. * [resty.limit.traffic](lib/resty/limit/traffic.md) provides an aggregator to combine multiple instances of the [resty.limit.req](lib/resty/limit/req.md), [resty.limit.count](lib/resty/limit/count.md), or [resty.limit.conn](lib/resty/limit/conn.md) classes (or all). diff --git a/lib/resty/limit/conn.lua b/lib/resty/limit/conn.lua index bf0e7ca..3b2086e 100644 --- a/lib/resty/limit/conn.lua +++ b/lib/resty/limit/conn.lua @@ -12,7 +12,12 @@ local floor = math.floor local ngx_shared = ngx.shared local assert = assert - +---@class resty.limit.conn +---@field dict ngx.shared.DICT +---@field max number +---@field burst number +---@field unit_delay number +---@field committed boolean? local _M = { _VERSION = '0.09' } @@ -22,7 +27,11 @@ local mt = { __index = _M } - +---@param dict_name string +---@param max integer +---@param burst integer +---@param default_conn_delay number +---@return table?, string? function _M.new(dict_name, max, burst, default_conn_delay) local dict = ngx_shared[dict_name] if not dict then @@ -33,21 +42,24 @@ function _M.new(dict_name, max, burst, default_conn_delay) local self = { dict = dict, - max = max + 0, -- just to ensure the param is good + max = max + 0, -- just to ensure the param is good burst = burst, unit_delay = default_conn_delay, } - return setmetatable(self, mt) + return setmetatable(self, mt), nil end - +---@param key string|number +---@param commit boolean +---@return number?, number|((ngx.shared.DICT.error)?) function _M.incoming(self, key, commit) local dict = self.dict local max = self.max self.committed = false + ---@type integer?, (ngx.shared.DICT.error)? local conn, err if commit then conn, err = dict:incr(key, 1, 0) @@ -63,7 +75,6 @@ function _M.incoming(self, key, commit) return nil, "rejected" end self.committed = true - else conn = (dict:get(key) or 0) + 1 if conn > max + self.burst then @@ -80,12 +91,14 @@ function _M.incoming(self, key, commit) return 0, conn end - +---@return boolean? function _M.is_committed(self) return self.committed end - +---@param key string|number +---@param req_latency number? +---@return integer?, (ngx.shared.DICT.error)? function _M.leaving(self, key, req_latency) assert(key) local dict = self.dict @@ -103,7 +116,8 @@ function _M.leaving(self, key, req_latency) return conn end - +---@param key string|number +---@return integer?, (ngx.shared.DICT.error)?, boolean function _M.uncommit(self, key) assert(key) local dict = self.dict @@ -111,15 +125,14 @@ function _M.uncommit(self, key) return dict:incr(key, -1) end - -function _M.set_conn(self, conn) - self.max = conn +---@param max_conn integer +function _M.set_conn(self, max_conn) + self.max = max_conn end - +---@param burst integer function _M.set_burst(self, burst) self.burst = burst end - return _M diff --git a/lib/resty/limit/count.lua b/lib/resty/limit/count.lua index c75e758..0d5ed77 100644 --- a/lib/resty/limit/count.lua +++ b/lib/resty/limit/count.lua @@ -6,8 +6,12 @@ local setmetatable = setmetatable local assert = assert +---@class resty.limit.count +---@field dict ngx.shared.DICT +---@field limit integer +---@field window integer local _M = { - _VERSION = '0.09' + _VERSION = '0.09' } @@ -15,6 +19,7 @@ local mt = { __index = _M } +---@type boolean local incr_support_init_ttl local ngx_config = ngx.config local ngx_lua_v = ngx.config.ngx_lua_version @@ -25,8 +30,12 @@ else end --- the "limit" argument controls number of request allowed in a time window. --- time "window" argument controls the time window in seconds. +--- the "limit" argument controls number of request allowed in a time window. +--- time "window" argument controls the time window in seconds. +---@param dict_name string +---@param limit integer +---@param window integer +---@return resty.limit.count?, string? function _M.new(dict_name, limit, window) local dict = ngx_shared[dict_name] if not dict then @@ -41,16 +50,21 @@ function _M.new(dict_name, limit, window) window = window, } - return setmetatable(self, mt) + return setmetatable(self, mt), nil end -- incoming function using incr with init_ttl -- need OpenResty version > v0.10.12rc2 +---@param self resty.limit.count +---@param key string|number +---@param commit boolean +---@return integer?, number|((ngx.shared.DICT.error)?) local function incoming_new(self, key, commit) local dict = self.dict local limit = self.limit local window = self.window + ---@type integer?, (ngx.shared.DICT.error)? local remaining, err if commit then @@ -70,11 +84,16 @@ local function incoming_new(self, key, commit) end -- incoming function using incr and expire +---@param self resty.limit.count +---@param key string|number +---@param commit boolean +---@return integer?, number|((ngx.shared.DICT.error)?) local function incoming_old(self, key, commit) local dict = self.dict local limit = self.limit local window = self.window + ---@type integer?, boolean?, (ngx.shared.DICT.error)? local remaining, ok, err if commit then @@ -96,13 +115,11 @@ local function incoming_old(self, key, commit) if not ok then return nil, err end - else return nil, err end end end - else remaining = (dict:get(key) or limit) - 1 end @@ -117,6 +134,9 @@ end _M.incoming = incr_support_init_ttl and incoming_new or incoming_old -- uncommit remaining and return remaining value +---@param self resty.limit.count +---@param key string|number +---@return integer?, (ngx.shared.DICT.error)? function _M.uncommit(self, key) assert(key) local dict = self.dict @@ -134,5 +154,4 @@ function _M.uncommit(self, key) return remaining end - return _M diff --git a/lib/resty/limit/req.lua b/lib/resty/limit/req.lua index 229462d..181a095 100644 --- a/lib/resty/limit/req.lua +++ b/lib/resty/limit/req.lua @@ -4,39 +4,23 @@ -- module. -local ffi = require "ffi" local math = require "math" - local ngx_shared = ngx.shared local ngx_now = ngx.now local setmetatable = setmetatable -local ffi_cast = ffi.cast -local ffi_str = ffi.string -local abs = math.abs -local tonumber = tonumber local type = type local assert = assert -local max = math.max - --- TODO: we could avoid the tricky FFI cdata when lua_shared_dict supports --- hash-typed values as in redis. -ffi.cdef[[ - struct lua_resty_limit_req_rec { - unsigned long excess; - uint64_t last; /* time in milliseconds */ - /* integer value, 1 corresponds to 0.001 r/s */ - }; -]] -local const_rec_ptr_type = ffi.typeof("const struct lua_resty_limit_req_rec*") -local rec_size = ffi.sizeof("struct lua_resty_limit_req_rec") --- we can share the cdata here since we only need it temporarily for --- serialization inside the shared dict: -local rec_cdata = ffi.new("struct lua_resty_limit_req_rec") +--- store 3 seconds of request count history +local KEY_TTL = 3 +---@class resty.limit.req +---@field dict ngx.shared.DICT +---@field rate number +---@field burst number local _M = { _VERSION = '0.09' } @@ -47,6 +31,10 @@ local mt = { } +---@param dict_name string +---@param rate number +---@param burst number +---@return resty.limit.req?, string? function _M.new(dict_name, rate, burst) local dict = ngx_shared[dict_name] if not dict then @@ -57,97 +45,98 @@ function _M.new(dict_name, rate, burst) local self = { dict = dict, - rate = rate * 1000, - burst = burst * 1000, + rate = rate, + burst = burst, } - return setmetatable(self, mt) + return setmetatable(self, mt), nil end - -- sees an new incoming event -- the "commit" argument controls whether should we record the event in shm. --- FIXME we have a (small) race-condition window between dict:get() and --- dict:set() across multiple nginx worker processes. The size of the --- window is proportional to the number of workers. +---@param self resty.limit.req +---@param key string|number +---@param commit boolean function _M.incoming(self, key, commit) local dict = self.dict local rate = self.rate - local now = ngx_now() * 1000 - - local excess - - -- it's important to anchor the string value for the read-only pointer - -- cdata: - local v = dict:get(key) - if v then - if type(v) ~= "string" or #v ~= rec_size then - return nil, "shdict abused by other users" - end - local rec = ffi_cast(const_rec_ptr_type, v) - local elapsed = now - tonumber(rec.last) + local now_sec = ngx_now() + local now_ms = now_sec * 1000.0 + local current_second = math.floor(now_sec) + local current_second_key = key .. ":" .. tostring(current_second) + local previous_second_key = key .. ":" .. tostring(current_second - 1) + + local prev_req_count = dict:get(previous_second_key) + if not prev_req_count or type(prev_req_count) ~= "number" then + prev_req_count = 0 + end - -- print("elapsed: ", elapsed, "ms") + local curr_req_count = dict:incr(current_second_key, 1, 0, KEY_TTL) + if not curr_req_count then + --- something is really wrong + --- possibly oom in the shared dict + return nil, "failed to increment request count" + end - -- we do not handle changing rate values specifically. the excess value - -- can get automatically adjusted by the following formula with new rate - -- values rather quickly anyway. - excess = max(tonumber(rec.excess) - rate * abs(elapsed) / 1000 + 1000, - 0) + --- now check if we are within limits + local elapsed = now_ms - (current_second * 1000.0) - -- print("excess: ", excess) + -- sliding window approach + -- we assume that requests were uniformly distributed in the last second + local sliding_window_rate = curr_req_count + (prev_req_count * (1000.0 - elapsed) / 1000.0) - if excess > self.burst then - return nil, "rejected" - end + local to_reject = false + local to_delay = false + local delay_ms = 0 - else - excess = 0 + if sliding_window_rate > (rate + self.burst) then + to_reject = true + elseif sliding_window_rate > rate then + to_delay = true + delay_ms = (sliding_window_rate * 1000 / rate) - 1000 end - if commit then - rec_cdata.excess = excess - rec_cdata.last = now - dict:set(key, ffi_str(rec_cdata, rec_size)) + if not commit then + dict:incr(current_second_key, -1) end - -- return the delay in seconds, as well as excess - return excess / rate, excess / 1000 + if to_reject then + return nil, "rejected" + elseif to_delay then + return delay_ms / 1000.0, sliding_window_rate - rate + else + return 0, 0 + end end - +---@param self resty.limit.req +---@param key string|number +---@return boolean?, string? function _M.uncommit(self, key) assert(key) local dict = self.dict - local v = dict:get(key) - if not v then - return nil, "not found" - end + local now_sec = ngx_now() + local current_second = math.floor(now_sec) + local current_second_key = key .. ":" .. tostring(current_second) - if type(v) ~= "string" or #v ~= rec_size then - return nil, "shdict abused by other users" + local curr_req_count, err = dict:incr(current_second_key, -1, 1, KEY_TTL) + if not curr_req_count then + return nil, err end - - local rec = ffi_cast(const_rec_ptr_type, v) - - local excess = max(tonumber(rec.excess) - 1000, 0) - - rec_cdata.excess = excess - rec_cdata.last = rec.last - dict:set(key, ffi_str(rec_cdata, rec_size)) return true end - +---@param self resty.limit.req +---@param rate number function _M.set_rate(self, rate) - self.rate = rate * 1000 + self.rate = rate end - +---@param self resty.limit.req +---@param burst number function _M.set_burst(self, burst) - self.burst = burst * 1000 + self.burst = burst end - return _M diff --git a/lib/resty/limit/req.md b/lib/resty/limit/req.md index c2b040f..65ba6d7 100644 --- a/lib/resty/limit/req.md +++ b/lib/resty/limit/req.md @@ -91,7 +91,7 @@ Description =========== This module provides APIs to help the OpenResty/ngx_lua user programmers limit request -rate using the "leaky bucket" method. +rate using the "leaky bucket" + "sliding window" method. If you want to use multiple different instances of this class at once or use one instance of this class with instances of other classes (like [resty.limit.conn](./conn.md)), @@ -195,8 +195,8 @@ uncommit -------- **syntax:** `ok, err = obj:uncommit(key)` -This tries to undo the commit of the `incoming` call. This is simply an approximation -and should be used with care. This method is mainly for being used in the [resty.limit.traffic](./traffic.md) +This tries to undo the commit of the `incoming` call. +This method is mainly for being used in the [resty.limit.traffic](./traffic.md) Lua module when combining multiple limiters at the same time. [Back to TOC](#table-of-contents) diff --git a/lib/resty/limit/traffic.lua b/lib/resty/limit/traffic.lua index 6333499..8c179b6 100644 --- a/lib/resty/limit/traffic.lua +++ b/lib/resty/limit/traffic.lua @@ -8,6 +8,7 @@ local max = math.max +---@class resty.limit.traffic local _M = { _VERSION = '0.09' } @@ -18,6 +19,9 @@ local _M = { -- is the "excess" value (i.e., the number of excessive requests each second), -- and for resty.limit.conn, the state is the current concurrency level -- (including the current new connection). +---@param limiters resty.limit.req[]|resty.limit.conn[]|resty.limit.count[] +---@param keys string[]|number[] +---@param states (number|((ngx.shared.DICT.error)?))[] function _M.combine(limiters, keys, states) local n = #limiters local max_delay = 0 @@ -54,5 +58,4 @@ function _M.combine(limiters, keys, states) return max_delay end - return _M diff --git a/t/req.t b/t/req.t index e0f4428..8cf04c7 100644 --- a/t/req.t +++ b/t/req.t @@ -42,19 +42,23 @@ $::HttpConfig local uri = ngx.var.uri for i = 1, 80 do local delay, err = lim:incoming(uri, true) + ngx.say("i=", i, ", delay=", delay) if not delay then ngx.say("failed to limit request: ", err) return end - ngx.sleep(delay) end ngx.say("elapsed: ", ngx.now() - begin, " sec.") '; } --- request GET /t ---- response_body_like eval -qr/^elapsed: 1\.9[6-9]\d* sec\.$/ +--- response_body_like +.*i=57, delay=0.425.* +--- response_body_like +.*i=67, delay=0.675.* +--- response_body_like +i=80, delay=1 --- no_error_log [error] [lua] @@ -74,7 +78,7 @@ $::HttpConfig content_by_lua ' local limit_req = require "resty.limit.req" ngx.shared.store:flush_all() - local lim = limit_req.new("store", 2, 10) + local lim = limit_req.new("store", 1, 10) local delay1, excess1 = lim:incoming("foo", true) local delay2, excess2 = lim:incoming("foo", true) local delay3 = lim:incoming("bar", true) @@ -92,10 +96,10 @@ $::HttpConfig --- response_body delay1: 0 excess1: 0 -delay2: 0.5 +delay2: 1 excess2: 1 delay3: 0 -delay4: 0.5 +delay4: 1 --- no_error_log [error] [lua] @@ -134,9 +138,9 @@ $::HttpConfig --- request GET /t --- response_body -2: error: rejected 3: error: rejected 4: error: rejected +5: error: rejected --- no_error_log [error] [lua] @@ -171,82 +175,16 @@ $::HttpConfig GET /t --- response_body delay: 0 +delay: 0 +delay: 0.5 delay: 0.5 -delay: 1 -delay: 1 ---- no_error_log -[error] -[lua] - - - -=== TEST 5: bad value in shdict (integer type) ---- http_config eval -" -$::HttpConfig - - lua_shared_dict store 1m; -" ---- config - location = /t { - content_by_lua ' - local limit_req = require "resty.limit.req" - ngx.shared.store:flush_all() - local key = "bar" - ngx.shared.store:set("bar", 32) - local lim = limit_req.new("store", 2, 10) - local delay, err = lim:incoming(key, true) - if not delay then - ngx.say("failed to limit request: ", err) - else - ngx.say("delay: ", delay) - end - '; - } ---- request - GET /t ---- response_body -failed to limit request: shdict abused by other users ---- no_error_log -[error] -[lua] - - - -=== TEST 6: bad value in shdict (string type, and wrong size) ---- http_config eval -" -$::HttpConfig - - lua_shared_dict store 1m; -" ---- config - location = /t { - content_by_lua ' - local limit_req = require "resty.limit.req" - ngx.shared.store:flush_all() - local key = "bar" - ngx.shared.store:set("bar", "a") - local lim = limit_req.new("store", 2, 10) - local delay, err = lim:incoming(key, true) - if not delay then - ngx.say("failed to limit request: ", err) - else - ngx.say("delay: ", delay) - end - '; - } ---- request - GET /t ---- response_body -failed to limit request: shdict abused by other users --- no_error_log [error] [lua] -=== TEST 7: a single key (commit & uncommit) +=== TEST 5: a single key (commit & uncommit) --- http_config eval " $::HttpConfig @@ -261,10 +199,17 @@ $::HttpConfig local lim = limit_req.new("store", 40, 40) local begin = ngx.now() local uri = ngx.var.uri - for i = 1, 5 do + for i = 1, 40 do + local delay, err = lim:incoming(uri, true) + if not delay or delay ~= 0 then + ngx.say("failed to allow request ", i, ": ", err) + return + end + end + for i = 41, 80 do local delay, err = lim:incoming(uri, true) if not delay then - ngx.say("failed to limit request: ", err) + ngx.say("failed to delay request ", i, ": ", err) return end ngx.say(i, ": delay: ", delay) @@ -272,6 +217,7 @@ $::HttpConfig local ok, err = lim:uncommit(uri) if not ok then ngx.say("failed to uncommit: ", err) + return end -- ]] end @@ -279,12 +225,31 @@ $::HttpConfig } --- request GET /t ---- response_body +--- response_body_like 1: delay: 0 -2: delay: 0.025 -3: delay: 0.025 -4: delay: 0.025 -5: delay: 0.025 +2: delay: 0 +3: delay: 0 +4: delay: 0 +5: delay: 0 +--- response_body_like +36: delay: 0 +37: delay: 0 +38: delay: 0 +39: delay: 0 +40: delay: 0 +--- response_body_like +41: delay: 0.025 +42: delay: 0.025 +43: delay: 0.025 +--- response_body_like +62: delay: 0.025 +63: delay: 0.025 +64: delay: 0.025 +65: delay: 0.025 +--- response_body_like +78: delay: 0.025 +79: delay: 0.025 +80: delay: 0.025 --- no_error_log [error] [lua]