module pgsql.connection;


import std.algorithm;
import std.array;
import std.conv : to;
import std.regex : ctRegex, matchFirst;
import std.string;
import std.traits;
import std.uni : sicmp;
import std.utf : decode, UseReplacementDchar;

import pgsql.appender;
public import pgsql.exception;
import pgsql.packet;
import pgsql.protocol;
import pgsql.ssl;
public import pgsql.type;

import std.stdio;


/// Connection state as last reported by the server.
/// `ready` and `transaction` are written when a ReadyForQuery message is consumed;
/// `affected` and `insertID` are written when a CommandComplete message is consumed.
struct ConnectionStatus {
    bool ready;
    TransactionStatus transaction = TransactionStatus.Idle;

    ulong affected;
    ulong insertID;
}


/// One NoticeResponse / ErrorResponse message, decoded field by field.
struct ConnectionNotice {
    /// Severity of the message. `ERROR` is the first member and therefore the
    /// value of a default-initialised notice.
    enum Severity : ubyte {
        ERROR = 1,
        FATAL,
        PANIC,
        WARNING,
        NOTICE,
        DEBUG,
        INFO,
        LOG,
    }

    Severity severity;
    uint position;
    const(char)[] message;
    const(char)[] code;
    const(char)[] hint;
    const(char)[] detail;
    const(char)[] where;
    const(char)[] schema;
    const(char)[] table;
    const(char)[] column;
    const(char)[] type;
    const(char)[] constraint;

    /// Formats the notice as `SEVERITY(code) message`.
    string toString() const {
        auto writer = appender!string;
        toString(writer);
        return writer.data;
    }

    /// Writes the same `SEVERITY(code) message` text into an output range.
    void toString(W)(ref W writer) const {
        import std.format : formattedWrite;
        writer.formattedWrite("%s(%s) %s", severity, code, message);
    }
}


/// Connection parameters, optionally parsed from a `name=value;name=value` string.
struct ConnectionSettings {
    /// Builds the settings by parsing `connectionString` (see `parse`).
    this(const(char)[] connectionString) {
        parse(connectionString);
    }

    /// Parses a `;`-separated list of `name=value` pairs into this struct.
    ///
    /// Recognised names: host, user, pwd, db, port, ssl, ssl_rootcert,
    /// ssl_hostname, ssl_ciphers, ssl_version, ssl_validate.
    ///
    /// Throws: PgSQLException for an empty string, a segment without `=`,
    /// an unknown name, a trailing `;`, or an unsupported value of one of the
    /// `ssl*` enumerated options.
    void parse(const(char)[] connectionString) {
        auto remaining = connectionString;

        while (!remaining.empty) {
            // A segment without '=' used to slice with index -1 and die with a
            // range error; report it as a malformed connection string instead.
            auto indexValue = remaining.indexOf("=");
            if (indexValue < 0)
                throw new PgSQLException(format("Bad connection string: %s", connectionString));

            auto indexValueEnd = remaining.indexOf(";", indexValue);
            if (indexValueEnd < 0)
                indexValueEnd = remaining.length;

            auto name = strip(remaining[0..indexValue]);
            auto value = strip(remaining[indexValue+1..indexValueEnd]);

            switch (name) {
                case "host":
                    host = value;
                    break;
                case "user":
                    user = value;
                    break;
                case "pwd":
                    pwd = value;
                    break;
                case "db":
                    db = value;
                    break;
                case "port":
                    port = to!ushort(value);
                    break;
                case "ssl":
                    switch (value) {
                        case "0":
                        case "no":
                        case "false":
                            break;
                        case "require":
                        case "required":
                            ssl.enforce = true;
                            goto case "yes";
                        case "1":
                        case "yes":
                        case "true":
                            ssl.use = true;
                            break;
                        default:
                            throw new PgSQLException(format("Bad value for 'ssl' on connection string: %s", value));
                    }
                    break;
                case "ssl_rootcert":
                    ssl.rootCertFile = value;
                    break;
                case "ssl_hostname":
                    ssl.hostName = value;
                    break;
                case "ssl_ciphers":
                    ssl.ciphers = value;
                    break;
                case "ssl_version":
                    switch (value) with (SSLConfig.Version) {
                        case "any":
                            ssl.sslVersion = any;
                            break;
                        case "ssl3":
                            ssl.sslVersion = ssl3;
                            break;
                        case "tls1":
                            ssl.sslVersion = tls1;
                            break;
                        case "tls1_1":
                            ssl.sslVersion = tls1_1;
                            break;
                        case "tls1_2":
                            ssl.sslVersion = tls1_2;
                            break;
                        case "dtls1":
                            ssl.sslVersion = dtls1;
                            break;
                        default:
                            throw new PgSQLException(format("Bad value for 'ssl_version' on connection string: %s", value));
                    }
                    break;
                case "ssl_validate":
                    switch (value) with (SSLConfig.Validate) {
                        case "basic":
                            ssl.validate = basic;
                            break;
                        case "trust":
                            ssl.validate = trust;
                            break;
                        case "identity":
                            ssl.validate = identity;
                            break;
                        default:
                            throw new PgSQLException(format("Bad value for 'ssl_validate' on connection string: %s", value));
                    }
                    break;
                default:
                    throw new PgSQLException(format("Bad connection string: %s", connectionString));
            }

            // The last pair has no terminating ';' - parsing is complete.
            if (indexValueEnd == remaining.length)
                return;

            remaining = remaining[indexValueEnd+1..$];
        }

        // Reached for an empty string or when the string ends with ';'.
        throw new PgSQLException(format("Bad connection string: %s", connectionString));
    }

