|
|
@@ -0,0 +1,149 @@ |
|
|
-module(ets_queue). |
|
|
-export([behaviour_info/1]). |
|
|
-define(DEFAULT_PAUSED, false). |
|
|
-define(DEFAULT_DELAY, 500). |
|
|
|
|
|
-record(state, {module, queue_name, paused, delay, peek}). |
|
|
|
|
|
behaviour_info(callbacks) -> [{init, 1}, {process, 1}]; |
|
|
behaviour_info(_) -> undefined. |
|
|
|
|
|
-compile(export_all). |
|
|
|
|
|
start_worker(Module, MonitorConfig) -> |
|
|
proc_lib:start_link(ets_queue, worker_init, [self(), Module, MonitorConfig]). |
|
|
|
|
|
worker_init(Parent, Module, MonitorConfig) -> |
|
|
try Module:init(Module) of |
|
|
{ok, QueueName} -> |
|
|
bootstrap_queue(QueueName), |
|
|
pg2:create(Module), |
|
|
pg2:join(Module, self()), |
|
|
proc_lib:init_ack(Parent, {ok, self()}), |
|
|
error_logger:info_report([?MODULE, {start_worker, Module}, init_ok]), |
|
|
State = #state{ |
|
|
module = Module, |
|
|
queue_name = QueueName, |
|
|
paused = proplists:get_value(paused, MonitorConfig, ?DEFAULT_PAUSED), |
|
|
delay = proplists:get_value(delay, MonitorConfig, ?DEFAULT_DELAY) |
|
|
}, |
|
|
ets_queue:worker_loop(State); |
|
|
Response -> |
|
|
error_logger:error_report([?MODULE, {start_worker, init_nok}, Response]), |
|
|
proc_lib:init_ack(Parent, {error, Response}) |
|
|
catch |
|
|
Ma:Mi -> |
|
|
error_logger:error_report([?MODULE, {start_worker, init_nok}, {Ma, Mi}]), |
|
|
proc_lib:init_ack(Parent, {Ma, Mi}) |
|
|
end. |
|
|
|
|
|
worker_loop(State) -> |
|
|
receive |
|
|
{'$ets_queue_worker', From, Message} -> |
|
|
State1 = case Message of |
|
|
pause -> |
|
|
gen:reply(From, ok), |
|
|
State#state{paused = true}; |
|
|
unpause -> |
|
|
gen:reply(From, ok), |
|
|
State#state{paused = false}; |
|
|
{set_delay, N} -> |
|
|
gen:reply(From, ok), |
|
|
State#state{delay = N}; |
|
|
get_delay -> |
|
|
gen:reply(From, State#state.delay), |
|
|
State; |
|
|
is_paused -> |
|
|
gen:reply(From, State#state.paused), |
|
|
State; |
|
|
peek -> |
|
|
State#state{peek = From}; |
|
|
Other -> |
|
|
error_logger:warning_report({unknown_message, Other}), |
|
|
gen:reply(From, {unknown_message, Other}), |
|
|
State |
|
|
end, |
|
|
worker_loop(State1) |
|
|
after 0 -> |
|
|
case State#state.paused of |
|
|
true -> |
|
|
timer:sleep(1000), |
|
|
worker_loop(State); |
|
|
false -> |
|
|
NewState = worker_dequeue(State), |
|
|
case NewState#state.delay of |
|
|
0 -> ok; |
|
|
Delay -> timer:sleep(Delay) |
|
|
end, |
|
|
worker_loop(NewState) |
|
|
end |
|
|
end. |
|
|
|
|
|
worker_dequeue(#state{module=Module, queue_name=QueueName, peek=Peek} = State) -> |
|
|
case (catch ets_queue:dequeue(QueueName)) of |
|
|
empty -> State; |
|
|
{ok, Message} -> |
|
|
if |
|
|
Peek =/= undefined -> gen:reply(Peek, Message); |
|
|
true -> ok |
|
|
end, |
|
|
case (catch Module:process(Message)) of |
|
|
{'EXIT', Reason} -> |
|
|
error_logger:error_report({process_exception, {module, Module}, {queue, QueueName}, {reason, Reason}}); |
|
|
Result -> |
|
|
error_logger:info_report([{Module, process, [Message]}, Result]) |
|
|
end, |
|
|
State#state{peek = undefined}; |
|
|
not_running -> |
|
|
error_logger:warning_report({queue_not_running, pausing_observer, {module, Module}, {queue, QueueName}}), |
|
|
State#state{paused = true}; |
|
|
{'EXIT', Reason} -> |
|
|
error_logger:error_report({?MODULE, ?LINE, {module, Module}, {queue, QueueName}, {reason, Reason}}), |
|
|
State; |
|
|
Other -> |
|
|
error_logger:error_report([unhandled_message, Other]), |
|
|
State |
|
|
end. |
|
|
|
|
|
bootstrap_queue(Server) -> |
|
|
case global:whereis_name(Server) of |
|
|
undefined -> proc_lib:start(?MODULE, queue_init, [self(), Server]); |
|
|
Pid -> {ok, Pid} |
|
|
end. |
|
|
|
|
|
queue_init(Parent, Server) -> |
|
|
global:register_name(Server, self()), |
|
|
Table = list_to_atom(atom_to_list(Server) ++ "_table"), |
|
|
ets:new(Table, [ordered_set, named_table, protected]), |
|
|
proc_lib:init_ack(Parent, {ok, self()}), |
|
|
ets_queue:queue_loop(Server, Table). |
|
|
|
|
|
queue_loop(Server, Table) -> |
|
|
receive |
|
|
{_From, queue, Item} -> |
|
|
ets:insert(Table, {now(), Item}); |
|
|
{From, dequeue} -> |
|
|
case ets:first(Table) of |
|
|
'$end_of_table' -> From ! empty; |
|
|
Key -> |
|
|
[Object] = ets:lookup(Table, Key), |
|
|
ets:delete(Table, Key), |
|
|
From ! {ok, Object} |
|
|
end; |
|
|
{From, info} -> |
|
|
From ! ets:info(Table) |
|
|
end, |
|
|
ets_queue:queue_loop(Server, Table). |
|
|
|
|
|
queue(Server, Message) -> |
|
|
{ok, ServerPid} = bootstrap_queue(Server), |
|
|
ServerPid ! {self(), queue, Message}, |
|
|
ok. |
|
|
|
|
|
dequeue(Server) -> |
|
|
case global:whereis_name(Server) of |
|
|
undefined -> not_running; |
|
|
Pid -> |
|
|
Pid ! {self(), dequeue}, |
|
|
receive X -> X end |
|
|
end. |