Compare commits
2 Commits
bac73b1a85
...
f7f73cd26f
Author | SHA1 | Date |
---|---|---|
Ivan I. Ovchinnikov | f7f73cd26f | |
Ivan I. Ovchinnikov | eebaa34291 |
|
@ -0,0 +1,15 @@
|
||||||
|
% Helper functions for logging to the error-logger, or printing trace messages
|
||||||
|
% to the console.
|
||||||
|
|
||||||
|
-define(TRACE(Format, Data),
|
||||||
|
io:format("[TRACE] ~p ~p: " ++ Format, [?MODULE, self()] ++ Data)).
|
||||||
|
|
||||||
|
-define(INFO(Format, Data),
|
||||||
|
error_logger:info_msg("~p ~p: " ++ Format, [?MODULE, self()] ++ Data)).
|
||||||
|
|
||||||
|
-define(WARN(Format, Data),
|
||||||
|
error_logger:warning_msg("~p ~p: " ++ Format, [?MODULE, self()] ++ Data)).
|
||||||
|
|
||||||
|
-define(ERROR(Format, Data),
|
||||||
|
error_logger:error_msg("~p ~p: " ++ Format, [?MODULE, self()] ++ Data)).
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
-module(rss_parse).
|
-module(rss_parse).
|
||||||
|
|
||||||
-export([is_rss2_feed/1, get_feed_items/1, get_item_time/1, compare_feed_items/2]).
|
-export([is_rss2_feed/1, get_feed_items/1, get_item_time/1, compare_feed_items/2]).
|
||||||
-include_lib("/usr/lib/erlang/lib/xmerl-1.3.26/include/xmerl.hrl").
|
-include_lib("/usr/lib/erlang/lib/xmerl-1.3.26/include/xmerl.hrl"<).
|
||||||
|
|
||||||
% В этой функции вызываем функцию `xmerl_scan:file/1`, которая возвращает парсер XML-документа. Затем используем `xmerl_xpath:string/2` для поиска элемента `<rss>` с атрибутом `version` равным "2.0". Если такой элемент найден, то функция возвращает true, иначе false.
|
% В этой функции вызываем функцию `xmerl_scan:file/1`, которая возвращает парсер XML-документа. Затем используем `xmerl_xpath:string/2` для поиска элемента `<rss>` с атрибутом `version` равным "2.0". Если такой элемент найден, то функция возвращает true, иначе false.
|
||||||
is_rss2_feed(Name)->
|
is_rss2_feed(Name)->
|
||||||
|
|
|
@ -2,25 +2,176 @@
|
||||||
|
|
||||||
-compile(export_all).
|
-compile(export_all).
|
||||||
|
|
||||||
% 4 Создайте и экспортируйте вспомогательные функции для запуска нового серверного процесса, обслуживающего очередь
|
-include("logging.hrl")
|
||||||
start() ->
|
-include_lib("/usr/lib/erlang/lib/xmerl-1.3.26/include/xmerl.hrl")
|
||||||
Queue = [],
|
|
||||||
spawn(?MODULE, server, [Queue]).
|
% The server implements the gen_server behavior.
|
||||||
|
-behaviour(gen_server).
|
||||||
|
|
||||||
|
% callbacks
|
||||||
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
|
||||||
|
code_change/3]).
|
||||||
|
|
||||||
|
% Additional helper functions exported by the callback module.
|
||||||
|
-export([start/1]).
|
||||||
|
|
||||||
|
-define(TIMEOUT, 10000).
|
||||||
|
-record(rssState, {queue, subscribers}).
|
||||||
|
|
||||||
|
|
||||||
|
% TODO: Implement module helper functions.
|
||||||
|
|
||||||
|
start(Name) -> gen_server:start({local, Name}, ?MODULE, [], []).
|
||||||
|
|
||||||
|
|
||||||
|
%% @doc init([]), init([Url])
|
||||||
|
%% нужно написать функцию init, и в обоих версиях функции start которые запускают процессы добавить вызовы init. После этого, init может вызвать вашу функцию server с необходимым набором аргументов.
|
||||||
|
init([]) ->
|
||||||
|
process_flag(trap_exit, true),
|
||||||
|
{ok, #rssState{queue=[], subscribers=sets:new()}};
|
||||||
|
|
||||||
|
|
||||||
|
init([Url]) ->
|
||||||
|
RSS = #rssState{queue=[], subscribers=sets:new()},
|
||||||
|
process_flag(trap_exit, true),
|
||||||
|
rss_reader:start(Url, self()), {ok, RSS}.
|
||||||
|
|
||||||
|
%% @doc Обработка запроса
|
||||||
|
handle_call(_Request={subscribe, QPid}, _From, State=#rssState{queue=Queue1, subscribers=Subscribers1}) ->
|
||||||
|
{Reply, NewState} = case sets:is_element(QPid, Subscribers1) of
|
||||||
|
true ->
|
||||||
|
{{error, already_subscribed}, State};
|
||||||
|
false ->
|
||||||
|
erlang:monitor(process,QPid),
|
||||||
|
?INFO("`handle_call`: New subscriber ~p to ~p~n",[QPid, self()]),
|
||||||
|
[add_item(QPid, Item) || Item <- Queue1],
|
||||||
|
{ok, State#rssState{subscribers=sets:add_element(QPid, Subscribers1)}}
|
||||||
|
end,
|
||||||
|
{reply, Reply, NewState};
|
||||||
|
|
||||||
|
%% @doc Обработка запроса
|
||||||
|
handle_call(_Request={get_all}, _From, State=#rssState{queue=Queue1}) ->
|
||||||
|
{reply, Queue1, State};
|
||||||
|
|
||||||
|
%% @doc Обработка запроса
|
||||||
|
handle_call(_Request, _From, State) ->
|
||||||
|
{reply, {error, {unknown_request,_Request}}, State}.
|
||||||
|
|
||||||
|
%% @doc Обработка `cast`
|
||||||
|
handle_cast(_Msg={add_item, RSSItem=#xmlElement{name=item}}, State=#rssState{queue=Queue1, subscribers=Subscribers1}) ->
|
||||||
|
NewQ = push_item(RSSItem, Queue1, Subscribers1),
|
||||||
|
{noreply, State#rssState{queue=NewQ}};
|
||||||
|
|
||||||
|
%% @doc Обработка `cast` - для отписки
|
||||||
|
handle_cast(_Msg={unsubscribe, QPid}, State=#rssState{subscribers=Subscribers1}) -> { noreply, State#rssState{subscribers=sets:del_element(QPid, Subscribers1)} };
|
||||||
|
|
||||||
|
%% @doc Обработка `cast`
|
||||||
|
handle_cast(_Msg, State) -> ?WARN("Unknown msg {~p} to Q{~p}", [_Msg, State]), {noreply, State}.
|
||||||
|
|
||||||
|
|
||||||
|
%% @doc Обработка info
|
||||||
|
handle_info(_Info={'DOWN', _, _, QPid, _Reason}, State=#rssState{subscribers=Subscribers1}) ->
|
||||||
|
{noreply, State#rssState{subscribers=sets:del_element(QPid, Subscribers1)}};
|
||||||
|
|
||||||
|
%% @doc Обработка info
|
||||||
|
handle_info(_Info={'EXIT', FromPid, _Reason}, State) ->
|
||||||
|
?ERROR("Something went wront in ~p coz: ~n", [FromPid, _Reason]),
|
||||||
|
{noreply, State};
|
||||||
|
|
||||||
|
%% @doc Обработка info
|
||||||
|
handle_info(_Info, State) -> {noreply, State}.
|
||||||
|
|
||||||
|
%% @doc Pre-terminated call
|
||||||
|
terminate(_Reason, _State) -> ok.
|
||||||
|
|
||||||
|
%% @doc Изменение кода
|
||||||
|
code_change(_OldVsn, State, _Extra) -> {ok, State}.
|
||||||
|
|
||||||
|
%% @doc подписка
|
||||||
|
subscribe(From, To) ->
|
||||||
|
gen_server:call(To, {subscribe, From}).
|
||||||
|
|
||||||
|
%% @doc start()
|
||||||
|
%% Для того чтобы сделать подписчику, вам надо добавить к состоянию очереди новое поле Subscribers, которое будет использоваться для хранения PID'ов очередей. Запускает очередь с заданным именем
|
||||||
|
start(Name) ->
|
||||||
|
gen_server:start({local, Name}, ?MODULE, [], []).
|
||||||
|
|
||||||
|
%% @doc start(Url)
|
||||||
|
%% Добавьте новую функцию start/1, которая имеет один аргумент URL, и запускает процесс rss_queue, который привязывается к процессу чтения, получая от него элементы ленты RSS, как это обсуждалось выше. Запускает очередь с заданным именем и URL-адресом
|
||||||
|
start(Name, Url) ->
|
||||||
|
gen_server:start({local, Name}, ?MODULE, [Url], []).
|
||||||
|
|
||||||
|
%% @doc Добавление в очередь
|
||||||
|
%% Функция добавляет в очередь очередной элемент
|
||||||
|
add_item(QPid, Item) when is_pid(QPid) ->
|
||||||
|
State = gen_server:cast(QPid, {add_item, Item}),
|
||||||
|
State.
|
||||||
|
|
||||||
|
%% @doc Добавление Фида
|
||||||
|
add_feed(QPid, RSS2Feed) ->
|
||||||
|
Feed = rss_parse:get_feed_items(RSS2Feed),
|
||||||
|
lists:foreach(
|
||||||
|
fun(Item) ->
|
||||||
|
add_item(QPid, Item)
|
||||||
|
end, Feed),
|
||||||
|
?INFO("Collection length: ~p ~n", [length(Feed)]),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
%% @doc Получение элементов
|
||||||
|
%% Получает все эелементы фидов
|
||||||
|
get_all(QPid) when is_pid(QPid) ->
|
||||||
|
gen_server:call(QPid, {get_all}).
|
||||||
|
|
||||||
|
%% @doc Основный цикл программы, который слушает события
|
||||||
|
%% {add_item, RSSItem} и {get_all, RegPid}.
|
||||||
|
server(Queue, Subscribers) ->
|
||||||
|
receive
|
||||||
|
% Измените код обработки этого сообщения так, чтобы входящие элементы RSS ленты, новые или обновленные версии старых, дополнительно транслировались каждому подписчику очереди
|
||||||
|
% (async) - Добавляет элемент во внутреннюю очередь, если он новый или обновленный. Если элемент новый или обновленный он также транслируется всем подписчикам.
|
||||||
|
{add_item, RSSItem} ->
|
||||||
|
UpdatedQueue = push_item(RSSItem, Queue, Subscribers),
|
||||||
|
server(UpdatedQueue, Subscribers);
|
||||||
|
% (sync) - Запрашивает текущий список элементов RSS ленты. Список элементов возвращается процессу с идентификатором ReqPid
|
||||||
|
{get_all, RegPid} ->
|
||||||
|
RegPid ! {self(), Queue},
|
||||||
|
server(Queue, Subscribers);
|
||||||
|
% Добавьте обработку нового сообщения {unsubscribe, QPid}, которое удаляет заданный PID из множества подписчиков
|
||||||
|
% (async) - Удаляет указанную очередь RSS из списка подписчиков.
|
||||||
|
{unsubscribe, QPid} ->
|
||||||
|
server(Queue, sets:del_element(QPid,Subscribers));
|
||||||
|
% Обработайте новое сообщение {subscribe, QPid}, которое позволяет очереди подписаться на получение элементов ленты другой очереди
|
||||||
|
% (sync) - Подписывает указанную очередь RSS на получение сообщений очереди. Если QPid отсутствует в списке подписчиков, в ответ возвращается список всех элементов ленты RSS, которые имеются в очереди
|
||||||
|
{subscribe, QPid} ->
|
||||||
|
[add_item(QPid,Item) || Item <- Queue],
|
||||||
|
server(Queue, sets:add_element(QPid,Subscribers));
|
||||||
|
% Измените очередь так, чтобы она могла получать сообщения 'EXIT' или 'DOWN' от очередей подписчиков, в зависимости от того выбрали вы способ с установкой связи между процессами, или мониторинг очереди подписчика.
|
||||||
|
{'DOWN',_ , _, QPid, _} ->
|
||||||
|
server(Queue, sets:del_element(QPid,Subscribers))
|
||||||
|
end.
|
||||||
|
|
||||||
%% @doc Функция для сравнения дат
|
%% @doc Функция для сравнения дат
|
||||||
date_comporator(A, B) ->
|
date_comporator(A, B) ->
|
||||||
rss_parse:get_item_time(A) < rss_parse:get_item_time(B).
|
rss_parse:get_item_time(A) < rss_parse:get_item_time(B).
|
||||||
|
|
||||||
|
%% @doc Передача новых элементов
|
||||||
|
broadcast(Item, Subscribers) ->
|
||||||
|
[add_item(SubscriberPid, Item) || SubscriberPid <- sets:to_list(Subscribers)].
|
||||||
|
|
||||||
%% @doc Добавление RSS-элемента очередь -- процедуру определенно требуется разделить на несколько вспомогательных функций
|
%% @doc Добавление RSS-элемента очередь -- процедуру определенно требуется разделить на несколько вспомогательных функций
|
||||||
push_item(RSSItem, Queue) ->
|
push_item(RSSItem, Queue, Subscribers) ->
|
||||||
{State, FoundItem} = search_item(RSSItem, Queue), %3.1.2
|
{Res, OldItem} = search_item(RSSItem, Queue),
|
||||||
case State of
|
case Res of
|
||||||
same ->
|
same ->
|
||||||
Queue; % 3.1.4.1 мы просто игнорируем его.
|
?INFO("`update_queue`: same_item ~n", []),
|
||||||
|
Queue;
|
||||||
updated ->
|
updated ->
|
||||||
QueueUpdated = Queue--[FoundItem], % 3.1.5.1 старую версию удалить из очереди
|
?INFO("`update_queue`: updated_item ~n", []),
|
||||||
lists:sort(fun date_comporator/2, QueueUpdated++[RSSItem]); % 3.1.5.2 прежде чем добавлять новую
|
broadcast(RSSItem, Subscribers),
|
||||||
|
QueueUpdated = Queue--[OldItem],
|
||||||
|
lists:sort(fun date_comporator/2, QueueUpdated++[RSSItem]);
|
||||||
different ->
|
different ->
|
||||||
|
?INFO("`update_queue`: different_item ~n", []),
|
||||||
|
broadcast(RSSItem, Subscribers),
|
||||||
lists:sort(fun date_comporator/2, Queue++[RSSItem])
|
lists:sort(fun date_comporator/2, Queue++[RSSItem])
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -48,32 +199,6 @@ search_item(RSSItem, Queue) ->
|
||||||
different -> search_item(RSSItem, Tail) % 3.1.6 Если в очереди отсутствует такой же элемент
|
different -> search_item(RSSItem, Tail) % 3.1.6 Если в очереди отсутствует такой же элемент
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% @doc 5. Добавление элемента
|
|
||||||
add_item(QPid, Item)
|
|
||||||
when is_pid(QPid) ->
|
|
||||||
QPid ! {add_item, Item},
|
|
||||||
ok.
|
|
||||||
|
|
||||||
%% @doc 6. Добавление Фида (не сложно написать, с учетом ранее проделанной работы)
|
|
||||||
add_feed(QPid, RSS2Feed) ->
|
|
||||||
Items = rss_parse:get_feed_items(RSS2Feed),
|
|
||||||
lists:foreach(fun(Item) ->
|
|
||||||
add_item(QPid, Item)
|
|
||||||
end, Items),
|
|
||||||
ok.
|
|
||||||
|
|
||||||
|
|
||||||
%% @doc 7. Получения RSS
|
|
||||||
get_all(QPid) when is_pid(QPid) ->
|
|
||||||
QPid ! {get_all, self()},
|
|
||||||
receive
|
|
||||||
{QPid, List} ->
|
|
||||||
io:format("length ~p~n",[length(List)]),
|
|
||||||
List
|
|
||||||
after 1000 ->
|
|
||||||
{error, timeout}
|
|
||||||
end.
|
|
||||||
|
|
||||||
%%% функция для тестирования
|
%%% функция для тестирования
|
||||||
test() ->
|
test() ->
|
||||||
PID = start(),
|
PID = start(),
|
||||||
|
|
|
@ -0,0 +1,40 @@
|
||||||
|
-module(rss_reader).
|
||||||
|
-include("logging.hrl").
|
||||||
|
-compile(export_all).
|
||||||
|
-define(RETRIEVE,3000).
|
||||||
|
|
||||||
|
%% @doc start(Url, QPid)
|
||||||
|
%% Запуск сервера
|
||||||
|
start(Url,QPid)->
|
||||||
|
ssl:start(),
|
||||||
|
spawn(?MODULE,server,[Url,QPid]).
|
||||||
|
|
||||||
|
%% @doc реализация чтения
|
||||||
|
%% Загружаем ленту с указанного URL, с помощью функции httpc:request/1.
|
||||||
|
%% Если код ответа равен 200, извлекаем тело ответа и разбирает его XML содержимое с помощью функции xmerl_scan:string/1.
|
||||||
|
%% Когда информация извлечена из тела запроса, проверяем, что получена лента в формате RSS 2.0. Для этого есть написанная в задании 3 функция rss_parse:is_rss2_feed/1.
|
||||||
|
%% Если все вышеперечисленные шаги завершились без ошибок, с помощью написанной в прошлом задании вспомогательной функции rss_queue:add_feed/2 отправляем все элементы ленты в очередь, которая стоит в паре с этим процессом чтения.
|
||||||
|
%% Ждем заданное время, затем возвращаемся к началу и продолжаем все заново.
|
||||||
|
|
||||||
|
%% Обычно, процессы чтения RSS новостей обновляют свои ленты через большие интервалы времени, начиная с 15 минут до часа. Однако, так вам будет сложно тестировать свою программу, поэтому на время разработки этот интервал можно уменьшить до 60 секунд или около того. Объявите с помощью макроса константу, например, RETRIEVE_INTERVAL, с тем чтобы в последствии вы могли бы легко изменить значение таймаута не отыскивая место в коде, где он используется.
|
||||||
|
|
||||||
|
server(Url, QPid)->
|
||||||
|
?INFO("URL ~s~n", [Url]),
|
||||||
|
{ok,{{_,Code,_},_,Load}}=httpc:request(Url),
|
||||||
|
case Code of
|
||||||
|
200 ->
|
||||||
|
?INFO("HTTPCode ~s~n", [Code]),
|
||||||
|
|
||||||
|
{RSS,_} = xmerl_scan:string(Load),
|
||||||
|
case rss_parse:is_rss2_feed(RSS) of
|
||||||
|
ok ->
|
||||||
|
rss_queue:add_feed(QPid,RSS),
|
||||||
|
receive
|
||||||
|
after ?RETRIEVE ->
|
||||||
|
server(Url, QPid)
|
||||||
|
end;
|
||||||
|
_ ->
|
||||||
|
{error,not_rss2_feed}
|
||||||
|
end;
|
||||||
|
_ -> {error,Code}
|
||||||
|
end.
|
Loading…
Reference in New Issue