TT#2723 Implement janus-client JS library to communicate from App to Janus

Change-Id: Ie65b144f30cf35711a7f31709b1fc1db00cd4cf8
changes/54/8554/2
Hans-Peter Herzog 10 years ago
parent 4d7d5c520c
commit c31a9d00a9

@ -0,0 +1,12 @@
root = true
[*]
indent_style = space
indent_size = 4
end_of_line = lf
charset = utf-8
trim_trailing_whitespace = true
insert_final_newline = true
[*.md]
trim_trailing_whitespace = false

23
.gitignore vendored

@ -0,0 +1,23 @@
# Eclipse
.classpath
.project
.settings/
# Intellij
.idea/
*.iml
*.iws
# Mac
.DS_Store
# Maven
log/
target/
# Node.js
node_modules/
npm-debug.log
# Bower
bower_components/

@ -0,0 +1,22 @@
{
"name": "janus-client",
"version": "1.0.0",
"main": "src/janus.js",
"scripts": {
"test": "mocha -R nyan --delay"
},
"author": "",
"license": "ISC",
"description": "",
"dependencies": {
"bluebird": "^3.4.1",
"debug-logger": "^0.4.1",
"lodash": "^4.14.1",
"uuid": "^2.0.2",
"ws": "^1.1.1"
},
"devDependencies": {
"chai": "^3.5.0",
"mocha": "^3.0.1"
}
}

@ -0,0 +1,222 @@
'use strict';
var _ = require('lodash');
var WebSocket = require('ws');
var EventEmitter = require('events').EventEmitter;
var Promise = require('bluebird');
var Transaction = require('./transaction').Transaction;
var logger = require('debug-logger')('janus:client');
var ConnectionState = {
CONNECTED: 'CONNECTED',
DISCONNECTED: 'DISCONNECTED'
};
var ClientEvent = {
connected: 'connected',
disconnected: 'disconnected',
object: 'object',
error: 'error',
timeout: 'timeout'
};
var WebSocketEvent = {
open: 'open',
message: 'message',
error: 'error',
close: 'close'
};
/**
* @class
*/
class ClientResponse {
constructor(req, res) {
this.request = req;
this.response = res;
}
getRequest() {
return this.request;
}
getResponse() {
return this.response;
}
getType() {
return _.get(this.response, 'janus', null);
}
isError() {
return this.getType() === 'error';
}
isAck() {
return this.getType() === 'ack';
}
isSuccess() {
return this.getType() === 'success';
}
}
/**
* @class
*/
class Client {
constructor(options) {
options = options || {};
this.url = options.url || 'ws://localhost:8188';
this.logger = options.logger || logger || console;
this.requestTimeout = options.requestTimeout || 6000;
this.protocol = 'janus-protocol';
this.webSocket = null;
this.connectionState = ConnectionState.DISCONNECTED;
this.emitter = new EventEmitter();
this.transactions = {};
this.timeoutTimer = null;
this.timeout = options.timeout || 60000;
}
startTimeout() {
this.stopTimeout();
this.timeoutTimer = setTimeout(()=>{
this.emitter.emit(ClientEvent.timeout);
}, this.timeout);
}
stopTimeout() {
if(this.timeoutTimer !== null) {
clearTimeout(this.timeoutTimer);
}
}
connect() {
this.webSocket = new WebSocket(this.url, this.protocol);
this.webSocket.on(WebSocketEvent.open, ()=>{
this.webSocketOpen();
});
this.webSocket.on(WebSocketEvent.close, ()=>{
this.webSocketClose();
});
this.webSocket.on(WebSocketEvent.message, (message)=>{
this.webSocketMessage(message);
});
this.webSocket.on(WebSocketEvent.error, (err)=>{
this.webSocketError(err);
});
}
webSocketOpen() {
this.setConnectionState(ConnectionState.CONNECTED);
}
webSocketClose() {
this.setConnectionState(ConnectionState.DISCONNECTED);
}
webSocketMessage(message) {
this.startTimeout();
var obj;
try {
obj = JSON.parse(message);
this.logger.debug('Received message', obj);
this.dispatchObject(obj);
} catch(err) {
this.emitter.emit(ClientEvent.error, err);
}
}
webSocketError(err) {
this.emitter.emit(ClientEvent.error, err);
}
setConnectionState(state) {
var hasState = this.connectionState === state;
if(!hasState) {
switch(state) {
case ConnectionState.CONNECTED:
case ConnectionState.DISCONNECTED:
this.connectionState = state;
this.emitter.emit(state.toLowerCase());
break;
default:
throw new Error('Invalid state ' + state);
}
}
}
on(type, listener) {
this.emitter.on(type, listener);
}
off(type, listener) {
this.emitter.removeListener(type, listener);
}
dispatchObject(obj) {
if(_.isString(obj.transaction) && this.transactions[obj.transaction] instanceof Transaction) {
var transaction = this.transactions[obj.transaction];
var response = new ClientResponse(transaction.getRequest(), obj);
transaction.response(response);
} else {
this.emitter.emit(ClientEvent.object, obj);
}
}
sendObject(obj) {
return new Promise((resolve, reject)=>{
this.webSocket.send(JSON.stringify(obj), (err)=>{
if(_.isObject(err)) {
reject(err);
} else {
this.logger.debug('Sent message', obj);
resolve();
}
});
});
}
transact(req) {
var transaction = new Transaction(req, (finalReq)=>{
this.sendObject(finalReq).then(()=>{
}).catch((err)=>{
transaction.error(err);
});
});
this.transactions[transaction.getId()] = transaction;
transaction.onEnd(()=>{
delete this.transactions[transaction.getId()];
});
return transaction;
}
request(req, options) {
return new Promise((resolve, reject)=>{
var options = options || {};
var requestTimeout = options.requestTimeout || this.requestTimeout;
var transaction = this.transact(req).onAck((res)=>{
transaction.end();
resolve(res);
}).onResponse((res)=>{
transaction.end();
resolve(res);
}).onError(function(err){
transaction.offError(this.constructor);
reject(err);
}).timeout(requestTimeout).start();
});
}
}
exports.Client = Client;
exports.ClientEvent = ClientEvent;
exports.ConnectionState = ConnectionState;
exports.WebSocketEvent = WebSocketEvent;
exports.ClientResponse = ClientResponse;

