/*
 * Ceph uses the messenger to exchange ceph_msg messages with other
 * hosts in the system.  The messenger provides ordered and reliable
 * delivery.  We tolerate TCP disconnects by reconnecting (with
 * exponential backoff) in the case of a fault (disconnection, bad
 * crc, protocol error).  Acks allow sent messages to be discarded by
 * the sender.
 */
/*
 * We track the state of the socket on a given connection using
 * values defined below.  The transition to a new socket state is
 * handled by a function which verifies we aren't coming from an
 * unexpected state.
 *
 *      --------
 *      | NEW* |  transient initial state
 *      --------
 *          | con_sock_state_init()
 *          v
 *      ----------
 *      | CLOSED |  initialized, but no socket (and no
 *      ----------  TCP connection)
 *       ^      \
 *       |       \ con_sock_state_connecting()
 *       |        ----------------------
 *       |                              \
 *       + con_sock_state_closed()       \
 *       |+---------------------------    \
 *       | \                          \    \
 *       |  -----------                \    \
 *       |  | CLOSING |  socket event;  \    \
 *       |  -----------  await close     \    \
 *       |       ^                        \   |
 *       |       |                         \  |
 *       |       + con_sock_state_closing() \ |
 *       |      / \                         | |
 *       |     /   ---------------          | |
 *       |    /                   \         v v
 *       |   /                    --------------
 *       |  /    -----------------| CONNECTING |  socket created, TCP
 *       |  |   /                 --------------  connect initiated
 *       |  |   | con_sock_state_connected()
 *       |  |   v
 *      -------------
 *      | CONNECTED |  TCP connection established
 *      -------------
 *
 * State values for ceph_connection->sock_state; NEW is assumed to be 0.
 */
staticbool con_flag_valid(unsignedlong con_flag)
{ switch (con_flag) { case CEPH_CON_F_LOSSYTX: case CEPH_CON_F_KEEPALIVE_PENDING: case CEPH_CON_F_WRITE_PENDING: case CEPH_CON_F_SOCK_CLOSED: case CEPH_CON_F_BACKOFF: returntrue; default: returnfalse;
}
}
/* * The number of active work items is limited by the number of * connections, so leave @max_active at default.
*/
ceph_msgr_wq = alloc_workqueue("ceph-msgr", WQ_MEM_RECLAIM, 0); if (ceph_msgr_wq) return 0;
pr_err("msgr_init failed to create workqueue\n");
_ceph_msgr_exit();
/* Connection socket state transition functions */
/*
 * Move the socket state to CLOSED.  The only expected previous state is
 * NEW; anything else triggers a warning and is logged, but the new
 * state is installed regardless.
 */
static void con_sock_state_init(struct ceph_connection *con)
{
	int prev = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED);

	if (WARN_ON(prev != CON_SOCK_STATE_NEW))
		printk("%s: unexpected old state %d\n", __func__, prev);

	dout("%s con %p sock %d -> %d\n", __func__, con, prev,
	     CON_SOCK_STATE_CLOSED);
}
/*
 * Move the socket state to CONNECTING.  The only expected previous
 * state is CLOSED; anything else triggers a warning and is logged, but
 * the new state is installed regardless.
 */
static void con_sock_state_connecting(struct ceph_connection *con)
{
	int prev = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTING);

	if (WARN_ON(prev != CON_SOCK_STATE_CLOSED))
		printk("%s: unexpected old state %d\n", __func__, prev);

	dout("%s con %p sock %d -> %d\n", __func__, con, prev,
	     CON_SOCK_STATE_CONNECTING);
}
/*
 * Move the socket state to CONNECTED.  The only expected previous
 * state is CONNECTING; anything else triggers a warning and is logged,
 * but the new state is installed regardless.
 */
static void con_sock_state_connected(struct ceph_connection *con)
{
	int prev = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTED);

	if (WARN_ON(prev != CON_SOCK_STATE_CONNECTING))
		printk("%s: unexpected old state %d\n", __func__, prev);

	dout("%s con %p sock %d -> %d\n", __func__, con, prev,
	     CON_SOCK_STATE_CONNECTED);
}
/*
 * Move the socket state to CLOSING.  Expected previous states are
 * CONNECTING, CONNECTED and CLOSING; anything else triggers a warning
 * and is logged, but the new state is installed regardless.
 */
