=== added file 'src/CouchDB/couch_stew_group.erl' --- src/CouchDB/couch_stew_group.erl 1970-01-01 00:00:00 +0000 +++ src/CouchDB/couch_stew_group.erl 2007-12-22 05:55:16 +0000 @@ -0,0 +1,540 @@ +%% CouchDB +%% Copyright (C) 2006 Damien Katz +%% +%% This program is free software; you can redistribute it and/or +%% modify it under the terms of the GNU General Public License +%% as published by the Free Software Foundation; either version 2 +%% of the License, or (at your option) any later version. +%% +%% This program is distributed in the hope that it will be useful, +%% but WITHOUT ANY WARRANTY; without even the implied warranty of +%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +%% GNU General Public License for more details. +%% +%% You should have received a copy of the GNU General Public License +%% along with this program; if not, write to the Free Software +%% Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + +-module(couch_stew_group). +-behaviour(gen_server). + +-export([start_manager/4,stop/1, + open/4,open/5,fold/5,fold/6, + update_group/3,update_view_proc/5,free_groups/2, + update_view_proc/7,get_group_async/3, + less_json/2]). +-export([init/1,terminate/2,handle_call/3,handle_cast/2,handle_info/2,code_change/3]). + +-include("couch_db.hrl"). + +% arbitrarily chosen amount of memory to use before flushing to disk +-define(FLUSH_MAX_MEM, 4000000). + +-record(view_group, + {db, + query_lang, + named_queries, + views, + current_seq, + id_btree, + update_notif_fun, + compiled_doc_map = nil + }). + +-record(view, + {id_num, + name, + btree, + columns, + query_string + }). + +-record(mgr, + {db, + fd, + update_state_by_id = dict:new(), + ids_by_pid = dict:new(), + get_group_info_fun, + update_group_info_fun, + cached_groups=dict:new() + }). + + +start_manager(Db, Fd, GetGroupInfoFun, UpdateGroupInfoFun) -> + gen_server:start_link(couch_stew_group, {Db, Fd, GetGroupInfoFun, UpdateGroupInfoFun}, []). 
%% Asks the manager process to shut down. This is a cast, so it returns
%% without waiting for the manager to actually stop.
stop(MgrPid) ->
    gen_server:cast(MgrPid, stop).

%% gen_server callback. Enables exit trapping (so exits of the linked update
%% processes arrive as 'EXIT' messages in handle_info/2) and builds the
%% initial manager state from the start_manager/4 arguments.
init({Db, Fd, GetGroupInfoFun, UpdateGroupInfoFun}) ->
    process_flag(trap_exit, true),
    InitialState = #mgr{
        db = Db,
        fd = Fd,
        get_group_info_fun = GetGroupInfoFun,
        update_group_info_fun = UpdateGroupInfoFun
    },
    {ok, InitialState}.

%% gen_server callback. Nothing to clean up.
terminate(_Reason, _Mgr) ->
    ok.

%% Asynchronously requests the group identified by GroupId. The manager
%% answers by calling gen_server:reply/2 on OrigFrom.
get_group_async(MgrPid, GroupId, OrigFrom) ->
    gen_server:cast(MgrPid, {get_group_async, GroupId, OrigFrom}).

%% Asynchronously requests an update of the group identified by GroupId.
%% UpdateNotifFun is handed to the update process as a status listener.
update_group(MgrPid, GroupId, UpdateNotifFun) ->
    gen_server:cast(MgrPid, {update_group, GroupId, UpdateNotifFun}).

%% Stops any running update of the given groups and drops their cached
%% values. Synchronous.
free_groups(MgrPid, GroupIds) ->
    gen_server:call(MgrPid, {free_groups, GroupIds}).

