Introduction: Data Synchronization in SQL Express

A data synchronization feature is available only in SQL Server, not in SQL Express. To mimic that feature in SQL Express, we currently use the TableDiff utility. This document proposes a new stored procedure method that speeds up the entire data synchronization process and overcomes some issues found in the TableDiff method.

Data Synchronization using TableDiff utility method:

The TableDiff utility can generate a Transact-SQL script (containing delete/insert/update statements) that fixes discrepancies at the destination server and brings the source and destination tables into convergence. Since the utility compares one table at a time, it must be called in a loop when there are N tables to synchronize. Within the loop, the generated Transact-SQL script is appended to a local file (say CompleteFixSQL.sql). At the end of the loop, we have a complete script file that needs to be executed at the destination server. The sqlcmd utility can run that script file (CompleteFixSQL.sql) against the destination server to bring the source and destination tables into convergence.

However, these utilities have some drawbacks:
• Both TableDiff and sqlcmd are external applications that must be called from client application code. TableDiff must be called N times for N tables, which incurs I/O overhead.
• sqlcmd executes the statements in the CompleteFixSQL script one by one, in sequence, which is time consuming when a large amount of data has to be synchronized at the destination.
• TableDiff has some limitations. It does not generate a fix script for LOB datatypes such as text, ntext and image.

Data Synchronization using Stored Procedure:

A Stored Procedure (SP) can compare the source and destination tables and then synchronize the destination tables with the source table data.
To bring the source and destination tables into convergence:
1. Find the records that need to be deleted from the destination database table.
2. Find the records that need to be inserted into the destination database table.
3. Find the records that need to be updated in the destination database table.

Then execute delete, insert and update statements in the destination database for the records found in steps 1, 2 and 3 respectively.

The advantages of this SP method over the TableDiff method are:
• The stored procedure is already compiled and stored within the destination database. Client application code just needs to call this SP using the connection string.
• Table comparison and synchronization are done in a single statement (one each for delete, insert and update).
• Records are processed (deleted, inserted and updated) in bulk.
• LOB datatypes are supported.

Step 1: Records to be deleted from Destination database table
• Select the records that do not exist in the source database table but exist in the destination database table.
• Then delete them from the destination database table.

    delete DestinationDBTable
    from DestinationDB.dbo.TableName as DestinationDBTable
    where not exists
        (select 1
         from SourceDB.dbo.TableName as SourceDBTable
         where SourceDBTable.PrimaryColumnName1 = DestinationDBTable.PrimaryColumnName1
           and SourceDBTable.PrimaryColumnName2 = DestinationDBTable.PrimaryColumnName2
           ...
           and SourceDBTable.PrimaryColumnNameN = DestinationDBTable.PrimaryColumnNameN
        )
If the table contains an Identity column, we can simply use that column instead of the primary key columns in the join condition of the WHERE clause. This shortens the join condition, especially when the table has a composite primary key as well as an Identity column. It can also improve the performance of the delete statement.

    delete DestinationDBTable
    from DestinationDB.dbo.TableName as DestinationDBTable
    where not exists
        (select 1
         from SourceDB.dbo.TableName as SourceDBTable
         where SourceDBTable.IdentityColumn = DestinationDBTable.IdentityColumn
        )
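The Step 1 template can be tried out on a small, self-contained example. This is only a sketch under stated assumptions: the temporary tables #SourceOrderLine and #DestinationOrderLine and their columns are hypothetical stand-ins for SourceDB.dbo.TableName and DestinationDB.dbo.TableName, and a two-column composite key plays the role of PrimaryColumnName1..N.

```sql
-- Hypothetical tables: temp tables stand in for the source and destination tables.
create table #SourceOrderLine
    (OrderId int not null, LineNumber int not null, Qty int not null,
     primary key (OrderId, LineNumber));
create table #DestinationOrderLine
    (OrderId int not null, LineNumber int not null, Qty int not null,
     primary key (OrderId, LineNumber));

insert into #SourceOrderLine (OrderId, LineNumber, Qty) values (1, 1, 10);
insert into #SourceOrderLine (OrderId, LineNumber, Qty) values (1, 2, 5);

insert into #DestinationOrderLine (OrderId, LineNumber, Qty) values (1, 1, 10);
insert into #DestinationOrderLine (OrderId, LineNumber, Qty) values (1, 2, 5);
-- This row no longer exists in the source, so Step 1 should remove it.
insert into #DestinationOrderLine (OrderId, LineNumber, Qty) values (2, 1, 7);

-- Step 1: delete destination rows whose key has no match in the source.
delete d
from #DestinationOrderLine as d
where not exists
    (select 1
     from #SourceOrderLine as s
     where s.OrderId = d.OrderId
       and s.LineNumber = d.LineNumber);

-- Inspect what is left in the destination.
select OrderId, LineNumber, Qty
from #DestinationOrderLine
order by OrderId, LineNumber;

drop table #SourceOrderLine;
drop table #DestinationOrderLine;
```

Only the key columns take part in the NOT EXISTS test, so rows whose non-key values differ are left for Step 3 to handle.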
Step 2: Records to be inserted into Destination database table
• Select the records that exist in the source database table but do not exist in the destination database table.
• Then insert them into the destination database table.

    insert into DestinationDB.dbo.TableName (ColumnList)
    select SourceDBTable.ColumnList
    from SourceDB.dbo.TableName as SourceDBTable
    where not exists
        (select 1
         from DestinationDB.dbo.TableName as DestinationDBTable
         where DestinationDBTable.PrimaryColumnName1 = SourceDBTable.PrimaryColumnName1
           and DestinationDBTable.PrimaryColumnName2 = SourceDBTable.PrimaryColumnName2
           ...
           and DestinationDBTable.PrimaryColumnNameN = SourceDBTable.PrimaryColumnNameN
        )
Columns with the TimeStamp datatype should be excluded from the column list of the Step 2 insert statement, as we cannot explicitly set values for a TimeStamp column.

As in Step 1, we can use the identity column rather than the primary key columns, as follows:

    insert into DestinationDB.dbo.TableName (ColumnList)
    select SourceDBTable.ColumnList
    from SourceDB.dbo.TableName as SourceDBTable
    where not exists
        (select 1
         from DestinationDB.dbo.TableName as DestinationDBTable
         where DestinationDBTable.IdentityColumn = SourceDBTable.IdentityColumn
        )
If the table has an identity column, the insert statement must be enclosed in "set identity_insert on/off" as follows:

    set identity_insert TableName on
    ... above insert statement
    set identity_insert TableName off
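Step 2 with an identity column can be sketched on a small example. The temporary tables #SourceCustomer and #DestinationCustomer and their columns are hypothetical, chosen only so that the script is self-contained. It assumes, as the article does, that identity values are meant to match between source and destination.

```sql
-- Hypothetical tables with an identity column used as the join key.
create table #SourceCustomer
    (CustomerId int identity(1,1) not null primary key, Name varchar(50) not null);
create table #DestinationCustomer
    (CustomerId int identity(1,1) not null primary key, Name varchar(50) not null);

-- The source gets identity values 1, 2, 3; the destination only has 1 so far.
insert into #SourceCustomer (Name) values ('Ann');
insert into #SourceCustomer (Name) values ('Bob');
insert into #SourceCustomer (Name) values ('Cid');
insert into #DestinationCustomer (Name) values ('Ann');

-- Allow explicit values for the identity column during the insert.
set identity_insert #DestinationCustomer on;

-- Step 2: insert source rows whose identity value is missing in the destination.
-- An explicit column list is required while identity_insert is on.
insert into #DestinationCustomer (CustomerId, Name)
select s.CustomerId, s.Name
from #SourceCustomer as s
where not exists
    (select 1
     from #DestinationCustomer as d
     where d.CustomerId = s.CustomerId);

set identity_insert #DestinationCustomer off;

-- Inspect the destination after the insert.
select CustomerId, Name
from #DestinationCustomer
order by CustomerId;

drop table #SourceCustomer;
drop table #DestinationCustomer;
```

Only one table per session can have identity_insert switched on at a time, which is why the setting is turned off again straight after the insert.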
Step 3: Records to be updated in the Destination database table
• Select the records that differ between the source and destination database tables.
• Then update them in the destination database table with the source database table data.

    update DestinationDBTable
    set ColumnName1 = SourceDBTable.ColumnName1,
        ColumnName2 = SourceDBTable.ColumnName2,
        ...
        ColumnNameN = SourceDBTable.ColumnNameN
    from DestinationDB.dbo.TableName DestinationDBTable,
        ( select max(TableName) as TableName, columnlist
          from ( select 'SourceTableName' as TableName, columnlist
                 from SourceTableName
                 union all
                 select 'DestinationTableName' as TableName, columnlist
                 from DestinationTableName
               ) AliasName
          group by columnlist
          having count(*) = 1
             and max(TableName) = 'SourceTableName'
        ) SourceDBTable
    where SourceDBTable.PrimaryColumnName1 = DestinationDBTable.PrimaryColumnName1
      and SourceDBTable.PrimaryColumnName2 = DestinationDBTable.PrimaryColumnName2
      ...
      and SourceDBTable.PrimaryColumnNameN = DestinationDBTable.PrimaryColumnNameN
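The UNION ALL / GROUP BY comparison behind Step 3 is easier to follow on concrete data. The following is a minimal sketch, not the procedure's generated SQL: the temporary tables #SourceProduct and #DestinationProduct are hypothetical, and an explicit inner join is used here in place of the comma join of the template.

```sql
-- Hypothetical tables with a single-column primary key.
create table #SourceProduct
    (ProductId int not null primary key, Name varchar(50) not null, Price decimal(9,2) not null);
create table #DestinationProduct
    (ProductId int not null primary key, Name varchar(50) not null, Price decimal(9,2) not null);

insert into #SourceProduct (ProductId, Name, Price) values (1, 'Pen', 1.50);
insert into #SourceProduct (ProductId, Name, Price) values (2, 'Ink', 4.00);

-- Product 1 has a stale price in the destination; product 2 is identical.
insert into #DestinationProduct (ProductId, Name, Price) values (1, 'Pen', 1.25);
insert into #DestinationProduct (ProductId, Name, Price) values (2, 'Ink', 4.00);

-- Step 3: a row that appears once in the combined set and comes from the
-- source has no identical twin in the destination, so it drives an update.
update d
set Name  = s.Name,
    Price = s.Price
from #DestinationProduct as d
inner join
    (select max(TableName) as TableName, ProductId, Name, Price
     from (select 'Source' as TableName, ProductId, Name, Price
           from #SourceProduct
           union all
           select 'Destination' as TableName, ProductId, Name, Price
           from #DestinationProduct
          ) as u
     group by ProductId, Name, Price
     having count(*) = 1
        and max(TableName) = 'Source'
    ) as s
    on s.ProductId = d.ProductId;

-- Inspect the destination after the update.
select ProductId, Name, Price
from #DestinationProduct
order by ProductId;

drop table #SourceProduct;
drop table #DestinationProduct;
```

Identical rows are grouped together with a count of 2 and drop out, so only rows whose column values actually differ are touched by the update.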
Columns with the TimeStamp datatype should be excluded from the SET clause of the Step 3 update statement, as we cannot explicitly set values for a TimeStamp column.

Columns with LOB datatypes (text, ntext and image) should be converted to the respective large value datatypes [varchar(max), nvarchar(max) and varbinary(max)] in the queries combined by UNION ALL in that statement. This is because the GROUP BY applied to the UNION ALL result has to compare column values, and text, ntext and image columns cannot be compared or grouped.

As in Steps 1 and 2, we can use the identity column rather than the primary key columns in the WHERE clause of the update statement.

Stored procedure for Data Synchronization:

The Data Synchronization - Beta Version script below contains the following view and stored procedures (SP) that implement the data synchronization:
1. v_DTS_ColumnInformation
2. stp_DTS_GetCommaSeperatedColumnString
3. stp_DTS_GetIdentityOrPrimaryKeyColumnDetails
4. stp_DTS_SetDestinationColumnWithSourceColumnString
5. stp_DTS_DataSynchronization

v_DTS_ColumnInformation: This view supplies column details such as data type, primary key, null constraint, identity property and column size constraint (length for character datatypes, precision and scale for numeric datatypes).
stp_DTS_GetCommaSeperatedColumnString: This SP generates several comma-separated column strings for a given table.
stp_DTS_GetIdentityOrPrimaryKeyColumnDetails: This SP generates various strings for the identity or primary key columns of a given table.
stp_DTS_SetDestinationColumnWithSourceColumnString: This SP generates the SET clause for the update statement described in Step 3.
stp_DTS_DataSynchronization: This is the main SP, used to synchronize the destination tables with the source tables' data.

The parameters of all the above procedures are described in the header of each SP.

Assumption:
• The schemas of the source and destination tables are identical.
• The source and destination data sources are different.
• The destination server has
a linked server to the source server if the two are remotely connected.
• All the SPs and the view listed above are stored and compiled in the destination database.
• All foreign key constraints and triggers (that affect other tables) of the destination tables are disabled before executing stp_DTS_DataSynchronization.
• The main SP stp_DTS_DataSynchronization is executed at the destination database.
• The user calls the main SP stp_DTS_DataSynchronization with valid parameters.

Limitation:
• Columns with the Timestamp datatype are excluded from the data synchronization.

Features of future version:
• The stored procedure will be extended to include parameter validation.
• It will be extended to disable the foreign key constraints and triggers of the destination tables before starting synchronization. After the data synchronization completes (whether it succeeds or fails), all foreign key constraints and triggers will be enabled again to maintain data integrity.
• It will be extended to provide a log of the data synchronization process, so that the user can see how many records were deleted/inserted/updated and which ones. Hint: The OUTPUT clause can be used to achieve this.

Conclusion:

Data synchronization using the stored procedure method is faster than the TableDiff method because it avoids repeated calls to external utilities and processes records in bulk. Genuine feedback from readers will help in reaching a better solution than this proposed method. Together we will meet that goal.

References:
1. The shortest, fastest, and easiest way to compare two tables in SQL Server: UNION (http://weblogs.sqlteam.com/jeffs/archive/2004/11/10/2737.aspx)
2. TableDiff Utility (http://technet.microsoft.com/en-us/library/ms162843.aspx)

Acknowledgment:

My sincere thanks to all the experts who participated and spent their valuable time discussing the technique of table comparison on Jeff's SQL Server Blog (http://weblogs.sqlteam.com/jeffs/archive/2004/11/10/2737.aspx).
• Jeff: For his UNION ALL method to compare tables
• Click: For his NOT EXISTS method to compare tables
• David L.
Penton: For his explanation of the issues found in the NOT EXISTS method
• John: For his powerful code that generates a comma-separated list with a single SELECT

Data Synchronization - Beta Version.sql - Script

/**********************************************************************
' FILE NAME    : v_DTS_ColumnInformation.sql
' VERSION      : Beta version
' CREATED DATE : 05-Jan-2008
' WRITTEN BY   : Ganesan Krishnan
' DESCRIPTION  : This view populates column details
'                This view is used by most of the stp_DTS_xxx stored procedures
' COLUMN       : 1. Table Name
'                2. Column Name
'                3. Data Type
'                4. A flag indicating whether the column is part of the Primary Key or not
'                5. A flag indicating whether the column is nullable or not
'                6. A flag indicating whether the column is an identity column or not
'                7. Length of character data type column
'                8. Precision of numeric data type column
'                9. Scale of numeric data type column
'*********************************************************************
' Modification History
'*********************************************************************/
/*
-- Retrieve column information
select * from v_DTS_ColumnInformation
*/
if exists (select * from dbo.sysobjects where id = object_id(N'[dbo].[v_DTS_ColumnInformation]') )
drop view [dbo].[v_DTS_ColumnInformation]
GO
create view v_DTS_ColumnInformation
as
select
     TableName              = isc.table_name
    ,ColumnName             = isc.column_name
    ,DataType               = isc.data_type
    -- key_column_usage also lists foreign key and unique constraint columns,
    -- so this flag is 1 for any column that takes part in a key constraint
    ,IsPrimaryKey           = case when iskcu.ordinal_position is null then 0 else 1 end
    ,IsNullable             = sc.isnullable
    -- Test the identity bit (0x80) of the status bitmask rather than the whole value
    ,IsIdentity             = case when sc.status & 128 = 128 then 1 else 0 end
    ,CharacterMaximumLength = isc.Character_Maximum_Length
    ,NumericPrecision       = isc.Numeric_Precision
    ,NumericScale           = isc.Numeric_Scale
from information_schema.columns isc
    left outer join information_schema.key_column_usage iskcu
        on  isc.table_name  = iskcu.table_name
        and isc.column_name = iskcu.column_name
    inner join sysobjects so
        on  isc.table_name = so.name
    inner join syscolumns sc
        on  so.id = sc.id
        and isc.column_name =
            sc.name
go
/**********************************************************************
' FILE NAME    : stp_DTS_GetCommaSeperatedColumnString.sql
' VERSION      : Beta version
' CREATED DATE : 05-Jan-2008
' WRITTEN BY   : Ganesan Krishnan
' DESCRIPTION  : This stored procedure generates the comma-separated
'                column strings for a given table
'                This SP is called from stp_DTS_DataSynchronization
' PARAMETERS   : 1. Table Name (Input)
'                2. Column String (Output)
'                3. Column String without TimeStamp datatype column (Output)
'                4. Column String with casting of LOB to Large Value datatype (Output)
'                5. Column String with casting of Large Value to LOB datatype (Output)
'*********************************************************************
' Modification History
'*********************************************************************/
/*
-- Execute stp_DTS_GetCommaSeperatedColumnString procedure
declare @v_ColumnString varchar(max),
        @v_ColumnStringWithoutTimeStampDataType varchar(max),
        @v_ColumnStringWithCastingLOBToLargeValueDataType varchar(max),
        @v_ColumnStringWithCastingLargeValueToLOBDataType varchar(max)
exec dbo.stp_DTS_GetCommaSeperatedColumnString 'TableName',
        @v_ColumnString out,
        @v_ColumnStringWithoutTimeStampDataType out,
        @v_ColumnStringWithCastingLOBToLargeValueDataType out,
        @v_ColumnStringWithCastingLargeValueToLOBDataType out
select   ColumnString = @v_ColumnString
        ,ColumnStringWithoutTimeStampDataType = @v_ColumnStringWithoutTimeStampDataType
        ,ColumnStringWithCastingLOBToLargeValueDataType = @v_ColumnStringWithCastingLOBToLargeValueDataType
        ,ColumnStringWithCastingLargeValueToLOBDataType = @v_ColumnStringWithCastingLargeValueToLOBDataType
*/
if exists (select * from dbo.sysobjects where id = object_id(N'[dbo].stp_DTS_GetCommaSeperatedColumnString') and OBJECTPROPERTY(id, N'IsProcedure') = 1)
drop procedure [dbo].[stp_DTS_GetCommaSeperatedColumnString]
GO
create procedure stp_DTS_GetCommaSeperatedColumnString
    (@p_TableName varchar(254)
    ,@p_ColumnString varchar(max) out
    ,@p_ColumnStringWithoutTimeStampDataType
        varchar(max) out
    ,@p_ColumnStringWithCastingLOBToLargeValueDataType varchar(max) out
    ,@p_ColumnStringWithCastingLargeValueToLOBDataType varchar(max) out
    )
as
begin
    set nocount on

    select  @p_ColumnString = ''
           ,@p_ColumnStringWithCastingLOBToLargeValueDataType = ''
           ,@p_ColumnStringWithCastingLargeValueToLOBDataType = ''
           ,@p_ColumnStringWithoutTimeStampDataType = ''

    select
        @p_ColumnString = @p_ColumnString
            + case when len(@p_ColumnString)>0 then ', ' else '' end
            + '[' + ColumnName + ']'
       ,@p_ColumnStringWithoutTimeStampDataType = @p_ColumnStringWithoutTimeStampDataType
            + case when len(@p_ColumnStringWithoutTimeStampDataType)>0 and DataType != 'timestamp' then ', ' else '' end
            + case when DataType = 'timestamp' then '' else '[' + ColumnName + ']' end
       ,@p_ColumnStringWithCastingLOBToLargeValueDataType = @p_ColumnStringWithCastingLOBToLargeValueDataType
            + case when len(@p_ColumnStringWithCastingLOBToLargeValueDataType)>0 then ', ' else '' end
            + case
                when DataType = 'image' then 'convert(varbinary(max),[' + ColumnName + ']) as ' + '[' + ColumnName + ']'
                when DataType = 'text' then 'convert(varchar(max),[' + ColumnName + ']) as ' + '[' + ColumnName + ']'
                when DataType in ('ntext','xml') then 'convert(nvarchar(max),[' + ColumnName + ']) as ' + '[' + ColumnName + ']'
                else '[' + ColumnName + ']'
              end
       ,@p_ColumnStringWithCastingLargeValueToLOBDataType = @p_ColumnStringWithCastingLargeValueToLOBDataType
            + case when len(@p_ColumnStringWithCastingLargeValueToLOBDataType)>0 then ', ' else '' end
            + case
                when DataType = 'image' then 'convert(image,[' + ColumnName + ']) as ' + '[' + ColumnName + ']'
                when DataType = 'text' then 'convert(text,[' + ColumnName + ']) as ' + '[' + ColumnName + ']'
                when DataType = 'ntext' then 'convert(ntext,[' + ColumnName + ']) as ' + '[' + ColumnName + ']'
                when DataType = 'xml' then 'convert(xml,[' + ColumnName + ']) as ' + '[' + ColumnName + ']'
                else '[' + ColumnName + ']'
              end
    from v_DTS_ColumnInformation
    where TableName =
          @p_TableName
end
go
/**********************************************************************
' FILE NAME    : stp_DTS_GetIdentityOrPrimaryKeyColumnDetails.sql
' VERSION      : Beta version
' CREATED DATE : 05-Jan-2008
' WRITTEN BY   : Ganesan Krishnan
' DESCRIPTION  : This stored procedure generates the various strings
'                for the Identity or Primary Key columns of a given table
'                This SP is called from stp_DTS_DataSynchronization
' PARAMETERS   : 1. Table Name (Input)
'                2. Equi Join String (Output)
'                3. Column Structure String for the temporary table (Output)
'                4. Comma-separated column string for DELETED table (Output)
'                5. Comma-separated column string for INSERTED table (Output)
'                6. Comma-separated column string for temporary table that holds detailed log (Output)
'                7. Comma-separated column string of Identity or Primary Key column (Output)
'                8. A flag indicating whether the table contains an identity column or not (Output)
'*********************************************************************
' Modification History
'*********************************************************************/
/*
-- Execute stp_DTS_GetIdentityOrPrimaryKeyColumnDetails procedure
declare @p_EquiJoinString varchar(max),
        @p_StructureString varchar(max),
        @p_DeletedString varchar(max),
        @p_InsertedString varchar(max),
        @p_TmpTableColumnListString varchar(max),
        @p_IdentityOrPrimaryKeyColumnString varchar(max),
        @p_IsIdentity bit
exec stp_DTS_GetIdentityOrPrimaryKeyColumnDetails 'TableName',
        @p_EquiJoinString OUT,
        @p_StructureString OUT,
        @p_DeletedString OUT,
        @p_InsertedString OUT,
        @p_TmpTableColumnListString OUT,
        @p_IdentityOrPrimaryKeyColumnString out,
        @p_IsIdentity OUT
select   EquiJoinString = @p_EquiJoinString
        ,StructureString = @p_StructureString
        ,DeletedString = @p_DeletedString
        ,InsertedString = @p_InsertedString
        ,TmpTableColumnListString = @p_TmpTableColumnListString
        ,IdentityOrPrimaryKeyColumnString = @p_IdentityOrPrimaryKeyColumnString
        ,IsIdentity = @p_IsIdentity
*/
if exists (select * from dbo.sysobjects where id =
    object_id(N'[dbo].[stp_DTS_GetIdentityOrPrimaryKeyColumnDetails]') and OBJECTPROPERTY(id, N'IsProcedure') = 1)
drop procedure [dbo].[stp_DTS_GetIdentityOrPrimaryKeyColumnDetails]
GO
create procedure [dbo].[stp_DTS_GetIdentityOrPrimaryKeyColumnDetails]
    (@p_TableName varchar(254)
    ,@p_EquiJoinString varchar(max) out
    ,@p_StructureString varchar(max) out
    ,@p_DeletedString varchar(max) out
    ,@p_InsertedString varchar(max) out
    ,@p_TmpTableColumnListString varchar(max) out
    ,@p_IdentityOrPrimaryKeyColumnString varchar(max) out
    ,@p_IsIdentity bit out
    )
as
begin
    set nocount on

    select  @p_EquiJoinString = ''
           ,@p_StructureString = ''
           ,@p_DeletedString = ''
           ,@p_InsertedString = ''
           ,@p_TmpTableColumnListString = ''
           ,@p_IdentityOrPrimaryKeyColumnString = ''
           ,@p_IsIdentity = 1

    -- Construct the EquiJoinString using identity column if the table contains Identity column
    select
        @p_EquiJoinString = @p_EquiJoinString
            + case when len(@p_EquiJoinString)>0 then ' and ' else '' end
            + @p_TableName + '.[' + ColumnName + '] = SrcDBTable.[' + ColumnName + ']'
       ,@p_StructureString = @p_StructureString
            + case when len(@p_StructureString)>0 then ', ' else '' end
            + '[' + ColumnName + ']' + ' '
            + case DataType
                when 'char'      then DataType + '(' + convert(varchar, CharacterMaximumLength) + ')'
                when 'nchar'     then DataType + '(' + convert(varchar, CharacterMaximumLength) + ')'
                when 'varchar'   then DataType + '(' + convert(varchar, CharacterMaximumLength) + ')'
                when 'nvarchar'  then DataType + '(' + convert(varchar, CharacterMaximumLength) + ')'
                when 'decimal'   then DataType + '(' + convert(varchar, NumericPrecision) + ',' + convert(varchar, NumericScale) + ')'
                when 'numeric'   then DataType + '(' + convert(varchar, NumericPrecision) + ',' + convert(varchar, NumericScale) + ')'
                when 'float'     then DataType + '(' + convert(varchar, NumericPrecision) + ')'
                when 'binary'    then DataType + '(' + convert(varchar, CharacterMaximumLength) + ')'
                when 'varbinary' then DataType + '(' + convert(varchar, CharacterMaximumLength) + ')'
                else DataType
              end
       ,@p_DeletedString = @p_DeletedString
            + case when len(@p_DeletedString)>0 then ', ' else '' end
            + 'deleted.[' + ColumnName + ']'
       ,@p_InsertedString = @p_InsertedString
            + case when len(@p_InsertedString)>0 then ', ' else '' end
            + 'inserted.[' + ColumnName + ']'
       ,@p_TmpTableColumnListString = @p_TmpTableColumnListString
            + case when len(@p_TmpTableColumnListString)>0 then '+'',''+' else '' end
            + 'convert(varchar(max),[' + ColumnName + '])'
       ,@p_IdentityOrPrimaryKeyColumnString = @p_IdentityOrPrimaryKeyColumnString
            + case when len(@p_IdentityOrPrimaryKeyColumnString)>0 then ', ' else '' end
            + '[' + ColumnName + ']'
    from v_DTS_ColumnInformation
    where TableName = @p_TableName
      and IsIdentity = 1 -- Include only Identity column

    -- Construct the EquiJoinString using primary key columns if the table does not contain an Identity column
    if @p_EquiJoinString = ''
    begin
        set @p_IsIdentity = 0

        select
            @p_EquiJoinString = @p_EquiJoinString
                + case when len(@p_EquiJoinString)>0 then ' and ' else '' end
                + @p_TableName + '.[' + ColumnName + '] = SrcDBTable.[' + ColumnName + ']'
           ,@p_StructureString = @p_StructureString
                + case when len(@p_StructureString)>0 then ', ' else '' end
                + '[' + ColumnName + ']' + ' '
                + case DataType
                    when 'char'      then DataType + '(' + convert(varchar, CharacterMaximumLength) + ')'
                    when 'nchar'     then DataType + '(' + convert(varchar, CharacterMaximumLength) + ')'
                    when 'varchar'   then DataType + '(' + convert(varchar, CharacterMaximumLength) + ')'
                    when 'nvarchar'  then DataType + '(' + convert(varchar, CharacterMaximumLength) + ')'
                    when 'decimal'   then DataType + '(' + convert(varchar, NumericPrecision) + ',' + convert(varchar, NumericScale) + ')'
                    when 'numeric'   then DataType + '(' + convert(varchar, NumericPrecision) + ',' + convert(varchar, NumericScale) + ')'
                    when 'float'     then DataType + '(' + convert(varchar, NumericPrecision) + ')'
                    when 'binary'    then DataType + '(' + convert(varchar, CharacterMaximumLength) + ')'
                    when 'varbinary' then DataType + '(' + convert(varchar, CharacterMaximumLength)
                                          + ')'
                    else DataType
                  end
           ,@p_DeletedString = @p_DeletedString
                + case when len(@p_DeletedString)>0 then ', ' else '' end
                + 'deleted.[' + ColumnName + ']'
           ,@p_InsertedString = @p_InsertedString
                + case when len(@p_InsertedString)>0 then ', ' else '' end
                + 'inserted.[' + ColumnName + ']'
           ,@p_TmpTableColumnListString = @p_TmpTableColumnListString
                + case when len(@p_TmpTableColumnListString)>0 then '+'',''+' else '' end
                + 'convert(varchar(max),[' + ColumnName + '])'
           ,@p_IdentityOrPrimaryKeyColumnString = @p_IdentityOrPrimaryKeyColumnString
                + case when len(@p_IdentityOrPrimaryKeyColumnString)>0 then ', ' else '' end
                + '[' + ColumnName + ']'
        from v_DTS_ColumnInformation
        where TableName = @p_TableName
          and IsPrimaryKey = 1 -- Include only primary key column
    end
end
go
/**********************************************************************
' FILE NAME    : stp_DTS_SetDestinationColumnWithSourceColumnString.sql
' VERSION      : Beta version
' CREATED DATE : 05-Jan-2008
' WRITTEN BY   : Ganesan Krishnan
' DESCRIPTION  : This stored procedure generates the set string
'                of an update statement for a given table
'                This excludes TimeStamp datatype columns
'                This SP is called from stp_DTS_DataSynchronization
' PARAMETERS   : 1. Table Name (Input)
'                2.
Set String (Output)
'*********************************************************************
' Modification History
'*********************************************************************/
/*
-- Execute stp_DTS_SetDestinationColumnWithSourceColumnString procedure
declare @v_SetString varchar(max)
exec stp_DTS_SetDestinationColumnWithSourceColumnString 'TableName', @v_SetString out
select @v_SetString
*/
if exists (select * from dbo.sysobjects where id = object_id(N'[dbo].[stp_DTS_SetDestinationColumnWithSourceColumnString]') and OBJECTPROPERTY(id, N'IsProcedure') = 1)
drop procedure [dbo].[stp_DTS_SetDestinationColumnWithSourceColumnString]
GO
create procedure stp_DTS_SetDestinationColumnWithSourceColumnString
    (@p_TableName varchar(254)
    ,@p_SetString varchar(max) out
    )
as
begin
    set nocount on

    set @p_SetString = ''

    select @p_SetString = @p_SetString
            + case when len(@p_SetString)>0 then ', ' else '' end
            + '[' + ColumnName + '] = SrcDBTable.[' + ColumnName + ']'
    from v_DTS_ColumnInformation
    where TableName = @p_TableName
      and IsPrimaryKey = 0         -- Include only non key column
      and IsIdentity = 0           -- Include only non identity column
      and DataType != 'timestamp'  -- Exclude TimeStamp data type column
end
go
/**********************************************************************
' FILE NAME    : stp_DTS_DataSynchronization.sql
' VERSION      : Beta version
' CREATED DATE : 05-Jan-2008
' WRITTEN BY   : Ganesan Krishnan
' DESCRIPTION  : This stored procedure synchronizes the destination tables'
'                data with the source tables' data
'                This SP should be executed at the Destination database
' PARAMETERS   : 1. Linked Server Name of the Source Database (Input)
'                2. Source Database Name (Input)
'                3. Comma-separated List of Tables to be synchronized (Input)
'                4.
Program Mode (Input)
'                   -1 -> Table Compare only
'                    0 -> Table Compare & Data Synchronization
'                    1 -> Data Synchronization Only
'*********************************************************************
' Modification History
'*********************************************************************/
/*
-- Execute Data Synchronization procedure at destination database to sync Destination table with Source table data
-- Table Compare Only
-- exec [DestinationServer\Instance].DestinationDB.dbo.stp_DTS_DataSynchronization 'SourceServer\Instance','SourceDBName','TableName1,TableName2,TableName3',-1
-- Table Compare & Data Synchronization
-- exec [DestinationServer\Instance].DestinationDB.dbo.stp_DTS_DataSynchronization 'SourceServer\Instance','SourceDBName','TableName1,TableName2,TableName3',0
-- Data Synchronization Only
-- exec [DestinationServer\Instance].DestinationDB.dbo.stp_DTS_DataSynchronization 'SourceServer\Instance','SourceDBName','TableName1,TableName2,TableName3',1
*/
if exists (select * from dbo.sysobjects where id = object_id(N'[dbo].[stp_DTS_DataSynchronization]') and OBJECTPROPERTY(id, N'IsProcedure') = 1)
drop procedure [dbo].[stp_DTS_DataSynchronization]
GO
create procedure stp_DTS_DataSynchronization
    ( @p_LinkedServerNameofSourceDatabase varchar(254)
    ,@p_SourceDatabaseName varchar(254)
    ,@p_TablesToBeSynchronized varchar(max)
    ,@p_ProgramMode int = 1
    )
as
begin
    set nocount on

    declare @v_TableName varchar(254)
           ,@i_CommaPosition int
           ,@i_StringLength int
           ,@i_StartPosition int
           ,@i_EndPosition int
           ,@v_sql nvarchar(max)
           ,@v_ColumnList varchar(max)
           ,@v_ColumnlistWithoutTimeStampDataType varchar(max)
           ,@v_ColumnStringWithCastingLOBToLargeValueDataType varchar(max)
           ,@v_ColumnStringWithCastingLargeValueToLOBDataType varchar(max)
           ,@v_EquiJoinString varchar(max)
           ,@v_StructureString varchar(max)
           ,@v_DeletedString varchar(max)
           ,@v_InsertedString varchar(max)
           ,@v_TmpTableColumnListString varchar(max)
           ,@v_IdentityOrPrimaryKeyColumnString varchar(max)
           ,@v_SetDestinationColumnWithSourceColumnString
                varchar(max)
           ,@b_IsTableHavingIdentityColumn bit
           ,@i_errorcode int
           ,@v_ErrorMessage varchar(400)
           ,@i_return int
           ,@v_newline varchar(1)
           ,@v_DetailedLog varchar(max)
           ,@i_DeletedRowCount int
           ,@i_InsertedRowCount int
           ,@i_UpdatedRowCount int
           ,@i_TotalDeletedRowCount int
           ,@i_TotalInsertedRowCount int
           ,@i_TotalUpdatedRowCount int
           ,@i_TotalRowsSynchronized int
           ,@v_Line varchar(80)
           ,@v_Line1 varchar(80)
           ,@i_LogColumnWidth int
           ,@d_StartDate datetime
           ,@d_EndDate datetime
           ,@v_FullyQualifiedSourceTableName varchar(254)
           ,@v_FullyQualifiedDestinationTableName varchar(254)
           ,@v_LinkedServerNameofDestinationDatabase varchar(254)
           ,@v_DestinationDatabaseName varchar(254)
           ,@v_FullyQualifiedSourceDatabaseName varchar(254)
           ,@v_FullyQualifiedDestinationDatabaseName varchar(254)
           ,@b_SynchronizationTransactionStarted bit

    -- Initialize local variables
    select  @i_errorcode = 0
           ,@i_return = 0
           ,@v_newline = char(10)
           ,@i_TotalDeletedRowCount = 0
           ,@i_TotalInsertedRowCount = 0
           ,@i_TotalUpdatedRowCount = 0
           ,@i_TotalRowsSynchronized = 0
           ,@v_Line = replicate('*',80)
           ,@v_Line1 = replicate('-',32)
           ,@i_LogColumnWidth = 30
           ,@d_StartDate = getdate()
           ,@v_LinkedServerNameofDestinationDatabase = @@servername
           ,@v_DestinationDatabaseName = db_name()
           ,@b_SynchronizationTransactionStarted = 0

    -- Derive the fully qualified name of the Database
    select  @v_FullyQualifiedSourceDatabaseName = '[' + @p_LinkedServerNameofSourceDatabase + '].[' + @p_SourceDatabaseName + ']' + '.dbo.'
           ,@v_FullyQualifiedDestinationDatabaseName = '[' + @v_LinkedServerNameofDestinationDatabase + '].[' + @v_DestinationDatabaseName + ']' + '.dbo.'
    -- Start the synchronization transaction only if the program mode is
    -- "Table Compare & Data Synchronization" or "Data Synchronization only"
    if @p_ProgramMode >= 0
    begin
        -- Start Synchronization transaction
        begin transaction Synchronization
        set @b_SynchronizationTransactionStarted = 1
    end

    -- Loop through all the table names in the 'Comma-separated List of Tables to be synchronized' input parameter
    while 1=1
    begin
        select  @i_CommaPosition = patindex('%,%',@p_TablesToBeSynchronized)
               ,@i_StringLength = len(@p_TablesToBeSynchronized)
               ,@i_StartPosition = 1

        select @i_EndPosition = case when @i_CommaPosition = 0 then @i_StringLength else @i_CommaPosition - 1 end

        -- Derive the Table Name to be synchronized
        select @v_TableName = substring(@p_TablesToBeSynchronized,@i_StartPosition,@i_EndPosition)

        if @v_TableName is not null
        begin
            -- Derive the column list
            exec stp_DTS_GetCommaSeperatedColumnString @v_TableName
                ,@v_ColumnList out
                ,@v_ColumnlistWithoutTimeStampDataType out
                ,@v_ColumnStringWithCastingLOBToLargeValueDataType out
                ,@v_ColumnStringWithCastingLargeValueToLOBDataType out

            -- Perform Table Compare
            if @p_ProgramMode <= 0
            begin
                select  @v_FullyQualifiedSourceTableName = @v_FullyQualifiedSourceDatabaseName + @v_TableName
                       ,@v_FullyQualifiedDestinationTableName = @v_FullyQualifiedDestinationDatabaseName + @v_TableName

                -- Tag every row with the table it came from and combine both tables
                set @v_sql = 'select ''' + @v_FullyQualifiedSourceTableName + ''' as TableName, ' + @v_ColumnStringWithCastingLOBToLargeValueDataType
                    + ' from ' + @v_FullyQualifiedSourceTableName
                    + ' union all select ''' + @v_FullyQualifiedDestinationTableName + ''' as TableName, ' + @v_ColumnStringWithCastingLOBToLargeValueDataType
                    + ' from ' + @v_FullyQualifiedDestinationTableName

                -- Rows that appear only once in the combined set differ between the two tables
                set @v_sql = 'select max(TableName) as [TableName : ' + @v_TableName + '], ' + @v_ColumnStringWithCastingLargeValueToLOBDataType
                    + ' from (' + @v_sql + ') a group by ' + @v_ColumnList
                    + ' having count(*) = 1'

                exec sp_executesql @v_sql

                -- If an error is encountered, go to the error handler to roll back the
                -- transaction
                select @i_errorcode = @@error
                if @i_errorcode != 0 goto ErrorHandler

                -- Jump to next iteration if the program mode is Table compare only
                if @p_ProgramMode < 0 goto NextIteration
            end

            -- Populate the Identity Or Primary Key Column Details
            exec stp_DTS_GetIdentityOrPrimaryKeyColumnDetails @v_TableName
                ,@v_EquiJoinString OUT
                ,@v_StructureString OUT
                ,@v_DeletedString OUT
                ,@v_InsertedString OUT
                ,@v_TmpTableColumnListString OUT
                ,@v_IdentityOrPrimaryKeyColumnString out
                ,@b_IsTableHavingIdentityColumn out

            -- Step 1. Records to be deleted from Destination database table
            -- Select the records that do not exist in the Source database table, but exist in the Destination database table
            -- Then delete them from the Destination database table

            -- Start constructing dynamic sql required for Step 1.
            set @v_sql = 'delete from ' + @v_TableName
                + ' where not exists'
                + ' (select 1 from ' + @v_FullyQualifiedSourceDatabaseName + @v_TableName + ' as SrcDBTable'
                + ' where ' + @v_EquiJoinString + ' )'

            -- Execute the constructed dynamic sql for Step 1
            exec sp_executesql @v_sql

            -- If an error is encountered, go to the error handler to roll back the transaction
            select @i_errorcode = @@error
            if @i_errorcode != 0 goto ErrorHandler

            -- Step 2. Records to be inserted into Destination database table
            -- Select the records that exist in the Source database table, but do not exist in the Destination database table
            -- Then insert them into the Destination database table

            -- Start constructing dynamic sql required for Step 2.
-- 'set identity_insert on' before the insert statement if the table has an Identity column if @b_IsTableHavingIdentityColumn = 1 set @v_sql = 'set identity_insert' + ' ' + @v_TableName + ' ' + + 'on' else set @v_sql = '' set @v_sql = @v_sql + ' ' + 'insert into' + ' ' + @v_TableName + ' ' + '(' + @v_ColumnlistWithoutTimeStampDataType + ')' + ' ' + 'select' + ' ' + @v_ColumnlistWithoutTimeStampDataType + ' ' + 'from' + ' ' + @v_FullyQualifiedSourceDatabaseName + @v_TableName + ' ' + 'as SrcDBTable' + ' ' + 'where not exists ' + ' ' + '(select 1 from' + ' ' + @v_TableName + ' ' + 'where' + ' ' + @v_EquiJoinString + ' ' + ')' -- ''set identity_insert off' after the insert statement if the table has an Identity column if @b_IsTableHavingIdentityColumn = 1 set @v_sql = @v_sql + ' ' + 'set identity_insert' + ' ' + @v_TableName + ' ' + + 'off' -- Execute the constructed dynamic sql for Step 2 exec sp_executesql @v_sql -- If an error encountered then go to error handler for rollback the transaction select @i_errorcode = @@error if @i_errorcode != 0 goto ErrorHandler -- Step 3. Records to be updated in the Destination database table -- Select the records that are differ from Source & Destination database table -- Then update them in the Destination database table with the source database table data -- Populate the set string exec stp_DTS_SetDestinationColumnWithSourceColumnString @v_TableName ,@v_SetDestinationColumnWithSourceColumnString out -- if the table does not have a non key column then no need to do this step 3. if isnull(@v_SetDestinationColumnWithSourceColumnString,'') = '' begin set @i_UpdatedRowCount = 0 goto NextIteration end -- Start constructing dynamic sql required for Step 3. 
set @v_sql = 'update' + ' ' + @v_TableName + ' ' + 'set' + ' ' + @v_SetDestinationColumnWithSourceColumnString + ' ' + 'from' + ' ' + @v_TableName + ' ' +',' + ' ' + '(select' + ' ' + 'max(TableName) as TableName,' + ' ' + @v_ColumnStringWithCastingLargeValueToLOBDataType + ' ' + 'from' + ' ' + '(' + ' ' + 'select' + ' ' + '''SourceDBTableName'' as TableName,' + ' ' + @v_ColumnStringWithCastingLOBToLargeValueDataType + ' ' + 'from' + ' ' + @v_FullyQualifiedSourceDatabaseName + @v_TableName + ' ' + 'union all' + ' ' + 'select' + ' ' + '''DestinationiTableName'' as TableName,' + ' ' + @v_ColumnStringWithCastingLOBToLargeValueDataType + ' ' + 'from' + ' ' + @v_TableName + ' ' + ') tmp' + ' ' + 'group by' + ' ' + @v_ColumnList + ' ' + 'having' + ' ' + 'count(*) = 1 and max(TableName) = ''SourceDBTableName''' + ' ' + ')' + ' ' + 'as SrcDBTable' + ' ' + 'where' + ' ' + @v_EquiJoinString -- Execute the constructed dynamic sql for Step 3 exec sp_executesql @v_sql -- If an error encountered then go to error handler for rollback the transaction select @i_errorcode = @@error if @i_errorcode != 0 goto ErrorHandler end -- Derive the next iteration values -- If no further iternation is required then exit the loop NextIteration: if @i_CommaPosition = 0 begin -- Break the loop if no more tables exist to synchronize break end else begin -- Derive the remaining Tables to be synchronized select @p_TablesToBeSynchronized = substring(@p_TablesToBeSynchronized, @i_CommaPosition + 1, @i_StringLength) end end -- If the program mode is "Table Compare & Data Synchronization" or "Data Synchronization only" -- then commit the synchronization transaction and log the remaining if @p_ProgramMode >= 0 begin -- Commit the synchronization transaction as no error occured commit transaction Synchronization end -- Return success return 0 -- Error Handler ErrorHandler: begin -- check for error if ( @i_errorcode <= 0 ) begin raiserror 99999 @v_ErrorMessage select @i_return = -100 end else if ( 
@i_errorcode > 0 ) begin select @i_return = -100 end -- Rollback the synchronization transaction if it is started and an error occured if @b_SynchronizationTransactionStarted = 1 rollback transaction Synchronization -- Return Failure return @i_return end endgo
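To make the three dynamic statements easier to follow, the sketch below writes out roughly what they could expand to for a single table. It is only an illustration under stated assumptions: the temporary tables #SrcItem and #DstItem and the columns ItemId and ItemName are hypothetical stand-ins for a source table and a destination table with a one-column primary key. It does not cover identity columns, timestamp columns or the LOB casting that the procedure handles through its helper procedures.

```sql
-- Hypothetical stand-ins for SourceDB.dbo.Item and DestinationDB.dbo.Item
create table #SrcItem (ItemId int primary key, ItemName varchar(50))
create table #DstItem (ItemId int primary key, ItemName varchar(50))

insert into #SrcItem (ItemId, ItemName) values (1, 'bolt')
insert into #SrcItem (ItemId, ItemName) values (2, 'nut')
insert into #SrcItem (ItemId, ItemName) values (3, 'washer')

insert into #DstItem (ItemId, ItemName) values (1, 'bolt')   -- identical row
insert into #DstItem (ItemId, ItemName) values (2, 'screw')  -- differs from source
insert into #DstItem (ItemId, ItemName) values (4, 'rivet')  -- missing in source

-- Step 1: delete destination rows whose key does not exist in the source
delete from #DstItem
where not exists
      (select 1 from #SrcItem as SrcDBTable
       where SrcDBTable.ItemId = #DstItem.ItemId)

-- Step 2: insert source rows whose key does not exist in the destination
insert into #DstItem (ItemId, ItemName)
select ItemId, ItemName
from #SrcItem as SrcDBTable
where not exists
      (select 1 from #DstItem
       where SrcDBTable.ItemId = #DstItem.ItemId)

-- Step 3: a row seen only once across both tables is a difference;
-- keep the source side of each difference and copy it to the destination
update #DstItem
set ItemName = SrcDBTable.ItemName
from #DstItem,
     (select max(TableName) as TableName, ItemId, ItemName
      from (select 'SourceDBTableName' as TableName, ItemId, ItemName from #SrcItem
            union all
            select 'DestinationDBTableName' as TableName, ItemId, ItemName from #DstItem
           ) tmp
      group by ItemId, ItemName
      having count(*) = 1 and max(TableName) = 'SourceDBTableName'
     ) as SrcDBTable
where SrcDBTable.ItemId = #DstItem.ItemId

-- Destination now holds (1, bolt), (2, nut), (3, washer), matching the source
select ItemId, ItemName from #DstItem order by ItemId

drop table #SrcItem
drop table #DstItem
```

Because Steps 1 and 2 run first, the only rows that can still appear once in the Step 3 union are rows whose key exists on both sides but whose non-key columns differ, which is why the update can safely join back on the key alone.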