/*
* This file is part of PKM (Persistent Knowledge Monitor).
* Copyright (c) 2020 Capgemini Group, Commissariat à l'énergie atomique et aux énergies alternatives,
* OW2, Sysgo AG, Technikon, Tree Technology, Universitat Politècnica de València.
*
* PKM is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License version 3 as published by
* the Free Software Foundation.
*
* PKM is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with PKM. If not, see <https://www.gnu.org/licenses/>.
*/
/**
* Insert/Update some documents in a generic manner into several collections of a database.
*
* Processing steps, in order:
*  1. every document is passed to `dispatch` to learn its target collection;
*  2. every document is checked to carry each field named by a key of `signature`
*     (keys may be dotted paths such as 'a.b', which are followed through nested plain objects);
*  3. unless `update` is truthy, all the collections are probed for an already existing document
*     having the same signature field values;
*  4. each document is handed, one at a time, to `insert_update_documents` for its own collection.
*
* @memberof PKM
* @instance
* @param {string} dbName - database name
* @param {Array.<string>} collection_names - collection names
* @param {Array.<Object>} documents - documents
* @param {boolean} update - flag to enable/disable replacing documents when signature is specified, otherwise ignored
* @param {Function} dispatch - a function (document) => collection_name to dispatch documents to collections that shall throw when dispatching is not possible
* @param {Object} signature - signature (properties in the signature are the document keys).
*
* @return {Promise} a promise resolved without a value once every document has been handed over successfully.
*                   It is rejected with a BadRequest when dispatching throws or when a document lacks a signature field,
*                   with a Conflict when `update` is falsy and a document with the same key already exists,
*                   and otherwise with the underlying error passed through `this.Error`.
*/
function insert_update_documents_spanned(dbName, collection_names, documents, update, dispatch, signature)
{
    return new Promise((resolve, reject) =>
    {
        // check that every document can be dispatched (dispatch throws when it cannot)
        let document_to_collection_map;
        try
        {
            document_to_collection_map = new Map(documents.map((document) => [ document, dispatch(document) ]));
        }
        catch(err)
        {
            reject(this.BadRequest(err));
            return;
        }

        // Follow a dotted path through nested plain objects.
        // Returns false when the path cannot be followed; otherwise returns callback(value)
        // when a callback is given, or true when it is not.
        const find_field = (document, path, callback) =>
        {
            if(path.length === 0)
            {
                return (typeof callback === 'function') ? callback(document) : true;
            }
            // typeof null is 'object', so null has to be excluded explicitly: it has no fields to look into.
            // Arrays are not traversed either.
            if((document === null) || (typeof document !== 'object') || Array.isArray(document))
            {
                return false;
            }
            const delim_pos = path.indexOf('.');
            const key = (delim_pos >= 0) ? path.slice(0, delim_pos) : path;
            // Go through Object.prototype so that documents without a prototype, or documents
            // that have a field of their own named 'hasOwnProperty', are handled correctly.
            if(!Object.prototype.hasOwnProperty.call(document, key))
            {
                return false;
            }
            const remaining_path = (delim_pos >= 0) ? path.slice(delim_pos + 1) : '';
            return find_field(document[key], remaining_path, callback);
        };

        // Build the query identifying a document: one entry per signature key, valued from the document
        const get_query = (signature, document) =>
        {
            const query = {};
            Object.keys(signature).forEach((path) =>
            {
                query[path] = find_field(document, path, (value) => value);
            });
            return query;
        };

        // Check input documents: each one shall have every field of the signature
        try
        {
            documents.forEach((document) =>
            {
                Object.keys(signature).forEach((path) =>
                {
                    if(!find_field(document, path))
                    {
                        throw this.BadRequest('Document has no ' + path + ' field');
                    }
                });
            });
        }
        catch(err)
        {
            reject(this.Error(err));
            return;
        }

        // Reject with a Conflict as soon as one of the documents is found in any of the collections
        const check_for_absence = (documents) =>
        {
            return new Promise((resolve, reject) =>
            {
                const checking_promises = [];
                documents.forEach((document) =>
                {
                    const query = get_query(signature, document);
                    checking_promises.push(new Promise((resolve, reject) =>
                    {
                        // limit : 1 because only existence matters, not the actual count
                        this.count_documents_spanned(dbName, collection_names, query, { limit : 1 }).then((count) =>
                        {
                            if(count >= 1)
                            {
                                reject(this.Conflict('Document which key is ' + JSON.stringify(query) + ' already exists in collections ' + collection_names.join(', ')));
                            }
                            else
                            {
                                resolve();
                            }
                        }).catch((err) =>
                        {
                            reject(this.Error(err));
                        });
                    }));
                });
                Promise.all(checking_promises).then(() =>
                {
                    resolve();
                }).catch((err) =>
                {
                    reject(this.Error(err));
                });
            });
        };

        // When updating, existing documents are expected, so the absence check is skipped
        (update ? Promise.resolve() : check_for_absence(documents)).then(() =>
        {
            // do insert/update each document in its respective collection
            const insert_update_documents_promises = documents.map((document) => this.insert_update_documents(dbName, document_to_collection_map.get(document), [ document ], update, { signature : signature }));
            Promise.all(insert_update_documents_promises).then(() =>
            {
                resolve();
            }).catch((err) =>
            {
                reject(this.Error(err));
            });
        }).catch((err) =>
        {
            reject(this.Error(err));
        });
    });
}
// Expose the function under its own name on this module's exports object.
// NOTE(review): the JSDoc tags it as a PKM instance member, so it is presumably attached to PKM elsewhere — confirm.
module.exports.insert_update_documents_spanned = insert_update_documents_spanned;