Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
"inputType": "text",
"inputPlaceholder": "host",
"inputTooltip": "Specify host IP address or DNS name of Teradata server",
"regex": "([^\\s])"
"validation": {
"regex": "([^\\s])"
}
},
{
"inputLabel": "Port",
Expand Down
87 changes: 73 additions & 14 deletions reverse_engineering/helpers/connectionHelper.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ const SYSTEM_DATABASES = [
'TD_SYSXML',
'TDPUSER',
];
const SYSTEM_UDT = [

const SYSTEM_UDT = new Set([
'ArrayVec',
'InternalPeriodDateType',
'InternalPeriodTimeStampType',
Expand All @@ -57,12 +58,15 @@ const SYSTEM_UDT = [
'TD_JSON_BSON',
'TD_JSON_UBJSON',
'XML',
];
]);

const MISSING_JAVA_PATH_MESSAGE =
'Path to JAVA binary file is incorrect. Please specify JAVA_HOME variable in your system or put specific path to JAVA binary file in connection settings.';

let connection;
let useSshTunnel;
let abortController;
let activeQueries = new Set();

const isWindows = () => os.platform() === 'win32';

Expand Down Expand Up @@ -172,16 +176,42 @@ const createConnection = async (connectionInfo, sshService, logger) => {
const teradataClientCommandArguments = buildCommand(teradataClientPath, connectionSettings);

return {
execute: query => {
execute: (query, signal) => {
return new Promise((resolve, reject) => {
if (signal?.aborted) {
return reject(new Error('Query execution was aborted'));
}

const queryArgument = createArgument('query', query);
const javaArgs = [...teradataClientCommandArguments, queryArgument];

const queryResult = spawn(`"${javaPath}"`, javaArgs, {
shell: true,
});

activeQueries.add(queryResult);

const abortHandler = () => {
if (!queryResult?.killed) {
queryResult.kill('SIGTERM');
activeQueries.delete(queryResult);
}
reject(new Error('Query execution was aborted'));
};

if (signal) {
signal.addEventListener('abort', abortHandler);
}

const cleanup = () => {
activeQueries.delete(queryResult);
if (signal) {
signal.removeEventListener('abort', abortHandler);
}
};

queryResult.on('error', error => {
cleanup();
reject(new Error(error));
});

Expand All @@ -196,6 +226,12 @@ const createConnection = async (connectionInfo, sshService, logger) => {
});

queryResult.on('close', code => {
cleanup();

if (signal?.aborted) {
return;
}

if (code !== 0) {
reject(new Error(Buffer.concat(errorData).toString()));
return;
Expand Down Expand Up @@ -225,6 +261,7 @@ const createConnection = async (connectionInfo, sshService, logger) => {
});
});
},
getAbortController: () => abortController,
};
};

Expand All @@ -233,6 +270,9 @@ const connect = async (connectionInfo, sshService, logger) => {
return connection;
}

abortController = new AbortController();
activeQueries.clear();

useSshTunnel = connectionInfo.useSshTunnel;
connection = await createConnection(connectionInfo, sshService, logger);

Expand All @@ -246,12 +286,14 @@ const getConcatenatedQueryResult = (queryResult = []) => {
};

const createInstance = (connection, _) => {
const signal = connection.getAbortController()?.signal;

const getDatabasesWithTableNames = async tableType => {
const query = buildQuery(queryType.GET_DATABASE_AND_TABLE_NAMES, {
tableType,
systemDatabases: SYSTEM_DATABASES,
});
const queryResult = await connection.execute(query);
const queryResult = await connection.execute(query, signal);

return groupBy({
items: queryResult,
Expand All @@ -261,24 +303,27 @@ const createInstance = (connection, _) => {
};

const getCount = async (dbName, tableName) => {
const count = await connection.execute(buildQuery(queryType.COUNT_COLUMNS, { dbName, tableName }));
const count = await connection.execute(buildQuery(queryType.COUNT_COLUMNS, { dbName, tableName }), signal);

return Number(count[0]?.Quantity || 0);
};

const getRecords = async (dbName, tableName, limit) => {
return connection.execute(buildQuery(queryType.GET_RECORDS, { dbName, tableName, limit }));
return connection.execute(buildQuery(queryType.GET_RECORDS, { dbName, tableName, limit }), signal);
};

const getVersion = async () => {
const result = await connection.execute('SELECT * FROM dbc.dbcinfo');
const result = await connection.execute('SELECT * FROM dbc.dbcinfo', signal);
const versionInfo = result.find(info => info.InfoKey === 'VERSION');

return versionInfo?.InfoData;
};

const describeDatabase = async dbName => {
const databaseInfoResult = await connection.execute(buildQuery(queryType.DESCRIBE_DATABASE, { dbName }));
const databaseInfoResult = await connection.execute(
buildQuery(queryType.DESCRIBE_DATABASE, { dbName }),
signal,
);

if (!databaseInfoResult.length) {
return {};
Expand All @@ -303,14 +348,15 @@ const createInstance = (connection, _) => {
const showCreateEntity = async (dbName, tableName, entityType) => {
const result = await connection.execute(
buildQuery(queryType.SHOW_CREATE_ENTITY_STATEMENT, { dbName, tableName, entityType }),
signal,
);

return getConcatenatedQueryResult(result);
};

const getColumns = async (dbName, tableName) => {
const query = buildQuery(queryType.GET_COLUMNS, { dbName, tableName });
const result = await connection.execute(query);
const result = await connection.execute(query, signal);

return result.map(raw => ({
dbName: raw.DatabaseName,
Expand All @@ -322,7 +368,7 @@ const createInstance = (connection, _) => {

const getCreateIndexStatement = async index => {
const query = `SHOW ${index.indexType} INDEX "${index.dbName}"."${index.indxName}";`;
const createStatement = await connection.execute(query);
const createStatement = await connection.execute(query, signal);

return {
...index,
Expand All @@ -332,7 +378,7 @@ const createInstance = (connection, _) => {

const getIndexes = async dbName => {
const query = buildQuery(queryType.GET_INDEXES, { dbName });
const queryResult = await connection.execute(query);
const queryResult = await connection.execute(query, signal);

const indexes = _.uniqBy(queryResult, 'IndexName').map(index => ({
dbName: index.DatabaseName,
Expand All @@ -353,7 +399,7 @@ const createInstance = (connection, _) => {
const getCreateUdtStatement = async type => {
const name = type['Table/View/Macro Dictionary Name'];
const query = `SHOW TYPE "${name}";`;
const createStatement = await connection.execute(query);
const createStatement = await connection.execute(query, signal);

return {
name,
Expand All @@ -363,7 +409,7 @@ const createInstance = (connection, _) => {

const getUserDefinedTypes = async () => {
const query = 'HELP DATABASE SYSUDTLIB;';
const queryResult = await connection.execute(query);
const queryResult = await connection.execute(query, signal);

return queryResult
.filter(filterUdt)
Expand All @@ -390,6 +436,19 @@ const createInstance = (connection, _) => {
};

const close = async sshService => {
if (abortController) {
abortController.abort();
abortController = null;
}

for (const activeQuery of activeQueries) {
if (activeQuery && !activeQuery.killed) {
activeQuery.kill('SIGTERM');
}
}

activeQueries.clear();

if (connection) {
connection = null;
}
Expand Down Expand Up @@ -444,7 +503,7 @@ const getIndexType = index => {

const filterUdt = object => object.Kind === 'U';

const excludeSystemUdt = type => !SYSTEM_UDT.includes(type['Table/View/Macro Dictionary Name']);
const excludeSystemUdt = type => !SYSTEM_UDT.has(type['Table/View/Macro Dictionary Name']);

module.exports = {
connect,
Expand Down