Skip to content

Commit 0bd60e4

Browse files
committed
feat: first step of daemon mode (not ready)
[ci skip]
1 parent 7b1290a commit 0bd60e4

File tree

3 files changed

+91
-113
lines changed

3 files changed

+91
-113
lines changed

sqlmesh/cli/daemon_connector.py

Lines changed: 29 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@ def _validate_lock_file(lock_file_path: Path) -> LockFile:
5353
return read_file
5454

5555

56+
import socket
57+
58+
5659
class DaemonConnector:
5760
"""Connects to the LSP daemon via socket to execute commands."""
5861

@@ -61,37 +64,25 @@ def __init__(self, project_path: Path, lock_file: LockFile):
6164
self.lock_file = lock_file
6265
self.renderer = JanitorStateRenderer()
6366

64-
def _open_connection(self) -> t.Any:
65-
"""Open connection (named pipe or Unix socket) for communication."""
66-
if self.lock_file.communication is None:
67-
raise ValueError("Lock file does not contain communication information")
68-
69-
comm_mode = self.lock_file.communication.type
70-
71-
if isinstance(comm_mode, DaemonCommunicationModeUnixSocket):
72-
socket_path = comm_mode.socket
73-
74-
# Check if it's a Unix domain socket or named pipe
75-
if socket_path.endswith(".sock"):
76-
# Unix domain socket
77-
import socket
78-
79-
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
80-
sock.connect(socket_path)
81-
82-
# Convert to file-like object
83-
import io
84-
85-
return io.BufferedRWPair(
86-
io.BufferedReader(io.FileIO(sock.fileno(), mode="r", closefd=False)),
87-
io.BufferedWriter(io.FileIO(sock.fileno(), mode="w", closefd=False)),
88-
)
89-
else:
90-
# Named pipe
91-
return open(socket_path, "r+b")
67+
def _open_connection(self) -> tuple[t.BinaryIO, t.BinaryIO]:
68+
lock_file = self.lock_file
69+
communication = lock_file.communication
70+
print(f"Using communication mode: {communication}")
71+
if communication is None:
72+
raise ValueError("not correct")
73+
74+
if isinstance(communication.type, DaemonCommunicationModeUnixSocket):
75+
print("Opening Unix socket connection...")
76+
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
77+
sock.connect(communication.type.socket)
78+
print(f"Connected to Unix socket at {communication.type.socket}")
79+
rfile = sock.makefile("rb", buffering=0)
80+
wfile = sock.makefile("wb", buffering=0)
81+
print("Connected to daemon via Unix socket.")
82+
return rfile, wfile
9283
else:
93-
raise ValueError(f"Only Unix socket communication is supported")
94-
84+
raise ValueError("Only Unix socket communication is supported")
85+
9586
def _send_jsonrpc_request(self, connection: t.Any, method: str, params: dict) -> str:
9687
"""Send a JSON-RPC request over the connection and return the request ID."""
9788
request_id = str(uuid.uuid4())
@@ -152,61 +143,45 @@ def _read_jsonrpc_response(self, connection: t.Any, expected_id: str) -> t.Any:
152143
return message.get("result", {})
153144

154145
def call_janitor(self, ignore_ttl: bool = False) -> bool:
155-
"""Call the janitor command through the LSP daemon."""
156-
connection = None
146+
rfile = wfile = None
157147
try:
158-
connection = self._open_connection()
148+
rfile, wfile = self._open_connection()
159149

160-
# Send the janitor request via JSON-RPC
161150
request = LSPCLICallRequest(
162151
arguments=["janitor"] + (["--ignore-ttl"] if ignore_ttl else [])
163152
)
153+
request_id = self._send_jsonrpc_request(wfile, "sqlmesh/cli/call", request.model_dump())
164154

165-
request_id = self._send_jsonrpc_request(
166-
connection, "sqlmesh/cli/call", request.model_dump()
167-
)
168-
169-
# Listen for notifications and the final response
170155
with self.renderer as renderer:
171156
while True:
172157
try:
173-
# Read any JSON-RPC message (response or notification)
174-
message_data = self._read_jsonrpc_message(connection)
175-
176-
# Check if it's the response to our request
158+
message_data = self._read_jsonrpc_message(rfile)
177159
if "id" in message_data and message_data["id"] == request_id:
178-
# This is the response to our request
179160
result = message_data.get("result", {})
180161
if result.get("state") == "finished":
181162
return True
182163
elif result.get("state") == "error":
183164
print(f"Error from daemon: {result.get('message', 'Unknown error')}")
184165
return False
185-
186-
# Check if it's a notification with updates
187166
elif message_data.get("method") == "sqlmesh/cli/update":
188167
params = message_data.get("params", {})
189168
if params.get("state") == "ongoing":
190-
# Parse the janitor state and render it
191169
message = params.get("message", {})
192170
if "state" in message:
193171
janitor_state = JanitorState.model_validate(message)
194172
renderer.render(janitor_state.state)
195-
196173
except Exception as stream_error:
197-
# If we can't read more messages, assume we're done
198174
print(f"Stream ended: {stream_error}")
199175
break
200-
201176
return True
202-
203177
except Exception as e:
204178
print(f"Failed to communicate with daemon: {e}")
205179
return False
206180
finally:
207-
if connection:
208-
connection.close()
209-
181+
try:
182+
if rfile: rfile.close()
183+
finally:
184+
if wfile: wfile.close()
210185