@ -0,0 +1,10 @@
exports.PluginNames = {
VideoRoom: 'janus.plugin.videoroom'
};
exports.JanusEvents = {
webrtcup: 'webrtcup',
media: 'media',
hangup: 'hangup'
};

@ -0,0 +1,69 @@
'use strict';
var _ = require('lodash');
/**
* @class
*/
class ResponseError extends Error {
constructor(req, res) {
super();
this.name = this.constructor.name;
this.message = _.get(res, 'error.reason', null);
this.code = _.get(res, 'error.code', null);
this.request = req;
this.response = res;
}
getCode() {
return this.code;
}
getMessage() {
return this.message;
}
getRequest() {
return this.request;
}
getResponse() {
return this.response;
}
}
/**
* @class
*/
class PluginError extends ResponseError {
constructor(req, res, plugin) {
super(req, res);
this.message = _.get(res, 'plugindata.data.error', null);
this.code = _.get(res, 'plugindata.data.error_code', null);
this.plugin = plugin;
}
getPlugin() {
return this.plugin;
}
}
/**
* @class
*/
class RequestTimeoutError extends Error {
constructor(req) {
super();
this.name = this.constructor.name;
this.message = 'Request timeout';
this.request = req;
}
}
module.exports.ResponseError = ResponseError;
module.exports.RequestTimeoutError = RequestTimeoutError;
module.exports.PluginError = PluginError;

