1 /++
2 	Uses libpq implement the [arsd.database.Database] interface.
3 
4 	Requires the official pq library from Postgres to be installed to build
5 	and to use. Note that on Windows, it is often distributed as `libpq.lib`.
6 	You will have to copy or rename that to `pq.lib` for dub or dmd to automatically
7 	find it. You will also likely need to add the lib search path yourself on
8 	both Windows and Linux systems (on my Linux box, it is `-L-L/usr/local/pgsql/lib`
9 	to dmd. You can also list things your app's dub.json's lflags too. Note on the
10 	Microsoft linker, the flag is called `/LIBPATH`.)
11 
12 	For example, for the default Postgres install on Windows, try:
13 
14 	```
15 		"lflags-windows": [ "/LIBPATH:C:/Program Files/PostgreSQL/<VERSION>/lib" ],
16 	```
17 
18 	In your dub.json.
19 
20 	When you distribute your application, the user will want to install libpq client on
21 	Linux, and on Windows, you may want to include the libpq.dll in your distribution.
22 	Note it may also depend on OpenSSL ssl and crypto dlls and libintl.dll as well. These
23 	should be found in the PostgreSQL lib and/or bin folders (check them both!).
24 +/
25 module arsd.postgres;
26 
27 version(Windows)
28 	pragma(lib, "libpq");
29 else
30 	pragma(lib, "pq");
31 
32 public import arsd.database;
33 
34 import std.string;
35 import std.exception;
36 
37 // remember to CREATE DATABASE name WITH ENCODING 'utf8'
38 //
39 // http://www.postgresql.org/docs/8.0/static/libpq-exec.html
40 // ExecParams, PQPrepare, PQExecPrepared
41 //
42 // SQL: `DEALLOCATE name` is how to dealloc a prepared statement.
43 
44 /++
45 	The PostgreSql implementation of the [Database] interface.
46 
47 	You should construct this class, but then use it through the
48 	interface functions.
49 
50 	---
51 	auto db = new PostgreSql("dbname=name");
52 	foreach(row; db.query("SELECT id, data FROM table_name"))
53 		writeln(row[0], " = ", row[1]);
54 	---
55 +/
56 class PostgreSql : Database {
57 	/// `dbname=your_database_name` is probably the most common connection string. See section "33.1.1.1. Keyword/Value Connection Strings" on https://www.postgresql.org/docs/10/libpq-connect.html
58 	this(string connectionString) {
59 		this.connectionString = connectionString;
60 		conn = PQconnectdb(toStringz(connectionString));
61 		if(conn is null)
62 			throw new DatabaseException("Unable to allocate PG connection object");
63 		if(PQstatus(conn) != CONNECTION_OK)
64 			throw new DatabaseException(error());
65 		query("SET NAMES 'utf8'"); // D does everything with utf8
66 	}
67 
68 	string connectionString;
69 
70 	~this() {
71 		PQfinish(conn);
72 	}
73 
74 	string sysTimeToValue(SysTime s) {
75 		return "'" ~ escape(s.toISOExtString()) ~ "'::timestamptz";
76 	}
77 
78 	/**
79 		Prepared statement support
80 
81 		This will be added to the Database interface eventually in some form,
82 		but first I need to implement it for all my providers.
83 
84 		The common function of those 4 will be what I put in the interface.
85 	*/
86 
87 	ResultSet executePreparedStatement(T...)(string name, T args) {
88 		const(char)*[args.length] argsStrings;
89 
90 		foreach(idx, arg; args) {
91 			// FIXME: optimize to remove allocations here
92 			import std.conv;
93 			static if(!is(typeof(arg) == typeof(null)))
94 				argsStrings[idx] = toStringz(to!string(arg));
95 			// else make it null
96 		}
97 
98 		auto res = PQexecPrepared(conn, toStringz(name), argsStrings.length, argsStrings.ptr, null, null, 0);
99 
100 		int ress = PQresultStatus(res);
101 		if(ress != PGRES_TUPLES_OK
102 			&& ress != PGRES_COMMAND_OK)
103 			throw new DatabaseException(error());
104 
105 		return new PostgresResult(res);
106 
107 	}
108 
109 	///
110 	override void startTransaction() {
111 		query("START TRANSACTION");
112 	}
113 
114 	ResultSet queryImpl(string sql, Variant[] args...) {
115 		sql = escapedVariants(this, sql, args);
116 
117 		bool first_retry = true;
118 
119 		retry:
120 
121 		auto res = PQexec(conn, toStringz(sql));
122 		int ress = PQresultStatus(res);
123 		// https://www.postgresql.org/docs/current/libpq-exec.html
124 		// FIXME: PQresultErrorField can get a lot more info in a more structured way
125 		if(ress != PGRES_TUPLES_OK
126 			&& ress != PGRES_COMMAND_OK)
127 		{
128 			if(first_retry && error() == "no connection to the server\n") {
129 				first_retry = false;
130 				// try to reconnect...
131 				PQfinish(conn);
132 				conn = PQconnectdb(toStringz(connectionString));
133 				if(conn is null)
134 					throw new DatabaseException("Unable to allocate PG connection object");
135 				if(PQstatus(conn) != CONNECTION_OK)
136 					throw new DatabaseException(error());
137 				goto retry;
138 			}
139 			throw new DatabaseException(error());
140 		}
141 
142 		return new PostgresResult(res);
143 	}
144 
145 	string escape(string sqlData) {
146 		char* buffer = (new char[sqlData.length * 2 + 1]).ptr;
147 		ulong size = PQescapeString (buffer, sqlData.ptr, sqlData.length);
148 
149 		string ret = assumeUnique(buffer[0.. cast(size_t) size]);
150 
151 		return ret;
152 	}
153 
154 	string escapeBinaryString(const(ubyte)[] data) {
155 		// must include '\x ... ' here
156 		size_t len;
157 		char* buf = PQescapeByteaConn(conn, data.ptr, data.length, &len);
158 		if(buf is null)
159 			throw new Exception("pgsql out of memory escaping binary string");
160 
161 		string res;
162 		if(len == 0)
163 			res = "''";
164 		else
165 			res = cast(string) ("'" ~ buf[0 .. len - 1] ~ "'"); // gotta cut the zero terminator off
166 
167 		PQfreemem(buf);
168 
169 		return res;
170 	}
171 
172 
173 	///
174 	string error() {
175 		return copyCString(PQerrorMessage(conn));
176 	}
177 
178 	private:
179 		PGconn* conn;
180 }
181 
182 ///
183 class PostgresResult : ResultSet {
184 	// name for associative array to result index
185 	int getFieldIndex(string field) {
186 		if(mapping is null)
187 			makeFieldMapping();
188 		field = field.toLower;
189 		if(field in mapping)
190 			return mapping[field];
191 		else throw new Exception("no mapping " ~ field);
192 	}
193 
194 
195 	string[] fieldNames() {
196 		if(mapping is null)
197 			makeFieldMapping();
198 		return columnNames;
199 	}
200 
201 	// this is a range that can offer other ranges to access it
202 	bool empty() {
203 		return position == numRows;
204 	}
205 
206 	Row front() {
207 		return row;
208 	}
209 
210 	int affectedRows() @system {
211 		auto g = PQcmdTuples(res);
212 		if(g is null)
213 			return 0;
214 		int num;
215 		while(*g) {
216 			num *= 10;
217 			num += *g - '0';
218 			g++;
219 		}
220 		return num;
221 	}
222 
223 	void popFront() {
224 		position++;
225 		if(position < numRows)
226 			fetchNext();
227 	}
228 
229 	override size_t length() {
230 		return numRows;
231 	}
232 
233 	this(PGresult* res) {
234 		this.res = res;
235 		numFields = PQnfields(res);
236 		numRows = PQntuples(res);
237 
238 		if(numRows)
239 			fetchNext();
240 	}
241 
242 	~this() {
243 		PQclear(res);
244 	}
245 
246 	private:
247 		PGresult* res;
248 		int[string] mapping;
249 		string[] columnNames;
250 		int numFields;
251 
252 		int position;
253 
254 		int numRows;
255 
256 		Row row;
257 
258 		void fetchNext() {
259 			Row r;
260 			r.resultSet = this;
261 			DatabaseDatum[] row;
262 
263 			for(int i = 0; i < numFields; i++) {
264 				string a;
265 
266 				if(PQgetisnull(res, position, i))
267 					a = null;
268 				else {
269 					switch(PQfformat(res, i)) {
270 						case 0: // text representation
271 							switch(PQftype(res, i)) {
272 								case BYTEAOID:
273 									size_t len;
274 									char* c = PQunescapeBytea(PQgetvalue(res, position, i), &len);
275 
276 									a = cast(string) c[0 .. len].idup;
277 
278 									PQfreemem(c);
279 								break;
280 								default:
281 									a = copyCString(PQgetvalue(res, position, i), PQgetlength(res, position, i));
282 							}
283 						break;
284 						case 1: // binary representation
285 							throw new Exception("unexpected format returned by pq");
286 						default:
287 							throw new Exception("unknown pq format");
288 					}
289 
290 				}
291 				row ~= DatabaseDatum(a);
292 			}
293 
294 			r.row = row;
295 			this.row = r;
296 		}
297 
298 		void makeFieldMapping() {
299 			for(int i = 0; i < numFields; i++) {
300 				string a = copyCString(PQfname(res, i));
301 
302 				columnNames ~= a;
303 				mapping[a] = i;
304 			}
305 
306 		}
307 }
308 
309 string copyCString(const char* c, int actualLength = -1) @system {
310 	const(char)* a = c;
311 	if(a is null)
312 		return null;
313 
314 	string ret;
315 	if(actualLength == -1)
316 		while(*a) {
317 			ret ~= *a;
318 			a++;
319 		}
320 	else {
321 		ret = a[0..actualLength].idup;
322 	}
323 
324 	return ret;
325 }
326 
327 extern(C) {
328 	struct PGconn {};
329 	struct PGresult {};
330 
331 	void PQfinish(PGconn*);
332 	PGconn* PQconnectdb(const char*);
333 
334 	int PQstatus(PGconn*); // FIXME check return value
335 
336 	const (char*) PQerrorMessage(PGconn*);
337 
338 	PGresult* PQexec(PGconn*, const char*);
339 	void PQclear(PGresult*);
340 
341 	PGresult* PQprepare(PGconn*, const char* stmtName, const char* query, int nParams, const void* paramTypes);
342 
343 	PGresult* PQexecPrepared(PGconn*, const char* stmtName, int nParams, const char** paramValues, const int* paramLengths, const int* paramFormats, int resultFormat);
344 
345 	int PQresultStatus(PGresult*); // FIXME check return value
346 
347 	int PQnfields(PGresult*); // number of fields in a result
348 	const(char*) PQfname(PGresult*, int); // name of field
349 
350 	int PQntuples(PGresult*); // number of rows in result
351 	const(char*) PQgetvalue(PGresult*, int row, int column);
352 
353 	size_t PQescapeString (char *to, const char *from, size_t length);
354 
355 	enum int CONNECTION_OK = 0;
356 	enum int PGRES_COMMAND_OK = 1;
357 	enum int PGRES_TUPLES_OK = 2;
358 
359 	int PQgetlength(const PGresult *res,
360 			int row_number,
361 			int column_number);
362 	int PQgetisnull(const PGresult *res,
363 			int row_number,
364 			int column_number);
365 
366 	int PQfformat(const PGresult *res, int column_number);
367 
368 	alias Oid = int;
369 	enum BYTEAOID = 17;
370 	Oid PQftype(const PGresult* res, int column_number);
371 
372 	char *PQescapeByteaConn(PGconn *conn,
373                                  const ubyte *from,
374                                  size_t from_length,
375                                  size_t *to_length);
376 	char *PQunescapeBytea(const char *from, size_t *to_length);
377 	void PQfreemem(void *ptr);
378 
379 	char* PQcmdTuples(PGresult *res);
380 
381 }
382 
383 /*
384 import std.stdio;
385 void main() {
386 	auto db = new PostgreSql("dbname = test");
387 
388 	db.query("INSERT INTO users (id, name) values (?, ?)", 30, "hello mang");
389 
390 	foreach(line; db.query("SELECT * FROM users")) {
391 		writeln(line[0], line["name"]);
392 	}
393 }
394 */