How to Develop High Availability and load balancing Simulation System with Delphi and ActiveMQ



How to Develop High Availability and load balancing Simulation System with Delphi and ActiveMQ
By : M. Yadi M

In this article shows how to develop High Availability and load balancing simulation system by mixing Delphi and Active MQ.  Before we go into detail, it’s necessary to know definition of High Availability and Load Balancing.

High availability (HA) system refers to systems that continuously operational for long time.

Load Balancing system is dividing the amount of work between 2 processors or more, in general it can make process faster.

There are a lot of samples of implementation HA system. In this article will show HA simulation on ATM (Automatic teller machine) service, as you know that a High availability level is very important for ATM Service.

Before we go into the development, let’s first create the design.


As you can see on the figure, it shows 3 big modules,

1.       Client Application

This module is a presentation layer that will be used by customer to send request.

It will use active-standby mechanism with automatic reconnection feature in case of network disconnection.

2.       Queue Service

This module handles automatic message distribution and load balancing. For this module we will use ActiveMQ.

To have ActiveMQ you can download from http://activemq.apache.org/

Since ActiveMQ support STOMP (Simple Text Oriented Message Protocol), it makes easy to integrate with any programming language

3.       Server Application

This module will process and response for every message request.

Another important thing is a message flow design, the details are described in below figure


The idea of the above figure is quite simple. Client application PUBLISH topic=’ORDER’ to ActiveMQ and Server application SUBSCRIBE Topic=’ORDER’ to ActiveMQ, once Client application publish message with topic = ‘ORDER’, ActiveMQ distributes the message automatically to the SUBSCRIBER, here server application act as the subscriber. And for the reply message, Server application PUBLISH topic =’ORDERREPLY’ to ActiveMQ, and Client applicatuon SUBSCRIBE topic = ‘ORDERREPLY’ to ActiveMQ, once Server application processed the messages and publish topic = ‘ORDERREPLY’, ActiveMQ distributes the message automatically to SUBSCRIBER, this part Client application act as the subscriber.

Now you have got the idea of the process. On the next step, we have to define message format and Communication protocol. The detail as follow:

1.       Message format

There are many standard message formats that can be used for the communication, such as XML, Jason, etc.  Since this article only for simulation purpose, we use the most simplest message format in the World J, the detail as follow:

Item

Value

Header

[MSGHEADER]

Body

Free text

Tail

[MSGTAIL]

2.       Communication Protocol with ActiveMQ

As mentioned before, in this article we use STOMP as communication protocol. For detail of STOMP, you can find it at Wikipedia page http://en.wikipedia.org/wiki/Streaming_Text_Oriented_Messaging_Protocol

Another important thing is a working directory, to manage project files. I make 3 directories with following details:

1.    LIB directory:  To save all general units

2.    ClientSide directory:  To save Client application project files

3.    ServerSide directory:   To save Server application project files

 

The result as shown on below figure



After the design completed then we are ready for the codes.  The steps as follow:

1.       Define Message format

Message format is used to manage standard message for communication between applications. The code as follow:

unit uMyActiveMQMessage;

 

interface

// Message Class, to manage communication between Applications

Type

  TMyActiveMQMessage = class

 

  private

    mHeader : string;

    mTail   : string;

  public

    constructor create();

    destructor destroy();override;

    function setMessage(inMsg : string) : string;

    function getMessage(inMsg : string) : string;

  end;

 

implementation

{TMyActiveMQMessage}

 

constructor TMyActiveMQMessage.create;

begin

   mHeader  := '[MSGHEADER]';

   mTail         := '[MSGTAIL]';

end;

 

destructor TMyActiveMQMessage.destroy;

begin

  inherited;

end;

 

function TMyActiveMQMessage.getMessage(inMsg: string): string;

var P1, p2 : integer;

begin

  // Show Message Only with Ignoring ActiveMQ message

  p1 := pos(mHeader,inMsg);

  p2 := pos(mTail,inMsg);

  result := Copy(inMsg, p1+length(mHeader), p2-p1-length(mHeader));

end;

 

function TMyActiveMQMessage.setMessage(inMsg: string): string;

begin

  // Set Message Header and Tail

  result := mHeader + inMsg + mTail;

end;

end.

Save the code into LIB directory, with filename “uMyActiveMQMessage.pas”

2.       Stomp Connector

Stomp connector is used to manage connection between applications and ActiveMQ.

unit uMyActiveMQConnector;

interface

uses System.SysUtils, System.Win.ScktComp, Vcl.Dialogs, uMyActiveMQMessage;

 

