1 /**
2 	This provides a kind of real time updates that can be consumed
3 	by javascript (and probably other things eventually).
4 
5 	First, you compile the server app. dmd -version=standalone_rtud -version=rtud_daemon
6 
7 	Run it. It's pretty generic; probably don't have to modify it
8 	but you always can. It's useful to have a long running process
9 	anyway.
10 
11 	But then you'll want an intermediary between this and the javascript.
12 	Use the handleListenerGateway() function in your cgi app for that.
13 	You can pass it a channel prefix for things like user ids.
14 
	In your javascript, use var source = new EventSource("path to gateway");
	and then source.addEventListener(type, function, false);
17 
18 	Note: this javascript does not work in all browsers, but
19 	real time updates should really be optional anyway.
20 
21 	I might add a traditional ajax fallback but still, it's all
22 	js so be sure it's non-essential if possible.
23 
24 
25 	And in your app, as events happen, use in D:
26 		auto stream = new UpdateStream(channel);
27 		stream.sendMessage(type, message);
28 
29 	and the helper app will push it all out. You might want to wrap
30 	some of this in try/catch since if the helper app dies, this will
31 	throw since it can't connect.
32 
33 
	I found using user names as channels is good stuff. Then your JS
	doesn't provide a channel at all - your cgi gateway supplies it through
	the channelPrefix argument of handleListenerGateway().
37 */
38 module arsd.rtud;
39 
40 import std.string;
41 import std.array : replace;
42 import std.conv;
43 import std.date;
44 
45 
/**
	Publishing handle used by application code to push events to the daemon.

	Constructing one connects to localhost port 7071 (the port the daemon
	serves its data connections on) and remembers the channel that
	sendMessage() will publish to. Every request is sent as a single
	encoded line followed by a flush.
*/
class UpdateStream {
	File net;
	string channel;

	/// Connects to the daemon and binds this stream to `channel`.
	this(string channel) {
		net = openNetwork("localhost", 7071);
		this.channel = channel;
	}

	/// Closes the connection.
	~this() {
		net.close();
	}

	/// Asks the daemon to delete the message identified by `messageId`.
	void deleteMessage(string messageId) {
		import arsd.cgi; // : encodeVariables;

		string[string] fields;
		fields["id"] = messageId;
		fields["operation"] = "delete";

		transmit(encodeVariables(fields));
	}

	/**
		Publishes one event on this stream's channel.

		Params:
			eventType = event name delivered to listeners
			messageText = payload
			ttl = time to live, sent to the daemon as the "ttl" field
	*/
	void sendMessage(string eventType, string messageText, long ttl = 2500) {
		import arsd.cgi; // : encodeVariables;

		// "operation" and "id" are deliberately left out; the daemon
		// falls back to its defaults for both.
		string[string] fields;
		fields["channel"] = channel;
		fields["type"] = eventType;
		fields["data"] = messageText;
		fields["ttl"] = to!string(ttl);

		transmit(encodeVariables(fields));
	}

