linux/net/tipc/bcast.c
Jon Paul Maloy 50100a5e39 tipc: use pseudo message to wake up sockets after link congestion
The current link implementation keeps a linked list of blocked ports/
sockets that is populated when there is link congestion. The purpose
of this is to let the link know which users to wake up when the
congestion abates.

This adds unnecessary complexity to the data structure and the code,
since it forces us to involve the link each time we want to delete
a socket. It also forces us to grab the spinlock port_lock within
the scope of node_lock. We want to get rid of this direct dependence,
as well as the deadlock hazard resulting from the usage of port_lock.

In this commit, we instead let the link keep a list of "wakeup" pseudo
messages for use in such situations. Those messages are sent to the
pending sockets via the ordinary message reception path, and wake up
the socket's owner when they are received.

This enables us to get rid of the 'waiting_ports' linked lists in struct
tipc_port that manifest this direct reference. As a consequence, we can
eliminate another BH entry into the socket, and hence the need to grab
port_lock. This is a further step in our effort to remove port_lock
altogether.

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
2014-08-23 11:18:33 -07:00

973 lines
24 KiB
C

/*
* net/tipc/bcast.c: TIPC broadcast code
*
* Copyright (c) 2004-2006, 2014, Ericsson AB
* Copyright (c) 2004, Intel Corporation.
* Copyright (c) 2005, 2010-2011, Wind River Systems
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* 3. Neither the names of the copyright holders nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* Alternatively, this software may be distributed under the terms of the
* GNU General Public License ("GPL") version 2 as published by the Free
* Software Foundation.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include "core.h"
#include "link.h"
#include "port.h"
#include "socket.h"
#include "msg.h"
#include "bcast.h"
#include "name_distr.h"
#define MAX_PKT_DEFAULT_MCAST 1500 /* bcast link max packet size (fixed) */
#define BCLINK_WIN_DEFAULT 20 /* bcast link window size (default) */
/* Index in bearer_list[] that holds the broadcast pseudo-bearer */
#define BCBEARER MAX_BEARERS
/**
* struct tipc_bcbearer_pair - a pair of bearers used by broadcast link
* @primary: pointer to primary bearer
* @secondary: pointer to secondary bearer (may be NULL; tipc_bcbearer_send()
*             then falls back to @primary)
*
* Bearers must have same priority and same set of reachable destinations
* to be paired. The pairs are (re)built by tipc_bcbearer_sort().
*/
struct tipc_bcbearer_pair {
struct tipc_bearer *primary;
struct tipc_bearer *secondary;
};
/**
* struct tipc_bcbearer - bearer used by broadcast link
* @bearer: (non-standard) broadcast bearer structure
* @media: (non-standard) broadcast media structure
* @bpairs: array of bearer pairs
* @bpairs_temp: temporary array of bearer pairs used by tipc_bcbearer_sort()
* @remains: temporary node map used by tipc_bcbearer_send()
* @remains_new: temporary node map used by tipc_bcbearer_send()
*
* Note: The fields labelled "temporary" are incorporated into the bearer
* to avoid consuming potentially limited stack space through the use of
* large local variables within multicast routines. Concurrent access is
* prevented through use of the broadcast link spinlock (see
* tipc_bclink_lock()).
*/
struct tipc_bcbearer {
struct tipc_bearer bearer;
struct tipc_media media;
struct tipc_bcbearer_pair bpairs[MAX_BEARERS];
struct tipc_bcbearer_pair bpairs_temp[TIPC_MAX_LINK_PRI + 1];
struct tipc_node_map remains;
struct tipc_node_map remains_new;
};
/**
* struct tipc_bclink - link used for broadcast messages
* @lock: spinlock governing access to structure
* @link: (non-standard) broadcast link structure
* @node: (non-standard) node structure representing b'cast link's peer node
* @flags: pending-action bits (e.g. TIPC_BCLINK_RESET) that are examined and
*         cleared in tipc_bclink_unlock()
* @bcast_nodes: map of broadcast-capable nodes
* @retransmit_to: node that most recently requested a retransmit
*
* Handles sequence numbering, fragmentation, bundling, etc.
*/
struct tipc_bclink {
spinlock_t lock;
struct tipc_link link;
struct tipc_node node;
unsigned int flags;
struct tipc_node_map bcast_nodes;
struct tipc_node *retransmit_to;
};
/* Broadcast pseudo-bearer; allocated in tipc_bclink_init() */
static struct tipc_bcbearer *bcbearer;
/* Broadcast link state; allocated in tipc_bclink_init() */
static struct tipc_bclink *bclink;
/* Shortcut to &bclink->link, set in tipc_bclink_init() */
static struct tipc_link *bcl;
const char tipc_bclink_name[] = "broadcast-link";
/* Node map helpers, defined near the end of this file */
static void tipc_nmap_diff(struct tipc_node_map *nm_a,
struct tipc_node_map *nm_b,
struct tipc_node_map *nm_diff);
static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node);
static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node);
/* Take the broadcast link spinlock (bottom halves disabled).
* Pair with tipc_bclink_unlock().
*/
static void tipc_bclink_lock(void)
{
spin_lock_bh(&bclink->lock);
}
/* Release the broadcast link spinlock.
 *
 * If TIPC_BCLINK_RESET was raised while the lock was held, the flag is
 * cleared and, once the lock has been dropped, all links of the node that
 * most recently requested a retransmission are reset.
 */
