#include <mspdmn.h>
#include <mspscada.h>
/*
 * Build an MQIsdp CONNECT packet from the CONN_PARMS block in ipcBuffer.
 *
 * Data read from ipcBuffer: a CONN_PARMS structure, followed (only when
 * MQISDP_WILL is set in options) by
 *     long willTopicLength, will topic bytes,
 *     long willMsgLength,   will message bytes.
 * Will topic and will message are passed through mspCharTrim(' ', ...) and
 * the value it returns is used as the encoded length.
 *
 * Side effects on pHconn:
 *   - apiReturnCode is set to MQISDP_OK or to an error code;
 *   - ctrlFlags gets MSP_CLEAN_SESSION set, or is ANDed with
 *     MSP_CLEAN_SESSION_OFF, depending on MQISDP_CLEAN_START;
 *   - reconnect.connRetries is set to -1 for a clean-start connect;
 *   - reconnect.connectMsg / reconnect.connMsgSz receive a second copy of
 *     the packet whose flags byte is ANDed with MSPC_CLEAN_START_OFF.
 *
 * Parameters:
 *   pHconn     connection control block.
 *   bufLength  not used by this function.
 *   ipcBuffer  CONN_PARMS plus optional will data, as described above.
 *   msgLength  out: total packet length; 0 when NULL is returned.
 *
 * Returns the packet (allocated with mspMalloc), or NULL with
 * pHconn->apiReturnCode set to MQISDP_NO_WILL_TOPIC, MQISDP_DATA_TOO_BIG
 * or MQISDP_OUT_OF_MEMORY.
 *
 * NOTE(review): the will data lengths are not checked against bufLength;
 * the meaning of bufLength is not visible here - confirm with the caller.
 */
void *mspBuildScadaConnectMsg( HCONNCB *pHconn, long bufLength, void *ipcBuffer, long *msgLength ) {
    long        fHLength = 0;
    long        wtLength = 0;      /* will topic length after trimming  */
    long        wtLengthUt = 0;    /* will topic length as supplied     */
    long        wmLength = 0;
    long        rLength = 12;      /* fixed-size fields written before the client id */
    long        ciLength = 0;
    char       *tmpPtr;
    CONN_PARMS *pMspCp;
    long        totalLength = 0;
    char       *connectMsg = NULL;
    char       *pWillMsg = NULL;
    char       *pWillTopic = NULL;
    size_t      cleanStartOffset = 0;
    u_short     keepAlive;

    pHconn->apiReturnCode = MQISDP_OK;
    pMspCp = (CONN_PARMS*)ipcBuffer;
    *msgLength = 0;

    ciLength = strlen( pMspCp->clientId );
    rLength += ciLength + 2;

    if ( pMspCp->options & MQISDP_WILL ) {
        tmpPtr = (char*)pMspCp + sizeof(CONN_PARMS);
        memcpy( &wtLengthUt, tmpPtr, sizeof(long) );
        if ( wtLengthUt > 0 ) {
            pWillTopic = tmpPtr + sizeof(long);
            wtLength = mspCharTrim( ' ', wtLengthUt, pWillTopic );
        }
        /* A topic that is absent, or that trims to nothing, is rejected.
         * Previously a trimmed-to-empty topic was counted in rLength but
         * never written, corrupting the packet layout. */
        if ( wtLength <= 0 ) {
            pHconn->apiReturnCode = MQISDP_NO_WILL_TOPIC;
            return NULL;
        }
        rLength += wtLength + 2;

        /* The will message follows the untrimmed topic bytes */
        tmpPtr += wtLengthUt + sizeof(long);
        memcpy( &wmLength, tmpPtr, sizeof(long) );
        if ( wmLength > 0 ) {
            pWillMsg = tmpPtr + sizeof(long);
            wmLength = mspCharTrim( ' ', wmLength, pWillMsg );
            if ( wmLength < 0 ) {
                wmLength = 0;
            }
            rLength += wmLength + 2;
        }
        /* NOTE(review): when the supplied will message length is <= 0 no
         * will message field is emitted at all, although the will flag is
         * set - original behaviour, kept unchanged. */
    }

    MSP_CALC_FHEADER_LENGTH( rLength, fHLength );
    if ( fHLength == -1 ) {
        pHconn->apiReturnCode = MQISDP_DATA_TOO_BIG;
        return NULL;
    }
    totalLength = fHLength + rLength;

    connectMsg = (char*)mspMalloc( &(pHconn->comParms), (size_t)totalLength );
    if ( connectMsg == NULL ) {
        pHconn->apiReturnCode = MQISDP_OUT_OF_MEMORY;
        return NULL;
    }

    /* Fixed header: message type, then the encoded remaining length */
    tmpPtr = connectMsg;
    *tmpPtr = 0x00 | MSP_CONNECT;
    tmpPtr++;
    mspEncodeFHeaderLength( rLength, tmpPtr );
    tmpPtr = connectMsg + fHLength;

    /* Protocol name and version */
    mspUTFEncodeString( MSP_PROTOCOL_NAME_SZ, MSP_PROTOCOL_NAME, tmpPtr );
    tmpPtr += MSP_PROTOCOL_NAME_SZ + 2;
    *tmpPtr = MSP_PROTOCOL_VERSION_3;
    tmpPtr++;

    /* Connect flags byte; its offset is remembered for the reconnect copy */
    cleanStartOffset = tmpPtr - connectMsg;
    *tmpPtr = 0x00;
    if ( pMspCp->options & MQISDP_WILL ) {
        *tmpPtr |= MSPC_WILL;
    }
    if ( pMspCp->options & MQISDP_WILL_RETAIN ) {
        *tmpPtr |= MSPC_WILL_RETAIN;
    }
    if ( pMspCp->options & MQISDP_CLEAN_START ) {
        pHconn->ctrlFlags |= MSP_CLEAN_SESSION;
        *tmpPtr |= MSPC_CLEAN_START;
        pHconn->reconnect.connRetries = -1;
    } else {
        pHconn->ctrlFlags &= MSP_CLEAN_SESSION_OFF;
    }
    if ( pMspCp->options & MQISDP_QOS_2 ) {
        *tmpPtr |= MSPC_QOS_2;
    } else if ( pMspCp->options & MQISDP_QOS_1 ) {
        *tmpPtr |= MSPC_QOS_1;
    }
    tmpPtr++;

    /* Keep-alive interval, network byte order */
    keepAlive = htons( (u_short)pMspCp->keepAliveTime );
    memcpy( tmpPtr, &keepAlive, sizeof(u_short) );
    tmpPtr += sizeof(u_short);

    /* Client identifier */
    mspUTFEncodeString( (u_short)ciLength, pMspCp->clientId, tmpPtr );
    tmpPtr += ciLength + 2;

    /* Will topic and message: written exactly when they were counted in
     * rLength, so the packet always matches its declared length. */
    if ( pMspCp->options & MQISDP_WILL ) {
        mspUTFEncodeString( (u_short)wtLength, pWillTopic, tmpPtr );
        tmpPtr += wtLength + 2;
        if ( pWillMsg != NULL ) {
            mspUTFEncodeString( (u_short)wmLength, pWillMsg, tmpPtr );
            tmpPtr += wmLength + 2;
        }
    }

    /* Keep a second copy of the packet for later reconnects */
    pHconn->reconnect.connectMsg = (char*)mspMalloc( &(pHconn->comParms), totalLength );
    if ( pHconn->reconnect.connectMsg == NULL ) {
        /* Never leave a non-zero size paired with a NULL message */
        pHconn->reconnect.connMsgSz = 0;
        mspFree( &(pHconn->comParms), connectMsg, totalLength );
        pHconn->apiReturnCode = MQISDP_OUT_OF_MEMORY;
        return NULL;
    }
    pHconn->reconnect.connMsgSz = totalLength;
    memcpy( pHconn->reconnect.connectMsg, connectMsg, totalLength );
    *(pHconn->reconnect.connectMsg + cleanStartOffset) &= MSPC_CLEAN_START_OFF;

    mspLog( LOGSCADA, &(pHconn->comParms), "CONNECT\n" );

    *msgLength = totalLength;
    return connectMsg;
}
/*
 * Build the two-byte DISCONNECT packet.
 *
 * bufLength and ipcBuffer are not used. On success *msgLength is set to 2
 * and the packet (allocated with mspMalloc) is returned. If the allocation
 * fails, NULL is returned, *msgLength is 0 and pHconn->apiReturnCode is
 * MQISDP_OUT_OF_MEMORY.
 */
