/++
	Uses libpq to implement the [arsd.database.Database] interface.

	Requires the official pq library from Postgres to be installed to build
	and to use. Note that on Windows, it is often distributed as `libpq.lib`.
	You will have to copy or rename that to `pq.lib` for dub or dmd to automatically
	find it. (This module's own `pragma(lib)` asks for `libpq` on Windows and
	`pq` everywhere else.) You will also likely need to add the lib search path
	yourself on both Windows and Linux systems. On my Linux box, that is passing
	`-L-L/usr/local/pgsql/lib` to dmd. You can also list it in the `lflags` of
	your app's dub.json. Note that on the Microsoft linker, the flag is called
	`/LIBPATH`.

	For example, for the default Postgres install on Windows, try:

	```
		"lflags-windows": [ "/LIBPATH:C:/Program Files/PostgreSQL/<VERSION>/lib" ],
	```

	in your dub.json.

	When you distribute your application, the user will want to install the libpq client on
	Linux, and on Windows, you may want to include the libpq.dll in your distribution.
	Note it may also depend on the OpenSSL ssl and crypto dlls and libintl.dll as well. These
	should be found in the PostgreSQL lib and/or bin folders (check them both!).
+/
module arsd.postgres;

version(Windows)
	pragma(lib, "libpq");
else
	pragma(lib, "pq");

public import arsd.database;

import std.string;
import std.exception;

// remember to CREATE DATABASE name WITH ENCODING 'utf8'
//
// http://www.postgresql.org/docs/8.0/static/libpq-exec.html
// ExecParams, PQPrepare, PQExecPrepared
//
// SQL: `DEALLOCATE name` is how to dealloc a prepared statement.

/++
	The PostgreSql implementation of the [Database] interface.

	You should construct this class, but then use it through the
	interface functions.

	---
	auto db = new PostgreSql("dbname=name");
	foreach(row; db.query("SELECT id, data FROM table_name"))
		writeln(row[0], " = ", row[1]);
	---
+/
class PostgreSql : Database {
	/// `dbname=your_database_name` is probably the most common connection string. See section "33.1.1.1. Keyword/Value Connection Strings" on https://www.postgresql.org/docs/10/libpq-connect.html
	///
	/// Throws: [DatabaseConnectionException] if the connection object cannot be allocated or the connection is not OK.
	this(string connectionString) {
		this.connectionString = connectionString;
		connect();
		query("SET NAMES 'utf8'"); // D does everything with utf8
		this.connectionOk = true;
	}

	/// The connection string given to the constructor; it is reused when reconnecting.
	string connectionString;

	~this() {
		PQfinish(conn);
	}

	/// Formats `s` as an escaped, quoted ISO extended string cast to `timestamptz`.
	string sysTimeToValue(SysTime s) {
		return "'" ~ escape(s.toISOExtString()) ~ "'::timestamptz";
	}

	// true once the connection was opened and configured; false after a failed (re)connect
	private bool connectionOk;

	/// Returns the last known connection state. It does not ping the server.
	override bool isAlive() {
		return connectionOk;
	}

	/**
		Prepared statement support

		This will be added to the Database interface eventually in some form,
		but first I need to implement it for all my providers.

		The common function of those 4 will be what I put in the interface.
	*/

	/++
		Runs the already-prepared statement `name`, passing every argument
		in text format. An argument of type `typeof(null)` is sent as SQL NULL.

		Throws: [DatabaseException] if the result status is neither
		`PGRES_TUPLES_OK` nor `PGRES_COMMAND_OK`.
	+/
	ResultSet executePreparedStatement(T...)(string name, T args) {
		const(char)*[args.length] argsStrings;

		foreach(idx, arg; args) {
			// FIXME: optimize to remove allocations here
			import std.conv;
			static if(!is(typeof(arg) == typeof(null)))
				argsStrings[idx] = toStringz(to!string(arg));
			// else leave it null, which libpq treats as SQL NULL
		}

		auto res = PQexecPrepared(conn, toStringz(name), argsStrings.length, argsStrings.ptr, null, null, 0);

		int ress = PQresultStatus(res);
		if(ress != PGRES_TUPLES_OK
			&& ress != PGRES_COMMAND_OK)
		{
			// copy the message first, then release the failed result so it does not leak
			string message = error();
			PQclear(res);
			throw new DatabaseException(message);
		}

		return new PostgresResult(res);
	}

