diff --git a/lib/result/ArrowResultConverter.ts b/lib/result/ArrowResultConverter.ts index 57fa02af..31e4c5af 100644 --- a/lib/result/ArrowResultConverter.ts +++ b/lib/result/ArrowResultConverter.ts @@ -13,7 +13,7 @@ import { RecordBatchReader, util as arrowUtils, } from 'apache-arrow'; -import { TGetResultSetMetadataResp, TColumnDesc } from '../../thrift/TCLIService_types'; +import { TTableSchema, TColumnDesc } from '../../thrift/TCLIService_types'; import IClientContext from '../contracts/IClientContext'; import IResultsProvider, { ResultsProviderFetchNextOptions } from './IResultsProvider'; import { ArrowBatch, getSchemaColumns, convertThriftValue } from './utils'; @@ -42,7 +42,12 @@ export default class ArrowResultConverter implements IResultsProvider // actually return a non-empty result private prefetchedRecordBatch?: RecordBatch; - constructor(context: IClientContext, source: IResultsProvider, { schema }: TGetResultSetMetadataResp) { + // Only the column `schema` is consumed here. Typed as the minimal shape + // (not the full Thrift `TGetResultSetMetadataResp`) so both the Thrift + // operation backend and the SEA backend's neutral `ResultMetadata` — + // which both carry `schema?: TTableSchema` — can construct the converter + // without an adapter at the call site. + constructor(context: IClientContext, source: IResultsProvider, { schema }: { schema?: TTableSchema }) { this.context = context; this.source = source; this.schema = getSchemaColumns(schema); diff --git a/lib/sea/SeaArrowIpc.ts b/lib/sea/SeaArrowIpc.ts new file mode 100644 index 00000000..c111b6bd --- /dev/null +++ b/lib/sea/SeaArrowIpc.ts @@ -0,0 +1,257 @@ +// Copyright (c) 2026 Databricks, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import { RecordBatchReader, Schema, Field, DataType, TypeMap } from 'apache-arrow'; +import { TTableSchema, TTypeId, TPrimitiveTypeEntry } from '../../thrift/TCLIService_types'; +import { rewriteDurationToInt64, DURATION_UNIT_METADATA_KEY } from './SeaArrowIpcDurationFix'; + +/** + * Field metadata key used by the kernel to attach the original Databricks + * SQL type name to each Arrow field. See `databricks-sql-kernel/src/reader/mod.rs`. + */ +const DATABRICKS_TYPE_NAME = 'databricks.type_name'; + +/** + * Decode an Arrow IPC stream payload (schema header + zero-or-more + * record-batch messages) into its row count. + * + * Returns `{ schema, rowCount }`. The schema is left intact as the + * apache-arrow Schema object so callers can reuse it; the rowCount is + * the sum of `RecordBatch.numRows` across every record-batch message + * in the stream. + * + * Why we parse upfront: `ArrowResultConverter` consumes `ArrowBatch` + * objects which carry an explicit `rowCount`. The kernel's IPC payload + * does not carry a separate count — only per-RecordBatch numRows. We + * walk the messages once to sum them so the converter sees the same + * shape as the thrift path (`ArrowResultHandler.fetchNext` at + * `lib/result/ArrowResultHandler.ts:55`). + * + * Re-parsing inside the converter is unavoidable because `RecordBatch` + * instances created here cannot be passed across the converter's + * `Buffer[]` boundary without rewriting the converter. Callers that already + * patched the IPC bytes can set `alreadyPatched` to avoid running the + * FlatBuffer rewrite twice on the hot fetch path. + */ +export function decodeIpcBatch( + ipcBytes: Buffer, + options: { alreadyPatched?: boolean } = {}, +): { schema: Schema; rowCount: number } { + const patched = options.alreadyPatched ? ipcBytes : rewriteDurationToInt64(ipcBytes); + const reader = RecordBatchReader.from(patched); + // Eagerly open so `schema` is populated. + reader.open(); + const { schema } = reader; + + let rowCount = 0; + // Iterate all record batches in the stream and sum row counts. + for (const batch of reader) { + rowCount += batch.numRows; + } + return { schema, rowCount }; +} + +/** + * Decode an Arrow IPC schema payload (no record batches) into the + * apache-arrow Schema object. + */ +export function decodeIpcSchema(ipcBytes: Buffer): Schema { + const patched = rewriteDurationToInt64(ipcBytes); + const reader = RecordBatchReader.from(patched); + reader.open(); + return reader.schema; +} + +/** + * Pre-process raw IPC bytes from the kernel so they're consumable by + * `apache-arrow@13`. The current transformation is `Duration → Int64` + * with the original duration unit preserved in field metadata (see + * `SeaArrowIpcDurationFix.ts`). Returned bytes are byte-identical to + * the input when no transformation is needed. + * + * Exposed so callers can pre-patch the buffer **once** and pass the + * result through both `decodeIpcBatch` (for row-count extraction in + * `SeaResultsProvider`) and `ArrowResultConverter.fetchNext` (which + * re-decodes the same bytes via `RecordBatchReader.from`). Without + * this, the converter would re-throw on `Duration` because it never + * sees the patched bytes. + */ +export function patchIpcBytes(ipcBytes: Buffer): Buffer { + return rewriteDurationToInt64(ipcBytes); +} + +/** + * Map an Arrow `DataType` (with optional `databricks.type_name` + * metadata) onto the closest Thrift `TTypeId`. + * + * This is the synthesis step that lets the existing + * `ArrowResultConverter` Phase-2 dispatch (`convertThriftValue` in + * `lib/result/utils.ts:61-98`) keep working unchanged for the SEA + * path. Phase-2 keys exclusively off `TPrimitiveTypeEntry.type` per + * column, so we synthesize a `TColumnDesc` whose `TTypeId` matches the + * server-emitted Arrow type as closely as possible. + * + * Resolution order: + * 1. The kernel attaches `databricks.type_name` (e.g. "DECIMAL", + * "INTERVAL", "STRUCT") to each field's metadata. Prefer that when + * present — it carries the original SQL semantic that the Arrow + * type alone can lose (e.g. INTERVAL → Utf8 with metadata). + * 2. Fall back to the Arrow `DataType.typeId` for primitive types. + * + * This matches the JDBC and Python drivers' policy of trusting the + * server's logical type assignment over the wire-level Arrow encoding. + */ +function arrowTypeToTTypeId(field: Field): TTypeId { + const typeName = field.metadata.get(DATABRICKS_TYPE_NAME)?.toUpperCase(); + + switch (typeName) { + case 'BOOLEAN': + return TTypeId.BOOLEAN_TYPE; + case 'TINYINT': + case 'BYTE': + return TTypeId.TINYINT_TYPE; + case 'SMALLINT': + case 'SHORT': + return TTypeId.SMALLINT_TYPE; + case 'INT': + case 'INTEGER': + return TTypeId.INT_TYPE; + case 'BIGINT': + case 'LONG': + return TTypeId.BIGINT_TYPE; + case 'FLOAT': + case 'REAL': + return TTypeId.FLOAT_TYPE; + case 'DOUBLE': + return TTypeId.DOUBLE_TYPE; + case 'STRING': + return TTypeId.STRING_TYPE; + case 'VARCHAR': + return TTypeId.VARCHAR_TYPE; + case 'CHAR': + return TTypeId.CHAR_TYPE; + case 'BINARY': + return TTypeId.BINARY_TYPE; + case 'DATE': + return TTypeId.DATE_TYPE; + case 'TIMESTAMP': + case 'TIMESTAMP_NTZ': + return TTypeId.TIMESTAMP_TYPE; + case 'DECIMAL': + return TTypeId.DECIMAL_TYPE; + case 'INTERVAL': + case 'INTERVAL DAY': + case 'INTERVAL DAY TO HOUR': + case 'INTERVAL DAY TO MINUTE': + case 'INTERVAL DAY TO SECOND': + case 'INTERVAL HOUR': + case 'INTERVAL HOUR TO MINUTE': + case 'INTERVAL HOUR TO SECOND': + case 'INTERVAL MINUTE': + case 'INTERVAL MINUTE TO SECOND': + case 'INTERVAL SECOND': + return TTypeId.INTERVAL_DAY_TIME_TYPE; + case 'INTERVAL YEAR': + case 'INTERVAL YEAR TO MONTH': + case 'INTERVAL MONTH': + return TTypeId.INTERVAL_YEAR_MONTH_TYPE; + case 'ARRAY': + return TTypeId.ARRAY_TYPE; + case 'MAP': + return TTypeId.MAP_TYPE; + case 'STRUCT': + return TTypeId.STRUCT_TYPE; + case 'NULL': + case 'VOID': + return TTypeId.NULL_TYPE; + default: + break; + } + + // Fall back to Arrow's own type id when no databricks metadata is set + // (e.g. unit tests constructing batches without metadata). + const arrowType = field.type; + if (DataType.isBool(arrowType)) return TTypeId.BOOLEAN_TYPE; + if (DataType.isInt(arrowType)) { + // Duration columns are rewritten to Int64 with a + // `databricks.arrow.duration_unit` metadata marker (see + // `SeaArrowIpcDurationFix.ts`). Surface them as INTERVAL_DAY_TIME + // so the converter formats them back into the thrift string form. + if (arrowType.bitWidth === 64 && field.metadata.has(DURATION_UNIT_METADATA_KEY)) { + return TTypeId.INTERVAL_DAY_TIME_TYPE; + } + switch (arrowType.bitWidth) { + case 8: + return TTypeId.TINYINT_TYPE; + case 16: + return TTypeId.SMALLINT_TYPE; + case 32: + return TTypeId.INT_TYPE; + case 64: + return TTypeId.BIGINT_TYPE; + default: + return TTypeId.BIGINT_TYPE; + } + } + if (DataType.isFloat(arrowType)) { + // arrow Float precision: 16=HALF, 32=SINGLE, 64=DOUBLE + return arrowType.precision === 2 ? TTypeId.DOUBLE_TYPE : TTypeId.FLOAT_TYPE; + } + if (DataType.isDecimal(arrowType)) return TTypeId.DECIMAL_TYPE; + if (DataType.isUtf8(arrowType)) return TTypeId.STRING_TYPE; + if (DataType.isBinary(arrowType)) return TTypeId.BINARY_TYPE; + if (DataType.isDate(arrowType)) return TTypeId.DATE_TYPE; + if (DataType.isTimestamp(arrowType)) return TTypeId.TIMESTAMP_TYPE; + // Native Arrow Interval types. The server-side INTERVAL YEAR-MONTH + // (and the legacy IntervalDayTime variant) come through with type + // id 11 / -25 / -26 — apache-arrow@13 surfaces them as `Int32Array` + // pairs which the converter formats to thrift's `"Y-M"` / day-time + // strings. + if (DataType.isInterval(arrowType)) { + // unit 0 = YEAR_MONTH, unit 1 = DAY_TIME, unit 2 = MONTH_DAY_NANO + return arrowType.unit === 0 ? TTypeId.INTERVAL_YEAR_MONTH_TYPE : TTypeId.INTERVAL_DAY_TIME_TYPE; + } + if (DataType.isList(arrowType)) return TTypeId.ARRAY_TYPE; + if (DataType.isMap(arrowType)) return TTypeId.MAP_TYPE; + if (DataType.isStruct(arrowType)) return TTypeId.STRUCT_TYPE; + if (DataType.isNull(arrowType)) return TTypeId.NULL_TYPE; + + return TTypeId.STRING_TYPE; +} + +/** + * Synthesize a Thrift `TTableSchema` from an Arrow schema decoded out + * of the kernel's IPC stream. Used by `SeaOperationBackend.getResultMetadata` + * to drive `ArrowResultConverter.convertThriftTypes` (Phase 2) without + * changing that code. + */ +export function arrowSchemaToThriftSchema(arrowSchema: Schema): TTableSchema { + const columns = arrowSchema.fields.map((field, index) => { + const primitiveEntry: TPrimitiveTypeEntry = { + type: arrowTypeToTTypeId(field), + }; + return { + columnName: field.name, + typeDesc: { + types: [ + { + primitiveEntry, + }, + ], + }, + position: index + 1, + }; + }); + return { columns }; +} diff --git a/lib/sea/SeaArrowIpcDurationFix.ts b/lib/sea/SeaArrowIpcDurationFix.ts new file mode 100644 index 00000000..d013ac78 --- /dev/null +++ b/lib/sea/SeaArrowIpcDurationFix.ts @@ -0,0 +1,602 @@ +// Copyright (c) 2026 Databricks, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * Pre-process an Arrow IPC stream payload to make it consumable by + * `apache-arrow@13`, which predates the addition of the `Duration` type + * (FlatBuffer `Type` enum id 18) in version 14. + * + * The Databricks SQL server emits INTERVAL DAY-TIME columns as Arrow + * `Duration(MICROSECOND)` in the SEA IPC stream. apache-arrow@13's + * `decodeFieldType` (`node_modules/apache-arrow/ipc/metadata/message.js:339-397`) + * throws `Unrecognized type: "Duration" (18)` on the schema FlatBuffer + * before any record batch is read, breaking the entire SEA path for any + * result that contains an INTERVAL DAY-TIME column. + * + * Because the physical layout of an Arrow `Duration` column is + * **identical** to an Arrow `Int64` column (8 bytes of signed integer per + * row in the values buffer, plus the validity bitmap), we can losslessly + * rewrite the schema FlatBuffer to advertise `Int(bitWidth=64, + * signed=true)` in place of `Duration(unit)`. The record-batch body + * bytes pass through unchanged. We embed the original `Duration` time + * unit (`SECOND`/`MILLISECOND`/`MICROSECOND`/`NANOSECOND`) into the + * rewritten field's `custom_metadata` under the key + * `databricks.arrow.duration_unit` so the JS converter can format the + * Int64 value back into a thrift-equivalent string (e.g. + * `"1 02:03:04.000000000"`). + * + * Why this lives in its own file: the rewriter is the only place in the + * codebase that needs to construct FlatBuffers by hand using the + * `flatbuffers` library; isolating it keeps `SeaArrowIpc.ts` focused on + * the high-level Arrow-decoded views. + * + * @see lib/result/ArrowResultConverter.ts — Phase-1 INTERVAL formatting + * reads the metadata key written here. + * @see findings/parity-mismatch/round5-implementation-2026-05-15.md — + * original failure mode (`Unrecognized type: "Duration" (18)`). + */ + +import * as flatbuffers from 'flatbuffers'; +// We reach into apache-arrow's internal FlatBuffer accessor modules +// rather than the high-level Schema/Field classes because the latter +// throw on the `Duration` type id 18 (`apache-arrow@13` predates the +// `Duration` enum entry). The internal `fb/*` modules are generated +// FlatBuffer code and recognize every type id present in the +// FlatBuffer schema, including `Duration`, so we can decode the +// original schema and rebuild it with `Duration` rewritten to `Int64`. +// eslint-disable-next-line import/no-internal-modules +import { Message } from 'apache-arrow/fb/message'; +// eslint-disable-next-line import/no-internal-modules +import { MessageHeader } from 'apache-arrow/fb/message-header'; +// eslint-disable-next-line import/no-internal-modules +import { Schema as FbSchema } from 'apache-arrow/fb/schema'; +// eslint-disable-next-line import/no-internal-modules +import { Field as FbField } from 'apache-arrow/fb/field'; +// eslint-disable-next-line import/no-internal-modules +import { KeyValue as FbKeyValue } from 'apache-arrow/fb/key-value'; +// eslint-disable-next-line import/no-internal-modules +import { Type as FbType } from 'apache-arrow/fb/type'; +// eslint-disable-next-line import/no-internal-modules +import { Duration as FbDuration } from 'apache-arrow/fb/duration'; +// eslint-disable-next-line import/no-internal-modules +import { Int as FbInt } from 'apache-arrow/fb/int'; +// eslint-disable-next-line import/no-internal-modules +import { TimeUnit as FbTimeUnit } from 'apache-arrow/fb/time-unit'; + +/** + * Metadata key written onto rewritten fields to preserve the original + * `Duration` time unit. Consumed by + * `lib/result/ArrowResultConverter.ts` Phase 1 to choose the correct + * scale when formatting INTERVAL DAY-TIME values. + */ +export const DURATION_UNIT_METADATA_KEY = 'databricks.arrow.duration_unit'; + +const IPC_CONTINUATION_MARKER = 0xffffffff; + +const TIME_UNIT_NAME: Record = { + [FbTimeUnit.SECOND]: 'SECOND', + [FbTimeUnit.MILLISECOND]: 'MILLISECOND', + [FbTimeUnit.MICROSECOND]: 'MICROSECOND', + [FbTimeUnit.NANOSECOND]: 'NANOSECOND', +}; + +/** + * Walk an IPC stream payload and rewrite any `Duration` field in the + * schema message to `Int64` (preserving the original time unit in + * custom metadata). Subsequent record-batch messages are forwarded + * verbatim — the data layout matches the rewritten `Int64` type + * bit-for-bit. + * + * If the schema contains no `Duration` fields, the input buffer is + * returned unchanged (zero-copy fast path). + * + * The caller is expected to pass a complete IPC stream payload (the + * full byte buffer the kernel returned for one `fetchNextBatch` call, + * or the schema-only payload from `statement.schema()`). Multi-segment + * stream payloads are supported; we walk through each message until + * the buffer is exhausted. + * + * @param ipcBytes raw IPC stream bytes from the napi binding + * @returns either the original buffer (no rewrite needed) or a fresh + * buffer with the schema message replaced + */ +export function rewriteDurationToInt64(ipcBytes: Buffer | Uint8Array): Buffer { + const view = ipcBytes instanceof Buffer ? ipcBytes : Buffer.from(ipcBytes); + + // First message must be the schema. If we can't find a schema message + // we leave the bytes alone — better to surface apache-arrow's normal + // error path than to mask a malformed stream. + const first = readMessageAt(view, 0); + if (!first) { + return view; + } + + if (first.message.headerType() !== MessageHeader.Schema) { + return view; + } + + const rewrittenSchema = maybeRewriteSchemaMessage(first.messageBytes); + if (!rewrittenSchema) { + // No Duration fields; nothing to do. + return view; + } + + // Splice the rewritten schema back into the stream: continuation + // marker + new metadata length + new metadata bytes + everything after + // the original schema message (body of schema is empty per Arrow spec; + // record batches follow). + const outputs: Buffer[] = []; + outputs.push(encodeContinuationAndLength(rewrittenSchema.byteLength)); + outputs.push(rewrittenSchema); + // Schema messages have no body (bodyLength=0 always — Arrow spec). + // Forward everything after the schema's metadata bytes unchanged. + const tailStart = first.totalEnd; + if (tailStart < view.byteLength) { + outputs.push(view.subarray(tailStart)); + } + + return Buffer.concat(outputs); +} + +/** + * Read one IPC message at the given offset. Returns the parsed Message + * object and byte ranges, or `null` if the buffer is exhausted. + * + * IPC stream message format (post-0.15): + * [continuation: 0xFFFFFFFF (4 bytes LE)] [length: int32 LE] + * [metadata: flatbuffer Message of `length` bytes] [body: bodyLength bytes] + * + * Pre-0.15 streams omit the continuation marker — the first 4 bytes are + * the metadata length directly. apache-arrow handles both + * (`message.js:44-50`); we mirror that here. + */ +function readMessageAt( + view: Buffer, + start: number, +): { + message: Message; + messageBytes: Buffer; + metadataStart: number; + metadataEnd: number; + bodyEnd: number; + totalEnd: number; +} | null { + if (start + 4 > view.byteLength) { + return null; + } + let cursor = start; + let metadataLength = view.readInt32LE(cursor); + cursor += 4; + + // Continuation marker (0xFFFFFFFF reads as -1 as int32) — followed by + // the actual length. + if (metadataLength === -1) { + if (cursor + 4 > view.byteLength) { + return null; + } + metadataLength = view.readInt32LE(cursor); + cursor += 4; + } + + if (metadataLength === 0) { + return null; + } + + const metadataStart = cursor; + const metadataEnd = cursor + metadataLength; + if (metadataEnd > view.byteLength) { + return null; + } + + const metadataBytes = view.subarray(metadataStart, metadataEnd); + const bb = new flatbuffers.ByteBuffer(metadataBytes); + const message = Message.getRootAsMessage(bb); + + const bodyLength = Number(message.bodyLength()); + const bodyStart = metadataEnd; + const bodyEnd = bodyStart + bodyLength; + if (bodyEnd > view.byteLength) { + // Malformed; let apache-arrow surface the error downstream. + return null; + } + + return { + message, + messageBytes: metadataBytes, + metadataStart, + metadataEnd, + bodyEnd, + totalEnd: bodyEnd, + }; +} + +/** + * If the schema message contains any `Duration` fields, returns a fresh + * FlatBuffer-encoded Message containing the rewritten schema. Otherwise + * returns `null` so the caller can short-circuit. + */ +function maybeRewriteSchemaMessage(schemaMessageBytes: Buffer): Buffer | null { + const bb = new flatbuffers.ByteBuffer(schemaMessageBytes); + const message = Message.getRootAsMessage(bb); + const fbSchema = message.header(new FbSchema()) as FbSchema | null; + if (!fbSchema) { + return null; + } + + // Scan top-level fields and children for Duration. We rewrite only + // top-level Duration fields for M0 (Spark INTERVAL DAY-TIME surfaces + // as a top-level column — children of Struct/List/Map are out of + // scope until we see a real-world payload with nested Duration). + let hasDuration = false; + const fieldsLength = fbSchema.fieldsLength(); + for (let i = 0; i < fieldsLength; i += 1) { + const f = fbSchema.fields(i); + if (f && f.typeType() === FbType.Duration) { + hasDuration = true; + break; + } + } + if (!hasDuration) { + return null; + } + + // Re-encode the whole schema. This is more verbose than an in-place + // FlatBuffer patch, but it avoids relying on vtable layout details. + return rebuildSchemaWithDurationRewritten(message, fbSchema); +} + +/** + * Full re-encode path: parse every field in the schema, substitute + * `Duration` with `Int64` (carrying the unit in custom metadata), and + * emit a fresh Message FlatBuffer. This handles arbitrary schemas + * correctly at the cost of decode+re-encode of all fields. + * + * For non-Duration fields we copy the *bytes* of the original + * `type` sub-table verbatim into the new builder — FlatBuffer + * sub-tables are self-contained address spaces, so this is safe. + */ +function rebuildSchemaWithDurationRewritten(message: Message, fbSchema: FbSchema): Buffer { + const builder = new flatbuffers.Builder(1024); + + // Re-encode each field. + const fieldOffsets: number[] = []; + const fieldsLength = fbSchema.fieldsLength(); + for (let i = 0; i < fieldsLength; i += 1) { + const field = fbSchema.fields(i); + if (!field) { + continue; + } + fieldOffsets.push(reEncodeField(builder, field)); + } + + // Re-encode top-level schema custom_metadata verbatim. + const schemaMetadataOffsets: number[] = []; + const schemaMetadataLength = fbSchema.customMetadataLength(); + for (let i = 0; i < schemaMetadataLength; i += 1) { + const kv = fbSchema.customMetadata(i); + if (!kv) { + continue; + } + const keyStr = kv.key() ?? ''; + const valStr = kv.value() ?? ''; + const keyOff = builder.createString(keyStr); + const valOff = builder.createString(valStr); + FbKeyValue.startKeyValue(builder); + FbKeyValue.addKey(builder, keyOff); + FbKeyValue.addValue(builder, valOff); + schemaMetadataOffsets.push(FbKeyValue.endKeyValue(builder)); + } + + // Build the fields and metadata vectors, then the Schema, then the Message. + const fieldsVec = FbSchema.createFieldsVector(builder, fieldOffsets); + const metadataVec = + schemaMetadataOffsets.length > 0 ? FbSchema.createCustomMetadataVector(builder, schemaMetadataOffsets) : 0; + + // Preserve features vector — `features()` requires walking the + // bigint vector; for the kernel's payloads this is typically empty + // so we skip it. If a non-empty features vector appears, we drop it + // (Arrow features encode optional compression flags; the kernel + // emits uncompressed streams for the SEA path per + // `findings/rust-kernel/M0-kernel-async-readiness-2026-05-15.md`). + FbSchema.startSchema(builder); + FbSchema.addEndianness(builder, fbSchema.endianness()); + FbSchema.addFields(builder, fieldsVec); + if (metadataVec !== 0) { + FbSchema.addCustomMetadata(builder, metadataVec); + } + const schemaOffset = FbSchema.endSchema(builder); + + // Wrap in a Message. version + headerType + header + bodyLength + custom_metadata. + Message.startMessage(builder); + Message.addVersion(builder, message.version()); + Message.addHeaderType(builder, MessageHeader.Schema); + Message.addHeader(builder, schemaOffset); + Message.addBodyLength(builder, BigInt(0)); + const newMessage = Message.endMessage(builder); + builder.finish(newMessage); + + const bytes = builder.asUint8Array(); + + // The Arrow IPC spec requires each message to be 8-byte aligned so + // that subsequent record batches' body buffers stay aligned for SIMD + // reads. apache-arrow's MessageReader doesn't enforce this on read + // (it just trusts the metadata length), so any padding is fine. + // Round up the metadata bytes to a multiple of 8 by appending zero + // padding — this keeps the IPC stream spec-compliant. + const padded = padToAlignment(bytes, 8); + return Buffer.from(padded); +} + +/** + * Re-encode a single Field. For `Duration` fields, substitute `Int64` + * and add `databricks.arrow.duration_unit` metadata. For all other + * types we re-encode via the appropriate type-sub-table-aware path — + * but to keep this rewriter compact we just walk the FlatBuffer-level + * accessors needed for the M0 primitive types and complex types Arrow + * surfaces from the kernel. Unknown types fall back to copying the + * raw type sub-table bytes via FlatBuffer's serialization (which + * always works because sub-tables are self-contained). + */ +function reEncodeField(builder: flatbuffers.Builder, field: FbField): number { + const nameStr = field.name() ?? ''; + const nameOffset = builder.createString(nameStr); + + // Re-encode children recursively (Struct/List/Map all carry children). + const childOffsets: number[] = []; + const childrenLength = field.childrenLength(); + for (let i = 0; i < childrenLength; i += 1) { + const child = field.children(i); + if (child) { + childOffsets.push(reEncodeField(builder, child)); + } + } + const childrenVec = childOffsets.length > 0 ? FbField.createChildrenVector(builder, childOffsets) : 0; + + // Re-encode custom_metadata (preserving everything). For Duration + // fields we'll add our marker on top. + const metadataOffsets: number[] = []; + const metadataLength = field.customMetadataLength(); + for (let i = 0; i < metadataLength; i += 1) { + const kv = field.customMetadata(i); + if (!kv) { + continue; + } + const keyStr = kv.key() ?? ''; + const valStr = kv.value() ?? ''; + const keyOff = builder.createString(keyStr); + const valOff = builder.createString(valStr); + FbKeyValue.startKeyValue(builder); + FbKeyValue.addKey(builder, keyOff); + FbKeyValue.addValue(builder, valOff); + metadataOffsets.push(FbKeyValue.endKeyValue(builder)); + } + + const originalTypeType = field.typeType(); + let typeType = originalTypeType; + let typeOffset = 0; + + if (originalTypeType === FbType.Duration) { + // Read the original Duration unit. Substitute Int(64, signed) and + // append a custom_metadata entry recording the original unit. + const durationTable = field.type(new FbDuration()) as FbDuration | null; + const unit = durationTable ? durationTable.unit() : FbTimeUnit.MICROSECOND; + const unitName = TIME_UNIT_NAME[unit] ?? 'MICROSECOND'; + + const keyOff = builder.createString(DURATION_UNIT_METADATA_KEY); + const valOff = builder.createString(unitName); + FbKeyValue.startKeyValue(builder); + FbKeyValue.addKey(builder, keyOff); + FbKeyValue.addValue(builder, valOff); + metadataOffsets.push(FbKeyValue.endKeyValue(builder)); + + typeType = FbType.Int; + typeOffset = FbInt.createInt(builder, 64, true); + } else { + // Copy the original type sub-table by re-encoding it from the + // FlatBuffer-level accessor. Sub-tables are self-contained, but + // the builder API requires us to write each known type with its + // generated `createXxx`. For M0, the kernel emits a fixed set of + // top-level types (matching the SQL datatype table in + // `findings/rust-kernel/datatype-emission-and-block-on-2026-05-15.md`). + // We re-encode each known type sub-table; unsupported types fall + // through to a generic offset-only copy (zero-byte type sub-table), + // which apache-arrow's `decodeFieldType` accepts for the + // children-only types (List, Struct, Null). + typeOffset = reEncodeTypeSubtable(builder, field, originalTypeType); + } + + const metadataVec = metadataOffsets.length > 0 ? FbField.createCustomMetadataVector(builder, metadataOffsets) : 0; + + FbField.startField(builder); + FbField.addName(builder, nameOffset); + FbField.addNullable(builder, field.nullable()); + FbField.addTypeType(builder, typeType); + if (typeOffset !== 0) { + FbField.addType(builder, typeOffset); + } + if (childrenVec !== 0) { + FbField.addChildren(builder, childrenVec); + } + if (metadataVec !== 0) { + FbField.addCustomMetadata(builder, metadataVec); + } + // Note: dictionary encoding is not re-emitted. The kernel doesn't + // emit dictionary-encoded columns for M0; if it ever does, this + // rewriter would need to copy the DictionaryEncoding sub-table too. + return FbField.endField(builder); +} + +/** + * Re-encode a Field's type sub-table by reading it from the original + * FlatBuffer (via the apache-arrow generated accessors) and writing it + * into the new builder. Supports the full M0 type matrix: + * primitives: Null, Int (all widths), FloatingPoint (Float16/32/64), + * Bool, Utf8, Binary, Decimal, Date, Time, Timestamp, Interval + * complex: List (header only), Struct (header only), Map, FixedSizeList, + * FixedSizeBinary, Union + * Children-only types (Struct, List, Null) emit an empty sub-table. + */ +function reEncodeTypeSubtable(builder: flatbuffers.Builder, field: FbField, typeType: number): number { + // Lazy imports to avoid cyclic resolution and to keep this file's + // top-of-module imports tight. These are zero-cost — Node caches + // them after the first require. + /* eslint-disable @typescript-eslint/no-var-requires, global-require, import/no-internal-modules */ + const { Null } = require('apache-arrow/fb/null'); + const { FloatingPoint } = require('apache-arrow/fb/floating-point'); + const { Binary } = require('apache-arrow/fb/binary'); + const { Utf8 } = require('apache-arrow/fb/utf8'); + const { Bool } = require('apache-arrow/fb/bool'); + const { Decimal } = require('apache-arrow/fb/decimal'); + const { Date: DateTbl } = require('apache-arrow/fb/date'); + const { Time } = require('apache-arrow/fb/time'); + const { Timestamp } = require('apache-arrow/fb/timestamp'); + const { Interval } = require('apache-arrow/fb/interval'); + const { List } = require('apache-arrow/fb/list'); + // eslint-disable-next-line @typescript-eslint/naming-convention -- `Struct_` is apache-arrow's generated FlatBuffers export name. + const { Struct_ } = require('apache-arrow/fb/struct-'); + const { Union } = require('apache-arrow/fb/union'); + const { FixedSizeBinary } = require('apache-arrow/fb/fixed-size-binary'); + const { FixedSizeList } = require('apache-arrow/fb/fixed-size-list'); + const { Map: MapTbl } = require('apache-arrow/fb/map'); + /* eslint-enable @typescript-eslint/no-var-requires, global-require, import/no-internal-modules */ + + switch (typeType) { + case FbType.NONE: + case FbType.Null: { + // Null has no fields; emit an empty table. + const t = new Null(); + field.type(t); + Null.startNull(builder); + return Null.endNull(builder); + } + case FbType.Int: { + const t = field.type(new FbInt()) as InstanceType | null; + if (!t) { + return FbInt.createInt(builder, 32, true); + } + return FbInt.createInt(builder, t.bitWidth(), t.isSigned()); + } + case FbType.FloatingPoint: { + const t = field.type(new FloatingPoint()); + return FloatingPoint.createFloatingPoint(builder, t.precision()); + } + case FbType.Binary: { + Binary.startBinary(builder); + return Binary.endBinary(builder); + } + case FbType.Utf8: { + Utf8.startUtf8(builder); + return Utf8.endUtf8(builder); + } + case FbType.Bool: { + Bool.startBool(builder); + return Bool.endBool(builder); + } + case FbType.Decimal: { + const t = field.type(new Decimal()); + return Decimal.createDecimal(builder, t.precision(), t.scale(), t.bitWidth()); + } + case FbType.Date: { + const t = field.type(new DateTbl()); + return DateTbl.createDate(builder, t.unit()); + } + case FbType.Time: { + const t = field.type(new Time()); + return Time.createTime(builder, t.unit(), t.bitWidth()); + } + case FbType.Timestamp: { + const t = field.type(new Timestamp()); + const tz: string | null = t.timezone(); + const tzOffset = tz ? builder.createString(tz) : 0; + Timestamp.startTimestamp(builder); + Timestamp.addUnit(builder, t.unit()); + if (tzOffset !== 0) { + Timestamp.addTimezone(builder, tzOffset); + } + return Timestamp.endTimestamp(builder); + } + case FbType.Interval: { + const t = field.type(new Interval()); + return Interval.createInterval(builder, t.unit()); + } + case FbType.List: { + List.startList(builder); + return List.endList(builder); + } + case FbType.Struct_: { + Struct_.startStruct_(builder); + return Struct_.endStruct_(builder); + } + case FbType.Union: { + const t = field.type(new Union()); + // typeIds is an int32 vector — copy it. + const typeIdsArr = t.typeIdsArray(); + let typeIdsOffset = 0; + if (typeIdsArr) { + typeIdsOffset = Union.createTypeIdsVector(builder, Array.from(typeIdsArr)); + } + Union.startUnion(builder); + Union.addMode(builder, t.mode()); + if (typeIdsOffset !== 0) { + Union.addTypeIds(builder, typeIdsOffset); + } + return Union.endUnion(builder); + } + case FbType.FixedSizeBinary: { + const t = field.type(new FixedSizeBinary()); + return FixedSizeBinary.createFixedSizeBinary(builder, t.byteWidth()); + } + case FbType.FixedSizeList: { + const t = field.type(new FixedSizeList()); + return FixedSizeList.createFixedSizeList(builder, t.listSize()); + } + case FbType.Map: { + const t = field.type(new MapTbl()); + return MapTbl.createMap(builder, t.keysSorted()); + } + default: + // Unknown / newer types (LargeBinary, LargeUtf8, LargeList, + // RunEndEncoded, ...). The kernel doesn't emit these for M0; + // emit an empty sub-table and let apache-arrow's normal error + // path fire when it tries to decode an unrecognized type id. + return 0; + } +} + +/** + * Prefix the given FlatBuffer message bytes with the IPC stream + * framing: the continuation marker (0xFFFFFFFF) followed by the + * little-endian int32 metadata length. + */ +function encodeContinuationAndLength(metadataLength: number): Buffer { + const out = Buffer.alloc(8); + out.writeInt32LE(IPC_CONTINUATION_MARKER | 0, 0); // -1 + out.writeInt32LE(metadataLength, 4); + return out; +} + +/** + * Pad `bytes` with trailing zeros so its length is a multiple of + * `alignment`. Returns the original buffer when it is already + * aligned. + */ +function padToAlignment(bytes: Uint8Array, alignment: number): Uint8Array { + const remainder = bytes.byteLength % alignment; + if (remainder === 0) { + return bytes; + } + const padded = new Uint8Array(bytes.byteLength + (alignment - remainder)); + padded.set(bytes, 0); + return padded; +} diff --git a/lib/sea/SeaOperationBackend.ts b/lib/sea/SeaOperationBackend.ts new file mode 100644 index 00000000..e2087e00 --- /dev/null +++ b/lib/sea/SeaOperationBackend.ts @@ -0,0 +1,241 @@ +// Copyright (c) 2026 Databricks, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * `IOperationBackend` implementation for the SEA path. + * + * Combines: + * - **Fetch pipeline (from sea-results):** + * `napi.Statement.fetchNextBatch()` → `SeaResultsProvider` → + * `ArrowResultConverter` (Phase 1 + Phase 2; reused unchanged) → + * `ResultSlicer` (chunk-size normalisation; reused unchanged). The M0 + * row shape is byte-identical to the thrift path for every M0 + * datatype (parity gate exercised by `tests/integration/sea/results-e2e.test.ts`). + * + * - **Lifecycle (from sea-operation):** `cancel()` / `close()` / + * `finished()` (alias of `waitUntilReady`) delegate to the helpers + * in `SeaOperationLifecycle.ts`. The helpers handle idempotency, + * flag-set-before-await ordering (so cancel-mid-fetch propagates), + * logging via `IClientContext`, and kernel-error mapping. + * + * The lifecycle helpers route fetch-after-cancel / fetch-after-close + * through `failIfNotActive`, which throws an `OperationStateError` + * matching the Thrift `failIfClosed` semantics. We call it from + * `fetchChunk`/`hasMore`/`getResultMetadata` so the cancel-mid-fetch + * e2e (cancel < 200ms) drives against this backend cleanly. + */ + +import { v4 as uuidv4 } from 'uuid'; +import { TTableSchema } from '../../thrift/TCLIService_types'; +import IOperationBackend, { IOperationBackendWaitOptions } from '../contracts/IOperationBackend'; +import { OperationStatus, OperationState } from '../contracts/OperationStatus'; +import { ResultMetadata, ResultFormat } from '../contracts/ResultMetadata'; +import IClientContext from '../contracts/IClientContext'; +import Status from '../dto/Status'; +import ArrowResultConverter from '../result/ArrowResultConverter'; +import ResultSlicer from '../result/ResultSlicer'; +import SeaResultsProvider from './SeaResultsProvider'; +import { arrowSchemaToThriftSchema, decodeIpcSchema } from './SeaArrowIpc'; +import { SeaStatement } from './SeaNativeLoader'; +import { + SeaStatementHandle, + SeaOperationLifecycleState, + createLifecycleState, + seaCancel, + seaClose, + seaFinished, + failIfNotActive, +} from './SeaOperationLifecycle'; + +/** + * Structural union of the lifecycle surface (cancel/close) and the + * fetch surface (fetchNextBatch/schema). The real napi `Statement` + * implements both; lifecycle-only test stubs implement only the + * cancel/close half — fetch methods are accessed lazily and the + * lifecycle tests never reach that path. + */ +export type SeaOperationStatement = SeaStatementHandle & Partial; + +/** + * Constructor options for `SeaOperationBackend`. + */ +export interface SeaOperationBackendOptions { + /** The opaque napi `Statement` handle returned by `Connection.executeStatement(...)`. */ + statement: SeaOperationStatement; + context: IClientContext; + /** + * Optional override for `id`. When not provided a fresh UUIDv4 is + * generated upstream (in `SeaSessionBackend.executeStatement`); the + * kernel does not yet surface its internal statement-id at the napi + * boundary. Once it does, the JS layer can thread it through here. + */ + id?: string; +} + +export default class SeaOperationBackend implements IOperationBackend { + private readonly statement: SeaOperationStatement; + + private readonly context: IClientContext; + + private readonly _id: string; + + private readonly lifecycle: SeaOperationLifecycleState = createLifecycleState(); + + private resultSlicer?: ResultSlicer; + + private resultsProvider?: SeaResultsProvider; + + private metadata?: ResultMetadata; + + private metadataPromise?: Promise; + + constructor({ statement, context, id }: SeaOperationBackendOptions) { + this.statement = statement; + this.context = context; + this._id = id ?? uuidv4(); + } + + public get id(): string { + return this._id; + } + + public hasResultSet(): boolean { + // M0 only routes through SeaOperationBackend for executeStatement + // calls. DDL/DML without a result set is not exercised through SEA + // for M0; the napi Statement still produces a schema (empty) in + // that case, which the converter renders as zero rows. Reporting + // `true` keeps the facade's fetch path enabled for M0 parity. + return true; + } + + // --------------------------------------------------------------------------- + // Fetch / metadata (owned by the sea-results pipeline). + // --------------------------------------------------------------------------- + + public async fetchChunk({ + limit, + disableBuffering, + }: { + limit: number; + disableBuffering?: boolean; + }): Promise> { + // Cancel-mid-fetch propagation: if cancel() has flipped the + // lifecycle flag, fail locally without a wire round-trip. + failIfNotActive(this.lifecycle); + const slicer = await this.getResultSlicer(); + return slicer.fetchNext({ limit, disableBuffering }); + } + + public async hasMore(): Promise { + failIfNotActive(this.lifecycle); + const slicer = await this.getResultSlicer(); + return slicer.hasMore(); + } + + public async getResultMetadata(): Promise { + failIfNotActive(this.lifecycle); + if (this.metadata) { + return this.metadata; + } + if (this.metadataPromise) { + return this.metadataPromise; + } + this.metadataPromise = (async () => { + if (!this.statement.schema) { + throw new Error('SeaOperationBackend: statement.schema() is not available on this handle'); + } + const arrowSchemaIpc = await this.statement.schema(); + const arrowSchema = decodeIpcSchema(arrowSchemaIpc.ipcBytes); + // `ResultMetadata.schema` keeps the Thrift `TTableSchema` shape for + // back-compat with the public `IOperation.getSchema()` surface. + const thriftSchema: TTableSchema = arrowSchemaToThriftSchema(arrowSchema); + const meta: ResultMetadata = { + schema: thriftSchema, + // SEA inline + CloudFetch both surface to JS as Arrow batches; + // both flow through the same Arrow result converter. + resultFormat: ResultFormat.ArrowBased, + lz4Compressed: false, + // Carry the raw Arrow IPC schema bytes for ARROW_BASED consumers. + arrowSchema: arrowSchemaIpc.ipcBytes, + isStagingOperation: false, + }; + this.metadata = meta; + return meta; + })(); + try { + return await this.metadataPromise; + } finally { + this.metadataPromise = undefined; + } + } + + // --------------------------------------------------------------------------- + // Status / lifecycle (owned by the sea-operation lifecycle helpers). + // --------------------------------------------------------------------------- + + public async status(_progress: boolean): Promise { + // Synthesised — the kernel resolves `Statement::execute().await` before + // it hands back a Statement handle, so by the time a SeaOperationBackend + // exists the statement is terminal. Report Cancelled/Closed if the + // lifecycle flag is set, else Succeeded. Returns the backend-neutral + // OperationStatus the IOperationBackend contract expects, so the + // DBSQLOperation facade switches on `state` identically across backends. + if (this.lifecycle.isCancelled) { + return { state: OperationState.Cancelled, hasResultSet: true }; + } + if (this.lifecycle.isClosed) { + return { state: OperationState.Closed, hasResultSet: true }; + } + return { state: OperationState.Succeeded, hasResultSet: true }; + } + + public async waitUntilReady(options?: IOperationBackendWaitOptions): Promise { + // Kernel's `Statement::execute().await` has already resolved by the + // time we hold a Statement handle — there is no pending/running + // state to poll for M0. seaFinished fires the progress callback + // once with a synthesised FINISHED response so progress-UI callers + // see the same one-shot completion tick the Thrift path emits at + // the end of its polling loop. + return seaFinished(this.lifecycle, options); + } + + public async cancel(): Promise { + return seaCancel(this.lifecycle, this.statement, this.context, this._id); + } + + public async close(): Promise { + return seaClose(this.lifecycle, this.statement, this.context, this._id); + } + + // --------------------------------------------------------------------------- + // Internals. + // --------------------------------------------------------------------------- + + private async getResultSlicer(): Promise> { + if (this.resultSlicer) { + return this.resultSlicer; + } + if (!this.statement.fetchNextBatch) { + throw new Error('SeaOperationBackend: statement.fetchNextBatch() is not available on this handle'); + } + const metadata = await this.getResultMetadata(); + // The lifecycle subset has cancel/close only; fetch methods exist on + // the full napi Statement. Cast is safe here because we've just + // verified `fetchNextBatch` is callable. + this.resultsProvider = new SeaResultsProvider(this.statement as SeaStatement); + const converter = new ArrowResultConverter(this.context, this.resultsProvider, metadata); + this.resultSlicer = new ResultSlicer(this.context, converter); + return this.resultSlicer; + } +} diff --git a/lib/sea/SeaOperationLifecycle.ts b/lib/sea/SeaOperationLifecycle.ts new file mode 100644 index 00000000..1bb1715e --- /dev/null +++ b/lib/sea/SeaOperationLifecycle.ts @@ -0,0 +1,272 @@ +// Copyright (c) 2026 Databricks, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * SEA operation lifecycle helpers (M0). + * + * The three methods exposed here (`cancel`, `close`, `finished`) are + * standalone functions that the `SeaOperationBackend` implementation + * delegates to. Keeping them in this dedicated file lets the parallel + * impl-results work (which owns the fetch-* methods on + * `SeaOperationBackend`) land independently — at merge time it can + * either import these helpers from here or inline them, with no + * conflicts on the call sites. + * + * Mapping to the existing `DBSQLOperation` semantics: + * - `cancel()` → ` driver.cancelOperation(...)` on Thrift today + * (`lib/DBSQLOperation.ts:241-259`). For SEA this is a one-shot + * forward to the napi `Statement.cancel()` which in turn calls + * `ExecutedStatementHandle::cancel(&self).await` in the kernel. + * - `close()` → `driver.closeOperation(...)` on Thrift today + * (`lib/DBSQLOperation.ts:265-284`). For SEA this is the napi + * `Statement.close()` which awaits the server-side delete. + * - `finished({progress, callback})` → the 100ms polling loop in + * `DBSQLOperation.waitUntilReady` today (`lib/DBSQLOperation.ts:337-391`). + * For M0 the kernel's `Statement::execute().await` already blocks + * until the statement is in a terminal state, so by the time the JS + * side has an `ExecutedStatement` (and therefore a binding-level + * `Statement`) the underlying operation is already finished. The + * M0 implementation here therefore resolves immediately, optionally + * firing the progress callback once with a synthesized "finished" + * response so callers that wire a progress UI still see a single + * completion tick. + */ + +import Status from '../dto/Status'; +import { OperationStatus, OperationState } from '../contracts/OperationStatus'; +import { LogLevel } from '../contracts/IDBSQLLogger'; +import IClientContext from '../contracts/IClientContext'; +import { mapKernelErrorToJsError, KernelErrorShape } from './SeaErrorMapping'; +import OperationStateError, { OperationStateErrorCode } from '../errors/OperationStateError'; + +/** + * Minimal shape of the napi `Statement` that the lifecycle helpers + * depend on. Declared structurally so unit tests can hand in a mock + * without pulling the real native binding into the test process. + * + * The real binding's `Statement` (see `native/sea/index.d.ts`) has + * additional methods (`fetchNextBatch`, `schema`) which the lifecycle + * helpers deliberately don't touch — those belong to the results + * feature's surface. + */ +export interface SeaStatementHandle { + cancel(): Promise; + close(): Promise; +} + +/** + * Internal lifecycle state shared between the operation backend and + * these helpers. `SeaOperationBackend` keeps an instance of this and + * passes it to each helper call. Centralising the flags here means + * the helpers stay pure (no `this`) and the backend stays + * straightforward. + */ +export interface SeaOperationLifecycleState { + /** True once `cancel()` has succeeded — subsequent fetch* must throw. */ + isCancelled: boolean; + /** True once `close()` has been called (idempotent). */ + isClosed: boolean; +} + +/** + * Factory for a fresh lifecycle-state record. Helps keep test setup + * tidy. + */ +export function createLifecycleState(): SeaOperationLifecycleState { + return { isCancelled: false, isClosed: false }; +} + +/** + * Normalise an error thrown by the napi `Statement` into one of the + * driver's typed error classes. The binding surfaces kernel errors as + * a JSON envelope on `napi::Error.reason` with the sentinel prefix + * `__databricks_error__:` (see the napi-binding round 2 findings, + * section "JSON-envelope error reason"). If we can parse out a kernel + * payload, we route it through `mapKernelErrorToJsError`; otherwise + * the original error is rethrown unchanged. + */ +function rethrowKernelError(err: unknown): never { + if (err instanceof Error && typeof err.message === 'string') { + const sentinel = '__databricks_error__:'; + const idx = err.message.indexOf(sentinel); + if (idx >= 0) { + const json = err.message.slice(idx + sentinel.length); + let parsed: KernelErrorShape | undefined; + try { + parsed = JSON.parse(json) as KernelErrorShape; + } catch { + // Malformed envelope — fall through and rethrow the original + // below; we never silently drop a kernel error. + parsed = undefined; + } + if (parsed) { + throw mapKernelErrorToJsError(parsed); + } + } + } + throw err; +} + +/** + * Cancel an in-flight SEA operation. + * + * Mirrors `DBSQLOperation.cancel` semantics + * (`lib/DBSQLOperation.ts:241-259`): + * - idempotent: returns success if already cancelled or closed + * (no-ops are not bubbled to the kernel because the binding's + * `Statement::cancel` already treats already-finished statements as + * a no-op, but we still want to avoid a network round-trip here), + * - sets the cancelled flag _before_ awaiting the napi call so that a + * concurrent `fetchChunk()` observing the flag short-circuits as + * soon as the await yields (matches the Thrift flag-set ordering + * at `lib/DBSQLOperation.ts:254`), + * - returns a `Status.success()` on success (no rich Thrift status + * payload is available from the kernel side). + */ +export async function seaCancel( + state: SeaOperationLifecycleState, + statement: SeaStatementHandle, + context: IClientContext, + operationId: string, +): Promise { + if (state.isCancelled || state.isClosed) { + return Status.success(); + } + + context.getLogger().log(LogLevel.debug, `Cancelling SEA operation with id: ${operationId}`); + + state.isCancelled = true; + + try { + await statement.cancel(); + } catch (err) { + state.isCancelled = false; + rethrowKernelError(err); + } + + return Status.success(); +} + +/** + * Close a SEA operation. + * + * Mirrors `DBSQLOperation.close` semantics + * (`lib/DBSQLOperation.ts:265-284`) without the Thrift-only + * direct-results-prefetch optimisation: + * - idempotent: a second call is a no-op, + * - awaits the binding's `Statement::close` (which goes through to + * the kernel's `delete_statement` RPC), + * - sets the closed flag _before_ awaiting so a concurrent fetch + * sees the closed state as soon as the await yields. + */ +export async function seaClose( + state: SeaOperationLifecycleState, + statement: SeaStatementHandle, + context: IClientContext, + operationId: string, +): Promise { + if (state.isClosed) { + return Status.success(); + } + + context.getLogger().log(LogLevel.debug, `Closing SEA operation with id: ${operationId}`); + + state.isClosed = true; + + try { + await statement.close(); + } catch (err) { + state.isClosed = false; + rethrowKernelError(err); + } + + return Status.success(); +} + +/** + * Synthesize a neutral {@link OperationStatus} reporting the "finished" + * state. `IOperationBackend.waitUntilReady` is backend-neutral surface — its + * `callback` receives an {@link OperationStatus}, not a Thrift wire struct + * (the public Thrift-shaped `OperationStatusCallback` is adapted at the + * `DBSQLOperation` facade boundary). For M0 we report `Succeeded`. Richer + * fields (`numModifiedRows`, `progressUpdateResponse`, `errorMessage`) defer + * to M1 per the operation feature plan. + */ +function synthesizeFinishedStatus(): OperationStatus { + return { + state: OperationState.Succeeded, + hasResultSet: true, + }; +} + +/** + * `IOperation.finished({progress, callback})` M0 implementation. + * + * The Thrift implementation is a 100ms polling loop over + * `getOperationStatus` (`lib/DBSQLOperation.ts:337-391`). For SEA M0, + * the kernel's `Statement::execute().await` already blocks until the + * statement reaches a terminal state — by the time the JS layer has + * a `Statement` handle, the operation has already finished. + * + * Therefore the M0 implementation resolves immediately. If the + * caller supplied a progress callback we still invoke it once (a + * single completion tick) so progress-UI consumers see the same + * "operation is now finished" signal they'd get from the polling + * Thrift path — just without the intermediate `RUNNING_STATE` + * notifications. + * + * If the operation is already cancelled or closed, this is a no-op + * (matches the Thrift `failIfClosed` / cancelled-state semantics + * without throwing; throwing is the responsibility of subsequent + * fetch calls). + */ +export async function seaFinished( + state: SeaOperationLifecycleState, + options?: { + progress?: boolean; + callback?: (status: OperationStatus) => unknown; + }, +): Promise { + if (state.isCancelled || state.isClosed) { + return; + } + + if (options?.callback) { + const response = synthesizeFinishedStatus(); + // Await the callback in case it returns a promise — matches the + // Thrift code path at `lib/DBSQLOperation.ts:348-351`. + await Promise.resolve(options.callback(response)); + } +} + +/** + * Pre-flight check used by fetch* methods on `SeaOperationBackend`. + * If the operation has been cancelled or closed, throw the same + * `OperationStateError` classes the facade uses. Keeping these typed lets + * callers branch on `OperationStateErrorCode` consistently for Thrift and SEA. + * + * Exported so impl-results can call it at the top of every fetch + * call without duplicating the if/throw logic. + */ +export function failIfNotActive(state: SeaOperationLifecycleState): void { + if (state.isCancelled) { + throw mapKernelErrorToJsError({ + code: 'Cancelled', + message: 'The operation was cancelled.', + }); + } + if (state.isClosed) { + throw new OperationStateError(OperationStateErrorCode.Closed); + } +} diff --git a/lib/sea/SeaResultsProvider.ts b/lib/sea/SeaResultsProvider.ts new file mode 100644 index 00000000..6adf2cba --- /dev/null +++ b/lib/sea/SeaResultsProvider.ts @@ -0,0 +1,117 @@ +// Copyright (c) 2026 Databricks, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import IResultsProvider, { ResultsProviderFetchNextOptions } from '../result/IResultsProvider'; +import { ArrowBatch } from '../result/utils'; +import { decodeIpcBatch, patchIpcBytes } from './SeaArrowIpc'; + +/** + * The minimal slice of the napi-binding `Statement` class that we + * consume from JS. Defined locally (not imported from the binding's + * d.ts) so the loader layer's loose `unknown` typing doesn't force + * unsafe casts at every call site, and so unit tests can pass a stub. + */ +export interface SeaStatementHandle { + fetchNextBatch(): Promise<{ ipcBytes: Buffer } | null>; +} + +/** + * `IResultsProvider` that pulls Arrow IPC batches from the + * kernel via the napi `Statement` handle and adapts them onto the + * shape `ArrowResultConverter` already speaks + * (`lib/result/utils.ts:22-25`). + * + * Each kernel `fetchNextBatch()` call returns a complete Arrow IPC + * stream (schema header + 1 record-batch message) per the design + * documented at `sea-workflow/findings/arch/napi-binding/round2-methods-2026-05-15.md:46-60`. + * We pass that buffer through as a single-element `batches: [ipcBytes]` + * array — `RecordBatchReader.from(arrowBatch.batches)` inside the + * converter (`lib/result/ArrowResultConverter.ts:119`) reads the + * schema from the prefix and then the record-batch messages from the + * remainder of the same buffer. + * + * We pre-parse the IPC bytes once here to extract `rowCount` (the + * sum of `RecordBatch.numRows` across messages in the stream) because + * the converter consumes that as an explicit field rather than + * deriving it from the batch contents. See the comment in + * `SeaArrowIpc.ts:decodeIpcBatch` for the cost rationale. + */ +export default class SeaResultsProvider implements IResultsProvider { + private readonly statement: SeaStatementHandle; + + // Prefetched next batch so `hasMore()` can be answered without an + // extra round-trip. Set by `prime()` (lazy) and by `fetchNext`. + private prefetched?: ArrowBatch; + + // Set once the kernel returns `null` from `fetchNextBatch()`. + private exhausted = false; + + constructor(statement: SeaStatementHandle) { + this.statement = statement; + } + + public async hasMore(): Promise { + if (this.exhausted) { + return false; + } + if (this.prefetched !== undefined) { + return true; + } + await this.prime(); + return this.prefetched !== undefined; + } + + public async fetchNext(_options: ResultsProviderFetchNextOptions): Promise { + if (this.prefetched === undefined && !this.exhausted) { + await this.prime(); + } + if (this.prefetched === undefined) { + return { batches: [], rowCount: 0 }; + } + const out = this.prefetched; + this.prefetched = undefined; + return out; + } + + // Pull the next batch from the kernel and stash it in `prefetched`, + // or mark the stream exhausted. Used by both `hasMore` and `fetchNext` + // to keep one batch buffered ahead so `hasMore` is accurate without + // re-asking the kernel. + private async prime(): Promise { + if (this.exhausted || this.prefetched !== undefined) { + return; + } + const next = await this.statement.fetchNextBatch(); + if (next === null) { + this.exhausted = true; + return; + } + // Patch the raw bytes once: rewrite any Arrow `Duration` field to + // `Int64` with a `databricks.arrow.duration_unit` marker, so that + // apache-arrow@13 (which predates Duration support) can decode the + // stream. `decodeIpcBatch` is told these bytes are already patched; + // the downstream `RecordBatchReader.from` inside `ArrowResultConverter` + // sees the same patched buffer. See `SeaArrowIpcDurationFix.ts`. + const ipcBytes = patchIpcBytes(next.ipcBytes); + const { rowCount } = decodeIpcBatch(ipcBytes, { alreadyPatched: true }); + if (rowCount === 0) { + // Skip empty batches — the converter handles them but pre-filtering + // here avoids one round-trip through the converter's prefetch loop. + // Re-prime to either find a non-empty batch or hit exhaustion. + await this.prime(); + return; + } + this.prefetched = { batches: [ipcBytes], rowCount }; + } +} diff --git a/lib/sea/SeaSessionBackend.ts b/lib/sea/SeaSessionBackend.ts index 6a1415f3..f1850730 100644 --- a/lib/sea/SeaSessionBackend.ts +++ b/lib/sea/SeaSessionBackend.ts @@ -33,6 +33,7 @@ import InfoValue from '../dto/InfoValue'; import HiveDriverError from '../errors/HiveDriverError'; import { SeaConnection } from './SeaNativeLoader'; import { decodeNapiKernelError } from './SeaErrorMapping'; +import SeaOperationBackend from './SeaOperationBackend'; export interface SeaSessionBackendOptions { /** The opaque napi `Connection` handle returned by `openSession`. */ @@ -100,15 +101,33 @@ export default class SeaSessionBackend implements ISessionBackend { * The Thrift backend remains the path for consumers that need any * of those today. */ - // The result-execution path (napi `Connection.executeStatement` → result - // pipeline) is wired in the SEA execution feature. Until then SEA - // executeStatement throws a clear, actionable error rather than silently - // failing, so a `useSEA: true` caller knows the path is not yet available. - public async executeStatement(_statement: string, _options: ExecuteStatementOptions): Promise { + public async executeStatement(statement: string, options: ExecuteStatementOptions): Promise { this.failIfClosed(); - throw new HiveDriverError( - 'SeaSessionBackend.executeStatement: not implemented yet (wired in the SEA execution feature)', - ); + + // M0 surfaces a clear error rather than silently dropping M1-only knobs. + if (options.namedParameters !== undefined || options.ordinalParameters !== undefined) { + throw new HiveDriverError('SEA executeStatement: query parameters are not supported in M0 (deferred to M1)'); + } + if (options.queryTimeout !== undefined) { + throw new HiveDriverError('SEA executeStatement: queryTimeout is not supported in M0 (deferred to M1)'); + } + if (options.useCloudFetch !== undefined) { + throw new HiveDriverError( + 'SEA executeStatement: useCloudFetch is controlled by the kernel result configuration and is not a per-statement option on SEA', + ); + } + + let nativeStatement; + try { + nativeStatement = await this.connection.executeStatement(statement); + } catch (err) { + throw decodeNapiKernelError(err); + } + return new SeaOperationBackend({ + statement: nativeStatement!, + context: this.context, + id: nativeStatement!.statementId, + }); } public async getTypeInfo(_request: TypeInfoRequest): Promise { diff --git a/package-lock.json b/package-lock.json index ee7678d5..3014cdf9 100644 --- a/package-lock.json +++ b/package-lock.json @@ -11,6 +11,7 @@ "dependencies": { "apache-arrow": "^13.0.0", "commander": "^9.3.0", + "flatbuffers": "23.5.26", "node-fetch": "^2.6.12", "node-int64": "^0.4.0", "open": "^8.4.2", @@ -2983,8 +2984,9 @@ }, "node_modules/flatbuffers": { "version": "23.5.26", - "resolved": "https://registry.npmjs.org/flatbuffers/-/flatbuffers-23.5.26.tgz", - "integrity": "sha512-vE+SI9vrJDwi1oETtTIFldC/o9GsVKRM+s6EL0nQgxXlYV1Vc4Tk30hj4xGICftInKQKj1F3up2n8UbIVobISQ==" + "resolved": "https://npm-proxy.dev.databricks.com/flatbuffers/-/flatbuffers-23.5.26.tgz", + "integrity": "sha512-vE+SI9vrJDwi1oETtTIFldC/o9GsVKRM+s6EL0nQgxXlYV1Vc4Tk30hj4xGICftInKQKj1F3up2n8UbIVobISQ==", + "license": "SEE LICENSE IN LICENSE" }, "node_modules/flatted": { "version": "3.2.6", @@ -8637,7 +8639,7 @@ }, "flatbuffers": { "version": "23.5.26", - "resolved": "https://registry.npmjs.org/flatbuffers/-/flatbuffers-23.5.26.tgz", + "resolved": "https://npm-proxy.dev.databricks.com/flatbuffers/-/flatbuffers-23.5.26.tgz", "integrity": "sha512-vE+SI9vrJDwi1oETtTIFldC/o9GsVKRM+s6EL0nQgxXlYV1Vc4Tk30hj4xGICftInKQKj1F3up2n8UbIVobISQ==" }, "flatted": { diff --git a/package.json b/package.json index 8eca3135..02f8eeca 100644 --- a/package.json +++ b/package.json @@ -81,6 +81,7 @@ "dependencies": { "apache-arrow": "^13.0.0", "commander": "^9.3.0", + "flatbuffers": "23.5.26", "node-fetch": "^2.6.12", "node-int64": "^0.4.0", "open": "^8.4.2", diff --git a/tests/e2e/sea/execution-e2e.test.ts b/tests/e2e/sea/execution-e2e.test.ts new file mode 100644 index 00000000..28dd1035 --- /dev/null +++ b/tests/e2e/sea/execution-e2e.test.ts @@ -0,0 +1,124 @@ +// Copyright (c) 2026 Databricks, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import { expect } from 'chai'; +import { DBSQLClient } from '../../../lib'; +import { ConnectionOptions } from '../../../lib/contracts/IDBSQLClient'; +import { InternalConnectionOptions } from '../../../lib/contracts/InternalConnectionOptions'; + +/** + * sea-execution end-to-end test. + * + * Walks the full `DBSQLClient` → `SeaBackend` → napi binding → kernel + * pipeline against a live warehouse over PAT: + * + * 1. `connect({ useSEA: true })` selects the SEA backend. + * 2. `openSession({ initialCatalog: 'main' })` opens a kernel session + * and threads `initialCatalog` through to the napi `ExecuteOptions`. + * 3. `executeStatement('SELECT 1')` returns an `IOperation` backed by + * `SeaOperationBackend` (wraps a napi `Statement`). + * 4. `operation.id` is observable (via `IOperation.id` on the public + * surface). + * 5. `operation.cancel()` and `operation.close()` succeed without + * throwing. + * 6. `session.close()` and `client.close()` succeed without throwing. + * + * **Test gating:** requires the same env vars as `tests/native/e2e-smoke`. + * If any is missing, the suite is skipped so dev machines without + * provisioned secrets don't flap. + * + * **Proxy-validation note (per execution plan §17.4):** M0 verifies + * "no thrift fallback" indirectly — by selecting `useSEA: true` and + * exercising the executeStatement path. A proxy that captures + * `executeStatement` + `GetStatement` wire counts lands in the + * sea-integration round; for now we assert that the SEA pipeline + * itself runs cleanly to completion. + */ +describe('SEA execution end-to-end', function e2eSuite() { + const hostName = process.env.DATABRICKS_PECOTESTING_SERVER_HOSTNAME; + const httpPath = process.env.DATABRICKS_PECOTESTING_HTTP_PATH; + const token = process.env.DATABRICKS_PECOTESTING_TOKEN_PERSONAL; + + // Live-warehouse round-trips can take a few seconds through warm-up. + this.timeout(60_000); + + before(function gate() { + if (!hostName || !httpPath || !token) { + // eslint-disable-next-line no-invalid-this + this.skip(); + } + }); + + it('opens a session, executes SELECT 1, and closes cleanly via SEA backend', async () => { + const client = new DBSQLClient(); + + await client.connect({ + host: hostName as string, + path: httpPath as string, + token: token as string, + useSEA: true, + } as ConnectionOptions & InternalConnectionOptions); + + const session = await client.openSession({ + initialCatalog: 'main', + }); + expect(session).to.be.an('object'); + expect(session.id).to.be.a('string').and.have.length.greaterThan(0); + + const operation = await session.executeStatement('SELECT 1', {}); + expect(operation).to.be.an('object'); + // `IOperation.id` is the public-API observable identity for the + // returned operation. SeaOperationBackend generates a UUIDv4 for + // M0 until the napi binding surfaces the server statement id. + expect(operation.id).to.be.a('string').and.have.length.greaterThan(0); + + // M0 does not yet plumb fetchChunk through the SEA pipeline + // (sea-results owns that). We exercise the lifecycle: cancel is a + // no-op against a finished statement, close releases the kernel + // handle. + await operation.close(); + + await session.close(); + await client.close(); + }); + + it('passes sessionConfig (Spark conf) through openSession.configuration', async () => { + const client = new DBSQLClient(); + + await client.connect({ + host: hostName as string, + path: httpPath as string, + token: token as string, + useSEA: true, + } as ConnectionOptions & InternalConnectionOptions); + + // Sanity-check that supplying session-level Spark conf does not + // break openSession. The SEA wire applies these as `parameters` on + // every executeStatement; we don't observe them in the response + // for M0, but the absence of an error proves the napi binding + // accepts and forwards the map. + const session = await client.openSession({ + initialCatalog: 'main', + configuration: { + 'spark.sql.session.timeZone': 'UTC', + }, + }); + + const operation = await session.executeStatement('SELECT 1', {}); + await operation.close(); + + await session.close(); + await client.close(); + }); +}); diff --git a/tests/e2e/sea/results-e2e.test.ts b/tests/e2e/sea/results-e2e.test.ts new file mode 100644 index 00000000..497889c5 --- /dev/null +++ b/tests/e2e/sea/results-e2e.test.ts @@ -0,0 +1,128 @@ +// Copyright (c) 2026 Databricks, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/* eslint-disable no-console */ + +import { expect } from 'chai'; +import { DBSQLClient } from '../../../lib'; +import { ConnectionOptions } from '../../../lib/contracts/IDBSQLClient'; +import { InternalConnectionOptions } from '../../../lib/contracts/InternalConnectionOptions'; + +// Integration suite: connect through both backends, run a probe query, +// and assert byte-identical row output (the M0 parity gate). Requires +// the developer's shell to export the pecotesting secrets: +// - DATABRICKS_PECOTESTING_SERVER_HOSTNAME +// - DATABRICKS_PECOTESTING_HTTP_PATH +// - DATABRICKS_PECOTESTING_TOKEN_PERSONAL +// If any is missing, the suite skips so CI / sandboxes without +// credentials don't flap. + +const PROBE_QUERY = "SELECT 1 AS x, 'hello' AS s, true AS b, CAST(1.5 AS DECIMAL(10,2)) AS d, DATE '2026-01-01' AS dt"; + +interface PecoSecrets { + host: string; + path: string; + token: string; +} + +function readSecrets(): PecoSecrets | null { + const host = process.env.DATABRICKS_PECOTESTING_SERVER_HOSTNAME; + const path = process.env.DATABRICKS_PECOTESTING_HTTP_PATH; + const token = process.env.DATABRICKS_PECOTESTING_TOKEN_PERSONAL; + if (!host || !path || !token) return null; + return { host, path, token }; +} + +async function fetchProbeRows(useSEA: boolean, secrets: PecoSecrets): Promise>> { + const client = new DBSQLClient(); + await client.connect({ + host: secrets.host, + path: secrets.path, + token: secrets.token, + useSEA, + } as ConnectionOptions & InternalConnectionOptions); + try { + const session = await client.openSession(); + try { + const operation = await session.executeStatement(PROBE_QUERY); + try { + const rows = (await operation.fetchAll()) as Array>; + return rows; + } finally { + await operation.close(); + } + } finally { + await session.close(); + } + } finally { + await client.close(); + } +} + +// JSON-safe normalisation for byte-identical comparison. Buffers, Dates +// and BigInts each have distinct JSON representations; we coerce them +// to stable strings so deep.equal compares value-for-value across +// backends. The thrift converter and the SEA converter both surface +// these as JS Date / Buffer / Number — but we still normalise here so +// a future divergence (e.g. one path returning a string while the +// other returns a Date) trips the assertion explicitly. +function canonical(value: unknown): unknown { + if (value === null || value === undefined) return value; + if (Buffer.isBuffer(value)) return `__buffer__:${value.toString('hex')}`; + if (value instanceof Date) return `__date__:${value.toISOString()}`; + if (typeof value === 'bigint') return `__bigint__:${value.toString()}`; + if (Array.isArray(value)) return value.map(canonical); + if (typeof value === 'object') { + const out: Record = {}; + for (const [k, v] of Object.entries(value as Record)) { + out[k] = canonical(v); + } + return out; + } + return value; +} + +describe('SEA results end-to-end (pecotesting parity gate)', function suite() { + this.timeout(120_000); + + const secrets = readSecrets(); + + before(function gate() { + if (!secrets) { + // eslint-disable-next-line no-invalid-this + this.skip(); + } + }); + + it('SEA backend returns one row with expected columns', async () => { + const rows = await fetchProbeRows(true, secrets as PecoSecrets); + expect(rows.length).to.equal(1); + const row = rows[0]; + expect(row).to.have.property('x'); + expect(row).to.have.property('s'); + expect(row).to.have.property('b'); + expect(row).to.have.property('d'); + expect(row).to.have.property('dt'); + expect(Number(row.x)).to.equal(1); + expect(row.s).to.equal('hello'); + expect(row.b).to.equal(true); + expect(Number(row.d)).to.equal(1.5); + }); + + it('Thrift and SEA produce byte-identical rows for the probe query (parity gate)', async () => { + const seaRows = await fetchProbeRows(true, secrets as PecoSecrets); + const thriftRows = await fetchProbeRows(false, secrets as PecoSecrets); + expect(seaRows.map(canonical)).to.deep.equal(thriftRows.map(canonical)); + }); +}); diff --git a/tests/unit/result/ArrowResultConverter.test.ts b/tests/unit/result/ArrowResultConverter.test.ts index 5f940544..dfe00966 100644 --- a/tests/unit/result/ArrowResultConverter.test.ts +++ b/tests/unit/result/ArrowResultConverter.test.ts @@ -5,7 +5,7 @@ import { Table, tableFromArrays, tableToIPC, RecordBatch, TypeMap } from 'apache import ArrowResultConverter from '../../../lib/result/ArrowResultConverter'; import { ArrowBatch } from '../../../lib/result/utils'; import ResultsProviderStub from '../.stubs/ResultsProviderStub'; -import { TStatusCode, TTableSchema, TTypeId } from '../../../thrift/TCLIService_types'; +import { TTableSchema, TTypeId } from '../../../thrift/TCLIService_types'; import ClientContextStub from '../.stubs/ClientContextStub'; @@ -89,7 +89,6 @@ describe('ArrowResultConverter', () => { ); const result = new ArrowResultConverter(new ClientContextStub(), rowSetProvider, { schema: sampleThriftSchema, - status: { statusCode: TStatusCode.SUCCESS_STATUS }, }); expect(await result.fetchNext({ limit: 10000 })).to.be.deep.eq([{ 1: 1 }]); }); @@ -98,7 +97,6 @@ describe('ArrowResultConverter', () => { const rowSetProvider = new ResultsProviderStub([], emptyItem); const result = new ArrowResultConverter(new ClientContextStub(), rowSetProvider, { schema: sampleThriftSchema, - status: { statusCode: TStatusCode.SUCCESS_STATUS }, }); expect(await result.fetchNext({ limit: 10000 })).to.be.deep.eq([]); expect(await result.hasMore()).to.be.false; @@ -116,7 +114,6 @@ describe('ArrowResultConverter', () => { ); const result = new ArrowResultConverter(new ClientContextStub(), rowSetProvider, { schema: undefined, - status: { statusCode: TStatusCode.SUCCESS_STATUS }, }); expect(await result.hasMore()).to.be.false; expect(await result.fetchNext({ limit: 10000 })).to.be.deep.eq([]); @@ -134,7 +131,6 @@ describe('ArrowResultConverter', () => { ); const result = new ArrowResultConverter(new ClientContextStub(), rowSetProvider, { schema: thriftSchemaAllNulls, - status: { statusCode: TStatusCode.SUCCESS_STATUS }, }); expect(await result.fetchNext({ limit: 10000 })).to.be.deep.eq([ { @@ -189,7 +185,6 @@ describe('ArrowResultConverter', () => { ); const result = new ArrowResultConverter(new ClientContextStub(), rowSetProvider, { schema: createSampleThriftSchema('id'), - status: { statusCode: TStatusCode.SUCCESS_STATUS }, }); const rows1 = await result.fetchNext({ limit: 10000 }); diff --git a/tests/unit/result/compatibility.test.ts b/tests/unit/result/compatibility.test.ts index cc6d89d8..44d64663 100644 --- a/tests/unit/result/compatibility.test.ts +++ b/tests/unit/result/compatibility.test.ts @@ -33,7 +33,7 @@ describe('Result handlers compatibility tests', () => { arrowSchema: fixtureArrow.arrowSchema, status: { statusCode: TStatusCode.SUCCESS_STATUS }, }), - { schema: fixtureArrow.schema, status: { statusCode: TStatusCode.SUCCESS_STATUS } }, + { schema: fixtureArrow.schema }, ); const rows = await result.fetchNext({ limit: 10000 }); expect(fixArrowResult(rows)).to.deep.equal(fixtureArrow.expected); @@ -48,7 +48,7 @@ describe('Result handlers compatibility tests', () => { arrowSchema: fixtureArrowNT.arrowSchema, status: { statusCode: TStatusCode.SUCCESS_STATUS }, }), - { schema: fixtureArrowNT.schema, status: { statusCode: TStatusCode.SUCCESS_STATUS } }, + { schema: fixtureArrowNT.schema }, ); const rows = await result.fetchNext({ limit: 10000 }); expect(fixArrowResult(rows)).to.deep.equal(fixtureArrowNT.expected); @@ -63,7 +63,7 @@ describe('Result handlers compatibility tests', () => { schema: fixtureArrow.schema, status: { statusCode: TStatusCode.SUCCESS_STATUS }, }), - { schema: fixtureArrow.schema, status: { statusCode: TStatusCode.SUCCESS_STATUS } }, + { schema: fixtureArrow.schema }, ); const rows = await result.fetchNext({ limit: 10000 }); expect(fixArrowResult(rows)).to.deep.equal(fixtureArrow.expected); diff --git a/tests/unit/sea/SeaOperationBackend.test.ts b/tests/unit/sea/SeaOperationBackend.test.ts new file mode 100644 index 00000000..ada6616d --- /dev/null +++ b/tests/unit/sea/SeaOperationBackend.test.ts @@ -0,0 +1,287 @@ +// Copyright (c) 2026 Databricks, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import { expect } from 'chai'; +import { + Schema, + Field, + RecordBatch, + Table, + tableToIPC, + Bool, + Int8, + Int16, + Int32, + Int64, + Float32, + Float64, + Utf8, + Binary, + DateDay, + TimestampMicrosecond, + Decimal, + Struct, + makeData, + vectorFromArray, +} from 'apache-arrow'; + +import SeaOperationBackend from '../../../lib/sea/SeaOperationBackend'; +import ClientContextStub from '../.stubs/ClientContextStub'; + +// Minimal stub of the napi `Statement` surface that emits a precomputed +// Arrow IPC payload per `fetchNextBatch()` call. Used to feed +// `SeaOperationBackend` synthetic batches that mirror the kernel's +// per-batch IPC stream contract (`schema header + 1 record-batch +// message`) without loading the native binding. +class StatementStub { + private readonly batches: Buffer[]; + + private readonly schemaIpc: Buffer; + + public cancelled = false; + + public closed = false; + + constructor(schemaIpc: Buffer, batches: Buffer[]) { + this.schemaIpc = schemaIpc; + this.batches = [...batches]; + } + + // Mirrors the kernel `Statement.statementId` getter. + public readonly statementId = '01ef-fake-statement-id'; + + public async fetchNextBatch(): Promise<{ ipcBytes: Buffer } | null> { + if (this.batches.length === 0) return null; + return { ipcBytes: this.batches.shift() as Buffer }; + } + + // schema() is synchronous on the merged-kernel binding. + public schema(): { ipcBytes: Buffer } { + return { ipcBytes: this.schemaIpc }; + } + + public async cancel(): Promise { + this.cancelled = true; + } + + public async close(): Promise { + this.closed = true; + } + + // Status accessors from the kernel's status-fields surface. + public async numModifiedRows(): Promise { + return null; + } + + public async displayMessage(): Promise { + return null; + } + + public async diagnosticInfo(): Promise { + return null; + } + + public async errorDetailsJson(): Promise { + return null; + } +} + +// Helper: attach `databricks.type_name` to a field so the SEA Thrift +// schema synthesiser can resolve the TTypeId (matches kernel behaviour +// at `src/reader/mod.rs:476-504`). +function withTypeName(field: T, typeName: string): T { + const meta = new Map(field.metadata); + meta.set('databricks.type_name', typeName); + return new Field(field.name, field.type, field.nullable, meta) as T; +} + +// Build a single IPC stream (schema header + 1 record-batch message) +// from a Schema and a column->values mapping. Mirrors the kernel's +// per-batch ResultStream output shape. +function ipcFromColumns(schema: Schema, columns: Record): Buffer { + const vectors: any[] = []; + for (const field of schema.fields) { + const col = columns[field.name]; + vectors.push(vectorFromArray(col, field.type)); + } + const data = vectors.map((v) => v.data[0]); + const struct = makeData({ + type: new Struct(schema.fields), + children: data, + length: data[0]?.length ?? 0, + nullCount: 0, + }); + const batch = new RecordBatch(schema, struct); + const table = new Table([batch]); + return Buffer.from(tableToIPC(table, 'stream')); +} + +function ipcSchemaOnly(schema: Schema): Buffer { + // tableToIPC on an empty table produces a schema-only stream. + const struct = makeData({ + type: new Struct(schema.fields), + children: schema.fields.map((f) => makeData({ type: f.type as any, length: 0, nullCount: 0 })), + length: 0, + nullCount: 0, + }); + const batch = new RecordBatch(schema, struct); + const table = new Table([batch]); + return Buffer.from(tableToIPC(table, 'stream')); +} + +describe('SeaOperationBackend — M0 datatype round-trip via napi → ArrowResultConverter', () => { + it('passes M0 primitive datatypes through the same converter the thrift path uses', async () => { + // One row per M0 primitive type with a kernel-style metadata tag on + // each field. Decimal carries a real scale (2) so the converter's + // Phase-1 scale division produces 1.5 from the unscaled bigint. + const fields = [ + withTypeName(new Field('b', new Bool(), true), 'BOOLEAN'), + withTypeName(new Field('i8', new Int8(), true), 'TINYINT'), + withTypeName(new Field('i16', new Int16(), true), 'SMALLINT'), + withTypeName(new Field('i32', new Int32(), true), 'INT'), + withTypeName(new Field('i64', new Int64(), true), 'BIGINT'), + withTypeName(new Field('f32', new Float32(), true), 'FLOAT'), + withTypeName(new Field('f64', new Float64(), true), 'DOUBLE'), + withTypeName(new Field('s', new Utf8(), true), 'STRING'), + withTypeName(new Field('bin', new Binary(), true), 'BINARY'), + withTypeName(new Field('dt', new DateDay(), true), 'DATE'), + withTypeName(new Field('ts', new TimestampMicrosecond(), true), 'TIMESTAMP'), + // apache-arrow's Decimal signature is `(scale, precision, bitWidth)`. + withTypeName(new Field('dec', new Decimal(2, 10, 128), true), 'DECIMAL'), + // INTERVAL on the kernel side: Utf8 + metadata annotation. + withTypeName(new Field('iv', new Utf8(), true), 'INTERVAL'), + ]; + const schema = new Schema(fields); + const schemaIpc = ipcSchemaOnly(schema); + + // DECIMAL: 128-bit little-endian unscaled integer. 150 little-endian + // → [150, 0, 0, 0, ...0]. Phase-1 reads `valueType.scale` (=2) so the + // converter divides by 100 to yield 1.5. + const decimalBytes = new Uint8Array(16); + decimalBytes[0] = 150; + const dataIpc = ipcFromColumns(schema, { + b: [true], + i8: [Int8Array.from([1])[0]], + i16: [Int16Array.from([200])[0]], + i32: [42], + i64: [BigInt(1234567890123)], + f32: [Math.fround(1.5)], + f64: [3.14], + s: ['hello'], + bin: [new Uint8Array([0xde, 0xad, 0xbe, 0xef])], + dt: [new Date('2026-01-01T00:00:00Z')], + // Builder for TimestampMicrosecond accepts numeric epoch-ms; the + // internal scaling multiplies by 1000 to land on µs. + ts: [new Date('2026-05-15T12:00:00Z').valueOf()], + dec: [decimalBytes], + iv: ['1-0'], + }); + + const stub = new StatementStub(schemaIpc, [dataIpc]); + const backend = new SeaOperationBackend({ + statement: stub, + context: new ClientContextStub(), + }); + + const rows = await backend.fetchChunk({ limit: 100 }); + expect(rows.length).to.equal(1); + const row = rows[0] as Record; + + expect(row.b).to.equal(true); + expect(row.i8).to.equal(1); + expect(row.i16).to.equal(200); + expect(row.i32).to.equal(42); + // BIGINT goes through Phase-2 convertBigInt → Number (matches thrift) + expect(row.i64).to.equal(1234567890123); + expect(row.f32).to.equal(Math.fround(1.5)); + expect(row.f64).to.equal(3.14); + expect(row.s).to.equal('hello'); + expect(Buffer.isBuffer(row.bin)).to.equal(true); + expect((row.bin as Buffer).equals(Buffer.from([0xde, 0xad, 0xbe, 0xef]))).to.equal(true); + // DECIMAL: Phase-1 scale-aware coercion via Arrow's Decimal type → 1.5 + expect(row.dec).to.equal(1.5); + // TIMESTAMP: Phase-1 produces JS Date for arrow timestamps + expect(row.ts).to.be.instanceOf(Date); + expect((row.ts as Date).toISOString()).to.equal('2026-05-15T12:00:00.000Z'); + // INTERVAL: kernel emits Utf8 + metadata; converter passes through as string + expect(row.iv).to.equal('1-0'); + + // After consuming the single batch, the backend should report no more rows. + expect(await backend.hasMore()).to.equal(false); + }); + + it('round-trips ARRAY / MAP / STRUCT via the converter Phase-2 JSON fallback', async () => { + // ARRAY / MAP / STRUCT have two possible wire encodings in M0: + // (a) native Arrow `List` / `Map` / `Struct` — Phase 1 produces plain + // JS objects; Phase 2 `convertJSON` sees a non-string and is a + // no-op (`utils.ts:39-49`). + // (b) Utf8 JSON strings — Phase 1 passthrough; Phase 2 `convertJSON` + // runs `JSON.parse` (`utils.ts:75-79`). + // Both produce identical row shapes. We validate (b) here because + // it's the deterministic case we can construct with the current + // apache-arrow JS API; the kernel emits either depending on server + // config (see `findings/rust-kernel/datatype-emission...:140-142`). + const strSchema = new Schema([ + withTypeName(new Field('arr', new Utf8(), true), 'ARRAY'), + withTypeName(new Field('m', new Utf8(), true), 'MAP'), + withTypeName(new Field('s', new Utf8(), true), 'STRUCT'), + ]); + const strSchemaIpc = ipcSchemaOnly(strSchema); + const strDataIpc = ipcFromColumns(strSchema, { + arr: ['[1,2,3]'], + m: ['{"k":1}'], + s: ['{"a":1,"b":"hi"}'], + }); + + const stub = new StatementStub(strSchemaIpc, [strDataIpc]); + const backend = new SeaOperationBackend({ + statement: stub, + context: new ClientContextStub(), + }); + const rows = await backend.fetchChunk({ limit: 100 }); + expect(rows.length).to.equal(1); + const row = rows[0] as Record; + expect(row.arr).to.deep.equal([1, 2, 3]); + expect(row.m).to.deep.equal({ k: 1 }); + expect(row.s).to.deep.equal({ a: 1, b: 'hi' }); + }); + + it('streams multiple batches and reports hasMore correctly', async () => { + const schema = new Schema([withTypeName(new Field('x', new Int32(), true), 'INT')]); + const schemaIpc = ipcSchemaOnly(schema); + const batch1 = ipcFromColumns(schema, { x: [1, 2] }); + const batch2 = ipcFromColumns(schema, { x: [3] }); + + const stub = new StatementStub(schemaIpc, [batch1, batch2]); + const backend = new SeaOperationBackend({ + statement: stub, + context: new ClientContextStub(), + }); + + const all = await backend.fetchChunk({ limit: 10 }); + expect(all).to.deep.equal([{ x: 1 }, { x: 2 }, { x: 3 }]); + expect(await backend.hasMore()).to.equal(false); + }); + + it('cancel / close delegate to the native statement', async () => { + const schema = new Schema([withTypeName(new Field('x', new Int32(), true), 'INT')]); + const schemaIpc = ipcSchemaOnly(schema); + const stub = new StatementStub(schemaIpc, []); + const backend = new SeaOperationBackend({ statement: stub, context: new ClientContextStub() }); + await backend.cancel(); + expect(stub.cancelled).to.equal(true); + await backend.close(); + expect(stub.closed).to.equal(true); + }); +}); diff --git a/tests/unit/sea/execution.test.ts b/tests/unit/sea/execution.test.ts new file mode 100644 index 00000000..e71a6a07 --- /dev/null +++ b/tests/unit/sea/execution.test.ts @@ -0,0 +1,483 @@ +// Copyright (c) 2026 Databricks, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import { expect } from 'chai'; +import sinon from 'sinon'; +import SeaBackend from '../../../lib/sea/SeaBackend'; +import SeaSessionBackend from '../../../lib/sea/SeaSessionBackend'; +import SeaOperationBackend from '../../../lib/sea/SeaOperationBackend'; +import { SeaNativeBinding, SeaConnection, SeaStatement } from '../../../lib/sea/SeaNativeLoader'; +import IClientContext, { ClientConfig } from '../../../lib/contracts/IClientContext'; +import IDBSQLLogger, { LogLevel } from '../../../lib/contracts/IDBSQLLogger'; +import HiveDriverError from '../../../lib/errors/HiveDriverError'; +import { ConnectionOptions } from '../../../lib/contracts/IDBSQLClient'; + +// ----------------------------------------------------------------------------- +// Fakes — minimal stand-ins for the napi-rs generated surface and the +// IClientContext side of the abstraction. Keeping them inline avoids +// pulling in test-only fixtures from outside the sea/ namespace. +// ----------------------------------------------------------------------------- + +class FakeNativeStatement implements SeaStatement { + public closed = false; + + public cancelled = false; + + // Mirrors the kernel `Statement.statementId` getter. + public readonly statementId = '01ef-fake-statement-id'; + + public async fetchNextBatch() { + return null; + } + + // schema() is synchronous on the merged-kernel binding. + public schema() { + return { ipcBytes: Buffer.alloc(0) }; + } + + public async cancel() { + this.cancelled = true; + } + + public async close() { + this.closed = true; + } + + // Status accessors added by the kernel's status-fields surface. + public async numModifiedRows(): Promise { + return null; + } + + public async displayMessage(): Promise { + return null; + } + + public async diagnosticInfo(): Promise { + return null; + } + + public async errorDetailsJson(): Promise { + return null; + } +} + +class FakeNativeConnection implements SeaConnection { + public closed = false; + + public lastSql?: string; + + public throwOnExecute: Error | null = null; + + public statementToReturn: FakeNativeStatement = new FakeNativeStatement(); + + // Mirrors the kernel `Connection.sessionId` getter. + public readonly sessionId = '01ef-fake-session-id'; + + // Session-level migration: per-statement options were removed, so the + // binding's executeStatement takes only `sql`. + public async executeStatement(sql: string): Promise { + if (this.throwOnExecute) { + throw this.throwOnExecute; + } + this.lastSql = sql; + return this.statementToReturn; + } + + public async close(): Promise { + this.closed = true; + } +} + +function makeBinding(connection: SeaConnection): SeaNativeBinding & { + openSessionStub: sinon.SinonStub; +} { + const openSessionStub = sinon.stub().resolves(connection); + // Structural cast through `unknown`: the binding type carries an `AuthMode` + // const enum that can't be produced as a runtime value, so the whole fake + // is cast rather than each member. + const binding = { + version: () => 'test', + openSession: openSessionStub, + Connection: function Connection() {}, + Statement: function Statement() {}, + } as unknown as SeaNativeBinding; + return Object.assign(binding, { openSessionStub }); +} + +function makeContext(): IClientContext { + const logger: IDBSQLLogger = { + log(_level: LogLevel, _message: string): void { + // no-op + }, + }; + const config = {} as ClientConfig; + return { + getConfig: () => config, + getLogger: () => logger, + getConnectionProvider: async () => { + throw new Error('not used by SEA backend'); + }, + getClient: async () => { + throw new Error('not used by SEA backend'); + }, + getDriver: async () => { + throw new Error('not used by SEA backend'); + }, + }; +} + +// ----------------------------------------------------------------------------- +// Tests +// ----------------------------------------------------------------------------- + +describe('SeaBackend', () => { + it('connect() captures the connection options and validates PAT auth', async () => { + const connection = new FakeNativeConnection(); + const binding = makeBinding(connection); + const backend = new SeaBackend({ context: makeContext(), nativeBinding: binding }); + + await backend.connect({ + host: 'example.databricks.com', + path: '/sql/1.0/warehouses/abc', + token: 'dapi-token', + } as ConnectionOptions); + + // openSession should not have been called by connect() + expect(binding.openSessionStub.called).to.equal(false); + }); + + // sea-auth-u2m: `databricks-oauth` with no id/secret is now the U2M happy + // path (M0 was PAT-only, but the OAuth M2M+U2M feature on sea-auth-u2m + // accepts the full set of `databricks-oauth` variants). M2M/U2M flow- + // dispatch coverage lives in auth-m2m.test.ts / auth-u2m.test.ts; + // out-of-scope auth modes are now whatever neither PAT nor + // `databricks-oauth` covers (e.g. `token-provider`, `external-token`). + it('connect() rejects unsupported auth modes (non-PAT, non-OAuth)', async () => { + const connection = new FakeNativeConnection(); + const binding = makeBinding(connection); + const backend = new SeaBackend({ context: makeContext(), nativeBinding: binding }); + + let thrown: unknown; + try { + await backend.connect({ + host: 'example.databricks.com', + path: '/sql/1.0/warehouses/abc', + // eslint-disable-next-line @typescript-eslint/no-explicit-any + authType: 'token-provider', + } as any); + } catch (err) { + thrown = err; + } + expect(thrown).to.be.instanceOf(HiveDriverError); + expect((thrown as Error).message).to.match(/unsupported auth mode/); + }); + + it('connect() rejects missing token', async () => { + const connection = new FakeNativeConnection(); + const binding = makeBinding(connection); + const backend = new SeaBackend({ context: makeContext(), nativeBinding: binding }); + + let thrown: unknown; + try { + await backend.connect({ + host: 'example.databricks.com', + path: '/sql/1.0/warehouses/abc', + token: '', + } as ConnectionOptions); + } catch (err) { + thrown = err; + } + expect(thrown).to.be.instanceOf(HiveDriverError); + // After sea-integration merge, missing-token validation goes through + // SeaAuth.buildSeaConnectionOptions which throws AuthenticationError + // (extends HiveDriverError) with the "non-empty PAT" message. + expect((thrown as Error).message).to.match(/non-empty PAT/); + }); + + it('openSession() throws if connect() was not called', async () => { + const connection = new FakeNativeConnection(); + const binding = makeBinding(connection); + const backend = new SeaBackend({ context: makeContext(), nativeBinding: binding }); + + let thrown: unknown; + try { + await backend.openSession({}); + } catch (err) { + thrown = err; + } + expect(thrown).to.be.instanceOf(HiveDriverError); + expect((thrown as Error).message).to.match(/not connected/); + }); + + it('openSession() forwards hostName / httpPath / token to napi binding', async () => { + const connection = new FakeNativeConnection(); + const binding = makeBinding(connection); + const backend = new SeaBackend({ context: makeContext(), nativeBinding: binding }); + + await backend.connect({ + host: 'workspace.example', + path: '/sql/1.0/warehouses/xyz', + token: 'dapi-token', + } as ConnectionOptions); + + await backend.openSession({}); + + expect(binding.openSessionStub.calledOnce).to.equal(true); + const args = binding.openSessionStub.firstCall.args[0]; + // sea-auth-u2m introduced the discriminated SeaNativeConnectionOptions + // shape with a leading `authMode` tag — `'Pat'` for the PAT branch. + expect(args).to.deep.equal({ + hostName: 'workspace.example', + httpPath: '/sql/1.0/warehouses/xyz', + authMode: 'Pat', + token: 'dapi-token', + }); + }); + + it('openSession() returns a SeaSessionBackend wrapping the napi Connection', async () => { + const connection = new FakeNativeConnection(); + const binding = makeBinding(connection); + const backend = new SeaBackend({ context: makeContext(), nativeBinding: binding }); + + await backend.connect({ + host: 'h', + path: '/p', + token: 't', + } as ConnectionOptions); + + const sessionBackend = await backend.openSession({}); + expect(sessionBackend).to.be.instanceOf(SeaSessionBackend); + expect(sessionBackend.id).to.be.a('string').and.have.length.greaterThan(0); + }); + + it('openSession() forwards initialCatalog / initialSchema / configuration to the napi openSession call (not per-statement)', async () => { + const connection = new FakeNativeConnection(); + const binding = makeBinding(connection); + const backend = new SeaBackend({ context: makeContext(), nativeBinding: binding }); + + await backend.connect({ + host: 'h', + path: '/p', + token: 't', + } as ConnectionOptions); + + const session = await backend.openSession({ + initialCatalog: 'main', + initialSchema: 'default', + configuration: { 'spark.sql.execution.arrow.enabled': 'true' }, + }); + + // The defaults reach the kernel via `Session::builder().defaults()` + + // `.session_conf()`, applied on `CreateSession`. Assert they were + // folded into the napi `openSession` arg. + expect(binding.openSessionStub.calledOnce).to.equal(true); + expect(binding.openSessionStub.firstCall.args[0]).to.deep.include({ + authMode: 'Pat', + token: 't', + catalog: 'main', + schema: 'default', + sessionConf: { 'spark.sql.execution.arrow.enabled': 'true' }, + }); + + // And the SQL still threads through executeStatement (now with no + // per-statement options). + await session.executeStatement('SELECT 1', {}); + expect(connection.lastSql).to.equal('SELECT 1'); + }); + + it('close() clears connection state without throwing', async () => { + const connection = new FakeNativeConnection(); + const binding = makeBinding(connection); + const backend = new SeaBackend({ context: makeContext(), nativeBinding: binding }); + await backend.connect({ host: 'h', path: '/p', token: 't' } as ConnectionOptions); + await backend.close(); + + let thrown: unknown; + try { + await backend.openSession({}); + } catch (err) { + thrown = err; + } + expect(thrown).to.be.instanceOf(HiveDriverError); + }); +}); + +describe('SeaSessionBackend', () => { + function makeSession(connection: SeaConnection) { + return new SeaSessionBackend({ connection, context: makeContext() }); + } + + it('executeStatement passes sql through verbatim', async () => { + const connection = new FakeNativeConnection(); + const session = makeSession(connection); + await session.executeStatement('SELECT * FROM foo', {}); + expect(connection.lastSql).to.equal('SELECT * FROM foo'); + }); + + it('executeStatement returns a SeaOperationBackend with an id', async () => { + const connection = new FakeNativeConnection(); + const session = makeSession(connection); + const op = await session.executeStatement('SELECT 1', {}); + expect(op).to.be.instanceOf(SeaOperationBackend); + expect(op.id).to.be.a('string').and.have.length.greaterThan(0); + }); + + it('executeStatement rejects namedParameters (M1)', async () => { + const connection = new FakeNativeConnection(); + const session = makeSession(connection); + let thrown: unknown; + try { + await session.executeStatement('SELECT :x', { namedParameters: { x: 1 } }); + } catch (err) { + thrown = err; + } + expect(thrown).to.be.instanceOf(HiveDriverError); + expect((thrown as Error).message).to.match(/parameters/); + }); + + it('executeStatement rejects ordinalParameters (M1)', async () => { + const connection = new FakeNativeConnection(); + const session = makeSession(connection); + let thrown: unknown; + try { + await session.executeStatement('SELECT ?', { ordinalParameters: [1] }); + } catch (err) { + thrown = err; + } + expect(thrown).to.be.instanceOf(HiveDriverError); + }); + + it('executeStatement rejects queryTimeout (M1)', async () => { + const connection = new FakeNativeConnection(); + const session = makeSession(connection); + let thrown: unknown; + try { + await session.executeStatement('SELECT 1', { queryTimeout: 30 }); + } catch (err) { + thrown = err; + } + expect(thrown).to.be.instanceOf(HiveDriverError); + expect((thrown as Error).message).to.match(/queryTimeout/); + }); + + it('metadata methods throw deferred-M1 errors', async () => { + const connection = new FakeNativeConnection(); + const session = makeSession(connection); + for (const method of [ + 'getInfo', + 'getTypeInfo', + 'getCatalogs', + 'getSchemas', + 'getTables', + 'getTableTypes', + 'getColumns', + 'getFunctions', + 'getPrimaryKeys', + 'getCrossReference', + ] as const) { + let thrown: unknown; + try { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + await (session as any)[method]({}); + } catch (err) { + thrown = err; + } + expect(thrown, `expected ${method} to throw`).to.be.instanceOf(HiveDriverError); + expect((thrown as Error).message).to.match(/M1|not implemented/); + } + }); + + it('close() forwards to the native connection', async () => { + const connection = new FakeNativeConnection(); + const session = makeSession(connection); + const status = await session.close(); + expect(connection.closed).to.equal(true); + expect(status.isSuccess).to.equal(true); + }); + + it('close() is idempotent', async () => { + const connection = new FakeNativeConnection(); + const session = makeSession(connection); + await session.close(); + // Second call should not re-invoke connection.close + connection.closed = false; + const status = await session.close(); + expect(connection.closed).to.equal(false); + expect(status.isSuccess).to.equal(true); + }); + + it('executeStatement fails after close()', async () => { + const connection = new FakeNativeConnection(); + const session = makeSession(connection); + await session.close(); + let thrown: unknown; + try { + await session.executeStatement('SELECT 1', {}); + } catch (err) { + thrown = err; + } + expect(thrown).to.be.instanceOf(HiveDriverError); + }); +}); + +describe('SeaOperationBackend', () => { + function makeOperation(statement: SeaStatement = new FakeNativeStatement()) { + return new SeaOperationBackend({ statement, context: makeContext() }); + } + + it('id is a stable string', () => { + const op = makeOperation(); + expect(op.id).to.equal(op.id); + expect(op.id).to.be.a('string').and.have.length.greaterThan(0); + }); + + it('hasResultSet is true for M0', () => { + const op = makeOperation(); + expect(op.hasResultSet()).to.equal(true); + }); + + it('cancel() forwards to napi Statement', async () => { + const stmt = new FakeNativeStatement(); + const op = makeOperation(stmt); + await op.cancel(); + expect(stmt.cancelled).to.equal(true); + }); + + it('cancel() is idempotent', async () => { + const stmt = new FakeNativeStatement(); + const op = makeOperation(stmt); + await op.cancel(); + stmt.cancelled = false; + await op.cancel(); + expect(stmt.cancelled).to.equal(false); + }); + + it('close() forwards to napi Statement', async () => { + const stmt = new FakeNativeStatement(); + const op = makeOperation(stmt); + await op.close(); + expect(stmt.closed).to.equal(true); + }); + + it('waitUntilReady() is a no-op (kernel internalises polling)', async () => { + const op = makeOperation(); + await op.waitUntilReady(); + }); + + // Note: after sea-integration merge, fetchChunk is no longer a stub — + // the sea-results SeaResultsProvider + ArrowResultConverter pipeline + // implements the real fetch path. Full coverage lives in + // tests/unit/sea/SeaOperationBackend.test.ts and the parity-gate e2e + // at tests/integration/sea/results-e2e.test.ts. +});