Type

  TOnReceived         = procedure (inSender : TObject; inSock : TCustomWinSocket) of object;

  TOnConnectionStatus = procedure (inSender : TObject; inStatus : boolean) of object;

 

  TMyActiveMQConnector = class

  private

    topicPublish   : string;

    topicSubscribe : string;

    FReceiveHandle : TOnReceived;

    FConnectionStatus : TOnConnectionStatus;

    socketObject : TClientSocket;

    procedure connect;

    procedure closeSocket;

    procedure onSocketConnect(inSender : TObject; inSock : TCustomWinSocket);

    procedure onSocketDisconnect(inSender : TObject; inSock : TCustomWinSocket);

    procedure onSocketRead(inSender : TObject; inSock : TCustomWinSocket);

    procedure onSocketError(inSender : TObject; inSock : TCustomWinSocket; inErrorEvent : TErrorEvent; var vErrorCode : integer );

 

  public

    constructor create(inTopicPublish, inTopicSubscribe : string);

    destructor destroy; override;

    procedure initSocket(inAddress : string;inPort : string);

    procedure subscribeTopic;

    procedure publishTopic(inMessage : string);

 

    property OnReceiveData : TOnReceived read FReceiveHandle write FReceiveHandle;

    property OnStatus : TOnConnectionStatus read FConnectionStatus write FConnectionStatus;

 

  end;

 

implementation

 

{ TMyActiveMQConnector }

 

procedure TMyActiveMQConnector.initSocket(inAddress: string; inPort: string);

var iMsg : string;

begin

 closeSocket;

 socketObject := TClientSocket.Create(nil);

 try

   socketObject.Address       := inAddress;

   socketObject.Port          := StrToInt(inPort);

   socketObject.OnRead        := onSocketRead;

   socketObject.OnConnect     := onSocketConnect;

   socketObject.OnDisconnect  := onSocketDisconnect;

   socketObject.OnError       := onSocketError;

   socketObject.Open;

 except

   closeSocket;

 end;

end;

 

procedure TMyActiveMQConnector.publishTopic(inMessage: string);

var iMsg      : string;

    iMyAMQMsg : TMyActiveMQMessage;

begin

  if not socketObject.Active then

    exit;

 

  // if connectionstatus is established then Customer is allowed to send Order

  iMyAMQMsg := TMyActiveMQMessage.Create;

  iMsg      := 'SEND'+ #10 +'destination:/queue/'+ topicPublish + #10#10 + iMyAMQMsg.setMessage(inMessage) + #0;

  try

    socketObject.Socket.SendText(iMsg);

  finally

    FreeAndNil(iMyAMQMsg);

  end;

end;

 

procedure TMyActiveMQConnector.subscribeTopic;

var iMsg : string;

begin

  if not socketObject.Active then

    exit;

 

  // if socketObject is Active then send Subsribe Topic

  iMsg := 'SUBSCRIBE'+ #10 +'destination:/queue/'+ topicSubscribe  + #10#10#0;

  socketObject.Socket.SendText(iMsg);

  try

    socketObject.Socket.SendText(iMsg);

  except

    MessageDlg('Error On subscribe',mtError,[mbOk],0);

  end;

end;

 

procedure TMyActiveMQConnector.connect;

// Send Request To Connect

var iMsg : string;

begin

 if not socketObject.Active then

    exit;

 

 iMsg := 'CONNECT'+#10#10#0;

 try

  socketObject.Socket.SendText(iMsg);

 except

end;

end;

 

constructor TMyActiveMQConnector.create(inTopicPublish, inTopicSubscribe : string);

begin

  // initialize socket Object

  socketObject  := TClientSocket.Create(nil);

  topicPublish  := inTopicPublish;

  topicSubscribe:= inTopicSubscribe;

end;

 

destructor TMyActiveMQConnector.destroy;

begin

  inherited;

end;

 

procedure TMyActiveMQConnector.closeSocket;

// Destroy Socket Object

begin

  if Assigned(socketObject) then

  begin

    try

      socketObject.Close;

    except

    end;

    FreeAndNil(socketObject);

  end;

end;

 

 

procedure TMyActiveMQConnector.onSocketConnect(inSender: TObject; inSock: TCustomWinSocket);

begin

 inherited;

 // On Socket Connected imediatelly open status and send Connect Request

 connect;

 // following by Topic Subscription

 subscribeTopic;

 

if assigned(FConnectionStatus) then

    FConnectionStatus(inSender, true);

end;

 

procedure TMyActiveMQConnector.onSocketDisconnect(inSender: TObject;

  inSock: TCustomWinSocket);

begin

 inherited;

  // On Socket Disconnection, close the status

 if assigned(FConnectionStatus) then

    FConnectionStatus(inSender, false);

