1 /** 2 OBSOLETE: This provides a kind of real time updates that can be consumed 3 by javascript (and probably other things eventually). Superseded by 4 new functionality built into [arsd.cgi]. 5 6 First, you compile the server app. dmd -version=standalone_rtud -version=rtud_daemon 7 8 Run it. It's pretty generic; probably don't have to modify it 9 but you always can. It's useful to have a long running process 10 anyway. 11 12 But then you'll want an intermediary between this and the javascript. 13 Use the handleListenerGateway() function in your cgi app for that. 14 You can pass it a channel prefix for things like user ids. 15 16 In your javascript, use EventListener("path to gateway"); 17 And addEventListener(type, function, false); 18 19 Note: this javascript does not work in all browsers, but 20 real time updates should really be optional anyway. 21 22 I might add a traditional ajax fallback but still, it's all 23 js so be sure it's non-essential if possible. 24 25 26 And in your app, as events happen, use in D: 27 auto stream = new UpdateStream(channel); 28 stream.sendMessage(type, message); 29 30 and the helper app will push it all out. You might want to wrap 31 some of this in try/catch since if the helper app dies, this will 32 throw since it can't connect. 33 34 35 I found using user names as channels is good stuff. Then your JS 36 doesn't provide a channel at all - your helper app gives it through 37 the channel prefix argument. 38 */ 39 module arsd.rtud; 40 41 import std.string; 42 import std.array : replace; 43 import std.conv; 44 import std.date; 45 46 47 class UpdateStream { 48 File net; 49 string channel; 50 51 this(string channel) { 52 net = openNetwork("localhost", 7071); 53 this.channel = channel; 54 } 55 56 ~this() { 57 net.close(); 58 } 59 60 void deleteMessage(string messageId) { 61 import arsd.cgi; // : encodeVariables; 62 string message = encodeVariables([ 63 "id" : messageId, 64 "operation" : "delete" 65 ]); 66 67 net.writeln(message); 68 net.flush(); 69 } 70 71 void sendMessage(string eventType, string messageText, long ttl = 2500) { 72 import arsd.cgi; // : encodeVariables; 73 string message = encodeVariables([ 74 //"operation" : "post", 75 //"id" : ????, 76 "channel" : channel, 77 "type" : eventType, 78 "data" : messageText, 79 "ttl" : to!string(ttl) 80 ]); 81 82 net.writeln(message); 83 net.flush(); 84 } 85 } 86 87 /+ 88 if("channels" in message) { 89 if("last-message-id" in message) 90 if("minimum-time" in message) 91 if("close-time" in message) 92 +/ 93 94 version(D_Version2) { 95 static import linux = core.sys.posix.unistd; 96 static import sock = core.sys.posix.sys.socket; 97 } else { 98 static import linux = std.c.linux.linux; 99 static import sock = std.c.linux.socket; 100 } 101 102 int openNetworkFd(string host, ushort port) { 103 import std.exception; 104 auto h = enforce( sock.gethostbyname(std..string.toStringz(host)), 105 new StdioException("gethostbyname")); 106 107 int s = sock.socket(sock.AF_INET, sock.SOCK_STREAM, 0); 108 enforce(s != -1, new StdioException("socket")); 109 110 scope(failure) { 111 linux.close(s); 112 } 113 114 sock.sockaddr_in addr; 115 116 addr.sin_family = sock.AF_INET; 117 addr.sin_port = sock.htons(port); 118 std.c..string.memcpy(&addr.sin_addr.s_addr, h.h_addr, h.h_length); 119 120 enforce(sock.connect(s, cast(sock.sockaddr*) &addr, addr.sizeof) != -1, 121 new StdioException("Connect failed")); 122 123 return s; 124 } 125 126 void writeToFd(int fd, string s) { 127 again: 128 auto num = linux.write(fd, s.ptr, s.length); 129 if(num < 0) 130 throw new Exception("couldn't write"); 131 if(num == 0) 132 return; 133 s = s[num .. $]; 134 if(s.length) 135 goto again; 136 } 137 138 __gshared bool deathRequested = false; 139 extern(C) 140 void requestDeath(int sig) { 141 deathRequested = true; 142 } 143 144 import arsd.cgi; 145 /// The throttledConnection param is useful for helping to get 146 /// around browser connection limitations. 147 148 /// If the user opens a bunch of tabs, these long standing 149 /// connections can hit the per-host connection limit, breaking 150 /// navigation until the connection times out. 151 152 /// The throttle option sets a long retry period and polls 153 /// instead of waits. This sucks, but sucks less than your whole 154 /// site hanging because the browser is queuing your connections! 155 int handleListenerGateway(Cgi cgi, string channelPrefix, bool throttledConnection = false) { 156 cgi.setCache(false); 157 158 import core.sys.posix.signal; 159 sigaction_t act; 160 // I want all zero everywhere else; the read() must not automatically restart for this to work. 161 act.sa_handler = &requestDeath; 162 163 if(linux.sigaction(linux.SIGTERM, &act, null) != 0) 164 throw new Exception("sig err"); 165 166 auto f = openNetworkFd("localhost", 7070); 167 scope(exit) linux.close(f); 168 169 string[string] variables; 170 171 variables["channel"] = channelPrefix ~ ("channel" in cgi.get ? cgi.get["channel"] : ""); 172 if("minimum-time" in cgi.get) 173 variables["minimum-time"] = cgi.get["minimum-time"]; 174 if("last-message-id" in cgi.get) 175 variables["last-message-id"] = cgi.get["last-message-id"]; 176 177 bool isSse; 178 179 if(cgi.accept == "text/event-stream") { 180 cgi.setResponseContentType("text/event-stream"); 181 isSse = true; 182 if(cgi.lastEventId.length) 183 variables["last-message-id"] = cgi.lastEventId; 184 185 if(throttledConnection) { 186 cgi.write("retry: 15000\n"); 187 } else { 188 cgi.write(":\n"); // the comment ensures apache doesn't skip us 189 } 190 191 cgi.flush(); // sending the headers along 192 } else { 193 // gotta handle it as ajax polling 194 variables["close-time"] = "0"; // ask for long polling 195 } 196 197 198 if(throttledConnection) 199 variables["close-time"] = "-1"; // close immediately 200 201 writeToFd(f, encodeVariables(variables) ~ "\n"); 202 203 string wegot; 204 205 string[4096] buffer; 206 207 for(; !deathRequested ;) { 208 auto num = linux.read(f, buffer.ptr, buffer.length); 209 if(num < 0) 210 throw new Exception("read error"); 211 if(num == 0) 212 break; 213 214 auto chunk = buffer[0 .. num]; 215 if(isSse) { 216 cgi.write(chunk); 217 cgi.flush(); 218 } else { 219 wegot ~= cast(string) chunk; 220 } 221 } 222 223 // this is to support older browsers 224 if(!isSse && !deathRequested) { 225 // we have to parse it out and reformat for plain cgi... 226 auto lol = parseMessages(wegot); 227 //cgi.setResponseContentType("text/json"); 228 // FIXME gotta reorganize my json stuff 229 //cgi.write(toJson(lol)); 230 return 1; 231 } 232 233 return 0; 234 } 235 236 struct Message { 237 string type; 238 string id; 239 string data; 240 long timestamp; 241 long ttl; 242 243 string operation; 244 } 245 246 247 Message[] getMessages(string channel, string eventTypeFilter = null, long maxAge = 0) { 248 auto f = openNetworkFd("localhost", 7070); 249 scope(exit) linux.close(f); 250 251 string[string] variables; 252 253 variables["channel"] = channel; 254 if(maxAge) 255 variables["minimum-time"] = to!string(getUtcTime() - maxAge); 256 257 variables["close-time"] = "-1"; // close immediately 258 259 writeToFd(f, encodeVariables(variables) ~ "\n"); 260 261 string wegot; 262 263 string[4096] buffer; 264 265 for(;;) { 266 auto num = linux.read(f, buffer.ptr, buffer.length); 267 if(num < 0) 268 throw new Exception("read error"); 269 if(num == 0) 270 break; 271 272 auto chunk = buffer[0 .. num]; 273 wegot ~= cast(string) chunk; 274 } 275 276 return parseMessages(wegot, eventTypeFilter); 277 } 278 279 Message[] parseMessages(string wegot, string eventTypeFilter = null) { 280 // gotta parse this since rtud writes out the format for browsers 281 Message[] ret; 282 foreach(message; wegot.split("\n\n")) { 283 Message m; 284 foreach(line; message.split("\n")) { 285 if(line.length == 0) 286 throw new Exception("wtf"); 287 if(line[0] == ':') 288 line = line[1 .. $]; 289 290 if(line.length == 0) 291 continue; // just an empty comment 292 293 auto idx = line.indexOf(":"); 294 if(idx == -1) 295 continue; // probably just a comment 296 297 if(idx + 2 > line.length) 298 continue; // probably just a comment too 299 300 auto name = line[0 .. idx]; 301 auto data = line[idx + 2 .. $]; 302 303 switch(name) { 304 default: break; // do nothing 305 case "timestamp": 306 if(data.length) 307 m.timestamp = to!long(data); 308 break; 309 case "ttl": 310 if(data.length) 311 m.ttl = to!long(data); 312 break; 313 case "operation": 314 m.operation = data; 315 break; 316 case "id": 317 m.id = data; 318 break; 319 case "event": 320 m.type = data; 321 break; 322 case "data": 323 m.data ~= data; 324 break; 325 } 326 } 327 if(eventTypeFilter is null || eventTypeFilter == m.type) 328 ret ~= m; 329 } 330 331 return ret; 332 } 333 334 335 version(rtud_daemon) : 336 337 import arsd.netman; 338 339 // Real time update daemon 340 /* 341 You push messages out to channels, where they are held for a certain length of time. 342 343 It can also do state with listener updates. 344 345 Clients ask for messages since a time, and if there are none, you hold the connection until something arrives. 346 347 348 There should be D and Javascript apis for pushing and receiving. 349 350 351 JS: 352 353 var updateObject = RealTimeUpdate(); 354 355 updateObject.someMessage = function(msg) { 356 // react to it 357 } 358 359 updateObject.listen(channel); 360 361 updateObject.send(message, args); // probably shouldn't need this from JS 362 */ 363 364 /* 365 Incoming Packet format is x-www-urlencoded. There must be no new lines 366 in there - be sure to url encode them. 367 368 A message is separated by newlines. 369 */ 370 371 class RtudConnection : Connection { 372 RealTimeUpdateDaemon daemon; 373 374 this(RealTimeUpdateDaemon daemon) { 375 this.daemon = daemon; 376 } 377 378 override void onDataReceived() { 379 import arsd.cgi;// : decodeVariables; 380 try_again: 381 auto data = cast(string) read(); 382 383 auto index = data.indexOf("\n"); 384 if(index == -1) 385 return; // wait for more data 386 387 auto messageRaw = data[0 .. index]; 388 changeReadPosition(index + 1); 389 390 auto message = decodeVariables(messageRaw); 391 392 handleMessage(message); 393 goto try_again; 394 } 395 396 invariant() { 397 assert(daemon !is null); 398 } 399 400 abstract void handleMessage(string[][string] message); 401 } 402 403 class NotificationConnection : RtudConnection { 404 this(RealTimeUpdateDaemon daemon) { 405 super(daemon); 406 closeTime = long.max; 407 } 408 409 long closeTime; 410 411 /// send: what channels you're interested in, a minimum time, 412 /// and a close time. 413 /// if the close time is negative, you are just polling curiously. 414 /// if it is zero, it will close after your next batch. (long polling) 415 /// anything else stays open for as long as it can in there. 416 417 override void handleMessage(string[][string] message) { 418 Channel*[] channels; 419 420 if("channels" in message) { 421 foreach(ch; message["channels"]) { 422 auto channel = daemon.getChannel(ch); 423 channels ~= channel; 424 channel.subscribeTo(this); 425 } 426 } 427 428 if("channel" in message) { 429 auto channel = daemon.getChannel(message["channel"][$-1]); 430 channels ~= channel; 431 channel.subscribeTo(this); 432 } 433 434 import std.algorithm; 435 import std.range; 436 437 Message*[] backMessages; 438 439 if("last-message-id" in message) { 440 auto lastMessageId = message["last-message-id"][$-1]; 441 foreach(channel; channels) 442 backMessages ~= channel.messages; 443 444 auto bm = sort!"a.timestamp < b.timestamp"(backMessages); 445 446 backMessages = array(find!("a.id == b")(bm, lastMessageId)); 447 while(backMessages.length && backMessages[0].id == lastMessageId) 448 backMessages = backMessages[1 .. $]; // the last message is the one they got 449 450 //writeln("backed up from ", lastMessageId, " is"); 451 //foreach(msg; backMessages) 452 //writeln(*msg); 453 } else if("minimum-time" in message) { 454 foreach(channel; channels) 455 backMessages ~= channel.messages; 456 457 auto bm = sort!"a.timestamp < b.timestamp"(backMessages); 458 459 backMessages = array(find!("a.timestamp >= b")(bm, to!long(message["minimum-time"][$-1]))); 460 } 461 462 if("close-time" in message) 463 closeTime = to!long(message["close-time"][$-1]); 464 465 // send the back messages immediately 466 daemon.writeMessagesTo(backMessages, this, "backed-up"); 467 468 // if(closeTime > 0 && closeTime != long.max) 469 // closeTime = getUtcTime() + closeTime; // FIXME: do i use this? Should I use this? 470 } 471 472 override void onDisconnect() { 473 daemon.removeConnection(this); 474 } 475 476 } 477 478 class DataConnection : RtudConnection { 479 this(RealTimeUpdateDaemon daemon) { 480 super(daemon); 481 } 482 483 override void handleMessage(string[][string] message) { 484 string getStr(string key, string def) { 485 if(key in message) { 486 auto s = message[key][$ - 1]; 487 if(s.length) 488 return s; 489 } 490 return def; 491 } 492 493 string operation = getStr("operation", "post"); 494 495 Message* m = daemon.getMessage(getStr("id", null)); 496 switch(operation) { 497 default: throw new Exception("unknown operation " ~ operation); break; 498 case "delete": 499 daemon.deleteMessage(m); 500 break; 501 case "edit": 502 case "post": 503 // we have to create the message and send it out 504 m.type = getStr("type", "message"); 505 m.data = getStr("data", ""); 506 m.timestamp = to!long(getStr("timestamp", to!string(getUtcTime()))); 507 m.ttl = to!long(getStr("ttl", "1000")); 508 } 509 510 assert(m !is null); 511 512 if("channels" in message) 513 foreach(ch; message["channels"]) { 514 auto channel = daemon.getChannel(ch); 515 assert(channel !is null); 516 channel.writeMessage(m, operation); 517 } 518 519 if("channel" in message) { 520 auto channel = daemon.getChannel(message["channel"][$-1]); 521 channel.writeMessage(m, operation); 522 } 523 } 524 } 525 526 struct Channel { 527 string id; 528 Message*[] messages; 529 530 // a poor man's set... 531 NotificationConnection[NotificationConnection] listeningConnections; 532 533 534 RealTimeUpdateDaemon daemon; 535 536 void writeMessage(Message* message, string operation) { 537 messages ~= message; 538 foreach(k, v; listeningConnections) 539 daemon.writeMessagesTo([message], v, operation); 540 } 541 542 void subscribeTo(NotificationConnection c) { 543 listeningConnections[c] = c; 544 } 545 } 546 547 548 class RealTimeUpdateDaemon : NetworkManager { 549 this() { 550 super(); 551 setConnectionSpawner(7070, &createNotificationConnection); 552 listen(7070); 553 setConnectionSpawner(7071, &createDataConnection); 554 listen(7071); 555 } 556 557 private Channel*[string] channels; 558 private Message*[string] messages; 559 560 Message* getMessage(string id) { 561 if(id.length && id in messages) 562 return messages[id]; 563 564 if(id.length == 0) 565 id = to!string(getUtcTime()); 566 567 longerId: 568 if(id in messages) { 569 id ~= "-"; 570 goto longerId; 571 } 572 573 574 auto message = new Message; 575 message.id = id; 576 messages[id] = message; 577 578 //writeln("NEW MESSAGE: ", *message); 579 580 return message; 581 } 582 583 void deleteMessage(Message* m) { 584 messages.remove(m.id); 585 foreach(k, v; channels) 586 foreach(i, msg; v.messages) { 587 if(msg is m) { 588 v.messages = v.messages[0 .. i] ~ v.messages[i + 1 .. $]; 589 break; 590 } 591 } 592 } 593 594 Channel* getChannel(string id) { 595 if(id in channels) 596 return channels[id]; 597 598 auto c = new Channel; 599 c.daemon = this; 600 c.id = id; 601 channels[id] = c; 602 return c; 603 } 604 605 void writeMessagesTo(Message*[] messages, NotificationConnection connection, string operation) { 606 foreach(messageMain; messages) { 607 if(messageMain.timestamp + messageMain.ttl < getUtcTime) 608 deleteMessage(messageMain); // too old, kill it 609 Message message = *messageMain; 610 message.operation = operation; 611 612 // this should never happen, but just in case 613 replace(message.type, "\n", ""); 614 connection.write(":timestamp: " ~ to!string(message.timestamp) ~ "\n"); 615 connection.write(":ttl: " ~ to!string(message.ttl) ~ "\n"); 616 connection.write(":operation: " ~ message.operation ~ "\n"); 617 if(message.id.length) 618 connection.write("id: " ~ message.id ~ "\n"); 619 connection.write("event: " ~ message.type ~ "\n"); 620 connection.write("data: " ~ replace(message.data, "\n", "\ndata: ") ~ "\n"); 621 connection.write("\n"); 622 } 623 624 if(connection.closeTime <= 0) // FIXME: other times? 625 if(connection.closeTime != 0 || messages.length) 626 connection.disconnect(); // note this actually queues a disconnect, so we cool 627 } 628 629 void removeConnection(NotificationConnection connection) { 630 foreach(channel; channels) 631 channel.listeningConnections.remove(connection); 632 } 633 634 Connection createNotificationConnection() { 635 return new NotificationConnection(this); 636 } 637 638 Connection createDataConnection() { 639 return new DataConnection(this); 640 } 641 } 642 643 void rtudMain() { 644 auto netman = new RealTimeUpdateDaemon; 645 646 bool proceed = true; 647 648 while(proceed) 649 try 650 proceed = netman.proceed(); 651 catch(ConnectionException e) { 652 writeln(e.toString()); 653 e.c.disconnectNow(); 654 } 655 catch(Throwable e) { 656 657 658 writeln(e.toString()); 659 } 660 } 661 662 version(standalone_rtud) 663 void main() { 664 rtudMain(); 665 }