@ -0,0 +1,170 @@
'use strict';
var _ = require('lodash');
var Promise = require('bluebird');
var client = require('./client');
var Client = client.Client;
var ClientEvent = client.ClientEvent;
var EventEmitter = require('events').EventEmitter;
var Session = require('./session').Session;
var ResponseError = require('./errors').ResponseError;
var logger = require('debug-logger')('janus');
var State = {
connected: 'connected',
disconnected: 'disconnected'
};
/**
* @class
*/
class Janus {
constructor(options) {
options = options || {};
this.url = options.url;
this.logger = options.logger || logger || console;
this.client = options.client || new Client({
url: this.url
});
this.client.on(ClientEvent.connected, ()=> {
this.clientConnected();
});
this.client.on(ClientEvent.disconnected, ()=> {
this.clientDisconnected();
});
this.client.on(ClientEvent.object, (obj)=> {
this.clientObject(obj);
});
this.client.on(ClientEvent.error, (err)=> {
this.clientError(err);
});
this.hasInfo = false;
this.info = {
version_string: null,
plugins: {}
};
this.emitter = new EventEmitter();
this.sessions = {};
this.state = State.disconnected;
}
clientConnected() {
this.state = State.connected;
this.getInfo().then((res)=>{
this.info = res.response;
this.emitter.emit('connected');
}).catch((err)=>{
this.emitter.emit('error', err);
});
}
clientDisconnected() {
this.state = State.disconnected;
}
clientObject(obj) {
if(obj.session_id && this.hasSession(obj.session_id)) {
this.sessions[obj.session_id].emitEvent(obj);
} else if(obj.janus === 'timeout' && this.hasSession(obj.session_id)) {
this.deleteSession(obj.session_id);
} else if(obj.janus === 'timeout') {
// Todo: Log dropped timeout
} else {
// Todo: emit janus event
}
}
clientError(err) {
this.clientDisconnected();
this.emitter.emit('error', err);
}
connect() {
this.client.connect();
}
createSession() {
return new Promise((resolve, reject)=>{
this.client.request({ janus: 'create' }).then((res)=>{
if(res.isSuccess()) {
var session = new Session(res.getResponse().data.id, this);
this.sessions[session.getId()] = session;
this.logger.info('Created session=%s',session.getId());
session.onKeepAlive((result)=>{
if(result) {
this.logger.debug('KeepAlive session=%s', session.getId());
} else {
this.logger.warn('KeepAlive failed session=%s', session.getId());
}
});
session.onTimeout(()=>{
this.logger.info('Timeout session=%s',session.getId());
this.deleteSession(session.getId());
});
resolve(session);
} else {
reject(new ResponseError(res.getRequest(), res));
}
}).catch((err)=>{
reject(err);
});
});
}
hasSession(id) {
return this.sessions[id] instanceof Session;
}
deleteSession(id) {
delete this.sessions[id];
this.logger.info('Deleted session=%s', id);
this.logger.info('Sessions count=%s', Object.keys(this.sessions).length);
}
getInfo() {
return new Promise((resolve, reject)=>{
this.client.request({ janus: 'info' }).then((res)=>{
if(res.getType() === 'server_info') {
this.hasInfo = true;
resolve(res);
} else {
reject(new ResponseError(res.getRequest(), res));
}
}).catch((err)=>{
reject(err);
});
});
}
getVersion() {
return (this.hasInfo)? this.info.version_string : '';
}
transact() {
return this.client.transact.apply(this.client, arguments);
}
request() {
return this.client.request.apply(this.client, arguments);
}
isConnected() {
return this.state === State.connected;
}
onConnected(listener) {
this.emitter.on('connected', listener);
}
onDisconnected(listener) {
this.emitter.on('disconnected', listener);
}
onError(listener) {
this.emitter.on('error', listener);
}
}
exports.Janus = Janus;

@ -0,0 +1,3 @@
exports.VideoRoom = require('./videoroom-handle').VideoRoomHandle;