static void tipc_bclink_unlock(void)
{
	struct tipc_node *reset_node = NULL;

	if (unlikely(bclink->flags)) {
		if (bclink->flags & TIPC_BCLINK_RESET) {
			bclink->flags &= ~TIPC_BCLINK_RESET;
			reset_node = tipc_bclink_retransmit_to();
		}
	}
	spin_unlock_bh(&bclink->lock);

	/* The reset is performed only after the lock has been released */
	if (reset_node)
		tipc_link_reset_all(reset_node);
}
/* Return the (fixed) maximum packet size of the broadcast link */
uint tipc_bclink_get_mtu(void)
{
return MAX_PKT_DEFAULT_MCAST;
}
/* OR @flags into bclink->flags; the bits are acted upon in
* tipc_bclink_unlock(). No locking is done here.
* NOTE(review): presumably callers already hold the bclink lock - confirm.
*/
void tipc_bclink_set_flags(unsigned int flags)
{
bclink->flags |= flags;
}
/* Read the outstanding-ack counter stored in the buffer's control block
* 'handle' field (see bcbuf_set_acks()).
*/
static u32 bcbuf_acks(struct sk_buff *buf)
{
return (u32)(unsigned long)TIPC_SKB_CB(buf)->handle;
}
/* Store @acks as the outstanding-ack counter of @buf; the integer is kept
* in the pointer-typed 'handle' field of the buffer's control block.
*/
static void bcbuf_set_acks(struct sk_buff *buf, u32 acks)
{
TIPC_SKB_CB(buf)->handle = (void *)(unsigned long)acks;
}
static void bcbuf_decr_acks(struct sk_buff *buf)
{
bcbuf_set_acks(buf, bcbuf_acks(buf) - 1);
}
/* Add node @addr to the map of broadcast-capable nodes, under the bclink lock */
void tipc_bclink_add_node(u32 addr)
{
tipc_bclink_lock();
tipc_nmap_add(&bclink->bcast_nodes, addr);
tipc_bclink_unlock();
}
/* Remove node @addr from the map of broadcast-capable nodes, under the
* bclink lock
*/
void tipc_bclink_remove_node(u32 addr)
{
tipc_bclink_lock();
tipc_nmap_remove(&bclink->bcast_nodes, addr);
tipc_bclink_unlock();
}
/* Record the sequence number of the last packet sent on the broadcast link.
 *
 * The value is kept in bcl->fsm_msg_cnt: one less than the sequence number
 * of the packet at bcl->next_out if there is one, otherwise one less than
 * bcl->next_out_no.
 */
static void bclink_set_last_sent(void)
{
	if (!bcl->next_out) {
		bcl->fsm_msg_cnt = mod(bcl->next_out_no - 1);
		return;
	}
	bcl->fsm_msg_cnt = mod(buf_seqno(bcl->next_out) - 1);
}
/* Return the last-sent sequence number recorded by bclink_set_last_sent()
* (stored in bcl->fsm_msg_cnt)
*/
u32 tipc_bclink_get_last_sent(void)
{
return bcl->fsm_msg_cnt;
}
/* Advance @node's record of the peer's last sent broadcast sequence number
 * to @seqno, unless the recorded value is already ahead of it.
 */
static void bclink_update_last_sent(struct tipc_node *node, u32 seqno)
{
	if (less_eq(node->bclink.last_sent, seqno))
		node->bclink.last_sent = seqno;
}
/**
* tipc_bclink_retransmit_to - get most recent node to request retransmission
*
* Called with bclink_lock locked
*
* Returns the value of bclink->retransmit_to.
*/
struct tipc_node *tipc_bclink_retransmit_to(void)
{
return bclink->retransmit_to;
}
/**
 * bclink_retransmit_pkt - retransmit broadcast packets
 * @after: sequence number of last packet to *not* retransmit
 * @to: sequence number of last packet to retransmit
 *
 * Walks the send queue to the first packet whose sequence number is beyond
 * @after and asks the link to retransmit mod(@to - @after) packets from there.
 *
 * Called with bclink_lock locked
 */
