Skip to content

Instantly share code, notes, and snippets.

@snaiper80
Forked from ngerakines/ets_queue.erl
Created August 21, 2014 05:05
Show Gist options
  • Save snaiper80/02e582ee6ac7e224fe86 to your computer and use it in GitHub Desktop.
Save snaiper80/02e582ee6ac7e224fe86 to your computer and use it in GitHub Desktop.
%% Behaviour definition plus worker/queue processes for a simple queue whose
%% items are stored in an ETS table owned by a globally registered process.
-module(ets_queue).
-export([behaviour_info/1]).
%% Initial value of the worker's `paused' flag when the config omits `paused'.
-define(DEFAULT_PAUSED, false).
%% Value used for the worker's `delay' (handed to timer:sleep/1, i.e.
%% milliseconds) when the config omits `delay'.
-define(DEFAULT_DELAY, 500).
%% Worker loop state:
%%   module     - callback module whose init/1 and process/1 are invoked
%%   queue_name - name returned by Module:init/1; used to look up the queue
%%   paused     - when true the worker does not dequeue
%%   delay      - sleep after each dequeue attempt; 0 disables the sleep
%%   peek       - `From' of a caller waiting on a `peek' request, else undefined
-record(state, {module, queue_name, paused, delay, peek}).
%% @doc Behaviour hook describing what a queue callback module must export.
%% Returns `[{init, 1}, {process, 1}]' for `callbacks' and `undefined' for
%% every other request.
behaviour_info(callbacks) ->
    [
        {init, 1},
        {process, 1}
    ];
behaviour_info(_Other) ->
    undefined.
-compile(export_all).
%% @doc Starts a worker process linked to the caller.
%% The new process runs worker_init/3 with the caller as its parent; the
%% return value is whatever proc_lib:start_link/3 yields for that start.
start_worker(Module, MonitorConfig) ->
    Parent = self(),
    InitArgs = [Parent, Module, MonitorConfig],
    proc_lib:start_link(ets_queue, worker_init, InitArgs).
