Skip to content

Commit

Permalink
QIOChannel: Add flags on io_writev and introduce io_flush callback
Browse files Browse the repository at this point in the history
Add flags to io_writev and introduce io_flush as optional callback to
QIOChannelClass, allowing the implementation of zero copy writes by
subclasses.

How to use them:
- Write data using qio_channel_writev*(...,QIO_CHANNEL_WRITE_FLAG_ZERO_COPY),
- Wait write completion with qio_channel_flush().

Notes:
As some zero copy write implementations work asynchronously, it's
recommended to keep the write buffer untouched until the return of
qio_channel_flush(), to avoid the risk of sending an updated buffer
instead of the buffer state during write.

As io_flush callback is optional, if a subclass does not implement it, then:
- io_flush will return 0 without changing anything.

Also, some functions like qio_channel_writev_full_all() were adapted to
receive a flag parameter. That allows shared code between zero copy and
non-zero copy writev, and also an easier implementation on new flags.

Signed-off-by: Leonardo Bras <[email protected]>
Reviewed-by: Daniel P. Berrangé <[email protected]>
Reviewed-by: Peter Xu <[email protected]>
Reviewed-by: Juan Quintela <[email protected]>
Message-Id: <[email protected]>
Signed-off-by: Dr. David Alan Gilbert <[email protected]>
  • Loading branch information
