@@ -16,9 +16,16 @@ import (
16
16
"github.com/lightninglabs/loop"
17
17
"github.com/lightninglabs/loop/lndclient"
18
18
"github.com/lightninglabs/loop/looprpc"
19
+ "github.com/lightningnetwork/lnd/lntypes"
19
20
"google.golang.org/grpc"
20
21
)
21
22
23
+ var (
24
+ // maxMsgRecvSize is the largest message our REST proxy will receive. We
25
+ // set this to 200MiB atm.
26
+ maxMsgRecvSize = grpc .MaxCallRecvMsgSize (1 * 1024 * 1024 * 200 )
27
+ )
28
+
22
29
// listenerCfg holds closures used to retrieve listeners for the gRPC services.
23
30
type listenerCfg struct {
24
31
// grpcListener returns a listener to use for the gRPC server.
@@ -68,14 +75,18 @@ func daemon(config *config, lisCfg *listenerCfg) error {
68
75
return err
69
76
}
70
77
78
+ swaps := make (map [lntypes.Hash ]loop.SwapInfo )
71
79
for _ , s := range swapsList {
72
80
swaps [s .SwapHash ] = * s
73
81
}
74
82
75
83
// Instantiate the loopd gRPC server.
76
84
server := swapClientServer {
77
- impl : swapClient ,
78
- lnd : & lnd .LndServices ,
85
+ impl : swapClient ,
86
+ lnd : & lnd .LndServices ,
87
+ swaps : swaps ,
88
+ subscribers : make (map [int ]chan <- interface {}),
89
+ statusChan : make (chan loop.SwapInfo ),
79
90
}
80
91
81
92
serverOpts := []grpc.ServerOption {}
@@ -92,12 +103,26 @@ func daemon(config *config, lisCfg *listenerCfg) error {
92
103
}
93
104
defer grpcListener .Close ()
94
105
106
+ // The default JSON marshaler of the REST proxy only sets OrigName to
107
+ // true, which instructs it to use the same field names as specified in
108
+ // the proto file and not switch to camel case. What we also want is
109
+ // that the marshaler prints all values, even if they are falsey.
110
+ customMarshalerOption := proxy .WithMarshalerOption (
111
+ proxy .MIMEWildcard , & proxy.JSONPb {
112
+ OrigName : true ,
113
+ EmitDefaults : true ,
114
+ },
115
+ )
116
+
95
117
// We'll also create and start an accompanying proxy to serve clients
96
118
// through REST.
97
119
ctx , cancel := context .WithCancel (context .Background ())
98
120
defer cancel ()
99
- mux := proxy .NewServeMux ()
100
- proxyOpts := []grpc.DialOption {grpc .WithInsecure ()}
121
+ mux := proxy .NewServeMux (customMarshalerOption )
122
+ proxyOpts := []grpc.DialOption {
123
+ grpc .WithInsecure (),
124
+ grpc .WithDefaultCallOptions (maxMsgRecvSize ),
125
+ }
101
126
err = looprpc .RegisterSwapClientHandlerFromEndpoint (
102
127
ctx , mux , config .RPCListen , proxyOpts ,
103
128
)
@@ -130,8 +155,6 @@ func daemon(config *config, lisCfg *listenerCfg) error {
130
155
log .Infof ("REST proxy disabled" )
131
156
}
132
157
133
- statusChan := make (chan loop.SwapInfo )
134
-
135
158
mainCtx , cancel := context .WithCancel (context .Background ())
136
159
var wg sync.WaitGroup
137
160
@@ -141,7 +164,7 @@ func daemon(config *config, lisCfg *listenerCfg) error {
141
164
defer wg .Done ()
142
165
143
166
log .Infof ("Starting swap client" )
144
- err := swapClient .Run (mainCtx , statusChan )
167
+ err := swapClient .Run (mainCtx , server . statusChan )
145
168
if err != nil {
146
169
log .Error (err )
147
170
}
@@ -159,25 +182,7 @@ func daemon(config *config, lisCfg *listenerCfg) error {
159
182
defer wg .Done ()
160
183
161
184
log .Infof ("Waiting for updates" )
162
- for {
163
- select {
164
- case swap := <- statusChan :
165
- swapsLock .Lock ()
166
- swaps [swap .SwapHash ] = swap
167
-
168
- for _ , subscriber := range subscribers {
169
- select {
170
- case subscriber <- swap :
171
- case <- mainCtx .Done ():
172
- return
173
- }
174
- }
175
-
176
- swapsLock .Unlock ()
177
- case <- mainCtx .Done ():
178
- return
179
- }
180
- }
185
+ server .processStatusUpdates (mainCtx )
181
186
}()
182
187
183
188
// Start the grpc server.
0 commit comments