Add a library for running SQLite databases in threads (#7350)

This commit is contained in:
Mia 2021-07-19 21:02:03 -05:00 committed by GitHub
parent b175d8b000
commit 2c1b7d7828
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 290 additions and 2 deletions

View File

@ -6,3 +6,4 @@ export {FS} from './fs';
export * as Utils from './utils';
export {crashlogger} from './crashlogger';
export * as ProcessManager from './process-manager';
export {SQL} from './sql';

View File

@ -93,7 +93,7 @@ class RawSubprocessStream extends Streams.ObjectReadWriteStream<string> {
}
}
interface ProcessWrapper {
export interface ProcessWrapper {
getLoad: () => number;
process: ChildProcess | Worker;
release: () => Promise<void>;
@ -503,7 +503,7 @@ export abstract class ProcessManager<T extends ProcessWrapper = ProcessWrapper>
return unspawned;
}
abstract listen(): void;
abstract createProcess(): T;
abstract createProcess(...args: any): T;
destroyProcess(process: T) {}
destroy() {
const index = processManagers.indexOf(this);

257
lib/sql.ts Normal file
View File

@ -0,0 +1,257 @@
/**
* Async worker thread wrapper around SQLite, written to improve concurrent performance.
* @author mia-pi-git
*/
import {ProcessWrapper, ProcessManager} from './process-manager';
import * as child_process from 'child_process';
import * as path from 'path';
import {FS} from './fs';
import type * as sqlite from 'better-sqlite3';
interface SQLOptions {
file: string;
/** file to import database functions from - this should be relative to this filename. */
extension?: string;
}
type DataType = unknown[] | Record<string, unknown>;
type Statement = string | number;
type DatabaseQuery = {
/** Prepare a statement - data is the statement. */
type: 'prepare', data: string,
} | {
/** Get all lines from a statement. Data is the params, num is the statement number. */
type: 'all', data: DataType, num: number,
} | {
/** Execute raw SQL in the database. */
type: "exec", data: string,
} | {
/** Get one line from a prepared statement. */
type: 'get', data: DataType, num: number,
} | {
/** Run a prepared statement. */
type: 'run', data: DataType, num: number,
} | {
type: 'transaction', num: number, data: DataType,
};
function getModule() {
try {
return require('better-sqlite3') as typeof sqlite.default;
} catch {
return null;
}
}
export class DatabaseWrapper implements ProcessWrapper {
statements: Map<string, number>;
process: child_process.ChildProcess;
pendingRequests: ((data: string | DataType | null) => any)[];
constructor(options: SQLOptions) {
this.statements = new Map();
this.pendingRequests = [];
this.process = child_process.fork(__filename, [], {
env: options as AnyObject, cwd: path.resolve(__dirname, '..'), execArgv: ['-r', 'ts-node/register'],
});
this.listen();
}
getProcess() {
return this.process;
}
listen() {
this.process.on("message", (message: DataType) => {
const resolver = this.pendingRequests.shift();
if (resolver) return resolver(message);
throw new Error(`Database wrapper received a message, but there was no pending request.`);
});
}
release() {
this.statements.clear();
this.process.kill();
for (const resolver of this.pendingRequests.values()) {
resolver(null);
}
return Promise.resolve();
}
getLoad() {
return this.pendingRequests.length;
}
runFile(filename: string) {
const file = FS(filename);
if (!file.existsSync()) throw new Error(`File passed to runFile does not exist.`);
if (!filename.endsWith('.sql')) throw new Error(`File passed to runFile is not a .sql file.`);
const content = file.readSync();
void this.exec(content);
}
async prepare(statement: string) {
const cachedStatement = this.statements.get(statement);
if (cachedStatement) {
return cachedStatement;
}
const int = await this.query({type: 'prepare', data: statement});
if (typeof int === 'number') this.statements.set(statement, int);
return int;
}
all(statement: Statement, data: DataType = {}) {
const num = typeof statement === 'number' ? statement : this.statements.get(statement);
if (num === undefined || ![...this.statements.values()].includes(num)) {
throw new Error(`Prepare a statement before using another database function with SQLDatabase.prepare.`);
}
return this.query({type: 'all', num: num, data});
}
get(statement: Statement, data: DataType = {}) {
const num = typeof statement === 'number' ? statement : this.statements.get(statement);
if (num === undefined || ![...this.statements.values()].includes(num)) {
throw new Error(`Prepare a statement before using another database function with SQLDatabase.prepare.`);
}
return this.query({type: 'get', data, num});
}
run(statement: Statement, data: DataType) {
const num = typeof statement === 'number' ? statement : this.statements.get(statement);
if (num === undefined || ![...this.statements.values()].includes(num)) {
throw new Error(`Prepare a statement before using another database function with SQLDatabase.prepare.`);
}
return this.query({type: 'run', num, data});
}
/** We don't want these to be prepared statements, since this should be used SPARINGLY. */
exec(statement: string) {
return this.query({type: 'exec', data: statement});
}
query(args: DatabaseQuery) {
this.process.send(args);
return new Promise<any>(resolve => {
this.pendingRequests.push(resolve);
});
}
}
class SQLProcessManager extends ProcessManager {
constructor(module: NodeJS.Module) {
super(module);
}
createProcess(options: SQLOptions) {
const process: ProcessWrapper = new DatabaseWrapper(options);
this.processes.push(process);
return process;
}
listen() {}
destroyProcess(process: DatabaseWrapper) {
void process.release();
this.processes.splice(this.processes.indexOf(process), 1);
}
}
export const PM = new SQLProcessManager(module);
const Database = getModule();
if (!PM.isParentProcess) {
let statementNum = 0;
const statements: Map<number, sqlite.Statement> = new Map();
const transactions: Map<number, sqlite.Transaction> = new Map();
const {file, extension} = process.env;
const database = Database ? new Database(file!) : null;
if (extension && database) {
const {
functions,
transactions: storedTransactions,
statements: storedStatements,
onDatabaseStart,
// eslint-disable-next-line @typescript-eslint/no-var-requires
} = require(`../${extension}`);
if (functions) {
for (const k in functions) {
database.function(k, functions[k]);
}
}
if (storedTransactions) {
for (const t in storedTransactions) {
const transaction = database.transaction(storedTransactions[t]);
transactions.set(transactions.size + 1, transaction);
}
}
if (storedStatements) {
for (const k in storedStatements) {
const statement = database.prepare(storedStatements[k]);
statements.set(statementNum++, statement); // we use statementNum here to track with the rest
}
}
if (onDatabaseStart) {
onDatabaseStart(database);
}
}
database?.pragma(`foreign_keys=on`);
process.on('message', (query: DatabaseQuery) => {
let statement;
let results;
switch (query.type) {
case 'prepare': {
if (!database) return process.send!(-1);
const {data} = query;
const newStatement = database.prepare(data);
const nextNum = statementNum++;
statements.set(nextNum, newStatement);
return process.send!(nextNum);
}
case 'all': {
if (!database) {
results = [];
break;
}
const {num, data} = query;
statement = statements.get(num);
results = statement?.all(data) || [];
}
break;
case 'get': {
if (!database) {
results = null;
break;
}
const {num, data} = query;
statement = statements.get(num);
results = statement?.get(...data as any) || null;
}
break;
case 'run': {
if (!database) {
results = null;
break;
}
const {num, data} = query;
statement = statements.get(num);
results = statement?.run(...data as any) || null;
}
break;
case 'exec': {
const {data} = query;
database?.exec(data);
results = !!database;
}
break;
case 'transaction': {
if (!database) {
results = null;
break;
}
const {num, data} = query;
const transaction = transactions.get(num);
if (!transaction) {
results = null;
break;
}
results = transaction(database, data);
}
break;
}
process.send!(results);
});
}
/**
* @param options Either an object of filename, extension, or just the string filename
*/
export function SQL(options: SQLOptions | string) {
if (typeof options === 'string') options = {file: options};
return PM.createProcess(options);
}

30
test/lib/sql.js Normal file
View File

@ -0,0 +1,30 @@
"use strict";
const {SQL} = require('../../lib/sql');
const assert = require('../assert').strict;
const database = SQL(`:memory:`);
describe(`SQLite worker wrapper`, async () => {
// prepare statements and set up table
await database.exec(`CREATE TABLE IF NOT EXISTS test (col TEXT, col2 TEXT)`);
const select = await database.prepare(`SELECT * FROM test`); // 0
const insert = await database.prepare(`INSERT INTO test (col, col2) VALUES (?, ?)`); // 1
it(`should require you to prepare a statement before running`, async () => {
assert.throws(async () => {
await database.get('SELECT col FROM test');
}, /Prepare a statement before using another database function with SQLDatabase.prepare/);
});
it(`should retrieve one line from Database.get`, async () => {
const result = await database.get(select);
assert(result.length, 1);
});
it(`should support both statement strings and corresponding numbers`, async () => {
await database.run(`INSERT INTO test (col, col2) VALUES(?, ?)`, ['a', 'b']);
await database.run(insert, ['a', 'b']);
});
it(`should support both inline and object params`, async () => {
const num = await database.prepare(`INSERT INTO test (col, col2) VALUES($col, $col2)`);
await database.run(insert, ['a', 'b']);
await database.run(num, {col: 'a', col1: 'b'});
});
});