    const(char)[] host;
    const(char)[] user;
    const(char)[] pwd;
    const(char)[] db;
    ushort port = 5432; // PostgreSQL default port (was 3306, the MySQL default)

    SSLConfig ssl;
}

/// Values reported by the server during startup (ParameterStatus / BackendKeyData).
private struct ServerInfo {
    const(char)[] versionString;
    const(char)[] encoding;
    const(char)[] application;
    const(char)[] timeZone;

    uint processId;
    uint cancellationKey;
}


/// Builds a comma-separated list of `x` question-mark placeholders,
/// e.g. `(?,?,?)` when `parens` is true or `?,?,?` when it is false.
/// Returns `null` when `x` is zero.
@property string placeholders(size_t x, bool parens = true) {
    if (x) {
        auto app = appender!string;
        if (parens) {
            // x '?' + (x - 1) ',' + 2 parentheses
            app.reserve(x + x + 1);

            app.put('(');
            foreach (i; 0..x - 1)
                app.put("?,");
            app.put('?');
            app.put(')');
        } else {
            // x '?' + (x - 1) ','
            app.reserve(x + x - 1);

            foreach (i; 0..x - 1)
                app.put("?,");
            app.put('?');
        }
        return app.data;
    }

    return null;
}


/// Overload for anything with a `.length`: one placeholder per element.
@property string placeholders(T)(T x, bool parens = true) if (is(typeof(() { auto y = x.length; }))) {
    return x.length.placeholders(parens);
}


enum ConnectionOptions {
    Default = 0
}


/// A connection to a PostgreSQL server over a `SocketType` transport.
///
/// Queries are sent as simple text queries; `?` placeholders in the SQL are
/// substituted client-side by `prepareSQL` before sending.
struct Connection(SocketType, ConnectionOptions Options = ConnectionOptions.Default) {
    /// Parses `connectionString` into settings and connects.
    void connect(string connectionString) {
        settings_ = ConnectionSettings(connectionString);
        connect();
    }

    /// Stores `settings` and connects.
    void connect(ConnectionSettings settings) {
        settings_ = settings;
        connect();
    }

    /// Stores the individual parameters and connects.
    void connect(const(char)[] host, ushort port, const(char)[] user, const(char)[] pwd, const(char)[] db) {
        settings_.host = host;
        settings_.user = user;
        settings_.pwd = pwd;
        settings_.db = db;
        settings_.port = port;

        connect();
    }

    const(char)[] schema() const {
        return schema_;
    }

    ConnectionSettings settings() const {
        return settings_;
    }

    /// Notices collected while consuming the statuses of the most recent query.
    const(ConnectionNotice)[] notices() const {
        return notices_;
    }

    /// Runs `sql` with `args`; a trailing callable argument receives each row.
    void execute(Args...)(const(char)[] sql, Args args) {
        query(sql, args);
    }

    /// Currently a no-op: the implementation is commented out.
    void set(T)(const(char)[] variable, T value) {
        //query("set session ?=?", PgSQLFragment(variable), value);
    }

    /// Returns the current value of the run-time setting `variable`.
    ///
    /// Uses `current_setting()`; the previous `show session variables like ?`
    /// statement is MySQL syntax and is rejected by PostgreSQL.
    const(char)[] get(const(char)[] variable) {
        const(char)[] result;
        query("select current_setting(?)", variable, (PgSQLRow row) {
            result = row[0].peek!(const(char)[]).dup;
        });

        return result;
    }

    /// Starts a transaction. Throws PgSQLErrorException if one is already open.
    void begin() {
        if (inTransaction)
            throw new PgSQLErrorException("PgSQL does not support nested transactions - commit or rollback before starting a new transaction");

        query("start transaction");

        assert(inTransaction);
    }

    /// Commits the open transaction. Throws PgSQLErrorException if none is open.
    void commit() {
        if (!inTransaction)
            throw new PgSQLErrorException("No active transaction");

        query("commit");

        assert(!inTransaction);
    }

    /// Rolls back the open transaction. Does nothing when not connected;
    /// throws PgSQLErrorException when connected without an open transaction.
    void rollback() {
        if (connected) {
            if (status_.transaction != TransactionStatus.Inside)
                throw new PgSQLErrorException("No active transaction");

            query("rollback");

            assert(!inTransaction);
        }
    }

    @property bool inTransaction() const {
        return connected && (status_.transaction == TransactionStatus.Inside);
    }

    alias OnDisconnectCallback = scope void delegate();
    @property void onDisconnect(OnDisconnectCallback callback) {
        onDisconnect_ = callback;
    }

    @property OnDisconnectCallback onDisconnect() const {
        return onDisconnect_;
    }

    @property ulong insertID() const {
        return status_.insertID;
    }

    @property ulong affected() const {
        return status_.affected;
    }

    @property bool connected() const {
        return socket_.connected;
    }

    void disconnect() {
        socket_.close();
    }

    /// Prepares the connection for a new user: clears the disconnect callback,
    /// reconnects if needed and rolls back any transaction left open.
    void reuse() {
        onDisconnect_ = null;

        ensureConnected();

        if (inTransaction)
            rollback();
    }

private:
    /// Closes the socket and then invokes the disconnect callback, if any.
    void disconnect_() {
        disconnect();
        if (onDisconnect_)
            onDisconnect_();
    }

    /// Sends `sql` (with `args` substituted for `?`) and consumes the reply.
    /// If the last argument is callable it is used as the row handler;
    /// otherwise any result rows are discarded. Any exception disconnects.
    void query(Args...)(const(char)[] sql, Args args) {
        scope(failure) disconnect_();

        static if (args.length == 0) {
            enum shouldDiscard = true;
        } else {
            enum shouldDiscard = !isCallable!(args[args.length - 1]);
        }

        enum argCount = shouldDiscard ? args.length : (args.length - 1);

        static if (argCount) {
            auto querySQL = prepareSQL(sql, args[0..argCount]);
        } else {
            auto querySQL = sql;
        }

        //writeln(">> ", querySQL);

        send(querySQL);

        auto answer = retrieve();
        if (isStatus(answer)) {
            eatStatuses(answer);
        } else {
            static if (!shouldDiscard) {
                resultSetText(answer, args[args.length - 1]);
            } else {
                discardAll(answer);
            }
        }
    }

    /// Opens the socket, optionally negotiates SSL, sends the startup packet
    /// and consumes the authentication exchange and the startup statuses.
    void connect() {
        socket_.connect(settings_.host, settings_.port);

        if (settings_.ssl.use) {
            auto requestSSL = OutputPacket(&out_);
            requestSSL.put!uint((1234 << 16) | 5679);
            requestSSL.finalize();

            socket_.write(requestSSL.get());

            // The server answers the SSL request with a single byte.
            ubyte[1] sslStatus;
            socket_.read(sslStatus);
            if (cast(char)*sslStatus.ptr == 'S') {
                socket_.startSSL(settings_.host, settings_.ssl);
            } else if (cast(char)*sslStatus.ptr == 'N') {
                if (settings_.ssl.enforce)
                    throw new PgSQLProtocolException("Server doesn't support SSL");
            } else {
                // Anything else is treated as the type byte of a regular message.
                eatStatuses(retrieve(sslStatus[0]));
            }
        }

        auto startup = OutputPacket(&out_);
        startup.put!uint(0x00030000);
        startup.putz("user");
        startup.putz(settings_.user);
        if (!settings_.db.empty()) {
            startup.putz("database");
            startup.putz(settings_.db);
        }
        startup.put!ubyte(0);
        startup.finalize(0);

        socket_.write(startup.get());

        // eatAuth returns true when a further authentication message is expected.
        if (eatAuth(retrieve()))
            eatAuth(retrieve());
        eatStatuses(retrieve());
    }

    /// Writes a Query message whose text is the concatenation of `args`.
    void send(Args...)(Args args) {
        ensureConnected();

        auto cmd = OutputPacket(OutputMessageType.Query, &out_);
        foreach (ref arg; args)
            cmd.put!(const(char[]))(arg);
        cmd.put!ubyte(0);
        cmd.finalize();

        socket_.write(cmd.get());
    }

    void ensureConnected() {
        if (!socket_.connected)
            connect();
    }

