1 module sqlited;
2 
3 //          Copyright Stefan Koch 2015 - 2018.
4 // Distributed under the Boost Software License, Version 1.0.
5 //    (See accompanying file LICENSE.md or copy at
6 //          http://www.boost.org/LICENSE_1_0.txt)
7 
8 /****************************
9  * SQLite-D SQLite 3 Reader *
10  * By Stefan Koch 2016      *
11  ****************************/
12 
13 import utils;
14 import varint;
15 
16 struct Database {
17 	string dbFilename;
18 	const ubyte[] data;
19 	alias Row = BTreePage.Row;
20 	alias BTreePageType = BTreePage.BTreePageType;
21 
22 	SQLiteHeader _cachedHeader;
23 
24 	SQLiteHeader header() pure const {
25 		return _cachedHeader;
26 	}
27 
28 	BTreePage rootPage() pure const {
29 		return pages[0];
30 	}
31 
32 	PageRange pages() pure const {
33 		return PageRange(cast(const)data, header.pageSize,
34 			usablePageSize, cast(const uint)(data.length / header.pageSize));
35 	}
36 
37 	const(uint) usablePageSize() pure const {
38 		return header.pageSize - header.reserved;
39 	}
40 
41 	struct PageRange {
42 		const ubyte[] data;
43 		const uint pageSize;
44 		const uint usablePageSize;
45 
46 		const uint numberOfPages;
47 	pure :
48 		@property uint length() const {
49 			return numberOfPages;
50 		}
51 
52 		BTreePage opIndex(const uint pageNumber) pure const {
53 			assert(pageNumber <= numberOfPages,
54 				"Attempting to go to invalid page");
55 
56 			const size_t pageBegin = 
57 				(pageSize * pageNumber);
58 			const size_t pageEnd = 
59 				(pageSize * pageNumber) + usablePageSize;
60 
61 			return BTreePage(data[pageBegin .. pageEnd], pageNumber ? 0 : 100);
62 		}
63 
64 		this(const ubyte[] data, const uint pageSize,
65 			const uint usablePageSize, const uint numberOfPages) pure {
66 			this.data = data;
67 			this.pageSize = pageSize;
68 			this.usablePageSize = usablePageSize;
69 			this.numberOfPages = numberOfPages;
70 			assert(usablePageSize >= 512);
71 		}
72 
73 	}
74 
75 	static struct Payload {
76 		alias SerialTypeCodeEnum = SerialTypeCode.SerialTypeCodeEnum;
77 		static struct SerialTypeCode {
78 			alias SerialTypeCodeEnum this;
79 
80 			static enum SerialTypeCodeEnum : long {
81 				NULL = (0),
82 				int8 = (1),
83 				int16 = (2),
84 				int24 = (3),
85 				int32 = (4),
86 				int48 = (5),
87 				int64 = (6),
88 				float64 = (7),
89 
90 				bool_false = (8),
91 				bool_true = (9),
92 
93 				blob = (12),
94 				_string = (13)
95 			}
96 
97 			this(VarInt v) pure nothrow {
98 				long _v = v; // this could be an int
99                                              // also the varint handling would not need to swap 8 bytes
100 
101 				if (_v > 11) {
102 					if (_v & 1) {
103 						type = SerialTypeCode.SerialTypeCodeEnum._string;
104 						length = (_v - 13) / 2;
105 					} else {
106 						type = SerialTypeCode.SerialTypeCodeEnum.blob;
107 						length = (_v - 12) / 2;
108 					}
109 				} else {
110 					static immutable uint[11] lengthTbl = [0, 1, 2, 3, 4, 6, 8, 0, 0, -1, -1];
111 					type = cast(SerialTypeCodeEnum) _v;
112 					length = lengthTbl[_v];
113 				}
114 
115 			}
116 
117 			SerialTypeCodeEnum type;
118 			long length;
119 		}
120 
121 		union {
122 			byte int8;
123 			short int16;
124 			int int24;
125 			int int32;
126 			long int48;
127 			long int64;
128 			double float64;
129 			const(char)[] _string;
130 			ubyte[] blob;
131 		}
132 
133 		SerialTypeCode typeCode;
134 		alias length = typeCode.length;
135 		alias type = typeCode.type;
136 	}
137 
	/// A table as seen by the reader; currently it only carries its schema.
	static struct Table {
		TableSchema schema;
	//	Row[] rows;
	}
142 
143 	static align(1) struct SQLiteHeader {
144 
145 		bool isValid() pure {
146 			return magicString == "SQLite format 3\0";
147 		}
148 
149 	align(1):
150 		// ALL NUMBERS ARE BIGENDIAN
151 		static {
152 			enum TextEncoding : uint {
153 				utf8 = bigEndian(1),
154 				utf16le = bigEndian(2),
155 				utf16be = bigEndian(3)
156 			}
157 
158 			enum SchemaFormat : uint {
159 				_1 = bigEndian(1),
160 				_2 = bigEndian(2),
161 				_3 = bigEndian(3),
162 				_4 = bigEndian(4),
163 			}
164 
165 			enum FileFormat : ubyte {
166 				legacy = 1,
167 				wal = 2,
168 			}
169 
170 			struct Freelist {
171 				BigEndian!uint nextPage;
172 				BigEndian!uint leafPointers; // Number if leafPoinrers;
173 			}
174 
175 		}
176 
177 		char[16] magicString;
178 
179 		BigEndian!ushort pageSize; /// between 512 and 32768 or 1
180 
181 		FileFormat FileFormatWriteVer;
182 		FileFormat FileFormatReadVer;
183 
184 		ubyte reserved; /// unused bytes at the end of each page
185 
186 		immutable ubyte maxEmbeddedPayloadFract = 64;
187 		immutable ubyte minEmbeddedPayloadFract = 32;
188 		immutable ubyte leafPayloadFract = 32;
189 		BigEndian!uint fileChangeCounter;
190 		BigEndian!uint sizeInPages; /// fileChangeCounter has to match validForVersion or sizeInPages is invalid
191 
192 		BigEndian!uint firstFreelistPage; /// Page number of the first freelist trunk page. 
193 		BigEndian!uint freelistPages; /// Total number of freelist Pages;
194 
195 		BigEndian!uint schemaCookie;
196 		SchemaFormat schemaFormatVer; // 1,2,3 or 4
197 
198 		BigEndian!uint defaultCacheSize;
199 		BigEndian!uint largestRootPage; /// used in incermental and auto vacuum modes. 0 otherwise.
200 
201 		TextEncoding textEncoding;
202 
203 		BigEndian!uint userVersion;
204 
205 		BigEndian!uint incrementalVacuum; /// Non-Zero if on, Zero otherwise;
206 
207 		BigEndian!uint applicationId;
208 
209 		BigEndian!uint[5] _reserved; /// Reserved space for future format expansion 
210 
211 		BigEndian!uint validForVersion;
212 		BigEndian!uint sqliteVersion;
213 
214 		static SQLiteHeader fromArray (const ubyte[] raw) pure {
215 			assert(raw.length >= this.sizeof);
216 			if (__ctfe) {
217 				SQLiteHeader result;
218 
219 				result.magicString = cast(char[])raw[0 .. 16];
220 				result.pageSize = raw[16 .. 18];
221 				result.FileFormatWriteVer = cast(FileFormat)raw[18 .. 19][0];
222 				result.FileFormatReadVer = cast(FileFormat)raw[19 .. 20][0];
223 				result.reserved = raw[20 .. 21][0];
224 				// maxEmbeddedPayloadFract ff.
225 				// are immutable values and do not need to be read
226 
227 				//	result.maxEmbeddedPayloadFract = raw[21 .. 22];
228 				//	result.minEmbeddedPayloadFract = raw[22 .. 23];
229 				//	result.leafPayloadFract = raw[23 .. 24]
230 
231 				result.fileChangeCounter = raw[24 .. 28];
232 				result.sizeInPages = raw[28 .. 32];
233 				result.firstFreelistPage = raw[32 .. 36];
234 
235 				return result;
236 			} else {
237 				return *(cast(SQLiteHeader*) raw);
238 			}
239 		}
240 	}
241 
242 	this(string filename, bool readEntirely = true) {
243 		import std.file;
244 
245 		auto data = cast(ubyte[]) read(filename);
246 		this(data, filename);
247 	}
248 	/// If you pass null as buffer a new one will be gc-allocated;
249 	this(const ubyte[] buffer, string filename = ":Memory:") pure {
250 		if (buffer is null) {
251 			ubyte[] myBuffer = new ubyte[](1024);
252 			data = myBuffer;
253 			///TODO write a suitable default header here.
254 		} else {
255 			data = cast(ubyte[])buffer;
256 		}
257 		dbFilename = filename;
258 		_cachedHeader = SQLiteHeader.fromArray(buffer); 
259 		assert(_cachedHeader.magicString[0..6] == "SQLite");
260 	}
261 
	/**
	 * One row of the schema table, mirroring its SQL definition:
	 *
	 * CREATE TABLE sqlite_master(
	 *	type text,
	 *	name text,
	 *	tbl_name text,
	 *	rootpage integer,
	 *	sql text
	 * );
	 *
	 * The fields below appear in the same order as the columns above.
	 */
	static struct MasterTableSchema {
		string type;
		string name;
		string tbl_name;
		uint rootPage;
		string sql;
	}
279 
	/// Schema description of a table. Only the per-column entry type is
	/// declared here; the struct itself has no fields in this view.
	static struct TableSchema {
		/// Description of a single column.
		static struct SchemaEntry {
			//	uint colNumber;
			string columnName;
			string TypeName;
			string defaultValue;
			bool isPrimayKey;
			bool notNull;
		}
	}
290 
291 
292 
293 	static struct BTreePage {
294 	
		/// Value of the first byte of a b-tree page header.
		enum BTreePageType : ubyte {
			emptyPage = 0,
			indexInteriorPage = 2,
			tableInteriorPage = 5,
			indexLeafPage = 10,
			tableLeafPage = 13
		}

	
		/// Bytes of this page (the usable part, as sliced by PageRange.opIndex).
		const ubyte[] page;
		/// Offset of the b-tree page header inside `page`
		/// (PageRange.opIndex passes 100 for page 0 and 0 otherwise).
		const uint headerOffset;
306 		
307 		static struct Row {
308 			const PageRange pages;
309 			const uint payloadSize;
310 			const uint rowId;
311 			const uint payloadHeaderSize;
312 			const ubyte[] payloadHeaderBytes;
313 			const ubyte[] payloadStart;
314 			const BTreePageType pageType;
315 
316 			auto column(const uint colNum) pure const {
317 				auto payloadHeader = PayloadHeader(payloadHeaderBytes);
318 				uint offset;
319 				
320 				foreach(_; 0 .. colNum) {
321 					offset += payloadHeader.front().length;
322 					payloadHeader.popFront();
323 				}
324 				
325 				auto typeCode = payloadHeader.front();
326 				uint payloadEnd = cast(uint) (offset + typeCode.length);
327 				
328 				if (payloadStart.length > payloadEnd) {
329 					return extractPayload(payloadStart[offset .. payloadEnd], typeCode);
330 				} else {
331 					auto overflowInfo = OverflowInfo(payloadStart, offset, payloadSize, pages, payloadHeaderSize, pageType);
332 					return extractPayload(&overflowInfo, typeCode, pages);
333 				}
334 			}
335 		}
336 
337 
338 
339 		Row getRow(const ushort cellPointer, const PageRange pages, const BTreePageType pageType) pure const {
340 			uint offset = cellPointer;
341 			import std.algorithm : min;
342 
343 			auto payloadSize = VarInt(page[offset .. $]);
344 			offset += payloadSize.length;
345 
346 			ulong rowId;
347 			if (pageType == BTreePageType.tableLeafPage) {
348 				VarInt rowId_v = VarInt(page[offset .. $]);
349 				rowId = rowId_v;
350 				offset += rowId_v.length;
351 			}
352 			
353 			auto payloadHeaderSize = VarInt(page[offset ..  page.length]);
354 			uint _payloadHeaderSize = cast(uint)payloadHeaderSize;
355 
356 			if (_payloadHeaderSize > page.length - offset) {
357 				assert(0, "Overflowing payloadHeaders are currently not handeled");
358 			}
359 
360 			auto ph = page[offset + payloadHeaderSize.length .. offset + _payloadHeaderSize];
361 			offset += _payloadHeaderSize;
362 			// TODO The payloadHeader does not have to be sliced off here
363 			// We can potentially do better of we pass just one buffer to struct Row and slice there.
364 
365 			return Row(pages, cast(uint) payloadSize, cast(uint) rowId, cast(uint) _payloadHeaderSize , ph, page[offset .. $], pageType);
366 		}
367 
368 		//		string toString(PageRange pages) {
369 		//			import std.conv;
370 		//
371 		//			auto pageType = header.pageType;
372 		//			string result = to!string(pageType);
373 		//
374 		//			auto cellPointers = getCellPointerArray();
375 		//			foreach (cp; cellPointers) {
376 		//				ubyte* printPtr = cast(ubyte*) base + cp;
377 		//
378 		//				final switch (header.pageType) with (
379 		//					BTreePageHeader.BTreePageType) {
380 		//				case emptyPage:
381 		//					result ~= "This page is Empty or the pointer is bogus\n";
382 		//					break;
383 		//
384 		//				case tableLeafPage: {
385 		//						auto singlePagePayloadSize = usablePageSize - 35;
386 		//						result ~= "singlePagePayloadSize : " ~ (
387 		//							singlePagePayloadSize).to!string ~ "\n";
388 		//						auto payloadSize = VarInt(printPtr);
389 		//						result ~= "payloadSize: " ~ (payloadSize).to!string
390 		//							~ "\n";
391 		//						printPtr += payloadSize.length;
392 		//						auto rowid = VarInt(printPtr);
393 		//						result ~= "rowid: " ~ (rowid).to!string ~ "\n";
394 		//						printPtr += rowid.length;
395 		//
396 		//						auto payloadHeaderSize = VarInt(printPtr);
397 		//						result ~= "payloadHeaderSize: " ~ (payloadHeaderSize)
398 		//							.to!string ~ "\n";
399 		//
400 		//						printPtr += payloadHeaderSize.length;
401 		//
402 		//						auto typeCodes = processPayloadHeader(printPtr,
403 		//								payloadHeaderSize);
404 		//						printPtr += payloadHeaderSize - payloadHeaderSize.length;
405 		//
406 		//						import std.algorithm;
407 		//						assert(typeCodes.map!(tc => tc.length).sum == payloadSize - payloadHeaderSize);
408 		//
409 		//						result ~= "{ ";
410 		//						if (payloadSize < singlePagePayloadSize) {
411 		//							foreach (typeCode; typeCodes) {
412 		//								auto p = extractPayload(printPtr, typeCode);
413 		//								result ~= p.apply!(
414 		//									v => "\t\"" ~ to!string(v) ~ "\",\n");
415 		//								printPtr += typeCode.length;
416 		//							}
417 		//						} else {
418 		//							auto overflowInfo = OverflowInfo();
419 		//
420 		//							overflowInfo.remainingTotalPayload = cast(uint)payloadSize;
421 		//							overflowInfo.payloadOnFirstPage = 
422 		//								payloadOnPage(cast(uint)(payloadSize)) - cast(uint)payloadHeaderSize;
423 		//
424 		//							foreach (typeCode; typeCodes) {
425 		//								auto p = extractPayload(&printPtr, typeCode,
426 		//										&overflowInfo, pages);
427 		//								auto str = p.apply!(v => "\t\"" ~ to!string(v) ~ "\",\n");
428 		//								result ~= str;
429 		//
430 		//							}
431 		//						}
432 		//
433 		//						result ~= " }\n";
434 		//
435 		//						printPtr += payloadSize;
436 		//					}
437 		//					break;
438 		//
439 		//				case tableInteriorPage: {
440 		//						BigEndian!uint leftChildPointer = *(
441 		//							cast(uint*) printPtr);
442 		//						result ~= "nextPagePointer: " ~ (leftChildPointer)
443 		//							.to!string ~ "\n";
444 		//						//auto lc = BTreePage(base, usablePageSize, leftChildPointer);
445 		//						//result ~= to!string(lc);
446 		//						printPtr += uint.sizeof;
447 		//						auto integerKey = VarInt(printPtr);
448 		//						result ~= "integerKey: " ~ (integerKey).to!string
449 		//							~ "\n";
450 		//						printPtr += integerKey.length;
451 		//					}
452 		//					
453 		//					break;
454 		//
455 		//				case indexLeafPage: {
456 		//						auto payloadSize = VarInt(cast(ubyte*) printPtr);
457 		//						result ~= "payloadSize: " ~ (payloadSize).to!string
458 		//							~ "\n";
459 		//						printPtr += payloadSize.length;
460 		//						auto payloadHeaderSize = VarInt(cast(ubyte*) printPtr);
461 		//						printPtr += payloadHeaderSize.length;
462 		//						result ~= "payloadHeaderSize: " ~ (payloadHeaderSize)
463 		//							.to!string ~ "\n";
464 		//						auto typeCodes = processPayloadHeader(printPtr,
465 		//								payloadHeaderSize);
466 		//						foreach (typeCode; typeCodes) {
467 		//							result ~= to!string(typeCode) ~ ", ";
468 		//						}
469 		//						result ~= "\n";
470 		//						result ~= "rs-phs : " ~ to!string(
471 		//							payloadSize - payloadHeaderSize) ~ "\n";
472 		//						auto payload = CArray!char.toArray(
473 		//							cast(ubyte*)(printPtr + payloadHeaderSize),
474 		//								payloadSize - payloadHeaderSize);
475 		//						printPtr += payloadSize;
476 		//						result ~= (payload.length) ? (payload).to!string : "";
477 		//						
478 		//					}
479 		//					break;
480 		//
481 		//				case indexInteriorPage: {
482 		//						BigEndian!uint leftChildPointer = 
483 		//							*(cast(uint*) printPtr);
484 		//						result ~= "leftChildPinter: " ~ (leftChildPointer)
485 		//							.to!string ~ "\n";
486 		//						VarInt payloadSize;
487 		//						CArray!ubyte _payload;
488 		//						BigEndian!uint _firstOverflowPage;
489 		//						//assert(0,"No support for indexInteriorPage");
490 		//					}
491 		//					break;
492 		//
493 		//				}
494 		//
495 		//			}
496 		//			return result;
497 		//		}
498 
499 		static align(1) struct BTreePageHeader {
500 		align(1):
501 			//	pure :
502 			BTreePageType _pageType;
503 			BigEndian!ushort firstFreeBlock;
504 			BigEndian!ushort cellsInPage;
505 			BigEndian!ushort startCellContantArea; /// 0 is interpreted as 65536
506 			ubyte fragmentedFreeBytes;
507 			BigEndian!uint _rightmostPointer;
508 
509 			static BTreePageHeader fromArray(const ubyte[] _array) pure {
510 				assert(_array.length >= this.sizeof);
511 				if (__ctfe) {
512 					BTreePageHeader result;
513 
514 					result._pageType = cast(BTreePageType)_array[0];
515 					result.firstFreeBlock = _array[1 .. 3]; 
516 					result.cellsInPage = _array[3 .. 5];
517 					result.startCellContantArea = _array[5 .. 7];
518 					result.fragmentedFreeBytes = _array[7];
519 					if (result.isInteriorPage) {
520 						result._rightmostPointer = _array[8 .. 12];
521 					}
522 
523 					return result;
524 				} else {
525 					return *(cast (BTreePageHeader*) _array.ptr);
526 				}
527 			}
528 
529 			bool isInteriorPage() pure const {
530 				return (pageType == pageType.indexInteriorPage
531 					|| pageType == pageType.tableInteriorPage);
532 			}
533 
534 			@property auto pageType() const pure {
535 				return cast(const) _pageType;
536 			}
537 
538 			@property BigEndian!uint rightmostPointer() {
539 				assert(isInteriorPage,
540 					"the rightmost pointer is only in interior nodes");
541 				return _rightmostPointer;
542 			}
543 
544 			@property void rightmostPointer(uint rmp) {
545 				assert(isInteriorPage,
546 					"the rightmost pointer is only in interior nodes");
547 				_rightmostPointer = rmp;
548 			}
549 
550 			uint length() pure const {
551 				return 12 - (isInteriorPage ? 0 : 4);
552 			}
553 
554 //			string toString() {
555 //				import std.conv;
556 //
557 //				string result = "pageType:\t";
558 //				result ~= to!string(pageType) ~ "\n";
559 //				result ~= "firstFreeBlock:\t";
560 //				result ~= to!string(firstFreeBlock) ~ "\n";
561 //				result ~= "cellsInPage:\t";
562 //				result ~= to!string(cellsInPage) ~ "\n";
563 //				result ~= "startCellContantArea:\t";
564 //				result ~= to!string(startCellContantArea) ~ "\n";
565 //				result ~= "fragmentedFreeBytes:\t";
566 //				result ~= to!string(fragmentedFreeBytes) ~ "\n";
567 //				if (isInteriorPage) {
568 //					result ~= "_rightmostPointer";
569 //					result ~= to!string(_rightmostPointer) ~ "\n";
570 //				}
571 //
572 //				return result;
573 //			}
574 		}
575 
576 		bool hasPayload() const pure {
577 			final switch (pageType) with (BTreePageType) {
578 				case emptyPage:
579 					return false;
580 				case tableInteriorPage:
581 					return false;
582 				case indexInteriorPage:
583 					return true;
584 				case indexLeafPage:
585 					return true;
586 				case tableLeafPage:
587 					return true;
588 			}
589 		}
590 
591 		auto payloadSize(BigEndian!ushort cp) {
592 			assert(hasPayload);
593 			final switch (pageType) with (BTreePageType) {
594 				case emptyPage:
595 				case tableInteriorPage:
596 					assert(0, "page has no payload");
597 					
598 				case indexInteriorPage:
599 					return VarInt(page[cp + uint.sizeof .. $]);
600 				case indexLeafPage:
601 					return VarInt(page[cp .. $]);
602 				case tableLeafPage:
603 					return VarInt(page[cp .. $]);
604 			}
605 		}
606 
607 		BTreePageHeader header() const pure {
608 			return BTreePageHeader.fromArray(page[headerOffset.. headerOffset + BTreePageHeader.sizeof]);
609 		}
610 
611 		BigEndian!ushort[] getCellPointerArray() const pure {
612 			auto offset = header.length + headerOffset;
613 			return page[offset .. offset + header.cellsInPage * ushort.sizeof]
614 				.toArray!(BigEndian!ushort)(header.cellsInPage);
615 		}
616 
617 		BTreePageType pageType() const pure {
618 			return (header()).pageType;
619 		}
620 
621 		struct PayloadHeader {
622 			const ubyte[] payloadHeader;
623 			uint offset;
624 			uint _length;
625 			 
626 			void popFront() pure {
627 				assert(offset < payloadHeader.length);
628 				offset += _length;
629 				_length = 0;
630 			}
631 
632 			Payload.SerialTypeCode front() pure {
633 				auto v = VarInt(payloadHeader[offset .. $]);
634 				_length = cast(uint)v.length;
635 				return Payload.SerialTypeCode(v);
636 			}
637 
638 			bool empty() pure const {
639 				return offset == payloadHeader.length;
640 			}
641 		}
642 
643 
644 	}
645 }
646 
/// Cursor over a payload that may continue on a chain of overflow pages.
struct OverflowInfo {
	/// Payload bytes still unread on the current page.
	const(ubyte)[] pageSlice;
	/// One-based number of the next overflow page; 0 when there is none.
	uint nextPageIdx;

