Transactional to Business Intelligence
Introduction
All developers of Transactional(OLTP) systems have had to carry out certain processes for the feed of Business Intelligence(BI) databases. This type of process can be more or less complicated depending on the manager of the OLTP system and the manager of the business analysis system. Let me explain: Normally, the necessary information in both types of system is not usually the same, so, either the OLTP team or the business intelligence team, they must analyze, select and transfer the information available from one system to another. And depending on the managers, this task will be done by one or other equipment.
In the event that this task must be carried out by the OLTP team, normally, there is usually a additional requirement: efficiency. This is usually due to the fact that OLTP systems are usually very busy during the day with online work, and at night with data uploads and reporting varied. Thus, the time window given for this type of process is usually very small, so it is necessary to optimizing the entire transfer process as much as possible.
Here we will describe a work methodology using the Insert Bulk Bestia libraries and stored procedures in such a way that the analysis, selection and transmission of information from one system to another is minimized.
The code shown in this chapter is presented as an example of the use of the SQLServerAsyncExecProcedures library included in Insert Bulk Bestia. InforCustom is not responsible for the use of such code.
Problem Statement
You want to perform a daily process to send certain information from one OLTP system to another system for BI (both are on different hosts). Said information must be marked indicating for each record if it is new, has been modified or deleted since the last time the process was executed.
To do this, a series of repository tables will be created in both systems, and in BI a table will be created, "ACTUALIZACIONES", where each execution of the process will be recorded. BI will check this table and when a new record appears, it must process the transferred information.
Note: All created objects will be made in the "rca" scheme.
It is also based on the hypothesis that Insert Bulk Bestia libraries are installed in the OLTP and not in the BI system.
Repository in OLTP
In the database of the OLTP system, a repository of tables will be created with the information to be transmitted, and associated with each table there will be another table where a couple of HASHs of the information will be stored. These two HASHs will facilitate and speed up the process of comparing the information with respect to whether it were done with queries that compared field to field, together with the queries necessary to find out which are the new and deleted records.
In addition to the repository, a Sequence used to name the successive executions and a HASH calculation function will be included in the code for those versions of SQL Server (2014 and lower) whose HASHBYTES function is limited to 4000 characters (NVARCHAR) . Let's take a look at it in code (for brevity, only a couple of example tables will be included):
IF EXISTS(select NULL from sys.sequences where object_id = object_id('rca.SEQ_PARALELO'))
DROP SEQUENCE rca.SEQ_PARALELO
GO
CREATE SEQUENCE rca.SEQ_PARALELO
AS [bigint]
START WITH 1
INCREMENT BY 1
MINVALUE 1
MAXVALUE 9223372036854775807
CACHE 10
GO
--SQL Server 2014 y versiones anteriores:
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.fnHashBytes') IS NOT NULL
DROP FUNCTION rca.fnHashBytes
GO
CREATE FUNCTION rca.fnHashBytes (
@EncryptAlgorithm NVARCHAR(50),
@DataToEncrypt nvarchar(max))
RETURNS VARBINARY(MAX) WITH SCHEMABINDING
AS BEGIN
DECLARE @Index INT = 1, @DataToEncryptLength INT
DECLARE @EncryptedResult varbinary(max) = NULL
IF @DataToEncrypt IS NOT NULL
BEGIN
SELECT @EncryptedResult = 0x, @DataToEncryptLength = DATALENGTH(@DataToEncrypt)
WHILE @Index <= @DataToEncryptLength
BEGIN
SELECT @EncryptedResult = @EncryptedResult + HASHBYTES(@EncryptAlgorithm,
SUBSTRING(@DataToEncrypt, @Index, 4000)), @Index = @Index + 4000
END
END
RETURN @EncryptedResult
END
GO
As can be seen, the "rca" schema is used to define the objects used for the repository.
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.Tabla1') IS NOT NULL
DROP TABLE rca.Tabla1
GO
CREATE TABLE rca.Tabla1(
Campo1 varchar(16) NOT NULL,
Campo2 smallint NOT NULL,
Campo3 varchar(55) NOT NULL,
ACT_CD_TIPO smallint NOT NULL,
CONSTRAINT Tabla1_PK PRIMARY KEY CLUSTERED
(
Campo1 ASC,
Campo2 ASC,
ACT_CD_TIPO ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF,
IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
GO
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.Tabla1_HASH') IS NOT NULL
DROP TABLE rca.Tabla1_HASH
GO
CREATE TABLE rca.Tabla1_HASH (
Campo1 varchar(16) NOT NULL,
Campo2 smallint NOT NULL,
HASH_OLD varbinary(max) NULL,
HASH_NEW varbinary(max) NULL,
CONSTRAINT Tabla1_HASH_PK PRIMARY KEY CLUSTERED
(
Campo1 ASC,
Campo2 ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF,
IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
GO
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.Tabla2') IS NOT NULL
DROP TABLE rca.Tabla2
GO
CREATE TABLE rca.Tabla2(
Campo1 varchar(16) NOT NULL,
Campo2 smallint NOT NULL,
Campo3 varchar(55) NOT NULL,
ACT_CD_TIPO smallint NOT NULL,
CONSTRAINT Tabla2_PK PRIMARY KEY CLUSTERED
(
Campo1 ASC,
ACT_CD_TIPO ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF,
IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
GO
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.Tabla2_HASH') IS NOT NULL
DROP TABLE rca.Tabla2_HASH
GO
CREATE TABLE rca.Tabla2_HASH (
Campo1 varchar(16) NOT NULL,
HASH_OLD varbinary(max) NULL,
HASH_NEW varbinary(max) NULL,
CONSTRAINT Tabla2_HASH_PK PRIMARY KEY CLUSTERED
(
Campo1 ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF,
IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
GO
It is assumed that both "Table1" and "Table2" only contain the fields to be transferred. The information will come from the tables "Table1Org" and "Table2Org" respectively, which will contain much more information, but which is not necessary for BI. "Table1" has a primary key that is made up of two fields and "Table2" has a single field.
Both tables contain the ACT_CD_TIPO field, which is part of the primary key. During information processing, records will be duplicated, and this field will be used to distinguish new records from old ones.
The HASH_OLD field will contain the HASH of the record of the last execution of the process, while the and HASH_NEW field will contain the HASH of the record of the current execution. If HASH_OLD is NULL, it means that the record is new, if HAS_NEW is NULL, it means that the record has been removed from the original table. And if both fields are not NULL and do not match, it means that the record has been modified.
Stored Procedures in OLTP
In the OLTP, a series of stored procedures will be created to update the repository tables that have been created.
It should be noted that no Foreing Key has been created in the repository tables. However, if there were any, before updating the tables they would have to be removed. The update of the repository tables will be done in parallel, so there will be no pre-established order. Thus, first of all, two stored procedures will be created to eliminate and recreate these Foreing Keys. Here is an approach to these stored procedures:
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.EliminaFkLocales') IS NOT NULL
DROP PROCEDURE rca.EliminaFkLocales
GO
CREATE PROCEDURE rca.EliminaFkLocales
@NombreParalelo NVARCHAR(256),
@error INT OUTPUT
AS BEGIN SET NOCOUNT ON
SET XACT_ABORT ON
--Variables para LOG
DECLARE @inicio DATETIME2(7) = SYSDATETIME(), @nameDB SYSNAME =
DB_NAME(), @esquema SYSNAME = OBJECT_SCHEMA_NAME(@@PROCID),
@procedimiento SYSNAME = OBJECT_NAME(@@PROCID)
SET @error = 0
BEGIN TRY
--Eliminar todas las FKs de las tablas del
repositorio para su carga
IF EXISTS(SELECT NULL FROM sys.foreign_keys WHERE
name = 'Tabla1_Tabla2_FK')
ALTER TABLE rca.Tabla1 DROP CONSTRAINT
Tabla1_Tabla2_FK
EXECUTE ibb.Log_Errores_Inserta N'Elimnadas Foreing
Keys Locales.',
0, @NombreParalelo, @inicio, @nameDB,
@esquema, @procedimiento
END TRY
BEGIN CATCH
DECLARE @ERR_SQL_NUM_ERROR INT = ERROR_NUMBER(),
@ERR_SQL_NUM_LINEA INT = ERROR_LINE(), @ERR_SQL_MENSAJE NVARCHAR(MAX) =
ERROR_MESSAGE(), @ERR_SQL_PROCEDIMIENTO SYSNAME = ERROR_PROCEDURE(),
@ERR_SQL_SEVERIDAD INT = ERROR_SEVERITY(), @ERR_SQL_ESTADO INT =
ERROR_STATE()
EXECUTE ibb.Log_Errores_Inserta N'ERROR eliminando
las Foreing Keys Locales.',
10, @NombreParalelo, NULL, @nameDB,
@esquema, @procedimiento, @ERR_SQL_NUM_ERROR, @ERR_SQL_NUM_LINEA,
@ERR_SQL_MENSAJE, @ERR_SQL_PROCEDIMIENTO, @ERR_SQL_SEVERIDAD,
@ERR_SQL_ESTADO
SET @error = @ERR_SQL_NUM_ERROR
END CATCH
END
GO
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.CreaFkLocales') IS NOT NULL
DROP PROCEDURE rca.CreaFkLocales
GO
CREATE PROCEDURE rca.CreaFkLocales
@NombreParalelo NVARCHAR(256),
@error INT OUTPUT
AS BEGIN SET NOCOUNT ON
SET XACT_ABORT ON
--Variables para LOG
DECLARE @inicio DATETIME2(7) = SYSDATETIME(), @nameDB SYSNAME =
DB_NAME(), @esquema SYSNAME = OBJECT_SCHEMA_NAME(@@PROCID),
@procedimiento SYSNAME = OBJECT_NAME(@@PROCID)
SET @error = 0
BEGIN TRY
--Crear todas las FKs necesarias
--En nuestro ejemplo, no hay FKs entre tablas del
repositorio
--IF NOT EXISTS(SELECT NULL FROM sys.foreign_keys
WHERE name = 'Tabla1_Tabla2_FK')
-- ALTER TABLE rca.Tabla1 ADD CONSTRAINT
Tabla1_Tabla2_FK FOREIGN KEY(Campo1)
-- REFERENCES rca.Tabla2 (Campo1)
EXECUTE ibb.Log_Errores_Inserta N'Creadas Foreing
Keys Locales.',
0, @NombreParalelo, @inicio, @nameDB,
@esquema, @procedimiento
END TRY
BEGIN CATCH
DECLARE @ERR_SQL_NUM_ERROR INT = ERROR_NUMBER(),
@ERR_SQL_NUM_LINEA INT = ERROR_LINE(), @ERR_SQL_MENSAJE NVARCHAR(MAX) =
ERROR_MESSAGE(), @ERR_SQL_PROCEDIMIENTO SYSNAME = ERROR_PROCEDURE(),
@ERR_SQL_SEVERIDAD INT = ERROR_SEVERITY(), @ERR_SQL_ESTADO INT =
ERROR_STATE()
EXECUTE ibb.Log_Errores_Inserta N'ERROR creando las
Foreing Keys Locales.',
10, @NombreParalelo, NULL, @nameDB,
@esquema, @procedimiento, @ERR_SQL_NUM_ERROR, @ERR_SQL_NUM_LINEA,
@ERR_SQL_MENSAJE, @ERR_SQL_PROCEDIMIENTO, @ERR_SQL_SEVERIDAD,
@ERR_SQL_ESTADO
SET @error = @ERR_SQL_NUM_ERROR
END CATCH
END
GO
This is followed by the stored procedures responsible for updating the repository tables. The ones shown here are in charge of updating the contents of the tables "rca.Tabla1", "rca.Tabla1_HASH", "rca.Tabla2" and "rca.Tabla2_HASH". The code comments are descriptive enough.
These stored procedures will run in parallel. Therefore, the following instructions are important:
- It is desired that, if an error occurs in them, it does not continue the execution (SET XACT_ABORT ON).
- The goal of parallel execution is to make the most of all available CPUs. Therefore, you do not want Querys to be executed in parallel (OPTION (MAXDOP 1)). MERGE is usually a candidate statement to be executed in parallel, so it is very important to tell it not to do it.
Algorithm:
- Update ACT_CD_TIPO field to 2 for all records in repository table (Tabla1).
-
Insert all the records from the original OLTP table (Table1Org) in the repository table (Table1).
The ACT_CD_TIPO field is set to 0 to indicate that they are the inserted records.
This Query is where the information from the OLTP is selected and transformed appropriately to meet BI needs. -
Delete old records:
- Table1 contains the records of the previous execution marked with ACT_CD_TIPO at 2 and ALL the OLTP records marked with ACT_CD_TIPO at 0.
- The DELETE will delete the records that already existed in Table1 and that continue to exist in the OLTP. For this, the key from Table1 is used.
- In this way, the records with ACT_CD_TIPO = 2 will be the records that have disappeared from the OLTP, and the records with ACT_CD_TIPO = 0 will contain the existing information in the OLTP.
-
Table1_HASH update:
- The HASH of each record is calculated and compared with the HASH contained in the HASH_NEW field.
- If the record exists in both tables (Table1 and Table1_HASH), but the calculated HASH does not match the HASH_NEW, the HASH_NEW field is updated with the calculated HASH. Those will be the records whose information has been modified in the OLTP. For these records, HASH_OLD and HASH_NEW will not match. We will see why later.
- If the record does not exist in Table1_HASH, it is inserted into that table. It will be the new records. These records will have the HAS_OLD field set to NULL.
- If the record does not exist in Table1, in Table1_HASH the HASH_NEW field is updated to NULL. These records will be deleted and the HASH_OLD field will not be NULL.
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.RellenaTabla1') IS NOT NULL
DROP PROCEDURE rca.RellenaTabla1
GO
CREATE PROCEDURE rca.RellenaTabla1
@NombreParalelo NVARCHAR(256),
@error INT OUTPUT
AS BEGIN SET NOCOUNT ON
--Este procedimiento almacenado se ejecutará en paralelo, por lo que se
desea que si se produce un error no coninue con la ejecución.
SET XACT_ABORT ON
--Variables para LOG
DECLARE @inicio DATETIME2(7) = SYSDATETIME(), @nameDB SYSNAME =
DB_NAME(), @esquema SYSNAME = OBJECT_SCHEMA_NAME(@@PROCID),
@procedimiento SYSNAME = OBJECT_NAME(@@PROCID)
SET @error = 0
BEGIN TRY
--Marcar todos los registros como 2
UPDATE rca.Tabla1
SET ACT_CD_TIPO = 2
OPTION(MAXDOP 1) --Como se ejecutará en
paralelo, no hemos de bloquear las CPUs
--Insertar Todos los registros procedentes de la
Tabla Original marcados como 0
INSERT INTO rca.Tabla1
SELECT Tb1Org.Campo1, Tb1Org.Campo2,
Tb1Org.Campo3, 0 AS ACT_CD_TIPO
FROM dbo.Tabla1Org Tb1Org
OPTION(MAXDOP 1) --Como se ejecutará en
paralelo, no hemos de bloquear las CPUs
--Eliminar los registros viejos:
-- Si registro1 existe en Tabla1 y TablaOrg1,
después del INSERT anterior Tabla1
-- contiene: registro1-2 (del UPDATE) y
registro1-0 (del INSERT). Con este DELETE
-- se eliminará registro1-2, quedando el nuevo
registro.
DELETE Tb1Del
FROM rca.Tabla1 Tb1Del
INNER JOIN rca.Tabla1 Tb1Ins
ON Tb1Del.Campo1
= Tb1Ins.Campo1 AND Tb1Del.Campo2 = Tb1Ins.Campo2
WHERE Tb1Del.ACT_CD_TIPO = 2 AND
Tb1Ins.ACT_CD_TIPO = 0
OPTION(MAXDOP 1) --Como se ejecutará en
paralelo, no hemos de bloquear las CPUs
--Actualización de Tabla1_HASH
-- Se calcula el HASH de cada registro y se
compara con el campo HASH_NEW
-- Si el registro existe en ambas
tablas, pero el HASH no coincide, se actualiza el campo HASH_NEW
-- Si el registro no existe
Tabla1_HASH, se inserta
-- Si el registro no existe Tabla1,
se anula el campo HAS_NEW
MERGE rca.Tabla1_HASH TablaHash USING
(SELECT
MBT.Campo1, MBT.Campo2,
--SQL Server 2014 y versiones anteriores:
-- Aquí se está utilizando la función de SQL Server HASHBYTES,
porque se supone que
-- la cadena de caracteres que se va a formar no superará los 4000
caracteres. Si se
-- superase dicha longitud se debería usar la función
rca.fnHashBytes
HASHBYTES('SHA2_512', (select MBT.* from (values(null))foo(bar) for xml
auto)) as HASH_NEW
FROM
rca.Tabla1 AS MBT
WHERE
MBT.ACT_CD_TIPO = 0
) AS Hases_Nuevos
ON
TablaHash.Campo1 = Hases_Nuevos.Campo1 AND TablaHash.Campo2 =
Hases_Nuevos.Campo2
WHEN MATCHED AND TablaHash.HASH_NEW
<> Hases_Nuevos.HASH_NEW THEN --Modificados
UPDATE SET TablaHash.HASH_NEW
= Hases_Nuevos.HASH_NEW
WHEN NOT MATCHED BY TARGET THEN --Nuevos
INSERT(Campo1, Campo2,
HASH_NEW)
VALUES(Hases_Nuevos.Campo1,
Hases_Nuevos.Campo2, Hases_Nuevos.HASH_NEW)
WHEN NOT MATCHED BY SOURCE THEN
--Borrados
UPDATE SET HASH_NEW = NULL
OPTION(MAXDOP 1); --Como se ejecutará en
paralelo, no hemos de bloquear las CPUs
--MERGE es una instrucción candidata a
ser paralelizada por SQL Server. Como ya se hacen muchas cosas
--en paralelo, se le indica que no la
paralelice para no bloquear las CPUs
--Puede ser conveniente actualizar los índices y las
estadísticas por si hay muchas modificaciones
ALTER INDEX ALL ON rca.Tabla1_HASH REBUILD WITH
(MAXDOP = 1, SORT_IN_TEMPDB = ON, ONLINE = OFF, STATISTICS_NORECOMPUTE =
OFF)
UPDATE STATISTICS rca.Tabla1_HASH WITH COLUMNS
ALTER INDEX ALL ON rca.Tabla1 REBUILD WITH (MAXDOP =
1, SORT_IN_TEMPDB = ON, ONLINE = OFF, STATISTICS_NORECOMPUTE = OFF)
UPDATE STATISTICS rca.Tabla1 WITH COLUMNS
EXECUTE ibb.Log_Errores_Inserta N'Se ha actualizado
Tabla1 y Tabla1_HASH.',
0, @NombreParalelo, @inicio, @nameDB,
@esquema, @procedimiento
END TRY
BEGIN CATCH
DECLARE @ERR_SQL_NUM_ERROR INT = ERROR_NUMBER(),
@ERR_SQL_NUM_LINEA INT = ERROR_LINE(), @ERR_SQL_MENSAJE NVARCHAR(MAX) =
ERROR_MESSAGE(), @ERR_SQL_PROCEDIMIENTO SYSNAME = ERROR_PROCEDURE(),
@ERR_SQL_SEVERIDAD INT = ERROR_SEVERITY(), @ERR_SQL_ESTADO INT =
ERROR_STATE()
EXECUTE ibb.Log_Errores_Inserta N'ERROR actualizando
Tabla1 y Tabla1_HASH.',
10, @NombreParalelo, NULL, @nameDB,
@esquema, @procedimiento, @ERR_SQL_NUM_ERROR, @ERR_SQL_NUM_LINEA,
@ERR_SQL_MENSAJE, @ERR_SQL_PROCEDIMIENTO, @ERR_SQL_SEVERIDAD,
@ERR_SQL_ESTADO
SET @error = @ERR_SQL_NUM_ERROR
END CATCH
END
GO
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.RellenaTabla2') IS NOT NULL
DROP PROCEDURE rca.RellenaTabla2
GO
CREATE PROCEDURE rca.RellenaTabla2
@NombreParalelo NVARCHAR(256),
@error INT OUTPUT
AS BEGIN SET NOCOUNT ON
--Este procedimiento almacenado se ejecutará en paralelo, por lo que se
desea que si se produce un error no coninue con la ejecución.
SET XACT_ABORT ON
--Variables para LOG
DECLARE @inicio DATETIME2(7) = SYSDATETIME(), @nameDB SYSNAME =
DB_NAME(), @esquema SYSNAME = OBJECT_SCHEMA_NAME(@@PROCID),
@procedimiento SYSNAME = OBJECT_NAME(@@PROCID)
SET @error = 0
BEGIN TRY
--Marcar todos los registros como 2
UPDATE rca.Tabla2
SET ACT_CD_TIPO = 2
OPTION(MAXDOP 1) --Como se ejecutará en
paralelo, no hemos de bloquear las CPUs
--Insertar Todos los registros procedentes de la
Tabla Original marcados como 0
INSERT INTO rca.Tabla2
SELECT Tb2Org.Campo1, Tb2Org.Campo2,
Tb2Org.Campo3, 0 AS ACT_CD_TIPO
FROM dbo.Tabla2Org Tb2Org
OPTION(MAXDOP 1) --Como se ejecutará en
paralelo, no hemos de bloquear las CPUs
--Eliminar los registros viejos:
-- Si registro1 existe en Tabla2 y Tabla2Org,
después del INSERT anterior Tabla1
-- contiene: registro1-2 (del UPDATE) y
registro1-0 (del INSERT). Con este DELETE
-- se eliminará registro1-2, quedando el nuevo
registro.
DELETE Tb2Del
FROM rca.Tabla2 Tb2Del
INNER JOIN rca.Tabla2 Tb2Ins
ON Tb2Del.Campo1
= Tb2Ins.Campo1
WHERE Tb2Del.ACT_CD_TIPO = 2 AND
Tb2Ins.ACT_CD_TIPO = 0
OPTION(MAXDOP 1) --Como se ejecutará en
paralelo, no hemos de bloquear las CPUs
--Actualización de Tabla2_HASH
-- Se calcula el HASH de cada registro y se
compara con el campo HASH_NEW
-- Si el registro existe en ambas
tablas, pero el HASH no coincide, se actualiza el campo HASH_NEW
-- Si el registro no existe
Tabla1_HASH, se inserta
-- Si el registro no existe Tabla1,
se anula el campo HAS_NEW
MERGE rca.Tabla2_HASH TablaHash USING
(SELECT
MBT.Campo1,
--SQL Server 2014 y versiones anteriores:
-- Aquí se está utilizando la función de SQL Server HASHBYTES,
porque se supone que
-- la cadena de caracteres que se va a formar no superará los 4000
caracteres. Si se
-- superase dicha longitud se debería usar la función
rca.fnHashBytes
HASHBYTES('SHA2_512', (select MBT.* from (values(null))foo(bar) for xml
auto)) as HASH_NEW
FROM
rca.Tabla2 AS MBT
WHERE
MBT.ACT_CD_TIPO = 0
) AS Hases_Nuevos
ON
TablaHash.Campo1 = Hases_Nuevos.Campo1
WHEN MATCHED AND TablaHash.HASH_NEW
<> Hases_Nuevos.HASH_NEW THEN --Modificados
UPDATE SET TablaHash.HASH_NEW
= Hases_Nuevos.HASH_NEW
WHEN NOT MATCHED BY TARGET THEN --Nuevos
INSERT(Campo1, HASH_NEW)
VALUES(Hases_Nuevos.Campo1,
Hases_Nuevos.HASH_NEW)
WHEN NOT MATCHED BY SOURCE THEN
--Borrados
UPDATE SET HASH_NEW = NULL
OPTION(MAXDOP 1); --Como se ejecutará en
paralelo, no hemos de bloquear las CPUs
--MERGE es una instrucción candidata a
ser paralelizada por SQL Server. Como ya se hacen muchas cosas
--en paralelo, se le indica que no la
paralelice para no bloquear las CPUs
--Puede ser conveniente actualizar los índices y las
estadísticas por si hay muchas modificaciones
ALTER INDEX ALL ON rca.Tabla2_HASH REBUILD WITH
(MAXDOP = 1, SORT_IN_TEMPDB = ON, ONLINE = OFF, STATISTICS_NORECOMPUTE =
OFF)
UPDATE STATISTICS rca.Tabla2_HASH WITH COLUMNS
ALTER INDEX ALL ON rca.Tabla2 REBUILD WITH (MAXDOP =
1, SORT_IN_TEMPDB = ON, ONLINE = OFF, STATISTICS_NORECOMPUTE = OFF)
UPDATE STATISTICS rca.Tabla2 WITH COLUMNS
EXECUTE ibb.Log_Errores_Inserta N'Se ha actualizado
Tabla2 y Tabla2_HASH.',
0, @NombreParalelo, @inicio, @nameDB,
@esquema, @procedimiento
END TRY
BEGIN CATCH
DECLARE @ERR_SQL_NUM_ERROR INT = ERROR_NUMBER(),
@ERR_SQL_NUM_LINEA INT = ERROR_LINE(), @ERR_SQL_MENSAJE NVARCHAR(MAX) =
ERROR_MESSAGE(), @ERR_SQL_PROCEDIMIENTO SYSNAME = ERROR_PROCEDURE(),
@ERR_SQL_SEVERIDAD INT = ERROR_SEVERITY(), @ERR_SQL_ESTADO INT =
ERROR_STATE()
EXECUTE ibb.Log_Errores_Inserta N'ERROR actualizando
Tabla2 y Tabla2_HASH.',
10, @NombreParalelo, NULL, @nameDB,
@esquema, @procedimiento, @ERR_SQL_NUM_ERROR, @ERR_SQL_NUM_LINEA,
@ERR_SQL_MENSAJE, @ERR_SQL_PROCEDIMIENTO, @ERR_SQL_SEVERIDAD,
@ERR_SQL_ESTADO
SET @error = @ERR_SQL_NUM_ERROR
END CATCH
END
GO
Repository in BI
Just as a series of tables have been created in the OLTP, their replicas will be created in BI according to the requirements outlined at the beginning.
The replicas of "Table1" and "Table2" are shown below. A new field has been added, ACT_CD_ACTUALIZACION, which corresponds to the key of the table rca.UPDATES. Both tables contain the ACT_CD_TIPO field that will indicate what has happened to the record since the last time the process was run. It will contain the values: 0-Insert, 1-Update, 2-Delete.
The rca.UPDATES table will contain a record for each time the data transfer process has been attempted. If the process was successful, the ACT_FH_FIN field will have a value, and if there have been errors, this field will contain a NULL.
As in OLTP, Foreing Keys have not been defined between the repository tables, however, the FKs that relate these tables to the rca table have been defined. UPDATES.
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.Tabla1') IS NOT NULL
DROP TABLE rca.Tabla1
GO
CREATE TABLE rca.Tabla1(
Campo1 varchar(16) NOT NULL,
Campo2 smallint NOT NULL,
Campo3 varchar(55) NOT NULL,
ACT_CD_TIPO smallint NOT NULL,
ACT_CD_ACTUALIZACION bigint NOT NULL,
CONSTRAINT Tabla1_PK PRIMARY KEY CLUSTERED
(
Campo1 ASC,
Campo2 ASC,
ACT_CD_ACTUALIZACION ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF,
IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON
[PRIMARY]
) ON [PRIMARY]
GO
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.Tabla2') IS NOT NULL
DROP TABLE rca.Tabla2
GO
CREATE TABLE rca.Tabla2(
Campo1 varchar(16) NOT NULL,
Campo2 smallint NOT NULL,
Campo3 varchar(55) NOT NULL,
ACT_CD_TIPO smallint NOT NULL,
ACT_CD_ACTUALIZACION bigint NOT NULL,
CONSTRAINT Tabla2_PK PRIMARY KEY CLUSTERED
(
Campo1 ASC,
ACT_CD_ACTUALIZACION ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF,
IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON
[PRIMARY]
) ON [PRIMARY]
GO
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.ACTUALIZACIONES') IS NOT NULL
DROP TABLE rca.ACTUALIZACIONES
GO
CREATE TABLE rca.ACTUALIZACIONES(
ACT_CD_ACTUALIZACION bigint NOT NULL,
ACT_FH_INICIO datetime2(0) NOT NULL,
ACT_FH_FIN datetime2(0) NULL,
ACT_DURACION nvarchar(50) NULL,
CONSTRAINT ACTUALIZACIONES_PK PRIMARY KEY CLUSTERED
(
ACT_CD_ACTUALIZACION ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF,
IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON
[PRIMARY]
) ON [PRIMARY]
GO
CREATE NONCLUSTERED INDEX RCA_ACTUALIZACIONES_FIN_NULO_IDX ON
rca.ACTUALIZACIONES
(
ACT_FH_FIN ASC
)
INCLUDE(ACT_CD_ACTUALIZACION)
WITH (PAD_INDEX = ON, STATISTICS_NORECOMPUTE = OFF, SORT_IN_TEMPDB =
OFF, DROP_EXISTING = OFF, ONLINE = OFF, ALLOW_ROW_LOCKS = ON,
ALLOW_PAGE_LOCKS = ON, FILLFACTOR = 100) ON [PRIMARY]
GO
ALTER TABLE rca.Tabla1 ADD CONSTRAINT ACTUALIZACIONES_Tabla1_FK FOREIGN
KEY(ACT_CD_ACTUALIZACION)
REFERENCES rca.ACTUALIZACIONES (ACT_CD_ACTUALIZACION) ON DELETE
CASCADE
GO
ALTER TABLE rca.Tabla2 ADD CONSTRAINT ACTUALIZACIONES_Tabla2_FK FOREIGN
KEY(ACT_CD_ACTUALIZACION)
REFERENCES rca.ACTUALIZACIONES (ACT_CD_ACTUALIZACION) ON DELETE
CASCADE
GO
Stored Procedures in BI
It will also be necessary to create certain stored procedures on the BI side:
- Cleaning of the repository tables and of the rca.UPDATES table of possible previous executions that did not finish correctly: (ACT_FH_FIN IS NULL, or ACT_CD_ACTUALIZACION does not exist in rca.UPDATES). It also inserts a new record for the current execution in rca.UPDATES.
- A stored procedure is also created to update the end date of the process, also indicating that the process has run successfully.
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.LimpiaTablasRemotas') IS NOT NULL
DROP PROCEDURE rca.LimpiaTablasRemotas
GO
CREATE PROCEDURE rca.LimpiaTablasRemotas
@fechaInicioProceso NVARCHAR(50),
@IdParalelo BIGINT,
@error INT OUTPUT
AS BEGIN SET NOCOUNT ON
SET @error = 0
BEGIN TRY
--Eliminar de Tabla1 y Tabla2 aquellos registros que
se hayan podido insertar de un proceso anterior que haya
--podido fallar (ACT_FH_FIN IS NULL o bien
ACT_CD_ACTUALIZACION no existe en rca.ACTUALIZACIONES)
DELETE rca.Tabla1
WHERE ACT_CD_ACTUALIZACION IN (SELECT
ACT_CD_ACTUALIZACION FROM rca.ACTUALIZACIONES WHERE ACT_FH_FIN IS NULL)
OR ACT_CD_ACTUALIZACION NOT
IN (SELECT ACT_CD_ACTUALIZACION FROM rca.ACTUALIZACIONES)
DELETE rca.Tabla2
WHERE ACT_CD_ACTUALIZACION IN (SELECT
ACT_CD_ACTUALIZACION FROM rca.ACTUALIZACIONES WHERE ACT_FH_FIN IS NULL)
OR ACT_CD_ACTUALIZACION NOT
IN (SELECT ACT_CD_ACTUALIZACION FROM rca.ACTUALIZACIONES)
--Hay un DELETE CASCADE, pero por tema de
prestaciones se realizan las querys anteriores.
DELETE rca.ACTUALIZACIONES
WHERE ACT_FH_FIN IS NULL
--Se crea el nuevo registro para la ejecución en
curso
INSERT INTO rca.ACTUALIZACIONES
(ACT_CD_ACTUALIZACION, ACT_FH_INICIO)
SELECT @IdParalelo,
TRY_CONVERT(datetime2(0), @fechaInicioProceso, 121)
--Por prestaciones, puede ser conveniente la
actualización de los índices y estadísticas.
ALTER INDEX ALL ON rca.ACTUALIZACIONES REBUILD WITH
(MAXDOP = 0, SORT_IN_TEMPDB = ON, ONLINE = OFF, STATISTICS_NORECOMPUTE =
OFF)
UPDATE STATISTICS rca.ACTUALIZACIONES WITH COLUMNS
END TRY
BEGIN CATCH
--Sería conveniente crear toda la estructura de
ibb.Log_Errores en el sistema de BI y dejar las trazas correspondientes
SET @error = 1
END CATCH
END
GO
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.UpdateFinActualizacion') IS NOT NULL
DROP PROCEDURE rca.UpdateFinActualizacion
GO
CREATE PROCEDURE rca.UpdateFinActualizacion
@IdParalelo bigint,
@Duracion nvarchar(50),
@fFin nvarchar(50),
@error INT OUTPUT
AS BEGIN SET NOCOUNT ON
SET REMOTE_PROC_TRANSACTIONS ON
SET XACT_ABORT ON
SET @error = 0
BEGIN TRY
UPDATE rca.ACTUALIZACIONES
SET ACT_FH_FIN =
TRY_CONVERT(datetime2(0), @fFin, 121), ACT_DURACION = @Duracion
WHERE ACT_CD_ACTUALIZACION = @IdParalelo
END TRY
BEGIN CATCH
--Sería conveniente crear toda la estructura de
ibb.Log_Errores en el sistema de BI y dejar las trazas correspondientes
SET @error = 1
END CATCH
END
GO
Second, the stored procedures to delete and create the Foreing Keys are created on the remote server. Even if FKs are not defined between the repository tables, FKs have been defined between these tables and the rca.ACTUALIZACIONES table.
A stored procedure has also been created for the regeneration of the indexes and statistics of the BI tables.
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.EliminaFkRemotas') IS NOT NULL
DROP PROCEDURE rca.EliminaFkRemotas
GO
CREATE PROCEDURE rca.EliminaFkRemotas
@error INT OUTPUT
AS BEGIN SET NOCOUNT ON
SET XACT_ABORT ON
SET @error = 0
BEGIN TRY
--Eliminar todas las FKs de las tablas del
repositorio para su carga
IF EXISTS(SELECT NULL FROM sys.foreign_keys WHERE
name = 'Tabla1_Tabla2_FK')
ALTER TABLE rca.Tabla1 DROP CONSTRAINT
Tabla1_Tabla2_FK
--Eliminar todas las FKs contra la tabla de
rca.ACTUALIZACIONES
IF EXISTS(SELECT NULL FROM sys.foreign_keys WHERE
name = 'ACTUALIZACIONES_Tabla1_FK')
ALTER TABLE rca.Tabla1 DROP CONSTRAINT
ACTUALIZACIONES_Tabla1_FK
IF EXISTS(SELECT NULL FROM sys.foreign_keys WHERE
name = 'ACTUALIZACIONES_Tabla2_FK')
ALTER TABLE rca.Tabla2 DROP CONSTRAINT
ACTUALIZACIONES_Tabla2_FK
END TRY
BEGIN CATCH
--Sería conveniente crear toda la estructura de
ibb.Log_Errores en el sistema de BI y dejar las trazas correspondientes
SET @error = 1
END CATCH
END
GO
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.CreaFkRemotas') IS NOT NULL
DROP PROCEDURE rca.CreaFkRemotas
GO
CREATE PROCEDURE rca.CreaFkRemotas
@error INT OUTPUT
AS BEGIN SET NOCOUNT ON
SET XACT_ABORT ON
SET @error = 0
BEGIN TRY
--Crear todas las FKs necesarias
--En el ejemplo no hay ninguna FKs entre las tablas
del repositorio
--IF NOT EXISTS(SELECT NULL FROM sys.foreign_keys
WHERE name = 'Tabla1_Tabla2_FK')
--ALTER TABLE rca.Tabla1 ADD CONSTRAINT
Tabla1_Tabla2_FK FOREIGN KEY(Campo1)
-- REFERENCES rca.Tabla2 (Campo1)
--Crear todas las FKs contra la tabla de
rca.ACTUALIZACIONES
IF NOT EXISTS(SELECT NULL FROM sys.foreign_keys WHERE
name = 'ACTUALIZACIONES_Tabla1_FK')
ALTER TABLE rca.Tabla1 ADD CONSTRAINT
ACTUALIZACIONES_Tabla1_FK FOREIGN KEY(ACT_CD_ACTUALIZACION)
REFERENCES rca.ACTUALIZACIONES
(ACT_CD_ACTUALIZACION) ON DELETE CASCADE
IF NOT EXISTS(SELECT NULL FROM sys.foreign_keys WHERE
name = 'ACTUALIZACIONES_Tabla2_FK')
ALTER TABLE rca.Tabla2 ADD CONSTRAINT
ACTUALIZACIONES_Tabla2_FK FOREIGN KEY(ACT_CD_ACTUALIZACION)
REFERENCES rca.ACTUALIZACIONES
(ACT_CD_ACTUALIZACION) ON DELETE CASCADE
END TRY
BEGIN CATCH
--Sería conveniente crear toda la estructura de
ibb.Log_Errores en el sistema de BI y dejar las trazas correspondientes
SET @error = 1
END CATCH
END
GO
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.RegeneraIndicesRemotos') IS NOT NULL
DROP PROCEDURE rca.RegeneraIndicesRemotos
GO
CREATE PROCEDURE rca.RegeneraIndicesRemotos
@error INT OUTPUT
AS BEGIN SET NOCOUNT ON
SET XACT_ABORT ON
SET @error = 0
BEGIN TRY
--Si hay muchas actualizaciones en el repositorio,
puede ser conveniente la actualización
--de los índices y las estadísticas
ALTER INDEX ALL ON rca.Tabla1 REBUILD WITH (MAXDOP =
0, SORT_IN_TEMPDB = ON, ONLINE = OFF, STATISTICS_NORECOMPUTE = OFF)
UPDATE STATISTICS rca.Tabla1 WITH COLUMNS
ALTER INDEX ALL ON rca.Tabla2 REBUILD WITH (MAXDOP =
0, SORT_IN_TEMPDB = ON, ONLINE = OFF, STATISTICS_NORECOMPUTE = OFF)
UPDATE STATISTICS rca.Tabla2 WITH COLUMNS
ALTER INDEX ALL ON rca.ACTUALIZACIONES REBUILD WITH
(MAXDOP = 0, SORT_IN_TEMPDB = ON, ONLINE = OFF, STATISTICS_NORECOMPUTE =
OFF)
UPDATE STATISTICS rca.ACTUALIZACIONES WITH COLUMNS
END TRY
BEGIN CATCH
--Sería conveniente crear toda la estructura de
ibb.Log_Errores en el sistema de BI y dejar las trazas correspondientes
SET @error = 1
END CATCH
END
GO
Stored Procedures for Transfer of Information
In the OLTP, the stored procedures for the transfer of information are created. It is assumed that you are using a Linked Server called RCA, and a destination database also called RCA. The target tables, as already mentioned, are in the rca schema.
Obviously, the use of a Linked Server is not the most appropriate for doing this kind of thing. However, if this is one of the requirements, one of the ways to speed up the process is to use the 'SQLServerAsyncExecProcedures' libraries so that the available resources are optimized to the maximum, and above all, the transmission of data through the network, as this is often the bottleneck.
It can be seen that with the algorithm used so far it is not necessary for the information to travel in both directions, that is, in the OLTP, the information to be sent to BI is known, and it is not necessary that the BI information must travel to OLTP to make comparisons. Obviously, this generates great efficiency, since the bottleneck is usually in the network.
Thus, the following are the stored procedures that pass the data for Table1 and Table2:
- If there are transactions, they should promote to distributed transactions: SET REMOTE_PROC_TRANSACTIONS ON.
- It is desired that, if an error occurs in them, the execution does not continue (SET XACT_ABORT ON).
- In BI, new records (HASH_OLD IS NULL), modified and deleted records (HASH_NEW! = HASH_OLD) must be inserted.
- The ACT_CD_TIPO field will contain the values: 0-Insert(HASH_OLD IS NULL), 1-Update(HASH_NEW != HASH_OLD), 2-Delete(HASH_NEW IS NULL).
- The ACT_CD_ACTUALIZACION field will contain the appropriate value as will be seen later.
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.Traspasa_Tabla1') IS NOT NULL
DROP PROCEDURE rca.Traspasa_Tabla1
GO
CREATE PROCEDURE rca.Traspasa_Tabla1
@idParalelo BIGINT,
@NombreParalelo NVARCHAR(256),
@error INT OUTPUT
AS BEGIN SET NOCOUNT ON
--Si hay transacciones, éstas deben poder promocionar
SET REMOTE_PROC_TRANSACTIONS ON
SET XACT_ABORT ON
--Variables para LOG
DECLARE @inicio DATETIME2(7) = SYSDATETIME(), @nameDB SYSNAME =
DB_NAME(), @esquema SYSNAME = OBJECT_SCHEMA_NAME(@@PROCID),
@procedimiento SYSNAME = OBJECT_NAME(@@PROCID)
SET @error = 0
BEGIN TRY
--Insert de los Nuevos (HASH_OLD Es nulo) y
Modificados (HASH_NEW != HASH_OLD)
INSERT INTO RCA.RCA.rca.Tabla1
SELECT tb1.Campo1, tb1.Campo2,
tb1.Campo3,
IIF(ExpH.HASH_OLD IS NULL, 0, IIF(ExpH.HASH_NEW IS NULL, 2, 1)),
@idParalelo
FROM rca.Tabla1 tb1
INNER JOIN
rca.Tabla1_HASH ExpH
ON
ExpH.Campo1 = tb1.Campo1 AND ExpH.Campo2 = tb1.Campo2
WHERE ExpH.HASH_OLD IS NULL
OR ExpH.HASH_NEW != ExpH.HASH_OLD
EXECUTE ibb.Log_Errores_Inserta N'Se ha trasvasado
Tabla1.',
0, @NombreParalelo, @inicio, @nameDB,
@esquema, @procedimiento
END TRY
BEGIN CATCH
DECLARE @ERR_SQL_NUM_ERROR INT = ERROR_NUMBER(),
@ERR_SQL_NUM_LINEA INT = ERROR_LINE(), @ERR_SQL_MENSAJE NVARCHAR(MAX) =
ERROR_MESSAGE(), @ERR_SQL_PROCEDIMIENTO SYSNAME = ERROR_PROCEDURE(),
@ERR_SQL_SEVERIDAD INT = ERROR_SEVERITY(), @ERR_SQL_ESTADO INT =
ERROR_STATE()
EXECUTE ibb.Log_Errores_Inserta N'ERROR trasvasando
Tabla1.',
10, @NombreParalelo, NULL, @nameDB,
@esquema, @procedimiento, @ERR_SQL_NUM_ERROR, @ERR_SQL_NUM_LINEA,
@ERR_SQL_MENSAJE, @ERR_SQL_PROCEDIMIENTO, @ERR_SQL_SEVERIDAD,
@ERR_SQL_ESTADO
SET @error = @ERR_SQL_NUM_ERROR
END CATCH
END
GO
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.Traspasa_Tabla2') IS NOT NULL
DROP PROCEDURE rca.Traspasa_Tabla2
GO
CREATE PROCEDURE rca.Traspasa_Tabla2
@idParalelo BIGINT,
@NombreParalelo NVARCHAR(256),
@error INT OUTPUT
AS BEGIN SET NOCOUNT ON
--Si hay transacciones, éstas deben poder promocionar
SET REMOTE_PROC_TRANSACTIONS ON
SET XACT_ABORT ON
--Variables para LOG
DECLARE @inicio DATETIME2(7) = SYSDATETIME(), @nameDB SYSNAME =
DB_NAME(), @esquema SYSNAME = OBJECT_SCHEMA_NAME(@@PROCID),
@procedimiento SYSNAME = OBJECT_NAME(@@PROCID)
SET @error = 0
BEGIN TRY
--Insert de los Nuevos (HASH_OLD Es nulo) y
Modificados (HASH_NEW != HASH_OLD)
INSERT INTO RCA.RCA.rca.Tabla2
SELECT tb1.Campo1, tb1.Campo2,
tb1.Campo3,
IIF(ExpH.HASH_OLD IS NULL, 0, IIF(ExpH.HASH_NEW IS NULL, 2, 1)),
@idParalelo
FROM rca.Tabla2 tb1
INNER JOIN
rca.Tabla2_HASH ExpH
ON
ExpH.Campo1 = tb1.Campo1
WHERE ExpH.HASH_OLD IS NULL
OR ExpH.HASH_NEW != ExpH.HASH_OLD
EXECUTE ibb.Log_Errores_Inserta N'Se ha trasvasado
Tabla2.',
0, @NombreParalelo, @inicio, @nameDB,
@esquema, @procedimiento
END TRY
BEGIN CATCH
DECLARE @ERR_SQL_NUM_ERROR INT = ERROR_NUMBER(),
@ERR_SQL_NUM_LINEA INT = ERROR_LINE(), @ERR_SQL_MENSAJE NVARCHAR(MAX) =
ERROR_MESSAGE(), @ERR_SQL_PROCEDIMIENTO SYSNAME = ERROR_PROCEDURE(),
@ERR_SQL_SEVERIDAD INT = ERROR_SEVERITY(), @ERR_SQL_ESTADO INT =
ERROR_STATE()
EXECUTE ibb.Log_Errores_Inserta N'ERROR trasvasando
Tabla2.',
10, @NombreParalelo, NULL, @nameDB,
@esquema, @procedimiento, @ERR_SQL_NUM_ERROR, @ERR_SQL_NUM_LINEA,
@ERR_SQL_MENSAJE, @ERR_SQL_PROCEDIMIENTO, @ERR_SQL_SEVERIDAD,
@ERR_SQL_ESTADO
SET @error = @ERR_SQL_NUM_ERROR
END CATCH
END
GO
Stored Procedures for COMMIT
Until now, all stored procedures do not need transactions, since in case of error the algorithm recovers from errors. For example, if it fails to update the OLTP tables or the generation of the HASH_NEWs, in the next execution of the process it will recover from these errors. If the transmission of the tables from OLTP to BI fails, the stored procedure 'RCA.RCA.rca.LimpiaTablasRemotas' will retrieve the remote tables.
However, the stored procedures described here are the ones that actually COMMIT the process, that is, they are stored procedures that need to be done within a transaction, since in case of failure, the process will not be able to recover. Transactions can be of any isolation level, since each stored procedure does not need to "see" the data for the other stored procedures.
The stored procedures 'rca.UpdateHash_Tabla1' and 'rca.UpdateHash_Tabla2' prepare the Table1_HASH and Table2_HASH tables for the next process execution:
- They remove from Table1_HASH the records that have disappeared in the OLTP (HASH_NEW IS NULL).
- They remove from Table1 those records that do not exist in Table1_HASH.
- Updates the HASH_OLD field in Table1_HASH with the content of HASH_NEW.
In this critical step, the remote procedure 'RCA.RCA.rca.UpdateFinActualizacion' must also be called, which updates the field 'ACT_FH_FIN' indicating that the process has been executed correctly. If this method fails, the process will also not be recoverable on the next run. In this other case, the transaction that is associated with this stored procedure will be distributed, so MS DTC will have to be enabled on both servers.
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.ActualizaHash_Tabla1') IS NOT NULL
DROP PROCEDURE rca.ActualizaHash_Tabla1
GO
CREATE PROCEDURE rca.ActualizaHash_Tabla1
@NombreParalelo NVARCHAR(256),
@error INT OUTPUT
AS BEGIN SET NOCOUNT ON
SET XACT_ABORT ON
DECLARE @inicio DATETIME2(7) = SYSDATETIME(), @nameDB SYSNAME =
DB_NAME(), @esquema SYSNAME = OBJECT_SCHEMA_NAME(@@PROCID),
@procedimiento SYSNAME = OBJECT_NAME(@@PROCID)
SET @error = 0
BEGIN TRY
--Eliminar de Tabla1_HASH aquellos registros cuyo
HAS_NEW es nulo (eliminados)
DELETE rca.Tabla1_HASH
WHERE HASH_NEW IS NULL
--Eliminar de Tabla1 aquellos registros que no
existan en Tabla1_HASH
DELETE Tbl
FROM rca.Tabla1 Tbl
LEFT JOIN rca.Tabla1_HASH
TblHs
ON Tbl.Campo1 =
TblHs.Campo1 AND Tbl.Campo2 = TblHs.Campo2
WHERE TblHs.Campo1 IS NULL AND
TblHs.Campo2 IS NULL
--Actualizar los HASH_OLD con los HAS_NEW
UPDATE rca.Tabla1_HASH
SET HASH_OLD = HASH_NEW
EXECUTE ibb.Log_Errores_Inserta N'Se ha actualizado
Tabla1_HASH.',
0, @NombreParalelo, @inicio, @nameDB,
@esquema, @procedimiento
END TRY
BEGIN CATCH
DECLARE @ERR_SQL_NUM_ERROR INT = ERROR_NUMBER(),
@ERR_SQL_NUM_LINEA INT = ERROR_LINE(), @ERR_SQL_MENSAJE NVARCHAR(MAX) =
ERROR_MESSAGE(), @ERR_SQL_PROCEDIMIENTO SYSNAME = ERROR_PROCEDURE(),
@ERR_SQL_SEVERIDAD INT = ERROR_SEVERITY(), @ERR_SQL_ESTADO INT =
ERROR_STATE()
EXECUTE ibb.Log_Errores_Inserta N'ERROR actualizando
Tabla1_HASH.',
10, @NombreParalelo, NULL, @nameDB,
@esquema, @procedimiento, @ERR_SQL_NUM_ERROR, @ERR_SQL_NUM_LINEA,
@ERR_SQL_MENSAJE, @ERR_SQL_PROCEDIMIENTO, @ERR_SQL_SEVERIDAD,
@ERR_SQL_ESTADO
SET @error = @ERR_SQL_NUM_ERROR
END CATCH
END
GO
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.ActualizaHash_Tabla2') IS NOT NULL
DROP PROCEDURE rca.ActualizaHash_Tabla2
GO
CREATE PROCEDURE rca.ActualizaHash_Tabla2
@NombreParalelo NVARCHAR(256),
@error INT OUTPUT
AS BEGIN SET NOCOUNT ON
SET XACT_ABORT ON
DECLARE @inicio DATETIME2(7) = SYSDATETIME(), @nameDB SYSNAME =
DB_NAME(), @esquema SYSNAME = OBJECT_SCHEMA_NAME(@@PROCID),
@procedimiento SYSNAME = OBJECT_NAME(@@PROCID)
SET @error = 0
BEGIN TRY
--Eliminar de Tabla1_HASH aquellos registros cuyo
HAS_NEW es nulo (eliminados)
DELETE rca.Tabla2_HASH
WHERE HASH_NEW IS NULL
--Eliminar de Tabla1 aquellos registros que no
existan en Tabla1_HASH
DELETE Tbl
FROM rca.Tabla2 Tbl
LEFT JOIN rca.Tabla2_HASH
TblHs
ON Tbl.Campo1 =
TblHs.Campo1
WHERE TblHs.Campo1 IS NULL
--Actualizar los HASH_OLD con los HAS_NEW
UPDATE rca.Tabla2_HASH
SET HASH_OLD = HASH_NEW
EXECUTE ibb.Log_Errores_Inserta N'Se ha actualizado
Tabla2_HASH.',
0, @NombreParalelo, @inicio, @nameDB,
@esquema, @procedimiento
END TRY
BEGIN CATCH
DECLARE @ERR_SQL_NUM_ERROR INT = ERROR_NUMBER(),
@ERR_SQL_NUM_LINEA INT = ERROR_LINE(), @ERR_SQL_MENSAJE NVARCHAR(MAX) =
ERROR_MESSAGE(), @ERR_SQL_PROCEDIMIENTO SYSNAME = ERROR_PROCEDURE(),
@ERR_SQL_SEVERIDAD INT = ERROR_SEVERITY(), @ERR_SQL_ESTADO INT =
ERROR_STATE()
EXECUTE ibb.Log_Errores_Inserta N'ERROR actualizando
Tabla2_HASH.',
10, @NombreParalelo, NULL, @nameDB,
@esquema, @procedimiento, @ERR_SQL_NUM_ERROR, @ERR_SQL_NUM_LINEA,
@ERR_SQL_MENSAJE, @ERR_SQL_PROCEDIMIENTO, @ERR_SQL_SEVERIDAD,
@ERR_SQL_ESTADO
SET @error = @ERR_SQL_NUM_ERROR
END CATCH
END
GO
For this critical point there are two possibilities:
- The first is to execute in parallel each of these stored procedures with independent transactions. If all procedures run without errors, all transactions will be committed. If any fail, a rollback will be made. It should be noted that the transaction required to execute 'RCA.RCA.rca.UpdateFinActualizacion' must be a distributed transaction, so MS DTC needs to be enabled on both servers. Experience says that this possibility has a high probability that transactions fail, not stored procedures, but the transactions themselves.
- The second is to create a new stored procedure that calls all of these update HASH stored procedures, along with the stored procedure 'RCA.RCA.rca.UpdateFinUpdate' and put it in a transaction. The transaction is obviously going to promote to a distributed transaction, so you need to have MS DTC enabled on both servers.
Since a demonstration example is being presented here, the code for both solutions will be shown. Based on trial and error tests, you will decide on one or the other.
If the second possibility is chosen, the following stored procedure will have to be created:
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.Actualiza_Hash_Todas_Tablas_Secuencial') IS NOT NULL
DROP PROCEDURE rca.Actualiza_Hash_Todas_Tablas_Secuencial
GO
CREATE PROCEDURE rca.Actualiza_Hash_Todas_Tablas_Secuencial
@NombreParalelo NVARCHAR(256),
@IdParalelo bigint,
@inicioProceso DATETIME2(7),
@error INT OUTPUT
AS BEGIN SET NOCOUNT ON
SET REMOTE_PROC_TRANSACTIONS ON
SET XACT_ABORT ON
DECLARE @inicio DATETIME2(7) = SYSDATETIME(), @nameDB SYSNAME =
DB_NAME(), @esquema SYSNAME = OBJECT_SCHEMA_NAME(@@PROCID),
@procedimiento SYSNAME = OBJECT_NAME(@@PROCID)
DECLARE @Duracion nvarchar(50), @fFin DATETIME2(7)
SET @error = 0
BEGIN TRY
EXEC rca.ActualizaHash_Tabla1 @NombreParalelo, @error
out
IF @error = 0
EXEC rca.ActualizaHash_Tabla2
@NombreParalelo, @error out
--Y así con el resto de tablas
--Por último, actualizar la fecha de finalización del
proceso
IF @error = 0
BEGIN
SELECT @fFin = SYSDATETIME(), @Duracion =
ibb.TimeSpanToNow(@inicioProceso)
EXEC RCA.RCA.rca.UpdateFinActualizacion
@IdParalelo, @Duracion, @fFin, @error out
END
IF @error = 0
EXECUTE ibb.Log_Errores_Inserta N'Se ha
actualizado el HASH de todas las Tablas.',
0, @NombreParalelo, @inicio,
@nameDB, @esquema, @procedimiento
ELSE
EXECUTE ibb.Log_Errores_Inserta N'ERROR
actualizando el HASH de todas las Tablas.',
10, @NombreParalelo, @inicio,
@nameDB, @esquema, @procedimiento
END TRY
BEGIN CATCH
DECLARE @ERR_SQL_NUM_ERROR INT = ERROR_NUMBER(),
@ERR_SQL_NUM_LINEA INT = ERROR_LINE(), @ERR_SQL_MENSAJE NVARCHAR(MAX) =
ERROR_MESSAGE(), @ERR_SQL_PROCEDIMIENTO SYSNAME = ERROR_PROCEDURE(),
@ERR_SQL_SEVERIDAD INT = ERROR_SEVERITY(), @ERR_SQL_ESTADO INT =
ERROR_STATE()
EXECUTE ibb.Log_Errores_Inserta N'ERROR actualizando
el HASH de todas las Tablas.',
10, @NombreParalelo, NULL, @nameDB,
@esquema, @procedimiento, @ERR_SQL_NUM_ERROR, @ERR_SQL_NUM_LINEA,
@ERR_SQL_MENSAJE, @ERR_SQL_PROCEDIMIENTO, @ERR_SQL_SEVERIDAD,
@ERR_SQL_ESTADO
SET @error = @ERR_SQL_NUM_ERROR
END CATCH
END
GO
Parallel Creation and Execution
Finally, the stored procedure that creates the 'Parallel' and executes it is detailed.
- If there are transactions, they should promote to distributed transactions: SET REMOTE_PROC_TRANSACTIONS ON.
- It is desired that, if an error occurs in them, it does not continue the execution (SET XACT_ABORT ON).
- If there are open transactions in the call to this remote procedure, the execution is aborted.
- In order to identify each execution by a different name, this name will be created from a sequence. This value will also be used to identify a new execution of the process in BI (ACT_CD_ACTUALIZACION).
-
The Parallel will be made up of 3 Lots of Works:
-
1º Updating of the OLTP repository tables. No transactions.
Initial jobs executed sequentially:- Create the new record in rca.UPDATES and delete data from past failed runs.
- If there are FKs in the OLTP repository: Delete them.
- Delete the FKs from the BI repository.
-
Update the tables of the OLTP repository in parallel.
Note: the regeneration of indexes and statistics is included in the stored procedures, if not, a stored procedure would have to be made for it and executed as Final jobs.
- If there are FKs in the OLTP repository: Create them
-
2º Transfer of Information from the OLTP repository to the BI repository. No transactions.
Jobs executed in parallel:- Transfer the information from the OLTP repository to BI in parallel.
- Create the FKs in BI.
- Regenerate the indexes in BI.
-
3º Update HAHS_OLD by HASH_NEW and put the end of process indicator. With transactions.
FIRST OPTION: Execute in parallel and with transactions the update of the HASH and the End Date
Jobs executed in parallel:- Update the HASHs of the tables in the OLTP repository.
- Put the end date in the BI repository.
- Three batches of jobs have been created, and after experimentation several times the optimal degrees of parallelism are obtained. The code indicates that for the first batch the degree of parallelism is 6 (6 simultaneous processes), for the second 20 (transmission of information) and for the third 15 (if the FIRST OPTION is chosen)
- It is indicated that the transactions to be created will be of type 'ReadCommitted' and that the maximum time that a transaction will last open will be 5 minutes. This value should be such that it is the total duration of the 3rd lot, since it is the only one that has transactions.
SECOND OPTION: Execute the stored procedure "rca.Update_Hash_All_Tables_Sequential" within a transaction
Execution options for parallel:- Two batches of jobs have been created, and after experimentation several times the optimal degrees of parallelism are obtained. The code indicates that for the first batch the degree of parallelism is 6 (6 simultaneous processes), and for the second, 20 (transmission of information). There is no third batch.
- It is indicated that the transactions to be created will be of type 'ReadCommitted' and that the maximum time that a transaction will last open will be 5 minutes. However, this setting has no effect, since the third batch of jobs has not been created and the rest of the batches have no transactions.
If the parallel was successful:- Open a transaction
- Execute the stored procedure created expressly for the SECOND OPTION.
- If it has been executed successfully, end the transaction, otherwise, undo it.
-
1º Updating of the OLTP repository tables. No transactions.
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.RellenaTablasParalelo') IS NOT NULL
DROP PROCEDURE rca.RellenaTablasParalelo
GO
CREATE PROCEDURE rca.RellenaTablasParalelo
@error INT OUTPUT
AS BEGIN SET NOCOUNT ON
--SET IMPLICIT_TRANSACTIONS OFF
SET REMOTE_PROC_TRANSACTIONS ON
SET XACT_ABORT ON
DECLARE @NumTranAntes INT = @@TRANCOUNT
DECLARE @idParalelo bigint, @NombreParalelo varchar(256) = NULL,
@Parametros NVARCHAR(MAX) = NULL, @Trabajo NVARCHAR(MAX) = NULL
DECLARE @Duracion nvarchar(50), @fFin DATETIME2(7)
--Variables para LOG
DECLARE @inicio DATETIME2(7) = SYSDATETIME(), @nameDB SYSNAME =
DB_NAME(), @esquema SYSNAME = OBJECT_SCHEMA_NAME(@@PROCID),
@procedimiento SYSNAME = OBJECT_NAME(@@PROCID)
SET @error = 0
BEGIN TRY
--Si hay transacciones abiertas, el sistema no
funcionará
IF @NumTranAntes > 0
BEGIN
SET @error = 1
EXECUTE ibb.Log_Errores_Inserta N'El
trasvase de tablas no puede funcionar con una transacción abierta.',
0, @NombreParalelo, @inicio,
@nameDB, @esquema, @procedimiento
RETURN
END
--Con el fin de identificar cada ejecución por un
nombre distinto, se va a crear dicho nombre a partir de una secuencia:
SELECT @idParalelo = NEXT VALUE FOR rca.SEQ_PARALELO,
@NombreParalelo = CONCAT('ParaleloRepositorio', @idParalelo)
--El Paralelo se va a componer de 3 Lotes de
Trabajos:
-- 1º Actualización de las tablas del
repositorio local. Sin transacciones
-- 2º Trasvase de Información desde el
repositorio local al remoto. Sin transacciones
-- 3º Actualización de HAHS_OLD por HASH_NEW y
poner indicador de fin del trabajo. Con transacciones
--Lote de Trabajos 1º:
-- 1º Crear el nuevo registro en
rca.ACTUALIZACIONES y eliminar datos de ejecuciones pasadas fallidas
-- 2º Si hay FKs locales en el respositorio:
Eliminarlas.
-- 3º Eliminar las FKs remotas.
SET @Trabajo = CONCAT(N'EXEC
SGR.rca.LimpiaTablasRemotas N''', TRY_CONVERT(nvarchar, @inicio, 121),
N''', ', @idParalelo, N', @error out')
EXEC ibbclr.Parallel_AddJob @NombreParalelo, 1,
N'Iniciales', N'Limpia Tablas Remotas', 0, @Trabajo
SELECT @Parametros = CONCAT(N'N''', @NombreParalelo,
N''', @error out'), @Trabajo = CONCAT(N'EXEC rca.EliminaFkLocales ',
@Parametros)
EXEC ibbclr.Parallel_AddJob @NombreParalelo, 1,
N'Iniciales', N'Elimina FKs Locales', 0, @Trabajo
EXEC ibbclr.Parallel_AddJob @NombreParalelo, 1,
N'Iniciales', N'Elimina FKs Remotas', 0, N'EXEC SGR.rca.EliminaFkRemotas
@error out'
-- 4º Actualizar las tablas del repositorio
local en paralelo.
-- Nota: en los procedimientos
almacenados va incluido la regeneración de índices y estadísticas, si no,
habría
-- que hacer un
procedimiento almacenado para ello y ejecutarlo como trabajos Finales
SET @Trabajo = CONCAT(N'EXEC rca.RellenaTabla1 ',
@Parametros)
EXEC ibbclr.Parallel_AddJob @NombreParalelo, 1,
N'Paralelo', N'Carga Tabla1', 0, @Trabajo
SET @Trabajo = CONCAT(N'EXEC rca.RellenaTabla2 ',
@Parametros)
EXEC ibbclr.Parallel_AddJob @NombreParalelo, 1,
N'Paralelo', N'Carga Tabla2', 0, @Trabajo
-- 5º Si hay FKs locales en el repositorio:
Crearlas
SET @Trabajo = CONCAT(N'EXEC rca.CreaFkLocales ',
@Parametros)
EXEC ibbclr.Parallel_AddJob @NombreParalelo, 1,
N'Finales', N'Crea FKs Locales', 0, @Trabajo
--Lote de Trabajos 2º:
-- 1º Traspasar la información desde el local
al remoto en paralelo
-- 2º Crear las FKs remotas
-- 3º Regenerar los índices remotos
SELECT @Parametros = CONCAT(@idParalelo, N', N''',
@NombreParalelo, N''', @error out'), @Trabajo = CONCAT(N'EXEC
rca.Traspasa_Tabla1 ', @Parametros)
EXEC ibbclr.Parallel_AddJob @NombreParalelo, 2,
N'Paralelo', N'Traspasa Tabla1', 0, @Trabajo
SELECT @Trabajo = CONCAT(N'EXEC rca.Traspasa_Tabla2
', @Parametros)
EXEC ibbclr.Parallel_AddJob @NombreParalelo, 2,
N'Paralelo', N'Traspasa Tabla2', 0, @Trabajo
EXEC ibbclr.Parallel_AddJob @NombreParalelo, 2,
N'Finales', N'Crea FKs Remotas', 0, N'EXEC SGR.rca.CreaFkRemotas @error
out'
EXEC ibbclr.Parallel_AddJob @NombreParalelo, 2,
N'Finales', N'Regenera Índices Remotos', 0, N'EXEC
SGR.rca.RegeneraIndicesRemotos @error out'
--PRIMERA OPCIÓN: Ejecutar en paralelo y con
transacciones la actualización de los HASH y la Fecha de Finalización
--Lote de trabajos 3º, con transacciones:
-- 1º Actualizar los HASHs de las tablas
-- 2º Poner la fecha de finalización en el
remoto
SELECT @Parametros = CONCAT(N'''', @NombreParalelo,
N''', @error out'), @Trabajo = CONCAT(N'EXEC rca.ActualizaHash_Tabla1 ',
@Parametros)
EXEC ibbclr.Parallel_AddJob @NombreParalelo, 3,
N'Paralelo', N'ActualizaHash Tabla1', 1, @Trabajo
SELECT @Trabajo = CONCAT(N'EXEC
rca.ActualizaHash_Tabla2 ', @Parametros)
EXEC ibbclr.Parallel_AddJob @NombreParalelo, 3,
N'Paralelo', N'ActualizaHash Tabla2', 1, @Trabajo
SELECT @Duracion = ibb.TimeSpanToNow(@inicio),
@Trabajo = CONCAT(N'EXEC SGR.rca.UpdateFinActualizacion ', @idParalelo,
N', ''', @Duracion, N''', ''',
TRY_CONVERT(nvarchar, SYSDATETIME(), 121), N''', @error out')
EXEC ibbclr.Parallel_AddJob @NombreParalelo, 3,
N'Paralelo', N'Actualiza Fecha Fin', 1, @Trabajo
--Opciones para la ejecución de los Trabajos
-- Se han creado tres lotes de trabajos, y
después de experimentar varias veces comprobamos que los siguientes
grados
-- de paralelismo son los óptimos
EXEC ibbclr.Parallel_SetOption_MaxTaskBatch
@NombreParalelo, 1, 6
EXEC ibbclr.Parallel_SetOption_MaxTaskBatch
@NombreParalelo, 2, 20
EXEC ibbclr.Parallel_SetOption_MaxTaskBatch
@NombreParalelo, 3, 15
-- Se indica que las transacciones a crear
serán de tipo 'ReadCommitted' y que el tiempo máximo que durará abierta
una
-- Transacción será de 5 minutos. Este valor
deberá ser tal que sea el tiempo total de duración del 3º lote, ya que
es el
-- único que tiene transacciones
EXEC ibbclr.Parallel_SetOptions @NombreParalelo,
N'ReadCommitted', 5
--Ejecución del Paralelo
EXEC @error = ibbclr.Parallel_Execute @NombreParalelo
----SEGUNDA OPCION: Ejecutar el procedimiento
almacenado rca.Actualiza_Hash_Todas_Tablas_Secuencial dentro de una
transacción
--BEGIN TRAN
--EXEC rca.Actualiza_Hash_Todas_Tablas_Secuencial
@NombreParalelo, @idParalelo, @inicio, @error out
--IF @error = 0
-- COMMIT
--ELSE
-- ROLLBACK
IF @error = 0
EXECUTE ibb.Log_Errores_Inserta N'Se ha
ejecutado el Paralelo.',
0, @NombreParalelo, @inicio,
@nameDB, @esquema, @procedimiento
ELSE
EXECUTE ibb.Log_Errores_Inserta N'ERROR
ejecutando el Paralelo.',
10, @NombreParalelo, @inicio,
@nameDB, @esquema, @procedimiento
END TRY
BEGIN CATCH
DECLARE @ERR_SQL_NUM_ERROR INT = ERROR_NUMBER(),
@ERR_SQL_NUM_LINEA INT = ERROR_LINE(), @ERR_SQL_MENSAJE NVARCHAR(MAX) =
ERROR_MESSAGE(), @ERR_SQL_PROCEDIMIENTO SYSNAME = ERROR_PROCEDURE(),
@ERR_SQL_SEVERIDAD INT = ERROR_SEVERITY(), @ERR_SQL_ESTADO INT =
ERROR_STATE()
EXECUTE ibb.Log_Errores_Inserta N'ERROR ejecutando el
Paralelo.',
10, @NombreParalelo, NULL, @nameDB,
@esquema, @procedimiento, @ERR_SQL_NUM_ERROR, @ERR_SQL_NUM_LINEA,
@ERR_SQL_MENSAJE, @ERR_SQL_PROCEDIMIENTO, @ERR_SQL_SEVERIDAD,
@ERR_SQL_ESTADO
SET @error = @ERR_SQL_NUM_ERROR
END CATCH
END
GO
Once everything is programmed, the execution is quite simple:
DECLARE @error int
EXEC rca.RellenaTablasParalelo @error out
In the table 'ibb.Log_Errores' the LOG messages will appear together with the ERROR messages, if any. The messages are quite long, and contain line breaks, so it is recommended to activate the option to keep CR/LF of SSMS, to be able to copy and paste them where they can be read in full.
It is not within the scope of this chapter, but the algorithm described here can be added functionality for disaster recovery, that is, retrieving the OLTP repository tables from the BI repository tables, and vice versa.