%% gen_server callback for synchronous requests.
%%
%% {group_cache_update, GroupId, Group}: sent by an update process to publish
%% its latest group. The compiled doc map is stripped before caching. The
%% sender may already have been killed by a free_groups request, in which
%% case the group is not cached.
%%
%% {free_groups, GroupIds}: kills the update process of every listed group
%% (exit reason 'freed') and removes the groups from the cache.
handle_call({group_cache_update, GroupId, Group}, {FromPid, _FromRef}, Mgr) ->
    CacheableGroup = Group#view_group{compiled_doc_map = nil},
    NewMgr =
        case is_process_alive(FromPid) of
            false ->
                Mgr;
            true ->
                Cache = dict:store(GroupId, CacheableGroup, Mgr#mgr.cached_groups),
                Mgr#mgr{cached_groups = Cache}
        end,
    {reply, ok, NewMgr};
handle_call({free_groups, GroupIds}, _From, Mgr) ->
    Cache2 =
        lists:foldl(
            fun(GroupId, CacheAcc) ->
                % the update-state dict is not modified here; the resulting
                % 'EXIT' message is what cleans it up in handle_info/2
                stop_group_updater(dict:find(GroupId, Mgr#mgr.update_state_by_id)),
                dict:erase(GroupId, CacheAcc)
            end,
            Mgr#mgr.cached_groups,
            GroupIds
        ),
    {reply, ok, Mgr#mgr{cached_groups = Cache2}}.

%% Kills the update process recorded in an update-state lookup result, if any.
stop_group_updater({ok, {processing_request, _NotifyFuns, Pid}}) ->
    exit(Pid, freed);
stop_group_updater({ok, {processing_and_pending_request, _NotifyFuns, _PendingFuns, Pid}}) ->
    exit(Pid, freed);
stop_group_updater(_NotUpdating) ->
    ok.
%% gen_server callback for asynchronous requests.
%%
%% {get_group_async, GroupId, OrigFrom}: replies to OrigFrom with the cached
%% group, or opens the group from the info returned by the get-group-info fun,
%% caches it and replies with it. Any other result of that fun is passed back
%% to OrigFrom unchanged.
%%
%% {update_group, GroupId, UpdateNotifFun}: starts an update process for the
%% group, or, when one is already running, queues the notify fun so that a
%% follow-up update is started once the running one exits.
%%
%% stop: terminates the manager normally.
handle_cast({get_group_async, GroupId, OrigFrom}, Mgr) ->
    #mgr{
        db = Db,
        fd = Fd,
        get_group_info_fun = GetGroupInfoFun,
        cached_groups = Cache
    } = Mgr,
    case dict:find(GroupId, Cache) of
        {ok, _Group} = CacheHit ->
            gen_server:reply(OrigFrom, CacheHit),
            {noreply, Mgr};
        error ->
            case GetGroupInfoFun(GroupId) of
                {ok, {NamedQueries, GroupState}} ->
                    {ok, Group} = open(Db, Fd, NamedQueries, GroupState),
                    Cache2 = dict:store(GroupId, Group, Cache),
                    gen_server:reply(OrigFrom, {ok, Group}),
                    {noreply, Mgr#mgr{cached_groups = Cache2}};
                Failure ->
                    gen_server:reply(OrigFrom, Failure),
                    {noreply, Mgr}
            end
    end;
handle_cast({update_group, GroupId, UpdateNotifFun}, Mgr) ->
    #mgr{update_state_by_id = UpdateStates, ids_by_pid = IdsByPid} = Mgr,
    case dict:find(GroupId, UpdateStates) of
        error ->
            % nothing running for this group: start an update right away
            Pid = spawn_group_updater(GroupId, [UpdateNotifFun], Mgr),
            Running = {processing_request, [UpdateNotifFun], Pid},
            {noreply, Mgr#mgr{
                update_state_by_id = dict:store(GroupId, Running, UpdateStates),
                ids_by_pid = dict:store(Pid, GroupId, IdsByPid)
            }};
        {ok, {processing_request, NotifyFuns, Pid}} ->
            % first request to arrive while an update is in flight
            Queued = {processing_and_pending_request, NotifyFuns, [UpdateNotifFun], Pid},
            {noreply, Mgr#mgr{update_state_by_id = dict:store(GroupId, Queued, UpdateStates)}};
        {ok, {processing_and_pending_request, NotifyFuns, PendingFuns, Pid}} ->
            Queued = {processing_and_pending_request, NotifyFuns,
                      [UpdateNotifFun | PendingFuns], Pid},
            {noreply, Mgr#mgr{update_state_by_id = dict:store(GroupId, Queued, UpdateStates)}}
    end;
handle_cast(stop, Mgr) ->
    {stop, normal, Mgr}. % causes terminate to be called

%% gen_server callback. Handles the exit of a linked process.
%%
%% When the exiting process is a known update process, its bookkeeping is
%% removed; if requests were queued behind it, a new update process is started
%% for them. On an abnormal exit the notify funs of the finished update are
%% called with the failure. An exit from any other linked process makes the
%% manager exit with the same reason.
handle_info({'EXIT', FromPid, Reason}, Mgr) ->
    #mgr{update_state_by_id = UpdateStates, ids_by_pid = IdsByPid} = Mgr,
    case dict:find(FromPid, IdsByPid) of
        error ->
            % a linked process must have died, we propagate the error
            exit(Reason);
        {ok, GroupId} ->
            {FinishedFuns, UpdateStates2, IdsByPid2} =
                case dict:find(GroupId, UpdateStates) of
                    {ok, {processing_request, Funs, _Pid}} ->
                        {Funs,
                         dict:erase(GroupId, UpdateStates),
                         dict:erase(FromPid, IdsByPid)};
                    {ok, {processing_and_pending_request, Funs, PendingFuns, _Pid}} ->
                        NewPid = spawn_group_updater(GroupId, PendingFuns, Mgr),
                        {Funs,
                         dict:store(GroupId, {processing_request, PendingFuns, NewPid},
                                    UpdateStates),
                         dict:store(NewPid, GroupId, dict:erase(FromPid, IdsByPid))}
                end,
            report_updater_exit(Reason, FinishedFuns),
            {noreply, Mgr#mgr{update_state_by_id = UpdateStates2, ids_by_pid = IdsByPid2}}
    end.

%% Starts a linked update process for GroupId. A cached group is handed over
%% directly (update_view_proc/5); otherwise the process is given what it needs
%% to open the group itself (update_view_proc/7).
spawn_group_updater(GroupId, NotifyFuns, Mgr) ->
    #mgr{
        db = Db,
        fd = Fd,
        get_group_info_fun = GetGroupInfoFun,
        update_group_info_fun = UpdateGroupInfoFun,
        cached_groups = Cache
    } = Mgr,
    Args =
        case dict:find(GroupId, Cache) of
            {ok, Group} ->
                [self(), Group, GroupId, UpdateGroupInfoFun, NotifyFuns];
            error ->
                [self(), Db, Fd, GroupId, GetGroupInfoFun, UpdateGroupInfoFun, NotifyFuns]
        end,
    spawn_link(couch_stew_group, update_view_proc, Args).

%% Tells the listeners of a finished update process how it ended. A normal
%% exit is not reported. For an uncaught throw only the thrown term is passed
%% on; any other reason is passed on as is.
report_updater_exit(normal, _NotifyFuns) ->
    ok;
report_updater_exit({{nocatch, Error}, _Trace}, NotifyFuns) ->
    call_exit_listeners(Error, NotifyFuns);
report_updater_exit(Reason, NotifyFuns) ->
    call_exit_listeners(Reason, NotifyFuns).

%% Calls every listener with Status; a failing listener is ignored.
call_exit_listeners(Status, NotifyFuns) ->
    lists:foreach(fun(NotifyFun) -> catch NotifyFun(Status) end, NotifyFuns).

%% gen_server callback. The state needs no conversion.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%% Opens a view group with an update-notification fun that does nothing.
open(Db, Fd, NamedQueries, GroupState) ->
    NoNotify = fun(_View) -> ok end,
    open(Db, Fd, NamedQueries, GroupState, NoNotify).
%% Opens a view group.
%%
%% The third argument is {QueryLang, NamedQueries}, NamedQueries being a list
%% of {Name, QueryString}. GroupState is either nil (a new group) or
%% {ViewBtreeStates, CurrentSeq, GroupIdBtreeState} as produced by get_info/1.
%% One btree is opened per named query, ordered by less_json/2, plus one btree
%% for the group itself. Views are numbered from 0 in query order.
%%
%% Returns {ok, #view_group{}}.
open(Db, Fd, {QueryLang, NamedQueries}, GroupState, UpdateNotifFun) ->
    {ViewBtreeStates, CurrentSeq, GroupIdBtreeState} =
        group_state_or_default(GroupState, NamedQueries),
    OpenView =
        fun({{Name, QueryString}, BtreeState}, IdNum) ->
            {ok, Btree} = couch_btree:open(BtreeState, Fd, fun less_json/2),
            View = #view{
                id_num = IdNum,
                name = Name,
                btree = Btree,
                query_string = QueryString
            },
            {View, IdNum + 1}
        end,
    {Views, _NextIdNum} =
        lists:mapfoldl(OpenView, 0, lists:zip(NamedQueries, ViewBtreeStates)),
    {ok, GroupIdBtree} = couch_btree:open(GroupIdBtreeState, Fd),
    {ok, #view_group{
        db = Db,
        query_lang = QueryLang,
        named_queries = NamedQueries,
        views = Views,
        current_seq = CurrentSeq,
        id_btree = GroupIdBtree,
        update_notif_fun = UpdateNotifFun
    }}.

%% A new view group (nil state) starts with a nil btree state per query,
%% sequence 0 and a nil group btree state; any other state is used as given.
group_state_or_default(nil, NamedQueries) ->
    {[nil || _Query <- NamedQueries], 0, nil};
group_state_or_default(GroupState, _NamedQueries) ->
    GroupState.

%% Returns {{QueryLang, NamedQueries}, GroupState} for a view group, with
%% GroupState in the form accepted by open/4 and open/5.
get_info(#view_group{query_lang = QueryLang,
                     named_queries = NamedQueries,
                     views = Views,
                     current_seq = CurrentSeq,
                     id_btree = GroupIdBtree}) ->
    ViewBtreeStates =
        lists:map(fun(#view{btree = Btree}) -> couch_btree:get_state(Btree) end, Views),
    GroupState = {ViewBtreeStates, CurrentSeq, couch_btree:get_state(GroupIdBtree)},
    {{QueryLang, NamedQueries}, GroupState}.

%% Folds over all rows of the named view in direction Dir.
%%
%% Fun is called as Fun(null, Key, Value, Offset, TotalRowCount, Acc).
%% Returns {ok, TotalRowCount, FinalAcc}, or {not_found, missing_named_view}
%% when the group has no view called ViewName.
fold(ViewGroup, ViewName, Dir, Fun, Acc) ->
    fold_int(ViewGroup#view_group.views, ViewName, Dir, Fun, Acc).

%% Walks the view list until the view named ViewName is found, then folds
%% over its btree.
fold_int([], _ViewName, _Dir, _Fun, _Acc) ->
    {not_found, missing_named_view};
fold_int([#view{name = Name, btree = Btree} | _Others], ViewName, Dir, Fun, Acc)
        when Name == ViewName ->
    RowCount = couch_btree:row_count(Btree),
    PerRow =
        fun({Key, Value}, Offset, RowAcc) ->
            Fun(null, Key, Value, Offset, RowCount, RowAcc)
        end,
    {ok, FinalAcc} = couch_btree:fold(Btree, Dir, PerRow, Acc),
    {ok, RowCount, FinalAcc};
fold_int([_Skipped | Others], ViewName, Dir, Fun, Acc) ->
    fold_int(Others, ViewName, Dir, Fun, Acc).
%% Folds over the rows of the named view, beginning at StartKey, in direction
%% Dir.
%%
%% Fun is called as Fun(null, Key, Value, Offset, TotalRowCount, Acc).
%% Returns {ok, TotalRowCount, FinalAcc}, or {not_found, missing_named_view}
%% when the group has no view called ViewName.
fold(ViewGroup, ViewName, StartKey, Dir, Fun, Acc) ->
    fold_int(ViewGroup#view_group.views, ViewName, StartKey, Dir, Fun, Acc).

%% Walks the view list until the view named ViewName is found, then folds
%% over its btree from StartKey.
fold_int([], _ViewName, _StartKey, _Dir, _Fun, _Acc) ->
    {not_found, missing_named_view};
fold_int([#view{name = Name, btree = Btree} | _Others], ViewName, StartKey, Dir, Fun, Acc)
        when Name == ViewName ->
    RowCount = couch_btree:row_count(Btree),
    PerRow =
        fun({Key, Value}, Offset, RowAcc) ->
            Fun(null, Key, Value, Offset, RowCount, RowAcc)
        end,
    {ok, FinalAcc} = couch_btree:fold(Btree, StartKey, Dir, PerRow, Acc),
    {ok, RowCount, FinalAcc};
fold_int([_Skipped | Others], ViewName, StartKey, Dir, Fun, Acc) ->
    fold_int(Others, ViewName, StartKey, Dir, Fun, Acc).

%% Ordering used for view keys: true when A sorts strictly before B.
%% Values are ordered by type rank first (see type_sort/1); values of equal
%% rank are compared by less_same_type/2.
less_json(A, B) ->
    case {type_sort(A), type_sort(B)} of
        {Rank, Rank} ->
            less_same_type(A, B);
        {RankA, RankB} ->
            RankA < RankB
    end.

%% Type rank of a value: atoms, then numbers, then lists, then plain tuples,
%% then {obj, _} tuples, then binaries.
type_sort({obj, _}) -> 4; % matched ahead of the general tuple clause
type_sort(V) when is_atom(V) -> 0;
type_sort(V) when is_number(V) -> 1;
type_sort(V) when is_list(V) -> 2;
type_sort(V) when is_tuple(V) -> 3;
type_sort(V) when is_binary(V) -> 5.

%% Rank of the atoms that have a defined order: nil < null < false < true.
atom_sort(true) -> 3;
atom_sort(false) -> 2;
atom_sort(null) -> 1;
atom_sort(nil) -> 0.

%% Compares two values of equal type rank. Lists go through
%% couch_util:collate/2, {obj, Props} tuples are compared property by
%% property, other tuples element by element, and everything else with the
%% built-in term order.
less_same_type({obj, PropsA}, {obj, PropsB}) ->
    less_props(PropsA, PropsB);
less_same_type(A, B) when is_atom(A) ->
    atom_sort(A) < atom_sort(B);
less_same_type(A, B) when is_list(A) ->
    couch_util:collate(A, B) < 0;
less_same_type(A, B) when is_tuple(A) ->
    less_list(tuple_to_list(A), tuple_to_list(B));
less_same_type(A, B) ->
    A < B.

%% Returns a list for a value that is either a list already or an atom.
ensure_list(V) when is_atom(V) -> atom_to_list(V);
ensure_list(V) when is_list(V) -> V.
%% Strict "less than" for two property lists of {Key, Value} pairs.
%% Pairs are compared in order: keys through couch_util:collate/2, then values
%% through less_json/2. When every compared pair is equal, the shorter list
%% sorts first.
less_props([], [_ | _]) ->
    true;
less_props(_, []) ->
    false;
less_props([{KeyA, ValueA} | MoreA], [{KeyB, ValueB} | MoreB]) ->
    case couch_util:collate(ensure_list(KeyA), ensure_list(KeyB)) of
        -1 ->
            true;
        1 ->
            false;
        0 ->
            % same key: the values decide, and only equal values move on to
            % the next pair
            less_json(ValueA, ValueB)
                orelse ((not less_json(ValueB, ValueA)) andalso less_props(MoreA, MoreB))
    end.

%% Strict "less than" for two lists, element by element through less_json/2.
%% When every compared element is equal, the shorter list sorts first.
less_list([], [_ | _]) ->
    true;
less_list(_, []) ->
    false;
less_list([A | MoreA], [B | MoreB]) ->
    less_json(A, B)
        orelse ((not less_json(B, A)) andalso less_list(MoreA, MoreB)).

%% Publishes the state of an update in progress.
%%
%% Sends the group to the manager's cache, passes the group info to
%% UpdateGroupInfoFun, then calls every status listener with UpdateStatus.
%% A listener stays registered only if its call returns a fun, and that
%% returned fun is the one called next time. A failing listener is dropped.
%%
%% Returns a fun(UpdateStatus, ViewGroup) that performs the next notification
%% with the remaining listeners.
notify(MgrPid, UpdateStatus, ViewGroup, GroupId, UpdateGroupInfoFun, StatusNotifyFuns) ->
    GroupInfo = get_info(ViewGroup),
    ok = gen_server:call(MgrPid, {group_cache_update, GroupId, ViewGroup}),
    ok = UpdateGroupInfoFun(GroupId, UpdateStatus, GroupInfo),
    Replies = [catch Listener(UpdateStatus) || Listener <- StatusNotifyFuns],
    % reversed to keep the order the listeners ended up in before
    RemainingFuns = lists:reverse([Reply || Reply <- Replies, is_function(Reply)]),
    fun(NextStatus, NextViewGroup) ->
        notify(MgrPid, NextStatus, NextViewGroup, GroupId, UpdateGroupInfoFun, RemainingFuns)
    end.

%% Entry point of an update process that is handed an already opened group.
%% Runs the update with a notify fun bound to this manager, group id and
%% listener list.
update_view_proc(MgrPid, #view_group{} = ViewGroup, GroupId, UpdateGroupInfoFun, StatusNotifyFuns) ->
    Notify =
        fun(UpdateStatus, CurrentGroup) ->
            notify(MgrPid, UpdateStatus, CurrentGroup, GroupId, UpdateGroupInfoFun,
                   StatusNotifyFuns)
        end,
    update_int(ViewGroup, Notify).

%% Entry point of an update process that has to open the group first.
%% Exits with whatever GetGroupInfoFun returned when that is not
%% {ok, {Queries, GroupState}}.
update_view_proc(MgrPid, Db, Fd, GroupId, GetGroupInfoFun, UpdateGroupInfoFun, StatusNotifyFuns) ->
    case GetGroupInfoFun(GroupId) of
        {ok, {Queries, GroupState}} ->
            {ok, ViewGroup} = open(Db, Fd, Queries, GroupState),
            update_view_proc(MgrPid, ViewGroup, GroupId, UpdateGroupInfoFun, StatusNotifyFuns);
        Failure ->
            exit(Failure)
    end.
%% Brings a view group up to date.
%%
%% Enumerates every document changed since the group's current sequence
%% through process_doc/3, computes and writes whatever is still pending after
%% the enumeration, stops the doc map and finally calls the notify fun with
%% 'complete'.
%%
%% Returns {ok, UpdatedViewGroup}.
update_int(ViewGroup, NotifyFun) ->
    #view_group{db = Db, views = Views, current_seq = StartSeq} = ViewGroup,
    InitialAcc = {[], ViewGroup, empty_stew_dicts(Views), StartSeq, NotifyFun},
    PerDoc = fun(DocInfo, _, Acc) -> process_doc(Db, DocInfo, Acc) end,

    % compute on all docs modified since we last computed.
    {ok, {PendingDocs, EnumGroup, PendingDicts, NewSeq, CurrentNotifyFun}} =
        couch_db:enum_docs_since(Db, StartSeq, PerDoc, InitialAcc),

    {ComputedGroup, Results} = view_compute(EnumGroup, PendingDocs),
    FinalDicts = stew_unify_query_results(Results, PendingDicts),
    couch_query_servers:stop_doc_map(ComputedGroup#view_group.compiled_doc_map),
    {ok, WrittenGroup} = write_group_changes(ComputedGroup, FinalDicts, NewSeq),
    CurrentNotifyFun(complete, WrittenGroup),
    {ok, WrittenGroup}.

%% One empty dict per view.
empty_stew_dicts(Views) ->
    [dict:new() || _View <- Views].

%% Per-document step of update_int/2.
%%
%% The accumulator is {Docs, ViewGroup, StewDicts, LastSeq, NotifyFun}.
%% Special documents are skipped and leave the accumulator untouched. Other
%% documents are opened and added to Docs (a deleted document adds nothing)
%% and move the sequence on to the document's update_seq. When the memory
%% used by this process exceeds FLUSH_MAX_MEM, the pending documents are
%% computed and written out, the notify fun is called with 'partial', and the
%% accumulator starts over with no documents and empty dicts.
process_doc(Db, DocInfo, {Docs, ViewGroup, StewDicts, LastSeq, NotifyFun}) ->
    #doc_info{id = DocId, update_seq = Seq} = DocInfo,
    case couch_doc:is_special_doc(DocId) of
        true ->
            % skip this doc
            {ok, {Docs, ViewGroup, StewDicts, LastSeq, NotifyFun}};
        false ->
            PendingDocs = add_opened_doc(Db, DocInfo, Docs),
            case process_info(self(), memory) of
                {memory, Mem} when Mem > ?FLUSH_MAX_MEM ->
                    flush_pending(PendingDocs, ViewGroup, StewDicts, Seq, NotifyFun);
                _BelowLimit ->
                    {ok, {PendingDocs, ViewGroup, StewDicts, Seq, NotifyFun}}
            end
    end.

%% Opens the document and puts it in front of Docs; a deleted document leaves
%% Docs unchanged.
add_opened_doc(Db, DocInfo, Docs) ->
    case couch_db:open_doc(Db, DocInfo, [conflicts,deleted_conflicts]) of
        {ok, Doc} ->
            [Doc | Docs];
        {not_found, deleted} ->
            Docs
    end.

%% Computes and writes the pending documents, reports the partial progress
%% and returns a fresh accumulator positioned at Seq.
flush_pending(PendingDocs, ViewGroup, StewDicts, Seq, NotifyFun) ->
    {ComputedGroup, Results} = view_compute(ViewGroup, PendingDocs),
    MergedDicts = stew_unify_query_results(Results, StewDicts),
    {ok, WrittenGroup} = write_group_changes(ComputedGroup, MergedDicts, Seq),
    garbage_collect(),
    NextNotifyFun = NotifyFun(partial, WrittenGroup),
    {ok, {[], WrittenGroup, empty_stew_dicts(WrittenGroup#view_group.views), Seq,
          NextNotifyFun}}.
%% Folds the mapped query results of a list of documents into StewDicts.
%%
%% Results holds one entry per document; each entry is a list with one
%% [{Key, Value}] list per view. StewDicts is a list with one dict per view,
%% in the same view order, mapping keys to summed values.
%%
%% Returns the updated list of dicts.
stew_unify_query_results(Results, StewDicts) ->
    lists:foldl(
        fun(DocResults, DictsAcc) ->
            stew_dictify_doc_query_results(DocResults, DictsAcc, [])
        end,
        StewDicts,
        Results
    ).

%% Adds a single document's mapped results, view by view, to the matching
%% dicts. Both lists must have one element per view; StewDictsAcc collects the
%% updated dicts in reverse and is put back in view order at the end.
stew_dictify_doc_query_results([], [], StewDictsAcc) ->
    lists:reverse(StewDictsAcc);
stew_dictify_doc_query_results([ResultKVs|RestResultKVs], [StewDict|RestStewDicts], StewDictsAcc) ->
    StewDict2 = stew_sum_kvs(ResultKVs, StewDict),
    stew_dictify_doc_query_results(RestResultKVs, RestStewDicts, [StewDict2|StewDictsAcc]).

%% Adds every {Key, Value} pair to the running sum stored under Key.
%% The pairs are added one at a time rather than through dict:from_list/1,
%% because a document may produce the same key more than once for a view and
%% every one of those values has to count towards the sum.
stew_sum_kvs(ResultKVs, StewDict) ->
    lists:foldl(
        fun({Key, Value}, DictAcc) ->
            dict:update(Key, fun(Sum) -> Sum + Value end, Value, DictAcc)
        end,
        StewDict,
        ResultKVs
    ).

%% Pass the given documents through to the mapping interpreter. For each
%% document, each view mapping function is applied. The result of each mapping
%% function is a list of {key, value} tuples.
%%
%% The doc map is started on first use and kept in the returned view group, so
%% later calls reuse it. With no documents nothing is started.
%%
%% return: {updated ViewGroup, [Doc-Results+]}
%%   where Doc-Results: [View-Results+]
%%   where View-Results: [{key1, value1}, {key1, value2}, ...] aka the result of
%%     applying the view's mapping function to the Doc in question.
view_compute(ViewGroup, []) ->
    {ViewGroup, []};
view_compute(#view_group{query_lang=QueryLang, compiled_doc_map=DocMap}=ViewGroup, Docs) ->
    DocMap1 =
        case DocMap of
            nil -> % doc map not started
                {ok, DocMap0} = couch_query_servers:start_doc_map(QueryLang, queries(ViewGroup)),
                DocMap0;
            _ ->
                DocMap
        end,
    {ok, Results} = couch_query_servers:map_docs(DocMap1, Docs),
    {ViewGroup#view_group{compiled_doc_map=DocMap1}, Results}.
+ +queries(ViewGroup) -> + [View#view.query_string || View <- ViewGroup#view_group.views]. + + +%% Unify the StewDicts with those in the Btree by looking up all of the current keys from +%% each StewDict in the Btree, doing our sum/unification, and then putting all of the +%% (potentially) revised key/values back in. +%% Because we do not need to touch the Group btree, we are able to sub out the actual +%% per-view logic. +write_group_changes(ViewGroup, StewDicts, NewSeq) -> + % For each Stew(/View)... + Views2 = [write_view_changes(View, StewDict) + || {View, StewDict} + <- lists:zip(ViewGroup#view_group.views, StewDicts)], + ViewGroup2 = ViewGroup#view_group{views=Views2, current_seq=NewSeq}, + {ok, ViewGroup2}. + +write_view_changes(View, StewDict) -> + LookupKeys = dict:fetch_keys(StewDict), + % Perform the look-up + {ok, LookupResults, ViewBtree2} = + couch_btree:query_modify(View#view.btree, LookupKeys, [], []), + % Merge the look-up results with StewDict, + % also, keep track of found keys so that we can remove them + {StewDict2, RemoveKeys} = lists:foldl( + fun(LookupResult, {StewDictAcc, RemoveKeysAcc}) -> + case LookupResult of + {ok, {Key, Value}} -> + {dict:update(Key, fun(OValue) -> Value + OValue end, StewDictAcc), + [Key | RemoveKeysAcc]}; + {not_found, _} -> + {StewDictAcc, RemoveKeysAcc} + end + end, + {StewDict, []}, LookupResults), + % + {ok, ViewBtree3} = couch_btree:add_remove(ViewBtree2, dict:to_list(StewDict2), RemoveKeys), + View#view{btree = ViewBtree3}. === modified file 'src/CouchDB/couch_db.erl' --- src/CouchDB/couch_db.erl 2007-11-10 07:10:31 +0000 +++ src/CouchDB/couch_db.erl 2007-12-21 21:35:10 +0000 @@ -22,6 +22,7 @@ -export([save_docs/2, save_docs/3]). -export([delete_doc/3,open_doc/2,open_doc/3,close/1,enum_docs_since/4,enum_docs_since/5]). -export([enum_docs/4,enum_docs/5, open_doc_revs/4, get_missing_revs/2]). +-export([update_stew_group_sync/2,update_stew_group/2,fold_stew/6,fold_stew/7]). 
-export([update_view_group_sync/2,update_view_group/2,fold_view/6,fold_view/7,get_info/1]). -export([update_temp_view_group_sync/2, fold_temp_view/5,fold_temp_view/6]). -export([update_loop/2]). @@ -46,6 +47,7 @@ docinfo_by_Id_btree_state = nil, docinfo_by_seq_btree_state = nil, view_group_btree_state = nil, + stew_group_btree_state = nil, local_docs_btree_state = nil, doc_count=0 }). @@ -61,16 +63,19 @@ local_docs_btree, last_update_seq, view_group_btree, + stew_group_btree, doc_count, name, - view_group_mgr + view_group_mgr, + stew_group_mgr }). -record(main, {db, update_pid, view_group_mgr, - temp_view_group_mgr + temp_view_group_mgr, + stew_group_mgr }). start_link(DbName, Filepath, Options) -> @@ -317,6 +322,9 @@ enum_docs(SupPid, StartId, fwd, InFun, Ctx). +update_stew_group(SupPid, ViewGroupDocId) -> + gen_server:call(db_pid(SupPid), {update_stew_group, ViewGroupDocId, fun(_Whatever) -> ok end}). + update_view_group(SupPid, ViewGroupDocId) -> gen_server:call(db_pid(SupPid), {update_view_group, ViewGroupDocId, fun(_Whatever) -> ok end}). @@ -331,6 +339,9 @@ Pid ! {Ref, Else}. +update_stew_group_sync(SupPid, ViewGroupDocId) -> + update_view_group_sync0(SupPid, update_stew_group, ViewGroupDocId). + update_view_group_sync(SupPid, ViewGroupDocId) -> update_view_group_sync0(SupPid, update_view_group, ViewGroupDocId). @@ -351,6 +362,22 @@ Else end. +fold_stew(SupPid, ViewGroupDocId, ViewName, Dir, Fun, Acc) -> + case gen_server:call(db_pid(SupPid), {get_stew_group, ViewGroupDocId}) of + {ok, ViewGroup} -> + couch_stew_group:fold(ViewGroup, ViewName, Dir, Fun, Acc); + Else -> + Else + end. + +fold_stew(SupPid, ViewGroupDocId, ViewName, StartKey, Dir, Fun, Acc) -> + case gen_server:call(db_pid(SupPid), {get_stew_group, ViewGroupDocId}) of + {ok, ViewGroup} -> + couch_stew_group:fold(ViewGroup, ViewName, StartKey, Dir, Fun, Acc); + Else -> + Else + end. 
+ fold_view(SupPid, ViewGroupDocId, ViewName, Dir, Fun, Acc) -> case gen_server:call(db_pid(SupPid), {get_view_group, ViewGroupDocId}) of {ok, ViewGroup} -> @@ -420,6 +447,7 @@ {ok, SeqBtree} = couch_btree:open(Header#db_header.docinfo_by_seq_btree_state, Fd), {ok, LocalDocsBtree} = couch_btree:open(Header#db_header.local_docs_btree_state, Fd), {ok, ViewGroupBtree} = couch_btree:open(Header#db_header.view_group_btree_state, Fd), + {ok, StewGroupBtree} = couch_btree:open(Header#db_header.stew_group_btree_state, Fd), Db = #db{ fd=Fd, @@ -431,6 +459,7 @@ local_docs_btree = LocalDocsBtree, last_update_seq = Header#db_header.last_update_seq, view_group_btree = ViewGroupBtree, + stew_group_btree = StewGroupBtree, doc_count = Header#db_header.doc_count, name = DbName }, @@ -439,6 +468,10 @@ Pid=self(), + GetStewGroupInfoFun = fun(GroupKey) -> + get_stew_group_info(get_db(Pid), GroupKey) + end, + GetViewGroupInfoFun = fun(GroupKey) -> get_view_group_info(get_db(Pid), GroupKey) end, @@ -449,7 +482,13 @@ [$| | Function] = lists:dropwhile(fun($|) -> false; (_) -> true end, GroupKey), {ok, {{Type, [{GroupKey, Function}]}, nil}} end, - + + UpdateStewGroupInfoFun = fun(GroupKey, UpdateStatus, GroupInfo) -> + % send the updated stew group info to the update process + UpdatePid ! {stew_group_updated, GroupKey, UpdateStatus, GroupInfo}, + ok + end, + UpdateViewGroupInfoFun = fun(GroupKey, UpdateStatus, GroupInfo) -> % send the updated view group info to the update process UpdatePid ! 
{view_group_updated, GroupKey, UpdateStatus, GroupInfo}, @@ -462,17 +501,30 @@ {ok, TempFd} = couch_file:open(Filepath ++ ".temp", [create,overwrite]), {ok, ViewMgr} = couch_view_group:start_manager(Supervisor, Fd, GetViewGroupInfoFun, UpdateViewGroupInfoFun), {ok, TempViewMgr} = couch_view_group:start_manager(Supervisor, TempFd, GetTempViewGroupInfoFun, UpdateTempViewGroupInfoFun), + {ok, StewMgr} = couch_stew_group:start_manager(Supervisor, Fd, GetStewGroupInfoFun, UpdateStewGroupInfoFun), UpdatePid ! {set_view_group_mgr, ViewMgr}, + UpdatePid ! {set_stew_group_mgr, StewMgr}, - {ok, #main{db=Db, update_pid=UpdatePid, view_group_mgr=ViewMgr, temp_view_group_mgr=TempViewMgr}}. + {ok, #main{db=Db, update_pid=UpdatePid, view_group_mgr=ViewMgr, temp_view_group_mgr=TempViewMgr, stew_group_mgr=StewMgr}}. terminate(_Reason, #main{db=Db} = Main) -> Main#main.update_pid ! close, couch_view_group:stop(Main#main.view_group_mgr), couch_view_group:stop(Main#main.temp_view_group_mgr), + couch_stew_group:stop(Main#main.stew_group_mgr), couch_file:close(Db#db.fd). 
+handle_call({get_stew_group, ViewGroupDocId}, From, #main{db=Db}=Main) -> + case get_doc_info(Db, ViewGroupDocId) of + {ok, #doc_info{deleted=true}} -> + {reply, {not_found, deleted}, Main}; + {ok, DocInfo} -> + ok = couch_stew_group:get_group_async(Main#main.stew_group_mgr, DocInfo, From), + {noreply, Main}; + not_found -> + {reply, {not_found, missing}, Main} + end; handle_call({get_view_group, ViewGroupDocId}, From, #main{db=Db}=Main) -> case get_doc_info(Db, ViewGroupDocId) of {ok, #doc_info{deleted=true}} -> @@ -491,6 +543,14 @@ {noreply, Main}; handle_call(get_db, _From, #main{db=Db}=Main) -> {reply, {ok, Db}, Main}; +handle_call({update_stew_group, Id, UpdateNotifFun}, _From, #main{db=Db}=Main) -> + case get_doc_info(Db, Id) of + {ok, DocInfo} -> + ok = couch_stew_group:update_group(Main#main.stew_group_mgr, DocInfo, UpdateNotifFun), + {reply, ok, Main}; + Error -> + {reply, Error, Main} + end; handle_call({update_view_group, Id, UpdateNotifFun}, _From, #main{db=Db}=Main) -> case get_doc_info(Db, Id) of {ok, DocInfo} -> @@ -526,6 +586,8 @@ receive {set_view_group_mgr, ViewMgr} -> update_loop(MainPid, Db#db{view_group_mgr=ViewMgr}); + {set_stew_group_mgr, StewMgr} -> + update_loop(MainPid, Db#db{stew_group_mgr=StewMgr}); {OrigFrom, update_docs, DocActions, Options} -> {ok, DocResults, Db2} = update_docs_int(Db, DocActions, Options), ok = gen_server:call(MainPid, {db_updated, Db2}), @@ -546,10 +608,42 @@ % doesn't match, don't save in btree update_loop(MainPid, Db) end; + {stew_group_updated, #doc_info{id=Id}=GroupDocInfo, _updateStatus, ViewGroupInfo} -> + case get_doc_info(Db, GroupDocInfo#doc_info.id) of + {ok, GroupDocInfo} -> + % rev on disk matches the rev of the view group being updated + % so we save the info to disk + {ok, GroupBtree2} = couch_btree:add_remove(Db#db.stew_group_btree, [{Id, ViewGroupInfo}], []), + Db2 = Db#db{stew_group_btree=GroupBtree2, uncommitted_writes=true}, + {ok, Db3} = commit_outstanding(Db2), + ok = gen_server:call(MainPid, 
{db_updated, Db3}), + update_loop(MainPid, Db3); + _Else -> + % doesn't match, don't save in btree + update_loop(MainPid, Db) + end; close -> % terminate loop ok end. + +get_stew_group_info(#db{}=Db, #doc_info{id=Id}=DocInfo) -> + case couch_btree:lookup_single(Db#db.stew_group_btree, Id) of + {ok, {ViewQueries, ViewGroupState}} -> + {ok, {ViewQueries, ViewGroupState}}; + not_found -> + case open_doc_int(Db, DocInfo, []) of + {ok, Doc} -> + case couch_doc:get_view_functions(Doc) of + none -> + {not_found, no_views_found}; + Queries -> + {ok, {Queries, nil}} + end; + Else -> + Else + end + end. get_view_group_info(#db{}=Db, #doc_info{id=Id}=DocInfo) -> case couch_btree:lookup_single(Db#db.view_group_btree, Id) of @@ -908,7 +1002,9 @@ docinfo_by_Id_btree = DocInfoByIdBTree, docinfo_by_seq_btree = DocInfoBySeqBTree, view_group_btree = ViewGroupBTree, - view_group_mgr = ViewGroupMgr + view_group_mgr = ViewGroupMgr, + stew_group_btree = StewGroupBTree, + stew_group_mgr = StewGroupMgr } = Db, % seperate out the NonRep documents from the rest of the documents @@ -951,6 +1047,7 @@ % clear the computed view cache on disk UpdatedIds = [UpdatedId || {UpdatedId, _DocInfo} <- InfoById], {ok, ViewGroupBTree2} = couch_btree:add_remove(ViewGroupBTree, [], UpdatedIds), + {ok, StewGroupBTree2} = couch_btree:add_remove(StewGroupBTree, [], UpdatedIds), % now notify the view group manager to discard any of the view groups it has in memory @@ -963,11 +1060,13 @@ DocsAndOldDocInfo), ok = couch_view_group:free_groups(ViewGroupMgr, OldDocInfos), + ok = couch_stew_group:free_groups(StewGroupMgr, OldDocInfos), Db4 = Db3#db{ docinfo_by_Id_btree = DocInfoByIdBTree2, docinfo_by_seq_btree = DocInfoBySeqBTree2, view_group_btree = ViewGroupBTree2, + stew_group_btree = StewGroupBTree2, uncommitted_writes = true }, === modified file 'src/CouchDB/couch_db.hrl' --- src/CouchDB/couch_db.hrl 2007-12-02 21:41:18 +0000 +++ src/CouchDB/couch_db.hrl 2007-12-21 19:55:31 +0000 @@ -19,6 +19,8 @@ 
-define(NON_REP_DOC_PREFIX, "_local/"). -define(DESIGN_DOC_PREFIX0, "_design"). -define(DESIGN_DOC_PREFIX, "_design/"). +-define(STEW_DOC_PREFIX0, "_pot"). +-define(STEW_DOC_PREFIX, "_pot/"). -define(DEFAULT_ATTACHMENT_CONTENT_TYPE, "application/octet-stream"). @@ -51,4 +53,4 @@ % key/value tuple of meta information obtained via doc_open Options meta = [] - }). \ No newline at end of file + }). === modified file 'src/CouchDB/couch_doc.erl' --- src/CouchDB/couch_doc.erl 2007-12-02 21:41:18 +0000 +++ src/CouchDB/couch_doc.erl 2007-12-21 19:55:59 +0000 @@ -119,6 +119,8 @@ is_special_doc(?DESIGN_DOC_PREFIX ++ _ ) -> true; +is_special_doc(?STEW_DOC_PREFIX ++ _ ) -> + true; is_special_doc(#doc{id=Id}) -> is_special_doc(Id); is_special_doc(_) -> === modified file 'src/CouchDB/couch_view_group.erl' --- src/CouchDB/couch_view_group.erl 2007-11-10 07:10:31 +0000 +++ src/CouchDB/couch_view_group.erl 2007-12-22 05:55:08 +0000 @@ -28,7 +28,7 @@ -include("couch_db.hrl"). % arbitrarily chosen amount of memory to use before flushing to disk --define(FLUSH_MAX_MEM, 1000000). +-define(FLUSH_MAX_MEM, 4000000). -record(view_group, {db, @@ -423,6 +423,24 @@ {ok, ViewGroup4}. +%% Given a document id (via DocInfo) in a given Db, lookup and store the given +%% document in the accumulator structure, accounting for deleted documents. +%% Takes and returns an accumulator structure which will be processed and +%% flushed when the process memory utilization exceeds FLUSH_MAX_MEM. +%% Processing entails mapping the documents via view_compute, >>> <<< +%% +%% Accumulator structure: +%% Docs: A list of (added/updated) documents to be processed. +%% ViewGroup: The ViewGroup being processed, current state. +%% ViewKVs: A list of tuples of {View, [{{Key, DocId}, Value}+]}. In other +%% words, it maps Views to their key-value pairs, where the key also has +%% the document that produced it bound to it. 
+%% DocIdViewIdKeys: A list of tuples mapping document id's to their associated +%% keys (of the form {view id, key}). Deleted documents have their document id's +%% mapped to an empty list of keys. +%% LastSeq: +%% NotifyFun: A callback function to be invoked with (partial, ViewGroup) each +%% time we process and flush a block. process_doc(Db, DocInfo, {Docs, ViewGroup, ViewKVs, DocIdViewIdKeys, _LastSeq, NotifyFun}) -> % This fun computes once for each document #doc_info{id=DocId, @@ -453,6 +471,17 @@ end end. +%% Processes a list of documents, document-by-document, updating the ViewKVs mapping and +%% DocIdViewIdKeys mappings for each document. +%% +%% Docs: Documents to process. +%% Results: Results of view_compute; key/value tuples mapped by user view mapping functions. +%% We expect an exact correspondence between Docs[n] and Results[n]. +%% ViewKVs: A list of tuples of {View, [{{Key, DocId}, Value}+]}. Maps Views to the +%% keys/values they have produced and the document that generated the given key/value. +%% DocIdViewIdKeysAcc: List of tuples of {document id, ViewIdKeys}, where ViewIdKeys is +%% a list of {view id, key} tuples. Maps documents to all the keys they have produced +%% and the view under which they were produced. view_insert_query_results([], [], ViewKVs, DocIdViewIdKeysAcc) -> {ViewKVs, DocIdViewIdKeysAcc}; view_insert_query_results([Doc|RestDocs], [QueryResults | RestResults], ViewKVs, DocIdViewIdKeysAcc) -> @@ -461,6 +490,22 @@ view_insert_query_results(RestDocs, RestResults, NewViewKVs, NewDocIdViewIdKeys). +%% Processes a single document's mapped Results view-by-view, returning {ViewKVs, ViewIdKeys} where: +%% ViewKVs is an updated version of the passed-in ViewKVsAcc, mapping Views to their +%% list of associated key-doc-value nested tuples of the form {{Key, DocId}, Value}. +%% ViewIdKeys is a per-document list of {view id, key} tuples. It lists all the keys +%% produced from mapping the given document and the views that produced them. 
+%% (Because the list of views is traversed left-to-right and builds up the result lists +%% as it goes, the result lists need to be reversed when the function returns, hence +%% the specialized final case.) +%% +%% Doc: The document, beloved for its id. +%% Results: The list of not yet processed mapping results for this document. +%% ViewKVs: The list of not yet processed views and the key-doc-value nested tuples currently +%% associated with each view. +%% ViewKVsAcc: ViewKVs containing the views already processed and updated for the current +%% document (accumulated). +%% ViewIdKeysAcc: List of one-per-document lists of tuples of {view id, key} (accumulated). view_insert_doc_query_results(_Doc, [], [], ViewKVsAcc, ViewIdKeysAcc) -> {lists:reverse(ViewKVsAcc), lists:reverse(ViewIdKeysAcc)}; view_insert_doc_query_results(#doc{id=DocId}=Doc, [ResultKVs|RestResults], [{View, KVs}|RestViewKVs], ViewKVsAcc, ViewIdKeysAcc) -> @@ -470,6 +515,14 @@ NewViewIdKeysAcc = NewViewIdKeys ++ ViewIdKeysAcc, view_insert_doc_query_results(Doc, RestResults, RestViewKVs, NewViewKVsAcc, NewViewIdKeysAcc). +%% Pass the given documents through to the mapping interpreter. For each +%% document, each view mapping function is applied. The result of each mapping +%% function is a list of {key, value} tuples. +%% +%% return: {updated ViewGroup, [Doc-Results+]} +%% where Doc-Results: [View-Results+] +%% where View-Results: [{key1, value1}, {key1, value2}, ...] aka the result of +%% applying the view's mapping function to the Doc in question. view_compute(ViewGroup, []) -> {ViewGroup, []}; view_compute(#view_group{query_lang=QueryLang, compiled_doc_map=DocMap}=ViewGroup, Docs) -> @@ -496,6 +549,18 @@ DefaultValue end. +%% Write changes to a ViewGroup by effectively removing all affected documents (either modified or deleted) +%% from the Group Btrees and view Btrees and then adding all the 'current'/new key/value states (for added +%% or modified documents). 
+%% +%% ViewGroup: The ViewGroup being updated. +%% ViewKeyValuesToAdd: A list of tuples of {View, [{{Key, DocId}, Value}+]}. In other +%% words, it maps Views to their key-value pairs, where the key also has +%% the document that produced it bound to it. +%% DocIdViewIdKeys: A list of tuples mapping document ids to their associated +%% keys (of the form {view id, key}). Deleted documents have their document ids +%% mapped to an empty list of keys. +%% NewSeq: The new sequence number for this ViewGroup state. write_changes(ViewGroup, ViewKeyValuesToAdd, DocIdViewIdKeys, NewSeq) -> #view_group{id_btree=GroupIdBtree} = ViewGroup, @@ -503,8 +568,17 @@ RemoveDocIds = [DocId || {DocId, ViewIdKeys} <- DocIdViewIdKeys, ViewIdKeys == []], LookupDocIds = [DocId || {DocId, _ViewIdKeys} <- DocIdViewIdKeys], + % Modify the Group Btree, whose keys are document ids and values are lists of {ViewId, Key} tuples. + % Our query removes documents which no longer have any keys (RemoveDocIds), adds/replaces documents + % for which we do have keys (AddDocIdViewIdKeys), and looks up the existing values for all of the + % documents we are modifying so that we can remove those values from the view maps below. (Rather + % than attempt to figure out the net changes for a modified document, we instead just remove all + % of its previously associated keys/values from the views, then add its new keys/values to the + % views.) {ok, LookupResults, GroupIdBtree2} = couch_btree:query_modify(GroupIdBtree, LookupDocIds, AddDocIdViewIdKeys, RemoveDocIds), + % Traverse LookupResults, accumulating a dictionary that maps view ids + % to lists of btree keys ({Key, DocId} tuples). KeysToRemoveByView = lists:foldl( fun(LookupResult, KeysToRemoveByViewAcc) -> case LookupResult of @@ -520,6 +594,10 @@ end, dict:new(), LookupResults), + % For each view, add the key values from ViewKeyValuesToAdd (a ViewKVs map, so all new + % observed {{Key, DocId}, Value} tuples), and remove the keys just accumulated above. 
+ % (This is all done to the View's Btree, which is different from the Group Btree manipulation + % performed above.) Views2 = [ begin KeysToRemove = dict_find(View#view.id_num, [], KeysToRemoveByView), === modified file 'src/CouchDB/mod_couch.erl' --- src/CouchDB/mod_couch.erl 2007-12-02 21:41:18 +0000 +++ src/CouchDB/mod_couch.erl 2007-12-22 05:19:12 +0000 @@ -28,6 +28,7 @@ doc = "", attachment = "", view = "", + stew = "", querystr = ""}). -record(doc_query_args, @@ -106,22 +107,30 @@ % lets try to parse out the UriPath. {ok, UrlParts} = regexp:split(UriPath, "/"), - {DbName, Id, Attachment, View} = + {DbName, Id, Attachment, View, Stew} = case UrlParts of [Db] -> - {Db, "", "", ""}; + {Db, "", "", "", ""}; [Db, "_design", Doc] -> - {Db, "_design/" ++ Doc, "", ""}; + {Db, "_design/" ++ Doc, "", "", ""}; [Db, "_design", Doc, Attachment0] -> - {Db, "_design/" ++ Doc, Attachment0, ""}; + {Db, "_design/" ++ Doc, Attachment0, "", ""}; + [Db, "_pot", Doc] -> + {Db, "_pot/" ++ Doc, "", "", ""}; + [Db, "_pot", Doc, Attachment0] -> + {Db, "_pot/" ++ Doc, Attachment0, "", ""}; + [Db, "_stew", Doc, StewName] -> + {Db, "_pot/" ++ Doc, "", "", StewName}; + [Db, "_stew%2f" ++ Doc, StewName] -> + {Db, "_pot/" ++ Doc, "", "", StewName}; [Db, "_view", Doc, ViewName] -> - {Db, "_design/" ++ Doc, "", ViewName}; + {Db, "_design/" ++ Doc, "", ViewName, ""}; [Db, "_view%2f" ++ Doc, ViewName] -> - {Db, "_design/" ++ Doc, "", ViewName}; + {Db, "_design/" ++ Doc, "", ViewName, ""}; [Db, Doc] -> - {Db, Doc, "", ""}; + {Db, Doc, "", "", ""}; [Db, Doc, Attachment0] -> - {Db, Doc, Attachment0, ""}; + {Db, Doc, Attachment0, "", ""}; _ -> throw({invalid_uri, lists:flatten(io_lib:format("Uri has too many parts: ~p", [UrlParts]))}) end, @@ -129,6 +138,7 @@ doc=url_decode(Id), attachment=url_decode(Attachment), view=url_decode(View), + stew=url_decode(Stew), querystr=url_decode(QueryStr)}}. 
resp_json_header(Mod) -> @@ -220,7 +230,7 @@ send_all_docs_by_seq(Mod, Parts); do(#mod{method="GET"}=Mod, #uri_parts{doc=""}=Parts) -> send_database_info(Mod, Parts); -do(#mod{method=Method}=Mod, #uri_parts{attachment="",view=""}=Parts) +do(#mod{method=Method}=Mod, #uri_parts{attachment="",view="",stew=""}=Parts) when Method == "GET" orelse Method == "HEAD" -> #doc_query_args{open_revs=Revs} = doc_parse_query(Parts#uri_parts.querystr), case Revs of @@ -232,6 +242,8 @@ do(#mod{method=Method}=Mod, #uri_parts{attachment=Att}=Parts) when Att /= "", Method == "GET" orelse Method == "HEAD" -> send_attachment(Mod, Parts); +do(#mod{method="GET"}=Mod, #uri_parts{stew=Stew}=Parts) when Stew /= "" -> + send_stew(Mod, Parts); do(#mod{method="GET"}=Mod, #uri_parts{view=View}=Parts) when View /= "" -> send_view(Mod, Parts). @@ -804,6 +816,30 @@ throw(Error) end. +send_stew(Mod, #uri_parts{doc=DocId, stew=ViewId, querystr=QueryStr}=Parts) -> + Db = open_db(Parts), + QueryArgs = view_parse_query(QueryStr), + #query_args{ + start_key=StartKey, + count=Count, + skip=SkipCount, + update=Update, + direction=Dir, + start_docid=StartDocId} = QueryArgs, + case Update of + true -> + case couch_db:update_stew_group_sync(Db, DocId) of + ok -> ok; + Error -> throw(Error) + end; + false -> + ok + end, + FoldlFun = make_view_fold_fun(Mod, QueryArgs), + FoldResult = couch_db:fold_stew(Db, DocId, ViewId, StartKey, + Dir, FoldlFun, {Count, SkipCount, header_not_sent, []}), + finish_view_fold(Mod, FoldResult). 
+ send_view(Mod, #uri_parts{doc=DocId, view=ViewId, querystr=QueryStr}=Parts) -> Db = open_db(Parts), QueryArgs = view_parse_query(QueryStr), === modified file 'src/Makefile.am' --- src/Makefile.am 2007-11-10 07:10:31 +0000 +++ src/Makefile.am 2007-12-21 20:45:00 +0000 @@ -33,6 +33,7 @@ CouchDB/couch_rep.erl \ CouchDB/couch_server.erl \ CouchDB/couch_server_sup.erl \ + CouchDB/couch_stew_group.erl \ CouchDB/couch_stream.erl \ CouchDB/couch_util.erl \ CouchDB/couch_view_group.erl \ @@ -54,6 +55,7 @@ CouchDB/couch_rep.beam \ CouchDB/couch_server.beam \ CouchDB/couch_server_sup.beam \ + CouchDB/couch_stew_group.beam \ CouchDB/couch_stream.beam \ CouchDB/couch_util.beam \ CouchDB/couch_view_group.beam \