void *mspBuildScadaDisconnectMsg( HCONNCB *pHconn, long bufLength, void *ipcBuffer, long *msgLength ) {
    const long packetSize = 2;
    char      *packet;

    pHconn->apiReturnCode = MQISDP_OK;
    *msgLength = 0;

    packet = (char*)mspMalloc( &(pHconn->comParms), (size_t)packetSize );
    if ( packet == NULL ) {
        pHconn->apiReturnCode = MQISDP_OUT_OF_MEMORY;
        return NULL;
    }

    /* Byte 0: message type. Byte 1: remaining length of zero. */
    packet[0] = 0x00 | (char)MSP_DISCONNECT;
    packet[1] = 0x00;

    mspLog( LOGSCADA, &(pHconn->comParms), "DISCONNECT\n" );

    *msgLength = packetSize;
    return packet;
}
/*
 * Build a SUBSCRIBE packet from the SUB_PARMS block in ipcBuffer.
 *
 * After the SUB_PARMS structure the buffer holds, up to
 * pMspSp->strucLength bytes from its start, a sequence of entries:
 *     long topicLength, topic bytes, long qosOptions.
 * Each topic is passed through mspCharTrim(' ', ...) and the value it
 * returns is used as the encoded length.
 *
 * Parameters:
 *   pHconn     connection control block; apiReturnCode is set, and
 *              nextMsgId supplies the message id placed in the packet.
 *   bufLength  not used by this function.
 *   ipcBuffer  SUB_PARMS plus topic entries.
 *   msgLength  out: total packet length; 0 when NULL is returned.
 *
 * Returns the packet (allocated with mspMalloc), or NULL with
 * pHconn->apiReturnCode set to MQISDP_INVALID_STRUC_LENGTH, MQISDP_Q_FULL,
 * MQISDP_DATA_TOO_BIG or MQISDP_OUT_OF_MEMORY.
 */
void *mspBuildScadaSubscribeMsg( HCONNCB *pHconn, long bufLength, void *ipcBuffer, long *msgLength ) {
    long       fHLength = 0;
    long       rLength = 2;     /* the two-byte message id */
    long       tLength;
    long       tLengthUt;
    long       remaining;
    SUB_PARMS *pMspSp;
    char      *endPtr;
    char      *tmpPtr;
    char      *topicPtr;
    char      *subMsg = NULL;
    long       totalLength = 0;
    long       topicQoS;
    u_short    msgId;

    pHconn->apiReturnCode = MQISDP_OK;
    pMspSp = (SUB_PARMS*)ipcBuffer;
    endPtr = (char*)pMspSp + pMspSp->strucLength;
    *msgLength = 0;

    /* Pass 1: validate every entry and add up the remaining length.
     * The supplied topic length is range-checked BEFORE the topic bytes
     * are touched, so a bad length can no longer make mspCharTrim look
     * outside the structure, and negative lengths are rejected. */
    tmpPtr = (char*)pMspSp + sizeof(SUB_PARMS);
    while ( tmpPtr < endPtr ) {
        remaining = (long)(endPtr - tmpPtr);
        if ( remaining < (long)sizeof(long) ) {
            pHconn->apiReturnCode = MQISDP_INVALID_STRUC_LENGTH;
            break;
        }
        memcpy( &tLengthUt, tmpPtr, sizeof(long) );
        /* length field + topic + QoS field must all fit */
        if ( tLengthUt < 0 || tLengthUt > remaining - 2 * (long)sizeof(long) ) {
            pHconn->apiReturnCode = MQISDP_INVALID_STRUC_LENGTH;
            break;
        }
        tLength = mspCharTrim( ' ', tLengthUt, tmpPtr + sizeof(long) );
        rLength += tLength + 3;   /* 2-byte length prefix + topic + 1 QoS byte */
        tmpPtr += sizeof(long) + tLengthUt + sizeof(long);
    }

    MSP_CALC_FHEADER_LENGTH( rLength, fHLength );
    totalLength = fHLength + rLength;

    if ( pHconn->outQ.numBytesQueued + totalLength + sizeof(IPQ) > MSP_DEFAULT_MAX_OUTQ_SZ ) {
        pHconn->apiReturnCode = MQISDP_Q_FULL;
    }
    if ( fHLength == -1 ) {
        pHconn->apiReturnCode = MQISDP_DATA_TOO_BIG;
    }

    if ( pHconn->apiReturnCode == MQISDP_OK ) {
        subMsg = (char*)mspMalloc( &(pHconn->comParms), (size_t)totalLength );
        if ( subMsg == NULL ) {
            pHconn->apiReturnCode = MQISDP_OUT_OF_MEMORY;
            return NULL;
        }

        /* Fixed header */
        tmpPtr = subMsg;
        *tmpPtr = 0x00 | (char)MSP_SUBSCRIBE | (char)MSPF_QOS_1 ;
        tmpPtr++;
        mspEncodeFHeaderLength( rLength, tmpPtr );
        tmpPtr = subMsg + fHLength;

        /* Message id, network byte order */
        msgId = htons( pHconn->nextMsgId );
        memcpy( tmpPtr, &msgId, sizeof(u_short) );
        tmpPtr += sizeof(u_short);

        /* Pass 2: encode each topic followed by its requested QoS byte */
        topicPtr = (char*)pMspSp + sizeof(SUB_PARMS);
        while ( topicPtr < endPtr ) {
            memcpy( &tLengthUt, topicPtr, sizeof(long) );
            tLength = mspCharTrim( ' ', tLengthUt, topicPtr + sizeof(long) );
            topicPtr += sizeof(long);
            mspUTFEncodeString( (u_short)tLength, topicPtr, tmpPtr );
            tmpPtr += tLength + 2;
            topicPtr += tLengthUt;

            memcpy( &topicQoS, topicPtr, sizeof(long) );
            if ( topicQoS & MQISDP_QOS_2 ) {
                *tmpPtr = MSPS_QOS_2;
            } else if ( topicQoS & MQISDP_QOS_1 ) {
                *tmpPtr = MSPS_QOS_1;
            } else {
                *tmpPtr = MSPS_QOS_0;
            }

            /* '*' precision must be an int (assumes mspLog uses
             * printf-style formatting - the format string suggests so) */
            mspLog( LOGSCADA, &(pHconn->comParms), "SUBSCRIBE,topic:<%.*s>,QoS:%d\n",
                    (int)tLength, topicPtr - tLengthUt, *tmpPtr );
            tmpPtr++;
            topicPtr += sizeof(long);
        }

        *msgLength = totalLength;
    }

    return subMsg;
}
/*
 * Build an UNSUBSCRIBE packet from the UNSUB_PARMS block in ipcBuffer.
 *
 * After the UNSUB_PARMS structure the buffer holds, up to
 * pMspUp->strucLength bytes from its start, a sequence of entries:
 *     long topicLength, topic bytes.
 * Each topic is passed through mspCharTrim(' ', ...) and the value it
 * returns is used as the encoded length.
 *
 * Parameters:
 *   pHconn     connection control block; apiReturnCode is set, and
 *              nextMsgId supplies the message id placed in the packet.
 *   bufLength  not used by this function.
 *   ipcBuffer  UNSUB_PARMS plus topic entries.
 *   msgLength  out: total packet length; 0 when NULL is returned.
 *
 * Returns the packet (allocated with mspMalloc), or NULL with
 * pHconn->apiReturnCode set to MQISDP_INVALID_STRUC_LENGTH, MQISDP_Q_FULL,
 * MQISDP_DATA_TOO_BIG or MQISDP_OUT_OF_MEMORY.
 */