@ -0,0 +1,98 @@
'use strict';
var _ = require('lodash');
var EventEmitter = require('events').EventEmitter;
var JanusEvents = require('../constants').JanusEvents;
/**
* @class
*/
class PluginHandle {
constructor(name, id, session) {
this.name = name;
this.id = id;
this.session = session;
this.emitter = new EventEmitter();
}
getName() {
return this.name;
}
getId() {
return this.id;
}
getSession() {
return this.session;
}
emitEvent(event) {
switch(event.janus) {
case JanusEvents.webrtcup:
this.emitter.emit(JanusEvents.webrtcup);
break;
case JanusEvents.media:
this.emitter.emit(JanusEvents.media);
break;
case JanusEvents.hangup:
this.emitter.emit(JanusEvents.hangup);
break;
default:
}
}
onWebrtcUp(listener) {
this.emitter.addListener(JanusEvents.webrtcup, listener);
}
onMedia(listener) {
this.emitter.addListener(JanusEvents.media, listener);
}
onHangup(listener) {
this.emitter.addListener(JanusEvents.hangup, listener);
}
transact(obj) {
obj.handle_id = this.getId();
return this.session.transact(obj);
}
request(obj, options) {
obj.handle_id = this.getId();
return this.session.request(obj, options);
}
transactJsepMessage(body, jsep) {
return this.transact({
janus: 'message',
body: body,
jsep: jsep
});
}
transactMessage(body) {
return this.transact({
janus: 'message',
body: body
});
}
requestMessage(body, options) {
return new Promise((resolve, reject)=>{
this.request({
janus: 'message',
body: body
}, options).then((res)=>{
resolve(res);
}).catch((err)=>{
reject(err);
});
});
}
}
exports.PluginHandle = PluginHandle;