static void con_sock_state_closing(struct ceph_connection *con)
{
	int prev = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSING);

	if (WARN_ON(!(prev == CON_SOCK_STATE_CONNECTING ||
		      prev == CON_SOCK_STATE_CONNECTED ||
		      prev == CON_SOCK_STATE_CLOSING)))
		printk("%s: unexpected old state %d\n", __func__, prev);

	dout("%s con %p sock %d -> %d\n", __func__, con, prev,
	     CON_SOCK_STATE_CLOSING);
}
/*
 * Move the socket state to CLOSED.  Expected previous states are
 * CONNECTED, CLOSING, CONNECTING and CLOSED; anything else triggers a
 * warning and is logged, but the new state is installed regardless.
 */
static void con_sock_state_closed(struct ceph_connection *con)
{
	int prev = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED);

	if (WARN_ON(!(prev == CON_SOCK_STATE_CONNECTED ||
		      prev == CON_SOCK_STATE_CLOSING ||
		      prev == CON_SOCK_STATE_CONNECTING ||
		      prev == CON_SOCK_STATE_CLOSED)))
		printk("%s: unexpected old state %d\n", __func__, prev);

	dout("%s con %p sock %d -> %d\n", __func__, con, prev,
	     CON_SOCK_STATE_CLOSED);
}
/* * socket callback functions
*/
/*
 * Socket callback: data available on socket, or listen socket received
 * a connect.  Queues work for the owning connection unless the
 * messenger is stopping or the socket is in TCP_CLOSE_WAIT.
 */
static void ceph_sock_data_ready(struct sock *sk)
{
	struct ceph_connection *con = sk->sk_user_data;

	trace_sk_data_ready(sk);

	/* Messenger is shutting down: do not queue anything. */
	if (atomic_read(&con->msgr->stopping))
		return;

	if (sk->sk_state == TCP_CLOSE_WAIT)
		return;

	dout("%s %p state = %d, queueing work\n", __func__,
	     con, con->state);
	queue_con(con);
}
/*
 * Socket callback: socket has buffer space for writing.
 *
 * Only queue to the workqueue if there is data we want to write and
 * there is sufficient space in the socket buffer to accept more data.
 * SOCK_NOSPACE is cleared so that this callback doesn't get called
 * again until try_write() fills the socket buffer.  See
 * net/ipv4/tcp_input.c:tcp_check_space() and
 * net/core/stream.c:sk_stream_write_space().
 */
static void ceph_sock_write_space(struct sock *sk)
{
	struct ceph_connection *con = sk->sk_user_data;

	if (!ceph_con_flag_test(con, CEPH_CON_F_WRITE_PENDING)) {
		dout("%s %p nothing to write\n", __func__, con);
		return;
	}

	/* Something is pending, but the socket cannot take it yet. */
	if (!sk_stream_is_writeable(sk))
		return;

	dout("%s %p queueing write work\n", __func__, con);
	clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
	queue_con(con);
}
/* socket's state has changed */ staticvoid ceph_sock_state_change(struct sock *sk)
{ struct ceph_connection *con = sk->sk_user_data;
if (ceph_test_opt(from_msgr(con->msgr), TCP_NODELAY))
tcp_sock_set_nodelay(sock->sk);
con->sock = sock; return 0;
}
/*
 * Shut down and release the socket of the given connection, if it has
 * one, and record the socket state as CLOSED.
 *
 * Returns the result of the socket's shutdown operation, or 0 when the
 * connection had no socket.
 */
int ceph_con_close_socket(struct ceph_connection *con)
{
	int rc = 0;

	dout("%s con %p sock %p\n", __func__, con, con->sock);

	if (con->sock) {
		rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR);
		sock_release(con->sock);
		con->sock = NULL;
	}

	/*
	 * Forcibly clear the SOCK_CLOSED flag.  It gets set
	 * independent of the connection mutex, and we could have
	 * received a socket close event before we had the chance to
	 * shut the socket down.
	 */
	ceph_con_flag_clear(con, CEPH_CON_F_SOCK_CLOSED);

	con_sock_state_closed(con);

	return rc;
}
/*
 * Reset connection state via the protocol-specific helper: the msgr2
 * variant when the messenger is configured for msgr2, the msgr1 variant
 * otherwise.
 *
 * NOTE(review): despite this function's name, the visible body only
 * invokes the *_reset_session() helpers -- confirm against the complete
 * source that this is intended.
 */
