forked from eoscanada/eos-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathproxy.go
124 lines (97 loc) · 2.48 KB
/
proxy.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package p2p
import (
"fmt"
"github.com/eoscanada/eos-go"
"go.uber.org/zap"
)
// Proxy sits between two peers and forwards every packet read from one of
// them to the other, giving registered handlers a look at each forwarded
// packet along the way.
type Proxy struct {
	// Peer1 and Peer2 are the two endpoints being bridged. Start reads from
	// both and triggers the handshake on Peer2 when it carries handshake info.
	Peer1, Peer2 *Peer

	// handlers are called, in registration order, for each packet that has
	// been forwarded and is not a go-away message.
	handlers []Handler

	// Handshake bookkeeping flags.
	// NOTE(review): neither flag is read or written in the visible part of
	// this file — confirm whether they are still needed.
	waitingOriginHandShake, waitingDestinationHandShake bool
}
// NewProxy returns a Proxy bridging peer1 and peer2. No handlers are
// registered and no connection is opened; use ConnectAndStart or Start
// to begin forwarding.
func NewProxy(peer1 *Peer, peer2 *Peer) *Proxy {
	return &Proxy{Peer1: peer1, Peer2: peer2}
}
// RegisterHandler appends a single handler to the proxy's handler list.
// Handlers are invoked in the order in which they were registered.
func (p *Proxy) RegisterHandler(handler Handler) {
	p.RegisterHandlers([]Handler{handler})
}
// RegisterHandlers appends every handler in handlers, preserving their
// order, to the proxy's handler list. A nil or empty slice is a no-op.
func (p *Proxy) RegisterHandlers(handlers []Handler) {
	if len(handlers) == 0 {
		return
	}
	p.handlers = append(p.handlers, handlers...)
}
// read is the per-direction pump of the proxy: it repeatedly reads a packet
// from sender and passes it to handle, which forwards it to receiver.
//
// A read failure is wrapped with the sender's address, reported on
// errChannel, and ends the loop. A failure returned by handle is also
// reported on errChannel, but the loop then keeps reading.
func (p *Proxy) read(sender *Peer, receiver *Peer, errChannel chan error) {
	for {
		pkt, readErr := sender.Read()
		if readErr != nil {
			errChannel <- fmt.Errorf("read message from %s: %w", sender.Address, readErr)
			return
		}

		if handleErr := p.handle(pkt, sender, receiver); handleErr != nil {
			errChannel <- handleErr
		}
	}
}
// handle forwards one packet from sender to receiver and then notifies the
// registered handlers.
//
// The raw packet bytes are always written to receiver first. If that write
// fails, the wrapped error is returned. If the decoded message is a go-away
// message, an error carrying its reason is returned and the handlers are
// skipped (the packet has already been forwarded at that point). Otherwise
// each handler receives an envelope describing the packet and nil is
// returned.
func (p *Proxy) handle(packet *eos.Packet, sender *Peer, receiver *Peer) error {
	if _, writeErr := receiver.Write(packet.Raw); writeErr != nil {
		return fmt.Errorf("handleDefault: %w", writeErr)
	}

	if goAway, isGoAway := packet.P2PMessage.(*eos.GoAwayMessage); isGoAway {
		return fmt.Errorf("handling message: go away: reason [%d]", goAway.Reason)
	}

	env := NewEnvelope(sender, receiver, packet)
	for _, h := range p.handlers {
		h.Handle(env)
	}
	return nil
}
// triggerHandshake calls SendHandshake on peer with the handshake info
// stored on that same peer, and returns whatever error that call yields.
// It does not check handshakeInfo for nil; Start does so before calling.
func triggerHandshake(peer *Peer) error {
	info := peer.handshakeInfo
	return peer.SendHandshake(info)
}
// ConnectAndStart connects both peers and, once each has signalled on the
// channel returned by its Connect call, hands over to Start.
//
// Both Connect calls share one error channel. The first error received on
// it while waiting aborts the wait and is returned as is. Otherwise the
// result of Start is returned.
func (p *Proxy) ConnectAndStart() error {
	zlog.Info("Connecting and starting proxy")

	connectErrs := make(chan error)
	firstReady := p.Peer1.Connect(connectErrs)
	secondReady := p.Peer2.Connect(connectErrs)

	// Keep waiting until both peers have reported in, in whatever order.
	var firstUp, secondUp bool
	for !firstUp || !secondUp {
		select {
		case <-firstReady:
			firstUp = true
		case <-secondReady:
			secondUp = true
		case err := <-connectErrs:
			return err
		}
	}

	return p.Start()
}
// Start launches the two reader goroutines (Peer1 -> Peer2 and
// Peer2 -> Peer1) that forward packets between the peers.
//
// If Peer2 carries handshake info, the handshake is triggered on Peer2:
// a failure is returned wrapped, and on success Start returns nil right
// away while the readers keep running. If Peer2 has no handshake info,
// Start blocks until one of the readers reports an error and returns it.
func (p *Proxy) Start() error {
	// Each peer is logged under its own key; the second field previously
	// reused "peer1", yielding a duplicate key and hiding which address
	// belonged to Peer2.
	zlog.Info("Starting readers",
		zap.String("peer1", p.Peer1.Address),
		zap.String("peer2", p.Peer2.Address))

	errorChannel := make(chan error)
	go p.read(p.Peer1, p.Peer2, errorChannel)
	go p.read(p.Peer2, p.Peer1, errorChannel)

	if p.Peer2.handshakeInfo != nil {
		if err := triggerHandshake(p.Peer2); err != nil {
			return fmt.Errorf("connect and start: trigger handshake: %w", err)
		}
		// NOTE(review): returning here means nothing ever receives from
		// errorChannel, which is unbuffered and local to this call, so a
		// reader that later reports an error will block on its send.
		// Confirm whether this path was meant to fall through to the
		// blocking receive below instead.
		return nil
	}

	return <-errorChannel
}