Skip to content

Instantly share code, notes, and snippets.

@gdamjan
Created September 8, 2011 00:34
Show Gist options
  • Select an option

  • Save gdamjan/1202289 to your computer and use it in GitHub Desktop.

Select an option

Save gdamjan/1202289 to your computer and use it in GitHub Desktop.

Revisions

  1. gdamjan created this gist Sep 8, 2011.
    83 changes: 83 additions & 0 deletions twitter_stream.erl
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,83 @@
    -module(twitter_stream).
    -author("[email protected]").

    %% Depends on:
    %% ibrowse for http
    %% couchbeam for couchbeam_json_stream (a fork of damienkatz json_stream_parse)
    %% mochiweb for mochiweb_util:urlencode

    -export([start/0]).


    % replace this with whatever you need
    print_message(Json) ->
    % io:format("~p~n--------~n", [Json]),
    Text = proplists:get_value(<<"text">>, Json),
    TextStriped = re:replace(Text, "\\s+", " ", [global, {return, binary}]),
    Id = proplists:get_value(<<"id_str">>, Json),
    {User} = proplists:get_value(<<"user">>, Json),
    UserName = proplists:get_value(<<"screen_name">>, User),
    io:format("~ts~n", [<<"https://twitter.com/", UserName/binary, "/status/",
    Id/binary, " | ", TextStriped/binary>>]).


    event_fun(Data, UserFun) ->
    UserFun(Data),
    fun({Data1}) -> event_fun(Data1, UserFun) end.


    start() ->
    ibrowse:start(),

    Url = "http://stream.twitter.com/1/statuses/filter.json",
    Query = mochiweb_util:urlencode([
    {"track", "erlang,python,ruby,c++"},
    {"follow", "15804774,50393960,187371887"} % some random twitter users
    ]),
    Headers = [{"content-type","application/x-www-form-urlencoded"},
    {"accept", "*/*"}, {"user-agent", "ibrowse"} ],
    Options = [{basic_auth, {"MY_USER_NAME", "AND_PASSWORD"}},
    {response_format, binary},
    {stream_to, {self(), once}},
    {inactivity_timeout, 60000}
    ],
    {ibrowse_req_id, ReqId} = ibrowse:send_req(Url, Headers, post, Query, Options, infinity),
    stream_loop(
    fun ()-> data_fun(ReqId) end,
    fun({Data}) -> event_fun(Data, fun print_message/1) end
    ).


    stream_loop(DataFun, UserFun) ->
    {DataFun2, _, Rest} = couchbeam_json_stream:events(
    DataFun,
    fun(Ev) -> collect_object(Ev, UserFun) end),
    stream_loop(fun() -> {Rest, DataFun2} end, UserFun).

    collect_object(object_start, UserFun) ->
    fun(Ev) ->
    couchbeam_json_stream:collect_object(Ev,
    fun(Obj) -> UserFun(Obj) end)
    end.


    data_fun(ReqId) ->
    receive
    {ibrowse_async_headers, ReqId, "200", Headers} ->
    io:format("~p ~n", [Headers]),
    data_fun(ReqId);
    {ibrowse_async_headers, ReqId, [$4|_], Headers} ->
    io:format("~p ~n", [Headers]),
    done;
    {ibrowse_async_headers, ReqId, [$5|_], Headers} ->
    io:format("~p ~n", [Headers]),
    done;
    {ibrowse_async_response, ReqId, {error, Reason}} ->
    io:format("~p ~n", [{error, Reason}]),
    done;
    {ibrowse_async_response, ReqId, Chunk} ->
    ibrowse:stream_next(ReqId),
    {Chunk, fun () -> data_fun(ReqId) end};
    {ibrowse_async_response_end, ReqId} ->
    done
    end.