Quellcodebibliothek Statistik Leitseite products/Sources/formale Sprachen/C/Linux/net/tipc/   (Open Source Betriebssystem Version 6.17.9©)  Datei vom 24.10.2025 mit Größe 104 kB image not shown  

Quelle  socket.c   Sprache: C

 
/*
 * net/tipc/socket.c: TIPC socket API
 *
 * Copyright (c) 2001-2007, 2012-2019, Ericsson AB
 * Copyright (c) 2004-2008, 2010-2013, Wind River Systems
 * Copyright (c) 2020-2021, Red Hat Inc
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Neither the names of the copyright holders nor the names of its
 *    contributors may be used to endorse or promote products derived from
 *    this software without specific prior written permission.
 *
 * Alternatively, this software may be distributed under the terms of the
 * GNU General Public License ("GPL") version 2 as published by the Free
 * Software Foundation.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */


#include <linux/rhashtable.h>
#include <linux/sched/signal.h>
#include <trace/events/sock.h>

#include "core.h"
#include "name_table.h"
#include "node.h"
#include "link.h"
#include "name_distr.h"
#include "socket.h"
#include "bcast.h"
#include "netlink.h"
#include "group.h"
#include "trace.h"

#define NAGLE_START_INIT 4
#define NAGLE_START_MAX  1024
#define CONN_TIMEOUT_DEFAULT    8000    /* default connect timeout = 8s */
#define CONN_PROBING_INTV msecs_to_jiffies(3600000)  /* [ms] => 1 h */
#define TIPC_MAX_PORT  0xffffffff
#define TIPC_MIN_PORT  1
#define TIPC_ACK_RATE  4       /* ACK at 1/4 of rcv window size */

enum {
 TIPC_LISTEN = TCP_LISTEN,
 TIPC_ESTABLISHED = TCP_ESTABLISHED,
 TIPC_OPEN = TCP_CLOSE,
 TIPC_DISCONNECTING = TCP_CLOSE_WAIT,
 TIPC_CONNECTING = TCP_SYN_SENT,
};

struct sockaddr_pair {
 struct sockaddr_tipc sock;
 struct sockaddr_tipc member;
};

/**
 * struct tipc_sock - TIPC socket structure
 * @sk: socket - interacts with 'port' and with user via the socket API
 * @max_pkt: maximum packet size "hint" used when building messages sent by port
 * @maxnagle: maximum size of msg which can be subject to nagle
 * @portid: unique port identity in TIPC socket hash table
 * @phdr: preformatted message header used when sending messages
 * @cong_links: list of congested links
 * @publications: list of publications for port
 * @pub_count: total # of publications port has made during its lifetime
 * @conn_timeout: the time we can wait for an unresponded setup request
 * @probe_unacked: probe has not received ack yet
 * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue
 * @cong_link_cnt: number of congested links
 * @snt_unacked: # messages sent by socket, and not yet acked by peer
 * @snd_win: send window size
 * @peer_caps: peer capabilities mask
 * @rcv_unacked: # messages read by user, but not yet acked back to peer
 * @rcv_win: receive window size
 * @peer: 'connected' peer for dgram/rdm
 * @node: hash table node
 * @mc_method: cookie for use between socket and broadcast layer
 * @rcu: rcu struct for tipc_sock
 * @group: TIPC communications group
 * @oneway: message count in one direction (FIXME)
 * @nagle_start: current nagle value
 * @snd_backlog: send backlog count
 * @msg_acc: messages accepted; used in managing backlog and nagle
 * @pkt_cnt: TIPC socket packet count
 * @expect_ack: whether this TIPC socket is expecting an ack
 * @nodelay: setsockopt() TIPC_NODELAY setting
 * @group_is_open: TIPC socket group is fully open (FIXME)
 * @published: true if port has one or more associated names
 * @conn_addrtype: address type used when establishing connection
 */

struct tipc_sock {
 struct sock sk;
 u32 max_pkt;
 u32 maxnagle;
 u32 portid;
 struct tipc_msg phdr;
 struct list_head cong_links;
 struct list_head publications;
 u32 pub_count;
 atomic_t dupl_rcvcnt;
 u16 conn_timeout;
 bool probe_unacked;
 u16 cong_link_cnt;
 u16 snt_unacked;
 u16 snd_win;
 u16 peer_caps;
 u16 rcv_unacked;
 u16 rcv_win;
 struct sockaddr_tipc peer;
 struct rhash_head node;
 struct tipc_mc_method mc_method;
 struct rcu_head rcu;
 struct tipc_group *group;
 u32 oneway;
 u32 nagle_start;
 u16 snd_backlog;
 u16 msg_acc;
 u16 pkt_cnt;
 bool expect_ack;
 bool nodelay;
 bool group_is_open;
 bool published;
 u8 conn_addrtype;
};

static int tipc_sk_backlog_rcv(struct sock *sk, struct sk_buff *skb);
static void tipc_data_ready(struct sock *sk);
static void tipc_write_space(struct sock *sk);
static void tipc_sock_destruct(struct sock *sk);
static int tipc_release(struct socket *sock);
static void tipc_sk_timeout(struct timer_list *t);
static int tipc_sk_publish(struct tipc_sock *tsk, struct tipc_uaddr *ua);
static int tipc_sk_withdraw(struct tipc_sock *tsk, struct tipc_uaddr *ua);
static int tipc_sk_leave(struct tipc_sock *tsk);
static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid);
static int tipc_sk_insert(struct tipc_sock *tsk);
static void tipc_sk_remove(struct tipc_sock *tsk);
static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dsz);
static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz);
static void tipc_sk_push_backlog(struct tipc_sock *tsk, bool nagle_ack);
static int tipc_wait_for_connect(struct socket *sock, long *timeo_p);

static const struct proto_ops packet_ops;
static const struct proto_ops stream_ops;
static const struct proto_ops msg_ops;
static struct proto tipc_proto;
static const struct rhashtable_params tsk_rht_params;

static u32 tsk_own_node(struct tipc_sock *tsk)
{
 return msg_prevnode(&tsk->phdr);
}

static u32 tsk_peer_node(struct tipc_sock *tsk)
{
 return msg_destnode(&tsk->phdr);
}

static u32 tsk_peer_port(struct tipc_sock *tsk)
{
 return msg_destport(&tsk->phdr);
}

static  bool tsk_unreliable(struct tipc_sock *tsk)
{
 return msg_src_droppable(&tsk->phdr) != 0;
}

static void tsk_set_unreliable(struct tipc_sock *tsk, bool unreliable)
{
 msg_set_src_droppable(&tsk->phdr, unreliable ? 1 : 0);
}

static bool tsk_unreturnable(struct tipc_sock *tsk)
{
 return msg_dest_droppable(&tsk->phdr) != 0;
}

static void tsk_set_unreturnable(struct tipc_sock *tsk, bool unreturnable)
{
 msg_set_dest_droppable(&tsk->phdr, unreturnable ? 1 : 0);
}

static int tsk_importance(struct tipc_sock *tsk)
{
 return msg_importance(&tsk->phdr);
}

static struct tipc_sock *tipc_sk(const struct sock *sk)
{
 return container_of(sk, struct tipc_sock, sk);
}

int tsk_set_importance(struct sock *sk, int imp)
{
 if (imp > TIPC_CRITICAL_IMPORTANCE)
  return -EINVAL;
 msg_set_importance(&tipc_sk(sk)->phdr, (u32)imp);
 return 0;
}

static bool tsk_conn_cong(struct tipc_sock *tsk)
{
 return tsk->snt_unacked > tsk->snd_win;
}

static u16 tsk_blocks(int len)
{
 return ((len / FLOWCTL_BLK_SZ) + 1);
}

/* tsk_blocks(): translate a buffer size in bytes to number of
 * advertisable blocks, taking into account the ratio truesize(len)/len
 * We can trust that this ratio is always < 4 for len >= FLOWCTL_BLK_SZ
 */

static u16 tsk_adv_blocks(int len)
{
 return len / FLOWCTL_BLK_SZ / 4;
}

/* tsk_inc(): increment counter for sent or received data
 * - If block based flow control is not supported by peer we
 *   fall back to message based ditto, incrementing the counter
 */

static u16 tsk_inc(struct tipc_sock *tsk, int msglen)
{
 if (likely(tsk->peer_caps & TIPC_BLOCK_FLOWCTL))
  return ((msglen / FLOWCTL_BLK_SZ) + 1);
 return 1;
}

/* tsk_set_nagle - enable/disable nagle property by manipulating maxnagle
 */

static void tsk_set_nagle(struct tipc_sock *tsk)
{
 struct sock *sk = &tsk->sk;

 tsk->maxnagle = 0;
 if (sk->sk_type != SOCK_STREAM)
  return;
 if (tsk->nodelay)
  return;
 if (!(tsk->peer_caps & TIPC_NAGLE))
  return;
 /* Limit node local buffer size to avoid receive queue overflow */
 if (tsk->max_pkt == MAX_MSG_SIZE)
  tsk->maxnagle = 1500;
 else
  tsk->maxnagle = tsk->max_pkt;
}

/**
 * tsk_advance_rx_queue - discard first buffer in socket receive queue
 * @sk: network socket
 *
 * Caller must hold socket lock
 */

static void tsk_advance_rx_queue(struct sock *sk)
{
 trace_tipc_sk_advance_rx(sk, NULL, TIPC_DUMP_SK_RCVQ, " ");
 kfree_skb(__skb_dequeue(&sk->sk_receive_queue));
}

/* tipc_sk_respond() : send response message back to sender
 */

static void tipc_sk_respond(struct sock *sk, struct sk_buff *skb, int err)
{
 u32 selector;
 u32 dnode;
 u32 onode = tipc_own_addr(sock_net(sk));

 if (!tipc_msg_reverse(onode, &skb, err))
  return;

 trace_tipc_sk_rej_msg(sk, skb, TIPC_DUMP_NONE, "@sk_respond!");
 dnode = msg_destnode(buf_msg(skb));
 selector = msg_origport(buf_msg(skb));
 tipc_node_xmit_skb(sock_net(sk), skb, dnode, selector);
}

/**
 * tsk_rej_rx_queue - reject all buffers in socket receive queue
 * @sk: network socket
 * @error: response error code
 *
 * Caller must hold socket lock
 */

