|
|
|
@ -2,27 +2,178 @@
|
|
|
|
|
|
|
|
|
|
-compile(export_all).
|
|
|
|
|
|
|
|
|
|
% 4 Создайте и экспортируйте вспомогательные функции для запуска нового серверного процесса, обслуживающего очередь
|
|
|
|
|
start() ->
|
|
|
|
|
Queue = [],
|
|
|
|
|
spawn(?MODULE, server, [Queue]).
|
|
|
|
|
-include("logging.hrl")
|
|
|
|
|
-include_lib("/usr/lib/erlang/lib/xmerl-1.3.26/include/xmerl.hrl")
|
|
|
|
|
|
|
|
|
|
% The server implements the gen_server behavior.
|
|
|
|
|
-behaviour(gen_server).
|
|
|
|
|
|
|
|
|
|
% callbacks
|
|
|
|
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
|
|
|
|
|
code_change/3]).
|
|
|
|
|
|
|
|
|
|
% Additional helper functions exported by the callback module.
|
|
|
|
|
-export([start/1]).
|
|
|
|
|
|
|
|
|
|
-define(TIMEOUT, 10000).
|
|
|
|
|
-record(rssState, {queue, subscribers}).
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
% TODO: Implement module helper functions.
|
|
|
|
|
|
|
|
|
|
start(Name) -> gen_server:start({local, Name}, ?MODULE, [], []).
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
%% @doc init([]), init([Url])
|
|
|
|
|
%% нужно написать функцию init, и в обоих версиях функции start которые запускают процессы добавить вызовы init. После этого, init может вызвать вашу функцию server с необходимым набором аргументов.
|
|
|
|
|
init([]) ->
|
|
|
|
|
process_flag(trap_exit, true),
|
|
|
|
|
{ok, #rssState{queue=[], subscribers=sets:new()}};
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
init([Url]) ->
|
|
|
|
|
RSS = #rssState{queue=[], subscribers=sets:new()},
|
|
|
|
|
process_flag(trap_exit, true),
|
|
|
|
|
rss_reader:start(Url, self()), {ok, RSS}.
|
|
|
|
|
|
|
|
|
|
%% @doc Обработка запроса
|
|
|
|
|
handle_call(_Request={subscribe, QPid}, _From, State=#rssState{queue=Queue1, subscribers=Subscribers1}) ->
|
|
|
|
|
{Reply, NewState} = case sets:is_element(QPid, Subscribers1) of
|
|
|
|
|
true ->
|
|
|
|
|
{{error, already_subscribed}, State};
|
|
|
|
|
false ->
|
|
|
|
|
erlang:monitor(process,QPid),
|
|
|
|
|
?INFO("`handle_call`: New subscriber ~p to ~p~n",[QPid, self()]),
|
|
|
|
|
[add_item(QPid, Item) || Item <- Queue1],
|
|
|
|
|
{ok, State#rssState{subscribers=sets:add_element(QPid, Subscribers1)}}
|
|
|
|
|
end,
|
|
|
|
|
{reply, Reply, NewState};
|
|
|
|
|
|
|
|
|
|
%% @doc Обработка запроса
|
|
|
|
|
handle_call(_Request={get_all}, _From, State=#rssState{queue=Queue1}) ->
|
|
|
|
|
{reply, Queue1, State};
|
|
|
|
|
|
|
|
|
|
%% @doc Обработка запроса
|
|
|
|
|
handle_call(_Request, _From, State) ->
|
|
|
|
|
{reply, {error, {unknown_request,_Request}}, State}.
|
|
|
|
|
|
|
|
|
|
%% @doc Обработка `cast`
|
|
|
|
|
handle_cast(_Msg={add_item, RSSItem=#xmlElement{name=item}}, State=#rssState{queue=Queue1, subscribers=Subscribers1}) ->
|
|
|
|
|
NewQ = push_item(RSSItem, Queue1, Subscribers1),
|
|
|
|
|
{noreply, State#rssState{queue=NewQ}};
|
|
|
|
|
|
|
|
|
|
%% @doc Обработка `cast` - для отписки
|
|
|
|
|
handle_cast(_Msg={unsubscribe, QPid}, State=#rssState{subscribers=Subscribers1}) -> { noreply, State#rssState{subscribers=sets:del_element(QPid, Subscribers1)} };
|
|
|
|
|
|
|
|
|
|
%% @doc Обработка `cast`
|
|
|
|
|
handle_cast(_Msg, State) -> ?WARN("Unknown msg {~p} to Q{~p}", [_Msg, State]), {noreply, State}.
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
%% @doc Обработка info
|
|
|
|
|
handle_info(_Info={'DOWN', _, _, QPid, _Reason}, State=#rssState{subscribers=Subscribers1}) ->
|
|
|
|
|
{noreply, State#rssState{subscribers=sets:del_element(QPid, Subscribers1)}};
|
|
|
|
|
|
|
|
|
|
%% @doc Обработка info
|
|
|
|
|
handle_info(_Info={'EXIT', FromPid, _Reason}, State) ->
|
|
|
|
|
?ERROR("Something went wront in ~p coz: ~n", [FromPid, _Reason]),
|
|
|
|
|
{noreply, State};
|
|
|
|
|
|
|
|
|
|
%% @doc Обработка info
|
|
|
|
|
handle_info(_Info, State) -> {noreply, State}.
|
|
|
|
|
|
|
|
|
|
%% @doc Pre-terminated call
|
|
|
|
|
terminate(_Reason, _State) -> ok.
|
|
|
|
|
|
|
|
|
|
%% @doc Изменение кода
|
|
|
|
|
code_change(_OldVsn, State, _Extra) -> {ok, State}.
|
|
|
|
|
|
|
|
|
|
%% @doc подписка
|
|
|
|
|
subscribe(From, To) ->
|
|
|
|
|
gen_server:call(To, {subscribe, From}).
|
|
|
|
|
|
|
|
|
|
%% @doc start()
|
|
|
|
|
%% Для того чтобы сделать подписчику, вам надо добавить к состоянию очереди новое поле Subscribers, которое будет использоваться для хранения PID'ов очередей. Запускает очередь с заданным именем
|
|
|
|
|
start(Name) ->
|
|
|
|
|
gen_server:start({local, Name}, ?MODULE, [], []).
|
|
|
|
|
|
|
|
|
|
%% @doc start(Url)
|
|
|
|
|
%% Добавьте новую функцию start/1, которая имеет один аргумент URL, и запускает процесс rss_queue, который привязывается к процессу чтения, получая от него элементы ленты RSS, как это обсуждалось выше. Запускает очередь с заданным именем и URL-адресом
|
|
|
|
|
start(Name, Url) ->
|
|
|
|
|
gen_server:start({local, Name}, ?MODULE, [Url], []).
|
|
|
|
|
|
|
|
|
|
%% @doc Добавление в очередь
|
|
|
|
|
%% Функция добавляет в очередь очередной элемент
|
|
|
|
|
add_item(QPid, Item) when is_pid(QPid) ->
|
|
|
|
|
State = gen_server:cast(QPid, {add_item, Item}),
|
|
|
|
|
State.
|
|
|
|
|
|
|
|
|
|
%% @doc Добавление Фида
|
|
|
|
|
add_feed(QPid, RSS2Feed) ->
|
|
|
|
|
Feed = rss_parse:get_feed_items(RSS2Feed),
|
|
|
|
|
lists:foreach(
|
|
|
|
|
fun(Item) ->
|
|
|
|
|
add_item(QPid, Item)
|
|
|
|
|
end, Feed),
|
|
|
|
|
?INFO("Collection length: ~p ~n", [length(Feed)]),
|
|
|
|
|
ok.
|
|
|
|
|
|
|
|
|
|
%% @doc Получение элементов
|
|
|
|
|
%% Получает все эелементы фидов
|
|
|
|
|
get_all(QPid) when is_pid(QPid) ->
|
|
|
|
|
gen_server:call(QPid, {get_all}).
|
|
|
|
|
|
|
|
|
|
%% @doc Основный цикл программы, который слушает события
|
|
|
|
|
%% {add_item, RSSItem} и {get_all, RegPid}.
|
|
|
|
|
server(Queue, Subscribers) ->
|
|
|
|
|
receive
|
|
|
|
|
% Измените код обработки этого сообщения так, чтобы входящие элементы RSS ленты, новые или обновленные версии старых, дополнительно транслировались каждому подписчику очереди
|
|
|
|
|
% (async) - Добавляет элемент во внутреннюю очередь, если он новый или обновленный. Если элемент новый или обновленный он также транслируется всем подписчикам.
|
|
|
|
|
{add_item, RSSItem} ->
|
|
|
|
|
UpdatedQueue = push_item(RSSItem, Queue, Subscribers),
|
|
|
|
|
server(UpdatedQueue, Subscribers);
|
|
|
|
|
% (sync) - Запрашивает текущий список элементов RSS ленты. Список элементов возвращается процессу с идентификатором ReqPid
|
|
|
|
|
{get_all, RegPid} ->
|
|
|
|
|
RegPid ! {self(), Queue},
|
|
|
|
|
server(Queue, Subscribers);
|
|
|
|
|
% Добавьте обработку нового сообщения {unsubscribe, QPid}, которое удаляет заданный PID из множества подписчиков
|
|
|
|
|
% (async) - Удаляет указанную очередь RSS из списка подписчиков.
|
|
|
|
|
{unsubscribe, QPid} ->
|
|
|
|
|
server(Queue, sets:del_element(QPid,Subscribers));
|
|
|
|
|
% Обработайте новое сообщение {subscribe, QPid}, которое позволяет очереди подписаться на получение элементов ленты другой очереди
|
|
|
|
|
% (sync) - Подписывает указанную очередь RSS на получение сообщений очереди. Если QPid отсутствует в списке подписчиков, в ответ возвращается список всех элементов ленты RSS, которые имеются в очереди
|
|
|
|
|
{subscribe, QPid} ->
|
|
|
|
|
[add_item(QPid,Item) || Item <- Queue],
|
|
|
|
|
server(Queue, sets:add_element(QPid,Subscribers));
|
|
|
|
|
% Измените очередь так, чтобы она могла получать сообщения 'EXIT' или 'DOWN' от очередей подписчиков, в зависимости от того выбрали вы способ с установкой связи между процессами, или мониторинг очереди подписчика.
|
|
|
|
|
{'DOWN',_ , _, QPid, _} ->
|
|
|
|
|
server(Queue, sets:del_element(QPid,Subscribers))
|
|
|
|
|
end.
|
|
|
|
|
|
|
|
|
|
%% @doc Функция для сравнения дат
|
|
|
|
|
date_comporator(A, B) ->
|
|
|
|
|
rss_parse:get_item_time(A) < rss_parse:get_item_time(B).
|
|
|
|
|
|
|
|
|
|
%% @doc Передача новых элементов
|
|
|
|
|
broadcast(Item, Subscribers) ->
|
|
|
|
|
[add_item(SubscriberPid, Item) || SubscriberPid <- sets:to_list(Subscribers)].
|
|
|
|
|
|
|
|
|
|
%% @doc Добавление RSS-элемента очередь -- процедуру определенно требуется разделить на несколько вспомогательных функций
|
|
|
|
|
push_item(RSSItem, Queue) ->
|
|
|
|
|
{State, FoundItem} = search_item(RSSItem, Queue), %3.1.2
|
|
|
|
|
case State of
|
|
|
|
|
same ->
|
|
|
|
|
Queue; % 3.1.4.1 мы просто игнорируем его.
|
|
|
|
|
updated ->
|
|
|
|
|
QueueUpdated = Queue--[FoundItem], % 3.1.5.1 старую версию удалить из очереди
|
|
|
|
|
lists:sort(fun date_comporator/2, QueueUpdated++[RSSItem]); % 3.1.5.2 прежде чем добавлять новую
|
|
|
|
|
different ->
|
|
|
|
|
lists:sort(fun date_comporator/2, Queue++[RSSItem])
|
|
|
|
|
end.
|
|
|
|
|
push_item(RSSItem, Queue, Subscribers) ->
|
|
|
|
|
{Res, OldItem} = search_item(RSSItem, Queue),
|
|
|
|
|
case Res of
|
|
|
|
|
same ->
|
|
|
|
|
?INFO("`update_queue`: same_item ~n", []),
|
|
|
|
|
Queue;
|
|
|
|
|
updated ->
|
|
|
|
|
?INFO("`update_queue`: updated_item ~n", []),
|
|
|
|
|
broadcast(RSSItem, Subscribers),
|
|
|
|
|
QueueUpdated = Queue--[OldItem],
|
|
|
|
|
lists:sort(fun date_comporator/2, QueueUpdated++[RSSItem]);
|
|
|
|
|
different ->
|
|
|
|
|
?INFO("`update_queue`: different_item ~n", []),
|
|
|
|
|
broadcast(RSSItem, Subscribers),
|
|
|
|
|
lists:sort(fun date_comporator/2, Queue++[RSSItem])
|
|
|
|
|
end.
|
|
|
|
|
|
|
|
|
|
% 2. Модуль должен содержать функцию server, в которой реализован цикл сервера
|
|
|
|
|
server(Queue) -> % 2.1 Пока состояние очереди должно хранить только список элементов ленты
|
|
|
|
@ -48,32 +199,6 @@ search_item(RSSItem, Queue) ->
|
|
|
|
|
different -> search_item(RSSItem, Tail) % 3.1.6 Если в очереди отсутствует такой же элемент
|
|
|
|
|
end.
|
|
|
|
|
|
|
|
|
|
%% @doc 5. Добавление элемента
|
|
|
|
|
add_item(QPid, Item)
|
|
|
|
|
when is_pid(QPid) ->
|
|
|
|
|
QPid ! {add_item, Item},
|
|
|
|
|
ok.
|
|
|
|
|
|
|
|
|
|
%% @doc 6. Добавление Фида (не сложно написать, с учетом ранее проделанной работы)
|
|
|
|
|
add_feed(QPid, RSS2Feed) ->
|
|
|
|
|
Items = rss_parse:get_feed_items(RSS2Feed),
|
|
|
|
|
lists:foreach(fun(Item) ->
|
|
|
|
|
add_item(QPid, Item)
|
|
|
|
|
end, Items),
|
|
|
|
|
ok.
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
%% @doc 7. Получения RSS
|
|
|
|
|
get_all(QPid) when is_pid(QPid) ->
|
|
|
|
|
QPid ! {get_all, self()},
|
|
|
|
|
receive
|
|
|
|
|
{QPid, List} ->
|
|
|
|
|
io:format("length ~p~n",[length(List)]),
|
|
|
|
|
List
|
|
|
|
|
after 1000 ->
|
|
|
|
|
{error, timeout}
|
|
|
|
|
end.
|
|
|
|
|
|
|
|
|
|
%%% функция для тестирования
|
|
|
|
|
test() ->
|
|
|
|
|
PID = start(),
|
|
|
|
|