FAQ Search Today's Posts Mark Forums Read
» Video Reviews

» Linux Archive

Linux-archive is a website aiming to archive linux email lists and to make them easily accessible for linux users/developers.


» Sponsor

» Partners

» Sponsor

Go Back   Linux Archive > Redhat > Cluster Development

 
 
LinkBack Thread Tools
 
Old 02-08-2008, 01:30 PM
 
Default cluster/cmirror/src cluster.c cluster.h functi ...

CVSROOT: /cvs/cluster
Module name: cluster
Branch: RHEL5
Changes by: jbrassow@sourceware.org 2008-02-08 14:30:10

Modified files:
cmirror/src : cluster.c cluster.h functions.c functions.h
local.c

Log message:
- stop delaying disk log writes
- stop placing requests into the startup queue before initial config
- added recovering_region to checkpoint data to prevent duplicate region
syncing assignment.

Patches:
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/cluster.c.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.15&r2=1.1.2.16
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/cluster.h.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.2&r2=1.1.2.3
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/functions.c.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.13&r2=1.1.2.14
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/functions.h.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.4&r2=1.1.2.5
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/local.c.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.13&r2=1.1.2.14

--- cluster/cmirror/src/Attic/cluster.c 2008/02/06 23:03:05 1.1.2.15
+++ cluster/cmirror/src/Attic/cluster.c 2008/02/08 14:30:10 1.1.2.16
@@ -27,6 +27,13 @@
static SaCkptCallbacksT callbacks = { 0, 0 };
static SaVersionT version = { 'B', 1, 1 };

+#define DEBUGGING_HISTORY 20
+static char debugging[DEBUGGING_HISTORY][128];
+static int idx = 0;
+static int memberz = 0;
+static int doit = 0;
+
+
struct checkpoint_data {
uint32_t requester;
char uuid[CPG_MAX_NAME_LENGTH];
@@ -34,6 +41,7 @@
int bitmap_size; /* in bytes */
char *sync_bits;
char *clean_bits;
+ char *recovering_region;
struct checkpoint_data *next;
};

@@ -58,44 +66,18 @@
static struct list_head clog_cpg_list;

/*
- * flow_control
- * @handle
- *
- * Returns: 1 if flow control needed, 0 otherwise
- */
-static int flow_control(cpg_handle_t handle)
-{
- cpg_flow_control_state_t flow_control_state;
- cpg_error_t error;
-
- /* FIXME: no flow control for now (cmirror should self regulate) */
- return 0;
-
- error = cpg_flow_control_state_get(handle, &flow_control_state);
- if (error != CPG_OK) {
- LOG_ERROR("Failed to get flow control state. Reason: %d", error);
- /* FIXME: Better error handling */
- return 0;
- }
-
- return (flow_control_state == CPG_FLOW_CONTROL_ENABLED) ? 1 : 0;
-}
-
-/*
* cluster_send
* @tfr
*
* Returns: 0 on success, -Exxx on error
*/
-int cluster_send(struct clog_tfr *tfr)
+static int cluster_send(struct clog_tfr *tfr)
{
int r;
int found;
struct iovec iov;
struct clog_cpg *entry, *tmp;

- ENTER();
-
list_for_each_entry_safe(entry, tmp, &clog_cpg_list, list)
if (!strncmp(entry->name.value, tfr->uuid, CPG_MAX_NAME_LENGTH)) {
found = 1;
@@ -104,26 +86,35 @@

if (!found) {
tfr->error = -ENOENT;
- EXIT();
return -ENOENT;
}

iov.iov_base = tfr;
iov.iov_len = sizeof(struct clog_tfr) + tfr->data_size;
- while (flow_control(entry->handle)) {
- /*
- * FIXME: Don't need to sleep this long
- *
- * ... or, we could dispatch the queued messages here.
- */
- LOG_PRINT("Flow control enabled. Delaying msg [%s]",
- RQ_TYPE(tfr->request_type));
- sleep(1);
- }
+
r = cpg_mcast_joined(entry->handle, CPG_TYPE_AGREED, &iov, 1);
+ if (r == CPG_OK)
+ return 0;
+ if (r == SA_AIS_ERR_TRY_AGAIN)
+ return -EAGAIN;
+
+ LOG_ERROR("cpg_mcast_joined error: %d", r);
+
+ tfr->error = -EBADE;
+ return -EBADE;
+}
+
+int cluster_send_helper(struct clog_tfr *tfr, int line, char *file, const char *function)
+{
+ int r;
+
+ do {
+ r = cluster_send(tfr);
+ if (r)
+ LOG_ERROR("cluster_send failed at: %s:%d (%s)",
+ file, line, function);
+ } while (r == -EAGAIN);

- EXIT();
- tfr->error = r = (r == CPG_OK) ? 0 : -EBADE;
return r;
}

@@ -137,7 +128,7 @@
return r;
}

