Commit d24fbcda authored by Joel Becker's avatar Joel Becker Committed by Mark Fasheh

ocfs2: Negotiate locking protocol versions.

Currently, when ocfs2 nodes connect via TCP, they advertise their
compatibility level.  If the versions do not match, two nodes cannot speak
to each other and they disconnect. As a result, this provides no forward or
backwards compatibility.

This patch implements a simple protocol negotiation at the dlm level by
introducing a major/minor version number scheme for entities that
communicate.  Specifically, o2dlm has a major/minor version for interaction
with o2dlm on other nodes, and ocfs2 itself has a major/minor version for
interacting with the filesystem on other nodes.

This will allow rolling upgrades of ocfs2 clusters when changes to the
locking or network protocols can be done in a backwards compatible manner.
In those cases, only the minor number is changed and the negotatied protocol
minor is returned from dlm join. In the far less likely event that a
required protocol change makes backwards compatibility impossible, we simply
bump the major number.
Signed-off-by: default avatarJoel Becker <joel.becker@oracle.com>
Signed-off-by: default avatarMark Fasheh <mark.fasheh@oracle.com>
parent 3e6bdf47
...@@ -38,6 +38,15 @@ ...@@ -38,6 +38,15 @@
* locking semantics of the file system using the protocol. It should * locking semantics of the file system using the protocol. It should
* be somewhere else, I'm sure, but right now it isn't. * be somewhere else, I'm sure, but right now it isn't.
* *
* With version 11, we separate out the filesystem locking portion. The
* filesystem now has a major.minor version it negotiates. Version 11
* introduces this negotiation to the o2dlm protocol, and as such the
* version here in tcp_internal.h should not need to be bumped for
* filesystem locking changes.
*
* New in version 11
* - Negotiation of filesystem locking in the dlm join.
*
* New in version 10: * New in version 10:
* - Meta/data locks combined * - Meta/data locks combined
* *
...@@ -66,7 +75,7 @@ ...@@ -66,7 +75,7 @@
* - full 64 bit i_size in the metadata lock lvbs * - full 64 bit i_size in the metadata lock lvbs
* - introduction of "rw" lock and pushing meta/data locking down * - introduction of "rw" lock and pushing meta/data locking down
*/ */
#define O2NET_PROTOCOL_VERSION 10ULL #define O2NET_PROTOCOL_VERSION 11ULL
struct o2net_handshake { struct o2net_handshake {
__be64 protocol_version; __be64 protocol_version;
__be64 connector_id; __be64 connector_id;
......
...@@ -193,7 +193,12 @@ enum dlm_status dlmunlock(struct dlm_ctxt *dlm, ...@@ -193,7 +193,12 @@ enum dlm_status dlmunlock(struct dlm_ctxt *dlm,
dlm_astunlockfunc_t *unlockast, dlm_astunlockfunc_t *unlockast,
void *data); void *data);
struct dlm_ctxt * dlm_register_domain(const char *domain, u32 key); struct dlm_protocol_version {
u8 pv_major;
u8 pv_minor;
};
struct dlm_ctxt * dlm_register_domain(const char *domain, u32 key,
struct dlm_protocol_version *fs_proto);
void dlm_unregister_domain(struct dlm_ctxt *dlm); void dlm_unregister_domain(struct dlm_ctxt *dlm);
......
...@@ -142,6 +142,12 @@ struct dlm_ctxt ...@@ -142,6 +142,12 @@ struct dlm_ctxt
spinlock_t work_lock; spinlock_t work_lock;
struct list_head dlm_domain_handlers; struct list_head dlm_domain_handlers;
struct list_head dlm_eviction_callbacks; struct list_head dlm_eviction_callbacks;
/* The filesystem specifies this at domain registration. We
* cache it here to know what to tell other nodes. */
struct dlm_protocol_version fs_locking_proto;
/* This is the inter-dlm communication version */
struct dlm_protocol_version dlm_locking_proto;
}; };
static inline struct hlist_head *dlm_lockres_hash(struct dlm_ctxt *dlm, unsigned i) static inline struct hlist_head *dlm_lockres_hash(struct dlm_ctxt *dlm, unsigned i)
...@@ -589,10 +595,24 @@ struct dlm_proxy_ast ...@@ -589,10 +595,24 @@ struct dlm_proxy_ast
#define DLM_PROXY_AST_MAX_LEN (sizeof(struct dlm_proxy_ast)+DLM_LVB_LEN) #define DLM_PROXY_AST_MAX_LEN (sizeof(struct dlm_proxy_ast)+DLM_LVB_LEN)
#define DLM_MOD_KEY (0x666c6172) #define DLM_MOD_KEY (0x666c6172)
enum dlm_query_join_response { enum dlm_query_join_response_code {
JOIN_DISALLOW = 0, JOIN_DISALLOW = 0,
JOIN_OK, JOIN_OK,
JOIN_OK_NO_MAP, JOIN_OK_NO_MAP,
JOIN_PROTOCOL_MISMATCH,
};
union dlm_query_join_response {
u32 intval;
struct {
u8 code; /* Response code. dlm_minor and fs_minor
are only valid if this is JOIN_OK */
u8 dlm_minor; /* The minor version of the protocol the
dlm is speaking. */
u8 fs_minor; /* The minor version of the protocol the
filesystem is speaking. */
u8 reserved;
} packet;
}; };
struct dlm_lock_request struct dlm_lock_request
...@@ -633,6 +653,8 @@ struct dlm_query_join_request ...@@ -633,6 +653,8 @@ struct dlm_query_join_request
u8 node_idx; u8 node_idx;
u8 pad1[2]; u8 pad1[2];
u8 name_len; u8 name_len;
struct dlm_protocol_version dlm_proto;
struct dlm_protocol_version fs_proto;
u8 domain[O2NM_MAX_NAME_LEN]; u8 domain[O2NM_MAX_NAME_LEN];
u8 node_map[BITS_TO_BYTES(O2NM_MAX_NODES)]; u8 node_map[BITS_TO_BYTES(O2NM_MAX_NODES)];
}; };
......
...@@ -123,6 +123,17 @@ DEFINE_SPINLOCK(dlm_domain_lock); ...@@ -123,6 +123,17 @@ DEFINE_SPINLOCK(dlm_domain_lock);
LIST_HEAD(dlm_domains); LIST_HEAD(dlm_domains);
static DECLARE_WAIT_QUEUE_HEAD(dlm_domain_events); static DECLARE_WAIT_QUEUE_HEAD(dlm_domain_events);
/*
* The supported protocol version for DLM communication. Running domains
* will have a negotiated version with the same major number and a minor
* number equal or smaller. The dlm_ctxt->dlm_locking_proto field should
* be used to determine what a running domain is actually using.
*/
static const struct dlm_protocol_version dlm_protocol = {
.pv_major = 1,
.pv_minor = 0,
};
#define DLM_DOMAIN_BACKOFF_MS 200 #define DLM_DOMAIN_BACKOFF_MS 200
static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data, static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data,
...@@ -133,6 +144,8 @@ static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data, ...@@ -133,6 +144,8 @@ static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data,
void **ret_data); void **ret_data);
static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data, static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data,
void **ret_data); void **ret_data);
static int dlm_protocol_compare(struct dlm_protocol_version *existing,
struct dlm_protocol_version *request);
static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm); static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm);
...@@ -668,11 +681,45 @@ void dlm_unregister_domain(struct dlm_ctxt *dlm) ...@@ -668,11 +681,45 @@ void dlm_unregister_domain(struct dlm_ctxt *dlm)
} }
EXPORT_SYMBOL_GPL(dlm_unregister_domain); EXPORT_SYMBOL_GPL(dlm_unregister_domain);
static int dlm_query_join_proto_check(char *proto_type, int node,
struct dlm_protocol_version *ours,
struct dlm_protocol_version *request)
{
int rc;
struct dlm_protocol_version proto = *request;
if (!dlm_protocol_compare(ours, &proto)) {
mlog(0,
"node %u wanted to join with %s locking protocol "
"%u.%u, we respond with %u.%u\n",
node, proto_type,
request->pv_major,
request->pv_minor,
proto.pv_major, proto.pv_minor);
request->pv_minor = proto.pv_minor;
rc = 0;
} else {
mlog(ML_NOTICE,
"Node %u wanted to join with %s locking "
"protocol %u.%u, but we have %u.%u, disallowing\n",
node, proto_type,
request->pv_major,
request->pv_minor,
ours->pv_major,
ours->pv_minor);
rc = 1;
}
return rc;
}
static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data, static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data,
void **ret_data) void **ret_data)
{ {
struct dlm_query_join_request *query; struct dlm_query_join_request *query;
enum dlm_query_join_response response; union dlm_query_join_response response = {
.packet.code = JOIN_DISALLOW,
};
struct dlm_ctxt *dlm = NULL; struct dlm_ctxt *dlm = NULL;
u8 nodenum; u8 nodenum;
...@@ -690,11 +737,11 @@ static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data, ...@@ -690,11 +737,11 @@ static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data,
mlog(0, "node %u is not in our live map yet\n", mlog(0, "node %u is not in our live map yet\n",
query->node_idx); query->node_idx);
response = JOIN_DISALLOW; response.packet.code = JOIN_DISALLOW;
goto respond; goto respond;
} }
response = JOIN_OK_NO_MAP; response.packet.code = JOIN_OK_NO_MAP;
spin_lock(&dlm_domain_lock); spin_lock(&dlm_domain_lock);
dlm = __dlm_lookup_domain_full(query->domain, query->name_len); dlm = __dlm_lookup_domain_full(query->domain, query->name_len);
...@@ -713,7 +760,7 @@ static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data, ...@@ -713,7 +760,7 @@ static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data,
mlog(0, "disallow join as node %u does not " mlog(0, "disallow join as node %u does not "
"have node %u in its nodemap\n", "have node %u in its nodemap\n",
query->node_idx, nodenum); query->node_idx, nodenum);
response = JOIN_DISALLOW; response.packet.code = JOIN_DISALLOW;
goto unlock_respond; goto unlock_respond;
} }
} }
...@@ -733,30 +780,48 @@ static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data, ...@@ -733,30 +780,48 @@ static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data,
/*If this is a brand new context and we /*If this is a brand new context and we
* haven't started our join process yet, then * haven't started our join process yet, then
* the other node won the race. */ * the other node won the race. */
response = JOIN_OK_NO_MAP; response.packet.code = JOIN_OK_NO_MAP;
} else if (dlm->joining_node != DLM_LOCK_RES_OWNER_UNKNOWN) { } else if (dlm->joining_node != DLM_LOCK_RES_OWNER_UNKNOWN) {
/* Disallow parallel joins. */ /* Disallow parallel joins. */
response = JOIN_DISALLOW; response.packet.code = JOIN_DISALLOW;
} else if (dlm->reco.state & DLM_RECO_STATE_ACTIVE) { } else if (dlm->reco.state & DLM_RECO_STATE_ACTIVE) {
mlog(0, "node %u trying to join, but recovery " mlog(0, "node %u trying to join, but recovery "
"is ongoing.\n", bit); "is ongoing.\n", bit);
response = JOIN_DISALLOW; response.packet.code = JOIN_DISALLOW;
} else if (test_bit(bit, dlm->recovery_map)) { } else if (test_bit(bit, dlm->recovery_map)) {
mlog(0, "node %u trying to join, but it " mlog(0, "node %u trying to join, but it "
"still needs recovery.\n", bit); "still needs recovery.\n", bit);
response = JOIN_DISALLOW; response.packet.code = JOIN_DISALLOW;
} else if (test_bit(bit, dlm->domain_map)) { } else if (test_bit(bit, dlm->domain_map)) {
mlog(0, "node %u trying to join, but it " mlog(0, "node %u trying to join, but it "
"is still in the domain! needs recovery?\n", "is still in the domain! needs recovery?\n",
bit); bit);
response = JOIN_DISALLOW; response.packet.code = JOIN_DISALLOW;
} else { } else {
/* Alright we're fully a part of this domain /* Alright we're fully a part of this domain
* so we keep some state as to who's joining * so we keep some state as to who's joining
* and indicate to him that needs to be fixed * and indicate to him that needs to be fixed
* up. */ * up. */
response = JOIN_OK;
__dlm_set_joining_node(dlm, query->node_idx); /* Make sure we speak compatible locking protocols. */
if (dlm_query_join_proto_check("DLM", bit,
&dlm->dlm_locking_proto,
&query->dlm_proto)) {
response.packet.code =
JOIN_PROTOCOL_MISMATCH;
} else if (dlm_query_join_proto_check("fs", bit,
&dlm->fs_locking_proto,
&query->fs_proto)) {
response.packet.code =
JOIN_PROTOCOL_MISMATCH;
} else {
response.packet.dlm_minor =
query->dlm_proto.pv_minor;
response.packet.fs_minor =
query->fs_proto.pv_minor;
response.packet.code = JOIN_OK;
__dlm_set_joining_node(dlm, query->node_idx);
}
} }
spin_unlock(&dlm->spinlock); spin_unlock(&dlm->spinlock);
...@@ -765,9 +830,9 @@ unlock_respond: ...@@ -765,9 +830,9 @@ unlock_respond:
spin_unlock(&dlm_domain_lock); spin_unlock(&dlm_domain_lock);
respond: respond:
mlog(0, "We respond with %u\n", response); mlog(0, "We respond with %u\n", response.packet.code);
return response; return response.intval;
} }
static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data, static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data,
...@@ -899,10 +964,11 @@ static int dlm_send_join_cancels(struct dlm_ctxt *dlm, ...@@ -899,10 +964,11 @@ static int dlm_send_join_cancels(struct dlm_ctxt *dlm,
static int dlm_request_join(struct dlm_ctxt *dlm, static int dlm_request_join(struct dlm_ctxt *dlm,
int node, int node,
enum dlm_query_join_response *response) enum dlm_query_join_response_code *response)
{ {
int status, retval; int status;
struct dlm_query_join_request join_msg; struct dlm_query_join_request join_msg;
union dlm_query_join_response join_resp;
mlog(0, "querying node %d\n", node); mlog(0, "querying node %d\n", node);
...@@ -910,12 +976,15 @@ static int dlm_request_join(struct dlm_ctxt *dlm, ...@@ -910,12 +976,15 @@ static int dlm_request_join(struct dlm_ctxt *dlm,
join_msg.node_idx = dlm->node_num; join_msg.node_idx = dlm->node_num;
join_msg.name_len = strlen(dlm->name); join_msg.name_len = strlen(dlm->name);
memcpy(join_msg.domain, dlm->name, join_msg.name_len); memcpy(join_msg.domain, dlm->name, join_msg.name_len);
join_msg.dlm_proto = dlm->dlm_locking_proto;
join_msg.fs_proto = dlm->fs_locking_proto;
/* copy live node map to join message */ /* copy live node map to join message */
byte_copymap(join_msg.node_map, dlm->live_nodes_map, O2NM_MAX_NODES); byte_copymap(join_msg.node_map, dlm->live_nodes_map, O2NM_MAX_NODES);
status = o2net_send_message(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY, &join_msg, status = o2net_send_message(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY, &join_msg,
sizeof(join_msg), node, &retval); sizeof(join_msg), node,
&join_resp.intval);
if (status < 0 && status != -ENOPROTOOPT) { if (status < 0 && status != -ENOPROTOOPT) {
mlog_errno(status); mlog_errno(status);
goto bail; goto bail;
...@@ -928,14 +997,41 @@ static int dlm_request_join(struct dlm_ctxt *dlm, ...@@ -928,14 +997,41 @@ static int dlm_request_join(struct dlm_ctxt *dlm,
if (status == -ENOPROTOOPT) { if (status == -ENOPROTOOPT) {
status = 0; status = 0;
*response = JOIN_OK_NO_MAP; *response = JOIN_OK_NO_MAP;
} else if (retval == JOIN_DISALLOW || } else if (join_resp.packet.code == JOIN_DISALLOW ||
retval == JOIN_OK || join_resp.packet.code == JOIN_OK_NO_MAP) {
retval == JOIN_OK_NO_MAP) { *response = join_resp.packet.code;
*response = retval; } else if (join_resp.packet.code == JOIN_PROTOCOL_MISMATCH) {
mlog(ML_NOTICE,
"This node requested DLM locking protocol %u.%u and "
"filesystem locking protocol %u.%u. At least one of "
"the protocol versions on node %d is not compatible, "
"disconnecting\n",
dlm->dlm_locking_proto.pv_major,
dlm->dlm_locking_proto.pv_minor,
dlm->fs_locking_proto.pv_major,
dlm->fs_locking_proto.pv_minor,
node);
status = -EPROTO;
*response = join_resp.packet.code;
} else if (join_resp.packet.code == JOIN_OK) {
*response = join_resp.packet.code;
/* Use the same locking protocol as the remote node */
dlm->dlm_locking_proto.pv_minor =
join_resp.packet.dlm_minor;
dlm->fs_locking_proto.pv_minor =
join_resp.packet.fs_minor;
mlog(0,
"Node %d responds JOIN_OK with DLM locking protocol "
"%u.%u and fs locking protocol %u.%u\n",
node,
dlm->dlm_locking_proto.pv_major,
dlm->dlm_locking_proto.pv_minor,
dlm->fs_locking_proto.pv_major,
dlm->fs_locking_proto.pv_minor);
} else { } else {
status = -EINVAL; status = -EINVAL;
mlog(ML_ERROR, "invalid response %d from node %u\n", retval, mlog(ML_ERROR, "invalid response %d from node %u\n",
node); join_resp.packet.code, node);
} }
mlog(0, "status %d, node %d response is %d\n", status, node, mlog(0, "status %d, node %d response is %d\n", status, node,
...@@ -1008,7 +1104,7 @@ struct domain_join_ctxt { ...@@ -1008,7 +1104,7 @@ struct domain_join_ctxt {
static int dlm_should_restart_join(struct dlm_ctxt *dlm, static int dlm_should_restart_join(struct dlm_ctxt *dlm,
struct domain_join_ctxt *ctxt, struct domain_join_ctxt *ctxt,
enum dlm_query_join_response response) enum dlm_query_join_response_code response)
{ {
int ret; int ret;
...@@ -1034,7 +1130,7 @@ static int dlm_try_to_join_domain(struct dlm_ctxt *dlm) ...@@ -1034,7 +1130,7 @@ static int dlm_try_to_join_domain(struct dlm_ctxt *dlm)
{ {
int status = 0, tmpstat, node; int status = 0, tmpstat, node;
struct domain_join_ctxt *ctxt; struct domain_join_ctxt *ctxt;
enum dlm_query_join_response response = JOIN_DISALLOW; enum dlm_query_join_response_code response = JOIN_DISALLOW;
mlog_entry("%p", dlm); mlog_entry("%p", dlm);
...@@ -1450,10 +1546,38 @@ leave: ...@@ -1450,10 +1546,38 @@ leave:
} }
/* /*
* dlm_register_domain: one-time setup per "domain" * Compare a requested locking protocol version against the current one.
*
* If the major numbers are different, they are incompatible.
* If the current minor is greater than the request, they are incompatible.
* If the current minor is less than or equal to the request, they are
* compatible, and the requester should run at the current minor version.
*/
static int dlm_protocol_compare(struct dlm_protocol_version *existing,
struct dlm_protocol_version *request)
{
if (existing->pv_major != request->pv_major)
return 1;
if (existing->pv_minor > request->pv_minor)
return 1;
if (existing->pv_minor < request->pv_minor)
request->pv_minor = existing->pv_minor;
return 0;
}
/*
* dlm_register_domain: one-time setup per "domain".
*
* The filesystem passes in the requested locking version via proto.
* If registration was successful, proto will contain the negotiated
* locking protocol.
*/ */
struct dlm_ctxt * dlm_register_domain(const char *domain, struct dlm_ctxt * dlm_register_domain(const char *domain,
u32 key) u32 key,
struct dlm_protocol_version *fs_proto)
{ {
int ret; int ret;
struct dlm_ctxt *dlm = NULL; struct dlm_ctxt *dlm = NULL;
...@@ -1496,6 +1620,15 @@ retry: ...@@ -1496,6 +1620,15 @@ retry:
goto retry; goto retry;
} }
if (dlm_protocol_compare(&dlm->fs_locking_proto, fs_proto)) {
mlog(ML_ERROR,
"Requested locking protocol version is not "
"compatible with already registered domain "
"\"%s\"\n", domain);
ret = -EPROTO;
goto leave;
}
__dlm_get(dlm); __dlm_get(dlm);
dlm->num_joins++; dlm->num_joins++;
...@@ -1526,6 +1659,13 @@ retry: ...@@ -1526,6 +1659,13 @@ retry:
list_add_tail(&dlm->list, &dlm_domains); list_add_tail(&dlm->list, &dlm_domains);
spin_unlock(&dlm_domain_lock); spin_unlock(&dlm_domain_lock);
/*
* Pass the locking protocol version into the join. If the join
* succeeds, it will have the negotiated protocol set.
*/
dlm->dlm_locking_proto = dlm_protocol;
dlm->fs_locking_proto = *fs_proto;
ret = dlm_join_domain(dlm); ret = dlm_join_domain(dlm);
if (ret) { if (ret) {
mlog_errno(ret); mlog_errno(ret);
...@@ -1533,6 +1673,9 @@ retry: ...@@ -1533,6 +1673,9 @@ retry:
goto leave; goto leave;
} }
/* Tell the caller what locking protocol we negotiated */
*fs_proto = dlm->fs_locking_proto;
ret = 0; ret = 0;
leave: leave:
if (new_ctxt) if (new_ctxt)
......
...@@ -60,6 +60,8 @@ ...@@ -60,6 +60,8 @@
#define MLOG_MASK_PREFIX ML_DLMFS #define MLOG_MASK_PREFIX ML_DLMFS
#include "cluster/masklog.h" #include "cluster/masklog.h"
#include "ocfs2_lockingver.h"
static const struct super_operations dlmfs_ops; static const struct super_operations dlmfs_ops;
static const struct file_operations dlmfs_file_operations; static const struct file_operations dlmfs_file_operations;
static const struct inode_operations dlmfs_dir_inode_operations; static const struct inode_operations dlmfs_dir_inode_operations;
...@@ -69,6 +71,16 @@ static struct kmem_cache *dlmfs_inode_cache; ...@@ -69,6 +71,16 @@ static struct kmem_cache *dlmfs_inode_cache;
struct workqueue_struct *user_dlm_worker; struct workqueue_struct *user_dlm_worker;
/*
* This is the userdlmfs locking protocol version.
*
* See fs/ocfs2/dlmglue.c for more details on locking versions.
*/
static const struct dlm_protocol_version user_locking_protocol = {
.pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
.pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
};
/* /*
* decodes a set of open flags into a valid lock level and a set of flags. * decodes a set of open flags into a valid lock level and a set of flags.
* returns < 0 if we have invalid flags * returns < 0 if we have invalid flags
...@@ -416,6 +428,7 @@ static int dlmfs_mkdir(struct inode * dir, ...@@ -416,6 +428,7 @@ static int dlmfs_mkdir(struct inode * dir,
struct qstr *domain = &dentry->d_name; struct qstr *domain = &dentry->d_name;
struct dlmfs_inode_private *ip; struct dlmfs_inode_private *ip;
struct dlm_ctxt *dlm; struct dlm_ctxt *dlm;
struct dlm_protocol_version proto = user_locking_protocol;
mlog(0, "mkdir %.*s\n", domain->len, domain->name); mlog(0, "mkdir %.*s\n", domain->len, domain->name);
...@@ -435,7 +448,7 @@ static int dlmfs_mkdir(struct inode * dir, ...@@ -435,7 +448,7 @@ static int dlmfs_mkdir(struct inode * dir,
ip = DLMFS_I(inode); ip = DLMFS_I(inode);
dlm = user_dlm_register_context(domain); dlm = user_dlm_register_context(domain, &proto);
if (IS_ERR(dlm)) { if (IS_ERR(dlm)) {
status = PTR_ERR(dlm); status = PTR_ERR(dlm);
mlog(ML_ERROR, "Error %d could not register domain \"%.*s\"\n", mlog(ML_ERROR, "Error %d could not register domain \"%.*s\"\n",
......
...@@ -645,7 +645,8 @@ bail: ...@@ -645,7 +645,8 @@ bail:
return status; return status;
} }
struct dlm_ctxt *user_dlm_register_context(struct qstr *name) struct dlm_ctxt *user_dlm_register_context(struct qstr *name,
struct dlm_protocol_version *proto)
{ {
struct dlm_ctxt *dlm; struct dlm_ctxt *dlm;
u32 dlm_key; u32 dlm_key;
...@@ -661,7 +662,7 @@ struct dlm_ctxt *user_dlm_register_context(struct qstr *name) ...@@ -661,7 +662,7 @@ struct dlm_ctxt *user_dlm_register_context(struct qstr *name)
snprintf(domain, name->len + 1, "%.*s", name->len, name->name); snprintf(domain, name->len + 1, "%.*s", name->len, name->name);
dlm = dlm_register_domain(domain, dlm_key); dlm = dlm_register_domain(domain, dlm_key, proto);
if (IS_ERR(dlm)) if (IS_ERR(dlm))
mlog_errno(PTR_ERR(dlm)); mlog_errno(PTR_ERR(dlm));
......
...@@ -83,7 +83,8 @@ void user_dlm_write_lvb(struct inode *inode, ...@@ -83,7 +83,8 @@ void user_dlm_write_lvb(struct inode *inode,
void user_dlm_read_lvb(struct inode *inode, void user_dlm_read_lvb(struct inode *inode,
char *val, char *val,
unsigned int len); unsigned int len);
struct dlm_ctxt *user_dlm_register_context(struct qstr *name); struct dlm_ctxt *user_dlm_register_context(struct qstr *name,
struct dlm_protocol_version *proto);
void user_dlm_unregister_context(struct dlm_ctxt *dlm); void user_dlm_unregister_context(struct dlm_ctxt *dlm);
struct dlmfs_inode_private { struct dlmfs_inode_private {
......
...@@ -43,6 +43,7 @@ ...@@ -43,6 +43,7 @@
#include <cluster/masklog.h> #include <cluster/masklog.h>
#include "ocfs2.h" #include "ocfs2.h"
#include "ocfs2_lockingver.h"
#include "alloc.h" #include "alloc.h"
#include "dcache.h" #include "dcache.h"
...@@ -258,6 +259,31 @@ static struct ocfs2_lock_res_ops ocfs2_flock_lops = { ...@@ -258,6 +259,31 @@ static struct ocfs2_lock_res_ops ocfs2_flock_lops = {
.flags = 0, .flags = 0,
}; };
/*
* This is the filesystem locking protocol version.
*
* Whenever the filesystem does new things with locks (adds or removes a
* lock, orders them differently, does different things underneath a lock),
* the version must be changed. The protocol is negotiated when joining
* the dlm domain. A node may join the domain if its major version is
* identical to all other nodes and its minor version is greater than
* or equal to all other nodes. When its minor version is greater than
* the other nodes, it will run at the minor version specified by the
* other nodes.
*
* If a locking change is made that will not be compatible with older
* versions, the major number must be increased and the minor version set
* to zero. If a change merely adds a behavior that can be disabled when
* speaking to older versions, the minor version must be increased. If a
* change adds a fully backwards compatible change (eg, LVB changes that
* are just ignored by older versions), the version does not need to be
* updated.
*/
const struct dlm_protocol_version ocfs2_locking_protocol = {
.pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
.pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
};
static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres) static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
{ {
return lockres->l_type == OCFS2_LOCK_TYPE_META || return lockres->l_type == OCFS2_LOCK_TYPE_META ||
...@@ -2506,7 +2532,8 @@ int ocfs2_dlm_init(struct ocfs2_super *osb) ...@@ -2506,7 +2532,8 @@ int ocfs2_dlm_init(struct ocfs2_super *osb)
dlm_key = crc32_le(0, osb->uuid_str, strlen(osb->uuid_str)); dlm_key = crc32_le(0, osb->uuid_str, strlen(osb->uuid_str));
/* for now, uuid == domain */ /* for now, uuid == domain */
dlm = dlm_register_domain(osb->uuid_str, dlm_key); dlm = dlm_register_domain(osb->uuid_str, dlm_key,
&osb->osb_locking_proto);
if (IS_ERR(dlm)) { if (IS_ERR(dlm)) {
status = PTR_ERR(dlm); status = PTR_ERR(dlm);
mlog_errno(status); mlog_errno(status);
......
...@@ -116,4 +116,5 @@ void ocfs2_wake_downconvert_thread(struct ocfs2_super *osb); ...@@ -116,4 +116,5 @@ void ocfs2_wake_downconvert_thread(struct ocfs2_super *osb);
struct ocfs2_dlm_debug *ocfs2_new_dlm_debug(void); struct ocfs2_dlm_debug *ocfs2_new_dlm_debug(void);
void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug); void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug);
extern const struct dlm_protocol_version ocfs2_locking_protocol;
#endif /* DLMGLUE_H */ #endif /* DLMGLUE_H */
...@@ -251,6 +251,7 @@ struct ocfs2_super ...@@ -251,6 +251,7 @@ struct ocfs2_super
struct ocfs2_lock_res osb_rename_lockres; struct ocfs2_lock_res osb_rename_lockres;
struct dlm_eviction_cb osb_eviction_cb; struct dlm_eviction_cb osb_eviction_cb;
struct ocfs2_dlm_debug *osb_dlm_debug; struct ocfs2_dlm_debug *osb_dlm_debug;
struct dlm_protocol_version osb_locking_proto;
struct dentry *osb_debug_root; struct dentry *osb_debug_root;
......
/* -*- mode: c; c-basic-offset: 8; -*-
* vim: noexpandtab sw=8 ts=8 sts=0:
*
* ocfs2_lockingver.h
*
* Defines OCFS2 Locking version values.
*
* Copyright (C) 2008 Oracle. All rights reserved.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License, version 2, as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*/
#ifndef OCFS2_LOCKINGVER_H
#define OCFS2_LOCKINGVER_H
/*
* The protocol version for ocfs2 cluster locking. See dlmglue.c for
* more details.
*/
#define OCFS2_LOCKING_PROTOCOL_MAJOR 1
#define OCFS2_LOCKING_PROTOCOL_MINOR 0
#endif /* OCFS2_LOCKINGVER_H */
...@@ -1355,6 +1355,7 @@ static int ocfs2_initialize_super(struct super_block *sb, ...@@ -1355,6 +1355,7 @@ static int ocfs2_initialize_super(struct super_block *sb,
sb->s_fs_info = osb; sb->s_fs_info = osb;
sb->s_op = &ocfs2_sops; sb->s_op = &ocfs2_sops;
sb->s_export_op = &ocfs2_export_ops; sb->s_export_op = &ocfs2_export_ops;
osb->osb_locking_proto = ocfs2_locking_protocol;
sb->s_time_gran = 1; sb->s_time_gran = 1;
sb->s_flags |= MS_NOATIME; sb->s_flags |= MS_NOATIME;
/* this is needed to support O_LARGEFILE */ /* this is needed to support O_LARGEFILE */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment