1 /++
2 	Uses libpq implement the [arsd.database.Database] interface.
3 
4 	Requires the official pq library from Postgres to be installed to build
5 	and to use. Note that on Windows, it is often distributed as `libpq.lib`.
6 	You will have to copy or rename that to `pq.lib` for dub or dmd to automatically
7 	find it. You will also likely need to add the lib search path yourself on
8 	both Windows and Linux systems (on my Linux box, it is `-L-L/usr/local/pgsql/lib`
9 	to dmd. You can also list things your app's dub.json's lflags too. Note on the
10 	Microsoft linker, the flag is called `/LIBPATH`.)
11 
12 	For example, for the default Postgres install on Windows, try:
13 
14 	```
15 		"lflags-windows": [ "/LIBPATH:C:/Program Files/PostgreSQL/<VERSION>/lib" ],
16 	```
17 
18 	In your dub.json.
19 
20 	When you distribute your application, the user will want to install libpq client on
21 	Linux, and on Windows, you may want to include the libpq.dll in your distribution.
22 	Note it may also depend on OpenSSL ssl and crypto dlls and libintl.dll as well. These
23 	should be found in the PostgreSQL lib and/or bin folders (check them both!).
24 +/
25 module arsd.postgres;
26 
27 version(Windows)
28 	pragma(lib, "libpq");
29 else
30 	pragma(lib, "pq");
31 
32 public import arsd.database;
33 
34 import std.string;
35 import std.exception;
36 
37 // remember to CREATE DATABASE name WITH ENCODING 'utf8'
38 //
39 // http://www.postgresql.org/docs/8.0/static/libpq-exec.html
40 // ExecParams, PQPrepare, PQExecPrepared
41 //
42 // SQL: `DEALLOCATE name` is how to dealloc a prepared statement.
43 
44 /++
45 	The PostgreSql implementation of the [Database] interface.
46 
47 	You should construct this class, but then use it through the
48 	interface functions.
49 
50 	---
51 	auto db = new PostgreSql("dbname=name");
52 	foreach(row; db.query("SELECT id, data FROM table_name"))
53 		writeln(row[0], " = ", row[1]);
54 	---
55 +/
56 class PostgreSql : Database {
57 	/// `dbname=your_database_name` is probably the most common connection string. See section "33.1.1.1. Keyword/Value Connection Strings" on https://www.postgresql.org/docs/10/libpq-connect.html
58 	this(string connectionString) {
59 		this.connectionString = connectionString;
60 		conn = PQconnectdb(toStringz(connectionString));
61 		if(conn is null)
62 			throw new DatabaseException("Unable to allocate PG connection object");
63 		if(PQstatus(conn) != CONNECTION_OK)
64 			throw new DatabaseException(error());
65 		query("SET NAMES 'utf8'"); // D does everything with utf8
66 	}
67 
68 	string connectionString;
69 
70 	~this() {
71 		PQfinish(conn);
72 	}
73 
74 	string sysTimeToValue(SysTime s) {
75 		return "'" ~ escape(s.toISOExtString()) ~ "'::timestamptz";
76 	}
77 
78 	/**
79 		Prepared statement support
80 
81 		This will be added to the Database interface eventually in some form,
82 		but first I need to implement it for all my providers.
83 
84 		The common function of those 4 will be what I put in the interface.
85 	*/
86 
87 	ResultSet executePreparedStatement(T...)(string name, T args) {
88 		char*[args.length] argsStrings;
89 
90 		foreach(idx, arg; args) {
91 			// FIXME: optimize to remove allocations here
92 			static if(!is(typeof(arg) == typeof(null)))
93 				argsStrings[idx] = toStringz(to!string(arg));
94 			// else make it null
95 		}
96 
97 		auto res = PQexecPrepared(conn, toStringz(name), argsStrings.length, argStrings.ptr, 0, null, 0);
98 
99 		int ress = PQresultStatus(res);
100 		if(ress != PGRES_TUPLES_OK
101 			&& ress != PGRES_COMMAND_OK)
102 			throw new DatabaseException(error());
103 
104 		return new PostgresResult(res);
105 
106 	}
107 
108 	///
109 	override void startTransaction() {
110 		query("START TRANSACTION");
111 	}
112 
113 	ResultSet queryImpl(string sql, Variant[] args...) {
114 		sql = escapedVariants(this, sql, args);
115 
116 		bool first_retry = true;
117 
118 		retry:
119 
120 		auto res = PQexec(conn, toStringz(sql));
121 		int ress = PQresultStatus(res);
122 		// https://www.postgresql.org/docs/current/libpq-exec.html
123 		// FIXME: PQresultErrorField can get a lot more info in a more structured way
124 		if(ress != PGRES_TUPLES_OK
125 			&& ress != PGRES_COMMAND_OK)
126 		{
127 			if(first_retry && error() == "no connection to the server\n") {
128 				first_retry = false;
129 				// try to reconnect...
130 				PQfinish(conn);
131 				conn = PQconnectdb(toStringz(connectionString));
132 				if(conn is null)
133 					throw new DatabaseException("Unable to allocate PG connection object");
134 				if(PQstatus(conn) != CONNECTION_OK)
135 					throw new DatabaseException(error());
136 				goto retry;
137 			}
138 			throw new DatabaseException(error());
139 		}
140 
141 		return new PostgresResult(res);
142 	}
143 
144 	string escape(string sqlData) {
145 		char* buffer = (new char[sqlData.length * 2 + 1]).ptr;
146 		ulong size = PQescapeString (buffer, sqlData.ptr, sqlData.length);
147 
148 		string ret = assumeUnique(buffer[0.. cast(size_t) size]);
149 
150 		return ret;
151 	}
152 
153 	string escapeBinaryString(const(ubyte)[] data) {
154 		// must include '\x ... ' here
155 		size_t len;
156 		char* buf = PQescapeByteaConn(conn, data.ptr, data.length, &len);
157 		if(buf is null)
158 			throw new Exception("pgsql out of memory escaping binary string");
159 
160 		string res;
161 		if(len == 0)
162 			res = "''";
163 		else
164 			res = cast(string) ("'" ~ buf[0 .. len - 1] ~ "'"); // gotta cut the zero terminator off
165 
166 		PQfreemem(buf);
167 
168 		return res;
169 	}
170 
171 
172 	///
173 	string error() {
174 		return copyCString(PQerrorMessage(conn));
175 	}
176 
177 	private:
178 		PGconn* conn;
179 }
180 
181 ///
182 class PostgresResult : ResultSet {
183 	// name for associative array to result index
184 	int getFieldIndex(string field) {
185 		if(mapping is null)
186 			makeFieldMapping();
187 		field = field.toLower;
188 		if(field in mapping)
189 			return mapping[field];
190 		else throw new Exception("no mapping " ~ field);
191 	}
192 
193 
194 	string[] fieldNames() {
195 		if(mapping is null)
196 			makeFieldMapping();
197 		return columnNames;
198 	}
199 
200 	// this is a range that can offer other ranges to access it
201 	bool empty() {
202 		return position == numRows;
203 	}
204 
205 	Row front() {
206 		return row;
207 	}
208 
209 	int affectedRows() {
210 		auto g = PQcmdTuples(res);
211 		if(g is null)
212 			return 0;
213 		int num;
214 		while(*g) {
215 			num *= 10;
216 			num += *g - '0';
217 			g++;
218 		}
219 		return num;
220 	}
221 
222 	void popFront() {
223 		position++;
224 		if(position < numRows)
225 			fetchNext();
226 	}
227 
228 	override size_t length() {
229 		return numRows;
230 	}
231 
232 	this(PGresult* res) {
233 		this.res = res;
234 		numFields = PQnfields(res);
235 		numRows = PQntuples(res);
236 
237 		if(numRows)
238 			fetchNext();
239 	}
240 
241 	~this() {
242 		PQclear(res);
243 	}
244 
245 	private:
246 		PGresult* res;
247 		int[string] mapping;
248 		string[] columnNames;
249 		int numFields;
250 
251 		int position;
252 
253 		int numRows;
254 
255 		Row row;
256 
257 		void fetchNext() {
258 			Row r;
259 			r.resultSet = this;
260 			DatabaseDatum[] row;
261 
262 			for(int i = 0; i < numFields; i++) {
263 				string a;
264 
265 				if(PQgetisnull(res, position, i))
266 					a = null;
267 				else {
268 					switch(PQfformat(res, i)) {
269 						case 0: // text representation
270 							switch(PQftype(res, i)) {
271 								case BYTEAOID:
272 									size_t len;
273 									char* c = PQunescapeBytea(PQgetvalue(res, position, i), &len);
274 
275 									a = cast(string) c[0 .. len].idup;
276 
277 									PQfreemem(c);
278 								break;
279 								default:
280 									a = copyCString(PQgetvalue(res, position, i), PQgetlength(res, position, i));
281 							}
282 						break;
283 						case 1: // binary representation
284 							throw new Exception("unexpected format returned by pq");
285 						default:
286 							throw new Exception("unknown pq format");
287 					}
288 
289 				}
290 				row ~= DatabaseDatum(a);
291 			}
292 
293 			r.row = row;
294 			this.row = r;
295 		}
296 
297 		void makeFieldMapping() {
298 			for(int i = 0; i < numFields; i++) {
299 				string a = copyCString(PQfname(res, i));
300 
301 				columnNames ~= a;
302 				mapping[a] = i;
303 			}
304 
305 		}
306 }
307 
308 string copyCString(const char* c, int actualLength = -1) {
309 	const(char)* a = c;
310 	if(a is null)
311 		return null;
312 
313 	string ret;
314 	if(actualLength == -1)
315 		while(*a) {
316 			ret ~= *a;
317 			a++;
318 		}
319 	else {
320 		ret = a[0..actualLength].idup;
321 	}
322 
323 	return ret;
324 }
325 
326 extern(C) {
327 	struct PGconn {};
328 	struct PGresult {};
329 
330 	void PQfinish(PGconn*);
331 	PGconn* PQconnectdb(const char*);
332 
333 	int PQstatus(PGconn*); // FIXME check return value
334 
335 	const (char*) PQerrorMessage(PGconn*);
336 
337 	PGresult* PQexec(PGconn*, const char*);
338 	void PQclear(PGresult*);
339 
340 	PGresult* PQprepare(PGconn*, const char* stmtName, const char* query, int nParams, const void* paramTypes);
341 
342 	PGresult* PQexecPrepared(PGconn*, const char* stmtName, int nParams, const char** paramValues, const int* paramLengths, const int* paramFormats, int resultFormat);
343 
344 	int PQresultStatus(PGresult*); // FIXME check return value
345 
346 	int PQnfields(PGresult*); // number of fields in a result
347 	const(char*) PQfname(PGresult*, int); // name of field
348 
349 	int PQntuples(PGresult*); // number of rows in result
350 	const(char*) PQgetvalue(PGresult*, int row, int column);
351 
352 	size_t PQescapeString (char *to, const char *from, size_t length);
353 
354 	enum int CONNECTION_OK = 0;
355 	enum int PGRES_COMMAND_OK = 1;
356 	enum int PGRES_TUPLES_OK = 2;
357 
358 	int PQgetlength(const PGresult *res,
359 			int row_number,
360 			int column_number);
361 	int PQgetisnull(const PGresult *res,
362 			int row_number,
363 			int column_number);
364 
365 	int PQfformat(const PGresult *res, int column_number);
366 
367 	alias Oid = int;
368 	enum BYTEAOID = 17;
369 	Oid PQftype(const PGresult* res, int column_number);
370 
371 	char *PQescapeByteaConn(PGconn *conn,
372                                  const ubyte *from,
373                                  size_t from_length,
374                                  size_t *to_length);
375 	char *PQunescapeBytea(const char *from, size_t *to_length);
376 	void PQfreemem(void *ptr);
377 
378 	char* PQcmdTuples(PGresult *res);
379 
380 }
381 
382 /*
383 import std.stdio;
384 void main() {
385 	auto db = new PostgreSql("dbname = test");
386 
387 	db.query("INSERT INTO users (id, name) values (?, ?)", 30, "hello mang");
388 
389 	foreach(line; db.query("SELECT * FROM users")) {
390 		writeln(line[0], line["name"]);
391 	}
392 }
393 */