@ -0,0 +1,388 @@
'use strict';
var createId = require('uuid').v4;
var _ = require('lodash');
var Promise = require('bluebird');
var PluginHandle = require('./plugin-handle').PluginHandle;
var PluginNames = require('../constants').PluginNames;
var PluginError = require('../errors').PluginError;
var EventEmitter = require('events').EventEmitter;
var logger = require('debug-logger')('janus:videoroom');
/**
* @class
*/
class VideoRoomHandle extends PluginHandle {
constructor(options) {
super(PluginNames.VideoRoom, options.id, options.session);
}
create() {
return new Promise((resolve, reject)=>{
this.requestMessage({
request: 'create'
}).then((res)=>{
resolve(new VideoRoom({
room: _.get(res.getResponse(), 'plugindata.data.room', null)
}, this));
}).catch((err)=>{
reject(err);
});
});
}
list() {
return new Promise((resolve, reject)=>{
this.requestMessage({
request: 'list'
}).then((res)=>{
resolve(_.get(res.getResponse(), 'plugindata.data.list', []));
}).catch((err)=>{
reject(err);
});
});
}
joinPublisher(room) {
return new Promise((resolve, reject)=>{
var transaction = this.transactMessage({
request: 'join',
ptype: 'publisher',
room: room
}).onResponse((res)=>{
var videoRoom = _.get(res.getResponse(), 'plugindata.data.videoroom', null);
var errorCode = _.get(res.getResponse(), 'plugindata.data.error', null);
if(errorCode !== null) {
reject(new PluginError(transaction.getRequest(), res.getResponse(), this));
} else if(videoRoom === 'joined') {
resolve(res.getResponse());
} else {
reject(new Error('Unknown response'));
}
}).start();
});
}
joinListener(room, feed) {
return new Promise((resolve, reject)=>{
var transaction = this.transactMessage({
request: 'join',
ptype: 'listener',
room: room,
feed: feed
}).onResponse((res)=>{
var videoRoom = _.get(res.getResponse(), 'plugindata.data.videoroom', null);
var errorCode = _.get(res.getResponse(), 'plugindata.data.error', null);
if(errorCode !== null) {
reject(new PluginError(transaction.getRequest(), res.getResponse(), this));
} else if(videoRoom === 'attached') {
resolve(res.getResponse());
} else {
reject(new Error('Unknown response'));
}
}).start();
});
}
configure(options) {
return new Promise((resolve, reject)=>{
var audio = _.get(options, 'audio', true);
var video = _.get(options, 'video', true);
var jsep = _.get(options, 'jsep', null);
if(jsep === null) {
throw new Error('Missing argument jsep');
}
var transaction = this.transactJsepMessage({
request: 'configure',
audio: audio,
video: video
}, options.jsep).onResponse((res)=>{
var configured = _.get(res.getResponse(), 'plugindata.data.configured', null);
var errorCode = _.get(res.getResponse(), 'plugindata.data.error', null);
if(errorCode !== null) {
reject(new PluginError(transaction.getRequest(), res.getResponse(), this));
} else if(configured === 'ok') {
resolve(res.getResponse());
} else {
reject(new Error('Unknown response'));
}
}).start();
});
}
start(options) {
return new Promise((resolve, reject)=>{
var jsep = _.get(options, 'jsep', null);
if(jsep === null) {
throw new Error('Missing argument jsep');
}
var transaction = this.transactJsepMessage({
request: 'start',
room: options.room,
feed: options.feed
}, options.jsep).onResponse((res)=>{
var started = _.get(res.getResponse(), 'plugindata.data.started', null);
var errorCode = _.get(res.getResponse(), 'plugindata.data.error', null);
if(errorCode !== null) {
reject(new PluginError(transaction.getRequest(), res.getResponse(), this));
} else if(started === 'ok') {
resolve(res.getResponse());
} else {
reject(new Error('Unknown response'));
}
}).start();
});
}
publish(options) {
return new Promise((resolve, reject)=>{
var joinResult = null;
var room = _.get(options, 'room', null);
if(room === null) {
throw new Error('Missing argument room');
}
this.joinPublisher(options.room).then((res)=>{
joinResult = res;
return this.configure({
audio: options.audio,
video: options.video,
jsep: options.jsep
});
}).then((res)=>{
resolve({
id: _.get(joinResult, 'plugindata.data.id', null),
publishers: _.get(joinResult, 'plugindata.data.publishers', []),
answer: _.get(res, 'jsep.sdp', null)
});
}).catch((err)=>{
reject(err);
});
});
}
listen(options) {
return new Promise((resolve, reject)=>{
this.joinListener(options.room, options.feed).then((joinResult)=>{
resolve({
id: _.get(joinResult, 'plugindata.data.id', null),
offer: _.get(joinResult, 'jsep.sdp', null)
});
}).catch((err)=>{
reject(err);
});
});
}
trickle(candidate) {
return this.request({
janus: 'trickle',
candidate: candidate
});
}
createPublisher(room) {
return new Promise((resolve, reject)=>{
var publisher = new Publisher({
session: this.session,
room: room
});
publisher.init().then(()=>{
resolve(publisher);
}).catch((err)=>{
reject(err);
});
});
}
createListener(room, feed) {
return new Promise((resolve, reject)=>{
var listener = new Listener({
session: this.session,
room: room,
feed: feed
});
listener.init().then(()=>{
resolve(listener);
}).catch((err)=>{
reject(err);
});
});
}
}
/**
* @class
*/
class VideoRoom {
constructor(options, handle) {
this.room = options.room;
this.handle = handle;
}
}
/**
* @class
*/
class VideoRoomParticipant {
constructor(options) {
this.session = options.session;
this.offer = options.offer;
this.room = options.room;
this.handle = null;
this.answer = null;
}
init() {
return new Promise((resolve, reject)=>{
this.session.createVideoRoomHandle().then((handle)=>{
this.handle = handle;
resolve();
}).catch((err)=>{
reject(err);
});
});
}
getRoom() {
return this.room;
}
setOffer(offer) {
this.offer = offer;
}
getOffer() {
return this.offer;
}
setAnswer(answer) {
this.answer = answer;
}
getAnswer() {
return this.answer;
}
trickle(candidate) {
return this.handle.trickle(candidate);
}
}
/**
* @class
*/
class Publisher extends VideoRoomParticipant {
constructor(options) {
super(options);
this.id = options.id;
this.emitter = new EventEmitter();
this.listeners = {};
}
getId() {
return this.id;
}
addListener(listener) {
this.listeners[listener.getFeed()] = listener;
}
removeListener(id) {
delete this.listeners[id];
}
join(offer) {
return new Promise((resolve, reject)=>{
this.setOffer(offer);
this.handle.publish({
room: this.room,
jsep: {
type: 'offer',
sdp: this.getOffer()
}
}).then((result)=>{
this.id = result.id;
this.answer = result.answer;
_.forEach(result.publishers, (publisher)=>{
this.handle.createListener(this.room, publisher.id).then((listener)=>{
this.addListener(listener);
return listener.createOffer();
}).then(()=>{
this.emitter.emit('joined', this.listeners[publisher.id]);
}).catch((err)=>{
logger.error(err);
});
});
resolve();
}).catch((err)=>{
reject(err);
});
});
}
onJoined(listener) {
this.emitter.on('joined', listener);
}
}
/**
* @class
*/
class Listener extends VideoRoomParticipant {
constructor(options) {
super(options);
this.feed = options.feed;
}
getFeed() {
return this.feed;
}
createOffer() {
return new Promise((resolve, reject)=>{
this.handle.listen({
room: this.room,
feed: this.feed
}).then((result)=>{
this.offer = result.offer;
resolve();
}).catch((err)=>{
reject(err);
});
});
}
setAnswer(sdp) {
this.answer = sdp.replace(/a=(sendrecv|sendonly)/, 'a=recvonly');
}
setRemoteAnswer(answer) {
return new Promise((resolve, reject)=>{
this.setAnswer(answer);
this.handle.start({
room: this.room,
feed: this.feed,
jsep: {
type: 'answer',
sdp: this.getAnswer()
}
}).then(()=>{
resolve();
}).catch((err)=>{
reject(err);
});
});
}
}
exports.VideoRoomHandle = VideoRoomHandle;