	/**
	 * Positions the cursor `offset` bytes into the payload body.
	 *
	 * Params:
	 *   payloadStart      = bytes of the first page, starting right after the record header
	 *   offset            = position of the wanted data, relative to `payloadStart`
	 *   payloadSize       = total payload size of the cell, record header included
	 *   pages             = page range used to reach overflow pages
	 *   payloadHeaderSize = size of the record header in bytes
	 *   pageType          = type of the page the cell lives on
	 */
	this(const ubyte[] payloadStart, int offset, const uint payloadSize, const Database.PageRange pages, const uint payloadHeaderSize, const Database.BTreePageType pageType) pure {
		// x is the largest payload that is stored without overflow. Index
		// pages use the maximum embedded fraction 64/255; 32/255 is the
		// *minimum* fraction and belongs to m below.
		uint x;
		if (isIndex(pageType)) {
			x = ((pages.usablePageSize - 12) * 64 / 255) - 23;
		} else {
			x = pages.usablePageSize - 35; 
		}
		
		if (payloadSize > x) {
			// m is the minimum number of bytes kept on the first page, k the
			// amount that would leave the last overflow page exactly full.
			auto m = ((pages.usablePageSize - 12) * 32 / 255) - 23;
			auto k = m + ((payloadSize - m) % (pages.usablePageSize - 4));

			// payloadStart begins after the record header, so subtract it.
			auto payloadOnFirstPage = (k <= x ? k : m) - payloadHeaderSize;
	
			// The local part is followed by the number of the first overflow page.
			nextPageIdx = BigEndian!uint(payloadStart[payloadOnFirstPage .. payloadOnFirstPage + uint.sizeof]);
		
			if(offset > payloadOnFirstPage) {
				offset -= payloadOnFirstPage;
				gotoNextPage(pages);

				// Every overflow page spends its first 4 bytes on the link.
				auto payloadOnOverflowPage = pages.usablePageSize - uint.sizeof;
				while(offset>payloadOnOverflowPage) {
					gotoNextPage(pages);
					offset -= payloadOnOverflowPage;
				}
			} else {
				pageSlice = payloadStart[0 .. payloadOnFirstPage];
			}
		} else {
			pageSlice = payloadStart;
		}
		pageSlice = pageSlice[offset .. $];
	}

