Commit c86a2930 authored by Sage Weil's avatar Sage Weil

ceph: carry explicit msg reference for currently sending message

Carry a ceph_msg reference for connection->out_msg.  This will allow us to
make out_sent optional.
Signed-off-by: default avatarSage Weil <sage@newdream.net>
parent d4a780ce
...@@ -322,7 +322,10 @@ static void reset_connection(struct ceph_connection *con) ...@@ -322,7 +322,10 @@ static void reset_connection(struct ceph_connection *con)
con->connect_seq = 0; con->connect_seq = 0;
con->out_seq = 0; con->out_seq = 0;
con->out_msg = NULL; if (con->out_msg) {
ceph_msg_put(con->out_msg);
con->out_msg = NULL;
}
con->in_seq = 0; con->in_seq = 0;
mutex_unlock(&con->out_mutex); mutex_unlock(&con->out_mutex);
} }
...@@ -423,7 +426,7 @@ static void prepare_write_message_footer(struct ceph_connection *con, int v) ...@@ -423,7 +426,7 @@ static void prepare_write_message_footer(struct ceph_connection *con, int v)
con->out_kvec_bytes += sizeof(m->footer); con->out_kvec_bytes += sizeof(m->footer);
con->out_kvec_left++; con->out_kvec_left++;
con->out_more = m->more_to_follow; con->out_more = m->more_to_follow;
con->out_msg = NULL; /* we're done with this one */ con->out_msg_done = true;
} }
/* /*
...@@ -436,6 +439,7 @@ static void prepare_write_message(struct ceph_connection *con) ...@@ -436,6 +439,7 @@ static void prepare_write_message(struct ceph_connection *con)
con->out_kvec_bytes = 0; con->out_kvec_bytes = 0;
con->out_kvec_is_msg = true; con->out_kvec_is_msg = true;
con->out_msg_done = false;
/* Sneak an ack in there first? If we can get it into the same /* Sneak an ack in there first? If we can get it into the same
* TCP packet that's a good thing. */ * TCP packet that's a good thing. */
...@@ -452,8 +456,9 @@ static void prepare_write_message(struct ceph_connection *con) ...@@ -452,8 +456,9 @@ static void prepare_write_message(struct ceph_connection *con)
/* move message to sending/sent list */ /* move message to sending/sent list */
m = list_first_entry(&con->out_queue, m = list_first_entry(&con->out_queue,
struct ceph_msg, list_head); struct ceph_msg, list_head);
con->out_msg = m;
ceph_msg_get(m);
list_move_tail(&m->list_head, &con->out_sent); list_move_tail(&m->list_head, &con->out_sent);
con->out_msg = m; /* we don't bother taking a reference here. */
m->hdr.seq = cpu_to_le64(++con->out_seq); m->hdr.seq = cpu_to_le64(++con->out_seq);
...@@ -1521,6 +1526,12 @@ more_kvec: ...@@ -1521,6 +1526,12 @@ more_kvec:
/* msg pages? */ /* msg pages? */
if (con->out_msg) { if (con->out_msg) {
if (con->out_msg_done) {
ceph_msg_put(con->out_msg);
con->out_msg = NULL; /* we're done with this one */
goto do_next;
}
ret = write_partial_msg_pages(con); ret = write_partial_msg_pages(con);
if (ret == 1) if (ret == 1)
goto more_kvec; /* we need to send the footer, too! */ goto more_kvec; /* we need to send the footer, too! */
...@@ -1533,6 +1544,7 @@ more_kvec: ...@@ -1533,6 +1544,7 @@ more_kvec:
} }
} }
do_next:
if (!test_bit(CONNECTING, &con->state)) { if (!test_bit(CONNECTING, &con->state)) {
/* is anything else pending? */ /* is anything else pending? */
if (!list_empty(&con->out_queue)) { if (!list_empty(&con->out_queue)) {
...@@ -1923,8 +1935,10 @@ void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg) ...@@ -1923,8 +1935,10 @@ void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg)
list_del_init(&msg->list_head); list_del_init(&msg->list_head);
ceph_msg_put(msg); ceph_msg_put(msg);
msg->hdr.seq = 0; msg->hdr.seq = 0;
if (con->out_msg == msg) if (con->out_msg == msg) {
ceph_msg_put(con->out_msg);
con->out_msg = NULL; con->out_msg = NULL;
}
if (con->out_kvec_is_msg) { if (con->out_kvec_is_msg) {
con->out_skip = con->out_kvec_bytes; con->out_skip = con->out_kvec_bytes;
con->out_kvec_is_msg = false; con->out_kvec_is_msg = false;
......
...@@ -182,6 +182,7 @@ struct ceph_connection { ...@@ -182,6 +182,7 @@ struct ceph_connection {
/* message out temps */ /* message out temps */
struct ceph_msg *out_msg; /* sending message (== tail of struct ceph_msg *out_msg; /* sending message (== tail of
out_sent) */ out_sent) */
bool out_msg_done;
struct ceph_msg_pos out_msg_pos; struct ceph_msg_pos out_msg_pos;
struct kvec out_kvec[8], /* sending header/footer data */ struct kvec out_kvec[8], /* sending header/footer data */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment