00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #include <SqlNetworkHandler.h>
00021 #include <AbsSqlConnection.h>
00022 #include <SqlConnection.h>
00023 #include <SqlOdbcConnection.h>
00024 #include <AbsSqlStatement.h>
00025 #include <SqlStatement.h>
00026 #include <SqlOdbcStatement.h>
00027
00028 #include <SqlLogStatement.h>
00029
00030 List SqlNetworkHandler::stmtList;
00031 AbsSqlConnection* SqlNetworkHandler::conn;
00032 SqlApiImplType SqlNetworkHandler::type;
00033
00034 int SqlNetworkHandler::process(PacketHeader &header, char *buffer)
00035 {
00036 switch(header.packetType)
00037 {
00038 case NW_PKT_PREPARE:
00039 return processPrepare(header, buffer);
00040 break;
00041 case NW_PKT_COMMIT:
00042 return processCommit(header, buffer);
00043 break;
00044 }
00045 }
00046
00047 int SqlNetworkHandler::processCommit(PacketHeader &header, char *buffer)
00048 {
00049 printDebug(DM_Network, "Processing COMMIT");
00050 PacketCommit *pkt = new PacketCommit();
00051 pkt->setBuffer(buffer);
00052 pkt->setBufferSize(header.packetLength);
00053 pkt->unmarshall();
00054 List pktList;
00055 pkt->getExecPacketList(stmtList, pktList);
00056 DbRetVal rv = applyExecPackets(stmtList, pktList);
00057 int response = 1;
00058 if (rv != OK)
00059 {
00060 printError(ErrSysFatal, "Unable to apply the exec packets\n");
00061 response =0;
00062 }
00063 return response;
00064
00065 }
00066 int SqlNetworkHandler::processFree(PacketHeader &header, char *buffer)
00067 {
00068 PacketFree *pkt = new PacketFree();
00069 pkt->setBuffer(buffer);
00070 pkt->setBufferSize(header.packetLength);
00071 pkt->unmarshall();
00072
00073 int response =1;
00074
00075
00076
00077
00078
00079
00080
00081
00082
00083
00084
00085
00086
00087
00088
00089
00090
00091
00092 return response;
00093 }
00094 int SqlNetworkHandler::processPrepare(PacketHeader &header, char *buffer)
00095 {
00096 PacketPrepare *pkt = new PacketPrepare();
00097 pkt->setBuffer(buffer);
00098 pkt->setBufferSize(header.packetLength);
00099 pkt->unmarshall();
00100 printDebug(DM_Network, "PREPARE %d %s\n", pkt->stmtID, pkt->stmtString);
00101
00102
00103 int response =1;
00104
00105 AbsSqlStatement *sqlstmt = SqlFactory::createStatement(type);
00106 sqlstmt->setConnection(conn);
00107 NetworkStmt *nwStmt = new NetworkStmt();
00108 printDebug(DM_Network, "Statement string %s\n", pkt->stmtString);
00109 nwStmt->srcNetworkID = header.srcNetworkID;
00110 nwStmt->stmtID = pkt->stmtID;
00111 nwStmt->stmt = sqlstmt;
00112 DbRetVal rv = sqlstmt->prepare(pkt->stmtString);
00113 if (rv != OK)
00114 {
00115 printError(ErrSysInit, "statement prepare failed\n");
00116 response = 0;
00117 return response;
00118 }
00119 BindSqlField *bindField = NULL;
00120
00121 for (int i = 0; i < pkt->noParams; i++)
00122 {
00123 bindField = new BindSqlField();
00124 bindField->type = (DataType) pkt->type[i];
00125 bindField->length = pkt->length[i];
00126 bindField->value = AllDataType::alloc(bindField->type,
00127 bindField->length);
00128 nwStmt->paramList.append(bindField);
00129 }
00130 stmtList.append(nwStmt);
00131 return response;
00132
00133 }
00134
00135 DbRetVal SqlNetworkHandler::applyExecPackets(List sList, List pList)
00136 {
00137 ListIterator stmtIter = sList.getIterator();
00138 NetworkStmt *nwstmt;
00139 DbRetVal rv = conn->beginTrans();
00140 if (rv != OK) return rv;
00141 ListIterator pktIter = pList.getIterator();
00142 PacketExecute *pkt;
00143 int i = 0;
00144 BindSqlField *bindField;
00145 while (pktIter.hasElement())
00146 {
00147 pkt = (PacketExecute*) pktIter.nextElement();
00148 stmtIter.reset();
00149 bool found = false;
00150 while (stmtIter.hasElement())
00151 {
00152 nwstmt = (NetworkStmt*) stmtIter.nextElement();
00153 if (nwstmt->stmtID == pkt->stmtID) {found = true ; break;}
00154 }
00155 if (!found) {
00156 printf("stmt not found in list. Negleting unreplicated table...\n");
00157 continue;
00158 }
00159 ListIterator paramIter = nwstmt->paramList.getIterator();
00160 i = 0;
00161 while (paramIter.hasElement()) {
00162 bindField = (BindSqlField*) paramIter.nextElement();
00163 setParamValues(nwstmt->stmt, i+1, bindField->type, bindField->length, pkt->paramValues[i]);
00164 i++;
00165 }
00166 int rows= 0;
00167 DbRetVal rv = nwstmt->stmt->execute(rows);
00168 if (rv != OK )
00169 {
00170 printError(ErrSysFatal, "sql execute failed with rv %d\n", rv);
00171
00172 SqlNetworkHandler::conn->rollback();
00173 printError(ErrPeerExecFailed, "Transaction Rolledback\n");
00174 return ErrPeerExecFailed;
00175 }
00176 }
00177 SqlNetworkHandler::conn->commit();
00178 return OK;
00179 }
00180
00181 void SqlNetworkHandler::setParamValues(AbsSqlStatement *stmt, int parampos, DataType type,
00182 int length, char *value)
00183 {
00184 switch(type)
00185 {
00186 case typeInt:
00187 stmt->setIntParam(parampos, *(int*)value);
00188 break;
00189 case typeLong:
00190 stmt->setLongParam(parampos, *(long*)value);
00191 break;
00192 case typeLongLong:
00193 stmt->setLongLongParam(parampos, *(long long*)value);
00194 break;
00195 case typeShort:
00196 stmt->setShortParam(parampos, *(short*)value);
00197 break;
00198 case typeByteInt:
00199 stmt->setByteIntParam(parampos, *(char*)value);
00200 break;
00201 case typeDouble:
00202 stmt->setDoubleParam(parampos, *(double*)value);
00203 break;
00204 case typeFloat:
00205 stmt->setFloatParam(parampos, *(float*)value);
00206 break;
00207 case typeDate:
00208 stmt->setDateParam(parampos, *(Date*)value);
00209 break;
00210 case typeTime:
00211 stmt->setTimeParam(parampos, *(Time*)value);
00212 break;
00213 case typeTimeStamp:
00214 stmt->setTimeStampParam(parampos, *(TimeStamp*)value);
00215 break;
00216 case typeString:
00217 {
00218 char *d =(char*)value;
00219 d[length-1] = '\0';
00220 stmt->setStringParam(parampos, (char*)value);
00221 break;
00222 }
00223 }
00224 return;
00225 }
00226