static void tsk_rej_rx_queue(struct sock *sk, int error)
{
 struct sk_buff *skb;

 while ((skb = __skb_dequeue(&sk->sk_receive_queue)))
  tipc_sk_respond(sk, skb, error);
}

static bool tipc_sk_connected(const struct sock *sk)
{
 return READ_ONCE(sk->sk_state) == TIPC_ESTABLISHED;
}

/* tipc_sk_type_connectionless - check if the socket is datagram socket
 * @sk: socket
 *
 * Returns true if connection less, false otherwise
 */

static bool tipc_sk_type_connectionless(struct sock *sk)
{
 return sk->sk_type == SOCK_RDM || sk->sk_type == SOCK_DGRAM;
}

/* tsk_peer_msg - verify if message was sent by connected port's peer
 *
 * Handles cases where the node's network address has changed from
 * the default of <0.0.0> to its configured setting.
 */

static bool tsk_peer_msg(struct tipc_sock *tsk, struct tipc_msg *msg)
{
 struct sock *sk = &tsk->sk;
 u32 self = tipc_own_addr(sock_net(sk));
 u32 peer_port = tsk_peer_port(tsk);
 u32 orig_node, peer_node;

 if (unlikely(!tipc_sk_connected(sk)))
  return false;

 if (unlikely(msg_origport(msg) != peer_port))
  return false;

 orig_node = msg_orignode(msg);
 peer_node = tsk_peer_node(tsk);

 if (likely(orig_node == peer_node))
  return true;

 if (!orig_node && peer_node == self)
  return true;

 if (!peer_node && orig_node == self)
  return true;

 return false;
}

/* tipc_set_sk_state - set the sk_state of the socket
 * @sk: socket
 *
 * Caller must hold socket lock
 *
 * Returns 0 on success, errno otherwise
 */

static int tipc_set_sk_state(struct sock *sk, int state)
{
 int oldsk_state = sk->sk_state;
 int res = -EINVAL;

 switch (state) {
 case TIPC_OPEN:
  res = 0;
  break;
 case TIPC_LISTEN:
 case TIPC_CONNECTING:
  if (oldsk_state == TIPC_OPEN)
   res = 0;
  break;
 case TIPC_ESTABLISHED:
  if (oldsk_state == TIPC_CONNECTING ||
      oldsk_state == TIPC_OPEN)
   res = 0;
  break;
 case TIPC_DISCONNECTING:
  if (oldsk_state == TIPC_CONNECTING ||
      oldsk_state == TIPC_ESTABLISHED)
   res = 0;
  break;
 }

 if (!res)
  sk->sk_state = state;

 return res;
}

static int tipc_sk_sock_err(struct socket *sock, long *timeout)
{
 struct sock *sk = sock->sk;
 int err = sock_error(sk);
 int typ = sock->type;

 if (err)
  return err;
 if (typ == SOCK_STREAM || typ == SOCK_SEQPACKET) {
  if (sk->sk_state == TIPC_DISCONNECTING)
   return -EPIPE;
  else if (!tipc_sk_connected(sk))
   return -ENOTCONN;
 }
 if (!*timeout)
  return -EAGAIN;
 if (signal_pending(current))
  return sock_intr_errno(*timeout);

 return 0;
}

#define tipc_wait_for_cond(sock_, timeo_, condition_)          \
({                                                                             \
 DEFINE_WAIT_FUNC(wait_, woken_wake_function);                          \
 struct sock *sk_;             \
 int rc_;              \
                \
 while ((rc_ = !(condition_))) {            \
  /* coupled with smp_wmb() in tipc_sk_proto_rcv() */            \
  smp_rmb();                                                     \
  sk_ = (sock_)->sk;            \
  rc_ = tipc_sk_sock_err((sock_), timeo_);         \
  if (rc_)             \
   break;             \
  add_wait_queue(sk_sleep(sk_), &wait_);                         \
  release_sock(sk_);            \
  *(timeo_) = wait_woken(&wait_, TASK_INTERRUPTIBLE, *(timeo_)); \
  sched_annotate_sleep();                   \
  lock_sock(sk_);             \
  remove_wait_queue(sk_sleep(sk_), &wait_);         \
 }               \
 rc_;               \
})

/**
 * tipc_sk_create - create a TIPC socket
 * @net: network namespace (must be default network)
 * @sock: pre-allocated socket structure
 * @protocol: protocol indicator (must be 0)
 * @kern: caused by kernel or by userspace?
 *
 * This routine creates additional data structures used by the TIPC socket,
 * initializes them, and links them together.
 *
 * Return: 0 on success, errno otherwise
 */

static int tipc_sk_create(struct net *net, struct socket *sock,
     int protocol, int kern)
{
 const struct proto_ops *ops;
 struct sock *sk;
 struct tipc_sock *tsk;
 struct tipc_msg *msg;

 /* Validate arguments */
 if (unlikely(protocol != 0))
  return -EPROTONOSUPPORT;

 switch (sock->type) {
 case SOCK_STREAM:
  ops = &stream_ops;
  break;
 case SOCK_SEQPACKET:
  ops = &packet_ops;
  break;
 case SOCK_DGRAM:
 case SOCK_RDM:
  ops = &msg_ops;
  break;
 default:
  return -EPROTOTYPE;
 }

 /* Allocate socket's protocol area */
 sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto, kern);
 if (sk == NULL)
  return -ENOMEM;

 tsk = tipc_sk(sk);
 tsk->max_pkt = MAX_PKT_DEFAULT;
 tsk->maxnagle = 0;
 tsk->nagle_start = NAGLE_START_INIT;
 INIT_LIST_HEAD(&tsk->publications);
 INIT_LIST_HEAD(&tsk->cong_links);
 msg = &tsk->phdr;

 /* Finish initializing socket data structures */
 sock->ops = ops;
 sock_init_data(sock, sk);
 tipc_set_sk_state(sk, TIPC_OPEN);
 if (tipc_sk_insert(tsk)) {
  sk_free(sk);
  pr_warn("Socket create failed; port number exhausted\n");
  return -EINVAL;
 }

 /* Ensure tsk is visible before we read own_addr. */
 smp_mb();

 tipc_msg_init(tipc_own_addr(net), msg, TIPC_LOW_IMPORTANCE,
        TIPC_NAMED_MSG, NAMED_H_SIZE, 0);

 msg_set_origport(msg, tsk->portid);
 timer_setup(&sk->sk_timer, tipc_sk_timeout, 0);
 sk->sk_shutdown = 0;
 sk->sk_backlog_rcv = tipc_sk_backlog_rcv;
 sk->sk_rcvbuf = READ_ONCE(sysctl_tipc_rmem[1]);
 sk->sk_data_ready = tipc_data_ready;
 sk->sk_write_space = tipc_write_space;
 sk->sk_destruct = tipc_sock_destruct;
 tsk->conn_timeout = CONN_TIMEOUT_DEFAULT;
 tsk->group_is_open = true;
 atomic_set(&tsk->dupl_rcvcnt, 0);

 /* Start out with safe limits until we receive an advertised window */
 tsk->snd_win = tsk_adv_blocks(RCVBUF_MIN);
 tsk->rcv_win = tsk->snd_win;

 if (tipc_sk_type_connectionless(sk)) {
  tsk_set_unreturnable(tsk, true);
  if (sock->type == SOCK_DGRAM)
   tsk_set_unreliable(tsk, true);
 }
 __skb_queue_head_init(&tsk->mc_method.deferredq);
 trace_tipc_sk_create(sk, NULL, TIPC_DUMP_NONE, " ");
 return 0;
}

static void tipc_sk_callback(struct rcu_head *head)
{
 struct tipc_sock *tsk = container_of(head, struct tipc_sock, rcu);

 sock_put(&tsk->sk);
}

/* Caller should hold socket lock for the socket. */
static void __tipc_shutdown(struct socket *sock, int error)
{
 struct sock *sk = sock->sk;
 struct tipc_sock *tsk = tipc_sk(sk);
 struct net *net = sock_net(sk);
 long timeout = msecs_to_jiffies(CONN_TIMEOUT_DEFAULT);
 u32 dnode = tsk_peer_node(tsk);
 struct sk_buff *skb;

 /* Avoid that hi-prio shutdown msgs bypass msgs in link wakeup queue */
 tipc_wait_for_cond(sock, &timeout, (!tsk->cong_link_cnt &&
         !tsk_conn_cong(tsk)));

 /* Push out delayed messages if in Nagle mode */
 tipc_sk_push_backlog(tsk, false);
 /* Remove pending SYN */
 __skb_queue_purge(&sk->sk_write_queue);

 /* Remove partially received buffer if any */
 skb = skb_peek(&sk->sk_receive_queue);
 if (skb && TIPC_SKB_CB(skb)->bytes_read) {
  __skb_unlink(skb, &sk->sk_receive_queue);
  kfree_skb(skb);
 }

 /* Reject all unreceived messages if connectionless */
 if (tipc_sk_type_connectionless(sk)) {
  tsk_rej_rx_queue(sk, error);
  return;
 }

 switch (sk->sk_state) {
 case TIPC_CONNECTING:
 case TIPC_ESTABLISHED:
  tipc_set_sk_state(sk, TIPC_DISCONNECTING);
  tipc_node_remove_conn(net, dnode, tsk->portid);
  /* Send a FIN+/- to its peer */
  skb = __skb_dequeue(&sk->sk_receive_queue);
  if (skb) {
   __skb_queue_purge(&sk->sk_receive_queue);
   tipc_sk_respond(sk, skb, error);
   break;
  }
  skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
          TIPC_CONN_MSG, SHORT_H_SIZE, 0, dnode,
          tsk_own_node(tsk), tsk_peer_port(tsk),
          tsk->portid, error);
  if (skb)
   tipc_node_xmit_skb(net, skb, dnode, tsk->portid);
  break;
 case TIPC_LISTEN:
  /* Reject all SYN messages */
  tsk_rej_rx_queue(sk, error);
  break;
 default:
  __skb_queue_purge(&sk->sk_receive_queue);
  break;
 }
}

/**
 * tipc_release - destroy a TIPC socket
 * @sock: socket to destroy
 *
 * This routine cleans up any messages that are still queued on the socket.
 * For DGRAM and RDM socket types, all queued messages are rejected.
 * For SEQPACKET and STREAM socket types, the first message is rejected
 * and any others are discarded.  (If the first message on a STREAM socket
 * is partially-read, it is discarded and the next one is rejected instead.)
 *
 * NOTE: Rejected messages are not necessarily returned to the sender!  They
 * are returned or discarded according to the "destination droppable" setting
 * specified for the message by the sender.
 *
 * Return: 0 on success, errno otherwise
 */

static int tipc_release(struct socket *sock)
{
 struct sock *sk = sock->sk;
 struct tipc_sock *tsk;

 /*
 * Exit if socket isn't fully initialized (occurs when a failed accept()
 * releases a pre-allocated child socket that was never used)
 */

 if (sk == NULL)
  return 0;

 tsk = tipc_sk(sk);
 lock_sock(sk);

 trace_tipc_sk_release(sk, NULL, TIPC_DUMP_ALL, " ");
 __tipc_shutdown(sock, TIPC_ERR_NO_PORT);
 sk->sk_shutdown = SHUTDOWN_MASK;
 tipc_sk_leave(tsk);
 tipc_sk_withdraw(tsk, NULL);
 __skb_queue_purge(&tsk->mc_method.deferredq);
 sk_stop_timer(sk, &sk->sk_timer);
 tipc_sk_remove(tsk);

 sock_orphan(sk);
 /* Reject any messages that accumulated in backlog queue */
 release_sock(sk);
 tipc_dest_list_purge(&tsk->cong_links);
 tsk->cong_link_cnt = 0;
 call_rcu(&tsk->rcu, tipc_sk_callback);
 sock->sk = NULL;

 return 0;
}

/**
 * __tipc_bind - associate or disassociate TIPC name(s) with a socket
 * @sock: socket structure
 * @skaddr: socket address describing name(s) and desired operation
 * @alen: size of socket address data structure
 *
 * Name and name sequence binding are indicated using a positive scope value;
 * a negative scope value unbinds the specified name.  Specifying no name
 * (i.e. a socket address length of 0) unbinds all names from the socket.
 *
 * Return: 0 on success, errno otherwise
 *
 * NOTE: This routine doesn't need to take the socket lock since it doesn't
 *       access any non-constant socket information.
 */

static int __tipc_bind(struct socket *sock, struct sockaddr *skaddr, int alen)
{
 struct tipc_uaddr *ua = (struct tipc_uaddr *)skaddr;
 struct tipc_sock *tsk = tipc_sk(sock->sk);
 bool unbind = false;

 if (unlikely(!alen))
  return tipc_sk_withdraw(tsk, NULL);

 if (ua->addrtype == TIPC_SERVICE_ADDR) {
  ua->addrtype = TIPC_SERVICE_RANGE;
  ua->sr.upper = ua->sr.lower;
 }
 if (ua->scope < 0) {
  unbind = true;
  ua->scope = -ua->scope;
 }
 /* Users may still use deprecated TIPC_ZONE_SCOPE */
 if (ua->scope != TIPC_NODE_SCOPE)
  ua->scope = TIPC_CLUSTER_SCOPE;

 if (tsk->group)
  return -EACCES;

 if (unbind)
  return tipc_sk_withdraw(tsk, ua);
 return tipc_sk_publish(tsk, ua);
}

int tipc_sk_bind(struct socket *sock, struct sockaddr *skaddr, int alen)
{
 int res;

 lock_sock(sock->sk);
 res = __tipc_bind(sock, skaddr, alen);
 release_sock(sock->sk);
 return res;
}

static int tipc_bind(struct socket *sock, struct sockaddr *skaddr, int alen)
{
 struct tipc_uaddr *ua = (struct tipc_uaddr *)skaddr;
 u32 atype = ua->addrtype;

 if (alen) {
  if (!tipc_uaddr_valid(ua, alen))
   return -EINVAL;
  if (atype == TIPC_SOCKET_ADDR)
   return -EAFNOSUPPORT;
  if (ua->sr.type < TIPC_RESERVED_TYPES) {
   pr_warn_once("Can't bind to reserved service type %u\n",
         ua->sr.type);
   return -EACCES;
  }
 }
 return tipc_sk_bind(sock, skaddr, alen);
}

/**
 * tipc_getname - get port ID of socket or peer socket
 * @sock: socket structure
 * @uaddr: area for returned socket address
 * @peer: 0 = own ID, 1 = current peer ID, 2 = current/former peer ID
 *
 * Return: 0 on success, errno otherwise
 *
 * NOTE: This routine doesn't need to take the socket lock since it only
 *       accesses socket information that is unchanging (or which changes in
 *       a completely predictable manner).
 */

static int tipc_getname(struct socket *sock, struct sockaddr *uaddr,
   int peer)
{
 struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
 struct sock *sk = sock->sk;
 struct tipc_sock *tsk = tipc_sk(sk);

 memset(addr, 0, sizeof(*addr));
 if (peer) {
  if ((!tipc_sk_connected(sk)) &&
      ((peer != 2) || (sk->sk_state != TIPC_DISCONNECTING)))
   return -ENOTCONN;
  addr->addr.id.ref = tsk_peer_port(tsk);
  addr->addr.id.node = tsk_peer_node(tsk);
 } else {
  addr->addr.id.ref = tsk->portid;
  addr->addr.id.node = tipc_own_addr(sock_net(sk));
 }

 addr->addrtype = TIPC_SOCKET_ADDR;
 addr->family = AF_TIPC;
 addr->scope = 0;
 addr->addr.name.domain = 0;

 return sizeof(*addr);
}

/**
 * tipc_poll - read and possibly block on pollmask
 * @file: file structure associated with the socket
 * @sock: socket for which to calculate the poll bits
 * @wait: ???
 *
 * Return: pollmask value
 *
 * COMMENTARY:
 * It appears that the usual socket locking mechanisms are not useful here
 * since the pollmask info is potentially out-of-date the moment this routine
 * exits.  TCP and other protocols seem to rely on higher level poll routines
 * to handle any preventable race conditions, so TIPC will do the same ...
 *
 * IMPORTANT: The fact that a read or write operation is indicated does NOT
 * imply that the operation will succeed, merely that it should be performed
 * and will not block.
 */

static __poll_t tipc_poll(struct file *file, struct socket *sock,
         poll_table *wait)
{
 struct sock *sk = sock->sk;
 struct tipc_sock *tsk = tipc_sk(sk);
 __poll_t revents = 0;

 sock_poll_wait(file, sock, wait);
 trace_tipc_sk_poll(sk, NULL, TIPC_DUMP_ALL, " ");

 if (sk->sk_shutdown & RCV_SHUTDOWN)
  revents |= EPOLLRDHUP | EPOLLIN | EPOLLRDNORM;
 if (sk->sk_shutdown == SHUTDOWN_MASK)
  revents |= EPOLLHUP;

 switch (sk->sk_state) {
 case TIPC_ESTABLISHED:
  if (!tsk->cong_link_cnt && !tsk_conn_cong(tsk))
   revents |= EPOLLOUT;
  fallthrough;
 case TIPC_LISTEN:
 case TIPC_CONNECTING:
  if (!skb_queue_empty_lockless(&sk->sk_receive_queue))
   revents |= EPOLLIN | EPOLLRDNORM;
  break;
 case TIPC_OPEN:
  if (tsk->group_is_open && !tsk->cong_link_cnt)
   revents |= EPOLLOUT;
  if (!tipc_sk_type_connectionless(sk))
   break;
  if (skb_queue_empty_lockless(&sk->sk_receive_queue))
   break;
  revents |= EPOLLIN | EPOLLRDNORM;
  break;
 case TIPC_DISCONNECTING:
  revents = EPOLLIN | EPOLLRDNORM | EPOLLHUP;
  break;
 }
 return revents;
}

/**
 * tipc_sendmcast - send multicast message
 * @sock: socket structure
 * @ua: destination address struct
 * @msg: message to send
 * @dlen: length of data to send
 * @timeout: timeout to wait for wakeup
 *
 * Called from function tipc_sendmsg(), which has done all sanity checks
 * Return: the number of bytes sent on success, or errno
 */

static int tipc_sendmcast(struct  socket *sock, struct tipc_uaddr *ua,
     struct msghdr *msg, size_t dlen, long timeout)
{
 struct sock *sk = sock->sk;
 struct tipc_sock *tsk = tipc_sk(sk);
 struct tipc_msg *hdr = &tsk->phdr;
 struct net *net = sock_net(sk);
 int mtu = tipc_bcast_get_mtu(net);
 struct sk_buff_head pkts;
 struct tipc_nlist dsts;
 int rc;

 if (tsk->group)
  return -EACCES;

 /* Block or return if any destination link is congested */
 rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt);
 if (unlikely(rc))
  return rc;

 /* Lookup destination nodes */
 tipc_nlist_init(&dsts, tipc_own_addr(net));
 tipc_nametbl_lookup_mcast_nodes(net, ua, &dsts);
 if (!dsts.local && !dsts.remote)
  return -EHOSTUNREACH;

 /* Build message header */
 msg_set_type(hdr, TIPC_MCAST_MSG);
 msg_set_hdr_sz(hdr, MCAST_H_SIZE);
 msg_set_lookup_scope(hdr, TIPC_CLUSTER_SCOPE);
 msg_set_destport(hdr, 0);
 msg_set_destnode(hdr, 0);
 msg_set_nametype(hdr, ua->sr.type);
 msg_set_namelower(hdr, ua->sr.lower);
 msg_set_nameupper(hdr, ua->sr.upper);

 /* Build message as chain of buffers */
 __skb_queue_head_init(&pkts);
 rc = tipc_msg_build(hdr, msg, 0, dlen, mtu, &pkts);

 /* Send message if build was successful */
 if (unlikely(rc == dlen)) {
  trace_tipc_sk_sendmcast(sk, skb_peek(&pkts),
     TIPC_DUMP_SK_SNDQ, " ");
  rc = tipc_mcast_xmit(net, &pkts, &tsk->mc_method, &dsts,
         &tsk->cong_link_cnt);
 }

 tipc_nlist_purge(&dsts);

 return rc ? rc : dlen;
}

/**
 * tipc_send_group_msg - send a message to a member in the group
 * @net: network namespace
 * @tsk: tipc socket
 * @m: message to send
 * @mb: group member
 * @dnode: destination node
 * @dport: destination port
 * @dlen: total length of message data
 */

