From f7f73cd26f4929832e71f2bf5f23458a6cebc60e Mon Sep 17 00:00:00 2001 From: "Ivan I. Ovchinnikov" Date: Wed, 26 Apr 2023 09:06:39 +0300 Subject: [PATCH] lab06 unchecked --- src/rss_parse.erl | 2 +- src/rss_queue.erl | 188 +++++++++++++++++++++++++++++++++------------- 2 files changed, 137 insertions(+), 53 deletions(-) diff --git a/src/rss_parse.erl b/src/rss_parse.erl index 93ce79e..243f922 100644 --- a/src/rss_parse.erl +++ b/src/rss_parse.erl @@ -1,7 +1,7 @@ -module(rss_parse). -export([is_rss2_feed/1, get_feed_items/1, get_item_time/1, compare_feed_items/2]). --include_lib("/usr/lib/erlang/lib/xmerl-1.3.26/include/xmerl.hrl"). +-include_lib("/usr/lib/erlang/lib/xmerl-1.3.26/include/xmerl.hrl"<). % В этой функции вызываем функцию `xmerl_scan:file/1`, которая возвращает парсер XML-документа. Затем используем `xmerl_xpath:string/2` для поиска элемента `` с атрибутом `version` равным "2.0". Если такой элемент найден, то функция возвращает true, иначе false. is_rss2_feed(Name)-> diff --git a/src/rss_queue.erl b/src/rss_queue.erl index 890df91..1e832e5 100644 --- a/src/rss_queue.erl +++ b/src/rss_queue.erl @@ -2,43 +2,145 @@ -compile(export_all). +-include("logging.hrl") +-include_lib("/usr/lib/erlang/lib/xmerl-1.3.26/include/xmerl.hrl") + +% The server implements the gen_server behavior. +-behaviour(gen_server). + +% callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). + +% Additional helper functions exported by the callback module. +-export([start/1]). + +-define(TIMEOUT, 10000). +-record(rssState, {queue, subscribers}). + + +% TODO: Implement module helper functions. + +start(Name) -> gen_server:start({local, Name}, ?MODULE, [], []). + + %% @doc init([]), init([Url]) %% нужно написать функцию init, и в обоих версиях функции start которые запускают процессы добавить вызовы init. После этого, init может вызвать вашу функцию server с необходимым набором аргументов. init([]) -> - start(); + process_flag(trap_exit, true), + {ok, #rssState{queue=[], subscribers=sets:new()}}; + init([Url]) -> - start(Url). + RSS = #rssState{queue=[], subscribers=sets:new()}, + process_flag(trap_exit, true), + rss_reader:start(Url, self()), {ok, RSS}. + +%% @doc Обработка запроса +handle_call(_Request={subscribe, QPid}, _From, State=#rssState{queue=Queue1, subscribers=Subscribers1}) -> + {Reply, NewState} = case sets:is_element(QPid, Subscribers1) of + true -> + {{error, already_subscribed}, State}; + false -> + erlang:monitor(process,QPid), + ?INFO("`handle_call`: New subscriber ~p to ~p~n",[QPid, self()]), + [add_item(QPid, Item) || Item <- Queue1], + {ok, State#rssState{subscribers=sets:add_element(QPid, Subscribers1)}} + end, + {reply, Reply, NewState}; + +%% @doc Обработка запроса +handle_call(_Request={get_all}, _From, State=#rssState{queue=Queue1}) -> + {reply, Queue1, State}; + +%% @doc Обработка запроса +handle_call(_Request, _From, State) -> + {reply, {error, {unknown_request,_Request}}, State}. + +%% @doc Обработка `cast` +handle_cast(_Msg={add_item, RSSItem=#xmlElement{name=item}}, State=#rssState{queue=Queue1, subscribers=Subscribers1}) -> + NewQ = push_item(RSSItem, Queue1, Subscribers1), + {noreply, State#rssState{queue=NewQ}}; + +%% @doc Обработка `cast` - для отписки +handle_cast(_Msg={unsubscribe, QPid}, State=#rssState{subscribers=Subscribers1}) -> { noreply, State#rssState{subscribers=sets:del_element(QPid, Subscribers1)} }; + +%% @doc Обработка `cast` +handle_cast(_Msg, State) -> ?WARN("Unknown msg {~p} to Q{~p}", [_Msg, State]), {noreply, State}. + + +%% @doc Обработка info +handle_info(_Info={'DOWN', _, _, QPid, _Reason}, State=#rssState{subscribers=Subscribers1}) -> + {noreply, State#rssState{subscribers=sets:del_element(QPid, Subscribers1)}}; + +%% @doc Обработка info +handle_info(_Info={'EXIT', FromPid, _Reason}, State) -> + ?ERROR("Something went wront in ~p coz: ~n", [FromPid, _Reason]), + {noreply, State}; + +%% @doc Обработка info +handle_info(_Info, State) -> {noreply, State}. + +%% @doc Pre-terminated call +terminate(_Reason, _State) -> ok. + +%% @doc Изменение кода +code_change(_OldVsn, State, _Extra) -> {ok, State}. + +%% @doc подписка +subscribe(From, To) -> + gen_server:call(To, {subscribe, From}). %% @doc start() -%% Для того чтобы сделать подписчику, вам надо добавить к состоянию очереди новое поле Subscribers, которое будет использоваться для хранения PID'ов очередей. -start() -> - Queue = [], - Subscribers = sets:new(), - spawn(?MODULE, server, [Queue, Subscribers]). +%% Для того чтобы сделать подписчику, вам надо добавить к состоянию очереди новое поле Subscribers, которое будет использоваться для хранения PID'ов очередей. Запускает очередь с заданным именем +start(Name) -> + gen_server:start({local, Name}, ?MODULE, [], []). %% @doc start(Url) -%% Добавьте новую функцию start/1, которая имеет один аргумент URL, и запускает процесс rss_queue, который привязывается к процессу чтения, получая от него элементы ленты RSS, как это обсуждалось выше. -start(Url) -> - QPid = start(), - rss_reader:start(Url,QPid), - QPid. +%% Добавьте новую функцию start/1, которая имеет один аргумент URL, и запускает процесс rss_queue, который привязывается к процессу чтения, получая от него элементы ленты RSS, как это обсуждалось выше. Запускает очередь с заданным именем и URL-адресом +start(Name, Url) -> + gen_server:start({local, Name}, ?MODULE, [Url], []). + +%% @doc Добавление в очередь +%% Функция добавляет в очередь очередной элемент +add_item(QPid, Item) when is_pid(QPid) -> + State = gen_server:cast(QPid, {add_item, Item}), + State. + +%% @doc Добавление Фида +add_feed(QPid, RSS2Feed) -> + Feed = rss_parse:get_feed_items(RSS2Feed), + lists:foreach( + fun(Item) -> + add_item(QPid, Item) + end, Feed), + ?INFO("Collection length: ~p ~n", [length(Feed)]), + ok. + +%% @doc Получение элементов +%% Получает все эелементы фидов +get_all(QPid) when is_pid(QPid) -> + gen_server:call(QPid, {get_all}). %% @doc Основный цикл программы, который слушает события %% {add_item, RSSItem} и {get_all, RegPid}. server(Queue, Subscribers) -> receive -% Измените код обработки этого сообщения так, чтобы входящие элементы RSS ленты, новые или обновленные версии старых, дополнительно транслировались каждому подписчику очереди +% Измените код обработки этого сообщения так, чтобы входящие элементы RSS ленты, новые или обновленные версии старых, дополнительно транслировались каждому подписчику очереди +% (async) - Добавляет элемент во внутреннюю очередь, если он новый или обновленный. Если элемент новый или обновленный он также транслируется всем подписчикам. {add_item, RSSItem} -> UpdatedQueue = push_item(RSSItem, Queue, Subscribers), server(UpdatedQueue, Subscribers); +% (sync) - Запрашивает текущий список элементов RSS ленты. Список элементов возвращается процессу с идентификатором ReqPid {get_all, RegPid} -> RegPid ! {self(), Queue}, server(Queue, Subscribers); % Добавьте обработку нового сообщения {unsubscribe, QPid}, которое удаляет заданный PID из множества подписчиков +% (async) - Удаляет указанную очередь RSS из списка подписчиков. {unsubscribe, QPid} -> server(Queue, sets:del_element(QPid,Subscribers)); -%Обработайте новое сообщение {subscribe, QPid}, которое позволяет очереди подписаться на получение элементов ленты другой очереди +% Обработайте новое сообщение {subscribe, QPid}, которое позволяет очереди подписаться на получение элементов ленты другой очереди +% (sync) - Подписывает указанную очередь RSS на получение сообщений очереди. Если QPid отсутствует в списке подписчиков, в ответ возвращается список всех элементов ленты RSS, которые имеются в очереди {subscribe, QPid} -> [add_item(QPid,Item) || Item <- Queue], server(Queue, sets:add_element(QPid,Subscribers)); @@ -47,23 +149,31 @@ server(Queue, Subscribers) -> server(Queue, sets:del_element(QPid,Subscribers)) end. - %% @doc Функция для сравнения дат date_comporator(A, B) -> rss_parse:get_item_time(A) < rss_parse:get_item_time(B). +%% @doc Передача новых элементов +broadcast(Item, Subscribers) -> + [add_item(SubscriberPid, Item) || SubscriberPid <- sets:to_list(Subscribers)]. + %% @doc Добавление RSS-элемента очередь -- процедуру определенно требуется разделить на несколько вспомогательных функций -push_item(RSSItem, Queue) -> - {State, FoundItem} = search_item(RSSItem, Queue), %3.1.2 - case State of - same -> - Queue; % 3.1.4.1 мы просто игнорируем его. - updated -> - QueueUpdated = Queue--[FoundItem], % 3.1.5.1 старую версию удалить из очереди - lists:sort(fun date_comporator/2, QueueUpdated++[RSSItem]); % 3.1.5.2 прежде чем добавлять новую - different -> - lists:sort(fun date_comporator/2, Queue++[RSSItem]) - end. +push_item(RSSItem, Queue, Subscribers) -> + {Res, OldItem} = search_item(RSSItem, Queue), + case Res of + same -> + ?INFO("`update_queue`: same_item ~n", []), + Queue; + updated -> + ?INFO("`update_queue`: updated_item ~n", []), + broadcast(RSSItem, Subscribers), + QueueUpdated = Queue--[OldItem], + lists:sort(fun date_comporator/2, QueueUpdated++[RSSItem]); + different -> + ?INFO("`update_queue`: different_item ~n", []), + broadcast(RSSItem, Subscribers), + lists:sort(fun date_comporator/2, Queue++[RSSItem]) + end. % 2. Модуль должен содержать функцию server, в которой реализован цикл сервера server(Queue) -> % 2.1 Пока состояние очереди должно хранить только список элементов ленты @@ -89,32 +199,6 @@ search_item(RSSItem, Queue) -> different -> search_item(RSSItem, Tail) % 3.1.6 Если в очереди отсутствует такой же элемент end. -%% @doc 5. Добавление элемента -add_item(QPid, Item) - when is_pid(QPid) -> - QPid ! {add_item, Item}, - ok. - -%% @doc 6. Добавление Фида (не сложно написать, с учетом ранее проделанной работы) -add_feed(QPid, RSS2Feed) -> - Items = rss_parse:get_feed_items(RSS2Feed), - lists:foreach(fun(Item) -> - add_item(QPid, Item) - end, Items), - ok. - - -%% @doc 7. Получения RSS -get_all(QPid) when is_pid(QPid) -> - QPid ! {get_all, self()}, - receive - {QPid, List} -> - io:format("length ~p~n",[length(List)]), - List - after 1000 -> - {error, timeout} - end. - %%% функция для тестирования test() -> PID = start(),