streamer.js (8211B)
1 /* 2 ***** BEGIN LICENSE BLOCK ***** 3 4 Copyright © 2016 Center for History and New Media 5 George Mason University, Fairfax, Virginia, USA 6 http://zotero.org 7 8 This file is part of Zotero. 9 10 Zotero is free software: you can redistribute it and/or modify 11 it under the terms of the GNU Affero General Public License as published by 12 the Free Software Foundation, either version 3 of the License, or 13 (at your option) any later version. 14 15 Zotero is distributed in the hope that it will be useful, 16 but WITHOUT ANY WARRANTY; without even the implied warranty of 17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 18 GNU Affero General Public License for more details. 19 20 You should have received a copy of the GNU Affero General Public License 21 along with Zotero. If not, see <http://www.gnu.org/licenses/>. 22 23 ***** END LICENSE BLOCK ***** 24 */ 25 26 "use strict"; 27 28 29 // Initialized as Zotero.Streamer in zotero.js 30 Zotero.Streamer_Module = function (options = {}) { 31 this.url = options.url; 32 this.apiKey = options.apiKey; 33 34 let observer = { 35 notify: (event, type) => { 36 if (event == 'modify' || event == 'delete') { 37 this._update(); 38 } 39 } 40 }; 41 this._observerID = Zotero.Notifier.registerObserver(observer, ['api-key'], 'streamer'); 42 }; 43 44 Zotero.Streamer_Module.prototype = { 45 _initialized: null, 46 _observerID: null, 47 _socket: null, 48 _ready: false, 49 _reconnect: true, 50 _retry: null, 51 _subscriptions: new Set(), 52 53 54 init: function () { 55 Zotero.Prefs.registerObserver('streaming.enabled', (val) => this._update()); 56 Zotero.Prefs.registerObserver('automaticScraperUpdates', (val) => this._update()); 57 Zotero.Prefs.registerObserver('sync.autoSync', (val) => this._update()); 58 Zotero.uiReadyPromise.then(() => this._update()); 59 }, 60 61 62 _update: async function () { 63 if (!this._isEnabled()) { 64 this._disconnect(); 65 return; 66 } 67 68 // If not connecting or connected, connect now 69 if (!this._socketOpen()) { 70 this._connect(); 71 return; 72 } 73 // If not yet ready for messages, wait until we are, at which point this will be called again 74 if (!this._ready) { 75 return; 76 } 77 78 var apiKey = this.apiKey || (await Zotero.Sync.Data.Local.getAPIKey()); 79 80 var subscriptionsToAdd = []; 81 var subscriptionsToRemove = []; 82 83 if (Zotero.Prefs.get('sync.autoSync') && Zotero.Sync.Runner.enabled) { 84 if (!this._subscriptions.has('sync')) { 85 // Subscribe to all topics accessible to the API key 86 subscriptionsToAdd.push({ apiKey }); 87 } 88 } 89 else if (this._subscriptions.has('sync')) { 90 subscriptionsToRemove.push({ apiKey }); 91 } 92 93 if (Zotero.Prefs.get('automaticScraperUpdates')) { 94 if (!this._subscriptions.has('bundled-files')) { 95 subscriptionsToAdd.push( 96 { 97 topics: ['styles', 'translators'] 98 } 99 ); 100 } 101 } 102 else if (this._subscriptions.has('bundled-files')) { 103 subscriptionsToRemove.push( 104 { 105 topic: 'styles' 106 }, 107 { 108 topic: 'translators' 109 } 110 ); 111 } 112 113 if (subscriptionsToAdd.length) { 114 let data = JSON.stringify({ 115 action: 'createSubscriptions', 116 subscriptions: subscriptionsToAdd 117 }); 118 Zotero.debug("WebSocket message send: " + this._hideAPIKey(data)); 119 this._socket.send(data); 120 } 121 if (subscriptionsToRemove.length) { 122 let data = JSON.stringify({ 123 action: 'deleteSubscriptions', 124 subscriptions: subscriptionsToRemove 125 }); 126 Zotero.debug("WebSocket message send: " + this._hideAPIKey(data)); 127 this._socket.send(data); 128 } 129 }, 130 131 132 _isEnabled: function () { 133 return Zotero.Prefs.get('streaming.enabled') 134 // Only connect if either auto-sync or automatic style/translator updates are enabled 135 && ((Zotero.Prefs.get('sync.autoSync') && Zotero.Sync.Runner.enabled) 136 || Zotero.Prefs.get('automaticScraperUpdates')); 137 }, 138 139 140 _socketOpen: function () { 141 return this._socket && (this._socket.readyState == this._socket.OPEN 142 || this._socket.readyState == this._socket.CONNECTING); 143 }, 144 145 146 _connect: async function () { 147 let url = this.url || Zotero.Prefs.get('streaming.url') || ZOTERO_CONFIG.STREAMING_URL; 148 Zotero.debug(`Connecting to streaming server at ${url}`); 149 150 this._ready = false; 151 this._reconnect = true; 152 153 var window = Cc["@mozilla.org/appshell/appShellService;1"] 154 .getService(Ci.nsIAppShellService).hiddenDOMWindow; 155 this._socket = new window.WebSocket(url, "zotero-streaming-api-v1"); 156 var deferred = Zotero.Promise.defer(); 157 158 this._socket.onopen = () => { 159 Zotero.debug("WebSocket connection opened"); 160 }; 161 162 this._socket.onerror = async function (event) { 163 Zotero.debug("WebSocket error"); 164 }; 165 166 this._socket.onmessage = async function (event) { 167 Zotero.debug("WebSocket message: " + this._hideAPIKey(event.data)); 168 169 let data = JSON.parse(event.data); 170 171 if (data.event == "connected") { 172 this._ready = true; 173 this._update(); 174 } 175 else { 176 this._reconnectGenerator = null; 177 178 if (data.event == "subscriptionsCreated") { 179 for (let s of data.subscriptions) { 180 if (s.apiKey) { 181 this._subscriptions.add('sync'); 182 } 183 else if (s.topics && s.topics.includes('styles')) { 184 this._subscriptions.add('bundled-files'); 185 } 186 } 187 188 for (let error of data.errors) { 189 Zotero.logError(this._hideAPIKey(JSON.stringify(error))); 190 } 191 } 192 else if (data.event == "subscriptionsDeleted") { 193 for (let s of data.subscriptions) { 194 if (s.apiKey) { 195 this._subscriptions.delete('sync'); 196 } 197 else if (s.topics && s.topics.includes('styles')) { 198 this._subscriptions.delete('bundled-files'); 199 } 200 } 201 } 202 // Library added or removed 203 else if (data.event == 'topicAdded' || data.event == 'topicRemoved') { 204 await Zotero.Sync.Runner.sync({ 205 background: true 206 }); 207 } 208 // Library modified 209 else if (data.event == 'topicUpdated') { 210 // Update translators and styles 211 if (data.topic == 'translators' || data.topic == 'styles') { 212 await Zotero.Schema.onUpdateNotification(data.delay); 213 } 214 // Auto-sync 215 else { 216 let library = Zotero.URI.getPathLibrary(data.topic); 217 if (library) { 218 // Ignore if skipped library 219 let skipped = Zotero.Sync.Data.Local.getSkippedLibraries(); 220 if (skipped.includes(library.libraryID)) return; 221 222 if (data.version && data.version == library.libraryVersion) { 223 Zotero.debug("Library is already up to date"); 224 return; 225 } 226 227 await Zotero.Sync.Runner.sync({ 228 background: true, 229 libraries: [library.libraryID] 230 }); 231 } 232 } 233 } 234 // TODO: Handle this in other ways? 235 else if (data.event == 'error') { 236 Zotero.logError(data); 237 } 238 } 239 }.bind(this); 240 241 this._socket.onclose = async function (event) { 242 var msg = `WebSocket connection closed: ${event.code} ${event.reason}`; 243 244 if (event.code != 1000) { 245 Zotero.logError(msg); 246 } 247 else { 248 Zotero.debug(msg); 249 } 250 251 this._subscriptions.clear(); 252 253 if (this._reconnect) { 254 if (event.code >= 4400 && event.code < 4500) { 255 Zotero.debug("Not reconnecting to WebSocket due to client error"); 256 return; 257 } 258 259 if (!this._reconnectGenerator) { 260 let intervals = [ 261 2, 5, 10, 15, 30, // first minute 262 60, 60, 60, 60, // every minute for 4 minutes 263 120, 120, 120, 120, // every 2 minutes for 8 minutes 264 300, 300, // every 5 minutes for 10 minutes 265 600, // 10 minutes 266 1200, // 20 minutes 267 1800, 1800, // 30 minutes for 1 hour 268 3600, 3600, 3600, // every hour for 3 hours 269 14400, 14400, 14400, // every 4 hours for 12 hours 270 86400 // 1 day 271 ].map(i => i * 1000); 272 this._reconnectGenerator = Zotero.Utilities.Internal.delayGenerator(intervals); 273 } 274 await this._reconnectGenerator.next().value; 275 this._update(); 276 } 277 }.bind(this); 278 }, 279 280 281 _hideAPIKey: function (str) { 282 return str.replace(/(apiKey":\s*")[^"]+"/, '$1********"'); 283 }, 284 285 286 _disconnect: function () { 287 this._reconnect = false; 288 this._reconnectGenerator = null; 289 this._subscriptions.clear(); 290 if (this._socket) { 291 this._socket.close(1000); 292 } 293 } 294 };