diff --git a/lapis/db/postgres.lua b/lapis/db/postgres.lua index 1b42790e..b799b5d3 100644 --- a/lapis/db/postgres.lua +++ b/lapis/db/postgres.lua @@ -6,6 +6,7 @@ do type, tostring, pairs, select = _obj_0.type, _obj_0.tostring, _obj_0.pairs, _obj_0.select end local raw_query +local get_handle local logger local FALSE, NULL, TRUE, build_helpers, format_date, is_raw, raw, is_list, list, is_encodable do @@ -47,7 +48,8 @@ local BACKENDS = { local config = require("lapis.config").get() local pg_config = assert(config.postgres, "missing postgres configuration") local pgmoon_conn - return function(str) + local pgmoon_getter + pgmoon_getter = function() local pgmoon = ngx and ngx.ctx.pgmoon or pgmoon_conn if not (pgmoon) then local Postgres @@ -63,6 +65,11 @@ local BACKENDS = { pgmoon_conn = pgmoon end end + return pgmoon + end + local query_impl + query_impl = function(str) + local pgmoon = pgmoon_getter() local start_time if ngx and config.measure_performance then ngx.update_time() @@ -82,6 +89,7 @@ local BACKENDS = { end return res end + return query_impl, pgmoon_getter end } local set_backend @@ -90,7 +98,7 @@ set_backend = function(name, ...) if not (backend) then error("Failed to find PostgreSQL backend: " .. tostring(name)) end - raw_query = backend(...) + raw_query, get_handle = backend(...) end local set_raw_query set_raw_query = function(fn) @@ -391,6 +399,143 @@ encode_case = function(exp, t, on_else) append_all(buff, "\nEND") return concat(buff) end +local wait, post +do + local changes_sema + local changes = { } + local queues = { } + setmetatable(queues, { + __mode = 'v' + }) + local pending_unlisten = { } + local TIMEOUT = 5 * 60 + local UNLISTEN_TIMEOUT = 15 + local set_gc + set_gc = function(t, finalizer) + local mt = { + __gc = finalizer + } + setmetatable(t, mt) + if _G.newproxy then + mt.newproxy = _G.newproxy(true) + getmetatable(mt.newproxy).__gc = finalizer + end + end + local change_channel + change_channel = function(action, channel) + table.insert(changes, action .. " " .. escape_identifier(channel)) + return changes_sema:post() + end + local listen_queues + listen_queues = function() + changes = { } + for channel, queue in pairs(queues) do + if queue.sema:count() < 0 then + change_channel("LISTEN", channel) + end + end + for channel in pairs(pending_unlisten) do + change_channel("LISTEN", channel) + end + end + local notify + notify = function(channel, payload) + local queue = queues[channel] + if queue then + queues[channel] = nil + queue.payload = payload + local sema = queue.sema + if sema:count() < 0 then + return sema:post(-sema:count()) + end + end + end + local notifier + notifier = function() + if not (get_handle) then + init_db() + end + local handle = get_handle() + handle.sock:settimeout(TIMEOUT * 1000) + local reader + reader = function() + while handle do + local operation = handle:wait() + if not (operation) then + handle = nil + changes_sema:post() + return + end + if operation.operation == 'notification' then + notify(operation.channel, operation.payload) + end + end + end + local writer + writer = function() + while handle do + changes_sema:wait(TIMEOUT) + if handle then + for _index_0 = 1, #changes do + local change = changes[_index_0] + if not (handle:post(change)) then + handle = nil + return + end + end + changes = { } + end + end + end + local reader_co = ngx.thread.spawn(reader) + local writer_co = ngx.thread.spawn(writer) + ngx.thread.wait(reader_co, writer_co) + listen_queues() + return ngx.timer.at(0.0, notifier) + end + post = function(channel, payload) + if payload == nil then + payload = '' + end + return query("NOTIFY " .. escape_identifier(channel) .. ", " .. escape_literal(payload)) + end + wait = function(channel) + local semaphore = require("ngx.semaphore") + if not (changes_sema) then + changes_sema = semaphore.new() + ngx.timer.at(0.0, notifier) + end + local queue = queues[channel] + if not (queue) then + queue = { + sema = assert(semaphore.new()) + } + set_gc(queue, function() + if not queues[channel] then + local label = { } + pending_unlisten[channel] = label + return ngx.timer.at(UNLISTEN_TIMEOUT, function() + if pending_unlisten[channel] == label then + pending_unlisten[channel] = nil + if not queues[channel] then + return change_channel("UNLISTEN", channel) + end + end + end) + end + end) + queues[channel] = queue + pending_unlisten[channel] = nil + change_channel("LISTEN", channel) + end + while true do + if queue.sema:wait(TIMEOUT) then + break + end + end + return queue.payload + end +end return { connect = connect, query = query, @@ -416,6 +561,8 @@ return { set_backend = set_backend, set_raw_query = set_raw_query, get_raw_query = get_raw_query, + post = post, + wait = wait, select = _select, insert = _insert, update = _update, diff --git a/lapis/db/postgres.moon b/lapis/db/postgres.moon index 9aa1c3c5..204fff46 100644 --- a/lapis/db/postgres.moon +++ b/lapis/db/postgres.moon @@ -2,6 +2,7 @@ import concat from table import type, tostring, pairs, select from _G local raw_query +local get_handle local logger import @@ -42,7 +43,7 @@ BACKENDS = { pg_config = assert config.postgres, "missing postgres configuration" local pgmoon_conn - (str) -> + pgmoon_getter = -> pgmoon = ngx and ngx.ctx.pgmoon or pgmoon_conn unless pgmoon @@ -56,6 +57,11 @@ BACKENDS = { else pgmoon_conn = pgmoon + pgmoon + + query_impl = (str) -> + pgmoon = pgmoon_getter! + start_time = if ngx and config.measure_performance ngx.update_time! ngx.now! @@ -71,6 +77,8 @@ BACKENDS = { if not res and err error "#{str}\n#{err}" res + + return query_impl, pgmoon_getter } set_backend = (name, ...) -> @@ -78,7 +86,7 @@ set_backend = (name, ...) -> unless backend error "Failed to find PostgreSQL backend: #{name}" - raw_query = backend ... + raw_query, get_handle = backend ... set_raw_query = (fn) -> raw_query = fn @@ -314,6 +322,128 @@ encode_case = (exp, t, on_else) -> append_all buff, "\nEND" concat buff +local wait, post + +do + local changes_sema + changes = {} + queues = {} + setmetatable queues, { + __mode: 'v' + } + pending_unlisten = {} + TIMEOUT = 5 * 60 -- 5 minutes + UNLISTEN_TIMEOUT = 15 -- seconds + + set_gc = (t, finalizer) -> + mt = {__gc: finalizer} + setmetatable t, mt + if _G.newproxy + -- tables' finalizers don't work in Lua 5.1 + mt.newproxy = _G.newproxy true + getmetatable(mt.newproxy).__gc = finalizer + + change_channel = (action, channel) -> + -- push SQL command to "notifier" + table.insert changes, action .. " " .. escape_identifier(channel) + changes_sema\post! + + listen_queues = -> + -- resume used channels + changes = {} + for channel, queue in pairs(queues) + if queue.sema\count! < 0 + change_channel "LISTEN", channel + for channel in pairs(pending_unlisten) + change_channel "LISTEN", channel + + notify = (channel, payload) -> + queue = queues[channel] + if queue + queues[channel] = nil + queue.payload = payload + sema = queue.sema + if sema\count! < 0 + sema\post(-sema\count!) + + notifier = -> + unless get_handle + init_db! -- replaces get_handle to default backend + handle = get_handle! + handle.sock\settimeout(TIMEOUT * 1000) + + reader = -> + while handle + operation = handle\wait! + unless operation + handle = nil + changes_sema\post! + return + if operation.operation == 'notification' + notify operation.channel, operation.payload + + writer = -> + while handle + changes_sema\wait TIMEOUT + if handle + for change in *changes + unless handle\post change + handle = nil + return + changes = {} + + reader_co = ngx.thread.spawn reader + writer_co = ngx.thread.spawn writer + + ngx.thread.wait reader_co, writer_co + + -- timeout or other error occurred + listen_queues! + -- restart + ngx.timer.at 0.0, notifier + + post = (channel, payload='') -> + query "NOTIFY " .. escape_identifier(channel) .. + ", " .. escape_literal(payload) + + wait = (channel) -> + semaphore = require "ngx.semaphore" + + unless changes_sema + -- start backgroud light thread "notifier" + -- which (un)listens on channels + -- and waits for notifications + changes_sema = semaphore.new! + ngx.timer.at 0.0, notifier + + queue = queues[channel] + unless queue + queue = { + sema: assert semaphore.new! + } + set_gc queue, -> + -- Prevent losing notifications. Send UNLISTEN command after + -- a channel being unused for some time. + if not queues[channel] + label = {} + pending_unlisten[channel] = label + ngx.timer.at UNLISTEN_TIMEOUT, -> + if pending_unlisten[channel] == label + pending_unlisten[channel] = nil + if not queues[channel] + change_channel "UNLISTEN", channel + + queues[channel] = queue + pending_unlisten[channel] = nil + + change_channel "LISTEN", channel + + while true + if queue.sema\wait TIMEOUT + break + + return queue.payload + { :connect :query, :raw, :is_raw, :list, :is_list, :array, :is_array, :NULL, :TRUE, @@ -327,6 +457,8 @@ encode_case = (exp, t, on_else) -> :set_raw_query :get_raw_query + :post, :wait, + select: _select insert: _insert update: _update