	///
	override void startTransaction() {
		query("START TRANSACTION");
	}

	/++
		Substitutes `args` into `sql` with `escapedVariants` and executes the
		resulting statement.

		If the statement fails with libpq's "no connection to the server"
		message, the connection is reopened and the statement is tried once more.
		Only that exact message triggers the retry.

		Throws: [SqlException] if the statement fails,
		[DatabaseConnectionException] if reconnecting fails.
	+/
	ResultSet queryImpl(string sql, Variant[] args...) {
		sql = escapedVariants(this, sql, args);

		bool first_retry = true;

		retry:

		auto res = PQexec(conn, toStringz(sql));
		int ress = PQresultStatus(res);
		// https://www.postgresql.org/docs/current/libpq-exec.html
		// FIXME: PQresultErrorField can get a lot more info in a more structured way
		if(ress != PGRES_TUPLES_OK
			&& ress != PGRES_COMMAND_OK)
		{
			// copy the message before the connection is touched again, and
			// release the failed result on every path out of this branch
			string message = error();
			PQclear(res);

			if(first_retry && message == "no connection to the server\n") {
				first_retry = false;
				// try to reconnect...
				reconnect();
				goto retry;
			}
			throw new SqlException(message);
		}

		return new PostgresResult(res);
	}

	/// Escapes `sqlData` for use inside a quoted SQL string literal. The surrounding quotes are not added.
	string escape(string sqlData) {
		// libpq requires room for twice the input plus the zero terminator
		auto buffer = new char[sqlData.length * 2 + 1];
		size_t size = PQescapeString(buffer.ptr, sqlData.ptr, sqlData.length);

		return assumeUnique(buffer[0 .. size]);
	}

	/++
		Escapes `data` as a bytea literal and returns it with the surrounding
		single quotes included.

		Throws: `Exception` if libpq cannot allocate the escaped buffer.
	+/
	string escapeBinaryString(const(ubyte)[] data) {
		// must include '\x ... ' here
		size_t len;
		char* buf = PQescapeByteaConn(conn, data.ptr, data.length, &len);
		if(buf is null)
			throw new Exception("pgsql out of memory escaping binary string");
		// the buffer belongs to libpq; release it even if the concatenation below throws
		scope(exit) PQfreemem(buf);

		string res;
		if(len == 0)
			res = "''";
		else
			res = cast(string) ("'" ~ buf[0 .. len - 1] ~ "'"); // gotta cut the zero terminator off

		return res;
	}


	///
	string error() {
		return copyCString(PQerrorMessage(conn));
	}

	private:
		PGconn* conn;

		// Opens a connection from `connectionString` and stores it in `conn`.
		// Throws DatabaseConnectionException when the handle cannot be
		// allocated or the connection status is not CONNECTION_OK.
		void connect() {
			conn = PQconnectdb(toStringz(connectionString));
			if(conn is null) {
				connectionOk = false;
				throw new DatabaseConnectionException("Unable to allocate PG connection object");
			}
			if(PQstatus(conn) != CONNECTION_OK) {
				connectionOk = false;
				throw new DatabaseConnectionException(error());
			}
		}

		// Drops the current connection, opens a new one, and repeats the
		// session setup the constructor does. PQexec is called directly
		// because this runs from inside queryImpl.
		void reconnect() {
			PQfinish(conn);
			conn = null; // never leave a dangling handle behind if connect() throws
			connectionOk = false;

			connect();

			auto setup = PQexec(conn, "SET NAMES 'utf8'"); // D does everything with utf8
			immutable setupStatus = PQresultStatus(setup);
			PQclear(setup);
			if(setupStatus != PGRES_COMMAND_OK)
				throw new DatabaseConnectionException(error());

			connectionOk = true;
		}
}

/+
	# when it changes from lowercase to upper case, call that a new word. or when it goes to/from anything else and underscore or dashes.
+/

