Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add streaming function #38

Merged
merged 8 commits into from
Mar 17, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
23 changes: 12 additions & 11 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,28 @@ jobs:
strategy:
matrix:
os: [ubuntu-latest, macos-latest, windows-latest]
go: ['1.19', '1.20', '1.21']
go: ['1.20', '1.21', '1.22']
name: ${{ matrix.os }} @ Go ${{ matrix.go }}
runs-on: ${{ matrix.os }}
steps:
- name: Checkout
uses: actions/checkout@v4
with:
path: ${{ env.WORKSPACE }}

- name: Set up Go ${{ matrix.go }}
uses: actions/setup-go@v3
uses: actions/setup-go@v5
with:
go-version: ${{ matrix.go }}

- name: Cache
uses: actions/cache@v3
uses: actions/cache@v4
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: |
${{ runner.os }}-go-

- name: Checkout
uses: actions/checkout@v3
with:
path: ${{ env.WORKSPACE }}

- name: Build
run: go build -v ./...
Expand All @@ -43,10 +44,10 @@ jobs:
run: go test -v --coverpkg=github.com/shamaton/msgpack/... --coverprofile=coverage.coverprofile --covermode=atomic ./...

- name: Upload coverage to Codecov
if: success() && matrix.go == '1.21' && matrix.os == 'ubuntu-latest'
uses: codecov/codecov-action@v1
if: success() && matrix.go == '1.22' && matrix.os == 'ubuntu-latest'
uses: codecov/codecov-action@v4
with:
token:
token: ${{ secrets.CODECOV_TOKEN }}
fail_ci_if_error: false
working-directory: ${{ env.WORKSPACE }}

Expand All @@ -55,7 +56,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v3
uses: actions/checkout@v4
with:
path: ${{ env.WORKSPACE }}
- name: golangci-lint
Expand Down
2 changes: 1 addition & 1 deletion LICENSE
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
MIT License

