Revise message passing code.

More here
This commit is contained in:
Neil Brown 2008-07-12 20:27:40 +10:00
parent 4d43913ce0
commit bfa44e2e7a
5 changed files with 111 additions and 170 deletions

View File

@ -428,7 +428,7 @@ void manage(struct mdstat_ent *mdstat, struct supertype *container)
}
}
static int handle_message(struct supertype *container, struct md_message *msg)
static int handle_message(struct supertype *container, struct metadata_update *msg)
{
return -1;
}
@ -436,7 +436,7 @@ static int handle_message(struct supertype *container, struct md_message *msg)
void read_sock(struct supertype *container)
{
int fd;
struct md_message msg;
struct metadata_update msg;
int terminate = 0;
long fl;
int tmo = 3; /* 3 second timeout before hanging up the socket */
@ -450,21 +450,15 @@ void read_sock(struct supertype *container)
fcntl(fd, F_SETFL, fl);
do {
int err;
msg.buf = NULL;
/* read and validate the message */
if (receive_message(fd, &msg, tmo) == 0) {
err = handle_message(container, &msg);
if (!err)
ack(fd, msg.seq, tmo);
else
nack(fd, err, tmo);
} else {
handle_message(container, &msg);
if (ack(fd, tmo) < 0)
terminate = 1;
} else
terminate = 1;
nack(fd, -1, tmo);
}
if (msg.buf)
free(msg.buf);

View File

@ -177,6 +177,7 @@ int main(int argc, char *argv[])
struct mdinfo *mdi, *di;
struct supertype *container;
sigset_t set;
struct sigaction act;
if (argc != 2) {
fprintf(stderr, "Usage: md-manage /device/name/for/container\n");
@ -280,7 +281,11 @@ int main(int argc, char *argv[])
sigemptyset(&set);
sigaddset(&set, SIGUSR1);
sigprocmask(SIG_BLOCK, &set, NULL);
signal(SIGUSR1, wake_me);
act.sa_handler = wake_me;
act.sa_flags = 0;
sigaction(SIGUSR1, &act, NULL);
act.sa_handler = SIG_IGN;
sigaction(SIGPIPE, &act, NULL);
if (clone_monitor(container) < 0) {
fprintf(stderr, "md-manage: failed to start monitor process: %s\n",

View File

@ -457,7 +457,8 @@ static int wait_and_act(struct supertype *container, int nowait)
sigprocmask(SIG_UNBLOCK, NULL, &set);
sigdelset(&set, SIGUSR1);
rv = pselect(maxfd+1, &rfds, NULL, NULL, NULL, &set);
if (rv == -1 && errno == EINTR)
rv = 0;
#ifdef DEBUG
dprint_wake_reasons(&rfds);
#endif

233
msg.c
View File

@ -29,160 +29,113 @@
#include <sys/socket.h>
#include <sys/un.h>
#include "mdadm.h"
#include "mdmon.h"
enum tx_rx_state {
TX_RX_START,
TX_RX_SEQ,
TX_RX_NUM_BYTES,
TX_RX_BUF,
TX_RX_END,
TX_RX_SUCCESS,
TX_RX_ERR,
};
static const __u32 start_magic = 0x5a5aa5a5;
static const __u32 end_magic = 0xa5a55a5a;
const int start_magic = 0x5a5aa5a5;
const int end_magic = 0xa5a55a5a;
#define txrx(fd, buf, size, flags) (recv_send ? \
recv(fd, buf, size, flags) : \
send(fd, buf, size, flags))
/* non-blocking send/receive with n second timeout */
static enum tx_rx_state
tx_rx_message(int fd, struct md_message *msg, int recv_send, int tmo)
static int send_buf(int fd, const void* buf, int len, int tmo)
{
int d = recv_send ? 0 : start_magic;
int flags = recv_send ? 0 : MSG_NOSIGNAL;
enum tx_rx_state state = TX_RX_START;
void *buf = &d;
size_t size = sizeof(d);
off_t n = 0;
int rc;
int again;
fd_set set;
int rv;
struct timeval timeout = {tmo, 0};
struct timeval *ptmo = tmo ? &timeout : NULL;
do {
again = 0;
rc = txrx(fd, buf + n, size - n, flags);
if (rc <= 0) { /* error */
if (rc == -1 && errno == EAGAIN)
again = 1;
else
state = TX_RX_ERR;
} else if (rc + n == size) /* done */
switch (state) {
case TX_RX_START:
if (recv_send && d != start_magic)
state = TX_RX_ERR;
else {
state = TX_RX_SEQ;
buf = &msg->seq;
size = sizeof(msg->seq);
n = 0;
}
break;
case TX_RX_SEQ:
state = TX_RX_NUM_BYTES;
buf = &msg->num_bytes;
size = sizeof(msg->num_bytes);
n = 0;
break;
case TX_RX_NUM_BYTES:
if (msg->num_bytes >
1024*1024)
state = TX_RX_ERR;
else if (recv_send && msg->num_bytes) {
msg->buf = malloc(msg->num_bytes);
if (!msg->buf)
state = TX_RX_ERR;
else {
state = TX_RX_BUF;
buf = msg->buf;
size = msg->num_bytes;
n = 0;
}
} else if (!recv_send && msg->num_bytes) {
state = TX_RX_BUF;
buf = msg->buf;
size = msg->num_bytes;
n = 0;
} else {
d = recv_send ? 0 : end_magic;
state = TX_RX_END;
buf = &d;
size = sizeof(d);
n = 0;
}
break;
case TX_RX_BUF:
d = recv_send ? 0 : end_magic;
state = TX_RX_END;
buf = &d;
size = sizeof(d);
n = 0;
break;
case TX_RX_END:
if (recv_send && d != end_magic)
state = TX_RX_ERR;
else
state = TX_RX_SUCCESS;
break;
case TX_RX_ERR:
case TX_RX_SUCCESS:
break;
}
else /* continue */
n += rc;
while (len) {
FD_ZERO(&set);
FD_SET(fd, &set);
rv = select(fd+1, NULL, &set, NULL, ptmo);
if (rv <= 0)
return -1;
rv = write(fd, buf, len);
if (rv <= 0)
return -1;
len -= rv;
buf += rv;
}
return 0;
}
if (again) {
fd_set set;
struct timeval timeout = { tmo, 0 };
struct timeval *ptmo = tmo ? &timeout : NULL;
static int recv_buf(int fd, void* buf, int len, int tmo)
{
fd_set set;
int rv;
struct timeval timeout = {tmo, 0};
struct timeval *ptmo = tmo ? &timeout : NULL;
FD_ZERO(&set);
FD_SET(fd, &set);
while (len) {
FD_ZERO(&set);
FD_SET(fd, &set);
rv = select(fd+1, &set, NULL, NULL, ptmo);
if (rv <= 0)
return -1;
rv = read(fd, buf, len);
if (rv <= 0)
return -1;
len -= rv;
buf += rv;
}
return 0;
}
if (recv_send)
rc = select(fd + 1, &set, NULL, NULL, ptmo);
else
rc = select(fd + 1, NULL, &set, NULL, ptmo);
if (rc <= 0)
state = TX_RX_ERR;
int send_message(int fd, struct metadata_update *msg, int tmo)
{
__u32 len = msg->len;
int rv;
rv = send_buf(fd, &start_magic, 4, tmo);
rv = rv ?: send_buf(fd, &len, 4, tmo);
if (len)
rv = rv ?: send_buf(fd, msg->buf, msg->len, tmo);
rv = send_buf(fd, &end_magic, 4, tmo);
return rv;
}
int receive_message(int fd, struct metadata_update *msg, int tmo)
{
__u32 magic;
__u32 len;
int rv;
rv = recv_buf(fd, &magic, 4, tmo);
if (rv < 0 || magic != start_magic)
return -1;
rv = recv_buf(fd, &len, 4, tmo);
if (rv < 0 || len > MSG_MAX_LEN)
return -1;
if (len) {
msg->buf = malloc(len);
if (msg->buf == NULL)
return -1;
rv = recv_buf(fd, msg->buf, len, tmo);
if (rv < 0) {
free(msg->buf);
return -1;
}
} while (state < TX_RX_SUCCESS);
return state;
}
int receive_message(int fd, struct md_message *msg, int tmo)
{
if (tx_rx_message(fd, msg, 1, tmo) == TX_RX_SUCCESS)
return 0;
else
} else
msg->buf = NULL;
rv = recv_buf(fd, &magic, 4, tmo);
if (rv < 0 || magic != end_magic) {
free(msg->buf);
return -1;
}
msg->len = len;
return 0;
}
int send_message(int fd, struct md_message *msg, int tmo)
int ack(int fd, int tmo)
{
if (tx_rx_message(fd, msg, 0, tmo) == TX_RX_SUCCESS)
return 0;
else
return -1;
}
int ack(int fd, int seq, int tmo)
{
struct md_message msg = { .seq = seq, .num_bytes = 0 };
struct metadata_update msg = { .len = 0 };
return send_message(fd, &msg, tmo);
}
int nack(int fd, int err, int tmo)
int wait_reply(int fd, int tmo)
{
struct md_message msg = { .seq = err, .num_bytes = 0 };
return send_message(fd, &msg, tmo);
struct metadata_update msg;
return receive_message(fd, &msg, tmo);
}
int connect_monitor(char *devname)
@ -214,21 +167,17 @@ int connect_monitor(char *devname)
int ping_monitor(char *devname)
{
int sfd = connect_monitor(devname);
struct md_message msg;
int err = 0;
if (sfd < 0)
return sfd;
/* try to ping existing socket */
if (ack(sfd, 0, 0) != 0)
if (ack(sfd, 0) != 0)
err = -1;
/* check the reply */
if (!err && receive_message(sfd, &msg, 0) != 0)
err = -1;
if (msg.seq != 0)
if (!err && wait_reply(sfd, 0) != 0)
err = -1;
close(sfd);

20
msg.h
View File

@ -19,21 +19,13 @@
struct mdinfo;
struct metadata_update;
struct mdinfo;
struct md_message {
int seq;
int num_bytes;
void *buf;
};
extern const int start_magic;
extern const int end_magic;
extern int receive_message(int fd, struct md_message *msg, int tmo);
extern int send_message(int fd, struct md_message *msg, int tmo);
extern int ack(int fd, int seq, int tmo);
extern int nack(int fd, int err, int tmo);
extern int receive_message(int fd, struct metadata_update *msg, int tmo);
extern int send_message(int fd, struct metadata_update *msg, int tmo);
extern int ack(int fd, int tmo);
extern int wait_reply(int fd, int tmo);
extern int connect_monitor(char *devname);
extern int ping_monitor(char *devname);
#define MSG_MAX_LEN (4*1024*1024)