void *mspBuildScadaUnsubscribeMsg( HCONNCB *pHconn, long bufLength, void *ipcBuffer, long *msgLength ) {
    long         fHLength = 0;
    long         rLength = 2;     /* the two-byte message id */
    long         tLength;
    long         tLengthUt;
    long         remaining;
    UNSUB_PARMS *pMspUp;
    char        *endPtr;
    char        *tmpPtr;
    char        *topicPtr;
    char        *unsubMsg = NULL;
    long         totalLength = 0;
    u_short      msgId;

    pHconn->apiReturnCode = MQISDP_OK;
    pMspUp = (UNSUB_PARMS*)ipcBuffer;
    endPtr = (char*)pMspUp + pMspUp->strucLength;
    *msgLength = 0;

    /* Pass 1: validate every entry and add up the remaining length.
     * The supplied topic length is range-checked BEFORE the topic bytes
     * are touched, so a bad length can no longer make mspCharTrim look
     * outside the structure, and negative lengths are rejected. */
    tmpPtr = (char*)pMspUp + sizeof(UNSUB_PARMS);
    while ( tmpPtr < endPtr ) {
        remaining = (long)(endPtr - tmpPtr);
        if ( remaining < (long)sizeof(long) ) {
            pHconn->apiReturnCode = MQISDP_INVALID_STRUC_LENGTH;
            break;
        }
        memcpy( &tLengthUt, tmpPtr, sizeof(long) );
        /* length field + topic must fit */
        if ( tLengthUt < 0 || tLengthUt > remaining - (long)sizeof(long) ) {
            pHconn->apiReturnCode = MQISDP_INVALID_STRUC_LENGTH;
            break;
        }
        tLength = mspCharTrim( ' ', tLengthUt, tmpPtr + sizeof(long) );
        rLength += tLength + 2;   /* 2-byte length prefix + topic */
        tmpPtr += sizeof(long) + tLengthUt;
    }

    MSP_CALC_FHEADER_LENGTH( rLength, fHLength );
    totalLength = fHLength + rLength;

    if ( pHconn->outQ.numBytesQueued + totalLength + sizeof(IPQ) > MSP_DEFAULT_MAX_OUTQ_SZ ) {
        pHconn->apiReturnCode = MQISDP_Q_FULL;
    }
    if ( fHLength == -1 ) {
        pHconn->apiReturnCode = MQISDP_DATA_TOO_BIG;
    }

    if ( pHconn->apiReturnCode == MQISDP_OK ) {
        unsubMsg = (char*)mspMalloc( &(pHconn->comParms), (size_t)totalLength );
        if ( unsubMsg == NULL ) {
            pHconn->apiReturnCode = MQISDP_OUT_OF_MEMORY;
            return NULL;
        }

        /* Fixed header */
        tmpPtr = unsubMsg;
        *tmpPtr = 0x00 | (char)MSP_UNSUBSCRIBE | (char)MSPF_QOS_1 ;
        tmpPtr++;
        mspEncodeFHeaderLength( rLength, tmpPtr );
        tmpPtr = unsubMsg + fHLength;

        /* Message id, network byte order */
        msgId = htons( pHconn->nextMsgId );
        memcpy( tmpPtr, &msgId, sizeof(u_short) );
        tmpPtr += sizeof(u_short);

        /* Pass 2: encode each topic */
        topicPtr = (char*)pMspUp + sizeof(UNSUB_PARMS);
        while ( topicPtr < endPtr ) {
            memcpy( &tLengthUt, topicPtr, sizeof(long) );
            tLength = mspCharTrim( ' ', tLengthUt, topicPtr + sizeof(long) );
            topicPtr += sizeof(long);
            mspUTFEncodeString( (u_short)tLength, topicPtr, tmpPtr );
            tmpPtr += tLength + 2;
            topicPtr += tLengthUt;

            /* '*' precision must be an int (assumes mspLog uses
             * printf-style formatting - the format string suggests so) */
            mspLog( LOGSCADA, &(pHconn->comParms), "UNSUBSCRIBE,topic:<%.*s>\n",
                    (int)tLength, topicPtr-tLengthUt );
        }

        *msgLength = totalLength;
    }

    return unsubMsg;
}
/*
 * Build a PUBLISH packet from the PUB_PARMS block in ipcBuffer.
 *
 * The topic (pMspPp->topic, pMspPp->topicLength) is passed through
 * mspCharTrim(' ', ...) and the value it returns is used as the encoded
 * topic length. For QoS 1 and QoS 2 a two-byte message id taken from
 * pHconn->nextMsgId follows the topic; the payload (pMspPp->data,
 * pMspPp->dataLength) comes last.
 *
 * Parameters:
 *   pHconn     connection control block; apiReturnCode is set.
 *   bufLength  not used by this function.
 *   ipcBuffer  PUB_PARMS block.
 *   msgLength  out: total packet length; 0 when NULL is returned.
 *
 * Returns the packet (allocated with mspMalloc), or NULL with
 * pHconn->apiReturnCode set to MQISDP_Q_FULL, MQISDP_DATA_TOO_BIG or
 * MQISDP_OUT_OF_MEMORY.
 */