static void bclink_retransmit_pkt(u32 after, u32 to)
{
	struct sk_buff *skb;

	for (skb = bcl->first_out; skb; skb = skb->next) {
		if (!less_eq(buf_seqno(skb), after))
			break;
	}
	tipc_link_retransmit(bcl, skb, mod(to - after));
}
/**
* tipc_bclink_acknowledge - handle acknowledgement of broadcast packets
* @n_ptr: node that sent acknowledgement info
* @acked: broadcast sequence # that has been acknowledged, or
*         INVALID_LINK_SEQ when contact with @n_ptr has been lost
*
* Decrements the outstanding-ack counter of every queued packet newly covered
* by @acked, releases packets whose counter reaches zero, and records @acked
* in @n_ptr->bclink.acked.
*
* Node is locked, bclink_lock unlocked.
*/
void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
{
struct sk_buff *crs;
struct sk_buff *next;
unsigned int released = 0;
tipc_bclink_lock();
/* Bail out if tx queue is empty (no clean up is required) */
crs = bcl->first_out;
if (!crs)
goto exit;
/* Determine which messages need to be acknowledged */
if (acked == INVALID_LINK_SEQ) {
/*
* Contact with specified node has been lost, so need to
* acknowledge sent messages only (if other nodes still exist)
* or both sent and unsent messages (otherwise)
*/
if (bclink->bcast_nodes.count)
acked = bcl->fsm_msg_cnt;
else
acked = bcl->next_out_no;
} else {
/*
* Bail out if specified sequence number does not correspond
* to a message that has been sent and not yet acknowledged
*/
if (less(acked, buf_seqno(crs)) ||
less(bcl->fsm_msg_cnt, acked) ||
less_eq(acked, n_ptr->bclink.acked))
goto exit;
}
/* Skip over packets that node has previously acknowledged */
while (crs && less_eq(buf_seqno(crs), n_ptr->bclink.acked))
crs = crs->next;
/* Update packets that node is now acknowledging */
while (crs && less_eq(buf_seqno(crs), acked)) {
next = crs->next;
if (crs != bcl->next_out)
bcbuf_decr_acks(crs);
else {
/* Packet is the one at bcl->next_out: zero its ack counter
* and move next_out past it so it is released below
*/
bcbuf_set_acks(crs, 0);
bcl->next_out = next;
bclink_set_last_sent();
}
/* No acks outstanding any more: unlink and free the packet */
if (bcbuf_acks(crs) == 0) {
bcl->first_out = next;
bcl->out_queue_size--;
kfree_skb(crs);
released = 1;
}
crs = next;
}
n_ptr->bclink.acked = acked;
/* Try resolving broadcast link congestion, if necessary */
if (unlikely(bcl->next_out)) {
tipc_link_push_queue(bcl);
bclink_set_last_sent();
}
/* Queue space was freed and senders are waiting: flag a wakeup */
if (unlikely(released && !skb_queue_empty(&bcl->waiting_sks)))
bclink->node.action_flags |= TIPC_WAKEUP_USERS;
exit:
tipc_bclink_unlock();
}
/**
 * tipc_bclink_update_link_state - update broadcast link state
 * @n_ptr: node whose broadcast reception state is updated
 * @last_sent: last broadcast sequence number reported as sent
 *
 * Updates the out-of-sync state for @n_ptr and, when a loss is considered
 * confirmed and no NACK was recently sent or seen, transmits a NACK over
 * the broadcast bearer.
 *
 * RCU and node lock set
 */
void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent)
{
	struct sk_buff *nack;
	struct tipc_msg *hdr;

	/* Ignore "stale" link state info */
	if (less_eq(last_sent, n_ptr->bclink.last_in))
		return;

	/* Update link synchronization state; quit if in sync */
	bclink_update_last_sent(n_ptr, last_sent);
	if (n_ptr->bclink.last_sent == n_ptr->bclink.last_in)
		return;

	/* Update out-of-sync state; quit if loss is still unconfirmed */
	n_ptr->bclink.oos_state++;
	if (n_ptr->bclink.oos_state == 1) {
		if (n_ptr->bclink.deferred_size < (TIPC_MIN_LINK_WIN / 2))
			return;
		n_ptr->bclink.oos_state++;
	}

	/* Don't NACK if one has been recently sent (or seen) */
	if (n_ptr->bclink.oos_state & 0x1)
		return;

	/* Build the NACK; silently give up if no buffer is available */
	nack = tipc_buf_acquire(INT_H_SIZE);
	if (!nack)
		return;

	hdr = buf_msg(nack);
	tipc_msg_init(hdr, BCAST_PROTOCOL, STATE_MSG,
		      INT_H_SIZE, n_ptr->addr);
	msg_set_non_seq(hdr, 1);
	msg_set_mc_netid(hdr, tipc_net_id);
	msg_set_bcast_ack(hdr, n_ptr->bclink.last_in);
	msg_set_bcgap_after(hdr, n_ptr->bclink.last_in);
	msg_set_bcgap_to(hdr, n_ptr->bclink.deferred_head
			 ? buf_seqno(n_ptr->bclink.deferred_head) - 1
			 : n_ptr->bclink.last_sent);

	/* Send the NACK and account for it under the bclink lock */
	tipc_bclink_lock();
	tipc_bearer_send(MAX_BEARERS, nack, NULL);
	bcl->stats.sent_nacks++;
	tipc_bclink_unlock();
	kfree_skb(nack);

	n_ptr->bclink.oos_state++;
}
/**
 * bclink_peek_nack - monitor retransmission requests sent by other nodes
 * @msg: NACK message addressed to another node
 *
 * Delay any upcoming NACK by this node if another node has already
 * requested the first message this node is going to ask for.
 */