	// Sends one already-encoded request line and flushes it out.
	private void transmit(string line) {
		net.writeln(line);
		net.flush();
	}
}
85 
86 /+
87 		if("channels" in message) {
88 		if("last-message-id" in message)
89 		if("minimum-time" in message)
90 		if("close-time" in message)
91 +/
92 
93 version(D_Version2) {
94 	static import linux = core.sys.posix.unistd;
95 	static import sock = core.sys.posix.sys.socket;
96 } else {
97 	static import linux = std.c.linux.linux;
98 	static import sock = std.c.linux.socket;
99 }
100 
/**
	Opens a TCP connection to `host`:`port` and returns the raw socket
	file descriptor. The caller is responsible for closing it.

	Params:
		host = host name, resolved with gethostbyname
		port = port number in host byte order

	Throws: StdioException if the lookup, the socket creation or the
	connect fails. The socket is closed again if anything fails after it
	was created.
*/
int openNetworkFd(string host, ushort port) {
	import std.exception;
	import core.stdc.string : memcpy;

	auto h = enforce( sock.gethostbyname(toStringz(host)),
		new StdioException("gethostbyname"));

	int s = sock.socket(sock.AF_INET, sock.SOCK_STREAM, 0);
	enforce(s != -1, new StdioException("socket"));

	// don't leak the descriptor if the connect below throws
	scope(failure) {
	    linux.close(s);
	}

	sock.sockaddr_in addr;

	addr.sin_family = sock.AF_INET;
	addr.sin_port = sock.htons(port);
	// first resolved address, copied as-is
	memcpy(&addr.sin_addr.s_addr, h.h_addr, h.h_length);

	enforce(sock.connect(s, cast(sock.sockaddr*) &addr, addr.sizeof) != -1,
		new StdioException("Connect failed"));

	return s;
}
124 
/**
	Writes the whole of `s` to the descriptor `fd`, repeating the write
	call until nothing is left.

	At least one write is always attempted, even for an empty string.
	If a write reports zero bytes the function returns quietly with the
	remainder unsent.

	Throws: Exception("couldn't write") when write reports an error.
*/
void writeToFd(int fd, string s) {
	do {
		auto written = linux.write(fd, s.ptr, s.length);

		if(written < 0)
			throw new Exception("couldn't write");
		if(written == 0)
			return;

		// drop what went out and go around again for the rest
		s = s[written .. $];
	} while(s.length);
}
136 
/// Shutdown flag. It is raised by requestDeath() and checked by the
/// gateway's read loop before every read.
__gshared bool deathRequested = false;

/// C-linkage signal handler: all it does is raise deathRequested.
/// The signal number is not looked at.
extern(C) void requestDeath(int sig) {
	deathRequested = true;
}
142 
143 import arsd.cgi;
/**
	Gateway between a browser request and the update daemon's listener
	port (localhost:7070). Call it from your cgi app.

	The subscribed channel is `channelPrefix` followed by the optional
	"channel" GET variable. "minimum-time" and "last-message-id" GET
	variables are forwarded to the daemon; for event-stream requests
	cgi.lastEventId, when present, takes precedence as the last message id.

	If the request accepts text/event-stream, everything the daemon sends
	is relayed to the client as it arrives. Otherwise the request is
	treated as ajax long polling: the daemon's reply is collected and
	parsed, but nothing is written out yet (see the FIXME below).

	The throttledConnection param is useful for helping to get around
	browser connection limitations. If the user opens a bunch of tabs,
	these long standing connections can hit the per-host connection
	limit, breaking navigation until the connection times out. The
	throttle option sets a long retry period and polls instead of waits.
	This sucks, but sucks less than your whole site hanging because the
	browser is queuing your connections!

	A SIGTERM handler is installed; once the signal arrives the relay
	loop ends and the function returns.

	Returns: 1 when the request was handled as a plain poll and no
	shutdown was requested, 0 otherwise.

	Throws: Exception if the signal handler cannot be installed or a
	read from the daemon fails; anything openNetworkFd or writeToFd throw.
*/
int handleListenerGateway(Cgi cgi, string channelPrefix, bool throttledConnection = false) {
	import core.stdc.errno;
	import core.sys.posix.signal;

	cgi.setCache(false);

	sigaction_t act;
	// I want all zero everywhere else; the read() must not automatically restart for this to work.
	act.sa_handler = &requestDeath;

	if(sigaction(SIGTERM, &act, null) != 0)
		throw new Exception("sig err");

	auto f = openNetworkFd("localhost", 7070);
	scope(exit) linux.close(f);

	string[string] variables;

	variables["channel"] = channelPrefix ~ ("channel" in cgi.get ? cgi.get["channel"] : "");
	if("minimum-time" in cgi.get)
		variables["minimum-time"] = cgi.get["minimum-time"];
	if("last-message-id" in cgi.get)
		variables["last-message-id"] = cgi.get["last-message-id"];

	bool isSse;

	if(cgi.accept == "text/event-stream") {
		cgi.setResponseContentType("text/event-stream");
		isSse = true;
		if(cgi.lastEventId.length)
			variables["last-message-id"] = cgi.lastEventId;

		if(throttledConnection) {
			cgi.write("retry: 15000\n");
		} else {
			cgi.write(":\n"); // the comment ensures apache doesn't skip us
		}

		cgi.flush(); // sending the headers along
	} else {
		// gotta handle it as ajax polling
		variables["close-time"] = "0"; // ask for long polling
	}


	if(throttledConnection)
		variables["close-time"] = "-1"; // close immediately

	writeToFd(f, encodeVariables(variables) ~ "\n");

	string wegot;

	// a byte buffer: read() fills in raw bytes, and the slice taken
	// below must have exactly as many bytes as were read
	char[4096] buffer;

	while(!deathRequested) {
		auto num = linux.read(f, buffer.ptr, buffer.length);
		if(num < 0) {
			// a signal interrupted the read; go back to the loop
			// condition, which stops us if it was the SIGTERM
			if(errno == EINTR)
				continue;
			throw new Exception("read error");
		}
		if(num == 0)
			break; // the daemon closed the connection

		auto chunk = buffer[0 .. num];
		if(isSse) {
			cgi.write(chunk);
			cgi.flush();
		} else {
			wegot ~= chunk.idup;
		}
	}

	// this is to support older browsers
	if(!isSse && !deathRequested) {
		// we have to parse it out and reformat for plain cgi...
		auto parsed = parseMessages(wegot);
		//cgi.setResponseContentType("text/json");
		// FIXME gotta reorganize my json stuff
		//cgi.write(toJson(parsed));
		return 1;
	}

	return 0;
}
234 
/**
	One update, both as the daemon keeps it and as parseMessages()
	reconstructs it from the daemon's output.
*/
struct Message {
	// type is sent as the "event:" line, id as "id:", data as "data:" lines
	string type, id, data;

	// the daemon treats a message as too old once timestamp + ttl
	// is less than the current getUtcTime()
	long timestamp, ttl;

	// the operation the message is being delivered under
	string operation;
}
244 
245 
/**
	Fetches the messages currently held for `channel` with a single
	poll of the daemon's listener port (localhost:7070).

	Params:
		channel = channel to query
		eventTypeFilter = when not null, only messages of this type are returned
		maxAge = when non-zero, the request carries a "minimum-time" of
			the current getUtcTime() minus this value

	Returns: the parsed messages.

	Throws: Exception on a read error; anything openNetworkFd,
	writeToFd or parseMessages throw.
*/
Message[] getMessages(string channel, string eventTypeFilter = null, long maxAge = 0) {
	auto f = openNetworkFd("localhost", 7070);
	scope(exit) linux.close(f);

	string[string] variables;

	variables["channel"] = channel;
	if(maxAge)
		variables["minimum-time"] = to!string(getUtcTime() - maxAge);

	variables["close-time"] = "-1"; // close immediately

	writeToFd(f, encodeVariables(variables) ~ "\n");

	string wegot;

	// a byte buffer: read() fills in raw bytes, and the slice taken
	// below must have exactly as many bytes as were read
	char[4096] buffer;

	for(;;) {
		auto num = linux.read(f, buffer.ptr, buffer.length);
		if(num < 0)
			throw new Exception("read error");
		if(num == 0)
			break; // the daemon closed the connection, we have it all

		wegot ~= buffer[0 .. num].idup;
	}

	return parseMessages(wegot, eventTypeFilter);
}
277 
/**
	Parses the text the daemon writes out (the format meant for
	browsers) back into Message structs.

	Messages are separated by a blank line. Inside a message each line
	is "name: value"; a leading ':' is stripped first, so the daemon's
	":timestamp:", ":ttl:" and ":operation:" lines are read as fields
	too. Lines without a colon, or with nothing after it, are skipped
	as comments. Several "data" lines in one message are joined with a
	newline, undoing the way the daemon splits multi-line data.

	Params:
		wegot = the raw text received from the daemon
		eventTypeFilter = when not null, only messages whose type equals it are kept

	Returns: the messages in the order they appear in the text.

	Throws: Exception("wtf") on an empty line inside a message; to!long
	may throw on a malformed timestamp or ttl.
*/
Message[] parseMessages(string wegot, string eventTypeFilter = null) {
	// gotta parse this since rtud writes out the format for browsers
	Message[] ret;
	foreach(message; wegot.split("\n\n")) {
		// every message ends with a blank line, so the split leaves an
		// empty piece after the last one; that is not a message
		if(message.length == 0)
			continue;

		Message m;
		bool haveData = false;
		foreach(line; message.split("\n")) {
			if(line.length == 0)
				throw new Exception("wtf");
			if(line[0] == ':')
				line = line[1 .. $];

			if(line.length == 0)
				continue; // just an empty comment

			auto idx = line.indexOf(":");
			if(idx == -1)
				continue; // probably just a comment

			if(idx + 2 > line.length)
				continue; // probably just a comment too

			auto name = line[0 .. idx];
			auto data = line[idx + 2 .. $]; // skips the colon and the space after it

			switch(name) {
				default: break; // do nothing
				case "timestamp":
					if(data.length)
						m.timestamp = to!long(data);
				break;
				case "ttl":
					if(data.length)
						m.ttl = to!long(data);
				break;
				case "operation":
					m.operation = data;
				break;
				case "id":
					m.id = data;
				break;
				case "event":
					m.type = data;
				break;
				case "data":
					// each further data line is a new line of the payload
					if(haveData)
						m.data ~= "\n";
					m.data ~= data;
					haveData = true;
				break;
			}
		}
		if(eventTypeFilter is null || eventTypeFilter == m.type)
			ret ~= m;
	}

	return ret;
}
332 
333 
334 version(rtud_daemon) :
335 
336 import arsd.netman;
337 
338 // Real time update daemon
339 /*
340 	You push messages out to channels, where they are held for a certain length of time.
341 
342 	It can also do state with listener updates.
343 
344 	Clients ask for messages since a time, and if there are none, you hold the connection until something arrives.
345 
346 
347 	There should be D and Javascript apis for pushing and receiving.
348 
349 
350 	JS:
351 
352 	var updateObject = RealTimeUpdate();
353 
354 	updateObject.someMessage = function(msg) {
355 		// react to it
356 	}
357 
358 	updateObject.listen(channel);
359 
360 	updateObject.send(message, args); // probably shouldn't need this from JS
361 */
362 
/*
	The incoming packet format is application/x-www-form-urlencoded. There
	must be no raw newlines in a packet - be sure to url encode them.

	Each message is one line; messages are separated by newlines.
*/
369 
/**
	Common base of the daemon's two connection kinds. It cuts the
	incoming data into newline-terminated lines, decodes each line into
	variables and passes the result to handleMessage().
*/
class RtudConnection : Connection {
	RealTimeUpdateDaemon daemon;

	/// Remembers the daemon this connection belongs to.
	this(RealTimeUpdateDaemon daemon) {
		this.daemon = daemon;
	}

	/// Handles every complete line currently buffered; a trailing
	/// partial line is left alone until more data arrives.
	override void onDataReceived() {
		import arsd.cgi;// : decodeVariables;

		for(;;) {
			auto pending = cast(string) read();

			auto newlineAt = pending.indexOf("\n");
			if(newlineAt == -1)
				return; // wait for more data

			auto rawLine = pending[0 .. newlineAt];
			// consume the line and its newline before dispatching it
			changeReadPosition(newlineAt + 1);

			auto decoded = decodeVariables(rawLine);

			handleMessage(decoded);
		}
	}

	invariant() {
		assert(daemon !is null);
	}

	/// Called once per decoded line.
	abstract void handleMessage(string[][string] message);
}
401 
/**
	A listener's connection: it subscribes to channels and receives the
	messages written to them.
*/
class NotificationConnection : RtudConnection {
	long closeTime;

	/// Starts out with closeTime at long.max.
	this(RealTimeUpdateDaemon daemon) {
		super(daemon);
		closeTime = long.max;
	}

	/// send: what channels you're interested in, a minimum time,
	/// and a close time.
	/// if the close time is negative, you are just polling curiously.
	/// if it is zero, it will close after your next batch. (long polling)
	/// anything else stays open for as long as it can in there.
	///
	/// A "last-message-id" takes precedence over "minimum-time" when
	/// both are given. Whatever backlog results is sent right away.
	override void handleMessage(string[][string] message) {
		import std.algorithm;
		import std.range;

		Channel*[] subscribed;

		// looks the channel up (creating it if needed) and starts listening on it
		void subscribeToChannel(string channelId) {
			auto ch = daemon.getChannel(channelId);
			subscribed ~= ch;
			ch.subscribeTo(this);
		}

		// a fresh array with the messages of every subscribed channel
		Message*[] gatherMessages() {
			Message*[] all;
			foreach(ch; subscribed)
				all ~= ch.messages;
			return all;
		}

		if(auto names = "channels" in message)
			foreach(name; *names)
				subscribeToChannel(name);

		if(auto single = "channel" in message)
			subscribeToChannel((*single)[$-1]);

		Message*[] backlog;

		if(auto lastIds = "last-message-id" in message) {
			auto lastSeenId = (*lastIds)[$-1];

			backlog = gatherMessages();
			auto ordered = sort!"a.timestamp < b.timestamp"(backlog);

			backlog = array(find!("a.id == b")(ordered, lastSeenId));
			// the last message is the one they got, so step past it
			while(backlog.length && backlog[0].id == lastSeenId)
				backlog = backlog[1 .. $];
		} else if(auto minTimes = "minimum-time" in message) {
			backlog = gatherMessages();
			auto ordered = sort!"a.timestamp < b.timestamp"(backlog);

			backlog = array(find!("a.timestamp >= b")(ordered, to!long((*minTimes)[$-1])));
		}

		if(auto closeTimes = "close-time" in message)
			closeTime = to!long((*closeTimes)[$-1]);

		// send the back messages immediately
		daemon.writeMessagesTo(backlog, this, "backed-up");

//		if(closeTime > 0 && closeTime != long.max)
//			closeTime = getUtcTime() + closeTime; // FIXME: do i use this? Should I use this?
	}

	/// Removes this connection from the daemon's channels.
	override void onDisconnect() {
		daemon.removeConnection(this);
	}

}
476 
/**
	A publisher's connection. Every incoming line posts, edits or
	deletes one message and is then announced on the named channels.
*/
class DataConnection : RtudConnection {
	this(RealTimeUpdateDaemon daemon) {
		super(daemon);
	}

	/**
		Fields read from `message`: "operation" ("post" when missing;
		"edit" and "delete" are the other accepted values), "id", and
		for post/edit "type", "data", "timestamp" and "ttl". The result
		is written to every channel in "channels" and to "channel".

		Throws: Exception for any other operation. This is checked
		before the daemon is touched, so a bad request does not leave
		a newly registered message behind.
	*/
	override void handleMessage(string[][string] message) {
		// last value given for key, or def when it is missing or empty
		string getStr(string key, string def) {
			if(key in message) {
				auto s = message[key][$ - 1];
				if(s.length)
					return s;
			}
			return def;
		}

		string operation =  getStr("operation", "post");

		// validate first: getMessage() below creates and registers a
		// message for any id it has not seen yet
		if(operation != "delete" && operation != "edit" && operation != "post")
			throw new Exception("unknown operation " ~ operation);

		Message* m = daemon.getMessage(getStr("id", null));
		assert(m !is null);

		if(operation == "delete") {
			daemon.deleteMessage(m);
		} else {
			// post or edit: (re)fill the message before sending it out
			m.type = getStr("type", "message");
			m.data = getStr("data", "");
			m.timestamp = to!long(getStr("timestamp", to!string(getUtcTime())));
			m.ttl = to!long(getStr("ttl", "1000"));
		}

		if("channels" in message)
		foreach(ch; message["channels"]) {
			auto channel = daemon.getChannel(ch);
			assert(channel !is null);
			channel.writeMessage(m, operation);
		}

		if("channel" in message) {
			auto channel = daemon.getChannel(message["channel"][$-1]);
			channel.writeMessage(m, operation);
		}
	}
}
524 
/**
	A named channel: the messages held for it and the connections
	currently listening to it.
*/
struct Channel {
	string id;
	Message*[] messages;

	// a poor man's set... 
	NotificationConnection[NotificationConnection] listeningConnections;


	RealTimeUpdateDaemon daemon;

	/**
		Updates this channel's held messages for `message` and sends it
		to every listener under `operation`.

		A "delete" makes sure the message is not held here; it is never
		added. Any other operation adds the message only if this channel
		does not hold it already, so editing or re-posting a message
		does not put it in the list twice.
	*/
	void writeMessage(Message* message, string operation) {
		// where the message currently sits in the list, if anywhere
		ptrdiff_t heldAt = -1;
		foreach(i, existing; messages) {
			if(existing is message) {
				heldAt = i;
				break;
			}
		}

		if(operation == "delete") {
			if(heldAt != -1)
				messages = messages[0 .. heldAt] ~ messages[heldAt + 1 .. $];
		} else if(heldAt == -1) {
			messages ~= message;
		}

		foreach(k, v; listeningConnections)
			daemon.writeMessagesTo([message], v, operation);
	}

	/// Adds `c` to the set of listeners.
	void subscribeTo(NotificationConnection c) {
		listeningConnections[c] = c;
	}
}
545 
546 
/**
	The daemon itself. It serves notification (listener) connections on
	port 7070 and data (publisher) connections on port 7071, and owns
	the tables of channels and messages.
*/
class RealTimeUpdateDaemon : NetworkManager {
	this() {
		super();
		setConnectionSpawner(7070, &createNotificationConnection);
		listen(7070);
		setConnectionSpawner(7071, &createDataConnection);
		listen(7071);
	}

	private Channel*[string] channels;
	private Message*[string] messages;

	/**
		Returns the message registered under `id`, or creates and
		registers a new one.

		An empty id is replaced by the current getUtcTime() as a string;
		if that is already in use, dashes are appended until it is free.
	*/
	Message* getMessage(string id) {
		if(id.length && id in messages)
			return messages[id];

		if(id.length == 0)
			id = to!string(getUtcTime());

		// only a generated id can still collide at this point
		while(id in messages)
			id ~= "-";

		auto message = new Message;
		message.id = id;
		messages[id] = message;

		//writeln("NEW MESSAGE: ", *message);

		return message;
	}

	/// Unregisters `m` and takes it out of every channel's list.
	void deleteMessage(Message* m) {
		messages.remove(m.id);
		foreach(k, v; channels)
		foreach(i, msg; v.messages) {
			if(msg is m) {
				v.messages = v.messages[0 .. i] ~ v.messages[i + 1 .. $];
				break;
			}
		}
	}

	/// Returns the channel named `id`, creating it on first use.
	Channel* getChannel(string id) {
		if(id in channels)
			return channels[id];

		auto c = new Channel;
		c.daemon = this;
		c.id = id;
		channels[id] = c;
		return c;
	}

	/**
		Writes each message to `connection` in the event-stream text
		format, tagged with `operation`.

		A message whose timestamp + ttl is already less than the current
		getUtcTime is deleted from the daemon; it is still written out
		this one time.

		Afterwards the connection is disconnected if its closeTime is
		negative, or if it is zero and at least one message was passed.
	*/
	void writeMessagesTo(Message*[] messages, NotificationConnection connection, string operation) {
		foreach(messageMain; messages) {
			if(messageMain.timestamp + messageMain.ttl < getUtcTime)
				deleteMessage(messageMain); // too old, kill it
			Message message = *messageMain;
			message.operation = operation;

			// this should never happen, but just in case: a newline in
			// the type would break up the "event:" line
			message.type = replace(message.type, "\n", "");
			connection.write(":timestamp: " ~ to!string(message.timestamp) ~ "\n");
			connection.write(":ttl: " ~ to!string(message.ttl) ~ "\n");
			connection.write(":operation: " ~ message.operation ~ "\n");
			if(message.id.length)
				connection.write("id: " ~ message.id ~ "\n");
			connection.write("event: " ~ message.type ~ "\n");
			// every line of the data gets its own "data: " prefix
			connection.write("data: " ~ replace(message.data, "\n", "\ndata: ") ~ "\n");
			connection.write("\n");
		}

		if(connection.closeTime <= 0) // FIXME: other times?
			if(connection.closeTime != 0 || messages.length)
				connection.disconnect(); // note this actually queues a disconnect, so we cool
	}

	/// Removes `connection` from the listener set of every channel.
	void removeConnection(NotificationConnection connection) {
		foreach(channel; channels)
			channel.listeningConnections.remove(connection);
	}

	/// Spawner for port 7070.
	Connection createNotificationConnection() {
		return new NotificationConnection(this);
	}

	/// Spawner for port 7071.
	Connection createDataConnection() {
		return new DataConnection(this);
	}
}
641 
/**
	Runs the daemon: creates it and keeps calling proceed() until that
	returns false.

	A ConnectionException is printed and its connection is disconnected
	at once; any other Throwable is printed. The loop carries on in
	both cases.
*/
void rtudMain() {
	auto netman = new RealTimeUpdateDaemon;

	for(bool keepRunning = true; keepRunning; ) {
		try {
			keepRunning = netman.proceed();
		} catch(ConnectionException e) {
			writeln(e.toString());
			e.c.disconnectNow();
		} catch(Throwable e) {
			writeln(e.toString());
		}
	}
}
660 
version(standalone_rtud) {
	/// Program entry point when built with -version=standalone_rtud;
	/// it just runs the daemon.
	void main() {
		rtudMain();
	}
}