%% @doc Entry point of a worker process started through proc_lib.
%%
%% Calls `Module:init(Module)'. On `{ok, QueueName}' it bootstraps the queue,
%% creates and joins the pg2 group named `Module', acknowledges the parent
%% with `{ok, self()}' and enters worker_loop/1. Any other return value is
%% acknowledged as `{error, Response}'; an exception raised by init/1 is
%% acknowledged as `{Class, Reason}'.
%%
%% `MonitorConfig' is a property list; `paused' and `delay' are read from it.
worker_init(Parent, Module, MonitorConfig) ->
    %% Only the init/1 call is protected; the `of' branches are not.
    try Module:init(Module) of
        {ok, QueueName} ->
            bootstrap_queue(QueueName),
            pg2:create(Module),
            pg2:join(Module, self()),
            proc_lib:init_ack(Parent, {ok, self()}),
            error_logger:info_report([?MODULE, {start_worker, Module}, init_ok]),
            %% Config is read after the ack, so the parent is never blocked on it.
            Paused = proplists:get_value(paused, MonitorConfig, ?DEFAULT_PAUSED),
            Delay = proplists:get_value(delay, MonitorConfig, ?DEFAULT_DELAY),
            InitialState = #state{
                module = Module,
                queue_name = QueueName,
                paused = Paused,
                delay = Delay
            },
            ets_queue:worker_loop(InitialState);
        Unexpected ->
            error_logger:error_report([?MODULE, {start_worker, init_nok}, Unexpected]),
            proc_lib:init_ack(Parent, {error, Unexpected})
    catch
        Class:Reason ->
            Failure = {Class, Reason},
            error_logger:error_report([?MODULE, {start_worker, init_nok}, Failure]),
            proc_lib:init_ack(Parent, Failure)
    end.
%% @doc Main loop of a worker process.
%%
%% Control requests arrive as `{'$ets_queue_worker', From, Message}' and are
%% always served before any queue work. When no request is pending:
%%   * a paused worker waits up to one second for the next request;
%%   * a running worker dequeues one item via worker_dequeue/1 and then
%%     sleeps for the configured delay (no sleep when the delay is 0).
%%
%% While paused the wait is a `receive ... after' instead of a blind sleep so
%% that `unpause' (and every other request) is answered immediately rather
%% than after the sleep expires.
worker_loop(#state{paused = true} = State) ->
    receive
        {'$ets_queue_worker', From, Message} ->
            worker_loop(handle_control(Message, From, State))
    after 1000 ->
        worker_loop(State)
    end;
worker_loop(#state{paused = false} = State) ->
    receive
        {'$ets_queue_worker', From, Message} ->
            worker_loop(handle_control(Message, From, State))
    after 0 ->
        NewState = worker_dequeue(State),
        %% The delay throttles dequeue attempts; requests that arrive during
        %% the sleep are handled on the next iteration.
        case NewState#state.delay of
            0 -> ok;
            Delay -> timer:sleep(Delay)
        end,
        worker_loop(NewState)
    end.

%% Applies one control request to the state and returns the new state.
%% Every request except `peek' is answered right away; `peek' is answered by
%% worker_dequeue/1 with the next dequeued item.
handle_control(pause, From, State) ->
    gen:reply(From, ok),
    State#state{paused = true};
handle_control(unpause, From, State) ->
    gen:reply(From, ok),
    State#state{paused = false};
handle_control({set_delay, N}, From, State) ->
    gen:reply(From, ok),
    State#state{delay = N};
handle_control(get_delay, From, State) ->
    gen:reply(From, State#state.delay),
    State;
handle_control(is_paused, From, State) ->
    gen:reply(From, State#state.paused),
    State;
handle_control(peek, From, State) ->
    %% NOTE(review): a second peek before the first is answered replaces the
    %% earlier waiter, which then never gets a reply.
    State#state{peek = From};
handle_control(Other, From, State) ->
    error_logger:warning_report({unknown_message, Other}),
    gen:reply(From, {unknown_message, Other}),
    State.
%% @doc Takes at most one item off the queue and hands it to the callback.
%%
%% Returns the updated state:
%%   * `empty'       - state unchanged;
%%   * `{ok, Item}'  - a pending peek caller (if any) is sent the item, the
%%                     callback's process/1 is run, and `peek' is cleared;
%%   * `not_running' - the worker pauses itself;
%%   * an exit or any other reply is logged and the state is unchanged.
worker_dequeue(State = #state{module = Module, queue_name = QueueName, peek = Peek}) ->
    case (catch ets_queue:dequeue(QueueName)) of
        {ok, Message} ->
            case Peek of
                undefined -> ok;
                _Waiting -> gen:reply(Peek, Message)
            end,
            run_process_callback(Module, QueueName, Message),
            State#state{peek = undefined};
        empty ->
            State;
        not_running ->
            error_logger:warning_report({queue_not_running, pausing_observer, {module, Module}, {queue, QueueName}}),
            State#state{paused = true};
        {'EXIT', Reason} ->
            error_logger:error_report({?MODULE, ?LINE, {module, Module}, {queue, QueueName}, {reason, Reason}}),
            State;
        Other ->
            error_logger:error_report([unhandled_message, Other]),
            State
    end.

%% Runs Module:process/1 on one dequeued item and logs the outcome; a crash in
%% the callback is reported but never propagates to the worker.
run_process_callback(Module, QueueName, Message) ->
    case (catch Module:process(Message)) of
        {'EXIT', Reason} ->
            error_logger:error_report({process_exception, {module, Module}, {queue, QueueName}, {reason, Reason}});
        Result ->
            error_logger:info_report([{Module, process, [Message]}, Result])
    end.
%% @doc Ensures a queue process is available for `Server'.
%% If the name is already registered with `global' the existing pid is
%% returned as `{ok, Pid}'; otherwise a new queue process is started with
%% proc_lib:start/3 and its acknowledgement is returned.
bootstrap_queue(Server) ->
    case global:whereis_name(Server) of
        Existing when is_pid(Existing) ->
            {ok, Existing};
        undefined ->
            proc_lib:start(?MODULE, queue_init, [self(), Server])
    end.
%% @doc Entry point of a queue process started through proc_lib.
%%
%% Registers itself globally as `Server', creates the ordered ETS table named
%% `<Server>_table', acknowledges the parent with `{ok, self()}' and enters
%% queue_loop/2.
%%
%% Two callers can race to start the same queue. The process that loses the
%% registration must not carry on: it would either crash creating the named
%% table or live on as an unreachable duplicate. It therefore acknowledges
%% with the pid of the registered queue (or an error if that has vanished in
%% the meantime) and terminates.
queue_init(Parent, Server) ->
    case global:register_name(Server, self()) of
        yes ->
            Table = list_to_atom(atom_to_list(Server) ++ "_table"),
            ets:new(Table, [ordered_set, named_table, protected]),
            proc_lib:init_ack(Parent, {ok, self()}),
            ets_queue:queue_loop(Server, Table);
        no ->
            Ack =
                case global:whereis_name(Server) of
                    undefined -> {error, {not_registered, Server}};
                    Winner -> {ok, Winner}
                end,
            proc_lib:init_ack(Parent, Ack)
    end.
%% @doc Message loop of the queue process that owns `Table'.
%%
%% Understood messages:
%%   `{From, queue, Item}' - stores `{now(), Item}'; no reply is sent;
%%   `{From, dequeue}'     - replies `empty', or removes the first (oldest)
%%                           entry and replies `{ok, {Key, Item}}';
%%   `{From, info}'        - replies with ets:info(Table).
%% Anything else is logged and dropped so that stray messages cannot pile up
%% in the mailbox and slow down every later receive.
%%
%% The loop re-enters itself through a fully qualified call.
queue_loop(Server, Table) ->
    receive
        {_From, queue, Item} ->
            %% The key orders the table and is handed to consumers as part of
            %% the dequeued object, so its shape is left untouched here.
            ets:insert(Table, {now(), Item});
        {From, dequeue} ->
            case ets:first(Table) of
                '$end_of_table' ->
                    From ! empty;
                Key ->
                    [Object] = ets:lookup(Table, Key),
                    ets:delete(Table, Key),
                    From ! {ok, Object}
            end;
        {From, info} ->
            From ! ets:info(Table);
        Other ->
            error_logger:warning_report({unknown_message, Other})
    end,
    ets_queue:queue_loop(Server, Table).
%% @doc Appends `Message' to the queue named `Server', starting the queue
%% process first if it is not yet registered. The send is asynchronous; the
%% function always returns `ok' once the request has been posted.
queue(Server, Message) ->
    {ok, QueuePid} = bootstrap_queue(Server),
    Request = {self(), queue, Message},
    QueuePid ! Request,
    ok.
%% @doc Removes and returns the oldest item of the queue named `Server'.
%%
%% Returns `{ok, Object}', `empty' when the queue holds nothing, or
%% `not_running' when no queue is registered under `Server' or the queue
%% process dies before answering.
%%
%% Only the queue's own replies are received. Taking "whatever arrives first"
%% would swallow unrelated messages in the caller's mailbox (for a worker:
%% its control requests) and leave the real reply behind. The monitor keeps
%% the caller from blocking forever if the queue disappears after the lookup.
dequeue(Server) ->
    case global:whereis_name(Server) of
        undefined ->
            not_running;
        Pid ->
            MRef = erlang:monitor(process, Pid),
            Pid ! {self(), dequeue},
            receive
                empty ->
                    erlang:demonitor(MRef, [flush]),
                    empty;
                {ok, _Object} = Reply ->
                    erlang:demonitor(MRef, [flush]),
                    Reply;
                {'DOWN', MRef, process, Pid, _Reason} ->
                    not_running
            end
    end.
-module(iplaywar_queue).
-behavior(ets_queue).
-export([init/1, process/1]).
%% @doc ets_queue callback: names the queue this module consumes.
%% The argument is ignored; the queue name is always `iplaywar_queue'.
init(_Ignored) ->
    QueueName = iplaywar_queue,
    {ok, QueueName}.
%% @doc ets_queue callback: handles one dequeued item by passing it to
%% iplaywar_data:process_character/1. That call's result is discarded and
%% `ok' is returned.
process(Dequeued) ->
    iplaywar_data:process_character(Dequeued),
    ok.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment