This repository was archived by the owner on Jun 26, 2025. It is now read-only.

Conversation

rand00 commented Mar 28, 2025

This handles the case of getting EINTR from zmq_recv.

I guess we would want to ignore this error only on this specific call, but that information is not present in the code at that point. A solution could be to somehow propagate the EINTR to the user thread that led to the error. The user then knows that the last call was e.g. a recv and can ignore the error.

From reading the code in zmq-deferred/src/socket.ml - I'm currently unsure how this could best be done. I'm thinking that the corresponding callback in t.receivers should be woken up with the EINTR exception - so either communicate this exception via a new field in t, or change the interface of all receivers to take an exn option as argument. To make the semantics consistent, all senders should also get this error propagated.

But how do we know which receiver/sender caused the EINTR...
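
For illustration, one hypothetical shape of that interface change could look like this (all names below are made up, not the actual fields in zmq-deferred/src/socket.ml):

(* Hypothetical sketch - not the actual socket.ml interface. Each queued
   receiver is woken with either a message or the exception (e.g. EINTR)
   that interrupted the wait, so the recv caller can decide to retry. *)
type 'a wakeup =
  | Message of 'a
  | Interrupted of exn  (* e.g. Unix.Unix_error (Unix.EINTR, _, _) *)

type t = {
  receivers : (string wakeup -> unit) Queue.t;
  (* ... the remaining fields of t stay as they are ... *)
}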


Update after testing

Since retrying recv on EINTR works as now implemented, without surfacing the error to the user, the question is whether we ever want to propagate EINTR instead of auto-retrying. That would demand a more complex solution, as discussed above.

Personally, I currently think it doesn't make much sense to propagate EINTR. I haven't hit a case where e.g. Ctrl-C didn't work on the process using ZMQ, so that signal is propagated fine. And I don't know how we would even find out which signal led to the EINTR - which is pretty essential for the user if they need to act upon it...

Also, ocaml-zmq already retries reading from the socket in certain cases.

Therefore I think this simple fix seems like the right one.
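
For reference, the simple fix essentially amounts to a retry loop around the interrupted call - a minimal sketch, assuming the error surfaces as a Unix.Unix_error:

(* Minimal sketch of the auto-retry fix: repeat the interrupted call until
   it is no longer interrupted by a signal. *)
let rec retry_on_eintr f =
  try f ()
  with Unix.Unix_error (Unix.EINTR, _, _) -> retry_on_eintr f

(* e.g. wrapping a blocking receive: *)
let recv socket = retry_on_eintr (fun () -> Zmq.Socket.recv socket)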

As an aside: maybe some users would be interested in knowing whether EINTR / a retry has happened (I for one would be) - so maybe a user-settable 'logger' could be added to Deferred.T.


Todo before merge

  • decide whether this auto-retry approach is the best way to go, or whether the user should instead get the EINTR error propagated to the recv caller
  • decide whether to add a way to log internal retries to the user (I'd be interested in this); should be its own PR
  • decide what to do about the possibility of broken messages from recv_all after retrying following EINTR
    • documentation should be updated on this
    • or we could try some internal validation of messages / drop the next message after EINTR happens on 'zmq_msg_recv'

rand00 (Author) commented Mar 28, 2025

This PR is related to #134

rand00 added 2 commits March 28, 2025 13:16
…'t raised in 'event_loop' but in 'recv_all' - so added retry in this spot too
rand00 (Author) commented Mar 28, 2025

Haha, okay... After running my set of applications that get EINTR at a steady but slow rate for a long time, I now also get the EINTR directly from zmq_msg_recv - which means this exception is thrown in a different spot in the codebase, now turning up in f' in post. So there is yet another spot where we need to retry recv - the good thing is that here we know the EINTR comes from the call we are allowed to retry.

rand00 (Author) commented Mar 30, 2025

Update after testing more:

I caught the EINTR coming from the error tag 'zmq_msg_recv' in the C stubs (which is very infrequent compared to 'zmq_getsockopt'). The retry of calling recv worked - but the result of the following recv_all was not of the expected length. I sent a single message body - so I match on the list [ _addr; msg ] - which suddenly had a different length.

I don't know if this kind of malformed message can happen in other situations when using ZMQ, but I think this is strictly better than needing to restart the socket and context.

The question is whether this kind of malformed message is something the ocaml-zmq user should handle, or whether we can do some kind of fixing behind the scenes.
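
If it ends up being the user's responsibility, the handling could be as simple as matching on the frame count and dropping malformed messages - a hypothetical sketch (handle and log_dropped are made-up names):

(* Hypothetical user-side guard: tolerate a multipart message with an
   unexpected number of frames instead of crashing on a partial match. *)
match Zmq.Socket.recv_all socket with
| [ _addr; msg ] -> handle msg
| frames ->
  (* Wrong frame count - possibly a message broken by a retry after EINTR.
     Drop it and keep the socket alive. *)
  log_dropped (List.length frames)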

andersfugmann (Contributor) commented Apr 29, 2025

Sorry for the delay in replying.

I like the approach, but there is a problem with the solution.
If EINTR is raised while fetching multi-part messages (a call to msg_recv_all), then the call must be repeated in the inner loop of msg_recv_all - not handled in the upper event loop. Handling it at the top level will make the socket unusable.

I'm inclined to just catch and ignore EINTR in the blocking parts of zmq when calling a non-blocking operation (send ~block:false / recv ~block:false / getsockopt / setsockopt), but that may cause other challenges.

However, I'm unsure why the EINTR even occurs - I mean, who is sending signals to the application? Is there some background monitoring check that's trying to terminate the application, or someone pressing Ctrl-C?

rand00 (Author) commented Apr 29, 2025

I like the approach, but there is a problem with the solution. If EINTR is raised while fetching multi-part messages (a call to msg_recv_all), then the call must be repeated in the inner loop of msg_recv_all - not handled in the upper event loop. Handling it at the top level will make the socket unusable.

You mean to catch EINTR here?

Is this the reason why my call to recv_all after EINTR raised here didn't return a multipart message of the right length?

I haven't observed the socket becoming unusable in my tests.

However, I'm unsure why the EINTR even occurs - I mean, who is sending signals to the application? Is there some background monitoring check that's trying to terminate the application, or someone pressing Ctrl-C?

I haven't observed this behaviour of ZMQ before now. The specific setup that triggers EINTR is an ipc pub-socket that publishes marshalled OCaml floats on a set of different subscriptions, with a set of processes subscribing to each channel respectively. I have no idea where the interrupt comes from - but from what I've read (e.g. the linked issue at their repo), it seems to be "normal behaviour" for ZMQ, and one should just retry...

Edit: What kind of "background monitoring check" are you thinking about?

andersfugmann (Contributor) commented Apr 30, 2025

You mean to catch EINTR here?

Yes, but not for the first receive. If the first part of a message has been received, it's important to receive all remaining parts of the message, or the next call to recv_msg_all may contain the tail of the previous message.

Is this the reason why my call to recv_all after EINTR raised here didn't return a multipart message of the right length?

That seems very likely.

I think the solution is to catch EINTR (or make a wrapper to simplify) for calls on L499. Something like:

let recv_all_wrapper (f : ?block:bool -> _ t -> _) =
  (* Wrapper function to catch EINTR and just try again *)
  let rec retry_on_eintr f =
    try f ()
    with Unix.Unix_error (Unix.EINTR, _, _) -> retry_on_eintr f
  in
  (* Once the first message part is received, all remaining message parts
     can be received without blocking. *)
  let rec loop socket accu =
    if retry_on_eintr (fun () -> has_more socket) then
      let msg = retry_on_eintr (fun () -> f socket) in
      loop socket (msg :: accu)
    else
      accu
  in
  fun ?block socket ->
    let first = f ?block socket in
    List.rev (loop socket [ first ])

With this change, no read data will be thrown away, so it will be safe to catch EINTR at the upper level and retry the read.

But send_all is also quite fragile to EINTRs - and that's not as simple a fix. The OCaml doc says that send on subsequent message parts will not block (I probably wrote that comment some time ago), but I cannot find evidence of that in the ZMQ documentation. If any subsequent send can raise EINTR, sending is broken: sending a new message will then append to the "half sent" message. In this case we would need a 'handle' to the send so sending can be resumed - and maybe that's also a better way to solve EINTR when receiving. I need to give that some thought.

I haven't observed this behaviour of ZMQ before now. The specific setup that triggers EINTR is an ipc pub-socket that publishes marshalled OCaml floats on a set of different subscriptions, with a set of processes subscribing to each channel respectively. I have no idea where the interrupt comes from - but from what I've read (e.g. the linked issue at their repo), it seems to be "normal behaviour" for ZMQ, and one should just retry...

I think some tests are needed to understand the "correct" way of handling EINTR, to avoid the process becoming stuck because signals are ignored in the wrong way.

Edit: What kind of "background monitoring check" are you thinking about?

Maybe some sort of process monitor that runs to ensure your process is still alive, e.g. by sending SIGCONT or just signal #0 to the monitored process. I think some tests should be made to ensure that user processes that use the ZMQ library do not become stuck because all EINTRs are ignored.
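
A minimal test along those lines could look like this - just a sketch, assuming socket and expected_payload are set up elsewhere:

(* Sketch of such a test: deliver SIGALRM to ourselves while a blocking
   recv is in flight, then check that the message still arrives intact
   and that the process does not hang. *)
let () =
  Sys.set_signal Sys.sigalrm (Sys.Signal_handle (fun _ -> ()));
  ignore (Unix.alarm 1);               (* fires while recv blocks *)
  let msg = Zmq.Socket.recv socket in  (* should retry on EINTR *)
  assert (msg = expected_payload)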

andersfugmann (Contributor) commented Apr 30, 2025

And to add: when sending a multipart message, EAGAIN handling has the same problem as EINTR - it may leave half-sent messages. So that will also need to be fixed somehow, and I'm starting to think that the use of a handler (kind of like a continuation) would be the best solution - but it will require some larger changes in the async code paths as well to handle it in a reliable way.

andersfugmann (Contributor) commented Apr 30, 2025

The handler would look something like:

let send_all =
  let rec cont ?block socket msgs () =
    match !msgs with
    | [] -> ()
    | [ msg ] ->
      send ?block ~more:false socket msg;
      msgs := []
    | msg :: rest ->
      send ?block ~more:true socket msg;
      msgs := rest;
      cont ?block socket msgs ()
  in
  fun ?block socket msgs ->
    cont ?block socket (ref msgs)

This will return a continuation that can be called repeatedly. I don't know if I like the mutable state in there, but it's a suggestion.

To use this do:

...
(* My user-defined handler for EAGAIN or EINTR *)
let rec retry f =
  try f () with _ -> retry f
in
retry (send_all socket [ msg1; msg2; msg3 ])
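
For comparison, if internal retry of the current part is acceptable (as the auto-retry approach in this PR assumes), the ref can be avoided entirely - a sketch, assuming EINTR/EAGAIN surface as Unix.Unix_error:

(* Sketch without mutable state: retry only the *current* part in place,
   so no part is skipped or sent twice. Note that blindly retrying EAGAIN
   busy-waits; a real version would wait for the socket to become writable. *)
let send_all ?block socket msgs =
  let rec send_part ~more msg =
    try send ?block ~more socket msg
    with Unix.Unix_error ((Unix.EINTR | Unix.EAGAIN), _, _) ->
      send_part ~more msg
  in
  let rec loop = function
    | [] -> ()
    | [ msg ] -> send_part ~more:false msg
    | msg :: rest ->
      send_part ~more:true msg;
      loop rest
  in
  loop msgs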

rand00 (Author) commented May 1, 2025

Here is a snippet from logging the signals of the process reading from the ipc pub-socket, using sudo strace -e trace=signal -p <PID> - the signals come at a steady frequency of ~5 Hz:

--- SIGALRM {si_signo=SIGALRM, si_code=SI_KERNEL} ---
rt_sigreturn({mask=[]})                 = 98691926438752
rt_sigprocmask(SIG_BLOCK, NULL, [], 8)  = 0
rt_sigprocmask(SIG_BLOCK, [ALRM], [], 8) = 0
rt_sigprocmask(SIG_SETMASK, [], NULL, 8) = 0
--- SIGALRM {si_signo=SIGALRM, si_code=SI_KERNEL} ---
rt_sigreturn({mask=[]})                 = -1 EINTR (Interrupted system call)
rt_sigprocmask(SIG_BLOCK, NULL, [], 8)  = 0
rt_sigprocmask(SIG_BLOCK, [ALRM], [], 8) = 0
rt_sigprocmask(SIG_SETMASK, [], NULL, 8) = 0
--- SIGALRM {si_signo=SIGALRM, si_code=SI_KERNEL} ---
rt_sigreturn({mask=[]})                 = -1 EAGAIN (Resource temporarily unavailable)
rt_sigprocmask(SIG_BLOCK, NULL, [], 8)  = 0
rt_sigprocmask(SIG_BLOCK, [ALRM], [], 8) = 0
rt_sigprocmask(SIG_SETMASK, [], NULL, 8) = 0
--- SIGALRM {si_signo=SIGALRM, si_code=SI_KERNEL} ---
rt_sigreturn({mask=[]})                 = -1 EINTR (Interrupted system call)
rt_sigprocmask(SIG_BLOCK, NULL, [], 8)  = 0
rt_sigprocmask(SIG_BLOCK, [ALRM], [], 8) = 0
rt_sigprocmask(SIG_SETMASK, [], NULL, 8) = 0
--- SIGALRM {si_signo=SIGALRM, si_code=SI_KERNEL} ---
rt_sigreturn({mask=[]})                 = -1 EINTR (Interrupted system call)
rt_sigprocmask(SIG_BLOCK, NULL, [], 8)  = 0
rt_sigprocmask(SIG_BLOCK, [ALRM], [], 8) = 0
rt_sigprocmask(SIG_SETMASK, [], NULL, 8) = 0
--- SIGALRM {si_signo=SIGALRM, si_code=SI_KERNEL} ---
rt_sigreturn({mask=[]})                 = 1
rt_sigprocmask(SIG_BLOCK, NULL, [], 8)  = 0
rt_sigprocmask(SIG_BLOCK, [ALRM], [], 8) = 0
rt_sigprocmask(SIG_SETMASK, [], NULL, 8) = 0
--- SIGALRM {si_signo=SIGALRM, si_code=SI_KERNEL} ---
rt_sigreturn({mask=[]})                 = -1 EINTR (Interrupted system call)
rt_sigprocmask(SIG_BLOCK, NULL, [], 8)  = 0
rt_sigprocmask(SIG_BLOCK, [ALRM], [], 8) = 0
rt_sigprocmask(SIG_SETMASK, [], NULL, 8) = 0
--- SIGALRM {si_signo=SIGALRM, si_code=SI_KERNEL} ---
rt_sigreturn({mask=[]})                 = 0
rt_sigprocmask(SIG_BLOCK, NULL, [], 8)  = 0
rt_sigprocmask(SIG_BLOCK, [ALRM], [], 8) = 0
rt_sigprocmask(SIG_SETMASK, [], NULL, 8) = 0
--- SIGALRM {si_signo=SIGALRM, si_code=SI_KERNEL} ---
rt_sigreturn({mask=[]})                 = 1
rt_sigprocmask(SIG_BLOCK, NULL, [], 8)  = 0
rt_sigprocmask(SIG_BLOCK, [ALRM], [], 8) = 0
rt_sigprocmask(SIG_SETMASK, [], NULL, 8) = 0
--- SIGALRM {si_signo=SIGALRM, si_code=SI_KERNEL} ---
rt_sigreturn({mask=[]})                 = -1 EINTR (Interrupted system call)
rt_sigprocmask(SIG_BLOCK, NULL, [], 8)  = 0
rt_sigprocmask(SIG_BLOCK, [ALRM], [], 8) = 0
rt_sigprocmask(SIG_SETMASK, [], NULL, 8) = 0
--- SIGALRM {si_signo=SIGALRM, si_code=SI_KERNEL} ---
rt_sigreturn({mask=[]})                 = -1 EAGAIN (Resource temporarily unavailable)
rt_sigprocmask(SIG_BLOCK, NULL, [], 8)  = 0
rt_sigprocmask(SIG_BLOCK, [ALRM], [], 8) = 0
rt_sigprocmask(SIG_SETMASK, [], NULL, 8) = 0
--- SIGALRM {si_signo=SIGALRM, si_code=SI_KERNEL} ---
rt_sigreturn({mask=[]})                 = 27965
rt_sigprocmask(SIG_BLOCK, NULL, [], 8)  = 0
rt_sigprocmask(SIG_BLOCK, [ALRM], [], 8) = 0
rt_sigprocmask(SIG_SETMASK, [], NULL, 8) = 0
--- SIGALRM {si_signo=SIGALRM, si_code=SI_KERNEL} ---
rt_sigreturn({mask=[]})                 = -1 EINTR (Interrupted system call)
rt_sigprocmask(SIG_BLOCK, NULL, [], 8)  = 0
rt_sigprocmask(SIG_BLOCK, [ALRM], [], 8) = 0
rt_sigprocmask(SIG_SETMASK, [], NULL, 8) = 0
--- SIGALRM {si_signo=SIGALRM, si_code=SI_KERNEL} ---
rt_sigreturn({mask=[]})                 = -1 EAGAIN (Resource temporarily unavailable)
rt_sigprocmask(SIG_BLOCK, NULL, [], 8)  = 0
rt_sigprocmask(SIG_BLOCK, [ALRM], [], 8) = 0
rt_sigprocmask(SIG_SETMASK, [], NULL, 8) = 0
--- SIGALRM {si_signo=SIGALRM, si_code=SI_KERNEL} ---
rt_sigreturn({mask=[]})                 = -1 EAGAIN (Resource temporarily unavailable)
rt_sigprocmask(SIG_BLOCK, NULL, [], 8)  = 0
rt_sigprocmask(SIG_BLOCK, [ALRM], [], 8) = 0
rt_sigprocmask(SIG_SETMASK, [], NULL, 8) = 0
--- SIGALRM {si_signo=SIGALRM, si_code=SI_KERNEL} ---
rt_sigreturn({mask=[]})                 = -1 EINTR (Interrupted system call)
rt_sigprocmask(SIG_BLOCK, NULL, [], 8)  = 0
rt_sigprocmask(SIG_BLOCK, [ALRM], [], 8) = 0
rt_sigprocmask(SIG_SETMASK, [], NULL, 8) = 0
--- SIGALRM {si_signo=SIGALRM, si_code=SI_KERNEL} ---
rt_sigreturn({mask=[]})                 = 32
rt_sigprocmask(SIG_BLOCK, NULL, [], 8)  = 0
rt_sigprocmask(SIG_BLOCK, [ALRM], [], 8) = 0
rt_sigprocmask(SIG_SETMASK, [], NULL, 8) = 0
--- SIGALRM {si_signo=SIGALRM, si_code=SI_KERNEL} ---
rt_sigreturn({mask=[]})                 = 40
rt_sigprocmask(SIG_BLOCK, NULL, [], 8)  = 0
rt_sigprocmask(SIG_BLOCK, [ALRM], [], 8) = 0
rt_sigprocmask(SIG_SETMASK, [], NULL, 8) = 0
--- SIGALRM {si_signo=SIGALRM, si_code=SI_KERNEL} ---
rt_sigreturn({mask=[]})                 = -1 EINTR (Interrupted system call)
rt_sigprocmask(SIG_BLOCK, NULL, [], 8)  = 0
rt_sigprocmask(SIG_BLOCK, [ALRM], [], 8) = 0
rt_sigprocmask(SIG_SETMASK, [], NULL, 8) = 0
--- SIGALRM {si_signo=SIGALRM, si_code=SI_KERNEL} ---
rt_sigreturn({mask=[]})                 = -1 EINTR (Interrupted system call)
rt_sigprocmask(SIG_BLOCK, NULL, [], 8)  = 0
rt_sigprocmask(SIG_BLOCK, [ALRM], [], 8) = 0
rt_sigprocmask(SIG_SETMASK, [], NULL, 8) = 0
--- SIGALRM {si_signo=SIGALRM, si_code=SI_KERNEL} ---
rt_sigreturn({mask=[]})                 = -1 EINTR (Interrupted system call)
rt_sigprocmask(SIG_BLOCK, NULL, [], 8)  = 0
rt_sigprocmask(SIG_BLOCK, [ALRM], [], 8) = 0
rt_sigprocmask(SIG_SETMASK, [], NULL, 8) = 0
--- SIGALRM {si_signo=SIGALRM, si_code=SI_KERNEL} ---
rt_sigreturn({mask=[]})                 = -1 EINTR (Interrupted system call)
rt_sigprocmask(SIG_BLOCK, NULL, [], 8)  = 0
rt_sigprocmask(SIG_BLOCK, [ALRM], [], 8) = 0
rt_sigprocmask(SIG_SETMASK, [], NULL, 8) = 0

These signals didn't specifically cause ZMQ to raise EINTR in ocaml-zmq within the logged period - but I guess they could be the reason it happens.

andersfugmann (Contributor):

Ahh, yes - good old SIGALRM. Maybe your application has an alarm set? Any signal delivered to the application will make ZMQ raise EINTR. Also see the zmq test here that exercises the raising of EINTR.
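
For anyone reproducing this: a steady ~5 Hz SIGALRM like the one in the strace output can be produced with an interval timer - a sketch:

(* Sketch: a 0.2 s interval timer delivers SIGALRM at ~5 Hz, which will
   interrupt blocking calls inside libzmq with EINTR. *)
let () =
  Sys.set_signal Sys.sigalrm (Sys.Signal_handle (fun _ -> ()));
  ignore (Unix.setitimer Unix.ITIMER_REAL
            { Unix.it_value = 0.2; it_interval = 0.2 })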

andersfugmann (Contributor):

I'm still inclined to just eat EINTR and always try again. However, that does not solve the problems with EAGAIN. Stay tuned.

andersfugmann (Contributor):

Ok. Since this organization is inactive, I've created a detached fork on https://github.com/andersfugmann/ocaml-zmq. I suggest moving the discussion there.
I've also created a PR to address the problems. Maybe you could try that branch and see if it solves your problems with EINTR and multipart messages.