static void ceph_con_reset_protocol(struct ceph_connection *con)
{
	dout("%s con %p\n", __func__, con);

	if (ceph_msgr2(from_msgr(con->msgr))) {
		ceph_con_v2_reset_session(con);
	} else {
		ceph_con_v1_reset_session(con);
	}
}
/* * mark a peer down. drop any open connections.
*/ void ceph_con_close(struct ceph_connection *con)
{
mutex_lock(&con->mutex);
dout("con_close %p peer %s\n", con, ceph_pr_addr(&con->peer_addr));
con->state = CEPH_CON_S_CLOSED;
ceph_con_flag_clear(con, CEPH_CON_F_LOSSYTX); /* so we retry next
connect */
ceph_con_flag_clear(con, CEPH_CON_F_KEEPALIVE_PENDING);
ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING);
ceph_con_flag_clear(con, CEPH_CON_F_BACKOFF);
/* * return true if this connection ever successfully opened
*/ bool ceph_con_opened(struct ceph_connection *con)
{ if (ceph_msgr2(from_msgr(con->msgr))) return ceph_con_v2_opened(con);
/*
 * We maintain a global counter to order connection attempts.  Raise the
 * counter to @gt if it is currently below it, then increment it and
 * return the new value, all under msgr->global_seq_lock.  The result is
 * a unique seq greater than @gt.
 */
u32 ceph_get_global_seq(struct ceph_messenger *msgr, u32 gt)
{
	u32 seq;

	spin_lock(&msgr->global_seq_lock);
	if (gt > msgr->global_seq)
		msgr->global_seq = gt;
	msgr->global_seq++;
	seq = msgr->global_seq;
	spin_unlock(&msgr->global_seq_lock);

	return seq;
}
/* * Discard messages that have been acked by the server.
*/ void ceph_con_discard_sent(struct ceph_connection *con, u64 ack_seq)
{ struct ceph_msg *msg;
u64 seq;
dout("%s con %p ack_seq %llu\n", __func__, con, ack_seq); while (!list_empty(&con->out_sent)) {
msg = list_first_entry(&con->out_sent, struct ceph_msg,
list_head);
WARN_ON(msg->needs_out_seq);
seq = le64_to_cpu(msg->hdr.seq); if (seq > ack_seq) break;
/* * Discard messages that have been requeued in con_fault(), up to * reconnect_seq. This avoids gratuitously resending messages that * the server had received and handled prior to reconnect.
*/ void ceph_con_discard_requeued(struct ceph_connection *con, u64 reconnect_seq)
{ struct ceph_msg *msg;
u64 seq;
dout("%s con %p reconnect_seq %llu\n", __func__, con, reconnect_seq); while (!list_empty(&con->out_queue)) {
msg = list_first_entry(&con->out_queue, struct ceph_msg,
list_head); if (msg->needs_out_seq) break;
seq = le64_to_cpu(msg->hdr.seq); if (seq > reconnect_seq) break;
/* * For a bio data item, a piece is whatever remains of the next * entry in the current bio iovec, or the first entry in the next * bio in the list.
*/ staticvoid ceph_msg_data_bio_cursor_init(struct ceph_msg_data_cursor *cursor,
size_t length)
{ struct ceph_msg_data *data = cursor->data; struct ceph_bio_iter *it = &cursor->bio_iter;
if (!cursor->resid) returnfalse; /* no more data */
if (!bytes || (it->iter.bi_size && it->iter.bi_bvec_done &&
page == bio_iter_page(it->bio, it->iter))) returnfalse; /* more bytes to process in this segment */
if (!it->iter.bi_size) {
it->bio = it->bio->bi_next;
it->iter = it->bio->bi_iter; if (cursor->resid < it->iter.bi_size)
it->iter.bi_size = cursor->resid;
}
if (!cursor->resid) returnfalse; /* no more data */
if (!bytes || (cursor->bvec_iter.bi_bvec_done &&
page == bvec_iter_page(bvecs, cursor->bvec_iter))) returnfalse; /* more bytes to process in this segment */
/* * For a page array, a piece comes from the first page in the array * that has not already been fully consumed.
*/ staticvoid ceph_msg_data_pages_cursor_init(struct ceph_msg_data_cursor *cursor,
size_t length)
{ struct ceph_msg_data *data = cursor->data; int page_count;
cursor->resid -= bytes;
cursor->page_offset = (cursor->page_offset + bytes) & ~PAGE_MASK; if (!bytes || cursor->page_offset) returnfalse; /* more bytes to process in the current page */
if (!cursor->resid) returnfalse; /* no more data */
/* Move on to the next page; offset is already at 0 */
/* * For a pagelist, a piece is whatever remains to be consumed in the * first page in the list, or the front of the next page.
*/ staticvoid
ceph_msg_data_pagelist_cursor_init(struct ceph_msg_data_cursor *cursor,
size_t length)
{ struct ceph_msg_data *data = cursor->data; struct ceph_pagelist *pagelist; struct page *page;
BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
pagelist = data->pagelist;
BUG_ON(!pagelist);
if (!length) return; /* pagelist can be assigned but empty */
cursor->resid -= bytes;
cursor->offset += bytes; /* offset of first page in pagelist is always 0 */ if (!bytes || cursor->offset & ~PAGE_MASK) returnfalse; /* more bytes to process in the current page */
if (!cursor->resid) returnfalse; /* no more data */
if (cursor->lastlen)
iov_iter_revert(&cursor->iov_iter, cursor->lastlen);
len = iov_iter_get_pages2(&cursor->iov_iter, &page, PAGE_SIZE,
1, page_offset);
BUG_ON(len < 0);
cursor->lastlen = len;
/* * FIXME: The assumption is that the pages represented by the iov_iter * are pinned, with the references held by the upper-level * callers, or by virtue of being under writeback. Eventually, * we'll get an iov_iter_get_pages2 variant that doesn't take * page refs. Until then, just put the page ref.
*/
VM_BUG_ON_PAGE(!PageWriteback(page) && page_count(page) < 2, page);
put_page(page);
/* * Message data is handled (sent or received) in pieces, where each * piece resides on a single page. The network layer might not * consume an entire piece at once. A data item's cursor keeps * track of which piece is next to process and how much remains to * be processed in that piece. It also tracks whether the current * piece is the last one in the data item.
*/ staticvoid __ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor)
{
size_t length = cursor->total_resid;
switch (cursor->data->type) { case CEPH_MSG_DATA_PAGELIST:
ceph_msg_data_pagelist_cursor_init(cursor, length); break; case CEPH_MSG_DATA_PAGES:
ceph_msg_data_pages_cursor_init(cursor, length); break; #ifdef CONFIG_BLOCK case CEPH_MSG_DATA_BIO:
ceph_msg_data_bio_cursor_init(cursor, length); break; #endif/* CONFIG_BLOCK */ case CEPH_MSG_DATA_BVECS:
ceph_msg_data_bvecs_cursor_init(cursor, length); break; case CEPH_MSG_DATA_ITER:
ceph_msg_data_iter_cursor_init(cursor, length); break; case CEPH_MSG_DATA_NONE: default: /* BUG(); */ break;
}
cursor->need_crc = true;
}
/* * Return the page containing the next piece to process for a given * data item, and supply the page offset and length of that piece. * Indicate whether this is the last piece in this data item.
*/ struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor,
size_t *page_offset, size_t *length)
{ struct page *page;
switch (cursor->data->type) { case CEPH_MSG_DATA_PAGELIST:
page = ceph_msg_data_pagelist_next(cursor, page_offset, length); break; case CEPH_MSG_DATA_PAGES:
page = ceph_msg_data_pages_next(cursor, page_offset, length); break; #ifdef CONFIG_BLOCK case CEPH_MSG_DATA_BIO:
page = ceph_msg_data_bio_next(cursor, page_offset, length); break; #endif/* CONFIG_BLOCK */ case CEPH_MSG_DATA_BVECS:
page = ceph_msg_data_bvecs_next(cursor, page_offset, length); break; case CEPH_MSG_DATA_ITER:
page = ceph_msg_data_iter_next(cursor, page_offset, length); break; case CEPH_MSG_DATA_NONE: default:
page = NULL; break;
}
/* * Returns true if the result moves the cursor on to the next piece * of the data item.
*/ void ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor, size_t bytes)
{ bool new_piece;
BUG_ON(bytes > cursor->resid); switch (cursor->data->type) { case CEPH_MSG_DATA_PAGELIST:
new_piece = ceph_msg_data_pagelist_advance(cursor, bytes); break; case CEPH_MSG_DATA_PAGES:
new_piece = ceph_msg_data_pages_advance(cursor, bytes); break; #ifdef CONFIG_BLOCK case CEPH_MSG_DATA_BIO:
new_piece = ceph_msg_data_bio_advance(cursor, bytes); break; #endif/* CONFIG_BLOCK */ case CEPH_MSG_DATA_BVECS:
new_piece = ceph_msg_data_bvecs_advance(cursor, bytes); break; case CEPH_MSG_DATA_ITER:
new_piece = ceph_msg_data_iter_advance(cursor, bytes); break; case CEPH_MSG_DATA_NONE: default:
BUG(); break;
}
cursor->total_resid -= bytes;
/* * Extract hostname string and resolve using kernel DNS facility.
*/ #ifdef CONFIG_CEPH_LIB_USE_DNS_RESOLVER staticint ceph_dns_resolve_name(constchar *name, size_t namelen, struct ceph_entity_addr *addr, char delim, constchar **ipend)
{ constchar *end, *delim_p; char *colon_p, *ip_addr = NULL; int ip_len, ret;
/* * The end of the hostname occurs immediately preceding the delimiter or * the port marker (':') where the delimiter takes precedence.
*/
delim_p = memchr(name, delim, namelen);
colon_p = memchr(name, ':', namelen);
if (delim_p && colon_p)
end = min(delim_p, colon_p); elseif (!delim_p && colon_p)
end = colon_p; else {
end = delim_p; if (!end) /* case: hostname:/ */
end = name + namelen;
}
if (end <= name) return -EINVAL;
/* do dns_resolve upcall */
ip_len = dns_query(current->nsproxy->net_ns,
NULL, name, end - name, NULL, &ip_addr, NULL, false); if (ip_len > 0)
ret = ceph_pton(ip_addr, ip_len, addr, -1, NULL); else
ret = -ESRCH;
/*
 * Parse a server name (IP or hostname).  The name is first handed to
 * ceph_pton(); if that does not yield a valid IP address, fall back to
 * ceph_dns_resolve_name() to try resolving it as a hostname.
 *
 * Returns 0 on success, otherwise the error from the resolver fallback.
 */
static int ceph_parse_server_name(const char *name, size_t namelen,
				  struct ceph_entity_addr *addr, char delim,
				  const char **ipend)
{
	int err = ceph_pton(name, namelen, addr, delim, ipend);

	if (!err)
		return 0;

	return ceph_dns_resolve_name(name, namelen, addr, delim, ipend);
}
/* * Parse an ip[:port] list into an addr array. Use the default * monitor port if a port isn't specified.
*/ int ceph_parse_ips(constchar *c, constchar *end, struct ceph_entity_addr *addr, int max_count, int *count, char delim)
{ int i, ret = -EINVAL; constchar *p = c;
dout("parse_ips on '%.*s'\n", (int)(end-c), c); for (i = 0; i < max_count; i++) { char cur_delim = delim; constchar *ipend; int port;
if (*p == '[') {
cur_delim = ']';
p++;
}
ret = ceph_parse_server_name(p, end - p, &addr[i], cur_delim,
&ipend); if (ret) goto bad;
ret = -EINVAL;
p = ipend;
if (cur_delim == ']') { if (*p != ']') {
dout("missing matching ']'\n"); goto bad;
}
p++;
}
/* port? */ if (p < end && *p == ':') {
port = 0;
p++; while (p < end && *p >= '0' && *p <= '9') {
port = (port * 10) + (*p - '0');
p++;
} if (port == 0)
port = CEPH_MON_PORT; elseif (port > 65535) goto bad;
} else {
port = CEPH_MON_PORT;
}
ceph_addr_set_port(&addr[i], port); /* * We want the type to be set according to ms_mode * option, but options are normally parsed after mon * addresses. Rather than complicating parsing, set * to LEGACY and override in build_initial_monmap() * for mon addresses and ceph_messenger_init() for * ip option.
*/
addr[i].type = CEPH_ENTITY_ADDR_TYPE_LEGACY;
addr[i].nonce = 0;
if (p == end) break; if (*p != delim) goto bad;
p++;
}
if (p != end) goto bad;
if (count)
*count = i + 1; return 0;
bad: return ret;
}
/* * Process message. This happens in the worker thread. The callback should * be careful not to do anything that waits on other incoming messages or it * may deadlock.
*/ void ceph_con_process_message(struct ceph_connection *con)
{ struct ceph_msg *msg = con->in_msg;
/* * Atomically queue work on a connection after the specified delay. * Bump @con reference to avoid races with connection teardown. * Returns 0 if work was queued, or an error code otherwise.
*/ staticint queue_con_delay(struct ceph_connection *con, unsignedlong delay)
{ if (!con->ops->get(con)) {
dout("%s %p ref count 0\n", __func__, con); return -ENOENT;
}
if (delay >= HZ)
delay = round_jiffies_relative(delay);
/*
 * If the BACKOFF flag is set on @con, clear it and queue delayed work
 * using con->delay.  Should queueing fail, the flag is set again
 * (a failure of -ENOENT is treated as a bug).
 *
 * Returns true if the BACKOFF flag was set on entry, false otherwise.
 */
static bool con_backoff(struct ceph_connection *con)
{
	int err;

	if (!ceph_con_flag_test_and_clear(con, CEPH_CON_F_BACKOFF))
		return false;

	err = queue_con_delay(con, con->delay);
	if (!err)
		return true;

	dout("%s: con %p FAILED to back off %lu\n", __func__,
	     con, con->delay);
	BUG_ON(err == -ENOENT);
	/* could not queue: restore the flag we cleared above */
	ceph_con_flag_set(con, CEPH_CON_F_BACKOFF);

	return true;
}
/* Finish fault handling; con->mutex must *not* be held here */
/* * in case we faulted due to authentication, invalidate our * current tickets so that we can get new ones.
*/ if (!ceph_msgr2(from_msgr(con->msgr)) && con->v1.auth_retry) {
dout("auth_retry %d, invalidating\n", con->v1.auth_retry); if (con->ops->invalidate_authorizer)
con->ops->invalidate_authorizer(con);
con->v1.auth_retry = 0;
}
if (con->ops->fault)
con->ops->fault(con);
}
/*
 * Do some work on a connection.  Drop a connection ref when we're done.
 *
 * With con->mutex held, loop over: socket-closed check, backoff check,
 * state checks, then a read attempt followed by a write attempt using
 * the msgr2 or msgr1 implementation.  -EAGAIN from either attempt
 * restarts the loop; any other negative result marks a fault.  A fault
 * is handled by con_fault() under the mutex and con_fault_finish()
 * after the mutex has been released.
 */
static void ceph_con_workfn(struct work_struct *work)
{
	struct ceph_connection *con =
		container_of(work, struct ceph_connection, work.work);
	bool fault;

	mutex_lock(&con->mutex);
	for (;;) {
		int rc;

		fault = con_sock_closed(con);
		if (fault) {
			dout("%s: con %p SOCK_CLOSED\n", __func__, con);
			break;
		}
		if (con_backoff(con)) {
			dout("%s: con %p BACKOFF\n", __func__, con);
			break;
		}
		if (con->state == CEPH_CON_S_STANDBY) {
			dout("%s: con %p STANDBY\n", __func__, con);
			break;
		}
		if (con->state == CEPH_CON_S_CLOSED) {
			dout("%s: con %p CLOSED\n", __func__, con);
			BUG_ON(con->sock);
			break;
		}
		if (con->state == CEPH_CON_S_PREOPEN) {
			dout("%s: con %p PREOPEN\n", __func__, con);
			BUG_ON(con->sock);
		}

		/* read side */
		if (ceph_msgr2(from_msgr(con->msgr)))
			rc = ceph_con_v2_try_read(con);
		else
			rc = ceph_con_v1_try_read(con);
		if (rc == -EAGAIN)
			continue;
		if (rc < 0) {
			if (!con->error_msg)
				con->error_msg = "socket error on read";
			fault = true;
			break;
		}

		/* write side */
		if (ceph_msgr2(from_msgr(con->msgr)))
			rc = ceph_con_v2_try_write(con);
		else
			rc = ceph_con_v1_try_write(con);
		if (rc == -EAGAIN)
			continue;
		if (rc < 0) {
			if (!con->error_msg)
				con->error_msg = "socket error on write";
			fault = true;
		}

		break;	/* If we make it to here, we're done */
	}
	if (fault)
		con_fault(con);
	mutex_unlock(&con->mutex);

	/* the second half of fault handling runs without con->mutex */
	if (fault)
		con_fault_finish(con);

	con->ops->put(con);
}
/* * Generic error/fault handler. A retry mechanism is used with * exponential backoff
*/ staticvoid con_fault(struct ceph_connection *con)
{
dout("fault %p state %d to peer %s\n",
con, con->state, ceph_pr_addr(&con->peer_addr));
/* * initialize a new messenger instance
*/ void ceph_messenger_init(struct ceph_messenger *msgr, struct ceph_entity_addr *myaddr)
{
spin_lock_init(&msgr->global_seq_lock);
if (myaddr) {
memcpy(&msgr->inst.addr.in_addr, &myaddr->in_addr, sizeof(msgr->inst.addr.in_addr));
ceph_addr_set_port(&msgr->inst.addr, 0);
}
/* * Since nautilus, clients are identified using type ANY. * For msgr1, ceph_encode_banner_addr() munges it to NONE.
*/
msgr->inst.addr.type = CEPH_ENTITY_ADDR_TYPE_ANY;
/* generate a random non-zero nonce */ do {
get_random_bytes(&msgr->inst.addr.nonce, sizeof(msgr->inst.addr.nonce));
} while (!msgr->inst.addr.nonce);
ceph_encode_my_addr(msgr);
/*
 * Come back from STANDBY: if @con is in the STANDBY state, move it to
 * PREOPEN and, for msgr1 connections, bump the v1 connect_seq.  Warns
 * if WRITE_PENDING or KEEPALIVE_PENDING is set at that point.  Does
 * nothing for a connection in any other state.
 */
static void clear_standby(struct ceph_connection *con)
{
	if (con->state != CEPH_CON_S_STANDBY)
		return;

	dout("clear_standby %p\n", con);
	con->state = CEPH_CON_S_PREOPEN;
	if (!ceph_msgr2(from_msgr(con->msgr)))
		con->v1.connect_seq++;
	WARN_ON(ceph_con_flag_test(con, CEPH_CON_F_WRITE_PENDING));
	WARN_ON(ceph_con_flag_test(con, CEPH_CON_F_KEEPALIVE_PENDING));
}
/*
 * Queue up an outgoing message on the given connection.
 *
 * Consumes a ref on @msg.
 *
 * The message is stamped with this messenger's entity name, marked as
 * still needing an outgoing sequence number, and appended to
 * con->out_queue.  If no write was already pending on the connection,
 * work is queued for it.
 */
void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
{
	/* set src+dst */
	msg->hdr.src = con->msgr->inst.name;
	BUG_ON(msg->front.iov_len != le32_to_cpu(msg->hdr.front_len));
	msg->needs_out_seq = true;

	/*
	 * Take con->mutex before touching the out queue and connection
	 * state; it is released below, and every mutex_unlock() must be
	 * paired with a preceding mutex_lock().
	 */
	mutex_lock(&con->mutex);

	BUG_ON(!list_empty(&msg->list_head));
	list_add_tail(&msg->list_head, &con->out_queue);
	dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg,
	     ENTITY_NAME(con->peer_name), le16_to_cpu(msg->hdr.type),
	     ceph_msg_type_name(le16_to_cpu(msg->hdr.type)),
	     le32_to_cpu(msg->hdr.front_len),
	     le32_to_cpu(msg->hdr.middle_len),
	     le32_to_cpu(msg->hdr.data_len));

	clear_standby(con);
	mutex_unlock(&con->mutex);

	/*
	 * if there wasn't anything waiting to send before, queue
	 * new work
	 */
	if (!ceph_con_flag_test_and_set(con, CEPH_CON_F_WRITE_PENDING))
		queue_con(con);
}
EXPORT_SYMBOL(ceph_con_send);
/*
 * Revoke a message that was previously queued for send.
 *
 * Does nothing if @msg has no connection or is not linked on a list.
 * Otherwise the message's sequence number is zeroed and it is removed;
 * if it is the connection's current out_msg, the protocol-specific
 * revoke helper runs and the out_msg reference is dropped.
 */
void ceph_msg_revoke(struct ceph_msg *msg)
{
	struct ceph_connection *con = msg->con;

	if (!con) {
		dout("%s msg %p null con\n", __func__, msg);
		return;		/* Message not in our possession */
	}

	mutex_lock(&con->mutex);

	if (list_empty(&msg->list_head)) {
		WARN_ON(con->out_msg == msg);
		dout("%s con %p msg %p not linked\n", __func__, con, msg);
		goto out_unlock;
	}

	dout("%s con %p msg %p was linked\n", __func__, con, msg);
	msg->hdr.seq = 0;
	ceph_msg_remove(msg);

	if (con->out_msg != msg) {
		dout("%s con %p msg %p not current, out_msg %p\n", __func__,
		     con, msg, con->out_msg);
		goto out_unlock;
	}

	/* the message is the one currently being sent */
	WARN_ON(con->state != CEPH_CON_S_OPEN);
	dout("%s con %p msg %p was sending\n", __func__, con, msg);
	if (ceph_msgr2(from_msgr(con->msgr)))
		ceph_con_v2_revoke(con);
	else
		ceph_con_v1_revoke(con);
	ceph_msg_put(con->out_msg);
	con->out_msg = NULL;

out_unlock:
	mutex_unlock(&con->mutex);
}
/*
 * Revoke a message that we may be reading data into.
 *
 * Does nothing if @msg has no connection.  If @msg is the connection's
 * current in_msg, the protocol-specific revoke helper runs and the
 * in_msg reference is dropped; otherwise the message is left alone.
 */
void ceph_msg_revoke_incoming(struct ceph_msg *msg)
{
	struct ceph_connection *con = msg->con;

	if (!con) {
		dout("%s msg %p null con\n", __func__, msg);
		return;		/* Message not in our possession */
	}

	mutex_lock(&con->mutex);

	if (con->in_msg != msg) {
		dout("%s con %p msg %p not current, in_msg %p\n", __func__,
		     con, msg, con->in_msg);
		goto out_unlock;
	}

	/* the message is the one currently being received */
	WARN_ON(con->state != CEPH_CON_S_OPEN);
	dout("%s con %p msg %p was recving\n", __func__, con, msg);
	if (ceph_msgr2(from_msgr(con->msgr)))
		ceph_con_v2_revoke_incoming(con);
	else
		ceph_con_v1_revoke_incoming(con);
	ceph_msg_put(con->in_msg);
	con->in_msg = NULL;

out_unlock:
	mutex_unlock(&con->mutex);
}
/*
 * Queue a keepalive byte to ensure the tcp connection is alive.
 *
 * Under con->mutex, brings the connection out of STANDBY if needed and
 * sets KEEPALIVE_PENDING.  Work is then queued unless a write was
 * already pending.
 */
void ceph_con_keepalive(struct ceph_connection *con)
{
	dout("con_keepalive %p\n", con);

	mutex_lock(&con->mutex);
	clear_standby(con);
	ceph_con_flag_set(con, CEPH_CON_F_KEEPALIVE_PENDING);
	mutex_unlock(&con->mutex);

	/* a write was already pending: work has been queued for it */
	if (ceph_con_flag_test_and_set(con, CEPH_CON_F_WRITE_PENDING))
		return;

	queue_con(con);
}
EXPORT_SYMBOL(ceph_con_keepalive);
/* * construct a new message with given type, size * the new msg has a ref count of 1.
*/ struct ceph_msg *ceph_msg_new2(int type, int front_len, int max_data_items,
gfp_t flags, bool can_fail)
{ struct ceph_msg *m;
m = kmem_cache_zalloc(ceph_msg_cache, flags); if (m == NULL) goto out;
/* * Allocate "middle" portion of a message, if it is needed and wasn't * allocated by alloc_msg. This allows us to read a small fixed-size * per-type header in the front and then gracefully fail (i.e., * propagate the error to the caller based on info in the front) when * the middle is too large.
*/ staticint ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg)
{ int type = le16_to_cpu(msg->hdr.type); int middle_len = le32_to_cpu(msg->hdr.middle_len);
/* * Allocate a message for receiving an incoming message on a * connection, and save the result in con->in_msg. Uses the * connection's private alloc_msg op if available. * * Returns 0 on success, or a negative error code. * * On success, if we set *skip = 1: * - the next message should be skipped and ignored. * - con->in_msg == NULL * or if we set *skip = 0: * - con->in_msg is non-null. * On error (ENOMEM, EAGAIN, ...), * - con->in_msg == NULL
*/ int ceph_con_in_msg_alloc(struct ceph_connection *con, struct ceph_msg_header *hdr, int *skip)
{ int middle_len = le32_to_cpu(hdr->middle_len); struct ceph_msg *msg; int ret = 0;
mutex_unlock(&con->mutex);
msg = con->ops->alloc_msg(con, hdr, skip);
mutex_lock(&con->mutex); if (con->state != CEPH_CON_S_OPEN) { if (msg)
ceph_msg_put(msg); return -EAGAIN;
} if (msg) {
BUG_ON(*skip);
msg_con_set(msg, con);
con->in_msg = msg;
} else { /* * Null message pointer means either we should skip * this message or we couldn't allocate memory. The * former is not an error.
*/ if (*skip) return 0;
/* * Put the message on "sent" list using a ref from ceph_con_send(). * It is put when the message is acked or revoked.
*/
list_move_tail(&msg->list_head, &con->out_sent);
/* * Only assign outgoing seq # if we haven't sent this message * yet. If it is requeued, resend with it's original seq.
*/ if (msg->needs_out_seq) {
msg->hdr.seq = cpu_to_le64(++con->out_seq);
msg->needs_out_seq = false;
if (con->ops->reencode_message)
con->ops->reencode_message(msg);
}
/* * Get a ref for out_msg. It is put when we are done sending the * message or in case of a fault.
*/
WARN_ON(con->out_msg);
con->out_msg = ceph_msg_get(msg);
}
/*
 * The information on this web page was compiled carefully and to the best
 * of our knowledge.  However, no guarantee is given as to the completeness,
 * correctness, or quality of the information provided.
 *
 * Note: the syntax colouring and the measurement are still experimental.
 */