1 /++
	Uses libpq to implement the [arsd.database.Database] interface.
3 
4 	Requires the official pq library from Postgres to be installed to build
5 	and to use. Note that on Windows, it is often distributed as `libpq.lib`.
6 	You will have to copy or rename that to `pq.lib` for dub or dmd to automatically
7 	find it. You will also likely need to add the lib search path yourself on
8 	both Windows and Linux systems (on my Linux box, it is `-L-L/usr/local/pgsql/lib`
	to dmd. You can also list these flags in your app's dub.json under `lflags`. Note that
	with the Microsoft linker, the flag is called `/LIBPATH`.)
11 
12 	For example, for the default Postgres install on Windows, try:
13 
14 	```
15 		"lflags-windows": [ "/LIBPATH:C:/Program Files/PostgreSQL/<VERSION>/lib" ],
16 	```
17 
18 	In your dub.json.
19 
20 	When you distribute your application, the user will want to install libpq client on
21 	Linux, and on Windows, you may want to include the libpq.dll in your distribution.
22 	Note it may also depend on OpenSSL ssl and crypto dlls and libintl.dll as well. These
23 	should be found in the PostgreSQL lib and/or bin folders (check them both!).
24 +/
25 module arsd.postgres;
26 pragma(lib, "pq");
27 
28 public import arsd.database;
29 
30 import std.string;
31 import std.exception;
32 
33 // remember to CREATE DATABASE name WITH ENCODING 'utf8'
34 //
35 // http://www.postgresql.org/docs/8.0/static/libpq-exec.html
36 // ExecParams, PQPrepare, PQExecPrepared
37 //
38 // SQL: `DEALLOCATE name` is how to dealloc a prepared statement.
39 
/++
	The PostgreSql implementation of the [Database] interface.

	You should construct this class, but then use it through the
	interface functions.

	---
	auto db = new PostgreSql("dbname=name");
	foreach(row; db.query("SELECT id, data FROM table_name"))
		writeln(row[0], " = ", row[1]);
	---
+/
class PostgreSql : Database {
	/++
		`dbname=your_database_name` is probably the most common connection string. See section "33.1.1.1. Keyword/Value Connection Strings" on https://www.postgresql.org/docs/10/libpq-connect.html

		Throws:
			DatabaseException if the connection object cannot be allocated,
			the connection cannot be established, or the client encoding
			cannot be set.
	+/
	this(string connectionString) {
		this.connectionString = connectionString;
		openConnection();
	}

	/// The string passed to the constructor. It is kept so [queryImpl] can reconnect.
	string connectionString;

	~this() {
		PQfinish(conn);
	}

	/// Formats a SysTime as an escaped `timestamptz` literal.
	string sysTimeToValue(SysTime s) {
		return "'" ~ escape(s.toISOExtString()) ~ "'::timestamptz";
	}

	/**
		Prepared statement support

		This will be added to the Database interface eventually in some form,
		but first I need to implement it for all my providers.

		The common function of those 4 will be what I put in the interface.

		Params:
			name = name of a statement that was previously prepared on this connection.
			args = parameter values. Each one is converted to text with `to!string`;
				a literal `null` is sent as SQL NULL.

		Throws:
			DatabaseException carrying the server's message if execution fails.
	*/

	ResultSet executePreparedStatement(T...)(string name, T args) {
		import std.conv : to;

		// toStringz yields immutable(char)*, so the slots must be pointers to const.
		// Slots that are never assigned stay null, which libpq treats as SQL NULL.
		const(char)*[args.length] argsStrings;

		foreach(idx, arg; args) {
			// FIXME: optimize to remove allocations here
			static if(!is(typeof(arg) == typeof(null)))
				argsStrings[idx] = toStringz(to!string(arg));
		}

		// Null paramLengths / paramFormats: every parameter is a zero-terminated text value.
		auto res = PQexecPrepared(conn, toStringz(name), cast(int) argsStrings.length, argsStrings.ptr, null, null, 0);

		immutable status = PQresultStatus(res);
		if(status != PGRES_TUPLES_OK && status != PGRES_COMMAND_OK) {
			// Copy the message first, then release the failed result so it does not leak.
			auto message = error();
			PQclear(res);
			throw new DatabaseException(message);
		}

		return new PostgresResult(res);
	}

