1 /++
2 	Uses libpq implement the [arsd.database.Database] interface.
3 
4 	Requires the official pq library from Postgres to be installed to build
5 	and to use. Note that on Windows, it is often distributed as `libpq.lib`.
6 	You will have to copy or rename that to `pq.lib` for dub or dmd to automatically
7 	find it. You will also likely need to add the lib search path yourself on
8 	both Windows and Linux systems (on my Linux box, it is `-L-L/usr/local/pgsql/lib`
9 	to dmd. You can also list things your app's dub.json's lflags too. Note on the
10 	Microsoft linker, the flag is called `/LIBPATH`.)
11 
12 	For example, for the default Postgres install on Windows, try:
13 
14 	```
15 		"lflags-windows": [ "/LIBPATH:C:/Program Files/PostgreSQL/<VERSION>/lib" ],
16 	```
17 
18 	In your dub.json.
19 
20 	When you distribute your application, the user will want to install libpq client on
21 	Linux, and on Windows, you may want to include the libpq.dll in your distribution.
22 	Note it may also depend on OpenSSL ssl and crypto dlls and libintl.dll as well. These
23 	should be found in the PostgreSQL lib and/or bin folders (check them both!).
24 +/
25 module arsd.postgres;
26 
27 version(Windows)
28 	pragma(lib, "libpq");
29 else
30 	pragma(lib, "pq");
31 
32 public import arsd.database;
33 
34 import std.string;
35 import std.exception;
36 
37 // remember to CREATE DATABASE name WITH ENCODING 'utf8'
38 //
39 // http://www.postgresql.org/docs/8.0/static/libpq-exec.html
40 // ExecParams, PQPrepare, PQExecPrepared
41 //
42 // SQL: `DEALLOCATE name` is how to dealloc a prepared statement.
43 
44 /++
45 	The PostgreSql implementation of the [Database] interface.
46 
47 	You should construct this class, but then use it through the
48 	interface functions.
49 
50 	---
51 	auto db = new PostgreSql("dbname=name");
52 	foreach(row; db.query("SELECT id, data FROM table_name"))
53 		writeln(row[0], " = ", row[1]);
54 	---
55 +/
56 class PostgreSql : Database {
57 	/// `dbname=your_database_name` is probably the most common connection string. See section "33.1.1.1. Keyword/Value Connection Strings" on https://www.postgresql.org/docs/10/libpq-connect.html
58 	this(string connectionString) {
59 		this.connectionString = connectionString;
60 		conn = PQconnectdb(toStringz(connectionString));
61 		if(conn is null)
62 			throw new DatabaseException("Unable to allocate PG connection object");
63 		if(PQstatus(conn) != CONNECTION_OK)
64 			throw new DatabaseException(error());
65 		query("SET NAMES 'utf8'"); // D does everything with utf8
66 	}
67 
68 	string connectionString;
69 
70 	~this() {
71 		PQfinish(conn);
72 	}
73 
74 	string sysTimeToValue(SysTime s) {
75 		return "'" ~ escape(s.toISOExtString()) ~ "'::timestamptz";
76 	}
77 
78 	/**
79 		Prepared statement support
80 
81 		This will be added to the Database interface eventually in some form,
82 		but first I need to implement it for all my providers.
83 
84 		The common function of those 4 will be what I put in the interface.
85 	*/
86 
87 	ResultSet executePreparedStatement(T...)(string name, T args) {
88 		const(char)*[args.length] argsStrings;
89 
90 		foreach(idx, arg; args) {
91 			// FIXME: optimize to remove allocations here
92 			import std.conv;
93 			static if(!is(typeof(arg) == typeof(null)))
94 				argsStrings[idx] = toStringz(to!string(arg));
95 			// else make it null
96 		}
97 
98 		auto res = PQexecPrepared(conn, toStringz(name), argsStrings.length, argsStrings.ptr, null, null, 0);
99 
100 		int ress = PQresultStatus(res);
101 		if(ress != PGRES_TUPLES_OK
102 			&& ress != PGRES_COMMAND_OK)
103 			throw new DatabaseException(error());
104 
105 		return new PostgresResult(res);
106 
107 	}
108 
109 	///
110 	override void startTransaction() {
111 		query("START TRANSACTION");
112 	}
113 
114 	ResultSet queryImpl(string sql, Variant[] args...) {
115 		sql = escapedVariants(this, sql, args);
116 
117 		bool first_retry = true;
118 
119 		retry:
120 
121 		auto res = PQexec(conn, toStringz(sql));
122 		int ress = PQresultStatus(res);
123 		// https://www.postgresql.org/docs/current/libpq-exec.html
124 		// FIXME: PQresultErrorField can get a lot more info in a more structured way
125 		if(ress != PGRES_TUPLES_OK
126 			&& ress != PGRES_COMMAND_OK)
127 		{
128 			if(first_retry && error() == "no connection to the server\n") {
129 				first_retry = false;
130 				// try to reconnect...
131 				PQfinish(conn);
132 				conn = PQconnectdb(toStringz(connectionString));
133 				if(conn is null)
134 					throw new DatabaseException("Unable to allocate PG connection object");
135 				if(PQstatus(conn) != CONNECTION_OK)
136 					throw new DatabaseException(error());
137 				goto retry;
138 			}
139 			throw new DatabaseException(error());
140 		}
141 
142 		return new PostgresResult(res);
143 	}
144 
145 	string escape(string sqlData) {
146 		char* buffer = (new char[sqlData.length * 2 + 1]).ptr;
147 		ulong size = PQescapeString (buffer, sqlData.ptr, sqlData.length);
148 
149 		string ret = assumeUnique(buffer[0.. cast(size_t) size]);
150 
151 		return ret;
152 	}
153 
154 	string escapeBinaryString(const(ubyte)[] data) {
155 		// must include '\x ... ' here
156 		size_t len;
157 		char* buf = PQescapeByteaConn(conn, data.ptr, data.length, &len);
158 		if(buf is null)
159 			throw new Exception("pgsql out of memory escaping binary string");
160 
161 		string res;
162 		if(len == 0)
163 			res = "''";
164 		else
165 			res = cast(string) ("'" ~ buf[0 .. len - 1] ~ "'"); // gotta cut the zero terminator off
166 
167 		PQfreemem(buf);
168 
169 		return res;
170 	}
171 
172 
173 	///
174 	string error() {
175 		return copyCString(PQerrorMessage(conn));
176 	}
177 
178 	private:
179 		PGconn* conn;
180 }
181 
182 private string toLowerFast(string s) {
183 	import std.ascii : isUpper;
184 	foreach (c; s)
185 		if (c >= 0x80 || isUpper(c))
186 			return toLower(s);
187 	return s;
188 }
189 
190 ///
191 class PostgresResult : ResultSet {
192 	// name for associative array to result index
193 	int getFieldIndex(string field) {
194 		if(mapping is null)
195 			makeFieldMapping();
196 		field = field.toLowerFast;
197 		if(field in mapping)
198 			return mapping[field];
199 		else throw new Exception("no mapping " ~ field);
200 	}
201 
202 
203 	string[] fieldNames() {
204 		if(mapping is null)
205 			makeFieldMapping();
206 		return columnNames;
207 	}
208 
209 	// this is a range that can offer other ranges to access it
210 	bool empty() {
211 		return position == numRows;
212 	}
213 
214 	Row front() {
215 		return row;
216 	}
217 
218 	int affectedRows() @system {
219 		auto g = PQcmdTuples(res);
220 		if(g is null)
221 			return 0;
222 		int num;
223 		while(*g) {
224 			num *= 10;
225 			num += *g - '0';
226 			g++;
227 		}
228 		return num;
229 	}
230 
231 	void popFront() {
232 		position++;
233 		if(position < numRows)
234 			fetchNext();
235 	}
236 
237 	override size_t length() {
238 		return numRows;
239 	}
240 
241 	this(PGresult* res) {
242 		this.res = res;
243 		numFields = PQnfields(res);
244 		numRows = PQntuples(res);
245 
246 		if(numRows)
247 			fetchNext();
248 	}
249 
250 	~this() {
251 		PQclear(res);
252 	}
253 
254 	private:
255 		PGresult* res;
256 		int[string] mapping;
257 		string[] columnNames;
258 		int numFields;
259 
260 		int position;
261 
262 		int numRows;
263 
264 		Row row;
265 
266 		void fetchNext() {
267 			Row r;
268 			r.resultSet = this;
269 			DatabaseDatum[] row;
270 
271 			for(int i = 0; i < numFields; i++) {
272 				string a;
273 
274 				if(PQgetisnull(res, position, i))
275 					a = null;
276 				else {
277 					switch(PQfformat(res, i)) {
278 						case 0: // text representation
279 							switch(PQftype(res, i)) {
280 								case BYTEAOID:
281 									size_t len;
282 									char* c = PQunescapeBytea(PQgetvalue(res, position, i), &len);
283 
284 									a = cast(string) c[0 .. len].idup;
285 
286 									PQfreemem(c);
287 								break;
288 								default:
289 									a = copyCString(PQgetvalue(res, position, i), PQgetlength(res, position, i));
290 							}
291 						break;
292 						case 1: // binary representation
293 							throw new Exception("unexpected format returned by pq");
294 						default:
295 							throw new Exception("unknown pq format");
296 					}
297 
298 				}
299 				row ~= DatabaseDatum(a);
300 			}
301 
302 			r.row = row;
303 			this.row = r;
304 		}
305 
306 		void makeFieldMapping() {
307 			for(int i = 0; i < numFields; i++) {
308 				string a = copyCString(PQfname(res, i));
309 
310 				columnNames ~= a;
311 				mapping[a] = i;
312 			}
313 
314 		}
315 }
316 
317 string copyCString(const char* c, int actualLength = -1) @system {
318 	const(char)* a = c;
319 	if(a is null)
320 		return null;
321 
322 	string ret;
323 	if(actualLength == -1)
324 		while(*a) {
325 			ret ~= *a;
326 			a++;
327 		}
328 	else {
329 		ret = a[0..actualLength].idup;
330 	}
331 
332 	return ret;
333 }
334 
335 extern(C) {
336 	struct PGconn {};
337 	struct PGresult {};
338 
339 	void PQfinish(PGconn*);
340 	PGconn* PQconnectdb(const char*);
341 
342 	int PQstatus(PGconn*); // FIXME check return value
343 
344 	const (char*) PQerrorMessage(PGconn*);
345 
346 	PGresult* PQexec(PGconn*, const char*);
347 	void PQclear(PGresult*);
348 
349 	PGresult* PQprepare(PGconn*, const char* stmtName, const char* query, int nParams, const void* paramTypes);
350 
351 	PGresult* PQexecPrepared(PGconn*, const char* stmtName, int nParams, const char** paramValues, const int* paramLengths, const int* paramFormats, int resultFormat);
352 
353 	int PQresultStatus(PGresult*); // FIXME check return value
354 
355 	int PQnfields(PGresult*); // number of fields in a result
356 	const(char*) PQfname(PGresult*, int); // name of field
357 
358 	int PQntuples(PGresult*); // number of rows in result
359 	const(char*) PQgetvalue(PGresult*, int row, int column);
360 
361 	size_t PQescapeString (char *to, const char *from, size_t length);
362 
363 	enum int CONNECTION_OK = 0;
364 	enum int PGRES_COMMAND_OK = 1;
365 	enum int PGRES_TUPLES_OK = 2;
366 
367 	int PQgetlength(const PGresult *res,
368 			int row_number,
369 			int column_number);
370 	int PQgetisnull(const PGresult *res,
371 			int row_number,
372 			int column_number);
373 
374 	int PQfformat(const PGresult *res, int column_number);
375 
376 	alias Oid = int;
377 	enum BYTEAOID = 17;
378 	Oid PQftype(const PGresult* res, int column_number);
379 
380 	char *PQescapeByteaConn(PGconn *conn,
381                                  const ubyte *from,
382                                  size_t from_length,
383                                  size_t *to_length);
384 	char *PQunescapeBytea(const char *from, size_t *to_length);
385 	void PQfreemem(void *ptr);
386 
387 	char* PQcmdTuples(PGresult *res);
388 
389 }
390 
391 /*
392 import std.stdio;
393 void main() {
394 	auto db = new PostgreSql("dbname = test");
395 
396 	db.query("INSERT INTO users (id, name) values (?, ?)", 30, "hello mang");
397 
398 	foreach(line; db.query("SELECT * FROM users")) {
399 		writeln(line[0], line["name"]);
400 	}
401 }
402 */