	/// Moves the cursor to the start of the next overflow page's content.
	void gotoNextPage(const Database.PageRange pages) pure {
		assert(nextPageIdx != 0, "No next Page to go to");
		// Page numbers in the file are one-based, PageRange is zero-based.
		auto nextPage = pages[nextPageIdx - 1];
		nextPageIdx = BigEndian!uint(nextPage.page[0 .. uint.sizeof]);
		pageSlice = nextPage.page[uint.sizeof .. $];
	}

}
695 
696 
/**
 * Reads one value of type `typeCode` at the cursor `overflowInfo`,
 * following overflow pages when the value does not fit on the current page.
 *
 * Params:
 *   overflowInfo = cursor; advanced past the bytes that were read
 *   typeCode     = serial type (kind and byte length) of the value
 *   pages        = page range used to reach overflow pages
 */
static Database.Payload extractPayload(
	OverflowInfo* overflowInfo,
	const Database.Payload.SerialTypeCode typeCode,
	const Database.PageRange pages,
	) pure {
	import std.algorithm : min;

	if (overflowInfo.pageSlice.length >= typeCode.length) {
		// The whole value is on the current page.
		const valueLength = cast(uint) typeCode.length;

		auto payload = overflowInfo.pageSlice[0 .. valueLength];
		overflowInfo.pageSlice = overflowInfo.pageSlice[valueLength .. $];

		// pageSlice never contains an overflow pointer (the constructor cuts
		// it off and gotoNextPage skips it), so any number of leftover bytes,
		// including exactly 4, is ordinary payload and needs no special case.
		return extractPayload(payload, typeCode);
	}

	// The value continues on overflow pages. A payload is split at arbitrary
	// byte positions, so this can happen to any serial type, not only to
	// blobs and strings: gather the bytes and decode the contiguous copy.
	auto remainingBytesOfPayload = cast(uint) typeCode.length;
	ubyte[] _payloadBuffer;

	for (;;) {
		auto readBytes = cast(uint) min(overflowInfo.pageSlice.length,
			remainingBytesOfPayload);

		remainingBytesOfPayload -= readBytes;

		_payloadBuffer ~= overflowInfo.pageSlice[0 .. readBytes];
		// Keep the cursor in step with what has been consumed.
		overflowInfo.pageSlice = overflowInfo.pageSlice[readBytes .. $];

		if (remainingBytesOfPayload == 0) {
			assert(typeCode.length == _payloadBuffer.length);

			return extractPayload(cast(const)_payloadBuffer, typeCode);
		}

		// gotoNextPage asserts when the chain ends while bytes are missing.
		overflowInfo.gotoNextPage(pages);
	}
}
752 
/**
 * Decodes a single value of type `typeCode` from the start of `startPayload`.
 *
 * Integers are big-endian two's-complement values of 1, 2, 3, 4, 6 or 8
 * bytes; a float64 is the big-endian image of an IEEE 754 double. Blobs and
 * strings are returned as slices of `startPayload` (no copy). NULL and the
 * two boolean constants carry no bytes.
 */