211186
def get_daemon_connector(project_path: Path) -> t.Optional[DaemonConnector]:
212187
"""Get a daemon connector if a valid lock file exists."""

sqlmesh/lsp/main.py

Lines changed: 40 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33

44
from itertools import chain
55
import logging
6+
import socket
7+
import sys
68
import typing as t
79
from pathlib import Path
810
import urllib.parse
@@ -466,14 +468,16 @@ def _custom_cli_call(self, ls: LanguageServer, params: LSPCLICallRequest) -> Soc
466468
"""Handle CLI call requests from the daemon connector."""
467469
try:
468470
context = self._context_get_or_load()
469-
if not context or not hasattr(context, 'context'):
471+
if not context or not hasattr(context, "context"):
470472
return SocketMessage(state=SocketMessageError(message="No context available"))
471473

472-
arguments = params.argumens if hasattr(params, 'argumens') else params.arguments
474+
arguments = params.argumens if hasattr(params, "argumens") else params.arguments
473475

474476
# For now, only support janitor command
475477
if not arguments or arguments[0] != "janitor":
476-
return SocketMessage(state=SocketMessageError(message="Only 'janitor' command is supported"))
478+
return SocketMessage(
479+
state=SocketMessageError(message="Only 'janitor' command is supported")
480+
)
477481

478482
# Parse janitor arguments
479483
ignore_ttl = "--ignore-ttl" in arguments
@@ -490,10 +494,7 @@ def send_state(state: JanitorState):
490494
notification = {
491495
"jsonrpc": "2.0",
492496
"method": "sqlmesh/cli/update",
493-
"params": {
494-
"state": "ongoing",
495-
"message": state.state.model_dump()
496-
}
497+
"params": {"state": "ongoing", "message": state.state.model_dump()},
497498
}
498499

499500
# Send notification through the server's transport
@@ -503,7 +504,7 @@ def send_state(state: JanitorState):
503504
full_message = header.encode("utf-8") + message.encode("utf-8")
504505

505506
# Write directly to the server's output stream
506-
if hasattr(ls.lsp, '_writer') and ls.lsp._writer:
507+
if hasattr(ls.lsp, "_writer") and ls.lsp._writer:
507508
ls.lsp._writer.write(full_message)
508509
ls.lsp._writer.flush()
509510

@@ -1284,47 +1285,15 @@ def _uri_to_path(uri: str) -> Path:
12841285
"""Convert a URI to a path."""
12851286
return URI(uri).to_path()
12861287

1287-
def start(self, socket_path: t.Optional[str] = None) -> None:
1288+
def start(self, rfile: t.Optional[t.Any], wfile: t.Optional[t.Any]) -> None:
12881289
"""Start the server with Unix socket (for both VS Code and CLI)."""
12891290
logging.basicConfig(level=logging.DEBUG)
12901291

1291-
if socket_path:
1292-
import os
1293-
import socket
1294-
from pathlib import Path
1295-
1296-
# Write lock file with socket information
1297-
project_path = self.workspace_folders[0] if self.workspace_folders else Path.cwd()
1298-
lock_path = return_lock_file_path(project_path)
1299-
lock_path.parent.mkdir(parents=True, exist_ok=True)
1300-
1301-
lock_file = generate_lock_file()
1302-
lock_file.communication = DaemonCommunicationMode(
1303-
type=DaemonCommunicationModeUnixSocket(socket=socket_path)
1304-
)
1305-
lock_path.write_text(lock_file.model_dump_json(indent=2))
1306-
1307-
# Remove existing socket file if it exists
1308-
if os.path.exists(socket_path):
1309-
os.unlink(socket_path)
1310-
1311-
# Create and bind to the Unix domain socket
1312-
server_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1313-
server_sock.bind(socket_path)
1314-
server_sock.listen(1)
1315-
1316-
# Accept the first connection (from VS Code)
1317-
client_sock, _ = server_sock.accept()
1318-
1319-
# Convert socket to file-like objects for reading and writing
1320-
import io
1321-
reader = io.BufferedReader(io.FileIO(client_sock.fileno(), mode='r', closefd=False))
1322-
writer = io.BufferedWriter(io.FileIO(client_sock.fileno(), mode='w', closefd=False))
1323-
1324-
self.server.start_io(reader, writer)
1325-
else:
1326-
# Use standard I/O
1292+
if rfile is None or wfile is None:
13271293
self.server.start_io()
1294+
else:
1295+
self.server.start_io(rfile, wfile)
1296+
13281297