void *mspBuildScadaPublishMsg( HCONNCB *pHconn, long bufLength, void *ipcBuffer, long *msgLength ) {
    long       fHLength = 0;
    long       rLength = 0;
    long       dLength;          /* trimmed topic length */
    PUB_PARMS *pMspPp;
    char      *tmpPtr;
    char      *pubMsg = NULL;
    long       totalLength = 0;

    pHconn->apiReturnCode = MQISDP_OK;
    pMspPp = (PUB_PARMS*)ipcBuffer;
    *msgLength = 0;

    /* Remaining length: topic, optional message id, payload */
    dLength = mspCharTrim( ' ', pMspPp->topicLength, pMspPp->topic );
    rLength += dLength + 2;
    if ( (pMspPp->options & MQISDP_QOS_1) || (pMspPp->options & MQISDP_QOS_2) ) {
        rLength += 2;
    }
    rLength += pMspPp->dataLength;

    MSP_CALC_FHEADER_LENGTH( rLength, fHLength );
    totalLength = fHLength + rLength;

    if ( pHconn->outQ.numBytesQueued + totalLength + sizeof(IPQ) > MSP_DEFAULT_MAX_OUTQ_SZ ) {
        pHconn->apiReturnCode = MQISDP_Q_FULL;
    }
    if ( fHLength == -1 ) {
        pHconn->apiReturnCode = MQISDP_DATA_TOO_BIG;
    }

    if ( pHconn->apiReturnCode == MQISDP_OK ) {
        pubMsg = (char*)mspMalloc( &(pHconn->comParms), (size_t)totalLength );
        if ( pubMsg == NULL ) {
            pHconn->apiReturnCode = MQISDP_OUT_OF_MEMORY;
            return NULL;
        }

        /* Fixed header byte: type plus QoS and retain flags */
        tmpPtr = pubMsg;
        *tmpPtr = 0x00 | (char)MSP_PUBLISH;
        if ( pMspPp->options & MQISDP_QOS_2 ) {
            *tmpPtr |= (char)MSPF_QOS_2;
        } else if ( pMspPp->options & MQISDP_QOS_1 ) {
            *tmpPtr |= (char)MSPF_QOS_1;
        }
        if ( pMspPp->options & MQISDP_RETAIN ) {
            *tmpPtr |= (char)MSPF_RETAIN;
        }
        tmpPtr++;

        mspEncodeFHeaderLength( rLength, tmpPtr );
        tmpPtr = pubMsg + fHLength;

        mspUTFEncodeString( (u_short)dLength, pMspPp->topic, tmpPtr );
        tmpPtr += dLength + 2;

        /* '*' precision must be an int (assumes mspLog uses printf-style
         * formatting - the format strings suggest so) */
        if ( (*pubMsg & MSP_FH_GET_QOS) != 0x00 ) {
            u_short msgId = htons( pHconn->nextMsgId );
            memcpy( tmpPtr, &msgId, sizeof(u_short) );
            mspLog( LOGSCADA, &(pHconn->comParms), "PUBLISH sent,topic:<%.*s>,QoS:%d,msgid:%d\n",
                    (int)dLength, pMspPp->topic, (*pubMsg & MSP_FH_GET_QOS)>>1 , pHconn->nextMsgId );
            tmpPtr += sizeof(u_short);
        } else {
            mspLog( LOGSCADA, &(pHconn->comParms), "PUBLISH sent,topic:<%.*s>,QoS:%d\n",
                    (int)dLength, pMspPp->topic, (*pubMsg & MSP_FH_GET_QOS)>>1 );
        }

        memcpy( tmpPtr, pMspPp->data, pMspPp->dataLength );

        *msgLength = totalLength;
    }

    return pubMsg;
}
int
mspReceiveScadaMessage( HCONNCB *pHconn,
long
bytesRead,
char
*pReadBuffer ) {
long
rlBytes = 0;
int
rLength = 0;
int
l = 0;
int
rc = 0;
RPQ *pRpqEntry;
IPQ *pIpqEntry;
u_short wmqttMsgId;
switch
( pReadBuffer[0] & MSP_GET_MSG_TYPE ) {
case
MSP_PUBLISH:
if
( pReadBuffer[0] & MSPF_QOS_2 ) {
pReadBuffer[bytesRead-1] = 0x00;
}
else
{
pReadBuffer[bytesRead-1] = MQISDP_RELEASED;
}
pRpqEntry = mspStorePublication( pHconn, bytesRead, pReadBuffer, &wmqttMsgId );
if
( pRpqEntry == NULL ) {
mspLog( LOGSCADA, &(pHconn->comParms),
"PUBLISH received, unable to store\n"
);
}
else
{
if
( !(pRpqEntry->options & MQISDP_QOS_0) && (pHconn->persistFuncs != NULL) ) {
MSP_SET_NEXT_RCVID( pHconn->nextRcvId );
pRpqEntry->rcvId = pHconn->nextRcvId;
rc = pHconn->persistFuncs->addReceivedMessage( pHconn->persistFuncs->pUserData,
pRpqEntry->rcvId, bytesRead, pReadBuffer );
}
if
( rc != 0 ) {
mspDelFromHash( pHconn, pHconn->inQ.rpHash, wmqttMsgId );
mspDeleteRPMFromList( pHconn, pRpqEntry );
}
else
{
mspLog( LOGSCADA, &(pHconn->comParms),
"PUBLISH received,topic:<%.*s>,QoS:%d,msgid:%d\n"
,
pRpqEntry->topicLength, pRpqEntry->buffer, (pRpqEntry->options)/8,
wmqttMsgId );
mspSendPublishResponse( pHconn, pRpqEntry, wmqttMsgId );
}
}
break
;
case
MSP_PUBREC:
if
( mspDecodeFHeaderLength( bytesRead - 1, &rlBytes, &rLength, pReadBuffer + 1 ) != -1
&& rlBytes + rLength <= bytesRead - 1 ) {
memcpy
( &wmqttMsgId, pReadBuffer + 1 + rlBytes,
sizeof
(u_short) );
wmqttMsgId = ntohs( wmqttMsgId );
mspLog( LOGSCADA, &(pHconn->comParms),
"PUBREC received\n"
);
mspSendPubReceivedResponse( pHconn, wmqttMsgId );
}
break
;
case
MSP_PUBREL:
if
( mspDecodeFHeaderLength( bytesRead - 1, &rlBytes, &rLength, pReadBuffer + 1 ) != -1
&& rlBytes + rLength <= bytesRead - 1 ) {
memcpy
( &wmqttMsgId, pReadBuffer + 1 + rlBytes,
sizeof
(u_short) );
wmqttMsgId = ntohs( wmqttMsgId );
pRpqEntry = (RPQ*)mspReadFromHash( pHconn->inQ.rpHash, wmqttMsgId );
if
( pRpqEntry != NULL ) {
if
( pHconn->persistFuncs != NULL ) {
rc = pHconn->persistFuncs->updReceivedMessage( pHconn->persistFuncs->pUserData, pRpqEntry->rcvId );
}
if
( rc == 0 ) {
pRpqEntry->readyToPublish = MQISDP_RELEASED;
if
( pHconn->inQ.rtpEntries == 0 ) {
mspSignalSemaphore( pHconn->ipcCb.receiveSemaphore );
}
pHconn->inQ.rtpEntries++;
mspDelFromHash( pHconn, pHconn->inQ.rpHash, wmqttMsgId );
mspLog( LOGSCADA, &(pHconn->comParms),
"PUBREL received,msgId:%ld. Releasing publication.\n"
, wmqttMsgId );
}
}
else
{
mspLog( LOGSCADA, &(pHconn->comParms),
"PUBREL received,msgId:%ld. Already released.\n"
, wmqttMsgId );
}
if
( rc == 0 ) {
mspSendPubReleaseResponse( pHconn, wmqttMsgId );
}
}
break
;
case
MSP_PUBACK:
if
( l == 0 ) {
mspLog( LOGSCADA, &(pHconn->comParms),
"PUBACK received\n"
);
l=1;
}
case
MSP_PUBCOMP:
if
( l == 0 ) {
mspLog( LOGSCADA, &(pHconn->comParms),
"PUBCOMP received\n"
);
l=1;
}
case
MSP_SUBACK:
if
( l == 0 ) {
mspLog( LOGSCADA, &(pHconn->comParms),
"SUBACK received\n"
);
l=1;
}
case
MSP_UNSUBACK:
if
( l == 0 ) {
mspLog( LOGSCADA, &(pHconn->comParms),
"UNSUBACK received\n"
);
l=1;
}
if
( mspDecodeFHeaderLength( bytesRead - 1, &rlBytes, &rLength, pReadBuffer + 1 ) != -1
&& rlBytes + rLength <= bytesRead - 1 ) {
memcpy
( &wmqttMsgId, pReadBuffer + 1 + rlBytes,
sizeof
(u_short) );
wmqttMsgId = ntohs( wmqttMsgId );
pIpqEntry = mspReadFromHash( pHconn->outQ.ipHash, wmqttMsgId );
mspDelFromHash( pHconn, pHconn->outQ.ipHash, wmqttMsgId );
mspDeleteIPMFromList( pHconn, pIpqEntry );
if
( pHconn->persistFuncs != NULL ) {
pHconn->persistFuncs->delSentMessage( pHconn->persistFuncs->pUserData, wmqttMsgId );
}
}
break
;
case
MSP_PINGREQ:
mspLog( LOGSCADA, &(pHconn->comParms),
"PINGREQ received\n"
);
mspSendPingResponse( pHconn );
break
;
case
MSP_PINGRESP:
mspLog( LOGSCADA, &(pHconn->comParms),
"PINGRESP received\n"
);
break
;
case
MSP_CONNACK:
if
( pReadBuffer[3] == MSP_CONN_ACCEPTED ) {
pHconn->connState = MQISDP_CONNECTED;
pHconn->reconnect.timeForNextConnect = 0;
if
( pHconn->reconnect.connRetries > 0 ) {
pHconn->reconnect.connRetries = 0;
}
}
else
{
pHconn->connState = MQISDP_DISCONNECTED;
mspTCPDisconnect( &(pHconn->tcpParms.sockfd) );
pHconn->reconnect.connRetries = pHconn->retryCount + 1;
switch
( pReadBuffer[4] ) {
case
MSP_CONN_REFUSED_VERSION:
pHconn->tcpParms.lastError = MQISDP_PROTOCOL_VERSION_ERROR | MSP_CONN_ERROR;
break
;
case
MSP_CONN_REFUSED_ID:
pHconn->tcpParms.lastError = MQISDP_CLIENT_ID_ERROR | MSP_CONN_ERROR;
break
;
case
MSP_CONN_REFUSED_BROKER:
pHconn->tcpParms.lastError = MQISDP_BROKER_UNAVAILABLE | MSP_CONN_ERROR;
break
;
}
}
mspLog( LOGSCADA, &(pHconn->comParms),
"CONNACK\n"
);
break
;
default
:
mspLog( LOGERROR, &(pHconn->comParms),
"Unrecognised MQIsdp fixed header:\n"
);
mspLogHex( LOGERROR, &(pHconn->comParms), 1, pReadBuffer );
}
return
0;
}
/*
 * Write l at ptr as a variable-length quantity: seven bits per byte, least
 * significant group first, with bit 0x80 set on every byte except the last.
 *
 * At least one byte is always written. The caller must provide enough room
 * at ptr. Always returns 0.
 */