static Database.Payload extractPayload(const ubyte[] startPayload,
	const Database.Payload.SerialTypeCode typeCode) pure {

	// Reads `width` bytes as a big-endian two's-complement integer and
	// sign-extends it. Reading exactly `width` bytes (bounds-checked) keeps
	// 3- and 6-byte values from touching memory behind the value.
	static long readSigned(const ubyte[] bytes, const uint width) pure {
		ulong acc = (bytes[0] & 0x80) ? ulong.max : 0;
		foreach (b; bytes[0 .. width]) {
			acc = (acc << 8) | b;
		}
		return cast(long) acc;
	}

	Database.Payload p;
	p.typeCode = typeCode;
	
	final switch (typeCode.type) {
		case typeof(typeCode).int8:
			p.int8 = cast(byte) startPayload[0];
			break;
		case typeof(typeCode).int16:
			p.int16 = cast(short) readSigned(startPayload, 2);
			break;
		case typeof(typeCode).int24:
			p.int24 = cast(int) readSigned(startPayload, 3);
			break;
		case typeof(typeCode).int32:
			p.int32 = cast(int) readSigned(startPayload, 4);
			break;
		case typeof(typeCode).int48:
			p.int48 = readSigned(startPayload, 6);
			break;
		case typeof(typeCode).int64:
			p.int64 = readSigned(startPayload, 8);
			break;
		case typeof(typeCode).float64:
			if (!__ctfe) {
				// Reinterpret the 64 bits as a double; a numeric cast would
				// convert the integer value instead.
				ulong bits = cast(ulong) readSigned(startPayload, 8);
				p.float64 = *cast(double*) &bits;
				break;
			} else
				assert(0, "not supporeted at CTFE yet");
		//	break;
		case typeof(typeCode).blob:
			p.blob = cast(ubyte[]) startPayload[0 .. cast(uint) typeCode.length];
			break;
		case typeof(typeCode)._string:
			p._string = cast(string) startPayload[0 .. cast(uint) typeCode.length];
			break;
			
		case typeof(typeCode).NULL:
		case typeof(typeCode).bool_false:
		case typeof(typeCode).bool_true:
			break;
	}
	
	return p;
}
799 
800 
801 
/**
 * Calls `handler` with the value stored in `p`, passed with the static type
 * that matches its serial type (`null` for NULL, `false` / `true` for the
 * boolean constants). When `handler` cannot be called with that type the
 * corresponding branch halts with assert(0).
 */
auto apply(alias handler)(const Database.Payload p) {
	final switch (p.typeCode.type) with (typeof(p.typeCode.type)) {
		case NULL:
			static if (__traits(compiles, handler(null)))
				return handler(null);
			else
				assert(0, "handler cannot take null");

		case int8:
			static if (__traits(compiles, handler(p.int8)))
				return handler(p.int8);
			else
				assert(0);

		case int16:
			static if (__traits(compiles, handler(p.int16)))
				return handler(p.int16);
			else
				assert(0);

		case int24:
			static if (__traits(compiles, handler(p.int24)))
				return handler(p.int24);
			else
				assert(0);

		case int32:
			static if (__traits(compiles, handler(p.int32)))
				return handler(p.int32);
			else
				assert(0);

		case int48:
			static if (__traits(compiles, handler(p.int48)))
				return handler(p.int48);
			else
				assert(0);

		case int64:
			static if (__traits(compiles, handler(p.int64)))
				return handler(p.int64);
			else
				assert(0);

		case float64:
			static if (__traits(compiles, handler(p.float64)))
				return handler(p.float64);
			else
				assert(0);

		case bool_false:
			static if (__traits(compiles, handler(false)))
				return handler(false);
			else
				assert(0);

		case bool_true:
			static if (__traits(compiles, handler(true)))
				return handler(true);
			else
				assert(0);

		case blob:
			static if (__traits(compiles, handler(p.blob)))
				return handler(p.blob);
			else
				assert(0);

		case _string:
			static if (__traits(compiles, handler(p._string)))
				return handler(p._string);
			else
				assert(0,"handler " ~ typeof(handler).stringof ~ " cannot be called with string");
	}
}
878 pure :
/// Returns the value held by `p`, cast to `T`.
auto getAs(T)(Database.Payload p) {
	return apply!(value => cast(T) value)(p);
}