static int tipc_send_group_msg(struct net *net, struct tipc_sock *tsk,
          struct msghdr *m, struct tipc_member *mb,
          u32 dnode, u32 dport, int dlen)
{
 u16 bc_snd_nxt = tipc_group_bc_snd_nxt(tsk->group);
 struct tipc_mc_method *method = &tsk->mc_method;
 int blks = tsk_blocks(GROUP_H_SIZE + dlen);
 struct tipc_msg *hdr = &tsk->phdr;
 struct sk_buff_head pkts;
 int mtu, rc;

 /* Complete message header */
 msg_set_type(hdr, TIPC_GRP_UCAST_MSG);
 msg_set_hdr_sz(hdr, GROUP_H_SIZE);
 msg_set_destport(hdr, dport);
 msg_set_destnode(hdr, dnode);
 msg_set_grp_bc_seqno(hdr, bc_snd_nxt);

 /* Build message as chain of buffers */
 __skb_queue_head_init(&pkts);
 mtu = tipc_node_get_mtu(net, dnode, tsk->portid, false);
 rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
 if (unlikely(rc != dlen))
  return rc;

 /* Send message */
 rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
 if (unlikely(rc == -ELINKCONG)) {
  tipc_dest_push(&tsk->cong_links, dnode, 0);
  tsk->cong_link_cnt++;
 }

 /* Update send window */
 tipc_group_update_member(mb, blks);

 /* A broadcast sent within next EXPIRE period must follow same path */
 method->rcast = true;
 method->mandatory = true;
 return dlen;
}

/**
 * tipc_send_group_unicast - send message to a member in the group
 * @sock: socket structure
 * @m: message to send
 * @dlen: total length of message data
 * @timeout: timeout to wait for wakeup
 *
 * Called from function tipc_sendmsg(), which has done all sanity checks
 * Return: the number of bytes sent on success, or errno
 */

static int tipc_send_group_unicast(struct socket *sock, struct msghdr *m,
       int dlen, long timeout)
{
 struct sock *sk = sock->sk;
 struct tipc_uaddr *ua = (struct tipc_uaddr *)m->msg_name;
 int blks = tsk_blocks(GROUP_H_SIZE + dlen);
 struct tipc_sock *tsk = tipc_sk(sk);
 struct net *net = sock_net(sk);
 struct tipc_member *mb = NULL;
 u32 node, port;
 int rc;

 node = ua->sk.node;
 port = ua->sk.ref;
 if (!port && !node)
  return -EHOSTUNREACH;

 /* Block or return if destination link or member is congested */
 rc = tipc_wait_for_cond(sock, &timeout,
    !tipc_dest_find(&tsk->cong_links, node, 0) &&
    tsk->group &&
    !tipc_group_cong(tsk->group, node, port, blks,
       &mb));
 if (unlikely(rc))
  return rc;

 if (unlikely(!mb))
  return -EHOSTUNREACH;

 rc = tipc_send_group_msg(net, tsk, m, mb, node, port, dlen);

 return rc ? rc : dlen;
}

/**
 * tipc_send_group_anycast - send message to any member with given identity
 * @sock: socket structure
 * @m: message to send
 * @dlen: total length of message data
 * @timeout: timeout to wait for wakeup
 *
 * Called from function tipc_sendmsg(), which has done all sanity checks
 * Return: the number of bytes sent on success, or errno
 */

static int tipc_send_group_anycast(struct socket *sock, struct msghdr *m,
       int dlen, long timeout)
{
 struct tipc_uaddr *ua = (struct tipc_uaddr *)m->msg_name;
 struct sock *sk = sock->sk;
 struct tipc_sock *tsk = tipc_sk(sk);
 struct list_head *cong_links = &tsk->cong_links;
 int blks = tsk_blocks(GROUP_H_SIZE + dlen);
 struct tipc_msg *hdr = &tsk->phdr;
 struct tipc_member *first = NULL;
 struct tipc_member *mbr = NULL;
 struct net *net = sock_net(sk);
 u32 node, port, exclude;
 LIST_HEAD(dsts);
 int lookups = 0;
 int dstcnt, rc;
 bool cong;

 ua->sa.type = msg_nametype(hdr);
 ua->scope = msg_lookup_scope(hdr);

 while (++lookups < 4) {
  exclude = tipc_group_exclude(tsk->group);

  first = NULL;

  /* Look for a non-congested destination member, if any */
  while (1) {
   if (!tipc_nametbl_lookup_group(net, ua, &dsts, &dstcnt,
             exclude, false))
    return -EHOSTUNREACH;
   tipc_dest_pop(&dsts, &node, &port);
   cong = tipc_group_cong(tsk->group, node, port, blks,
            &mbr);
   if (!cong)
    break;
   if (mbr == first)
    break;
   if (!first)
    first = mbr;
  }

  /* Start over if destination was not in member list */
  if (unlikely(!mbr))
   continue;

  if (likely(!cong && !tipc_dest_find(cong_links, node, 0)))
   break;

  /* Block or return if destination link or member is congested */
  rc = tipc_wait_for_cond(sock, &timeout,
     !tipc_dest_find(cong_links, node, 0) &&
     tsk->group &&
     !tipc_group_cong(tsk->group, node, port,
        blks, &mbr));
  if (unlikely(rc))
   return rc;

  /* Send, unless destination disappeared while waiting */
  if (likely(mbr))
   break;
 }

 if (unlikely(lookups >= 4))
  return -EHOSTUNREACH;

 rc = tipc_send_group_msg(net, tsk, m, mbr, node, port, dlen);

 return rc ? rc : dlen;
}

/**
 * tipc_send_group_bcast - send message to all members in communication group
 * @sock: socket structure
 * @m: message to send
 * @dlen: total length of message data
 * @timeout: timeout to wait for wakeup
 *
 * Called from function tipc_sendmsg(), which has done all sanity checks
 * Return: the number of bytes sent on success, or errno
 */

static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m,
     int dlen, long timeout)
{
 struct tipc_uaddr *ua = (struct tipc_uaddr *)m->msg_name;
 struct sock *sk = sock->sk;
 struct net *net = sock_net(sk);
 struct tipc_sock *tsk = tipc_sk(sk);
 struct tipc_nlist *dsts;
 struct tipc_mc_method *method = &tsk->mc_method;
 bool ack = method->mandatory && method->rcast;
 int blks = tsk_blocks(MCAST_H_SIZE + dlen);
 struct tipc_msg *hdr = &tsk->phdr;
 int mtu = tipc_bcast_get_mtu(net);
 struct sk_buff_head pkts;
 int rc = -EHOSTUNREACH;

 /* Block or return if any destination link or member is congested */
 rc = tipc_wait_for_cond(sock, &timeout,
    !tsk->cong_link_cnt && tsk->group &&
    !tipc_group_bc_cong(tsk->group, blks));
 if (unlikely(rc))
  return rc;

 dsts = tipc_group_dests(tsk->group);
 if (!dsts->local && !dsts->remote)
  return -EHOSTUNREACH;

 /* Complete message header */
 if (ua) {
  msg_set_type(hdr, TIPC_GRP_MCAST_MSG);
  msg_set_nameinst(hdr, ua->sa.instance);
 } else {
  msg_set_type(hdr, TIPC_GRP_BCAST_MSG);
  msg_set_nameinst(hdr, 0);
 }
 msg_set_hdr_sz(hdr, GROUP_H_SIZE);
 msg_set_destport(hdr, 0);
 msg_set_destnode(hdr, 0);
 msg_set_grp_bc_seqno(hdr, tipc_group_bc_snd_nxt(tsk->group));

 /* Avoid getting stuck with repeated forced replicasts */
 msg_set_grp_bc_ack_req(hdr, ack);

 /* Build message as chain of buffers */
 __skb_queue_head_init(&pkts);
 rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
 if (unlikely(rc != dlen))
  return rc;

 /* Send message */
 rc = tipc_mcast_xmit(net, &pkts, method, dsts, &tsk->cong_link_cnt);
 if (unlikely(rc))
  return rc;

 /* Update broadcast sequence number and send windows */
 tipc_group_update_bc_members(tsk->group, blks, ack);

 /* Broadcast link is now free to choose method for next broadcast */
 method->mandatory = false;
 method->expires = jiffies;

 return dlen;
}

/**
 * tipc_send_group_mcast - send message to all members with given identity
 * @sock: socket structure
 * @m: message to send
 * @dlen: total length of message data
 * @timeout: timeout to wait for wakeup
 *
 * Called from function tipc_sendmsg(), which has done all sanity checks
 * Return: the number of bytes sent on success, or errno
 */

static int tipc_send_group_mcast(struct socket *sock, struct msghdr *m,
     int dlen, long timeout)
{
 struct tipc_uaddr *ua = (struct tipc_uaddr *)m->msg_name;
 struct sock *sk = sock->sk;
 struct tipc_sock *tsk = tipc_sk(sk);
 struct tipc_group *grp = tsk->group;
 struct tipc_msg *hdr = &tsk->phdr;
 struct net *net = sock_net(sk);
 u32 dstcnt, exclude;
 LIST_HEAD(dsts);

 ua->sa.type = msg_nametype(hdr);
 ua->scope = msg_lookup_scope(hdr);
 exclude = tipc_group_exclude(grp);

 if (!tipc_nametbl_lookup_group(net, ua, &dsts, &dstcnt, exclude, true))
  return -EHOSTUNREACH;

 if (dstcnt == 1) {
  tipc_dest_pop(&dsts, &ua->sk.node, &ua->sk.ref);
  return tipc_send_group_unicast(sock, m, dlen, timeout);
 }

 tipc_dest_list_purge(&dsts);
 return tipc_send_group_bcast(sock, m, dlen, timeout);
}

/**
 * tipc_sk_mcast_rcv - Deliver multicast messages to all destination sockets
 * @net: the associated network namespace
 * @arrvq: queue with arriving messages, to be cloned after destination lookup
 * @inputq: queue with cloned messages, delivered to socket after dest lookup
 *
 * Multi-threaded: parallel calls with reference to same queues may occur
 */

void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
         struct sk_buff_head *inputq)
{
 u32 self = tipc_own_addr(net);
 struct sk_buff *skb, *_skb;
 u32 portid, onode;
 struct sk_buff_head tmpq;
 struct list_head dports;
 struct tipc_msg *hdr;
 struct tipc_uaddr ua;
 int user, mtyp, hlen;

