Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[1429] A failed CONNACK should be sent, when a CONNECT packet exceed the max packet size #170

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@
import com.hivemq.codec.decoder.mqtt.MqttDecoders;
import com.hivemq.configuration.service.MqttConfigurationService;
import com.hivemq.extension.sdk.api.annotations.NotNull;
import com.hivemq.extension.sdk.api.annotations.Nullable;
import com.hivemq.metrics.handler.GlobalMQTTMessageCounter;
import com.hivemq.mqtt.handler.connack.MqttConnacker;
import com.hivemq.mqtt.handler.disconnect.MqttServerDisconnector;
import com.hivemq.mqtt.message.Message;
import com.hivemq.mqtt.message.MessageType;
import com.hivemq.mqtt.message.ProtocolVersion;
import com.hivemq.mqtt.message.mqtt5.Mqtt5UserProperties;
import com.hivemq.mqtt.message.reason.Mqtt5ConnAckReasonCode;
import com.hivemq.mqtt.message.reason.Mqtt5DisconnectReasonCode;
import com.hivemq.util.ReasonStrings;
import io.netty.buffer.ByteBuf;
Expand All @@ -48,18 +51,21 @@ public class MQTTMessageDecoder extends ByteToMessageDecoder {
private static final int MIN_FIXED_HEADER_LENGTH = 2;

private final @NotNull MqttConnectDecoder connectDecoder;
private final @NotNull MqttConnacker mqttConnacker;
private final @NotNull MqttConfigurationService mqttConfig;
private final @NotNull MqttDecoders mqttDecoders;
private final @NotNull MqttServerDisconnector mqttServerDisconnector;
private final @NotNull GlobalMQTTMessageCounter globalMQTTMessageCounter;

public MQTTMessageDecoder(
final @NotNull MqttConnectDecoder connectDecoder,
final @NotNull MqttConnacker mqttConnacker,
final @NotNull MqttConfigurationService mqttConfig,
final @NotNull MqttDecoders mqttDecoders,
final @NotNull MqttServerDisconnector mqttServerDisconnector,
final @NotNull GlobalMQTTMessageCounter globalMQTTMessageCounter) {
this.connectDecoder = connectDecoder;
this.mqttConnacker = mqttConnacker;
this.mqttConfig = mqttConfig;
this.mqttDecoders = mqttDecoders;
this.mqttServerDisconnector = mqttServerDisconnector;
Expand All @@ -68,6 +74,7 @@ public MQTTMessageDecoder(

public MQTTMessageDecoder(final ChannelDependencies channelDependencies) {
this(channelDependencies.getMqttConnectDecoder(),
channelDependencies.getMqttConnacker(),
channelDependencies.getConfigurationService().mqttConfiguration(),
channelDependencies.getMqttDecoders(),
channelDependencies.getMqttServerDisconnector(),
Expand Down Expand Up @@ -115,10 +122,80 @@ protected void decode(
}

final int fixedHeaderSize = getFixedHeaderSize(remainingLength);
final int packetSize = fixedHeaderSize + remainingLength;

final MessageType messageType = getMessageType(fixedHeader);
final Message message;
if (messageType == CONNECT) {
message = handleConnect(buf, clientConnection, fixedHeader, packetSize, remainingLength);
} else {
message =
handleMessage(buf, clientConnection, fixedHeader, messageType, packetSize, remainingLength);
}
if (message == null) {
buf.clear();
return;
}
globalMQTTMessageCounter.countInbound(message);
out.add(message);
}

private @Nullable Message handleConnect(
final @NotNull ByteBuf buf,
final @NotNull ClientConnection clientConnection,
final byte fixedHeader,
final int packetSize,
final int remainingLength) {

//this is the message size HiveMQ allows for incoming messages
if (packetSize > mqttConfig.maxPacketSize()) {
connectDecoder.decodeProtocolVersion(clientConnection, buf);
mqttConnacker.connackError(clientConnection.getChannel(),
"A client (IP: {}) connect packet exceeded the maximum permissible size.",
"Sent CONNECT exceeded the maximum permissible size",
Mqtt5ConnAckReasonCode.PACKET_TOO_LARGE,
ReasonStrings.CONNACK_PACKET_TOO_LARGE);

return null;
}

// Check if the client is already connected
if (clientConnection.getProtocolVersion() != null) {
mqttServerDisconnector.disconnect(clientConnection.getChannel(),
"A client (IP: {}) sent second CONNECT message. This is not allowed. Disconnecting client.",
"Sent second CONNECT message",
null,
null,
Mqtt5UserProperties.NO_USER_PROPERTIES,
false,
true
// as we don't know if a CONNACK was already sent we can only close the channel here
);
return null;
}

globalMQTTMessageCounter.countInboundTraffic(packetSize);

//We're slicing the buffer to the exact MQTT message size so we don't have to pass the actual length around
final ByteBuf messageBuffer = buf.readSlice(remainingLength);
//We mark the end of the message
buf.markReaderIndex();

return connectDecoder.decode(clientConnection, messageBuffer, fixedHeader);
}

private @Nullable Message handleMessage(
final @NotNull ByteBuf buf,
final @NotNull ClientConnection clientConnection,
final byte fixedHeader,
final @NotNull MessageType messageType,
final int packetSize,
final int remainingLength) {

final ProtocolVersion protocolVersion = clientConnection.getProtocolVersion();

//this is the message size HiveMQ allows for incoming messages
if (remainingLength + fixedHeaderSize > mqttConfig.maxPacketSize()) {
if (packetSize > mqttConfig.maxPacketSize()) {

//force channel close for Mqtt3.1, Mqtt3.1.1 and null (before connect)
final boolean forceClose = protocolVersion != ProtocolVersion.MQTTv5;
Expand All @@ -130,112 +207,80 @@ protected void decode(
Mqtt5UserProperties.NO_USER_PROPERTIES,
false,
forceClose);
buf.clear();
return;
return null;
}

final Message message;
//We're slicing the buffer to the exact MQTT message size so we don't have to pass the actual length around
final ByteBuf messageBuffer = buf.readSlice(remainingLength);
//We mark the end of the message
buf.markReaderIndex();

final MessageType messageType = getMessageType(fixedHeader);

if (protocolVersion == null && messageType != CONNECT) {
// Check if client is connected
if (protocolVersion == null) {
mqttServerDisconnector.logAndClose(clientConnection.getChannel(),
"A client (IP: {}) sent other message before CONNECT. Disconnecting client.",
"Sent other message before CONNECT");
buf.clear();
return;
return null;
}

if (protocolVersion != null && messageType == CONNECT) {
mqttServerDisconnector.logAndClose(clientConnection.getChannel(),
"A client (IP: {}) sent second CONNECT message. This is not allowed. Disconnecting client.",
"Sent second CONNECT message");
buf.clear();
return;
}
globalMQTTMessageCounter.countInboundTraffic(packetSize);

globalMQTTMessageCounter.countInboundTraffic(readableBytes);
//We're slicing the buffer to the exact MQTT message size so we don't have to pass the actual length around
final ByteBuf messageBuffer = buf.readSlice(remainingLength);
//We mark the end of the message
buf.markReaderIndex();

if (messageType == CONNECT) {
message = connectDecoder.decode(clientConnection, messageBuffer, fixedHeader);
final MqttDecoder<?> decoder = mqttDecoders.decoder(messageType, protocolVersion);
if (decoder != null) {
return decoder.decode(clientConnection, messageBuffer, fixedHeader);
} else {
final MqttDecoder<?> decoder = mqttDecoders.decoder(messageType, protocolVersion);

if (decoder != null) {
message = decoder.decode(clientConnection, messageBuffer, fixedHeader);
} else {
switch (messageType) {
case RESERVED_ZERO:
mqttServerDisconnector.disconnect(clientConnection.getChannel(),
"A client (IP: {}) sent a message with an invalid message type '0'. This message type is reserved. Disconnecting client.",
"Sent a message with message type '0'",
Mqtt5DisconnectReasonCode.PROTOCOL_ERROR,
ReasonStrings.DISCONNECT_MESSAGE_TYPE_ZERO);
buf.clear();
return;
case CONNACK:
mqttServerDisconnector.disconnect(clientConnection.getChannel(),
"A client (IP: {}) sent a CONNACK message. This is invalid because clients are not allowed to send CONNACKs. Disconnecting client.",
"Sent a CONNACK message",
Mqtt5DisconnectReasonCode.PROTOCOL_ERROR,
ReasonStrings.DISCONNECT_CONNACK_RECEIVED);
buf.clear();
return;
case SUBACK:
mqttServerDisconnector.disconnect(clientConnection.getChannel(),
"A client (IP: {}) sent a SUBACK message. This is invalid because clients are not allowed to send SUBACKs. Disconnecting client.",
"Sent a SUBACK message",
Mqtt5DisconnectReasonCode.PROTOCOL_ERROR,
ReasonStrings.DISCONNECT_SUBACK_RECEIVED);
buf.clear();
return;
case UNSUBACK:
mqttServerDisconnector.disconnect(clientConnection.getChannel(),
"A client (IP: {}) sent a UNSUBACK message. This is invalid because clients are not allowed to send UNSUBACKs. Disconnecting client.",
"Sent a UNSUBACK message",
Mqtt5DisconnectReasonCode.PROTOCOL_ERROR,
ReasonStrings.DISCONNECT_UNSUBACK_RECEIVED);
buf.clear();
return;
case PINGRESP:
mqttServerDisconnector.disconnect(clientConnection.getChannel(),
"A client (IP: {}) sent a PINGRESP message. This is invalid because clients are not allowed to send PINGRESPs. Disconnecting client.",
"Sent a PINGRESP message",
Mqtt5DisconnectReasonCode.PROTOCOL_ERROR,
ReasonStrings.DISCONNECT_PINGRESP_RECEIVED);
buf.clear();
return;
case AUTH:
mqttServerDisconnector.disconnect(clientConnection.getChannel(),
"A client (IP: {}) sent a message with an invalid message type '15'. This message type is reserved. Disconnecting client.",
"Sent a message with message type '15'",
Mqtt5DisconnectReasonCode.PROTOCOL_ERROR,
ReasonStrings.DISCONNECT_MESSAGE_TYPE_FIFTEEN);
buf.clear();
return;
default:
mqttServerDisconnector.disconnect(clientConnection.getChannel(),
"A client (IP: {}) connected but the message type could not get determined. Disconnecting client.",
"Sent a message with invalid message type",
Mqtt5DisconnectReasonCode.PROTOCOL_ERROR,
ReasonStrings.DISCONNECT_MESSAGE_TYPE_INVALID);
buf.clear();
return;
}
switch (messageType) {
case RESERVED_ZERO:
mqttServerDisconnector.disconnect(clientConnection.getChannel(),
"A client (IP: {}) sent a message with an invalid message type '0'. This message type is reserved. Disconnecting client.",
"Sent a message with message type '0'",
Mqtt5DisconnectReasonCode.PROTOCOL_ERROR,
ReasonStrings.DISCONNECT_MESSAGE_TYPE_ZERO);
return null;
case CONNACK:
mqttServerDisconnector.disconnect(clientConnection.getChannel(),
"A client (IP: {}) sent a CONNACK message. This is invalid because clients are not allowed to send CONNACKs. Disconnecting client.",
"Sent a CONNACK message",
Mqtt5DisconnectReasonCode.PROTOCOL_ERROR,
ReasonStrings.DISCONNECT_CONNACK_RECEIVED);
return null;
case SUBACK:
mqttServerDisconnector.disconnect(clientConnection.getChannel(),
"A client (IP: {}) sent a SUBACK message. This is invalid because clients are not allowed to send SUBACKs. Disconnecting client.",
"Sent a SUBACK message",
Mqtt5DisconnectReasonCode.PROTOCOL_ERROR,
ReasonStrings.DISCONNECT_SUBACK_RECEIVED);
return null;
case UNSUBACK:
mqttServerDisconnector.disconnect(clientConnection.getChannel(),
"A client (IP: {}) sent a UNSUBACK message. This is invalid because clients are not allowed to send UNSUBACKs. Disconnecting client.",
"Sent a UNSUBACK message",
Mqtt5DisconnectReasonCode.PROTOCOL_ERROR,
ReasonStrings.DISCONNECT_UNSUBACK_RECEIVED);
return null;
case PINGRESP:
mqttServerDisconnector.disconnect(clientConnection.getChannel(),
"A client (IP: {}) sent a PINGRESP message. This is invalid because clients are not allowed to send PINGRESPs. Disconnecting client.",
"Sent a PINGRESP message",
Mqtt5DisconnectReasonCode.PROTOCOL_ERROR,
ReasonStrings.DISCONNECT_PINGRESP_RECEIVED);
return null;
case AUTH:
mqttServerDisconnector.disconnect(clientConnection.getChannel(),
"A client (IP: {}) sent a message with an invalid message type '15'. This message type is reserved. Disconnecting client.",
"Sent a message with message type '15'",
Mqtt5DisconnectReasonCode.PROTOCOL_ERROR,
ReasonStrings.DISCONNECT_MESSAGE_TYPE_FIFTEEN);
return null;
default:
mqttServerDisconnector.disconnect(clientConnection.getChannel(),
"A client (IP: {}) connected but the message type could not get determined. Disconnecting client.",
"Sent a message with invalid message type",
Mqtt5DisconnectReasonCode.PROTOCOL_ERROR,
ReasonStrings.DISCONNECT_MESSAGE_TYPE_INVALID);
return null;
}
}

if (message == null) {
buf.clear();
return;
}

globalMQTTMessageCounter.countInbound(message);
out.add(message);
}

private static int getFixedHeaderSize(final int remainingLength) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ public MqttConnectDecoder(
mqtt31ConnectDecoder = new Mqtt31ConnectDecoder(mqttConnacker, clientIds, fullConfigurationService, hiveMQId);
}

public @Nullable CONNECT decode(
final @NotNull ClientConnection clientConnection, final @NotNull ByteBuf buf, final byte fixedHeader) {
public @Nullable ProtocolVersion decodeProtocolVersion(
final @NotNull ClientConnection clientConnection, final @NotNull ByteBuf buf) {

/*
* It is sufficient to look at the second byte of the variable header (Length LSB) This byte
Expand All @@ -85,9 +85,7 @@ public MqttConnectDecoder(
}

final ByteBuf lengthLSBBuf = buf.slice(buf.readerIndex() + 1, 1);

final int lengthLSB = lengthLSBBuf.readByte();

final ProtocolVersion protocolVersion;
switch (lengthLSB) {
case 4:
Expand Down Expand Up @@ -117,6 +115,16 @@ public MqttConnectDecoder(
clientConnection.setProtocolVersion(protocolVersion);
clientConnection.setConnectReceivedTimestamp(System.currentTimeMillis());

return protocolVersion;
}

public @Nullable CONNECT decode(
final @NotNull ClientConnection clientConnection, final @NotNull ByteBuf buf, final byte fixedHeader) {

final ProtocolVersion protocolVersion = decodeProtocolVersion(clientConnection, buf);
if (protocolVersion == null) {
return null;
}
if (protocolVersion == ProtocolVersion.MQTTv5) {
return mqtt5ConnectDecoder.decode(clientConnection, buf, fixedHeader);
} else if (protocolVersion == ProtocolVersion.MQTTv3_1_1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class ReasonStrings {
public static final String CONNACK_NOT_AUTHORIZED_NO_AUTHENTICATOR = "Not authorized to connect. No authenticator registered.";
public static final String CONNACK_NOT_AUTHORIZED_FAILED = "Not authorized to connect. Authentication failed.";

public static final String CONNACK_PACKET_TOO_LARGE = "Sent CONNECT exceeded the maximum permissible size.";
public static final String CONNACK_TOPIC_NAME_INVALID_WILL_LENGTH = "Sent CONNECT with incorrect will topic length.";
public static final String CONNACK_TOPIC_NAME_INVALID_WILL_MALFORMED = "Sent CONNECT with malformed will topic.";
public static final String CONNACK_TOPIC_NAME_INVALID_WILL_EMPTY = "Sent CONNECT with empty will topic.";
Expand Down
Loading
Loading