1 /// Uses libpq implement the [arsd.database.Database] interface.
2 module arsd.postgres;
3 pragma(lib, "pq");
4 
5 public import arsd.database;
6 
7 import std..string;
8 import std.exception;
9 
10 // remember to CREATE DATABASE name WITH ENCODING 'utf8'
11 //
12 // http://www.postgresql.org/docs/8.0/static/libpq-exec.html
13 // ExecParams, PQPrepare, PQExecPrepared
14 //
15 // SQL: `DEALLOCATE name` is how to dealloc a prepared statement.
16 
17 /++
18 	The PostgreSql implementation of the [Database] interface.
19 
20 	You should construct this class, but then use it through the
21 	interface functions.
22 
23 	---
24 	auto db = new PostgreSql("dbname=name");
25 	foreach(row; db.query("SELECT id, data FROM table_name"))
26 		writeln(row[0], " = ", row[1]);
27 	---
28 +/
29 class PostgreSql : Database {
30 	/// dbname = name  is probably the most common connection string
31 	this(string connectionString) {
32 		this.connectionString = connectionString;
33 		conn = PQconnectdb(toStringz(connectionString));
34 		if(conn is null)
35 			throw new DatabaseException("Unable to allocate PG connection object");
36 		if(PQstatus(conn) != CONNECTION_OK)
37 			throw new DatabaseException(error());
38 		query("SET NAMES 'utf8'"); // D does everything with utf8
39 	}
40 
41 	string connectionString;
42 
43 	~this() {
44 		PQfinish(conn);
45 	}
46 
47 	string sysTimeToValue(SysTime s) {
48 		return "'" ~ escape(s.toISOExtString()) ~ "'::timestamptz";
49 	}
50 
51 	/**
52 		Prepared statement support
53 
54 		This will be added to the Database interface eventually in some form,
55 		but first I need to implement it for all my providers.
56 
57 		The common function of those 4 will be what I put in the interface.
58 	*/
59 
60 	ResultSet executePreparedStatement(T...)(string name, T args) {
61 		char*[args.length] argsStrings;
62 
63 		foreach(idx, arg; args) {
64 			// FIXME: optimize to remove allocations here
65 			static if(!is(typeof(arg) == typeof(null)))
66 				argsStrings[idx] = toStringz(to!string(arg));
67 			// else make it null
68 		}
69 
70 		auto res = PQexecPrepared(conn, toStringz(name), argsStrings.length, argStrings.ptr, 0, null, 0);
71 
72 		int ress = PQresultStatus(res);
73 		if(ress != PGRES_TUPLES_OK
74 			&& ress != PGRES_COMMAND_OK)
75 			throw new DatabaseException(error());
76 
77 		return new PostgresResult(res);
78 
79 	}
80 
81 	///
82 	override void startTransaction() {
83 		query("START TRANSACTION");
84 	}
85 
86 	ResultSet queryImpl(string sql, Variant[] args...) {
87 		sql = escapedVariants(this, sql, args);
88 
89 		bool first_retry = true;
90 
91 		retry:
92 
93 		auto res = PQexec(conn, toStringz(sql));
94 		int ress = PQresultStatus(res);
95 		// https://www.postgresql.org/docs/current/libpq-exec.html
96 		// FIXME: PQresultErrorField can get a lot more info in a more structured way
97 		if(ress != PGRES_TUPLES_OK
98 			&& ress != PGRES_COMMAND_OK)
99 		{
100 			if(first_retry && error() == "no connection to the server\n") {
101 				first_retry = false;
102 				// try to reconnect...
103 				PQfinish(conn);
104 				conn = PQconnectdb(toStringz(connectionString));
105 				if(conn is null)
106 					throw new DatabaseException("Unable to allocate PG connection object");
107 				if(PQstatus(conn) != CONNECTION_OK)
108 					throw new DatabaseException(error());
109 				goto retry;
110 			}
111 			throw new DatabaseException(error());
112 		}
113 
114 		return new PostgresResult(res);
115 	}
116 
117 	string escape(string sqlData) {
118 		char* buffer = (new char[sqlData.length * 2 + 1]).ptr;
119 		ulong size = PQescapeString (buffer, sqlData.ptr, sqlData.length);
120 
121 		string ret = assumeUnique(buffer[0.. cast(size_t) size]);
122 
123 		return ret;
124 	}
125 
126 
127 	///
128 	string error() {
129 		return copyCString(PQerrorMessage(conn));
130 	}
131 
132 	private:
133 		PGconn* conn;
134 }
135 
136 ///
137 class PostgresResult : ResultSet {
138 	// name for associative array to result index
139 	int getFieldIndex(string field) {
140 		if(mapping is null)
141 			makeFieldMapping();
142 		field = field.toLower;
143 		if(field in mapping)
144 			return mapping[field];
145 		else throw new Exception("no mapping " ~ field);
146 	}
147 
148 
149 	string[] fieldNames() {
150 		if(mapping is null)
151 			makeFieldMapping();
152 		return columnNames;
153 	}
154 
155 	// this is a range that can offer other ranges to access it
156 	bool empty() {
157 		return position == numRows;
158 	}
159 
160 	Row front() {
161 		return row;
162 	}
163 
164 	void popFront() {
165 		position++;
166 		if(position < numRows)
167 			fetchNext();
168 	}
169 
170 	override size_t length() {
171 		return numRows;
172 	}
173 
174 	this(PGresult* res) {
175 		this.res = res;
176 		numFields = PQnfields(res);
177 		numRows = PQntuples(res);
178 
179 		if(numRows)
180 			fetchNext();
181 	}
182 
183 	~this() {
184 		PQclear(res);
185 	}
186 
187 	private:
188 		PGresult* res;
189 		int[string] mapping;
190 		string[] columnNames;
191 		int numFields;
192 
193 		int position;
194 
195 		int numRows;
196 
197 		Row row;
198 
199 		void fetchNext() {
200 			Row r;
201 			r.resultSet = this;
202 			string[] row;
203 
204 			for(int i = 0; i < numFields; i++) {
205 				string a;
206 
207 				if(PQgetisnull(res, position, i))
208 					a = null;
209 				else {
210 					a = copyCString(PQgetvalue(res, position, i), PQgetlength(res, position, i));
211 
212 				}
213 				row ~= a;
214 			}
215 
216 			r.row = row;
217 			this.row = r;
218 		}
219 
220 		void makeFieldMapping() {
221 			for(int i = 0; i < numFields; i++) {
222 				string a = copyCString(PQfname(res, i));
223 
224 				columnNames ~= a;
225 				mapping[a] = i;
226 			}
227 
228 		}
229 }
230 
231 string copyCString(const char* c, int actualLength = -1) {
232 	const(char)* a = c;
233 	if(a is null)
234 		return null;
235 
236 	string ret;
237 	if(actualLength == -1)
238 		while(*a) {
239 			ret ~= *a;
240 			a++;
241 		}
242 	else {
243 		ret = a[0..actualLength].idup;
244 	}
245 
246 	return ret;
247 }
248 
249 extern(C) {
250 	struct PGconn {};
251 	struct PGresult {};
252 
253 	void PQfinish(PGconn*);
254 	PGconn* PQconnectdb(const char*);
255 
256 	int PQstatus(PGconn*); // FIXME check return value
257 
258 	const (char*) PQerrorMessage(PGconn*);
259 
260 	PGresult* PQexec(PGconn*, const char*);
261 	void PQclear(PGresult*);
262 
263 	PGresult* PQprepare(PGconn*, const char* stmtName, const char* query, int nParams, const void* paramTypes);
264 
265 	PGresult* PQexecPrepared(PGconn*, const char* stmtName, int nParams, const char** paramValues, const int* paramLengths, const int* paramFormats, int resultFormat);
266 
267 	int PQresultStatus(PGresult*); // FIXME check return value
268 
269 	int PQnfields(PGresult*); // number of fields in a result
270 	const(char*) PQfname(PGresult*, int); // name of field
271 
272 	int PQntuples(PGresult*); // number of rows in result
273 	const(char*) PQgetvalue(PGresult*, int row, int column);
274 
275 	size_t PQescapeString (char *to, const char *from, size_t length);
276 
277 	enum int CONNECTION_OK = 0;
278 	enum int PGRES_COMMAND_OK = 1;
279 	enum int PGRES_TUPLES_OK = 2;
280 
281 	int PQgetlength(const PGresult *res,
282 			int row_number,
283 			int column_number);
284 	int PQgetisnull(const PGresult *res,
285 			int row_number,
286 			int column_number);
287 
288 
289 }
290 
291 /*
292 import std.stdio;
293 void main() {
294 	auto db = new PostgreSql("dbname = test");
295 
296 	db.query("INSERT INTO users (id, name) values (?, ?)", 30, "hello mang");
297 
298 	foreach(line; db.query("SELECT * FROM users")) {
299 		writeln(line[0], line["name"]);
300 	}
301 }
302 */
Suggestion Box / Bug Report