src/tools/csqlcacheserver.cxx

Go to the documentation of this file.
00001 /***************************************************************************
00002  *   Copyright (C) 2007 by www.databasecache.com                           *
00003  *   Contact: praba_tuty@databasecache.com                                 *
00004  *                                                                         *
00005  *   This program is free software; you can redistribute it and/or modify  *
00006  *   it under the terms of the GNU General Public License as published by  *
00007  *   the Free Software Foundation; either version 2 of the License, or     *
00008  *   (at your option) any later version.                                   *
00009  *                                                                         *
00010  *   This program is distributed in the hope that it will be useful,       *
00011  *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
00012  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the         *
00013  *   GNU General Public License for more details.                          *
00014  *                                                                         *
00015   ***************************************************************************/
#include <signal.h>
#include <stdio.h>
#include <unistd.h>

#include <AbsSqlConnection.h>
#include <AbsSqlStatement.h>
#include <SqlOdbcStatement.h>
#include <SqlFactory.h>
#include <CSql.h>
00021 
00022 int insert(Table *table, int pkid);
00023 int remove(Table *table, int pkid);
00024 int getRecordsFromTargetDb(int mode);
00025 
// Set to a non-zero value to ask the main polling loop to stop.
// It is written from a signal handler, so it must be a volatile sig_atomic_t:
// that is the only object type a handler may portably store to.
volatile sig_atomic_t srvStop = 0;

/**
 * Signal handler installed for SIGINT and SIGTERM.
 *
 * Requests server shutdown by raising srvStop and reports the signal number
 * on standard output.
 *
 * Only async-signal-safe operations are used here: printf() may deadlock or
 * corrupt stdio state when it interrupts another stdio call, so the message
 * is assembled by hand and emitted with a single write(2).
 * Because write(2) bypasses the stdio buffer, this message can appear ahead
 * of printf() output that has not been flushed yet.
 *
 * @param sig number of the signal that was delivered
 */
