1 module pgsql.connection;
2 
3 
4 import std.algorithm;
5 import std.array;
6 import std.conv : to;
7 import std.regex : ctRegex, matchFirst;
8 import std.string;
9 import std.traits;
10 import std.uni : sicmp;
11 import std.utf : decode, UseReplacementDchar;
12 
13 import pgsql.appender;
14 public import pgsql.exception;
15 import pgsql.packet;
16 import pgsql.protocol;
17 import pgsql.ssl;
18 public import pgsql.type;
19 
20 import std.stdio;
21 
22 
/// Per-connection bookkeeping that is refreshed while server status
/// messages are consumed.
struct ConnectionStatus {
	/// Set to true when a ReadyForQuery message has been processed.
	bool ready;
	/// Transaction indicator taken from the last ReadyForQuery message.
	TransactionStatus transaction = TransactionStatus.Idle;

	/// Values extracted from the most recent CommandComplete tag.
	ulong affected, insertID;
}
30 
31 
/// One notice or error report received from the server, with the fields
/// this driver keeps from the NoticeResponse / ErrorResponse message.
struct ConnectionNotice {
	/// Severity levels; the numeric values start at 1 and are consecutive.
	enum Severity : ubyte {
		ERROR   = 1,
		FATAL   = 2,
		PANIC   = 3,
		WARNING = 4,
		NOTICE  = 5,
		DEBUG   = 6,
		INFO    = 7,
		LOG     = 8,
	}

	Severity severity;
	uint position;
	const(char)[] message;
	const(char)[] code;
	const(char)[] hint;
	const(char)[] detail;
	const(char)[] where;
	const(char)[] schema;
	const(char)[] table;
	const(char)[] column;
	const(char)[] type;
	const(char)[] constraint;

	/// Renders the notice as "SEVERITY(code) message" into a new string.
	string toString() const {
		auto sink = appender!string();
		toString(sink);
		return sink.data;
	}

	/// Writes "SEVERITY(code) message" into the given output range.
	void toString(W)(ref W writer) const {
		import std.format : formattedWrite;
		formattedWrite(writer, "%s(%s) %s", severity, code, message);
	}
}
68 
69 
/// Connection parameters, optionally parsed from a `name=value;name=value`
/// connection string.
struct ConnectionSettings {
	/// Builds the settings by parsing `connectionString` (see `parse`).
	this(const(char)[] connectionString) {
		parse(connectionString);
	}

	/**
	 * Parses a semicolon-separated list of `name=value` pairs.
	 *
	 * Recognised names: host, user, pwd, db, port, ssl, ssl_rootcert,
	 * ssl_hostname, ssl_ciphers, ssl_version, ssl_validate. Names and
	 * values are stripped of surrounding whitespace.
	 *
	 * Throws: PgSQLException for an empty string, a segment without '=',
	 * an unknown name, a trailing ';', or an invalid value.
	 */
	void parse(const(char)[] connectionString) {
		import std.conv : ConvException;

		auto remaining = connectionString;

		while (!remaining.empty) {
			const indexValue = remaining.indexOf("=");
			// A segment without '=' used to be sliced with index -1 and
			// crashed with a range error; report it as a bad string instead.
			if (indexValue < 0)
				throw new PgSQLException(format("Bad connection string: %s", connectionString));

			auto indexValueEnd = remaining.indexOf(";", indexValue);
			if (indexValueEnd < 0)
				indexValueEnd = remaining.length;

			auto name = strip(remaining[0..indexValue]);
			auto value = strip(remaining[indexValue+1..indexValueEnd]);

			switch (name) {
			case "host":
				host = value;
				break;
			case "user":
				user = value;
				break;
			case "pwd":
				pwd = value;
				break;
			case "db":
				db = value;
				break;
			case "port":
				// Surface conversion problems with the same exception type
				// as every other malformed setting.
				try {
					port = to!ushort(value);
				} catch (ConvException) {
					throw new PgSQLException(format("Bad value for 'port' on connection string: %s", value));
				}
				break;
			case "ssl":
				switch (value) {
				case "0":
				case "no":
				case "false":
					break;
				case "require":
				case "required":
					ssl.enforce = true;
					goto case "yes";
				case "1":
				case "yes":
				case "true":
					ssl.use = true;
					break;
				default:
					throw new PgSQLException(format("Bad value for 'ssl' on connection string: %s", value));
				}
				break;
			case "ssl_rootcert":
				ssl.rootCertFile = value;
				break;
			case "ssl_hostname":
				ssl.hostName = value;
				break;
			case "ssl_ciphers":
				ssl.ciphers = value;
				break;
			case "ssl_version":
				switch (value) with (SSLConfig.Version) {
				case "any":
					ssl.sslVersion = any;
					break;
				case "ssl3":
					ssl.sslVersion = ssl3;
					break;
				case "tls1":
					ssl.sslVersion = tls1;
					break;
				case "tls1_1":
					ssl.sslVersion = tls1_1;
					break;
				case "tls1_2":
					ssl.sslVersion = tls1_2;
					break;
				case "dtls1":
					ssl.sslVersion = dtls1;
					break;
				default:
					throw new PgSQLException(format("Bad value for 'ssl_version' on connection string: %s", value));
				}
				break;
			case "ssl_validate":
				switch (value) with (SSLConfig.Validate) {
				case "basic":
					ssl.validate = basic;
					break;
				case "trust":
					ssl.validate = trust;
					break;
				case "identity":
					ssl.validate = identity;
					break;
				default:
					throw new PgSQLException(format("Bad value for 'ssl_validate' on connection string: %s", value));
				}
				break;
			default:
				throw new PgSQLException(format("Bad connection string: %s", connectionString));
			}

			// Last pair consumed: parsing succeeded.
			if (indexValueEnd == remaining.length)
				return;

			remaining = remaining[indexValueEnd+1..$];
		}

		// Reached for an empty string or when nothing follows a ';'.
		throw new PgSQLException(format("Bad connection string: %s", connectionString));
	}