 __skb_queue_head_init(&tmpq);
 INIT_LIST_HEAD(&dports);
 ua.addrtype = TIPC_SERVICE_RANGE;

 /* tipc_skb_peek() increments the head skb's reference counter */
 skb = tipc_skb_peek(arrvq, &inputq->lock);
 for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) {
  hdr = buf_msg(skb);
  user = msg_user(hdr);
  mtyp = msg_type(hdr);
  hlen = skb_headroom(skb) + msg_hdr_sz(hdr);
  onode = msg_orignode(hdr);
  ua.sr.type = msg_nametype(hdr);
  ua.sr.lower = msg_namelower(hdr);
  ua.sr.upper = msg_nameupper(hdr);
  if (onode == self)
   ua.scope = TIPC_ANY_SCOPE;
  else
   ua.scope = TIPC_CLUSTER_SCOPE;

  if (mtyp == TIPC_GRP_UCAST_MSG || user == GROUP_PROTOCOL) {
   spin_lock_bh(&inputq->lock);
   if (skb_peek(arrvq) == skb) {
    __skb_dequeue(arrvq);
    __skb_queue_tail(inputq, skb);
   }
   kfree_skb(skb);
   spin_unlock_bh(&inputq->lock);
   continue;
  }

  /* Group messages require exact scope match */
  if (msg_in_group(hdr)) {
   ua.sr.lower = 0;
   ua.sr.upper = ~0;
   ua.scope = msg_lookup_scope(hdr);
  }

  /* Create destination port list: */
  tipc_nametbl_lookup_mcast_sockets(net, &ua, &dports);

  /* Clone message per destination */
  while (tipc_dest_pop(&dports, NULL, &portid)) {
   _skb = __pskb_copy(skb, hlen, GFP_ATOMIC);
   if (_skb) {
    msg_set_destport(buf_msg(_skb), portid);
    __skb_queue_tail(&tmpq, _skb);
    continue;
   }
   pr_warn("Failed to clone mcast rcv buffer\n");
  }
  /* Append clones to inputq only if skb is still head of arrvq */
  spin_lock_bh(&inputq->lock);
  if (skb_peek(arrvq) == skb) {
   skb_queue_splice_tail_init(&tmpq, inputq);
   /* Decrement the skb's refcnt */
   kfree_skb(__skb_dequeue(arrvq));
  }
  spin_unlock_bh(&inputq->lock);
  __skb_queue_purge(&tmpq);
  kfree_skb(skb);
 }
 tipc_sk_rcv(net, inputq);
}

/* tipc_sk_push_backlog(): send accumulated buffers in socket write queue
 *                         when socket is in Nagle mode
 */

static void tipc_sk_push_backlog(struct tipc_sock *tsk, bool nagle_ack)
{
 struct sk_buff_head *txq = &tsk->sk.sk_write_queue;
 struct sk_buff *skb = skb_peek_tail(txq);
 struct net *net = sock_net(&tsk->sk);
 u32 dnode = tsk_peer_node(tsk);
 int rc;

 if (nagle_ack) {
  tsk->pkt_cnt += skb_queue_len(txq);
  if (!tsk->pkt_cnt || tsk->msg_acc / tsk->pkt_cnt < 2) {
   tsk->oneway = 0;
   if (tsk->nagle_start < NAGLE_START_MAX)
    tsk->nagle_start *= 2;
   tsk->expect_ack = false;
   pr_debug("tsk %10u: bad nagle %u -> %u, next start %u!\n",
     tsk->portid, tsk->msg_acc, tsk->pkt_cnt,
     tsk->nagle_start);
  } else {
   tsk->nagle_start = NAGLE_START_INIT;
   if (skb) {
    msg_set_ack_required(buf_msg(skb));
    tsk->expect_ack = true;
   } else {
    tsk->expect_ack = false;
   }
  }
  tsk->msg_acc = 0;
  tsk->pkt_cnt = 0;
 }

 if (!skb || tsk->cong_link_cnt)
  return;

 /* Do not send SYN again after congestion */
 if (msg_is_syn(buf_msg(skb)))
  return;

 if (tsk->msg_acc)
  tsk->pkt_cnt += skb_queue_len(txq);
 tsk->snt_unacked += tsk->snd_backlog;
 tsk->snd_backlog = 0;
 rc = tipc_node_xmit(net, txq, dnode, tsk->portid);
 if (rc == -ELINKCONG)
  tsk->cong_link_cnt = 1;
}

/**
 * tipc_sk_conn_proto_rcv - receive a connection mng protocol message
 * @tsk: receiving socket
 * @skb: pointer to message buffer.
 * @inputq: buffer list containing the buffers
 * @xmitq: output message area
 */

static void tipc_sk_conn_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb,
       struct sk_buff_head *inputq,
       struct sk_buff_head *xmitq)
{
 struct tipc_msg *hdr = buf_msg(skb);
 u32 onode = tsk_own_node(tsk);
 struct sock *sk = &tsk->sk;
 int mtyp = msg_type(hdr);
 bool was_cong;

 /* Ignore if connection cannot be validated: */
 if (!tsk_peer_msg(tsk, hdr)) {
  trace_tipc_sk_drop_msg(sk, skb, TIPC_DUMP_NONE, "@proto_rcv!");
  goto exit;
 }

 if (unlikely(msg_errcode(hdr))) {
  tipc_set_sk_state(sk, TIPC_DISCONNECTING);
  tipc_node_remove_conn(sock_net(sk), tsk_peer_node(tsk),
          tsk_peer_port(tsk));
  sk->sk_state_change(sk);

  /* State change is ignored if socket already awake,
 * - convert msg to abort msg and add to inqueue
 */

  msg_set_user(hdr, TIPC_CRITICAL_IMPORTANCE);
  msg_set_type(hdr, TIPC_CONN_MSG);
  msg_set_size(hdr, BASIC_H_SIZE);
  msg_set_hdr_sz(hdr, BASIC_H_SIZE);
  __skb_queue_tail(inputq, skb);
  return;
 }

 tsk->probe_unacked = false;

 if (mtyp == CONN_PROBE) {
  msg_set_type(hdr, CONN_PROBE_REPLY);
  if (tipc_msg_reverse(onode, &skb, TIPC_OK))
   __skb_queue_tail(xmitq, skb);
  return;
 } else if (mtyp == CONN_ACK) {
  was_cong = tsk_conn_cong(tsk);
  tipc_sk_push_backlog(tsk, msg_nagle_ack(hdr));
  tsk->snt_unacked -= msg_conn_ack(hdr);
  if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
   tsk->snd_win = msg_adv_win(hdr);
  if (was_cong && !tsk_conn_cong(tsk))
   sk->sk_write_space(sk);
 } else if (mtyp != CONN_PROBE_REPLY) {
  pr_warn("Received unknown CONN_PROTO msg\n");
 }
exit:
 kfree_skb(skb);
}

/**
 * tipc_sendmsg - send message in connectionless manner
 * @sock: socket structure
 * @m: message to send
 * @dsz: amount of user data to be sent
 *
 * Message must have an destination specified explicitly.
 * Used for SOCK_RDM and SOCK_DGRAM messages,
 * and for 'SYN' messages on SOCK_SEQPACKET and SOCK_STREAM connections.
 * (Note: 'SYN+' is prohibited on SOCK_STREAM.)
 *
 * Return: the number of bytes sent on success, or errno otherwise
 */

static int tipc_sendmsg(struct socket *sock,
   struct msghdr *m, size_t dsz)
{
 struct sock *sk = sock->sk;
 int ret;

 lock_sock(sk);
 ret = __tipc_sendmsg(sock, m, dsz);
 release_sock(sk);

 return ret;
}

static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
{
 struct sock *sk = sock->sk;
 struct net *net = sock_net(sk);
 struct tipc_sock *tsk = tipc_sk(sk);
 struct tipc_uaddr *ua = (struct tipc_uaddr *)m->msg_name;
 long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
 struct list_head *clinks = &tsk->cong_links;
 bool syn = !tipc_sk_type_connectionless(sk);
 struct tipc_group *grp = tsk->group;
 struct tipc_msg *hdr = &tsk->phdr;
 struct tipc_socket_addr skaddr;
 struct sk_buff_head pkts;
 int atype, mtu, rc;

 if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE))
  return -EMSGSIZE;

 if (ua) {
  if (!tipc_uaddr_valid(ua, m->msg_namelen))
   return -EINVAL;
  atype = ua->addrtype;
 }

 /* If socket belongs to a communication group follow other paths */
 if (grp) {
  if (!ua)
   return tipc_send_group_bcast(sock, m, dlen, timeout);
  if (atype == TIPC_SERVICE_ADDR)
   return tipc_send_group_anycast(sock, m, dlen, timeout);
  if (atype == TIPC_SOCKET_ADDR)
   return tipc_send_group_unicast(sock, m, dlen, timeout);
  if (atype == TIPC_SERVICE_RANGE)
   return tipc_send_group_mcast(sock, m, dlen, timeout);
  return -EINVAL;
 }

 if (!ua) {
  ua = (struct tipc_uaddr *)&tsk->peer;
  if (!syn && ua->family != AF_TIPC)
   return -EDESTADDRREQ;
  atype = ua->addrtype;
 }

 if (unlikely(syn)) {
  if (sk->sk_state == TIPC_LISTEN)
   return -EPIPE;
  if (sk->sk_state != TIPC_OPEN)
   return -EISCONN;
  if (tsk->published)
   return -EOPNOTSUPP;
  if (atype == TIPC_SERVICE_ADDR)
   tsk->conn_addrtype = atype;
  msg_set_syn(hdr, 1);
 }

 memset(&skaddr, 0, sizeof(skaddr));

 /* Determine destination */
 if (atype == TIPC_SERVICE_RANGE) {
  return tipc_sendmcast(sock, ua, m, dlen, timeout);
 } else if (atype == TIPC_SERVICE_ADDR) {
  skaddr.node = ua->lookup_node;
  ua->scope = tipc_node2scope(skaddr.node);
  if (!tipc_nametbl_lookup_anycast(net, ua, &skaddr))
   return -EHOSTUNREACH;
 } else if (atype == TIPC_SOCKET_ADDR) {
  skaddr = ua->sk;
 } else {
  return -EINVAL;
 }

 /* Block or return if destination link is congested */
 rc = tipc_wait_for_cond(sock, &timeout,
    !tipc_dest_find(clinks, skaddr.node, 0));
 if (unlikely(rc))
  return rc;

 /* Finally build message header */
 msg_set_destnode(hdr, skaddr.node);
 msg_set_destport(hdr, skaddr.ref);
 if (atype == TIPC_SERVICE_ADDR) {
  msg_set_type(hdr, TIPC_NAMED_MSG);
  msg_set_hdr_sz(hdr, NAMED_H_SIZE);
  msg_set_nametype(hdr, ua->sa.type);
  msg_set_nameinst(hdr, ua->sa.instance);
  msg_set_lookup_scope(hdr, ua->scope);
 } else { /* TIPC_SOCKET_ADDR */
  msg_set_type(hdr, TIPC_DIRECT_MSG);
  msg_set_lookup_scope(hdr, 0);
  msg_set_hdr_sz(hdr, BASIC_H_SIZE);
 }

 /* Add message body */
 __skb_queue_head_init(&pkts);
 mtu = tipc_node_get_mtu(net, skaddr.node, tsk->portid, true);
 rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
 if (unlikely(rc != dlen))
  return rc;
 if (unlikely(syn && !tipc_msg_skb_clone(&pkts, &sk->sk_write_queue))) {
  __skb_queue_purge(&pkts);
  return -ENOMEM;
 }

 /* Send message */
 trace_tipc_sk_sendmsg(sk, skb_peek(&pkts), TIPC_DUMP_SK_SNDQ, " ");
 rc = tipc_node_xmit(net, &pkts, skaddr.node, tsk->portid);
 if (unlikely(rc == -ELINKCONG)) {
  tipc_dest_push(clinks, skaddr.node, 0);
  tsk->cong_link_cnt++;
  rc = 0;
 }

 if (unlikely(syn && !rc)) {
  tipc_set_sk_state(sk, TIPC_CONNECTING);
  if (dlen && timeout) {
   timeout = msecs_to_jiffies(timeout);
   tipc_wait_for_connect(sock, &timeout);
  }
 }

 return rc ? rc : dlen;
}

