#if !BESTHTTP_DISABLE_SIGNALR_CORE && BESTHTTP_SIGNALR_CORE_ENABLE_GAMEDEVWARE_MESSAGEPACK
using System;
using System.Linq;
using System.Collections.Generic;
using BestHTTP.PlatformSupport.Memory;
using BestHTTP.SignalRCore.Messages;
using GameDevWare.Serialization;
using GameDevWare.Serialization.MessagePack;
using GameDevWare.Serialization.Serializers;
namespace BestHTTP.SignalRCore.Encoders
{
/// <summary>
/// IPRotocol implementation using the "Json & MessagePack Serialization" asset store package (https://assetstore.unity.com/packages/tools/network/json-messagepack-serialization-59918).
/// </summary>
public sealed class MessagePackProtocol : BestHTTP.SignalRCore.IProtocol
{
public string Name { get { return "messagepack"; } }
public TransferModes Type { get { return TransferModes.Binary; } }
public IEncoder Encoder { get; private set; }
public HubConnection Connection { get; set; }
/// <summary>
/// This function must convert all element in the arguments array to the corresponding type from the argTypes array.
/// </summary>
public object[] GetRealArguments(Type[] argTypes, object[] arguments)
{
if (arguments == null || arguments.Length == 0)
return null;
if (argTypes.Length > arguments.Length)
throw new Exception(string.Format("argType.Length({0}) < arguments.length({1})", argTypes.Length, arguments.Length));
return arguments;
}
/// <summary>
/// Convert a value to the given type.
/// </summary>
public object ConvertTo(Type toType, object obj)
{
if (obj == null)
return null;
#if NETFX_CORE
TypeInfo typeInfo = toType.GetTypeInfo();
#endif
#if NETFX_CORE
if (typeInfo.IsEnum)
#else
if (toType.IsEnum)
#endif
return Enum.Parse(toType, obj.ToString(), true);
#if NETFX_CORE
if (typeInfo.IsPrimitive)
#else
if (toType.IsPrimitive)
#endif
return Convert.ChangeType(obj, toType);
if (toType == typeof(string))
return obj.ToString();
#if NETFX_CORE
if (typeInfo.IsGenericType && toType.Name == "Nullable`1")
return Convert.ChangeType(obj, toType.GenericTypeArguments[0]);
#else
if (toType.IsGenericType && toType.Name == "Nullable`1")
return Convert.ChangeType(obj, toType.GetGenericArguments()[0]);
#endif
return obj;
}
/// <summary>
/// This function must return the encoded representation of the given message.
/// </summary>
public BufferSegment EncodeMessage(Message message)
{
var memBuffer = BufferPool.Get(256, true);
var stream = new BestHTTP.Extensions.BufferPoolMemoryStream(memBuffer, 0, memBuffer.Length, true, true, false);
// Write 5 bytes for placeholder for length prefix
stream.WriteByte(0);
stream.WriteByte(0);
stream.WriteByte(0);
stream.WriteByte(0);
stream.WriteByte(0);
var buffer = BufferPool.Get(MsgPackWriter.DEFAULT_BUFFER_SIZE, true);
var writer = new MsgPackWriter(stream, new SerializationContext { Options = SerializationOptions.None, EnumSerializerFactory = (enumType) => new EnumNumberSerializer(enumType) }, buffer);
switch (message.type)
{
case MessageTypes.StreamItem:
// https://github.com/aspnet/AspNetCore/blob/master/src/SignalR/docs/specs/HubProtocol.md#streamitem-message-encoding-1
// [2, Headers, InvocationId, Item]
writer.WriteArrayBegin(4);
writer.WriteNumber(2);
WriteHeaders(writer);
writer.WriteString(message.invocationId);
WriteValue(writer, message.item);
writer.WriteArrayEnd();
break;
case MessageTypes.Completion:
// https://github.com/aspnet/AspNetCore/blob/master/src/SignalR/docs/specs/HubProtocol.md#completion-message-encoding-1
// [3, Headers, InvocationId, ResultKind, Result?]
byte resultKind = (byte)(!string.IsNullOrEmpty(message.error) ? /*error*/ 1 : message.result != null ? /*non-void*/ 3 : /*void*/ 2);
writer.WriteArrayBegin(resultKind == 2 ? 4 : 5);
writer.WriteNumber(3);
WriteHeaders(writer);
writer.WriteString(message.invocationId);
writer.WriteNumber(resultKind);
if (resultKind == 1) // error
writer.WriteString(message.error);
else if (resultKind == 3) // non-void
WriteValue(writer, message.result);
writer.WriteArrayEnd();
break;
case MessageTypes.Invocation:
// https://github.com/aspnet/AspNetCore/blob/master/src/SignalR/docs/specs/HubProtocol.md#invocation-message-encoding-1
// [1, Headers, InvocationId, NonBlocking, Target, [Arguments], [StreamIds]]
case MessageTypes.StreamInvocation:
// https://github.com/aspnet/AspNetCore/blob/master/src/SignalR/docs/specs/HubProtocol.md#streaminvocation-message-encoding-1
// [4, Headers, InvocationId, Target, [Arguments], [StreamIds]]
writer.WriteArrayBegin(message.streamIds != null ? 6 : 5);
writer.WriteNumber((int)message.type);
WriteHeaders(writer);
writer.WriteString(message.invocationId);
writer.WriteString(message.target);
writer.WriteArrayBegin(message.arguments != null ? message.arguments.Length : 0);
if (message.arguments != null)
for (int i = 0; i < message.arguments.Length; ++i)
WriteValue(writer, message.arguments[i]);
writer.WriteArrayEnd();
if (message.streamIds != null)
{
writer.WriteArrayBegin(message.streamIds.Length);
for (int i = 0; i < message.streamIds.Length; ++i)
WriteValue(writer, message.streamIds[i]);
writer.WriteArrayEnd();
}
writer.WriteArrayEnd();
break;
case MessageTypes.CancelInvocation:
// https://github.com/aspnet/AspNetCore/blob/master/src/SignalR/docs/specs/HubProtocol.md#cancelinvocation-message-encoding-1
// [5, Headers, InvocationId]
writer.WriteArrayBegin(3);
writer.WriteNumber(5);
WriteHeaders(writer);
writer.WriteString(message.invocationId);
writer.WriteArrayEnd();
break;
case MessageTypes.Ping:
// https://github.com/aspnet/AspNetCore/blob/master/src/SignalR/docs/specs/HubProtocol.md#ping-message-encoding-1
// [6]
writer.WriteArrayBegin(1);
writer.WriteNumber(6);
writer.WriteArrayEnd();
break;
case MessageTypes.Close:
// https://github.com/aspnet/AspNetCore/blob/master/src/SignalR/docs/specs/HubProtocol.md#close-message-encoding-1
// [7, Error, AllowReconnect?]
writer.WriteArrayBegin(string.IsNullOrEmpty(message.error) ? 1 : 2);
writer.WriteNumber(7);
if (!string.IsNullOrEmpty(message.error))
writer.WriteString(message.error);
writer.WriteArrayEnd();
break;
}
writer.Flush();
// release back the buffer we used for the MsgPackWriter
BufferPool.Release(buffer);
// get how much bytes got written to the buffer. This includes the 5 placeholder bytes too.
int length = (int)stream.Position;
// this is the length without the 5 placeholder bytes
int contentLength = length - 5;
// get the stream's internal buffer. We set the releaseBuffer flag to false, so we can use it safely.
buffer = stream.GetBuffer();
// add varint length prefix
byte prefixBytes = GetRequiredBytesForLengthPrefix(contentLength);
WriteLengthAsVarInt(buffer, 5 - prefixBytes, contentLength);
// return with the final segment
return new BufferSegment(buffer, 5 - prefixBytes, contentLength + prefixBytes);
}
private void WriteValue(MsgPackWriter writer, object value)
{
if (value == null)
writer.WriteNull();
else
writer.WriteValue(value, value.GetType());
}
private void WriteHeaders(MsgPackWriter writer)
{
writer.WriteObjectBegin(0);
writer.WriteObjectEnd();
}
/// <summary>
/// This function must parse binary representation of the messages into the list of Messages.
/// </summary>
public void ParseMessages(BufferSegment segment, ref List<Message> messages)
{
messages.Clear();
int offset = segment.Offset;
while (offset < segment.Count)
{
int length = ReadVarInt(segment.Data, ref offset);
using (var stream = new System.IO.MemoryStream(segment.Data, offset, length))
{
var buff = BufferPool.Get(MsgPackReader.DEFAULT_BUFFER_SIZE, true);
try
{
var reader = new MsgPackReader(stream, new SerializationContext { Options = SerializationOptions.None }, Endianness.BigEndian, buff);
reader.NextToken();
reader.NextToken();
int messageType = reader.ReadByte();
switch ((MessageTypes)messageType)
{
case MessageTypes.Invocation: messages.Add(ReadInvocation(reader)); break;
case MessageTypes.StreamItem: messages.Add(ReadStreamItem(reader)); break;
case MessageTypes.Completion: messages.Add(ReadCompletion(reader)); break;
case MessageTypes.StreamInvocation: messages.Add(ReadStreamInvocation(reader)); break;
case MessageTypes.CancelInvocation: messages.Add(ReadCancelInvocation(reader)); break;
case MessageTypes.Ping:
// https://github.com/aspnet/AspNetCore/blob/master/src/SignalR/docs/specs/HubProtocol.md#ping-message-encoding-1
messages.Add(new Message { type = MessageTypes.Ping });
break;
case MessageTypes.Close: messages.Add(ReadClose(reader)); break;
}
reader.NextToken();
}
finally
{
BufferPool.Release(buff);
}
}
offset += length;
}
}
private Message ReadClose(MsgPackReader reader)
{
// https://github.com/aspnet/AspNetCore/blob/master/src/SignalR/docs/specs/HubProtocol.md#close-message-encoding-1
string error = reader.ReadString();
bool allowReconnect = false;
try
{
allowReconnect = reader.ReadBoolean();
}
catch { }
return new Message
{
type = MessageTypes.Close,
error = error,
allowReconnect = allowReconnect
};
}
private Message ReadCancelInvocation(MsgPackReader reader)
{
// https://github.com/aspnet/AspNetCore/blob/master/src/SignalR/docs/specs/HubProtocol.md#cancelinvocation-message-encoding-1
ReadHeaders(reader);
string invocationId = reader.ReadString();
return new Message
{
type = MessageTypes.CancelInvocation,
invocationId = invocationId
};
}
private Message ReadStreamInvocation(MsgPackReader reader)
{
// https://github.com/aspnet/AspNetCore/blob/master/src/SignalR/docs/specs/HubProtocol.md#streaminvocation-message-encoding-1
ReadHeaders(reader);
string invocationId = reader.ReadString();
string target = reader.ReadString();
object[] arguments = ReadArguments(reader, target);
string[] streamIds = ReadStreamIds(reader);
return new Message
{
type = MessageTypes.StreamInvocation,
invocationId = invocationId,
target = target,
arguments = arguments,
streamIds = streamIds
};
}
private Message ReadCompletion(MsgPackReader reader)
{
// https://github.com/aspnet/AspNetCore/blob/master/src/SignalR/docs/specs/HubProtocol.md#completion-message-encoding-1
ReadHeaders(reader);
string invocationId = reader.ReadString();
byte resultKind = reader.ReadByte();
switch(resultKind)
{
// 1 - Error result - Result contains a String with the error message
case 1:
string error = reader.ReadString();
return new Message
{
type = MessageTypes.Completion,
invocationId = invocationId,
error = error
};
// 2 - Void result - Result is absent
case 2:
return new Message
{
type = MessageTypes.Completion,
invocationId = invocationId
};
// 3 - Non-Void result - Result contains the value returned by the server
case 3:
object item = ReadItem(reader, invocationId);
return new Message
{
type = MessageTypes.Completion,
invocationId = invocationId,
item = item,
result = item
};
default:
throw new NotImplementedException("Unknown resultKind: " + resultKind);
}
}
private Message ReadStreamItem(MsgPackReader reader)
{
// https://github.com/aspnet/AspNetCore/blob/master/src/SignalR/docs/specs/HubProtocol.md#streamitem-message-encoding-1
ReadHeaders(reader);
string invocationId = reader.ReadString();
object item = ReadItem(reader, invocationId);
return new Message
{
type = MessageTypes.StreamItem,
invocationId = invocationId,
item = item
};
}
private Message ReadInvocation(MsgPackReader reader)
{
// https://github.com/aspnet/AspNetCore/blob/master/src/SignalR/docs/specs/HubProtocol.md#invocation-message-encoding-1
ReadHeaders(reader);
string invocationId = reader.ReadString();
string target = reader.ReadString();
object[] arguments = ReadArguments(reader, target);
string[] streamIds = ReadStreamIds(reader);
return new Message
{
type = MessageTypes.Invocation,
invocationId = invocationId,
target = target,
arguments = arguments,
streamIds = streamIds
};
}
private object ReadItem(MsgPackReader reader, string invocationId)
{
long longId = 0;
if (long.TryParse(invocationId, out longId))
{
Type itemType = this.Connection.GetItemType(longId);
return reader.ReadValue(itemType);
}
else
return reader.ReadValue(typeof(object));
}
private string[] ReadStreamIds(MsgPackReader reader)
{
return reader.ReadValue(typeof(string[])) as string[];
}
private object[] ReadArguments(MsgPackReader reader, string target)
{
reader.NextToken();
var subscription = this.Connection.GetSubscription(target);
object[] args;
if (subscription == null || subscription.callbacks == null || subscription.callbacks.Count == 0)
{
args = reader.ReadValue(typeof(object[])) as object[];
}
else
{
args = new object[subscription.callbacks[0].ParamTypes.Length];
for (int i = 0; i < subscription.callbacks[0].ParamTypes.Length; ++i)
args[i] = reader.ReadValue(subscription.callbacks[0].ParamTypes[i]);
}
reader.NextToken();
return args;
}
private Dictionary<string, string> ReadHeaders(MsgPackReader reader)
{
return reader.ReadValue(typeof(Dictionary<string, string>)) as Dictionary<string, string>;
}
public static byte GetRequiredBytesForLengthPrefix(int length)
{
byte bytes = 0;
do
{
length >>= 7;
bytes++;
}
while (length > 0);
return bytes;
}
public static int WriteLengthAsVarInt(byte[] data, int offset, int length)
{
do
{
var current = data[offset];
current = (byte)(length & 0x7f);
length >>= 7;
if (length > 0)
{
current |= 0x80;
}
data[offset++] = current;
}
while (length > 0);
return offset;
}
public static int ReadVarInt(byte[] data, ref int offset)
{
int num = 0;
do
{
num <<= 7;
num |= data[offset] & 0x7F;
} while ((data[offset++] & 0x80) != 0);
return num;
}
}
}
#endif