/// The result columns of a prepared statement, as returned by [describePrepared].
struct PreparedStatementDescription {
	PreparedStatementResult[] result;
}

/// One result column of a prepared statement: its name and a sample datum tagged with the column's type Oid.
struct PreparedStatementResult {
	string fieldName;
	DatabaseDatum type;
}

/++
	Asks the server to describe the prepared statement `name` and returns
	one [PreparedStatementResult] per result column.

	The status of the describe request is not checked, so a failed request
	yields whatever column count libpq reports for that result.
+/
PreparedStatementDescription describePrepared(PostgreSql db, string name) {
	auto res = PQdescribePrepared(db.conn, name.toStringz);
	// release the result even if building the description throws
	scope(exit) PQclear(res);

	PreparedStatementResult[] ret;

	// PQnparams PQparamtype for params
	auto numFields = PQnfields(res);
	foreach(num; 0 .. numFields) {
		auto typeId = PQftype(res, num);
		DatabaseDatum dd;
		dd.platformSpecificTag = typeId;
		dd.storage = sampleForOid(typeId);
		ret ~= PreparedStatementResult(
			copyCString(PQfname(res, num)),
			dd,
		);
	}

	return PreparedStatementDescription(ret);
}

import arsd.core : LimitedVariant, PackedDateTime, SimplifiedUtcTimestamp;

/++
	Returns a placeholder value whose D type corresponds to the given
	Postgres type Oid. Oids without a dedicated case (including interval
	and numeric) yield the string `"sample"`.
+/
LimitedVariant sampleForOid(int platformSpecificTag) {
	switch(platformSpecificTag) {
		case BOOLOID:
			return LimitedVariant(false);
		case BYTEAOID:
			return LimitedVariant(cast(const(ubyte)[]) null);
		case TEXTOID:
		case VARCHAROID:
			return LimitedVariant("");
		case INT4OID:
			return LimitedVariant(0);
		case INT8OID:
			return LimitedVariant(0L);
		case FLOAT4OID:
			return LimitedVariant(0.0f);
		case FLOAT8OID:
			return LimitedVariant(0.0);
		case TIMESTAMPOID:
		case TIMESTAMPTZOID:
			return LimitedVariant(SimplifiedUtcTimestamp(0));
		case DATEOID:
			PackedDateTime d;
			d.hasDate = true;
			return LimitedVariant(d); // might want a different type so contains shows the thing without checking hasDate and hasTime
		case TIMETZOID: // possibly wrong... the tz isn't in my packed thing
		case TIMEOID:
			PackedDateTime d;
			d.hasTime = true;
			return LimitedVariant(d);
		case INTERVALOID:
			// months, days, and microseconds

		case NUMERICOID: // aka decimal
		default:
			// when in doubt, assume it is just a string
			return LimitedVariant("sample");
	}
}

// Lower-cases `s`, returning the original string untouched when it holds
// no ASCII upper-case letter and no non-ASCII code unit.
private string toLowerFast(string s) {
	import std.ascii : isUpper;
	foreach (c; s)
		if (c >= 0x80 || isUpper(c))
			return toLower(s);
	return s;
}

/// A [ResultSet] over a libpq `PGresult`. The result is released when the object is destroyed.
class PostgresResult : ResultSet {
	/++
		Maps a column name to its index in the result.

		The name is first looked up exactly as given, so a quoted mixed-case
		column such as `"Foo"` can be found. If that fails, it is looked up
		again in lower case.

		Throws: `Exception` if neither lookup finds a column.
	+/
	int getFieldIndex(string field) {
		if(mapping is null)
			makeFieldMapping();
		if(auto exact = field in mapping)
			return *exact;
		field = field.toLowerFast;
		if(auto folded = field in mapping)
			return *folded;
		throw new Exception("no mapping " ~ field);
	}


	/// Returns the column names in result order.
	string[] fieldNames() {
		if(mapping is null)
			makeFieldMapping();
		return columnNames;
	}

	// this is a range that can offer other ranges to access it
	bool empty() {
		return position == numRows;
	}

	Row front() {
		return row;
	}

