/*! meta-client/modules/client */
/*jslint
browser, long
*/
/*global
Blob, FileReader, WebSocket, Worker
*/
/**
* client module.
*
* @module meta-client/modules/client
*/
import _ from "underscore";
import crypto from "util-web/modules/crypto";
import ext from "util-web/modules/ext";
import ko from "knockout";
import Logger from "js-logger";
import protocol from "meta-core/modules/protocol";
const LOG = Logger.get("meta-client/client");
const CONNECTION_TIMEOUT = 1000 * 3;
const DOWNLOAD_IN_PROGRESS_ERROR = protocol.errorReplyFactory({code: 0, templateString: "download in progress", type: protocol.ERROR_TYPE.SYSTEM});
const UPLOAD_DUPLICATE_ERROR = protocol.errorReplyFactory({code: 0, templateString: "duplicate file for hash: <%= hash %>", type: protocol.ERROR_TYPE.VALIDATION});
const INVALID_HASH_ERROR = protocol.errorReplyFactory({code: 0, templateString: "invalid hash for <%= key %>: <%= hash %>, expected: <%= expectedHash %>", type: protocol.ERROR_TYPE.VALIDATION});
const INVALID_REQUEST_ERROR = protocol.errorReplyFactory({code: 0, templateString: "invalid request", type: protocol.ERROR_TYPE.VALIDATION});
const NO_CONNECTION_ERROR = protocol.errorReplyFactory({code: 0, templateString: "no connection", type: protocol.ERROR_TYPE.SYSTEM});
const PING = protocol.pingRequest();
const PING_INTERVAL = 1000 * 5;
const RECONNECT_INTERVAL = 500;
const RECONNECT_INTERVAL_FACTOR = 1.5;
const RECONNECT_INTERVAL_MAX = 1000 * 8;
const SEND_LOG_CTX = "meta-client/client/send";
const SEND_LOG = Logger.get(SEND_LOG_CTX);
const UPLOAD_BUFFER_SIZE = 1024 * 4; // org.eclipse.jetty.websocket.core.WebSocketConstants.DEFAULT_OUTPUT_BUFFER_SIZE
const WORKER_PROTOCOL = Object.freeze({
REPLIES: {
DOWNLOAD_DONE: "DD",
DOWNLOAD_FAILED: "DF",
DOWNLOAD_PROGRESS: "DP",
UPLOAD_PROGRESS: "UO",
WEBSOCKET_CLOSE: "WC",
WEBSOCKET_ERROR: "WE",
WEBSOCKET_OPEN: "WO",
WEBSOCKET_TEXT: "WT"
},
REQUESTS: {
CONNECT: "CN",
DISCONNECT: "DC",
DOWNLOAD: "DL",
SEND: "SD",
UPLOAD: "UP"
}
});
const forgeEngineUrl = (engine) => `https://${engine}/`;
const forgeEngineRestUrl = (engine) => `${forgeEngineUrl(engine)}${protocol.REST_PATH}/`;
const forgeEngineWebSocketUrl = (engine) => `wss://${engine}/${protocol.SOCKET_PATH}/`;
const clientFactory = function (engine) {
const client = {};
let activeDownloadProgress;
let activeDownloadReject;
let activeDownloadResolve;
let activeUploadProgress;
const checkEventType = function (request, expectedType) {
const type = _.get(request, protocol.REQUEST_TYPE_PROPERTY);
if (expectedType !== type) {
throw INVALID_REQUEST_ERROR.newErrorReply();
}
};
const clientWorker = new Worker("client-worker.js");
let connectionTimeoutHandle;
let connectPromiseReject;
let connectPromiseResolve;
let deliberateDisconnect = false;
let disconnectResolve;
const isConnecting = () => connectPromiseResolve || connectPromiseReject;
const newFileProgress = function () {
const progress = {};
progress.fileKey = ko.observable(0);
progress.size = ko.observable(0);
progress.sizeDone = ko.observable(0);
progress.noOfChunks = ko.observable(0);
progress.noOfDoneChunks = ko.observable(0);
return progress;
};
const openRequests = new Map();
let pingTimeoutHandle;
const ping = function () {
if (client.isConnected() === false) {
return;
}
pingTimeoutHandle = setTimeout(function () {
const timestamp = Date.now();
const pingPromise = client.postRequest(PING);
const timeoutPromise = new Promise(function (ignore, reject) {
setTimeout(reject, PING_INTERVAL);
});
Promise.race([pingPromise, timeoutPromise]).then(function () {
const latency = Date.now() - timestamp;
LOG.debug(`ping received, latency=${latency}`);
ping();
}, function () {
LOG.warn("ping timeout");
client.isConnected(false);
});
}, PING_INTERVAL);
};
const postWorkerRequest = function ({data, transferables, type}) {
clientWorker.postMessage({data, type}, transferables);
};
let reconnectTimeoutHandle;
const sendEvent = function (metaEvent) {
if (Logger.DEBUG === SEND_LOG.getLevel()) {
const eventType = metaEvent[protocol.EVENT_TYPE_PROPERTY];
if (protocol.LOGGING_MESSAGE !== eventType) {
SEND_LOG.debug(`sendEvent, eventType=${eventType}`, metaEvent);
}
}
if (_.isObject(metaEvent) && client.isConnected()) {
postWorkerRequest({data: JSON.stringify(metaEvent), type: WORKER_PROTOCOL.REQUESTS.SEND});
}
};
const tryReconnect = function (interval = RECONNECT_INTERVAL) {
if (client.isConnected() || deliberateDisconnect || client.isReconnecting()) {
return;
}
client.isReconnecting(true);
const jitter = crypto.getRandomInt(0, Math.round(interval / 0.5));
reconnectTimeoutHandle = setTimeout(function () {
LOG.debug(`tryReconnect, interval=${interval}, jitter=${jitter}`);
client.connect().then(function () {
LOG.debug("tryReconnect done");
client.isReconnecting(false);
}, function (exc) {
LOG.debug("tryReconnect failed", exc);
client.isReconnecting(false);
const nextInterval = Math.min(Math.round(interval * RECONNECT_INTERVAL_FACTOR), RECONNECT_INTERVAL_MAX + jitter);
tryReconnect(nextInterval);
});
}, interval + jitter);
};
const subscriptions = new Map();
client.isConnected = ko.observable(false);
client.isAuth = ko.observable(false);
client.clientId = ko.observable();
client.user = ko.observable();
client.groups = ko.observableArray();
client.isMemberOf = (group) => client.groups().includes(group);
client.configs = ko.observableArray();
client.client = ko.observable();
client.version = ko.observable();
client.hasOpenRequests = ko.observable(false);
client.isReconnecting = ko.observable(false);
client.closeEvent = ko.observable();
client.forgeEngineUrl = () => forgeEngineUrl(engine);
client.forgeEngineRestUrl = () => forgeEngineRestUrl(engine);
client.forgeEngineWebSocketUrl = () => forgeEngineWebSocketUrl(engine);
client.connect = function () {
if (client.isConnected()) {
return Promise.resolve(undefined);
}
if (isConnecting()) {
return Promise.reject(new Error("connect in progress"));
}
const webSocketUrl = client.forgeEngineWebSocketUrl();
LOG.info(`connect, webSocketUrl=${webSocketUrl}`);
return new Promise(function (resolve, reject) {
disconnectResolve = undefined;
deliberateDisconnect = false;
connectPromiseResolve = resolve;
connectPromiseReject = reject;
postWorkerRequest({data: webSocketUrl, type: WORKER_PROTOCOL.REQUESTS.CONNECT});
if (client.isReconnecting() === false) {
connectionTimeoutHandle = setTimeout(function () {
if (client.isReconnecting() || client.isConnected()) {
return;
}
LOG.warn("connect timeout reached");
postWorkerRequest({type: WORKER_PROTOCOL.REQUESTS.DISCONNECT});
}, CONNECTION_TIMEOUT);
}
});
};
client.isConnected.subscribe(function (connected) {
LOG.debug(`isConnected, connected=${connected}`);
clearTimeout(connectionTimeoutHandle);
clearTimeout(pingTimeoutHandle);
if (connected) {
ping();
} else {
openRequests.forEach(function (promise) {
try {
promise.reject(NO_CONNECTION_ERROR.newErrorReply());
} catch (ignore) {}
});
openRequests.clear();
client.isAuth(false);
client.clientId(undefined);
if (protocol.KICK_CLOSE_STATUS_CODE !== _.get(client.closeEvent(), "statusCode", 0)) {
tryReconnect();
}
}
});
client.isAuth.subscribe(function (auth) {
LOG.debug(`isAuth=${auth}`);
if (auth === false) {
client.configs([]);
client.groups([]);
client.user(undefined);
client.client(undefined);
client.version(undefined);
}
});
client.auth = function (request) {
checkEventType(request, protocol.AUTH_REQUEST);
return client.postRequest(request).then(function (reply) {
client.configs(reply.configs);
client.groups(reply.groups);
client.isAuth(true);
return reply;
});
};
client.authToken = function (request) {
checkEventType(request, protocol.AUTH_TOKEN_REQUEST);
return client.postRequest(request);
};
client.backup = function (request) {
checkEventType(request, protocol.BACKUP_REQUEST);
return client.postRequest(request);
};
client.backupRestore = function (request) {
checkEventType(request, protocol.BACKUP_RESTORE_REQUEST);
return client.postRequest(request);
};
client.configSearch = function (request) {
checkEventType(request, protocol.USER_CONFIG_SEARCH_REQUEST);
return client.postRequest(request);
};
client.configWrite = function (request) {
checkEventType(request, protocol.USER_CONFIG_WRITE_REQUEST);
return client.postRequest(request);
};
client.disconnect = function () {
deliberateDisconnect = true;
clearTimeout(reconnectTimeoutHandle);
if (client.isConnected() === false) {
return Promise.resolve(undefined);
}
LOG.info(`disconnect, openRequestsSize=${openRequests.size}`);
return new Promise(function (resolve) {
disconnectResolve = resolve;
postWorkerRequest({data: 1000, type: WORKER_PROTOCOL.REQUESTS.DISCONNECT});
});
};
client.download = function (request) {
checkEventType(request, protocol.DOWNLOAD_REQUEST);
if (activeDownloadProgress) {
return Promise.reject(DOWNLOAD_IN_PROGRESS_ERROR.newErrorReply());
}
return client.postRequest(request).then(function (downloadReply) {
const activeDownloadSize = downloadReply.file.size;
const downloadProgress = newFileProgress();
downloadProgress.fileKey(request.key);
downloadProgress.size(ext.formatBytes(activeDownloadSize));
activeDownloadProgress = downloadProgress;
downloadReply.progress = downloadProgress;
downloadReply.donePromise = new Promise(function (resolve, reject) {
activeDownloadResolve = resolve;
activeDownloadReject = reject;
postWorkerRequest({data: activeDownloadSize, type: WORKER_PROTOCOL.REQUESTS.DOWNLOAD});
});
return downloadReply;
});
};
client.downloadAwait = function (request) {
checkEventType(request, protocol.DOWNLOAD_REQUEST);
let downloadReply;
return client.download(request).then(function (reply) {
downloadReply = reply;
return downloadReply.donePromise;
}).then(function (blob) {
return blob.arrayBuffer();
}).then(function (buffer) {
return Promise.all([crypto.hashBuffer(buffer), Promise.resolve(buffer)]);
}).then(function (result) {
const hash = result[0];
const buffer = result[1];
const expectedHahsh = downloadReply.file.hash;
if (hash !== expectedHahsh) {
throw INVALID_HASH_ERROR.newErrorReply(null, {expectedHahsh, hash, key: request.key});
}
downloadReply.buffer = buffer;
return downloadReply;
});
};
client.execute = function (request) {
checkEventType(request, protocol.EXECUTE_REQUEST);
return client.postRequest(request);
};
client.fileDelete = function (request) {
checkEventType(request, protocol.FILE_DELETE_REQUEST);
return client.postRequest(request);
};
client.fileSearch = function (request) {
checkEventType(request, protocol.FILE_SEARCH_REQUEST);
return client.postRequest(request);
};
client.getClientList = function (request) {
checkEventType(request, protocol.CLIENT_LIST_REQUEST);
return client.postRequest(request);
};
client.groupSearch = function (request) {
checkEventType(request, protocol.USER_GROUP_SEARCH_REQUEST);
return client.postRequest(request);
};
client.groupWrite = function (request) {
checkEventType(request, protocol.USER_GROUP_WRITE_REQUEST);
return client.postRequest(request);
};
client.groupWriteUpdate = function (request) {
checkEventType(request, protocol.USER_GROUP_WRITE_UPDATE_REQUEST);
return client.postRequest(request);
};
client.log = (loggingMessage) => sendEvent(loggingMessage);
client.loggingDelete = function (request) {
checkEventType(request, protocol.LOGGING_DELETE_REQUEST);
return client.postRequest(request);
};
client.loggingEntrySearch = function (request) {
checkEventType(request, protocol.LOGGING_ENTRY_SEARCH_REQUEST);
return client.postRequest(request);
};
client.loggingMetricSearch = function (request) {
checkEventType(request, protocol.LOGGING_METRIC_SEARCH_REQUEST);
return client.postRequest(request);
};
client.loggingMetricTime = function (request) {
checkEventType(request, protocol.LOGGING_METRIC_TIME_REQUEST);
return client.postRequest(request);
};
client.login = function (request) {
checkEventType(request, protocol.LOGIN_REQUEST);
return client.postRequest(request).then(function (reply) {
client.clientId(reply.clientId);
client.user(reply.user);
client.client(reply.client);
client.version(reply.version);
LOG.info(`onLogin, clientId=${client.clientId()}, userAgent=${navigator.userAgent}`);
if (reply.isAuth) {
client.configs(reply.configs);
client.groups(reply.groups);
client.isAuth(reply.isAuth);
}
return reply;
});
};
client.logout = function (request) {
checkEventType(request, protocol.LOGOUT_REQUEST);
return client.postRequest(request).then(function (reply) {
client.isAuth(false);
return reply;
});
};
client.postClientNotification = (clientNotification) => sendEvent(clientNotification);
client.recoverUser = function (request) {
checkEventType(request, protocol.RECOVER_USER_REQUEST);
return client.postRequest(request);
};
client.resetPassword = function (request) {
checkEventType(request, protocol.RESET_PASSWORD_REQUEST);
return client.postRequest(request);
};
client.search = function (request) {
checkEventType(request, protocol.SEARCH_REQUEST);
return client.postRequest(request);
};
client.getServiceInfo = function (request) {
checkEventType(request, protocol.SERVICE_INFO_REQUEST);
return client.postRequest(request);
};
client.subscribe = function (request) {
checkEventType(request, protocol.SUBSCRIPTION_REQUEST);
return client.postRequest(request);
};
client.upload = function (request) {
checkEventType(request, protocol.UPLOAD_REQUEST);
return client.postRequest(request).then(function (reply) {
const fileKey = reply.key;
const uploadProgress = newFileProgress();
activeUploadProgress = uploadProgress;
reply.progress = uploadProgress;
reply.donePromise = new Promise(function (resolve, reject) {
const subscription = {};
let eventHandle;
subscription[protocol.CLIENT_NOTIFICATION] = function (clientNotification) {
if (fileKey === _.get(clientNotification, ["data", "key"])) {
client.unregisterOnEvent(eventHandle);
switch (clientNotification.action) {
case protocol.UPLOAD_DONE_NOTIFICATION_ACTION:
resolve();
break;
case protocol.UPLOAD_FAIL_NOTIFICATION_ACTION:
reject();
break;
}
}
};
eventHandle = client.registerOnEvent(subscription);
});
reply.sendFile = function (buffer) {
LOG.debug(`sendFile, fileKey=${fileKey}`);
uploadProgress.fileKey(fileKey);
uploadProgress.size(ext.formatBytes(buffer.byteLength));
postWorkerRequest({
data: {buffer, bufferSize: UPLOAD_BUFFER_SIZE},
transferables: [buffer],
type: WORKER_PROTOCOL.REQUESTS.UPLOAD
});
return Promise.resolve(undefined);
};
return reply;
});
};
client.uploadAwait = function (request, buffer) {
checkEventType(request, protocol.UPLOAD_REQUEST);
const size = buffer.byteLength;
LOG.debug(`uploadAwait, size=${size}`);
let fileHash;
let uploadReply;
return crypto.hashBuffer(buffer).then(function (hash) {
fileHash = hash;
return client.upload(protocol.uploadRequest(Object.assign({}, request, {hash: fileHash, size})));
}).then(function (reply) {
if (reply.duplicate) {
return Promise.reject(UPLOAD_DUPLICATE_ERROR.newErrorReply(null, {hash: fileHash}));
}
uploadReply = reply;
return Promise.all([
uploadReply.donePromise,
uploadReply.sendFile(buffer)
]);
}).then(function () {
LOG.debug("uploadAwait done");
return uploadReply;
});
};
client.userSearch = function (request) {
checkEventType(request, protocol.USER_SEARCH_REQUEST);
return client.postRequest(request);
};
client.userWrite = function (request) {
checkEventType(request, protocol.USER_WRITE_REQUEST);
return client.postRequest(request);
};
client.userWriteUpdate = function (request) {
checkEventType(request, protocol.USER_WRITE_UPDATE_REQUEST);
return client.postRequest(request);
};
client.write = function (request) {
checkEventType(request, protocol.WRITE_REQUEST);
return client.postRequest(request);
};
client.writeObjectUpdate = function (request) {
checkEventType(request, protocol.WRITE_OBJECT_UPDATE_REQUEST);
return client.postRequest(request);
};
client.postRequest = function (request) {
if (client.isConnected() === false) {
return Promise.reject(NO_CONNECTION_ERROR.newErrorReply());
}
return new Promise(function (resolve, reject) {
const id = crypto.newUuid().substring(0, 8);
const requestMessage = protocol.requestMessage({id, request});
openRequests.set(requestMessage.id, {reject, resolve});
client.hasOpenRequests(true);
sendEvent(requestMessage);
});
};
client.registerOnEvent = ext.registerOnEvent.bind(undefined, subscriptions);
client.unregisterOnEvent = ext.unregister.bind(undefined, subscriptions);
client.onEvent = function (event) {
const eventType = event[protocol.EVENT_TYPE_PROPERTY];
const id = _.get(event, "id");
if (protocol.REPLY_MESSAGE === eventType && _.isEmpty(id) === false) {
const promise = openRequests.get(id);
if (promise) {
openRequests.delete(id);
client.hasOpenRequests(0 < openRequests.size);
if (protocol.ERROR_REPLY === event.reply[protocol.REPLY_TYPE_PROPERTY]) {
promise.reject(event.reply);
} else {
promise.resolve(event.reply);
}
} else {
LOG.warn(`onEvent openRequests missing, id=${id}`);
}
}
subscriptions.forEach(function notify(subscription) {
const eventHandler = _.get(subscription, eventType);
if (_.isFunction(eventHandler)) {
eventHandler(event);
}
});
};
clientWorker.addEventListener("message", function (event) {
ko.tasks.schedule(function () {
const message = event.data;
const type = message.type;
let metaEvent;
switch (type) {
case WORKER_PROTOCOL.REPLIES.WEBSOCKET_OPEN:
client.isConnected(true);
if (connectPromiseResolve) {
connectPromiseResolve();
connectPromiseResolve = undefined;
connectPromiseReject = undefined;
}
break;
case WORKER_PROTOCOL.REPLIES.WEBSOCKET_CLOSE:
LOG.debug(`message close, code=${message.data.code}, reason=${message.data.reason}`, message);
client.closeEvent(message.data);
if (disconnectResolve) {
disconnectResolve(message.data.code);
disconnectResolve = undefined;
}
client.isConnected(false);
break;
case WORKER_PROTOCOL.REPLIES.WEBSOCKET_ERROR:
LOG.warn(`message error, code=${message.data.code}, reason=${message.data.reason}`, message);
if (connectPromiseReject) {
connectPromiseReject(message.data);
connectPromiseReject = undefined;
connectPromiseResolve = undefined;
} else {
client.isConnected(false);
}
break;
case WORKER_PROTOCOL.REPLIES.WEBSOCKET_TEXT:
try {
metaEvent = JSON.parse(message.data);
LOG.debug("message text", metaEvent);
client.onEvent(metaEvent);
} catch (exc) {
LOG.warn("message text failed", exc, message);
}
break;
case WORKER_PROTOCOL.REPLIES.UPLOAD_PROGRESS:
if (activeUploadProgress) {
if (_.has(message.data, "sizeDone")) {
activeUploadProgress.sizeDone(ext.formatBytes(message.data.sizeDone));
}
if (_.has(message.data, "chunks")) {
activeUploadProgress.noOfChunks(message.data.chunks);
}
if (_.has(message.data, "chunksDone")) {
activeUploadProgress.noOfDoneChunks(message.data.chunksDone);
}
}
break;
case WORKER_PROTOCOL.REPLIES.DOWNLOAD_PROGRESS:
if (activeDownloadProgress) {
activeDownloadProgress.sizeDone(ext.formatBytes(message.data.size));
activeDownloadProgress.noOfDoneChunks(message.data.chunks);
}
break;
case WORKER_PROTOCOL.REPLIES.DOWNLOAD_DONE:
if (activeDownloadResolve) {
activeDownloadResolve(new Blob(message.data));
activeDownloadResolve = undefined;
activeDownloadProgress = undefined;
}
break;
case WORKER_PROTOCOL.REPLIES.DOWNLOAD_FAILED:
if (activeDownloadReject) {
activeDownloadReject();
activeDownloadReject = undefined;
activeDownloadProgress = undefined;
}
break;
}
});
});
clientWorker.postMessage({
data: JSON.stringify(WORKER_PROTOCOL)
});
return Object.freeze(client);
};
clientFactory.SEND_LOG_CTX = SEND_LOG_CTX;
clientFactory.forgeEngineWebSocketUrl = forgeEngineWebSocketUrl;
clientFactory.forgeEngineRestUrl = forgeEngineRestUrl;
clientFactory.forgeEngineUrl = forgeEngineUrl;
export default Object.freeze(clientFactory);