/
ReplicatedDataSerializer.cs
1689 lines (1472 loc) · 74.5 KB
/
ReplicatedDataSerializer.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
//-----------------------------------------------------------------------
// <copyright file="ReplicatedDataSerializer.cs" company="Akka.NET Project">
// Copyright (C) 2009-2021 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2021 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------
using Akka.Actor;
using Akka.DistributedData.Internal;
using Akka.Serialization;
using Google.Protobuf;
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Reflection;
using System.Runtime.Serialization;
using Akka.DistributedData.Serialization.Proto.Msg;
using Akka.Util;
using Akka.Util.Internal;
using ArgumentOutOfRangeException = System.ArgumentOutOfRangeException;
using IActorRef = Akka.Actor.IActorRef;
namespace Akka.DistributedData.Serialization
{
public sealed class ReplicatedDataSerializer : SerializerWithStringManifest
{
private const string DeletedDataManifest = "A";
private const string GSetManifest = "B";
private const string GSetKeyManifest = "b";
private const string ORSetManifest = "C";
private const string ORSetKeyManifest = "c";
private const string ORSetAddManifest = "Ca";
private const string ORSetRemoveManifest = "Cr";
private const string ORSetFullManifest = "Cf";
private const string ORSetDeltaGroupManifest = "Cg";
private const string FlagManifest = "D";
private const string FlagKeyManifest = "d";
private const string LWWRegisterManifest = "E";
private const string LWWRegisterKeyManifest = "e";
private const string GCounterManifest = "F";
private const string GCounterKeyManifest = "f";
private const string PNCounterManifest = "G";
private const string PNCounterKeyManifest = "g";
private const string ORMapManifest = "H";
private const string ORMapKeyManifest = "h";
private const string ORMapPutManifest = "Ha";
private const string ORMapRemoveManifest = "Hr";
private const string ORMapRemoveKeyManifest = "Hk";
private const string ORMapUpdateManifest = "Hu";
private const string ORMapDeltaGroupManifest = "Hg";
private const string LWWMapManifest = "I";
private const string LWWMapDeltaGroupManifest = "Ig";
private const string LWWMapKeyManifest = "i";
private const string PNCounterMapManifest = "J";
private const string PNCounterMapDeltaOperationManifest = "Jo";
private const string PNCounterMapKeyManifest = "j";
private const string ORMultiMapManifest = "K";
private const string ORMultiMapDeltaOperationManifest = "Ko";
private const string ORMultiMapKeyManifest = "k";
private const string VersionVectorManifest = "L";
private readonly SerializationSupport _ser;
private readonly byte[] _emptyArray = Array.Empty<byte>();
public ReplicatedDataSerializer(ExtendedActorSystem system) : base(system)
{
_ser = new SerializationSupport(system);
}
public override byte[] ToBinary(object obj)
{
switch (obj)
{
case IORSet o: return SerializationSupport.Compress(ToProto(o));
case ORSet.IAddDeltaOperation o: return ToProto(o.UnderlyingSerialization).ToByteArray();
case ORSet.IRemoveDeltaOperation o: return ToProto(o.UnderlyingSerialization).ToByteArray();
case IGSet g: return ToProto(g).ToByteArray();
case GCounter g: return ToProto(g).ToByteArray();
case PNCounter p: return ToProto(p).ToByteArray();
case Flag f: return ToProto(f).ToByteArray();
case ILWWRegister l: return ToProto(l).ToByteArray();
case IORDictionary o: return SerializationSupport.Compress(ToProto(o));
case ORDictionary.IDeltaOperation p: return ToProto(p).ToByteArray();
case ILWWDictionary l: return SerializationSupport.Compress(ToProto(l));
case ILWWDictionaryDeltaOperation ld: return ToProto(ld.Underlying).ToByteArray();
case IPNCounterDictionary pn: return SerializationSupport.Compress(ToProto(pn));
case IPNCounterDictionaryDeltaOperation pnd: return ToProto(pnd.Underlying).ToByteArray();
case IORMultiValueDictionary m: return SerializationSupport.Compress(ToProto(m));
case IORMultiValueDictionaryDeltaOperation md: return ToProto(md).ToByteArray();
case DeletedData _: return _emptyArray;
case VersionVector v: return SerializationSupport.VersionVectorToProto(v).ToByteArray();
// key types
case IKey k: return ToProto(k).ToByteArray();
// less common delta types
case ORSet.IDeltaGroupOperation o: return ToProto(o).ToByteArray();
case ORSet.IFullStateDeltaOperation o: return ToProto(o.UnderlyingSerialization).ToByteArray();
default:
throw new ArgumentException($"Can't serialize object of type [{obj.GetType().FullName}] in [{GetType().FullName}]");
}
}
public override object FromBinary(byte[] bytes, string manifest)
{
switch (manifest)
{
case ORSetManifest: return ORSetFromBinary(SerializationSupport.Decompress(bytes));
case ORSetAddManifest: return ORAddDeltaOperationFromBinary(bytes);
case ORSetRemoveManifest: return ORRemoveOperationFromBinary(bytes);
case GSetManifest: return GSetFromBinary(bytes);
case GCounterManifest: return GCounterFromBytes(bytes);
case PNCounterManifest: return PNCounterFromBytes(bytes);
case FlagManifest: return FlagFromBinary(bytes);
case LWWRegisterManifest: return LWWRegisterFromBinary(bytes);
case ORMapManifest: return ORDictionaryFromBinary(SerializationSupport.Decompress(bytes));
case ORMapPutManifest: return ORDictionaryPutFromBinary(bytes);
case ORMapRemoveManifest: return ORDictionaryRemoveFromBinary(bytes);
case ORMapRemoveKeyManifest: return ORDictionaryRemoveKeyFromBinary(bytes);
case ORMapUpdateManifest: return ORDictionaryUpdateFromBinary(bytes);
case ORMapDeltaGroupManifest: return ORDictionaryDeltaGroupFromBinary(bytes);
case LWWMapManifest: return LWWDictionaryFromBinary(SerializationSupport.Decompress(bytes));
case LWWMapDeltaGroupManifest:
return LWWDictionaryDeltaGroupFromBinary(bytes);
case PNCounterMapManifest: return PNCounterDictionaryFromBinary(SerializationSupport.Decompress(bytes));
case PNCounterMapDeltaOperationManifest: return PNCounterDeltaFromBinary(bytes);
case ORMultiMapManifest: return ORMultiDictionaryFromBinary(SerializationSupport.Decompress(bytes));
case ORMultiMapDeltaOperationManifest: return ORMultiDictionaryDeltaFromBinary(bytes);
case DeletedDataManifest: return DeletedData.Instance;
case VersionVectorManifest: return _ser.VersionVectorFromBinary(bytes);
// key types
case ORSetKeyManifest: return ORSetKeyFromBinary(bytes);
case GSetKeyManifest: return GSetKeyFromBinary(bytes);
case GCounterKeyManifest: return GCounterKeyFromBinary(bytes);
case PNCounterKeyManifest: return PNCounterKeyFromBinary(bytes);
case FlagKeyManifest: return FlagKeyFromBinary(bytes);
case LWWRegisterKeyManifest: return LWWRegisterKeyFromBinary(bytes);
case ORMapKeyManifest: return ORDictionaryKeyFromBinary(bytes);
case LWWMapKeyManifest: return LWWDictionaryKeyFromBinary(bytes);
case PNCounterMapKeyManifest: return PNCounterDictionaryKeyFromBinary(bytes);
case ORMultiMapKeyManifest: return ORMultiValueDictionaryKeyFromBinary(bytes);
// less common delta types
case ORSetDeltaGroupManifest: return ORDeltaGroupOperationFromBinary(bytes);
case ORSetFullManifest: return ORFullStateDeltaOperationFromBinary(bytes);
default:
throw new ArgumentException($"Can't deserialize object with unknown manifest [{manifest}]");
}
}
public override string Manifest(object o)
{
switch (o)
{
case IORSet _: return ORSetManifest;
case ORSet.IAddDeltaOperation _: return ORSetAddManifest;
case ORSet.IRemoveDeltaOperation _: return ORSetRemoveManifest;
case IGSet _: return GSetManifest;
case GCounter _: return GCounterManifest;
case PNCounter _: return PNCounterManifest;
case Flag _: return FlagManifest;
case ILWWRegister _: return LWWRegisterManifest;
case IORDictionary _: return ORMapManifest;
case ORDictionary.IPutDeltaOp _: return ORMapPutManifest;
case ORDictionary.IRemoveDeltaOp _: return ORMapRemoveManifest;
case ORDictionary.IRemoveKeyDeltaOp _: return ORMapRemoveKeyManifest;
case ORDictionary.IUpdateDeltaOp _: return ORMapUpdateManifest;
case ILWWDictionary _: return LWWMapManifest;
case ILWWDictionaryDeltaOperation _: return LWWMapDeltaGroupManifest;
case IPNCounterDictionary _: return PNCounterMapManifest;
case IPNCounterDictionaryDeltaOperation _: return PNCounterMapDeltaOperationManifest;
case IORMultiValueDictionary _: return ORMultiMapManifest;
case IORMultiValueDictionaryDeltaOperation _: return ORMultiMapDeltaOperationManifest;
case DeletedData _: return DeletedDataManifest;
case VersionVector _: return VersionVectorManifest;
// key types
case IORSetKey _: return ORSetKeyManifest;
case IGSetKey _: return GSetKeyManifest;
case GCounterKey _: return GCounterKeyManifest;
case PNCounterKey _: return PNCounterKeyManifest;
case FlagKey _: return FlagKeyManifest;
case ILWWRegisterKey _: return LWWRegisterKeyManifest;
case IORDictionaryKey _: return ORMapKeyManifest;
case ILWWDictionaryKey _: return LWWMapKeyManifest;
case IPNCounterDictionaryKey _: return PNCounterMapKeyManifest;
case IORMultiValueDictionaryKey _: return ORMultiMapKeyManifest;
// less common delta types
case ORSet.IDeltaGroupOperation _: return ORSetDeltaGroupManifest;
case ORDictionary.IDeltaGroupOp _: return ORMapDeltaGroupManifest;
case ORSet.IFullStateDeltaOperation _: return ORSetFullManifest;
default:
throw new ArgumentException($"Can't serialize object of type [{o.GetType().FullName}] in [{GetType().FullName}]");
}
}
private static TypeDescriptor GetTypeDescriptor(Type t)
{
var typeInfo = new TypeDescriptor();
if (t == typeof(string))
{
typeInfo.Type = ValType.String;
}
else if (t == typeof(int))
{
typeInfo.Type = ValType.Int;
}
else if (t == typeof(long))
{
typeInfo.Type = ValType.Long;
}
else if (t == typeof(IActorRef))
{
typeInfo.Type = ValType.ActorRef;
}
else
{
typeInfo.Type = ValType.Other;
typeInfo.TypeName = t.TypeQualifiedName();
}
return typeInfo;
}
private static Type GetTypeFromDescriptor(TypeDescriptor t)
{
switch (t.Type)
{
case ValType.Int:
return typeof(int);
case ValType.Long:
return typeof(long);
case ValType.String:
return typeof(string);
case ValType.ActorRef:
return typeof(IActorRef);
case ValType.Other:
{
var type = Type.GetType(t.TypeName);
return type;
}
default:
throw new SerializationException($"Unknown ValType of [{t.Type}] detected");
}
}
#region ORSet
private IORSet ORSetFromBinary(byte[] bytes)
{
return FromProto(Proto.Msg.ORSet.Parser.ParseFrom(bytes));
}
private Proto.Msg.ORSet ToProto(IORSet orset)
{
var b = new Proto.Msg.ORSet
{
TypeInfo = new TypeDescriptor()
};
switch (orset)
{
case ORSet<int> ints:
{
b.Vvector = SerializationSupport.VersionVectorToProto(ints.VersionVector);
b.TypeInfo.Type = ValType.Int;
var intElements = new List<int>(ints.ElementsMap.Keys);
intElements.Sort();
foreach (var val in intElements)
{
b.IntElements.Add(val);
b.Dots.Add(SerializationSupport.VersionVectorToProto(ints.ElementsMap[val]));
}
return b;
}
case ORSet<long> longs:
{
b.Vvector = SerializationSupport.VersionVectorToProto(longs.VersionVector);
b.TypeInfo.Type = ValType.Long;
var longElements = new List<long>(longs.ElementsMap.Keys);
longElements.Sort();
foreach (var val in longElements)
{
b.LongElements.Add(val);
b.Dots.Add(SerializationSupport.VersionVectorToProto(longs.ElementsMap[val]));
}
return b;
}
case ORSet<string> strings:
{
b.Vvector = SerializationSupport.VersionVectorToProto(strings.VersionVector);
b.TypeInfo.Type = ValType.String;
var stringElements = new List<string>(strings.ElementsMap.Keys);
stringElements.Sort();
foreach (var val in stringElements)
{
b.StringElements.Add(val);
b.Dots.Add(SerializationSupport.VersionVectorToProto(strings.ElementsMap[val]));
}
return b;
}
case ORSet<IActorRef> refs:
{
b.Vvector = SerializationSupport.VersionVectorToProto(refs.VersionVector);
b.TypeInfo.Type = ValType.ActorRef;
var actorRefElements = new List<IActorRef>(refs.ElementsMap.Keys);
actorRefElements.Sort();
foreach (var val in actorRefElements)
{
b.ActorRefElements.Add(Akka.Serialization.Serialization.SerializedActorPath(val));
b.Dots.Add(SerializationSupport.VersionVectorToProto(refs.ElementsMap[val]));
}
return b;
}
default: // unknown type
{
// runtime type - enter horrible dynamic serialization stuff
var makeProto = ORSetUnknownMaker.MakeGenericMethod(orset.SetType);
return (Proto.Msg.ORSet)makeProto.Invoke(this, new object[] { orset, b });
}
}
}
private IORSet FromProto(Proto.Msg.ORSet orset)
{
var dots = orset.Dots.Select(x => _ser.VersionVectorFromProto(x));
var vector = _ser.VersionVectorFromProto(orset.Vvector);
if (orset.IntElements.Count > 0 || orset.TypeInfo.Type == ValType.Int)
{
var eInt = orset.IntElements.Zip(dots, (i, versionVector) => (i, versionVector))
.ToImmutableDictionary(x => x.i, y => y.versionVector);
return new ORSet<int>(eInt, vector);
}
if (orset.LongElements.Count > 0 || orset.TypeInfo.Type == ValType.Long)
{
var eLong = orset.LongElements.Zip(dots, (i, versionVector) => (i, versionVector))
.ToImmutableDictionary(x => x.i, y => y.versionVector);
return new ORSet<long>(eLong, vector);
}
if (orset.StringElements.Count > 0 || orset.TypeInfo.Type == ValType.String)
{
var eStr = orset.StringElements.Zip(dots, (i, versionVector) => (i, versionVector))
.ToImmutableDictionary(x => x.i, y => y.versionVector);
return new ORSet<string>(eStr, vector);
}
if (orset.ActorRefElements.Count > 0 || orset.TypeInfo.Type == ValType.ActorRef)
{
var eRef = orset.ActorRefElements.Zip(dots, (i, versionVector) => (i, versionVector))
.ToImmutableDictionary(x => _ser.ResolveActorRef(x.i), y => y.versionVector);
return new ORSet<IActorRef>(eRef, vector);
}
// runtime type - enter horrible dynamic serialization stuff
var setContentType = Type.GetType(orset.TypeInfo.TypeName);
var eOther = orset.OtherElements.Zip(dots,
(i, versionVector) => (_ser.OtherMessageFromProto(i), versionVector))
.ToImmutableDictionary(x => x.Item1, x => x.versionVector);
var setType = ORSetMaker.MakeGenericMethod(setContentType);
return (IORSet)setType.Invoke(this, new object[] { eOther, vector });
}
private static readonly MethodInfo ORSetMaker =
typeof(ReplicatedDataSerializer).GetMethod(nameof(ToGenericORSet), BindingFlags.Static | BindingFlags.NonPublic);
private static ORSet<T> ToGenericORSet<T>(ImmutableDictionary<object, VersionVector> elems, VersionVector vector)
{
var finalInput = elems.ToImmutableDictionary(x => (T)x.Key, v => v.Value);
return new ORSet<T>(finalInput, vector);
}
private static readonly MethodInfo ORSetUnknownMaker =
typeof(ReplicatedDataSerializer).GetMethod(nameof(ORSetUnknownToProto), BindingFlags.Instance | BindingFlags.NonPublic);
/// <summary>
/// Called when we're serializing none of the standard object types with ORSet
/// </summary>
private Proto.Msg.ORSet ORSetUnknownToProto<T>(IORSet o, Proto.Msg.ORSet b)
{
var orset = (ORSet<T>)o;
b.Vvector = SerializationSupport.VersionVectorToProto(orset.VersionVector);
b.TypeInfo.Type = ValType.Other;
b.TypeInfo.TypeName = typeof(T).TypeQualifiedName();
var otherElements = new List<OtherMessage>();
var otherElementsDict = new Dictionary<OtherMessage, Proto.Msg.VersionVector>();
foreach (var kvp in orset.ElementsMap)
{
var otherElement = _ser.OtherMessageToProto(kvp.Key);
otherElements.Add(otherElement);
otherElementsDict[otherElement] = SerializationSupport.VersionVectorToProto(kvp.Value);
}
otherElements.Sort(OtherMessageComparer.Instance);
foreach (var val in otherElements)
{
b.OtherElements.Add(val);
b.Dots.Add(otherElementsDict[val]);
}
return b;
}
private ORSet.IAddDeltaOperation ORAddDeltaOperationFromBinary(byte[] bytes)
{
var set = FromProto(Proto.Msg.ORSet.Parser.ParseFrom(bytes));
return set.ToAddDeltaOperation();
}
private ORSet.IRemoveDeltaOperation ORRemoveOperationFromBinary(byte[] bytes)
{
var set = FromProto(Proto.Msg.ORSet.Parser.ParseFrom(bytes));
return set.ToRemoveDeltaOperation();
}
private ORSet.IFullStateDeltaOperation ORFullStateDeltaOperationFromBinary(byte[] bytes)
{
var set = FromProto(Proto.Msg.ORSet.Parser.ParseFrom(bytes));
return set.ToFullStateDeltaOperation();
}
private Proto.Msg.ORSetDeltaGroup ToProto(ORSet.IDeltaGroupOperation orset)
{
var deltaGroup = new Proto.Msg.ORSetDeltaGroup();
var gatheredTypeInfo = false;
void SetType(IORSet underlying)
{
if (!gatheredTypeInfo) // only need to do this once - all Deltas must have ORSet<T> of same <T>
{
deltaGroup.TypeInfo = GetTypeDescriptor(underlying.SetType);
}
gatheredTypeInfo = true;
}
foreach (var op in orset.OperationsSerialization)
{
switch (op)
{
case ORSet.IAddDeltaOperation add:
deltaGroup.Entries.Add(new ORSetDeltaGroup.Types.Entry() { Operation = ORSetDeltaOp.Add, Underlying = ToProto(add.UnderlyingSerialization) });
SetType(add.UnderlyingSerialization);
break;
case ORSet.IRemoveDeltaOperation remove:
deltaGroup.Entries.Add(new ORSetDeltaGroup.Types.Entry() { Operation = ORSetDeltaOp.Remove, Underlying = ToProto(remove.UnderlyingSerialization) });
SetType(remove.UnderlyingSerialization);
break;
case ORSet.IFullStateDeltaOperation full:
deltaGroup.Entries.Add(new ORSetDeltaGroup.Types.Entry() { Operation = ORSetDeltaOp.Full, Underlying = ToProto(full.UnderlyingSerialization) });
SetType(full.UnderlyingSerialization);
break;
default: throw new ArgumentException($"{op} should not be nested");
}
}
return deltaGroup;
}
private ORSet.IDeltaGroupOperation ORDeltaGroupOperationFromBinary(byte[] bytes)
{
var deltaGroup = Proto.Msg.ORSetDeltaGroup.Parser.ParseFrom(bytes);
var ops = new List<ORSet.IDeltaOperation>();
foreach (var op in deltaGroup.Entries)
{
switch (op.Operation)
{
case ORSetDeltaOp.Add:
ops.Add(FromProto(op.Underlying).ToAddDeltaOperation());
break;
case ORSetDeltaOp.Remove:
ops.Add(FromProto(op.Underlying).ToRemoveDeltaOperation());
break;
case ORSetDeltaOp.Full:
ops.Add(FromProto(op.Underlying).ToFullStateDeltaOperation());
break;
default:
throw new SerializationException($"Unknown ORSet delta operation ${op.Operation}");
}
}
var arr = ops.Cast<IReplicatedData>().ToImmutableArray();
switch (deltaGroup.TypeInfo.Type)
{
case ValType.Int:
return new ORSet<int>.DeltaGroup(arr);
case ValType.Long:
return new ORSet<long>.DeltaGroup(arr);
case ValType.String:
return new ORSet<string>.DeltaGroup(arr);
case ValType.ActorRef:
return new ORSet<IActorRef>.DeltaGroup(arr);
}
// if we made it this far, we're working with an object type
// enter reflection magic
var type = Type.GetType(deltaGroup.TypeInfo.TypeName);
var orDeltaGroupType = typeof(ORSet<>.DeltaGroup).MakeGenericType(type);
return (ORSet.IDeltaGroupOperation)Activator.CreateInstance(orDeltaGroupType, arr);
}
#endregion
#region GSet
private Proto.Msg.GSet GSetToProto<T>(GSet<T> gset)
{
var p = new Proto.Msg.GSet();
p.TypeInfo = GetTypeDescriptor(typeof(T));
return p;
}
private Proto.Msg.GSet GSetToProtoUnknown<T>(IGSet g)
{
var gset = (GSet<T>)g;
var otherElements = new List<OtherMessage>(gset.Select(x => _ser.OtherMessageToProto(x)));
otherElements.Sort(OtherMessageComparer.Instance);
var p = new Proto.Msg.GSet
{
TypeInfo = GetTypeDescriptor(typeof(T))
};
p.OtherElements.Add(otherElements);
return p;
}
private static readonly MethodInfo GSetUnknownToProtoMaker =
typeof(ReplicatedDataSerializer).GetMethod(nameof(GSetToProtoUnknown), BindingFlags.Instance | BindingFlags.NonPublic);
private Proto.Msg.GSet ToProto(IGSet gset)
{
switch (gset)
{
case GSet<int> ints:
{
var p = GSetToProto(ints);
var intElements = new List<int>(ints.Elements);
intElements.Sort();
p.IntElements.Add(intElements);
return p;
}
case GSet<long> longs:
{
var p = GSetToProto(longs);
var longElements = new List<long>(longs.Elements);
longElements.Sort();
p.LongElements.Add(longElements);
return p;
}
case GSet<string> strings:
{
var p = GSetToProto(strings);
var stringElements = new List<string>(strings.Elements);
stringElements.Sort();
p.StringElements.Add(stringElements);
return p;
}
case GSet<IActorRef> refs:
{
var p = GSetToProto(refs);
var refElements = new List<IActorRef>(refs.Elements);
refElements.Sort();
p.ActorRefElements.Add(refElements.Select(Akka.Serialization.Serialization.SerializedActorPath));
return p;
}
default: // unknown type
{
var protoMaker = GSetUnknownToProtoMaker.MakeGenericMethod(gset.SetType);
return (Proto.Msg.GSet)protoMaker.Invoke(this, new object[] { gset });
}
}
}
private IGSet GSetFromBinary(byte[] bytes)
{
var gset = Proto.Msg.GSet.Parser.ParseFrom(bytes);
switch (gset.TypeInfo.Type)
{
case ValType.Int:
{
var eInt = gset.IntElements.ToImmutableHashSet();
return new GSet<int>(eInt);
}
case ValType.Long:
{
var eLong = gset.LongElements.ToImmutableHashSet();
return new GSet<long>(eLong);
}
case ValType.String:
{
var eStr = gset.StringElements.ToImmutableHashSet();
return new GSet<string>(eStr);
}
case ValType.ActorRef:
{
var eRef = gset.ActorRefElements.Select(x => _ser.ResolveActorRef(x)).ToImmutableHashSet();
return new GSet<IActorRef>(eRef);
}
case ValType.Other:
{
// runtime type - enter horrible dynamic serialization stuff
var setContentType = Type.GetType(gset.TypeInfo.TypeName);
var eOther = gset.OtherElements.Select(x => _ser.OtherMessageFromProto(x));
var setType = GSetMaker.MakeGenericMethod(setContentType);
return (IGSet)setType.Invoke(this, new object[] { eOther });
}
default:
throw new SerializationException($"Unknown ValType of [{gset.TypeInfo.Type}] detected while deserializing GSet");
}
}
private static readonly MethodInfo GSetMaker =
typeof(ReplicatedDataSerializer).GetMethod(nameof(ToGenericGSet), BindingFlags.Static | BindingFlags.NonPublic);
private static GSet<T> ToGenericGSet<T>(IEnumerable<object> items)
{
return new GSet<T>(items.Cast<T>().ToImmutableHashSet());
}
#endregion
#region GCounter
private Proto.Msg.GCounter ToProto(GCounter counter)
{
var gProto = new Proto.Msg.GCounter();
gProto.Entries.AddRange(counter.State.Select(x => new Proto.Msg.GCounter.Types.Entry() { Node = SerializationSupport.UniqueAddressToProto(x.Key), Value = ByteString.CopyFrom(BitConverter.GetBytes(x.Value)) }));
return gProto;
}
private GCounter GCounterFromBytes(byte[] bytes)
{
var gProto = Proto.Msg.GCounter.Parser.ParseFrom(bytes);
return GCounterFromProto(gProto);
}
private GCounter GCounterFromProto(Proto.Msg.GCounter gProto)
{
var entries = gProto.Entries.ToImmutableDictionary(k => _ser.UniqueAddressFromProto(k.Node),
v => BitConverter.ToUInt64(v.Value.ToByteArray(), 0));
return new GCounter(entries);
}
#endregion
#region PNCounter
private Proto.Msg.PNCounter ToProto(PNCounter counter)
{
var pProto = new Proto.Msg.PNCounter();
pProto.Increments = ToProto(counter.Increments);
pProto.Decrements = ToProto(counter.Decrements);
return pProto;
}
private PNCounter PNCounterFromBytes(byte[] bytes)
{
var pProto = Proto.Msg.PNCounter.Parser.ParseFrom(bytes);
return PNCounterFromProto(pProto);
}
private PNCounter PNCounterFromProto(Proto.Msg.PNCounter pProto)
{
var increments = GCounterFromProto(pProto.Increments);
var decrements = GCounterFromProto(pProto.Decrements);
return new PNCounter(increments, decrements);
}
#endregion
#region Flag
private Proto.Msg.Flag ToProto(Flag flag)
{
var pFlag = new Proto.Msg.Flag();
pFlag.Enabled = flag;
return pFlag;
}
private Flag FlagFromProto(Proto.Msg.Flag flag)
{
return flag.Enabled ? Flag.True : Flag.False;
}
private Flag FlagFromBinary(byte[] bytes)
{
return FlagFromProto(Proto.Msg.Flag.Parser.ParseFrom(bytes));
}
#endregion
#region LWWRegister
private Proto.Msg.LWWRegister ToProto(ILWWRegister register)
{
var protoMaker = LWWProtoMaker.MakeGenericMethod(register.RegisterType);
return (Proto.Msg.LWWRegister)protoMaker.Invoke(this, new object[] { register });
}
private static readonly MethodInfo LWWProtoMaker =
typeof(ReplicatedDataSerializer).GetMethod(nameof(LWWToProto), BindingFlags.Instance | BindingFlags.NonPublic);
private Proto.Msg.LWWRegister LWWToProto<T>(ILWWRegister r)
{
var register = (LWWRegister<T>)r;
var pLww = new Proto.Msg.LWWRegister();
pLww.Node = SerializationSupport.UniqueAddressToProto(register.UpdatedBy);
pLww.State = _ser.OtherMessageToProto(register.Value);
pLww.Timestamp = register.Timestamp;
pLww.TypeInfo = GetTypeDescriptor(r.RegisterType);
return pLww;
}
private ILWWRegister LWWRegisterFromBinary(byte[] bytes)
{
var proto = Proto.Msg.LWWRegister.Parser.ParseFrom(bytes);
return LWWRegisterFromProto(proto);
}
private ILWWRegister LWWRegisterFromProto(Proto.Msg.LWWRegister proto)
{
switch (proto.TypeInfo.Type)
{
case ValType.Int:
{
return GenericLWWRegisterFromProto<int>(proto);
}
case ValType.Long:
{
return GenericLWWRegisterFromProto<long>(proto);
}
case ValType.String:
{
return GenericLWWRegisterFromProto<string>(proto);
}
case ValType.ActorRef:
{
return GenericLWWRegisterFromProto<IActorRef>(proto);
}
case ValType.Other:
{
// runtime type - enter horrible dynamic serialization stuff
var setContentType = Type.GetType(proto.TypeInfo.TypeName);
var setType = LWWRegisterMaker.MakeGenericMethod(setContentType);
return (ILWWRegister)setType.Invoke(this, new object[] { proto });
}
default:
throw new SerializationException($"Unknown ValType of [{proto.TypeInfo.Type}] detected while deserializing LWWRegister");
}
}
private static readonly MethodInfo LWWRegisterMaker =
typeof(ReplicatedDataSerializer).GetMethod(nameof(GenericLWWRegisterFromProto), BindingFlags.Instance | BindingFlags.NonPublic);
private LWWRegister<T> GenericLWWRegisterFromProto<T>(Proto.Msg.LWWRegister proto)
{
var msg = (T)_ser.OtherMessageFromProto(proto.State);
var updatedBy = _ser.UniqueAddressFromProto(proto.Node);
return new LWWRegister<T>(updatedBy, msg, proto.Timestamp);
}
#endregion
#region ORMap
private Proto.Msg.ORMap ToProto(IORDictionary ormap)
{
var protoMaker = ORDictProtoMaker.MakeGenericMethod(ormap.KeyType, ormap.ValueType);
return (Proto.Msg.ORMap)protoMaker.Invoke(this, new object[] { ormap });
}
private static readonly MethodInfo ORDictProtoMaker =
typeof(ReplicatedDataSerializer).GetMethod(nameof(ORDictToProto), BindingFlags.Instance | BindingFlags.NonPublic);
private Proto.Msg.ORMap ORDictToProto<TKey, TValue>(IORDictionary o) where TValue : IReplicatedData<TValue>
{
var ormap = (ORDictionary<TKey, TValue>)o;
var proto = new Proto.Msg.ORMap();
ToORMapEntries(ormap.Entries, proto);
proto.Keys = ToProto(ormap.KeySet);
proto.ValueTypeInfo = GetTypeDescriptor(typeof(TValue));
return proto;
}
private void ToORMapEntries<TKey, TValue>(IImmutableDictionary<TKey, TValue> ormapEntries, ORMap proto) where TValue : IReplicatedData<TValue>
{
var entries = new List<ORMap.Types.Entry>();
foreach (var e in ormapEntries)
{
var entry = new ORMap.Types.Entry();
switch (e.Key)
{
case int i:
entry.IntKey = i;
break;
case long l:
entry.LongKey = l;
break;
case string str:
entry.StringKey = str;
break;
default:
entry.OtherKey = _ser.OtherMessageToProto(e.Key);
break;
}
entry.Value = _ser.OtherMessageToProto(e.Value);
entries.Add(entry);
}
proto.Entries.Add(entries);
}
private static readonly MethodInfo ORDictMaker =
typeof(ReplicatedDataSerializer).GetMethod(nameof(GenericORDictionaryFromProto), BindingFlags.Instance | BindingFlags.NonPublic);
private IORDictionary ORDictionaryFromBinary(byte[] bytes)
{
var proto = Proto.Msg.ORMap.Parser.ParseFrom(bytes);
return ORDictionaryFromProto(proto);
}
private IORDictionary ORDictionaryFromProto(Proto.Msg.ORMap proto)
{
var keyType = GetTypeFromDescriptor(proto.Keys.TypeInfo);
var valueType = GetTypeFromDescriptor(proto.ValueTypeInfo);
var protoMaker = ORDictMaker.MakeGenericMethod(keyType, valueType);
return (IORDictionary)protoMaker.Invoke(this, new object[] { proto });
}
private IORDictionary GenericORDictionaryFromProto<TKey, TValue>(Proto.Msg.ORMap proto) where TValue : IReplicatedData<TValue>
{
var keys = FromProto(proto.Keys);
switch (proto.Keys.TypeInfo.Type)
{
case ValType.Int:
{
var entries = proto.Entries.ToImmutableDictionary(x => x.IntKey,
v => (TValue)_ser.OtherMessageFromProto(v.Value));
return new ORDictionary<int, TValue>((ORSet<int>)keys, entries);
}
case ValType.Long:
{
var entries = proto.Entries.ToImmutableDictionary(x => x.LongKey,
v => (TValue)_ser.OtherMessageFromProto(v.Value));
return new ORDictionary<long, TValue>((ORSet<long>)keys, entries);
}
case ValType.String:
{
var entries = proto.Entries.ToImmutableDictionary(x => x.StringKey,
v => (TValue)_ser.OtherMessageFromProto(v.Value));
return new ORDictionary<string, TValue>((ORSet<string>)keys, entries);
}
default:
{
var entries = proto.Entries.ToImmutableDictionary(x => (TKey)_ser.OtherMessageFromProto(x.OtherKey),
v => (TValue)_ser.OtherMessageFromProto(v.Value));
return new ORDictionary<TKey, TValue>((ORSet<TKey>)keys, entries);
}
}
}
private Proto.Msg.ORMapDeltaGroup ORDictionaryDeltasToProto(
List<ORDictionary.IDeltaOperation> deltaGroupOps)
{
var keyType = deltaGroupOps[0].KeyType;
var valueType = deltaGroupOps[0].ValueType;
var protoMaker = ORDeltaGroupProtoMaker.MakeGenericMethod(keyType, valueType);
return (Proto.Msg.ORMapDeltaGroup)protoMaker.Invoke(this, new object[] { deltaGroupOps });
}
private static readonly MethodInfo ORDeltaGroupProtoMaker =
typeof(ReplicatedDataSerializer).GetMethod(nameof(ORDictionaryDeltaGroupToProto), BindingFlags.Instance | BindingFlags.NonPublic);
private Proto.Msg.ORMapDeltaGroup ORDictionaryDeltaGroupToProto<TKey, TValue>(
List<ORDictionary.IDeltaOperation> deltaGroupOps) where TValue : IReplicatedData<TValue>
{
var group = new ORMapDeltaGroup();
group.KeyTypeInfo = GetTypeDescriptor(typeof(TKey));
group.ValueTypeInfo = GetTypeDescriptor(typeof(TValue));
ORMapDeltaGroup.Types.MapEntry CreateMapEntry(TKey key, object value = null)
{
var entry = new ORMapDeltaGroup.Types.MapEntry();
switch (key)
{
case int i:
entry.IntKey = i;
break;
case long l:
entry.LongKey = l;
break;
case string s:
entry.StringKey = s;
break;
default:
entry.OtherKey = _ser.OtherMessageToProto(key);
break;
}
if (value != null)
entry.Value = _ser.OtherMessageToProto(value);
return entry;
}
ORMapDeltaGroup.Types.Entry CreateEntry(ORDictionary<TKey, TValue>.IDeltaOperation op)
{
var entry = new ORMapDeltaGroup.Types.Entry();
switch (op)
{
case ORDictionary<TKey, TValue>.PutDeltaOperation putDelta:
entry.Operation = ORMapDeltaOp.OrmapPut;
entry.Underlying = ToProto(putDelta.Underlying.AsInstanceOf<ORSet.IDeltaOperation>()
.UnderlyingSerialization);
entry.EntryData.Add(CreateMapEntry(putDelta.Key, putDelta.Value));
break;
case ORDictionary<TKey, TValue>.UpdateDeltaOperation upDelta:
entry.Operation = ORMapDeltaOp.OrmapUpdate;
entry.Underlying = ToProto(upDelta.Underlying.AsInstanceOf<ORSet.IDeltaOperation>()
.UnderlyingSerialization);
entry.EntryData.AddRange(upDelta.Values.Select(x => CreateMapEntry(x.Key, x.Value)).ToList());
break;
case ORDictionary<TKey, TValue>.RemoveDeltaOperation removeDelta:
entry.Operation = ORMapDeltaOp.OrmapRemove;
entry.Underlying = ToProto(removeDelta.Underlying.AsInstanceOf<ORSet.IDeltaOperation>()
.UnderlyingSerialization);
break;
case ORDictionary<TKey, TValue>.RemoveKeyDeltaOperation removeKeyDelta:
entry.Operation = ORMapDeltaOp.OrmapRemoveKey;
entry.Underlying = ToProto(removeKeyDelta.Underlying.AsInstanceOf<ORSet.IDeltaOperation>()
.UnderlyingSerialization);
entry.EntryData.Add(CreateMapEntry(removeKeyDelta.Key));
break;
default:
throw new SerializationException($"Unknown ORDictionary delta type {op.GetType()}");
}
return entry;
}
group.Entries.Add(deltaGroupOps.Cast<ORDictionary<TKey, TValue>.IDeltaOperation>().Select(x => CreateEntry(x)).ToList());
return group;
}
private Proto.Msg.ORMapDeltaGroup ToProto(ORDictionary.IDeltaOperation op)
{
switch (op)
{
case ORDictionary.IPutDeltaOp p: return ORDictionaryPutToProto(p);
case ORDictionary.IRemoveDeltaOp r: return ORDictionaryRemoveToProto(r);
case ORDictionary.IRemoveKeyDeltaOp r: return ORDictionaryRemoveKeyToProto(r);
case ORDictionary.IUpdateDeltaOp u: return ORDictionaryUpdateToProto(u);
case ORDictionary.IDeltaGroupOp g: return ORDictionaryDeltasToProto(g.OperationsSerialization.ToList());
default:
throw new SerializationException($"Unrecognized delta operation [{op}]");
}
}
private Proto.Msg.ORMapDeltaGroup ORDictionaryPutToProto(ORDictionary.IPutDeltaOp op)
{
return ORDictionaryDeltasToProto(new List<ORDictionary.IDeltaOperation>() { op });
}
private Proto.Msg.ORMapDeltaGroup ORDictionaryRemoveToProto(ORDictionary.IRemoveDeltaOp op)
{
return ORDictionaryDeltasToProto(new List<ORDictionary.IDeltaOperation>() { op });
}