static void bclink_peek_nack(struct tipc_msg *msg)
{
	struct tipc_node *peer;

	peer = tipc_node_find(msg_destnode(msg));
	if (unlikely(!peer))
		return;

	tipc_node_lock(peer);
	if (!peer->bclink.recv_permitted)
		goto out;
	if (peer->bclink.last_in == peer->bclink.last_sent)
		goto out;
	if (peer->bclink.last_in != msg_bcgap_after(msg))
		goto out;
	/* Same gap as ours was already requested: hold back our own NACK */
	peer->bclink.oos_state = 2;
out:
	tipc_node_unlock(peer);
}
/* tipc_bclink_xmit - broadcast buffer chain to all nodes in cluster
* and to identified node local sockets
* @buf: chain of buffers containing message
* Consumes the buffer chain, except when returning -ELINKCONG
* Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
*/
int tipc_bclink_xmit(struct sk_buff *buf)
{
int rc = 0;
int bc = 0;
struct sk_buff *clbuf;
/* Prepare clone of message for local node */
clbuf = tipc_msg_reassemble(buf);
if (unlikely(!clbuf)) {
kfree_skb_list(buf);
return -EHOSTUNREACH;
}
/* Broadcast to all other nodes */
if (likely(bclink)) {
tipc_bclink_lock();
if (likely(bclink->bcast_nodes.count)) {
rc = __tipc_link_xmit(bcl, buf);
if (likely(!rc)) {
bclink_set_last_sent();
bcl->stats.queue_sz_counts++;
bcl->stats.accu_queue_sz += bcl->out_queue_size;
}
bc = 1;
}
tipc_bclink_unlock();
}
if (unlikely(!bc))
kfree_skb_list(buf);
/* Deliver message clone */
if (likely(!rc))
tipc_sk_mcast_rcv(clbuf);
else
kfree_skb(clbuf);
return rc;
}
/**
 * bclink_accept_pkt - accept an incoming, in-sequence broadcast packet
 * @node: node the packet was received from
 * @seqno: sequence number of the accepted packet
 *
 * Called with both sending node's lock and bclink_lock taken.
 */
