1 /++
	Uses libpq to implement the [arsd.database.Database] interface.
3 
4 	Requires the official pq library from Postgres to be installed to build
5 	and to use. Note that on Windows, it is often distributed as `libpq.lib`.
6 	You will have to copy or rename that to `pq.lib` for dub or dmd to automatically
7 	find it. You will also likely need to add the lib search path yourself on
	both Windows and Linux systems (on my Linux box, it is `-L-L/usr/local/pgsql/lib`
	passed to dmd. You can also list these flags in your app's dub.json under lflags. Note
	that with the Microsoft linker, the flag is called `/LIBPATH`.)
11 
12 	For example, for the default Postgres install on Windows, try:
13 
14 	```
15 		"lflags-windows": [ "/LIBPATH:C:/Program Files/PostgreSQL/<VERSION>/lib" ],
16 	```
17 
18 	In your dub.json.
19 
20 	When you distribute your application, the user will want to install libpq client on
21 	Linux, and on Windows, you may want to include the libpq.dll in your distribution.
22 	Note it may also depend on OpenSSL ssl and crypto dlls and libintl.dll as well. These
23 	should be found in the PostgreSQL lib and/or bin folders (check them both!).
24 +/
25 module arsd.postgres;
26 pragma(lib, "pq");
27 
28 public import arsd.database;
29 
30 import std.string;
31 import std.exception;
32 
33 // remember to CREATE DATABASE name WITH ENCODING 'utf8'
34 //
35 // http://www.postgresql.org/docs/8.0/static/libpq-exec.html
36 // ExecParams, PQPrepare, PQExecPrepared
37 //
38 // SQL: `DEALLOCATE name` is how to dealloc a prepared statement.
39 
/++
	The PostgreSql implementation of the [Database] interface.

	You should construct this class, but then use it through the
	interface functions.

	---
	auto db = new PostgreSql("dbname=name");
	foreach(row; db.query("SELECT id, data FROM table_name"))
		writeln(row[0], " = ", row[1]);
	---
+/
class PostgreSql : Database {
	/++
		Opens a connection using a libpq connection string and switches the
		client encoding to utf8.

		`dbname=your_database_name` is probably the most common connection string. See section "33.1.1.1. Keyword/Value Connection Strings" on https://www.postgresql.org/docs/10/libpq-connect.html

		Throws:
			[DatabaseException] if the connection object cannot be allocated
			or the connection does not reach `CONNECTION_OK`.
	+/
	this(string connectionString) {
		this.connectionString = connectionString;
		conn = PQconnectdb(toStringz(connectionString));
		if(conn is null)
			throw new DatabaseException("Unable to allocate PG connection object");
		if(PQstatus(conn) != CONNECTION_OK)
			throw new DatabaseException(error());
		query("SET NAMES 'utf8'"); // D does everything with utf8
	}

	/// The connection string given to the constructor; kept so [queryImpl] can reconnect with it.
	string connectionString;

	/// Closes the libpq connection.
	~this() {
		PQfinish(conn);
	}

	/// Formats a SysTime as an escaped, quoted ISO extended string cast to `timestamptz`.
	string sysTimeToValue(SysTime s) {
		return "'" ~ escape(s.toISOExtString()) ~ "'::timestamptz";
	}

	/**
		Prepared statement support

		This will be added to the Database interface eventually in some form,
		but first I need to implement it for all my providers.

		The common function of those 4 will be what I put in the interface.

		Executes the already-prepared statement `name`, passing every argument
		in text form (converted with `to!string`). An argument that is the
		literal `null` is sent as a null pointer.

		Throws:
			[DatabaseException] carrying the connection's error message when
			the result status is neither `PGRES_TUPLES_OK` nor `PGRES_COMMAND_OK`.
	*/
	ResultSet executePreparedStatement(T...)(string name, T args) {
		import std.conv : to;

		// toStringz hands back immutable(char)*, so the slots must be pointers to const
		const(char)*[args.length] argsStrings;

		foreach(idx, arg; args) {
			// FIXME: optimize to remove allocations here
			static if(!is(typeof(arg) == typeof(null)))
				argsStrings[idx] = toStringz(to!string(arg));
			// else leave the slot as a null pointer
		}

		// null lengths / null formats: every parameter is passed as a zero-terminated text string
		auto res = PQexecPrepared(conn, toStringz(name), cast(int) args.length, argsStrings.ptr, null, null, 0);

		int ress = PQresultStatus(res);
		if(ress != PGRES_TUPLES_OK
			&& ress != PGRES_COMMAND_OK)
		{
			// grab the message first, then release the failed result so it does not leak
			auto message = error();
			PQclear(res);
			throw new DatabaseException(message);
		}

		return new PostgresResult(res);
	}

	///
	override void startTransaction() {
		query("START TRANSACTION");
	}