    /// True for message types that `eatStatuses` handles instead of row data.
    bool isStatus(InputPacket packet) {
        auto id = packet.type;

        switch (id) with (InputMessageType) {
            case ErrorResponse:
            case NoticeResponse:
            case ReadyForQuery:
            case NotificationResponse:
            case CommandComplete:
                return true;
            default:
                return false;
        }
    }

    /// Reads the remainder of a message whose type byte `control` has already
    /// been consumed. Throws PgSQLProtocolException on an impossible length.
    InputPacket retrieve(ubyte control) {
        scope(failure) disconnect_();

        ubyte[4] header;
        socket_.read(header);

        //writeln("<< ", cast(char)control, " ", cast(InputMessageType)control, " len: ", native!uint(header.ptr) - 4);

        // The length field counts itself, so anything below 4 is malformed and
        // would otherwise underflow into a multi-gigabyte allocation.
        const total = native!uint(header.ptr);
        if (total < 4)
            throw new PgSQLProtocolException("Invalid message length");

        auto len = total - 4;
        in_.length = len;
        socket_.read(in_);

        if (in_.length != len)
            throw new PgSQLConnectionException("Wrong number of bytes read");

        return InputPacket(control, &in_);
    }

    /// Reads one complete message (type byte, length, payload) from the socket.
    /// Throws PgSQLProtocolException on an impossible length.
    InputPacket retrieve() {
        scope(failure) disconnect_();

        ubyte[5] header;
        socket_.read(header);

        //writeln("<< ", cast(char)header[0], " ", cast(InputMessageType)header[0], " len: ", native!uint(header.ptr + 1) - 4);

        // See retrieve(ubyte): the length includes its own four bytes.
        const total = native!uint(header.ptr + 1);
        if (total < 4)
            throw new PgSQLProtocolException("Invalid message length");

        auto len = total - 4;
        in_.length = len;
        socket_.read(in_);

        if (in_.length != len)
            throw new PgSQLConnectionException("Wrong number of bytes read");

        return InputPacket(header[0], &in_);
    }

    /// Handles one message of the authentication exchange.
    ///
    /// Returns: false when the server reports authentication success (code 0),
    /// true when another authentication message should be read.
    /// Throws: PgSQLProtocolException for unsupported methods or unexpected
    /// messages; PgSQLErrorException (via throwError) on ErrorResponse.
    bool eatAuth(InputPacket packet) {
        scope(failure) disconnect_();

        auto type = cast(InputMessageType)packet.type;

        switch (type) with (InputMessageType) {
            case Authentication:
                auto auth = packet.eat!uint;
                auto reply = OutputPacket(OutputMessageType.PasswordMessage, &out_);

                switch (auth) {
                    case 0:
                        return false;
                    case 2:
                        goto default;
                    case 3:
                        // clear-text password
                        reply.putz(settings_.pwd);
                        break;
                    case 5:
                        // MD5: "md5" ~ hex(md5(hex(md5(pwd ~ user)) ~ salt))
                        static char[32] MD5toHex(T...)(in T data) {
                            import std.ascii : LetterCase;
                            import std.digest.md : md5Of, toHexString;
                            return md5Of(data).toHexString!(LetterCase.lower);
                        }

                        auto salt = packet.eat!(ubyte[])(4);
                        reply.put("md5");
                        reply.putz(MD5toHex(MD5toHex(settings_.pwd, settings_.user), salt));
                        break;
                    case 6: // SCM
                    case 7: // GSS
                    case 8:
                    case 9:
                    case 10: // SASL
                    case 11:
                    case 12:
                        goto default;
                    default:
                        throw new PgSQLProtocolException(format("Unsupported authentication method: %s", auth));
                }

                reply.finalize(0);
                socket_.write(reply.get());
                break;
            case NoticeResponse:
                eatNoticeResponse(packet);
                break;
            case ErrorResponse:
                eatNoticeResponse(packet);
                throwError(true);
                break;
            default:
                throw new PgSQLProtocolException(format("Unexpected message: %s", type));
        }

        return true;
    }

    /// Records the server parameters this client keeps; others are ignored.
    void eatParameterStatus(InputPacket packet) {
        assert(packet.type == InputMessageType.ParameterStatus);
        auto name = packet.eatz();
        auto value = packet.eatz();

        //writeln("parameter ", name, " = ", value);

        switch (name) {
            case "server_version":
                server_.versionString = value.dup;
                break;
            case "server_encoding":
                server_.encoding = value.dup;
                break;
            case "application_name":
                server_.application = value.dup;
                break;
            case "TimeZone":
                server_.timeZone = value.dup;
                break;
            default:
                break;
        }
        assert(packet.empty());
    }

    /// Stores the backend process id and cancellation key.
    void eatBackendKeyData(InputPacket packet) {
        assert(packet.type == InputMessageType.BackendKeyData);

        server_.processId = packet.eat!uint;
        server_.cancellationKey = packet.eat!uint;
    }