static void bclink_accept_pkt(struct tipc_node *node, u32 seqno)
{
	bclink_update_last_sent(node, seqno);
	node->bclink.last_in = seqno;
	node->bclink.oos_state = 0;
	bcl->stats.recv_info++;

	/*
	 * Unicast an ACK periodically, ensuring that
	 * all nodes in the cluster don't ACK at the same time
	 */
	if (((seqno - tipc_own_addr) % TIPC_MIN_LINK_WIN) != 0)
		return;

	tipc_link_proto_xmit(node->active_links[node->addr & 1],
			     STATE_MSG, 0, 0, 0, 0, 0);
	bcl->stats.sent_acks++;
}
/**
* tipc_bclink_rcv - receive a broadcast packet, and deliver upwards
* @buf: received buffer; on every path it is either handed on to a
*       delivery/defer/reassembly routine or released with kfree_skb() here
*
* Handles three cases: broadcast protocol (NACK) messages, in-sequence
* packets (delivered immediately, followed by any deferred packets that have
* become in-sequence), and out-of-sequence packets (deferred or counted as
* duplicates).
*
* RCU is locked, no other locks set
*/
void tipc_bclink_rcv(struct sk_buff *buf)
{
struct tipc_msg *msg = buf_msg(buf);
struct tipc_node *node;
u32 next_in;
u32 seqno;
int deferred = 0;
/* Screen out unwanted broadcast messages */
if (msg_mc_netid(msg) != tipc_net_id)
goto exit;
node = tipc_node_find(msg_prevnode(msg));
if (unlikely(!node))
goto exit;
tipc_node_lock(node);
if (unlikely(!node->bclink.recv_permitted))
goto unlock;
/* Handle broadcast protocol message */
if (unlikely(msg_user(msg) == BCAST_PROTOCOL)) {
if (msg_type(msg) != STATE_MSG)
goto unlock;
if (msg_destnode(msg) == tipc_own_addr) {
/* NACK addressed to this node: process the piggybacked ack,
* then retransmit the requested gap
*/
tipc_bclink_acknowledge(node, msg_bcast_ack(msg));
tipc_node_unlock(node);
tipc_bclink_lock();
bcl->stats.recv_nacks++;
bclink->retransmit_to = node;
bclink_retransmit_pkt(msg_bcgap_after(msg),
msg_bcgap_to(msg));
tipc_bclink_unlock();
} else {
/* NACK addressed to another node: just peek at it */
tipc_node_unlock(node);
bclink_peek_nack(msg);
}
goto exit;
}
/* Handle in-sequence broadcast message */
seqno = msg_seqno(msg);
next_in = mod(node->bclink.last_in + 1);
if (likely(seqno == next_in)) {
/* Re-entered via goto for reassembled fragments and for deferred
* packets that have become in-sequence; node lock is held here
*/
receive:
/* Deliver message to destination */
if (likely(msg_isdata(msg))) {
tipc_bclink_lock();
bclink_accept_pkt(node, seqno);
tipc_bclink_unlock();
tipc_node_unlock(node);
if (likely(msg_mcast(msg)))
tipc_sk_mcast_rcv(buf);
else
kfree_skb(buf);
} else if (msg_user(msg) == MSG_BUNDLER) {
tipc_bclink_lock();
bclink_accept_pkt(node, seqno);
bcl->stats.recv_bundles++;
bcl->stats.recv_bundled += msg_msgcnt(msg);
tipc_bclink_unlock();
tipc_node_unlock(node);
tipc_link_bundle_rcv(buf);
} else if (msg_user(msg) == MSG_FRAGMENTER) {
tipc_buf_append(&node->bclink.reasm_buf, &buf);
/* Neither a complete message nor a pending reassembly is left:
* leave without accepting the packet
*/
if (unlikely(!buf && !node->bclink.reasm_buf))
goto unlock;
tipc_bclink_lock();
bclink_accept_pkt(node, seqno);
bcl->stats.recv_fragments++;
if (buf) {
/* A buffer came back from tipc_buf_append(): process it
* as a received message in its own right
*/
bcl->stats.recv_fragmented++;
msg = buf_msg(buf);
tipc_bclink_unlock();
goto receive;
}
tipc_bclink_unlock();
tipc_node_unlock(node);
} else if (msg_user(msg) == NAME_DISTRIBUTOR) {
tipc_bclink_lock();
bclink_accept_pkt(node, seqno);
tipc_bclink_unlock();
tipc_node_unlock(node);
tipc_named_rcv(buf);
} else {
tipc_bclink_lock();
bclink_accept_pkt(node, seqno);
tipc_bclink_unlock();
tipc_node_unlock(node);
kfree_skb(buf);
}
/* Buffer has been passed on or freed above */
buf = NULL;
/* Determine new synchronization state */
tipc_node_lock(node);
if (unlikely(!tipc_node_is_up(node)))
goto unlock;
if (node->bclink.last_in == node->bclink.last_sent)
goto unlock;
if (!node->bclink.deferred_head) {
node->bclink.oos_state = 1;
goto unlock;
}
msg = buf_msg(node->bclink.deferred_head);
seqno = msg_seqno(msg);
next_in = mod(next_in + 1);
if (seqno != next_in)
goto unlock;
/* Take in-sequence message from deferred queue & deliver it */
buf = node->bclink.deferred_head;
node->bclink.deferred_head = buf->next;
buf->next = NULL;
node->bclink.deferred_size--;
goto receive;
}
/* Handle out-of-sequence broadcast message */
if (less(next_in, seqno)) {
deferred = tipc_link_defer_pkt(&node->bclink.deferred_head,
&node->bclink.deferred_tail,
buf);
node->bclink.deferred_size += deferred;
bclink_update_last_sent(node, seqno);
/* Ownership passed to tipc_link_defer_pkt() */
buf = NULL;
}
tipc_bclink_lock();
if (deferred)
bcl->stats.deferred_recv++;
else
bcl->stats.duplicates++;
tipc_bclink_unlock();
unlock:
tipc_node_unlock(node);
exit:
/* buf is NULL here whenever it has already been consumed */
kfree_skb(buf);
}
/* Tell whether @n_ptr still owes acknowledgements for broadcast packets.
 *
 * Returns 1 if broadcast reception is permitted for the node and its
 * acknowledged sequence number differs from the last one sent, else 0.
 */
