1 /++
2 	Uses libpq implement the [arsd.database.Database] interface.
3 
4 	Requires the official pq library from Postgres to be installed to build
5 	and to use. Note that on Windows, it is often distributed as `libpq.lib`.
6 	You will have to copy or rename that to `pq.lib` for dub or dmd to automatically
7 	find it. You will also likely need to add the lib search path yourself on
8 	both Windows and Linux systems (on my Linux box, it is `-L-L/usr/local/pgsql/lib`
9 	to dmd. You can also list things your app's dub.json's lflags too. Note on the
10 	Microsoft linker, the flag is called `/LIBPATH`.)
11 
12 	For example, for the default Postgres install on Windows, try:
13 
14 	```
15 		"lflags-windows": [ "/LIBPATH:C:/Program Files/PostgreSQL/<VERSION>/lib" ],
16 	```
17 
18 	In your dub.json.
19 
20 	When you distribute your application, the user will want to install libpq client on
21 	Linux, and on Windows, you may want to include the libpq.dll in your distribution.
22 	Note it may also depend on OpenSSL ssl and crypto dlls and libintl.dll as well. These
23 	should be found in the PostgreSQL lib and/or bin folders (check them both!).
24 +/
25 module arsd.postgres;
26 
27 version(Windows)
28 	pragma(lib, "libpq");
29 else
30 	pragma(lib, "pq");
31 
32 public import arsd.database;
33 
34 import std.string;
35 import std.exception;
36 
37 // remember to CREATE DATABASE name WITH ENCODING 'utf8'
38 //
39 // http://www.postgresql.org/docs/8.0/static/libpq-exec.html
40 // ExecParams, PQPrepare, PQExecPrepared
41 //
42 // SQL: `DEALLOCATE name` is how to dealloc a prepared statement.
43 
44 /++
45 	The PostgreSql implementation of the [Database] interface.
46 
47 	You should construct this class, but then use it through the
48 	interface functions.
49 
50 	---
51 	auto db = new PostgreSql("dbname=name");
52 	foreach(row; db.query("SELECT id, data FROM table_name"))
53 		writeln(row[0], " = ", row[1]);
54 	---
55 +/
56 class PostgreSql : Database {
57 	/// `dbname=your_database_name` is probably the most common connection string. See section "33.1.1.1. Keyword/Value Connection Strings" on https://www.postgresql.org/docs/10/libpq-connect.html
58 	this(string connectionString) {
59 		this.connectionString = connectionString;
60 		conn = PQconnectdb(toStringz(connectionString));
61 		if(conn is null)
62 			throw new DatabaseConnectionException("Unable to allocate PG connection object");
63 		if(PQstatus(conn) != CONNECTION_OK) {
64 			this.connectionOk = false;
65 			throw new DatabaseConnectionException(error());
66 		}
67 		query("SET NAMES 'utf8'"); // D does everything with utf8
68 		this.connectionOk = true;
69 	}
70 
71 	string connectionString;
72 
73 	~this() {
74 		PQfinish(conn);
75 	}
76 
77 	string sysTimeToValue(SysTime s) {
78 		return "'" ~ escape(s.toISOExtString()) ~ "'::timestamptz";
79 	}
80 
81 	private bool connectionOk;
82 	override bool isAlive() {
83 		return connectionOk;
84 	}
85 
86 	/**
87 		Prepared statement support
88 
89 		This will be added to the Database interface eventually in some form,
90 		but first I need to implement it for all my providers.
91 
92 		The common function of those 4 will be what I put in the interface.
93 	*/
94 
95 	ResultSet executePreparedStatement(T...)(string name, T args) {
96 		const(char)*[args.length] argsStrings;
97 
98 		foreach(idx, arg; args) {
99 			// FIXME: optimize to remove allocations here
100 			import std.conv;
101 			static if(!is(typeof(arg) == typeof(null)))
102 				argsStrings[idx] = toStringz(to!string(arg));
103 			// else make it null
104 		}
105 
106 		auto res = PQexecPrepared(conn, toStringz(name), argsStrings.length, argsStrings.ptr, null, null, 0);
107 
108 		int ress = PQresultStatus(res);
109 		if(ress != PGRES_TUPLES_OK
110 			&& ress != PGRES_COMMAND_OK)
111 			throw new DatabaseException(error());
112 
113 		return new PostgresResult(res);
114 
115 	}
116 
117 	///
118 	override void startTransaction() {
119 		query("START TRANSACTION");
120 	}
121 
122 	ResultSet queryImpl(string sql, Variant[] args...) {
123 		sql = escapedVariants(this, sql, args);
124 
125 		bool first_retry = true;
126 
127 		retry:
128 
129 		auto res = PQexec(conn, toStringz(sql));
130 		int ress = PQresultStatus(res);
131 		// https://www.postgresql.org/docs/current/libpq-exec.html
132 		// FIXME: PQresultErrorField can get a lot more info in a more structured way
133 		if(ress != PGRES_TUPLES_OK
134 			&& ress != PGRES_COMMAND_OK)
135 		{
136 			if(first_retry && error() == "no connection to the server\n") {
137 				first_retry = false;
138 				// try to reconnect...
139 				PQfinish(conn);
140 				conn = PQconnectdb(toStringz(connectionString));
141 				if(conn is null)
142 					throw new DatabaseConnectionException("Unable to allocate PG connection object");
143 				if(PQstatus(conn) != CONNECTION_OK) {
144 					this.connectionOk = false;
145 					throw new DatabaseConnectionException(error());
146 				}
147 				goto retry;
148 			}
149 			throw new SqlException(error());
150 		}
151 
152 		return new PostgresResult(res);
153 	}
154 
155 	string escape(string sqlData) {
156 		char* buffer = (new char[sqlData.length * 2 + 1]).ptr;
157 		ulong size = PQescapeString (buffer, sqlData.ptr, sqlData.length);
158 
159 		string ret = assumeUnique(buffer[0.. cast(size_t) size]);
160 
161 		return ret;
162 	}
163 
164 	string escapeBinaryString(const(ubyte)[] data) {
165 		// must include '\x ... ' here
166 		size_t len;
167 		char* buf = PQescapeByteaConn(conn, data.ptr, data.length, &len);
168 		if(buf is null)
169 			throw new Exception("pgsql out of memory escaping binary string");
170 
171 		string res;
172 		if(len == 0)
173 			res = "''";
174 		else
175 			res = cast(string) ("'" ~ buf[0 .. len - 1] ~ "'"); // gotta cut the zero terminator off
176 
177 		PQfreemem(buf);
178 
179 		return res;
180 	}
181 
182 
183 	///
184 	string error() {
185 		return copyCString(PQerrorMessage(conn));
186 	}
187 
188 	private:
189 		PGconn* conn;
190 }
191 
192 /+
193 # when it changes from lowercase to upper case, call that a new word. or when it goes to/from anything else and underscore or dashes.
194 +/
195 
196 struct PreparedStatementDescription {
197 	PreparedStatementResult[] result;
198 }
199 
200 struct PreparedStatementResult {
201 	string fieldName;
202 	DatabaseDatum type;
203 }
204 
205 PreparedStatementDescription describePrepared(PostgreSql db, string name) {
206 	auto res = PQdescribePrepared(db.conn, name.toStringz);
207 
208 	PreparedStatementResult[] ret;
209 
210 	// PQnparams PQparamtype for params
211 	auto numFields = PQnfields(res);
212 	foreach(num; 0 .. numFields) {
213 		auto typeId = PQftype(res, num);
214 		DatabaseDatum dd;
215 		dd.platformSpecificTag = typeId;
216 		dd.storage = sampleForOid(typeId);
217 		ret ~= PreparedStatementResult(
218 			copyCString(PQfname(res, num)),
219 			dd,
220 		);
221 	}
222 
223 	PQclear(res);
224 
225 	return PreparedStatementDescription(ret);
226 }
227 
228 import arsd.core : LimitedVariant, PackedDateTime, SimplifiedUtcTimestamp;
229 LimitedVariant sampleForOid(int platformSpecificTag) {
230 	switch(platformSpecificTag) {
231 		case BOOLOID:
232 			return LimitedVariant(false);
233 		case BYTEAOID:
234 			return LimitedVariant(cast(const(ubyte)[]) null);
235 		case TEXTOID:
236 		case VARCHAROID:
237 			return LimitedVariant("");
238 		case INT4OID:
239 			return LimitedVariant(0);
240 		case INT8OID:
241 			return LimitedVariant(0L);
242 		case FLOAT4OID:
243 			return LimitedVariant(0.0f);
244 		case FLOAT8OID:
245 			return LimitedVariant(0.0);
246 		case TIMESTAMPOID:
247 		case TIMESTAMPTZOID:
248 			return LimitedVariant(SimplifiedUtcTimestamp(0));
249 		case DATEOID:
250 			PackedDateTime d;
251 			d.hasDate = true;
252 			return LimitedVariant(d); // might want a different type so contains shows the thing without checking hasDate and hasTime
253 		case TIMETZOID: // possibly wrong... the tz isn't in my packed thing
254 		case TIMEOID:
255 			PackedDateTime d;
256 			d.hasTime = true;
257 			return LimitedVariant(d);
258 		case INTERVALOID:
259 			// months, days, and microseconds
260 
261 		case NUMERICOID: // aka decimal
262 		default:
263 			// when in doubt, assume it is just a string
264 			return LimitedVariant("sample");
265 	}
266 }
267 
268 private string toLowerFast(string s) {
269 	import std.ascii : isUpper;
270 	foreach (c; s)
271 		if (c >= 0x80 || isUpper(c))
272 			return toLower(s);
273 	return s;
274 }
275 
276 ///
277 class PostgresResult : ResultSet {
278 	// name for associative array to result index
279 	int getFieldIndex(string field) {
280 		if(mapping is null)
281 			makeFieldMapping();
282 		field = field.toLowerFast;
283 		if(field in mapping)
284 			return mapping[field];
285 		else throw new Exception("no mapping " ~ field);
286 	}
287 
288 
289 	string[] fieldNames() {
290 		if(mapping is null)
291 			makeFieldMapping();
292 		return columnNames;
293 	}
294 
295 	// this is a range that can offer other ranges to access it
296 	bool empty() {
297 		return position == numRows;
298 	}
299 
300 	Row front() {
301 		return row;
302 	}
303 
304 	int affectedRows() @system {
305 		auto g = PQcmdTuples(res);
306 		if(g is null)
307 			return 0;
308 		int num;
309 		while(*g) {
310 			num *= 10;
311 			num += *g - '0';
312 			g++;
313 		}
314 		return num;
315 	}
316 
317 	void popFront() {
318 		position++;
319 		if(position < numRows)
320 			fetchNext();
321 	}
322 
323 	override size_t length() {
324 		return numRows;
325 	}
326 
327 	this(PGresult* res) {
328 		this.res = res;
329 		numFields = PQnfields(res);
330 		numRows = PQntuples(res);
331 
332 		if(numRows)
333 			fetchNext();
334 	}
335 
336 	~this() {
337 		PQclear(res);
338 	}
339 
340 	private:
341 		PGresult* res;
342 		int[string] mapping;
343 		string[] columnNames;
344 		int numFields;
345 
346 		int position;
347 
348 		int numRows;
349 
350 		Row row;
351 
352 		void fetchNext() {
353 			Row r;
354 			r.resultSet = this;
355 			DatabaseDatum[] row;
356 
357 			for(int i = 0; i < numFields; i++) {
358 				string a;
359 
360 				if(PQgetisnull(res, position, i))
361 					a = null;
362 				else {
363 					switch(PQfformat(res, i)) {
364 						case 0: // text representation
365 							switch(PQftype(res, i)) {
366 								case BYTEAOID:
367 									size_t len;
368 									char* c = PQunescapeBytea(PQgetvalue(res, position, i), &len);
369 
370 									a = cast(string) c[0 .. len].idup;
371 
372 									PQfreemem(c);
373 								break;
374 								default:
375 									a = copyCString(PQgetvalue(res, position, i), PQgetlength(res, position, i));
376 							}
377 						break;
378 						case 1: // binary representation
379 							throw new Exception("unexpected format returned by pq");
380 						default:
381 							throw new Exception("unknown pq format");
382 					}
383 
384 				}
385 				row ~= DatabaseDatum(a);
386 			}
387 
388 			r.row = row;
389 			this.row = r;
390 		}
391 
392 		void makeFieldMapping() {
393 			for(int i = 0; i < numFields; i++) {
394 				string a = copyCString(PQfname(res, i));
395 
396 				columnNames ~= a;
397 				mapping[a] = i;
398 			}
399 
400 		}
401 }
402 
403 string copyCString(const char* c, int actualLength = -1) @system {
404 	const(char)* a = c;
405 	if(a is null)
406 		return null;
407 
408 	string ret;
409 	if(actualLength == -1)
410 		while(*a) {
411 			ret ~= *a;
412 			a++;
413 		}
414 	else {
415 		ret = a[0..actualLength].idup;
416 	}
417 
418 	return ret;
419 }
420 
421 extern(C) {
422 	struct PGconn {};
423 	struct PGresult {};
424 
425 	void PQfinish(PGconn*);
426 	PGconn* PQconnectdb(const char*);
427 
428 	int PQstatus(PGconn*); // FIXME check return value
429 
430 	const (char*) PQerrorMessage(PGconn*);
431 
432 	PGresult* PQexec(PGconn*, const char*);
433 	void PQclear(PGresult*);
434 
435 	PGresult* PQprepare(PGconn*, const char* stmtName, const char* query, int nParams, const void* paramTypes);
436 	int PQsendPrepare(PGconn*, const char*, const char*, int, const Oid*);
437 
438 	PGresult* PQexecPrepared(PGconn*, const char* stmtName, int nParams, const char** paramValues, const int* paramLengths, const int* paramFormats, int resultFormat);
439 	int PQsendQueryPrepared(PGconn*, const char* stmtName, int nParams, const char** paramValues, const int* paramLengths, const int* paramFormats, int resultFormat);
440 	int PQsendClosePrepared(PGconn* conn, const char* name);
441 
442 	int PQresultStatus(PGresult*); // FIXME check return value
443 
444 	int PQnfields(PGresult*); // number of fields in a result
445 	const(char*) PQfname(PGresult*, int); // name of field
446 
447 	int PQntuples(PGresult*); // number of rows in result
448 	const(char*) PQgetvalue(PGresult*, int row, int column);
449 
450 	size_t PQescapeString (char *to, const char *from, size_t length);
451 
452 	enum int CONNECTION_OK = 0;
453 
454 	enum int PGRES_EMPTY_QUERY = 0;
455 	enum int PGRES_COMMAND_OK = 1;
456 	enum int PGRES_TUPLES_OK = 2;
457 	enum int PGRES_COPY_OUT = 3;
458 	enum int PGRES_COPY_IN = 4;
459 	enum int PGRES_BAD_RESPONSE = 5;
460 	enum int PGRES_NONFATAL_ERROR = 6;
461 	enum int PGRES_FATAL_ERROR = 7;
462 	enum int PGRES_COPY_BOTH = 8;
463 	enum int PGRES_SINGLE_TUPLE = 9;
464 	enum int PGRES_PIPELINE_SYNC = 10;
465 	enum int PGRES_PIPELINE_ABORTED = 11;
466 	// looks like chunks was added in pq version 17...
467 
468 	int PQsetSingleRowMode(PGconn* conn);
469 
470 	// https://www.postgresql.org/docs/current/libpq-notify.html
471 
472 	enum int PGRES_POLLING_FAILED = 0;
473 	enum int PGRES_POLLING_READING = 1;
474 	enum int PGRES_POLLING_WRITING = 2;
475 	enum int PGRES_POLLING_OK = 3;
476 	PGconn* PQconnectStart(const char* connInfo);
477 	int PQconnectPoll(PGconn* conn);
478 
479 	int PQgetlength(const PGresult *res,
480 			int row_number,
481 			int column_number);
482 	int PQgetisnull(const PGresult *res,
483 			int row_number,
484 			int column_number);
485 
486 	int PQfformat(const PGresult *res, int column_number);
487 
488 	alias Oid = int;
489 	enum BOOLOID = 16;
490 	enum BYTEAOID = 17;
491 	enum TEXTOID = 25;
492 	enum INT4OID = 23; // integer
493 	enum INT8OID = 20; // bigint
494 	enum NUMERICOID = 1700;
495 	enum FLOAT4OID = 700;
496 	enum FLOAT8OID = 701;
497 	enum VARCHAROID = 1043;
498 	enum DATEOID = 1082;
499 	enum TIMEOID = 1083;
500 	enum TIMESTAMPOID = 1114;
501 	enum TIMESTAMPTZOID = 1184;
502 	enum INTERVALOID = 1186;
503 	enum TIMETZOID = 1266;
504 
505 	Oid PQftype(const PGresult* res, int column_number);
506 
507 	char *PQescapeByteaConn(PGconn *conn,
508                                  const ubyte *from,
509                                  size_t from_length,
510                                  size_t *to_length);
511 	char *PQunescapeBytea(const char *from, size_t *to_length);
512 	void PQfreemem(void *ptr);
513 
514 	char* PQcmdTuples(PGresult *res);
515 
516 	int PQsendQuery(PGconn* conn, const char* command);
517 	int PQsendQueryParams(PGconn* conn, const char* command, int params, const Oid* paramTypes, const char** paramValues, const int* paramLengths, const int* paramFormats, int resultFormat);
518 
519 	PGresult *PQdescribePrepared(PGconn *conn, const char *stmtName);
520 	int PQsendDescribePrepared(PGconn *conn, const char *stmtName);
521 
522 	PGresult* PQgetResult(PGconn* conn); // call until it returns null
523 
524 	int PQenterPipelineMode(PGconn* conn); // returns 1 on success
525 	int PQexitPipelineMode(PGconn* conn); // ditto
526 	PGpipelineStatus PQpipelineStatus(const PGconn* conn);
527 	enum PGpipelineStatus {
528 		// FIXME: confirm values
529 		PQ_PIPELINE_ON,
530 		PQ_PIPELINE_OFF,
531 		PQ_PIPELINE_ABORTED
532 	}
533 	int PQpipelineSync(PGconn* conn);
534 	int PQsendPipelineSync(PGconn* conn);
535 	int PQsendFlushRequest(PGconn* conn);
536 
537 	int PQconsumeInput(PGconn* conn);
538 	int PQisBusy(PGconn* conn);
539 
540 	int PQsetnonblocking(PGconn* conn, int arg);
541 	int PQflush(PGconn* conn); // if returns 1, wait for socket readiness
542 
543 	int PQsocket(const PGconn* conn); // returns a fd
544 }
545 
546 /*
547 import std.stdio;
548 void main() {
549 	auto db = new PostgreSql("dbname = test");
550 
551 	db.query("INSERT INTO users (id, name) values (?, ?)", 30, "hello mang");
552 
553 	foreach(line; db.query("SELECT * FROM users")) {
554 		writeln(line[0], line["name"]);
555 	}
556 }
557 */