1 module sqlited;
2 
3 //          Copyright Stefan Koch 2015 - 2018.
4 // Distributed under the Boost Software License, Version 1.0.
5 //    (See accompanying file LICENSE.md or copy at
6 //          http://www.boost.org/LICENSE_1_0.txt)
7 
8 /****************************
9  * SQLite-D SQLite 3 Reader *
10  * By Stefan Koch 2016      *
11  ****************************/
12 
13 import utils;
14 import varint;
15 
16 struct Database {
	/// Name given to the constructor; defaults to ":Memory:" for buffer-backed databases.
	string dbFilename;
	/// Raw bytes of the whole database image; pages are sliced out of this buffer.
	const ubyte[] data;
	/// Convenience aliases so callers can write `Database.Row` / `Database.BTreePageType`.
	alias Row = BTreePage.Row;
	alias BTreePageType = BTreePage.BTreePageType;

	/// File header parsed once by the constructor and returned by `header()`.
	SQLiteHeader _cachedHeader;
23 
24 	SQLiteHeader header() pure const {
25 		return _cachedHeader;
26 	}
27 
28 	BTreePage rootPage() pure const {
29 		return pages[0];
30 	}
31 
32 	PageRange pages() pure const {
33 		return PageRange(cast(const)data, header.pageSize,
34 			usablePageSize, cast(const uint)(data.length / header.pageSize));
35 	}
36 
37 	const(uint) usablePageSize() pure const {
38 		return header.pageSize - header.reserved;
39 	}
40 
41 	struct PageRange {
42 		const ubyte[] data;
43 		const uint pageSize;
44 		const uint usablePageSize;
45 
46 		const uint numberOfPages;
47 	pure :
48 		@property uint length() const {
49 			return numberOfPages;
50 		}
51 
52 		BTreePage opIndex(const uint pageNumber) pure const {
53 			assert(pageNumber <= numberOfPages,
54 				"Attempting to go to invalid page");
55 
56 			const size_t pageBegin = 
57 				(pageSize * pageNumber);
58 			const size_t pageEnd = 
59 				(pageSize * pageNumber) + usablePageSize;
60 
61 			return BTreePage(data[pageBegin .. pageEnd], pageNumber ? 0 : 100);
62 		}
63 
64 		this(const ubyte[] data, const uint pageSize,
65 			const uint usablePageSize, const uint numberOfPages) pure {
66 			this.data = data;
67 			this.pageSize = pageSize;
68 			this.usablePageSize = usablePageSize;
69 			this.numberOfPages = numberOfPages;
70 			assert(usablePageSize >= 512);
71 		}
72 
73 	}
74 
75 	static struct Payload {
76 		alias SerialTypeCodeEnum = SerialTypeCode.SerialTypeCodeEnum;
77 		static struct SerialTypeCode {
78 			alias SerialTypeCodeEnum this;
79 
80 			static enum SerialTypeCodeEnum : long {
81 				NULL = (0),
82 				int8 = (1),
83 				int16 = (2),
84 				int24 = (3),
85 				int32 = (4),
86 				int48 = (5),
87 				int64 = (6),
88 				float64 = (7),
89 
90 				bool_false = (8),
91 				bool_true = (9),
92 
93 				blob = (12),
94 				_string = (13)
95 			}
96 
97 			this(VarInt v) pure nothrow {
98 				long _v = v;
99 				if (_v > 11) {
100 					if (_v & 1) {
101 						type = SerialTypeCode.SerialTypeCodeEnum._string;
102 						length = (_v - 13) / 2;
103 					} else {
104 						type = SerialTypeCode.SerialTypeCodeEnum.blob;
105 						length = (_v - 12) / 2;
106 					}
107 				} else {
108 					type = cast(SerialTypeCodeEnum) _v;
109 					final switch (type) with (SerialTypeCodeEnum) {
110 						case NULL:
111 							length = 0;
112 							break;
113 						case int8:
114 							length = 1;
115 							break;
116 						case int16:
117 							length = 2;
118 							break;
119 						case int24:
120 							length = 3;
121 							break;
122 						case int32:
123 							length = 4;
124 							break;
125 						case int48:
126 							length = 6;
127 							break;
128 						case int64:
129 							length = 8;
130 							break;
131 						case float64:
132 							length = 8;
133 							break;
134 						case bool_false:
135 							length = 0;
136 							break;
137 						case bool_true:
138 							length = 0;
139 							break;
140 						case blob:
141 							assert(0, "SerialType Blob needs an explicit size");
142 						case _string:
143 							assert(0, "SerialType String needs an explicit size");
144 					}
145 				}
146 			}
147 
148 			SerialTypeCodeEnum type;
149 			long length;
150 		}
151 
152 		union {
153 			byte int8;
154 			short int16;
155 			int int24;
156 			int int32;
157 			long int48;
158 			long int64;
159 			double float64;
160 			const(char)[] _string;
161 			ubyte[] blob;
162 		}
163 
164 		SerialTypeCode typeCode;
165 		alias length = typeCode.length;
166 		alias type = typeCode.type;
167 	}
168 
	/// Describes a table; at present it only carries the table's schema
	/// (row storage is commented out).
	static struct Table {
		TableSchema schema;
	//	Row[] rows;
	}
173 
	/// Byte-exact image of the header at the start of a database file.
	/// Fields are packed (`align(1)`) so the struct can be overlaid on the raw bytes.
	static align(1) struct SQLiteHeader {

		/// True when the magic string equals "SQLite format 3" followed by a NUL byte.
		bool isValid() pure {
			return magicString == "SQLite format 3\0";
		}

	align(1):
		// All multi-byte numbers in the header are stored big-endian.
		static {
			enum TextEncoding : uint {
				utf8 = bigEndian(1),
				utf16le = bigEndian(2),
				utf16be = bigEndian(3)
			}

			enum SchemaFormat : uint {
				_1 = bigEndian(1),
				_2 = bigEndian(2),
				_3 = bigEndian(3),
				_4 = bigEndian(4),
			}

			enum FileFormat : ubyte {
				legacy = 1,
				wal = 2,
			}

			struct Freelist {
				BigEndian!uint nextPage;
				BigEndian!uint leafPointers; // Number of leaf pointers
			}

		}

		char[16] magicString;

		BigEndian!ushort pageSize; /// between 512 and 32768, or 1

		FileFormat FileFormatWriteVer;
		FileFormat FileFormatReadVer;

		ubyte reserved; /// unused bytes at the end of each page

		immutable ubyte maxEmbeddedPayloadFract = 64;
		immutable ubyte minEmbeddedPayloadFract = 32;
		immutable ubyte leafPayloadFract = 32;
		BigEndian!uint fileChangeCounter;
		BigEndian!uint sizeInPages; /// fileChangeCounter has to match validForVersion or sizeInPages is invalid

		BigEndian!uint firstFreelistPage; /// Page number of the first freelist trunk page.
		BigEndian!uint freelistPages; /// Total number of freelist pages.

		BigEndian!uint schemaCookie;
		SchemaFormat schemaFormatVer; // 1, 2, 3 or 4

		BigEndian!uint defaultCacheSize;
		BigEndian!uint largestRootPage; /// Used in incremental and auto vacuum modes; 0 otherwise.

		TextEncoding textEncoding;

		BigEndian!uint userVersion;

		BigEndian!uint incrementalVacuum; /// Non-zero if on, zero otherwise.

		BigEndian!uint applicationId;

		BigEndian!uint[5] _reserved; /// Reserved space for future format expansion.

		BigEndian!uint validForVersion;
		BigEndian!uint sqliteVersion;

		/// Creates a header from the first bytes of `raw`.
		///
		/// `raw` must be at least as long as the struct. At run time the bytes
		/// are reinterpreted through a pointer cast. During CTFE the fields are
		/// copied one by one, and only those up to `firstFreelistPage` are
		/// filled in; the remaining fields keep their default values.
		static SQLiteHeader fromArray (const ubyte[] raw) pure {
			assert(raw.length >= this.sizeof);
			if (__ctfe) {
				SQLiteHeader result;

				result.magicString = cast(char[])raw[0 .. 16];
				result.pageSize = raw[16 .. 18];
				result.FileFormatWriteVer = cast(FileFormat)raw[18 .. 19][0];
				result.FileFormatReadVer = cast(FileFormat)raw[19 .. 20][0];
				result.reserved = raw[20 .. 21][0];
				// maxEmbeddedPayloadFract and the following two fields
				// are immutable values and do not need to be read

				//	result.maxEmbeddedPayloadFract = raw[21 .. 22];
				//	result.minEmbeddedPayloadFract = raw[22 .. 23];
				//	result.leafPayloadFract = raw[23 .. 24]

				result.fileChangeCounter = raw[24 .. 28];
				result.sizeInPages = raw[28 .. 32];
				result.firstFreelistPage = raw[32 .. 36];

				return result;
			} else {
				return *(cast(SQLiteHeader*) raw);
			}
		}
	}
272 
273 	this(string filename, bool readEntirely = true) {
274 		import std.file;
275 
276 		auto data = cast(ubyte[]) read(filename);
277 		this(data, filename);
278 	}
279 	/// If you pass null as buffer a new one will be gc-allocated;
280 	this(const ubyte[] buffer, string filename = ":Memory:") pure {
281 		if (buffer is null) {
282 			ubyte[] myBuffer = new ubyte[](1024);
283 			data = myBuffer;
284 			///TODO write a suitable default header here.
285 		} else {
286 			data = cast(ubyte[])buffer;
287 		}
288 		dbFilename = filename;
289 		_cachedHeader = SQLiteHeader.fromArray(buffer); 
290 		assert(_cachedHeader.magicString[0..6] == "SQLite");
291 	}
292 
	/**
	 * One row of the schema table. The fields mirror its columns:
	 *
	 * CREATE TABLE sqlite_master(
	 *	type text,
	 *	name text,
	 *	tbl_name text,
	 *	rootpage integer,
	 *	sql text
	 * );
	 */
	static struct MasterTableSchema {
		string type;
		string name;
		string tbl_name;
		uint rootPage;
		string sql;
	}
310 
	/// Schema of a table. Only the per-column entry type is declared here;
	/// the struct itself holds no fields yet.
	static struct TableSchema {
		/// Description of a single column.
		static struct SchemaEntry {
			//	uint colNumber;
			string columnName;
			string TypeName;
			string defaultValue;
			bool isPrimayKey;
			bool notNull;
		}
	}
321 
322 
323 
324 	static struct BTreePage {
325 	
		/// Page kind as stored in the first byte of the B-tree page header.
		enum BTreePageType : ubyte {
			emptyPage = 0,
			indexInteriorPage = 2,
			tableInteriorPage = 5,
			indexLeafPage = 10,
			tableLeafPage = 13
		}


		/// Bytes of this page as sliced out of the database image by `PageRange`.
		const ubyte[] page;
		/// Offset of the B-tree page header inside `page`
		/// (`PageRange` passes 100 for page 0 and 0 for all other pages).
		const uint headerOffset;
337 		
338 		static struct Row {
339 			const PageRange pages;
340 			const uint payloadSize;
341 			const uint rowId;
342 			const uint payloadHeaderSize;
343 			const ubyte[] payloadHeaderBytes;
344 			const ubyte[] payloadStart;
345 			const BTreePageType pageType;
346 
347 			auto column(const uint colNum) pure const {
348 				auto payloadHeader = PayloadHeader(payloadHeaderBytes);
349 				uint offset;
350 				
351 				foreach(_; 0 .. colNum) {
352 					offset += payloadHeader.front().length;
353 					payloadHeader.popFront();
354 				}
355 				
356 				auto typeCode = payloadHeader.front();
357 				uint payloadEnd = cast(uint) (offset + typeCode.length);
358 				
359 				if (payloadStart.length > payloadEnd) {
360 					return extractPayload(payloadStart[offset .. payloadEnd], typeCode);
361 				} else {
362 					auto overflowInfo = OverflowInfo(payloadStart, offset, payloadSize, pages, payloadHeaderSize, pageType);
363 					return extractPayload(&overflowInfo, typeCode, pages);
364 				}
365 			}
366 		}
367 
368 
369 
370 		Row getRow(const ushort cellPointer, const PageRange pages, const BTreePageType pageType) pure const {
371 			uint offset = cellPointer;
372 			import std.algorithm : min;
373 
374 			auto payloadSize = VarInt(page[offset .. $]);
375 			offset += payloadSize.length;
376 
377 			ulong rowId;
378 			if (pageType == BTreePageType.tableLeafPage) {
379 				VarInt rowId_v = VarInt(page[offset .. $]);
380 				rowId = rowId_v;
381 				offset += rowId_v.length;
382 			}
383 			
384 			auto payloadHeaderSize = VarInt(page[offset ..  page.length]);
385 			uint _payloadHeaderSize = cast(uint)payloadHeaderSize;
386 
387 			if (_payloadHeaderSize > page.length - offset) {
388 				assert(0, "Overflowing payloadHeaders are currently not handeled");
389 			}
390 
391 			auto ph = page[offset + payloadHeaderSize.length .. offset + _payloadHeaderSize];
392 			offset += _payloadHeaderSize;
393 			// TODO The payloadHeader does not have to be sliced off here
394 			// We can potentially do better of we pass just one buffer to struct Row and slice there.
395 
396 			return Row(pages, cast(uint) payloadSize, cast(uint) rowId, cast(uint) _payloadHeaderSize , ph, page[offset .. $], pageType);
397 		}
398 
399 		//		string toString(PageRange pages) {
400 		//			import std.conv;
401 		//
402 		//			auto pageType = header.pageType;
403 		//			string result = to!string(pageType);
404 		//
405 		//			auto cellPointers = getCellPointerArray();
406 		//			foreach (cp; cellPointers) {
407 		//				ubyte* printPtr = cast(ubyte*) base + cp;
408 		//
409 		//				final switch (header.pageType) with (
410 		//					BTreePageHeader.BTreePageType) {
411 		//				case emptyPage:
412 		//					result ~= "This page is Empty or the pointer is bogus\n";
413 		//					break;
414 		//
415 		//				case tableLeafPage: {
416 		//						auto singlePagePayloadSize = usablePageSize - 35;
417 		//						result ~= "singlePagePayloadSize : " ~ (
418 		//							singlePagePayloadSize).to!string ~ "\n";
419 		//						auto payloadSize = VarInt(printPtr);
420 		//						result ~= "payloadSize: " ~ (payloadSize).to!string
421 		//							~ "\n";
422 		//						printPtr += payloadSize.length;
423 		//						auto rowid = VarInt(printPtr);
424 		//						result ~= "rowid: " ~ (rowid).to!string ~ "\n";
425 		//						printPtr += rowid.length;
426 		//
427 		//						auto payloadHeaderSize = VarInt(printPtr);
428 		//						result ~= "payloadHeaderSize: " ~ (payloadHeaderSize)
429 		//							.to!string ~ "\n";
430 		//
431 		//						printPtr += payloadHeaderSize.length;
432 		//
433 		//						auto typeCodes = processPayloadHeader(printPtr,
434 		//								payloadHeaderSize);
435 		//						printPtr += payloadHeaderSize - payloadHeaderSize.length;
436 		//
437 		//						import std.algorithm;
438 		//						assert(typeCodes.map!(tc => tc.length).sum == payloadSize - payloadHeaderSize);
439 		//
440 		//						result ~= "{ ";
441 		//						if (payloadSize < singlePagePayloadSize) {
442 		//							foreach (typeCode; typeCodes) {
443 		//								auto p = extractPayload(printPtr, typeCode);
444 		//								result ~= p.apply!(
445 		//									v => "\t\"" ~ to!string(v) ~ "\",\n");
446 		//								printPtr += typeCode.length;
447 		//							}
448 		//						} else {
449 		//							auto overflowInfo = OverflowInfo();
450 		//
451 		//							overflowInfo.remainingTotalPayload = cast(uint)payloadSize;
452 		//							overflowInfo.payloadOnFirstPage = 
453 		//								payloadOnPage(cast(uint)(payloadSize)) - cast(uint)payloadHeaderSize;
454 		//
455 		//							foreach (typeCode; typeCodes) {
456 		//								auto p = extractPayload(&printPtr, typeCode,
457 		//										&overflowInfo, pages);
458 		//								auto str = p.apply!(v => "\t\"" ~ to!string(v) ~ "\",\n");
459 		//								result ~= str;
460 		//
461 		//							}
462 		//						}
463 		//
464 		//						result ~= " }\n";
465 		//
466 		//						printPtr += payloadSize;
467 		//					}
468 		//					break;
469 		//
470 		//				case tableInteriorPage: {
471 		//						BigEndian!uint leftChildPointer = *(
472 		//							cast(uint*) printPtr);
473 		//						result ~= "nextPagePointer: " ~ (leftChildPointer)
474 		//							.to!string ~ "\n";
475 		//						//auto lc = BTreePage(base, usablePageSize, leftChildPointer);
476 		//						//result ~= to!string(lc);
477 		//						printPtr += uint.sizeof;
478 		//						auto integerKey = VarInt(printPtr);
479 		//						result ~= "integerKey: " ~ (integerKey).to!string
480 		//							~ "\n";
481 		//						printPtr += integerKey.length;
482 		//					}
483 		//					
484 		//					break;
485 		//
486 		//				case indexLeafPage: {
487 		//						auto payloadSize = VarInt(cast(ubyte*) printPtr);
488 		//						result ~= "payloadSize: " ~ (payloadSize).to!string
489 		//							~ "\n";
490 		//						printPtr += payloadSize.length;
491 		//						auto payloadHeaderSize = VarInt(cast(ubyte*) printPtr);
492 		//						printPtr += payloadHeaderSize.length;
493 		//						result ~= "payloadHeaderSize: " ~ (payloadHeaderSize)
494 		//							.to!string ~ "\n";
495 		//						auto typeCodes = processPayloadHeader(printPtr,
496 		//								payloadHeaderSize);
497 		//						foreach (typeCode; typeCodes) {
498 		//							result ~= to!string(typeCode) ~ ", ";
499 		//						}
500 		//						result ~= "\n";
501 		//						result ~= "rs-phs : " ~ to!string(
502 		//							payloadSize - payloadHeaderSize) ~ "\n";
503 		//						auto payload = CArray!char.toArray(
504 		//							cast(ubyte*)(printPtr + payloadHeaderSize),
505 		//								payloadSize - payloadHeaderSize);
506 		//						printPtr += payloadSize;
507 		//						result ~= (payload.length) ? (payload).to!string : "";
508 		//						
509 		//					}
510 		//					break;
511 		//
512 		//				case indexInteriorPage: {
513 		//						BigEndian!uint leftChildPointer = 
514 		//							*(cast(uint*) printPtr);
515 		//						result ~= "leftChildPinter: " ~ (leftChildPointer)
516 		//							.to!string ~ "\n";
517 		//						VarInt payloadSize;
518 		//						CArray!ubyte _payload;
519 		//						BigEndian!uint _firstOverflowPage;
520 		//						//assert(0,"No support for indexInteriorPage");
521 		//					}
522 		//					break;
523 		//
524 		//				}
525 		//
526 		//			}
527 		//			return result;
528 		//		}
529 
530 		static align(1) struct BTreePageHeader {
531 		align(1):
532 			//	pure :
533 			BTreePageType _pageType;
534 			BigEndian!ushort firstFreeBlock;
535 			BigEndian!ushort cellsInPage;
536 			BigEndian!ushort startCellContantArea; /// 0 is interpreted as 65536
537 			ubyte fragmentedFreeBytes;
538 			BigEndian!uint _rightmostPointer;
539 
540 			static BTreePageHeader fromArray(const ubyte[] _array) pure {
541 				assert(_array.length >= this.sizeof);
542 				if (__ctfe) {
543 					BTreePageHeader result;
544 
545 					result._pageType = cast(BTreePageType)_array[0];
546 					result.firstFreeBlock = _array[1 .. 3]; 
547 					result.cellsInPage = _array[3 .. 5];
548 					result.startCellContantArea = _array[5 .. 7];
549 					result.fragmentedFreeBytes = _array[7];
550 					if (result.isInteriorPage) {
551 						result._rightmostPointer = _array[8 .. 12];
552 					}
553 
554 					return result;
555 				} else {
556 					return *(cast (BTreePageHeader*) _array.ptr);
557 				}
558 			}
559 
560 			bool isInteriorPage() pure const {
561 				return (pageType == pageType.indexInteriorPage
562 					|| pageType == pageType.tableInteriorPage);
563 			}
564 
565 			@property auto pageType() const pure {
566 				return cast(const) _pageType;
567 			}
568 
569 			@property BigEndian!uint rightmostPointer() {
570 				assert(isInteriorPage,
571 					"the rightmost pointer is only in interior nodes");
572 				return _rightmostPointer;
573 			}
574 
575 			@property void rightmostPointer(uint rmp) {
576 				assert(isInteriorPage,
577 					"the rightmost pointer is only in interior nodes");
578 				_rightmostPointer = rmp;
579 			}
580 
581 			uint length() pure const {
582 				return 12 - (isInteriorPage ? 0 : 4);
583 			}
584 
585 //			string toString() {
586 //				import std.conv;
587 //
588 //				string result = "pageType:\t";
589 //				result ~= to!string(pageType) ~ "\n";
590 //				result ~= "firstFreeBlock:\t";
591 //				result ~= to!string(firstFreeBlock) ~ "\n";
592 //				result ~= "cellsInPage:\t";
593 //				result ~= to!string(cellsInPage) ~ "\n";
594 //				result ~= "startCellContantArea:\t";
595 //				result ~= to!string(startCellContantArea) ~ "\n";
596 //				result ~= "fragmentedFreeBytes:\t";
597 //				result ~= to!string(fragmentedFreeBytes) ~ "\n";
598 //				if (isInteriorPage) {
599 //					result ~= "_rightmostPointer";
600 //					result ~= to!string(_rightmostPointer) ~ "\n";
601 //				}
602 //
603 //				return result;
604 //			}
605 		}
606 
607 		bool hasPayload() const pure {
608 			final switch (pageType) with (BTreePageType) {
609 				case emptyPage:
610 					return false;
611 				case tableInteriorPage:
612 					return false;
613 				case indexInteriorPage:
614 					return true;
615 				case indexLeafPage:
616 					return true;
617 				case tableLeafPage:
618 					return true;
619 			}
620 		}
621 
622 		auto payloadSize(BigEndian!ushort cp) {
623 			assert(hasPayload);
624 			final switch (pageType) with (BTreePageType) {
625 				case emptyPage:
626 				case tableInteriorPage:
627 					assert(0, "page has no payload");
628 					
629 				case indexInteriorPage:
630 					return VarInt(page[cp + uint.sizeof .. $]);
631 				case indexLeafPage:
632 					return VarInt(page[cp .. $]);
633 				case tableLeafPage:
634 					return VarInt(page[cp .. $]);
635 			}
636 		}
637 
638 		BTreePageHeader header() const pure {
639 			return BTreePageHeader.fromArray(page[headerOffset.. headerOffset + BTreePageHeader.sizeof]);
640 		}
641 
642 		BigEndian!ushort[] getCellPointerArray() const pure {
643 			auto offset = header.length + headerOffset;
644 			return page[offset .. offset + header.cellsInPage * ushort.sizeof]
645 				.toArray!(BigEndian!ushort)(header.cellsInPage);
646 		}
647 
648 		BTreePageType pageType() const pure {
649 			return (header()).pageType;
650 		}
651 
652 		struct PayloadHeader {
653 			const ubyte[] payloadHeader;
654 			uint offset;
655 			uint _length;
656 			 
657 			void popFront() pure {
658 				assert(offset < payloadHeader.length);
659 				offset += _length;
660 				_length = 0;
661 			}
662 
663 			Payload.SerialTypeCode front() pure {
664 				auto v = VarInt(payloadHeader[offset .. $]);
665 				_length = cast(uint)v.length;
666 				return Payload.SerialTypeCode(v);
667 			}
668 
669 			bool empty() pure const {
670 				return offset == payloadHeader.length;
671 			}
672 		}
673 
674 
675 	}
676 }
677 
/// Cursor over the payload of a cell that may continue on overflow pages.
///
/// `pageSlice` holds the payload bytes still unread on the current page and
/// `nextPageIdx` the 1-based number of the next overflow page (0 if none).
struct OverflowInfo {
	const(ubyte)[] pageSlice;
	uint nextPageIdx;

