Skip to content

Commit

Permalink
Merge pull request #433 from ryanseys/pubsub-v1beta2
Browse files Browse the repository at this point in the history
Use pubsub v1beta2
  • Loading branch information
stephenplusplus committed Mar 10, 2015
2 parents 751d97f + 27ebdc1 commit 7fb3c17
Show file tree
Hide file tree
Showing 7 changed files with 270 additions and 260 deletions.
44 changes: 21 additions & 23 deletions lib/pubsub/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ var util = require('../common/util.js');
* @const {string} Base URL for Pub/Sub API.
* @private
*/
var PUBSUB_BASE_URL = 'https://www.googleapis.com/pubsub/v1beta1';
var PUBSUB_BASE_URL = 'https://pubsub.googleapis.com/v1beta2/';

/**
* @const {array} Required scopes for Pub/Sub API.
Expand Down Expand Up @@ -116,7 +116,7 @@ function PubSub(options) {
});

this.projectId = options.projectId;
this.projectName = '/projects/' + this.projectId;
this.projectName = 'projects/' + this.projectId;
}

/**
Expand All @@ -125,7 +125,7 @@ function PubSub(options) {
*
* @param {object=} query - Query object.
* @param {string=} query.pageToken - Page token.
* @param {number=} query.maxResults - Maximum number of results to return.
* @param {number=} query.pageSize - Max number of results to return.
* @param {function} callback - The callback function.
*
* @example
Expand All @@ -137,23 +137,23 @@ function PubSub(options) {
*
* // Customize the query.
* pubsub.getTopics({
* maxResults: 3
* pageSize: 3
* }, function(err, topics, nextQuery) {});
*/
PubSub.prototype.getTopics = function(query, callback) {
var that = this;
var self = this;
if (!callback) {
callback = query;
query = {};
}
query.query = 'cloud.googleapis.com/project in (' + this.projectName + ')';
this.makeReq_('GET', 'topics', query, true, function(err, result) {
var path = this.projectName + '/topics';
this.makeReq_('GET', path, query, true, function(err, result) {
if (err) {
callback(err);
return;
}
var topics = (result.topic || []).map(function(item) {
return new Topic(that, {
var topics = (result.topics || []).map(function(item) {
return new Topic(self, {
name: item.name
});
});
Expand All @@ -180,10 +180,8 @@ PubSub.prototype.getTopics = function(query, callback) {
PubSub.prototype.createTopic = function(name, callback) {
callback = callback || util.noop;
var topic = this.topic(name);
var req = {
name: topic.name
};
this.makeReq_('POST', 'topics', null, req, function(err) {
var path = this.projectName + '/topics/' + name;
this.makeReq_('PUT', path, null, null, function(err) {
if (err) {
callback(err);
return;
Expand Down Expand Up @@ -220,7 +218,7 @@ PubSub.prototype.topic = function(name) {
*
* @param {object=} query - Query object.
* @param {string=} query.pageToken - Page token.
* @param {number=} query.maxResults - Maximum number of results to return.
* @param {number=} query.pageSize - Maximum number of results to return.
* @param {function} callback - The callback function.
*
* @example
Expand All @@ -232,25 +230,25 @@ PubSub.prototype.topic = function(name) {
*
* // Customize the query.
* pubsub.getSubscriptions({
* maxResults: 10
* pageSize: 10
* }, function(err, subscriptions, nextQuery) {});
*/
PubSub.prototype.getSubscriptions = function(query, callback) {
var that = this;
var self = this;
if (!callback) {
callback = query;
query = {};
}
if (!query.query) {
query.query = 'cloud.googleapis.com/project in (' + this.projectName + ')';
}
this.makeReq_('GET', 'subscriptions', query, true, function(err, result) {

var path = this.projectName + '/subscriptions';
this.makeReq_('GET', path, query, true, function(err, result) {
if (err) {
callback(err);
return;
}
var subscriptions = (result.subscription || []).map(function(item) {
return new Subscription(that, {

var subscriptions = (result.subscriptions || []).map(function(item) {
return new Subscription(self, {
name: item.name
});
});
Expand Down Expand Up @@ -279,7 +277,7 @@ PubSub.prototype.makeReq_ = function(method, path, q, body, callback) {
var reqOpts = {
method: method,
qs: q,
uri: PUBSUB_BASE_URL + '/' + path
uri: PUBSUB_BASE_URL + path
};

if (body) {
Expand Down
118 changes: 76 additions & 42 deletions lib/pubsub/subscription.js
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ nodeutil.inherits(Subscription, events.EventEmitter);

/**
* Format the name of a subscription. A subscription's full name is in the
* format of /subscription/{projectId}/{name}.
* format of projects/{projectId}/subscriptions/{subName}.
*
* @private
*/
Expand All @@ -136,32 +136,35 @@ Subscription.formatName_ = function(projectId, name) {
if (name.indexOf('/') > -1) {
return name;
}
return '/subscriptions/' + projectId + '/' + name;
return 'projects/' + projectId + '/subscriptions/' + name;
};

/**
* Simplify a message from an API response to have two properties, `id` and
* `data`. `data` is always converted to a string.
* Simplify a message from an API response to have three properties, `id`,
* `data` and `attributes`. `data` is always converted to a string.
*
* @private
*/
Subscription.formatMessage_ = function(msg) {
var event = msg.pubsubEvent;

var innerMessage = msg.message;
var message = {
ackId: msg.ackId
};

if (event && event.message) {
message.id = event.message.messageId;
if (innerMessage) {
message.id = innerMessage.messageId;

if (event.message.data) {
message.data = new Buffer(event.message.data, 'base64').toString('utf-8');
if (innerMessage.data) {
message.data = new Buffer(innerMessage.data, 'base64').toString('utf-8');

try {
message.data = JSON.parse(message.data);
} catch(e) {}
}

if (innerMessage.attributes) {
message.attributes = innerMessage.attributes;
}
}

return message;
Expand All @@ -181,22 +184,22 @@ Subscription.formatMessage_ = function(msg) {
* this.listenForEvents_();
*/
Subscription.prototype.listenForEvents_ = function() {
var that = this;
var self = this;
var messageListeners = 0;

this.on('newListener', function(event) {
if (event === 'message') {
messageListeners++;
if (that.closed) {
that.closed = false;
if (self.closed) {
self.closed = false;
}
that.startPulling_();
self.startPulling_();
}
});

this.on('removeListener', function(event) {
if (event === 'message' && --messageListeners === 0) {
that.closed = true;
self.closed = true;
}
});
};
Expand All @@ -219,21 +222,22 @@ Subscription.prototype.listenForEvents_ = function() {
* subscription.startPulling_();
*/
Subscription.prototype.startPulling_ = function() {
var that = this;
var self = this;
if (this.closed) {
return;
}
this.pull({
maxResults: 1,
returnImmediately: false
}, function(err, message) {
}, function(err, messages) {
if (err) {
that.emit('error', err);
self.emit('error', err);
}
if (message) {
that.emit('message', message);
if (messages) {
messages.forEach(function(message) {
self.emit('message', message);
});
}
setTimeout(that.startPulling_.bind(that), that.interval);
setTimeout(self.startPulling_.bind(self), self.interval);
});
};

Expand All @@ -256,10 +260,10 @@ Subscription.prototype.ack = function(ids, callback) {
}
ids = util.arrayize(ids);
var body = {
subscription: this.name,
ackId: ids
ackIds: ids
};
this.makeReq_('POST', 'subscriptions/acknowledge', null, body, callback);
var path = this.name + ':acknowledge';
this.makeReq_('POST', path, null, body, callback);
};

/**
Expand All @@ -272,17 +276,17 @@ Subscription.prototype.ack = function(ids, callback) {
* subscription.delete(function(err) {});
*/
Subscription.prototype.delete = function(callback) {
var self = this;
callback = callback || util.noop;
this.makeReq_(
'DELETE', 'subscriptions/' + this.name, null, true, function(err) {
this.makeReq_('DELETE', this.name, null, true, function(err) {
if (err) {
callback(err);
return;
}
this.closed = true;
this.removeAllListeners();
self.closed = true;
self.removeAllListeners();
callback(null);
}.bind(this));
});
};

/**
Expand All @@ -295,7 +299,6 @@ Subscription.prototype.delete = function(callback) {
* `subscription.on('message', function() {})` event handler.
*
* @todo Should not be racing with other pull.
* @todo Fix API to return a list of messages.
*
* @param {object=} options - Configuration object.
* @param {boolean} options.returnImmediately - If set, the system will respond
Expand All @@ -311,9 +314,10 @@ Subscription.prototype.delete = function(callback) {
* subscription.pull(function(err, messages) {
* // messages = [
* // {
* // ackId: '', // ID used to acknowledge its receival.
* // id: '', // Unique message ID.
* // data: '' // Contents of the message.
* // ackId: '', // ID used to acknowledge its receival.
* // id: '', // Unique message ID.
* // data: '', // Contents of the message.
* // attributes: {} // Attributes of the message.
* // },
* // // ...
* // ]
Expand All @@ -329,9 +333,8 @@ Subscription.prototype.delete = function(callback) {
* subscription.pull(opts, function(err, messages) {});
*/
Subscription.prototype.pull = function(options, callback) {
var that = this;
var self = this;
var MAX_EVENTS_LIMIT = 1000;
var apiEndpoint = 'subscriptions/pullBatch';

if (!callback) {
callback = options;
Expand All @@ -343,26 +346,26 @@ Subscription.prototype.pull = function(options, callback) {
}

var body = {
subscription: this.name,
returnImmediately: !!options.returnImmediately,
maxEvents: options.maxResults
maxMessages: options.maxResults
};

this.makeReq_('POST', apiEndpoint, null, body, function(err, response) {
var path = this.name + ':pull';
this.makeReq_('POST', path, null, body, function(err, response) {
if (err) {
callback(err);
return;
}

var messages = response.pullResponses || [response];
var messages = response.receivedMessages || [];
messages = messages.map(Subscription.formatMessage_);

if (that.autoAck) {
if (self.autoAck) {
var ackIds = messages.map(function(message) {
return message.ackId;
});

that.ack(ackIds, function(err) {
self.ack(ackIds, function(err) {
callback(err, messages);
});
} else {
Expand All @@ -371,4 +374,35 @@ Subscription.prototype.pull = function(options, callback) {
});
};

/**
* Modify the ack deadline for a specific message. This method is useful to
* indicate that more time is needed to process a message by the subscriber, or
* to make the message available for redelivery if the processing was
* interrupted.
*
* @param {object} options - The configuration object.
* @param {number} options.ackId - The ack id to change.
* @param {number} options.seconds - Number of seconds after call is made to
* set the deadline of the ack.
* @param {Function=} callback - The callback function.
*
* @example
* var options = {
* ackId: 123,
* seconds: 10 // Expire in 10 seconds from call.
* };
* subscription.setAckDeadline(options, function(err) {});
*/
Subscription.prototype.setAckDeadline = function(options, callback) {
var body = {
ackId: options.ackId,
ackDeadlineSeconds: options.seconds
};

callback = callback || util.noop;

var path = this.name + ':modifyAckDeadline';
this.makeReq_('POST', path, null, body, callback);
};

module.exports = Subscription;
Loading

0 comments on commit 7fb3c17

Please sign in to comment.