/**
 * tipc_sendstream - send stream-oriented data
 * @sock: socket structure
 * @m: data to send
 * @dsz: total length of data to be transmitted
 *
 * Used for SOCK_STREAM data.
 *
 * Return: the number of bytes sent on success (or partial success),
 * or errno if no data sent
 */

static int tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dsz)
{
 struct sock *sk = sock->sk;
 int ret;

 lock_sock(sk);
 ret = __tipc_sendstream(sock, m, dsz);
 release_sock(sk);

 return ret;
}

static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dlen)
{
 struct sock *sk = sock->sk;
 DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
 long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
 struct sk_buff_head *txq = &sk->sk_write_queue;
 struct tipc_sock *tsk = tipc_sk(sk);
 struct tipc_msg *hdr = &tsk->phdr;
 struct net *net = sock_net(sk);
 struct sk_buff *skb;
 u32 dnode = tsk_peer_node(tsk);
 int maxnagle = tsk->maxnagle;
 int maxpkt = tsk->max_pkt;
 int send, sent = 0;
 int blocks, rc = 0;

 if (unlikely(dlen > INT_MAX))
  return -EMSGSIZE;

 /* Handle implicit connection setup */
 if (unlikely(dest && sk->sk_state == TIPC_OPEN)) {
  rc = __tipc_sendmsg(sock, m, dlen);
  if (dlen && dlen == rc) {
   tsk->peer_caps = tipc_node_get_capabilities(net, dnode);
   tsk->snt_unacked = tsk_inc(tsk, dlen + msg_hdr_sz(hdr));
  }
  return rc;
 }

 do {
  rc = tipc_wait_for_cond(sock, &timeout,
     (!tsk->cong_link_cnt &&
      !tsk_conn_cong(tsk) &&
      tipc_sk_connected(sk)));
  if (unlikely(rc))
   break;
  send = min_t(size_t, dlen - sent, TIPC_MAX_USER_MSG_SIZE);
  blocks = tsk->snd_backlog;
  if (tsk->oneway++ >= tsk->nagle_start && maxnagle &&
      send <= maxnagle) {
   rc = tipc_msg_append(hdr, m, send, maxnagle, txq);
   if (unlikely(rc < 0))
    break;
   blocks += rc;
   tsk->msg_acc++;
   if (blocks <= 64 && tsk->expect_ack) {
    tsk->snd_backlog = blocks;
    sent += send;
    break;
   } else if (blocks > 64) {
    tsk->pkt_cnt += skb_queue_len(txq);
   } else {
    skb = skb_peek_tail(txq);
    if (skb) {
     msg_set_ack_required(buf_msg(skb));
     tsk->expect_ack = true;
    } else {
     tsk->expect_ack = false;
    }
    tsk->msg_acc = 0;
    tsk->pkt_cnt = 0;
   }
  } else {
   rc = tipc_msg_build(hdr, m, sent, send, maxpkt, txq);
   if (unlikely(rc != send))
    break;
   blocks += tsk_inc(tsk, send + MIN_H_SIZE);
  }
  trace_tipc_sk_sendstream(sk, skb_peek(txq),
      TIPC_DUMP_SK_SNDQ, " ");
  rc = tipc_node_xmit(net, txq, dnode, tsk->portid);
  if (unlikely(rc == -ELINKCONG)) {
   tsk->cong_link_cnt = 1;
   rc = 0;
  }
  if (likely(!rc)) {
   tsk->snt_unacked += blocks;
   tsk->snd_backlog = 0;
   sent += send;
  }
 } while (sent < dlen && !rc);

 return sent ? sent : rc;
}

/**
 * tipc_send_packet - send a connection-oriented message
 * @sock: socket structure
 * @m: message to send
 * @dsz: length of data to be transmitted
 *
 * Used for SOCK_SEQPACKET messages.
 *
 * Return: the number of bytes sent on success, or errno otherwise
 */

static int tipc_send_packet(struct socket *sock, struct msghdr *m, size_t dsz)
{
 if (dsz > TIPC_MAX_USER_MSG_SIZE)
  return -EMSGSIZE;

 return tipc_sendstream(sock, m, dsz);
}

/* tipc_sk_finish_conn - complete the setup of a connection
 */

static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port,
    u32 peer_node)
{
 struct sock *sk = &tsk->sk;
 struct net *net = sock_net(sk);
 struct tipc_msg *msg = &tsk->phdr;

 msg_set_syn(msg, 0);
 msg_set_destnode(msg, peer_node);
 msg_set_destport(msg, peer_port);
 msg_set_type(msg, TIPC_CONN_MSG);
 msg_set_lookup_scope(msg, 0);
 msg_set_hdr_sz(msg, SHORT_H_SIZE);

 sk_reset_timer(sk, &sk->sk_timer, jiffies + CONN_PROBING_INTV);
 tipc_set_sk_state(sk, TIPC_ESTABLISHED);
 tipc_node_add_conn(net, peer_node, tsk->portid, peer_port);
 tsk->max_pkt = tipc_node_get_mtu(net, peer_node, tsk->portid, true);
 tsk->peer_caps = tipc_node_get_capabilities(net, peer_node);
 tsk_set_nagle(tsk);
 __skb_queue_purge(&sk->sk_write_queue);
 if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
  return;

 /* Fall back to message based flow control */
 tsk->rcv_win = FLOWCTL_MSG_WIN;
 tsk->snd_win = FLOWCTL_MSG_WIN;
}

/**
 * tipc_sk_set_orig_addr - capture sender's address for received message
 * @m: descriptor for message info
 * @skb: received message
 *
 * Note: Address is not captured if not requested by receiver.
 */

static void tipc_sk_set_orig_addr(struct msghdr *m, struct sk_buff *skb)
{
 DECLARE_SOCKADDR(struct sockaddr_pair *, srcaddr, m->msg_name);
 struct tipc_msg *hdr = buf_msg(skb);

 if (!srcaddr)
  return;

 srcaddr->sock.family = AF_TIPC;
 srcaddr->sock.addrtype = TIPC_SOCKET_ADDR;
 srcaddr->sock.scope = 0;
 srcaddr->sock.addr.id.ref = msg_origport(hdr);
 srcaddr->sock.addr.id.node = msg_orignode(hdr);
 srcaddr->sock.addr.name.domain = 0;
 m->msg_namelen = sizeof(struct sockaddr_tipc);

 if (!msg_in_group(hdr))
  return;

 /* Group message users may also want to know sending member's id */
 srcaddr->member.family = AF_TIPC;
 srcaddr->member.addrtype = TIPC_SERVICE_ADDR;
 srcaddr->member.scope = 0;
 srcaddr->member.addr.name.name.type = msg_nametype(hdr);
 srcaddr->member.addr.name.name.instance = TIPC_SKB_CB(skb)->orig_member;
 srcaddr->member.addr.name.domain = 0;
 m->msg_namelen = sizeof(*srcaddr);
}

/**
 * tipc_sk_anc_data_recv - optionally capture ancillary data for received message
 * @m: descriptor for message info
 * @skb: received message buffer
 * @tsk: TIPC port associated with message
 *
 * Note: Ancillary data is not captured if not requested by receiver.
 *
 * Return: 0 if successful, otherwise errno
 */

static int tipc_sk_anc_data_recv(struct msghdr *m, struct sk_buff *skb,
     struct tipc_sock *tsk)
{
 struct tipc_msg *hdr;
 u32 data[3] = {0,};
 bool has_addr;
 int dlen, rc;

 if (likely(m->msg_controllen == 0))
  return 0;

 hdr = buf_msg(skb);
 dlen = msg_data_sz(hdr);

 /* Capture errored message object, if any */
 if (msg_errcode(hdr)) {
  if (skb_linearize(skb))
   return -ENOMEM;
  hdr = buf_msg(skb);
  data[0] = msg_errcode(hdr);
  data[1] = dlen;
  rc = put_cmsg(m, SOL_TIPC, TIPC_ERRINFO, 8, data);
  if (rc || !dlen)
   return rc;
  rc = put_cmsg(m, SOL_TIPC, TIPC_RETDATA, dlen, msg_data(hdr));
  if (rc)
   return rc;
 }

