diff --git a/src/grpc/plugin.ts b/src/grpc/plugin.ts index cc099c4..f5ba0d5 100644 --- a/src/grpc/plugin.ts +++ b/src/grpc/plugin.ts @@ -7,6 +7,7 @@ import { encodeTables } from '../schema/table.js'; export class MigrateTable extends pluginV3.cloudquery.plugin.v3.Sync.MessageMigrateTable {} export class SyncResponse extends pluginV3.cloudquery.plugin.v3.Sync.Response {} export class ReadResponse extends pluginV3.cloudquery.plugin.v3.Read.Response {} +export class WriteRequest extends pluginV3.cloudquery.plugin.v3.Write.Request {} export class WriteResponse extends pluginV3.cloudquery.plugin.v3.Write.Response {} export type SyncStream = grpc.ServerWritableStream< diff --git a/src/memdb/memdb.ts b/src/memdb/memdb.ts index 8d4d2b5..8bc3326 100644 --- a/src/memdb/memdb.ts +++ b/src/memdb/memdb.ts @@ -1,3 +1,7 @@ +import { StructRowProxy } from '@apache-arrow/esnext-esm'; +import { pluginV3 } from '@cloudquery/plugin-pb-javascript'; + +import { WriteRequest, WriteStream } from '../grpc/plugin.js'; import { Plugin, newUnimplementedDestination, @@ -7,20 +11,87 @@ import { NewClientOptions, } from '../plugin/plugin.js'; import { sync } from '../scheduler/scheduler.js'; -import { Table, createTable, filterTables } from '../schema/table.js'; +import { Table, createTable, filterTables, decodeTable, decodeRecord, getPrimaryKeys } from '../schema/table.js'; export const createMemDBClient = () => { - return { id: () => 'memdb' }; + //eslint-disable-next-line @typescript-eslint/no-explicit-any + const memoryDB: Record = {}; + const tables: Record = {}; + return { + id: () => 'memdb', + memoryDB, + tables, + }; }; export const newMemDBPlugin = (): Plugin => { const memdbClient = createMemDBClient(); + const memoryDB = memdbClient.memoryDB; + const tables = memdbClient.tables; const allTables: Table[] = [ createTable({ name: 'table1', title: 'Table 1', description: 'Table 1 description' }), createTable({ name: 'table2', title: 'Table 2', description: 'Table 2 description' }), ]; + const memdb: { inserts: unknown[]; [key: string]: unknown } = { + inserts: [], + ...memoryDB, + }; + + //eslint-disable-next-line @typescript-eslint/no-explicit-any + const overwrite = (table: Table, primaryKeys: string[], record: StructRowProxy) => { + const tableData = memoryDB[table.name] || []; + + if (primaryKeys.length === 0) { + // If there are no primary keys, simply append the data + tableData.push(record); + memoryDB[table.name] = tableData; + return; + } + + // Otherwise, perform an upsert based on the primary keys + const recordIndex = tableData.findIndex((existingRecord) => { + return primaryKeys.every((key) => existingRecord[key] === record[key]); + }); + + if (recordIndex > -1) { + // If record exists, update (overwrite) it + tableData[recordIndex] = record; + } else { + // If record doesn't exist, insert it + tableData.push(record); + } + + memoryDB[table.name] = tableData; // Update the memoryDB with the modified table data + }; + + const deleteStale = (message: pluginV3.cloudquery.plugin.v3.Write.MessageDeleteStale): void => { + const tableName = message.table_name; + + // Filter the table based on the provided criteria + const filteredTable = memoryDB[tableName].filter((row) => { + const sc = row.Schema(); + + const sourceColIndex = sc.FieldIndices('source_name_column'); + const syncColIndex = sc.FieldIndices('sync_time_column'); + + // Ensure both columns are present + if (sourceColIndex === undefined || syncColIndex === undefined) { + return true; // Keep the record if either column is missing + } + + const rowSourceName = row.Column(sourceColIndex).Value(0); + const rowSyncTime = row.Column(syncColIndex).Value(0); // Assuming it returns a Date object + + // If source names match and the record's sync time is not before the given sync time, keep the record + return rowSourceName === message.source_name && !rowSyncTime.before(message.sync_time); + }); + + // Update the memory database with the filtered table + memoryDB[tableName] = filteredTable; + }; + const pluginClient = { ...newUnimplementedDestination(), init: (spec: string, options: NewClientOptions) => Promise.resolve(), @@ -35,6 +106,56 @@ export const newMemDBPlugin = (): Plugin => { const filtered = filterTables(allTables, tables, skipTables, skipDependentTables); return await sync(memdbClient, filtered, stream, { deterministicCQId }); }, + write(stream: WriteStream): Promise { + return new Promise((resolve, reject) => { + stream.on('data', (request: WriteRequest) => { + switch (request.message) { + case 'migrate_table': { + // Update table schema in the `tables` map + const table = decodeTable(request.migrate_table.table); + tables[table.name] = table; + break; + } + + case 'insert': { + const [tableName, batches] = decodeRecord(request.insert.record); + + if (!memoryDB[tableName]) { + memoryDB[tableName] = []; + } + + const tableSchema = tables[tableName]; + const pks = getPrimaryKeys(tableSchema); + + for (const batch of batches) { + //eslint-disable-next-line unicorn/no-array-for-each + for (const record of batch) { + overwrite(tableSchema, pks, record); + } + } + break; + } + + case 'delete': { + deleteStale(request.delete); + break; + } + + default: { + throw new Error(`Unknown request message type: ${request.message}`); + } + } + }); + + stream.on('finish', () => { + resolve(); + }); + + stream.on('error', (error) => { + reject(error); + }); + }); + }, }; return newPlugin('memdb', '0.0.1', () => Promise.resolve(pluginClient)); diff --git a/src/schema/table.ts b/src/schema/table.ts index d8e2da4..1b95bb8 100644 --- a/src/schema/table.ts +++ b/src/schema/table.ts @@ -1,10 +1,10 @@ import { Writable } from 'node:stream'; -import { Table as ArrowTable, tableToIPC, Schema } from '@apache-arrow/esnext-esm'; +import { Table as ArrowTable, tableFromIPC, tableToIPC, Schema, RecordBatch } from '@apache-arrow/esnext-esm'; import { isMatch } from 'matcher'; import * as arrow from './arrow.js'; -import { Column, toArrowField } from './column.js'; +import { Column, fromArrowField, toArrowField } from './column.js'; import { ClientMeta } from './meta.js'; import { Resource } from './resource.js'; import { Nullable } from './types.js'; @@ -80,6 +80,10 @@ export const getTableByName = (tables: Table[], name: string): Table | undefined } }; +export const getPrimaryKeys = (table: Table): string[] => { + return table.columns.filter((column) => column.primaryKey).map((column) => column.name); +}; + export const flattenTables = (tables: Table[]): Table[] => { return tables.flatMap((table) => [table, ...flattenTables(table.relations.map((c) => ({ ...c, parent: table })))]); }; @@ -126,7 +130,7 @@ export const filterTables = ( return withSkipDependant; }; -export const toArrowSchema = (table: Table) => { +export const toArrowSchema = (table: Table): Schema => { const metadata = new Map(); metadata.set(arrow.METADATA_TABLE_NAME, table.name); metadata.set(arrow.METADATA_TABLE_DESCRIPTION, table.description); @@ -142,13 +146,35 @@ export const toArrowSchema = (table: Table) => { return new Schema(fields, metadata); }; +export const fromArrowSchema = (schema: Schema): Table => { + return createTable({ + name: schema.metadata.get(arrow.METADATA_TABLE_NAME) || '', + title: schema.metadata.get(arrow.METADATA_TABLE_TITLE) || '', + description: schema.metadata.get(arrow.METADATA_TABLE_DESCRIPTION) || '', + pkConstraintName: schema.metadata.get(arrow.METADATA_CONSTRAINT_NAME) || '', + isIncremental: schema.metadata.get(arrow.METADATA_INCREMENTAL) === arrow.METADATA_TRUE, + // dependencies: schema.metadata.get(arrow.METADATA_TABLE_DEPENDS_ON) || '', + columns: schema.fields.map((f) => fromArrowField(f)), + }); +}; + export const encodeTable = (table: Table): Uint8Array => { const schema = toArrowSchema(table); const arrowTable = new ArrowTable(schema); - const bytes = tableToIPC(arrowTable); - return bytes; + return tableToIPC(arrowTable); }; export const encodeTables = (tables: Table[]): Uint8Array[] => { return tables.map((table) => encodeTable(table)); }; + +export const decodeTable = (bytes: Uint8Array): Table => { + const arrowTable = tableFromIPC(bytes); + return fromArrowSchema(arrowTable.schema); +}; + +// eslint-disable-next-line @typescript-eslint/no-explicit-any +export const decodeRecord = (bytes: Uint8Array): [string, RecordBatch[]] => { + const arrowTable = tableFromIPC(bytes); + return [(arrowTable.schema.metadata.get(arrow.METADATA_TABLE_NAME) || '')!, arrowTable.batches]; +};