	/// Positions the cursor `offset` bytes into the payload body.
	///
	/// Params:
	///   payloadStart      = page bytes starting right after the payload header
	///   offset            = byte offset of the wanted value inside the payload body
	///   payloadSize       = total payload size of the cell (header included)
	///   pages             = page range used to follow overflow pointers
	///   payloadHeaderSize = size of the payload header in bytes
	///   pageType          = kind of page the cell lives on
	this(const ubyte[] payloadStart, int offset, const uint payloadSize, const Database.PageRange pages, const uint payloadHeaderSize, const Database.BTreePageType pageType) pure {
		const uint usable = pages.usablePageSize;

		// Largest payload stored without overflow: U-35 on table leaf pages,
		// ((U-12)*64/255)-23 on index pages (64 is the max embedded payload fraction).
		uint x;
		if (isIndex(pageType)) {
			x = ((usable - 12) * 64 / 255) - 23;
		} else {
			x = usable - 35;
		}

		if (payloadSize > x) {
			// Minimum number of payload bytes kept on the first page.
			const uint m = ((usable - 12) * 32 / 255) - 23;
			const uint k = m + ((payloadSize - m) % (usable - 4));

			// payloadStart begins after the payload header, hence the subtraction.
			const uint payloadOnFirstPage = (k <= x ? k : m) - payloadHeaderSize;

			nextPageIdx = BigEndian!uint(payloadStart[payloadOnFirstPage .. payloadOnFirstPage + uint.sizeof]);

			if(offset > payloadOnFirstPage) {
				offset -= payloadOnFirstPage;
				gotoNextPage(pages);

				// Each overflow page starts with a 4-byte pointer to the next one.
				auto payloadOnOverflowPage = usable - uint.sizeof;
				while(offset>payloadOnOverflowPage) {
					gotoNextPage(pages);
					offset -= payloadOnOverflowPage;
				}
			} else {
				pageSlice = payloadStart[0 .. payloadOnFirstPage];
			}
		} else {
			pageSlice = payloadStart;
		}
		pageSlice = pageSlice[offset .. $];
	}

	/// Moves the cursor to the start of the next overflow page's payload and
	/// remembers that page's own next-page pointer.
	void gotoNextPage(const Database.PageRange pages) pure {
		assert(nextPageIdx != 0, "No next Page to go to");
		// Page numbers in the file are 1-based, the page range is 0-based.
		auto nextPage = pages[nextPageIdx - 1];
		nextPageIdx = BigEndian!uint(nextPage.page[0 .. uint.sizeof]);
		pageSlice = nextPage.page[uint.sizeof .. $];
	}

}
726 
727 
/// Extracts one value from a payload that may continue on overflow pages.
///
/// If the value fits into the bytes left on the current page it is decoded
/// from there and the cursor is advanced past it. Otherwise the value (which
/// must be a blob or string) is collected page by page into a GC-allocated
/// buffer. In both cases the cursor ends up just behind the value.
///
/// Params:
///   overflowInfo = cursor positioned at the first byte of the value
///   typeCode     = serial type of the value
///   pages        = page range used to follow overflow pointers
static Database.Payload extractPayload(
	OverflowInfo* overflowInfo,
	const Database.Payload.SerialTypeCode typeCode,
	const Database.PageRange pages,
	) pure {
	import std.algorithm : min;

	if (overflowInfo.pageSlice.length >= typeCode.length) {
		const wanted = cast(uint) typeCode.length;

		auto payload = overflowInfo.pageSlice[0 .. wanted];
		// pageSlice never contains overflow-pointer bytes, so whatever is
		// left (even exactly 4 bytes) is ordinary payload of later columns.
		overflowInfo.pageSlice = overflowInfo.pageSlice[wanted .. $];

		return extractPayload(payload, typeCode);
	}

	alias et = Database.Payload.SerialTypeCode.SerialTypeCodeEnum;
	// let's assume SQLite is sane and does not split primitive types in the middle
	assert(typeCode.type == et.blob || typeCode.type == et._string);

	auto remainingBytesOfPayload = cast(uint) typeCode.length;
	ubyte[] _payloadBuffer;

	for (;;) {
		const readBytes = cast(uint) min(overflowInfo.pageSlice.length,
			remainingBytesOfPayload);

		remainingBytesOfPayload -= readBytes;
		_payloadBuffer ~= overflowInfo.pageSlice[0 .. readBytes];

		if (remainingBytesOfPayload == 0) {
			assert(typeCode.length == _payloadBuffer.length);
			// Keep the cursor consistent for a following read.
			overflowInfo.pageSlice = overflowInfo.pageSlice[readBytes .. $];

			return extractPayload(cast(const)_payloadBuffer, typeCode);
		}

		// More bytes are needed; gotoNextPage asserts that a next page exists.
		overflowInfo.gotoNextPage(pages);
	}
}
783 
/// Decodes a single value whose bytes are completely contained in `startPayload`.
///
/// Integers are stored big-endian in two's complement and are read with
/// exactly the width their serial type prescribes (1, 2, 3, 4, 6 or 8 bytes),
/// then sign-extended. Blobs and strings are returned as slices of
/// `startPayload`. NULL and the two boolean codes carry no bytes.
///
/// Params:
///   startPayload = bytes beginning at the first byte of the value
///   typeCode     = serial type of the value
static Database.Payload extractPayload(const ubyte[] startPayload,
	const Database.Payload.SerialTypeCode typeCode) pure {
	// Reads `count` bytes as a big-endian two's-complement integer.
	static long readSigned(const ubyte[] bytes, const uint count) pure {
		ulong bits = 0;
		foreach (b; bytes[0 .. count]) {
			bits = (bits << 8) | b;
		}
		// Shift the sign bit up to bit 63, then shift back arithmetically.
		const uint unused = 64 - count * 8;
		return (cast(long) (bits << unused)) >> unused;
	}

	Database.Payload p;
	p.typeCode = typeCode;

	final switch (typeCode.type) {
		case typeof(typeCode).int8:
			p.int8 = cast(byte) readSigned(startPayload, 1);
			break;
		case typeof(typeCode).int16:
			p.int16 = cast(short) readSigned(startPayload, 2);
			break;
		case typeof(typeCode).int24:
			p.int24 = cast(int) readSigned(startPayload, 3);
			break;
		case typeof(typeCode).int32:
			p.int32 = cast(int) readSigned(startPayload, 4);
			break;
		case typeof(typeCode).int48:
			p.int48 = readSigned(startPayload, 6);
			break;
		case typeof(typeCode).int64:
			p.int64 = readSigned(startPayload, 8);
			break;
		case typeof(typeCode).float64:
			if (__ctfe) {
				assert(0, "not supporeted at CTFE yet");
			} else {
				// The IEEE 754 bit pattern is stored big-endian.
				ulong bits = readSigned(startPayload, 8);
				p.float64 = *cast(double*) &bits;
			}
			break;
		case typeof(typeCode).blob:
			p.blob = cast(ubyte[]) startPayload[0 .. cast(uint) typeCode.length];
			break;
		case typeof(typeCode)._string:
			p._string = cast(string) startPayload[0 .. cast(uint) typeCode.length];
			break;

		case typeof(typeCode).NULL:
		case typeof(typeCode).bool_false:
		case typeof(typeCode).bool_true:
			break;
	}

	return p;
}
830 
831 
832 
/// Calls `handler` with the value stored in `p`, picking the union member
/// (or `null` / `false` / `true`) that matches the payload's serial type.
///
/// For each serial type the call is only compiled in if `handler` accepts
/// that argument type; otherwise reaching that case at run time fails with
/// `assert(0)`.
///
/// Returns: whatever `handler` returns.
auto apply(alias handler)(const Database.Payload p) {
	final switch (p.typeCode.type) with (typeof(p.typeCode.type)) {
		case NULL:
			static if (__traits(compiles, handler(null))) {
				return handler(null);
			} else {
				assert(0, "handler cannot take null");
			}
		case int8:
			static if (__traits(compiles, handler(p.int8))) {
				return handler(p.int8);
			} else {
				assert(0);
			}
		case int16:
			static if (__traits(compiles, handler(p.int16))) {
				return handler(p.int16);
			} else {
				assert(0);
			}
		case int24:
			static if (__traits(compiles, handler(p.int24))) {
				return handler(p.int24);
			} else {
				assert(0);
			}
		case int32:
			static if (__traits(compiles, handler(p.int32))) {
				return handler(p.int32);
			} else {
				assert(0);
			}
		case int48:
			static if (__traits(compiles, handler(p.int48))) {
				return handler(p.int48);
			} else {
				assert(0);
			}
		case int64:
			static if (__traits(compiles, handler(p.int64))) {
				return handler(p.int64);
			} else {
				assert(0);
			}
		case float64:
			static if (__traits(compiles, handler(p.float64))) {
				return handler(p.float64);
			} else {
				assert(0);
			}
		// The boolean serial types carry no bytes; the value is implied by the type.
		case bool_false:
			static if (__traits(compiles, handler(false))) {
				return handler(false);
			} else {
				assert(0);
			}
		case bool_true:
			static if (__traits(compiles, handler(true))) {
				return handler(true);
			} else {
				assert(0);
			}
		case blob:
			static if (__traits(compiles, handler(p.blob))) {
				return handler(p.blob);
			} else {
				assert(0);
			}
		case _string:
			static if (__traits(compiles, handler(p._string))) {
				return handler(p._string);
			} else {
				assert(0,"handler " ~ typeof(handler).stringof ~ " cannot be called with string");
			}
	}
}
909 pure :
/// Converts the value held by `p` to `T` with a plain cast.
auto getAs(T)(Database.Payload p) {
	return apply!(value => cast(T) value)(p);
}

/// Decodes column `columnIndex` of `r` and converts it to `T`.
auto getAs(T)(Database.Row r, uint columnIndex) {
	auto cell = r.column(columnIndex);
	return getAs!T(cell);
}

/// Looks up `colName` in schema `s` and converts that column of `r` to `T`.
auto getAs(T)(Database.Row r, Database.TableSchema s, string colName) {
	const columnIndex = s.getcolumnIndex(colName);
	return getAs!T(r, columnIndex);
}
921 
// The boolean serial types convert to 0 and 1 when read as int.
unittest {
	alias Code = Database.Payload.SerialTypeCodeEnum;
	Database.Payload p;

	p.typeCode.type = Code.bool_false;
	assert(p.getAs!(int) == 0);

	p.typeCode.type = Code.bool_true;
	assert(p.getAs!(int) == 1);
}
929 
/// True when bit 1 of the page type is set, which holds for the two index
/// page types (2 and 10) and for neither table page type (5 and 13).
bool isIndex(const Database.BTreePage.BTreePageType pageType) pure {
	return (pageType & 2) != 0;
}

static assert(isIndex(Database.BTreePageType.indexLeafPage));
static assert(isIndex(Database.BTreePageType.indexInteriorPage));
static assert(!isIndex(Database.BTreePageType.tableLeafPage));