	/// Parses the decimal count reported by `PQcmdTuples`; returns 0 if it is null or empty.
	int affectedRows() @system {
		auto g = PQcmdTuples(res);
		if(g is null)
			return 0;
		int num;
		while(*g) {
			num *= 10;
			num += *g - '0';
			g++;
		}
		return num;
	}

	void popFront() {
		position++;
		if(position < numRows)
			fetchNext();
	}

	override size_t length() {
		return numRows;
	}

	/// Takes ownership of `res` and loads the first row, if there is one.
	this(PGresult* res) {
		this.res = res;
		numFields = PQnfields(res);
		numRows = PQntuples(res);

		if(numRows)
			fetchNext();
	}

	~this() {
		PQclear(res);
	}

	private:
		PGresult* res;
		int[string] mapping;
		string[] columnNames;
		int numFields;

		int position;

		int numRows;

		Row row;

		// Copies the row at `position` out of the PGresult into `row`.
		void fetchNext() {
			Row r;
			r.resultSet = this;
			DatabaseDatum[] data;

			for(int i = 0; i < numFields; i++) {
				string a;

				if(PQgetisnull(res, position, i))
					a = null;
				else {
					switch(PQfformat(res, i)) {
						case 0: // text representation
							switch(PQftype(res, i)) {
								case BYTEAOID:
								{
									size_t len;
									char* c = PQunescapeBytea(PQgetvalue(res, position, i), &len);
									// null means the conversion failed; do not slice it
									if(c is null)
										throw new Exception("pgsql out of memory unescaping binary string");
									scope(exit) PQfreemem(c);

									a = cast(string) c[0 .. len].idup;
								}
								break;
								default:
									a = copyCString(PQgetvalue(res, position, i), PQgetlength(res, position, i));
								break;
							}
						break;
						case 1: // binary representation
							throw new Exception("unexpected format returned by pq");
						default:
							throw new Exception("unknown pq format");
					}

				}
				data ~= DatabaseDatum(a);
			}

			r.row = data;
			this.row = r;
		}

		// Fills `columnNames` and the name -> index `mapping` from the result.
		void makeFieldMapping() {
			for(int i = 0; i < numFields; i++) {
				string a = copyCString(PQfname(res, i));

				columnNames ~= a;
				mapping[a] = i;
			}

		}
}

/++
	Copies a C string into a D string.

	Params:
		c = the source pointer; `null` yields a `null` string
		actualLength = number of bytes to copy, or -1 to copy up to
			(not including) the zero terminator
+/
string copyCString(const char* c, int actualLength = -1) @system {
	const(char)* a = c;
	if(a is null)
		return null;

	if(actualLength == -1) {
		// measure once and copy once instead of appending byte by byte
		auto terminated = fromStringz(a);
		return terminated.length ? terminated.idup : null;
	}

	return a[0..actualLength].idup;
}

