Commit 422d2cb8 authored by Yehuda Sadeh's avatar Yehuda Sadeh Committed by Sage Weil

ceph: reset osd after relevant messages timed out

This simplifies the process of timing out messages. We
keep lru of current messages that are in flight. If a
timeout has passed, we reset the osd connection, so that
messages will be retransmitted.  This is a failsafe in case
we hit some sort of problem sending out message to the OSD.
Normally, we'll get notification via an updated osdmap if
there are problems.

If a request is older than the keepalive timeout, send a
keepalive to ensure we detect any breaks in the TCP connection.
Signed-off-by: default avatarYehuda Sadeh <yehuda@hq.newdream.net>
Signed-off-by: default avatarSage Weil <sage@newdream.net>
parent e9964c10
...@@ -17,6 +17,8 @@ ...@@ -17,6 +17,8 @@
#define OSD_OPREPLY_FRONT_LEN 512 #define OSD_OPREPLY_FRONT_LEN 512
const static struct ceph_connection_operations osd_con_ops; const static struct ceph_connection_operations osd_con_ops;
static int __kick_requests(struct ceph_osd_client *osdc,
struct ceph_osd *kickosd);
static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd); static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd);
...@@ -339,6 +341,7 @@ static struct ceph_osd *create_osd(struct ceph_osd_client *osdc) ...@@ -339,6 +341,7 @@ static struct ceph_osd *create_osd(struct ceph_osd_client *osdc)
osd->o_con.ops = &osd_con_ops; osd->o_con.ops = &osd_con_ops;
osd->o_con.peer_name.type = CEPH_ENTITY_TYPE_OSD; osd->o_con.peer_name.type = CEPH_ENTITY_TYPE_OSD;
INIT_LIST_HEAD(&osd->o_keepalive_item);
return osd; return osd;
} }
...@@ -461,6 +464,16 @@ static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o) ...@@ -461,6 +464,16 @@ static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o)
return NULL; return NULL;
} }
static void __schedule_osd_timeout(struct ceph_osd_client *osdc)
{
schedule_delayed_work(&osdc->timeout_work,
osdc->client->mount_args->osd_keepalive_timeout * HZ);
}
static void __cancel_osd_timeout(struct ceph_osd_client *osdc)
{
cancel_delayed_work(&osdc->timeout_work);
}
/* /*
* Register request, assign tid. If this is the first request, set up * Register request, assign tid. If this is the first request, set up
...@@ -472,21 +485,16 @@ static void register_request(struct ceph_osd_client *osdc, ...@@ -472,21 +485,16 @@ static void register_request(struct ceph_osd_client *osdc,
mutex_lock(&osdc->request_mutex); mutex_lock(&osdc->request_mutex);
req->r_tid = ++osdc->last_tid; req->r_tid = ++osdc->last_tid;
req->r_request->hdr.tid = cpu_to_le64(req->r_tid); req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
INIT_LIST_HEAD(&req->r_req_lru_item);
dout("register_request %p tid %lld\n", req, req->r_tid); dout("register_request %p tid %lld\n", req, req->r_tid);
__insert_request(osdc, req); __insert_request(osdc, req);
ceph_osdc_get_request(req); ceph_osdc_get_request(req);
osdc->num_requests++; osdc->num_requests++;
req->r_timeout_stamp =
jiffies + osdc->client->mount_args->osd_timeout*HZ;
if (osdc->num_requests == 1) { if (osdc->num_requests == 1) {
osdc->timeout_tid = req->r_tid; dout(" first request, scheduling timeout\n");
dout(" timeout on tid %llu at %lu\n", req->r_tid, __schedule_osd_timeout(osdc);
req->r_timeout_stamp);
schedule_delayed_work(&osdc->timeout_work,
round_jiffies_relative(req->r_timeout_stamp - jiffies));
} }
mutex_unlock(&osdc->request_mutex); mutex_unlock(&osdc->request_mutex);
} }
...@@ -513,21 +521,10 @@ static void __unregister_request(struct ceph_osd_client *osdc, ...@@ -513,21 +521,10 @@ static void __unregister_request(struct ceph_osd_client *osdc,
ceph_osdc_put_request(req); ceph_osdc_put_request(req);
if (req->r_tid == osdc->timeout_tid) { list_del_init(&req->r_req_lru_item);
if (osdc->num_requests == 0) { if (osdc->num_requests == 0) {
dout("no requests, canceling timeout\n"); dout(" no requests, canceling timeout\n");
osdc->timeout_tid = 0; __cancel_osd_timeout(osdc);
cancel_delayed_work(&osdc->timeout_work);
} else {
req = rb_entry(rb_first(&osdc->requests),
struct ceph_osd_request, r_node);
osdc->timeout_tid = req->r_tid;
dout("rescheduled timeout on tid %llu at %lu\n",
req->r_tid, req->r_timeout_stamp);
schedule_delayed_work(&osdc->timeout_work,
round_jiffies_relative(req->r_timeout_stamp -
jiffies));
}
} }
} }
...@@ -540,6 +537,7 @@ static void __cancel_request(struct ceph_osd_request *req) ...@@ -540,6 +537,7 @@ static void __cancel_request(struct ceph_osd_request *req)
ceph_con_revoke(&req->r_osd->o_con, req->r_request); ceph_con_revoke(&req->r_osd->o_con, req->r_request);
req->r_sent = 0; req->r_sent = 0;
} }
list_del_init(&req->r_req_lru_item);
} }
/* /*
...@@ -635,7 +633,8 @@ static int __send_request(struct ceph_osd_client *osdc, ...@@ -635,7 +633,8 @@ static int __send_request(struct ceph_osd_client *osdc,
reqhead->flags |= cpu_to_le32(req->r_flags); /* e.g., RETRY */ reqhead->flags |= cpu_to_le32(req->r_flags); /* e.g., RETRY */
reqhead->reassert_version = req->r_reassert_version; reqhead->reassert_version = req->r_reassert_version;
req->r_timeout_stamp = jiffies+osdc->client->mount_args->osd_timeout*HZ; req->r_sent_stamp = jiffies;
list_move_tail(&osdc->req_lru, &req->r_req_lru_item);
ceph_msg_get(req->r_request); /* send consumes a ref */ ceph_msg_get(req->r_request); /* send consumes a ref */
ceph_con_send(&req->r_osd->o_con, req->r_request); ceph_con_send(&req->r_osd->o_con, req->r_request);
...@@ -656,11 +655,14 @@ static void handle_timeout(struct work_struct *work) ...@@ -656,11 +655,14 @@ static void handle_timeout(struct work_struct *work)
{ {
struct ceph_osd_client *osdc = struct ceph_osd_client *osdc =
container_of(work, struct ceph_osd_client, timeout_work.work); container_of(work, struct ceph_osd_client, timeout_work.work);
struct ceph_osd_request *req; struct ceph_osd_request *req, *last_req = NULL;
struct ceph_osd *osd; struct ceph_osd *osd;
unsigned long timeout = osdc->client->mount_args->osd_timeout * HZ; unsigned long timeout = osdc->client->mount_args->osd_timeout * HZ;
unsigned long next_timeout = timeout + jiffies; unsigned long keepalive =
osdc->client->mount_args->osd_keepalive_timeout * HZ;
unsigned long last_sent = 0;
struct rb_node *p; struct rb_node *p;
struct list_head slow_osds;
dout("timeout\n"); dout("timeout\n");
down_read(&osdc->map_sem); down_read(&osdc->map_sem);
...@@ -683,25 +685,56 @@ static void handle_timeout(struct work_struct *work) ...@@ -683,25 +685,56 @@ static void handle_timeout(struct work_struct *work)
continue; continue;
} }
} }
for (p = rb_first(&osdc->osds); p; p = rb_next(p)) {
osd = rb_entry(p, struct ceph_osd, o_node);
if (list_empty(&osd->o_requests))
continue;
req = list_first_entry(&osd->o_requests,
struct ceph_osd_request, r_osd_item);
if (time_before(jiffies, req->r_timeout_stamp))
continue;
dout(" tid %llu (at least) timed out on osd%d\n", /*
* reset osds that appear to be _really_ unresponsive. this
* is a failsafe measure.. we really shouldn't be getting to
* this point if the system is working properly. the monitors
* should mark the osd as failed and we should find out about
* it from an updated osd map.
*/
while (!list_empty(&osdc->req_lru)) {
req = list_entry(osdc->req_lru.next, struct ceph_osd_request,
r_req_lru_item);
if (time_before(jiffies, req->r_sent_stamp + timeout))
break;
BUG_ON(req == last_req && req->r_sent_stamp == last_sent);
last_req = req;
last_sent = req->r_sent_stamp;
osd = req->r_osd;
BUG_ON(!osd);
pr_warning(" tid %llu timed out on osd%d, will reset osd\n",
req->r_tid, osd->o_osd); req->r_tid, osd->o_osd);
req->r_timeout_stamp = next_timeout; __kick_requests(osdc, osd);
ceph_con_keepalive(&osd->o_con);
} }
if (osdc->timeout_tid) /*
schedule_delayed_work(&osdc->timeout_work, * ping osds that are a bit slow. this ensures that if there
round_jiffies_relative(timeout)); * is a break in the TCP connection we will notice, and reopen
* a connection with that osd (from the fault callback).
*/
INIT_LIST_HEAD(&slow_osds);
list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) {
if (time_before(jiffies, req->r_sent_stamp + keepalive))
break;
osd = req->r_osd;
BUG_ON(!osd);
dout(" tid %llu is slow, will send keepalive on osd%d\n",
req->r_tid, osd->o_osd);
list_move_tail(&osd->o_keepalive_item, &slow_osds);
}
while (!list_empty(&slow_osds)) {
osd = list_entry(slow_osds.next, struct ceph_osd,
o_keepalive_item);
list_del_init(&osd->o_keepalive_item);
ceph_con_keepalive(&osd->o_con);
}
__schedule_osd_timeout(osdc);
mutex_unlock(&osdc->request_mutex); mutex_unlock(&osdc->request_mutex);
up_read(&osdc->map_sem); up_read(&osdc->map_sem);
...@@ -819,18 +852,7 @@ bad: ...@@ -819,18 +852,7 @@ bad:
} }
/* static int __kick_requests(struct ceph_osd_client *osdc,
* Resubmit osd requests whose osd or osd address has changed. Request
* a new osd map if osds are down, or we are otherwise unable to determine
* how to direct a request.
*
* Close connections to down osds.
*
* If @who is specified, resubmit requests for that specific osd.
*
* Caller should hold map_sem for read and request_mutex.
*/
static void kick_requests(struct ceph_osd_client *osdc,
struct ceph_osd *kickosd) struct ceph_osd *kickosd)
{ {
struct ceph_osd_request *req; struct ceph_osd_request *req;
...@@ -839,7 +861,6 @@ static void kick_requests(struct ceph_osd_client *osdc, ...@@ -839,7 +861,6 @@ static void kick_requests(struct ceph_osd_client *osdc,
int err; int err;
dout("kick_requests osd%d\n", kickosd ? kickosd->o_osd : -1); dout("kick_requests osd%d\n", kickosd ? kickosd->o_osd : -1);
mutex_lock(&osdc->request_mutex);
if (kickosd) { if (kickosd) {
__reset_osd(osdc, kickosd); __reset_osd(osdc, kickosd);
} else { } else {
...@@ -900,14 +921,36 @@ kick: ...@@ -900,14 +921,36 @@ kick:
req->r_resend = true; req->r_resend = true;
} }
} }
return needmap;
}
/*
* Resubmit osd requests whose osd or osd address has changed. Request
* a new osd map if osds are down, or we are otherwise unable to determine
* how to direct a request.
*
* Close connections to down osds.
*
* If @who is specified, resubmit requests for that specific osd.
*
* Caller should hold map_sem for read and request_mutex.
*/
static void kick_requests(struct ceph_osd_client *osdc,
struct ceph_osd *kickosd)
{
int needmap;
mutex_lock(&osdc->request_mutex);
needmap = __kick_requests(osdc, kickosd);
mutex_unlock(&osdc->request_mutex); mutex_unlock(&osdc->request_mutex);
if (needmap) { if (needmap) {
dout("%d requests for down osds, need new map\n", needmap); dout("%d requests for down osds, need new map\n", needmap);
ceph_monc_request_next_osdmap(&osdc->client->monc); ceph_monc_request_next_osdmap(&osdc->client->monc);
} }
}
}
/* /*
* Process updated osd map. * Process updated osd map.
* *
...@@ -1164,11 +1207,11 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) ...@@ -1164,11 +1207,11 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
init_completion(&osdc->map_waiters); init_completion(&osdc->map_waiters);
osdc->last_requested_map = 0; osdc->last_requested_map = 0;
mutex_init(&osdc->request_mutex); mutex_init(&osdc->request_mutex);
osdc->timeout_tid = 0;
osdc->last_tid = 0; osdc->last_tid = 0;
osdc->osds = RB_ROOT; osdc->osds = RB_ROOT;
INIT_LIST_HEAD(&osdc->osd_lru); INIT_LIST_HEAD(&osdc->osd_lru);
osdc->requests = RB_ROOT; osdc->requests = RB_ROOT;
INIT_LIST_HEAD(&osdc->req_lru);
osdc->num_requests = 0; osdc->num_requests = 0;
INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout); INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout); INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
......
...@@ -36,12 +36,15 @@ struct ceph_osd { ...@@ -36,12 +36,15 @@ struct ceph_osd {
void *o_authorizer_buf, *o_authorizer_reply_buf; void *o_authorizer_buf, *o_authorizer_reply_buf;
size_t o_authorizer_buf_len, o_authorizer_reply_buf_len; size_t o_authorizer_buf_len, o_authorizer_reply_buf_len;
unsigned long lru_ttl; unsigned long lru_ttl;
int o_marked_for_keepalive;
struct list_head o_keepalive_item;
}; };
/* an in-flight request */ /* an in-flight request */
struct ceph_osd_request { struct ceph_osd_request {
u64 r_tid; /* unique for this client */ u64 r_tid; /* unique for this client */
struct rb_node r_node; struct rb_node r_node;
struct list_head r_req_lru_item;
struct list_head r_osd_item; struct list_head r_osd_item;
struct ceph_osd *r_osd; struct ceph_osd *r_osd;
struct ceph_pg r_pgid; struct ceph_pg r_pgid;
...@@ -67,7 +70,7 @@ struct ceph_osd_request { ...@@ -67,7 +70,7 @@ struct ceph_osd_request {
char r_oid[40]; /* object name */ char r_oid[40]; /* object name */
int r_oid_len; int r_oid_len;
unsigned long r_timeout_stamp; unsigned long r_sent_stamp;
bool r_resend; /* msg send failed, needs retry */ bool r_resend; /* msg send failed, needs retry */
struct ceph_file_layout r_file_layout; struct ceph_file_layout r_file_layout;
...@@ -92,6 +95,7 @@ struct ceph_osd_client { ...@@ -92,6 +95,7 @@ struct ceph_osd_client {
u64 timeout_tid; /* tid of timeout triggering rq */ u64 timeout_tid; /* tid of timeout triggering rq */
u64 last_tid; /* tid of last request */ u64 last_tid; /* tid of last request */
struct rb_root requests; /* pending requests */ struct rb_root requests; /* pending requests */
struct list_head req_lru; /* pending requests lru */
int num_requests; int num_requests;
struct delayed_work timeout_work; struct delayed_work timeout_work;
struct delayed_work osds_timeout_work; struct delayed_work osds_timeout_work;
......
...@@ -292,6 +292,7 @@ enum { ...@@ -292,6 +292,7 @@ enum {
Opt_wsize, Opt_wsize,
Opt_rsize, Opt_rsize,
Opt_osdtimeout, Opt_osdtimeout,
Opt_osdkeepalivetimeout,
Opt_mount_timeout, Opt_mount_timeout,
Opt_osd_idle_ttl, Opt_osd_idle_ttl,
Opt_caps_wanted_delay_min, Opt_caps_wanted_delay_min,
...@@ -322,6 +323,7 @@ static match_table_t arg_tokens = { ...@@ -322,6 +323,7 @@ static match_table_t arg_tokens = {
{Opt_wsize, "wsize=%d"}, {Opt_wsize, "wsize=%d"},
{Opt_rsize, "rsize=%d"}, {Opt_rsize, "rsize=%d"},
{Opt_osdtimeout, "osdtimeout=%d"}, {Opt_osdtimeout, "osdtimeout=%d"},
{Opt_osdkeepalivetimeout, "osdkeepalive=%d"},
{Opt_mount_timeout, "mount_timeout=%d"}, {Opt_mount_timeout, "mount_timeout=%d"},
{Opt_osd_idle_ttl, "osd_idle_ttl=%d"}, {Opt_osd_idle_ttl, "osd_idle_ttl=%d"},
{Opt_caps_wanted_delay_min, "caps_wanted_delay_min=%d"}, {Opt_caps_wanted_delay_min, "caps_wanted_delay_min=%d"},
...@@ -367,7 +369,8 @@ static struct ceph_mount_args *parse_mount_args(int flags, char *options, ...@@ -367,7 +369,8 @@ static struct ceph_mount_args *parse_mount_args(int flags, char *options,
/* start with defaults */ /* start with defaults */
args->sb_flags = flags; args->sb_flags = flags;
args->flags = CEPH_OPT_DEFAULT; args->flags = CEPH_OPT_DEFAULT;
args->osd_timeout = 5; /* seconds */ args->osd_timeout = CEPH_OSD_TIMEOUT_DEFAULT;
args->osd_keepalive_timeout = CEPH_OSD_KEEPALIVE_DEFAULT;
args->mount_timeout = CEPH_MOUNT_TIMEOUT_DEFAULT; /* seconds */ args->mount_timeout = CEPH_MOUNT_TIMEOUT_DEFAULT; /* seconds */
args->osd_idle_ttl = CEPH_OSD_IDLE_TTL_DEFAULT; /* seconds */ args->osd_idle_ttl = CEPH_OSD_IDLE_TTL_DEFAULT; /* seconds */
args->caps_wanted_delay_min = CEPH_CAPS_WANTED_DELAY_MIN_DEFAULT; args->caps_wanted_delay_min = CEPH_CAPS_WANTED_DELAY_MIN_DEFAULT;
...@@ -468,6 +471,9 @@ static struct ceph_mount_args *parse_mount_args(int flags, char *options, ...@@ -468,6 +471,9 @@ static struct ceph_mount_args *parse_mount_args(int flags, char *options,
case Opt_osdtimeout: case Opt_osdtimeout:
args->osd_timeout = intval; args->osd_timeout = intval;
break; break;
case Opt_osdkeepalivetimeout:
args->osd_keepalive_timeout = intval;
break;
case Opt_mount_timeout: case Opt_mount_timeout:
args->mount_timeout = intval; args->mount_timeout = intval;
break; break;
......
...@@ -62,6 +62,7 @@ struct ceph_mount_args { ...@@ -62,6 +62,7 @@ struct ceph_mount_args {
int max_readdir; /* max readdir size */ int max_readdir; /* max readdir size */
int congestion_kb; /* max readdir size */ int congestion_kb; /* max readdir size */
int osd_timeout; int osd_timeout;
int osd_keepalive_timeout;
char *snapdir_name; /* default ".snap" */ char *snapdir_name; /* default ".snap" */
char *name; char *name;
char *secret; char *secret;
...@@ -72,6 +73,8 @@ struct ceph_mount_args { ...@@ -72,6 +73,8 @@ struct ceph_mount_args {
* defaults * defaults
*/ */
#define CEPH_MOUNT_TIMEOUT_DEFAULT 60 #define CEPH_MOUNT_TIMEOUT_DEFAULT 60
#define CEPH_OSD_TIMEOUT_DEFAULT 60 /* seconds */
#define CEPH_OSD_KEEPALIVE_DEFAULT 5
#define CEPH_OSD_IDLE_TTL_DEFAULT 60 #define CEPH_OSD_IDLE_TTL_DEFAULT 60
#define CEPH_MOUNT_RSIZE_DEFAULT (512*1024) /* readahead */ #define CEPH_MOUNT_RSIZE_DEFAULT (512*1024) /* readahead */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment