virtualx-engine/thirdparty/libwebsockets/roles/ws/ops-ws.c
2019-05-01 14:41:47 +02:00

1994 lines
51 KiB
C

/*
* libwebsockets - small server side websockets and web server implementation
*
* Copyright (C) 2010-2018 Andy Green <andy@warmcat.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation:
* version 2.1 of the License.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
* MA 02110-1301 USA
*/
#include <core/private.h>
#define LWS_CPYAPP(ptr, str) { strcpy(ptr, str); ptr += strlen(str); }
/*
* client-parser.c: lws_ws_client_rx_sm() needs to be roughly kept in
* sync with changes here, esp related to ext draining
*/
int
lws_ws_rx_sm(struct lws *wsi, char already_processed, unsigned char c)
{
int callback_action = LWS_CALLBACK_RECEIVE;
int ret = 0;
unsigned short close_code;
struct lws_tokens ebuf;
unsigned char *pp;
int n = 0;
#if !defined(LWS_WITHOUT_EXTENSIONS)
int rx_draining_ext = 0;
int lin;
#endif
ebuf.token = NULL;
ebuf.len = 0;
if (wsi->socket_is_permanently_unusable)
return -1;
switch (wsi->lws_rx_parse_state) {
case LWS_RXPS_NEW:
#if !defined(LWS_WITHOUT_EXTENSIONS)
if (wsi->ws->rx_draining_ext) {
ebuf.token = NULL;
ebuf.len = 0;
lws_remove_wsi_from_draining_ext_list(wsi);
rx_draining_ext = 1;
lwsl_debug("%s: doing draining flow\n", __func__);
goto drain_extension;
}
#endif
switch (wsi->ws->ietf_spec_revision) {
case 13:
/*
* no prepended frame key any more
*/
wsi->ws->all_zero_nonce = 1;
goto handle_first;
default:
lwsl_warn("lws_ws_rx_sm: unknown spec version %d\n",
wsi->ws->ietf_spec_revision);
break;
}
break;
case LWS_RXPS_04_mask_1:
wsi->ws->mask[1] = c;
if (c)
wsi->ws->all_zero_nonce = 0;
wsi->lws_rx_parse_state = LWS_RXPS_04_mask_2;
break;
case LWS_RXPS_04_mask_2:
wsi->ws->mask[2] = c;
if (c)
wsi->ws->all_zero_nonce = 0;
wsi->lws_rx_parse_state = LWS_RXPS_04_mask_3;
break;
case LWS_RXPS_04_mask_3:
wsi->ws->mask[3] = c;
if (c)
wsi->ws->all_zero_nonce = 0;
/*
* start from the zero'th byte in the XOR key buffer since
* this is the start of a frame with a new key
*/
wsi->ws->mask_idx = 0;
wsi->lws_rx_parse_state = LWS_RXPS_04_FRAME_HDR_1;
break;
/*
* 04 logical framing from the spec (all this is masked when incoming
* and has to be unmasked)
*
* We ignore the possibility of extension data because we don't
* negotiate any extensions at the moment.
*
* 0 1 2 3
* 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
* +-+-+-+-+-------+-+-------------+-------------------------------+
* |F|R|R|R| opcode|R| Payload len | Extended payload length |
* |I|S|S|S| (4) |S| (7) | (16/63) |
* |N|V|V|V| |V| | (if payload len==126/127) |
* | |1|2|3| |4| | |
* +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
* | Extended payload length continued, if payload len == 127 |
* + - - - - - - - - - - - - - - - +-------------------------------+
* | | Extension data |
* +-------------------------------+ - - - - - - - - - - - - - - - +
* : :
* +---------------------------------------------------------------+
* : Application data :
* +---------------------------------------------------------------+
*
* We pass payload through to userland as soon as we get it, ignoring
* FIN. It's up to userland to buffer it up if it wants to see a
* whole unfragmented block of the original size (which may be up to
* 2^63 long!)
*/
case LWS_RXPS_04_FRAME_HDR_1:
handle_first:
wsi->ws->opcode = c & 0xf;
wsi->ws->rsv = c & 0x70;
wsi->ws->final = !!((c >> 7) & 1);
wsi->ws->defeat_check_utf8 = 0;
if (((wsi->ws->opcode) & 8) && !wsi->ws->final) {
lws_close_reason(wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR,
(uint8_t *)"frag ctl", 8);
return -1;
}
switch (wsi->ws->opcode) {
case LWSWSOPC_TEXT_FRAME:
wsi->ws->check_utf8 = lws_check_opt(
wsi->context->options,
LWS_SERVER_OPTION_VALIDATE_UTF8);
/* fallthru */
case LWSWSOPC_BINARY_FRAME:
if (wsi->ws->opcode == LWSWSOPC_BINARY_FRAME)
wsi->ws->check_utf8 = 0;
if (wsi->ws->continuation_possible) {
lws_close_reason(wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, (uint8_t *)"bad cont", 8);
return -1;
}
wsi->ws->rsv_first_msg = (c & 0x70);
wsi->ws->frame_is_binary =
wsi->ws->opcode == LWSWSOPC_BINARY_FRAME;
wsi->ws->first_fragment = 1;
wsi->ws->continuation_possible = !wsi->ws->final;
break;
case LWSWSOPC_CONTINUATION:
if (!wsi->ws->continuation_possible) {
lws_close_reason(wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, (uint8_t *)"bad cont", 8);
return -1;
}
break;
case LWSWSOPC_CLOSE:
wsi->ws->check_utf8 = 0;
wsi->ws->utf8 = 0;
break;
case 3:
case 4:
case 5:
case 6:
case 7:
case 0xb:
case 0xc:
case 0xd:
case 0xe:
case 0xf:
lws_close_reason(wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, (uint8_t *)"bad opc", 7);
lwsl_info("illegal opcode\n");
return -1;
}
if (wsi->ws->owed_a_fin &&
(wsi->ws->opcode == LWSWSOPC_TEXT_FRAME ||
wsi->ws->opcode == LWSWSOPC_BINARY_FRAME)) {
lwsl_info("hey you owed us a FIN\n");
lws_close_reason(wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, (uint8_t *)"bad fin", 7);
return -1;
}
if ((!(wsi->ws->opcode & 8)) && wsi->ws->final) {
wsi->ws->continuation_possible = 0;
wsi->ws->owed_a_fin = 0;
}
if (!wsi->ws->final)
wsi->ws->owed_a_fin = 1;
wsi->lws_rx_parse_state = LWS_RXPS_04_FRAME_HDR_LEN;
if (wsi->ws->rsv &&
(
#if !defined(LWS_WITHOUT_EXTENSIONS)
!wsi->ws->count_act_ext ||
#endif
(wsi->ws->rsv & ~0x40))) {
lws_close_reason(wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR,
(uint8_t *)"rsv bits", 8);
return -1;
}
break;
case LWS_RXPS_04_FRAME_HDR_LEN:
wsi->ws->this_frame_masked = !!(c & 0x80);
switch (c & 0x7f) {
case 126:
/* control frames are not allowed to have big lengths */
if (wsi->ws->opcode & 8)
goto illegal_ctl_length;
wsi->lws_rx_parse_state = LWS_RXPS_04_FRAME_HDR_LEN16_2;
break;
case 127:
/* control frames are not allowed to have big lengths */
if (wsi->ws->opcode & 8)
goto illegal_ctl_length;
wsi->lws_rx_parse_state = LWS_RXPS_04_FRAME_HDR_LEN64_8;
break;
default:
wsi->ws->rx_packet_length = c & 0x7f;
if (wsi->ws->this_frame_masked)
wsi->lws_rx_parse_state =
LWS_RXPS_07_COLLECT_FRAME_KEY_1;
else
if (wsi->ws->rx_packet_length) {
wsi->lws_rx_parse_state =
LWS_RXPS_WS_FRAME_PAYLOAD;
} else {
wsi->lws_rx_parse_state = LWS_RXPS_NEW;
goto spill;
}
break;
}
break;
case LWS_RXPS_04_FRAME_HDR_LEN16_2:
wsi->ws->rx_packet_length = c << 8;
wsi->lws_rx_parse_state = LWS_RXPS_04_FRAME_HDR_LEN16_1;
break;
case LWS_RXPS_04_FRAME_HDR_LEN16_1:
wsi->ws->rx_packet_length |= c;
if (wsi->ws->this_frame_masked)
wsi->lws_rx_parse_state =
LWS_RXPS_07_COLLECT_FRAME_KEY_1;
else {
wsi->lws_rx_parse_state =
LWS_RXPS_WS_FRAME_PAYLOAD;
}
break;
case LWS_RXPS_04_FRAME_HDR_LEN64_8:
if (c & 0x80) {
lwsl_warn("b63 of length must be zero\n");
/* kill the connection */
return -1;
}
#if defined __LP64__
wsi->ws->rx_packet_length = ((size_t)c) << 56;
#else
wsi->ws->rx_packet_length = 0;
#endif
wsi->lws_rx_parse_state = LWS_RXPS_04_FRAME_HDR_LEN64_7;
break;
case LWS_RXPS_04_FRAME_HDR_LEN64_7:
#if defined __LP64__
wsi->ws->rx_packet_length |= ((size_t)c) << 48;
#endif
wsi->lws_rx_parse_state = LWS_RXPS_04_FRAME_HDR_LEN64_6;
break;
case LWS_RXPS_04_FRAME_HDR_LEN64_6:
#if defined __LP64__
wsi->ws->rx_packet_length |= ((size_t)c) << 40;
#endif
wsi->lws_rx_parse_state = LWS_RXPS_04_FRAME_HDR_LEN64_5;
break;
case LWS_RXPS_04_FRAME_HDR_LEN64_5:
#if defined __LP64__
wsi->ws->rx_packet_length |= ((size_t)c) << 32;
#endif
wsi->lws_rx_parse_state = LWS_RXPS_04_FRAME_HDR_LEN64_4;
break;
case LWS_RXPS_04_FRAME_HDR_LEN64_4:
wsi->ws->rx_packet_length |= ((size_t)c) << 24;
wsi->lws_rx_parse_state = LWS_RXPS_04_FRAME_HDR_LEN64_3;
break;
case LWS_RXPS_04_FRAME_HDR_LEN64_3:
wsi->ws->rx_packet_length |= ((size_t)c) << 16;
wsi->lws_rx_parse_state = LWS_RXPS_04_FRAME_HDR_LEN64_2;
break;
case LWS_RXPS_04_FRAME_HDR_LEN64_2:
wsi->ws->rx_packet_length |= ((size_t)c) << 8;
wsi->lws_rx_parse_state = LWS_RXPS_04_FRAME_HDR_LEN64_1;
break;
case LWS_RXPS_04_FRAME_HDR_LEN64_1:
wsi->ws->rx_packet_length |= ((size_t)c);
if (wsi->ws->this_frame_masked)
wsi->lws_rx_parse_state =
LWS_RXPS_07_COLLECT_FRAME_KEY_1;
else
wsi->lws_rx_parse_state = LWS_RXPS_WS_FRAME_PAYLOAD;
break;
case LWS_RXPS_07_COLLECT_FRAME_KEY_1:
wsi->ws->mask[0] = c;
if (c)
wsi->ws->all_zero_nonce = 0;
wsi->lws_rx_parse_state = LWS_RXPS_07_COLLECT_FRAME_KEY_2;
break;
case LWS_RXPS_07_COLLECT_FRAME_KEY_2:
wsi->ws->mask[1] = c;
if (c)
wsi->ws->all_zero_nonce = 0;
wsi->lws_rx_parse_state = LWS_RXPS_07_COLLECT_FRAME_KEY_3;
break;
case LWS_RXPS_07_COLLECT_FRAME_KEY_3:
wsi->ws->mask[2] = c;
if (c)
wsi->ws->all_zero_nonce = 0;
wsi->lws_rx_parse_state = LWS_RXPS_07_COLLECT_FRAME_KEY_4;
break;
case LWS_RXPS_07_COLLECT_FRAME_KEY_4:
wsi->ws->mask[3] = c;
if (c)
wsi->ws->all_zero_nonce = 0;
wsi->lws_rx_parse_state = LWS_RXPS_WS_FRAME_PAYLOAD;
wsi->ws->mask_idx = 0;
if (wsi->ws->rx_packet_length == 0) {
wsi->lws_rx_parse_state = LWS_RXPS_NEW;
goto spill;
}
break;
case LWS_RXPS_WS_FRAME_PAYLOAD:
assert(wsi->ws->rx_ubuf);
if (wsi->ws->rx_ubuf_head + LWS_PRE >= wsi->ws->rx_ubuf_alloc) {
lwsl_err("Attempted overflow \n");
return -1;
}
if (!(already_processed & ALREADY_PROCESSED_IGNORE_CHAR)) {
if (wsi->ws->all_zero_nonce)
wsi->ws->rx_ubuf[LWS_PRE + (wsi->ws->rx_ubuf_head++)] =
c;
else
wsi->ws->rx_ubuf[LWS_PRE + (wsi->ws->rx_ubuf_head++)] =
c ^ wsi->ws->mask[(wsi->ws->mask_idx++) & 3];
--wsi->ws->rx_packet_length;
}
if (!wsi->ws->rx_packet_length) {
lwsl_debug("%s: ws fragment length exhausted\n", __func__);
/* spill because we have the whole frame */
wsi->lws_rx_parse_state = LWS_RXPS_NEW;
goto spill;
}
#if !defined(LWS_WITHOUT_EXTENSIONS)
if (wsi->ws->rx_draining_ext) {
lwsl_debug("%s: UNTIL_EXHAUSTED draining\n", __func__);
goto drain_extension;
}
#endif
/*
* if there's no protocol max frame size given, we are
* supposed to default to context->pt_serv_buf_size
*/
if (!wsi->protocol->rx_buffer_size &&
wsi->ws->rx_ubuf_head != wsi->context->pt_serv_buf_size)
break;
if (wsi->protocol->rx_buffer_size &&
wsi->ws->rx_ubuf_head != wsi->protocol->rx_buffer_size)
break;
/* spill because we filled our rx buffer */
spill:
/*
* is this frame a control packet we should take care of at this
* layer? If so service it and hide it from the user callback
*/
lwsl_parser("spill on %s\n", wsi->protocol->name);
switch (wsi->ws->opcode) {
case LWSWSOPC_CLOSE:
if (wsi->ws->peer_has_sent_close)
break;
wsi->ws->peer_has_sent_close = 1;
pp = (unsigned char *)&wsi->ws->rx_ubuf[LWS_PRE];
if (lws_check_opt(wsi->context->options,
LWS_SERVER_OPTION_VALIDATE_UTF8) &&
wsi->ws->rx_ubuf_head > 2 &&
lws_check_utf8(&wsi->ws->utf8, pp + 2,
wsi->ws->rx_ubuf_head - 2))
goto utf8_fail;
/* is this an acknowledgment of our close? */
if (lwsi_state(wsi) == LRS_AWAITING_CLOSE_ACK) {
/*
* fine he has told us he is closing too, let's
* finish our close
*/
lwsl_parser("seen client close ack\n");
return -1;
}
if (lwsi_state(wsi) == LRS_RETURNED_CLOSE)
/* if he sends us 2 CLOSE, kill him */
return -1;
if (lws_partial_buffered(wsi)) {
/*
* if we're in the middle of something,
* we can't do a normal close response and
* have to just close our end.
*/
wsi->socket_is_permanently_unusable = 1;
lwsl_parser("Closing on peer close due to Pending tx\n");
return -1;
}
if (wsi->ws->rx_ubuf_head >= 2) {
close_code = (pp[0] << 8) | pp[1];
if (close_code < 1000 ||
close_code == 1004 ||
close_code == 1005 ||
close_code == 1006 ||
close_code == 1012 ||
close_code == 1013 ||
close_code == 1014 ||
close_code == 1015 ||
(close_code >= 1016 && close_code < 3000)
) {
pp[0] = (LWS_CLOSE_STATUS_PROTOCOL_ERR >> 8) & 0xff;
pp[1] = LWS_CLOSE_STATUS_PROTOCOL_ERR & 0xff;
}
}
if (user_callback_handle_rxflow(
wsi->protocol->callback, wsi,
LWS_CALLBACK_WS_PEER_INITIATED_CLOSE,
wsi->user_space,
&wsi->ws->rx_ubuf[LWS_PRE],
wsi->ws->rx_ubuf_head))
return -1;
lwsl_parser("server sees client close packet\n");
lwsi_set_state(wsi, LRS_RETURNED_CLOSE);
/* deal with the close packet contents as a PONG */
wsi->ws->payload_is_close = 1;
goto process_as_ping;
case LWSWSOPC_PING:
lwsl_info("received %d byte ping, sending pong\n",
wsi->ws->rx_ubuf_head);
if (wsi->ws->ping_pending_flag) {
/*
* there is already a pending ping payload
* we should just log and drop
*/
lwsl_parser("DROP PING since one pending\n");
goto ping_drop;
}
process_as_ping:
/* control packets can only be < 128 bytes long */
if (wsi->ws->rx_ubuf_head > 128 - 3) {
lwsl_parser("DROP PING payload too large\n");
goto ping_drop;
}
/* stash the pong payload */
memcpy(wsi->ws->ping_payload_buf + LWS_PRE,
&wsi->ws->rx_ubuf[LWS_PRE],
wsi->ws->rx_ubuf_head);
wsi->ws->ping_payload_len = wsi->ws->rx_ubuf_head;
wsi->ws->ping_pending_flag = 1;
/* get it sent as soon as possible */
lws_callback_on_writable(wsi);
ping_drop:
wsi->ws->rx_ubuf_head = 0;
return 0;
case LWSWSOPC_PONG:
lwsl_info("received pong\n");
lwsl_hexdump(&wsi->ws->rx_ubuf[LWS_PRE],
wsi->ws->rx_ubuf_head);
if (wsi->pending_timeout ==
PENDING_TIMEOUT_WS_PONG_CHECK_GET_PONG) {
lwsl_info("received expected PONG on wsi %p\n",
wsi);
lws_set_timeout(wsi, NO_PENDING_TIMEOUT, 0);
}
/* issue it */
callback_action = LWS_CALLBACK_RECEIVE_PONG;
break;
case LWSWSOPC_TEXT_FRAME:
case LWSWSOPC_BINARY_FRAME:
case LWSWSOPC_CONTINUATION:
break;
default:
lwsl_parser("unknown opc %x\n", wsi->ws->opcode);
return -1;
}
/*
* No it's real payload, pass it up to the user callback.
* It's nicely buffered with the pre-padding taken care of
* so it can be sent straight out again using lws_write
*/
ebuf.token = &wsi->ws->rx_ubuf[LWS_PRE];
ebuf.len = wsi->ws->rx_ubuf_head;
if (wsi->ws->opcode == LWSWSOPC_PONG && !ebuf.len)
goto already_done;
#if !defined(LWS_WITHOUT_EXTENSIONS)
drain_extension:
#endif
// lwsl_notice("%s: passing %d to ext\n", __func__, ebuf.len);
if (lwsi_state(wsi) == LRS_RETURNED_CLOSE ||
lwsi_state(wsi) == LRS_AWAITING_CLOSE_ACK)
goto already_done;
#if !defined(LWS_WITHOUT_EXTENSIONS)
lin = ebuf.len;
//if (lin)
// lwsl_hexdump_notice(ebuf.token, ebuf.len);
n = lws_ext_cb_active(wsi, LWS_EXT_CB_PAYLOAD_RX, &ebuf, 0);
lwsl_debug("%s: ext says %d / ebuf.len %d\n", __func__, n, ebuf.len);
if (wsi->ws->rx_draining_ext)
already_processed &= ~ALREADY_PROCESSED_NO_CB;
#endif
/*
* ebuf may be pointing somewhere completely different now,
* it's the output
*/
#if !defined(LWS_WITHOUT_EXTENSIONS)
if (n < 0) {
/*
* we may rely on this to get RX, just drop connection
*/
wsi->socket_is_permanently_unusable = 1;
return -1;
}
#endif
if (
#if !defined(LWS_WITHOUT_EXTENSIONS)
rx_draining_ext &&
#endif
ebuf.len == 0)
goto already_done;
if (
#if !defined(LWS_WITHOUT_EXTENSIONS)
n &&
#endif
ebuf.len)
/* extension had more... main loop will come back */
lws_add_wsi_to_draining_ext_list(wsi);
else
lws_remove_wsi_from_draining_ext_list(wsi);
if (wsi->ws->check_utf8 && !wsi->ws->defeat_check_utf8) {
if (lws_check_utf8(&wsi->ws->utf8,
(unsigned char *)ebuf.token,
ebuf.len)) {
lws_close_reason(wsi,
LWS_CLOSE_STATUS_INVALID_PAYLOAD,
(uint8_t *)"bad utf8", 8);
goto utf8_fail;
}
/* we are ending partway through utf-8 character? */
if (!wsi->ws->rx_packet_length && wsi->ws->final &&
wsi->ws->utf8 && !n) {
lwsl_info("FINAL utf8 error\n");
lws_close_reason(wsi,
LWS_CLOSE_STATUS_INVALID_PAYLOAD,
(uint8_t *)"partial utf8", 12);
utf8_fail:
lwsl_notice("utf8 error\n");
lwsl_hexdump_notice(ebuf.token, ebuf.len);
return -1;
}
}
if (!wsi->wsistate_pre_close && (ebuf.len >= 0 ||
callback_action == LWS_CALLBACK_RECEIVE_PONG)) {
if (ebuf.len)
ebuf.token[ebuf.len] = '\0';
if (wsi->protocol->callback &&
!(already_processed & ALREADY_PROCESSED_NO_CB)) {
if (callback_action == LWS_CALLBACK_RECEIVE_PONG)
lwsl_info("Doing pong callback\n");
ret = user_callback_handle_rxflow(
wsi->protocol->callback,
wsi, (enum lws_callback_reasons)
callback_action,
wsi->user_space,
ebuf.token,
ebuf.len);
}
wsi->ws->first_fragment = 0;
}
#if !defined(LWS_WITHOUT_EXTENSIONS)
if (!lin)
break;
#endif
already_done:
wsi->ws->rx_ubuf_head = 0;
break;
}
return ret;
illegal_ctl_length:
lwsl_warn("Control frame with xtended length is illegal\n");
/* kill the connection */
return -1;
}
LWS_VISIBLE size_t
lws_remaining_packet_payload(struct lws *wsi)
{
return wsi->ws->rx_packet_length;
}
LWS_VISIBLE int lws_frame_is_binary(struct lws *wsi)
{
return wsi->ws->frame_is_binary;
}
void
lws_add_wsi_to_draining_ext_list(struct lws *wsi)
{
#if !defined(LWS_WITHOUT_EXTENSIONS)
struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi];
if (wsi->ws->rx_draining_ext)
return;
lwsl_debug("%s: RX EXT DRAINING: Adding to list\n", __func__);
wsi->ws->rx_draining_ext = 1;
wsi->ws->rx_draining_ext_list = pt->ws.rx_draining_ext_list;
pt->ws.rx_draining_ext_list = wsi;
#endif
}
void
lws_remove_wsi_from_draining_ext_list(struct lws *wsi)
{
#if !defined(LWS_WITHOUT_EXTENSIONS)
struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi];
struct lws **w = &pt->ws.rx_draining_ext_list;
if (!wsi->ws->rx_draining_ext)
return;
lwsl_debug("%s: RX EXT DRAINING: Removing from list\n", __func__);
wsi->ws->rx_draining_ext = 0;
/* remove us from context draining ext list */
while (*w) {
if (*w == wsi) {
/* if us, point it instead to who we were pointing to */
*w = wsi->ws->rx_draining_ext_list;
break;
}
w = &((*w)->ws->rx_draining_ext_list);
}
wsi->ws->rx_draining_ext_list = NULL;
#endif
}
LWS_EXTERN void
lws_restart_ws_ping_pong_timer(struct lws *wsi)
{
if (!wsi->context->ws_ping_pong_interval ||
!lwsi_role_ws(wsi))
return;
wsi->ws->time_next_ping_check = (time_t)lws_now_secs();
}
static int
lws_0405_frame_mask_generate(struct lws *wsi)
{
int n;
/* fetch the per-frame nonce */
n = lws_get_random(lws_get_context(wsi), wsi->ws->mask, 4);
if (n != 4) {
lwsl_parser("Unable to read from random device %s %d\n",
SYSTEM_RANDOM_FILEPATH, n);
return 1;
}
/* start masking from first byte of masking key buffer */
wsi->ws->mask_idx = 0;
return 0;
}
int
lws_server_init_wsi_for_ws(struct lws *wsi)
{
int n;
lwsi_set_state(wsi, LRS_ESTABLISHED);
lws_restart_ws_ping_pong_timer(wsi);
/*
* create the frame buffer for this connection according to the
* size mentioned in the protocol definition. If 0 there, use
* a big default for compatibility
*/
n = (int)wsi->protocol->rx_buffer_size;
if (!n)
n = wsi->context->pt_serv_buf_size;
n += LWS_PRE;
wsi->ws->rx_ubuf = lws_malloc(n + 4 /* 0x0000ffff zlib */, "rx_ubuf");
if (!wsi->ws->rx_ubuf) {
lwsl_err("Out of Mem allocating rx buffer %d\n", n);
return 1;
}
wsi->ws->rx_ubuf_alloc = n;
lwsl_debug("Allocating RX buffer %d\n", n);
#if !defined(LWS_WITH_ESP32)
if (!wsi->parent_carries_io &&
!wsi->h2_stream_carries_ws)
if (setsockopt(wsi->desc.sockfd, SOL_SOCKET, SO_SNDBUF,
(const char *)&n, sizeof n)) {
lwsl_warn("Failed to set SNDBUF to %d", n);
return 1;
}
#endif
/* notify user code that we're ready to roll */
if (wsi->protocol->callback)
if (wsi->protocol->callback(wsi, LWS_CALLBACK_ESTABLISHED,
wsi->user_space,
#ifdef LWS_WITH_TLS
wsi->tls.ssl,
#else
NULL,
#endif
wsi->h2_stream_carries_ws))
return 1;
lwsl_debug("ws established\n");
return 0;
}
LWS_VISIBLE int
lws_is_final_fragment(struct lws *wsi)
{
#if !defined(LWS_WITHOUT_EXTENSIONS)
lwsl_debug("%s: final %d, rx pk length %ld, draining %ld\n", __func__,
wsi->ws->final, (long)wsi->ws->rx_packet_length,
(long)wsi->ws->rx_draining_ext);
return wsi->ws->final && !wsi->ws->rx_packet_length &&
!wsi->ws->rx_draining_ext;
#else
return wsi->ws->final && !wsi->ws->rx_packet_length;
#endif
}
LWS_VISIBLE int
lws_is_first_fragment(struct lws *wsi)
{
return wsi->ws->first_fragment;
}
LWS_VISIBLE unsigned char
lws_get_reserved_bits(struct lws *wsi)
{
return wsi->ws->rsv;
}
LWS_VISIBLE LWS_EXTERN int
lws_get_close_length(struct lws *wsi)
{
return wsi->ws->close_in_ping_buffer_len;
}
LWS_VISIBLE LWS_EXTERN unsigned char *
lws_get_close_payload(struct lws *wsi)
{
return &wsi->ws->ping_payload_buf[LWS_PRE];
}
LWS_VISIBLE LWS_EXTERN void
lws_close_reason(struct lws *wsi, enum lws_close_status status,
unsigned char *buf, size_t len)
{
unsigned char *p, *start;
int budget = sizeof(wsi->ws->ping_payload_buf) - LWS_PRE;
assert(lwsi_role_ws(wsi));
start = p = &wsi->ws->ping_payload_buf[LWS_PRE];
*p++ = (((int)status) >> 8) & 0xff;
*p++ = ((int)status) & 0xff;
if (buf)
while (len-- && p < start + budget)
*p++ = *buf++;
wsi->ws->close_in_ping_buffer_len = lws_ptr_diff(p, start);
}
static int
lws_is_ws_with_ext(struct lws *wsi)
{
#if defined(LWS_WITHOUT_EXTENSIONS)
return 0;
#else
return lwsi_role_ws(wsi) && !!wsi->ws->count_act_ext;
#endif
}
static int
rops_handle_POLLIN_ws(struct lws_context_per_thread *pt, struct lws *wsi,
struct lws_pollfd *pollfd)
{
struct lws_tokens ebuf;
unsigned int pending = 0;
char buffered = 0;
int n = 0, m;
#if defined(LWS_WITH_HTTP2)
struct lws *wsi1;
#endif
if (!wsi->ws) {
lwsl_err("ws role wsi with no ws\n");
return 1;
}
// lwsl_notice("%s: %s\n", __func__, wsi->protocol->name);
//lwsl_info("%s: wsistate 0x%x, pollout %d\n", __func__,
// wsi->wsistate, pollfd->revents & LWS_POLLOUT);
/*
* something went wrong with parsing the handshake, and
* we ended up back in the event loop without completing it
*/
if (lwsi_state(wsi) == LRS_PRE_WS_SERVING_ACCEPT) {
wsi->socket_is_permanently_unusable = 1;
return LWS_HPI_RET_PLEASE_CLOSE_ME;
}
ebuf.token = NULL;
ebuf.len = 0;
if (lwsi_state(wsi) == LRS_WAITING_CONNECT) {
#if !defined(LWS_NO_CLIENT)
if ((pollfd->revents & LWS_POLLOUT) &&
lws_handle_POLLOUT_event(wsi, pollfd)) {
lwsl_debug("POLLOUT event closed it\n");
return LWS_HPI_RET_PLEASE_CLOSE_ME;
}
n = lws_client_socket_service(wsi, pollfd, NULL);
if (n)
return LWS_HPI_RET_WSI_ALREADY_DIED;
#endif
return LWS_HPI_RET_HANDLED;
}
//lwsl_notice("%s: wsi->ws->tx_draining_ext %d revents 0x%x 0x%x %d\n", __func__, wsi->ws->tx_draining_ext, pollfd->revents, wsi->wsistate, lwsi_state_can_handle_POLLOUT(wsi));
/* 1: something requested a callback when it was OK to write */
if ((pollfd->revents & LWS_POLLOUT) &&
lwsi_state_can_handle_POLLOUT(wsi) &&
lws_handle_POLLOUT_event(wsi, pollfd)) {
if (lwsi_state(wsi) == LRS_RETURNED_CLOSE)
lwsi_set_state(wsi, LRS_FLUSHING_BEFORE_CLOSE);
return LWS_HPI_RET_PLEASE_CLOSE_ME;
}
if (lwsi_state(wsi) == LRS_RETURNED_CLOSE ||
lwsi_state(wsi) == LRS_WAITING_TO_SEND_CLOSE) {
/*
* we stopped caring about anything except control
* packets. Force flow control off, defeat tx
* draining.
*/
lws_rx_flow_control(wsi, 1);
#if !defined(LWS_WITHOUT_EXTENSIONS)
if (wsi->ws)
wsi->ws->tx_draining_ext = 0;
#endif
}
#if !defined(LWS_WITHOUT_EXTENSIONS)
if (wsi->ws->tx_draining_ext)
/*
* We cannot deal with new RX until the TX ext path has
* been drained. It's because new rx will, eg, crap on
* the wsi rx buf that may be needed to retain state.
*
* TX ext drain path MUST go through event loop to avoid
* blocking.
*/
return LWS_HPI_RET_HANDLED;
#endif
if (lws_is_flowcontrolled(wsi)) {
/* We cannot deal with any kind of new RX because we are
* RX-flowcontrolled.
*/
lwsl_info("flowcontrolled\n");
return LWS_HPI_RET_HANDLED;
}
#if defined(LWS_WITH_HTTP2)
if (wsi->http2_substream || wsi->upgraded_to_http2) {
wsi1 = lws_get_network_wsi(wsi);
if (wsi1 && wsi1->trunc_len)
/* We cannot deal with any kind of new RX
* because we are dealing with a partial send
* (new RX may trigger new http_action() that
* expect to be able to send)
*/
return LWS_HPI_RET_HANDLED;
}
#endif
#if !defined(LWS_WITHOUT_EXTENSIONS)
/* 2: RX Extension needs to be drained
*/
if (wsi->ws->rx_draining_ext) {
lwsl_debug("%s: RX EXT DRAINING: Service\n", __func__);
#ifndef LWS_NO_CLIENT
if (lwsi_role_client(wsi)) {
n = lws_ws_client_rx_sm(wsi, 0);
if (n < 0)
/* we closed wsi */
return LWS_HPI_RET_PLEASE_CLOSE_ME;
} else
#endif
n = lws_ws_rx_sm(wsi, ALREADY_PROCESSED_IGNORE_CHAR, 0);
return LWS_HPI_RET_HANDLED;
}
if (wsi->ws->rx_draining_ext)
/*
* We have RX EXT content to drain, but can't do it
* right now. That means we cannot do anything lower
* priority either.
*/
return LWS_HPI_RET_HANDLED;
#endif
/* 3: buflist needs to be drained
*/
read:
//lws_buflist_describe(&wsi->buflist, wsi);
ebuf.len = (int)lws_buflist_next_segment_len(&wsi->buflist,
(uint8_t **)&ebuf.token);
if (ebuf.len) {
lwsl_info("draining buflist (len %d)\n", ebuf.len);
buffered = 1;
goto drain;
}
if (!(pollfd->revents & pollfd->events & LWS_POLLIN) && !wsi->http.ah)
return LWS_HPI_RET_HANDLED;
if (lws_is_flowcontrolled(wsi)) {
lwsl_info("%s: %p should be rxflow (bm 0x%x)..\n",
__func__, wsi, wsi->rxflow_bitmap);
return LWS_HPI_RET_HANDLED;
}
if (!(lwsi_role_client(wsi) &&
(lwsi_state(wsi) != LRS_ESTABLISHED &&
lwsi_state(wsi) != LRS_AWAITING_CLOSE_ACK &&
lwsi_state(wsi) != LRS_H2_WAITING_TO_SEND_HEADERS))) {
/*
* In case we are going to react to this rx by scheduling
* writes, we need to restrict the amount of rx to the size
* the protocol reported for rx buffer.
*
* Otherwise we get a situation we have to absorb possibly a
* lot of reads before we get a chance to drain them by writing
* them, eg, with echo type tests in autobahn.
*/
buffered = 0;
ebuf.token = (char *)pt->serv_buf;
if (lwsi_role_ws(wsi))
ebuf.len = wsi->ws->rx_ubuf_alloc;
else
ebuf.len = wsi->context->pt_serv_buf_size;
if ((unsigned int)ebuf.len > wsi->context->pt_serv_buf_size)
ebuf.len = wsi->context->pt_serv_buf_size;
if ((int)pending > ebuf.len)
pending = ebuf.len;
ebuf.len = lws_ssl_capable_read(wsi, (uint8_t *)ebuf.token,
pending ? (int)pending :
ebuf.len);
switch (ebuf.len) {
case 0:
lwsl_info("%s: zero length read\n",
__func__);
return LWS_HPI_RET_PLEASE_CLOSE_ME;
case LWS_SSL_CAPABLE_MORE_SERVICE:
lwsl_info("SSL Capable more service\n");
return LWS_HPI_RET_HANDLED;
case LWS_SSL_CAPABLE_ERROR:
lwsl_info("%s: LWS_SSL_CAPABLE_ERROR\n",
__func__);
return LWS_HPI_RET_PLEASE_CLOSE_ME;
}
// lwsl_notice("Actual RX %d\n", ebuf.len);
lws_restart_ws_ping_pong_timer(wsi);
/*
* coverity thinks ssl_capable_read() may read over
* 2GB. Dissuade it...
*/
ebuf.len &= 0x7fffffff;
}
drain:
/*
* give any active extensions a chance to munge the buffer
* before parse. We pass in a pointer to an lws_tokens struct
* prepared with the default buffer and content length that's in
* there. Rather than rewrite the default buffer, extensions
* that expect to grow the buffer can adapt .token to
* point to their own per-connection buffer in the extension
* user allocation. By default with no extensions or no
* extension callback handling, just the normal input buffer is
* used then so it is efficient.
*/
m = 0;
do {
/* service incoming data */
//lws_buflist_describe(&wsi->buflist, wsi);
if (ebuf.len) {
#if defined(LWS_ROLE_H2)
if (lwsi_role_h2(wsi) && lwsi_state(wsi) != LRS_BODY)
n = lws_read_h2(wsi, (unsigned char *)ebuf.token,
ebuf.len);
else
#endif
n = lws_read_h1(wsi, (unsigned char *)ebuf.token,
ebuf.len);
if (n < 0) {
/* we closed wsi */
n = 0;
return LWS_HPI_RET_WSI_ALREADY_DIED;
}
//lws_buflist_describe(&wsi->buflist, wsi);
//lwsl_notice("%s: consuming %d / %d\n", __func__, n, ebuf.len);
if (lws_buflist_aware_consume(wsi, &ebuf, n, buffered))
return LWS_HPI_RET_PLEASE_CLOSE_ME;
}
ebuf.token = NULL;
ebuf.len = 0;
} while (m);
if (wsi->http.ah
#if !defined(LWS_NO_CLIENT)
&& !wsi->client_h2_alpn
#endif
) {
lwsl_info("%s: %p: detaching ah\n", __func__, wsi);
lws_header_table_detach(wsi, 0);
}
pending = lws_ssl_pending(wsi);
if (pending) {
if (lws_is_ws_with_ext(wsi))
pending = pending > wsi->ws->rx_ubuf_alloc ?
wsi->ws->rx_ubuf_alloc : pending;
else
pending = pending > wsi->context->pt_serv_buf_size ?
wsi->context->pt_serv_buf_size : pending;
goto read;
}
if (buffered && /* were draining, now nothing left */
!lws_buflist_next_segment_len(&wsi->buflist, NULL)) {
lwsl_info("%s: %p flow buf: drained\n", __func__, wsi);
/* having drained the rxflow buffer, can rearm POLLIN */
#ifdef LWS_NO_SERVER
n =
#endif
__lws_rx_flow_control(wsi);
/* n ignored, needed for NO_SERVER case */
}
/* n = 0 */
return LWS_HPI_RET_HANDLED;
}
int rops_handle_POLLOUT_ws(struct lws *wsi)
{
int write_type = LWS_WRITE_PONG;
#if !defined(LWS_WITHOUT_EXTENSIONS)
struct lws_tokens ebuf;
int ret, m;
#endif
int n;
#if !defined(LWS_WITHOUT_EXTENSIONS)
lwsl_debug("%s: %s: wsi->ws->tx_draining_ext %d\n", __func__,
wsi->protocol->name, wsi->ws->tx_draining_ext);
#endif
/* Priority 3: pending control packets (pong or close)
*
* 3a: close notification packet requested from close api
*/
if (lwsi_state(wsi) == LRS_WAITING_TO_SEND_CLOSE) {
lwsl_debug("sending close packet\n");
lwsl_hexdump_debug(&wsi->ws->ping_payload_buf[LWS_PRE],
wsi->ws->close_in_ping_buffer_len);
wsi->waiting_to_send_close_frame = 0;
n = lws_write(wsi, &wsi->ws->ping_payload_buf[LWS_PRE],
wsi->ws->close_in_ping_buffer_len,
LWS_WRITE_CLOSE);
if (n >= 0) {
if (wsi->close_needs_ack) {
lwsi_set_state(wsi, LRS_AWAITING_CLOSE_ACK);
lws_set_timeout(wsi, PENDING_TIMEOUT_CLOSE_ACK, 5);
lwsl_debug("sent close indication, awaiting ack\n");
return LWS_HP_RET_BAIL_OK;
}
wsi->close_needs_ack = 0;
lwsi_set_state(wsi, LRS_RETURNED_CLOSE);
}
return LWS_HP_RET_BAIL_DIE;
}
/* else, the send failed and we should just hang up */
if ((lwsi_role_ws(wsi) && wsi->ws->ping_pending_flag) ||
(lwsi_state(wsi) == LRS_RETURNED_CLOSE &&
wsi->ws->payload_is_close)) {
if (wsi->ws->payload_is_close)
write_type = LWS_WRITE_CLOSE;
else {
if (wsi->wsistate_pre_close) {
/* we started close flow, forget pong */
wsi->ws->ping_pending_flag = 0;
return LWS_HP_RET_BAIL_OK;
}
lwsl_info("issuing pong %d on wsi %p\n", wsi->ws->ping_payload_len, wsi);
}
n = lws_write(wsi, &wsi->ws->ping_payload_buf[LWS_PRE],
wsi->ws->ping_payload_len, write_type);
if (n < 0)
return LWS_HP_RET_BAIL_DIE;
/* well he is sent, mark him done */
wsi->ws->ping_pending_flag = 0;
if (wsi->ws->payload_is_close) {
// assert(0);
/* oh... a close frame was it... then we are done */
return LWS_HP_RET_BAIL_DIE;
}
/* otherwise for PING, leave POLLOUT active either way */
return LWS_HP_RET_BAIL_OK;
}
if (!wsi->socket_is_permanently_unusable && wsi->ws->send_check_ping) {
lwsl_info("issuing ping on wsi %p\n", wsi);
wsi->ws->send_check_ping = 0;
n = lws_write(wsi, &wsi->ws->ping_payload_buf[LWS_PRE],
0, LWS_WRITE_PING);
if (n < 0)
return LWS_HP_RET_BAIL_DIE;
/*
* we apparently were able to send the PING in a reasonable time
* now reset the clock on our peer to be able to send the
* PONG in a reasonable time.
*/
lws_set_timeout(wsi, PENDING_TIMEOUT_WS_PONG_CHECK_GET_PONG,
wsi->context->timeout_secs);
return LWS_HP_RET_BAIL_OK;
}
/* Priority 4: if we are closing, not allowed to send more data frags
* which means user callback or tx ext flush banned now
*/
if (lwsi_state(wsi) == LRS_RETURNED_CLOSE)
return LWS_HP_RET_USER_SERVICE;
#if !defined(LWS_WITHOUT_EXTENSIONS)
/* Priority 5: Tx path extension with more to send
*
* These are handled as new fragments each time around
* So while we must block new writeable callback to enforce
* payload ordering, but since they are always complete
* fragments control packets can interleave OK.
*/
if (wsi->ws->tx_draining_ext) {
lwsl_ext("SERVICING TX EXT DRAINING\n");
if (lws_write(wsi, NULL, 0, LWS_WRITE_CONTINUATION) < 0)
return LWS_HP_RET_BAIL_DIE;
/* leave POLLOUT active */
return LWS_HP_RET_BAIL_OK;
}
/* Priority 6: extensions
*/
if (!wsi->ws->extension_data_pending && !wsi->ws->tx_draining_ext) {
lwsl_ext("%s: !wsi->ws->extension_data_pending\n", __func__);
return LWS_HP_RET_USER_SERVICE;
}
/*
* check in on the active extensions, see if they
* had pending stuff to spill... they need to get the
* first look-in otherwise sequence will be disordered
*
* NULL, zero-length ebuf means just spill pending
*/
ret = 1;
if (wsi->role_ops == &role_ops_raw_skt ||
wsi->role_ops == &role_ops_raw_file)
ret = 0;
while (ret == 1) {
/* default to nobody has more to spill */
ret = 0;
ebuf.token = NULL;
ebuf.len = 0;
/* give every extension a chance to spill */
m = lws_ext_cb_active(wsi, LWS_EXT_CB_PACKET_TX_PRESEND,
&ebuf, 0);
if (m < 0) {
lwsl_err("ext reports fatal error\n");
return LWS_HP_RET_BAIL_DIE;
}
if (m)
/*
* at least one extension told us he has more
* to spill, so we will go around again after
*/
ret = 1;
/* assuming they gave us something to send, send it */
if (ebuf.len) {
n = lws_issue_raw(wsi, (unsigned char *)ebuf.token,
ebuf.len);
if (n < 0) {
lwsl_info("closing from POLLOUT spill\n");
return LWS_HP_RET_BAIL_DIE;
}
/*
* Keep amount spilled small to minimize chance of this
*/
if (n != ebuf.len) {
lwsl_err("Unable to spill ext %d vs %d\n",
ebuf.len, n);
return LWS_HP_RET_BAIL_DIE;
}
} else
continue;
/* no extension has more to spill */
if (!ret)
continue;
/*
* There's more to spill from an extension, but we just sent
* something... did that leave the pipe choked?
*/
if (!lws_send_pipe_choked(wsi))
/* no we could add more */
continue;
lwsl_info("choked in POLLOUT service\n");
/*
* Yes, he's choked. Leave the POLLOUT masked on so we will
* come back here when he is unchoked. Don't call the user
* callback to enforce ordering of spilling, he'll get called
* when we come back here and there's nothing more to spill.
*/
return LWS_HP_RET_BAIL_OK;
}
wsi->ws->extension_data_pending = 0;
#endif
return LWS_HP_RET_USER_SERVICE;
}
static int
rops_periodic_checks_ws(struct lws_context *context, int tsi, time_t now)
{
struct lws_vhost *vh;
if (!context->ws_ping_pong_interval ||
context->last_ws_ping_pong_check_s >= now + 10)
return 0;
vh = context->vhost_list;
context->last_ws_ping_pong_check_s = now;
while (vh) {
int n;
lws_vhost_lock(vh);
for (n = 0; n < vh->count_protocols; n++) {
struct lws *wsi = vh->same_vh_protocol_list[n];
while (wsi) {
if (lwsi_role_ws(wsi) &&
!wsi->socket_is_permanently_unusable &&
!wsi->ws->send_check_ping &&
wsi->ws->time_next_ping_check &&
lws_compare_time_t(context, now,
wsi->ws->time_next_ping_check) >
context->ws_ping_pong_interval) {
lwsl_info("req pp on wsi %p\n", wsi);
wsi->ws->send_check_ping = 1;
lws_set_timeout(wsi,
PENDING_TIMEOUT_WS_PONG_CHECK_SEND_PING,
context->timeout_secs);
lws_callback_on_writable(wsi);
wsi->ws->time_next_ping_check = now;
}
wsi = wsi->same_vh_protocol_next;
}
}
lws_vhost_unlock(vh);
vh = vh->vhost_next;
}
return 0;
}
static int
rops_service_flag_pending_ws(struct lws_context *context, int tsi)
{
#if !defined(LWS_WITHOUT_EXTENSIONS)
struct lws_context_per_thread *pt = &context->pt[tsi];
struct lws *wsi;
int forced = 0;
/* POLLIN faking (the pt lock is taken by the parent) */
/*
* 1) For all guys with already-available ext data to drain, if they are
* not flowcontrolled, fake their POLLIN status
*/
wsi = pt->ws.rx_draining_ext_list;
while (wsi && wsi->position_in_fds_table != LWS_NO_FDS_POS) {
pt->fds[wsi->position_in_fds_table].revents |=
pt->fds[wsi->position_in_fds_table].events & LWS_POLLIN;
if (pt->fds[wsi->position_in_fds_table].revents & LWS_POLLIN)
forced = 1;
wsi = wsi->ws->rx_draining_ext_list;
}
return forced;
#else
return 0;
#endif
}
static int
rops_close_via_role_protocol_ws(struct lws *wsi, enum lws_close_status reason)
{
if (!wsi->ws)
return 0;
if (!wsi->ws->close_in_ping_buffer_len && /* already a reason */
(reason == LWS_CLOSE_STATUS_NOSTATUS ||
reason == LWS_CLOSE_STATUS_NOSTATUS_CONTEXT_DESTROY))
return 0;
lwsl_debug("%s: sending close indication...\n", __func__);
/* if no prepared close reason, use 1000 and no aux data */
if (!wsi->ws->close_in_ping_buffer_len) {
wsi->ws->close_in_ping_buffer_len = 2;
wsi->ws->ping_payload_buf[LWS_PRE] = (reason >> 8) & 0xff;
wsi->ws->ping_payload_buf[LWS_PRE + 1] = reason & 0xff;
}
wsi->waiting_to_send_close_frame = 1;
wsi->close_needs_ack = 1;
lwsi_set_state(wsi, LRS_WAITING_TO_SEND_CLOSE);
__lws_set_timeout(wsi, PENDING_TIMEOUT_CLOSE_SEND, 5);
lws_callback_on_writable(wsi);
return 1;
}
static int
rops_close_role_ws(struct lws_context_per_thread *pt, struct lws *wsi)
{
#if !defined(LWS_WITHOUT_EXTENSIONS)
if (wsi->ws->rx_draining_ext) {
struct lws **w = &pt->ws.rx_draining_ext_list;
wsi->ws->rx_draining_ext = 0;
/* remove us from context draining ext list */
while (*w) {
if (*w == wsi) {
*w = wsi->ws->rx_draining_ext_list;
break;
}
w = &((*w)->ws->rx_draining_ext_list);
}
wsi->ws->rx_draining_ext_list = NULL;
}
if (wsi->ws->tx_draining_ext) {
struct lws **w = &pt->ws.tx_draining_ext_list;
lwsl_ext("%s: CLEARING tx_draining_ext\n", __func__);
wsi->ws->tx_draining_ext = 0;
/* remove us from context draining ext list */
while (*w) {
if (*w == wsi) {
*w = wsi->ws->tx_draining_ext_list;
break;
}
w = &((*w)->ws->tx_draining_ext_list);
}
wsi->ws->tx_draining_ext_list = NULL;
}
#endif
lws_free_set_NULL(wsi->ws->rx_ubuf);
if (wsi->trunc_alloc)
/* not going to be completed... nuke it */
lws_free_set_NULL(wsi->trunc_alloc);
wsi->ws->ping_payload_len = 0;
wsi->ws->ping_pending_flag = 0;
/* deallocate any active extension contexts */
if (lws_ext_cb_active(wsi, LWS_EXT_CB_DESTROY, NULL, 0) < 0)
lwsl_warn("extension destruction failed\n");
return 0;
}
static int
rops_write_role_protocol_ws(struct lws *wsi, unsigned char *buf, size_t len,
enum lws_write_protocol *wp)
{
#if !defined(LWS_WITHOUT_EXTENSIONS)
struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi];
enum lws_write_protocol wpt;
#endif
int masked7 = lwsi_role_client(wsi);
unsigned char is_masked_bit = 0;
unsigned char *dropmask = NULL;
struct lws_tokens ebuf;
size_t orig_len = len;
int pre = 0, n = 0;
// lwsl_err("%s: wp 0x%x len %d\n", __func__, *wp, (int)len);
#if !defined(LWS_WITHOUT_EXTENSIONS)
if (wsi->ws->tx_draining_ext) {
/* remove us from the list */
struct lws **w = &pt->ws.tx_draining_ext_list;
lwsl_ext("%s: CLEARING tx_draining_ext\n", __func__);
wsi->ws->tx_draining_ext = 0;
/* remove us from context draining ext list */
while (*w) {
if (*w == wsi) {
*w = wsi->ws->tx_draining_ext_list;
break;
}
w = &((*w)->ws->tx_draining_ext_list);
}
wsi->ws->tx_draining_ext_list = NULL;
wpt = *wp;
*wp = (wsi->ws->tx_draining_stashed_wp & 0xc0)|
LWS_WRITE_CONTINUATION;
/*
* When we are just flushing (len == 0), we can trust the
* stashed wp info completely. Otherwise adjust it to the
* FIN status of the incoming packet.
*/
if (!(wpt & LWS_WRITE_NO_FIN) && len)
*wp &= ~LWS_WRITE_NO_FIN;
lwsl_ext("FORCED draining wp to 0x%02X (stashed 0x%02X, incoming 0x%02X)\n", *wp,
wsi->ws->tx_draining_stashed_wp, wpt);
// assert(0);
}
#endif
lws_restart_ws_ping_pong_timer(wsi);
if (((*wp) & 0x1f) == LWS_WRITE_HTTP ||
((*wp) & 0x1f) == LWS_WRITE_HTTP_FINAL ||
((*wp) & 0x1f) == LWS_WRITE_HTTP_HEADERS_CONTINUATION ||
((*wp) & 0x1f) == LWS_WRITE_HTTP_HEADERS)
goto send_raw;
/* if we are continuing a frame that already had its header done */
if (wsi->ws->inside_frame) {
lwsl_debug("INSIDE FRAME\n");
goto do_more_inside_frame;
}
wsi->ws->clean_buffer = 1;
/*
* give a chance to the extensions to modify payload
* the extension may decide to produce unlimited payload erratically
* (eg, compression extension), so we require only that if he produces
* something, it will be a complete fragment of the length known at
* the time (just the fragment length known), and if he has
* more we will come back next time he is writeable and allow him to
* produce more fragments until he's drained.
*
* This allows what is sent each time it is writeable to be limited to
* a size that can be sent without partial sends or blocking, allows
* interleaving of control frames and other connection service.
*/
ebuf.token = (char *)buf;
ebuf.len = (int)len;
switch ((int)*wp) {
case LWS_WRITE_PING:
case LWS_WRITE_PONG:
case LWS_WRITE_CLOSE:
break;
default:
#if !defined(LWS_WITHOUT_EXTENSIONS)
// lwsl_notice("LWS_EXT_CB_PAYLOAD_TX\n");
// m = (int)ebuf.len;
/* returns 0 if no more tx pending, 1 if more pending */
n = lws_ext_cb_active(wsi, LWS_EXT_CB_PAYLOAD_TX, &ebuf, *wp);
if (n < 0)
return -1;
// lwsl_notice("ext processed %d plaintext into %d compressed (wp 0x%x)\n", m, (int)ebuf.len, *wp);
if (n && ebuf.len) {
lwsl_ext("write drain len %d (wp 0x%x) SETTING tx_draining_ext\n", (int)ebuf.len, *wp);
/* extension requires further draining */
wsi->ws->tx_draining_ext = 1;
wsi->ws->tx_draining_ext_list = pt->ws.tx_draining_ext_list;
pt->ws.tx_draining_ext_list = wsi;
/* we must come back to do more */
lws_callback_on_writable(wsi);
/*
* keep a copy of the write type for the overall
* action that has provoked generation of these
* fragments, so the last guy can use its FIN state.
*/
wsi->ws->tx_draining_stashed_wp = *wp;
/* this is definitely not actually the last fragment
* because the extension asserted he has more coming
* So make sure this intermediate one doesn't go out
* with a FIN.
*/
*wp |= LWS_WRITE_NO_FIN;
}
#endif
if (ebuf.len && wsi->ws->stashed_write_pending) {
wsi->ws->stashed_write_pending = 0;
*wp = ((*wp) & 0xc0) | (int)wsi->ws->stashed_write_type;
}
}
/*
* an extension did something we need to keep... for example, if
* compression extension, it has already updated its state according
* to this being issued
*/
if ((char *)buf != ebuf.token) {
/*
* ext might eat it, but not have anything to issue yet.
* In that case we have to follow his lead, but stash and
* replace the write type that was lost here the first time.
*/
if (len && !ebuf.len) {
if (!wsi->ws->stashed_write_pending)
wsi->ws->stashed_write_type = (char)(*wp) & 0x3f;
wsi->ws->stashed_write_pending = 1;
return (int)len;
}
/*
* extension recreated it:
* need to buffer this if not all sent
*/
wsi->ws->clean_buffer = 0;
}
buf = (unsigned char *)ebuf.token;
len = ebuf.len;
if (!buf) {
lwsl_err("null buf (%d)\n", (int)len);
return -1;
}
switch (wsi->ws->ietf_spec_revision) {
case 13:
if (masked7) {
pre += 4;
dropmask = &buf[0 - pre];
is_masked_bit = 0x80;
}
switch ((*wp) & 0xf) {
case LWS_WRITE_TEXT:
n = LWSWSOPC_TEXT_FRAME;
break;
case LWS_WRITE_BINARY:
n = LWSWSOPC_BINARY_FRAME;
break;
case LWS_WRITE_CONTINUATION:
n = LWSWSOPC_CONTINUATION;
break;
case LWS_WRITE_CLOSE:
n = LWSWSOPC_CLOSE;
break;
case LWS_WRITE_PING:
n = LWSWSOPC_PING;
break;
case LWS_WRITE_PONG:
n = LWSWSOPC_PONG;
break;
default:
lwsl_warn("lws_write: unknown write opc / wp\n");
return -1;
}
if (!((*wp) & LWS_WRITE_NO_FIN))
n |= 1 << 7;
if (len < 126) {
pre += 2;
buf[-pre] = n;
buf[-pre + 1] = (unsigned char)(len | is_masked_bit);
} else {
if (len < 65536) {
pre += 4;
buf[-pre] = n;
buf[-pre + 1] = 126 | is_masked_bit;
buf[-pre + 2] = (unsigned char)(len >> 8);
buf[-pre + 3] = (unsigned char)len;
} else {
pre += 10;
buf[-pre] = n;
buf[-pre + 1] = 127 | is_masked_bit;
#if defined __LP64__
buf[-pre + 2] = (len >> 56) & 0x7f;
buf[-pre + 3] = len >> 48;
buf[-pre + 4] = len >> 40;
buf[-pre + 5] = len >> 32;
#else
buf[-pre + 2] = 0;
buf[-pre + 3] = 0;
buf[-pre + 4] = 0;
buf[-pre + 5] = 0;
#endif
buf[-pre + 6] = (unsigned char)(len >> 24);
buf[-pre + 7] = (unsigned char)(len >> 16);
buf[-pre + 8] = (unsigned char)(len >> 8);
buf[-pre + 9] = (unsigned char)len;
}
}
break;
}
do_more_inside_frame:
/*
* Deal with masking if we are in client -> server direction and
* the wp demands it
*/
if (masked7) {
if (!wsi->ws->inside_frame)
if (lws_0405_frame_mask_generate(wsi)) {
lwsl_err("frame mask generation failed\n");
return -1;
}
/*
* in v7, just mask the payload
*/
if (dropmask) { /* never set if already inside frame */
for (n = 4; n < (int)len + 4; n++)
dropmask[n] = dropmask[n] ^ wsi->ws->mask[
(wsi->ws->mask_idx++) & 3];
/* copy the frame nonce into place */
memcpy(dropmask, wsi->ws->mask, 4);
}
}
if (lwsi_role_h2_ENCAPSULATION(wsi)) {
struct lws *encap = lws_get_network_wsi(wsi);
assert(encap != wsi);
return encap->role_ops->write_role_protocol(wsi, buf - pre,
len + pre, wp);
}
switch ((*wp) & 0x1f) {
case LWS_WRITE_TEXT:
case LWS_WRITE_BINARY:
case LWS_WRITE_CONTINUATION:
if (!wsi->h2_stream_carries_ws) {
/*
* give any active extensions a chance to munge the
* buffer before send. We pass in a pointer to an
* lws_tokens struct prepared with the default buffer
* and content length that's in there. Rather than
* rewrite the default buffer, extensions that expect
* to grow the buffer can adapt .token to point to their
* own per-connection buffer in the extension user
* allocation. By default with no extensions or no
* extension callback handling, just the normal input
* buffer is used then so it is efficient.
*
* callback returns 1 in case it wants to spill more
* buffers
*
* This takes care of holding the buffer if send is
* incomplete, ie, if wsi->ws->clean_buffer is 0
* (meaning an extension meddled with the buffer). If
* wsi->ws->clean_buffer is 1, it will instead return
* to the user code how much OF THE USER BUFFER was
* consumed.
*/
n = lws_issue_raw_ext_access(wsi, buf - pre, len + pre);
wsi->ws->inside_frame = 1;
if (n <= 0)
return n;
if (n == (int)len + pre) {
/* everything in the buffer was handled
* (or rebuffered...) */
wsi->ws->inside_frame = 0;
return (int)orig_len;
}
/*
* it is how many bytes of user buffer got sent... may
* be < orig_len in which case callback when writable
* has already been arranged and user code can call
* lws_write() again with the rest later.
*/
return n - pre;
}
break;
default:
break;
}
send_raw:
return lws_issue_raw(wsi, (unsigned char *)buf - pre, len + pre);
}
static int
rops_close_kill_connection_ws(struct lws *wsi, enum lws_close_status reason)
{
/* deal with ws encapsulation in h2 */
#if defined(LWS_WITH_HTTP2)
if (wsi->http2_substream && wsi->h2_stream_carries_ws)
return role_ops_h2.close_kill_connection(wsi, reason);
return 0;
#else
return 0;
#endif
}
static int
rops_callback_on_writable_ws(struct lws *wsi)
{
#if defined(LWS_WITH_HTTP2)
if (lwsi_role_h2_ENCAPSULATION(wsi)) {
/* we know then that it has an h2 parent */
struct lws *enc = role_ops_h2.encapsulation_parent(wsi);
assert(enc);
if (enc->role_ops->callback_on_writable(wsi))
return 1;
}
#endif
return 0;
}
static int
rops_init_vhost_ws(struct lws_vhost *vh,
const struct lws_context_creation_info *info)
{
#if !defined(LWS_WITHOUT_EXTENSIONS)
#ifdef LWS_WITH_PLUGINS
struct lws_plugin *plugin = vh->context->plugin_list;
int m;
if (vh->context->plugin_extension_count) {
m = 0;
while (info->extensions && info->extensions[m].callback)
m++;
/*
* give the vhost a unified list of extensions including the
* ones that came from plugins
*/
vh->ws.extensions = lws_zalloc(sizeof(struct lws_extension) *
(m + vh->context->plugin_extension_count + 1),
"extensions");
if (!vh->ws.extensions)
return 1;
memcpy((struct lws_extension *)vh->ws.extensions, info->extensions,
sizeof(struct lws_extension) * m);
plugin = vh->context->plugin_list;
while (plugin) {
memcpy((struct lws_extension *)&vh->ws.extensions[m],
plugin->caps.extensions,
sizeof(struct lws_extension) *
plugin->caps.count_extensions);
m += plugin->caps.count_extensions;
plugin = plugin->list;
}
} else
#endif
vh->ws.extensions = info->extensions;
#endif
return 0;
}
static int
rops_destroy_vhost_ws(struct lws_vhost *vh)
{
#ifdef LWS_WITH_PLUGINS
#if !defined(LWS_WITHOUT_EXTENSIONS)
if (vh->context->plugin_extension_count)
lws_free((void *)vh->ws.extensions);
#endif
#endif
return 0;
}
static int
rops_destroy_role_ws(struct lws *wsi)
{
lws_free_set_NULL(wsi->ws);
return 0;
}
struct lws_role_ops role_ops_ws = {
/* role name */ "ws",
/* alpn id */ NULL,
/* check_upgrades */ NULL,
/* init_context */ NULL,
/* init_vhost */ rops_init_vhost_ws,
/* destroy_vhost */ rops_destroy_vhost_ws,
/* periodic_checks */ rops_periodic_checks_ws,
/* service_flag_pending */ rops_service_flag_pending_ws,
/* handle_POLLIN */ rops_handle_POLLIN_ws,
/* handle_POLLOUT */ rops_handle_POLLOUT_ws,
/* perform_user_POLLOUT */ NULL,
/* callback_on_writable */ rops_callback_on_writable_ws,
/* tx_credit */ NULL,
/* write_role_protocol */ rops_write_role_protocol_ws,
/* encapsulation_parent */ NULL,
/* alpn_negotiated */ NULL,
/* close_via_role_protocol */ rops_close_via_role_protocol_ws,
/* close_role */ rops_close_role_ws,
/* close_kill_connection */ rops_close_kill_connection_ws,
/* destroy_role */ rops_destroy_role_ws,
/* writeable cb clnt, srv */ { LWS_CALLBACK_CLIENT_WRITEABLE,
LWS_CALLBACK_SERVER_WRITEABLE },
/* close cb clnt, srv */ { LWS_CALLBACK_CLIENT_CLOSED,
LWS_CALLBACK_CLOSED },
/* file handles */ 0
};