u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr)
{
	if (!n_ptr->bclink.recv_permitted)
		return 0;

	return tipc_bclink_get_last_sent() != n_ptr->bclink.acked;
}
/**
* tipc_bcbearer_send - send a packet through the broadcast pseudo-bearer
* @buf: buffer to send
* @unused1: not used
* @unused2: not used
*
* Send packet over as many bearers as necessary to reach all nodes
* that have joined the broadcast link.
*
* Returns 0 (packet sent successfully) under all circumstances,
* since the broadcast link's pseudo-bearer never blocks
*/
static int tipc_bcbearer_send(struct sk_buff *buf, struct tipc_bearer *unused1,
struct tipc_media_addr *unused2)
{
int bp_index;
struct tipc_msg *msg = buf_msg(buf);
/* Prepare broadcast link message for reliable transmission,
* if first time trying to send it;
* preparation is skipped for broadcast link protocol messages
* since they are sent in an unreliable manner and don't need it
*/
if (likely(!msg_non_seq(buf_msg(buf)))) {
/* One ack is expected from every node currently in the map */
bcbuf_set_acks(buf, bclink->bcast_nodes.count);
msg_set_non_seq(msg, 1);
msg_set_mc_netid(msg, tipc_net_id);
bcl->stats.sent_info++;
/* Nothing is sent when there are no broadcast destinations */
if (WARN_ON(!bclink->bcast_nodes.count)) {
dump_stack();
return 0;
}
}
/* Send buffer over bearers until all targets reached */
bcbearer->remains = bclink->bcast_nodes;
for (bp_index = 0; bp_index < MAX_BEARERS; bp_index++) {
struct tipc_bearer *p = bcbearer->bpairs[bp_index].primary;
struct tipc_bearer *s = bcbearer->bpairs[bp_index].secondary;
struct tipc_bearer *bp[2] = {p, s};
/* The message's link selector picks primary or secondary */
struct tipc_bearer *b = bp[msg_link_selector(msg)];
struct sk_buff *tbuf;
if (!p)
break; /* No more bearers to try */
if (!b)
b = p;
/* remains_new = nodes still unreached after using bearer b */
tipc_nmap_diff(&bcbearer->remains, &b->nodes,
&bcbearer->remains_new);
if (bcbearer->remains_new.count == bcbearer->remains.count)
continue; /* Nothing added by bearer pair */
if (bp_index == 0) {
/* Use original buffer for first bearer */
tipc_bearer_send(b->identity, buf, &b->bcast_addr);
} else {
/* Avoid concurrent buffer access */
tbuf = pskb_copy_for_clone(buf, GFP_ATOMIC);
if (!tbuf)
break;
tipc_bearer_send(b->identity, tbuf, &b->bcast_addr);
kfree_skb(tbuf); /* Bearer keeps a clone */
}
if (bcbearer->remains_new.count == 0)
break; /* All targets reached */
bcbearer->remains = bcbearer->remains_new;
}
return 0;
}
/**
 * tipc_bcbearer_sort - create sets of bearer pairs used by broadcast bearer
 * @nm_ptr: node map to update before re-sorting
 * @node: node address to add to or remove from @nm_ptr
 * @action: true to add @node, false to remove it
 *
 * Rebuilds bcbearer->bpairs in descending priority order. Two bearers of
 * equal priority share one pair only when their node maps are equal;
 * otherwise each becomes the primary of its own pair.
 */
void tipc_bcbearer_sort(struct tipc_node_map *nm_ptr, u32 node, bool action)
{
	struct tipc_bcbearer_pair *by_prio = bcbearer->bpairs_temp;
	struct tipc_bcbearer_pair *slot;
	struct tipc_bearer *bearer;
	int idx;
	int prio;

	tipc_bclink_lock();

	if (action)
		tipc_nmap_add(nm_ptr, node);
	else
		tipc_nmap_remove(nm_ptr, node);

	/* Group bearers by priority (can assume max of two per priority) */
	memset(by_prio, 0, sizeof(bcbearer->bpairs_temp));

	rcu_read_lock();
	for (idx = 0; idx < MAX_BEARERS; idx++) {
		bearer = rcu_dereference_rtnl(bearer_list[idx]);
		if (!bearer || !bearer->nodes.count)
			continue;

		if (by_prio[bearer->priority].primary)
			by_prio[bearer->priority].secondary = bearer;
		else
			by_prio[bearer->priority].primary = bearer;
	}
	rcu_read_unlock();

	/* Create array of bearer pairs for broadcasting */
	memset(bcbearer->bpairs, 0, sizeof(bcbearer->bpairs));
	slot = bcbearer->bpairs;

	for (prio = TIPC_MAX_LINK_PRI; prio >= 0; prio--) {
		struct tipc_bearer *first = by_prio[prio].primary;
		struct tipc_bearer *second = by_prio[prio].secondary;

		if (!first)
			continue;

		slot->primary = first;
		if (second) {
			if (tipc_nmap_equal(&first->nodes, &second->nodes)) {
				slot->secondary = second;
			} else {
				/* Different reach: give it a pair of its own */
				slot++;
				slot->primary = second;
			}
		}
		slot++;
	}

	tipc_bclink_unlock();
}
/* Format the broadcast link statistics as text.
 * @buf: output buffer
 * @buf_size: size of @buf
 *
 * Returns the accumulated return values of the tipc_snprintf() calls,
 * or 0 if the broadcast link does not exist.
 */
int tipc_bclink_stats(char *buf, const u32 buf_size)
{
	struct tipc_stats *stats;
	int len;

	if (!bcl)
		return 0;

	tipc_bclink_lock();

	stats = &bcl->stats;

	len = tipc_snprintf(buf, buf_size, "Link <%s>\n"
			    " Window:%u packets\n",
			    bcl->name, bcl->queue_limit[0]);
	len += tipc_snprintf(buf + len, buf_size - len,
			     " RX packets:%u fragments:%u/%u bundles:%u/%u\n",
			     stats->recv_info, stats->recv_fragments,
			     stats->recv_fragmented, stats->recv_bundles,
			     stats->recv_bundled);
	len += tipc_snprintf(buf + len, buf_size - len,
			     " TX packets:%u fragments:%u/%u bundles:%u/%u\n",
			     stats->sent_info, stats->sent_fragments,
			     stats->sent_fragmented, stats->sent_bundles,
			     stats->sent_bundled);
	len += tipc_snprintf(buf + len, buf_size - len,
			     " RX naks:%u defs:%u dups:%u\n",
			     stats->recv_nacks, stats->deferred_recv,
			     stats->duplicates);
	len += tipc_snprintf(buf + len, buf_size - len,
			     " TX naks:%u acks:%u dups:%u\n",
			     stats->sent_nacks, stats->sent_acks,
			     stats->retransmitted);
	/* Average queue size is reported as 0 until a sample exists */
	len += tipc_snprintf(buf + len, buf_size - len,
			     " Congestion link:%u Send queue max:%u avg:%u\n",
			     stats->link_congs, stats->max_queue_sz,
			     stats->queue_sz_counts ?
			     (stats->accu_queue_sz / stats->queue_sz_counts) : 0);

	tipc_bclink_unlock();
	return len;
}
/* Zero the broadcast link statistics under the bclink lock.
* Returns 0 on success, -ENOPROTOOPT if the broadcast link does not exist.
*/
int tipc_bclink_reset_stats(void)
{
if (!bcl)
return -ENOPROTOOPT;
tipc_bclink_lock();
memset(&bcl->stats, 0, sizeof(bcl->stats));
tipc_bclink_unlock();
return 0;
}
/* Set the broadcast link window to @limit.
 *
 * Returns 0 on success, -ENOPROTOOPT if the broadcast link does not exist,
 * or -EINVAL if @limit is outside [TIPC_MIN_LINK_WIN, TIPC_MAX_LINK_WIN].
 */
int tipc_bclink_set_queue_limits(u32 limit)
{
	if (!bcl)
		return -ENOPROTOOPT;
	if (!((limit >= TIPC_MIN_LINK_WIN) && (limit <= TIPC_MAX_LINK_WIN)))
		return -EINVAL;

	tipc_bclink_lock();
	tipc_link_set_queue_limits(bcl, limit);
	tipc_bclink_unlock();

	return 0;
}
int tipc_bclink_init(void)
{
bcbearer = kzalloc(sizeof(*bcbearer), GFP_ATOMIC);
if (!bcbearer)
return -ENOMEM;
bclink = kzalloc(sizeof(*bclink), GFP_ATOMIC);
if (!bclink) {
kfree(bcbearer);
return -ENOMEM;
}
bcl = &bclink->link;
bcbearer->bearer.media = &bcbearer->media;
bcbearer->media.send_msg = tipc_bcbearer_send;
sprintf(bcbearer->media.name, "tipc-broadcast");
spin_lock_init(&bclink->lock);
__skb_queue_head_init(&bcl->waiting_sks);
bcl->next_out_no = 1;
spin_lock_init(&bclink->node.lock);
__skb_queue_head_init(&bclink->node.waiting_sks);
bcl->owner = &bclink->node;
bcl->max_pkt = MAX_PKT_DEFAULT_MCAST;
tipc_link_set_queue_limits(bcl, BCLINK_WIN_DEFAULT);
bcl->bearer_id = MAX_BEARERS;
rcu_assign_pointer(bearer_list[MAX_BEARERS], &bcbearer->bearer);
bcl->state = WORKING_WORKING;
strlcpy(bcl->name, tipc_bclink_name, TIPC_MAX_LINK_NAME);
return 0;
}
/* Tear down the broadcast link.
 *
 * Purges the link queues, unpublishes the pseudo-bearer from bearer_list[],
 * waits with synchronize_net() and then frees both structures. The
 * file-scope pointers are cleared afterwards so that the NULL checks
 * elsewhere in this file (e.g. "if (!bcl)", "if (bclink)") see the link as
 * absent rather than following freed memory.
 */
