/// Uses libpq to implement the [arsd.database.Database] interface.
module arsd.postgres;
pragma(lib, "pq");

public import arsd.database;

import std.string;
import std.exception;

// remember to CREATE DATABASE name WITH ENCODING 'utf8'
//
// http://www.postgresql.org/docs/8.0/static/libpq-exec.html
// ExecParams, PQPrepare, PQExecPrepared
//
// SQL: `DEALLOCATE name` is how to dealloc a prepared statement.

/++
	The PostgreSql implementation of the [Database] interface.

	You should construct this class, but then use it through the
	interface functions.

	---
	auto db = new PostgreSql("dbname=name");
	foreach(row; db.query("SELECT id, data FROM table_name"))
		writeln(row[0], " = ", row[1]);
	---
+/
class PostgreSql : Database {
	/// `dbname=your_database_name` is probably the most common connection string. See section "33.1.1.1. Keyword/Value Connection Strings" on https://www.postgresql.org/docs/10/libpq-connect.html
	///
	/// Throws: DatabaseException if the connection object cannot be allocated
	/// or the connection does not reach CONNECTION_OK.
	this(string connectionString) {
		this.connectionString = connectionString;
		openConnection();
		query("SET NAMES 'utf8'"); // D does everything with utf8
	}

	/// The connection string given to the constructor; kept so queryImpl can reconnect.
	string connectionString;

	~this() {
		PQfinish(conn);
	}

	/// Formats a SysTime as a quoted, escaped ISO extended string cast to `timestamptz`.
	string sysTimeToValue(SysTime s) {
		return "'" ~ escape(s.toISOExtString()) ~ "'::timestamptz";
	}

	/**
		Prepared statement support

		This will be added to the Database interface eventually in some form,
		but first I need to implement it for all my providers.

		The common function of those 4 will be what I put in the interface.

		Executes the already-prepared statement `name`, passing each argument
		in text form. An argument whose static type is `typeof(null)` is sent
		as a null pointer (SQL NULL).

		Throws: DatabaseException if the result status is neither
		PGRES_TUPLES_OK nor PGRES_COMMAND_OK.
	*/
	ResultSet executePreparedStatement(T...)(string name, T args) {
		import std.conv : to;

		// toStringz yields immutable(char)*, so the slots must be pointers to const char.
		// Pointers default-initialize to null, which is what a null argument needs.
		const(char)*[args.length] argsStrings;

		foreach(idx, arg; args) {
			// FIXME: optimize to remove allocations here
			static if(!is(typeof(arg) == typeof(null)))
				argsStrings[idx] = toStringz(to!string(arg));
			// else leave it null
		}

		// null lengths/formats: every parameter is passed as NUL-terminated text
		auto res = PQexecPrepared(conn, toStringz(name), cast(int) argsStrings.length, argsStrings.ptr, null, null, 0);

		immutable status = PQresultStatus(res);
		if(status != PGRES_TUPLES_OK && status != PGRES_COMMAND_OK) {
			string message = error();
			PQclear(res); // a failed result still has to be released
			throw new DatabaseException(message);
		}

		return new PostgresResult(res);
	}

	///
	override void startTransaction() {
		query("START TRANSACTION");
	}

	/++
		Substitutes `args` into `sql` via `escapedVariants` and runs the
		resulting statement with PQexec.

		If the statement fails and the connection error text is exactly
		"no connection to the server\n", the connection is reopened and the
		statement is retried once.

		Throws: DatabaseException when the statement fails or the
		reconnection attempt fails.
	+/
	ResultSet queryImpl(string sql, Variant[] args...) {
		sql = escapedVariants(this, sql, args);
		const sqlz = toStringz(sql);

		bool mayReconnect = true;

		for(;;) {
			auto res = PQexec(conn, sqlz);
			immutable status = PQresultStatus(res);
			// https://www.postgresql.org/docs/current/libpq-exec.html
			// FIXME: PQresultErrorField can get a lot more info in a more structured way
			if(status == PGRES_TUPLES_OK || status == PGRES_COMMAND_OK)
				return new PostgresResult(res);

			// Read the message first, then release the failed result so it
			// leaks neither on the retry path nor on the throw path.
			string message = error();
			PQclear(res);

			if(mayReconnect && message == "no connection to the server\n") {
				mayReconnect = false;
				// try to reconnect...
				// NOTE(review): the "SET NAMES 'utf8'" issued by the constructor is
				// not repeated here - confirm whether the new session needs it.
				PQfinish(conn);
				openConnection();
				continue;
			}

			throw new DatabaseException(message);
		}
	}

	/++
		Escapes `sqlData` for use inside a single-quoted SQL string literal.
		The surrounding quotes are not added.

		Uses PQescapeStringConn, which takes the connection as an argument,
		rather than the deprecated PQescapeString.

		Throws: DatabaseException if libpq reports an escaping error.
	+/
	string escape(string sqlData) {
		// libpq asks for room for twice the input length plus the terminator
		auto buffer = new char[sqlData.length * 2 + 1];
		int failed = 0;
		immutable written = PQescapeStringConn(conn, buffer.ptr, sqlData.ptr, sqlData.length, &failed);
		if(failed)
			throw new DatabaseException(error());

		string ret = assumeUnique(buffer[0 .. written]);

		return ret;
	}


	/// Returns a copy of the connection's current libpq error message.
	string error() {
		return copyCString(PQerrorMessage(conn));
	}

	private:
		PGconn* conn;

		// Opens `conn` from `connectionString`; shared by the constructor and the reconnect path.
		void openConnection() {
			conn = PQconnectdb(toStringz(connectionString));
			if(conn is null)
				throw new DatabaseException("Unable to allocate PG connection object");
			if(PQstatus(conn) != CONNECTION_OK)
				throw new DatabaseException(error());
		}
}

/// A [ResultSet] over a libpq `PGresult`; the result is released in the destructor.
class PostgresResult : ResultSet {
	// name for associative array to result index
	// The requested name is lower-cased before the lookup; throws Exception when absent.
	int getFieldIndex(string field) {
		if(mapping is null)
			makeFieldMapping();
		field = field.toLower;
		if(auto found = field in mapping)
			return *found;
		throw new Exception("no mapping " ~ field);
	}


	/// Column names in result order, as reported by PQfname.
	string[] fieldNames() {
		if(mapping is null)
			makeFieldMapping();
		return columnNames;
	}

	// this is a range that can offer other ranges to access it
	bool empty() {
		return position == numRows;
	}

	Row front() {
		return row;
	}

	void popFront() {
		position++;
		// only load a row if one actually exists at the new position
		if(position < numRows)
			fetchNext();
	}

	override size_t length() {
		return numRows;
	}

	/// Takes over `res`, records its dimensions and loads the first row if there is one.
	this(PGresult* res) {
		this.res = res;
		numFields = PQnfields(res);
		numRows = PQntuples(res);

		if(numRows)
			fetchNext();
	}

	~this() {
		PQclear(res);
	}

	private:
		PGresult* res;
		int[string] mapping;
		string[] columnNames;
		int numFields;

		int position;

		int numRows;

		Row row;

		// Copies the row at `position` out of the PGresult into `this.row`.
		void fetchNext() {
			auto cells = new string[](numFields);

			foreach(i; 0 .. numFields) {
				// SQL NULL stays a null string; everything else is copied with its reported length
				if(!PQgetisnull(res, position, i))
					cells[i] = copyCString(PQgetvalue(res, position, i), PQgetlength(res, position, i));
			}

			Row r;
			r.resultSet = this;
			r.row = cells;
			this.row = r;
		}

		// Fills `columnNames` and the name -> index `mapping` from PQfname.
		void makeFieldMapping() {
			foreach(i; 0 .. numFields) {
				string a = copyCString(PQfname(res, i));

				columnNames ~= a;
				mapping[a] = i;
			}
		}
}

/++
	Copies a C string into a D string.

	Params:
		c = pointer to the characters; null yields a null string
		actualLength = number of chars to copy, or -1 to copy up to the
			terminating NUL
+/
string copyCString(const char* c, int actualLength = -1) {
	const(char)* start = c;
	if(start is null)
		return null;

	if(actualLength == -1)
		return fromStringz(start).idup;

	return start[0 .. actualLength].idup;
}

extern(C) {
	struct PGconn {}
	struct PGresult {}

	void PQfinish(PGconn*);
	PGconn* PQconnectdb(const char*);

	int PQstatus(PGconn*); // FIXME check return value

	const (char*) PQerrorMessage(PGconn*);

	PGresult* PQexec(PGconn*, const char*);
	void PQclear(PGresult*);

	PGresult* PQprepare(PGconn*, const char* stmtName, const char* query, int nParams, const void* paramTypes);

	PGresult* PQexecPrepared(PGconn*, const char* stmtName, int nParams, const char** paramValues, const int* paramLengths, const int* paramFormats, int resultFormat);

	int PQresultStatus(PGresult*); // FIXME check return value

	int PQnfields(PGresult*); // number of fields in a result
	const(char*) PQfname(PGresult*, int); // name of field

	int PQntuples(PGresult*); // number of rows in result
	const(char*) PQgetvalue(PGresult*, int row, int column);

	size_t PQescapeString (char *to, const char *from, size_t length);
	// connection-aware escaping; *error is set to nonzero on failure
	size_t PQescapeStringConn(PGconn* conn, char* to, const char* from, size_t length, int* error);

	enum int CONNECTION_OK = 0;
	enum int PGRES_COMMAND_OK = 1;
	enum int PGRES_TUPLES_OK = 2;

	int PQgetlength(const PGresult *res,
			int row_number,
			int column_number);
	int PQgetisnull(const PGresult *res,
			int row_number,
			int column_number);


}

/*
import std.stdio;
void main() {
	auto db = new PostgreSql("dbname = test");

	db.query("INSERT INTO users (id, name) values (?, ?)", 30, "hello mang");

	foreach(line; db.query("SELECT * FROM users")) {
		writeln(line[0], line["name"]);
	}
}
*/