Skip to content

Commit 7b1290a

Browse files
committed
feat: first draft of a daemon
1 parent d15446c commit 7b1290a

File tree

7 files changed

+714
-80
lines changed

7 files changed

+714
-80
lines changed

sqlmesh/cli/daemon_connector.py

Lines changed: 230 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,230 @@
1+
import json
2+
import typing as t
3+
import uuid
4+
from pathlib import Path
5+
6+
from sqlmesh.core.console import JanitorState, JanitorStateRenderer
7+
from sqlmesh.lsp.cli_calls import (
8+
DaemonCommunicationModeTCP,
9+
DaemonCommunicationModeUnixSocket,
10+
LockFile,
11+
generate_lock_file,
12+
return_lock_file_path,
13+
)
14+
from sqlmesh.utils.pydantic import PydanticModel
15+
16+
17+
class LSPCLICallRequest(PydanticModel):
18+
"""Request to call a CLI command through the LSP."""
19+
20+
arguments: t.List[str]
21+
22+
23+
class SocketMessageFinished(PydanticModel):
24+
state: t.Literal["finished"] = "finished"
25+
26+
27+
class SocketMessageOngoing(PydanticModel):
28+
state: t.Literal["ongoing"] = "ongoing"
29+
message: t.Dict[str, t.Any]
30+
31+
32+
class SocketMessageError(PydanticModel):
33+
state: t.Literal["error"] = "error"
34+
message: str
35+
36+
37+
SocketMessage = t.Union[SocketMessageFinished, SocketMessageOngoing, SocketMessageError]
38+
39+
40+
def _validate_lock_file(lock_file_path: Path) -> LockFile:
41+
"""Validate that the lock file is compatible with current version."""
42+
current_lock = generate_lock_file()
43+
try:
44+
read_file = LockFile.model_validate_json(lock_file_path.read_text())
45+
except Exception as e:
46+
raise ValueError(f"Failed to parse lock file: {e}")
47+
48+
if not read_file.validate_lock_file(current_lock):
49+
raise ValueError(
50+
f"Lock file version mismatch. Expected: {current_lock.version}, "
51+
f"Got: {read_file.version}"
52+
)
53+
return read_file
54+
55+
56+
class DaemonConnector:
57+
"""Connects to the LSP daemon via socket to execute commands."""
58+
59+
def __init__(self, project_path: Path, lock_file: LockFile):
60+
self.project_path = project_path
61+
self.lock_file = lock_file
62+
self.renderer = JanitorStateRenderer()
63+
64+
def _open_connection(self) -> t.Any:
65+
"""Open connection (named pipe or Unix socket) for communication."""
66+
if self.lock_file.communication is None:
67+
raise ValueError("Lock file does not contain communication information")
68+
69+
comm_mode = self.lock_file.communication.type
70+
71+
if isinstance(comm_mode, DaemonCommunicationModeUnixSocket):
72+
socket_path = comm_mode.socket
73+
74+
# Check if it's a Unix domain socket or named pipe
75+
if socket_path.endswith(".sock"):
76+
# Unix domain socket
77+
import socket
78+
79+
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
80+
sock.connect(socket_path)
81+
82+
# Convert to file-like object
83+
import io
84+
85+
return io.BufferedRWPair(
86+
io.BufferedReader(io.FileIO(sock.fileno(), mode="r", closefd=False)),
87+
io.BufferedWriter(io.FileIO(sock.fileno(), mode="w", closefd=False)),
88+
)
89+
else:
90+
# Named pipe
91+
return open(socket_path, "r+b")
92+
else:
93+
raise ValueError(f"Only Unix socket communication is supported")
94+
95+
def _send_jsonrpc_request(self, connection: t.Any, method: str, params: dict) -> str:
96+
"""Send a JSON-RPC request over the connection and return the request ID."""
97+
request_id = str(uuid.uuid4())
98+
jsonrpc_request = {"jsonrpc": "2.0", "method": method, "params": params, "id": request_id}
99+
100+
# JSON-RPC over connection uses Content-Length header (LSP protocol style)
101+
message = json.dumps(jsonrpc_request)
102+
content_length = len(message.encode("utf-8"))
103+
104+
# Send with Content-Length header
105+
header = f"Content-Length: {content_length}\r\n\r\n"
106+
full_message = header.encode("utf-8") + message.encode("utf-8")
107+
connection.write(full_message)
108+
connection.flush()
109+
110+
return request_id
111+
112+
def _read_jsonrpc_message(self, connection: t.Any) -> t.Dict[str, t.Any]:
113+
"""Read any JSON-RPC message (response or notification) from the connection."""
114+
# Read headers
115+
headers = b""
116+
while b"\r\n\r\n" not in headers:
117+
chunk = connection.read(1)
118+
if not chunk:
119+
raise ValueError("Connection closed while reading headers")
120+
headers += chunk
121+
122+
# Parse Content-Length header
123+
header_str = headers.decode("utf-8")
124+
content_length = None
125+
for line in header_str.split("\r\n"):
126+
if line.startswith("Content-Length:"):
127+
content_length = int(line.split(":")[1].strip())
128+
break
129+
130+
if content_length is None:
131+
raise ValueError("No Content-Length header found")
132+
133+
# Read the content
134+
content = connection.read(content_length)
135+
if len(content) < content_length:
136+
raise ValueError("Connection closed while reading content")
137+
138+
# Parse JSON-RPC message
139+
message = json.loads(content.decode("utf-8"))
140+
return message
141+
142+
def _read_jsonrpc_response(self, connection: t.Any, expected_id: str) -> t.Any:
143+
"""Read a JSON-RPC response from the connection."""
144+
message = self._read_jsonrpc_message(connection)
145+
146+
if message.get("id") != expected_id:
147+
raise ValueError(f"Unexpected response ID: {message.get('id')}")
148+
149+
if "error" in message:
150+
raise ValueError(f"JSON-RPC error: {message['error']}")
151+
152+
return message.get("result", {})
153+
154+
def call_janitor(self, ignore_ttl: bool = False) -> bool:
155+
"""Call the janitor command through the LSP daemon."""
156+
connection = None
157+
try:
158+
connection = self._open_connection()
159+
160+
# Send the janitor request via JSON-RPC
161+
request = LSPCLICallRequest(
162+
arguments=["janitor"] + (["--ignore-ttl"] if ignore_ttl else [])
163+
)
164+
165+
request_id = self._send_jsonrpc_request(
166+
connection, "sqlmesh/cli/call", request.model_dump()
167+
)
168+
169+
# Listen for notifications and the final response
170+
with self.renderer as renderer:
171+
while True:
172+
try:
173+
# Read any JSON-RPC message (response or notification)
174+
message_data = self._read_jsonrpc_message(connection)
175+
176+
# Check if it's the response to our request
177+
if "id" in message_data and message_data["id"] == request_id:
178+
# This is the response to our request
179+
result = message_data.get("result", {})
180+
if result.get("state") == "finished":
181+
return True
182+
elif result.get("state") == "error":
183+
print(f"Error from daemon: {result.get('message', 'Unknown error')}")
184+
return False
185+
186+
# Check if it's a notification with updates
187+
elif message_data.get("method") == "sqlmesh/cli/update":
188+
params = message_data.get("params", {})
189+
if params.get("state") == "ongoing":
190+
# Parse the janitor state and render it
191+
message = params.get("message", {})
192+
if "state" in message:
193+
janitor_state = JanitorState.model_validate(message)
194+
renderer.render(janitor_state.state)
195+
196+
except Exception as stream_error:
197+
# If we can't read more messages, assume we're done
198+
print(f"Stream ended: {stream_error}")
199+
break
200+
201+
return True
202+
203+
except Exception as e:
204+
print(f"Failed to communicate with daemon: {e}")
205+
return False
206+
finally:
207+
if connection:
208+
connection.close()
209+
210+
211+
def get_daemon_connector(project_path: Path) -> t.Optional[DaemonConnector]:
212+
"""Get a daemon connector if a valid lock file exists."""
213+
lock_path = return_lock_file_path(project_path)
214+
215+
if not lock_path.exists():
216+
return None
217+
218+
try:
219+
# Validate the lock file
220+
lock_file = _validate_lock_file(lock_path)
221+
222+
# Check if communication info is present
223+
if lock_file.communication is None:
224+
return None
225+
226+
return DaemonConnector(project_path, lock_file)
227+
except Exception as e:
228+
# Log the error but don't fail - fall back to direct execution
229+
print(f"Warning: Could not connect to daemon: {e}")
230+
return None

sqlmesh/cli/main.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from sqlmesh.utils import Verbosity
2525
from sqlmesh.utils.date import TimeLike
2626
from sqlmesh.utils.errors import MissingDependencyError, SQLMeshError
27+
from sqlmesh.cli.daemon_connector import get_daemon_connector
2728

2829
logger = logging.getLogger(__name__)
2930

@@ -640,7 +641,16 @@ def janitor(ctx: click.Context, ignore_ttl: bool, **kwargs: t.Any) -> None:
640641
641642
The janitor cleans up old environments and expired snapshots.
642643
"""
643-
ctx.obj.run_janitor(ignore_ttl, **kwargs)
644+
project_path = Path.cwd()
645+
daemon = get_daemon_connector(project_path)
646+
if daemon:
647+
print("Connecting to SQLMesh daemon...")
648+
success = daemon.call_janitor(ignore_ttl)
649+
print("Janitor completed, with success:", success)
650+
else:
651+
# No daemon available, run directly
652+
# ctx.obj.run_janitor(ignore_ttl, **kwargs)
653+
raise click.ClickException("no socket found")
644654

645655

646656
@cli.command("destroy")

0 commit comments

Comments
 (0)