From 596fb790f13ce16b4fec5ebcb2b9652dc8ba6bdb Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Sun, 3 Apr 2016 17:46:28 +0300 Subject: [PATCH 1/5] make use of Postgres notifications Postgres supports a publish-subscribe feature. One client listens on a channel (provided as a string), while another (or the same) client sends a notification. It can be used to send notifications across machines and nginx workers. The notification can be delivered with any read operation. To decouple receiving notifications from reading results of normal queries, a background handler with an independent database socket is created. The handler starts two light threads: a reader and a writer. The reader reads new notifications from the database. The writer pushes commands "listen" and "unlisten" to the database. Signals to/from the background handler are delivered using OpenResty's semaphores. Two new methods of "lapis.db.postgres" (and "lapis.db") were added: * wait(channel) -- waits on the channel until it is notified with post(). Returns a payload passed by caller of post(). * post(channel, payload) -- notifies the channel with the payload. All waiting light threads in an nginx worker share the same database socket. All light threads waiting on the same channel use the same semaphore object, which is triggered by the reader light thread of the background handler. When a new channel starts being listened on, it is passed to the writer light thread of the background handler, which sends the command "listen channel". When a notification is received, the corresponding channel is "unlistened" by the writer light thread and the corresponding semaphore is removed. This is needed to prevent resource leaks in the nginx worker and in the database worker. All methods are 100% non-blocking. Notifications add zero overhead when not used. 
Example: location /wait { default_type text/html; lua_check_client_abort on; content_by_lua ' local channel = ngx.req.get_uri_args().channel while true do ngx.say(require("lapis.db").wait(channel)) ngx.flush() end '; } location /post { default_type text/html; content_by_lua ' local channel = ngx.req.get_uri_args().channel local payload = ngx.req.get_uri_args().payload require("lapis.db").post(channel, payload) '; } $ curl http://localhost:8080/wait?channel=foo In another terminal: $ curl 'http://localhost:25516/post?channel=foo&payload=bar' The first terminal should output "bar". --- lapis/db/postgres.lua | 92 +++++++++++++++++++++++++++++++++++++++++- lapis/db/postgres.moon | 86 ++++++++++++++++++++++++++++++++++++++- 2 files changed, 174 insertions(+), 4 deletions(-) diff --git a/lapis/db/postgres.lua b/lapis/db/postgres.lua index 1b42790e..bb601387 100644 --- a/lapis/db/postgres.lua +++ b/lapis/db/postgres.lua @@ -6,6 +6,7 @@ do type, tostring, pairs, select = _obj_0.type, _obj_0.tostring, _obj_0.pairs, _obj_0.select end local raw_query +local get_handle local logger local FALSE, NULL, TRUE, build_helpers, format_date, is_raw, raw, is_list, list, is_encodable do @@ -47,7 +48,8 @@ local BACKENDS = { local config = require("lapis.config").get() local pg_config = assert(config.postgres, "missing postgres configuration") local pgmoon_conn - return function(str) + local pgmoon_getter + pgmoon_getter = function() local pgmoon = ngx and ngx.ctx.pgmoon or pgmoon_conn if not (pgmoon) then local Postgres @@ -63,6 +65,11 @@ local BACKENDS = { pgmoon_conn = pgmoon end end + return pgmoon + end + local query_impl + query_impl = function(str) + local pgmoon = pgmoon_getter() local start_time if ngx and config.measure_performance then ngx.update_time() @@ -82,6 +89,7 @@ local BACKENDS = { end return res end + return query_impl, pgmoon_getter end } local set_backend @@ -90,7 +98,7 @@ set_backend = function(name, ...) 
if not (backend) then error("Failed to find PostgreSQL backend: " .. tostring(name)) end - raw_query = backend(...) + raw_query, get_handle = backend(...) end local set_raw_query set_raw_query = function(fn) @@ -391,6 +399,84 @@ encode_case = function(exp, t, on_else) append_all(buff, "\nEND") return concat(buff) end +local wait, post +do + local changes_sema + local changes = { } + local queues = { } + local TIMEOUT = 5 * 60 + local change_channel + change_channel = function(action, channel) + table.insert(changes, action .. " " .. escape_identifier(channel)) + return changes_sema:post() + end + local notify + notify = function(channel, payload) + local queue = queues[channel] + if queue then + queues[channel] = nil + change_channel("UNLISTEN", channel) + queue.payload = payload + local sema = queue.sema + return sema:post(-sema:count()) + end + end + local notifier + notifier = function() + init_db() + local handle = get_handle() + local reader + reader = function() + while true do + local operation = handle:wait() + if operation and operation.operation == 'notification' then + notify(operation.channel, operation.payload) + end + end + end + local writer + writer = function() + while true do + changes_sema:wait(TIMEOUT) + for _index_0 = 1, #changes do + local change = changes[_index_0] + assert(handle:post(change)) + end + changes = { } + end + end + local reader_co = ngx.thread.spawn(reader) + local writer_co = ngx.thread.spawn(writer) + return ngx.thread.wait(reader_co, writer_co) + end + post = function(channel, payload) + if payload == nil then + payload = '' + end + return query("NOTIFY " .. escape_identifier(channel) .. ", " .. 
escape_literal(payload)) + end + wait = function(channel) + local semaphore = require("ngx.semaphore") + if not (changes_sema) then + changes_sema = semaphore.new() + ngx.timer.at(0.0, notifier) + end + local queue = queues[channel] + if not (queue) then + queue = { + sema = assert(semaphore.new()) + } + queues[channel] = queue + change_channel("LISTEN", channel) + end + while true do + if queue.sema:wait(TIMEOUT) then + break + end + end + return queue.payload + end +end return { connect = connect, query = query, @@ -416,6 +502,8 @@ return { set_backend = set_backend, set_raw_query = set_raw_query, get_raw_query = get_raw_query, + post = post, + wait = wait, select = _select, insert = _insert, update = _update, diff --git a/lapis/db/postgres.moon b/lapis/db/postgres.moon index 9aa1c3c5..b05fbff5 100644 --- a/lapis/db/postgres.moon +++ b/lapis/db/postgres.moon @@ -2,6 +2,7 @@ import concat from table import type, tostring, pairs, select from _G local raw_query +local get_handle local logger import @@ -42,7 +43,7 @@ BACKENDS = { pg_config = assert config.postgres, "missing postgres configuration" local pgmoon_conn - (str) -> + pgmoon_getter = -> pgmoon = ngx and ngx.ctx.pgmoon or pgmoon_conn unless pgmoon @@ -56,6 +57,11 @@ BACKENDS = { else pgmoon_conn = pgmoon + pgmoon + + query_impl = (str) -> + pgmoon = pgmoon_getter! + start_time = if ngx and config.measure_performance ngx.update_time! ngx.now! @@ -71,6 +77,8 @@ BACKENDS = { if not res and err error "#{str}\n#{err}" res + + return query_impl, pgmoon_getter } set_backend = (name, ...) -> @@ -78,7 +86,7 @@ set_backend = (name, ...) -> unless backend error "Failed to find PostgreSQL backend: #{name}" - raw_query = backend ... + raw_query, get_handle = backend ... 
set_raw_query = (fn) -> raw_query = fn @@ -314,6 +322,78 @@ encode_case = (exp, t, on_else) -> append_all buff, "\nEND" concat buff +local wait, post + +do + local changes_sema + changes = {} + queues = {} + TIMEOUT = 5 * 60 -- 5 minutes + + change_channel = (action, channel) -> + -- push SQL command to "notifier" + table.insert changes, action .. " " .. escape_identifier(channel) + changes_sema\post! + + notify = (channel, payload) -> + queue = queues[channel] + if queue + queues[channel] = nil + change_channel "UNLISTEN", channel + queue.payload = payload + sema = queue.sema + sema\post(-sema\count!) + + notifier = -> + init_db! -- replaces get_handle to default backend + handle = get_handle! + + reader = -> + while true + operation = handle\wait! + if operation and operation.operation == 'notification' + notify operation.channel, operation.payload + + writer = -> + while true + changes_sema\wait TIMEOUT + for change in *changes + assert handle\post change + changes = {} + + reader_co = ngx.thread.spawn reader + writer_co = ngx.thread.spawn writer + ngx.thread.wait reader_co, writer_co + + post = (channel, payload='') -> + query "NOTIFY " .. escape_identifier(channel) .. + ", " .. escape_literal(payload) + + wait = (channel) -> + semaphore = require "ngx.semaphore" + + unless changes_sema + -- start backgroud light thread "notifier" + -- which (un)listens on channels + -- and waits for notifications + changes_sema = semaphore.new! + ngx.timer.at 0.0, notifier + + queue = queues[channel] + unless queue + queue = { + sema: assert semaphore.new! 
+ } + queues[channel] = queue + + change_channel "LISTEN", channel + + while true + if queue.sema\wait TIMEOUT + break + + return queue.payload + { :connect :query, :raw, :is_raw, :list, :is_list, :array, :is_array, :NULL, :TRUE, @@ -327,6 +407,8 @@ encode_case = (exp, t, on_else) -> :set_raw_query :get_raw_query + :post, :wait, + select: _select insert: _insert update: _update From a535625c5207522234dcff0cecfeb5acb0a06e38 Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Sun, 3 Apr 2016 19:21:17 +0300 Subject: [PATCH 2/5] restart notifier Lua handler on socket timeout In general, read operation is limited by settings of the server or firewall. In case of errors in reader or writer, new background Lua handler runs. All channels being listened are re-listened. Timeout of socket is also set to 5 minutes to have a value. --- lapis/db/postgres.lua | 35 ++++++++++++++++++++++++++--------- lapis/db/postgres.moon | 32 +++++++++++++++++++++++++------- 2 files changed, 51 insertions(+), 16 deletions(-) diff --git a/lapis/db/postgres.lua b/lapis/db/postgres.lua index bb601387..ed713202 100644 --- a/lapis/db/postgres.lua +++ b/lapis/db/postgres.lua @@ -423,31 +423,48 @@ do end local notifier notifier = function() - init_db() + if not (get_handle) then + init_db() + end local handle = get_handle() + handle.sock:settimeout(TIMEOUT * 1000) local reader reader = function() - while true do + while handle do local operation = handle:wait() - if operation and operation.operation == 'notification' then + if not (operation) then + handle = nil + changes_sema:post() + return + end + if operation.operation == 'notification' then notify(operation.channel, operation.payload) end end end local writer writer = function() - while true do + while handle do changes_sema:wait(TIMEOUT) - for _index_0 = 1, #changes do - local change = changes[_index_0] - assert(handle:post(change)) + if handle then + for _index_0 = 1, #changes do + local change = changes[_index_0] + if not (handle:post(change)) 
then + handle = nil + return + end + end + changes = { } end - changes = { } end end local reader_co = ngx.thread.spawn(reader) local writer_co = ngx.thread.spawn(writer) - return ngx.thread.wait(reader_co, writer_co) + for channel in pairs(queues) do + change_channel("LISTEN", channel) + end + ngx.thread.wait(reader_co, writer_co) + return ngx.timer.at(0.0, notifier) end post = function(channel, payload) if payload == nil then diff --git a/lapis/db/postgres.moon b/lapis/db/postgres.moon index b05fbff5..92406c63 100644 --- a/lapis/db/postgres.moon +++ b/lapis/db/postgres.moon @@ -345,26 +345,44 @@ do sema\post(-sema\count!) notifier = -> - init_db! -- replaces get_handle to default backend + unless get_handle + init_db! -- replaces get_handle to default backend handle = get_handle! + handle.sock\settimeout(TIMEOUT * 1000) reader = -> - while true + while handle operation = handle\wait! - if operation and operation.operation == 'notification' + unless operation + handle = nil + changes_sema\post! + return + if operation.operation == 'notification' notify operation.channel, operation.payload writer = -> - while true + while handle changes_sema\wait TIMEOUT - for change in *changes - assert handle\post change - changes = {} + if handle + for change in *changes + unless handle\post change + handle = nil + return + changes = {} reader_co = ngx.thread.spawn reader writer_co = ngx.thread.spawn writer + + -- resume channels added before timeout + -- can cause double "LISTEN" which are not an error + for channel in pairs(queues) + change_channel "LISTEN", channel + ngx.thread.wait reader_co, writer_co + -- timeout or other error occurred. Restart + ngx.timer.at 0.0, notifier + post = (channel, payload='') -> query "NOTIFY " .. escape_identifier(channel) .. ", " .. 
escape_literal(payload) From 27639791a37abc905b4538320f59c553639db21d Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Sun, 3 Apr 2016 23:51:08 +0300 Subject: [PATCH 3/5] notifications: cleanup queues periodically In the previous implementation, resources were leaked in case of disconnection of all clients (sema:count() == 0) and no notification on the channel. The entry of "queues" in nginx and the channel listener in Postgres were leaked. Function cleanup_queues was introduced. It checks all queues and removes unused items and unlistens corresponding channels. It is called when reader light thread detects read timeout (after 5 minutes of no notifications) or once in 10000 received notifications. Relistening channels after restart of the background Lua handler was moved to function listen_queues. --- lapis/db/postgres.lua | 35 +++++++++++++++++++++++++++++++---- lapis/db/postgres.moon | 36 +++++++++++++++++++++++++++++------- 2 files changed, 60 insertions(+), 11 deletions(-) diff --git a/lapis/db/postgres.lua b/lapis/db/postgres.lua index ed713202..7dca931c 100644 --- a/lapis/db/postgres.lua +++ b/lapis/db/postgres.lua @@ -405,11 +405,32 @@ do local changes = { } local queues = { } local TIMEOUT = 5 * 60 + local notify_counter = 0 + local CLEANUP_PERIOD = 10000 local change_channel change_channel = function(action, channel) table.insert(changes, action .. " " .. 
escape_identifier(channel)) return changes_sema:post() end + local cleanup_queues + cleanup_queues = function() + local new_queues = { } + for channel, queue in pairs(queues) do + if queue.sema:count() < 0 then + new_queues[channel] = queue + else + change_channel("UNLISTEN", channel) + end + end + queues = new_queues + end + local listen_queues + listen_queues = function() + changes = { } + for channel in pairs(queues) do + change_channel("LISTEN", channel) + end + end local notify notify = function(channel, payload) local queue = queues[channel] @@ -418,7 +439,14 @@ do change_channel("UNLISTEN", channel) queue.payload = payload local sema = queue.sema - return sema:post(-sema:count()) + if sema:count() < 0 then + sema:post(-sema:count()) + end + end + notify_counter = notify_counter + 1 + if notify_counter == CLEANUP_PERIOD then + notify_counter = 0 + return cleanup_queues() end end local notifier @@ -460,10 +488,9 @@ do end local reader_co = ngx.thread.spawn(reader) local writer_co = ngx.thread.spawn(writer) - for channel in pairs(queues) do - change_channel("LISTEN", channel) - end ngx.thread.wait(reader_co, writer_co) + cleanup_queues() + listen_queues() return ngx.timer.at(0.0, notifier) end post = function(channel, payload) diff --git a/lapis/db/postgres.moon b/lapis/db/postgres.moon index 92406c63..234aba2e 100644 --- a/lapis/db/postgres.moon +++ b/lapis/db/postgres.moon @@ -329,12 +329,30 @@ do changes = {} queues = {} TIMEOUT = 5 * 60 -- 5 minutes + notify_counter = 0 + CLEANUP_PERIOD = 10000 change_channel = (action, channel) -> -- push SQL command to "notifier" table.insert changes, action .. " " .. escape_identifier(channel) changes_sema\post! + cleanup_queues = -> + -- remove unused channels + new_queues = {} + for channel, queue in pairs(queues) + if queue.sema\count! 
< 0 + new_queues[channel] = queue + else + change_channel "UNLISTEN", channel + queues = new_queues + + listen_queues = -> + -- resume used channels + changes = {} + for channel in pairs(queues) + change_channel "LISTEN", channel + notify = (channel, payload) -> queue = queues[channel] if queue @@ -342,7 +360,13 @@ do change_channel "UNLISTEN", channel queue.payload = payload sema = queue.sema - sema\post(-sema\count!) + if sema\count! < 0 + sema\post(-sema\count!) + + notify_counter += 1 + if notify_counter == CLEANUP_PERIOD + notify_counter = 0 + cleanup_queues! notifier = -> unless get_handle @@ -373,14 +397,12 @@ do reader_co = ngx.thread.spawn reader writer_co = ngx.thread.spawn writer - -- resume channels added before timeout - -- can cause double "LISTEN" which are not an error - for channel in pairs(queues) - change_channel "LISTEN", channel - ngx.thread.wait reader_co, writer_co - -- timeout or other error occurred. Restart + -- timeout or other error occurred + cleanup_queues! + listen_queues! + -- restart ngx.timer.at 0.0, notifier post = (channel, payload='') -> From 726f49705ee82b47c7631e66921cf6e7b269e1bf Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Thu, 7 Apr 2016 00:00:03 +0300 Subject: [PATCH 4/5] notify: use weak table to find unused channels Instead of checking all channels once in a while, put them to weak table and unlisten in a finalizer. 
--- lapis/db/postgres.lua | 48 +++++++++++++++++++++--------------------- lapis/db/postgres.moon | 39 ++++++++++++++++------------------ 2 files changed, 42 insertions(+), 45 deletions(-) diff --git a/lapis/db/postgres.lua b/lapis/db/postgres.lua index 7dca931c..89b99d10 100644 --- a/lapis/db/postgres.lua +++ b/lapis/db/postgres.lua @@ -404,31 +404,33 @@ do local changes_sema local changes = { } local queues = { } + setmetatable(queues, { + __mode = 'v' + }) local TIMEOUT = 5 * 60 - local notify_counter = 0 - local CLEANUP_PERIOD = 10000 + local set_gc + set_gc = function(t, finalizer) + local mt = { + __gc = finalizer + } + setmetatable(t, mt) + if _G.newproxy then + mt.newproxy = _G.newproxy(true) + getmetatable(mt.newproxy).__gc = finalizer + end + end local change_channel change_channel = function(action, channel) table.insert(changes, action .. " " .. escape_identifier(channel)) return changes_sema:post() end - local cleanup_queues - cleanup_queues = function() - local new_queues = { } - for channel, queue in pairs(queues) do - if queue.sema:count() < 0 then - new_queues[channel] = queue - else - change_channel("UNLISTEN", channel) - end - end - queues = new_queues - end local listen_queues listen_queues = function() changes = { } - for channel in pairs(queues) do - change_channel("LISTEN", channel) + for channel, queue in pairs(queues) do + if queue.sema:count() < 0 then + change_channel("LISTEN", channel) + end end end local notify @@ -436,18 +438,12 @@ do local queue = queues[channel] if queue then queues[channel] = nil - change_channel("UNLISTEN", channel) queue.payload = payload local sema = queue.sema if sema:count() < 0 then - sema:post(-sema:count()) + return sema:post(-sema:count()) end end - notify_counter = notify_counter + 1 - if notify_counter == CLEANUP_PERIOD then - notify_counter = 0 - return cleanup_queues() - end end local notifier notifier = function() @@ -489,7 +485,6 @@ do local reader_co = ngx.thread.spawn(reader) local writer_co = 
ngx.thread.spawn(writer) ngx.thread.wait(reader_co, writer_co) - cleanup_queues() listen_queues() return ngx.timer.at(0.0, notifier) end @@ -510,6 +505,11 @@ do queue = { sema = assert(semaphore.new()) } + set_gc(queue, function() + if not queues[channel] then + return change_channel("UNLISTEN", channel) + end + end) queues[channel] = queue change_channel("LISTEN", channel) end diff --git a/lapis/db/postgres.moon b/lapis/db/postgres.moon index 234aba2e..c7d5e441 100644 --- a/lapis/db/postgres.moon +++ b/lapis/db/postgres.moon @@ -328,46 +328,40 @@ do local changes_sema changes = {} queues = {} + setmetatable queues, { + __mode: 'v' + } TIMEOUT = 5 * 60 -- 5 minutes - notify_counter = 0 - CLEANUP_PERIOD = 10000 + + set_gc = (t, finalizer) -> + mt = {__gc: finalizer} + setmetatable t, mt + if _G.newproxy + -- tables' finalizers don't work in Lua 5.1 + mt.newproxy = _G.newproxy true + getmetatable(mt.newproxy).__gc = finalizer change_channel = (action, channel) -> -- push SQL command to "notifier" table.insert changes, action .. " " .. escape_identifier(channel) changes_sema\post! - cleanup_queues = -> - -- remove unused channels - new_queues = {} - for channel, queue in pairs(queues) - if queue.sema\count! < 0 - new_queues[channel] = queue - else - change_channel "UNLISTEN", channel - queues = new_queues - listen_queues = -> -- resume used channels changes = {} - for channel in pairs(queues) - change_channel "LISTEN", channel + for channel, queue in pairs(queues) + if queue.sema\count! < 0 + change_channel "LISTEN", channel notify = (channel, payload) -> queue = queues[channel] if queue queues[channel] = nil - change_channel "UNLISTEN", channel queue.payload = payload sema = queue.sema if sema\count! < 0 sema\post(-sema\count!) - notify_counter += 1 - if notify_counter == CLEANUP_PERIOD - notify_counter = 0 - cleanup_queues! - notifier = -> unless get_handle init_db! 
-- replaces get_handle to default backend @@ -400,7 +394,6 @@ do ngx.thread.wait reader_co, writer_co -- timeout or other error occurred - cleanup_queues! listen_queues! -- restart ngx.timer.at 0.0, notifier @@ -424,6 +417,10 @@ do queue = { sema: assert semaphore.new! } + set_gc queue, -> + if not queues[channel] + change_channel "UNLISTEN", channel + queues[channel] = queue change_channel "LISTEN", channel From 4ff79a057533f37102783526cbcc345e28cf62c3 Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Thu, 7 Apr 2016 01:00:57 +0300 Subject: [PATCH 5/5] notify: prevent losing notifications Send UNLISTEN command after a channel being unused for some time. --- lapis/db/postgres.lua | 17 ++++++++++++++++- lapis/db/postgres.moon | 15 ++++++++++++++- 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/lapis/db/postgres.lua b/lapis/db/postgres.lua index 89b99d10..b799b5d3 100644 --- a/lapis/db/postgres.lua +++ b/lapis/db/postgres.lua @@ -407,7 +407,9 @@ do setmetatable(queues, { __mode = 'v' }) + local pending_unlisten = { } local TIMEOUT = 5 * 60 + local UNLISTEN_TIMEOUT = 15 local set_gc set_gc = function(t, finalizer) local mt = { @@ -432,6 +434,9 @@ do change_channel("LISTEN", channel) end end + for channel in pairs(pending_unlisten) do + change_channel("LISTEN", channel) + end end local notify notify = function(channel, payload) @@ -507,10 +512,20 @@ do } set_gc(queue, function() if not queues[channel] then - return change_channel("UNLISTEN", channel) + local label = { } + pending_unlisten[channel] = label + return ngx.timer.at(UNLISTEN_TIMEOUT, function() + if pending_unlisten[channel] == label then + pending_unlisten[channel] = nil + if not queues[channel] then + return change_channel("UNLISTEN", channel) + end + end + end) end end) queues[channel] = queue + pending_unlisten[channel] = nil change_channel("LISTEN", channel) end while true do diff --git a/lapis/db/postgres.moon b/lapis/db/postgres.moon index c7d5e441..204fff46 100644 --- 
a/lapis/db/postgres.moon +++ b/lapis/db/postgres.moon @@ -331,7 +331,9 @@ do setmetatable queues, { __mode: 'v' } + pending_unlisten = {} TIMEOUT = 5 * 60 -- 5 minutes + UNLISTEN_TIMEOUT = 15 -- seconds set_gc = (t, finalizer) -> mt = {__gc: finalizer} @@ -352,6 +354,8 @@ do for channel, queue in pairs(queues) if queue.sema\count! < 0 change_channel "LISTEN", channel + for channel in pairs(pending_unlisten) + change_channel "LISTEN", channel notify = (channel, payload) -> queue = queues[channel] @@ -418,10 +422,19 @@ do sema: assert semaphore.new! } set_gc queue, -> + -- Prevent losing notifications. Send UNLISTEN command after + -- a channel being unused for some time. if not queues[channel] - change_channel "UNLISTEN", channel + label = {} + pending_unlisten[channel] = label + ngx.timer.at UNLISTEN_TIMEOUT, -> + if pending_unlisten[channel] == label + pending_unlisten[channel] = nil + if not queues[channel] + change_channel "UNLISTEN", channel queues[channel] = queue + pending_unlisten[channel] = nil change_channel "LISTEN", channel