	/++
		Runs `sql` after substituting `args` through `escapedVariants`.

		If the first attempt fails with libpq's "no connection to the server"
		message, the connection is reopened once with the stored connection
		string and the query is retried.

		Throws:
			[DatabaseException] if the query fails, or if the reconnect fails.
	+/
	ResultSet queryImpl(string sql, Variant[] args...) {
		sql = escapedVariants(this, sql, args);

		bool reconnected = false;

		while(true) {
			auto res = PQexec(conn, toStringz(sql));
			int ress = PQresultStatus(res);
			// https://www.postgresql.org/docs/current/libpq-exec.html
			// FIXME: PQresultErrorField can get a lot more info in a more structured way
			if(ress == PGRES_TUPLES_OK || ress == PGRES_COMMAND_OK)
				return new PostgresResult(res);

			// copy the message out, then free the failed result so it does not leak
			auto message = error();
			PQclear(res);

			if(reconnected || message != "no connection to the server\n")
				throw new DatabaseException(message);

			// try to reconnect... (only once)
			reconnected = true;
			PQfinish(conn);
			conn = PQconnectdb(toStringz(connectionString));
			if(conn is null)
				throw new DatabaseException("Unable to allocate PG connection object");
			if(PQstatus(conn) != CONNECTION_OK)
				throw new DatabaseException(error());

			// the fresh connection needs the same encoding setup the constructor performs;
			// if this fails, the retried query below reports the problem
			PQclear(PQexec(conn, "SET NAMES 'utf8'"));
		}
	}

	/++
		Escapes `sqlData` with `PQescapeString` for use inside a quoted SQL
		string literal. The surrounding quotes are NOT added.
	+/
	string escape(string sqlData) {
		// buffer size is twice the input length plus one, as in the original implementation
		auto buffer = new char[sqlData.length * 2 + 1];
		size_t size = PQescapeString(buffer.ptr, sqlData.ptr, sqlData.length);

		// the buffer is never referenced again, so handing it out as immutable is safe
		return assumeUnique(buffer[0 .. size]);
	}

	/++
		Escapes binary data with `PQescapeByteaConn` and wraps it in single quotes.

		Throws:
			Exception if libpq returns null from the escape call.
	+/
	string escapeBinaryString(const(ubyte)[] data) {
		// must include '\x ... ' here
		size_t len;
		char* buf = PQescapeByteaConn(conn, data.ptr, data.length, &len);
		if(buf is null)
			throw new Exception("pgsql out of memory escaping binary string");
		// libpq owns the buffer; release it on every exit path
		scope(exit) PQfreemem(buf);

		if(len == 0)
			return "''";

		return cast(string) ("'" ~ buf[0 .. len - 1] ~ "'"); // gotta cut the zero terminator off
	}


	/// Returns a copy of the connection's most recent error message.
	string error() {
		return copyCString(PQerrorMessage(conn));
	}

	private:
		PGconn* conn;
}
176 
/++
	A [ResultSet] over a libpq `PGresult`.

	The object takes ownership of the `PGresult*` passed to the constructor
	and calls `PQclear` on it in the destructor. Each row is copied into D
	strings as the range advances.
+/
class PostgresResult : ResultSet {
	/++
		Translates a column name into its index in the result.
		The name is lower-cased before lookup.

		Throws:
			Exception if no column has that (lower-cased) name.
	+/
	int getFieldIndex(string field) {
		if(mapping is null)
			makeFieldMapping();
		field = field.toLower;
		if(auto index = field in mapping)
			return *index;
		throw new Exception("no mapping " ~ field);
	}


	/// Column names in result order, as reported by `PQfname`.
	string[] fieldNames() {
		if(mapping is null)
			makeFieldMapping();
		return columnNames;
	}

	// this is a range that can offer other ranges to access it
	bool empty() {
		return position == numRows;
	}

	/// The row at the current position.
	Row front() {
		return row;
	}

	/++
		Parses the decimal string from `PQcmdTuples` into an int.
		Returns 0 when libpq gives a null or empty string.
	+/
	int affectedRows() {
		auto g = PQcmdTuples(res);
		if(g is null)
			return 0;
		int num;
		// stop at the first non-digit so stray characters never corrupt the count
		while(*g >= '0' && *g <= '9') {
			num *= 10;
			num += *g - '0';
			g++;
		}
		return num;
	}

	/// Advances to the next row and loads it, if there is one.
	void popFront() {
		position++;
		if(position < numRows)
			fetchNext();
	}

	/// Total number of rows in the result.
	override size_t length() {
		return numRows;
	}

	/// Wraps `res` and eagerly loads the first row when the result is not empty.
	this(PGresult* res) {
		this.res = res;
		numFields = PQnfields(res);
		numRows = PQntuples(res);

		if(numRows)
			fetchNext();
	}

	~this() {
		PQclear(res);
	}