extern(C) {
	struct PGconn {};
	struct PGresult {};

	void PQfinish(PGconn*);
	PGconn* PQconnectdb(const char*);

	int PQstatus(PGconn*); // FIXME check return value

	const (char*) PQerrorMessage(PGconn*);

	PGresult* PQexec(PGconn*, const char*);
	void PQclear(PGresult*);

	PGresult* PQprepare(PGconn*, const char* stmtName, const char* query, int nParams, const void* paramTypes);
	int PQsendPrepare(PGconn*, const char*, const char*, int, const Oid*);

	PGresult* PQexecPrepared(PGconn*, const char* stmtName, int nParams, const char** paramValues, const int* paramLengths, const int* paramFormats, int resultFormat);
	int PQsendQueryPrepared(PGconn*, const char* stmtName, int nParams, const char** paramValues, const int* paramLengths, const int* paramFormats, int resultFormat);
	int PQsendClosePrepared(PGconn* conn, const char* name);

	int PQresultStatus(PGresult*); // FIXME check return value

	int PQnfields(PGresult*); // number of fields in a result
	const(char*) PQfname(PGresult*, int); // name of field

	int PQntuples(PGresult*); // number of rows in result
	const(char*) PQgetvalue(PGresult*, int row, int column);

	size_t PQescapeString (char *to, const char *from, size_t length);

	enum int CONNECTION_OK = 0;

	enum int PGRES_EMPTY_QUERY = 0;
	enum int PGRES_COMMAND_OK = 1;
	enum int PGRES_TUPLES_OK = 2;
	enum int PGRES_COPY_OUT = 3;
	enum int PGRES_COPY_IN = 4;
	enum int PGRES_BAD_RESPONSE = 5;
	enum int PGRES_NONFATAL_ERROR = 6;
	enum int PGRES_FATAL_ERROR = 7;
	enum int PGRES_COPY_BOTH = 8;
	enum int PGRES_SINGLE_TUPLE = 9;
	enum int PGRES_PIPELINE_SYNC = 10;
	enum int PGRES_PIPELINE_ABORTED = 11;
	// looks like chunks was added in pq version 17...

	int PQsetSingleRowMode(PGconn* conn);

	// https://www.postgresql.org/docs/current/libpq-notify.html

	enum int PGRES_POLLING_FAILED = 0;
	enum int PGRES_POLLING_READING = 1;
	enum int PGRES_POLLING_WRITING = 2;
	enum int PGRES_POLLING_OK = 3;
	PGconn* PQconnectStart(const char* connInfo);
	int PQconnectPoll(PGconn* conn);

	int PQgetlength(const PGresult *res,
		int row_number,
		int column_number);
	int PQgetisnull(const PGresult *res,
		int row_number,
		int column_number);

	int PQfformat(const PGresult *res, int column_number);

	alias Oid = int;
	enum BOOLOID = 16;
	enum BYTEAOID = 17;
	enum TEXTOID = 25;
	enum INT4OID = 23; // integer
	enum INT8OID = 20; // bigint
	enum NUMERICOID = 1700;
	enum FLOAT4OID = 700;
	enum FLOAT8OID = 701;
	enum VARCHAROID = 1043;
	enum DATEOID = 1082;
	enum TIMEOID = 1083;
	enum TIMESTAMPOID = 1114;
	enum TIMESTAMPTZOID = 1184;
	enum INTERVALOID = 1186;
	enum TIMETZOID = 1266;

	Oid PQftype(const PGresult* res, int column_number);

	char *PQescapeByteaConn(PGconn *conn,
		const ubyte *from,
		size_t from_length,
		size_t *to_length);
	char *PQunescapeBytea(const char *from, size_t *to_length);
	void PQfreemem(void *ptr);

	char* PQcmdTuples(PGresult *res);

	int PQsendQuery(PGconn* conn, const char* command);
	int PQsendQueryParams(PGconn* conn, const char* command, int params, const Oid* paramTypes, const char** paramValues, const int* paramLengths, const int* paramFormats, int resultFormat);

	PGresult *PQdescribePrepared(PGconn *conn, const char *stmtName);
	int PQsendDescribePrepared(PGconn *conn, const char *stmtName);

	PGresult* PQgetResult(PGconn* conn); // call until it returns null

	int PQenterPipelineMode(PGconn* conn); // returns 1 on success
	int PQexitPipelineMode(PGconn* conn); // ditto
	PGpipelineStatus PQpipelineStatus(const PGconn* conn);
	enum PGpipelineStatus {
		// order matches libpq-fe.h: OFF = 0, ON = 1, ABORTED = 2
		PQ_PIPELINE_OFF,
		PQ_PIPELINE_ON,
		PQ_PIPELINE_ABORTED
	}
	int PQpipelineSync(PGconn* conn);
	int PQsendPipelineSync(PGconn* conn);
	int PQsendFlushRequest(PGconn* conn);

	int PQconsumeInput(PGconn* conn);
	int PQisBusy(PGconn* conn);

	int PQsetnonblocking(PGconn* conn, int arg);
	int PQflush(PGconn* conn); // if returns 1, wait for socket readiness

	int PQsocket(const PGconn* conn); // returns a fd
}

/*
import std.stdio;
void main() {
	auto db = new PostgreSql("dbname = test");

	db.query("INSERT INTO users (id, name) values (?, ?)", 30, "hello mang");

	foreach(line; db.query("SELECT * FROM users")) {
		writeln(line[0], line["name"]);
	}
}
*/