Source: client.js

/*! meta-client/modules/client */
/*jslint
    browser, long
*/
/*global
    Blob, FileReader, WebSocket, Worker
*/
/**
 * client module.
 *
 * @module meta-client/modules/client
 */
import _ from "underscore";
import crypto from "util-web/modules/crypto";
import ext from "util-web/modules/ext";
import ko from "knockout";
import Logger from "js-logger";
import protocol from "meta-core/modules/protocol";

const LOG = Logger.get("meta-client/client");
const CONNECTION_TIMEOUT = 1000 * 3;
const DOWNLOAD_IN_PROGRESS_ERROR = protocol.errorReplyFactory({code: 0, templateString: "download in progress", type: protocol.ERROR_TYPE.SYSTEM});
const UPLOAD_DUPLICATE_ERROR = protocol.errorReplyFactory({code: 0, templateString: "duplicate file for hash: <%= hash %>", type: protocol.ERROR_TYPE.VALIDATION});
const INVALID_HASH_ERROR = protocol.errorReplyFactory({code: 0, templateString: "invalid hash for <%= key %>: <%= hash %>, expected: <%= expectedHash %>", type: protocol.ERROR_TYPE.VALIDATION});
const INVALID_REQUEST_ERROR = protocol.errorReplyFactory({code: 0, templateString: "invalid request", type: protocol.ERROR_TYPE.VALIDATION});
const NO_CONNECTION_ERROR = protocol.errorReplyFactory({code: 0, templateString: "no connection", type: protocol.ERROR_TYPE.SYSTEM});
const PING = protocol.pingRequest();
const PING_INTERVAL = 1000 * 5;
const RECONNECT_INTERVAL = 500;
const RECONNECT_INTERVAL_FACTOR = 1.5;
const RECONNECT_INTERVAL_MAX = 1000 * 8;
const SEND_LOG_CTX = "meta-client/client/send";
const SEND_LOG = Logger.get(SEND_LOG_CTX);
const UPLOAD_BUFFER_SIZE = 1024 * 4; // org.eclipse.jetty.websocket.core.WebSocketConstants.DEFAULT_OUTPUT_BUFFER_SIZE
const WORKER_PROTOCOL = Object.freeze({
    REPLIES: {
        DOWNLOAD_DONE: "DD",
        DOWNLOAD_FAILED: "DF",
        DOWNLOAD_PROGRESS: "DP",
        UPLOAD_PROGRESS: "UO",
        WEBSOCKET_CLOSE: "WC",
        WEBSOCKET_ERROR: "WE",
        WEBSOCKET_OPEN: "WO",
        WEBSOCKET_TEXT: "WT"
    },
    REQUESTS: {
        CONNECT: "CN",
        DISCONNECT: "DC",
        DOWNLOAD: "DL",
        SEND: "SD",
        UPLOAD: "UP"
    }
});
const forgeEngineUrl = (engine) => `https://${engine}/`;
const forgeEngineRestUrl = (engine) => `${forgeEngineUrl(engine)}${protocol.REST_PATH}/`;
const forgeEngineWebSocketUrl = (engine) => `wss://${engine}/${protocol.SOCKET_PATH}/`;
const clientFactory = function (engine) {
    const client = {};
    let activeDownloadProgress;
    let activeDownloadReject;
    let activeDownloadResolve;
    let activeUploadProgress;
    const checkEventType = function (request, expectedType) {
        const type = _.get(request, protocol.REQUEST_TYPE_PROPERTY);
        if (expectedType !== type) {
            throw INVALID_REQUEST_ERROR.newErrorReply();
        }
    };
    const clientWorker = new Worker("client-worker.js");
    let connectionTimeoutHandle;
    let connectPromiseReject;
    let connectPromiseResolve;
    let deliberateDisconnect = false;
    let disconnectResolve;
    const isConnecting = () => connectPromiseResolve || connectPromiseReject;
    const newFileProgress = function () {
        const progress = {};
        progress.fileKey = ko.observable(0);
        progress.size = ko.observable(0);
        progress.sizeDone = ko.observable(0);
        progress.noOfChunks = ko.observable(0);
        progress.noOfDoneChunks = ko.observable(0);
        return progress;
    };
    const openRequests = new Map();
    let pingTimeoutHandle;
    const ping = function () {
        if (client.isConnected() === false) {
            return;
        }
        pingTimeoutHandle = setTimeout(function () {
            const timestamp = Date.now();
            const pingPromise = client.postRequest(PING);
            const timeoutPromise = new Promise(function (ignore, reject) {
                setTimeout(reject, PING_INTERVAL);
            });
            Promise.race([pingPromise, timeoutPromise]).then(function () {
                const latency = Date.now() - timestamp;
                LOG.debug(`ping received, latency=${latency}`);
                ping();
            }, function () {
                LOG.warn("ping timeout");
                client.isConnected(false);
            });
        }, PING_INTERVAL);
    };
    const postWorkerRequest = function ({data, transferables, type}) {
        clientWorker.postMessage({data, type}, transferables);
    };
    let reconnectTimeoutHandle;
    const sendEvent = function (metaEvent) {
        if (Logger.DEBUG === SEND_LOG.getLevel()) {
            const eventType = metaEvent[protocol.EVENT_TYPE_PROPERTY];
            if (protocol.LOGGING_MESSAGE !== eventType) {
                SEND_LOG.debug(`sendEvent, eventType=${eventType}`, metaEvent);
            }
        }
        if (_.isObject(metaEvent) && client.isConnected()) {
            postWorkerRequest({data: JSON.stringify(metaEvent), type: WORKER_PROTOCOL.REQUESTS.SEND});
        }
    };
    const tryReconnect = function (interval = RECONNECT_INTERVAL) {
        if (client.isConnected() || deliberateDisconnect || client.isReconnecting()) {
            return;
        }
        client.isReconnecting(true);
        const jitter = crypto.getRandomInt(0, Math.round(interval / 0.5));
        reconnectTimeoutHandle = setTimeout(function () {
            LOG.debug(`tryReconnect, interval=${interval}, jitter=${jitter}`);
            client.connect().then(function () {
                LOG.debug("tryReconnect done");
                client.isReconnecting(false);
            }, function (exc) {
                LOG.debug("tryReconnect failed", exc);
                client.isReconnecting(false);
                const nextInterval = Math.min(Math.round(interval * RECONNECT_INTERVAL_FACTOR), RECONNECT_INTERVAL_MAX + jitter);
                tryReconnect(nextInterval);
            });
        }, interval + jitter);
    };
    const subscriptions = new Map();

    client.isConnected = ko.observable(false);
    client.isAuth = ko.observable(false);
    client.clientId = ko.observable();
    client.user = ko.observable();
    client.groups = ko.observableArray();
    client.isMemberOf = (group) => client.groups().includes(group);
    client.configs = ko.observableArray();
    client.client = ko.observable();
    client.version = ko.observable();
    client.hasOpenRequests = ko.observable(false);
    client.isReconnecting = ko.observable(false);
    client.closeEvent = ko.observable();

    client.forgeEngineUrl = () => forgeEngineUrl(engine);
    client.forgeEngineRestUrl = () => forgeEngineRestUrl(engine);
    client.forgeEngineWebSocketUrl = () => forgeEngineWebSocketUrl(engine);

    client.connect = function () {
        if (client.isConnected()) {
            return Promise.resolve(undefined);
        }
        if (isConnecting()) {
            return Promise.reject(new Error("connect in progress"));
        }
        const webSocketUrl = client.forgeEngineWebSocketUrl();
        LOG.info(`connect, webSocketUrl=${webSocketUrl}`);
        return new Promise(function (resolve, reject) {
            disconnectResolve = undefined;
            deliberateDisconnect = false;
            connectPromiseResolve = resolve;
            connectPromiseReject = reject;
            postWorkerRequest({data: webSocketUrl, type: WORKER_PROTOCOL.REQUESTS.CONNECT});
            if (client.isReconnecting() === false) {
                connectionTimeoutHandle = setTimeout(function () {
                    if (client.isReconnecting() || client.isConnected()) {
                        return;
                    }
                    LOG.warn("connect timeout reached");
                    postWorkerRequest({type: WORKER_PROTOCOL.REQUESTS.DISCONNECT});
                }, CONNECTION_TIMEOUT);
            }
        });
    };

    client.isConnected.subscribe(function (connected) {
        LOG.debug(`isConnected, connected=${connected}`);
        clearTimeout(connectionTimeoutHandle);
        clearTimeout(pingTimeoutHandle);
        if (connected) {
            ping();
        } else {
            openRequests.forEach(function (promise) {
                try {
                    promise.reject(NO_CONNECTION_ERROR.newErrorReply());
                } catch (ignore) {}
            });
            openRequests.clear();
            client.isAuth(false);
            client.clientId(undefined);
            if (protocol.KICK_CLOSE_STATUS_CODE !== _.get(client.closeEvent(), "statusCode", 0)) {
                tryReconnect();
            }
        }
    });

    client.isAuth.subscribe(function (auth) {
        LOG.debug(`isAuth=${auth}`);
        if (auth === false) {
            client.configs([]);
            client.groups([]);
            client.user(undefined);
            client.client(undefined);
            client.version(undefined);
        }
    });

    client.auth = function (request) {
        checkEventType(request, protocol.AUTH_REQUEST);
        return client.postRequest(request).then(function (reply) {
            client.configs(reply.configs);
            client.groups(reply.groups);
            client.isAuth(true);
            return reply;
        });
    };

    client.authToken = function (request) {
        checkEventType(request, protocol.AUTH_TOKEN_REQUEST);
        return client.postRequest(request);
    };

    client.backup = function (request) {
        checkEventType(request, protocol.BACKUP_REQUEST);
        return client.postRequest(request);
    };

    client.backupRestore = function (request) {
        checkEventType(request, protocol.BACKUP_RESTORE_REQUEST);
        return client.postRequest(request);
    };

    client.configSearch = function (request) {
        checkEventType(request, protocol.USER_CONFIG_SEARCH_REQUEST);
        return client.postRequest(request);
    };

    client.configWrite = function (request) {
        checkEventType(request, protocol.USER_CONFIG_WRITE_REQUEST);
        return client.postRequest(request);
    };

    client.disconnect = function () {
        deliberateDisconnect = true;
        clearTimeout(reconnectTimeoutHandle);
        if (client.isConnected() === false) {
            return Promise.resolve(undefined);
        }
        LOG.info(`disconnect, openRequestsSize=${openRequests.size}`);
        return new Promise(function (resolve) {
            disconnectResolve = resolve;
            postWorkerRequest({data: 1000, type: WORKER_PROTOCOL.REQUESTS.DISCONNECT});
        });
    };

    client.download = function (request) {
        checkEventType(request, protocol.DOWNLOAD_REQUEST);
        if (activeDownloadProgress) {
            return Promise.reject(DOWNLOAD_IN_PROGRESS_ERROR.newErrorReply());
        }
        return client.postRequest(request).then(function (downloadReply) {
            const activeDownloadSize = downloadReply.file.size;
            const downloadProgress = newFileProgress();
            downloadProgress.fileKey(request.key);
            downloadProgress.size(ext.formatBytes(activeDownloadSize));
            activeDownloadProgress = downloadProgress;
            downloadReply.progress = downloadProgress;
            downloadReply.donePromise = new Promise(function (resolve, reject) {
                activeDownloadResolve = resolve;
                activeDownloadReject = reject;
                postWorkerRequest({data: activeDownloadSize, type: WORKER_PROTOCOL.REQUESTS.DOWNLOAD});
            });
            return downloadReply;
        });
    };

    client.downloadAwait = function (request) {
        checkEventType(request, protocol.DOWNLOAD_REQUEST);
        let downloadReply;
        return client.download(request).then(function (reply) {
            downloadReply = reply;
            return downloadReply.donePromise;
        }).then(function (blob) {
            return blob.arrayBuffer();
        }).then(function (buffer) {
            return Promise.all([crypto.hashBuffer(buffer), Promise.resolve(buffer)]);
        }).then(function (result) {
            const hash = result[0];
            const buffer = result[1];
            const expectedHahsh = downloadReply.file.hash;
            if (hash !== expectedHahsh) {
                throw INVALID_HASH_ERROR.newErrorReply(null, {expectedHahsh, hash, key: request.key});
            }
            downloadReply.buffer = buffer;
            return downloadReply;
        });
    };

    client.execute = function (request) {
        checkEventType(request, protocol.EXECUTE_REQUEST);
        return client.postRequest(request);
    };

    client.fileDelete = function (request) {
        checkEventType(request, protocol.FILE_DELETE_REQUEST);
        return client.postRequest(request);
    };

    client.fileSearch = function (request) {
        checkEventType(request, protocol.FILE_SEARCH_REQUEST);
        return client.postRequest(request);
    };

    client.getClientList = function (request) {
        checkEventType(request, protocol.CLIENT_LIST_REQUEST);
        return client.postRequest(request);
    };

    client.groupSearch = function (request) {
        checkEventType(request, protocol.USER_GROUP_SEARCH_REQUEST);
        return client.postRequest(request);
    };

    client.groupWrite = function (request) {
        checkEventType(request, protocol.USER_GROUP_WRITE_REQUEST);
        return client.postRequest(request);
    };

    client.groupWriteUpdate = function (request) {
        checkEventType(request, protocol.USER_GROUP_WRITE_UPDATE_REQUEST);
        return client.postRequest(request);
    };

    client.log = (loggingMessage) => sendEvent(loggingMessage);

    client.loggingDelete = function (request) {
        checkEventType(request, protocol.LOGGING_DELETE_REQUEST);
        return client.postRequest(request);
    };

    client.loggingEntrySearch = function (request) {
        checkEventType(request, protocol.LOGGING_ENTRY_SEARCH_REQUEST);
        return client.postRequest(request);
    };

    client.loggingMetricSearch = function (request) {
        checkEventType(request, protocol.LOGGING_METRIC_SEARCH_REQUEST);
        return client.postRequest(request);
    };

    client.loggingMetricTime = function (request) {
        checkEventType(request, protocol.LOGGING_METRIC_TIME_REQUEST);
        return client.postRequest(request);
    };

    client.login = function (request) {
        checkEventType(request, protocol.LOGIN_REQUEST);
        return client.postRequest(request).then(function (reply) {
            client.clientId(reply.clientId);
            client.user(reply.user);
            client.client(reply.client);
            client.version(reply.version);
            LOG.info(`onLogin, clientId=${client.clientId()}, userAgent=${navigator.userAgent}`);
            if (reply.isAuth) {
                client.configs(reply.configs);
                client.groups(reply.groups);
                client.isAuth(reply.isAuth);
            }
            return reply;
        });
    };

    client.logout = function (request) {
        checkEventType(request, protocol.LOGOUT_REQUEST);
        return client.postRequest(request).then(function (reply) {
            client.isAuth(false);
            return reply;
        });
    };

    client.postClientNotification = (clientNotification) => sendEvent(clientNotification);

    client.recoverUser = function (request) {
        checkEventType(request, protocol.RECOVER_USER_REQUEST);
        return client.postRequest(request);
    };

    client.resetPassword = function (request) {
        checkEventType(request, protocol.RESET_PASSWORD_REQUEST);
        return client.postRequest(request);
    };

    client.search = function (request) {
        checkEventType(request, protocol.SEARCH_REQUEST);
        return client.postRequest(request);
    };

    client.getServiceInfo = function (request) {
        checkEventType(request, protocol.SERVICE_INFO_REQUEST);
        return client.postRequest(request);
    };

    client.subscribe = function (request) {
        checkEventType(request, protocol.SUBSCRIPTION_REQUEST);
        return client.postRequest(request);
    };

    client.upload = function (request) {
        checkEventType(request, protocol.UPLOAD_REQUEST);
        return client.postRequest(request).then(function (reply) {
            const fileKey = reply.key;
            const uploadProgress = newFileProgress();
            activeUploadProgress = uploadProgress;
            reply.progress = uploadProgress;
            reply.donePromise = new Promise(function (resolve, reject) {
                const subscription = {};
                let eventHandle;
                subscription[protocol.CLIENT_NOTIFICATION] = function (clientNotification) {
                    if (fileKey === _.get(clientNotification, ["data", "key"])) {
                        client.unregisterOnEvent(eventHandle);
                        switch (clientNotification.action) {
                        case protocol.UPLOAD_DONE_NOTIFICATION_ACTION:
                            resolve();
                            break;
                        case protocol.UPLOAD_FAIL_NOTIFICATION_ACTION:
                            reject();
                            break;
                        }
                    }
                };
                eventHandle = client.registerOnEvent(subscription);
            });
            reply.sendFile = function (buffer) {
                LOG.debug(`sendFile, fileKey=${fileKey}`);
                uploadProgress.fileKey(fileKey);
                uploadProgress.size(ext.formatBytes(buffer.byteLength));
                postWorkerRequest({
                    data: {buffer, bufferSize: UPLOAD_BUFFER_SIZE},
                    transferables: [buffer],
                    type: WORKER_PROTOCOL.REQUESTS.UPLOAD
                });
                return Promise.resolve(undefined);
            };
            return reply;
        });
    };

    client.uploadAwait = function (request, buffer) {
        checkEventType(request, protocol.UPLOAD_REQUEST);
        const size = buffer.byteLength;
        LOG.debug(`uploadAwait, size=${size}`);
        let fileHash;
        let uploadReply;
        return crypto.hashBuffer(buffer).then(function (hash) {
            fileHash = hash;
            return client.upload(protocol.uploadRequest(Object.assign({}, request, {hash: fileHash, size})));
        }).then(function (reply) {
            if (reply.duplicate) {
                return Promise.reject(UPLOAD_DUPLICATE_ERROR.newErrorReply(null, {hash: fileHash}));
            }
            uploadReply = reply;
            return Promise.all([
                uploadReply.donePromise,
                uploadReply.sendFile(buffer)
            ]);
        }).then(function () {
            LOG.debug("uploadAwait done");
            return uploadReply;
        });
    };

    client.userSearch = function (request) {
        checkEventType(request, protocol.USER_SEARCH_REQUEST);
        return client.postRequest(request);
    };

    client.userWrite = function (request) {
        checkEventType(request, protocol.USER_WRITE_REQUEST);
        return client.postRequest(request);
    };

    client.userWriteUpdate = function (request) {
        checkEventType(request, protocol.USER_WRITE_UPDATE_REQUEST);
        return client.postRequest(request);
    };

    client.write = function (request) {
        checkEventType(request, protocol.WRITE_REQUEST);
        return client.postRequest(request);
    };

    client.writeObjectUpdate = function (request) {
        checkEventType(request, protocol.WRITE_OBJECT_UPDATE_REQUEST);
        return client.postRequest(request);
    };

    client.postRequest = function (request) {
        if (client.isConnected() === false) {
            return Promise.reject(NO_CONNECTION_ERROR.newErrorReply());
        }
        return new Promise(function (resolve, reject) {
            const id = crypto.newUuid().substring(0, 8);
            const requestMessage = protocol.requestMessage({id, request});
            openRequests.set(requestMessage.id, {reject, resolve});
            client.hasOpenRequests(true);
            sendEvent(requestMessage);
        });
    };

    client.registerOnEvent = ext.registerOnEvent.bind(undefined, subscriptions);
    client.unregisterOnEvent = ext.unregister.bind(undefined, subscriptions);

    client.onEvent = function (event) {
        const eventType = event[protocol.EVENT_TYPE_PROPERTY];
        const id = _.get(event, "id");
        if (protocol.REPLY_MESSAGE === eventType && _.isEmpty(id) === false) {
            const promise = openRequests.get(id);
            if (promise) {
                openRequests.delete(id);
                client.hasOpenRequests(0 < openRequests.size);
                if (protocol.ERROR_REPLY === event.reply[protocol.REPLY_TYPE_PROPERTY]) {
                    promise.reject(event.reply);
                } else {
                    promise.resolve(event.reply);
                }
            } else {
                LOG.warn(`onEvent openRequests missing, id=${id}`);
            }
        }
        subscriptions.forEach(function notify(subscription) {
            const eventHandler = _.get(subscription, eventType);
            if (_.isFunction(eventHandler)) {
                eventHandler(event);
            }
        });
    };

    clientWorker.addEventListener("message", function (event) {
        ko.tasks.schedule(function () {
            const message = event.data;
            const type = message.type;
            let metaEvent;
            switch (type) {
            case WORKER_PROTOCOL.REPLIES.WEBSOCKET_OPEN:
                client.isConnected(true);
                if (connectPromiseResolve) {
                    connectPromiseResolve();
                    connectPromiseResolve = undefined;
                    connectPromiseReject = undefined;
                }
                break;
            case WORKER_PROTOCOL.REPLIES.WEBSOCKET_CLOSE:
                LOG.debug(`message close, code=${message.data.code}, reason=${message.data.reason}`, message);
                client.closeEvent(message.data);
                if (disconnectResolve) {
                    disconnectResolve(message.data.code);
                    disconnectResolve = undefined;
                }
                client.isConnected(false);
                break;
            case WORKER_PROTOCOL.REPLIES.WEBSOCKET_ERROR:
                LOG.warn(`message error, code=${message.data.code}, reason=${message.data.reason}`, message);
                if (connectPromiseReject) {
                    connectPromiseReject(message.data);
                    connectPromiseReject = undefined;
                    connectPromiseResolve = undefined;
                } else {
                    client.isConnected(false);
                }
                break;
            case WORKER_PROTOCOL.REPLIES.WEBSOCKET_TEXT:
                try {
                    metaEvent = JSON.parse(message.data);
                    LOG.debug("message text", metaEvent);
                    client.onEvent(metaEvent);
                } catch (exc) {
                    LOG.warn("message text failed", exc, message);
                }
                break;
            case WORKER_PROTOCOL.REPLIES.UPLOAD_PROGRESS:
                if (activeUploadProgress) {
                    if (_.has(message.data, "sizeDone")) {
                        activeUploadProgress.sizeDone(ext.formatBytes(message.data.sizeDone));
                    }
                    if (_.has(message.data, "chunks")) {
                        activeUploadProgress.noOfChunks(message.data.chunks);
                    }
                    if (_.has(message.data, "chunksDone")) {
                        activeUploadProgress.noOfDoneChunks(message.data.chunksDone);
                    }
                }
                break;
            case WORKER_PROTOCOL.REPLIES.DOWNLOAD_PROGRESS:
                if (activeDownloadProgress) {
                    activeDownloadProgress.sizeDone(ext.formatBytes(message.data.size));
                    activeDownloadProgress.noOfDoneChunks(message.data.chunks);
                }
                break;
            case WORKER_PROTOCOL.REPLIES.DOWNLOAD_DONE:
                if (activeDownloadResolve) {
                    activeDownloadResolve(new Blob(message.data));
                    activeDownloadResolve = undefined;
                    activeDownloadProgress = undefined;
                }
                break;
            case WORKER_PROTOCOL.REPLIES.DOWNLOAD_FAILED:
                if (activeDownloadReject) {
                    activeDownloadReject();
                    activeDownloadReject = undefined;
                    activeDownloadProgress = undefined;
                }
                break;
            }
        });
    });

    clientWorker.postMessage({
        data: JSON.stringify(WORKER_PROTOCOL)
    });

    return Object.freeze(client);
};

clientFactory.SEND_LOG_CTX = SEND_LOG_CTX;
clientFactory.forgeEngineWebSocketUrl = forgeEngineWebSocketUrl;
clientFactory.forgeEngineRestUrl = forgeEngineRestUrl;
clientFactory.forgeEngineUrl = forgeEngineUrl;

export default Object.freeze(clientFactory);