Managing Reader and Writer Events in a Script

Each Syniti DR replication is executed by two different tasks:

  • a Reader task, which reads data from the source table during a refresh, or reads the source transaction log during mirroring/synchronization

  • a Writer task which updates the data in the target table

Reader and Writer tasks are executed by a pool of threads, which assigns each task a slice of execution time according to the replication’s thread priority setting.

A Reader task reads data from a source table or transaction log and generates an internal Syniti DR record describing the data changes for a source table record. The Reader applies the mapping defined for the replication to this source record and builds the target record, which is then inserted into a FIFO queue to be processed by the Writer task. By default, the FIFO queue holds 1000 records. To change the size, modify the Record Pool Size setting in the Preferences tab of the Replication Properties dialog.

The Writer task extracts a record from the queue and translates it into an INSERT, UPDATE or DELETE SQL operation on the target table.

A record is exposed to a script function through the IRecord interface.

Once the queue, or pool, has reached its maximum size (configurable in the Replication Properties dialog), the Reader task waits until the Writer task has processed some records. When the queue is empty, the Writer task waits until the Reader task has added more records.
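
The blocking behavior described above is the classic bounded producer/consumer pattern. The following is a minimal, standalone sketch of that pattern in plain .NET. It is not Syniti DR's internal implementation: BoundedQueueDemo, its Run method and the integer "records" are hypothetical names used only for illustration, and a BlockingCollection with a bounded capacity stands in for the record pool.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Illustrative only: models a Reader and a Writer sharing a bounded FIFO queue.
// None of these names belong to the Syniti DR API.
public static class BoundedQueueDemo
{
    // Pushes recordCount integers through a queue of the given capacity and
    // returns how many of them the "Writer" side consumed.
    public static int Run(int recordCount, int capacity)
    {
        // BlockingCollection uses a FIFO ConcurrentQueue by default.
        using (var queue = new BlockingCollection<int>(capacity))
        {
            int written = 0;

            // "Reader": Add blocks while the queue already holds 'capacity' items.
            Task reader = Task.Run(() =>
            {
                for (int i = 0; i < recordCount; i++)
                {
                    queue.Add(i);
                }
                queue.CompleteAdding(); // no more records will arrive
            });

            // "Writer": the enumeration blocks while the queue is empty and
            // ends once the queue is marked complete and fully drained.
            Task writer = Task.Run(() =>
            {
                foreach (int record in queue.GetConsumingEnumerable())
                {
                    written++;
                }
            });

            Task.WaitAll(reader, writer);
            return written;
        }
    }
}
```

Under these assumptions, calling BoundedQueueDemo.Run(5000, 1000) passes 5000 items through a queue that never holds more than 1000 at once: the producing side pauses whenever the queue is full, and the consuming side pauses whenever it is empty.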

All the replications in a group share a single Reader task and a single Writer task. This means that a Reader/Writer task has to process data for all the source/target tables in the group.

Since Reader and Writer tasks are executed by different threads, take particular care when writing scripts in which some events are called by the Reader and others by the Writer.

Reader events in the order they are called:

1. Refresh_onPrepareRefresh (Refresh replications only) or LogReader_onPrepareMirroring (Mirroring/Synchronization only)
2. Record_onBeforeMapping
3. Record_onAfterMapping
4. Replication_onConflict (Synchronization replications only)
5. Replication_onLateConflict (Synchronization replications only)

Writer events in the order they are called:

1.  Refresh_onBeforeRefresh (Refresh replications only)
2.  Refresh_onBeforeTruncate (Refresh replications only)
3.  Refresh_onAfterTruncate (Refresh replications only)
4.  LogReader_onBeforeMirroring (Mirroring/Synchronization replications only)
5.  Record_onBeforeExecute
6.  Record_onExecuteError (Defined in the global script)
7.  Record_onAfterExecute (Not called if record execution)
8.  LogReader_onReceiverChanged (Mirroring/Synchronization replications only)
9.  Refresh_onAfterRefresh (Refresh replications only)
10. LogReader_onAfterMirroring (Mirroring/Synchronization replications only)
11. Replication_onCriticalError
12. Replication_onError (Defined in the global script)

If objects defined in the replication script need to be accessed or modified from both Reader events and Writer events, access to those objects must be protected by a lock. There is no need to protect access to replication objects that are used only in Reader events or only in Writer events.
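
As a standalone illustration of why the lock matters, the sketch below lets two threads, standing in for the Reader and Writer tasks, update one shared variable. It is plain .NET code rather than a replication script, and SharedCounterDemo, Run and syncRoot are hypothetical names. It locks on a private object instead of the class instance, a common .NET convention that prevents outside code from taking the same lock.

```csharp
using System;
using System.Threading;

// Illustrative only: two threads increment one shared counter under a lock.
// None of these names belong to the Syniti DR API.
public static class SharedCounterDemo
{
    // Returns the final counter value after both threads have finished.
    public static int Run(int incrementsPerThread)
    {
        object syncRoot = new object(); // dedicated lock object
        int counter = 0;                // state shared by both threads

        ThreadStart work = () =>
        {
            for (int i = 0; i < incrementsPerThread; i++)
            {
                // Without the lock, concurrent increments could be lost.
                lock (syncRoot)
                {
                    counter++;
                }
            }
        };

        var readerThread = new Thread(work);
        var writerThread = new Thread(work);
        readerThread.Start();
        writerThread.Start();
        readerThread.Join();
        writerThread.Join();

        return counter;
    }
}
```

With the lock in place, SharedCounterDemo.Run(100000) returns 200000 because every increment from both threads is applied.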

Here is an example showing how to modify a variable defined in the replication script from both the Record_onBeforeMapping and Record_onAfterExecute events.

The script sample uses the Reader event Record_onBeforeMapping to write a line in a text file for each record that is read from the source table, and it uses the Writer event Record_onAfterExecute to write a line in the same text file for each record inserted in the target table. Since the StreamWriter object sw, defined in the replication script, is used in both a Reader event and a Writer event, an exclusive lock must be set before using this object.

VB.NET Example

Imports System
Imports System.IO
Imports Microsoft.VisualBasic
Imports DBMotoPublic
Imports DBMotoScript
Imports DBRS.GlobalScript

Namespace DBRS
    Public Class ReplicationScript : Inherits IReplicationScript
        Const sFileName As String = "C:\DBMScrip.txt"
        ' shared by Reader and Writer events, so every access is inside SyncLock
        Dim sw As StreamWriter = Nothing

        Public Overrides Sub Record_onBeforeMapping(recSource As IRecord, ByRef AbortRecord As Boolean)
            Try
                ' write the source record key into the text file
                SyncLock (Me)
                    If sw Is Nothing Then
                        sw = New StreamWriter(sFileName)
                    End If
                    ' & with Convert.ToString avoids a numeric "+" on a non-string key value
                    sw.WriteLine("Record_onBeforeMapping - Source Record: " & Convert.ToString(recSource.GetValueAfter("ID")))
                    sw.Flush()
                End SyncLock
            Catch ex As Exception
                AddLog("Exception in Record_onBeforeMapping: " & ex.ToString(), 0)
            End Try
        End Sub

        Public Overrides Sub Record_onAfterExecute(recTarget As IRecord)
            Try
                ' write the target record key into the text file
                SyncLock (Me)
                    If sw Is Nothing Then
                        sw = New StreamWriter(sFileName)
                    End If
                    sw.WriteLine("Record_onAfterExecute - Target Record: " & Convert.ToString(recTarget.GetValueAfter("ID")))
                    sw.Flush()
                End SyncLock
            Catch ex As Exception
                AddLog("Exception in Record_onAfterExecute: " & ex.ToString(), 0)
            End Try
        End Sub

        Public Overrides Sub Refresh_onAfterRefresh()
            Try
                ' close the text file after the refresh
                SyncLock (Me)
                    If Not sw Is Nothing Then
                        sw.Close()
                        sw = Nothing
                    End If
                End SyncLock
            Catch ex As Exception
                AddLog("Exception in Refresh_onAfterRefresh: " & ex.ToString(), 0)
            End Try
        End Sub
    End Class
End Namespace

C# Example

using System;
using System.Data;
using DBMotoPublic;
using DBMotoScript;
using System.IO;
namespace DBRS
{
    public class ReplicationScript : IReplicationScript
    {
        const string sFileName = "C:\\DBMScrip.txt";
        // shared by Reader and Writer events, so every access is inside a lock
        StreamWriter sw = null;

        public override void Refresh_onAfterRefresh()
        {
            lock (this)
            {
                // close the text file after the refresh
                try
                {
                    if (sw != null)
                    {
                        sw.Close();
                        sw = null;
                    }
                }
                catch (Exception ex)
                {
                    GlobalScript.AddLog("Exception in Refresh_onAfterRefresh: " + ex.ToString(), 0);
                }
            }
        }

        public override void Record_onBeforeMapping(DBMotoPublic.IRecord recSource, ref bool AbortRecord)
        {
            lock (this)
            {
                // write the source record key into the text file
                try
                {
                    // open the file on first use only
                    if (sw == null)
                    {
                        sw = new StreamWriter(sFileName);
                    }
                    // write a line for every record, not just the first one
                    sw.WriteLine("Record_onBeforeMapping - Source Record: " + recSource.GetValueAfter("ID"));
                    sw.Flush();
                }
                catch (Exception ex)
                {
                    GlobalScript.AddLog("Exception in Record_onBeforeMapping: " + ex.ToString(), 0);
                }
            }
        }

        public override void Record_onAfterExecute(DBMotoPublic.IRecord recTarget, bool Failed)
        {
            lock (this)
            {
                // write the target record key into the text file
                try
                {
                    // open the file on first use only
                    if (sw == null)
                    {
                        sw = new StreamWriter(sFileName);
                    }
                    sw.WriteLine("Record_onAfterExecute - Target Record: " + recTarget.GetValueAfter("ID"));
                    sw.Flush();
                }
                catch (Exception ex)
                {
                    GlobalScript.AddLog("Exception in Record_onAfterExecute: " + ex.ToString(), 0);
                }
            }
        }
    }
}

Related Topics

IRecord Interface

Replication Script Events

Replication Script Properties

Handling Events for INSERT, UPDATE and DELETE Operations

Global Script Functions

Writing a Global Script

Writing a Replication Script

Writing Scripts with Visual Basic .NET

Replication Script Editor