Cro::ZeroMQ
This module enables building Cro pipelines using ZeroMQ. The lower-level components are intended to be composed together using Cro.compose. Some higher-level classes also exist to form Cro services and clients in a more convenient way for the most common use cases. This document assumes the reader has some existing understanding of ZeroMQ and its various socket types.
Cro::ZeroMQ::Message§
This class represents a ZeroMQ message. It does the Cro::Message role. A Cro::ZeroMQ::Message can represent both a message that was received and a message to be sent (in a simple echo server, for example, it would be both).
ZeroMQ messages may consist of multiple parts. The parts method returns a List of the parts that were received (or, if this is a message to be sent, the parts added to send so far). Each part is represented as a Blob. Since a List is returned, it cannot be mutated. The body-blob method returns the last part, assuming that previous parts are some form of envelope. This is only a convention provided by Cro. The body-text method will try to decode body-blob as UTF-8; pass the :enc parameter to choose another encoding.
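To illustrate why the encoding matters when a body is decoded, here is a minimal sketch in plain Raku that does not use Cro::ZeroMQ at all. The sample text and variable names are purely illustrative. It shows the same text producing different bytes under UTF-8 and ISO-8859-1, so a Blob has to be decoded with the encoding it was written in.

```raku
# Plain Raku, no Cro::ZeroMQ involved: the same text yields different bytes
# depending on the encoding used to produce the Blob.
my $text = "na\x[EF]ve";                       # "naive" with a diaeresis on the i

my $utf8-blob   = $text.encode('utf-8');       # two bytes for the accented letter
my $latin1-blob = $text.encode('iso-8859-1');  # one byte for the accented letter

say $utf8-blob.elems;     # byte count of the UTF-8 form
say $latin1-blob.elems;   # byte count of the ISO-8859-1 form

# Decoding with the matching encoding round-trips the text.
say $utf8-blob.decode('utf-8') eq $text;
say $latin1-blob.decode('iso-8859-1') eq $text;
```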
The identity method returns a List of Blob, with one Blob per frame up to and including the first zero-length frame. It is intended for use when working with ROUTER sockets. If there is no zero-length frame in the message, it will throw an exception of type X::Cro::ZeroMQ::MissingIdentity.
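The envelope convention described above can be sketched in plain Raku. This is only an illustration of the rule, not the module's implementation: split-envelope is a hypothetical helper, and it dies with a plain string where the real method would throw X::Cro::ZeroMQ::MissingIdentity.

```raku
# Hypothetical helper (not part of Cro::ZeroMQ): split the parts of a
# message into the identity envelope and the remaining frames.
sub split-envelope(@parts) {
    # Index of the first zero-length frame, or Nil if there is none.
    my $delimiter = @parts.first(*.elems == 0, :k);
    die "No zero-length frame found in message" without $delimiter;

    my @identity = @parts[0 .. $delimiter];       # includes the empty frame
    my @rest     = @parts[$delimiter + 1 .. *];   # everything after it
    return @identity, @rest;
}

my @parts = "client-1".encode, Blob.new, "hello".encode;
my ($identity, $rest) = split-envelope(@parts);
say $identity.elems;     # number of frames in the envelope
say $rest[0].decode;     # first frame after the envelope
```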
The most basic way to construct a message is to pass a list of Blobs to the new method:
Cro::ZeroMQ::Message.new(
    parts => ($blob-key, $blob-message)
)
There are also a number of convenience constructors taking a single positional parameter that is either a string or a blob:
Cro::ZeroMQ::Message.new("hello world")
Cro::ZeroMQ::Message.new("goodbye cruel world".encode("ascii"))
As well as a slurpy candidate for multi-part messages:
Cro::ZeroMQ::Message.new("SomeEventName", $event-json)
Whenever a Str is passed, it will first be encoded using UTF-8 before being stored. (This is true of both the Str constructor and the slurpy positional constructor; the constructor taking the parts named argument expects only Blobs.)
A Cro::ZeroMQ::Message is immutable once constructed.
Note that there is no way to provide or obtain a Supply of parts because ZeroMQ doesn't send parts until it has seen them all, and doesn't pass parts on to the application until it has received them all. Therefore, there's no opportunity for increased performance by sending some parts earlier.
Pipeline Components§
Pipeline components exist for each of the ZeroMQ socket types. They may all be constructed with either (but not both) of:
- connect, which specifies what to connect to. This may be an Iterable if the socket should be connected to multiple endpoints; otherwise, it will be treated as a single endpoint. The provided value(s) will be coerced to Str and should be of the form tcp://somehost:5555.
- bind, which specifies what to bind to. This may be an Iterable if the socket should be bound to multiple endpoints; otherwise, it will be treated as a single endpoint. The provided value(s) will be coerced to Str and should be of the form tcp://localhost:5555 (or tcp://*:5555 to bind to all interfaces).
Passing neither connect nor bind, or passing both connect and bind, will result in an exception of type X::Cro::ZeroMQ::IllegalBind.
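The argument rules above can be modelled in a few lines of plain Raku. This is a sketch under stated assumptions, not the module's code: normalize-endpoints and X::Example::IllegalBind are hypothetical names standing in for whatever the components do internally and for X::Cro::ZeroMQ::IllegalBind.

```raku
# Hypothetical stand-in for X::Cro::ZeroMQ::IllegalBind.
class X::Example::IllegalBind is Exception {
    method message() { "Pass exactly one of connect or bind" }
}

# Hypothetical helper: accept exactly one of connect/bind, and turn a
# single value or an Iterable into a list of Str endpoints.
sub normalize-endpoints(:$connect, :$bind) {
    # Both given, or neither given, is rejected.
    X::Example::IllegalBind.new.throw if $connect.defined == $bind.defined;

    my $mode  = $connect.defined ?? 'connect' !! 'bind';
    my $value = $connect // $bind;
    my @endpoints = $value ~~ Iterable ?? $value.map(*.Str) !! ($value.Str,);
    return ($mode => @endpoints);
}

my $single = normalize-endpoints(connect => 'tcp://somehost:5555');
say $single.key;      # which mode was chosen
say $single.value;    # the endpoints as a list of Str

my $many = normalize-endpoints(bind => ('tcp://*:5555', 'tcp://*:5556'));
say $many.value.elems;
```

Keeping the check in one place mirrors the way the document describes these rules as shared by every socket component.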
The optional high-water-mark named parameter sets the high water mark: how many messages may be outstanding before the socket enters an exceptional state, in which it either blocks on send or drops messages, depending on the socket type.
All of the ZeroMQ pipeline components do the Cro::ZeroMQ::Component role, which factors out these commonalities. There will be little reason for typical Cro applications to care about this, however.
Cro::ZeroMQ::Socket::Push§
This class is a Cro::Sink backed by a ZeroMQ PUSH socket. It is to be placed at the end of a pipeline, consuming Cro::ZeroMQ::Message objects and sending them.
Cro::ZeroMQ::Socket::Pull§
This class is a Cro::Source backed by a ZeroMQ PULL socket. It is to be placed at the start of a pipeline, and produces Cro::ZeroMQ::Message objects.
Cro::ZeroMQ::Socket::Pub§
This class is a Cro::Sink backed by a ZeroMQ PUB socket. It consumes Cro::ZeroMQ::Message objects and publishes them.
Cro::ZeroMQ::Socket::Sub§
This class is a Cro::Source backed by a ZeroMQ SUB socket. It produces a Cro::ZeroMQ::Message object for each message received. Subscriptions are filtered on message prefix (only the first message part being considered as the subscription key).
The subscribe named argument is required (since not subscribing will always result in receiving no messages). It may take:
- A Blob specifying the subscription topic.
- A Str specifying the subscription topic; since topics are matched as bytes, this will be encoded as UTF-8.
- An Iterable of subscription topics (it will be iterated over, and each element treated as a Blob or Str would be).
- A Whatever (that is, subscribe => *), which is equivalent to calling subscribe with the empty Blob or empty Str and subscribes to all messages.
- A Supply, which will be tapped. Each emitted value will be treated as a subscription to establish, and may be a Blob or a Str. This allows the set of subscriptions to change dynamically over time.
Optionally, unsubscribe may be passed. This only takes a Supply, and each time a topic is emitted (either as a Blob or a Str) an unsubscription will take place. Note that it only makes sense to unsubscribe from topics that were already subscribed to, and so this likely only makes sense in combination with having passed a Supply to the subscribe named argument.
If the same topic is subscribed to multiple times, then an unsubscription will only remove one of these (put another way, the set of topics subscribed to behaves as if it were reference counted). This behavior is provided by ZeroMQ rather than added by Cro. Note that unsubscriptions must match a subscription that is still active to be meaningful; it is not, therefore, possible to subscribe to everything and then exclude things. (At the end of the day, these are all just convenient ways to have zmq_setsockopt called.)
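The reference-counting and prefix-matching behavior described above can be modelled in plain Raku. This is a toy model only, assuming nothing about how ZeroMQ or Cro implement it: TopicSet is a hypothetical class, and it matches on Str prefixes for brevity, whereas real subscriptions are matched on bytes.

```raku
# Hypothetical model (not Cro::ZeroMQ or ZeroMQ code): topics are kept in a
# BagHash, so each subscription to the same topic is counted.
class TopicSet {
    has $!counts = BagHash.new;

    method subscribe(Str $topic) { $!counts{$topic}++ }

    # Only an active subscription can be removed; a BagHash drops the key
    # automatically once its count reaches zero.
    method unsubscribe(Str $topic) {
        $!counts{$topic}-- if $!counts{$topic} > 0;
    }

    # A key is wanted when any active topic is a prefix of it. The empty
    # topic is a prefix of everything, so it matches all keys.
    method wants(Str $key --> Bool) {
        so $!counts.keys.first({ $key.starts-with($_) }).defined
    }
}

my $topics = TopicSet.new;
$topics.subscribe('weather.');
$topics.subscribe('weather.');
$topics.unsubscribe('weather.');
say $topics.wants('weather.london');   # one subscription is still active
$topics.unsubscribe('weather.');
say $topics.wants('weather.london');   # no active subscription remains
```

The model also shows why "subscribe to everything, then exclude" cannot work: an unsubscription for a topic that was never subscribed to changes nothing.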
Cro::ZeroMQ::Socket::XPub§
This is a Cro::Source backed by a ZeroMQ XPUB socket. It produces Cro::ZeroMQ::Message objects, which represent subscription requests. It is a Cro::Replyable, the replier being a Cro::Sink that consumes Cro::ZeroMQ::Message objects (which correspond to messages to publish). Typically used together with Cro::ZeroMQ::Socket::XSub.
Cro::ZeroMQ::Socket::XSub§
This is a Cro::Source backed by a ZeroMQ XSUB socket. It produces Cro::ZeroMQ::Message objects, which represent messages received from publishers. It is a Cro::Replyable, the replier being a Cro::Sink that consumes Cro::ZeroMQ::Message objects (which represent subscription requests to pass on to the publisher). Typically used together with Cro::ZeroMQ::Socket::XPub.
Cro::ZeroMQ::Socket::Rep§
This class is a Cro::Source backed by a ZeroMQ REP socket, which produces Cro::ZeroMQ::Message objects. It is also a Cro::Replyable, and the replier is a Cro::Sink that sends a Cro::ZeroMQ::Message reply. Therefore, an echo server would just be:
my Cro::Service $echo = Cro.compose(
    Cro::ZeroMQ::Socket::Rep.new(bind => "tcp://*:5555")
);
A more useful service would place one or more Cro::Transforms into the pipeline to transform the request into a response. A ZeroMQ REP socket processes one message at a time, so another message will not be emitted until the reply has been sent by the replier. To build services that can work on multiple messages asynchronously, Cro::ZeroMQ::Socket::Router is a better bet.
Cro::ZeroMQ::Socket::Req§
This class is a Cro::Connector backed by a ZeroMQ REQ socket. It consumes Cro::ZeroMQ::Message and produces Cro::ZeroMQ::Message. To follow the ZeroMQ REQ constraints, it must never have a message emitted to it while it is still waiting for the response to an earlier message. To build clients that can have multiple outstanding requests, Cro::ZeroMQ::Socket::Dealer is a better bet.
Cro::ZeroMQ::Socket::Router§
This class is a Cro::Source backed by a ZeroMQ ROUTER socket. It produces Cro::ZeroMQ::Message objects. It is also a Cro::Replyable, the replier being a sink that sends on the ROUTER socket.
Cro::ZeroMQ::Socket::Dealer§
This class is a Cro::Connector backed by a ZeroMQ DEALER socket. It consumes and produces Cro::ZeroMQ::Message objects.
Higher Level APIs§
Cro::ZeroMQ::Service§
This is a Cro::Service containing a pipeline for processing ZeroMQ messages. Many services will have a single Cro::Service instance that they start up, but there's no reason a service can't have many. A Cro::Service makes most sense for pipelines that feel "server"-like, or that are the "main loop" of the service.
There are various methods for constructing the most common kinds of pipeline. It is allowable (though not always sensible) to either bind or connect the endpoints.
Replier (REP)§
A service that receives and replies to messages. The echo service would just be:
my Cro::Service $rep = Cro::ZeroMQ::Service.rep(
    bind => 'tcp://*:5555'
);
A more realistic example would include a transform implementing the service.
# Cro::Transform is a role, so it is composed with "does".
class MyReplier does Cro::Transform {
    method consumes() { Cro::ZeroMQ::Message }
    method produces() { Cro::ZeroMQ::Message }
    method transformer(Supply $messages --> Supply) {
        supply {
            whenever $messages {
                # Reply with the upper-cased body of the request.
                my $response = Cro::ZeroMQ::Message.new(.body-text.uc);
                emit $response;
            }
        }
    }
}
my Cro::Service $rep = Cro::ZeroMQ::Service.rep(
    bind => 'tcp://*:5555',
    MyReplier
);
Router (ROUTER)§
A service that receives messages prefixed with an identity, and processes them. A REP socket can only process one message at a time, which will not scale well if multiple messages should be processed concurrently. A ROUTER socket allows for processing of many messages at a time. Each message is prefixed with an identity. It is up to the message transform to include that in the response message, so that it can be routed back to the correct client. (Note that the identity actually consists of one or more frames, followed by an empty frame.)
# Cro::Transform is a role, so it is composed with "does".
class MyAsyncReplier does Cro::Transform {
    method consumes() { Cro::ZeroMQ::Message }
    method produces() { Cro::ZeroMQ::Message }
    method transformer(Supply $messages --> Supply) {
        supply {
            whenever $messages -> $msg {
                # Process each message on the thread pool; the identity
                # frames are put back in front of the response.
                whenever start { self!process($msg.body-blob) } -> $response {
                    emit Cro::ZeroMQ::Message.new($msg.identity, $response);
                }
            }
        }
    }
    method !process($data) {
        ...
    }
}
my Cro::Service $rep = Cro::ZeroMQ::Service.router(
    bind => 'tcp://*:5555',
    MyAsyncReplier
);
Pull (PULL)§
Used for a service that will pull messages and process them. Since it doesn't send anything, the service is a sink.
# Cro::Sink is a role, so it is composed with "does".
class MyAggregator does Cro::Sink {
    method consumes() { Cro::ZeroMQ::Message }
    method sinker(Supply $messages --> Supply) {
        supply {
            whenever $messages {
                # Do something with the message.
            }
        }
    }
}
my Cro::Service $pull = Cro::ZeroMQ::Service.pull(
    connect => 'tcp://some-publisher:5555',
    MyAggregator
);
Pull/Push (PULL, PUSH)§
This is typically used as a parallel worker. Unlike other service types, it expects:
- Either pull-bind or pull-connect (typically connect, as the source of work is a static point in the service topology)
- Either push-bind or push-connect (typically also connect, as the sink for work is a static point in the service topology too)
# Cro::Transform is a role, so it is composed with "does".
class MyWorker does Cro::Transform {
    method consumes() { Cro::ZeroMQ::Message }
    method produces() { Cro::ZeroMQ::Message }
    method transformer(Supply $messages --> Supply) {
        supply {
            whenever $messages {
                my $outcome = ...;
                emit $outcome;
            }
        }
    }
}
my Cro::Service $pull-push = Cro::ZeroMQ::Service.pull-push(
    pull-connect => 'tcp://source:5555',
    push-connect => 'tcp://sink:5555',
    MyWorker
);
Subscriber (SUB)§
A service that reacts to received messages. Since it doesn't send anything, the service is a sink.
# Cro::Sink is a role, so it is composed with "does".
class MyHandler does Cro::Sink {
    method consumes() { Cro::ZeroMQ::Message }
    method sinker(Supply $messages --> Supply) {
        supply {
            whenever $messages {
                # Do something with the message.
            }
        }
    }
}
my Cro::Service $sub = Cro::ZeroMQ::Service.sub(
    connect => 'tcp://some-publisher:5555',
    MyHandler
);
Cro::ZeroMQ::Client for request/response (REQ, DEALER)§
This provides a simple interface for sending messages using a REQ or a DEALER socket and receiving a single response to that message. A req client will send a message, and will not be able to send another until a response has been received. Trying to do so will result in an exception.
my $client = Cro::ZeroMQ::Client.req(
    connect => 'tcp://some-replier:5555'
);
my $reply = await $client.send($message);
By contrast, a dealer is an asynchronous client. It will synthesize an ID for each request and insert it, followed by an empty frame, before sending the request. Responses will cause the correct Promises returned by send to be kept.
my $client = Cro::ZeroMQ::Client.dealer(
    connect => 'tcp://some-replier:5555'
);
my $reply = await $client.send($message);
In most cases, dealer will be preferable for scalability reasons.
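The way a dealer-style client can match responses to outstanding requests may be sketched in plain Raku. This is an illustration of the general technique only, not the module's implementation: PendingRequests, register, and resolve are hypothetical names, and the ID scheme (a simple counter) is an assumption.

```raku
# Hypothetical bookkeeping for outstanding requests (not Cro::ZeroMQ code).
class PendingRequests {
    has %!vows;                  # request ID => vow of the matching Promise
    has Int $!next-id = 0;
    has $!lock = Lock.new;       # requests may be registered concurrently

    # Allocate a fresh ID and the Promise that its response will keep.
    method register() {
        $!lock.protect({
            my $id      = ($!next-id++).Str;
            my $promise = Promise.new;
            %!vows{$id} = $promise.vow;
            ($id, $promise)
        })
    }

    # Keep the Promise registered under $id, if there is one.
    method resolve(Str $id, $reply) {
        my $vow = $!lock.protect({ %!vows{$id}:delete });
        .keep($reply) with $vow;
    }
}

my $pending = PendingRequests.new;
my ($first-id,  $first)  = $pending.register;
my ($second-id, $second) = $pending.register;

# Responses may arrive in any order; each one keeps only its own Promise.
$pending.resolve($second-id, 'reply to second');
say $second.status;
say $first.status;
```

Because each response carries the ID of its request, replies arriving out of order still reach the right caller, which is what allows several requests to be in flight at once.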
Cro::ZeroMQ::Distributor for send-only (PUSH, PUB)§
The high-level client for PUSH and PUB socket types has a send method, which sends the message asynchronously and returns immediately. For example:
my $client = Cro::ZeroMQ::Distributor.push(
    bind => 'tcp://work-source:5555'
);
$client.send($message);
Cro::ZeroMQ::Collector for receive-only (PULL, SUB)§
The high-level client for PULL and SUB socket types has a Supply method, which will emit received Cro::ZeroMQ::Message objects. A PULL will probably be more suited to a service than to a client, but may be useful. A useful example of this with a SUB socket would be to create such a client in a web server, and then send all received messages out over a web socket:
my $client = Cro::ZeroMQ::Collector.sub(
    connect => 'tcp://notifications:5555'
);
my $notifications = $client.Supply.share; # Turn it into a live Supply
my $application = route {
    get -> 'notifications' {
        web-socket {
            supply {
                whenever $notifications {
                    .emit;
                }
            }
        }
    }
}