Compare commits

..

No commits in common. "f7f73cd26f4929832e71f2bf5f23458a6cebc60e" and "bac73b1a858c6faac64a2846bd872c0c1c5b050e" have entirely different histories.

4 changed files with 42 additions and 222 deletions

View File

@ -1,15 +0,0 @@
% Helper functions for logging to the error-logger, or printing trace messages
% to the console.
-define(TRACE(Format, Data),
io:format("[TRACE] ~p ~p: " ++ Format, [?MODULE, self()] ++ Data)).
-define(INFO(Format, Data),
error_logger:info_msg("~p ~p: " ++ Format, [?MODULE, self()] ++ Data)).
-define(WARN(Format, Data),
error_logger:warning_msg("~p ~p: " ++ Format, [?MODULE, self()] ++ Data)).
-define(ERROR(Format, Data),
error_logger:error_msg("~p ~p: " ++ Format, [?MODULE, self()] ++ Data)).

View File

@ -1,7 +1,7 @@
-module(rss_parse).
-export([is_rss2_feed/1, get_feed_items/1, get_item_time/1, compare_feed_items/2]).
-include_lib("/usr/lib/erlang/lib/xmerl-1.3.26/include/xmerl.hrl"<).
-include_lib("/usr/lib/erlang/lib/xmerl-1.3.26/include/xmerl.hrl").
% В этой функции вызываем функцию `xmerl_scan:file/1`, которая возвращает парсер XML-документа. Затем используем `xmerl_xpath:string/2` для поиска элемента `<rss>` с атрибутом `version` равным "2.0". Если такой элемент найден, то функция возвращает true, иначе false.
is_rss2_feed(Name)->

View File