 /* Capture TIPC_SERVICE_ADDR/RANGE destination address, if any */
 switch (msg_type(hdr)) {
 case TIPC_NAMED_MSG:
  has_addr = true;
  data[0] = msg_nametype(hdr);
  data[1] = msg_namelower(hdr);
  data[2] = data[1];
  break;
 case TIPC_MCAST_MSG:
  has_addr = true;
  data[0] = msg_nametype(hdr);
  data[1] = msg_namelower(hdr);
  data[2] = msg_nameupper(hdr);
  break;
 case TIPC_CONN_MSG:
  has_addr = !!tsk->conn_addrtype;
  data[0] = msg_nametype(&tsk->phdr);
  data[1] = msg_nameinst(&tsk->phdr);
  data[2] = data[1];
  break;
 default:
  has_addr = false;
 }
 if (!has_addr)
  return 0;
 return put_cmsg(m, SOL_TIPC, TIPC_DESTNAME, 12, data);
}

static struct sk_buff *tipc_sk_build_ack(struct tipc_sock *tsk)
{
 struct sock *sk = &tsk->sk;
 struct sk_buff *skb = NULL;
 struct tipc_msg *msg;
 u32 peer_port = tsk_peer_port(tsk);
 u32 dnode = tsk_peer_node(tsk);

 if (!tipc_sk_connected(sk))
  return NULL;
 skb = tipc_msg_create(CONN_MANAGER, CONN_ACK, INT_H_SIZE, 0,
         dnode, tsk_own_node(tsk), peer_port,
         tsk->portid, TIPC_OK);
 if (!skb)
  return NULL;
 msg = buf_msg(skb);
 msg_set_conn_ack(msg, tsk->rcv_unacked);
 tsk->rcv_unacked = 0;

 /* Adjust to and advertize the correct window limit */
 if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL) {
  tsk->rcv_win = tsk_adv_blocks(tsk->sk.sk_rcvbuf);
  msg_set_adv_win(msg, tsk->rcv_win);
 }
 return skb;
}

static void tipc_sk_send_ack(struct tipc_sock *tsk)
{
 struct sk_buff *skb;

 skb = tipc_sk_build_ack(tsk);
 if (!skb)
  return;

 tipc_node_xmit_skb(sock_net(&tsk->sk), skb, tsk_peer_node(tsk),
      msg_link_selector(buf_msg(skb)));
}

static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop)
{
 struct sock *sk = sock->sk;
 DEFINE_WAIT_FUNC(wait, woken_wake_function);
 long timeo = *timeop;
 int err = sock_error(sk);

 if (err)
  return err;

 for (;;) {
  if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
   if (sk->sk_shutdown & RCV_SHUTDOWN) {
    err = -ENOTCONN;
    break;
   }
   add_wait_queue(sk_sleep(sk), &wait);
   release_sock(sk);
   timeo = wait_woken(&wait, TASK_INTERRUPTIBLE, timeo);
   sched_annotate_sleep();
   lock_sock(sk);
   remove_wait_queue(sk_sleep(sk), &wait);
  }
  err = 0;
  if (!skb_queue_empty(&sk->sk_receive_queue))
   break;
  err = -EAGAIN;
  if (!timeo)
   break;
  err = sock_intr_errno(timeo);
  if (signal_pending(current))
   break;

  err = sock_error(sk);
  if (err)
   break;
 }
 *timeop = timeo;
 return err;
}

/**
 * tipc_recvmsg - receive packet-oriented message
 * @sock: network socket
 * @m: descriptor for message info
 * @buflen: length of user buffer area
 * @flags: receive flags
 *
 * Used for SOCK_DGRAM, SOCK_RDM, and SOCK_SEQPACKET messages.
 * If the complete message doesn't fit in user area, truncate it.
 *
 * Return: size of returned message data, errno otherwise
 */

static int tipc_recvmsg(struct socket *sock, struct msghdr *m,
   size_t buflen, int flags)
{
 struct sock *sk = sock->sk;
 bool connected = !tipc_sk_type_connectionless(sk);
 struct tipc_sock *tsk = tipc_sk(sk);
 int rc, err, hlen, dlen, copy;
 struct tipc_skb_cb *skb_cb;
 struct sk_buff_head xmitq;
 struct tipc_msg *hdr;
 struct sk_buff *skb;
 bool grp_evt;
 long timeout;

 /* Catch invalid receive requests */
 if (unlikely(!buflen))
  return -EINVAL;

 lock_sock(sk);
 if (unlikely(connected && sk->sk_state == TIPC_OPEN)) {
  rc = -ENOTCONN;
  goto exit;
 }
 timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);

 /* Step rcv queue to first msg with data or error; wait if necessary */
 do {
  rc = tipc_wait_for_rcvmsg(sock, &timeout);
  if (unlikely(rc))
   goto exit;
  skb = skb_peek(&sk->sk_receive_queue);
  skb_cb = TIPC_SKB_CB(skb);
  hdr = buf_msg(skb);
  dlen = msg_data_sz(hdr);
  hlen = msg_hdr_sz(hdr);
  err = msg_errcode(hdr);
  grp_evt = msg_is_grp_evt(hdr);
  if (likely(dlen || err))
   break;
  tsk_advance_rx_queue(sk);
 } while (1);

 /* Collect msg meta data, including error code and rejected data */
 tipc_sk_set_orig_addr(m, skb);
 rc = tipc_sk_anc_data_recv(m, skb, tsk);
 if (unlikely(rc))
  goto exit;
 hdr = buf_msg(skb);

 /* Capture data if non-error msg, otherwise just set return value */
 if (likely(!err)) {
  int offset = skb_cb->bytes_read;

  copy = min_t(int, dlen - offset, buflen);
  rc = skb_copy_datagram_msg(skb, hlen + offset, m, copy);
  if (unlikely(rc))
   goto exit;
  if (unlikely(offset + copy < dlen)) {
   if (flags & MSG_EOR) {
    if (!(flags & MSG_PEEK))
     skb_cb->bytes_read = offset + copy;
   } else {
    m->msg_flags |= MSG_TRUNC;
    skb_cb->bytes_read = 0;
   }
  } else {
   if (flags & MSG_EOR)
    m->msg_flags |= MSG_EOR;
   skb_cb->bytes_read = 0;
  }
 } else {
  copy = 0;
  rc = 0;
  if (err != TIPC_CONN_SHUTDOWN && connected && !m->msg_control) {
   rc = -ECONNRESET;
   goto exit;
  }
 }

 /* Mark message as group event if applicable */
 if (unlikely(grp_evt)) {
  if (msg_grp_evt(hdr) == TIPC_WITHDRAWN)
   m->msg_flags |= MSG_EOR;
  m->msg_flags |= MSG_OOB;
  copy = 0;
 }

 /* Caption of data or error code/rejected data was successful */
 if (unlikely(flags & MSG_PEEK))
  goto exit;

 /* Send group flow control advertisement when applicable */
 if (tsk->group && msg_in_group(hdr) && !grp_evt) {
  __skb_queue_head_init(&xmitq);
  tipc_group_update_rcv_win(tsk->group, tsk_blocks(hlen + dlen),
       msg_orignode(hdr), msg_origport(hdr),
       &xmitq);
  tipc_node_distr_xmit(sock_net(sk), &xmitq);
 }

 if (skb_cb->bytes_read)
  goto exit;

 tsk_advance_rx_queue(sk);

 if (likely(!connected))
  goto exit;

 /* Send connection flow control advertisement when applicable */
 tsk->rcv_unacked += tsk_inc(tsk, hlen + dlen);
 if (tsk->rcv_unacked >= tsk->rcv_win / TIPC_ACK_RATE)
  tipc_sk_send_ack(tsk);
exit:
 release_sock(sk);
 return rc ? rc : copy;
}

/**
 * tipc_recvstream - receive stream-oriented data
 * @sock: network socket
 * @m: descriptor for message info
 * @buflen: total size of user buffer area
 * @flags: receive flags
 *
 * Used for SOCK_STREAM messages only.  If not enough data is available
 * will optionally wait for more; never truncates data.
 *
 * Return: size of returned message data, errno otherwise
 */

static int tipc_recvstream(struct socket *sock, struct msghdr *m,
      size_t buflen, int flags)
{
 struct sock *sk = sock->sk;
 struct tipc_sock *tsk = tipc_sk(sk);
 struct sk_buff *skb;
 struct tipc_msg *hdr;
 struct tipc_skb_cb *skb_cb;
 bool peek = flags & MSG_PEEK;
 int offset, required, copy, copied = 0;
 int hlen, dlen, err, rc;
 long timeout;

 /* Catch invalid receive attempts */
 if (unlikely(!buflen))
  return -EINVAL;

 lock_sock(sk);

 if (unlikely(sk->sk_state == TIPC_OPEN)) {
  rc = -ENOTCONN;
  goto exit;
 }
 required = sock_rcvlowat(sk, flags & MSG_WAITALL, buflen);
 timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);

 do {
  /* Look at first msg in receive queue; wait if necessary */
  rc = tipc_wait_for_rcvmsg(sock, &timeout);
  if (unlikely(rc))
   break;
  skb = skb_peek(&sk->sk_receive_queue);
  skb_cb = TIPC_SKB_CB(skb);
  hdr = buf_msg(skb);
  dlen = msg_data_sz(hdr);
  hlen = msg_hdr_sz(hdr);
  err = msg_errcode(hdr);

  /* Discard any empty non-errored (SYN-) message */
  if (unlikely(!dlen && !err)) {
   tsk_advance_rx_queue(sk);
   continue;
  }

  /* Collect msg meta data, incl. error code and rejected data */
  if (!copied) {
   tipc_sk_set_orig_addr(m, skb);
   rc = tipc_sk_anc_data_recv(m, skb, tsk);
   if (rc)
    break;
   hdr = buf_msg(skb);
  }

  /* Copy data if msg ok, otherwise return error/partial data */
  if (likely(!err)) {
   offset = skb_cb->bytes_read;
   copy = min_t(int, dlen - offset, buflen - copied);
   rc = skb_copy_datagram_msg(skb, hlen + offset, m, copy);
   if (unlikely(rc))
    break;
   copied += copy;
   offset += copy;
   if (unlikely(offset < dlen)) {
    if (!peek)
     skb_cb->bytes_read = offset;
    break;
   }
  } else {
   rc = 0;
   if ((err != TIPC_CONN_SHUTDOWN) && !m->msg_control)
    rc = -ECONNRESET;
   if (copied || rc)
    break;
  }

  if (unlikely(peek))
   break;

  tsk_advance_rx_queue(sk);

  /* Send connection flow control advertisement when applicable */
  tsk->rcv_unacked += tsk_inc(tsk, hlen + dlen);
  if (tsk->rcv_unacked >= tsk->rcv_win / TIPC_ACK_RATE)
   tipc_sk_send_ack(tsk);

  /* Exit if all requested data or FIN/error received */
  if (copied == buflen || err)
   break;

 } while (!skb_queue_empty(&sk->sk_receive_queue) || copied < required);