void tipc_bclink_stop(void)
{
	tipc_bclink_lock();
	tipc_link_purge_queues(bcl);
	tipc_bclink_unlock();

	RCU_INIT_POINTER(bearer_list[BCBEARER], NULL);
	synchronize_net();

	kfree(bcbearer);
	kfree(bclink);
	bcbearer = NULL;
	bclink = NULL;
	bcl = NULL;
}
/**
 * tipc_nmap_add - add a node to a node map
 * @nm_ptr: node map to update
 * @node: network address of the node to add
 *
 * Sets the bit for tipc_node(@node) and bumps the map's count; does nothing
 * if the node is already present.
 */
static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node)
{
	int n = tipc_node(node);
	int w = n / WSIZE;
	/* Unsigned constant: a signed "1 << 31" would be undefined behaviour */
	u32 mask = (1U << (n % WSIZE));

	if ((nm_ptr->map[w] & mask) == 0) {
		nm_ptr->count++;
		nm_ptr->map[w] |= mask;
	}
}
/**
 * tipc_nmap_remove - remove a node from a node map
 * @nm_ptr: node map to update
 * @node: network address of the node to remove
 *
 * Clears the bit for tipc_node(@node) and decrements the map's count; does
 * nothing if the node is not present.
 */
static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node)
{
	int n = tipc_node(node);
	int w = n / WSIZE;
	/* Unsigned constant: a signed "1 << 31" would be undefined behaviour */
	u32 mask = (1U << (n % WSIZE));

	if ((nm_ptr->map[w] & mask) != 0) {
		nm_ptr->map[w] &= ~mask;
		nm_ptr->count--;
	}
}
/**
 * tipc_nmap_diff - find differences between node maps
 * @nm_a: input node map A
 * @nm_b: input node map B
 * @nm_diff: output node map A-B (i.e. nodes of A that are not in B)
 *
 * @nm_diff is cleared first; its count is set to the number of bits in the
 * resulting map.
 */
static void tipc_nmap_diff(struct tipc_node_map *nm_a,
			   struct tipc_node_map *nm_b,
			   struct tipc_node_map *nm_diff)
{
	int stop = ARRAY_SIZE(nm_a->map);
	int w;
	int b;
	u32 map;

	memset(nm_diff, 0, sizeof(*nm_diff));
	for (w = 0; w < stop; w++) {
		/* Bits set in A but not in B */
		map = nm_a->map[w] ^ (nm_a->map[w] & nm_b->map[w]);
		nm_diff->map[w] = map;
		if (map != 0) {
			for (b = 0; b < WSIZE; b++) {
				/* 1U: shifting a signed 1 by 31 is undefined */
				if (map & (1U << b))
					nm_diff->count++;
			}
		}
	}
}
/**
* tipc_port_list_add - add a port to a port list, ensuring no duplicates
* @pl_ptr: head item of the port list; its 'count' field holds the total
*          number of ports stored across the whole chain
* @port: port reference to add
*
* Walks the chain item by item. If @port is already present nothing is
* changed. Otherwise it is stored in the first free slot, and a new item is
* allocated (GFP_ATOMIC) when the chain is full. On allocation failure a
* warning is logged and the port is not added.
*/
void tipc_port_list_add(struct tipc_port_list *pl_ptr, u32 port)
{
struct tipc_port_list *item = pl_ptr;
int i;
/* Number of used entries in the current item */
int item_sz = PLSIZE;
/* Number of stored ports not yet examined */
int cnt = pl_ptr->count;
for (; ; cnt -= item_sz, item = item->next) {
/* Last (partially filled) item: only 'cnt' entries are valid */
if (cnt < PLSIZE)
item_sz = cnt;
for (i = 0; i < item_sz; i++)
if (item->ports[i] == port)
return;
/* i now equals item_sz; if below PLSIZE, slot i is free */
if (i < PLSIZE) {
item->ports[i] = port;
pl_ptr->count++;
return;
}
/* Current item is full: extend the chain if it ends here */
if (!item->next) {
item->next = kmalloc(sizeof(*item), GFP_ATOMIC);
if (!item->next) {
pr_warn("Incomplete multicast delivery, no memory\n");
return;
}
item->next->next = NULL;
}
}
}
/**
 * tipc_port_list_free - free dynamically created entries in port_list chain
 * @pl_ptr: head item of the chain; the head itself is not freed
 */
void tipc_port_list_free(struct tipc_port_list *pl_ptr)
{
	struct tipc_port_list *cur = pl_ptr->next;

	while (cur) {
		struct tipc_port_list *following = cur->next;

		kfree(cur);
		cur = following;
	}
}