13291298

13301299
def loaded_sqlmesh_message(ls: LanguageServer) -> None:
@@ -1338,12 +1307,36 @@ def main() -> None:
13381307
import argparse
13391308

13401309
parser = argparse.ArgumentParser()
1341-
parser.add_argument('--socket', help='Unix socket path for communication', default=None)
1310+
parser.add_argument("--pipe", help="Unix socket path for communication", default=None)
13421311
args = parser.parse_args()
13431312

13441313
# Example instantiator that just uses the same signature as your original `Context` usage.
1314+
file = args.pipe
13451315
sqlmesh_server = SQLMeshLanguageServer(context_class=Context)
1346-
sqlmesh_server.start(socket_path=args.socket)
1316+
if args.pipe:
1317+
# Connect to the pipe the VS Code client created
1318+
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1319+
try:
1320+
sock.connect(args.pipe)
1321+
except OSError as e:
1322+
print(f"Failed to connect to pipe {args.pipe}: {e}", file=sys.stderr)
1323+
sys.exit(1)
1324+
1325+
# Wrap the socket as file-like binary streams for pygls
1326+
rfile = sock.makefile("rb", buffering=0)
1327+
wfile = sock.makefile("wb", buffering=0)
1328+
1329+
try:
1330+
sqlmesh_server.start(rfile, wfile)
1331+
finally:
1332+
try: rfile.close()
1333+
except: pass
1334+
try: wfile.close()
1335+
except: pass
1336+
try: sock.close()
1337+
except: pass
1338+
else:
1339+
sqlmesh_server.start()
13471340

13481341

13491342
if __name__ == "__main__":

vscode/extension/src/lsp/lsp.ts

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ import {
1717
} from '../utilities/errors'
1818
import { CustomLSPMethods } from './custom'
1919
import { resolveProjectPath } from '../utilities/config'
20+
import * as path from 'path'
21+
import * as os from 'os'
22+
import * as crypto from 'crypto'
2023

2124
type SupportedMethodsState =
2225
| { type: 'not-fetched' }
@@ -97,27 +100,20 @@ export class LSPClient implements Disposable {
97100

98101
const workspacePath = sqlmesh.value.workspacePath
99102

100-
// Create a unique socket path for this workspace
101-
const os = require('os')
102-
const path = require('path')
103-
const crypto = require('crypto')
104-
const tmpDir = os.tmpdir()
105-
const workspaceHash = crypto.createHash('md5').update(workspacePath).digest('hex').substring(0, 8)
106-
const socketPath = path.join(tmpDir, `sqlmesh_${workspaceHash}.sock`)
107-
108103
// Add --socket argument to the LSP server
109-
const argsWithSocket = [...sqlmesh.value.args, '--socket', socketPath]
110-
104+
const socketPath = socketAddressForWorkspace(workspacePath)
105+
const argsWithSocket = [...sqlmesh.value.args]
106+
// '--socket', socketPath]
111107
const serverOptions: ServerOptions = {
112108
run: {
113109
command: sqlmesh.value.bin,
114-
transport: TransportKind.ipc,
110+
transport: TransportKind.pipe,
115111
options: { cwd: workspacePath, env: sqlmesh.value.env },
116112
args: argsWithSocket,
117113
},
118114
debug: {
119115
command: sqlmesh.value.bin,
120-
transport: TransportKind.ipc,
116+
transport: TransportKind.pipe,
121117
options: { cwd: workspacePath, env: sqlmesh.value.env },
122118
args: argsWithSocket,
123119
},
@@ -279,3 +275,17 @@ export class LSPClient implements Disposable {
279275
}
280276
}
281277
}
278+
279+
function socketAddressForWorkspace(workspacePath: string) {
280+
const hash = crypto.createHash('md5').update(workspacePath).digest('hex').slice(0, 8)
281+
const base = `sqlmesh_${hash}`
282+
283+
if (process.platform === 'win32') {
284+
// Windows wants a Named Pipe path, not a filesystem .sock file
285+
// NOTE: double backslashes are required in JS strings
286+
return `\\\\.\\pipe\\${base}`
287+
}
288+
289+
// POSIX: use a short dir (tmp) to avoid the ~108 byte sun_path limit
290+
return path.join(os.tmpdir(), `${base}.sock`)
291+
}

0 commit comments

Comments
 (0)