@ -2,178 +2,27 @@
-compile(export_all).
-include("logging.hrl")
-include_lib("/usr/lib/erlang/lib/xmerl-1.3.26/include/xmerl.hrl")
% The server implements the gen_server behavior.
-behaviour(gen_server).
% callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
% Additional helper functions exported by the callback module.
-export([start/1]).
-define(TIMEOUT, 10000).
-record(rssState, {queue, subscribers}).
% TODO: Implement module helper functions.
start(Name) -> gen_server:start({local, Name}, ?MODULE, [], []).
%% @doc init([]), init([Url])
%% нужно написать функцию init, и в обоих версиях функции start которые запускают процессы добавить вызовы init. После этого, init может вызвать вашу функцию server с необходимым набором аргументов.
init([]) ->
process_flag(trap_exit, true),
{ok, #rssState{queue=[], subscribers=sets:new()}};
init([Url]) ->
RSS = #rssState{queue=[], subscribers=sets:new()},
process_flag(trap_exit, true),
rss_reader:start(Url, self()), {ok, RSS}.
%% @doc Обработка запроса
handle_call(_Request={subscribe, QPid}, _From, State=#rssState{queue=Queue1, subscribers=Subscribers1}) ->
{Reply, NewState} = case sets:is_element(QPid, Subscribers1) of
true ->
{{error, already_subscribed}, State};
false ->
erlang:monitor(process,QPid),
?INFO("`handle_call`: New subscriber ~p to ~p~n",[QPid, self()]),
[add_item(QPid, Item) || Item <- Queue1],
{ok, State#rssState{subscribers=sets:add_element(QPid, Subscribers1)}}
end,
{reply, Reply, NewState};
%% @doc Обработка запроса
handle_call(_Request={get_all}, _From, State=#rssState{queue=Queue1}) ->
{reply, Queue1, State};
%% @doc Обработка запроса
handle_call(_Request, _From, State) ->
{reply, {error, {unknown_request,_Request}}, State}.
%% @doc Обработка `cast`
handle_cast(_Msg={add_item, RSSItem=#xmlElement{name=item}}, State=#rssState{queue=Queue1, subscribers=Subscribers1}) ->
NewQ = push_item(RSSItem, Queue1, Subscribers1),
{noreply, State#rssState{queue=NewQ}};
%% @doc Обработка `cast` - для отписки
handle_cast(_Msg={unsubscribe, QPid}, State=#rssState{subscribers=Subscribers1}) -> { noreply, State#rssState{subscribers=sets:del_element(QPid, Subscribers1)} };
%% @doc Обработка `cast`
handle_cast(_Msg, State) -> ?WARN("Unknown msg {~p} to Q{~p}", [_Msg, State]), {noreply, State}.
%% @doc Обработка info
handle_info(_Info={'DOWN', _, _, QPid, _Reason}, State=#rssState{subscribers=Subscribers1}) ->
{noreply, State#rssState{subscribers=sets:del_element(QPid, Subscribers1)}};
%% @doc Обработка info
handle_info(_Info={'EXIT', FromPid, _Reason}, State) ->
?ERROR("Something went wront in ~p coz: ~n", [FromPid, _Reason]),
{noreply, State};
%% @doc Обработка info
handle_info(_Info, State) -> {noreply, State}.
%% @doc Pre-terminated call
terminate(_Reason, _State) -> ok.
%% @doc Изменение кода
code_change(_OldVsn, State, _Extra) -> {ok, State}.
%% @doc подписка
subscribe(From, To) ->
gen_server:call(To, {subscribe, From}).
%% @doc start()
%% Для того чтобы сделать подписчику, вам надо добавить к состоянию очереди новое поле Subscribers, которое будет использоваться для хранения PID'ов очередей. Запускает очередь с заданным именем
start(Name) ->
gen_server:start({local, Name}, ?MODULE, [], []).
%% @doc start(Url)
%% Добавьте новую функцию start/1, которая имеет один аргумент URL, и запускает процесс rss_queue, который привязывается к процессу чтения, получая от него элементы ленты RSS, как это обсуждалось выше. Запускает очередь с заданным именем и URL-адресом
start(Name, Url) ->
gen_server:start({local, Name}, ?MODULE, [Url], []).
%% @doc Добавление в очередь
%% Функция добавляет в очередь очередной элемент
add_item(QPid, Item) when is_pid(QPid) ->
State = gen_server:cast(QPid, {add_item, Item}),
State.
%% @doc Добавление Фида
add_feed(QPid, RSS2Feed) ->
Feed = rss_parse:get_feed_items(RSS2Feed),
lists:foreach(
fun(Item) ->
add_item(QPid, Item)
end, Feed),
?INFO("Collection length: ~p ~n", [length(Feed)]),
ok.
%% @doc Получение элементов
%% Получает все эелементы фидов
get_all(QPid) when is_pid(QPid) ->
gen_server:call(QPid, {get_all}).
%% @doc Основный цикл программы, который слушает события
%% {add_item, RSSItem} и {get_all, RegPid}.
server(Queue, Subscribers) ->
receive
% Измените код обработки этого сообщения так, чтобы входящие элементы RSS ленты, новые или обновленные версии старых, дополнительно транслировались каждому подписчику очереди
% (async) - Добавляет элемент во внутреннюю очередь, если он новый или обновленный. Если элемент новый или обновленный он также транслируется всем подписчикам.
{add_item, RSSItem} ->
UpdatedQueue = push_item(RSSItem, Queue, Subscribers),
server(UpdatedQueue, Subscribers);
% (sync) - Запрашивает текущий список элементов RSS ленты. Список элементов возвращается процессу с идентификатором ReqPid
{get_all, RegPid} ->
RegPid ! {self(), Queue},
server(Queue, Subscribers);
% Добавьте обработку нового сообщения {unsubscribe, QPid}, которое удаляет заданный PID из множества подписчиков
% (async) - Удаляет указанную очередь RSS из списка подписчиков.
{unsubscribe, QPid} ->
server(Queue, sets:del_element(QPid,Subscribers));
% Обработайте новое сообщение {subscribe, QPid}, которое позволяет очереди подписаться на получение элементов ленты другой очереди
% (sync) - Подписывает указанную очередь RSS на получение сообщений очереди. Если QPid отсутствует в списке подписчиков, в ответ возвращается список всех элементов ленты RSS, которые имеются в очереди
{subscribe, QPid} ->
[add_item(QPid,Item) || Item <- Queue],
server(Queue, sets:add_element(QPid,Subscribers));
% Измените очередь так, чтобы она могла получать сообщения 'EXIT' или 'DOWN' от очередей подписчиков, в зависимости от того выбрали вы способ с установкой связи между процессами, или мониторинг очереди подписчика.
{'DOWN',_ , _, QPid, _} ->
server(Queue, sets:del_element(QPid,Subscribers))
end.
% 4 Создайте и экспортируйте вспомогательные функции для запуска нового серверного процесса, обслуживающего очередь
start() ->
Queue = [],
spawn(?MODULE, server, [Queue]).
%% @doc Функция для сравнения дат
date_comporator(A, B) ->
rss_parse:get_item_time(A) < rss_parse:get_item_time(B).
%% @doc Передача новых элементов
broadcast(Item, Subscribers) ->
[add_item(SubscriberPid, Item) || SubscriberPid <- sets:to_list(Subscribers)].
%% @doc Добавление RSS-элемента очередь -- процедуру определенно требуется разделить на несколько вспомогательных функций
push_item(RSSItem, Queue, Subscribers) ->
{Res, OldItem} = search_item(RSSItem, Queue),
case Res of
same ->
?INFO("`update_queue`: same_item ~n", []),
Queue;
updated ->
?INFO("`update_queue`: updated_item ~n", []),
broadcast(RSSItem, Subscribers),
QueueUpdated = Queue--[OldItem],
lists:sort(fun date_comporator/2, QueueUpdated++[RSSItem]);
different ->
?INFO("`update_queue`: different_item ~n", []),
broadcast(RSSItem, Subscribers),
lists:sort(fun date_comporator/2, Queue++[RSSItem])
end.
push_item(RSSItem, Queue) ->
{State, FoundItem} = search_item(RSSItem, Queue), %3.1.2
case State of
same ->
Queue; % 3.1.4.1 мы просто игнорируем его.
updated ->
QueueUpdated = Queue--[FoundItem], % 3.1.5.1 старую версию удалить из очереди
lists:sort(fun date_comporator/2, QueueUpdated++[RSSItem]); % 3.1.5.2 прежде чем добавлять новую
different ->
lists:sort(fun date_comporator/2, Queue++[RSSItem])
end.
% 2. Модуль должен содержать функцию server, в которой реализован цикл сервера
server(Queue) -> % 2.1 Пока состояние очереди должно хранить только список элементов ленты
@ -199,6 +48,32 @@ search_item(RSSItem, Queue) ->
different -> search_item(RSSItem, Tail) % 3.1.6 Если в очереди отсутствует такой же элемент
end.
%% @doc 5. Добавление элемента
add_item(QPid, Item)
when is_pid(QPid) ->
QPid ! {add_item, Item},
ok.
%% @doc 6. Добавление Фида (не сложно написать, с учетом ранее проделанной работы)
add_feed(QPid, RSS2Feed) ->
Items = rss_parse:get_feed_items(RSS2Feed),
lists:foreach(fun(Item) ->
add_item(QPid, Item)
end, Items),
ok.
%% @doc 7. Получения RSS
get_all(QPid) when is_pid(QPid) ->
QPid ! {get_all, self()},
receive
{QPid, List} ->
io:format("length ~p~n",[length(List)]),
List
after 1000 ->
{error, timeout}
end.
%%% функция для тестирования
test() ->
PID = start(),

View File

@ -1,40 +0,0 @@
-module(rss_reader).
-include("logging.hrl").
-compile(export_all).
-define(RETRIEVE,3000).
%% @doc start(Url, QPid)
%% Запуск сервера
start(Url,QPid)->
ssl:start(),
spawn(?MODULE,server,[Url,QPid]).
%% @doc реализация чтения
%% Загружаем ленту с указанного URL, с помощью функции httpc:request/1.
%% Если код ответа равен 200, извлекаем тело ответа и разбирает его XML содержимое с помощью функции xmerl_scan:string/1.
%% Когда информация извлечена из тела запроса, проверяем, что получена лента в формате RSS 2.0. Для этого есть написанная в задании 3 функция rss_parse:is_rss2_feed/1.
%% Если все вышеперечисленные шаги завершились без ошибок, с помощью написанной в прошлом задании вспомогательной функции rss_queue:add_feed/2 отправляем все элементы ленты в очередь, которая стоит в паре с этим процессом чтения.
%% Ждем заданное время, затем возвращаемся к началу и продолжаем все заново.
%% Обычно, процессы чтения RSS новостей обновляют свои ленты через большие интервалы времени, начиная с 15 минут до часа. Однако, так вам будет сложно тестировать свою программу, поэтому на время разработки этот интервал можно уменьшить до 60 секунд или около того. Объявите с помощью макроса константу, например, RETRIEVE_INTERVAL, с тем чтобы в последствии вы могли бы легко изменить значение таймаута не отыскивая место в коде, где он используется.
server(Url, QPid)->
?INFO("URL ~s~n", [Url]),
{ok,{{_,Code,_},_,Load}}=httpc:request(Url),
case Code of
200 ->
?INFO("HTTPCode ~s~n", [Code]),
{RSS,_} = xmerl_scan:string(Load),
case rss_parse:is_rss2_feed(RSS) of
ok ->
rss_queue:add_feed(QPid,RSS),
receive
after ?RETRIEVE ->
server(Url, QPid)
end;
_ ->
{error,not_rss2_feed}
end;
_ -> {error,Code}
end.