end;

 

procedure TMyActiveMQConnector.onSocketError(inSender: TObject;

  inSock: TCustomWinSocket; inErrorEvent: TErrorEvent; var vErrorCode: integer);

begin

 inherited;

  // On Socket Error, close status

 if assigned(FConnectionStatus) then

    FConnectionStatus(inSender, false);

 vErrorCode := 0;

end;

 

procedure TMyActiveMQConnector.onSocketRead(inSender: TObject;

  inSock: TCustomWinSocket);

begin

 inherited;

 if assigned(FReceiveHandle)  then

   FReceiveHandle(inSender, inSock);

end;

end.

Save the code into LIB directory with file name “uMyActiveMQConnector.pas”

3.       Client Application

Create new Project for Client Application, and set the GUI as shown on below figure


                There some component on the form, let’s define one by one

Component

Property

Value

Event

TLabelEdit

Name

eIPAddress

 

Text

127.0.0.1

 

EditLabel.Caption

IP Address

 

TLabelEdit

Name

ePort

 

Text

61613

 

EditLabel.Caption

Port

 

TLabelEdit

Name

eOrderMessage

 

Text

0

 

EditLabel.Caption

Put your Withdrawal Amount (i.e 100)

 

TLabel

Name

lConnectionStatus

 

Caption

Disconnected

 

TButton

Name

btnConnect

onClick

Caption

Connect

TButton

Name

btnOrder

onClick

Caption

Send Order

TLabel

Name

lOrderStatus

 

Caption

Your Order Status

 

TLabel

Name

lServerLog

 

Caption

Server Log

 

TMemo

Name

MemoOrderStatus

 

TMemo

Name

MemoLog

 

TTimer

Name

TimerAutoReconnect

OnTimer

Interval

1000

Enabled

False

                The complete code and description as follow

unit uMain;

 

interface

 

uses

  Winapi.Windows, Winapi.Messages, System.SysUtils, System.Variants, System.Classes, Vcl.Graphics,

  Vcl.Controls, Vcl.Forms, Vcl.Dialogs, Vcl.StdCtrls,

  Vcl.ExtCtrls, Vcl.ComCtrls, uMyActiveMQConnector, System.Win.ScktComp, uMyActiveMQMessage;

 

type

  TForm1 = class(TForm)

    btnConnect: TButton;

    eIPAddress: TLabeledEdit;

    ePort: TLabeledEdit;

    lOrderStatus: TLabel;

    MemoOrderStatus: TMemo;

    MemoLog: TMemo;

    lServerLog: TLabel;

    TimerAutoReconnect: TTimer;

    Panel1: TPanel;

    lConnectionStatus: TLabel;

    btnOrder: TButton;

    eOrderMessage: TLabeledEdit;

    procedure btnConnectClick(Sender: TObject);

    procedure btnOrderClick(Sender: TObject);

    procedure FormCreate(Sender: TObject);

    procedure FormDestroy(Sender: TObject);

    procedure TimerAutoReconnectTimer(Sender: TObject);

  private

    { Private declarations }

    topicPublish    : string;

    topicSubscribe  : string;

    listIPAddresses : TStringList;

    indexOfIPAddress: integer;

    autoReconnectCounter : byte;

    maxReconnectCounter  : byte;

    myActiveMQConnector : TMyActiveMQConnector;

 

    procedure onSocketRead(inSender : TObject; inSock : TCustomWinSocket);

    procedure onSocketConnectionStatus(inSender : TObject; inStatus : boolean);

 

  public

    { Public declarations }

  end;

 

var

  Form1: TForm1;

 

implementation

{$R *.dfm}

 

procedure TForm1.FormCreate(Sender: TObject);

begin

 inherited;

 // initialize variables

// 3 seconds for automatic reconnection incase of network  disconnection

 maxReconnectCounter  := 3; autoReconnectCounter := maxReconnectCounter;

 // Initialize Topics

 topicPublish     := 'Order';

 topicSubscribe := 'ReplyOrder';

 

 // initialize IPAddresses

// as described on our design, we use active-standby mechanism for connection method, so we can define ActiveMQ IP Addresses more than 1 IP Address. This sample define 3 IP Addresses, you can have more if you like

 

 listIPAddresses := TStringList.Create;

 listIPAddresses.Add('127.0.0.1'); // Primary IP Address

 listIPAddresses.Add('10.72.85.11'); // Backup site 1 IP Address

 listIPAddresses.Add('10.72.85.12'); // Backup site 2 IP Address

 

 indexOfIPAddress := 0;

 eIPAddress.Text := listIPAddresses.Strings[indexOfIPAddress];

 

 // initialize Connection Status

 myActiveMQConnector := TMyActiveMQConnector.create(topicPublish, topicSubscribe);

 myActiveMQConnector.OnReceiveData := onSocketRead;

 myActiveMQConnector.OnStatus := onSocketConnectionStatus;

 

