/**
	This provides a kind of real time updates that can be consumed
	by javascript (and probably other things eventually).

	First, you compile the server app. dmd -version=standalone_rtud -version=rtud_daemon

	Run it. It's pretty generic; you probably don't have to modify it
	but you always can. It's useful to have a long running process
	anyway.

	But then you'll want an intermediary between this and the javascript.
	Use the handleListenerGateway() function in your cgi app for that.
	You can pass it a channel prefix for things like user ids.

	In your javascript, use new EventSource("path to gateway");
	And addEventListener(type, function, false);

	Note: this javascript does not work in all browsers, but
	real time updates should really be optional anyway.

	I might add a traditional ajax fallback but still, it's all
	js so be sure it's non-essential if possible.


	And in your app, as events happen, use in D:
		auto stream = new UpdateStream(channel);
		stream.sendMessage(type, message);

	and the helper app will push it all out. You might want to wrap
	some of this in try/catch since if the helper app dies, this will
	throw since it can't connect.


	I found using user names as channels is good stuff. Then your JS
	doesn't provide a channel at all - your helper app gives it through
	the channel prefix argument.
*/
module arsd.rtud;

import std.string;
import std.array : replace;
import std.conv;
import std.date;


/// Publisher side: connects to the daemon's data port (localhost:7071)
/// and posts or deletes messages on one channel.
class UpdateStream {
	File net;
	string channel;

	/// Opens the connection to the daemon. Throws if it can't connect.
	this(string channel) {
		net = openNetwork("localhost", 7071);
		this.channel = channel;
	}

	~this() {
		net.close();
	}

	/// Asks the daemon to delete the message with the given id.
	/// Note: no channel is sent with this request, and the daemon only
	/// notifies listeners of the channels named in a request.
	void deleteMessage(string messageId) {
		import arsd.cgi; // : encodeVariables;
		string message = encodeVariables([
			"id" : messageId,
			"operation" : "delete"
		]);

		net.writeln(message);
		net.flush();
	}

	/// Posts a message of the given event type to this stream's channel.
	/// ttl is passed to the daemon as-is; the daemon compares
	/// timestamp + ttl against getUtcTime() to decide when it is too old.
	void sendMessage(string eventType, string messageText, long ttl = 2500) {
		import arsd.cgi; // : encodeVariables;
		string message = encodeVariables([
			//"operation" : "post",
			//"id" : ????,
			"channel" : channel,
			"type" : eventType,
			"data" : messageText,
			"ttl" : to!string(ttl)
		]);

		net.writeln(message);
		net.flush();
	}
}

