/
session.go
134 lines (116 loc) · 3.48 KB
/
session.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package session
import (
"context"
"encoding/json"
"fmt"
"io"
"time"
"github.com/mafredri/cdp"
"github.com/mafredri/cdp/internal/errors"
"github.com/mafredri/cdp/protocol/target"
"github.com/mafredri/cdp/rpcc"
)
// session represents a session connection to a target. It acts as the
// codec for its own *rpcc.Conn: requests written by the conn go out
// through send, and messages handed to Write come back via ReadResponse.
type session struct {
// ID is the session ID taken from the attach reply in dial.
ID target.SessionID
// TargetID is the ID of the target that dial attached to.
TargetID target.ID
// recvC hands raw messages from Write over to ReadResponse.
recvC chan []byte
// send delivers an encoded request; WriteRequest calls it.
send func([]byte) error
// init is closed by dial once conn has been assigned; ReadResponse
// and send wait on it before touching conn.
init chan struct{} // Protect conn from early read.
// conn is the *rpcc.Conn created by dial with this session as codec.
conn *rpcc.Conn
}
// Compile-time check that *session implements rpcc.Codec; the build
// fails here if the method set ever stops matching the interface.
var _ rpcc.Codec = (*session)(nil)
// WriteRequest implements rpcc.Codec. The request is encoded as JSON and
// passed to the session's send function; a marshalling failure is
// returned unchanged and nothing is sent.
func (s *session) WriteRequest(r *rpcc.Request) error {
	encoded, marshalErr := json.Marshal(r)
	if marshalErr == nil {
		// Delivery (and its error, if any) is entirely up to send.
		return s.send(encoded)
	}
	return marshalErr
}
// ReadResponse implements rpcc.Codec. It waits for the next message
// delivered through Write and decodes it as JSON into r. If the
// connection's context is done first, the context error is returned.
func (s *session) ReadResponse(r *rpcc.Response) error {
	// s.conn must not be touched before dial has assigned it; init is
	// closed once that has happened.
	<-s.init

	done := s.conn.Context().Done()
	select {
	case <-done:
		return s.conn.Context().Err()
	case msg := <-s.recvC:
		return json.Unmarshal(msg, r)
	}
}
// Conn exposes the *rpcc.Conn for which this session acts as codec.
func (s *session) Conn() *rpcc.Conn {
	return s.conn
}
// Write forwards a target message to the session connection, where
// ReadResponse picks it up. It blocks until the message has been
// accepted or the connection's context is done; in the latter case the
// context error is returned.
// When Write returns an error, the session is closed.
func (s *session) Write(data []byte) error {
	done := s.conn.Context().Done()
	select {
	case <-done:
		return s.conn.Context().Err()
	case s.recvC <- data:
		return nil
	}
}
// Close closes the session's underlying *rpcc.Conn and returns the
// error reported by that close, if any.
func (s *session) Close() error {
	conn := s.conn
	return conn.Close()
}
var (
	// sessionDetachConn builds a dial option whose dialer ignores the
	// context and address and returns a closeConn carrying detach as
	// its close function. We only handle Close on conn to detach the
	// session; the codec handles the actual transport (Read / Write)
	// in this case.
	sessionDetachConn = func(detach func() error) rpcc.DialOption {
		dialer := func(_ context.Context, _ string) (io.ReadWriteCloser, error) {
			return &closeConn{close: detach}, nil
		}
		return rpcc.WithDialer(dialer)
	}

	// sessionCodec builds a dial option that installs s itself as the
	// connection's codec, disregarding the io.ReadWriter it is offered.
	sessionCodec = func(s *session) rpcc.DialOption {
		newCodec := func(_ io.ReadWriter) rpcc.Codec {
			return s
		}
		return rpcc.WithCodec(newCodec)
	}
)
// dial attaches to the target identified by id via the provided
// *cdp.Client and creates a lightweight RPC connection to the target.
// Communication is done via the underlying *rpcc.Conn of the provided
// *cdp.Client: requests are forwarded with Target.SendMessageToTarget,
// and the detach function (bounded by detachTimeout) is handed to the
// connection as its close hook.
//
// On failure to attach or to create the connection, a nil session and
// the error are returned.
func dial(ctx context.Context, id target.ID, tc *cdp.Client, detachTimeout time.Duration) (s *session, err error) {
	attachArgs := target.NewAttachToTargetArgs(id)
	// The default of this flag will change to true, so until CDP
	// uses flat session mode, we set this to false explicitly.
	// See https://bugs.chromium.org/p/chromium/issues/detail?id=991325.
	attachArgs.SetFlatten(false)

	attached, err := tc.Target.AttachToTarget(ctx, attachArgs)
	if err != nil {
		return nil, err
	}

	s = &session{
		ID:       attached.SessionID,
		TargetID: id,
		init:     make(chan struct{}),
		recvC:    make(chan []byte, 1),
	}
	// send refers to s.conn, which is only assigned further down, so it
	// waits on init before using it.
	s.send = func(data []byte) error {
		<-s.init
		// TODO(maf): Use async invocation.
		sendCtx := s.conn.Context()
		msg := target.NewSendMessageToTargetArgs(string(data)).SetSessionID(s.ID)
		return tc.Target.SendMessageToTarget(sendCtx, msg)
	}

	// detach runs on a fresh background context with its own timeout
	// rather than on ctx, so it does not depend on the dial context.
	detach := func() error {
		detachCtx, cancel := context.WithTimeout(context.Background(), detachTimeout)
		defer cancel()

		detachArgs := target.NewDetachFromTargetArgs().SetSessionID(s.ID)
		detachErr := tc.Target.DetachFromTarget(detachCtx, detachArgs)
		if errors.Is(detachErr, context.DeadlineExceeded) {
			return fmt.Errorf("session: detach timed out for session %s", s.ID)
		}
		// NOTE(review): presumably Wrapf yields nil for a nil error, so a
		// successful detach returns nil — confirm in internal/errors.
		return errors.Wrapf(detachErr, "session: detach failed for session %s", s.ID)
	}

	closeOpt := sessionDetachConn(detach)
	codecOpt := sessionCodec(s)
	s.conn, err = rpcc.DialContext(ctx, "", closeOpt, codecOpt)
	if err != nil {
		return nil, err
	}

	// conn is in place; release anything waiting in ReadResponse or send.
	close(s.init)
	return s, nil
}