end;

 

procedure TForm1.FormDestroy(Sender: TObject);

begin

 inherited;

 myActiveMQConnector.Destroy;

 listIPAddresses.Free;

end;

 

 

procedure TForm1.btnConnectClick(Sender: TObject);

begin

 // Open Connection

 myActiveMQConnector.initSocket(eIPAddress.Text, ePort.Text);

end;

 

procedure TForm1.btnOrderClick(Sender: TObject);

begin

  // Send Order

  myActiveMQConnector.publishTopic(eOrderMessage.Text);

end;

 

 

procedure TForm1.onSocketConnectionStatus(inSender: TObject; inStatus: boolean);

begin

 inherited;

 // Show Connection Status into Label lConnectionStatus

 if inStatus then

   lConnectionStatus.Caption := 'Connected'

 else

 begin

   lConnectionStatus.Caption := 'disconnected : please check the connection';

   // activate auto reconnect

   TimerAutoReconnect.Enabled := true;

 end;

end;

 

procedure TForm1.onSocketRead(inSender: TObject; inSock: TCustomWinSocket);

var iRMsg : string;

    iMyAMQMsg : TMyActiveMQMessage;

begin

  inherited;

  // On Receive Message,

  // 1. show on Log Screen

  iRMsg := timetostr(now) + inSock.ReceiveText;

  MemoLog.Lines.Add(iRMsg);

  MemoLog.Lines.add(' ');

 

  // 2. show on Order Status Screen, by removing HeaderMessage and tailMessage

  iMyAMQMsg := TMyActiveMQMessage.Create;

  try

    iRMsg :=  iMyAMQMsg.getMessage(iRMsg);

  finally

    iMyAMQMsg.Free;

  end;

 

  if trim(iRMsg) <> '' then

  begin

    MemoOrderStatus.Lines.Add(timetostr(now) +': '+ iRMsg);

    MemoOrderStatus.Lines.add(' ');

  end;

end;

 

 

procedure TForm1.TimerAutoReconnectTimer(Sender: TObject);

begin

// auto reconnect process in case network disconnection

 if autoReconnectCounter < 1 then

 begin

   indexOfIPAddress := indexOfIPAddress + 1;

   if indexOfIPAddress > listIPAddresses.Count-1 then

     indexOfIPAddress := 0;

 

   // Re Open Connection

   eIPAddress.Text := listIPAddresses.Strings[indexOfIPAddress];

   myActiveMQConnector.initSocket(eIPAddress.Text, ePort.Text);

   // Stop auto reconnect counter

   autoReconnectCounter := maxReconnectCounter;

   TimerAutoReconnect.Enabled := false;

 end else

 begin

 // update autoreconnect status on screen

   autoReconnectCounter := autoReconnectCounter - 1;

   lConnectionStatus.Caption := 'please wait autoreconnect in '+ inttostr(autoReconnectCounter) +' seconds ';

 end;

 

end;

 

end.

 

Save the project into ClientSide directory then do some test on it, make sure it running properly.

 

4.       Server Application

Create another project for Server Application, and set the GUI as shown on below figure


                 let’s define the components detail

Component

Property

Value

Event

TLabelEdit

Name

eServerID

 

Text

 

 

EditLabel.Caption