	///
	override void startTransaction() {
		query("START TRANSACTION");
	}

	/++
		Runs `sql` after substituting the escaped `args` into it.

		If the server reports that the connection is gone, the connection is
		re-established once and the statement is tried a second time.

		Throws:
			DatabaseException if the statement fails or reconnecting fails.
	+/
	ResultSet queryImpl(string sql, Variant[] args...) {
		sql = escapedVariants(this, sql, args);
		auto sqlz = toStringz(sql);

		// Two attempts at most: the initial one and one after a reconnect.
		foreach(attempt; 0 .. 2) {
			auto res = PQexec(conn, sqlz);
			immutable status = PQresultStatus(res);
			// https://www.postgresql.org/docs/current/libpq-exec.html
			// FIXME: PQresultErrorField can get a lot more info in a more structured way
			if(status == PGRES_TUPLES_OK || status == PGRES_COMMAND_OK)
				return new PostgresResult(res);

			// Copy the message before releasing anything, then free the failed
			// result (PQclear accepts null) so error paths do not leak it.
			auto message = error();
			PQclear(res);

			if(attempt == 0 && message == "no connection to the server\n") {
				// try to reconnect...
				PQfinish(conn);
				openConnection();
				continue;
			}

			throw new DatabaseException(message);
		}

		assert(0, "unreachable: the second attempt either returns or throws");
	}

	/// Escapes `sqlData` for use inside a single-quoted SQL string literal.
	string escape(string sqlData) {
		// libpq needs room for twice the input plus the terminating zero.
		auto buffer = new char[sqlData.length * 2 + 1];
		immutable size_t written = PQescapeString(buffer.ptr, sqlData.ptr, sqlData.length);

		auto escaped = buffer[0 .. written];
		return assumeUnique(escaped);
	}


	///
	string error() {
		return copyCString(PQerrorMessage(conn));
	}

	private:
		PGconn* conn;

		// Opens a connection from `connectionString` and switches the session to
		// UTF-8. Used by the constructor and by the reconnect path, so a restored
		// connection gets the same session setup as the first one.
		void openConnection() {
			conn = PQconnectdb(toStringz(connectionString));
			if(conn is null)
				throw new DatabaseException("Unable to allocate PG connection object");
			if(PQstatus(conn) != CONNECTION_OK)
				throw new DatabaseException(error());

			// D does everything with utf8
			auto res = PQexec(conn, "SET NAMES 'utf8'");
			scope(exit) PQclear(res);

			immutable status = PQresultStatus(res);
			if(status != PGRES_TUPLES_OK && status != PGRES_COMMAND_OK)
				throw new DatabaseException(error());
		}
}
158 
/++
	A [ResultSet] backed by a libpq `PGresult`.

	Rows are copied out of the result one at a time as the range is
	advanced. The underlying `PGresult` is released by the destructor.
+/
class PostgresResult : ResultSet {
	/++
		Translates a column name into its index in the result.

		The lookup is case-insensitive.

		Throws:
			Exception if no column has that name.
	+/
	int getFieldIndex(string field) {
		if(mapping is null)
			makeFieldMapping();
		field = field.toLower;
		if(auto index = field in mapping)
			return *index;
		else throw new Exception("no mapping " ~ field);
	}


	/// Returns the column names in result order, spelled as the server reported them.
	string[] fieldNames() {
		if(mapping is null)
			makeFieldMapping();
		return columnNames;
	}

	// this is a range that can offer other ranges to access it
	bool empty() {
		return position == numRows;
	}

	/// The current row. Only meaningful while the range is not empty.
	Row front() {
		return row;
	}

	/// Advances to the next row and loads it if there is one.
	void popFront() {
		position++;
		if(position < numRows)
			fetchNext();
	}

	/// Total number of rows in the result, regardless of the current position.
	override size_t length() {
		return numRows;
	}

	/// Takes ownership of `res`; it is released with `PQclear` by the destructor.
	this(PGresult* res) {
		this.res = res;
		numFields = PQnfields(res);
		numRows = PQntuples(res);

		if(numRows)
			fetchNext();
	}