    /// Decodes a NoticeResponse or ErrorResponse and appends it to `notices_`.
    void eatNoticeResponse(InputPacket packet) {
        assert(packet.type == InputMessageType.NoticeResponse || packet.type == InputMessageType.ErrorResponse);

        ConnectionNotice notice;
        auto field = packet.eat!ubyte;
        // The field list is terminated by a zero type byte.
        while (field) {
            auto value = packet.eatz();
            import pgsql.row : hashOf;

            switch (field) with (NoticeMessageField) {
                case Severity:
                case SeverityLocal:
                    switch (hashOf(value)) with (ConnectionNotice.Severity) {
                        case hashOf("ERROR"):
                            notice.severity = ERROR;
                            break;
                        case hashOf("FATAL"):
                            notice.severity = FATAL;
                            break;
                        case hashOf("PANIC"):
                            notice.severity = PANIC;
                            break;
                        case hashOf("WARNING"):
                            notice.severity = WARNING;
                            break;
                        case hashOf("NOTICE"):
                            // Previously unmapped, which left the severity at its
                            // initial value (ERROR) and made throwError treat a
                            // plain notice as an error.
                            notice.severity = NOTICE;
                            break;
                        case hashOf("DEBUG"):
                            notice.severity = DEBUG;
                            break;
                        case hashOf("INFO"):
                            notice.severity = INFO;
                            break;
                        case hashOf("LOG"):
                            notice.severity = LOG;
                            break;
                        default:
                            break;
                    }
                    break;
                case Code:
                    notice.code = value.idup;
                    break;
                case Message:
                    notice.message = value.idup;
                    break;
                case Detail:
                    notice.detail = value.idup;
                    break;
                case Hint:
                    notice.hint = value.idup;
                    break;
                case Position:
                    notice.position = value.to!uint;
                    break;
                case Where:
                    notice.where = value.idup;
                    break;
                case Schema:
                    notice.schema = value.idup;
                    break;
                case Table:
                    notice.table = value.idup;
                    break;
                case Column:
                    notice.column = value.idup;
                    break;
                case DataType:
                    notice.type = value.idup;
                    break;
                case Constraint:
                    notice.constraint = value.idup;
                    break;
                case File:
                case Line:
                case Routine:
                    break;
                default:
                    writeln(" notice: ", cast(char)field, " ", value);
                    break;
            }
            field = packet.eat!ubyte;
        }

        notices_ ~= notice;
    }

    /// Updates `status_.insertID` / `status_.affected` from a CommandComplete tag.
    ///
    /// Tags that carry a row count set `affected`; `INSERT` additionally stores
    /// the first number of its tag in `insertID`. Every other tag (for example
    /// START TRANSACTION, COMMIT, ROLLBACK, SET, CREATE ..., DROP ...) resets
    /// both counters. Such tags used to raise a protocol exception, which made
    /// `begin`, `commit` and `rollback` fail and drop the connection.
    void eatCommandComplete(InputPacket packet) {
        assert(packet.type == InputMessageType.CommandComplete);
        import pgsql.row : hashOf;

        auto tag = packet.eatz().splitter(' ');
        auto command = tag.front();
        tag.popFront();

        switch (hashOf(command)) {
            case hashOf("INSERT"):
                status_.insertID = tag.front().to!ulong;
                tag.popFront();
                status_.affected = tag.front().to!ulong;
                break;
            case hashOf("SELECT"):
            case hashOf("DELETE"):
            case hashOf("UPDATE"):
            case hashOf("MERGE"):
            case hashOf("MOVE"):
            case hashOf("FETCH"):
            case hashOf("COPY"):
                status_.insertID = 0;
                status_.affected = tag.empty() ? 0 : tag.front().to!ulong;
                break;
            default:
                // Utility commands carry no row count.
                status_.insertID = 0;
                status_.affected = 0;
                break;
        }
    }

    /// Dispatches one status-type message and returns its message type.
    /// Throws PgSQLProtocolException for message types it does not handle.
    auto eatStatus(InputPacket packet) {
        auto type = cast(InputMessageType)packet.type();

        switch (type) with (InputMessageType) {
            case ParameterStatus:
                eatParameterStatus(packet);
                break;
            case BackendKeyData:
                eatBackendKeyData(packet);
                break;
            case ReadyForQuery:
                status_.transaction = cast(TransactionStatus)packet.eat!ubyte;
                status_.ready = true;
                break;
            case NoticeResponse:
                eatNoticeResponse(packet);
                break;
            case ErrorResponse:
                eatNoticeResponse(packet);
                throwError(true);
                break;
            case CommandComplete:
                eatCommandComplete(packet);
                break;
            default:
                throw new PgSQLProtocolException(format("Unexpected message: %s", type));
        }

        return type;
    }

    /// Throws PgSQLErrorException for the first collected notice whose severity
    /// is PANIC, ERROR or FATAL. With `force`, throws using the first notice
    /// even if none has such a severity.
    void throwError(bool force) {
        foreach (ref notice; notices_) {
            switch (notice.severity) with (ConnectionNotice.Severity) {
                case PANIC:
                case ERROR:
                case FATAL:
                    throw new PgSQLErrorException(cast(string)notice.message);
                default:
                    break;
            }
        }

        if (force)
            throw new PgSQLErrorException(cast(string)notices_.front().message);
    }

    /// Clears the collected notices, then consumes status messages starting
    /// with `packet` until ReadyForQuery has been processed.
    void eatStatuses(InputPacket packet) {
        notices_.length = 0;

        auto status = eatStatus(packet);
        while (status != InputMessageType.ReadyForQuery)
            status = eatStatus(retrieve());
    }

    /// Skips one column description: its name plus 18 bytes of fixed fields.
    void skipColumnDef(ref InputPacket packet) {
        packet.skipz();
        packet.skip(18);
    }

    /// Reads one column description into `def`; the name is copied into
    /// `columns_` and `def.name` is a slice of that buffer.
    void columnDef(ref InputPacket packet, ref PgSQLColumn def) {
        auto name = packet.eatz();
        columns_ ~= name;
        def.name = columns_[$-name.length..$];

        // Six bytes precede the type id and are not used here.
        packet.skip(6);

        def.type = cast(PgColumnTypes)packet.eat!uint;
        def.length = packet.eat!short;
        def.modifier = packet.eat!int;
        def.format = cast(FormatCode)packet.eat!short;
    }

    /// Reads `count` column descriptions into `defs`.
    void columnDefs(size_t count, ref PgSQLColumn[] defs, ref InputPacket packet) {
        defs.length = count;
        foreach (i; 0..count)
            columnDef(packet, defs[i]);
    }

    /// Invokes a `(PgSQLRow)` handler. A void handler counts as "continue".
    bool callHandler(RowHandler)(RowHandler handler, size_t, PgSQLHeader, PgSQLRow row) if ((ParameterTypeTuple!(RowHandler).length == 1) && is(ParameterTypeTuple!(RowHandler)[0] == PgSQLRow)) {
        static if (is(ReturnType!(RowHandler) == void)) {
            handler(row);
            return true;
        } else {
            return handler(row); // return type must be bool
        }
    }

    /// Invokes a `(index, PgSQLRow)` handler. A void handler counts as "continue".
    bool callHandler(RowHandler)(RowHandler handler, size_t i, PgSQLHeader, PgSQLRow row) if ((ParameterTypeTuple!(RowHandler).length == 2) && isNumeric!(ParameterTypeTuple!(RowHandler)[0]) && is(ParameterTypeTuple!(RowHandler)[1] == PgSQLRow)) {
        static if (is(ReturnType!(RowHandler) == void)) {
            handler(cast(ParameterTypeTuple!(RowHandler)[0])i, row);
            return true;
        } else {
            return handler(cast(ParameterTypeTuple!(RowHandler)[0])i, row); // return type must be bool
        }
    }

    /// Invokes a `(PgSQLHeader, PgSQLRow)` handler. A void handler counts as "continue".
    bool callHandler(RowHandler)(RowHandler handler, size_t, PgSQLHeader header, PgSQLRow row) if ((ParameterTypeTuple!(RowHandler).length == 2) && is(ParameterTypeTuple!(RowHandler)[0] == PgSQLHeader) && is(ParameterTypeTuple!(RowHandler)[1] == PgSQLRow)) {
        static if (is(ReturnType!(RowHandler) == void)) {
            handler(header, row);
            return true;
        } else {
            return handler(header, row); // return type must be bool
        }
    }

    /// Invokes a `(index, PgSQLHeader, PgSQLRow)` handler. A void handler counts
    /// as "continue". The index is cast to the handler's own numeric type, as in
    /// the two-parameter overload, so handlers taking e.g. `int` compile.
    bool callHandler(RowHandler)(RowHandler handler, size_t i, PgSQLHeader header, PgSQLRow row) if ((ParameterTypeTuple!(RowHandler).length == 3) && isNumeric!(ParameterTypeTuple!(RowHandler)[0]) && is(ParameterTypeTuple!(RowHandler)[1] == PgSQLHeader) && is(ParameterTypeTuple!(RowHandler)[2] == PgSQLRow)) {
        static if (is(ReturnType!(RowHandler) == void)) {
            handler(cast(ParameterTypeTuple!(RowHandler)[0])i, header, row);
            return true;
        } else {
            return handler(cast(ParameterTypeTuple!(RowHandler)[0])i, header, row); // return type must be bool
        }
    }

    /// Decodes one DataRow into `row`. Columns beyond the count sent in the
    /// packet are filled with NULL values. Only text format is supported.
    void resultSetRowText(InputPacket packet, PgSQLHeader header, ref PgSQLRow row) {
        assert(row.columns.length == header.length);

        assert(packet.type == InputMessageType.DataRow);
        const rowlen = packet.eat!ushort();

        foreach(i, ref column; header) {
            if (i < rowlen) {
                if (column.format == FormatCode.Text) {
                    eatValueText(packet, column, row.get_(i));
                } else {
                    assert(false);
                }
            } else {
                row.get_(i) = PgSQLValue(column.name, PgColumnTypes.NULL, null, 0);
            }
        }
        assert(packet.empty);
    }

    /// Reads the column descriptions from `packet`, then feeds every following
    /// row to `handler` until a status message arrives or the handler returns
    /// false (in which case the remaining rows are discarded).
    void resultSetText(RowHandler)(InputPacket packet, RowHandler handler) {
        columns_.length = 0;

        auto columns = cast(size_t)packet.eat!ushort;

        columnDefs(columns, header_, packet);
        row_.header_(header_);

        size_t index;
        while (true) {
            auto row = retrieve();
            if (isStatus(row)) {
                eatStatuses(row);
                break;
            }

            resultSetRowText(row, header_, row_);
            if (!callHandler(handler, index++, header_, row_)) {
                discardUntilStatus();
                break;
            }
        }
    }

    /// Reads the column descriptions from `packet` and drops all rows.
    ///
    /// Reads the column count the same way `resultSetText` does and resets
    /// `columns_` first; it previously used a length-encoded read and let the
    /// name buffer grow with every discarded result set.
    void discardAll(InputPacket packet) {
        columns_.length = 0;

        auto columns = cast(size_t)packet.eat!ushort;
        columnDefs(columns, header_, packet);

        discardUntilStatus();
    }

    /// Reads and drops messages until a status message, then consumes statuses.
    void discardUntilStatus() {
        while (true) {
            auto row = retrieve();
            if (isStatus(row)) {
                eatStatuses(row);
                break;
            }
        }
    }

    /// Substitutes `args` for the `?` placeholders in `sql` and returns the
    /// resulting text, which lives in the reusable `sql_` buffer.
    ///
    /// Array arguments (other than strings) consume one placeholder per element.
    /// Throws: PgSQLErrorException when the number of placeholders and the
    /// number of supplied values differ.
    auto prepareSQL(Args...)(const(char)[] sql, Args args) {
        // First pass: estimate the output size so the buffer is reserved once.
        auto estimated = sql.length;
        size_t argCount;

        foreach(i, arg; args) {
            static if (is(typeof(arg) == typeof(null))) {
                ++argCount;
                estimated += 4;
            } else static if (is(Unqual!(typeof(arg)) == PgSQLValue)) {
                ++argCount;
                final switch(arg.type) with (PgColumnTypes) {
                    case NULL:
                        estimated += 4;
                        break;
                    case CHAR:
                        estimated += 2;
                        break;
                    case BOOL:
                        estimated += 2;
                        break;
                    case INT2:
                        estimated += 6;
                        break;
                    case INT4:
                        estimated += 7;
                        break;
                    case INT8:
                        estimated += 15;
                        break;
                    case REAL:
                    case DOUBLE:
                        estimated += 8;
                        break;
                    case UNKNOWN:
                    case MONEY:
                    case POINT:
                    case LINE:
                    case LSEG:
                    case PATH:
                    case POLYGON:
                    case TINTERVAL:
                    case CIRCLE:
                    case BOX:
                    case JSON:
                    case JSONB:
                    case XML:
                    case MACADDR:
                    case MACADDR8:
                    case INET:
                    case CIDR:
                    case NAME:
                    case TEXT:
                    case INTERVAL:
                    case BIT:
                    case VARBIT:
                    case NUMERIC:
                    case UUID:
                    case CHARA:
                    case BYTEA:
                    case VARCHAR:
                        estimated += 4 + arg.peek!(const(char)[]).length;
                        break;
                    case DATE:
                        estimated += 10;
                        break;
                    case TIME:
                    case TIMETZ:
                        estimated += 22;
                        break;
                    case TIMESTAMP:
                    case TIMESTAMPTZ:
                        estimated += 30;
                        break;
                }
            } else static if (isArray!(typeof(arg)) && !isSomeString!(typeof(arg))) {
                argCount += arg.length;
                estimated += arg.length * 6;
            } else static if (isSomeString!(typeof(arg)) || is(Unqual!(typeof(arg)) == PgSQLRawString) || is(Unqual!(typeof(arg)) == PgSQLFragment) || is(Unqual!(typeof(arg)) == PgSQLBinary)) {
                ++argCount;
                estimated += 2 + arg.length;
            } else {
                ++argCount;
                estimated += 6;
            }
        }

        sql_.clear;
        sql_.reserve(max(8192, estimated));

        // Second pass: one type-erased appender function and one argument
        // address per parameter, so the substitution loop is not instantiated
        // per argument type.
        alias AppendFunc = bool function(ref Appender!(char[]), ref const(char)[] sql, ref size_t, const(void)*) @safe pure nothrow;
        AppendFunc[Args.length] funcs;
        const(void)*[Args.length] addrs;

        foreach (i, Arg; Args) {
            static if (is(Arg == enum)) {
                funcs[i] = () @trusted { return cast(AppendFunc)&appendNextValue!(OriginalType!Arg); }();
                addrs[i] = (ref x) @trusted { return cast(const void*)&x; }(cast(OriginalType!(Unqual!Arg))args[i]);
            } else {
                funcs[i] = () @trusted { return cast(AppendFunc)&appendNextValue!(Arg); }();
                addrs[i] = (ref x) @trusted { return cast(const void*)&x; }(args[i]);
            }
        }

        size_t indexArg;
        foreach (i; 0..Args.length) {
            if (!funcs[i](sql_, sql, indexArg, addrs[i]))
                throw new PgSQLErrorException(format("Wrong number of parameters for query. Got %d but expected %d.", argCount, indexArg));
        }

        // Any placeholder left over means too few values were supplied; count
        // the remainder so the error message reports the expected total.
        if (copyUpToNext(sql_, sql)) {
            ++indexArg;
            while (copyUpToNext(sql_, sql))
                ++indexArg;
            throw new PgSQLErrorException(format("Wrong number of parameters for query. Got %d but expected %d.", argCount, indexArg));
        }

        return sql_.data;
    }

    SocketType socket_;
    PgSQLHeader header_;
    PgSQLRow row_;
    char[] columns_;
    char[] schema_;
    ubyte[] in_;
    ubyte[] out_;
    ubyte seq_;
    Appender!(char[]) sql_;

    OnDisconnectCallback onDisconnect_;
    ConnectionStatus status_;
    ConnectionSettings settings_;
    ServerInfo server_;

    ConnectionNotice[] notices_;
}


/// Copies `sql` into `app` up to (not including) the next `?` that is outside
/// a quoted section, and advances `sql` past that `?`.
///
/// Quoted sections are delimited by `'`, `"` or a backtick; inside one, a
/// backslash skips the character that follows it.
///
/// Returns: true if a placeholder was found; false if the rest of `sql` was
/// copied without finding one (`sql` is then empty).
private auto copyUpToNext(ref Appender!(char[]) app, ref const(char)[] sql) {
    size_t offset;
    dchar quote = '\0';

    while (offset < sql.length) {
        auto ch = decode!(UseReplacementDchar.no)(sql, offset);
        switch (ch) {
            case '?':
                if (!quote) {
                    // `offset` is already past the '?', which is one byte long.
                    app.put(sql[0..offset - 1]);
                    sql = sql[offset..$];
                    return true;
                } else {
                    goto default;
                }
            case '\'':
            case '\"':
            case '`':
                if (quote == ch) {
                    quote = '\0';
                } else if (!quote) {
                    quote = ch;
                }
                goto default;
            case '\\':
                if (quote && (offset < sql.length))
                    decode!(UseReplacementDchar.no)(sql, offset);
                goto default;
            default:
                break;
        }
    }
    app.put(sql[0..offset]);
    sql = sql[offset..$];
    return false;
}

/// Writes the value behind `arg` (a type-erased pointer to a `T`) at the next
/// placeholder of `sql`, incrementing `indexArg` for each placeholder used.
/// A non-string array consumes one placeholder per element.
///
/// Returns: false if `sql` ran out of placeholders, true otherwise.
private bool appendNextValue(T)(ref Appender!(char[]) app, ref const(char)[] sql, ref size_t indexArg, const(void)* arg) {
    static if (isArray!T && !isSomeString!(OriginalType!T)) {
        foreach (i, ref v; *cast(T*)arg) {
            if (copyUpToNext(app, sql)) {
                appendValue(app, v);
                ++indexArg;
            } else {
                return false;
            }
        }
    } else {
        if (copyUpToNext(app, sql)) {
            appendValue(app, *cast(T*)arg);
            ++indexArg;
        } else {
            return false;
        }
    }
    return true;
}