int mspEncodeFHeaderLength( int l, char *ptr ) {
    int remaining = l;

    for ( ;; ) {
        char digit = remaining % 128;

        remaining = remaining / 128;
        if ( remaining <= 0 ) {
            /* last group: no continuation bit */
            *ptr = digit;
            break;
        }
        *ptr = digit | 0x80;
        ptr++;
    }

    return 0;
}
/*
 * Decode a variable-length quantity (seven bits per byte, least
 * significant group first, bit 0x80 meaning "another byte follows").
 *
 * Parameters:
 *   numBytes  number of bytes that may be read starting at ptr.
 *   rlLength  out: number of bytes the encoded length occupies.
 *   l         out: the decoded value.
 *   ptr       first byte of the encoded length.
 *
 * Returns 0 on success. Returns -1, with *rlLength and *l set to 0, when
 * the encoding is not terminated within numBytes bytes or within four
 * bytes, whichever is smaller.
 *
 * No byte beyond ptr[numBytes - 1] is ever read, and the four-byte cap
 * keeps the multiplier within the range of a 32-bit int.
 */
int mspDecodeFHeaderLength( long numBytes, long *rlLength, int *l, char *ptr ) {
    enum { MSP_MAX_LENGTH_BYTES = 4 };
    int  multiplier = 1;
    int  value = 0;
    long used = 0;
    int  terminated = 0;

    while ( (used < numBytes) && (used < MSP_MAX_LENGTH_BYTES) ) {
        unsigned char d = (unsigned char)ptr[used];

        used++;
        value += (d & 127) * multiplier;
        if ( (d & 128) == 0 ) {
            terminated = 1;
            break;
        }
        multiplier *= 128;
    }

    if ( !terminated ) {
        *rlLength = 0;
        *l = 0;
        return -1;
    }

    *rlLength = used;
    *l = value;
    return 0;
}
/*
 * Write a length-prefixed string at outBuf: a u_short holding bufLen in
 * network byte order, followed by bufLen bytes copied from buf.
 *
 * outBuf must have room for bufLen + 2 bytes. Always returns 0.
 */