	~this() {
		PQclear(res);
	}

	private:
		PGresult* res;
		int[string] mapping; // lower-cased column name -> column index
		string[] columnNames; // column names exactly as reported, in order
		int numFields;

		int position;

		int numRows;

		Row row;

		// Copies the row at `position` out of the PGresult into `row`.
		void fetchNext() {
			// One slot per column. SQL NULL columns keep the default null string.
			auto values = new string[](numFields);

			foreach(i; 0 .. numFields) {
				if(!PQgetisnull(res, position, i))
					values[i] = copyCString(PQgetvalue(res, position, i), PQgetlength(res, position, i));
			}

			Row r;
			r.resultSet = this;
			r.row = values;
			this.row = r;
		}

		// Builds `columnNames` and the name -> index `mapping`.
		void makeFieldMapping() {
			foreach(i; 0 .. numFields) {
				string name = copyCString(PQfname(res, i));
				columnNames ~= name;

				// getFieldIndex lower-cases the requested name, so keys must be
				// lower case too. Otherwise columns whose quoted names keep upper
				// case letters could never be found. A name that is already
				// lower case always wins over a folded one, so lookups that
				// worked before still return the same index.
				auto key = name.toLower;
				if(key == name || key !in mapping)
					mapping[key] = i;
			}
		}
}
253 
/++
	Copies C character data into a garbage-collected D string.

	Params:
		c = pointer to the data to copy; may be null.
		actualLength = number of bytes to copy. If negative (the default is -1),
			`c` is treated as zero-terminated and is measured with `strlen`.

	Returns:
		An independent copy of the data, or `null` if `c` is null.
+/
string copyCString(const char* c, int actualLength = -1) {
	import core.stdc.string : strlen;

	if(c is null)
		return null;

	// Any negative length means "unknown"; slicing with it would produce an
	// out-of-range slice, so fall back to measuring the terminator.
	immutable size_t len = actualLength < 0 ? strlen(c) : cast(size_t) actualLength;

	// One allocation and copy instead of appending byte by byte.
	return c[0 .. len].idup;
}
271 
// Hand-written declarations for the subset of libpq this module uses.
extern(C) {
	// Handle types, only ever used through pointers.
	struct PGconn {}
	struct PGresult {}

	// Status codes compared against PQstatus / PQresultStatus below.
	enum : int {
		CONNECTION_OK = 0,
	}
	enum : int {
		PGRES_COMMAND_OK = 1,
		PGRES_TUPLES_OK = 2,
	}

	// --- connection ---
	PGconn* PQconnectdb(const char* conninfo);
	void PQfinish(PGconn* conn);
	int PQstatus(PGconn* conn); // FIXME check return value
	const (char*) PQerrorMessage(PGconn* conn);

	// --- running statements ---
	PGresult* PQexec(PGconn* conn, const char* command);
	PGresult* PQprepare(PGconn* conn, const char* stmtName, const char* query, int nParams, const void* paramTypes);
	PGresult* PQexecPrepared(PGconn* conn, const char* stmtName, int nParams, const char** paramValues, const int* paramLengths, const int* paramFormats, int resultFormat);
	int PQresultStatus(PGresult* res); // FIXME check return value
	void PQclear(PGresult* res);

	// --- reading results ---
	int PQnfields(PGresult* res); // number of fields in a result
	const(char*) PQfname(PGresult* res, int column); // name of field
	int PQntuples(PGresult* res); // number of rows in result
	const(char*) PQgetvalue(PGresult* res, int row, int column);
	int PQgetlength(const PGresult* res, int row_number, int column_number);
	int PQgetisnull(const PGresult* res, int row_number, int column_number);

	// --- escaping ---
	size_t PQescapeString(char* to, const char* from, size_t length);
}
313 
314 /*
315 import std.stdio;
316 void main() {
317 	auto db = new PostgreSql("dbname = test");
318 
319 	db.query("INSERT INTO users (id, name) values (?, ?)", 30, "hello mang");
320 
321 	foreach(line; db.query("SELECT * FROM users")) {
322 		writeln(line[0], line["name"]);
323 	}
324 }
325 */