/// Decodes column `columnIndex` of row `r` and casts it to `T`.
auto getAs(T)(Database.Row r, uint columnIndex) {
	auto cell = r.column(columnIndex);
	return cell.getAs!T();
}

/// Looks up `colName` through `s.getcolumnIndex` and returns that column of
/// `r`, cast to `T`.
auto getAs(T)(Database.Row r, Database.TableSchema s, string colName) {
	const columnIndex = s.getcolumnIndex(colName);
	return r.getAs!T(columnIndex);
}
890 
// The boolean serial types carry no bytes; getAs must still yield 0 and 1.
unittest {
	alias Kind = Database.Payload.SerialTypeCodeEnum;
	Database.Payload payload;

	payload.typeCode.type = Kind.bool_false;
	assert(payload.getAs!(int) == 0);

	payload.typeCode.type = Kind.bool_true;
	assert(payload.getAs!(int) == 1);
}
898 
/// True when bit 1 of the page type is set, which holds for the two index
/// page types (2 and 10) and for neither table page type (5 and 13).
bool isIndex(const Database.BTreePage.BTreePageType pageType) pure {
	return (pageType & 2) != 0;
}

static assert(
	isIndex(Database.BTreePageType.indexLeafPage)
	&& isIndex(Database.BTreePageType.indexInteriorPage)
	&& !isIndex(Database.BTreePageType.tableLeafPage));