exit:
 release_sock(sk);
 return copied ? copied : rc;
}

/**
 * tipc_write_space - wake up thread if port congestion is released
 * @sk: socket
 */

static void tipc_write_space(struct sock *sk)
{
 struct socket_wq *wq;

 rcu_read_lock();
 wq = rcu_dereference(sk->sk_wq);
 if (skwq_has_sleeper(wq))
  wake_up_interruptible_sync_poll(&wq->wait, EPOLLOUT |
      EPOLLWRNORM | EPOLLWRBAND);
 rcu_read_unlock();
}

/**
 * tipc_data_ready - wake up threads to indicate messages have been received
 * @sk: socket
 */

static void tipc_data_ready(struct sock *sk)
{
 struct socket_wq *wq;

 trace_sk_data_ready(sk);

 rcu_read_lock();
 wq = rcu_dereference(sk->sk_wq);
 if (skwq_has_sleeper(wq))
  wake_up_interruptible_sync_poll(&wq->wait, EPOLLIN |
      EPOLLRDNORM | EPOLLRDBAND);
 rcu_read_unlock();
}

static void tipc_sock_destruct(struct sock *sk)
{
 __skb_queue_purge(&sk->sk_receive_queue);
}

static void tipc_sk_proto_rcv(struct sock *sk,
         struct sk_buff_head *inputq,
         struct sk_buff_head *xmitq)
{
 struct sk_buff *skb = __skb_dequeue(inputq);
 struct tipc_sock *tsk = tipc_sk(sk);
 struct tipc_msg *hdr = buf_msg(skb);
 struct tipc_group *grp = tsk->group;
 bool wakeup = false;

 switch (msg_user(hdr)) {
 case CONN_MANAGER:
  tipc_sk_conn_proto_rcv(tsk, skb, inputq, xmitq);
  return;
 case SOCK_WAKEUP:
  tipc_dest_del(&tsk->cong_links, msg_orignode(hdr), 0);
  /* coupled with smp_rmb() in tipc_wait_for_cond() */
  smp_wmb();
  tsk->cong_link_cnt--;
  wakeup = true;
  tipc_sk_push_backlog(tsk, false);
  break;
 case GROUP_PROTOCOL:
  tipc_group_proto_rcv(grp, &wakeup, hdr, inputq, xmitq);
  break;
 case TOP_SRV:
  tipc_group_member_evt(tsk->group, &wakeup, &sk->sk_rcvbuf,
          hdr, inputq, xmitq);
  break;
 default:
  break;
 }

 if (wakeup)
  sk->sk_write_space(sk);

 kfree_skb(skb);
}

/**
 * tipc_sk_filter_connect - check incoming message for a connection-based socket
 * @tsk: TIPC socket
 * @skb: pointer to message buffer.
 * @xmitq: for Nagle ACK if any
 * Return: true if message should be added to receive queue, false otherwise
 */

static bool tipc_sk_filter_connect(struct tipc_sock *tsk, struct sk_buff *skb,
       struct sk_buff_head *xmitq)
{
 struct sock *sk = &tsk->sk;
 struct net *net = sock_net(sk);
 struct tipc_msg *hdr = buf_msg(skb);
 bool con_msg = msg_connected(hdr);
 u32 pport = tsk_peer_port(tsk);
 u32 pnode = tsk_peer_node(tsk);
 u32 oport = msg_origport(hdr);
 u32 onode = msg_orignode(hdr);
 int err = msg_errcode(hdr);
 unsigned long delay;

 if (unlikely(msg_mcast(hdr)))
  return false;
 tsk->oneway = 0;

 switch (sk->sk_state) {
 case TIPC_CONNECTING:
  /* Setup ACK */
  if (likely(con_msg)) {
   if (err)
    break;
   tipc_sk_finish_conn(tsk, oport, onode);
   msg_set_importance(&tsk->phdr, msg_importance(hdr));
   /* ACK+ message with data is added to receive queue */
   if (msg_data_sz(hdr))
    return true;
   /* Empty ACK-, - wake up sleeping connect() and drop */
   sk->sk_state_change(sk);
   msg_set_dest_droppable(hdr, 1);
   return false;
  }
  /* Ignore connectionless message if not from listening socket */
  if (oport != pport || onode != pnode)
   return false;

  /* Rejected SYN */
  if (err != TIPC_ERR_OVERLOAD)
   break;

  /* Prepare for new setup attempt if we have a SYN clone */
  if (skb_queue_empty(&sk->sk_write_queue))
   break;
  get_random_bytes(&delay, 2);
  delay %= (tsk->conn_timeout / 4);
  delay = msecs_to_jiffies(delay + 100);
  sk_reset_timer(sk, &sk->sk_timer, jiffies + delay);
  return false;
 case TIPC_OPEN:
 case TIPC_DISCONNECTING:
  return false;
 case TIPC_LISTEN:
  /* Accept only SYN message */
  if (!msg_is_syn(hdr) &&
      tipc_node_get_capabilities(net, onode) & TIPC_SYN_BIT)
   return false;
  if (!con_msg && !err)
   return true;
  return false;
 case TIPC_ESTABLISHED:
  if (!skb_queue_empty(&sk->sk_write_queue))
   tipc_sk_push_backlog(tsk, false);
  /* Accept only connection-based messages sent by peer */
  if (likely(con_msg && !err && pport == oport &&
      pnode == onode)) {
   if (msg_ack_required(hdr)) {
    struct sk_buff *skb;

    skb = tipc_sk_build_ack(tsk);
    if (skb) {
     msg_set_nagle_ack(buf_msg(skb));
     __skb_queue_tail(xmitq, skb);
    }
   }
   return true;
  }
  if (!tsk_peer_msg(tsk, hdr))
   return false;
  if (!err)
   return true;
  tipc_set_sk_state(sk, TIPC_DISCONNECTING);
  tipc_node_remove_conn(net, pnode, tsk->portid);
  sk->sk_state_change(sk);
  return true;
 default:
  pr_err("Unknown sk_state %u\n", sk->sk_state);
 }
 /* Abort connection setup attempt */
 tipc_set_sk_state(sk, TIPC_DISCONNECTING);
 sk->sk_err = ECONNREFUSED;
 sk->sk_state_change(sk);
 return true;
}

/**
 * rcvbuf_limit - get proper overload limit of socket receive queue
 * @sk: socket
 * @skb: message
 *
 * For connection oriented messages, irrespective of importance,
 * default queue limit is 2 MB.
 *
 * For connectionless messages, queue limits are based on message
 * importance as follows:
 *
 * TIPC_LOW_IMPORTANCE       (2 MB)
 * TIPC_MEDIUM_IMPORTANCE    (4 MB)
 * TIPC_HIGH_IMPORTANCE      (8 MB)
 * TIPC_CRITICAL_IMPORTANCE  (16 MB)
 *
 * Return: overload limit according to corresponding message importance
 */

static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *skb)
{
 struct tipc_sock *tsk = tipc_sk(sk);
 struct tipc_msg *hdr = buf_msg(skb);

 if (unlikely(msg_in_group(hdr)))
  return READ_ONCE(sk->sk_rcvbuf);

 if (unlikely(!msg_connected(hdr)))
  return READ_ONCE(sk->sk_rcvbuf) << msg_importance(hdr);

 if (likely(tsk->peer_caps & TIPC_BLOCK_FLOWCTL))
  return READ_ONCE(sk->sk_rcvbuf);

 return FLOWCTL_MSG_LIM;
}

/**
 * tipc_sk_filter_rcv - validate incoming message
 * @sk: socket
 * @skb: pointer to message.
 * @xmitq: output message area (FIXME)
 *
 * Enqueues message on receive queue if acceptable; optionally handles
 * disconnect indication for a connected socket.
 *
 * Called with socket lock already taken
 */

static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb,
          struct sk_buff_head *xmitq)
{
 bool sk_conn = !tipc_sk_type_connectionless(sk);
 struct tipc_sock *tsk = tipc_sk(sk);
 struct tipc_group *grp = tsk->group;
 struct tipc_msg *hdr = buf_msg(skb);
 struct net *net = sock_net(sk);
 struct sk_buff_head inputq;
 int mtyp = msg_type(hdr);
 int limit, err = TIPC_OK;

 trace_tipc_sk_filter_rcv(sk, skb, TIPC_DUMP_ALL, " ");
 TIPC_SKB_CB(skb)->bytes_read = 0;
 __skb_queue_head_init(&inputq);
 __skb_queue_tail(&inputq, skb);

 if (unlikely(!msg_isdata(hdr)))
  tipc_sk_proto_rcv(sk, &inputq, xmitq);

 if (unlikely(grp))
  tipc_group_filter_msg(grp, &inputq, xmitq);

 if (unlikely(!grp) && mtyp == TIPC_MCAST_MSG)
  tipc_mcast_filter_msg(net, &tsk->mc_method.deferredq, &inputq);

 /* Validate and add to receive buffer if there is space */
 while ((skb = __skb_dequeue(&inputq))) {
  hdr = buf_msg(skb);
  limit = rcvbuf_limit(sk, skb);
  if ((sk_conn && !tipc_sk_filter_connect(tsk, skb, xmitq)) ||
      (!sk_conn && msg_connected(hdr)) ||
      (!grp && msg_in_group(hdr)))
   err = TIPC_ERR_NO_PORT;
--> --------------------

--> maximum size reached

--> --------------------

Messung V0.5
C=95 H=97 G=95

¤ Dauer der Verarbeitung: 0.24 Sekunden  (vorverarbeitet)  ¤

*© Formatika GbR, Deutschland






Wurzel

Suchen

Beweissystem der NASA

Beweissystem Isabelle

NIST Cobol Testsuite

Cephes Mathematical Library

Wiener Entwicklungsmethode

Haftungshinweis

Die Informationen auf dieser Webseite wurden nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit, noch Qualität der bereit gestellten Informationen zugesichert.

Bemerkung:

Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.