Put ServerID (i.e Server#1, Server#2)

 

TLabelEdit

Name

eIPAddress

 

Text

127.0.0.1

 

EditLabel.Caption

IP Address

 

TLabelEdit

Name

ePort

 

Text

61613

 

EditLabel.Caption

Port

 

TLabel

Name

lConnectionStatus

 

Caption

Disconnected

 

TButton

Name

btnConnect

onClick

Caption

Connect

TLabel

Name

lServerLog

 

Caption

Server Log

 

TMemo

Name

MemoLog

 

                The complete code and description as follow

unit uMain;

 

interface

uses

  Winapi.Windows, Winapi.Messages, System.SysUtils, System.Variants, System.Classes, Vcl.Graphics,

  Vcl.Controls, Vcl.Forms, Vcl.Dialogs, Vcl.StdCtrls, Vcl.ExtCtrls, uMyActiveMQConnector, System.Win.ScktComp, uMyActiveMQMessage;

 

type

  TForm1 = class(TForm)

    lConnectionStatus: TLabel;

    lServerLog: TLabel;

    btnConnect: TButton;

    eIPAddress: TLabeledEdit;

    ePort: TLabeledEdit;

    MemoLog: TMemo;

    eServerID: TLabeledEdit;

    procedure FormCreate(Sender: TObject);

    procedure btnConnectClick(Sender: TObject);

    procedure FormDestroy(Sender: TObject);

  private

    { Private declarations }

    topicPublish   : string;

    topicSubscribe : string;

 

    myActiveMQConnector : TMyActiveMQConnector;

    procedure onSocketRead(inSender : TObject; inSock : TCustomWinSocket);

    procedure onSocketConnectionStatus(inSender : TObject; inStatus : boolean);

 

  public

    { Public declarations }

  end;

 

var

  Form1: TForm1;

 

implementation

 

{$R *.dfm}

 

procedure TForm1.btnConnectClick(Sender: TObject);

begin

 // Open Connection

 if trim(eServerID.Text) = ''  then

 begin

   MessageDlg('please put serverID',mtError,[mbOk],0);

   exit;

 end;

 

 myActiveMQConnector.initSocket(eIPAddress.Text, ePort.Text);

end;

 

procedure TForm1.FormCreate(Sender: TObject);

begin

 inherited;

 // Initialize Topics

 topicPublish   := 'ReplyOrder';

 topicSubscribe := 'Order';

 // initialize Connection Status

 myActiveMQConnector := TMyActiveMQConnector.create(topicPublish, topicSubscribe);

 myActiveMQConnector.OnReceiveData := onSocketRead;

 myActiveMQConnector.OnStatus := onSocketConnectionStatus;

 

end;

 

procedure TForm1.FormDestroy(Sender: TObject);

begin

 inherited;

 myActiveMQConnector.Destroy;

end;

 

procedure TForm1.onSocketConnectionStatus(inSender: TObject; inStatus: boolean);

begin

 inherited;

 // Show Connection Status into Label lConnectionStatus

 if inStatus then

   lConnectionStatus.Caption := 'Connected'

 else

   lConnectionStatus.Caption := 'disconnected : please check the connection';

end;

 

procedure TForm1.onSocketRead(inSender: TObject; inSock: TCustomWinSocket);

var iRMsg : string;

    iMyAMQMsg : TMyActiveMQMessage;

begin

  inherited;

  // On Receive Message,

  // 1. show on Log Screen

  iRMsg := timetostr(now) + inSock.ReceiveText;

  MemoLog.Lines.Add(iRMsg);

  MemoLog.Lines.add(' ');

 

  // 2. Show on Order Status Screen, by removing HeaderMessage and tailMessage

  iMyAMQMsg := TMyActiveMQMessage.Create;

  try

    iRMsg :=  iMyAMQMsg.getMessage(iRMsg);

  finally

    iMyAMQMsg.Free;

  end;

 

  if trim(iRMsg) <> '' then

    myActiveMQConnector.publishTopic(iRMsg +' : accepted by '+ eServerID.Text );

end;

end.

 

Save the project into Server directory then do some test, and make sure it running properly

 

Congratulation, our codes are done. Now we are going into Testing step.

 

1.       Start ActiveMQ application

 



 

Make sure Connector STOMP is started. By default STOMP service opened at port 61613, you can change the port number on configuration file. For simulation purpose, it will be better if you have another ActiveMQ on other PC.

 

2.       Start Client Application



 


 

Press connect button, if connection succeeds then ActiveMQ will reply connection status and shown on Server Log

 

3.       Start Server Application

 

Open 2 or more application servers,  and set different name for each server , i.e “Server1”,”Server2”,”Server3”,etc. In this test we will open 2 application servers


 


 

 

4.       Send Order from Client Application

 

Send 10 orders or more, then you will see load balancing function works. close 1 of server application, and then you will see that your orders are still processed.


 

 

 

5.       If you have another ActieveMQ that running on other PC, you can try to close ActiveMQ on main IP Address, then you will see Client application will search automatically into backup IP Addresses, as we have defined in Client Application code.

 

listIPAddresses := TStringList.Create;

 listIPAddresses.Add('127.0.0.1');

 listIPAddresses.Add('10.72.85.11');

 listIPAddresses.Add('10.72.85.12');

 

 

If you only have 1 ActiveMQ which is in your PC only, then you can try to restart the ActiveMQ application, then you will see Client Application will automatically reconnect.


 

That’s all. Next you can play by opening many ActiveMQ and many Server Applications and you also can play with the performance by modifying the codes. 

Thanks for visiting my blogs.

Related product you might see:

Share this product :

Post a Comment

 
Delphi Programming Tutorial © Mang Yadi Site 2015