How to Develop a High Availability and Load Balancing Simulation System with Delphi and ActiveMQ
By: M. Yadi M
This article shows how to develop a high availability and load balancing simulation system by combining Delphi and ActiveMQ. Before we go into detail, we need to define high availability and load balancing.
A high availability (HA) system is a system that stays continuously operational for a long time.
A load balancing system divides the work between 2 or more processors, which in general makes processing faster.
There are many examples of HA systems. This article simulates HA for an ATM (automated teller machine) service, where a high level of availability is very important.
Before we go into the development, let’s first create the design.
As the figure shows, the design has 3 big modules:
1. Client Application
This module is the presentation layer that the customer uses to send requests. It uses an active-standby mechanism with automatic reconnection in case of a network disconnection.
2. Queue Service
This module handles automatic message distribution and load balancing. For this module we will use ActiveMQ. Since ActiveMQ supports STOMP (Simple Text Oriented Messaging Protocol), it is easy to integrate with any programming language.
3. Server Application
This module processes every request message and sends a response.
Another important thing is the message flow design, shown in the figure below.
The idea in the figure is quite simple. The Client application PUBLISHes to the ‘ORDER’ topic on ActiveMQ, and the Server application SUBSCRIBEs to that topic. Once the Client application publishes a message with topic = ‘ORDER’, ActiveMQ automatically distributes it to the subscriber, which here is the Server application. For the reply, the roles are reversed: the Server application PUBLISHes to the ‘ORDERREPLY’ topic and the Client application SUBSCRIBEs to it. Once the Server application has processed a message and published the reply with topic = ‘ORDERREPLY’, ActiveMQ automatically distributes it to the subscriber, which this time is the Client application. In the code later in this article, these two destinations are named 'Order' and 'ReplyOrder' and are addressed as /queue/ destinations.
Now you have the idea of the process. In the next step we define the message format and the communication protocol. The details are as follows:
1. Message format
There are many standard message formats that can be used for the communication, such as XML and JSON. Since this article is only a simulation, we use the simplest message format in the world:
Item | Value
Header | [MSGHEADER]
Body | Free text
Tail | [MSGTAIL]
2. Communication Protocol with ActiveMQ
The applications communicate with ActiveMQ over STOMP, the protocol chosen in the design above.
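To make the protocol concrete, here is a minimal sketch of the three STOMP frames the connector later in this article sends: CONNECT, SUBSCRIBE and SEND. Each frame is a command line, optional header lines, a blank line, the body, and a terminating NUL byte. The helper names (BuildFrame, ConnectFrame, SubscribeFrame, SendFrame, Printable) are hypothetical and exist only for this illustration; the destination names come from the article's code.

```pascal
program StompFrames;
{$APPTYPE CONSOLE}
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}

// Hypothetical helper: builds one STOMP frame.
// Headers must already end each header line with #10.
function BuildFrame(const Command, Headers, Body: string): string;
begin
  Result := Command + #10 + Headers + #10 + Body + #0;
end;

function ConnectFrame: string;
begin
  Result := BuildFrame('CONNECT', '', '');
end;

function SubscribeFrame(const Destination: string): string;
begin
  Result := BuildFrame('SUBSCRIBE', 'destination:' + Destination + #10, '');
end;

function SendFrame(const Destination, Body: string): string;
begin
  Result := BuildFrame('SEND', 'destination:' + Destination + #10, Body);
end;

// Makes line feeds and the NUL terminator visible for printing.
function Printable(const S: string): string;
var
  I: Integer;
begin
  Result := '';
  for I := 1 to Length(S) do
    case S[I] of
      #0:  Result := Result + '^@';
      #10: Result := Result + '\n';
    else
      Result := Result + S[I];
    end;
end;

begin
  Writeln(Printable(ConnectFrame));
  Writeln(Printable(SubscribeFrame('/queue/ReplyOrder')));
  Writeln(Printable(SendFrame('/queue/Order', '[MSGHEADER]100[MSGTAIL]')));
end.
```

The client would send the first two frames right after the socket connects and the third one for every order; the server uses the same frames with the two destinations swapped.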
Another important thing is the working directory layout used to manage the project files. I made 3 directories:
1. LIB directory: stores all shared units
2. ClientSide directory: stores the Client application project files
3. ServerSide directory: stores the Server application project files
The result is shown in the figure below.
With the design complete, we are ready for the code. The steps are as follows:
1. Define Message format
The message format unit manages the standard message used for communication between the applications. The code is as follows:
unit uMyActiveMQMessage;
interface
// Message class: manages communication between applications
Type
TMyActiveMQMessage = class
private
mHeader : string;
mTail : string;
public
constructor create();
destructor destroy();override;
function setMessage(inMsg : string) : string;
function getMessage(inMsg : string) : string;
end;
implementation
{TMyActiveMQMessage}
constructor TMyActiveMQMessage.create;
begin
mHeader := '[MSGHEADER]';
mTail := '[MSGTAIL]';
end;
destructor TMyActiveMQMessage.destroy;
begin
inherited;
end;
function TMyActiveMQMessage.getMessage(inMsg: string): string;
var p1, p2 : integer;
begin
// Return only the message body, ignoring the ActiveMQ frame around it
result := '';
p1 := pos(mHeader, inMsg);
p2 := pos(mTail, inMsg);
// Both markers must be present, with the tail after the header
if (p1 > 0) and (p2 > p1) then
result := Copy(inMsg, p1 + length(mHeader), p2 - p1 - length(mHeader));
end;
function TMyActiveMQMessage.setMessage(inMsg: string): string;
begin
// Wrap the message in the header and tail markers
result := mHeader + inMsg + mTail;
end;
end.
Save the code in the LIB directory with the file name “uMyActiveMQMessage.pas”.
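As a quick check of the format, the following standalone sketch wraps an order amount in the markers and then recovers it from a received frame. It repeats the marker logic in two hypothetical functions (WrapBody, UnwrapBody) so that it runs without the unit above, and the sample incoming frame is made up for illustration; a real frame from ActiveMQ carries more header lines.

```pascal
program MessageRoundTrip;
{$APPTYPE CONSOLE}
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}

const
  MsgHeader = '[MSGHEADER]';
  MsgTail   = '[MSGTAIL]';

// Wraps free text in the header and tail markers.
function WrapBody(const Body: string): string;
begin
  Result := MsgHeader + Body + MsgTail;
end;

// Returns the text between the markers, or '' when a marker is missing
// or the tail does not come after the header.
function UnwrapBody(const Raw: string): string;
var
  P1, P2: Integer;
begin
  Result := '';
  P1 := Pos(MsgHeader, Raw);
  P2 := Pos(MsgTail, Raw);
  if (P1 > 0) and (P2 > P1) then
    Result := Copy(Raw, P1 + Length(MsgHeader), P2 - P1 - Length(MsgHeader));
end;

var
  Received: string;
begin
  // Hypothetical incoming frame, simplified for illustration.
  Received := 'MESSAGE' + #10 + 'destination:/queue/Order' + #10 + #10 +
              WrapBody('100') + #0;
  Writeln(UnwrapBody(Received));                  // prints 100
  Writeln(UnwrapBody('no markers here') = '');    // prints TRUE
end.
```

Because the body is located by its markers, the applications do not need to parse the STOMP header lines at all, which is what keeps this simulation so small.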
2. Stomp Connector
The Stomp connector manages the connection between the applications and ActiveMQ.
unit uMyActiveMQConnector;
interface
uses System.SysUtils, System.Win.ScktComp, Vcl.Dialogs, uMyActiveMQMessage;
Type
TOnReceived = procedure (inSender : TObject; inSock : TCustomWinSocket) of object;
TOnConnectionStatus = procedure (inSender : TObject; inStatus : boolean) of object;
TMyActiveMQConnector = class
private
topicPublish : string;
topicSubscribe : string;
FReceiveHandle : TOnReceived;
FConnectionStatus : TOnConnectionStatus;
socketObject : TClientSocket;
procedure connect;
procedure closeSocket;
procedure onSocketConnect(inSender : TObject; inSock : TCustomWinSocket);
procedure onSocketDisconnect(inSender : TObject; inSock : TCustomWinSocket);
procedure onSocketRead(inSender : TObject; inSock : TCustomWinSocket);
procedure onSocketError(inSender : TObject; inSock : TCustomWinSocket;
inErrorEvent : TErrorEvent; var vErrorCode : integer );
public
constructor create(inTopicPublish, inTopicSubscribe : string);
destructor destroy; override;
procedure initSocket(inAddress : string;inPort : string);
procedure subscribeTopic;
procedure publishTopic(inMessage : string);
property OnReceiveData : TOnReceived read FReceiveHandle write FReceiveHandle;
property OnStatus : TOnConnectionStatus read FConnectionStatus write FConnectionStatus;
end;
implementation
{ TMyActiveMQConnector }
procedure TMyActiveMQConnector.initSocket(inAddress: string; inPort: string);
begin
// Drop any previous socket before opening a new connection
closeSocket;
socketObject := TClientSocket.Create(nil);
try
socketObject.Address := inAddress;
socketObject.Port := StrToInt(inPort);
socketObject.OnRead := onSocketRead;
socketObject.OnConnect := onSocketConnect;
socketObject.OnDisconnect := onSocketDisconnect;
socketObject.OnError := onSocketError;
socketObject.Open;
except
// An invalid port or a failed open leaves no socket behind
closeSocket;
end;
end;
procedure TMyActiveMQConnector.publishTopic(inMessage: string);
var iMsg : string;
iMyAMQMsg : TMyActiveMQMessage;
begin
// The customer is only allowed to send an order while the connection is established
if (not Assigned(socketObject)) or (not socketObject.Active) then
exit;
iMyAMQMsg := TMyActiveMQMessage.Create;
try
iMsg := 'SEND' + #10 + 'destination:/queue/' + topicPublish + #10#10 + iMyAMQMsg.setMessage(inMessage) + #0;
socketObject.Socket.SendText(iMsg);
finally
FreeAndNil(iMyAMQMsg);
end;
end;
procedure TMyActiveMQConnector.subscribeTopic;
var iMsg : string;
begin
// Only subscribe while socketObject is active
if (not Assigned(socketObject)) or (not socketObject.Active) then
exit;
iMsg := 'SUBSCRIBE' + #10 + 'destination:/queue/' + topicSubscribe + #10#10#0;
// Send the SUBSCRIBE frame once; sending it twice would register two subscriptions
try
socketObject.Socket.SendText(iMsg);
except
MessageDlg('Error On subscribe',mtError,[mbOk],0);
end;
end;
procedure TMyActiveMQConnector.connect;
// Send the request to connect
var iMsg : string;
begin
if (not Assigned(socketObject)) or (not socketObject.Active) then
exit;
iMsg := 'CONNECT'+#10#10#0;
try
socketObject.Socket.SendText(iMsg);
except
// A failed send is ignored here; the socket error event reports the status
end;
end;
constructor TMyActiveMQConnector.create(inTopicPublish, inTopicSubscribe : string);
begin
// initialize socket Object
socketObject := TClientSocket.Create(nil);
topicPublish := inTopicPublish;
topicSubscribe := inTopicSubscribe;
end;
destructor TMyActiveMQConnector.destroy;
begin
// Stop notifying the owner, then close and free the socket
FConnectionStatus := nil;
FReceiveHandle := nil;
closeSocket;
inherited;
end;
procedure TMyActiveMQConnector.closeSocket;
// Destroy the socket object
begin
if Assigned(socketObject) then
begin
try
socketObject.Close;
except
end;
FreeAndNil(socketObject);
end;
end;
procedure TMyActiveMQConnector.onSocketConnect(inSender: TObject; inSock: TCustomWinSocket);
begin
inherited;
// On socket connected, immediately send the Connect request
connect;
// followed by the topic subscription
subscribeTopic;
if assigned(FConnectionStatus) then
FConnectionStatus(inSender, true);
end;
procedure TMyActiveMQConnector.onSocketDisconnect(inSender: TObject; inSock: TCustomWinSocket);
begin
inherited;
// On socket disconnection, close the status
if assigned(FConnectionStatus) then
FConnectionStatus(inSender, false);
end;
procedure TMyActiveMQConnector.onSocketError(inSender: TObject; inSock: TCustomWinSocket;
inErrorEvent: TErrorEvent; var vErrorCode: integer);
begin
inherited;
// On socket error, close the status
if assigned(FConnectionStatus) then
FConnectionStatus(inSender, false);
// Mark the error as handled so no exception is raised
vErrorCode := 0;
end;
procedure TMyActiveMQConnector.onSocketRead(inSender: TObject; inSock: TCustomWinSocket);
begin
inherited;
if assigned(FReceiveHandle) then
FReceiveHandle(inSender, inSock);
end;
end.
Save the code in the LIB directory with the file name “uMyActiveMQConnector.pas”.
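One point the connector leaves to the caller is frame boundaries: a single socket read can deliver several STOMP frames, or only part of one, because every frame simply ends with a NUL byte. The sketch below shows one possible way to cut complete frames out of a receive buffer. ExtractFrame is a hypothetical helper that is not part of the units in this article, and the sample buffer is invented for illustration.

```pascal
program FrameSplitter;
{$APPTYPE CONSOLE}
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}

// Hypothetical helper: moves the first complete frame (everything before
// the first NUL) from Buffer into Frame. Returns False when the buffer
// holds no complete frame yet, leaving the partial data in place.
function ExtractFrame(var Buffer: string; out Frame: string): Boolean;
var
  I: Integer;
begin
  Result := False;
  Frame := '';
  for I := 1 to Length(Buffer) do
    if Buffer[I] = #0 then
    begin
      Frame := Copy(Buffer, 1, I - 1);
      Delete(Buffer, 1, I);
      // Later STOMP versions allow extra line ends between frames; drop them.
      while (Frame <> '') and ((Frame[1] = #10) or (Frame[1] = #13)) do
        Delete(Frame, 1, 1);
      Result := True;
      Exit;
    end;
end;

var
  Buffer, Frame: string;
  Count: Integer;
begin
  // Two complete frames followed by the start of a third one.
  Buffer := 'CONNECTED' + #10 + #10 + #0 + #10 +
            'MESSAGE' + #10 + 'destination:/queue/ReplyOrder' + #10 + #10 +
            '[MSGHEADER]100 : accepted by Server1[MSGTAIL]' + #0 +
            'MESS';
  Count := 0;
  while ExtractFrame(Buffer, Frame) do
  begin
    Inc(Count);
    Writeln('frame ', Count, ': ', Length(Frame), ' chars');
  end;
  Writeln('left in buffer: ', Buffer);
end.
```

In the applications this would mean appending each ReceiveText result to a buffer field and processing frames in a loop; for a simulation that sends one order at a time, reading the text directly as the article does is usually enough.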
3. Client Application
Create a new project for the Client application and lay out the GUI as shown in the figure below.
The form holds several components. Let’s define them one by one:
Component | Property | Value | Event
TLabeledEdit | Name | eIPAddress |
 | Text | 127.0.0.1 |
 | EditLabel.Caption | IP Address |
TLabeledEdit | Name | ePort |
 | Text | 61613 |
 | EditLabel.Caption | Port |
TLabeledEdit | Name | eOrderMessage |
 | Text | 0 |
 | EditLabel.Caption | Put your Withdrawal Amount (e.g. 100) |
TLabel | Name | lConnectionStatus |
 | Caption | Disconnected |
TButton | Name | btnConnect | onClick
 | Caption | Connect |
TButton | Name | btnOrder | onClick
 | Caption | Send Order |
TLabel | Name | lOrderStatus |
 | Caption | Your Order Status |
TLabel | Name | lServerLog |
 | Caption | Server Log |
TMemo | Name | MemoOrderStatus |
TMemo | Name | MemoLog |
TTimer | Name | TimerAutoReconnect | OnTimer
 | Interval | 1000 |
 | Enabled | False |
The complete code, with comments, is as follows:
unit uMain;
interface
uses
Winapi.Windows,
Winapi.Messages, System.SysUtils, System.Variants, System.Classes,
Vcl.Graphics,
Vcl.Controls, Vcl.Forms,
Vcl.Dialogs, Vcl.StdCtrls,
Vcl.ExtCtrls, Vcl.ComCtrls, uMyActiveMQConnector, System.Win.ScktComp,
uMyActiveMQMessage;
type
TForm1 = class(TForm)
btnConnect: TButton;
eIPAddress: TLabeledEdit;
ePort: TLabeledEdit;
lOrderStatus: TLabel;
MemoOrderStatus: TMemo;
MemoLog: TMemo;
lServerLog: TLabel;
TimerAutoReconnect: TTimer;
Panel1: TPanel;
lConnectionStatus: TLabel;
btnOrder: TButton;
eOrderMessage: TLabeledEdit;
procedure btnConnectClick(Sender: TObject);
procedure btnOrderClick(Sender: TObject);
procedure FormCreate(Sender: TObject);
procedure FormDestroy(Sender: TObject);
procedure TimerAutoReconnectTimer(Sender: TObject);
private
{ Private declarations }
topicPublish : string;
topicSubscribe : string;
listIPAddresses : TStringList;
indexOfIPAddress: integer;
autoReconnectCounter : byte;
maxReconnectCounter : byte;
myActiveMQConnector : TMyActiveMQConnector;
procedure onSocketRead(inSender : TObject; inSock : TCustomWinSocket);
procedure onSocketConnectionStatus(inSender : TObject; inStatus : boolean);
public
{ Public declarations }
end;
var
Form1: TForm1;
implementation
{$R *.dfm}
procedure TForm1.FormCreate(Sender: TObject);
begin
inherited;
// initialize variables
// wait 3 seconds before automatic reconnection in case of network disconnection
maxReconnectCounter := 3;
autoReconnectCounter := maxReconnectCounter;
// Initialize Topics
topicPublish := 'Order';
topicSubscribe := 'ReplyOrder';
// initialize IPAddresses
// As described in our design, we use an active-standby mechanism for the
// connection, so we can define more than 1 ActiveMQ IP address. This sample
// defines 3 IP addresses; you can have more if you like
listIPAddresses := TStringList.Create;
listIPAddresses.Add('127.0.0.1'); // Primary IP Address
listIPAddresses.Add('10.72.85.11'); // Backup site 1 IP Address
listIPAddresses.Add('10.72.85.12'); // Backup site 2 IP Address
indexOfIPAddress := 0;
eIPAddress.Text := listIPAddresses.Strings[indexOfIPAddress];
// initialize the connector and its event handlers
myActiveMQConnector := TMyActiveMQConnector.create(topicPublish, topicSubscribe);
myActiveMQConnector.OnReceiveData := onSocketRead;
myActiveMQConnector.OnStatus := onSocketConnectionStatus;
end;
procedure TForm1.FormDestroy(Sender: TObject);
begin
inherited;
// Free is nil-safe, unlike a direct call to Destroy
myActiveMQConnector.Free;
listIPAddresses.Free;
end;
procedure TForm1.btnConnectClick(Sender: TObject);
begin
// Open Connection
myActiveMQConnector.initSocket(eIPAddress.Text, ePort.Text);
end;
procedure TForm1.btnOrderClick(Sender: TObject);
begin
// Send Order
myActiveMQConnector.publishTopic(eOrderMessage.Text);
end;
procedure TForm1.onSocketConnectionStatus(inSender: TObject; inStatus: boolean);
begin
inherited;
// Show the connection status in the label lConnectionStatus
if inStatus then
lConnectionStatus.Caption := 'Connected'
else
begin
lConnectionStatus.Caption := 'disconnected : please check the connection';
// activate auto reconnect
TimerAutoReconnect.Enabled := true;
end;
end;
procedure TForm1.onSocketRead(inSender: TObject; inSock: TCustomWinSocket);
var iRMsg : string;
iMyAMQMsg : TMyActiveMQMessage;
begin
inherited;
// On Receive Message,
// 1. show on Log Screen
iRMsg := timetostr(now) + inSock.ReceiveText;
MemoLog.Lines.Add(iRMsg);
MemoLog.Lines.add(' ');
// 2. show on Order Status Screen, after removing the message header and tail
iMyAMQMsg := TMyActiveMQMessage.Create;
try
iRMsg := iMyAMQMsg.getMessage(iRMsg);
finally
iMyAMQMsg.Free;
end;
if trim(iRMsg) <> '' then
begin
MemoOrderStatus.Lines.Add(timetostr(now) +': '+ iRMsg);
MemoOrderStatus.Lines.add(' ');
end;
end;
procedure TForm1.TimerAutoReconnectTimer(Sender: TObject);
begin
// auto reconnect process in case of network disconnection
if autoReconnectCounter < 1 then
begin
// Countdown finished: move to the next IP address, wrapping back to the first
indexOfIPAddress := indexOfIPAddress + 1;
if indexOfIPAddress > listIPAddresses.Count-1 then
indexOfIPAddress := 0;
// Re Open Connection
eIPAddress.Text := listIPAddresses.Strings[indexOfIPAddress];
myActiveMQConnector.initSocket(eIPAddress.Text, ePort.Text);
// Stop auto reconnect counter
autoReconnectCounter := maxReconnectCounter;
TimerAutoReconnect.Enabled := false;
end else
begin
// update autoreconnect status on screen
autoReconnectCounter := autoReconnectCounter - 1;
lConnectionStatus.Caption := 'please wait, auto reconnect in '+ inttostr(autoReconnectCounter) +' seconds ';
end;
end;
end.
Save the project in the ClientSide directory, then test it and make sure it runs properly.
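The active-standby behaviour of the client comes down to stepping through the address list and wrapping around at the end. The sketch below isolates that rotation in a console program so it can be tried without a form or a broker. NextIndex is a hypothetical helper, and the loop simply pretends that every connection attempt fails; the addresses are the ones used in the client code.

```pascal
program FailoverRotation;
{$APPTYPE CONSOLE}
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}

const
  // Same addresses as in the client code: primary first, then the backups.
  Addresses: array[0..2] of string = ('127.0.0.1', '10.72.85.11', '10.72.85.12');

// Hypothetical helper: returns the index to try after Current,
// wrapping back to 0 after the last entry.
function NextIndex(Current, Count: Integer): Integer;
begin
  Result := (Current + 1) mod Count;
end;

var
  Index, Attempt: Integer;
begin
  Index := 0; // the client starts on the primary address
  // Pretend that four connections in a row are lost.
  for Attempt := 1 to 4 do
  begin
    Index := NextIndex(Index, Length(Addresses));
    Writeln('attempt ', Attempt, ': try ', Addresses[Index]);
  end;
end.
```

Note that with this scheme a lost connection always moves on to the next address, so when only one ActiveMQ is running the client comes back to it after trying the other entries in the list.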
4. Server Application
Create another project for the Server application and lay out the GUI as shown in the figure below.
Let’s define the components in detail:
Component | Property | Value | Event
TLabeledEdit | Name | eServerID |
 | Text | |
 | EditLabel.Caption | Put ServerID (e.g. Server#1, Server#2) |
TLabeledEdit | Name | eIPAddress |
 | Text | 127.0.0.1 |
 | EditLabel.Caption | IP Address |
TLabeledEdit | Name | ePort |
 | Text | 61613 |
 | EditLabel.Caption | Port |
TLabel | Name | lConnectionStatus |
 | Caption | Disconnected |
TButton | Name | btnConnect | onClick
 | Caption | Connect |
TLabel | Name | lServerLog |
 | Caption | Server Log |
TMemo | Name | MemoLog |
The complete code, with comments, is as follows:
unit uMain;
interface
uses
Winapi.Windows,
Winapi.Messages, System.SysUtils, System.Variants, System.Classes,
Vcl.Graphics,
Vcl.Controls, Vcl.Forms,
Vcl.Dialogs, Vcl.StdCtrls, Vcl.ExtCtrls, uMyActiveMQConnector, System.Win.ScktComp,
uMyActiveMQMessage;
type
TForm1 = class(TForm)
lConnectionStatus: TLabel;
lServerLog: TLabel;
btnConnect: TButton;
eIPAddress: TLabeledEdit;
ePort: TLabeledEdit;
MemoLog: TMemo;
eServerID: TLabeledEdit;
procedure FormCreate(Sender: TObject);
procedure btnConnectClick(Sender: TObject);
procedure FormDestroy(Sender: TObject);
private
{ Private declarations }
topicPublish : string;
topicSubscribe : string;
myActiveMQConnector : TMyActiveMQConnector;
procedure onSocketRead(inSender : TObject; inSock : TCustomWinSocket);
procedure onSocketConnectionStatus(inSender : TObject; inStatus : boolean);
public
{ Public declarations }
end;
var
Form1: TForm1;
implementation
{$R *.dfm}
procedure TForm1.btnConnectClick(Sender: TObject);
begin
// A server ID is required before opening the connection
if trim(eServerID.Text) = '' then
begin
MessageDlg('please put serverID',mtError,[mbOk],0);
exit;
end;
// Open Connection
myActiveMQConnector.initSocket(eIPAddress.Text, ePort.Text);
end;
procedure TForm1.FormCreate(Sender: TObject);
begin
inherited;
// Initialize Topics: the reverse of the client, reply on one and listen on the other
topicPublish := 'ReplyOrder';
topicSubscribe := 'Order';
// initialize the connector and its event handlers
myActiveMQConnector := TMyActiveMQConnector.create(topicPublish, topicSubscribe);
myActiveMQConnector.OnReceiveData := onSocketRead;
myActiveMQConnector.OnStatus := onSocketConnectionStatus;
end;
procedure TForm1.FormDestroy(Sender: TObject);
begin
inherited;
// Free is nil-safe, unlike a direct call to Destroy
myActiveMQConnector.Free;
end;
procedure TForm1.onSocketConnectionStatus(inSender: TObject; inStatus: boolean);
begin
inherited;
// Show the connection status in the label lConnectionStatus
if inStatus then
lConnectionStatus.Caption := 'Connected'
else
lConnectionStatus.Caption := 'disconnected : please check the connection';
end;
procedure TForm1.onSocketRead(inSender: TObject; inSock: TCustomWinSocket);
var iRMsg : string;
iMyAMQMsg : TMyActiveMQMessage;
begin
inherited;
// On Receive Message,
// 1. show on Log Screen
iRMsg := timetostr(now) + inSock.ReceiveText;
MemoLog.Lines.Add(iRMsg);
MemoLog.Lines.add(' ');
// 2. extract the order by removing the message header and tail
iMyAMQMsg := TMyActiveMQMessage.Create;
try
iRMsg := iMyAMQMsg.getMessage(iRMsg);
finally
iMyAMQMsg.Free;
end;
// 3. reply to the client, naming the server that accepted the order
if trim(iRMsg) <> '' then
myActiveMQConnector.publishTopic(iRMsg + ' : accepted by ' + eServerID.Text);
end;
end.
Save the project in the ServerSide directory, then test it and make sure it runs properly.
Congratulations, our code is done. Now we move on to testing.
1. Start the ActiveMQ application
Make sure the STOMP connector is started. By default the STOMP service listens on port 61613; you can change the port number in the configuration file. For the simulation, it is better to have another ActiveMQ instance on another PC.
2. Start the Client application
Press the Connect button. If the connection succeeds, ActiveMQ replies with the connection status, which is shown in the Server Log.
3. Start the Server application
Open 2 or more Server applications and give each one a different name, e.g. “Server1”, “Server2”, “Server3”. In this test we open 2 Server applications.
4. Send orders from the Client application
Send 10 orders or more and you will see the load balancing at work. Close one of the Server applications and you will see that your orders are still processed.
5. If you have another ActiveMQ running on another PC, close the ActiveMQ on the main IP address. You will see the Client application search the backup IP addresses automatically, as defined in the Client application code:
listIPAddresses := TStringList.Create;
listIPAddresses.Add('127.0.0.1');
listIPAddresses.Add('10.72.85.11');
listIPAddresses.Add('10.72.85.12');
If you only have 1 ActiveMQ, running on your own PC, you can restart the ActiveMQ application instead, and you will see the Client application reconnect automatically.
That’s all. Next you can experiment by running several ActiveMQ instances and many Server applications, and you can also tune the performance by modifying the code.
Thanks for visiting my blog.