@ -0,0 +1,156 @@
'use strict';
var _ = require('lodash');
var Promise = require('bluebird');
var Plugins = require('./plugins');
var PluginNames = require('./constants').PluginNames;
var EventEmitter = require('events').EventEmitter;
var logger = require('debug-logger')('janus:session');
var State = {
alive: 'alive',
dying: 'dying',
dead: 'dead'
};
/**
* @class
*/
class Session {
constructor(id, janus) {
this.id = id;
this.janus = janus;
this.pluginHandles = {};
this.keepAliveTimer = null;
this.keepAliveInterval = 30000;
this.keepAliveFails = 2;
this.keepAliveFailCount = 0;
this.emitter = new EventEmitter();
this.state = (this.janus.isConnected()) ? State.alive : State.dead;
this.startKeepAlive();
}
keepAlive() {
return this.janus.request({
janus: 'keepalive',
session_id: this.id
});
}
startKeepAlive() {
this.stopKeepAlive();
this.keepAliveTimer = setInterval(()=> {
this.keepAlive().then(()=>{
this.keepAliveFailCount = 0;
this.state = State.alive;
this.emitter.emit('keepalive', true);
}).catch(()=>{
this.keepAliveFailCount++;
this.state = State.dying;
this.emitter.emit('keepalive', false);
if(this.keepAliveFailCount === this.keepAliveFails) {
this.state = State.dead;
this.timeout();
}
});
}, this.keepAliveInterval);
}
stopKeepAlive() {
if(this.keepAliveTimer !== null) {
clearInterval(this.keepAliveTimer);
}
}
transact(obj) {
this.startKeepAlive();
obj.session_id = this.id;
return this.janus.transact(obj);
}
request(obj, options) {
this.startKeepAlive();
obj.session_id = this.id;
return this.janus.request(obj, options);
}
createPluginHandle(pluginName) {
return new Promise((resolve, reject)=>{
this.request({
janus: 'attach',
plugin: pluginName
}).then((res)=>{
logger.info('Created handle plugin=%s handle=%s', pluginName, res.getResponse().data.id);
resolve(res.getResponse().data.id);
}).catch((err)=>{
reject(err);
});
});
}
addPluginHandle(pluginHandle) {
this.pluginHandles[pluginHandle.getId()] = pluginHandle;
}
createVideoRoomHandle() {
return new Promise((resolve, reject)=>{
this.createPluginHandle(PluginNames.VideoRoom).then((id)=>{
var pluginHandle = new Plugins.VideoRoom({
session: this,
id: id
});
this.addPluginHandle(pluginHandle);
resolve(pluginHandle);
}).catch((err)=>{
reject(err);
});
});
}
getId() {
return this.id;
}
getState() {
return this.state;
}
isAlive() {
return this.state === State.alive;
}
timeout() {
this.destroy();
this.emitter.emit('timeout');
}
onTimeout(listener) {
this.emitter.on('timeout', listener);
}
onKeepAlive(listener) {
this.emitter.on('keepalive', listener);
}
/**
* @param event
* @param [event.sender]
*/
emitEvent(event) {
if(event.sender && _.isObject(this.pluginHandles[event.sender])) {
this.pluginHandles[event.sender].emitEvent(event);
} else {
this.emitter.emit('event', event);
}
}
destroy() {
this.stopKeepAlive();
this.pluginHandles = {};
this.janus = null;
}
}
exports.Session = Session;
exports.SessionState = State;

