lab06 unchecked

This commit is contained in:
Ivan I. Ovchinnikov 2023-04-26 09:06:39 +03:00
parent eebaa34291
commit f7f73cd26f
2 changed files with 137 additions and 53 deletions

View File

@ -1,7 +1,7 @@
-module(rss_parse).
-export([is_rss2_feed/1, get_feed_items/1, get_item_time/1, compare_feed_items/2]).
-include_lib("/usr/lib/erlang/lib/xmerl-1.3.26/include/xmerl.hrl").
-include_lib("/usr/lib/erlang/lib/xmerl-1.3.26/include/xmerl.hrl"<).
% В этой функции вызываем функцию `xmerl_scan:file/1`, которая возвращает парсер XML-документа. Затем используем `xmerl_xpath:string/2` для поиска элемента `<rss>` с атрибутом `version` равным "2.0". Если такой элемент найден, то функция возвращает true, иначе false.
is_rss2_feed(Name)->

View File

@ -2,43 +2,145 @@
-compile(export_all).
-include("logging.hrl")
-include_lib("/usr/lib/erlang/lib/xmerl-1.3.26/include/xmerl.hrl")
% The server implements the gen_server behavior.
-behaviour(gen_server).
% callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
% Additional helper functions exported by the callback module.
-export([start/1]).
-define(TIMEOUT, 10000).
-record(rssState, {queue, subscribers}).
% TODO: Implement module helper functions.
start(Name) -> gen_server:start({local, Name}, ?MODULE, [], []).
%% @doc init([]), init([Url])
%% нужно написать функцию init, и в обоих версиях функции start которые запускают процессы добавить вызовы init. После этого, init может вызвать вашу функцию server с необходимым набором аргументов.
init([]) ->
start();
process_flag(trap_exit, true),
{ok, #rssState{queue=[], subscribers=sets:new()}};
init([Url]) ->
start(Url).
RSS = #rssState{queue=[], subscribers=sets:new()},
process_flag(trap_exit, true),
rss_reader:start(Url, self()), {ok, RSS}.
%% @doc Обработка запроса
handle_call(_Request={subscribe, QPid}, _From, State=#rssState{queue=Queue1, subscribers=Subscribers1}) ->
{Reply, NewState} = case sets:is_element(QPid, Subscribers1) of
true ->
{{error, already_subscribed}, State};
false ->
erlang:monitor(process,QPid),
?INFO("`handle_call`: New subscriber ~p to ~p~n",[QPid, self()]),
[add_item(QPid, Item) || Item <- Queue1],
{ok, State#rssState{subscribers=sets:add_element(QPid, Subscribers1)}}
end,
{reply, Reply, NewState};
%% @doc Обработка запроса
handle_call(_Request={get_all}, _From, State=#rssState{queue=Queue1}) ->
{reply, Queue1, State};
%% @doc Обработка запроса
handle_call(_Request, _From, State) ->
{reply, {error, {unknown_request,_Request}}, State}.
%% @doc Обработка `cast`
handle_cast(_Msg={add_item, RSSItem=#xmlElement{name=item}}, State=#rssState{queue=Queue1, subscribers=Subscribers1}) ->
NewQ = push_item(RSSItem, Queue1, Subscribers1),
{noreply, State#rssState{queue=NewQ}};
%% @doc Обработка `cast` - для отписки
handle_cast(_Msg={unsubscribe, QPid}, State=#rssState{subscribers=Subscribers1}) -> { noreply, State#rssState{subscribers=sets:del_element(QPid, Subscribers1)} };
%% @doc Обработка `cast`
handle_cast(_Msg, State) -> ?WARN("Unknown msg {~p} to Q{~p}", [_Msg, State]), {noreply, State}.
%% @doc Обработка info
handle_info(_Info={'DOWN', _, _, QPid, _Reason}, State=#rssState{subscribers=Subscribers1}) ->
{noreply, State#rssState{subscribers=sets:del_element(QPid, Subscribers1)}};
%% @doc Обработка info
handle_info(_Info={'EXIT', FromPid, _Reason}, State) ->
?ERROR("Something went wront in ~p coz: ~n", [FromPid, _Reason]),
{noreply, State};
%% @doc Обработка info
handle_info(_Info, State) -> {noreply, State}.
%% @doc Pre-terminated call
terminate(_Reason, _State) -> ok.
%% @doc Изменение кода
code_change(_OldVsn, State, _Extra) -> {ok, State}.
%% @doc подписка
subscribe(From, To) ->
gen_server:call(To, {subscribe, From}).
%% @doc start()
%% Для того чтобы сделать подписчику, вам надо добавить к состоянию очереди новое поле Subscribers, которое будет использоваться для хранения PID'ов очередей.
start() ->
Queue = [],
Subscribers = sets:new(),
spawn(?MODULE, server, [Queue, Subscribers]).
%% Для того чтобы сделать подписчику, вам надо добавить к состоянию очереди новое поле Subscribers, которое будет использоваться для хранения PID'ов очередей. Запускает очередь с заданным именем
start(Name) ->
gen_server:start({local, Name}, ?MODULE, [], []).
%% @doc start(Url)
%% Добавьте новую функцию start/1, которая имеет один аргумент URL, и запускает процесс rss_queue, который привязывается к процессу чтения, получая от него элементы ленты RSS, как это обсуждалось выше.
start(Url) ->
QPid = start(),
rss_reader:start(Url,QPid),
QPid.
%% Добавьте новую функцию start/1, которая имеет один аргумент URL, и запускает процесс rss_queue, который привязывается к процессу чтения, получая от него элементы ленты RSS, как это обсуждалось выше. Запускает очередь с заданным именем и URL-адресом
start(Name, Url) ->
gen_server:start({local, Name}, ?MODULE, [Url], []).
%% @doc Добавление в очередь
%% Функция добавляет в очередь очередной элемент
add_item(QPid, Item) when is_pid(QPid) ->
State = gen_server:cast(QPid, {add_item, Item}),
State.
%% @doc Добавление Фида
add_feed(QPid, RSS2Feed) ->
Feed = rss_parse:get_feed_items(RSS2Feed),
lists:foreach(
fun(Item) ->
add_item(QPid, Item)
end, Feed),
?INFO("Collection length: ~p ~n", [length(Feed)]),
ok.
%% @doc Получение элементов
%% Получает все эелементы фидов
get_all(QPid) when is_pid(QPid) ->
gen_server:call(QPid, {get_all}).
%% @doc Основный цикл программы, который слушает события
%% {add_item, RSSItem} и {get_all, RegPid}.
server(Queue, Subscribers) ->
receive
% Измените код обработки этого сообщения так, чтобы входящие элементы RSS ленты, новые или обновленные версии старых, дополнительно транслировались каждому подписчику очереди
% (async) - Добавляет элемент во внутреннюю очередь, если он новый или обновленный. Если элемент новый или обновленный он также транслируется всем подписчикам.
{add_item, RSSItem} ->
UpdatedQueue = push_item(RSSItem, Queue, Subscribers),
server(UpdatedQueue, Subscribers);
% (sync) - Запрашивает текущий список элементов RSS ленты. Список элементов возвращается процессу с идентификатором ReqPid
{get_all, RegPid} ->
RegPid ! {self(), Queue},
server(Queue, Subscribers);
% Добавьте обработку нового сообщения {unsubscribe, QPid}, которое удаляет заданный PID из множества подписчиков
% (async) - Удаляет указанную очередь RSS из списка подписчиков.
{unsubscribe, QPid} ->
server(Queue, sets:del_element(QPid,Subscribers));
%Обработайте новое сообщение {subscribe, QPid}, которое позволяет очереди подписаться на получение элементов ленты другой очереди
% Обработайте новое сообщение {subscribe, QPid}, которое позволяет очереди подписаться на получение элементов ленты другой очереди
% (sync) - Подписывает указанную очередь RSS на получение сообщений очереди. Если QPid отсутствует в списке подписчиков, в ответ возвращается список всех элементов ленты RSS, которые имеются в очереди
{subscribe, QPid} ->
[add_item(QPid,Item) || Item <- Queue],
server(Queue, sets:add_element(QPid,Subscribers));
@ -47,23 +149,31 @@ server(Queue, Subscribers) ->
server(Queue, sets:del_element(QPid,Subscribers))
end.
%% @doc Функция для сравнения дат
date_comporator(A, B) ->
rss_parse:get_item_time(A) < rss_parse:get_item_time(B).
%% @doc Передача новых элементов
broadcast(Item, Subscribers) ->
[add_item(SubscriberPid, Item) || SubscriberPid <- sets:to_list(Subscribers)].
%% @doc Добавление RSS-элемента очередь -- процедуру определенно требуется разделить на несколько вспомогательных функций
push_item(RSSItem, Queue) ->
{State, FoundItem} = search_item(RSSItem, Queue), %3.1.2
case State of
same ->
Queue; % 3.1.4.1 мы просто игнорируем его.
updated ->
QueueUpdated = Queue--[FoundItem], % 3.1.5.1 старую версию удалить из очереди
lists:sort(fun date_comporator/2, QueueUpdated++[RSSItem]); % 3.1.5.2 прежде чем добавлять новую
different ->
lists:sort(fun date_comporator/2, Queue++[RSSItem])
end.
push_item(RSSItem, Queue, Subscribers) ->
{Res, OldItem} = search_item(RSSItem, Queue),
case Res of
same ->
?INFO("`update_queue`: same_item ~n", []),
Queue;
updated ->
?INFO("`update_queue`: updated_item ~n", []),
broadcast(RSSItem, Subscribers),
QueueUpdated = Queue--[OldItem],
lists:sort(fun date_comporator/2, QueueUpdated++[RSSItem]);
different ->
?INFO("`update_queue`: different_item ~n", []),
broadcast(RSSItem, Subscribers),
lists:sort(fun date_comporator/2, Queue++[RSSItem])
end.
% 2. Модуль должен содержать функцию server, в которой реализован цикл сервера
server(Queue) -> % 2.1 Пока состояние очереди должно хранить только список элементов ленты
@ -89,32 +199,6 @@ search_item(RSSItem, Queue) ->
different -> search_item(RSSItem, Tail) % 3.1.6 Если в очереди отсутствует такой же элемент
end.
%% @doc 5. Добавление элемента
add_item(QPid, Item)
when is_pid(QPid) ->
QPid ! {add_item, Item},
ok.
%% @doc 6. Добавление Фида (не сложно написать, с учетом ранее проделанной работы)
add_feed(QPid, RSS2Feed) ->
Items = rss_parse:get_feed_items(RSS2Feed),
lists:foreach(fun(Item) ->
add_item(QPid, Item)
end, Items),
ok.
%% @doc 7. Получения RSS
get_all(QPid) when is_pid(QPid) ->
QPid ! {get_all, self()},
receive
{QPid, List} ->
io:format("length ~p~n",[length(List)]),
List
after 1000 ->
{error, timeout}
end.
%%% функция для тестирования
test() ->
PID = start(),