1 module sqlited;
2 
3 //          Copyright Stefan Koch 2015 - 2018.
4 // Distributed under the Boost Software License, Version 1.0.
5 //    (See accompanying file LICENSE.md or copy at
6 //          http://www.boost.org/LICENSE_1_0.txt)
7 
8 /****************************
9  * SQLite-D SQLite 3 Reader *
10  * By Stefan Koch 2016      *
11  ****************************/
12 
13 import utils;
14 import varint;
15 
16 struct Database {
17 	string dbFilename;
18 	const ubyte[] data;
19 	alias Row = BTreePage.Row;
20 	alias BTreePageType = BTreePage.BTreePageType;
21 
22 	SQLiteHeader _cachedHeader;
23 
24 	SQLiteHeader header() pure const {
25 		return _cachedHeader;
26 	}
27 
28 	BTreePage rootPage() pure const {
29 		return pages[0];
30 	}
31 
32 	PageRange pages() pure const {
33 		return PageRange(cast(const)data, header.pageSize,
34 			usablePageSize, cast(const uint)(data.length / header.pageSize));
35 	}
36 
37 	const(uint) usablePageSize() pure const {
38 		return header.pageSize - header.reserved;
39 	}
40 
41 	struct PageRange {
42 		const ubyte[] data;
43 		const uint pageSize;
44 		const uint usablePageSize;
45 
46 		const uint numberOfPages;
47 	pure :
48 		@property uint length() const {
49 			return numberOfPages;
50 		}
51 
52 		BTreePage opIndex(const uint pageNumber) pure const {
53 			assert(pageNumber <= numberOfPages,
54 				"Attempting to go to invalid page");
55 
56 			const size_t pageBegin =
57 				(pageSize * pageNumber);
58 			const size_t pageEnd =
59 				(pageSize * pageNumber) + usablePageSize;
60 
61 			return BTreePage(data[pageBegin .. pageEnd], pageNumber ? 0 : 100);
62 		}
63 
64 		this(const ubyte[] data, const uint pageSize,
65 			const uint usablePageSize, const uint numberOfPages) pure {
66 			this.data = data;
67 			this.pageSize = pageSize;
68 			this.usablePageSize = usablePageSize;
69 			this.numberOfPages = numberOfPages;
70 			assert(usablePageSize >= 512);
71 		}
72 
73 	}
74 
75 	static struct Payload {
76 		alias SerialTypeCodeEnum = SerialTypeCode.SerialTypeCodeEnum;
77 		static struct SerialTypeCode {
78 			alias SerialTypeCodeEnum this;
79 
80 			static enum SerialTypeCodeEnum : long {
81 				NULL = (0),
82 				int8 = (1),
83 				int16 = (2),
84 				int24 = (3),
85 				int32 = (4),
86 				int48 = (5),
87 				int64 = (6),
88 				float64 = (7),
89 
90 				bool_false = (8),
91 				bool_true = (9),
92 
93 				blob = (12),
94 				_string = (13)
95 			}
96 
97 			this(VarInt v) pure nothrow {
98 				long _v = v; // this could be an int
99                                              // also the varint handling would not need to swap 8 bytes
100 
101 				if (_v > 11) {
102 					if (_v & 1) {
103 						type = SerialTypeCode.SerialTypeCodeEnum._string;
104 						length = (_v - 13) / 2;
105 					} else {
106 						type = SerialTypeCode.SerialTypeCodeEnum.blob;
107 						length = (_v - 12) / 2;
108 					}
109 				} else {
110 					static immutable uint[12] lengthTbl = [0,  1,  2,  3,
111                                                            4,  6,  8,  8,
112                                                            0,  0, -1, -1
113                                                           ];
114 					type = cast(SerialTypeCodeEnum) _v;
115 					length = lengthTbl[_v];
116 				}
117 
118 			}
119 
120 			SerialTypeCodeEnum type;
121 			long length;
122 		}
123 
124 		union {
125 			byte int8;
126 			short int16;
127 			int int24;
128 			int int32;
129 			long int48;
130 			long int64;
131 			double float64;
132 			const(char)[] _string;
133 			ubyte[] blob;
134 		}
135 
136 		SerialTypeCode typeCode;
137 		alias length = typeCode.length;
138 		alias type = typeCode.type;
139 	}
140 
141 	static struct Table {
142 		TableSchema schema;
143 	//	Row[] rows;
144 	}
145 
146 	static align(1) struct SQLiteHeader {
147 
148 		bool isValid() pure {
149 			return magicString == "SQLite format 3\0";
150 		}
151 
152 	align(1):
153 		// ALL NUMBERS ARE BIGENDIAN
154 		static {
155 			enum TextEncoding : uint {
156 				utf8 = bigEndian(1),
157 				utf16le = bigEndian(2),
158 				utf16be = bigEndian(3)
159 			}
160 
161 			enum SchemaFormat : uint {
162 				_1 = bigEndian(1),
163 				_2 = bigEndian(2),
164 				_3 = bigEndian(3),
165 				_4 = bigEndian(4),
166 			}
167 
168 			enum FileFormat : ubyte {
169 				legacy = 1,
170 				wal = 2,
171 			}
172 
173 			struct Freelist {
174 				BigEndian!uint nextPage;
175 				BigEndian!uint leafPointers; // Number if leafPoinrers;
176 			}
177 
178 		}
179 
180 		char[16] magicString;
181 
182 		BigEndian!ushort pageSize; /// between 512 and 32768 or 1
183 
184 		FileFormat FileFormatWriteVer;
185 		FileFormat FileFormatReadVer;
186 
187 		ubyte reserved; /// unused bytes at the end of each page
188 
189 		immutable ubyte maxEmbeddedPayloadFract = 64;
190 		immutable ubyte minEmbeddedPayloadFract = 32;
191 		immutable ubyte leafPayloadFract = 32;
192 		BigEndian!uint fileChangeCounter;
193 		BigEndian!uint sizeInPages; /// fileChangeCounter has to match validForVersion or sizeInPages is invalid
194 
195 		BigEndian!uint firstFreelistPage; /// Page number of the first freelist trunk page. 
196 		BigEndian!uint freelistPages; /// Total number of freelist Pages;
197 
198 		BigEndian!uint schemaCookie;
199 		SchemaFormat schemaFormatVer; // 1,2,3 or 4
200 
201 		BigEndian!uint defaultCacheSize;
202 		BigEndian!uint largestRootPage; /// used in incermental and auto vacuum modes. 0 otherwise.
203 
204 		TextEncoding textEncoding;
205 
206 		BigEndian!uint userVersion;
207 
208 		BigEndian!uint incrementalVacuum; /// Non-Zero if on, Zero otherwise;
209 
210 		BigEndian!uint applicationId;
211 
212 		BigEndian!uint[5] _reserved; /// Reserved space for future format expansion 
213 
214 		BigEndian!uint validForVersion;
215 		BigEndian!uint sqliteVersion;
216 
217 		static SQLiteHeader fromArray (const ubyte[] raw) pure {
218 			assert(raw.length >= this.sizeof);
219 			if (__ctfe) {
220 				SQLiteHeader result;
221 
222 				result.magicString = cast(char[])raw[0 .. 16];
223 				result.pageSize = raw[16 .. 18];
224 				result.FileFormatWriteVer = cast(FileFormat)raw[18 .. 19][0];
225 				result.FileFormatReadVer = cast(FileFormat)raw[19 .. 20][0];
226 				result.reserved = raw[20 .. 21][0];
227 				// maxEmbeddedPayloadFract ff.
228 				// are immutable values and do not need to be read
229 
230 				//	result.maxEmbeddedPayloadFract = raw[21 .. 22];
231 				//	result.minEmbeddedPayloadFract = raw[22 .. 23];
232 				//	result.leafPayloadFract = raw[23 .. 24]
233 
234 				result.fileChangeCounter = raw[24 .. 28];
235 				result.sizeInPages = raw[28 .. 32];
236 				result.firstFreelistPage = raw[32 .. 36];
237 
238 				return result;
239 			} else {
240 				return *(cast(SQLiteHeader*) raw);
241 			}
242 		}
243 	}
244 
245 	this(string filename, bool readEntirely = true) {
246 		import std.file;
247 
248 		auto data = cast(ubyte[]) read(filename);
249 		this(data, filename);
250 	}
251 	/// If you pass null as buffer a new one will be gc-allocated;
252 	this(const ubyte[] buffer, string filename = ":Memory:") pure {
253 		if (buffer is null) {
254 			ubyte[] myBuffer = new ubyte[](1024);
255 			data = myBuffer;
256 			///TODO write a suitable default header here.
257 		} else {
258 			data = cast(ubyte[])buffer;
259 		}
260 		dbFilename = filename;
261 		_cachedHeader = SQLiteHeader.fromArray(buffer); 
262 		assert(_cachedHeader.magicString[0..6] == "SQLite");
263 	}
264 
265 	/**
266 	 * CREATE TABLE sqlite_master(
267 	 *	type text,
268 	 *	name text,
269 	 *	tbl_name text,
270 	 *	rootpage integer,
271 	 *	sql text
272 	 * );
273 	 *	 
274 	 */
275 	static struct MasterTableSchema {
276 		string type;
277 		string name;
278 		string tbl_name;
279 		uint rootPage;
280 		string sql;
281 	}
282 
283 	static struct TableSchema {
284 		static struct SchemaEntry {
285 			//	uint colNumber;
286 			string columnName;
287 			string TypeName;
288 			string defaultValue;
289 			bool isPrimayKey;
290 			bool notNull;
291 		}
292 	}
293 
294 
295 
296 	static struct BTreePage {
297 	
298 		enum BTreePageType : ubyte {
299 			emptyPage = 0,
300 			indexInteriorPage = 2,
301 			tableInteriorPage = 5,
302 			indexLeafPage = 10,
303 			tableLeafPage = 13
304 		}
305 
306 	
307 		const ubyte[] page;
308 		const uint headerOffset;
309 		
310 		static struct Row {
311 			const PageRange pages;
312 			const uint payloadSize;
313 			const uint rowId;
314 			const uint payloadHeaderSize;
315 			const ubyte[] payloadHeaderBytes;
316 			const ubyte[] payloadStart;
317 			const BTreePageType pageType;
318 
319 			auto column(const uint colNum) pure const {
320 				auto payloadHeader = PayloadHeader(payloadHeaderBytes);
321 				uint offset;
322 				
323 				foreach(_; 0 .. colNum) {
324 					offset += payloadHeader.front().length;
325 					payloadHeader.popFront();
326 				}
327 				
328 				auto typeCode = payloadHeader.front();
329 				uint payloadEnd = cast(uint) (offset + typeCode.length);
330 				
331 				if (payloadStart.length > payloadEnd) {
332 					return extractPayload(payloadStart[offset .. payloadEnd], typeCode);
333 				} else {
334 					auto overflowInfo = OverflowInfo(payloadStart, offset, payloadSize, pages, payloadHeaderSize, pageType);
335 					return extractPayload(&overflowInfo, typeCode, pages);
336 				}
337 			}
338 		}
339 
340 
341 
342 		Row getRow(const ushort cellPointer, const PageRange pages, const BTreePageType pageType) pure const {
343 			uint offset = cellPointer;
344 			import std.algorithm : min;
345 
346 			auto payloadSize = VarInt(page[offset .. $]);
347 			offset += payloadSize.length;
348 
349 			ulong rowId;
350 			if (pageType == BTreePageType.tableLeafPage) {
351 				VarInt rowId_v = VarInt(page[offset .. $]);
352 				rowId = rowId_v;
353 				offset += rowId_v.length;
354 			}
355 			
356 			auto payloadHeaderSize = VarInt(page[offset ..  page.length]);
357 			uint _payloadHeaderSize = cast(uint)payloadHeaderSize;
358 
359 			if (_payloadHeaderSize > page.length - offset) {
360 				assert(0, "Overflowing payloadHeaders are currently not handeled");
361 			}
362 
363 			auto ph = page[offset + payloadHeaderSize.length .. offset + _payloadHeaderSize];
364 			offset += _payloadHeaderSize;
365 			// TODO The payloadHeader does not have to be sliced off here
366 			// We can potentially do better of we pass just one buffer to struct Row and slice there.
367 
368 			return Row(pages, cast(uint) payloadSize, cast(uint) rowId, cast(uint) _payloadHeaderSize , ph, page[offset .. $], pageType);
369 		}
370 
371 		//		string toString(PageRange pages) {
372 		//			import std.conv;
373 		//
374 		//			auto pageType = header.pageType;
375 		//			string result = to!string(pageType);
376 		//
377 		//			auto cellPointers = getCellPointerArray();
378 		//			foreach (cp; cellPointers) {
379 		//				ubyte* printPtr = cast(ubyte*) base + cp;
380 		//
381 		//				final switch (header.pageType) with (
382 		//					BTreePageHeader.BTreePageType) {
383 		//				case emptyPage:
384 		//					result ~= "This page is Empty or the pointer is bogus\n";
385 		//					break;
386 		//
387 		//				case tableLeafPage: {
388 		//						auto singlePagePayloadSize = usablePageSize - 35;
389 		//						result ~= "singlePagePayloadSize : " ~ (
390 		//							singlePagePayloadSize).to!string ~ "\n";
391 		//						auto payloadSize = VarInt(printPtr);
392 		//						result ~= "payloadSize: " ~ (payloadSize).to!string
393 		//							~ "\n";
394 		//						printPtr += payloadSize.length;
395 		//						auto rowid = VarInt(printPtr);
396 		//						result ~= "rowid: " ~ (rowid).to!string ~ "\n";
397 		//						printPtr += rowid.length;
398 		//
399 		//						auto payloadHeaderSize = VarInt(printPtr);
400 		//						result ~= "payloadHeaderSize: " ~ (payloadHeaderSize)
401 		//							.to!string ~ "\n";
402 		//
403 		//						printPtr += payloadHeaderSize.length;
404 		//
405 		//						auto typeCodes = processPayloadHeader(printPtr,
406 		//								payloadHeaderSize);
407 		//						printPtr += payloadHeaderSize - payloadHeaderSize.length;
408 		//
409 		//						import std.algorithm;
410 		//						assert(typeCodes.map!(tc => tc.length).sum == payloadSize - payloadHeaderSize);
411 		//
412 		//						result ~= "{ ";
413 		//						if (payloadSize < singlePagePayloadSize) {
414 		//							foreach (typeCode; typeCodes) {
415 		//								auto p = extractPayload(printPtr, typeCode);
416 		//								result ~= p.apply!(
417 		//									v => "\t\"" ~ to!string(v) ~ "\",\n");
418 		//								printPtr += typeCode.length;
419 		//							}
420 		//						} else {
421 		//							auto overflowInfo = OverflowInfo();
422 		//
423 		//							overflowInfo.remainingTotalPayload = cast(uint)payloadSize;
424 		//							overflowInfo.payloadOnFirstPage = 
425 		//								payloadOnPage(cast(uint)(payloadSize)) - cast(uint)payloadHeaderSize;
426 		//
427 		//							foreach (typeCode; typeCodes) {
428 		//								auto p = extractPayload(&printPtr, typeCode,
429 		//										&overflowInfo, pages);
430 		//								auto str = p.apply!(v => "\t\"" ~ to!string(v) ~ "\",\n");
431 		//								result ~= str;
432 		//
433 		//							}
434 		//						}
435 		//
436 		//						result ~= " }\n";
437 		//
438 		//						printPtr += payloadSize;
439 		//					}
440 		//					break;
441 		//
442 		//				case tableInteriorPage: {
443 		//						BigEndian!uint leftChildPointer = *(
444 		//							cast(uint*) printPtr);
445 		//						result ~= "nextPagePointer: " ~ (leftChildPointer)
446 		//							.to!string ~ "\n";
447 		//						//auto lc = BTreePage(base, usablePageSize, leftChildPointer);
448 		//						//result ~= to!string(lc);
449 		//						printPtr += uint.sizeof;
450 		//						auto integerKey = VarInt(printPtr);
451 		//						result ~= "integerKey: " ~ (integerKey).to!string
452 		//							~ "\n";
453 		//						printPtr += integerKey.length;
454 		//					}
455 		//					
456 		//					break;
457 		//
458 		//				case indexLeafPage: {
459 		//						auto payloadSize = VarInt(cast(ubyte*) printPtr);
460 		//						result ~= "payloadSize: " ~ (payloadSize).to!string
461 		//							~ "\n";
462 		//						printPtr += payloadSize.length;
463 		//						auto payloadHeaderSize = VarInt(cast(ubyte*) printPtr);
464 		//						printPtr += payloadHeaderSize.length;
465 		//						result ~= "payloadHeaderSize: " ~ (payloadHeaderSize)
466 		//							.to!string ~ "\n";
467 		//						auto typeCodes = processPayloadHeader(printPtr,
468 		//								payloadHeaderSize);
469 		//						foreach (typeCode; typeCodes) {
470 		//							result ~= to!string(typeCode) ~ ", ";
471 		//						}
472 		//						result ~= "\n";
473 		//						result ~= "rs-phs : " ~ to!string(
474 		//							payloadSize - payloadHeaderSize) ~ "\n";
475 		//						auto payload = CArray!char.toArray(
476 		//							cast(ubyte*)(printPtr + payloadHeaderSize),
477 		//								payloadSize - payloadHeaderSize);
478 		//						printPtr += payloadSize;
479 		//						result ~= (payload.length) ? (payload).to!string : "";
480 		//						
481 		//					}
482 		//					break;
483 		//
484 		//				case indexInteriorPage: {
485 		//						BigEndian!uint leftChildPointer = 
486 		//							*(cast(uint*) printPtr);
487 		//						result ~= "leftChildPinter: " ~ (leftChildPointer)
488 		//							.to!string ~ "\n";
489 		//						VarInt payloadSize;
490 		//						CArray!ubyte _payload;
491 		//						BigEndian!uint _firstOverflowPage;
492 		//						//assert(0,"No support for indexInteriorPage");
493 		//					}
494 		//					break;
495 		//
496 		//				}
497 		//
498 		//			}
499 		//			return result;
500 		//		}
501 
502 		static align(1) struct BTreePageHeader {
503 		align(1):
504 			//	pure :
505 			BTreePageType _pageType;
506 			BigEndian!ushort firstFreeBlock;
507 			BigEndian!ushort cellsInPage;
508 			BigEndian!ushort startCellContantArea; /// 0 is interpreted as 65536
509 			ubyte fragmentedFreeBytes;
510 			BigEndian!uint _rightmostPointer;
511 
512 			static BTreePageHeader fromArray(const ubyte[] _array) pure {
513 				assert(_array.length >= this.sizeof);
514 				if (__ctfe) {
515 					BTreePageHeader result;
516 
517 					result._pageType = cast(BTreePageType)_array[0];
518 					result.firstFreeBlock = _array[1 .. 3];
519 					result.cellsInPage = _array[3 .. 5];
520 					result.startCellContantArea = _array[5 .. 7];
521 					result.fragmentedFreeBytes = _array[7];
522 					if (result.isInteriorPage) {
523 						result._rightmostPointer = _array[8 .. 12];
524 					}
525 
526 					return result;
527 				} else {
528 					return *(cast (BTreePageHeader*) _array.ptr);
529 				}
530 			}
531 
532 			bool isInteriorPage() pure const {
533 				return (pageType == pageType.indexInteriorPage
534 					|| pageType == pageType.tableInteriorPage);
535 			}
536 
537 			@property auto pageType() const pure {
538 				return cast(const) _pageType;
539 			}
540 
541 			@property BigEndian!uint rightmostPointer() {
542 				assert(isInteriorPage,
543 					"the rightmost pointer is only in interior nodes");
544 				return _rightmostPointer;
545 			}
546 
547 			@property void rightmostPointer(uint rmp) {
548 				assert(isInteriorPage,
549 					"the rightmost pointer is only in interior nodes");
550 				_rightmostPointer = rmp;
551 			}
552 
553 			uint length() pure const {
554 				return 12 - (isInteriorPage ? 0 : 4);
555 			}
556 
557 //			string toString() {
558 //				import std.conv;
559 //
560 //				string result = "pageType:\t";
561 //				result ~= to!string(pageType) ~ "\n";
562 //				result ~= "firstFreeBlock:\t";
563 //				result ~= to!string(firstFreeBlock) ~ "\n";
564 //				result ~= "cellsInPage:\t";
565 //				result ~= to!string(cellsInPage) ~ "\n";
566 //				result ~= "startCellContantArea:\t";
567 //				result ~= to!string(startCellContantArea) ~ "\n";
568 //				result ~= "fragmentedFreeBytes:\t";
569 //				result ~= to!string(fragmentedFreeBytes) ~ "\n";
570 //				if (isInteriorPage) {
571 //					result ~= "_rightmostPointer";
572 //					result ~= to!string(_rightmostPointer) ~ "\n";
573 //				}
574 //
575 //				return result;
576 //			}
577 		}
578 
579 		bool hasPayload() const pure {
580 			final switch (pageType) with (BTreePageType) {
581 				case emptyPage:
582 					return false;
583 				case tableInteriorPage:
584 					return false;
585 				case indexInteriorPage:
586 					return true;
587 				case indexLeafPage:
588 					return true;
589 				case tableLeafPage:
590 					return true;
591 			}
592 		}
593 
594 		auto payloadSize(BigEndian!ushort cp) {
595 			assert(hasPayload);
596 			final switch (pageType) with (BTreePageType) {
597 				case emptyPage:
598 				case tableInteriorPage:
599 					assert(0, "page has no payload");
600 
601 				case indexInteriorPage:
602 					return VarInt(page[cp + uint.sizeof .. $]);
603 				case indexLeafPage:
604 					return VarInt(page[cp .. $]);
605 				case tableLeafPage:
606 					return VarInt(page[cp .. $]);
607 			}
608 		}
609 
610 		BTreePageHeader header() const pure {
611 			return BTreePageHeader.fromArray(page[headerOffset.. headerOffset + BTreePageHeader.sizeof]);
612 		}
613 
614 		BigEndian!ushort[] getCellPointerArray() const pure {
615 			auto offset = header.length + headerOffset;
616 			return page[offset .. offset + header.cellsInPage * ushort.sizeof]
617 				.toArray!(BigEndian!ushort)(header.cellsInPage);
618 		}
619 
620 		BTreePageType pageType() const pure {
621 			return (header()).pageType;
622 		}
623 
624 		struct PayloadHeader {
625 			const ubyte[] payloadHeader;
626 			uint offset;
627 			uint _length;
628 
629 			void popFront() pure {
630 				assert(offset < payloadHeader.length);
631 				offset += _length;
632 				_length = 0;
633 			}
634 
635 			Payload.SerialTypeCode front() pure {
636 				auto v = VarInt(payloadHeader[offset .. $]);
637 				_length = cast(uint)v.length;
638 				return Payload.SerialTypeCode(v);
639 			}
640 
641 			bool empty() pure const {
642 				return offset == payloadHeader.length;
643 			}
644 		}
645 
646 
647 	}
648 }
649 
650 struct OverflowInfo {
651 	const(ubyte)[] pageSlice;
652 	uint nextPageIdx;
653 
654 	this(const ubyte[] payloadStart, int offset, const uint payloadSize, const Database.PageRange pages, const uint payloadHeaderSize, const Database.BTreePageType pageType) pure {
655 		import std.algorithm : min;
656 
657 		uint x;
658 		if (isIndex(pageType)) {
659 			x = ((pages.usablePageSize - 12) * 32 / 255) - 23;
660 		} else {
661 			x = pages.usablePageSize - 35;
662 		}
663 
664 		if (payloadSize > x) {
665 			auto m = ((pages.usablePageSize - 12) * 32 / 255) - 23;
666 			auto k = m + ((payloadSize - m) % (pages.usablePageSize - 4));
667 
668 			auto payloadOnFirstPage = (k <= x ? k : m) - payloadHeaderSize;
669 
670 			nextPageIdx = BigEndian!uint(payloadStart[payloadOnFirstPage .. payloadOnFirstPage + uint.sizeof]);
671 
672 			if(offset > payloadOnFirstPage) {
673 				offset -= payloadOnFirstPage;
674 				gotoNextPage(pages);
675 
676 				auto payloadOnOverflowPage = pages.usablePageSize - uint.sizeof;
677 				while(offset>payloadOnOverflowPage) {
678 					gotoNextPage(pages);
679 					offset -= payloadOnOverflowPage;
680 				}
681 			} else {
682 				pageSlice = payloadStart[0 .. payloadOnFirstPage];
683 			}
684 		} else {
685 			pageSlice = payloadStart;
686 		}
687 		pageSlice = pageSlice[offset .. $];
688 	}
689 
690 	void gotoNextPage(const Database.PageRange pages) pure {
691 		assert(nextPageIdx != 0, "No next Page to go to");
692 		auto nextPage = pages[nextPageIdx - 1];
693 		nextPageIdx = BigEndian!uint(nextPage.page[0 .. uint.sizeof]);
694 		pageSlice = nextPage.page[uint.sizeof .. $];
695 	}
696 
697 }
698 
699 
700 static Database.Payload extractPayload(
701 	OverflowInfo* overflowInfo,
702 	const Database.Payload.SerialTypeCode typeCode,
703 	const Database.PageRange pages,
704 	) pure {
705 
706 	if (overflowInfo.pageSlice.length >= typeCode.length) {
707 		auto _length = cast(uint) typeCode.length;
708 
709 		auto payload = overflowInfo.pageSlice[0 .. _length];
710 		overflowInfo.pageSlice = overflowInfo.pageSlice[_length .. $];
711 
712 		if (overflowInfo.pageSlice.length == uint.sizeof) {
713 			assert(0, "I do not expect us to ever get here\n" ~
714 				"If we ever do, uncomment the two lines below and delete this assert");
715 		//		overflowInfo.nextPageIdx = BigEndian(overflowInfo.pageSlice[0 .. uint.sizeof]);
716 		//		overflowInfo.gotoNextPage(pages);
717 		}
718 
719 		return extractPayload(payload, typeCode);
720 	} else { // typeCode.length > payloadOnFirstPage
721 		alias et = Database.Payload.SerialTypeCode.SerialTypeCodeEnum;
722 		// let's assume SQLite is sane and does not split primitive types in the middle
723 		assert(typeCode.type == et.blob || typeCode.type == et._string);
724 
725 		auto remainingBytesOfPayload = cast(uint) typeCode.length;
726 		ubyte[] _payloadBuffer;
727 
728 		for (;;) {
729 
730 			import std.algorithm : min;
731 
732 			auto readBytes = cast(uint) min(overflowInfo.pageSlice.length,
733 				remainingBytesOfPayload);
734 
735 			remainingBytesOfPayload -= readBytes;
736 
737 			_payloadBuffer ~= overflowInfo.pageSlice[0 .. readBytes];
738 
739 			if (remainingBytesOfPayload == 0) {
740 				assert(typeCode.length == _payloadBuffer.length);
741 
742 				return extractPayload(cast(const)_payloadBuffer, typeCode);
743 			} else {
744 				if (overflowInfo.nextPageIdx == 0 && !remainingBytesOfPayload) {
745 					//This is a very special-case 
746 					//Hopefully we don't hit it :)
747 					assert(0, "Moow!");
748 				}
749 				overflowInfo.gotoNextPage(pages);
750 			}
751 		}
752 	}
753 }
754 
755 static Database.Payload extractPayload(const ubyte[] startPayload,
756 	const Database.Payload.SerialTypeCode typeCode) pure {
757 	Database.Payload p;
758 	p.typeCode = typeCode;
759 	
760 	final switch (typeCode.type) {
761 		case typeof(typeCode).int8:
762 			p.int8 = *cast(byte*) startPayload;
763 			break;
764 		case typeof(typeCode).int16:
765 			p.int16 = *cast(BigEndian!short*) startPayload;
766 			break;
767 		case typeof(typeCode).int24:
768 			p.int24 = (*cast(BigEndian!int*) startPayload) & 0xfff0;
769 			break;
770 		case typeof(typeCode).int32:
771 			p.int32 = *cast(BigEndian!int*) startPayload;
772 			break;
773 		case typeof(typeCode).int48:
774 			p.int48 = (*cast(BigEndian!long*) startPayload) & 0xffffff00;
775 			break;
776 		case typeof(typeCode).int64:
777 			p.int64 = *cast(BigEndian!long*) startPayload;
778 			break;
779 		case typeof(typeCode).float64:
780 			if (!__ctfe) {
781 				p.float64 = *cast(BigEndian!double*) startPayload;
782 				assert(0, "Not supported at runtime either it's BigEndian :)");
783 			} else
784 				assert(0, "not supporeted at CTFE yet");
785 		//	break;
786 		case typeof(typeCode).blob:
787 			p.blob = cast(ubyte[]) startPayload[0 .. cast(uint) typeCode.length];
788 			break;
789 		case typeof(typeCode)._string:
790 			p._string = cast(string) startPayload[0 .. cast(uint) typeCode.length];
791 			break;
792 
793 		case typeof(typeCode).NULL:
794 		case typeof(typeCode).bool_false:
795 		case typeof(typeCode).bool_true:
796 			break;
797 	}
798 
799 	return p;
800 }
801 
802 
803 
804 auto apply(alias handler)(const Database.Payload p) {
805 	final switch (p.typeCode.type) with (typeof(p.typeCode.type)) {
806 		case NULL:
807 			static if (__traits(compiles, handler(null))) {
808 				return handler(null);
809 			} else {
810 				assert(0, "handler cannot take null");
811 			}
812 		case int8:
813 			static if (__traits(compiles, handler(p.int8))) {
814 				return handler(p.int8);
815 			} else {
816 				assert(0);
817 			}
818 		case int16:
819 			static if (__traits(compiles, handler(p.int16))) {
820 				return handler(p.int16);
821 			} else {
822 				assert(0);
823 			}
824 		case int24:
825 			static if (__traits(compiles, handler(p.int24))) {
826 				return handler(p.int24);
827 			} else {
828 				assert(0);
829 			}
830 		case int32:
831 			static if (__traits(compiles, handler(p.int32))) {
832 				return handler(p.int32);
833 			} else {
834 				assert(0);
835 			}
836 		case int48:
837 			static if (__traits(compiles, handler(p.int48))) {
838 				return handler(p.int48);
839 			} else {
840 				assert(0);
841 			}
842 		case int64:
843 			static if (__traits(compiles, handler(p.int64))) {
844 				return handler(p.int64);
845 			} else {
846 				assert(0);
847 			}
848 		case float64:
849 			static if (__traits(compiles, handler(p.float64))) {
850 				return handler(p.float64);
851 			} else {
852 				assert(0);
853 			}
854 		case bool_false:
855 			static if (__traits(compiles, handler(false))) {
856 				return handler(false);
857 			} else {
858 				assert(0);
859 			}
860 		case bool_true:
861 			static if (__traits(compiles, handler(true))) {
862 				return handler(true);
863 			} else {
864 				assert(0);
865 			}
866 		case blob:
867 			static if (__traits(compiles, handler(p.blob))) {
868 				return handler(p.blob);
869 			} else {
870 				assert(0);
871 			}
872 		case _string:
873 			static if (__traits(compiles, handler(p._string))) {
874 				return handler(p._string);
875 			} else {
876 				assert(0,"handler " ~ typeof(handler).stringof ~ " cannot be called with string");
877 			}
878 	}
879 }
880 pure :
881 auto getAs(T)(Database.Payload p) {
882 	return p.apply!(v => cast(T) v);
883 }
884 
885 auto getAs(T)(Database.Row r, uint columnIndex) {
886 	return r.column(columnIndex).getAs!T();
887 }
888 
889 auto getAs(T)(Database.Row r, Database.TableSchema s, string colName) {
890 	return r.getAs!T(s.getcolumnIndex(colName));
891 }
892 
893 unittest {
894 	Database.Payload p;
895 	p.typeCode.type = Database.Payload.SerialTypeCodeEnum.bool_true;
896 	assert(p.getAs!(int) == 1);
897 	p.typeCode.type = Database.Payload.SerialTypeCodeEnum.bool_false;
898 	assert(p.getAs!(int) == 0);
899 }
900 
901 bool isIndex(const Database.BTreePage.BTreePageType pageType) pure {
902 	return ((pageType & 2) ^ 2) == 0;
903 }
904 
905 static assert(isIndex(Database.BTreePageType.indexLeafPage) && isIndex(Database.BTreePageType.indexInteriorPage) && !isIndex(Database.BTreePageType.tableLeafPage));