src/sqllog/SqlLogConnection.cxx

Go to the documentation of this file.
00001 /***************************************************************************
00002  *   Copyright (C) 2007 by Prabakaran Thirumalai   *
00003  *   praba_tuty@yahoo.com   *
00004  *                                                                         *
00005  *   This program is free software; you can redistribute it and/or modify  *
00006  *   it under the terms of the GNU General Public License as published by  *
00007  *   the Free Software Foundation; either version 2 of the License, or     *
00008  *   (at your option) any later version.                                   *
00009  *                                                                         *
00010  *   This program is distributed in the hope that it will be useful,       *
00011  *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
00012  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the         *
00013  *   GNU General Public License for more details.                          *
00014  *                                                                         *
00015  *   You should have received a copy of the GNU General Public License     *
00016  *   along with this program; if not, write to the                         *
00017  *   Free Software Foundation, Inc.,                                       *
00018  *   59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.             *
00019  ***************************************************************************/
00020 #include <SqlLogConnection.h>
00021 #include <CSql.h>
00022 #include <Network.h>
00023 
// Definitions of SqlLogConnection's static data members.
// txnUID: in the code visible here it is referenced only by the commented-out
// ASYNC branch of commit().
00024 UniqueID SqlLogConnection::txnUID;
// cacheList: filled by populateCachedTableList() and searched by isTableCached().
00025 List SqlLogConnection::cacheList;
00026 
00027 DbRetVal SqlLogConnection::addPacket(BasePacket* pkt)
00028 {
00029     logStore.append(pkt);
00030     return OK;
00031 }
00032 DbRetVal SqlLogConnection::addPreparePacket(PacketPrepare* pkt)
00033 {
00034     curPrepareStore.append(pkt);
00035     return OK;
00036 }
00037 
00038 DbRetVal SqlLogConnection::removePreparePacket(int stmtid)
00039 {
00040     ListIterator iter = prepareStore.getIterator();
00041     PacketPrepare *pkt = NULL, *dpkt=NULL;
00042     while (iter.hasElement())
00043     {
00044         pkt = (PacketPrepare*)iter.nextElement();
00045         if (pkt->stmtID == stmtid) dpkt = pkt;
00046     }
00047     if (dpkt == NULL) return OK;
00048     //TEMP:mask below error for now
00049     if (dpkt == NULL)
00050     {
00051         printError(ErrNotFound, "Prepare packet not found in list for %d\n", stmtid);
00052         return ErrNotFound;
00053     }
00054     delete dpkt;
00055     prepareStore.remove(dpkt);
00056     return OK;
00057 }
00058 
00059 DbRetVal SqlLogConnection::connect (char *user, char *pass)
00060 {
00061     DbRetVal rv = OK;
00062     //printf("LOG: connect\n");
00063     if (innerConn) rv = innerConn->connect(user,pass);
00064     if (rv != OK) return rv;
00065     if (!Conf::config.useReplication() && !Conf::config.useCache()) return OK;
00066     if (rv !=OK) { innerConn->disconnect(); return rv; }
00067 
00068     //populate cacheList if not populated by another thread in same process
00069     //TODO::cacheList requires mutex guard
00070     if (0 == cacheList.size()) rv = populateCachedTableList(); 
00071     return rv;
00072     
00073 }
00074 DbRetVal SqlLogConnection::disconnect()
00075 {
00076     DbRetVal rv = OK;
00077     //printf("LOG: disconnect\n");
00078     if (innerConn) rv =innerConn->disconnect();
00079     if (rv != OK) return rv;
00080     if (!Conf::config.useReplication() && !Conf::config.useCache()) return OK;
00081     return rv;
00082 }
00083 DbRetVal SqlLogConnection::beginTrans(IsolationLevel isoLevel, TransSyncMode mode)
00084 {
00085     DbRetVal rv = OK;
00086     if (innerConn) rv =  innerConn->beginTrans(isoLevel);
00087     if (rv != OK) return rv;
00088 
00089     syncMode = mode;
00090     return OK;
00091 }
00092 DbRetVal SqlLogConnection::commit()
00093 {
00094     DbRetVal rv = OK;
00095     //printf("LOG: commit %d\n", syncMode);
00096     //if (innerConn) rv =  innerConn->commit();
00097     if (syncMode == OSYNC) {
00098         if (innerConn) rv = innerConn->commit();
00099         return rv;
00100     }
00101     if (logStore.size() == 0) 
00102     { 
00103         //This means no execution for any non select statements in 
00104         //this transaction
00105         //rollback so that subsequent beginTrans will not report that
00106         //transaction is already started
00107         if (innerConn) {
00108             rv =  innerConn->rollback(); 
00109             //if (rv != OK) return rv;
00110             //rv = innerConn->beginTrans(READ_COMMITTED, syncMode);
00111         }
00112         return rv; 
00113     }
00114     if (syncMode == ASYNC) {
00115     //TODO::put the packet in global log store
00116     /*
00117     PacketCommit *pkt = new PacketCommit();
00118     int tid = txnUID.getID();
00119     pkt->setExecPackets(tid, logStore);
00120     pkt->marshall();
00121     int *p = (int*) pkt->getMarshalledBuffer();
00122     NetworkClient *nwClient= nwTable.getNetworkClient();
00123     if (syncMode == ASYNC) {
00124         rv = nwClient->send(NW_PKT_COMMIT, pkt->getMarshalledBuffer(), 
00125                                           pkt->getBufferSize());    
00126         if (rv !=OK) 
00127         {
00128             printError(ErrOS, "Unable to send SQL Logs to peer site\n");
00129             return ErrOS;
00130         }
00131         rv = nwClient->receive();    
00132         if (rv !=OK) 
00133         {
00134           printError(ErrOS, "Could not get acknowledgement from peer site\n");
00135           return ErrPeerExecFailed;
00136         }
00137         //TODO::remove all sql logs nodes and the list which contains ptr to it
00138         */
00139     }
00140     
00141     ListIterator logStoreIter = logStore.getIterator();
00142     PacketExecute *execPkt = NULL;
00143     while (logStoreIter.hasElement())
00144     {
00145         execPkt = (PacketExecute*)logStoreIter.nextElement();
00146         delete execPkt;
00147     }
00148     logStore.reset();
00149     if (innerConn) rv = innerConn->commit();
00150     return rv;
00151 }
00152 DbRetVal SqlLogConnection::rollback()
00153 {
00154     DbRetVal rv = OK;
00155     //printf("LOG: rollback \n");
00156     if (innerConn) rv =  innerConn->rollback();
00157     if (rv != OK) return rv;
00158     ListIterator logStoreIter = logStore.getIterator();
00159     PacketExecute *execPkt = NULL;
00160     while (logStoreIter.hasElement())
00161     {
00162         execPkt = (PacketExecute*)logStoreIter.nextElement();
00163         delete execPkt;
00164     }
00165     logStore.reset();
00166     return rv;
00167 }
00168 DbRetVal SqlLogConnection::populateCachedTableList()
00169 {
00170     FILE *fp = NULL;
00171     fp = fopen(Conf::config.getTableConfigFile(),"r");
00172     if( fp == NULL ) {
00173         printError(ErrSysInit, "cache.table file does not exist");
00174         return ErrSysInit;
00175     }
00176     char tablename[IDENTIFIER_LENGTH];
00177     int cmode;
00178     CachedTable *node;
00179     while(!feof(fp))
00180     {
00181         fscanf(fp, "%d:%s\n", &cmode, tablename);
00182         node = new CachedTable();
00183         strcpy(node->tableName, tablename);
00184         cacheList.append(node);
00185     }
00186     fclose(fp);
00187     return OK;
00188 }
00189 bool SqlLogConnection::isTableCached(char *tblName)
00190 {
00191     if (NULL == tblName)
00192     {
00193         printError(ErrBadArg, "tblName passed is NULL\n");
00194         return ErrBadArg;
00195     }
00196     ListIterator iter = cacheList.getIterator();
00197     CachedTable *node;
00198     while (iter.hasElement()) {
00199         node = (CachedTable*)iter.nextElement();
00200         if (strcmp(node->tableName, tblName) == 0)
00201         {
00202            return true;
00203         }
00204     }
00205     return false;
00206 }
00207 
00208 
00209 DbRetVal SqlLogConnection::sendAndReceive(NetworkPacketType type, char *packet, int length)
00210 {
00211     return OK;
00212     NetworkClient* nwClient = nwTable.getNetworkClient();
00213     DbRetVal rv = OK, retRV=OK;
00214     printf("isCacheClient %d\n", nwClient->isCacheClient());
00215     printf("isConnected %d\n", nwClient->isConnected());
00216 /*
00217     if (!nwClient->isConnected()) {
00218         if (nwClient->isCacheClient()) return ErrOS; 
00219         //TODO::put this packet in send buffer.
00220         return OK;
00221     }
00222 */
00223     rv = nwClient->send(type, packet, length);
00224     if (rv != OK) 
00225     {
00226        printf("Unable to send pkt to peer with nwid %d\n", nwClient->getNetworkID());
00227        //TODO:: put this packet in resend buffer, so that it will sent later by another thread for repl mode
00228         nwClient->setConnectFlag(false);
00229         if (nwClient->isCacheClient()) return ErrOS; else return OK;
00230     }
00231     rv = nwClient->receive();
00232     if (rv != OK)
00233     {
00234         printf("Unable to receive ack pkt from peer with nwid %d\n", nwClient->getNetworkID());
00235         nwClient->setConnectFlag(false);
00236         if (nwClient->isCacheClient()) return ErrOS;
00237         //TODO:: put this packet to resend buffer so that it can be sent later
00238         //and call continue;
00239     }
00240     return OK;
00241 }

Generated on Mon Jun 9 22:37:15 2008 for csql by  doxygen 1.4.7