Skip to content

Commit

Permalink
Connection auto-recovery
Browse files Browse the repository at this point in the history
  • Loading branch information
eduard93 committed Nov 16, 2018
1 parent f2a2c2b commit d016de5
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 18 deletions.
44 changes: 42 additions & 2 deletions RabbitMQ/Common.cls
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,18 @@ Property Encoding As %String;
/// See property AdditionalPaths in that class.
Property ClassPath As %String(MAXLEN = 32000);

/// How many times have we tried reconnecting
/// empty - do not retry
/// 0 - retry ad infinitum
/// n - retry n times
Property RetryCount As %Integer [ InitialExpression = 5 ];

/// How frequently to retry access to the output system.
/// Pause in seconds between retry attempts.
Property RetryInterval As %Numeric(MINVAL = 0) [ InitialExpression = 5 ];

/// These are the production settings for this object
Parameter SETTINGS = "Host:Basic,Port:Basic,VirtualHost:Basic,Queue:Basic,Credentials:Basic:credentialsSelector,JGHost:Java Gateway,JGPort:Java Gateway,JGService:Java Gateway:selector?context={Ens.ContextSearch/ProductionItems?targets=0&productionName=@productionId},ClassPath:Basic,Encoding:Basic";
Parameter SETTINGS = "Host:Basic,Port:Basic,VirtualHost:Basic,Queue:Basic,Credentials:Basic:credentialsSelector,JGHost:Java Gateway,JGPort:Java Gateway,JGService:Java Gateway:selector?context={Ens.ContextSearch/ProductionItems?targets=0&productionName=@productionId},ClassPath:Basic,Encoding:Basic,RetryCount:Alerting,RetryInterval:Alerting";

/// Connect to running JGW
Method Connect() As %Status
Expand Down Expand Up @@ -87,14 +97,44 @@ Method ConnectToRabbitMQ() As %Status
Set pass = "guest"
}

Set port = $select(..Port="":-1, 1:..Port)

Try {
Set ..API = ##class(isc.rabbitmq.API).%New(..JGW, ..Host, ..Port, user, pass, ..VirtualHost, ..Queue, $$$YES, ..Exchange)
Set ..API = ##class(isc.rabbitmq.API).%New(..JGW, ..Host, port, user, pass, ..VirtualHost, ..Queue, $$$YES, ..Exchange)
} Catch ex {
Set sc = $$$ADDSC(ex.AsStatus(),$g(%objlasterror))
}

Quit sc
}

Method IsOpen() As %Status
{
#Dim sc As %Status = $$$OK
Set retryCount = 1
Try {
While '..API.isOpen() {
If ..RetryCount = "" {
Set sc = $$$ERROR($$$GeneralError, "Connection problems. Consider specifying RetryCount and RetryInterval settings")
} ElseIf ((..RetryCount = 0) || (retryCount < ..RetryCount)) {
// wait and retry connecting
Set retryCount = retryCount + 1
Hang ..RetryInterval

// reconnect happens in isOpen method
} Else {
// we're out of reconnect attempts
Set sc = $$$ERROR($$$GeneralError, $$$FormatText("Connection still closed after %1 attempts at reconnecting.", ..RetryCount))
}
Quit:$$$ISERR(sc)
}
} Catch ex {
#Dim ex As %Exception.General
Set sc = ex.AsStatus()
}

Quit sc
}

}

31 changes: 23 additions & 8 deletions RabbitMQ/InboundAdapter.cls
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,36 @@ Method OnTearDown() As %Status
/// Get Messages from RabbitMQ queue.
Method OnTask() As %Status
{
Set sc = $$$OK
#Dim sc As %Status = $$$OK

Set messageCount = 1

While messageCount > 0 {
Set sc = ..IsOpen()
Quit:$$$ISERR(sc)

// List containing metainformation and possibly body (in the case of string interaction) of the RabbitMQ message
#Dim messageList As %ListOfDataTypes

If ..BodyClass = "" {
Set messageList = ..API.readMessageString()
} Else {
#Dim tempStream As %Library.GlobalBinaryStream
Set messageList = ##class(%ListOfDataTypes).%New()
For i=1:1:15 Do messageList.Insert("")
Set tempStream = ..API.readMessageStream(.messageList)
Set messageList = ##class(%ListOfDataTypes).%New()
For i=1:1:15 Do messageList.Insert("")

Try {
If ..BodyClass = "" {
Set messageList = ..API.readMessageString()
} Else {
#Dim tempStream As %Library.GlobalBinaryStream
Set tempStream = ..API.readMessageStream(.messageList)
}
} Catch ex {
#Dim ex As %Exception.General
If ($ZE["<ZJGTW>") {
Set sc = ..IsOpen()
Set:$$$ISERR(sc) sc = $$$ADDSC(sc, ex.AsStatus())
} Else {
Set sc = ex.AsStatus()
}
Quit:$$$ISERR(sc)
}

Set messageLength = messageList.GetAt(1)
Expand Down
42 changes: 34 additions & 8 deletions RabbitMQ/OutboundAdapter.cls
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,23 @@ Method SendMessage(message As %Stream.Object) As %Status
Do stream.Write(..EncodeMessageBody(message))
}

Try {
Do ..API.sendMessage(stream)
} Catch ex {
Set sc = ex.AsStatus()
#Dim attempts As %Integer = 0

While attempts<2 {
Set attempts = attempts + 1
Try {
Do ..API.sendMessage(stream)
Return sc
} Catch ex {
#Dim ex As %Exception.General
If ($ZE["<ZJGTW>") {
Set sc = ..IsOpen()
Set:$$$ISERR(sc) sc = $$$ADDSC(sc, ex.AsStatus())
} Else {
Set sc = ex.AsStatus()
}
Return:$$$ISERR(sc) sc // quit if reconnect is unsuccessful or we got unknown exception
}
}
Quit sc
}
Expand All @@ -57,11 +70,24 @@ Method SendMessageToQueue(queue As %String, message As %Stream.Object) As %Statu
} Else {
Do stream.Write(..EncodeMessageBody(message))
}

#Dim attempts As %Integer = 0

Try {
Do ..API.sendMessageToQueue(queue, stream)
} Catch ex {
Set sc = ex.AsStatus()
While attempts<2 {
Set attempts = attempts + 1
Try {
Do ..API.sendMessageToQueue(queue, stream)
Return sc
} Catch ex {
#Dim ex As %Exception.General
If ($ZE["<ZJGTW>") {
Set sc = ..IsOpen()
Set:$$$ISERR(sc) sc = $$$ADDSC(sc, ex.AsStatus())
} Else {
Set sc = ex.AsStatus()
}
Return:$$$ISERR(sc) sc // quit if reconnect is unsuccessful or we got unknown exception
}
}
Quit sc
}
Expand Down

0 comments on commit d016de5

Please sign in to comment.