Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 149 additions & 2 deletions lapis/db/postgres.lua
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ do
type, tostring, pairs, select = _obj_0.type, _obj_0.tostring, _obj_0.pairs, _obj_0.select
end
local raw_query
local get_handle
local logger
local FALSE, NULL, TRUE, build_helpers, format_date, is_raw, raw, is_list, list, is_encodable
do
Expand Down Expand Up @@ -47,7 +48,8 @@ local BACKENDS = {
local config = require("lapis.config").get()
local pg_config = assert(config.postgres, "missing postgres configuration")
local pgmoon_conn
return function(str)
local pgmoon_getter
pgmoon_getter = function()
local pgmoon = ngx and ngx.ctx.pgmoon or pgmoon_conn
if not (pgmoon) then
local Postgres
Expand All @@ -63,6 +65,11 @@ local BACKENDS = {
pgmoon_conn = pgmoon
end
end
return pgmoon
end
local query_impl
query_impl = function(str)
local pgmoon = pgmoon_getter()
local start_time
if ngx and config.measure_performance then
ngx.update_time()
Expand All @@ -82,6 +89,7 @@ local BACKENDS = {
end
return res
end
return query_impl, pgmoon_getter
end
}
local set_backend
Expand All @@ -90,7 +98,7 @@ set_backend = function(name, ...)
if not (backend) then
error("Failed to find PostgreSQL backend: " .. tostring(name))
end
raw_query = backend(...)
raw_query, get_handle = backend(...)
end
local set_raw_query
set_raw_query = function(fn)
Expand Down Expand Up @@ -391,6 +399,143 @@ encode_case = function(exp, t, on_else)
append_all(buff, "\nEND")
return concat(buff)
end
local wait, post
do
local changes_sema
local changes = { }
local queues = { }
setmetatable(queues, {
__mode = 'v'
})
local pending_unlisten = { }
local TIMEOUT = 5 * 60
local UNLISTEN_TIMEOUT = 15
local set_gc
set_gc = function(t, finalizer)
local mt = {
__gc = finalizer
}
setmetatable(t, mt)
if _G.newproxy then
mt.newproxy = _G.newproxy(true)
getmetatable(mt.newproxy).__gc = finalizer
end
end
local change_channel
change_channel = function(action, channel)
table.insert(changes, action .. " " .. escape_identifier(channel))
return changes_sema:post()
end
local listen_queues
listen_queues = function()
changes = { }
for channel, queue in pairs(queues) do
if queue.sema:count() < 0 then
change_channel("LISTEN", channel)
end
end
for channel in pairs(pending_unlisten) do
change_channel("LISTEN", channel)
end
end
local notify
notify = function(channel, payload)
local queue = queues[channel]
if queue then
queues[channel] = nil
queue.payload = payload
local sema = queue.sema
if sema:count() < 0 then
return sema:post(-sema:count())
end
end
end
local notifier
notifier = function()
if not (get_handle) then
init_db()
end
local handle = get_handle()
handle.sock:settimeout(TIMEOUT * 1000)
local reader
reader = function()
while handle do
local operation = handle:wait()
if not (operation) then
handle = nil
changes_sema:post()
return
end
if operation.operation == 'notification' then
notify(operation.channel, operation.payload)
end
end
end
local writer
writer = function()
while handle do
changes_sema:wait(TIMEOUT)
if handle then
for _index_0 = 1, #changes do
local change = changes[_index_0]
if not (handle:post(change)) then
handle = nil
return
end
end
changes = { }
end
end
end
local reader_co = ngx.thread.spawn(reader)
local writer_co = ngx.thread.spawn(writer)
ngx.thread.wait(reader_co, writer_co)
listen_queues()
return ngx.timer.at(0.0, notifier)
end
post = function(channel, payload)
if payload == nil then
payload = ''
end
return query("NOTIFY " .. escape_identifier(channel) .. ", " .. escape_literal(payload))
end
wait = function(channel)
local semaphore = require("ngx.semaphore")
if not (changes_sema) then
changes_sema = semaphore.new()
ngx.timer.at(0.0, notifier)
end
local queue = queues[channel]
if not (queue) then
queue = {
sema = assert(semaphore.new())
}
set_gc(queue, function()
if not queues[channel] then
local label = { }
pending_unlisten[channel] = label
return ngx.timer.at(UNLISTEN_TIMEOUT, function()
if pending_unlisten[channel] == label then
pending_unlisten[channel] = nil
if not queues[channel] then
return change_channel("UNLISTEN", channel)
end
end
end)
end
end)
queues[channel] = queue
pending_unlisten[channel] = nil
change_channel("LISTEN", channel)
end
while true do
if queue.sema:wait(TIMEOUT) then
break
end
end
return queue.payload
end
end
return {
connect = connect,
query = query,
Expand All @@ -416,6 +561,8 @@ return {
set_backend = set_backend,
set_raw_query = set_raw_query,
get_raw_query = get_raw_query,
post = post,
wait = wait,
select = _select,
insert = _insert,
update = _update,
Expand Down
136 changes: 134 additions & 2 deletions lapis/db/postgres.moon
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import concat from table
import type, tostring, pairs, select from _G

local raw_query
local get_handle
local logger

import
Expand Down Expand Up @@ -42,7 +43,7 @@ BACKENDS = {
pg_config = assert config.postgres, "missing postgres configuration"
local pgmoon_conn

(str) ->
pgmoon_getter = ->
pgmoon = ngx and ngx.ctx.pgmoon or pgmoon_conn

unless pgmoon
Expand All @@ -56,6 +57,11 @@ BACKENDS = {
else
pgmoon_conn = pgmoon

pgmoon

query_impl = (str) ->
pgmoon = pgmoon_getter!

start_time = if ngx and config.measure_performance
ngx.update_time!
ngx.now!
Expand All @@ -71,14 +77,16 @@ BACKENDS = {
if not res and err
error "#{str}\n#{err}"
res

return query_impl, pgmoon_getter
}

set_backend = (name, ...) ->
backend = BACKENDS[name]
unless backend
error "Failed to find PostgreSQL backend: #{name}"

raw_query = backend ...
raw_query, get_handle = backend ...

set_raw_query = (fn) ->
raw_query = fn
Expand Down Expand Up @@ -314,6 +322,128 @@ encode_case = (exp, t, on_else) ->
append_all buff, "\nEND"
concat buff

local wait, post

do
local changes_sema
changes = {}
queues = {}
setmetatable queues, {
__mode: 'v'
}
pending_unlisten = {}
TIMEOUT = 5 * 60 -- 5 minutes
UNLISTEN_TIMEOUT = 15 -- seconds

set_gc = (t, finalizer) ->
mt = {__gc: finalizer}
setmetatable t, mt
if _G.newproxy
-- tables' finalizers don't work in Lua 5.1
mt.newproxy = _G.newproxy true
getmetatable(mt.newproxy).__gc = finalizer

change_channel = (action, channel) ->
-- push SQL command to "notifier"
table.insert changes, action .. " " .. escape_identifier(channel)
changes_sema\post!

listen_queues = ->
-- resume used channels
changes = {}
for channel, queue in pairs(queues)
if queue.sema\count! < 0
change_channel "LISTEN", channel
for channel in pairs(pending_unlisten)
change_channel "LISTEN", channel

notify = (channel, payload) ->
queue = queues[channel]
if queue
queues[channel] = nil
queue.payload = payload
sema = queue.sema
if sema\count! < 0
sema\post(-sema\count!)

notifier = ->
unless get_handle
init_db! -- replaces get_handle to default backend
handle = get_handle!
handle.sock\settimeout(TIMEOUT * 1000)

reader = ->
while handle
operation = handle\wait!
unless operation
handle = nil
changes_sema\post!
return
if operation.operation == 'notification'
notify operation.channel, operation.payload

writer = ->
while handle
changes_sema\wait TIMEOUT
if handle
for change in *changes
unless handle\post change
handle = nil
return
changes = {}

reader_co = ngx.thread.spawn reader
writer_co = ngx.thread.spawn writer

ngx.thread.wait reader_co, writer_co

-- timeout or other error occurred
listen_queues!
-- restart
ngx.timer.at 0.0, notifier

post = (channel, payload='') ->
query "NOTIFY " .. escape_identifier(channel) ..
", " .. escape_literal(payload)

wait = (channel) ->
semaphore = require "ngx.semaphore"

unless changes_sema
-- start backgroud light thread "notifier"
-- which (un)listens on channels
-- and waits for notifications
changes_sema = semaphore.new!
ngx.timer.at 0.0, notifier

queue = queues[channel]
unless queue
queue = {
sema: assert semaphore.new!
}
set_gc queue, ->
-- Prevent losing notifications. Send UNLISTEN command after
-- a channel being unused for some time.
if not queues[channel]
label = {}
pending_unlisten[channel] = label
ngx.timer.at UNLISTEN_TIMEOUT, ->
if pending_unlisten[channel] == label
pending_unlisten[channel] = nil
if not queues[channel]
change_channel "UNLISTEN", channel

queues[channel] = queue
pending_unlisten[channel] = nil

change_channel "LISTEN", channel

while true
if queue.sema\wait TIMEOUT
break

return queue.payload

{
:connect
:query, :raw, :is_raw, :list, :is_list, :array, :is_array, :NULL, :TRUE,
Expand All @@ -327,6 +457,8 @@ encode_case = (exp, t, on_else) ->
:set_raw_query
:get_raw_query

:post, :wait,

select: _select
insert: _insert
update: _update
Expand Down