/+
	if("channels" in message) {
	if("last-message-id" in message)
	if("minimum-time" in message)
	if("close-time" in message)
+/

version(D_Version2) {
	static import linux = core.sys.posix.unistd;
	static import sock = core.sys.posix.sys.socket;
} else {
	static import linux = std.c.linux.linux;
	static import sock = std.c.linux.socket;
}

/// Opens a TCP connection to host:port and returns the raw file descriptor.
/// The caller owns the descriptor and must close it.
/// Throws StdioException if the lookup, socket creation or connect fails.
int openNetworkFd(string host, ushort port) {
	import std.exception;
	auto h = enforce( sock.gethostbyname(std.string.toStringz(host)),
		new StdioException("gethostbyname"));

	int s = sock.socket(sock.AF_INET, sock.SOCK_STREAM, 0);
	enforce(s != -1, new StdioException("socket"));

	// don't leak the descriptor if anything below throws
	scope(failure) {
		linux.close(s);
	}

	sock.sockaddr_in addr;

	addr.sin_family = sock.AF_INET;
	addr.sin_port = sock.htons(port);
	std.c.string.memcpy(&addr.sin_addr.s_addr, h.h_addr, h.h_length);

	enforce(sock.connect(s, cast(sock.sockaddr*) &addr, addr.sizeof) != -1,
		new StdioException("Connect failed"));

	return s;
}

/// Writes all of s to fd, looping over short writes.
/// Throws if write() reports an error; gives up silently if it writes nothing.
void writeToFd(int fd, string s) {
	do {
		auto num = linux.write(fd, s.ptr, s.length);
		if(num < 0)
			throw new Exception("couldn't write");
		if(num == 0)
			return;
		s = s[num .. $];
	} while(s.length);
}

/// Set by the SIGTERM handler; the gateway's read loop checks it.
__gshared bool deathRequested = false;

/// Signal handler: only records that shutdown was requested.
extern(C)
void requestDeath(int sig) {
	deathRequested = true;
}

import arsd.cgi;

/// Bridges a browser request to the daemon's listener port (localhost:7070).
///
/// If the client accepts text/event-stream, the daemon's output is streamed
/// through as it arrives. Otherwise the daemon is asked for long polling and
/// its output is collected and parsed (writing it back out as json is still
/// a FIXME, so that path currently sends nothing and returns 1).
///
/// The throttledConnection param is useful for helping to get
/// around browser connection limitations.
///
/// If the user opens a bunch of tabs, these long standing
/// connections can hit the per-host connection limit, breaking
/// navigation until the connection times out.
///
/// The throttle option sets a long retry period and polls
/// instead of waits. This sucks, but sucks less than your whole
/// site hanging because the browser is queuing your connections!
///
/// Returns: 0 normally, 1 on the non-event-stream path.
int handleListenerGateway(Cgi cgi, string channelPrefix, bool throttledConnection = false) {
	cgi.setCache(false);

	import core.sys.posix.signal;
	sigaction_t act;
	// I want all zero everywhere else; the read() must not automatically restart for this to work.
	act.sa_handler = &requestDeath;

	if(linux.sigaction(linux.SIGTERM, &act, null) != 0)
		throw new Exception("sig err");

	auto f = openNetworkFd("localhost", 7070);
	scope(exit) linux.close(f);

	string[string] variables;

	variables["channel"] = channelPrefix ~ ("channel" in cgi.get ? cgi.get["channel"] : "");
	if("minimum-time" in cgi.get)
		variables["minimum-time"] = cgi.get["minimum-time"];
	if("last-message-id" in cgi.get)
		variables["last-message-id"] = cgi.get["last-message-id"];

	bool isSse;

	if(cgi.accept == "text/event-stream") {
		cgi.setResponseContentType("text/event-stream");
		isSse = true;
		// the header sent on reconnect wins over the query string
		if(cgi.lastEventId.length)
			variables["last-message-id"] = cgi.lastEventId;

		if(throttledConnection) {
			cgi.write("retry: 15000\n");
		} else {
			cgi.write(":\n"); // the comment ensures apache doesn't skip us
		}

		cgi.flush(); // sending the headers along
	} else {
		// gotta handle it as ajax polling
		variables["close-time"] = "0"; // ask for long polling
	}


	if(throttledConnection)
		variables["close-time"] = "-1"; // close immediately

	writeToFd(f, encodeVariables(variables) ~ "\n");

	string wegot;

	// raw bytes from the daemon. This must be a byte buffer: with an array
	// of strings, buffer[0 .. num] would cover num * string.sizeof bytes
	// and we'd forward mostly zero padding.
	ubyte[4096] buffer;

	for(; !deathRequested ;) {
		auto num = linux.read(f, buffer.ptr, buffer.length);
		if(num < 0) {
			// SIGTERM interrupts the read() on purpose (see above);
			// that is a shutdown request, not an error
			if(deathRequested)
				break;
			throw new Exception("read error");
		}
		if(num == 0)
			break;

		auto chunk = buffer[0 .. num];
		if(isSse) {
			cgi.write(chunk);
			cgi.flush();
		} else {
			wegot ~= cast(string) chunk; // appending copies, so reusing buffer is fine
		}
	}

	// this is to support older browsers
	if(!isSse && !deathRequested) {
		// we have to parse it out and reformat for plain cgi...
		auto lol = parseMessages(wegot);
		//cgi.setResponseContentType("text/json");
		// FIXME gotta reorganize my json stuff
		//cgi.write(toJson(lol));
		return 1;
	}

	return 0;
}

/// One event as stored by the daemon and as parsed back by parseMessages.
struct Message {
	string type;
	string id;
	string data;
	long timestamp;
	long ttl;

	string operation;
}


/// Polls the daemon for the messages currently held on channel and
/// returns them parsed. The connection is closed right after the backlog
/// is sent (close-time -1).
///
/// Params:
///	channel = channel to query
///	eventTypeFilter = if not null, only messages of this type are returned
///	maxAge = if nonzero, only ask for messages no older than this
///		(subtracted from getUtcTime())
Message[] getMessages(string channel, string eventTypeFilter = null, long maxAge = 0) {
	auto f = openNetworkFd("localhost", 7070);
	scope(exit) linux.close(f);

	string[string] variables;

	variables["channel"] = channel;
	if(maxAge)
		variables["minimum-time"] = to!string(getUtcTime() - maxAge);

	variables["close-time"] = "-1"; // close immediately

	writeToFd(f, encodeVariables(variables) ~ "\n");

	string wegot;

	// byte buffer, not string[]: see handleListenerGateway
	ubyte[4096] buffer;

	for(;;) {
		auto num = linux.read(f, buffer.ptr, buffer.length);
		if(num < 0)
			throw new Exception("read error");
		if(num == 0)
			break;

		auto chunk = buffer[0 .. num];
		wegot ~= cast(string) chunk;
	}

	return parseMessages(wegot, eventTypeFilter);
}

/// Parses the text the daemon writes for browsers back into Messages.
///
/// Blocks are separated by a blank line. Inside a block each line is
/// "name: value"; the daemon's extra fields are written as comments
/// (":timestamp: ...") so the leading colon is stripped before parsing.
///
/// Params:
///	wegot = everything read from the daemon
///	eventTypeFilter = if not null, only messages whose type matches are kept
///
/// Throws: Exception if a block contains an empty line.
Message[] parseMessages(string wegot, string eventTypeFilter = null) {
	// gotta parse this since rtud writes out the format for browsers
	Message[] ret;
	foreach(message; wegot.split("\n\n")) {
		// every message ends in a blank line, so the split leaves an
		// empty block after the last one; that is not a message
		if(message.length == 0)
			continue;

		Message m;
		bool sawData = false;
		foreach(line; message.split("\n")) {
			if(line.length == 0)
				throw new Exception("wtf");
			if(line[0] == ':')
				line = line[1 .. $];

			if(line.length == 0)
				continue; // just an empty comment

			auto idx = line.indexOf(":");
			if(idx == -1)
				continue; // probably just a comment

			if(idx + 2 > line.length)
				continue; // probably just a comment too

			auto name = line[0 .. idx];
			auto data = line[idx + 2 .. $]; // skip the ": "

			switch(name) {
				default: break; // do nothing
				case "timestamp":
					if(data.length)
						m.timestamp = to!long(data);
				break;
				case "ttl":
					if(data.length)
						m.ttl = to!long(data);
				break;
				case "operation":
					m.operation = data;
				break;
				case "id":
					m.id = data;
				break;
				case "event":
					m.type = data;
				break;
				case "data":
					// the daemon turns each "\n" in the payload into
					// "\ndata: ", so put the newline back between lines
					if(sawData)
						m.data ~= "\n";
					m.data ~= data;
					sawData = true;
				break;
			}
		}
		if(eventTypeFilter is null || eventTypeFilter == m.type)
			ret ~= m;
	}

	return ret;
}


version(rtud_daemon) :

import arsd.netman;

// Real time update daemon
/*
	You push messages out to channels, where they are held for a certain length of time.

	It can also do state with listener updates.

	Clients ask for messages since a time, and if there are none, you hold the connection until something arrives.


	There should be D and Javascript apis for pushing and receiving.


	JS:

	var updateObject = RealTimeUpdate();

	updateObject.someMessage = function(msg) {
		// react to it
	}

	updateObject.listen(channel);

	updateObject.send(message, args); // probably shouldn't need this from JS
*/

/*
	Incoming Packet format is x-www-urlencoded. There must be no new lines
	in there - be sure to url encode them.

	A message is separated by newlines.
*/

/// Common base for both daemon ports: splits the incoming bytes into
/// newline-terminated requests, url-decodes each and hands it to handleMessage.
class RtudConnection : Connection {
	RealTimeUpdateDaemon daemon;

	this(RealTimeUpdateDaemon daemon) {
		this.daemon = daemon;
	}

	override void onDataReceived() {
		import arsd.cgi;// : decodeVariables;

		// handle every complete line currently buffered
		for(;;) {
			auto data = cast(string) read();

			auto index = data.indexOf("\n");
			if(index == -1)
				return; // wait for more data

			auto messageRaw = data[0 .. index];
			changeReadPosition(index + 1);

			auto message = decodeVariables(messageRaw);

			handleMessage(message);
		}
	}

	invariant() {
		assert(daemon !is null);
	}

	/// Called once per decoded request line.
	abstract void handleMessage(string[][string] message);
}

/// A listener (port 7070): subscribes to channels and receives messages.
class NotificationConnection : RtudConnection {
	this(RealTimeUpdateDaemon daemon) {
		super(daemon);
		closeTime = long.max;
	}

	long closeTime;

	/// send: what channels you're interested in, a minimum time,
	/// and a close time.
	/// if the close time is negative, you are just polling curiously.
	/// if it is zero, it will close after your next batch. (long polling)
	/// anything else stays open for as long as it can in there.

	override void handleMessage(string[][string] message) {
		Channel*[] channels;

		if("channels" in message) {
			foreach(ch; message["channels"]) {
				auto channel = daemon.getChannel(ch);
				channels ~= channel;
				channel.subscribeTo(this);
			}
		}

		if("channel" in message) {
			auto channel = daemon.getChannel(message["channel"][$-1]);
			channels ~= channel;
			channel.subscribeTo(this);
		}

		import std.algorithm;
		import std.range;

		Message*[] backMessages;

		if("last-message-id" in message) {
			auto lastMessageId = message["last-message-id"][$-1];
			foreach(channel; channels)
				backMessages ~= channel.messages;

			auto bm = sort!"a.timestamp < b.timestamp"(backMessages);

			// everything from the message they last saw onward...
			backMessages = array(find!("a.id == b")(bm, lastMessageId));
			while(backMessages.length && backMessages[0].id == lastMessageId)
				backMessages = backMessages[1 .. $]; // the last message is the one they got

			//writeln("backed up from ", lastMessageId, " is");
			//foreach(msg; backMessages)
				//writeln(*msg);
		} else if("minimum-time" in message) {
			foreach(channel; channels)
				backMessages ~= channel.messages;

			auto bm = sort!"a.timestamp < b.timestamp"(backMessages);

			backMessages = array(find!("a.timestamp >= b")(bm, to!long(message["minimum-time"][$-1])));
		}

		if("close-time" in message)
			closeTime = to!long(message["close-time"][$-1]);

		// send the back messages immediately
		daemon.writeMessagesTo(backMessages, this, "backed-up");

		// if(closeTime > 0 && closeTime != long.max)
			// closeTime = getUtcTime() + closeTime; // FIXME: do i use this? Should I use this?
	}

	override void onDisconnect() {
		daemon.removeConnection(this);
	}

}

/// A publisher (port 7071): posts, edits and deletes messages.
class DataConnection : RtudConnection {
	this(RealTimeUpdateDaemon daemon) {
		super(daemon);
	}

	override void handleMessage(string[][string] message) {
		// last value given for key, or def if it is missing or empty
		string getStr(string key, string def) {
			if(key in message) {
				auto s = message[key][$ - 1];
				if(s.length)
					return s;
			}
			return def;
		}

		string operation = getStr("operation", "post");

		// looks up the message by id, creating it if it isn't known yet
		Message* m = daemon.getMessage(getStr("id", null));
		switch(operation) {
			default:
				throw new Exception("unknown operation " ~ operation);
			case "delete":
				daemon.deleteMessage(m);
			break;
			case "edit":
			case "post":
				// we have to create the message and send it out
				m.type = getStr("type", "message");
				m.data = getStr("data", "");
				m.timestamp = to!long(getStr("timestamp", to!string(getUtcTime())));
				m.ttl = to!long(getStr("ttl", "1000"));
			break;
		}

		assert(m !is null);

		if("channels" in message)
		foreach(ch; message["channels"]) {
			auto channel = daemon.getChannel(ch);
			assert(channel !is null);
			channel.writeMessage(m, operation);
		}

		if("channel" in message) {
			auto channel = daemon.getChannel(message["channel"][$-1]);
			channel.writeMessage(m, operation);
		}
	}
}

/// A named stream of messages plus the connections listening to it.
struct Channel {
	string id;
	Message*[] messages;

	// a poor man's set...
	NotificationConnection[NotificationConnection] listeningConnections;


	RealTimeUpdateDaemon daemon;

	/// Records the message in this channel's backlog and pushes it to
	/// every listener, tagged with operation.
	void writeMessage(Message* message, string operation) {
		// A deleted message must not go back into the backlog, and an
		// edited one is already there; only store it if it is new here.
		if(operation != "delete") {
			bool alreadyHeld = false;
			foreach(held; messages)
				if(held is message) {
					alreadyHeld = true;
					break;
				}
			if(!alreadyHeld)
				messages ~= message;
		}

		foreach(k, v; listeningConnections)
			daemon.writeMessagesTo([message], v, operation);
	}

	/// Adds c to the listeners of this channel.
	void subscribeTo(NotificationConnection c) {
		listeningConnections[c] = c;
	}
}


/// The daemon itself: listeners connect on port 7070, publishers on 7071.
class RealTimeUpdateDaemon : NetworkManager {
	this() {
		super();
		setConnectionSpawner(7070, &createNotificationConnection);
		listen(7070);
		setConnectionSpawner(7071, &createDataConnection);
		listen(7071);
	}

	private Channel*[string] channels;
	private Message*[string] messages;

	/// Returns the message with this id, creating and registering a new
	/// one if it doesn't exist. An empty id gets one generated from the
	/// current time, with "-" appended until it is unused.
	Message* getMessage(string id) {
		if(id.length) {
			if(auto existing = id in messages)
				return *existing;
		} else {
			id = to!string(getUtcTime());
		}

		while(id in messages)
			id ~= "-";

		auto message = new Message;
		message.id = id;
		messages[id] = message;

		//writeln("NEW MESSAGE: ", *message);

		return message;
	}

	/// Forgets the message: removes it from the id table and from the
	/// backlog of every channel holding it.
	void deleteMessage(Message* m) {
		messages.remove(m.id);
		foreach(k, v; channels)
		foreach(i, msg; v.messages) {
			if(msg is m) {
				v.messages = v.messages[0 .. i] ~ v.messages[i + 1 .. $];
				break;
			}
		}
	}

	/// Returns the channel with this id, creating it on first use.
	Channel* getChannel(string id) {
		if(auto existing = id in channels)
			return *existing;

		auto c = new Channel;
		c.daemon = this;
		c.id = id;
		channels[id] = c;
		return c;
	}

	/// Writes the messages to connection in the browser event format
	/// (the extra fields go out as ":name: value" comment lines), then
	/// closes the connection if its closeTime asks for that.
	void writeMessagesTo(Message*[] messages, NotificationConnection connection, string operation) {
		foreach(messageMain; messages) {
			if(messageMain.timestamp + messageMain.ttl < getUtcTime)
				deleteMessage(messageMain); // too old, kill it
			Message message = *messageMain;
			message.operation = operation;

			// this should never happen, but just in case
			// (replace returns the new string; it has to be assigned back)
			message.type = replace(message.type, "\n", "");
			connection.write(":timestamp: " ~ to!string(message.timestamp) ~ "\n");
			connection.write(":ttl: " ~ to!string(message.ttl) ~ "\n");
			connection.write(":operation: " ~ message.operation ~ "\n");
			if(message.id.length)
				connection.write("id: " ~ message.id ~ "\n");
			connection.write("event: " ~ message.type ~ "\n");
			connection.write("data: " ~ replace(message.data, "\n", "\ndata: ") ~ "\n");
			connection.write("\n");
		}

		// negative: always close. zero: close once something was sent.
		if(connection.closeTime <= 0) // FIXME: other times?
			if(connection.closeTime != 0 || messages.length)
				connection.disconnect(); // note this actually queues a disconnect, so we cool
	}

	/// Unsubscribes the connection from every channel.
	void removeConnection(NotificationConnection connection) {
		foreach(channel; channels)
			channel.listeningConnections.remove(connection);
	}

	Connection createNotificationConnection() {
		return new NotificationConnection(this);
	}

	Connection createDataConnection() {
		return new DataConnection(this);
	}
}

/// Runs the daemon loop until the network manager says to stop.
/// A ConnectionException drops the offending connection; anything else
/// is printed and the loop carries on.
void rtudMain() {
	auto netman = new RealTimeUpdateDaemon;

	bool proceed = true;

	while(proceed)
		try
			proceed = netman.proceed();
		catch(ConnectionException e) {
			writeln(e.toString());
			e.c.disconnectNow();
		}
		catch(Throwable e) {


			writeln(e.toString());
		}
}

version(standalone_rtud)
void main() {
	rtudMain();
}