00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016 #include <AbsSqlConnection.h>
00017 #include <AbsSqlStatement.h>
00018 #include <SqlOdbcStatement.h>
00019 #include <SqlFactory.h>
00020 #include <CSql.h>
00021
00022 int insert(Table *table, int pkid);
00023 int remove(Table *table, int pkid);
00024 int getRecordsFromTargetDb(int mode);
00025
// Stop flag polled by the main loop. It is written from sigTermHandler(), so
// it must be a volatile sig_atomic_t: that is the only object type a signal
// handler is guaranteed to be able to store to safely.
volatile sig_atomic_t srvStop = 0;

// Signal handler installed for SIGINT and SIGTERM: reports the signal number
// and asks the main loop to stop by setting srvStop.
static void sigTermHandler(int sig)
{
    // NOTE(review): printf is not async-signal-safe. It is kept so the console
    // output stays unchanged; consider moving the message to the main loop.
    printf("Received signal %d\nStopping the server\n", sig);
    srvStop = 1;
}
00032
// Prints the command-line usage text of the cache server to standard output.
void printUsage()
{
    printf("Usage: csqlcacheserver \n"
           "Description: Start the csql caching server.\n");
}
00039 AbsSqlConnection *targetconn;
00040 Connection conn;
00041 int main(int argc, char **argv)
00042 {
00043 int c = 0, opt = 0;
00044 while ((c = getopt(argc, argv, "?")) != EOF)
00045 {
00046 switch (c)
00047 {
00048 case '?' : { opt = 10; break; }
00049 default: opt=10;
00050
00051 }
00052 }
00053
00054 if (opt == 10) {
00055 printUsage();
00056 return 0;
00057 }
00058
00059 os::signal(SIGINT, sigTermHandler);
00060 os::signal(SIGTERM, sigTermHandler);
00061
00062 targetconn = SqlFactory::createConnection(CSqlAdapter);
00063 DbRetVal rv = targetconn->connect("root", "manager");
00064 if (rv != OK) return 1;
00065 rv = conn.open("root", "manager");
00066 if (rv != OK) return 1;
00067 if (!Conf::config.useCache())
00068 {
00069 printf("Cache is set to OFF in csql.conf file\n");
00070 return 1;
00071 }
00072 AbsSqlStatement *stmt = SqlFactory::createStatement(CSqlAdapter);
00073 stmt->setConnection(targetconn);
00074 rv = stmt->prepare("create table csql_log_int(tablename char(64), pkid int, operation int, id int not null unique auto_increment)engine='innodb';");
00075 targetconn->beginTrans();
00076 int rows=0;
00077 stmt->execute(rows);
00078 targetconn->commit();
00079 stmt->free();
00080 delete stmt;
00081
00082 printf("Cache server started\n");
00083 int ret = 0;
00084 struct timeval timeout, tval;
00085 timeout.tv_sec = Conf::config.getCacheWaitSecs();
00086 timeout.tv_usec = 0;
00087
00088 while(!srvStop)
00089 {
00090 tval.tv_sec = timeout.tv_sec;
00091 tval.tv_usec = timeout.tv_usec;
00092 ret = os::select(0, NULL, 0, 0, &tval);
00093 printf("Checking for cache updates\n");
00094 ret = getRecordsFromTargetDb(1);
00095 if (ret !=0) srvStop = 1;
00096
00097 if (ret !=0) srvStop = 1;
00098 }
00099 printf("Cache Server Exiting\n");
00100 conn.close();
00101 targetconn->disconnect();
00102 return 0;
00103 }
00104 int getRecordsFromTargetDb(int mode)
00105 {
00106 int pkid;
00107 char tablename[64];
00108 int op, id;
00109 int rows =0;
00110 DbRetVal rv = OK;
00111 AbsSqlStatement *stmt = SqlFactory::createStatement(CSqlAdapter);
00112 stmt->setConnection(targetconn);
00113 AbsSqlStatement *delstmt = SqlFactory::createStatement(CSqlAdapter);
00114 delstmt->setConnection(targetconn);
00115 if (mode == 1 ) {
00116 rv = stmt->prepare("SELECT * FROM csql_log_int;");
00117 rv = delstmt->prepare("DELETE FROM csql_log_int where id =?;");
00118 }
00119 else {
00120 rv = stmt->prepare("SELECT * FROM csql_log_char;");
00121 rv = delstmt->prepare("DELETE FROM csql_log_char where id =?;");
00122 }
00123 if (rv != OK) return 1;
00124 stmt->bindField(1, tablename);
00125 stmt->bindField(2, &pkid);
00126 stmt->bindField(3, &op);
00127 stmt->bindField(4, &id);
00128 DatabaseManager *dbMgr = conn.getDatabaseManager();
00129 while(true) {
00130 rv = targetconn->beginTrans();
00131 rv = stmt->execute(rows);
00132 if (rv != OK)
00133 {
00134 printError(ErrSysInit, "Unable to execute stmt in target db");
00135 targetconn->rollback();
00136 stmt->free();
00137 delstmt->free();
00138 delete stmt;
00139 delete delstmt;
00140 return 1;
00141 }
00142 if (stmt->fetch() != NULL) {
00143 printf("Row value is %s %d %d\n", tablename, pkid, op);
00144 Table *table = dbMgr->openTable(tablename);
00145 int ret = 0;
00146 if (table == NULL)
00147 {
00148 printError(ErrSysInit, "Table %s not exist in csql", tablename);
00149 targetconn->rollback();
00150 stmt->free();
00151 delstmt->free();
00152 delete stmt;
00153 delete delstmt;
00154 break;
00155 }
00156 if (op == 2)
00157 {
00158 ret = remove(table,pkid);
00159 }
00160 else
00161 {
00162 ret = insert(table, pkid);
00163 }
00164 dbMgr->closeTable(table);
00165 rv = targetconn->commit();
00166 rv = targetconn->beginTrans();
00167
00168 delstmt->setIntParam(1, id);
00169 rv = delstmt->execute(rows);
00170 if (rv != OK)
00171 {
00172 printf("log record not deleted from the target db %d\n", rv);
00173 targetconn->rollback();
00174 stmt->free();
00175 delstmt->free();
00176 delete stmt;
00177 delete delstmt;
00178 }
00179
00180 rv = targetconn->commit();
00181
00182 }
00183 else {
00184 stmt->close();
00185 break;
00186 }
00187 stmt->close();
00188 }
00189 stmt->free();
00190 delstmt->free();
00191 delete stmt;
00192 delete delstmt;
00193 return 0;
00194 }
00195 int insert(Table *table, int pkid)
00196 {
00197 AbsSqlStatement *stmt = SqlFactory::createStatement(CSqlAdapter);
00198 stmt->setConnection(targetconn);
00199 SqlOdbcStatement *ostmt = (SqlOdbcStatement*) stmt;
00200 char pkfieldname[128];
00201 ostmt->getPrimaryKeyFieldName(table->getName(), pkfieldname);
00202 char sbuf[1024];
00203 sprintf(sbuf, "SELECT * FROM %s where %s = %d;", table->getName(), pkfieldname, pkid);
00204
00205 DbRetVal rv = stmt->prepare(sbuf);
00206 if (rv != OK) return 1;
00207
00208 List fNameList = table->getFieldNameList();
00209 ListIterator fNameIter = fNameList.getIterator();
00210 FieldInfo *info = new FieldInfo();
00211 int fcount =1; void *valBuf; int fieldsize=0;
00212 void *buf[128];
00213 Identifier *elem = NULL;
00214 while (fNameIter.hasElement()) {
00215 elem = (Identifier*) fNameIter.nextElement();
00216 table->getFieldInfo((const char*)elem->name, info);
00217 valBuf = AllDataType::alloc(info->type, info->length);
00218 buf[fcount] = valBuf;
00219 table->bindFld(elem->name, valBuf);
00220 stmt->bindField(fcount++, valBuf);
00221
00222 }
00223 delete info;
00224 int rows=0;
00225 int retValue = stmt->execute(rows);
00226 if (retValue && rows != 1) {printError(ErrSysInit, "Unable to execute statement at target db\n"); return ErrSysInit; }
00227 conn.startTransaction();
00228 if (stmt->fetch() != NULL) {
00229 table->insertTuple();
00230
00231 }
00232 for (int i=1; i < fcount; i++) {
00233 free(buf[i]);
00234 }
00235 stmt->free();
00236 delete stmt;
00237 conn.commit();
00238 return 0;
00239 }
00240 int remove(Table *table, int pkid)
00241 {
00242 DbRetVal rv = OK;
00243 AbsSqlStatement *stmt = SqlFactory::createStatement(CSqlAdapter);
00244 stmt->setConnection(targetconn);
00245 SqlOdbcStatement *ostmt = (SqlOdbcStatement*) stmt;
00246 char pkfieldname[128];
00247 ostmt->getPrimaryKeyFieldName(table->getName(), pkfieldname);
00248 delete stmt;
00249 Condition p1;
00250 p1.setTerm(pkfieldname, OpEquals, &pkid);
00251 table->setCondition(&p1);
00252 rv = conn.startTransaction();
00253 if (rv != OK) return 1;
00254 rv = table->execute();
00255 if (rv != OK)
00256 {
00257 table->setCondition(NULL);
00258 conn.rollback();
00259 return 1;
00260 }
00261 if (table->fetch() != NULL)
00262 rv = table->deleteTuple();
00263
00264 table->setCondition(NULL);
00265 rv = conn.commit();
00266 if (rv != OK) return 1;
00267 return 0;
00268 }
00269