int mspUTFEncodeString( u_short bufLen, char *buf, char *outBuf ) {
    const u_short wireLen = htons( bufLen );
    char         *payload = outBuf + 2;

    memcpy( outBuf, &wireLen, sizeof(u_short) );
    memcpy( payload, buf, (size_t)bufLen );

    return 0;
}
/*
 * Read a length-prefixed string: the first two bytes of bufToDecode are a
 * u_short length in network byte order, the string bytes follow.
 *
 * On return *bufLen holds the length in host byte order and *ppBuffer
 * points at bufToDecode + 2. Nothing is copied and no bounds are checked.
 * Always returns 0.
 */
int mspUTFDecodeString( u_short *bufLen, char *bufToDecode, char **ppBuffer ) {
    u_short wireLen;

    memcpy( &wireLen, bufToDecode, sizeof(u_short) );
    *bufLen = ntohs( wireLen );
    *ppBuffer = bufToDecode + 2;

    return 0;
}
/*
 * Send one protocol message. Messages whose fixed header carries QoS 1 or
 * QoS 2 are also tracked in the out queue.
 *
 * Parameters:
 *   pHconn     connection control block.
 *   msgLen     length of msgData in bytes.
 *   msgData    the complete message; its first byte is the fixed header.
 *   msgId      message id used as the hash / persistence key.
 *   retryFlag  non-zero when resending: the message is not queued again,
 *              MSPF_DUPLICATE is set in its first byte (QoS 1/2 only), and
 *              msgData is never freed here.
 *   initFlag   non-zero: the persistence callbacks are skipped and the new
 *              queue entry is marked MQISDP_RETRYING.
 *
 * Returns 0 when the message was written to the socket;
 * MQISDP_OUT_OF_MEMORY when the out queue is full or the entry cannot be
 * added; MQISDP_PERSISTENCE_FAILED when a persistence callback fails; 1
 * when there is no valid socket or the write fails (on a failed write the
 * connection is also marked MQISDP_DISCONNECTED and closed).
 *
 * Ownership: when retryFlag is 0 and no queue entry holds the message,
 * msgData is released with mspFree before returning.
 */
int mspSendScadaMessage( HCONNCB *pHconn, long msgLen, char *msgData, short msgId, int retryFlag, int initFlag ) {
    int  needsStore = 0;
    int  result = 0;
    IPQ *queued = NULL;
    IPQ *stale = NULL;

    /* Only QoS 1 and QoS 2 messages are tracked until acknowledged */
    switch ( *msgData & MSP_FH_GET_QOS ) {
    case MSPF_QOS_1:
    case MSPF_QOS_2:
        needsStore = 1;
        break;
    default:
        break;
    }

    if ( needsStore && retryFlag != 0 ) {
        *msgData |= MSPF_DUPLICATE;
    } else if ( needsStore ) {
        if ( pHconn->outQ.numBytesQueued + msgLen + sizeof(IPQ) > MSP_DEFAULT_MAX_OUTQ_SZ ) {
            result = MQISDP_OUT_OF_MEMORY;
        } else if ( (queued = mspAddIPMToList( pHconn, msgLen, msgData, msgId )) == NULL ) {
            result = MQISDP_OUT_OF_MEMORY;
        } else {
            if ( initFlag != 0 ) {
                queued->msgStatus = MQISDP_RETRYING;
            } else if ( pHconn->persistFuncs != NULL ) {
                if ( (*msgData & MSP_GET_MSG_TYPE) == MSP_PUBREL ) {
                    result = pHconn->persistFuncs->updSentMessage( pHconn->persistFuncs->pUserData,
                                                                   msgId, msgLen, msgData );
                } else {
                    result = pHconn->persistFuncs->addSentMessage( pHconn->persistFuncs->pUserData,
                                                                   msgId, msgLen, msgData );
                }
            }

            if ( result == 0 ) {
                /* Drop any entry already filed under this id, then index the new one */
                stale = mspReadFromHash( pHconn->outQ.ipHash, msgId );
                if ( stale != NULL ) {
                    mspDeleteIPMFromList( pHconn, stale );
                }
                mspAddToHash( pHconn, pHconn->outQ.ipHash, msgId, queued );
            } else {
                result = MQISDP_PERSISTENCE_FAILED;
                /* Detach msgData from the entry before deleting it;
                 * msgData is released at the end of this function */
                queued->msgData = NULL;
                mspDeleteIPMFromList( pHconn, queued );
                queued = NULL;
            }
        }
    }

    if ( result == 0 ) {
        if ( pHconn->tcpParms.sockfd == MSP_INVALID_SOCKET ) {
            result = 1;
        } else if ( mspTCPWrite( pHconn, msgLen, msgData ) == -1 ) {
            result = 1;
            pHconn->connState = MQISDP_DISCONNECTED;
            mspTCPDisconnect( &(pHconn->tcpParms.sockfd) );
        } else {
            pHconn->timeForNextPoll = time(NULL) + pHconn->keepAliveTime;
        }
    }

    if ( (!needsStore || (queued == NULL)) && !retryFlag ) {
        mspFree( &(pHconn->comParms), msgData, msgLen );
    }

    return result;
}
/*
 * Parse a received PUBLISH message and store it as a new RPQ entry.
 *
 * The entry's buffer holds the topic bytes immediately followed by the
 * payload bytes; topicLength gives the split point and bufferLength the
 * total. readyToPublish is taken from the last byte of pReadBuffer.
 * A QoS 2 message whose readyToPublish is 0 is also indexed by message id
 * in pHconn->inQ.rpHash; a duplicate of an id already in that hash is
 * discarded.
 *
 * Parameters:
 *   pHconn       connection control block.
 *   bytesRead    number of bytes in pReadBuffer.
 *   pReadBuffer  the received bytes, starting with the fixed header byte.
 *   wmqttMsgId   out: message id (host byte order) for QoS 1/2, 0 for
 *                QoS 0; not written when the fixed header is rejected.
 *
 * Returns the stored entry, or NULL when the message is malformed, too
 * large, an already-known duplicate, or memory cannot be allocated.
 */
