Skip to content

Instantly share code, notes, and snippets.

@snaiper80
Forked from ngerakines/ets_queue.erl
Created August 21, 2014 05:05
Show Gist options
  • Save snaiper80/02e582ee6ac7e224fe86 to your computer and use it in GitHub Desktop.
Save snaiper80/02e582ee6ac7e224fe86 to your computer and use it in GitHub Desktop.

Revisions

  1. @ngerakines ngerakines created this gist Oct 26, 2009.
    149 changes: 149 additions & 0 deletions ets_queue.erl
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,149 @@
    -module(ets_queue).
    -export([behaviour_info/1]).
    -define(DEFAULT_PAUSED, false).
    -define(DEFAULT_DELAY, 500).

    -record(state, {module, queue_name, paused, delay, peek}).

    behaviour_info(callbacks) -> [{init, 1}, {process, 1}];
    behaviour_info(_) -> undefined.

    -compile(export_all).

    start_worker(Module, MonitorConfig) ->
    proc_lib:start_link(ets_queue, worker_init, [self(), Module, MonitorConfig]).

    worker_init(Parent, Module, MonitorConfig) ->
    try Module:init(Module) of
    {ok, QueueName} ->
    bootstrap_queue(QueueName),
    pg2:create(Module),
    pg2:join(Module, self()),
    proc_lib:init_ack(Parent, {ok, self()}),
    error_logger:info_report([?MODULE, {start_worker, Module}, init_ok]),
    State = #state{
    module = Module,
    queue_name = QueueName,
    paused = proplists:get_value(paused, MonitorConfig, ?DEFAULT_PAUSED),
    delay = proplists:get_value(delay, MonitorConfig, ?DEFAULT_DELAY)
    },
    ets_queue:worker_loop(State);
    Response ->
    error_logger:error_report([?MODULE, {start_worker, init_nok}, Response]),
    proc_lib:init_ack(Parent, {error, Response})
    catch
    Ma:Mi ->
    error_logger:error_report([?MODULE, {start_worker, init_nok}, {Ma, Mi}]),
    proc_lib:init_ack(Parent, {Ma, Mi})
    end.

    worker_loop(State) ->
    receive
    {'$ets_queue_worker', From, Message} ->
    State1 = case Message of
    pause ->
    gen:reply(From, ok),
    State#state{paused = true};
    unpause ->
    gen:reply(From, ok),
    State#state{paused = false};
    {set_delay, N} ->
    gen:reply(From, ok),
    State#state{delay = N};
    get_delay ->
    gen:reply(From, State#state.delay),
    State;
    is_paused ->
    gen:reply(From, State#state.paused),
    State;
    peek ->
    State#state{peek = From};
    Other ->
    error_logger:warning_report({unknown_message, Other}),
    gen:reply(From, {unknown_message, Other}),
    State
    end,
    worker_loop(State1)
    after 0 ->
    case State#state.paused of
    true ->
    timer:sleep(1000),
    worker_loop(State);
    false ->
    NewState = worker_dequeue(State),
    case NewState#state.delay of
    0 -> ok;
    Delay -> timer:sleep(Delay)
    end,
    worker_loop(NewState)
    end
    end.

    worker_dequeue(#state{module=Module, queue_name=QueueName, peek=Peek} = State) ->
    case (catch ets_queue:dequeue(QueueName)) of
    empty -> State;
    {ok, Message} ->
    if
    Peek =/= undefined -> gen:reply(Peek, Message);
    true -> ok
    end,
    case (catch Module:process(Message)) of
    {'EXIT', Reason} ->
    error_logger:error_report({process_exception, {module, Module}, {queue, QueueName}, {reason, Reason}});
    Result ->
    error_logger:info_report([{Module, process, [Message]}, Result])
    end,
    State#state{peek = undefined};
    not_running ->
    error_logger:warning_report({queue_not_running, pausing_observer, {module, Module}, {queue, QueueName}}),
    State#state{paused = true};
    {'EXIT', Reason} ->
    error_logger:error_report({?MODULE, ?LINE, {module, Module}, {queue, QueueName}, {reason, Reason}}),
    State;
    Other ->
    error_logger:error_report([unhandled_message, Other]),
    State
    end.

    bootstrap_queue(Server) ->
    case global:whereis_name(Server) of
    undefined -> proc_lib:start(?MODULE, queue_init, [self(), Server]);
    Pid -> {ok, Pid}
    end.

    queue_init(Parent, Server) ->
    global:register_name(Server, self()),
    Table = list_to_atom(atom_to_list(Server) ++ "_table"),
    ets:new(Table, [ordered_set, named_table, protected]),
    proc_lib:init_ack(Parent, {ok, self()}),
    ets_queue:queue_loop(Server, Table).

    queue_loop(Server, Table) ->
    receive
    {_From, queue, Item} ->
    ets:insert(Table, {now(), Item});
    {From, dequeue} ->
    case ets:first(Table) of
    '$end_of_table' -> From ! empty;
    Key ->
    [Object] = ets:lookup(Table, Key),
    ets:delete(Table, Key),
    From ! {ok, Object}
    end;
    {From, info} ->
    From ! ets:info(Table)
    end,
    ets_queue:queue_loop(Server, Table).

    queue(Server, Message) ->
    {ok, ServerPid} = bootstrap_queue(Server),
    ServerPid ! {self(), queue, Message},
    ok.

    dequeue(Server) ->
    case global:whereis_name(Server) of
    undefined -> not_running;
    Pid ->
    Pid ! {self(), dequeue},
    receive X -> X end
    end.
    10 changes: 10 additions & 0 deletions iplaywar_queue.erl
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,10 @@
    -module(iplaywar_queue).
    -behavior(ets_queue).
    -export([init/1, process/1]).

    init(_) ->
    {ok, iplaywar_queue}.

    process(Data) ->
    iplaywar_data:process_character(Data),
    ok.