static void sigTermHandler(int sig)
{
    srvStop = 1;

    static const char head[] = "Received signal ";
    static const char tail[] = "\nStopping the server\n";

    // Worst case: 16 (head) + 1 (sign) + 10 (digits) + 21 (tail) = 48 bytes.
    char msg[64];
    size_t len = 0;
    for (size_t i = 0; head[i] != '\0'; i++) msg[len++] = head[i];

    // Render the signal number in decimal, least significant digit first.
    char digits[12];
    size_t ndigits = 0;
    unsigned int magnitude = (sig < 0) ? 0u - (unsigned int) sig
                                       : (unsigned int) sig;
    do {
        digits[ndigits++] = (char) ('0' + magnitude % 10);
        magnitude /= 10;
    } while (magnitude != 0);
    if (sig < 0) msg[len++] = '-';
    while (ndigits > 0) msg[len++] = digits[--ndigits];

    for (size_t i = 0; tail[i] != '\0'; i++) msg[len++] = tail[i];

    // Nothing useful can be done inside a handler if the write fails.
    ssize_t written = write(STDOUT_FILENO, msg, len);
    (void) written;
}
00032 
// Writes the command-line usage summary of the cache server to standard
// output. The tool accepts no options besides the help request.
void printUsage()
{
    printf("Usage: csqlcacheserver \n"
           "Description: Start the csql caching server.\n");
}
00039 AbsSqlConnection *targetconn;
00040 Connection conn;
00041 int main(int argc, char **argv)
00042 {
00043     int c = 0, opt = 0;
00044     while ((c = getopt(argc, argv, "?")) != EOF) 
00045     {
00046         switch (c)
00047         {
00048             case '?' : { opt = 10; break; } //print help 
00049             default: opt=10; 
00050 
00051         }
00052     }//while options
00053 
00054     if (opt == 10) {
00055         printUsage();
00056         return 0;
00057     }
00058 
00059     os::signal(SIGINT, sigTermHandler);
00060     os::signal(SIGTERM, sigTermHandler);
00061 
00062     targetconn = SqlFactory::createConnection(CSqlAdapter);
00063     DbRetVal rv = targetconn->connect("root", "manager");
00064     if (rv != OK) return 1;
00065     rv = conn.open("root", "manager");
00066     if (rv != OK) return 1;
00067     if (!Conf::config.useCache())
00068     {
00069         printf("Cache is set to OFF in csql.conf file\n");
00070         return 1;
00071     } 
00072     AbsSqlStatement *stmt = SqlFactory::createStatement(CSqlAdapter);
00073     stmt->setConnection(targetconn);
00074     rv = stmt->prepare("create table csql_log_int(tablename char(64), pkid int, operation int, id int not null unique auto_increment)engine='innodb';");
00075     targetconn->beginTrans();
00076     int rows=0;
00077     stmt->execute(rows);
00078     targetconn->commit();
00079     stmt->free();
00080     delete stmt;
00081 
00082     printf("Cache server started\n");
00083     int ret = 0;
00084     struct timeval timeout, tval;
00085     timeout.tv_sec = Conf::config.getCacheWaitSecs();
00086     timeout.tv_usec = 0;
00087     
00088     while(!srvStop)
00089     {
00090         tval.tv_sec = timeout.tv_sec;
00091         tval.tv_usec = timeout.tv_usec;
00092         ret = os::select(0, NULL, 0, 0, &tval);
00093         printf("Checking for cache updates\n");
00094         ret = getRecordsFromTargetDb(1);       
00095         if (ret !=0) srvStop = 1;
00096         //ret = getRecordsFromTargetDb(2);       
00097         if (ret !=0) srvStop = 1;
00098     }
00099     printf("Cache Server Exiting\n");
00100     conn.close();
00101     targetconn->disconnect();
00102     return 0;
00103 }
00104 int getRecordsFromTargetDb(int mode)
00105 {
00106     int pkid;
00107     char tablename[64];
00108     int op, id;
00109     int rows =0;
00110     DbRetVal rv = OK;
00111     AbsSqlStatement *stmt = SqlFactory::createStatement(CSqlAdapter);
00112     stmt->setConnection(targetconn);
00113     AbsSqlStatement *delstmt = SqlFactory::createStatement(CSqlAdapter);
00114     delstmt->setConnection(targetconn);
00115     if (mode == 1 ) {
00116         rv = stmt->prepare("SELECT * FROM csql_log_int;");
00117         rv = delstmt->prepare("DELETE FROM csql_log_int where id =?;");
00118     }
00119     else {
00120         rv = stmt->prepare("SELECT * FROM csql_log_char;");
00121         rv = delstmt->prepare("DELETE FROM csql_log_char where id =?;");
00122     }
00123     if (rv != OK) return 1;
00124     stmt->bindField(1, tablename);
00125     stmt->bindField(2, &pkid);
00126     stmt->bindField(3, &op);
00127     stmt->bindField(4, &id);
00128     DatabaseManager *dbMgr = conn.getDatabaseManager();
00129     while(true) {
00130       rv = targetconn->beginTrans();
00131       rv = stmt->execute(rows);
00132       if (rv != OK) 
00133       {
00134           printError(ErrSysInit, "Unable to execute stmt in target db");
00135           targetconn->rollback();
00136           stmt->free();
00137           delstmt->free();
00138           delete stmt;
00139           delete delstmt;
00140           return 1;
00141       }
00142       if (stmt->fetch() != NULL) {
00143           printf("Row value is %s %d %d\n", tablename, pkid, op);
00144           Table *table = dbMgr->openTable(tablename);
00145           int ret = 0;
00146           if (table == NULL)
00147           {
00148               printError(ErrSysInit, "Table %s not exist in csql", tablename);
00149               targetconn->rollback();
00150               stmt->free();
00151               delstmt->free();
00152               delete stmt;
00153               delete delstmt;
00154               break;
00155           }
00156           if (op == 2)//DELETE
00157           {
00158               ret = remove(table,pkid);
00159           }
00160           else //INSERT
00161           {
00162               ret = insert(table, pkid);
00163           }
00164           dbMgr->closeTable(table);
00165           rv = targetconn->commit();
00166           rv = targetconn->beginTrans();
00167           //Remove record from csql_log_XXX table
00168           delstmt->setIntParam(1, id);
00169           rv = delstmt->execute(rows);
00170           if (rv != OK) 
00171           {
00172               printf("log record not deleted from the target db %d\n", rv);
00173               targetconn->rollback();
00174               stmt->free();
00175               delstmt->free();
00176               delete stmt;
00177               delete delstmt;
00178           }
00179           
00180           rv = targetconn->commit();
00181           //create table csql_log_int(tablename char(64), pkid int, op int, id int not null unique auto_increment);
00182      }
00183      else {
00184          stmt->close();
00185          break;
00186      }
00187      stmt->close();
00188    }
00189    stmt->free();
00190    delstmt->free();
00191    delete stmt;
00192    delete delstmt;
00193    return 0;
00194 }
00195 int insert(Table *table, int pkid)
00196 {
00197     AbsSqlStatement *stmt = SqlFactory::createStatement(CSqlAdapter);
00198     stmt->setConnection(targetconn);
00199     SqlOdbcStatement *ostmt = (SqlOdbcStatement*) stmt;
00200     char pkfieldname[128];
00201     ostmt->getPrimaryKeyFieldName(table->getName(), pkfieldname);
00202     char sbuf[1024];
00203     sprintf(sbuf, "SELECT * FROM %s where %s = %d;", table->getName(), pkfieldname, pkid);
00204     //TODO::get the primary key field name from the table interface. need to implement it
00205     DbRetVal rv = stmt->prepare(sbuf);
00206     if (rv != OK) return 1;
00207 
00208     List fNameList = table->getFieldNameList();
00209     ListIterator fNameIter = fNameList.getIterator();
00210     FieldInfo *info = new FieldInfo();
00211     int fcount =1; void *valBuf; int fieldsize=0;
00212     void *buf[128];//TODO:resticts to support only 128 fields in table
00213     Identifier *elem = NULL;
00214     while (fNameIter.hasElement()) {
00215         elem = (Identifier*) fNameIter.nextElement();
00216         table->getFieldInfo((const char*)elem->name, info);
00217         valBuf = AllDataType::alloc(info->type, info->length);
00218         buf[fcount] = valBuf;
00219         table->bindFld(elem->name, valBuf);
00220         stmt->bindField(fcount++, valBuf);
00221         
00222     }
00223     delete info;
00224     int rows=0;
00225     int retValue = stmt->execute(rows);
00226     if (retValue && rows != 1) {printError(ErrSysInit, "Unable to execute statement at target db\n"); return ErrSysInit; }
00227     conn.startTransaction();
00228     if (stmt->fetch() != NULL) {
00229         table->insertTuple();
00230         //Note:insert may fail if the record is inserted from this cache
00231     }
00232     for (int i=1; i < fcount; i++) {
00233         free(buf[i]);
00234     }
00235     stmt->free();
00236     delete stmt;
00237     conn.commit();      
00238     return 0;
00239 }
00240 int remove(Table *table, int pkid)
00241 {
00242     DbRetVal rv = OK;
00243     AbsSqlStatement *stmt = SqlFactory::createStatement(CSqlAdapter);
00244     stmt->setConnection(targetconn);
00245     SqlOdbcStatement *ostmt = (SqlOdbcStatement*) stmt;
00246     char pkfieldname[128];
00247     ostmt->getPrimaryKeyFieldName(table->getName(), pkfieldname);
00248     delete stmt;
00249     Condition p1;
00250     p1.setTerm(pkfieldname, OpEquals, &pkid);
00251     table->setCondition(&p1);
00252     rv = conn.startTransaction();
00253     if (rv != OK) return 1;
00254     rv = table->execute();
00255     if (rv != OK)  
00256     {
00257         table->setCondition(NULL); 
00258         conn.rollback();
00259         return 1;
00260     }
00261     if (table->fetch() != NULL)
00262         rv = table->deleteTuple();
00263         //Note:Delete may fail if the record is deleted from this cache
00264     table->setCondition(NULL);
00265     rv = conn.commit();
00266     if (rv != OK) return 1;
00267     return 0;
00268 }
00269 

Generated on Mon Jun 9 22:37:15 2008 for csql by  doxygen 1.4.7