@ -0,0 +1,183 @@
'use strict';
var _ = require('lodash');
var createId = require('uuid');
var EventEmitter = require('events').EventEmitter;
var ResponseError = require('./errors').ResponseError;
var State = {
new: 'new',
started: 'started',
ended: 'ended'
};
var Event = {
response: 'response',
end: 'end',
error: 'error'
};
/**
* @class
*/
class InvalidTransactionState {
constructor(transaction) {
this.name = this.constructor.name;
this.message = 'Invalid transaction state';
this.transaction = transaction;
this.state = transaction.getState();
}
}
/**
* @class
*/
class TransactionTimeoutError {
constructor(transaction, timeout) {
this.name = this.constructor.name;
this.message = 'Transaction timeout';
this.transaction = transaction;
this.timeout = timeout;
}
}
/**
* @class
*/
class Transaction {
constructor(request, handler) {
this.id = createId();
this.request = request;
this.request.transaction = this.id;
this.emitter = new EventEmitter();
this.handler = handler;
this.state = State.new;
this.timeoutTimer = null;
this.timeoutMilli = 6000;
this.useTimeout = false;
this.acknowledged = false;
}
getId() {
return this.id;
}
getRequest() {
return this.request;
}
getState() {
return this.state;
}
start() {
if(this.state === State.new) {
this.state = State.started;
this.startTimeout();
this.handler(this.getRequest());
} else {
throw new InvalidTransactionState(this);
}
return this;
}
response(res) {
if(this.state === State.started) {
this.startTimeout();
if(res.isError()) {
this.error(new ResponseError(res.getRequest(), res));
} else if(res.isAck()) {
this.acknowledged = true;
this.emitter.emit('ack');
} else {
this.emitter.emit(Event.response, res);
}
} else {
throw new InvalidTransactionState(this);
}
}
end() {
if(this.state !== State.ended) {
this.state = State.ended;
this.stopTimeout();
this.emitter.emit(Event.end);
} else {
throw new InvalidTransactionState(this);
}
}
error(err) {
this.end();
this.emitter.emit(Event.error, err);
}
onAck(listener) {
this.emitter.on('ack', listener);
return this;
}
onResponse(listener) {
this.emitter.on(Event.response, listener);
return this;
}
offResponse(listener) {
this.emitter.removeListener(Event.response, listener);
return this;
}
onEnd(listener) {
this.emitter.on(Event.end, listener);
return this;
}
offEnd(listener) {
this.emitter.removeListener(Event.end, listener);
return this;
}
onError(listener) {
this.emitter.on(Event.error, listener);
return this;
}
offError(listener) {
this.emitter.removeListener(Event.error, listener);
return this;
}
timeout(timeout) {
if(this.state === State.new) {
this.useTimeout = true;
this.timeoutMilli = timeout;
} else {
throw new InvalidTransactionState(this);
}
return this;
}
startTimeout() {
if(this.useTimeout) {
this.stopTimeout();
this.timeoutTimer = setTimeout(()=> {
this.error(new TransactionTimeoutError(this, this.timeoutMilli));
}, this.timeoutMilli);
}
}
stopTimeout() {
if(this.timeoutTimer !== null) {
clearTimeout(this.timeoutTimer);
}
}
isAcknowledged() {
return this.acknowledged;
}
}
exports.Transaction = Transaction;

File diff suppressed because one or more lines are too long
Loading…
Cancel
Save