diff --git a/net/tipc/link.c b/net/tipc/link.c index 2ccdb6ffd5c8..d5f4005f388f 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -48,7 +48,7 @@ /* * Error message prefixes */ -static const char *link_co_err = "Link changeover error, "; +static const char *link_co_err = "Link tunneling error, "; static const char *link_rst_msg = "Resetting link "; static const char *link_unk_evt = "Unknown link event "; @@ -139,24 +139,6 @@ static void tipc_link_build_bcast_sync_msg(struct tipc_link *l, static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf); static int tipc_link_input(struct tipc_link *l, struct sk_buff *skb); static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb); -static int tipc_link_failover_rcv(struct tipc_link *l, struct sk_buff **skb); - -/* - * Simple link routines - */ -static unsigned int align(unsigned int i) -{ - return (i + 3) & ~3u; -} - -static struct tipc_link *tipc_parallel_link(struct tipc_link *l) -{ - struct tipc_node *n = l->owner; - - if (node_active_link(n, 0) != l) - return node_active_link(n, 0); - return node_active_link(n, 1); -} /* * Simple non-static link routines (i.e. referenced outside this file) @@ -394,12 +376,10 @@ static int tipc_link_fsm_evt(struct tipc_link *l, int evt, /* Perform actions as decided by FSM */ if (actions & LINK_RESET) { l->exec_mode = TIPC_LINK_BLOCKED; - rc |= TIPC_LINK_DOWN_EVT; - } - if (actions & LINK_ACTIVATE) { - l->exec_mode = TIPC_LINK_OPEN; - rc |= TIPC_LINK_UP_EVT; + rc = TIPC_LINK_DOWN_EVT; } + if (actions & LINK_ACTIVATE) + rc = TIPC_LINK_UP_EVT; if (actions & (SND_STATE | SND_PROBE)) mtyp = STATE_MSG; if (actions & SND_RESET) @@ -461,6 +441,9 @@ int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq) { int rc = 0; + if (l->exec_mode == TIPC_LINK_BLOCKED) + return rc; + link_profile_stats(l); if (l->silent_intv_cnt) rc = tipc_link_fsm_evt(l, SILENCE_EVT, xmitq); @@ -563,52 +546,42 @@ void tipc_link_purge_queues(struct tipc_link *l_ptr) tipc_link_reset_fragments(l_ptr); } -void tipc_link_reset(struct tipc_link *l_ptr) +void tipc_link_reset(struct tipc_link *l) { - u32 prev_state = l_ptr->state; - struct tipc_node *owner = l_ptr->owner; - struct tipc_link *pl = tipc_parallel_link(l_ptr); + struct tipc_node *owner = l->owner; - msg_set_session(l_ptr->pmsg, ((msg_session(l_ptr->pmsg) + 1) & 0xffff)); + l->state = TIPC_LINK_RESETTING; /* Link is down, accept any session */ - l_ptr->peer_session = WILDCARD_SESSION; + l->peer_session = WILDCARD_SESSION; + + /* If peer is up, it only accepts an incremented session number */ + msg_set_session(l->pmsg, msg_session(l->pmsg) + 1); /* Prepare for renewed mtu size negotiation */ - l_ptr->mtu = l_ptr->advertised_mtu; + l->mtu = l->advertised_mtu; - l_ptr->state = TIPC_LINK_RESETTING; - - if ((prev_state == TIPC_LINK_RESETTING) || - (prev_state == TIPC_LINK_ESTABLISHING)) - return; - - if (tipc_node_is_up(l_ptr->owner) && (pl != l_ptr)) { - l_ptr->exec_mode = TIPC_LINK_BLOCKED; - l_ptr->failover_checkpt = l_ptr->rcv_nxt; - pl->failover_pkts = FIRST_FAILOVER; - pl->failover_checkpt = l_ptr->rcv_nxt; - pl->failover_skb = l_ptr->reasm_buf; - } else { - kfree_skb(l_ptr->reasm_buf); - } /* Clean up all queues, except inputq: */ - __skb_queue_purge(&l_ptr->transmq); - __skb_queue_purge(&l_ptr->deferdq); + __skb_queue_purge(&l->transmq); + __skb_queue_purge(&l->deferdq); if (!owner->inputq) - owner->inputq = l_ptr->inputq; - skb_queue_splice_init(&l_ptr->wakeupq, owner->inputq); + owner->inputq = l->inputq; + skb_queue_splice_init(&l->wakeupq, owner->inputq); if (!skb_queue_empty(owner->inputq)) owner->action_flags |= TIPC_MSG_EVT; - tipc_link_purge_backlog(l_ptr); - l_ptr->reasm_buf = NULL; - l_ptr->rcv_unacked = 0; - l_ptr->snd_nxt = 1; - l_ptr->rcv_nxt = 1; - l_ptr->silent_intv_cnt = 0; - l_ptr->stats.recv_info = 0; - l_ptr->stale_count = 0; - link_reset_statistics(l_ptr); + + tipc_link_purge_backlog(l); + kfree_skb(l->reasm_buf); + kfree_skb(l->failover_reasm_skb); + l->reasm_buf = NULL; + l->failover_reasm_skb = NULL; + l->rcv_unacked = 0; + l->snd_nxt = 1; + l->rcv_nxt = 1; + l->silent_intv_cnt = 0; + l->stats.recv_info = 0; + l->stale_count = 0; + link_reset_statistics(l); } /** @@ -751,20 +724,6 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list, return 0; } -static void skb2list(struct sk_buff *skb, struct sk_buff_head *list) -{ - skb_queue_head_init(list); - __skb_queue_tail(list, skb); -} - -static int __tipc_link_xmit_skb(struct tipc_link *link, struct sk_buff *skb) -{ - struct sk_buff_head head; - - skb2list(skb, &head); - return __tipc_link_xmit(link->owner->net, link, &head); -} - /* * tipc_link_sync_rcv - synchronize broadcast link endpoints. * Receive the sequence number where we should start receiving and @@ -955,32 +914,6 @@ static int tipc_link_retransm(struct tipc_link *l, int retransm, return 0; } -/* link_synch(): check if all packets arrived before the synch - * point have been consumed - * Returns true if the parallel links are synched, otherwise false - */ -static bool link_synch(struct tipc_link *l) -{ - unsigned int post_synch; - struct tipc_link *pl; - - pl = tipc_parallel_link(l); - if (pl == l) - goto synched; - - /* Was last pre-synch packet added to input queue ? */ - if (less_eq(pl->rcv_nxt, l->synch_point)) - return false; - - /* Is it still in the input queue ? */ - post_synch = mod(pl->rcv_nxt - l->synch_point) - 1; - if (skb_queue_len(pl->inputq) > post_synch) - return false; -synched: - l->exec_mode = TIPC_LINK_OPEN; - return true; -} - /* tipc_data_input - deliver data and name distr msgs to upper layer * * Consumes buffer if message is of right type @@ -1025,54 +958,59 @@ static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb) /* tipc_link_input - process packet that has passed link protocol check * * Consumes buffer - * Node lock must be held */ -static int tipc_link_input(struct tipc_link *link, struct sk_buff *skb) +static int tipc_link_input(struct tipc_link *l, struct sk_buff *skb) { - struct tipc_node *node = link->owner; - struct tipc_msg *msg = buf_msg(skb); + struct tipc_node *node = l->owner; + struct tipc_msg *hdr = buf_msg(skb); + struct sk_buff **reasm_skb = &l->reasm_buf; struct sk_buff *iskb; - int pos = 0; + int usr = msg_user(hdr); int rc = 0; + int pos = 0; + int ipos = 0; - switch (msg_user(msg)) { - case TUNNEL_PROTOCOL: - if (msg_dup(msg)) { - link->exec_mode = TIPC_LINK_TUNNEL; - link->synch_point = msg_seqno(msg_get_wrapped(msg)); - kfree_skb(skb); - break; + if (unlikely(usr == TUNNEL_PROTOCOL)) { + if (msg_type(hdr) == SYNCH_MSG) { + __skb_queue_purge(&l->deferdq); + goto drop; } - rc |= tipc_link_failover_rcv(link, &skb); - if (!skb) - break; - if (msg_user(buf_msg(skb)) != MSG_BUNDLER) { - tipc_data_input(link, skb); - break; - } - case MSG_BUNDLER: - link->stats.recv_bundles++; - link->stats.recv_bundled += msg_msgcnt(msg); + if (!tipc_msg_extract(skb, &iskb, &ipos)) + return rc; + kfree_skb(skb); + skb = iskb; + hdr = buf_msg(skb); + if (less(msg_seqno(hdr), l->drop_point)) + goto drop; + if (tipc_data_input(l, skb)) + return rc; + usr = msg_user(hdr); + reasm_skb = &l->failover_reasm_skb; + } + if (usr == MSG_BUNDLER) { + l->stats.recv_bundles++; + l->stats.recv_bundled += msg_msgcnt(hdr); while (tipc_msg_extract(skb, &iskb, &pos)) - tipc_data_input(link, iskb); - break; - case MSG_FRAGMENTER: - link->stats.recv_fragments++; - if (tipc_buf_append(&link->reasm_buf, &skb)) { - link->stats.recv_fragmented++; - tipc_data_input(link, skb); - } else if (!link->reasm_buf) { - link->exec_mode = TIPC_LINK_BLOCKED; - rc |= TIPC_LINK_DOWN_EVT; + tipc_data_input(l, iskb); + return rc; + } else if (usr == MSG_FRAGMENTER) { + l->stats.recv_fragments++; + if (tipc_buf_append(reasm_skb, &skb)) { + l->stats.recv_fragmented++; + tipc_data_input(l, skb); + } else if (!*reasm_skb) { + l->exec_mode = TIPC_LINK_BLOCKED; + l->state = TIPC_LINK_RESETTING; + rc = TIPC_LINK_DOWN_EVT; } - break; - case BCAST_PROTOCOL: + return rc; + } else if (usr == BCAST_PROTOCOL) { tipc_link_sync_rcv(node, skb); - break; - default: - break; - }; + return rc; + } +drop: + kfree_skb(skb); return rc; } @@ -1100,7 +1038,6 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb, struct sk_buff_head *xmitq) { struct sk_buff_head *arrvq = &l->deferdq; - struct sk_buff *tmp; struct tipc_msg *hdr; u16 seqno, rcv_nxt; int rc = 0; @@ -1112,18 +1049,18 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb, return rc; } - skb_queue_walk_safe(arrvq, skb, tmp) { + while ((skb = skb_peek(arrvq))) { hdr = buf_msg(skb); /* Verify and update link state */ if (unlikely(msg_user(hdr) == LINK_PROTOCOL)) { __skb_dequeue(arrvq); - rc |= tipc_link_proto_rcv(l, skb, xmitq); + rc = tipc_link_proto_rcv(l, skb, xmitq); continue; } if (unlikely(!link_working(l))) { - rc |= tipc_link_fsm_evt(l, TRAFFIC_EVT, xmitq); + rc = tipc_link_fsm_evt(l, TRAFFIC_EVT, xmitq); if (!link_working(l)) { kfree_skb(__skb_dequeue(arrvq)); return rc; @@ -1156,18 +1093,11 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb, return rc; } - /* Synchronize with parallel link if applicable */ - if (unlikely(l->exec_mode == TIPC_LINK_TUNNEL)) - if (!msg_dup(hdr) && !link_synch(l)) { - kfree_skb(skb); - return rc; - } - /* Packet can be delivered */ l->rcv_nxt++; l->stats.recv_info++; if (unlikely(!tipc_data_input(l, skb))) - rc |= tipc_link_input(l, skb); + rc = tipc_link_input(l, skb); /* Ack at regular intervals */ if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN)) { @@ -1288,7 +1218,7 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe, } else { /* RESET_MSG or ACTIVATE_MSG */ msg_set_max_pkt(hdr, l->advertised_mtu); - msg_set_ack(hdr, l->failover_checkpt - 1); + msg_set_ack(hdr, l->rcv_nxt - 1); msg_set_next_sent(hdr, 1); } skb = tipc_buf_acquire(msg_size(hdr)); @@ -1296,223 +1226,75 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe, return; skb_copy_to_linear_data(skb, hdr, msg_size(hdr)); skb->priority = TC_PRIO_CONTROL; - __skb_queue_head(xmitq, skb); + __skb_queue_tail(xmitq, skb); } -/* tipc_link_tunnel_xmit(): Tunnel one packet via a link belonging to - * a different bearer. Owner node is locked. +/* tipc_link_tnl_prepare(): prepare and return a list of tunnel packets + * with contents of the link's tranmsit and backlog queues. */ -static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr, - struct tipc_msg *tunnel_hdr, - struct tipc_msg *msg, - u32 selector) +void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl, + int mtyp, struct sk_buff_head *xmitq) { - struct tipc_link *tunnel; - struct sk_buff *skb; - u32 length = msg_size(msg); + struct sk_buff *skb, *tnlskb; + struct tipc_msg *hdr, tnlhdr; + struct sk_buff_head *queue = &l->transmq; + struct sk_buff_head tmpxq, tnlq; + u16 pktlen, pktcnt, seqno = l->snd_nxt; - tunnel = node_active_link(l_ptr->owner, selector & 1); - if (!tipc_link_is_up(tunnel)) { - pr_warn("%stunnel link no longer available\n", link_co_err); + if (!tnl) return; - } - msg_set_size(tunnel_hdr, length + INT_H_SIZE); - skb = tipc_buf_acquire(length + INT_H_SIZE); + + skb_queue_head_init(&tnlq); + skb_queue_head_init(&tmpxq); + + /* At least one packet required for safe algorithm => add dummy */ + skb = tipc_msg_create(TIPC_LOW_IMPORTANCE, TIPC_DIRECT_MSG, + BASIC_H_SIZE, 0, l->addr, link_own_addr(l), + 0, 0, TIPC_ERR_NO_PORT); if (!skb) { - pr_warn("%sunable to send tunnel msg\n", link_co_err); + pr_warn("%sunable to create tunnel packet\n", link_co_err); return; } - skb_copy_to_linear_data(skb, tunnel_hdr, INT_H_SIZE); - skb_copy_to_linear_data_offset(skb, INT_H_SIZE, msg, length); - __tipc_link_xmit_skb(tunnel, skb); -} + skb_queue_tail(&tnlq, skb); + tipc_link_xmit(l, &tnlq, &tmpxq); + __skb_queue_purge(&tmpxq); - -/* tipc_link_failover_send_queue(): A link has gone down, but a second - * link is still active. We can do failover. Tunnel the failing link's - * whole send queue via the remaining link. This way, we don't lose - * any packets, and sequence order is preserved for subsequent traffic - * sent over the remaining link. Owner node is locked. - */ -void tipc_link_failover_send_queue(struct tipc_link *l_ptr) -{ - int msgcount; - struct tipc_link *tunnel = node_active_link(l_ptr->owner, 0); - struct tipc_msg tunnel_hdr; - struct sk_buff *skb; - int split_bundles; - - if (!tunnel) - return; - - tipc_msg_init(link_own_addr(l_ptr), &tunnel_hdr, TUNNEL_PROTOCOL, - FAILOVER_MSG, INT_H_SIZE, l_ptr->addr); - - skb_queue_walk(&l_ptr->backlogq, skb) { - msg_set_seqno(buf_msg(skb), l_ptr->snd_nxt); - l_ptr->snd_nxt = mod(l_ptr->snd_nxt + 1); - } - skb_queue_splice_tail_init(&l_ptr->backlogq, &l_ptr->transmq); - tipc_link_purge_backlog(l_ptr); - msgcount = skb_queue_len(&l_ptr->transmq); - msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id); - msg_set_msgcnt(&tunnel_hdr, msgcount); - - if (skb_queue_empty(&l_ptr->transmq)) { - skb = tipc_buf_acquire(INT_H_SIZE); - if (skb) { - skb_copy_to_linear_data(skb, &tunnel_hdr, INT_H_SIZE); - msg_set_size(&tunnel_hdr, INT_H_SIZE); - __tipc_link_xmit_skb(tunnel, skb); - } else { - pr_warn("%sunable to send changeover msg\n", - link_co_err); - } - return; - } - - split_bundles = (node_active_link(l_ptr->owner, 0) != - node_active_link(l_ptr->owner, 0)); - - skb_queue_walk(&l_ptr->transmq, skb) { - struct tipc_msg *msg = buf_msg(skb); - - if ((msg_user(msg) == MSG_BUNDLER) && split_bundles) { - struct tipc_msg *m = msg_get_wrapped(msg); - unchar *pos = (unchar *)m; - - msgcount = msg_msgcnt(msg); - while (msgcount--) { - msg_set_seqno(m, msg_seqno(msg)); - tipc_link_tunnel_xmit(l_ptr, &tunnel_hdr, m, - msg_link_selector(m)); - pos += align(msg_size(m)); - m = (struct tipc_msg *)pos; - } - } else { - tipc_link_tunnel_xmit(l_ptr, &tunnel_hdr, msg, - msg_link_selector(msg)); - } - } -} - -/* tipc_link_dup_queue_xmit(): A second link has become active. Tunnel a - * duplicate of the first link's send queue via the new link. This way, we - * are guaranteed that currently queued packets from a socket are delivered - * before future traffic from the same socket, even if this is using the - * new link. The last arriving copy of each duplicate packet is dropped at - * the receiving end by the regular protocol check, so packet cardinality - * and sequence order is preserved per sender/receiver socket pair. - * Owner node is locked. - */ -void tipc_link_dup_queue_xmit(struct tipc_link *link, - struct tipc_link *tnl) -{ - struct sk_buff *skb; - struct tipc_msg tnl_hdr; - struct sk_buff_head *queue = &link->transmq; - int mcnt; - u16 seqno; - - tipc_msg_init(link_own_addr(link), &tnl_hdr, TUNNEL_PROTOCOL, - SYNCH_MSG, INT_H_SIZE, link->addr); - mcnt = skb_queue_len(&link->transmq) + skb_queue_len(&link->backlogq); - msg_set_msgcnt(&tnl_hdr, mcnt); - msg_set_bearer_id(&tnl_hdr, link->peer_bearer_id); - -tunnel_queue: + /* Initialize reusable tunnel packet header */ + tipc_msg_init(link_own_addr(l), &tnlhdr, TUNNEL_PROTOCOL, + mtyp, INT_H_SIZE, l->addr); + pktcnt = skb_queue_len(&l->transmq) + skb_queue_len(&l->backlogq); + msg_set_msgcnt(&tnlhdr, pktcnt); + msg_set_bearer_id(&tnlhdr, l->peer_bearer_id); +tnl: + /* Wrap each packet into a tunnel packet */ skb_queue_walk(queue, skb) { - struct sk_buff *outskb; - struct tipc_msg *msg = buf_msg(skb); - u32 len = msg_size(msg); - - msg_set_ack(msg, mod(link->rcv_nxt - 1)); - msg_set_bcast_ack(msg, link->owner->bclink.last_in); - msg_set_size(&tnl_hdr, len + INT_H_SIZE); - outskb = tipc_buf_acquire(len + INT_H_SIZE); - if (outskb == NULL) { - pr_warn("%sunable to send duplicate msg\n", - link_co_err); + hdr = buf_msg(skb); + if (queue == &l->backlogq) + msg_set_seqno(hdr, seqno++); + pktlen = msg_size(hdr); + msg_set_size(&tnlhdr, pktlen + INT_H_SIZE); + tnlskb = tipc_buf_acquire(pktlen + INT_H_SIZE); + if (!tnlskb) { + pr_warn("%sunable to send packet\n", link_co_err); return; } - skb_copy_to_linear_data(outskb, &tnl_hdr, INT_H_SIZE); - skb_copy_to_linear_data_offset(outskb, INT_H_SIZE, - skb->data, len); - __tipc_link_xmit_skb(tnl, outskb); - if (!tipc_link_is_up(link)) - return; + skb_copy_to_linear_data(tnlskb, &tnlhdr, INT_H_SIZE); + skb_copy_to_linear_data_offset(tnlskb, INT_H_SIZE, hdr, pktlen); + __skb_queue_tail(&tnlq, tnlskb); } - if (queue == &link->backlogq) - return; - seqno = link->snd_nxt; - skb_queue_walk(&link->backlogq, skb) { - msg_set_seqno(buf_msg(skb), seqno); - seqno = mod(seqno + 1); + if (queue != &l->backlogq) { + queue = &l->backlogq; + goto tnl; } - queue = &link->backlogq; - goto tunnel_queue; -} -/* tipc_link_failover_rcv(): Receive a tunnelled FAILOVER_MSG packet - * Owner node is locked. - */ -static int tipc_link_failover_rcv(struct tipc_link *link, - struct sk_buff **skb) -{ - struct tipc_msg *msg = buf_msg(*skb); - struct sk_buff *iskb = NULL; - struct tipc_link *pl = NULL; - int bearer_id = msg_bearer_id(msg); - int pos = 0; - int rc = 0; + tipc_link_xmit(tnl, &tnlq, xmitq); - if (msg_type(msg) != FAILOVER_MSG) { - pr_warn("%sunknown tunnel pkt received\n", link_co_err); - goto exit; + if (mtyp == FAILOVER_MSG) { + tnl->drop_point = l->rcv_nxt; + tnl->failover_reasm_skb = l->reasm_buf; + l->reasm_buf = NULL; + l->exec_mode = TIPC_LINK_BLOCKED; } - if (bearer_id >= MAX_BEARERS) - goto exit; - - if (bearer_id == link->bearer_id) - goto exit; - - pl = link->owner->links[bearer_id].link; - - if (link->failover_pkts == FIRST_FAILOVER) - link->failover_pkts = msg_msgcnt(msg); - - /* Should we expect an inner packet? */ - if (!link->failover_pkts) - goto exit; - - if (!tipc_msg_extract(*skb, &iskb, &pos)) { - pr_warn("%sno inner failover pkt\n", link_co_err); - *skb = NULL; - goto exit; - } - link->failover_pkts--; - *skb = NULL; - - /* Was this packet already delivered? */ - if (less(buf_seqno(iskb), link->failover_checkpt)) { - kfree_skb(iskb); - iskb = NULL; - goto exit; - } - if (msg_user(buf_msg(iskb)) == MSG_FRAGMENTER) { - link->stats.recv_fragments++; - if (!tipc_buf_append(&link->failover_skb, &iskb) && - !link->failover_skb) { - link->exec_mode = TIPC_LINK_BLOCKED; - rc |= TIPC_LINK_DOWN_EVT; - } - } -exit: - if (!link->failover_pkts && pl) - pl->exec_mode = TIPC_LINK_OPEN; - kfree_skb(*skb); - *skb = iskb; - return rc; } /* tipc_link_proto_rcv(): receive link level protocol message : @@ -1593,7 +1375,7 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb, /* If NACK, retransmit will now start at right position */ if (nacked_gap) { - rc |= tipc_link_retransm(l, nacked_gap, xmitq); + rc = tipc_link_retransm(l, nacked_gap, xmitq); l->stats.recv_nacks++; } tipc_link_advance_backlog(l, xmitq); diff --git a/net/tipc/link.h b/net/tipc/link.h index bb1378b7cb59..e377d9ba41c5 100644 --- a/net/tipc/link.h +++ b/net/tipc/link.h @@ -164,13 +164,11 @@ struct tipc_link { struct tipc_msg *pmsg; u32 priority; char net_plane; - u8 exec_mode; - u16 synch_point; - /* Failover */ - u16 failover_pkts; - u16 failover_checkpt; - struct sk_buff *failover_skb; + /* Failover/synch */ + u8 exec_mode; + u16 drop_point; + struct sk_buff *failover_reasm_skb; /* Max packet negotiation */ u16 mtu; @@ -212,8 +210,8 @@ struct tipc_link *tipc_link_create(struct tipc_node *n, const struct tipc_media_addr *maddr, struct sk_buff_head *inputq, struct sk_buff_head *namedq); -void tipc_link_failover_send_queue(struct tipc_link *l_ptr); -void tipc_link_dup_queue_xmit(struct tipc_link *l_ptr, struct tipc_link *dest); +void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl, + int mtyp, struct sk_buff_head *xmitq); void tipc_link_reset_fragments(struct tipc_link *l_ptr); int tipc_link_is_up(struct tipc_link *l_ptr); int tipc_link_is_active(struct tipc_link *l_ptr); diff --git a/net/tipc/msg.h b/net/tipc/msg.h index 234fb0531d1d..115bb2aa6bed 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -110,7 +110,6 @@ struct tipc_skb_cb { struct sk_buff *tail; bool validated; bool wakeup_pending; - bool bundling; u16 chain_sz; u16 chain_imp; }; @@ -559,15 +558,6 @@ static inline void msg_set_node_capabilities(struct tipc_msg *m, u32 n) msg_set_bits(m, 1, 15, 0x1fff, n); } -static inline bool msg_dup(struct tipc_msg *m) -{ - if (likely(msg_user(m) != TUNNEL_PROTOCOL)) - return false; - if (msg_type(m) != SYNCH_MSG) - return false; - return true; -} - /* * Word 2 */ @@ -621,12 +611,12 @@ static inline void msg_set_fragm_no(struct tipc_msg *m, u32 n) } -static inline u32 msg_next_sent(struct tipc_msg *m) +static inline u16 msg_next_sent(struct tipc_msg *m) { return msg_bits(m, 4, 0, 0xffff); } -static inline void msg_set_next_sent(struct tipc_msg *m, u32 n) +static inline void msg_set_next_sent(struct tipc_msg *m, u16 n) { msg_set_bits(m, 4, 0, 0xffff, n); } @@ -727,12 +717,12 @@ static inline char *msg_media_addr(struct tipc_msg *m) /* * Word 9 */ -static inline u32 msg_msgcnt(struct tipc_msg *m) +static inline u16 msg_msgcnt(struct tipc_msg *m) { return msg_bits(m, 9, 16, 0xffff); } -static inline void msg_set_msgcnt(struct tipc_msg *m, u32 n) +static inline void msg_set_msgcnt(struct tipc_msg *m, u16 n) { msg_set_bits(m, 9, 16, 0xffff, n); } @@ -767,19 +757,19 @@ static inline void msg_set_link_tolerance(struct tipc_msg *m, u32 n) msg_set_bits(m, 9, 0, 0xffff, n); } -static inline bool msg_is_traffic(struct tipc_msg *m) +static inline bool msg_peer_link_is_up(struct tipc_msg *m) { if (likely(msg_user(m) != LINK_PROTOCOL)) return true; - if ((msg_type(m) == RESET_MSG) || (msg_type(m) == ACTIVATE_MSG)) - return false; - return true; + if (msg_type(m) == STATE_MSG) + return true; + return false; } -static inline bool msg_peer_is_up(struct tipc_msg *m) +static inline bool msg_peer_node_is_up(struct tipc_msg *m) { - if (likely(msg_is_traffic(m))) - return false; + if (msg_peer_link_is_up(m)) + return true; return msg_redundant_link(m); } diff --git a/net/tipc/node.c b/net/tipc/node.c index 6b18d73830ca..b0372bb107f6 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -42,6 +42,31 @@ #include "bcast.h" #include "discover.h" +/* Node FSM states and events: + */ +enum { + SELF_DOWN_PEER_DOWN = 0xdd, + SELF_UP_PEER_UP = 0xaa, + SELF_DOWN_PEER_LEAVING = 0xd1, + SELF_UP_PEER_COMING = 0xac, + SELF_COMING_PEER_UP = 0xca, + SELF_LEAVING_PEER_DOWN = 0x1d, + NODE_FAILINGOVER = 0xf0, + NODE_SYNCHING = 0xcc +}; + +enum { + SELF_ESTABL_CONTACT_EVT = 0xece, + SELF_LOST_CONTACT_EVT = 0x1ce, + PEER_ESTABL_CONTACT_EVT = 0x9ece, + PEER_LOST_CONTACT_EVT = 0x91ce, + NODE_FAILOVER_BEGIN_EVT = 0xfbe, + NODE_FAILOVER_END_EVT = 0xfee, + NODE_SYNCH_BEGIN_EVT = 0xcbe, + NODE_SYNCH_END_EVT = 0xcee +}; + +static void tipc_node_link_down(struct tipc_node *n, int bearer_id); static void node_lost_contact(struct tipc_node *n_ptr); static void node_established_contact(struct tipc_node *n_ptr); static void tipc_node_delete(struct tipc_node *node); @@ -281,69 +306,75 @@ static void tipc_node_timeout(unsigned long data) * * Link becomes active (alone or shared) or standby, depending on its priority. */ -void tipc_node_link_up(struct tipc_node *n, int bearer_id) +static void tipc_node_link_up(struct tipc_node *n, int bearer_id, + struct sk_buff_head *xmitq) { int *slot0 = &n->active_links[0]; int *slot1 = &n->active_links[1]; - struct tipc_link_entry *links = n->links; - struct tipc_link *l = n->links[bearer_id].link; - - /* Leave room for tunnel header when returning 'mtu' to users: */ - links[bearer_id].mtu = l->mtu - INT_H_SIZE; + struct tipc_link *ol = node_active_link(n, 0); + struct tipc_link *nl = n->links[bearer_id].link; + if (n->working_links > 1) { + pr_warn("Attempt to establish 3rd link to %x\n", n->addr); + return; + } n->working_links++; n->action_flags |= TIPC_NOTIFY_LINK_UP; - n->link_id = l->peer_bearer_id << 16 | l->bearer_id; + n->link_id = nl->peer_bearer_id << 16 | bearer_id; + + /* Leave room for tunnel header when returning 'mtu' to users: */ + n->links[bearer_id].mtu = nl->mtu - INT_H_SIZE; tipc_bearer_add_dest(n->net, bearer_id, n->addr); pr_debug("Established link <%s> on network plane %c\n", - l->name, l->net_plane); + nl->name, nl->net_plane); - /* No active links ? => take both active slots */ - if (!tipc_node_is_up(n)) { + /* First link? => give it both slots */ + if (!ol) { *slot0 = bearer_id; *slot1 = bearer_id; + nl->exec_mode = TIPC_LINK_OPEN; node_established_contact(n); return; } - /* Lower prio than current active ? => no slot */ - if (l->priority < links[*slot0].link->priority) { - pr_debug("New link <%s> becomes standby\n", l->name); - return; - } - tipc_link_dup_queue_xmit(links[*slot0].link, l); - - /* Same prio as current active ? => take one slot */ - if (l->priority == links[*slot0].link->priority) { + /* Second link => redistribute slots */ + if (nl->priority > ol->priority) { + pr_debug("Old link <%s> becomes standby\n", ol->name); *slot0 = bearer_id; - return; + *slot1 = bearer_id; + } else if (nl->priority == ol->priority) { + *slot0 = bearer_id; + } else { + pr_debug("New link <%s> is standby\n", nl->name); } - /* Higher prio than current active => take both active slots */ - pr_debug("Old link <%s> now standby\n", links[*slot0].link->name); - *slot0 = bearer_id; - *slot1 = bearer_id; + /* Prepare synchronization with first link */ + tipc_link_tnl_prepare(ol, nl, SYNCH_MSG, xmitq); } /** * tipc_node_link_down - handle loss of link */ -void tipc_node_link_down(struct tipc_node *n, int bearer_id) +static void tipc_node_link_down(struct tipc_node *n, int bearer_id) { int *slot0 = &n->active_links[0]; int *slot1 = &n->active_links[1]; + struct tipc_media_addr *maddr = &n->links[bearer_id].maddr; int i, highest = 0; - struct tipc_link *l, *_l; + struct tipc_link *l, *_l, *tnl; + struct sk_buff_head xmitq; l = n->links[bearer_id].link; if (!l || !tipc_link_is_up(l)) return; + __skb_queue_head_init(&xmitq); + n->working_links--; n->action_flags |= TIPC_NOTIFY_LINK_DOWN; - n->link_id = l->peer_bearer_id << 16 | l->bearer_id; + n->link_id = l->peer_bearer_id << 16 | bearer_id; tipc_bearer_remove_dest(n->net, l->bearer_id, n->addr); @@ -370,13 +401,19 @@ void tipc_node_link_down(struct tipc_node *n, int bearer_id) *slot1 = i; } - if (tipc_node_is_up(n)) - tipc_link_failover_send_queue(l); - - tipc_link_reset(l); - - if (!tipc_node_is_up(n)) + if (!tipc_node_is_up(n)) { + tipc_link_reset(l); node_lost_contact(n); + return; + } + + /* There is still a working link => initiate failover */ + tnl = node_active_link(n, 0); + tipc_node_fsm_evt(n, NODE_FAILOVER_BEGIN_EVT); + n->sync_point = tnl->rcv_nxt + (U16_MAX / 2 - 1); + tipc_link_tnl_prepare(l, tnl, FAILOVER_MSG, &xmitq); + tipc_link_reset(l); + tipc_bearer_xmit(n->net, tnl->bearer_id, &xmitq, maddr); } bool tipc_node_is_up(struct tipc_node *n) @@ -652,37 +689,22 @@ static void tipc_node_fsm_evt(struct tipc_node *n, int evt) pr_err("Illegal node fsm evt %x in state %x\n", evt, state); } -bool tipc_node_filter_skb(struct tipc_node *n, struct tipc_link *l, - struct tipc_msg *hdr) +bool tipc_node_filter_pkt(struct tipc_node *n, struct tipc_msg *hdr) { int state = n->state; if (likely(state == SELF_UP_PEER_UP)) return true; - if (state == SELF_DOWN_PEER_DOWN) - return true; - - if (state == SELF_UP_PEER_COMING) { - /* If not traffic msg, peer may still be ESTABLISHING */ - if (tipc_link_is_up(l) && msg_is_traffic(hdr)) - tipc_node_fsm_evt(n, PEER_ESTABL_CONTACT_EVT); - return true; - } - - if (state == SELF_COMING_PEER_UP) - return true; - if (state == SELF_LEAVING_PEER_DOWN) return false; if (state == SELF_DOWN_PEER_LEAVING) { - if (msg_peer_is_up(hdr)) + if (msg_peer_node_is_up(hdr)) return false; - tipc_node_fsm_evt(n, PEER_LOST_CONTACT_EVT); - return true; } - return false; + + return true; } static void node_established_contact(struct tipc_node *n_ptr) @@ -727,10 +749,8 @@ static void node_lost_contact(struct tipc_node *n_ptr) if (!l_ptr) continue; l_ptr->exec_mode = TIPC_LINK_OPEN; - l_ptr->failover_checkpt = 0; - l_ptr->failover_pkts = 0; - kfree_skb(l_ptr->failover_skb); - l_ptr->failover_skb = NULL; + kfree_skb(l_ptr->failover_reasm_skb); + l_ptr->failover_reasm_skb = NULL; tipc_link_reset_fragments(l_ptr); } /* Prevent re-contact with node until cleanup is done */ @@ -961,38 +981,111 @@ int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode, return 0; } -/* tipc_node_tnl_init(): handle a received TUNNEL_PROTOCOL packet, - * in order to control parallel link failover or synchronization +/** + * tipc_node_check_state - check and if necessary update node state + * @skb: TIPC packet + * @bearer_id: identity of bearer delivering the packet + * Returns true if state is ok, otherwise consumes buffer and returns false */ -static void tipc_node_tnl_init(struct tipc_node *n, int bearer_id, - struct sk_buff *skb) +static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb, + int bearer_id) { - struct tipc_link *tnl, *pl; struct tipc_msg *hdr = buf_msg(skb); + int usr = msg_user(hdr); + int mtyp = msg_type(hdr); u16 oseqno = msg_seqno(hdr); - int pb_id = msg_bearer_id(hdr); + u16 iseqno = msg_seqno(msg_get_wrapped(hdr)); + u16 exp_pkts = msg_msgcnt(hdr); + u16 rcv_nxt, syncpt, dlv_nxt; + int state = n->state; + struct tipc_link *l, *pl = NULL; + struct sk_buff_head; + int i; - if (pb_id >= MAX_BEARERS) - return; + l = n->links[bearer_id].link; + if (!l) + return false; + rcv_nxt = l->rcv_nxt; - tnl = n->links[bearer_id].link; - if (!tnl) - return; - /* Ignore if duplicate */ - if (less(oseqno, tnl->rcv_nxt)) - return; + if (likely((state == SELF_UP_PEER_UP) && (usr != TUNNEL_PROTOCOL))) + return true; - pl = n->links[pb_id].link; - if (!pl) - return; - - if (msg_type(hdr) == FAILOVER_MSG) { - if (tipc_link_is_up(pl)) { - tipc_node_link_down(n, pb_id); - pl->exec_mode = TIPC_LINK_BLOCKED; + /* Find parallel link, if any */ + for (i = 0; i < MAX_BEARERS; i++) { + if ((i != bearer_id) && n->links[i].link) { + pl = n->links[i].link; + break; } } + + /* Update node accesibility if applicable */ + if (state == SELF_UP_PEER_COMING) { + if (!tipc_link_is_up(l)) + return true; + if (!msg_peer_link_is_up(hdr)) + return true; + tipc_node_fsm_evt(n, PEER_ESTABL_CONTACT_EVT); + } + + if (state == SELF_DOWN_PEER_LEAVING) { + if (msg_peer_node_is_up(hdr)) + return false; + tipc_node_fsm_evt(n, PEER_LOST_CONTACT_EVT); + } + + /* Ignore duplicate packets */ + if (less(oseqno, rcv_nxt)) + return true; + + /* Initiate or update failover mode if applicable */ + if ((usr == TUNNEL_PROTOCOL) && (mtyp == FAILOVER_MSG)) { + syncpt = oseqno + exp_pkts - 1; + if (pl && tipc_link_is_up(pl)) { + tipc_node_link_down(n, pl->bearer_id); + pl->exec_mode = TIPC_LINK_BLOCKED; + } + /* If pkts arrive out of order, use lowest calculated syncpt */ + if (less(syncpt, n->sync_point)) + n->sync_point = syncpt; + } + + /* Open parallel link when tunnel link reaches synch point */ + if ((n->state == NODE_FAILINGOVER) && (more(rcv_nxt, n->sync_point))) { + tipc_node_fsm_evt(n, NODE_FAILOVER_END_EVT); + if (pl) + pl->exec_mode = TIPC_LINK_OPEN; + return true; + } + + /* Initiate or update synch mode if applicable */ + if ((usr == TUNNEL_PROTOCOL) && (mtyp == SYNCH_MSG)) { + syncpt = iseqno + exp_pkts - 1; + if (n->state == SELF_UP_PEER_UP) { + n->sync_point = syncpt; + tipc_node_fsm_evt(n, NODE_SYNCH_BEGIN_EVT); + } + l->exec_mode = TIPC_LINK_TUNNEL; + if (less(syncpt, n->sync_point)) + n->sync_point = syncpt; + } + + /* Open tunnel link when parallel link reaches synch point */ + if ((n->state == NODE_SYNCHING) && (l->exec_mode == TIPC_LINK_TUNNEL)) { + if (pl) + dlv_nxt = mod(pl->rcv_nxt - skb_queue_len(pl->inputq)); + if (!pl || more(dlv_nxt, n->sync_point)) { + tipc_node_fsm_evt(n, NODE_SYNCH_END_EVT); + l->exec_mode = TIPC_LINK_OPEN; + return true; + } + if ((usr == TUNNEL_PROTOCOL) && (mtyp == SYNCH_MSG)) + return true; + if (usr == LINK_PROTOCOL) + return true; + return false; + } + return true; } /** @@ -1008,12 +1101,11 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b) { struct sk_buff_head xmitq; struct tipc_node *n; - struct tipc_link *l; - struct tipc_msg *hdr; - struct tipc_media_addr *maddr; + struct tipc_msg *hdr = buf_msg(skb); + int usr = msg_user(hdr); int bearer_id = b->identity; + struct tipc_link_entry *le; int rc = 0; - int usr; __skb_queue_head_init(&xmitq); @@ -1022,8 +1114,6 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b) goto discard; /* Handle arrival of a non-unicast link packet */ - hdr = buf_msg(skb); - usr = msg_user(hdr); if (unlikely(msg_non_seq(hdr))) { if (usr == LINK_CONFIG) tipc_disc_rcv(net, skb, b); @@ -1036,42 +1126,41 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b) n = tipc_node_find(net, msg_prevnode(hdr)); if (unlikely(!n)) goto discard; + le = &n->links[bearer_id]; + tipc_node_lock(n); - /* Prepare links for tunneled reception if applicable */ - if (unlikely(usr == TUNNEL_PROTOCOL)) - tipc_node_tnl_init(n, bearer_id, skb); - - /* Locate link endpoint that should handle packet */ - l = n->links[bearer_id].link; - if (unlikely(!l)) + /* Is reception permitted at the moment ? */ + if (!tipc_node_filter_pkt(n, hdr)) goto unlock; - /* Is reception of this packet permitted at the moment ? */ - if (unlikely(n->state != SELF_UP_PEER_UP)) - if (!tipc_node_filter_skb(n, l, hdr)) - goto unlock; - - if (unlikely(usr == LINK_PROTOCOL)) + if (unlikely(msg_user(hdr) == LINK_PROTOCOL)) tipc_bclink_sync_state(n, hdr); /* Release acked broadcast messages */ if (unlikely(n->bclink.acked != msg_bcast_ack(hdr))) tipc_bclink_acknowledge(n, msg_bcast_ack(hdr)); - /* Check protocol and update link state */ - rc = tipc_link_rcv(l, skb, &xmitq); + /* Check and if necessary update node state */ + if (likely(tipc_node_check_state(n, skb, bearer_id))) { + rc = tipc_link_rcv(le->link, skb, &xmitq); + skb = NULL; + } if (unlikely(rc & TIPC_LINK_UP_EVT)) - tipc_node_link_up(n, bearer_id); + tipc_node_link_up(n, bearer_id, &xmitq); + if (unlikely(rc & TIPC_LINK_DOWN_EVT)) tipc_node_link_down(n, bearer_id); - skb = NULL; unlock: tipc_node_unlock(n); - tipc_sk_rcv(net, &n->links[bearer_id].inputq); - maddr = &n->links[bearer_id].maddr; - tipc_bearer_xmit(net, bearer_id, &xmitq, maddr); + + if (!skb_queue_empty(&le->inputq)) + tipc_sk_rcv(net, &le->inputq); + + if (!skb_queue_empty(&xmitq)) + tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr); + tipc_node_put(n); discard: kfree_skb(skb); diff --git a/net/tipc/node.h b/net/tipc/node.h index 65e2728f66a6..406c6fe0dbb2 100644 --- a/net/tipc/node.h +++ b/net/tipc/node.h @@ -47,33 +47,7 @@ #define INVALID_BEARER_ID -1 -/* Node FSM states and events: - */ -enum { - SELF_DOWN_PEER_DOWN = 0xdd, - SELF_UP_PEER_UP = 0xaa, - SELF_DOWN_PEER_LEAVING = 0xd1, - SELF_UP_PEER_COMING = 0xac, - SELF_COMING_PEER_UP = 0xca, - SELF_LEAVING_PEER_DOWN = 0x1d, - NODE_FAILINGOVER = 0xf0, - NODE_SYNCHING = 0xcc -}; - -enum { - SELF_ESTABL_CONTACT_EVT = 0xece, - SELF_LOST_CONTACT_EVT = 0x1ce, - PEER_ESTABL_CONTACT_EVT = 0xfece, - PEER_LOST_CONTACT_EVT = 0xf1ce, - NODE_FAILOVER_BEGIN_EVT = 0xfbe, - NODE_FAILOVER_END_EVT = 0xfee, - NODE_SYNCH_BEGIN_EVT = 0xcbe, - NODE_SYNCH_END_EVT = 0xcee -}; - /* Flags used to take different actions according to flag type - * TIPC_WAIT_PEER_LINKS_DOWN: wait to see that peer's links are down - * TIPC_WAIT_OWN_LINKS_DOWN: wait until peer node is declared down * TIPC_NOTIFY_NODE_DOWN: notify node is down * TIPC_NOTIFY_NODE_UP: notify node is up * TIPC_DISTRIBUTE_NAME: publish or withdraw link state name type @@ -133,6 +107,8 @@ struct tipc_link_entry { * @links: array containing references to all links to node * @action_flags: bit mask of different types of node actions * @bclink: broadcast-related info + * @state: connectivity state vs peer node + * @sync_point: sequence number where synch/failover is finished * @list: links to adjacent nodes in sorted list of cluster's nodes * @working_links: number of working links to node (both active and standby) * @link_cnt: number of links to node @@ -156,6 +132,7 @@ struct tipc_node { struct tipc_node_bclink bclink; struct list_head list; int state; + u16 sync_point; int link_cnt; u16 working_links; u16 capabilities; @@ -180,8 +157,6 @@ bool tipc_node_update_dest(struct tipc_node *n, struct tipc_bearer *bearer, void tipc_node_delete_links(struct net *net, int bearer_id); void tipc_node_attach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr); void tipc_node_detach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr); -void tipc_node_link_down(struct tipc_node *n_ptr, int bearer_id); -void tipc_node_link_up(struct tipc_node *n_ptr, int bearer_id); bool tipc_node_is_up(struct tipc_node *n); int tipc_node_get_linkname(struct net *net, u32 bearer_id, u32 node, char *linkname, size_t len);