linux_dsm_epyc7002/fs/dlm/midcomms.c
Alexander Aring 4798cbbfbd fs: dlm: rework receive handling
This patch reworks the current receive handling of dlm. While changing
the send handling to fix reordering issues, I took a look at the receive
handling and simplified it. It works as follows:

Each connection has a preallocated receive buffer with a minimum length of
4096 bytes. On receive, the upper layer protocol processes dlm messages
until there is not enough data left for a complete message. If "leftover"
data remains at the end of the receive buffer because a dlm message wasn't
fully received, it is copied to the beginning of the preallocated receive
buffer. The next receive appends more data to the previous "leftover" data
and processing begins again.
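
The receive scheme described above can be sketched in userspace C. This is
a minimal illustration under stated assumptions, not the kernel code:
struct rx_conn, rx_feed(), process_buffer() and the toy framing (a 2-byte
little-endian length prefix that counts itself) are all hypothetical names
and choices made for the example.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define RX_BUF_SIZE 4096 /* hypothetical; stands in for "buffer_size" */
#define LEN_FIELD 2      /* toy framing: 2-byte LE length, counts itself */

struct rx_conn {
	unsigned char buf[RX_BUF_SIZE];
	size_t used;   /* bytes currently buffered, starting at buf[0] */
	int delivered; /* number of complete messages seen so far */
};

/* Returns the bytes consumed by complete messages, or -1 on a bad length. */
static int process_buffer(struct rx_conn *c)
{
	size_t off = 0;

	while (c->used - off >= LEN_FIELD) {
		size_t msglen = c->buf[off] | ((size_t)c->buf[off + 1] << 8);

		if (msglen < LEN_FIELD || msglen > RX_BUF_SIZE)
			return -1;
		if (msglen > c->used - off)
			break; /* partial message, wait for more data */
		c->delivered++;
		off += msglen;
	}
	return (int)off;
}

/* Append new bytes, parse, then move any leftover to the buffer start. */
static int rx_feed(struct rx_conn *c, const unsigned char *data, size_t len)
{
	int consumed;

	if (len > RX_BUF_SIZE - c->used)
		return -1;
	memcpy(c->buf + c->used, data, len);
	c->used += len;

	consumed = process_buffer(c);
	if (consumed < 0)
		return -1;

	c->used -= (size_t)consumed;
	if (c->used)
		memmove(c->buf, c->buf + consumed, c->used);
	return consumed;
}
```

Feeding a stream that is split in the middle of a message leaves the
partial bytes at the start of buf; the next rx_feed() call appends the
rest and the message is then counted as delivered.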

This removes a lot of code from the current mechanism. Inside the
processing functionality a memmove() ensures that the dlm message is
memory aligned. Having a dlm message always start at the beginning of
the buffer makes some of these memmove() calls trivial because the src
and dest pointers are the same.
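
The alignment step can be illustrated with a small userspace sketch. It
assumes a hypothetical union rx_buf whose byte array inherits the alignment
of a uint64_t, and hypothetical helpers read_le16() and move_to_front().
The explicit pointer comparison is an addition for illustration only; it
makes the "src and dest are the same" case visible.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical receive buffer; the union aligns bytes[] like a uint64_t. */
union rx_buf {
	uint64_t align;
	unsigned char bytes[64];
};

/* Read a little-endian 16-bit value byte by byte, safe at any address. */
static uint16_t read_le16(const unsigned char *p)
{
	return (uint16_t)(p[0] | (p[1] << 8));
}

/*
 * Move the message starting at ptr to the aligned start of the buffer.
 * memmove() is required because source and destination may overlap.
 * Returns 1 if bytes were moved, 0 if the message was already in place.
 */
static int move_to_front(union rx_buf *rb, const unsigned char *ptr,
			 size_t msglen)
{
	if (ptr == rb->bytes)
		return 0;
	memmove(rb->bytes, ptr, msglen);
	return 1;
}
```

A message that already starts at bytes[0] needs no copy, which is the case
the commit message refers to.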

The cluster attribute "buffer_size" gets a new meaning: it is now the
size of the application layer receive buffer. If it is changed during
runtime, the receive buffer will be reallocated. It's important that the
receive buffer is at least as large as the maximum possible dlm message
size, otherwise a received message cannot be placed inside the receive
buffer.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
Signed-off-by: David Teigland <teigland@redhat.com>
2020-09-29 14:00:32 -05:00


// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
** Copyright (C) 2004-2008 Red Hat, Inc. All rights reserved.
**
**
*******************************************************************************
******************************************************************************/

/*
 * midcomms.c
 *
 * This is the appallingly named "mid-level" comms layer.
 *
 * Its purpose is to take packets from the "real" comms layer,
 * split them up into packets and pass them to the interested
 * part of the locking mechanism.
 *
 * It also takes messages from the locking layer, formats them
 * into packets and sends them to the comms layer.
 */

#include <asm/unaligned.h>
#include "dlm_internal.h"
#include "lowcomms.h"
#include "config.h"
#include "lock.h"
#include "midcomms.h"

/*
 * Called from the low-level comms layer to process a buffer of
 * commands.
 */

int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len)
{
	const unsigned char *ptr = buf;
	const struct dlm_header *hd;
	uint16_t msglen;
	int ret = 0;

	while (len >= sizeof(struct dlm_header)) {
		hd = (struct dlm_header *)ptr;

		/* no message should be larger than DEFAULT_BUFFER_SIZE,
		 * otherwise we cannot deliver it to upper layers; a
		 * length smaller than the dlm header is invalid as well
		 * and would keep this loop from making progress
		 */
		msglen = get_unaligned_le16(&hd->h_length);
		if (msglen > DEFAULT_BUFFER_SIZE ||
		    msglen < sizeof(struct dlm_header)) {
			log_print("received invalid length header: %u, will abort message parsing",
				  msglen);
			return -EBADMSG;
		}

		/* caller will take care that leftover
		 * will be parsed next call with more data
		 */
		if (msglen > len)
			break;

		switch (hd->h_cmd) {
		case DLM_MSG:
			if (msglen < sizeof(struct dlm_message)) {
				log_print("dlm msg too small: %u, will skip this message",
					  msglen);
				goto skip;
			}

			break;
		case DLM_RCOM:
			if (msglen < sizeof(struct dlm_rcom)) {
				log_print("dlm rcom msg too small: %u, will skip this message",
					  msglen);
				goto skip;
			}

			break;
		default:
			log_print("unsupported h_cmd received: %u, will skip this message",
				  hd->h_cmd);
			goto skip;
		}

		/* for aligned memory access, we just copy the current
		 * message to the beginning of the buffer, which contains
		 * already parsed data. This should provide aligned access
		 * for upper layers because the start address of the
		 * buffer is aligned. This memmove can be removed when the
		 * upper layer is capable of unaligned memory access.
		 */
		memmove(buf, ptr, msglen);
		dlm_receive_buffer((union dlm_packet *)buf, nodeid);

skip:
		ret += msglen;
		len -= msglen;
		ptr += msglen;
	}

	return ret;
}