RPQ* mspStorePublication( HCONNCB *pHconn, long bytesRead, char *pReadBuffer, u_short *wmqttMsgId ) {
    RPQ    *pRpqEntry = NULL;
    long    rlBytes;
    int     rLength;
    char   *tmpPtr;
    char   *topicPtr = NULL;
    u_short topicLength = 0;

    pRpqEntry = (RPQ*)mspMalloc( &(pHconn->comParms), sizeof(RPQ) );
    if ( pRpqEntry == NULL ) {
        return NULL;
    }

    /* The remaining length must fit in the buffer and must at least hold
     * the two-byte topic length prefix that is read below. */
    if ( mspDecodeFHeaderLength( bytesRead - 1, &rlBytes, &rLength, pReadBuffer + 1 ) == -1
         || rlBytes + rLength > bytesRead - 1
         || rLength < (int)sizeof(u_short) ) {
        mspFree( &(pHconn->comParms), pRpqEntry, sizeof(RPQ) );
        return NULL;
    }

    pRpqEntry->options = 0;
    *wmqttMsgId = 0;
    pRpqEntry->topicLength = 0;

    /* Translate fixed header flags into API option bits */
    if ( pReadBuffer[0] & MSPF_RETAIN ) {
        pRpqEntry->options |= MQISDP_RETAIN;
    }
    if ( pReadBuffer[0] & MSPF_DUPLICATE ) {
        pRpqEntry->options |= MQISDP_DUPLICATE;
    }
    if ( pReadBuffer[0] & MSPF_QOS_2 ) {
        pRpqEntry->options |= MQISDP_QOS_2;
    } else if ( pReadBuffer[0] & MSPF_QOS_1 ) {
        pRpqEntry->options |= MQISDP_QOS_1;
    } else {
        pRpqEntry->options |= MQISDP_QOS_0;
    }

    pRpqEntry->readyToPublish = pReadBuffer[bytesRead-1];

    /* Topic */
    tmpPtr = pReadBuffer + rlBytes + 1;
    mspUTFDecodeString( &topicLength, tmpPtr, &topicPtr );
    pRpqEntry->topicLength = topicLength;
    tmpPtr += pRpqEntry->topicLength + 2;

    /* Message id (QoS 1 and 2 only); bufferLength = topic + payload */
    if ( (pReadBuffer[0] & MSPF_QOS_2) || (pReadBuffer[0] & MSPF_QOS_1) ) {
        if ( tmpPtr + sizeof(u_short) > pReadBuffer + bytesRead ) {
            mspFree( &(pHconn->comParms), pRpqEntry, sizeof(RPQ) );
            return NULL;
        }
        memcpy( wmqttMsgId, tmpPtr, sizeof(u_short) );
        (*wmqttMsgId) = ntohs( *wmqttMsgId );
        pRpqEntry->bufferLength = rLength - 4;
        tmpPtr += 2;
    } else {
        pRpqEntry->bufferLength = rLength - 2;
    }

    /* Topic and payload must both lie inside the received bytes */
    if ( topicPtr + pRpqEntry->topicLength > pReadBuffer + bytesRead
         || pRpqEntry->bufferLength - pRpqEntry->topicLength < 0
         || tmpPtr + (pRpqEntry->bufferLength - pRpqEntry->topicLength) > pReadBuffer + bytesRead ) {
        mspFree( &(pHconn->comParms), pRpqEntry, sizeof(RPQ) );
        return NULL;
    }

    if ( pRpqEntry->bufferLength + sizeof(RPQ) > MSP_DEFAULT_MAX_INQ_SZ ) {
        mspFree( &(pHconn->comParms), pRpqEntry, sizeof(RPQ) );
        return NULL;
    }

    pRpqEntry->buffer = (char*)mspMalloc( &(pHconn->comParms), pRpqEntry->bufferLength );
    if ( pRpqEntry->buffer == NULL ) {
        mspFree( &(pHconn->comParms), pRpqEntry, sizeof(RPQ) );
        return NULL;
    }

    memcpy( pRpqEntry->buffer, topicPtr, pRpqEntry->topicLength );
    memcpy( pRpqEntry->buffer + pRpqEntry->topicLength, tmpPtr,
            pRpqEntry->bufferLength - pRpqEntry->topicLength );

    if ( (pReadBuffer[0] & MSPF_QOS_2) && (pRpqEntry->readyToPublish == 0) ) {
        switch ( pReadBuffer[0] & MSPF_DUPLICATE ) {
        case MSPF_DUPLICATE:
            if ( mspGetHashEntry( pHconn->inQ.rpHash, *wmqttMsgId ) != NULL ) {
                /* Already held under this id: discard the duplicate */
                mspFree( &(pHconn->comParms), pRpqEntry->buffer, pRpqEntry->bufferLength );
                mspFree( &(pHconn->comParms), pRpqEntry, sizeof(RPQ) );
                pRpqEntry = NULL;
                break;
            }
            /* fallthrough: a duplicate we have not seen is stored normally */
        default:
            mspAddRPMToList( pHconn, pRpqEntry );
            if ( mspAddToHash( pHconn, pHconn->inQ.rpHash, *wmqttMsgId, pRpqEntry ) != 0 ) {
                /* NOTE(review): if mspDeleteRPMFromList also releases the
                 * entry and its buffer (other callers in this file free
                 * nothing after calling it), the two mspFree calls below
                 * are a double free - verify against its definition. */
                mspDeleteRPMFromList( pHconn, pRpqEntry);
                mspFree( &(pHconn->comParms), pRpqEntry->buffer, pRpqEntry->bufferLength );
                mspFree( &(pHconn->comParms), pRpqEntry, sizeof(RPQ) );
                pRpqEntry = NULL;
            }
            break;
        }
    } else {
        mspAddRPMToList( pHconn, pRpqEntry );
    }

    if ( pRpqEntry != NULL ) {
        /* %ld needs a long; a u_short would be promoted to int */
        mspLog( LOGSCADA, &(pHconn->comParms), "Storing publication,msgId:%ld. Releasing:%s\n",
                (long)*wmqttMsgId, (pRpqEntry->readyToPublish==MQISDP_RELEASED)?"TRUE":"FALSE" );
    }

    return pRpqEntry;
}
/*
 * Acknowledge a received publication according to its QoS:
 *   MQISDP_QOS_2 -> send PUBREC, MQISDP_QOS_1 -> send PUBACK,
 *   otherwise nothing is sent.
 *
 * The reply is four bytes: message type, remaining length 2, and
 * wmqttMsgId in network byte order.
 *
 * Returns 0 when no reply is needed or mspSendScadaMessage returned a
 * value <= 1; returns 1 when the reply could not be allocated or
 * mspSendScadaMessage returned a value > 1.
 */