	const(char)[] host;
	const(char)[] user;
	const(char)[] pwd;
	const(char)[] db;
	/// Defaults to 5432, the standard PostgreSQL port (3306 is MySQL's).
	ushort port = 5432;

	SSLConfig ssl;
}
192 
/// Facts about the server collected from ParameterStatus and
/// BackendKeyData messages.
private struct ServerInfo {
	/// Copies of the "server_version", "server_encoding",
	/// "application_name" and "TimeZone" parameter values.
	const(char)[] versionString, encoding, application, timeZone;

	/// The two integers carried by the BackendKeyData message, in order.
	uint processId, cancellationKey;
}
202 
203 
/**
 * Builds a comma-separated list of `x` question-mark placeholders.
 *
 * Params:
 *   x      = number of placeholders to emit
 *   parens = wrap the list in parentheses when true
 *
 * Returns: e.g. "(?,?,?)" or "?,?,?"; `null` when `x` is zero.
 */
@property string placeholders(size_t x, bool parens = true) {
	if (x == 0)
		return null;

	auto app = appender!string;
	// x question marks + (x - 1) commas, plus the two parentheses if used.
	// (The original reserved the two sizes the wrong way round.)
	app.reserve(x + x - 1 + (parens ? 2 : 0));

	if (parens)
		app.put('(');
	foreach (i; 0..x - 1)
		app.put("?,");
	app.put('?');
	if (parens)
		app.put(')');

	return app.data;
}
227 
228 
/// Convenience overload: emits one placeholder per element of anything
/// that exposes a `length`, by forwarding that length to the
/// count-based `placeholders` overload.
@property string placeholders(T)(T x, bool parens = true) if (is(typeof(() { auto y = x.length; }))) {
	return placeholders(x.length, parens);
}
232 
233 
/// Compile-time option flags for `Connection`; only the default exists.
enum ConnectionOptions {
	Default = 0,
}
237 
238 
239 struct Connection(SocketType, ConnectionOptions Options = ConnectionOptions.Default) {
240 	void connect(string connectionString) {
241 		settings_ = ConnectionSettings(connectionString);
242 		connect();
243 	}
244 
245 	void connect(ConnectionSettings settings) {
246 		settings_ = settings;
247 		connect();
248 	}
249 
250 	void connect(const(char)[] host, ushort port, const(char)[] user, const(char)[] pwd, const(char)[] db) {
251 		settings_.host = host;
252 		settings_.user = user;
253 		settings_.pwd = pwd;
254 		settings_.db = db;
255 		settings_.port = port;
256 
257 		connect();
258 	}
259 
260 	const(char)[] schema() const {
261 		return schema_;
262 	}
263 
264 	ConnectionSettings settings() const {
265 		return settings_;
266 	}
267 
268 	const(ConnectionNotice)[] notices() const {
269 		return notices_;
270 	}
271 
272 	void execute(Args...)(const(char)[] sql, Args args) {
273 		query(sql, args);
274 	}
275 
	/// Currently a no-op: the only statement in the body is commented
	/// out, so neither `variable` nor `value` is used.
	void set(T)(const(char)[] variable, T value) {
		//query("set session ?=?", PgSQLFragment(variable), value);
	}
279 
280 	const(char)[] get(const(char)[] variable) {
281 		const(char)[] result;
282 		query("show session variables like ?", variable, (PgSQLRow row) {
283 			result = row[1].peek!(const(char)[]).dup;
284 		});
285 
286 		return result;
287 	}
288 
289 	void begin() {
290 		if (inTransaction)
291 			throw new PgSQLErrorException("PgSQL does not support nested transactions - commit or rollback before starting a new transaction");
292 
293 		query("start transaction");
294 
295 		assert(inTransaction);
296 	}
297 
298 	void commit() {
299 		if (!inTransaction)
300 			throw new PgSQLErrorException("No active transaction");
301 
302 		query("commit");
303 
304 		assert(!inTransaction);
305 	}
306 
307 	void rollback() {
308 		if (connected) {
309 			if (status_.transaction != TransactionStatus.Inside)
310 				throw new PgSQLErrorException("No active transaction");
311 
312 			query("rollback");
313 
314 			assert(!inTransaction);
315 		}
316 	}
317 
318 	@property bool inTransaction() const {
319 		return connected && (status_.transaction == TransactionStatus.Inside);
320 	}
321 
322 	alias OnDisconnectCallback = scope void delegate();
323 	@property void onDisconnect(OnDisconnectCallback callback) {
324 		onDisconnect_ = callback;
325 	}
326 
327 	@property OnDisconnectCallback onDisconnect() const {
328 		return onDisconnect_;
329 	}
330 
331 	@property ulong insertID() const {
332 		return status_.insertID;
333 	}
334 
335 	@property ulong affected() const {
336 		return status_.affected;
337 	}
338 
339 	@property bool connected() const {
340 		return socket_.connected;
341 	}
342 
343 	void disconnect() {
344 		socket_.close();
345 	}
346 
347 	void reuse() {
348 		onDisconnect_ = null;
349 
350 		ensureConnected();
351 
352 		if (inTransaction)
353 			rollback;
354 	}
355 
356 private:
357 	void disconnect_() {
358 		disconnect();
359 		if (onDisconnect_)
360 			onDisconnect_();
361 	}
362 
363 	void query(Args...)(const(char)[] sql, Args args) {
364 		scope(failure) disconnect_();
365 
366 		static if (args.length == 0) {
367 			enum shouldDiscard = true;
368 		} else {
369 			enum shouldDiscard = !isCallable!(args[args.length - 1]);
370 		}
371 
372 		enum argCount = shouldDiscard ? args.length : (args.length - 1);
373 
374 		static if (argCount) {
375 			auto querySQL = prepareSQL(sql, args[0..argCount]);
376 		} else {
377 			auto querySQL = sql;
378 		}
379 
380 		//writeln(">> ", querySQL);
381 
382 		send(querySQL);
383 
384 		auto answer = retrieve();
385 		if (isStatus(answer)) {
386 			eatStatuses(answer);
387 		} else {
388 			static if (!shouldDiscard) {
389 				resultSetText(answer, args[args.length - 1]);
390 			} else {
391 				discardAll(answer);
392 			}
393 		}
394 	}
395 
396 	void connect() {
397 		socket_.connect(settings_.host, settings_.port);
398 
399 		if (settings_.ssl.use) {
400 			auto requestSSL = OutputPacket(&out_);
401 			requestSSL.put!uint((1234 << 16) | 5679);
402 			requestSSL.finalize();
403 
404 			socket_.write(requestSSL.get());
405 
406 			ubyte[1] sslStatus;
407 			socket_.read(sslStatus);
408 			if (cast(char)*sslStatus.ptr == 'S') {
409 				socket_.startSSL(settings_.host, settings_.ssl);
410 			} else if (cast(char)*sslStatus.ptr == 'N') {
411 				if (settings_.ssl.enforce)
412 					throw new PgSQLProtocolException("Server doesn't support SSL");
413 			} else {
414 				eatStatuses(retrieve(sslStatus[0]));
415 			}
416 		}
417 
418 		auto startup = OutputPacket(&out_);
419 		startup.put!uint(0x00030000);
420 		startup.putz("user");
421 		startup.putz(settings_.user);
422 		if (!settings_.db.empty()) {
423 			startup.putz("database");
424 			startup.putz(settings_.db);
425 		}
426 		startup.put!ubyte(0);
427 		startup.finalize(0);
428 
429 		socket_.write(startup.get());
430 
431 		if (eatAuth(retrieve()))
432 			eatAuth(retrieve());
433 		eatStatuses(retrieve());
434 	}
435 
436 	void send(Args...)(Args args) {
437 		ensureConnected();
438 
439 		auto cmd = OutputPacket(OutputMessageType.Query, &out_);
440 		foreach (ref arg; args)
441 			cmd.put!(const(char[]))(arg);
442 		cmd.put!ubyte(0);
443 		cmd.finalize();
444 
445 		socket_.write(cmd.get());
446 	}
447 
448 	void ensureConnected() {
449 		if (!socket_.connected)
450 			connect();
451 	}
452 
453 	bool isStatus(InputPacket packet) {
454 		auto id = packet.type;
455 		switch (id) with (InputMessageType) {
456 		case ErrorResponse:
457 		case NoticeResponse:
458 		case ReadyForQuery:
459 		case NotificationResponse:
460 		case CommandComplete:
461 			return true;
462 		default:
463 			return false;
464 		}
465 	}
466 
467 	InputPacket retrieve(ubyte control) {
468 		scope(failure) disconnect_();
469 
470 		ubyte[4] header;
471 		socket_.read(header);
472 
473 		//writeln("<< ", cast(char)control, " ", cast(InputMessageType)control, " len: ", native!uint(header.ptr) - 4);
474 
475 		auto len = native!uint(header.ptr) - 4;
476 		in_.length = len;
477 		socket_.read(in_);
478 
479 		if (in_.length != len)
480 			throw new PgSQLConnectionException("Wrong number of bytes read");
481 
482 		return InputPacket(control, &in_);
483 	}
484 
485 	InputPacket retrieve() {
486 		scope(failure) disconnect_();
487 
488 		ubyte[5] header;
489 		socket_.read(header);
490 
491 		//writeln("<< ", cast(char)header[0], " ", cast(InputMessageType)header[0], " len: ", native!uint(header.ptr + 1) - 4);
492 
493 		auto len = native!uint(header.ptr + 1) - 4;
494 		in_.length = len;
495 		socket_.read(in_);
496 
497 		if (in_.length != len)
498 			throw new PgSQLConnectionException("Wrong number of bytes read");
499 
500 		return InputPacket(header[0], &in_);
501 	}
502 
503 	bool eatAuth(InputPacket packet) {
504 		scope(failure) disconnect_();
505 
506 		auto type = cast(InputMessageType)packet.type;
507 
508 		switch (type) with (InputMessageType) {
509 		case Authentication:
510 			auto auth = packet.eat!uint;
511 			auto reply = OutputPacket(OutputMessageType.PasswordMessage, &out_);
512 
513 			switch (auth) {
514 			case 0:
515 				return false;
516 			case 2:
517 				goto default;
518 			case 3:
519 				reply.putz(settings_.pwd);
520 				break;
521 			case 5:
522 				static char[32] MD5toHex(T...)(in T data) {
523 					import std.ascii : LetterCase;
524 					import std.digest.md : md5Of, toHexString;
525 					return md5Of(data).toHexString!(LetterCase.lower);
526 				}
527 
528 				auto salt = packet.eat!(ubyte[])(4);
529 				reply.put("md5");
530 				reply.putz(MD5toHex(MD5toHex(settings_.pwd, settings_.user), salt));
531 				break;
532 			case 6: // SCM
533 			case 7: // GSS
534 			case 8:
535 			case 9:
536 			case 10: // SASL
537 			case 11:
538 			case 12:
539 				goto default;
540 			default:
541 				throw new PgSQLProtocolException(format("Unsupported authentication method: %s", auth));
542 			}
543 
544 			reply.finalize(0);
545 			socket_.write(reply.get());
546 			break;
547 		case NoticeResponse:
548 			eatNoticeResponse(packet);
549 			break;
550 		case ErrorResponse:
551 			eatNoticeResponse(packet);
552 			throwError(true);
553 			break;
554 		default:
555 			throw new PgSQLProtocolException(format("Unexpected message: %s", type));
556 		}
557 
558 		return true;
559 	}
560 
561 	void eatParameterStatus(InputPacket packet) {
562 		assert(packet.type == InputMessageType.ParameterStatus);
563 		auto name = packet.eatz();
564 		auto value = packet.eatz();
565 
566 		//writeln("parameter ", name, " = ", value);
567 
568 		switch (name) {
569 		case "server_version":
570 			server_.versionString = value.dup;
571 			break;
572 		case "server_encoding":
573 			server_.encoding = value.dup;
574 			break;
575 		case "application_name":
576 			server_.application = value.dup;
577 			break;
578 		case "TimeZone":
579 			server_.timeZone = value.dup;
580 			break;
581 		default:
582 			break;
583 		}
584 		assert(packet.empty());
585 	}
586 
587 	void eatBackendKeyData(InputPacket packet) {
588 		assert(packet.type == InputMessageType.BackendKeyData);
589 
590 		server_.processId = packet.eat!uint;
591 		server_.cancellationKey = packet.eat!uint;
592 	}
593 
594 	void eatNoticeResponse(InputPacket packet) {
595 		assert(packet.type == InputMessageType.NoticeResponse || packet.type == InputMessageType.ErrorResponse);
596 
597 		ConnectionNotice notice;
598 		auto field = packet.eat!ubyte;
599 		while (field) {
600 			auto value = packet.eatz();
601 			import pgsql.row : hashOf;
602 
603 			switch (field) with (NoticeMessageField) {
604 			case Severity:
605 			case SeverityLocal:
606 				switch (hashOf(value)) with (ConnectionNotice.Severity) {
607 				case hashOf("ERROR"):
608 					notice.severity = ERROR;
609 					break;
610 				case hashOf("FATAL"):
611 					notice.severity = FATAL;
612 					break;
613 				case hashOf("PANIC"):
614 					notice.severity = PANIC;
615 					break;
616 				case hashOf("WARNING"):
617 					notice.severity = WARNING;
618 					break;
619 				case hashOf("DEBUG"):
620 					notice.severity = DEBUG;
621 					break;
622 				case hashOf("INFO"):
623 					notice.severity = INFO;
624 					break;
625 				case hashOf("LOG"):
626 					notice.severity = LOG;
627 					break;
628 				default:
629 					break;
630 				}
631 				break;
632 			case Code:
633 				notice.code = value.idup;
634 				break;
635 			case Message:
636 				notice.message = value.idup;
637 				break;
638 			case Detail:
639 				notice.detail = value.idup;
640 				break;
641 			case Hint:
642 				notice.hint = value.idup;
643 				break;
644 			case Position:
645 				notice.position = value.to!uint;
646 				break;
647 			case Where:
648 				notice.where = value.idup;
649 				break;
650 			case Schema:
651 				notice.schema = value.idup;
652 				break;
653 			case Table:
654 				notice.table = value.idup;
655 				break;
656 			case Column:
657 				notice.column = value.idup;
658 				break;
659 			case DataType:
660 				notice.type = value.idup;
661 				break;
662 			case Constraint:
663 				notice.constraint = value.idup;
664 				break;
665 			case File:
666 			case Line:
667 			case Routine:
668 				break;
669 			default:
670 				writeln("  notice: ", cast(char)field, " ", value);
671 				break;
672 			}
673 			field = packet.eat!ubyte;
674 		}
675 
676 		notices_ ~= notice;
677 	}
678 
679 	void eatCommandComplete(InputPacket packet) {
680 		assert(packet.type == InputMessageType.CommandComplete);
681 		import pgsql.row : hashOf;
682 
683 		auto tag = packet.eatz().splitter(' ');
684 		auto command = tag.front();
685 		tag.popFront();
686 
687 		switch (hashOf(command)) {
688 		case hashOf("INSERT"):
689 			status_.insertID = tag.front().to!ulong;
690 			tag.popFront();
691 			status_.affected = tag.front().to!ulong;
692 			break;
693 		case hashOf("SELECT"):
694 		case hashOf("DELETE"):
695 		case hashOf("UPDATE"):
696 		case hashOf("MOVE"):
697 		case hashOf("FETCH"):
698 		case hashOf("COPY"):
699 			status_.insertID = 0;
700 			status_.affected = tag.empty() ? 0 : tag.front().to!ulong;
701 			break;
702 		case hashOf("CREATE"):
703 		case hashOf("DROP"):
704 			status_.insertID = 0;
705 			break;
706 		default:
707 			throw new PgSQLProtocolException(format("Unexpected command tag: %s", command));
708 		}
709 	}
710 
711 	auto eatStatus(InputPacket packet) {
712 		auto type = cast(InputMessageType)packet.type();
713 
714 		switch (type) with (InputMessageType) {
715 		case ParameterStatus:
716 			eatParameterStatus(packet);
717 			break;
718 		case BackendKeyData:
719 			eatBackendKeyData(packet);
720 			break;
721 		case ReadyForQuery:
722 			status_.transaction = cast(TransactionStatus)packet.eat!ubyte;
723 			status_.ready = true;
724 			break;
725 		case NoticeResponse:
726 			eatNoticeResponse(packet);
727 			break;
728 		case ErrorResponse:
729 			eatNoticeResponse(packet);
730 			throwError(true);
731 			break;
732 		case CommandComplete:
733 			eatCommandComplete(packet);
734 			break;
735 		default:
736 			throw new PgSQLProtocolException(format("Unexpected message: %s", type));
737 		}
738 
739 		return type;
740 	}
741 
742 	void throwError(bool force) {
743 		foreach (ref notice; notices_) {
744 			switch (notice.severity) with (ConnectionNotice.Severity) {
745 			case PANIC:
746 			case ERROR:
747 			case FATAL:
748 				throw new PgSQLErrorException(cast(string)notice.message);
749 			default:
750 				break;
751 			}
752 		}
753 
754 		if (force)
755 			throw new PgSQLErrorException(cast(string)notices_.front().message);
756 	}
757 
758 	void eatStatuses(InputPacket packet) {
759 		notices_.length = 0;
760 
761 		auto status = eatStatus(packet);
762 		while (status != InputMessageType.ReadyForQuery)
763 			status = eatStatus(retrieve());
764 	}
765 
766 	void skipColumnDef(ref InputPacket packet) {
767 		packet.skipz();
768 		packet.skip(18);
769 	}
770 
771 	void columnDef(ref InputPacket packet, ref PgSQLColumn def) {
772 		auto name = packet.eatz();
773 		columns_ ~= name;
774 		def.name = columns_[$-name.length..$];
775 
776 		packet.skip(6);
777 
778 		def.type = cast(PgColumnTypes)packet.eat!uint;
779 		def.length = packet.eat!short;
780 		def.modifier = packet.eat!int;
781 		def.format = cast(FormatCode)packet.eat!short;
782 	}
783 
784 	void columnDefs(size_t count, ref PgSQLColumn[] defs, ref InputPacket packet) {
785 		defs.length = count;
786 		foreach (i; 0..count)
787 			columnDef(packet, defs[i]);
788 	}
789 
790 	bool callHandler(RowHandler)(RowHandler handler, size_t, PgSQLHeader, PgSQLRow row) if ((ParameterTypeTuple!(RowHandler).length == 1) && is(ParameterTypeTuple!(RowHandler)[0] == PgSQLRow)) {
791 		static if (is(ReturnType!(RowHandler) == void)) {
792 			handler(row);
793 			return true;
794 		} else {
795 			return handler(row); // return type must be bool
796 		}
797 	}
798 
799 	bool callHandler(RowHandler)(RowHandler handler, size_t i, PgSQLHeader, PgSQLRow row) if ((ParameterTypeTuple!(RowHandler).length == 2) && isNumeric!(ParameterTypeTuple!(RowHandler)[0]) && is(ParameterTypeTuple!(RowHandler)[1] == PgSQLRow)) {
800 		static if (is(ReturnType!(RowHandler) == void)) {
801 			handler(cast(ParameterTypeTuple!(RowHandler)[0])i, row);
802 			return true;
803 		} else {
804 			return handler(cast(ParameterTypeTuple!(RowHandler)[0])i, row); // return type must be bool
805 		}
806 	}
807 
808 	bool callHandler(RowHandler)(RowHandler handler, size_t, PgSQLHeader header, PgSQLRow row) if ((ParameterTypeTuple!(RowHandler).length == 2) && is(ParameterTypeTuple!(RowHandler)[0] == PgSQLHeader) && is(ParameterTypeTuple!(RowHandler)[1] == PgSQLRow)) {
809 		static if (is(ReturnType!(RowHandler) == void)) {
810 			handler(header, row);
811 			return true;
812 		} else {
813 			return handler(header, row); // return type must be bool
814 		}
815 	}
816 
817 	bool callHandler(RowHandler)(RowHandler handler, size_t i, PgSQLHeader header, PgSQLRow row) if ((ParameterTypeTuple!(RowHandler).length == 3) && isNumeric!(ParameterTypeTuple!(RowHandler)[0]) && is(ParameterTypeTuple!(RowHandler)[1] == PgSQLHeader) && is(ParameterTypeTuple!(RowHandler)[2] == PgSQLRow)) {
818 		static if (is(ReturnType!(RowHandler) == void)) {
819 			handler(i, header, row);
820 			return true;
821 		} else {
822 			return handler(i, header, row); // return type must be bool
823 		}
824 	}
825 
826 	void resultSetRowText(InputPacket packet, PgSQLHeader header, ref PgSQLRow row) {
827 		assert(row.columns.length == header.length);
828 
829 		assert(packet.type == InputMessageType.DataRow);
830 		const rowlen = packet.eat!ushort();
831 
832 		foreach(i, ref column; header) {
833 			if (i < rowlen) {
834 				if (column.format == FormatCode.Text) {
835 					eatValueText(packet, column, row.get_(i));
836 				} else {
837 					assert(false);
838 				}
839 			} else {
840 				row.get_(i) = PgSQLValue(column.name, PgColumnTypes.NULL, null, 0);
841 			}
842 		}
843 		assert(packet.empty);
844 	}
845 
846 	void resultSetText(RowHandler)(InputPacket packet, RowHandler handler) {
847 		columns_.length = 0;
848 
849 		auto columns = cast(size_t)packet.eat!ushort;
850 
851 		columnDefs(columns, header_, packet);
852 		row_.header_(header_);
853 
854 		size_t index;
855 		while (true) {
856 			auto row = retrieve();
857 			if (isStatus(row)) {
858 				eatStatuses(row);
859 				break;
860 			}
861 
862 			resultSetRowText(row, header_, row_);
863 			if (!callHandler(handler, index++, header_, row_)) {
864 				discardUntilStatus();
865 				break;
866 			}
867 		}
868 	}
869 
870 	void discardAll(InputPacket packet) {
871 		auto columns = cast(size_t)packet.eatLenEnc();
872 		columnDefs(columns, header_, packet);
873 
874 		discardUntilStatus();
875 	}
876 
877 	void discardUntilStatus() {
878 		while (true) {
879 			auto row = retrieve();
880 			if (isStatus(row)) {
881 				eatStatuses(row);
882 				break;
883 			}
884 		}
885 	}
886 
	/**
	 * Substitutes `args` for the unquoted '?' placeholders in `sql` and
	 * returns the resulting text, which lives in the reusable `sql_`
	 * buffer (so it is overwritten by the next call).
	 *
	 * Array arguments (other than strings) consume one placeholder per
	 * element. Throws PgSQLErrorException when the number of supplied
	 * values and the number of placeholders differ.
	 */
	auto prepareSQL(Args...)(const(char)[] sql, Args args) {
		// First pass: count values and estimate the output size so the
		// buffer can be reserved up front.
		auto estimated = sql.length;
		size_t argCount;

		foreach(i, arg; args) {
			static if (is(typeof(arg) == typeof(null))) {
				++argCount;
				estimated += 4;
			} else static if (is(Unqual!(typeof(arg)) == PgSQLValue)) {
				++argCount;
				final switch(arg.type) with (PgColumnTypes) {
				case NULL:
					estimated += 4;
					break;
				case CHAR:
					estimated += 2;
					break;
				case BOOL:
					estimated += 2;
					break;
				case INT2:
					estimated += 6;
					break;
				case INT4:
					estimated += 7;
					break;
				case INT8:
					estimated += 15;
					break;
				case REAL:
				case DOUBLE:
					estimated += 8;
					break;
				case UNKNOWN:
				case MONEY:
				case POINT:
				case LINE:
				case LSEG:
				case PATH:
				case POLYGON:
				case TINTERVAL:
				case CIRCLE:
				case BOX:
				case JSON:
				case JSONB:
				case XML:
				case MACADDR:
				case MACADDR8:
				case INET:
				case CIDR:
				case NAME:
				case TEXT:
				case INTERVAL:
				case BIT:
				case VARBIT:
				case NUMERIC:
				case UUID:
				case CHARA:
				case BYTEA:
				case VARCHAR:
					estimated += 4 + arg.peek!(const(char)[]).length;
					break;
				case DATE:
					estimated += 10;
					break;
				case TIME:
				case TIMETZ:
					estimated += 22;
					break;
				case TIMESTAMP:
				case TIMESTAMPTZ:
					estimated += 30;
					break;
				}
			} else static if (isArray!(typeof(arg)) && !isSomeString!(typeof(arg))) {
				// Each array element fills its own placeholder.
				argCount += arg.length;
				estimated += arg.length * 6;
			} else static if (isSomeString!(typeof(arg)) || is(Unqual!(typeof(arg)) == PgSQLRawString) || is(Unqual!(typeof(arg)) == PgSQLFragment) || is(Unqual!(typeof(arg)) == PgSQLBinary)) {
				++argCount;
				estimated += 2 + arg.length;
			} else {
				++argCount;
				estimated += 6;
			}
		}

		sql_.clear;
		sql_.reserve(max(8192, estimated));

		// Second pass setup: one type-erased appender function and one
		// untyped argument address per argument. Enum arguments are
		// handled through their base type.
		alias AppendFunc = bool function(ref Appender!(char[]), ref const(char)[] sql, ref size_t, const(void)*) @safe pure nothrow;
		AppendFunc[Args.length] funcs;
		const(void)*[Args.length] addrs;

		foreach (i, Arg; Args) {
			static if (is(Arg == enum)) {
				funcs[i] = () @trusted { return cast(AppendFunc)&appendNextValue!(OriginalType!Arg); }();
				addrs[i] = (ref x) @trusted { return cast(const void*)&x; }(cast(OriginalType!(Unqual!Arg))args[i]);
			} else {
				funcs[i] = () @trusted { return cast(AppendFunc)&appendNextValue!(Arg); }();
				addrs[i] = (ref x) @trusted { return cast(const void*)&x; }(args[i]);
			}
		}

		// Each call copies SQL text up to the next placeholder and writes
		// the value; false means the placeholders ran out early.
		size_t indexArg;
		foreach (i; 0..Args.length) {
			if (!funcs[i](sql_, sql, indexArg, addrs[i]))
				throw new PgSQLErrorException(format("Wrong number of parameters for query. Got %d but expected %d.", argCount, indexArg));
		}

		// Copy the tail; finding another placeholder here means too few
		// values were supplied, so count the rest for the error message.
		if (copyUpToNext(sql_, sql)) {
			++indexArg;
			while (copyUpToNext(sql_, sql))
				++indexArg;
			throw new PgSQLErrorException(format("Wrong number of parameters for query. Got %d but expected %d.", argCount, indexArg));
		}

		return sql_.data;
	}
1005 
	// Transport used for all reads and writes.
	SocketType socket_;
	// Column definitions and row object reused for every result set.
	PgSQLHeader header_;
	PgSQLRow row_;
	// Backing storage for column names; PgSQLColumn.name slices into it.
	char[] columns_;
	// Returned by schema(); never assigned in the visible code.
	char[] schema_;
	// Reusable buffers for incoming payloads and outgoing packets.
	ubyte[] in_;
	ubyte[] out_;
	// Not referenced anywhere in the visible code.
	ubyte seq_;
	// Reusable buffer holding the SQL text built by prepareSQL.
	Appender!(char[]) sql_;

	OnDisconnectCallback onDisconnect_;
	ConnectionStatus status_;
	ConnectionSettings settings_;
	ServerInfo server_;

	// Notices gathered since the last eatStatuses call started.
	ConnectionNotice[] notices_;
1022 }
1023 
1024 
/**
 * Copies `sql` into `app` up to (not including) the next '?' that is
 * outside a quoted section, and advances `sql` past that '?'.
 *
 * Quoted sections are delimited by ', " or `; inside one, a backslash
 * makes the following character be skipped.
 *
 * Returns: true when a placeholder was found; false when the end of
 * `sql` was reached, in which case all of it has been copied and `sql`
 * is left empty.
 */
private auto copyUpToNext(ref Appender!(char[]) app, ref const(char)[] sql) {
	size_t cursor;
	dchar openQuote = '\0';

	while (cursor < sql.length) {
		const current = decode!(UseReplacementDchar.no)(sql, cursor);

		if (current == '?') {
			if (openQuote)
				continue;

			// '?' is a single code unit, so it sits right before cursor.
			app.put(sql[0..cursor - 1]);
			sql = sql[cursor..$];
			return true;
		}

		if (current == '\'' || current == '\"' || current == '`') {
			if (openQuote == current) {
				openQuote = '\0';
			} else if (!openQuote) {
				openQuote = current;
			}
			continue;
		}

		if (current == '\\' && openQuote && (cursor < sql.length)) {
			// Skip the escaped character.
			decode!(UseReplacementDchar.no)(sql, cursor);
		}
	}

	app.put(sql[0..cursor]);
	sql = sql[cursor..$];
	return false;
}
1061 
/**
 * Writes the value behind the untyped pointer `arg` (interpreted as a
 * `T`) at the next placeholder of `sql`.
 *
 * For arrays other than strings every element takes its own
 * placeholder. `indexArg` is incremented once per value written.
 *
 * Returns: false as soon as no placeholder is left for a value.
 */
private bool appendNextValue(T)(ref Appender!(char[]) app, ref const(char)[] sql, ref size_t indexArg, const(void)* arg) {
	static if (isArray!T && !isSomeString!(OriginalType!T)) {
		foreach (ref element; *cast(T*)arg) {
			if (!copyUpToNext(app, sql))
				return false;

			appendValue(app, element);
			++indexArg;
		}
		return true;
	} else {
		if (!copyUpToNext(app, sql))
			return false;

		appendValue(app, *cast(T*)arg);
		++indexArg;
		return true;
	}
}
1082