1. 打开ulog文件
1308308 1308309 1308310 1308311 1308312 1308313 1308314 1308315 1308316 1308317 1308318 1308319 1308320 1308321 1308322 1308323 1308324 1308325 1308326 1308327 1308328 1308329 1308330 1308331 1308332 1308333 1308334 1308335 1308336 1308337 1308338 1308339 1308340 1308341 1308342 1308343 1308344 1308345 1308346 1308347 1308348 1308349 1308350 1308351 1308352 1308353 1308354 1308355 1308356 1308357 1308358 1308359 |
/* Create a log reader object. */ TCULRD *tculrdnew(TCULOG *ulog, uint64_t ts){ assert(ulog); if(!ulog->base) return NULL; if(pthread_rwlock_rdlock(&ulog->rwlck) != 0) return NULL; TCLIST *names = tcreaddir(ulog->base); if(!names){ pthread_rwlock_unlock(&ulog->rwlck); return NULL; } int ln = tclistnum(names); int max = 0; for(int i = 0; i < ln; i++){ const char *name = tclistval2(names, i); if(!tcstrbwm(name, TCULSUFFIX)) continue; int id = tcatoi(name); char *path = tcsprintf("%s/%08d%s", ulog->base, id, TCULSUFFIX); struct stat sbuf; if(stat(path, &sbuf) == 0 && S_ISREG(sbuf.st_mode) && id > max) max = id; tcfree(path); } tclistdel(names); if(max < 1) max = 1; uint64_t bts = (ts > TCULTMDEVALW * 1000000) ? ts - TCULTMDEVALW * 1000000 : 0; int num = 0; for(int i = max; i > 0; i--){ char *path = tcsprintf("%s/%08d%s", ulog->base, i, TCULSUFFIX); int fd = open(path, O_RDONLY, 00644); tcfree(path); if(fd == -1) break; int rsiz = sizeof(uint8_t) + sizeof(uint64_t); unsigned char buf[rsiz]; uint64_t fts = INT64_MAX; if(tcread(fd, buf, rsiz)){ memcpy(&fts, buf + sizeof(uint8_t), sizeof(ts)); fts = TTNTOHLL(fts); } close(fd); num = i; if(bts >= fts) break; } if(num < 1) num = 1; TCULRD *urld = tcmalloc(sizeof(*urld)); urld->ulog = ulog; urld->ts = ts; urld->num = num; urld->fd = -1; urld->rbuf = tcmalloc(TTIOBUFSIZ); urld->rsiz = TTIOBUFSIZ; pthread_rwlock_unlock(&ulog->rwlck); return urld; } |
2. 读取ulog文件
394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 |
/* Read a message from a log reader object. */ const void *tculrdread(TCULRD *ulrd, int *sp, uint64_t *tsp, uint32_t *sidp, uint32_t *midp){ assert(ulrd && sp && tsp && sidp && midp); TCULOG *ulog = ulrd->ulog; if(pthread_rwlock_rdlock(&ulog->rwlck) != 0) return NULL; if(ulrd->fd == -1){ char *path = tcsprintf("%s/%08d%s", ulog->base, ulrd->num, TCULSUFFIX); ulrd->fd = open(path, O_RDONLY, 00644); tcfree(path); if(ulrd->fd == -1){ pthread_rwlock_unlock(&ulog->rwlck); return NULL; } } int rsiz = sizeof(uint8_t) + sizeof(uint64_t) + sizeof(uint32_t) * 2; unsigned char buf[rsiz]; uint64_t ts; uint32_t sid, mid, size; while(true){ if(ulog->aiocbs && ulrd->num == ulog->max){ struct stat sbuf; if(fstat(ulrd->fd, &sbuf) == -1 || (sbuf.st_size < ulog->size && sbuf.st_size >= ulog->aioend)){ pthread_rwlock_unlock(&ulog->rwlck); return NULL; } } if(!tcread(ulrd->fd, buf, rsiz)){ if(ulrd->num < ulog->max){ close(ulrd->fd); ulrd->num++; char *path = tcsprintf("%s/%08d%s", ulog->base, ulrd->num, TCULSUFFIX); ulrd->fd = open(path, O_RDONLY, 00644); tcfree(path); if(ulrd->fd == -1){ pthread_rwlock_unlock(&ulog->rwlck); return NULL; } continue; } pthread_rwlock_unlock(&ulog->rwlck); return NULL; } const unsigned char *rp = buf; if(*rp != TCULMAGICNUM){ pthread_rwlock_unlock(&ulog->rwlck); return NULL; } rp += sizeof(uint8_t); memcpy(&ts, rp, sizeof(ts)); ts = TTNTOHLL(ts); rp += sizeof(ts); uint16_t snum; memcpy(&snum, rp, sizeof(snum)); sid = TTNTOHS(snum); rp += sizeof(snum); memcpy(&snum, rp, sizeof(snum)); mid = TTNTOHS(snum); rp += sizeof(snum); memcpy(&size, rp, sizeof(size)); size = TTNTOHL(size); rp += sizeof(size); if(ulrd->rsiz < size + 1){ ulrd->rbuf = tcrealloc(ulrd->rbuf, size + 1); ulrd->rsiz = size + 1; } if(!tcread(ulrd->fd, ulrd->rbuf, size)){ pthread_rwlock_unlock(&ulog->rwlck); return NULL; } if(ts < ulrd->ts) continue; break; } *sp = size; *tsp = ts; *sidp = sid; *midp = mid; ulrd->rbuf[size] = '\0'; pthread_rwlock_unlock(&ulog->rwlck); return ulrd->rbuf; } |
3. 复制协议
参看: ttserver.c:do_slave
打开连接:
tcreplopen(repl, arg->host, arg->port, arg->rts + 1, sid)
读取信息:
tcreplread(repl, &rsiz, &rts, &rsid)
4. 其它
/* Receive data by a socket. */
bool ttsockrecv(TTSOCK *sock, char *buf, int size){
/* Receive one byte by a socket. */
int ttsockgetc(TTSOCK *sock){
/* Receive one line by a socket. */
bool ttsockgets(TTSOCK *sock, char *buf, int size){
/* Receive an 32-bit integer by a socket. */
uint32_t ttsockgetint32(TTSOCK *sock){
各命令的记录格式可参看: tculogadbredo(…)