-static int handle_cluster_request(struct clog_tfr *tfr, int server)
+static int handle_cluster_request(struct clog_tfr *tfr, int server, int printz)
{
int r = 0;

@@ -152,27 +143,22 @@
*/
if ((tfr->request_type != DM_CLOG_RESUME) ||
(tfr->originator == my_cluster_id))
- r = do_request(tfr);
+ r = do_request(tfr, server);

if (server) {
- if (r)
- LOG_ERROR("do_request failed, unable to commit log");
- else
- r = commit_log(tfr);
-
tfr->request_type |= DM_CLOG_RESPONSE;

/*
* Errors from previous functions are in the tfr struct.
*/
-
- LOG_DBG("Sending response to %u on cluster: [%s/%llu]",
- tfr->originator,
- RQ_TYPE(tfr->request_type & ~DM_CLOG_RESPONSE),
- (unsigned long long)tfr->seq);
+ if (printz)
+ LOG_DBG("[%s] Sending response to %u on cluster: [%s/%llu]",
+ SHORT_UUID(tfr->uuid), tfr->originator,
+ RQ_TYPE(tfr->request_type & ~DM_CLOG_RESPONSE),
+ (unsigned long long)tfr->seq);
r = cluster_send(tfr);
if (r)
- LOG_ERROR("cluster_send failed");
+ LOG_ERROR("cluster_send failed: %s", strerror(-r));
}

EXIT();
@@ -209,6 +195,8 @@
INIT_LIST_HEAD(&l);
queue_remove_all(&l, cluster_queue);
LOG_ERROR("Current list:");
+ if (list_empty(&l))
+ LOG_ERROR(" [none]");
list_for_each_safe(p, n, &l) {
list_del_init(p);
t = (struct clog_tfr *)p;
@@ -257,6 +245,7 @@
static struct checkpoint_data *prepare_checkpoint(struct clog_cpg *entry,
uint32_t cp_requester)
{
+ int r;
struct checkpoint_data *new;

new = malloc(sizeof(*new));
@@ -270,7 +259,7 @@
strncpy(new->uuid, entry->name.value, entry->name.length);

if (entry->valid) {
- new->bitmap_size = store_bits(entry->name.value, "clean_bits",
+ new->bitmap_size = push_state(entry->name.value, "clean_bits",
&new->clean_bits);
if (new->bitmap_size <= 0) {
LOG_ERROR("Failed to store clean_bits to checkpoint for node %u",
@@ -279,7 +268,7 @@
return NULL;
}

- new->bitmap_size = store_bits(entry->name.value,
+ new->bitmap_size = push_state(entry->name.value,
"sync_bits", &new->sync_bits);
if (new->bitmap_size <= 0) {
LOG_ERROR("Failed to store sync_bits to checkpoint for node %u",
@@ -288,6 +277,16 @@
free(new);
return NULL;
}
+
+ r = push_state(entry->name.value, "recovering_region", &new->recovering_region);
+ if (r <= 0) {
+ LOG_ERROR("Failed to store recovering_region to checkpoint for node %u",
+ new->requester);
+ free(new->sync_bits);
+ free(new->clean_bits);
+ free(new);
+ return NULL;
+ }
} else {
/*
* We can store bitmaps yet, because the log is not
@@ -309,6 +308,7 @@
*/
static void free_checkpoint(struct checkpoint_data *cp)
{
+ free(cp->recovering_region);
free(cp->sync_bits);
free(cp->clean_bits);
free(cp);
@@ -335,9 +335,9 @@
name.length = len;

attr.creationFlags = SA_CKPT_WR_ALL_REPLICAS;
- attr.checkpointSize = cp->bitmap_size * 2;
+ attr.checkpointSize = cp->bitmap_size * 2 + strlen(cp->recovering_region) + 1;
attr.retentionDuration = SA_TIME_MAX;
- attr.maxSections = 3; /* don't know why we need +1 */
+ attr.maxSections = 4; /* don't know why we need +1 */
attr.maxSectionSize = cp->bitmap_size;
attr.maxSectionIdSize = 22;

@@ -363,6 +363,7 @@
EXIT();
return -EIO; /* FIXME: better error */
}
+
/*
* Add section for sync_bits
*/
@@ -408,7 +409,7 @@
}

if (rv == SA_AIS_ERR_EXIST) {
- LOG_ERROR("export_checkpoint: clean checkpoint section already exists");
+ LOG_DBG("export_checkpoint: clean checkpoint section already exists");
EXIT();
return -EEXIST;
}
@@ -419,6 +420,35 @@
return -EIO; /* FIXME: better error */
}

+ /*
+ * Add section for recovering_region
+ */
+ section_id.idLen = snprintf(buf, 32, "recovering_region");
+ section_id.id = (unsigned char *)buf;
+ section_attr.sectionId = &section_id;
+ section_attr.expirationTime = SA_TIME_END;
+
+rr_create_retry:
+ rv = saCkptSectionCreate(h, &section_attr, cp->recovering_region,
+ strlen(cp->recovering_region) + 1);
+ if (rv == SA_AIS_ERR_TRY_AGAIN) {
+ LOG_ERROR("export_checkpoint: RR create retry");
+ sleep(1);
+ goto rr_create_retry;
+ }
+
+ if (rv == SA_AIS_ERR_EXIST) {
+ LOG_DBG("export_checkpoint: RR checkpoint section already exists");
+ EXIT();
+ return -EEXIST;
+ }
+
+ if (rv != SA_AIS_OK) {
+ LOG_ERROR("export_checkpoint: RR checkpoint section creation failed");
+ EXIT();
+ return -EIO; /* FIXME: better error */
+ }
+
LOG_DBG("export_checkpoint: closing checkpoint");
saCkptCheckpointClose(h);

@@ -515,7 +545,7 @@
break;
}
saCkptSectionIterationFinalize(itr);
- if (len != 2) {
+ if (len != 3) {
LOG_ERROR("import_checkpoint: %d checkpoint sections found", len);
sleep(1);
goto init_retry;
@@ -572,8 +602,9 @@
*/

if (iov.readSize) {
- if (load_bits(entry->name.value, (char *)desc.sectionId.id, bitmap, iov.readSize)) {
- LOG_ERROR("Error loading bits");
+ if (pull_state(entry->name.value, (char *)desc.sectionId.id, bitmap,
+ iov.readSize)) {
+ LOG_ERROR("Error loading state");
rtn = -EIO;
goto fail;
}
@@ -645,6 +676,7 @@
int i;
int r = 0;
int i_am_server;
+ int response = 0;
struct clog_tfr *tfr = msg;
struct clog_tfr *startup_tfr = NULL;
struct clog_cpg *match;
@@ -665,7 +697,9 @@
(unsigned long long)tfr->seq);

if (my_cluster_id == 0xDEAD) {
- LOG_DBG("Message before init... ignoring.\n");
+ LOG_DBG("[%s] Message from %u before init [%s/%llu]",
+ SHORT_UUID(tfr->uuid), nodeid,
+ RQ_TYPE(tfr->request_type), (unsigned long long) tfr->seq);
return;
}

@@ -674,6 +708,14 @@
LOG_ERROR("Unable to find clog_cpg for cluster message");
return;
}
+
+ if (match->lowest_id == 0xDEAD) {
+ LOG_DBG("[%s] Message from %u before init* [%s/%llu]",
+ SHORT_UUID(tfr->uuid), nodeid,
+ RQ_TYPE(tfr->request_type), (unsigned long long) tfr->seq);
+ return;
+ }
+
i_am_server = (my_cluster_id == match->lowest_id) ? 1 : 0;

if (tfr->request_type == DM_CLOG_CHECKPOINT_READY) {
@@ -691,7 +733,7 @@
LOG_DBG("Processing delayed request %d: %s",
match->startup_queue->count,
RQ_TYPE(startup_tfr->request_type));
- r = handle_cluster_request(startup_tfr, i_am_server);
+ r = handle_cluster_request(startup_tfr, i_am_server, 1);

if (r) {
LOG_ERROR("Error while processing delayed CPG message");
@@ -732,9 +774,10 @@
match->checkpoint_list = new;
}

- if (tfr->request_type & DM_CLOG_RESPONSE)
+ if (tfr->request_type & DM_CLOG_RESPONSE) {
+ response = 1;
r = handle_cluster_response(tfr);
- else {
+ } else {
tfr->originator = nodeid;

if (!match->valid) {
@@ -757,15 +800,40 @@
goto out;
}

- r = handle_cluster_request(tfr, i_am_server);
+ r = handle_cluster_request(tfr, i_am_server,
+ ((memberz != 4) || (--doit > 0)));
}

out:
if (r) {
- LOG_ERROR("[%s] Error while processing CPG message, %s: %d",
+ LOG_ERROR("[%s] Error while processing CPG message, %s: %s",
SHORT_UUID(tfr->uuid),
RQ_TYPE(tfr->request_type & ~DM_CLOG_RESPONSE),
- r);
+ strerror(-r));
+ LOG_ERROR("[%s] Response : %s", SHORT_UUID(tfr->uuid),
+ (response) ? "YES" : "NO");
+ LOG_ERROR("[%s] Originator: %u", SHORT_UUID(tfr->uuid), tfr->originator);
+ if (response)
+ LOG_ERROR("[%s] Responder : %u", SHORT_UUID(tfr->uuid), nodeid);
+ LOG_ERROR("HISTORY::");
+
+ for (i = 0; i < DEBUGGING_HISTORY; i++) {
+ idx++;
+ idx = idx % DEBUGGING_HISTORY;
+ if (debugging[idx][0] == '\0')
+ continue;
+ LOG_ERROR("%d:%d) %s", i, idx, debugging[idx]);
+ }
+ } else if (!(tfr->request_type & DM_CLOG_RESPONSE)) {
+ int len;
+ idx++;
+ idx = idx % DEBUGGING_HISTORY;
+ len = sprintf(debugging[idx], "SEQ#=%llu, UUID=%s, TYPE=%s, ORIG=%u, RESP=%s",
+ (unsigned long long)tfr->seq, SHORT_UUID(tfr->uuid),
+ RQ_TYPE(tfr->request_type),
+ tfr->originator, (response) ? "YES" : "NO");
+ if (response)
+ sprintf(debugging[idx] + len, ", RSPR=%u", nodeid);
}
EXIT();
}
@@ -779,10 +847,12 @@
int my_pid = getpid();
int found = 0;
struct clog_cpg *match, *tmp;
- uint32_t lowest;
+ uint32_t lowest = 0xDEAD;

ENTER();

+ memberz = member_list_entries;
+
LOG_DBG("****** CPG config callback **[%s]**",
SHORT_UUID(gname->value));

@@ -821,6 +891,8 @@
goto out;
}

+ lowest = match->lowest_id;
+
/* Am I leaving? */
for (i = 0; i < left_list_entries; i++)
if (my_cluster_id == left_list[i].nodeid) {
@@ -863,6 +935,7 @@

free(match->startup_queue);
match->free_me = 1;
+ match->lowest_id = 0xDEAD;

goto out;
}
@@ -871,8 +944,6 @@
if (!left_list_entries &&
(member_list_entries == 1) && (joined_list_entries == 1) &&
(member_list[0].nodeid == joined_list[0].nodeid)) {
- LOG_DBG("[%s] I am the log server (and first to join)",
- SHORT_UUID(match->name.value));
match->lowest_id = my_cluster_id = joined_list[0].nodeid;
match->valid = 1;
goto out;
@@ -894,17 +965,15 @@
}
}

- lowest = match->lowest_id;
+ if (member_list_entries)
+ match->lowest_id = member_list[0].nodeid;
+ else
+ match->lowest_id = 0xDEAD;
/* Find the lowest_id, i.e. the server */
- for (i = 0, match->lowest_id = member_list[0].nodeid;
- i < member_list_entries; i++)
+ for (i = 0; i < member_list_entries; i++)
if (match->lowest_id > member_list[i].nodeid)
match->lowest_id = member_list[i].nodeid;

- if (lowest != match->lowest_id)
- LOG_DBG("[%s] Server is now %u", SHORT_UUID(match->name.value),
- match->lowest_id);
-
/*
* If I am part of the joining list, I do not send checkpoints
* FIXME: What are the cases where multiple nodes can join?
@@ -920,6 +989,21 @@
match->checkpoints_needed += i;

out:
+ if (lowest != match->lowest_id)
+ LOG_DBG("[%s] Server change %u -> %u (%u %s)",
+ SHORT_UUID(match->name.value),
+ lowest, match->lowest_id,
+ (joined_list_entries) ? joined_list[0].nodeid : left_list[0].nodeid,
+ (joined_list_entries && (member_list_entries == 1)) ?
+ "is first to join" : (joined_list_entries) ? "joined" : "left");
+ else
+ LOG_DBG("[%s] Server unchanged at %u (%u %s)",
+ SHORT_UUID(match->name.value), lowest,
+ (joined_list_entries) ? joined_list[0].nodeid : left_list[0].nodeid,
+ (joined_list_entries) ? "joined" : "left");
+
+ if (joined_list_entries && (joined_list[0].nodeid == my_cluster_id))
+ doit = 25;
EXIT();
}

@@ -1019,6 +1103,12 @@

ENTER();

+ {
+ int i;
+ for(i = 0; i < DEBUGGING_HISTORY; i++)
+ debugging[i][0] = '\0';
+ }
+
INIT_LIST_HEAD(&clog_cpg_list);
rv = saCkptInitialize(&ckpt_handle, &callbacks, &version);

--- cluster/cmirror/src/Attic/cluster.h 2008/01/23 21:21:06 1.1.2.2
+++ cluster/cmirror/src/Attic/cluster.h 2008/02/08 14:30:10 1.1.2.3
@@ -7,6 +7,7 @@
int create_cluster_cpg(char *str);
int destroy_cluster_cpg(char *str);

-int cluster_send(struct clog_tfr *tfr);
+int cluster_send_helper(struct clog_tfr *tfr, int line, char *file, const char *function);
+#define cluster_send(x) cluster_send_helper((x), __LINE__, __FILE__, __FUNCTION__)

#endif /* __CLUSTER_LOG_CLUSTER_DOT_H__ */
--- cluster/cmirror/src/Attic/functions.c 2008/02/06 23:03:05 1.1.2.13
+++ cluster/cmirror/src/Attic/functions.c 2008/02/08 14:30:10 1.1.2.14
@@ -33,14 +33,6 @@
uint64_t nr_regions;
};

-/*
- * Used by the 'touched' variable, these macros mean:
- * LOG_CHANGED - bits in the in-memory log have changed
- * LOG_FLUSH - log must be committed to disk
- */
-#define LOG_CHANGED 1
-#define LOG_FLUSH 2
-
struct log_c {
struct list_head list;

@@ -103,13 +95,13 @@
static void log_set_bit(struct log_c *lc, uint32_t *bs, unsigned bit)
{
ext2fs_set_bit(bit, (unsigned int *) bs);
- lc->touched |= LOG_CHANGED;
+ lc->touched = 1;
}

static void log_clear_bit(struct log_c *lc, uint32_t *bs, unsigned bit)
{
ext2fs_clear_bit(bit, (unsigned int *) bs);
- lc->touched |= LOG_CHANGED;
+ lc->touched = 1;
}

/* FIXME: Why aren't count and start the same type? */
@@ -205,7 +197,7 @@
if (r < 0) {
LOG_ERROR("rw_log: write failure: %s",
strerror(errno));
- return -EIO;
+ return -EIO; /* Failed disk write */
}
return 0;
}
@@ -216,7 +208,7 @@
LOG_ERROR("rw_log: read failure: %s",
strerror(errno));
if (r != lc->disk_size)
- return -EIO;
+ return -EIO; /* Failed disk read */
return 0;
}

@@ -239,7 +231,7 @@
memset(&lh, 0, sizeof(struct log_header));

if (rw_log(lc, 0))
- return -EIO;
+ return -EIO; /* Failed disk read */

header_from_disk(&lh, lc->disk_buffer);
if (lh.magic != MIRROR_MAGIC) {
@@ -285,8 +277,10 @@
bitset_size += (lc->region_count % 8) ? 1 : 0;
memcpy(lc->disk_buffer + 1024, lc->sync_bits, bitset_size);

- if (rw_log(lc, 1))
- return -EIO;
+ if (rw_log(lc, 1)) {
+ lc->log_dev_failed = 1;
+ return -EIO; /* Failed disk write */
+ }
return 0;
}

@@ -697,6 +691,7 @@
static int clog_resume(struct clog_tfr *tfr)
{
uint32_t i;
+ int commit_log = 0;
struct log_c *lc = get_log(tfr->uuid);
size_t size = lc->bitset_uint32_count * sizeof(uint32_t);

@@ -715,6 +710,7 @@
LOG_DBG("[%s] Master resume: reading disk log",
SHORT_UUID(lc->uuid));
lc->resume_override = 1000;
+ commit_log = 1;
break;
case 1:
LOG_ERROR("Error:: partial bit loading (just sync_bits)");
@@ -782,11 +778,14 @@
SHORT_UUID(lc->uuid), (unsigned long long)lc->sync_count);
lc->sync_search = 0;

- /*
- * We mark 'touched' as LOG_FLUSH so that only the master commits
- * the log via 'commit_log'
- */
- lc->touched = LOG_FLUSH;
+ if (commit_log && (lc->disk_fd >= 0)) {
+ tfr->error = write_log(lc);
+ if (tfr->error)
+ LOG_ERROR("Failed initial disk log write");
+ else
+ LOG_DBG("Disk log initialized");
+ lc->touched = 0;
+ }
out:
lc->state = LOG_RESUMED;
lc->recovery_halted = 0;
@@ -917,20 +916,34 @@
* @tfr
*
*/
-static int clog_flush(struct clog_tfr *tfr)
+static int clog_flush(struct clog_tfr *tfr, int server)
{
+ int r = 0;
struct log_c *lc = get_log(tfr->uuid);
-
+
if (!lc)
return -EINVAL;

- /*
- * Actual disk flush happens in 'commit_log()'
- * Clear LOG_CHANGED and set LOG_FLUSH
+ if (!lc->touched)
+ return 0;
+
+ /*
+ * Do the actual flushing of the log only
+ * if we are the server.
*/
- lc->touched = LOG_FLUSH;
+ if (server && (lc->disk_fd >= 0)) {
+ r = tfr->error = write_log(lc);
+ if (r) {
+ LOG_ERROR("Error writing to disk log");
+ return r;
+ }
+ LOG_DBG("[%s] Disk log written", SHORT_UUID(lc->uuid));
+ }
+
+ lc->touched = 0;

return 0;
+
}

/*
@@ -1179,14 +1192,18 @@

if (pkg->in_sync) {
if (log_test_bit(lc->sync_bits, pkg->region)) {
- LOG_PRINT(" Region already in-sync: %llu",
- (unsigned long long)pkg->region);
+ LOG_ERROR("[%s] Region already in-sync: region=%llu, seq=%llu, who=%u",
+ SHORT_UUID(lc->uuid),
+ (unsigned long long)pkg->region,
+ (unsigned long long)tfr->seq,
+ tfr->originator);
} else {
log_set_bit(lc, lc->sync_bits, pkg->region);
lc->sync_count++;
- LOG_DBG("[%s] sync_count = %llu, Region %llu marked in-sync by %u",
+ LOG_DBG("[%s] sync_count=%llu, Region %llu marked in-sync by %u, seq=%llu",
SHORT_UUID(lc->uuid), (unsigned long long)lc->sync_count,
- (unsigned long long)pkg->region, tfr->originator);
+ (unsigned long long)pkg->region, tfr->originator,
+ (unsigned long long)tfr->seq);
}
} else if (log_test_bit(lc->sync_bits, pkg->region)) {
lc->sync_count--;
@@ -1249,7 +1266,7 @@

tfr->data_size = sprintf(data, "3 clustered_disk %d:%d %c",
major(statbuf.st_rdev), minor(statbuf.st_rdev),
- 'A'); /* FIXME: detect dead device */
+ (lc->log_dev_failed) ? 'D' : 'A');

return 0;
}
@@ -1396,6 +1413,7 @@
/*
* do_request
* @tfr: the request
+ * @server: is this request performed by the server
*
* An inability to perform this function will return an error
* from this function. However, an inability to successfully
@@ -1403,7 +1421,7 @@
*
* Returns: 0 on success, -EXXX on error
*/
-int do_request(struct clog_tfr *tfr)
+int do_request(struct clog_tfr *tfr, int server)
{
int r;

@@ -1442,7 +1460,7 @@
r = clog_in_sync(tfr);
break;
case DM_CLOG_FLUSH:
- r = clog_flush(tfr);
+ r = clog_flush(tfr, server);
break;
case DM_CLOG_MARK_REGION:
r = clog_mark_region(tfr);
@@ -1489,52 +1507,6 @@
return 0;
}

-/*
- * commit_log
- * @tfr: commit log associated with this request
- *
- * This function will also set 'tfr->error' on failure
- *
- * Returns: 0 on success, -EXXX on error
- */
-int commit_log(struct clog_tfr *tfr)
-{
- int r = 0;
- struct log_c *lc;
-
- ENTER();
-
- lc = get_log(tfr->uuid);
-
- if (!lc) {
- LOG_DBG("No log found");
- tfr->error = -EINVAL;
- r = -EINVAL;
- goto out;
- }
-
- if (!(lc->touched & LOG_FLUSH))
- goto out;
-
- if (lc->disk_fd >= 0) {
- r = tfr->error = write_log(lc);
- if (r) {
- LOG_ERROR("Error writing to disk log");
- return -EIO;
- }
- LOG_DBG("[%s] Disk log written", SHORT_UUID(lc->uuid));
- }
-
- if (lc->touched & LOG_CHANGED)
- LOG_ERROR("WARNING: Log has changed during a flush operation");
-
- lc->touched &= ~LOG_FLUSH;
-
-out:
- EXIT();
- return 0;
-}
-
static void print_bits(char *buf, int size)
{
#ifdef DEBUG
@@ -1556,7 +1528,8 @@
#endif
}

-int store_bits(const char *uuid, const char *which, char **buf)
+/* int store_bits(const char *uuid, const char *which, char **buf)*/
+int push_state(const char *uuid, const char *which, char **buf)
{
int bitset_size;
struct log_c *lc;
@@ -1570,8 +1543,18 @@
return -EINVAL;
}

+ if (!strcmp(which, "recovering_region")) {
+ *buf = malloc(32); /* easily covers largest 64-bit int */
+ if (!*buf)
+ return -ENOMEM;
+ sprintf(*buf, "%llu", (unsigned long long)lc->recovering_region);
+
+ return 32;
+ }
+
bitset_size = lc->bitset_uint32_count * sizeof(*lc->clean_bits);
*buf = malloc(bitset_size);
+
if (!*buf) {
LOG_ERROR("store_bits: Unable to allocate memory");
return -ENOMEM;
@@ -1590,23 +1573,33 @@
return bitset_size;
}

-int load_bits(const char *uuid, const char *which, char *buf, int size)
+/*int load_bits(const char *uuid, const char *which, char *buf, int size)*/
+int pull_state(const char *uuid, const char *which, char *buf, int size)
{
int bitset_size;
struct log_c *lc;

if (!buf)
- LOG_ERROR("load_bits: buf == NULL");
+ LOG_ERROR("pull_state: buf == NULL");

lc = get_log(uuid);
if (!lc) {
- LOG_ERROR("load_bits: No log found for %s", uuid);
+ LOG_ERROR("pull_state: No log found for %s", uuid);
return -EINVAL;
}

+ if (!strncmp(which, "recovering_region", 17)) {
+ sscanf(buf, "%llu", (unsigned long long *)&lc->recovering_region);
+ LOG_DBG("[%s] recovering_region set to %llu",
+ SHORT_UUID(uuid),
+ (unsigned long long)lc->recovering_region);
+ return 0;
+ }
+
bitset_size = lc->bitset_uint32_count * sizeof(*lc->clean_bits);
if (bitset_size != size) {
- LOG_ERROR("load_bits: bad bitset_size");
+ LOG_ERROR("pull_state(%s): bad bitset_size (%d vs %d)",
+ which, size, bitset_size);
return -EINVAL;
}

--- cluster/cmirror/src/Attic/functions.h 2008/02/06 23:03:05 1.1.2.4
+++ cluster/cmirror/src/Attic/functions.h 2008/02/08 14:30:10 1.1.2.5
@@ -8,10 +8,16 @@

int local_resume(struct clog_tfr *tfr);
int cluster_postsuspend(char *);
-int do_request(struct clog_tfr *tfr);
-int commit_log(struct clog_tfr *tfr);
+
+int do_request(struct clog_tfr *tfr, int server);
+
+/*
int store_bits(const char *uuid, const char *which, char **buf);
int load_bits(const char *uuid, const char *which, char *buf, int size);
+*/
+int push_state(const char *uuid, const char *which, char **buf);
+int pull_state(const char *uuid, const char *which, char *buf, int size);
+
int log_get_state(struct clog_tfr *tfr);
int log_status(int);
#endif /* __CLOG_FUNCTIONS_DOT_H__ */
--- cluster/cmirror/src/Attic/local.c 2008/02/06 23:03:05 1.1.2.13
+++ cluster/cmirror/src/Attic/local.c 2008/02/08 14:30:10 1.1.2.14
@@ -166,7 +166,8 @@
case DM_CLOG_STATUS_INFO:
case DM_CLOG_STATUS_TABLE:
case DM_CLOG_PRESUSPEND:
- r = do_request(tfr);
+ /* We do not specify ourselves as server here */
+ r = do_request(tfr, 0);
if (r)
LOG_DBG("Returning failed request to kernel [%s]",
RQ_TYPE(tfr->request_type));
@@ -177,7 +178,8 @@

break;
case DM_CLOG_POSTSUSPEND:
- r = do_request(tfr);
+ /* We do not specify ourselves as server here */
+ r = do_request(tfr, 0);
if (r) {
LOG_DBG("Returning failed request to kernel [%s]",
RQ_TYPE(tfr->request_type));
@@ -212,6 +214,7 @@
LOG_ERROR("[%s] Unable to send %s to cluster: %s",
SHORT_UUID(tfr->uuid),
RQ_TYPE(tfr->request_type), strerror(-r));
+ tfr->data_size = 0;
tfr->error = r;
kernel_send(tfr);
} else {
 

Thread Tools




All times are GMT. The time now is 02:33 AM.

VBulletin, Copyright ©2000 - 2014, Jelsoft Enterprises Ltd.
Content Relevant URLs by vBSEO ©2007, Crawlability, Inc.
Copyright 2007 - 2008, www.linux-archive.org