int mspSendPublishResponse( HCONNCB *pHconn, RPQ* pRpqEntry, u_short wmqttMsgId ) {
    const int isQos2 = ( pRpqEntry->options & MQISDP_QOS_2 ) != 0;
    char     *reply;
    u_short   wireMsgId;

    if ( !isQos2 && !( pRpqEntry->options & MQISDP_QOS_1 ) ) {
        return 0;
    }

    reply = mspMalloc( &(pHconn->comParms), 4 );
    if ( reply == NULL ) {
        return 1;
    }

    reply[1] = 0x02;
    wireMsgId = htons( wmqttMsgId );
    memcpy( reply + 2, &wireMsgId, sizeof(u_short) );

    if ( isQos2 ) {
        reply[0] = 0x00 | MSP_PUBREC;
        mspLog( LOGSCADA, &(pHconn->comParms), "PUBREC sent\n" );
    } else {
        reply[0] = 0x00 | MSP_PUBACK;
        mspLog( LOGSCADA, &(pHconn->comParms), "PUBACK sent\n" );
    }

    return ( mspSendScadaMessage( pHconn , 4, reply, wmqttMsgId, 0, 0 ) <= 1 ) ? 0 : 1;
}
/*
 * Send a two-byte PINGRESP message.
 *
 * Returns 0 when mspSendScadaMessage returned a value <= 1; returns 1 when
 * the message could not be allocated or mspSendScadaMessage returned a
 * value > 1.
 */
int mspSendPingResponse( HCONNCB *pHconn ) {
    char *packet = mspMalloc( &(pHconn->comParms), 2 );

    if ( packet == NULL ) {
        return 1;
    }

    /* Byte 0: message type. Byte 1: remaining length of zero. */
    packet[0] = 0x00 | (char)MSP_PINGRESP;
    packet[1] = 0x00;
    mspLog( LOGSCADA, &(pHconn->comParms), "PINGRESP sent\n" );

    return ( mspSendScadaMessage( pHconn , 2, packet, 0, 0, 0 ) <= 1 ) ? 0 : 1;
}
/*
 * Send a two-byte PINGREQ message.
 *
 * Returns 0 when mspSendScadaMessage returned a value <= 1; returns 1 when
 * the message could not be allocated or mspSendScadaMessage returned a
 * value > 1.
 */
int mspSendPingRequest( HCONNCB *pHconn ) {
    char *packet = mspMalloc( &(pHconn->comParms), 2 );

    if ( packet == NULL ) {
        return 1;
    }

    /* Byte 0: message type. Byte 1: remaining length of zero. */
    packet[0] = 0x00 | (char)MSP_PINGREQ;
    packet[1] = 0x00;
    mspLog( LOGSCADA, &(pHconn->comParms), "PINGREQ sent\n" );

    return ( mspSendScadaMessage( pHconn , 2, packet, 0, 0, 0 ) <= 1 ) ? 0 : 1;
}
/*
 * Answer a PUBREC by sending PUBREL for the same message id.
 *
 * The four-byte message is: PUBREL type with MSPF_QOS_1 set, remaining
 * length 2, and msgId in network byte order.
 *
 * Returns 0 when mspSendScadaMessage returned a value <= 1; returns 1 when
 * the message could not be allocated or mspSendScadaMessage returned a
 * value > 1.
 */
int mspSendPubReceivedResponse( HCONNCB *pHconn, u_short msgId ) {
    char   *pScadaMsg;
    u_short netMsgId;
    int     rc = 1;

    pScadaMsg = mspMalloc( &(pHconn->comParms), 4 );
    if ( pScadaMsg != NULL ) {
        pScadaMsg[0] = 0x00 | MSP_PUBREL;
        pScadaMsg[0] |= (char)MSPF_QOS_1;
        pScadaMsg[1] = 0x02;

        /* memcpy rather than a u_short store through a cast char pointer:
         * no reliance on buffer alignment, and the same approach as
         * mspSendPublishResponse */
        netMsgId = htons( msgId );
        memcpy( pScadaMsg + 2, &netMsgId, sizeof(u_short) );

        if ( mspSendScadaMessage( pHconn , 4, pScadaMsg, msgId, 0, 0 ) <= 1 ) {
            mspLog( LOGSCADA, &(pHconn->comParms), "PUBREL sent, msgid:%d\n", msgId );
            rc = 0;
        }
    }

    return rc;
}
/*
 * Answer a PUBREL by sending PUBCOMP for the same message id.
 *
 * The four-byte message is: PUBCOMP type, remaining length 2, and msgId in
 * network byte order.
 *
 * Returns 0 when mspSendScadaMessage returned a value <= 1; returns 1 when
 * the message could not be allocated or mspSendScadaMessage returned a
 * value > 1.
 */
int mspSendPubReleaseResponse( HCONNCB *pHconn, u_short msgId ) {
    char   *pScadaMsg;
    u_short netMsgId;
    int     rc = 1;

    pScadaMsg = mspMalloc( &(pHconn->comParms), 4 );
    if ( pScadaMsg != NULL ) {
        pScadaMsg[0] = 0x00 | MSP_PUBCOMP;
        pScadaMsg[1] = 0x02;

        /* memcpy rather than a u_short store through a cast char pointer:
         * no reliance on buffer alignment, and the same approach as
         * mspSendPublishResponse */
        netMsgId = htons( msgId );
        memcpy( pScadaMsg + 2, &netMsgId, sizeof(u_short) );

        mspLog( LOGSCADA, &(pHconn->comParms), "PUBCOMP sent, msgid:%d\n", msgId );
        if ( mspSendScadaMessage( pHconn , 4, pScadaMsg, msgId, 0, 0 ) <= 1 ) {
            rc = 0;
        }
    }

    return rc;
}
/*
 * Re-establish the connection: close the current socket if there is one,
 * call mspTCPInitialise, and on success resend a copy of the CONNECT
 * message saved in pHconn->reconnect.
 *
 * Returns the value returned by mspTCPInitialise. When that value is 0,
 * pHconn->connState is set to MQISDP_CONNECTING.
 *
 * NOTE(review): a failed allocation of the message copy, or a failure in
 * mspSendScadaMessage, is not reflected in the return value.
 */
int mspMQIsdpReconnect( HCONNCB *pHconn ) {
    char *packet;
    int   status;

    if ( pHconn->tcpParms.sockfd != MSP_INVALID_SOCKET ) {
        mspTCPDisconnect( &(pHconn->tcpParms.sockfd) );
    }

    status = mspTCPInitialise( pHconn );
    if ( status != 0 ) {
        return status;
    }

    pHconn->connState = MQISDP_CONNECTING;

    /* Send a fresh copy; the saved message stays in pHconn->reconnect */
    packet = (char*)mspMalloc( &(pHconn->comParms), pHconn->reconnect.connMsgSz );
    if ( packet != NULL ) {
        memcpy( packet, pHconn->reconnect.connectMsg, pHconn->reconnect.connMsgSz );
        mspSendScadaMessage( pHconn, pHconn->reconnect.connMsgSz, packet, 0, 0, 0 );
    }

    return status;
}