Leonardo Bras authored and dagrh committed May 16, 2022
1 parent 354081d commit b88651c
Show file tree
Hide file tree
Showing 13 changed files with 88 additions and 14 deletions.
2 changes: 1 addition & 1 deletion chardev/char-io.c
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ int io_channel_send_full(QIOChannel *ioc,

ret = qio_channel_writev_full(
ioc, &iov, 1,
fds, nfds, NULL);
fds, nfds, 0, NULL);
if (ret == QIO_CHANNEL_ERR_BLOCK) {
if (offset) {
return offset;
Expand Down
2 changes: 1 addition & 1 deletion hw/remote/mpqemu-link.c
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ bool mpqemu_msg_send(MPQemuMsg *msg, QIOChannel *ioc, Error **errp)
}

if (!qio_channel_writev_full_all(ioc, send, G_N_ELEMENTS(send),
fds, nfds, errp)) {
fds, nfds, 0, errp)) {
ret = true;
} else {
trace_mpqemu_send_io_error(msg->cmd, msg->size, nfds);
Expand Down
38 changes: 37 additions & 1 deletion include/io/channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,15 @@ OBJECT_DECLARE_TYPE(QIOChannel, QIOChannelClass,

#define QIO_CHANNEL_ERR_BLOCK -2

#define QIO_CHANNEL_WRITE_FLAG_ZERO_COPY 0x1

typedef enum QIOChannelFeature QIOChannelFeature;

enum QIOChannelFeature {
QIO_CHANNEL_FEATURE_FD_PASS,
QIO_CHANNEL_FEATURE_SHUTDOWN,
QIO_CHANNEL_FEATURE_LISTEN,
QIO_CHANNEL_FEATURE_WRITE_ZERO_COPY,
};


Expand Down Expand Up @@ -104,6 +107,7 @@ struct QIOChannelClass {
size_t niov,
int *fds,
size_t nfds,
int flags,
Error **errp);
ssize_t (*io_readv)(QIOChannel *ioc,
const struct iovec *iov,
Expand Down Expand Up @@ -136,6 +140,8 @@ struct QIOChannelClass {
IOHandler *io_read,
IOHandler *io_write,
void *opaque);
int (*io_flush)(QIOChannel *ioc,
Error **errp);
};

/* General I/O handling functions */
Expand Down Expand Up @@ -228,6 +234,7 @@ ssize_t qio_channel_readv_full(QIOChannel *ioc,
* @niov: the length of the @iov array
* @fds: an array of file handles to send
* @nfds: number of file handles in @fds
* @flags: write flags (QIO_CHANNEL_WRITE_FLAG_*)
* @errp: pointer to a NULL-initialized error object
*
* Write data to the IO channel, reading it from the
Expand Down Expand Up @@ -260,6 +267,7 @@ ssize_t qio_channel_writev_full(QIOChannel *ioc,
size_t niov,
int *fds,
size_t nfds,
int flags,
Error **errp);

/**
Expand Down Expand Up @@ -837,6 +845,7 @@ int qio_channel_readv_full_all(QIOChannel *ioc,
* @niov: the length of the @iov array
* @fds: an array of file handles to send
* @nfds: number of file handles in @fds
* @flags: write flags (QIO_CHANNEL_WRITE_FLAG_*)
* @errp: pointer to a NULL-initialized error object
*
*
Expand All @@ -846,13 +855,40 @@ int qio_channel_readv_full_all(QIOChannel *ioc,
* to be written, yielding from the current coroutine
* if required.
*
* If QIO_CHANNEL_WRITE_FLAG_ZERO_COPY is passed in flags,
* instead of waiting for all requested data to be written,
* this function will wait until it's all queued for writing.
* In this case, if the buffer gets changed between queueing and
* sending, the updated buffer will be sent. If this is not a
* desired behavior, it's suggested to call qio_channel_flush()
* before reusing the buffer.
*
* Returns: 0 if all bytes were written, or -1 on error
*/

int qio_channel_writev_full_all(QIOChannel *ioc,
const struct iovec *iov,
size_t niov,
int *fds, size_t nfds,
Error **errp);
int flags, Error **errp);

/**
* qio_channel_flush:
* @ioc: the channel object
* @errp: pointer to a NULL-initialized error object
*
* Will block until every packet queued with
* qio_channel_writev_full() + QIO_CHANNEL_WRITE_FLAG_ZERO_COPY
* is sent, or return in case of any error.
*
* If not implemented, acts as a no-op, and returns 0.
*
* Returns -1 if any error is found,
* 1 if every send failed to use zero copy.
* 0 otherwise.
*/

int qio_channel_flush(QIOChannel *ioc,
Error **errp);

#endif /* QIO_CHANNEL_H */
1 change: 1 addition & 0 deletions io/channel-buffer.c
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ static ssize_t qio_channel_buffer_writev(QIOChannel *ioc,
size_t niov,
int *fds,
size_t nfds,
int flags,
Error **errp)
{
QIOChannelBuffer *bioc = QIO_CHANNEL_BUFFER(ioc);
Expand Down
1 change: 1 addition & 0 deletions io/channel-command.c
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ static ssize_t qio_channel_command_writev(QIOChannel *ioc,
size_t niov,
int *fds,
size_t nfds,
int flags,
Error **errp)
{
QIOChannelCommand *cioc = QIO_CHANNEL_COMMAND(ioc);
Expand Down
1 change: 1 addition & 0 deletions io/channel-file.c
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ static ssize_t qio_channel_file_writev(QIOChannel *ioc,
size_t niov,
int *fds,
size_t nfds,
int flags,
Error **errp)
{
QIOChannelFile *fioc = QIO_CHANNEL_FILE(ioc);
Expand Down
2 changes: 2 additions & 0 deletions io/channel-socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,7 @@ static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
size_t niov,
int *fds,
size_t nfds,
int flags,
Error **errp)
{
QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
Expand Down Expand Up @@ -619,6 +620,7 @@ static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
size_t niov,
int *fds,
size_t nfds,
int flags,
Error **errp)
{
QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
Expand Down
1 change: 1 addition & 0 deletions io/channel-tls.c
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ static ssize_t qio_channel_tls_writev(QIOChannel *ioc,
size_t niov,
int *fds,
size_t nfds,
int flags,
Error **errp)
{
QIOChannelTLS *tioc = QIO_CHANNEL_TLS(ioc);
Expand Down
1 change: 1 addition & 0 deletions io/channel-websock.c
Original file line number Diff line number Diff line change
Expand Up @@ -1127,6 +1127,7 @@ static ssize_t qio_channel_websock_writev(QIOChannel *ioc,
size_t niov,
int *fds,
size_t nfds,
int flags,
Error **errp)
{
QIOChannelWebsock *wioc = QIO_CHANNEL_WEBSOCK(ioc);
Expand Down
49 changes: 39 additions & 10 deletions io/channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -72,18 +72,32 @@ ssize_t qio_channel_writev_full(QIOChannel *ioc,
size_t niov,
int *fds,
size_t nfds,
int flags,
Error **errp)
{
QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);

if ((fds || nfds) &&
!qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS)) {
if (fds || nfds) {
if (!qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS)) {
error_setg_errno(errp, EINVAL,
"Channel does not support file descriptor passing");
return -1;
}
if (flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) {
error_setg_errno(errp, EINVAL,
"Zero Copy does not support file descriptor passing");
return -1;
}
}

if ((flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) &&
!qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_WRITE_ZERO_COPY)) {
error_setg_errno(errp, EINVAL,
"Channel does not support file descriptor passing");
"Requested Zero Copy feature is not available");
return -1;
}

return klass->io_writev(ioc, iov, niov, fds, nfds, errp);
return klass->io_writev(ioc, iov, niov, fds, nfds, flags, errp);
}


Expand Down Expand Up @@ -217,14 +231,14 @@ int qio_channel_writev_all(QIOChannel *ioc,
size_t niov,
Error **errp)
{
return qio_channel_writev_full_all(ioc, iov, niov, NULL, 0, errp);
return qio_channel_writev_full_all(ioc, iov, niov, NULL, 0, 0, errp);
}

int qio_channel_writev_full_all(QIOChannel *ioc,
const struct iovec *iov,
size_t niov,
int *fds, size_t nfds,
Error **errp)
int flags, Error **errp)
{
int ret = -1;
struct iovec *local_iov = g_new(struct iovec, niov);
Expand All @@ -237,8 +251,10 @@ int qio_channel_writev_full_all(QIOChannel *ioc,

while (nlocal_iov > 0) {
ssize_t len;
len = qio_channel_writev_full(ioc, local_iov, nlocal_iov, fds, nfds,
errp);

len = qio_channel_writev_full(ioc, local_iov, nlocal_iov, fds,
nfds, flags, errp);

if (len == QIO_CHANNEL_ERR_BLOCK) {
if (qemu_in_coroutine()) {
qio_channel_yield(ioc, G_IO_OUT);
Expand Down Expand Up @@ -277,7 +293,7 @@ ssize_t qio_channel_writev(QIOChannel *ioc,
size_t niov,
Error **errp)
{
return qio_channel_writev_full(ioc, iov, niov, NULL, 0, errp);
return qio_channel_writev_full(ioc, iov, niov, NULL, 0, 0, errp);
}


Expand All @@ -297,7 +313,7 @@ ssize_t qio_channel_write(QIOChannel *ioc,
Error **errp)
{
struct iovec iov = { .iov_base = (char *)buf, .iov_len = buflen };
return qio_channel_writev_full(ioc, &iov, 1, NULL, 0, errp);
return qio_channel_writev_full(ioc, &iov, 1, NULL, 0, 0, errp);
}


Expand Down Expand Up @@ -473,6 +489,19 @@ off_t qio_channel_io_seek(QIOChannel *ioc,
return klass->io_seek(ioc, offset, whence, errp);
}

int qio_channel_flush(QIOChannel *ioc,
Error **errp)
{
QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);

if (!klass->io_flush ||
!qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_WRITE_ZERO_COPY)) {
return 0;
}

return klass->io_flush(ioc, errp);
}


static void qio_channel_restart_read(void *opaque)
{
Expand Down
1 change: 1 addition & 0 deletions migration/rdma.c
Original file line number Diff line number Diff line change
Expand Up @@ -2840,6 +2840,7 @@ static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
size_t niov,
int *fds,
size_t nfds,
int flags,
Error **errp)
{
QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
Expand Down
2 changes: 1 addition & 1 deletion scsi/pr-manager-helper.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ static int pr_manager_helper_write(PRManagerHelper *pr_mgr,
iov.iov_base = (void *)buf;
iov.iov_len = sz;
n_written = qio_channel_writev_full(QIO_CHANNEL(pr_mgr->ioc), &iov, 1,
nfds ? &fd : NULL, nfds, errp);
nfds ? &fd : NULL, nfds, 0, errp);

if (n_written <= 0) {
assert(n_written != QIO_CHANNEL_ERR_BLOCK);
Expand Down
1 change: 1 addition & 0 deletions tests/unit/test-io-channel-socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,7 @@ static void test_io_channel_unix_fd_pass(void)
G_N_ELEMENTS(iosend),
fdsend,
G_N_ELEMENTS(fdsend),
0,
&error_abort);

qio_channel_readv_full(dst,
Expand Down

0 comments on commit b88651c

Please sign in to comment.