1 /++ 2 Uses libpq implement the [arsd.database.Database] interface. 3 4 Requires the official pq library from Postgres to be installed to build 5 and to use. Note that on Windows, it is often distributed as `libpq.lib`. 6 You will have to copy or rename that to `pq.lib` for dub or dmd to automatically 7 find it. You will also likely need to add the lib search path yourself on 8 both Windows and Linux systems (on my Linux box, it is `-L-L/usr/local/pgsql/lib` 9 to dmd. You can also list things your app's dub.json's lflags too. Note on the 10 Microsoft linker, the flag is called `/LIBPATH`.) 11 12 For example, for the default Postgres install on Windows, try: 13 14 ``` 15 "lflags-windows": [ "/LIBPATH:C:/Program Files/PostgreSQL/<VERSION>/lib" ], 16 ``` 17 18 In your dub.json. 19 20 When you distribute your application, the user will want to install libpq client on 21 Linux, and on Windows, you may want to include the libpq.dll in your distribution. 22 Note it may also depend on OpenSSL ssl and crypto dlls and libintl.dll as well. These 23 should be found in the PostgreSQL lib and/or bin folders (check them both!). 24 +/ 25 module arsd.postgres; 26 27 version(Windows) 28 pragma(lib, "libpq"); 29 else 30 pragma(lib, "pq"); 31 32 public import arsd.database; 33 34 import std.string; 35 import std.exception; 36 37 // remember to CREATE DATABASE name WITH ENCODING 'utf8' 38 // 39 // http://www.postgresql.org/docs/8.0/static/libpq-exec.html 40 // ExecParams, PQPrepare, PQExecPrepared 41 // 42 // SQL: `DEALLOCATE name` is how to dealloc a prepared statement. 43 44 /++ 45 The PostgreSql implementation of the [Database] interface. 46 47 You should construct this class, but then use it through the 48 interface functions. 49 50 --- 51 auto db = new PostgreSql("dbname=name"); 52 foreach(row; db.query("SELECT id, data FROM table_name")) 53 writeln(row[0], " = ", row[1]); 54 --- 55 +/ 56 class PostgreSql : Database { 57 /// `dbname=your_database_name` is probably the most common connection string. See section "33.1.1.1. Keyword/Value Connection Strings" on https://www.postgresql.org/docs/10/libpq-connect.html 58 this(string connectionString) { 59 this.connectionString = connectionString; 60 conn = PQconnectdb(toStringz(connectionString)); 61 if(conn is null) 62 throw new DatabaseException("Unable to allocate PG connection object"); 63 if(PQstatus(conn) != CONNECTION_OK) 64 throw new DatabaseException(error()); 65 query("SET NAMES 'utf8'"); // D does everything with utf8 66 } 67 68 string connectionString; 69 70 ~this() { 71 PQfinish(conn); 72 } 73 74 string sysTimeToValue(SysTime s) { 75 return "'" ~ escape(s.toISOExtString()) ~ "'::timestamptz"; 76 } 77 78 /** 79 Prepared statement support 80 81 This will be added to the Database interface eventually in some form, 82 but first I need to implement it for all my providers. 83 84 The common function of those 4 will be what I put in the interface. 85 */ 86 87 ResultSet executePreparedStatement(T...)(string name, T args) { 88 const(char)*[args.length] argsStrings; 89 90 foreach(idx, arg; args) { 91 // FIXME: optimize to remove allocations here 92 import std.conv; 93 static if(!is(typeof(arg) == typeof(null))) 94 argsStrings[idx] = toStringz(to!string(arg)); 95 // else make it null 96 } 97 98 auto res = PQexecPrepared(conn, toStringz(name), argsStrings.length, argsStrings.ptr, null, null, 0); 99 100 int ress = PQresultStatus(res); 101 if(ress != PGRES_TUPLES_OK 102 && ress != PGRES_COMMAND_OK) 103 throw new DatabaseException(error()); 104 105 return new PostgresResult(res); 106 107 } 108 109 /// 110 override void startTransaction() { 111 query("START TRANSACTION"); 112 } 113 114 ResultSet queryImpl(string sql, Variant[] args...) { 115 sql = escapedVariants(this, sql, args); 116 117 bool first_retry = true; 118 119 retry: 120 121 auto res = PQexec(conn, toStringz(sql)); 122 int ress = PQresultStatus(res); 123 // https://www.postgresql.org/docs/current/libpq-exec.html 124 // FIXME: PQresultErrorField can get a lot more info in a more structured way 125 if(ress != PGRES_TUPLES_OK 126 && ress != PGRES_COMMAND_OK) 127 { 128 if(first_retry && error() == "no connection to the server\n") { 129 first_retry = false; 130 // try to reconnect... 131 PQfinish(conn); 132 conn = PQconnectdb(toStringz(connectionString)); 133 if(conn is null) 134 throw new DatabaseException("Unable to allocate PG connection object"); 135 if(PQstatus(conn) != CONNECTION_OK) 136 throw new DatabaseException(error()); 137 goto retry; 138 } 139 throw new DatabaseException(error()); 140 } 141 142 return new PostgresResult(res); 143 } 144 145 string escape(string sqlData) { 146 char* buffer = (new char[sqlData.length * 2 + 1]).ptr; 147 ulong size = PQescapeString (buffer, sqlData.ptr, sqlData.length); 148 149 string ret = assumeUnique(buffer[0.. cast(size_t) size]); 150 151 return ret; 152 } 153 154 string escapeBinaryString(const(ubyte)[] data) { 155 // must include '\x ... ' here 156 size_t len; 157 char* buf = PQescapeByteaConn(conn, data.ptr, data.length, &len); 158 if(buf is null) 159 throw new Exception("pgsql out of memory escaping binary string"); 160 161 string res; 162 if(len == 0) 163 res = "''"; 164 else 165 res = cast(string) ("'" ~ buf[0 .. len - 1] ~ "'"); // gotta cut the zero terminator off 166 167 PQfreemem(buf); 168 169 return res; 170 } 171 172 173 /// 174 string error() { 175 return copyCString(PQerrorMessage(conn)); 176 } 177 178 private: 179 PGconn* conn; 180 } 181 182 private string toLowerFast(string s) { 183 import std.ascii : isUpper; 184 foreach (c; s) 185 if (c >= 0x80 || isUpper(c)) 186 return toLower(s); 187 return s; 188 } 189 190 /// 191 class PostgresResult : ResultSet { 192 // name for associative array to result index 193 int getFieldIndex(string field) { 194 if(mapping is null) 195 makeFieldMapping(); 196 field = field.toLowerFast; 197 if(field in mapping) 198 return mapping[field]; 199 else throw new Exception("no mapping " ~ field); 200 } 201 202 203 string[] fieldNames() { 204 if(mapping is null) 205 makeFieldMapping(); 206 return columnNames; 207 } 208 209 // this is a range that can offer other ranges to access it 210 bool empty() { 211 return position == numRows; 212 } 213 214 Row front() { 215 return row; 216 } 217 218 int affectedRows() @system { 219 auto g = PQcmdTuples(res); 220 if(g is null) 221 return 0; 222 int num; 223 while(*g) { 224 num *= 10; 225 num += *g - '0'; 226 g++; 227 } 228 return num; 229 } 230 231 void popFront() { 232 position++; 233 if(position < numRows) 234 fetchNext(); 235 } 236 237 override size_t length() { 238 return numRows; 239 } 240 241 this(PGresult* res) { 242 this.res = res; 243 numFields = PQnfields(res); 244 numRows = PQntuples(res); 245 246 if(numRows) 247 fetchNext(); 248 } 249 250 ~this() { 251 PQclear(res); 252 } 253 254 private: 255 PGresult* res; 256 int[string] mapping; 257 string[] columnNames; 258 int numFields; 259 260 int position; 261 262 int numRows; 263 264 Row row; 265 266 void fetchNext() { 267 Row r; 268 r.resultSet = this; 269 DatabaseDatum[] row; 270 271 for(int i = 0; i < numFields; i++) { 272 string a; 273 274 if(PQgetisnull(res, position, i)) 275 a = null; 276 else { 277 switch(PQfformat(res, i)) { 278 case 0: // text representation 279 switch(PQftype(res, i)) { 280 case BYTEAOID: 281 size_t len; 282 char* c = PQunescapeBytea(PQgetvalue(res, position, i), &len); 283 284 a = cast(string) c[0 .. len].idup; 285 286 PQfreemem(c); 287 break; 288 default: 289 a = copyCString(PQgetvalue(res, position, i), PQgetlength(res, position, i)); 290 } 291 break; 292 case 1: // binary representation 293 throw new Exception("unexpected format returned by pq"); 294 default: 295 throw new Exception("unknown pq format"); 296 } 297 298 } 299 row ~= DatabaseDatum(a); 300 } 301 302 r.row = row; 303 this.row = r; 304 } 305 306 void makeFieldMapping() { 307 for(int i = 0; i < numFields; i++) { 308 string a = copyCString(PQfname(res, i)); 309 310 columnNames ~= a; 311 mapping[a] = i; 312 } 313 314 } 315 } 316 317 string copyCString(const char* c, int actualLength = -1) @system { 318 const(char)* a = c; 319 if(a is null) 320 return null; 321 322 string ret; 323 if(actualLength == -1) 324 while(*a) { 325 ret ~= *a; 326 a++; 327 } 328 else { 329 ret = a[0..actualLength].idup; 330 } 331 332 return ret; 333 } 334 335 extern(C) { 336 struct PGconn {}; 337 struct PGresult {}; 338 339 void PQfinish(PGconn*); 340 PGconn* PQconnectdb(const char*); 341 342 int PQstatus(PGconn*); // FIXME check return value 343 344 const (char*) PQerrorMessage(PGconn*); 345 346 PGresult* PQexec(PGconn*, const char*); 347 void PQclear(PGresult*); 348 349 PGresult* PQprepare(PGconn*, const char* stmtName, const char* query, int nParams, const void* paramTypes); 350 351 PGresult* PQexecPrepared(PGconn*, const char* stmtName, int nParams, const char** paramValues, const int* paramLengths, const int* paramFormats, int resultFormat); 352 353 int PQresultStatus(PGresult*); // FIXME check return value 354 355 int PQnfields(PGresult*); // number of fields in a result 356 const(char*) PQfname(PGresult*, int); // name of field 357 358 int PQntuples(PGresult*); // number of rows in result 359 const(char*) PQgetvalue(PGresult*, int row, int column); 360 361 size_t PQescapeString (char *to, const char *from, size_t length); 362 363 enum int CONNECTION_OK = 0; 364 enum int PGRES_COMMAND_OK = 1; 365 enum int PGRES_TUPLES_OK = 2; 366 367 int PQgetlength(const PGresult *res, 368 int row_number, 369 int column_number); 370 int PQgetisnull(const PGresult *res, 371 int row_number, 372 int column_number); 373 374 int PQfformat(const PGresult *res, int column_number); 375 376 alias Oid = int; 377 enum BYTEAOID = 17; 378 Oid PQftype(const PGresult* res, int column_number); 379 380 char *PQescapeByteaConn(PGconn *conn, 381 const ubyte *from, 382 size_t from_length, 383 size_t *to_length); 384 char *PQunescapeBytea(const char *from, size_t *to_length); 385 void PQfreemem(void *ptr); 386 387 char* PQcmdTuples(PGresult *res); 388 389 } 390 391 /* 392 import std.stdio; 393 void main() { 394 auto db = new PostgreSql("dbname = test"); 395 396 db.query("INSERT INTO users (id, name) values (?, ?)", 30, "hello mang"); 397 398 foreach(line; db.query("SELECT * FROM users")) { 399 writeln(line[0], line["name"]); 400 } 401 } 402 */