1 /++ 2 Uses libpq implement the [arsd.database.Database] interface. 3 4 Requires the official pq library from Postgres to be installed to build 5 and to use. Note that on Windows, it is often distributed as `libpq.lib`. 6 You will have to copy or rename that to `pq.lib` for dub or dmd to automatically 7 find it. You will also likely need to add the lib search path yourself on 8 both Windows and Linux systems (on my Linux box, it is `-L-L/usr/local/pgsql/lib` 9 to dmd. You can also list things your app's dub.json's lflags too. Note on the 10 Microsoft linker, the flag is called `/LIBPATH`.) 11 12 For example, for the default Postgres install on Windows, try: 13 14 ``` 15 "lflags-windows": [ "/LIBPATH:C:/Program Files/PostgreSQL/<VERSION>/lib" ], 16 ``` 17 18 In your dub.json. 19 20 When you distribute your application, the user will want to install libpq client on 21 Linux, and on Windows, you may want to include the libpq.dll in your distribution. 22 Note it may also depend on OpenSSL ssl and crypto dlls and libintl.dll as well. These 23 should be found in the PostgreSQL lib and/or bin folders (check them both!). 24 +/ 25 module arsd.postgres; 26 pragma(lib, "pq"); 27 28 public import arsd.database; 29 30 import std.string; 31 import std.exception; 32 33 // remember to CREATE DATABASE name WITH ENCODING 'utf8' 34 // 35 // http://www.postgresql.org/docs/8.0/static/libpq-exec.html 36 // ExecParams, PQPrepare, PQExecPrepared 37 // 38 // SQL: `DEALLOCATE name` is how to dealloc a prepared statement. 39 40 /++ 41 The PostgreSql implementation of the [Database] interface. 42 43 You should construct this class, but then use it through the 44 interface functions. 45 46 --- 47 auto db = new PostgreSql("dbname=name"); 48 foreach(row; db.query("SELECT id, data FROM table_name")) 49 writeln(row[0], " = ", row[1]); 50 --- 51 +/ 52 class PostgreSql : Database { 53 /// `dbname=your_database_name` is probably the most common connection string. See section "33.1.1.1. Keyword/Value Connection Strings" on https://www.postgresql.org/docs/10/libpq-connect.html 54 this(string connectionString) { 55 this.connectionString = connectionString; 56 conn = PQconnectdb(toStringz(connectionString)); 57 if(conn is null) 58 throw new DatabaseException("Unable to allocate PG connection object"); 59 if(PQstatus(conn) != CONNECTION_OK) 60 throw new DatabaseException(error()); 61 query("SET NAMES 'utf8'"); // D does everything with utf8 62 } 63 64 string connectionString; 65 66 ~this() { 67 PQfinish(conn); 68 } 69 70 string sysTimeToValue(SysTime s) { 71 return "'" ~ escape(s.toISOExtString()) ~ "'::timestamptz"; 72 } 73 74 /** 75 Prepared statement support 76 77 This will be added to the Database interface eventually in some form, 78 but first I need to implement it for all my providers. 79 80 The common function of those 4 will be what I put in the interface. 81 */ 82 83 ResultSet executePreparedStatement(T...)(string name, T args) { 84 char*[args.length] argsStrings; 85 86 foreach(idx, arg; args) { 87 // FIXME: optimize to remove allocations here 88 static if(!is(typeof(arg) == typeof(null))) 89 argsStrings[idx] = toStringz(to!string(arg)); 90 // else make it null 91 } 92 93 auto res = PQexecPrepared(conn, toStringz(name), argsStrings.length, argStrings.ptr, 0, null, 0); 94 95 int ress = PQresultStatus(res); 96 if(ress != PGRES_TUPLES_OK 97 && ress != PGRES_COMMAND_OK) 98 throw new DatabaseException(error()); 99 100 return new PostgresResult(res); 101 102 } 103 104 /// 105 override void startTransaction() { 106 query("START TRANSACTION"); 107 } 108 109 ResultSet queryImpl(string sql, Variant[] args...) { 110 sql = escapedVariants(this, sql, args); 111 112 bool first_retry = true; 113 114 retry: 115 116 auto res = PQexec(conn, toStringz(sql)); 117 int ress = PQresultStatus(res); 118 // https://www.postgresql.org/docs/current/libpq-exec.html 119 // FIXME: PQresultErrorField can get a lot more info in a more structured way 120 if(ress != PGRES_TUPLES_OK 121 && ress != PGRES_COMMAND_OK) 122 { 123 if(first_retry && error() == "no connection to the server\n") { 124 first_retry = false; 125 // try to reconnect... 126 PQfinish(conn); 127 conn = PQconnectdb(toStringz(connectionString)); 128 if(conn is null) 129 throw new DatabaseException("Unable to allocate PG connection object"); 130 if(PQstatus(conn) != CONNECTION_OK) 131 throw new DatabaseException(error()); 132 goto retry; 133 } 134 throw new DatabaseException(error()); 135 } 136 137 return new PostgresResult(res); 138 } 139 140 string escape(string sqlData) { 141 char* buffer = (new char[sqlData.length * 2 + 1]).ptr; 142 ulong size = PQescapeString (buffer, sqlData.ptr, sqlData.length); 143 144 string ret = assumeUnique(buffer[0.. cast(size_t) size]); 145 146 return ret; 147 } 148 149 string escapeBinaryString(const(ubyte)[] data) { 150 // must include '\x ... ' here 151 size_t len; 152 char* buf = PQescapeByteaConn(conn, data.ptr, data.length, &len); 153 if(buf is null) 154 throw new Exception("pgsql out of memory escaping binary string"); 155 156 string res; 157 if(len == 0) 158 res = "''"; 159 else 160 res = cast(string) ("'" ~ buf[0 .. len - 1] ~ "'"); // gotta cut the zero terminator off 161 162 PQfreemem(buf); 163 164 return res; 165 } 166 167 168 /// 169 string error() { 170 return copyCString(PQerrorMessage(conn)); 171 } 172 173 private: 174 PGconn* conn; 175 } 176 177 /// 178 class PostgresResult : ResultSet { 179 // name for associative array to result index 180 int getFieldIndex(string field) { 181 if(mapping is null) 182 makeFieldMapping(); 183 field = field.toLower; 184 if(field in mapping) 185 return mapping[field]; 186 else throw new Exception("no mapping " ~ field); 187 } 188 189 190 string[] fieldNames() { 191 if(mapping is null) 192 makeFieldMapping(); 193 return columnNames; 194 } 195 196 // this is a range that can offer other ranges to access it 197 bool empty() { 198 return position == numRows; 199 } 200 201 Row front() { 202 return row; 203 } 204 205 int affectedRows() { 206 auto g = PQcmdTuples(res); 207 if(g is null) 208 return 0; 209 int num; 210 while(*g) { 211 num *= 10; 212 num += *g - '0'; 213 g++; 214 } 215 return num; 216 } 217 218 void popFront() { 219 position++; 220 if(position < numRows) 221 fetchNext(); 222 } 223 224 override size_t length() { 225 return numRows; 226 } 227 228 this(PGresult* res) { 229 this.res = res; 230 numFields = PQnfields(res); 231 numRows = PQntuples(res); 232 233 if(numRows) 234 fetchNext(); 235 } 236 237 ~this() { 238 PQclear(res); 239 } 240 241 private: 242 PGresult* res; 243 int[string] mapping; 244 string[] columnNames; 245 int numFields; 246 247 int position; 248 249 int numRows; 250 251 Row row; 252 253 void fetchNext() { 254 Row r; 255 r.resultSet = this; 256 string[] row; 257 258 for(int i = 0; i < numFields; i++) { 259 string a; 260 261 if(PQgetisnull(res, position, i)) 262 a = null; 263 else { 264 switch(PQfformat(res, i)) { 265 case 0: // text representation 266 switch(PQftype(res, i)) { 267 case BYTEAOID: 268 size_t len; 269 char* c = PQunescapeBytea(PQgetvalue(res, position, i), &len); 270 271 a = cast(string) c[0 .. len].idup; 272 273 PQfreemem(c); 274 break; 275 default: 276 a = copyCString(PQgetvalue(res, position, i), PQgetlength(res, position, i)); 277 } 278 break; 279 case 1: // binary representation 280 throw new Exception("unexpected format returned by pq"); 281 default: 282 throw new Exception("unknown pq format"); 283 } 284 285 } 286 row ~= a; 287 } 288 289 r.row = row; 290 this.row = r; 291 } 292 293 void makeFieldMapping() { 294 for(int i = 0; i < numFields; i++) { 295 string a = copyCString(PQfname(res, i)); 296 297 columnNames ~= a; 298 mapping[a] = i; 299 } 300 301 } 302 } 303 304 string copyCString(const char* c, int actualLength = -1) { 305 const(char)* a = c; 306 if(a is null) 307 return null; 308 309 string ret; 310 if(actualLength == -1) 311 while(*a) { 312 ret ~= *a; 313 a++; 314 } 315 else { 316 ret = a[0..actualLength].idup; 317 } 318 319 return ret; 320 } 321 322 extern(C) { 323 struct PGconn {}; 324 struct PGresult {}; 325 326 void PQfinish(PGconn*); 327 PGconn* PQconnectdb(const char*); 328 329 int PQstatus(PGconn*); // FIXME check return value 330 331 const (char*) PQerrorMessage(PGconn*); 332 333 PGresult* PQexec(PGconn*, const char*); 334 void PQclear(PGresult*); 335 336 PGresult* PQprepare(PGconn*, const char* stmtName, const char* query, int nParams, const void* paramTypes); 337 338 PGresult* PQexecPrepared(PGconn*, const char* stmtName, int nParams, const char** paramValues, const int* paramLengths, const int* paramFormats, int resultFormat); 339 340 int PQresultStatus(PGresult*); // FIXME check return value 341 342 int PQnfields(PGresult*); // number of fields in a result 343 const(char*) PQfname(PGresult*, int); // name of field 344 345 int PQntuples(PGresult*); // number of rows in result 346 const(char*) PQgetvalue(PGresult*, int row, int column); 347 348 size_t PQescapeString (char *to, const char *from, size_t length); 349 350 enum int CONNECTION_OK = 0; 351 enum int PGRES_COMMAND_OK = 1; 352 enum int PGRES_TUPLES_OK = 2; 353 354 int PQgetlength(const PGresult *res, 355 int row_number, 356 int column_number); 357 int PQgetisnull(const PGresult *res, 358 int row_number, 359 int column_number); 360 361 int PQfformat(const PGresult *res, int column_number); 362 363 alias Oid = int; 364 enum BYTEAOID = 17; 365 Oid PQftype(const PGresult* res, int column_number); 366 367 char *PQescapeByteaConn(PGconn *conn, 368 const ubyte *from, 369 size_t from_length, 370 size_t *to_length); 371 char *PQunescapeBytea(const char *from, size_t *to_length); 372 void PQfreemem(void *ptr); 373 374 char* PQcmdTuples(PGresult *res); 375 376 } 377 378 /* 379 import std.stdio; 380 void main() { 381 auto db = new PostgreSql("dbname = test"); 382 383 db.query("INSERT INTO users (id, name) values (?, ?)", 30, "hello mang"); 384 385 foreach(line; db.query("SELECT * FROM users")) { 386 writeln(line[0], line["name"]); 387 } 388 } 389 */