	private:
		PGresult* res;
		int[string] mapping;
		string[] columnNames;
		int numFields;

		int position;

		int numRows;

		Row row;

		/// Copies the row at `position` out of the PGresult into `this.row`.
		void fetchNext() {
			Row r;
			r.resultSet = this;

			// one allocation for the whole row; SQL NULL columns keep the default null string
			auto values = new string[numFields];

			foreach(i; 0 .. numFields) {
				if(PQgetisnull(res, position, i))
					continue;

				switch(PQfformat(res, i)) {
					case 0: // text representation
						switch(PQftype(res, i)) {
							case BYTEAOID: {
								size_t len;
								char* c = PQunescapeBytea(PQgetvalue(res, position, i), &len);
								// a null return must not be mistaken for an empty value
								if(c is null)
									throw new Exception("pgsql out of memory unescaping binary string");
								scope(exit) PQfreemem(c);

								values[i] = cast(string) c[0 .. len].idup;
							}
							break;
							default:
								values[i] = copyCString(PQgetvalue(res, position, i), PQgetlength(res, position, i));
						}
					break;
					case 1: // binary representation
						throw new Exception("unexpected format returned by pq");
					default:
						throw new Exception("unknown pq format");
				}
			}

			r.row = values;
			this.row = r;
		}

		/// Fills `columnNames` and the name-to-index `mapping` from `PQfname`.
		void makeFieldMapping() {
			foreach(i; 0 .. numFields) {
				string a = copyCString(PQfname(res, i));

				columnNames ~= a;
				mapping[a] = i;
			}
		}
}
303 
/++
	Copies a C string into a freshly allocated D string.

	Params:
		c = pointer to the source characters; may be null.
		actualLength = number of bytes to copy. Any negative value (the
			default is -1) means the length is unknown and the string is
			measured up to its zero terminator.

	Returns:
		null if `c` is null, otherwise a copy of the bytes. With an explicit
		length, embedded zero bytes are preserved.
+/
string copyCString(const char* c, int actualLength = -1) {
	import core.stdc.string : strlen;

	if(c is null)
		return null;

	// measure once and copy in a single allocation instead of appending char by char
	const size_t len = actualLength < 0 ? strlen(c) : cast(size_t) actualLength;

	return c[0 .. len].idup;
}
321 
// Hand-written bindings for the subset of libpq used by this module.
extern(C) {
	// ---- opaque handles (only ever used through pointers) ----
	struct PGconn {}
	struct PGresult {}

	// ---- types and constants ----
	alias Oid = int;

	// compared against the value returned by PQstatus
	enum int CONNECTION_OK = 0;

	// compared against the value returned by PQresultStatus
	enum int PGRES_COMMAND_OK = 1;
	enum int PGRES_TUPLES_OK = 2;

	// compared against the value returned by PQftype to detect bytea columns
	enum BYTEAOID = 17;

	// ---- connection ----
	PGconn* PQconnectdb(const char*);
	void PQfinish(PGconn*);
	int PQstatus(PGconn*); // FIXME check return value
	const(char*) PQerrorMessage(PGconn*);

	// ---- running commands ----
	PGresult* PQexec(PGconn*, const char*);
	PGresult* PQprepare(PGconn*, const char* stmtName, const char* query, int nParams, const void* paramTypes);
	PGresult* PQexecPrepared(PGconn*, const char* stmtName, int nParams, const char** paramValues, const int* paramLengths, const int* paramFormats, int resultFormat);
	int PQresultStatus(PGresult*); // FIXME check return value
	void PQclear(PGresult*);

	// ---- inspecting a result ----
	int PQnfields(PGresult*); // number of fields in a result
	int PQntuples(PGresult*); // number of rows in result
	const(char*) PQfname(PGresult*, int); // name of field
	Oid PQftype(const PGresult* res, int column_number);
	int PQfformat(const PGresult* res, int column_number);
	const(char*) PQgetvalue(PGresult*, int row, int column);
	int PQgetlength(const PGresult* res, int row_number, int column_number);
	int PQgetisnull(const PGresult* res, int row_number, int column_number);
	char* PQcmdTuples(PGresult* res);

	// ---- escaping ----
	size_t PQescapeString(char* to, const char* from, size_t length);
	char* PQescapeByteaConn(PGconn* conn, const ubyte* from, size_t from_length, size_t* to_length);
	char* PQunescapeBytea(const char* from, size_t* to_length);
	void PQfreemem(void* ptr);
}
377 
378 /*
379 import std.stdio;
380 void main() {
381 	auto db = new PostgreSql("dbname = test");
382 
383 	db.query("INSERT INTO users (id, name) values (?, ?)", 30, "hello mang");
384 
385 	foreach(line; db.query("SELECT * FROM users")) {
386 		writeln(line[0], line["name"]);
387 	}
388 }
389 */