Copyright (c) 2018 Masayuki Shamoto
Copyright (c) 2024 Masayuki Shamoto

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
Expand Down
25 changes: 18 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ You can get the fastest performance with [msgpackgen](https://github.com/shamato
* Supports extend encoder / decoder
* Can also Encoding / Decoding struct as array

This package requires more than version **1.13**

## Installation

Current version is **msgpack/v2**.
Expand All @@ -32,24 +30,37 @@ package main

import (
"github.com/shamaton/msgpack/v2"
"net/http"
)

type Struct struct {
String string
}

// simple
func main() {
type Struct struct {
String string
}
v := Struct{String: "msgpack"}

d, err := msgpack.Marshal(v)
if err != nil {
panic(err)
}
r := Struct{}
err = msgpack.Unmarshal(d, &r)
if err != nil {
if err = msgpack.Unmarshal(d, &r); err != nil {
panic(err)
}
}

// streaming
func handle(w http.ResponseWriter, r *http.Request) {
var body Struct
if err := msgpack.UnmarshalRead(r, &body); err != nil {
panic(err)
}
if err := msgpack.MarshalWrite(w, body); err != nil {
panic(err)
}
}
```

## Benchmark
Expand Down
19 changes: 18 additions & 1 deletion decode.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package msgpack

import "github.com/shamaton/msgpack/v2/internal/decoding"
import (
"io"

"github.com/shamaton/msgpack/v2/internal/decoding"
streamdecoding "github.com/shamaton/msgpack/v2/internal/stream/decoding"
)

// UnmarshalAsMap decodes data that is encoded as map format.
// This is the same thing that StructAsArray sets false.
Expand All @@ -13,3 +18,15 @@
func UnmarshalAsArray(data []byte, v interface{}) error {
return decoding.Decode(data, v, true)
}

// UnmarshalReadAsMap decodes from stream. stream data expects map format.
// This is the same thing that StructAsArray sets false.
func UnmarshalReadAsMap(r io.Reader, v interface{}) error {
return streamdecoding.Decode(r, v, false)

Check warning on line 25 in decode.go

View check run for this annotation

Codecov / codecov/patch

decode.go#L24-L25

Added lines #L24 - L25 were not covered by tests
}

// UnmarshalReadAsArray decodes from stream. stream data expects array format.
// This is the same thing that StructAsArray sets true.
func UnmarshalReadAsArray(r io.Reader, v interface{}) error {
return streamdecoding.Decode(r, v, true)

Check warning on line 31 in decode.go

View check run for this annotation

Codecov / codecov/patch

decode.go#L30-L31

Added lines #L30 - L31 were not covered by tests
}
15 changes: 15 additions & 0 deletions encode.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package msgpack

import (
"io"

"github.com/shamaton/msgpack/v2/internal/encoding"
streamencoding "github.com/shamaton/msgpack/v2/internal/stream/encoding"
)

// MarshalAsMap encodes data as map format.
Expand All @@ -15,3 +18,15 @@
func MarshalAsArray(v interface{}) ([]byte, error) {
return encoding.Encode(v, true)
}

// MarshalWriteAsMap writes map format encoded data to writer.
// This is the same thing that StructAsArray sets false.
func MarshalWriteAsMap(w io.Writer, v interface{}) error {
return streamencoding.Encode(w, v, false)

Check warning on line 25 in encode.go

View check run for this annotation

Codecov / codecov/patch

encode.go#L24-L25

Added lines #L24 - L25 were not covered by tests
}

// MarshalWriteAsArray writes array format encoded data to writer.
// This is the same thing that StructAsArray sets true.
func MarshalWriteAsArray(w io.Writer, v interface{}) error {
return streamencoding.Encode(w, v, true)

Check warning on line 31 in encode.go

View check run for this annotation

Codecov / codecov/patch

encode.go#L30-L31

Added lines #L30 - L31 were not covered by tests
}
60 changes: 60 additions & 0 deletions ext/decoder_stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package ext

import (
"github.com/shamaton/msgpack/v2/internal/common"
"io"
"reflect"
)

var emptyBytes []byte

type StreamDecoder interface {
Code() int8
IsType(code byte, innerType int8, dataLength int) bool
ToValue(code byte, data []byte, k reflect.Kind) (any, error)
}

type DecoderStreamCommon struct {
}

func (d *DecoderStreamCommon) ReadSize1(r io.Reader, buf *common.Buffer) (byte, error) {
if _, err := r.Read(buf.B1); err != nil {
return 0, err

Check warning on line 22 in ext/decoder_stream.go

View check run for this annotation

Codecov / codecov/patch

ext/decoder_stream.go#L20-L22

Added lines #L20 - L22 were not covered by tests
}
return buf.B1[0], nil

Check warning on line 24 in ext/decoder_stream.go

View check run for this annotation

Codecov / codecov/patch

ext/decoder_stream.go#L24

Added line #L24 was not covered by tests
}

func (d *DecoderStreamCommon) ReadSize2(r io.Reader, buf *common.Buffer) ([]byte, error) {
if _, err := r.Read(buf.B2); err != nil {
return emptyBytes, err

Check warning on line 29 in ext/decoder_stream.go

View check run for this annotation

Codecov / codecov/patch

ext/decoder_stream.go#L27-L29

Added lines #L27 - L29 were not covered by tests
}
return buf.B2, nil

Check warning on line 31 in ext/decoder_stream.go

View check run for this annotation

Codecov / codecov/patch

ext/decoder_stream.go#L31

Added line #L31 was not covered by tests
}

func (d *DecoderStreamCommon) ReadSize4(r io.Reader, buf *common.Buffer) ([]byte, error) {
if _, err := r.Read(buf.B4); err != nil {
return emptyBytes, err

Check warning on line 36 in ext/decoder_stream.go

View check run for this annotation

Codecov / codecov/patch

ext/decoder_stream.go#L34-L36

Added lines #L34 - L36 were not covered by tests
}
return buf.B4, nil

Check warning on line 38 in ext/decoder_stream.go

View check run for this annotation

Codecov / codecov/patch

ext/decoder_stream.go#L38

Added line #L38 was not covered by tests
}

func (d *DecoderStreamCommon) ReadSize8(r io.Reader, buf *common.Buffer) ([]byte, error) {
if _, err := r.Read(buf.B8); err != nil {
return emptyBytes, err

Check warning on line 43 in ext/decoder_stream.go

View check run for this annotation

Codecov / codecov/patch

ext/decoder_stream.go#L41-L43

Added lines #L41 - L43 were not covered by tests
}
return buf.B8, nil

Check warning on line 45 in ext/decoder_stream.go

View check run for this annotation

Codecov / codecov/patch

ext/decoder_stream.go#L45

Added line #L45 was not covered by tests
}

func (d *DecoderStreamCommon) ReadSizeN(r io.Reader, buf *common.Buffer, n int) ([]byte, error) {
var b []byte
if len(buf.Data) <= n {
b = buf.Data[:n]
} else {
buf.Data = append(buf.Data, make([]byte, n-len(buf.Data))...)
b = buf.Data

Check warning on line 54 in ext/decoder_stream.go

View check run for this annotation

Codecov / codecov/patch

ext/decoder_stream.go#L48-L54

Added lines #L48 - L54 were not covered by tests
}
if _, err := r.Read(b); err != nil {
return emptyBytes, err

Check warning on line 57 in ext/decoder_stream.go

View check run for this annotation

Codecov / codecov/patch

ext/decoder_stream.go#L56-L57

Added lines #L56 - L57 were not covered by tests
}
return b, nil

Check warning on line 59 in ext/decoder_stream.go

View check run for this annotation

Codecov / codecov/patch

ext/decoder_stream.go#L59

Added line #L59 was not covered by tests
}
120 changes: 120 additions & 0 deletions ext/encode_stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package ext

import (
"github.com/shamaton/msgpack/v2/internal/common"
"io"
"reflect"
)

type StreamEncoder interface {
Code() int8
Type() reflect.Type
Write(w io.Writer, value reflect.Value, buf *common.Buffer) error
}

type StreamEncoderCommon struct{}

func (c *StreamEncoderCommon) WriteByte1Int64(w io.Writer, value int64, buf *common.Buffer) error {
return buf.Write(w,
byte(value),
)
}

func (c *StreamEncoderCommon) WriteByte2Int64(w io.Writer, value int64, buf *common.Buffer) error {
return buf.Write(w,
byte(value>>8),
byte(value),
)
}

func (c *StreamEncoderCommon) WriteByte4Int64(w io.Writer, value int64, buf *common.Buffer) error {
return buf.Write(w,
byte(value>>24),
byte(value>>16),
byte(value>>8),
byte(value),
)
}

func (c *StreamEncoderCommon) WriteByte8Int64(w io.Writer, value int64, buf *common.Buffer) error {
return buf.Write(w,
byte(value>>56),
byte(value>>48),
byte(value>>40),
byte(value>>32),
byte(value>>24),
byte(value>>16),
byte(value>>8),
byte(value),
)
}

func (c *StreamEncoderCommon) WriteByte1Uint64(w io.Writer, value uint64, buf *common.Buffer) error {
return buf.Write(w,
byte(value),
)
}

func (c *StreamEncoderCommon) WriteByte2Uint64(w io.Writer, value uint64, buf *common.Buffer) error {
return buf.Write(w,
byte(value>>8),
byte(value),
)
}

func (c *StreamEncoderCommon) WriteByte4Uint64(w io.Writer, value uint64, buf *common.Buffer) error {
return buf.Write(w,
byte(value>>24),
byte(value>>16),
byte(value>>8),
byte(value),
)
}

func (c *StreamEncoderCommon) WriteByte8Uint64(w io.Writer, value uint64, buf *common.Buffer) error {
return buf.Write(w,
byte(value>>56),
byte(value>>48),
byte(value>>40),
byte(value>>32),
byte(value>>24),
byte(value>>16),
byte(value>>8),
byte(value),
)
}

func (c *StreamEncoderCommon) WriteByte1Int(w io.Writer, value int, buf *common.Buffer) error {
return buf.Write(w,
byte(value),
)
}

func (c *StreamEncoderCommon) WriteByte2Int(w io.Writer, value int, buf *common.Buffer) error {
return buf.Write(w,
byte(value>>8),
byte(value),
)
}

func (c *StreamEncoderCommon) WriteByte4Int(w io.Writer, value int, buf *common.Buffer) error {
return buf.Write(w,
byte(value>>24),
byte(value>>16),
byte(value>>8),
byte(value),
)
}

func (c *StreamEncoderCommon) WriteByte4Uint32(w io.Writer, value uint32, buf *common.Buffer) error {
return buf.Write(w,
byte(value>>24),
byte(value>>16),
byte(value>>8),
byte(value),
)
}

func (c *StreamEncoderCommon) WriteBytes(w io.Writer, bs []byte, buf *common.Buffer) error {
return buf.Write(w, bs...)
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module github.com/shamaton/msgpack/v2

go 1.15
go 1.20