|
24 | 24 | #include "storage/procsignal.h"
|
25 | 25 | #include "storage/shm_mq.h"
|
26 | 26 | #include "storage/spin.h"
|
| 27 | +#include "utils/memutils.h" |
27 | 28 |
|
28 | 29 | /*
|
29 | 30 | * This structure represents the actual queue, stored in shared memory.
|
@@ -360,6 +361,13 @@ shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
|
360 | 361 | for (i = 0; i < iovcnt; ++i)
|
361 | 362 | nbytes += iov[i].len;
|
362 | 363 |
|
| 364 | + /* Prevent writing messages overwhelming the receiver. */ |
| 365 | + if (nbytes > MaxAllocSize) |
| 366 | + ereport(ERROR, |
| 367 | + (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), |
| 368 | + errmsg("cannot send a message of size %zu via shared memory queue", |
| 369 | + nbytes))); |
| 370 | + |
363 | 371 | /* Try to write, or finish writing, the length word into the buffer. */
|
364 | 372 | while (!mqh->mqh_length_word_complete)
|
365 | 373 | {
|
@@ -675,6 +683,17 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
|
675 | 683 | }
|
676 | 684 | nbytes = mqh->mqh_expected_bytes;
|
677 | 685 |
|
| 686 | + /* |
| 687 | + * Should be disallowed on the sending side already, but better check and |
| 688 | + * error out on the receiver side as well rather than trying to read a |
| 689 | + * prohibitively large message. |
| 690 | + */ |
| 691 | + if (nbytes > MaxAllocSize) |
| 692 | + ereport(ERROR, |
| 693 | + (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), |
| 694 | + errmsg("invalid message size %zu in shared memory queue", |
| 695 | + nbytes))); |
| 696 | + |
678 | 697 | if (mqh->mqh_partial_bytes == 0)
|
679 | 698 | {
|
680 | 699 | /*
|
@@ -703,8 +722,13 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
|
703 | 722 | {
|
704 | 723 | Size newbuflen = Max(mqh->mqh_buflen, MQH_INITIAL_BUFSIZE);
|
705 | 724 |
|
| 725 | + /* |
| 726 | + * Double the buffer size until the payload fits, but limit to |
| 727 | + * MaxAllocSize. |
| 728 | + */ |
706 | 729 | while (newbuflen < nbytes)
|
707 | 730 | newbuflen *= 2;
|
| 731 | + newbuflen = Min(newbuflen, MaxAllocSize); |
708 | 732 |
|
709 | 733 | if (mqh->mqh_buffer != NULL)
|
710 | 734 | {
|
|
0 commit comments