Skip to content

Commit 8122ee7

Browse files
committed
ENH: add a camonitor -> kafka bridge
1 parent 18c0632 commit 8122ee7

File tree

2 files changed

+163
-7
lines changed

2 files changed

+163
-7
lines changed
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
"""
2+
caproto-monitor-to-kafka ...
3+
4+
It can equivalently be invoked as:
5+
6+
python3 -m nslsii.commandline.monitor2kafka ...
7+
8+
For access to the underlying functionality from a Python script or interactive
9+
Python session, do not import this module; instead import caproto.sync.client.
10+
"""
11+
import argparse
12+
from caproto.sync.client import subscribe, block
13+
from caproto import SubscriptionType, __version__
14+
import msgpack
15+
import msgpack_numpy as mpn
16+
from confluent_kafka import Producer
17+
18+
19+
def main():
20+
parser = argparse.ArgumentParser(
21+
description="Publish a PV monitor to a kafka topic.",
22+
epilog=f"caproto version {__version__}",
23+
)
24+
exit_group = parser.add_mutually_exclusive_group()
25+
parser.add_argument(
26+
"kafka_server", type=str, help="bootstrap server to connect to."
27+
)
28+
parser.add_argument("topic", type=str, help="The topic to publish to.")
29+
30+
parser.add_argument("pv_names", type=str, nargs="+", help="PV (channel) name")
31+
32+
parser.add_argument(
33+
"--verbose",
34+
"-v",
35+
action="count",
36+
help="Show more log messages. (Use -vvv for even more.)",
37+
)
38+
exit_group.add_argument(
39+
"--duration",
40+
type=float,
41+
default=None,
42+
help=(
43+
"Maximum number seconds to run before "
44+
"exiting. Runs indefinitely by default."
45+
),
46+
)
47+
exit_group.add_argument(
48+
"--maximum",
49+
type=int,
50+
default=None,
51+
help=(
52+
"Maximum number of monitor events to "
53+
"process exiting. Unlimited by "
54+
"default."
55+
),
56+
)
57+
parser.add_argument(
58+
"--timeout",
59+
"-w",
60+
type=float,
61+
default=1,
62+
help=("Timeout ('wait') in seconds for server " "responses."),
63+
)
64+
parser.add_argument(
65+
"-m",
66+
type=str,
67+
metavar="MASK",
68+
default="va",
69+
help=(
70+
"Channel Access mask. Any combination of "
71+
"'v' (value), 'a' (alarm), 'l' (log/archive), "
72+
"'p' (property). Default is 'va'."
73+
),
74+
)
75+
parser.add_argument(
76+
"--priority",
77+
"-p",
78+
type=int,
79+
default=0,
80+
help="Channel Access Virtual Circuit priority. " "Lowest is 0; highest is 99.",
81+
)
82+
parser.add_argument(
83+
"-n",
84+
action="store_true",
85+
help=("Retrieve enums as integers (default is " "strings)."),
86+
)
87+
parser.add_argument(
88+
"--no-repeater",
89+
action="store_true",
90+
help=("Do not spawn a Channel Access repeater daemon " "process."),
91+
)
92+
93+
args = parser.parse_args()
94+
95+
mask = 0
96+
if "v" in args.m:
97+
mask |= SubscriptionType.DBE_VALUE
98+
if "a" in args.m:
99+
mask |= SubscriptionType.DBE_ALARM
100+
if "l" in args.m:
101+
mask |= SubscriptionType.DBE_LOG
102+
if "p" in args.m:
103+
mask |= SubscriptionType.DBE_PROPERTY
104+
105+
tokens = {"callback_count": 0}
106+
producer = Producer({"bootstrap.servers": args.kafka_server})
107+
108+
def callback(sub, response):
109+
tokens["callback_count"] += 1
110+
payload = {
111+
"pvname": sub.pv_name,
112+
**{k: getattr(response, k) for k in ("data", "data_count", "data_type")},
113+
"medatadata": {
114+
k: getattr(response.metadata, k)
115+
for k in ("status", "severity", "timestamp")
116+
},
117+
}
118+
msg = msgpack.dumps(payload, default=mpn.encode)
119+
producer.produce(topic=args.topic, key=sub.pv_name, value=msg)
120+
121+
if args.maximum is not None:
122+
if tokens["callback_count"] >= args.maximum:
123+
raise KeyboardInterrupt()
124+
125+
try:
126+
subs = []
127+
cbs = []
128+
for pv_name in args.pv_names:
129+
sub = subscribe(pv_name, mask=mask, priority=args.priority)
130+
sub.add_callback(callback)
131+
cbs.append(callback) # Hold ref to keep it from being garbage collected.
132+
subs.append(sub)
133+
# Wait to be interrupted by KeyboardInterrupt.
134+
block(
135+
*subs,
136+
duration=args.duration,
137+
timeout=args.timeout,
138+
force_int_enums=args.n,
139+
repeater=not args.no_repeater,
140+
)
141+
except BaseException as exc:
142+
if args.verbose:
143+
# Show the full traceback.
144+
raise
145+
else:
146+
# Print a one-line error message.
147+
print(exc)
148+
149+
150+
if __name__ == "__main__":
151+
main()

setup.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from __future__ import (absolute_import, division, print_function)
1+
from __future__ import absolute_import, division, print_function
22

33
import versioneer
44
import setuptools
@@ -10,21 +10,26 @@
1010
here = path.abspath(path.dirname(__file__))
1111

1212
# Get the long description from the README file
13-
with open(path.join(here, 'README.md'), encoding='utf-8') as f:
13+
with open(path.join(here, "README.md"), encoding="utf-8") as f:
1414
long_description = f.read()
1515

16-
with open(path.join(here, 'requirements.txt')) as f:
16+
with open(path.join(here, "requirements.txt")) as f:
1717
requirements = f.read().splitlines()
1818

1919
setuptools.setup(
20-
name='nslsii',
20+
name="nslsii",
2121
version=versioneer.get_version(),
2222
cmdclass=versioneer.get_cmdclass(),
2323
license="BSD (3-clause)",
2424
packages=setuptools.find_packages(),
2525
long_description=long_description,
26-
long_description_content_type='text/markdown',
27-
description='Tools for data collection and analysis at NSLS-II',
28-
author='Brookhaven National Laboratory',
26+
long_description_content_type="text/markdown",
27+
description="Tools for data collection and analysis at NSLS-II",
28+
author="Brookhaven National Laboratory",
2929
install_requires=requirements,
30+
entry_points={
31+
"console_scripts": [
32+
"monitor-to-kafka = nslsii.commandline.monitor2kafka:main",
33+
],
34+
},
3035
)

0 commit comments

Comments
 (0)