1
0
Fork 0
mirror of synced 2024-12-26 08:43:19 -05:00
ultimate-vim/sources_non_forked/vim-elixir/large_file.ex

1001 lines
39 KiB
Elixir
Raw Normal View History

2022-05-18 13:31:41 -04:00
defmodule GenStage do
@moduledoc ~S"""
Stages are data-exchange steps that send and/or receive data
from other stages.
When a stage sends data, it acts as a producer. When it receives
data, it acts as a consumer. Stages may take both producer and
consumer roles at once.
## Stage types
Besides taking both producer and consumer roles, a stage may be
called "source" if it only produces items or called "sink" if it
only consumes items.
For example, imagine the stages below where A sends data to B
that sends data to C:
[A] -> [B] -> [C]
we conclude that:
* A is only a producer (and therefore a source)
* B is both producer and consumer
* C is only a consumer (and therefore a sink)
As we will see in the upcoming Examples section, we must
specify the type of the stage when we implement each of them.
To start the flow of events, we subscribe consumers to
producers. Once the communication channel between them is
established, consumers will ask the producers for events.
We typically say the consumer is sending demand upstream.
Once demand arrives, the producer will emit items, never
emitting more items than the consumer asked for. This provides
a back-pressure mechanism.
A consumer may have multiple producers and a producer may have
multiple consumers. When a consumer asks for data, each producer
is handled separately, with its own demand. When a producer
receives demand and sends data to multiple consumers, the demand
is tracked and the events are sent by a dispatcher. This allows
producers to send data using different "strategies". See
`GenStage.Dispatcher` for more information.
Many developers tend to create layers of stages, such as A, B and
C, for achieving concurrency. If all you want is concurrency, using
processes is enough. They are the primitive for achieving concurrency
in Elixir and the VM does all of the work of multiplexing them.
Instead, layers in GenStage must be created when there is a need for
back-pressure or to route the data in different ways.
For example, if you need the data to go over multiple steps but
without a need for back-pressure or without a need to break the
data apart, do not design it as such:
[Producer] -> [Step 1] -> [Step 2] -> [Step 3]
Instead it is better to design it as:
[Consumer]
/
[Producer]-<-[Consumer]
\
[Consumer]
where "Consumer" are multiple processes that subscribe to the same
"Producer" and run exactly the same code, with all of transformation
steps from above. In such scenarios, you may even find the
`Task.async_stream/2` function that ships as part of Elixir to be
enough or achieve the flexibility you need with the `ConsumerSupervisor`
functionality that is included as part of `GenStage`.
## Example
Let's define the simple pipeline below:
[A] -> [B] -> [C]
where A is a producer that will emit items starting from 0,
B is a producer-consumer that will receive those items and
multiply them by a given number and C will receive those events
and print them to the terminal.
Let's start with A. Since A is a producer, its main
responsibility is to receive demand and generate events.
Those events may be in memory or an external queue system.
For simplicity, let's implement a simple counter starting
from a given value of `counter` received on `init/1`:
defmodule A do
use GenStage
def start_link(number) do
GenStage.start_link(A, number)
end
def init(counter) do
{:producer, counter}
end
def handle_demand(demand, counter) when demand > 0 do
# If the counter is 3 and we ask for 2 items, we will
# emit the items 3 and 4, and set the state to 5.
events = Enum.to_list(counter..counter+demand-1)
{:noreply, events, counter + demand}
end
end
B is a producer-consumer. This means it does not explicitly
handle the demand because the demand is always forwarded to
its producer. Once A receives the demand from B, it will send
events to B which will be transformed by B as desired. In
our case, B will receive events and multiply them by a number
given on initialization and stored as the state:
defmodule B do
use GenStage
def start_link(number) do
GenStage.start_link(B, number)
end
def init(number) do
{:producer_consumer, number}
end
def handle_events(events, _from, number) do
events = Enum.map(events, & &1 * number)
{:noreply, events, number}
end
end
C will finally receive those events and print them every second
to the terminal:
defmodule C do
use GenStage
def start_link() do
GenStage.start_link(C, :ok)
end
def init(:ok) do
{:consumer, :the_state_does_not_matter}
end
def handle_events(events, _from, state) do
# Wait for a second.
Process.sleep(1000)
# Inspect the events.
IO.inspect(events)
# We are a consumer, so we would never emit items.
{:noreply, [], state}
end
end
Now we can start and connect them:
{:ok, a} = A.start_link(0) # starting from zero
{:ok, b} = B.start_link(2) # multiply by 2
{:ok, c} = C.start_link() # state does not matter
GenStage.sync_subscribe(c, to: b)
GenStage.sync_subscribe(b, to: a)
Typically, we subscribe from bottom to top. Since A will
start producing items only when B connects to it, we want this
subscription to happen when the whole pipeline is ready. After
you subscribe all of them, demand will start flowing upstream and
events downstream.
When implementing consumers, we often set the `:max_demand` and
`:min_demand` on subscription. The `:max_demand` specifies the
maximum amount of events that must be in flow while the `:min_demand`
specifies the minimum threshold to trigger for more demand. For
example, if `:max_demand` is 1000 and `:min_demand` is 750,
the consumer will ask for 1000 events initially and ask for more
only after it receives at least 250.
In the example above, B is a `:producer_consumer` and therefore
acts as a buffer. Getting the proper demand values in B is
important: making the buffer too small may make the whole pipeline
slower, making the buffer too big may unnecessarily consume
memory.
When such values are applied to the stages above, it is easy
to see the producer works in batches. The producer A ends-up
emitting batches of 50 items which will take approximately
50 seconds to be consumed by C, which will then request another
batch of 50 items.
## `init` and `:subscribe_to`
In the example above, we have started the processes A, B, and C
independently and subscribed them later on. But most often it is
simpler to subscribe a consumer to its producer on its `c:init/1`
callback. This way, if the consumer crashes, restarting the consumer
will automatically re-invoke its `c:init/1` callback and resubscribe
it to the supervisor.
This approach works as long as the producer can be referenced when
the consumer starts--such as by name (for a named process) or by pid
for a running unnamed process. For example, assuming the process
`A` and `B` are started as follows:
# Let's call the stage in module A as A
GenStage.start_link(A, 0, name: A)
# Let's call the stage in module B as B
GenStage.start_link(B, 2, name: B)
# No need to name consumers as they won't be subscribed to
GenStage.start_link(C, :ok)
We can now change the `c:init/1` callback for C to the following:
def init(:ok) do
{:consumer, :the_state_does_not_matter, subscribe_to: [B]}
end
or:
def init(:ok) do
{:consumer, :the_state_does_not_matter, subscribe_to: [{B, options}]}
end
And we will no longer need to call `sync_subscribe/2`.
Another advantage of this approach is that it makes it straight-forward
to leverage concurrency by simply starting multiple consumers that subscribe
to their producer (or producer-consumer). This can be done in the example above
by simply calling start link multiple times:
# Start 4 consumers
GenStage.start_link(C, :ok)
GenStage.start_link(C, :ok)
GenStage.start_link(C, :ok)
GenStage.start_link(C, :ok)
In a supervision tree, this is often done by starting multiple workers:
children = [
worker(A, [0]),
worker(B, [2]),
worker(C, []),
worker(C, []),
worker(C, []),
worker(C, [])
]
Supervisor.start_link(children, strategy: :one_for_one)
In fact, having multiple consumers is often the easiest and simplest way to
leverage concurrency in a GenStage pipeline, especially if events can
be processed out of order. For example, imagine a scenario where you
have a stream of incoming events and you need to access a number of
external services per event. Instead of building complex stages that
route events through those services, one simple mechanism to leverage
concurrency is to start a producer and N consumers and invoke the external
services directly for each event in each consumer. N is typically the
number of cores (as returned by `System.schedulers_online/0`) but can
likely be increased if the consumers are mostly waiting on IO.
Another alternative to the scenario above is to use a `ConsumerSupervisor`
for consuming the events instead of N consumers. The `ConsumerSupervisor`
will start a separate supervised process per event where the number of children
is at most `max_demand` and the average amount of children is
`(max_demand - min_demand) / 2`.
## Buffering
In many situations, producers may attempt to emit events while no consumers
have yet subscribed. Similarly, consumers may ask producers for events
that are not yet available. In such cases, it is necessary for producers
to buffer events until a consumer is available or buffer the consumer
demand until events arrive, respectively. As we will see next, buffering
events can be done automatically by `GenStage`, while buffering the demand
is a case that must be explicitly considered by developers implementing
producers.
### Buffering events
Due to the concurrent nature of Elixir software, sometimes a producer
may dispatch events without consumers to send those events to. For example,
imagine a `:consumer` B subscribes to `:producer` A. Next, the consumer B
sends demand to A, which starts producing events to satisfy the demand.
Now, if the consumer B crashes, the producer may attempt to dispatch the
now produced events but it no longer has a consumer to send those events to.
In such cases, the producer will automatically buffer the events until another
consumer subscribes.
The buffer can also be used in cases where external sources only send
events in batches larger than asked for. For example, if you are
receiving events from an external source that only sends events
in batches of 1000 and the internal demand is smaller than
that, the buffer allows you to always emit batches of 1000 events
even when the consumer has asked for less.
In all of those cases when an event cannot be sent immediately by
a producer, the event will be automatically stored and sent the next
time consumers ask for events. The size of the buffer is configured
via the `:buffer_size` option returned by `init/1` and the default
value is `10_000`. If the `buffer_size` is exceeded, an error is logged.
See the documentation for `c:init/1` for more detailed infromation about
the `:buffer_size` option.
### Buffering demand
In case consumers send demand and the producer is not yet ready to
fill in the demand, producers must buffer the demand until data arrives.
As an example, let's implement a producer that broadcasts messages
to consumers. For producers, we need to consider two scenarios:
1. what if events arrive and there are no consumers?
2. what if consumers send demand and there are not enough events?
One way to implement such a broadcaster is to simply rely on the internal
buffer available in `GenStage`, dispatching events as they arrive, as explained
in the previous section:
defmodule Broadcaster do
use GenStage
@doc "Starts the broadcaster."
def start_link() do
GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
end
@doc "Sends an event and returns only after the event is dispatched."
def sync_notify(event, timeout \\ 5000) do
GenStage.call(__MODULE__, {:notify, event}, timeout)
end
def init(:ok) do
{:producer, :ok, dispatcher: GenStage.BroadcastDispatcher}
end
def handle_call({:notify, event}, _from, state) do
{:reply, :ok, [event], state} # Dispatch immediately
end
def handle_demand(_demand, state) do
{:noreply, [], state} # We don't care about the demand
end
end
By always sending events as soon as they arrive, if there is any demand,
we will serve the existing demand, otherwise the event will be queued in
`GenStage`'s internal buffer. In case events are being queued and not being
consumed, a log message will be emitted when we exceed the `:buffer_size`
configuration.
While the implementation above is enough to solve the constraints above,
a more robust implementation would have tighter control over the events
and demand by tracking this data locally, leaving the `GenStage` internal
buffer only for cases where consumers crash without consuming all data.
To handle such cases, we will use a two-element tuple as the broadcaster state
where the first elemeent is a queue and the second element is the pending
demand. When events arrive and there are no consumers, we will store the
event in the queue alongside information about the process that broadcasted
the event. When consumers send demand and there are not enough events, we will
increase the pending demand. Once we have both data and demand, we
acknowledge the process that has sent the event to the broadcaster and finally
broadcast the event downstream.
defmodule QueueBroadcaster do
use GenStage
@doc "Starts the broadcaster."
def start_link() do
GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
end
@doc "Sends an event and returns only after the event is dispatched."
def sync_notify(event, timeout \\ 5000) do
GenStage.call(__MODULE__, {:notify, event}, timeout)
end
## Callbacks
def init(:ok) do
{:producer, {:queue.new, 0}, dispatcher: GenStage.BroadcastDispatcher}
end
def handle_call({:notify, event}, from, {queue, pending_demand}) do
queue = :queue.in({from, event}, queue)
dispatch_events(queue, pending_demand, [])
end
def handle_demand(incoming_demand, {queue, pending_demand}) do
dispatch_events(queue, incoming_demand + pending_demand, [])
end
defp dispatch_events(queue, 0, events) do
{:noreply, Enum.reverse(events), {queue, 0}}
end
defp dispatch_events(queue, demand, events) do
case :queue.out(queue) do
{{:value, {from, event}}, queue} ->
GenStage.reply(from, :ok)
dispatch_events(queue, demand - 1, [event | events])
{:empty, queue} ->
{:noreply, Enum.reverse(events), {queue, demand}}
end
end
end
Let's also implement a consumer that automatically subscribes to the
broadcaster on `c:init/1`. The advantage of doing so on initialization
is that, if the consumer crashes while it is supervised, the subscription
is automatically re-established when the supervisor restarts it.
defmodule Printer do
use GenStage
@doc "Starts the consumer."
def start_link() do
GenStage.start_link(__MODULE__, :ok)
end
def init(:ok) do
# Starts a permanent subscription to the broadcaster
# which will automatically start requesting items.
{:consumer, :ok, subscribe_to: [QueueBroadcaster]}
end
def handle_events(events, _from, state) do
for event <- events do
IO.inspect {self(), event}
end
{:noreply, [], state}
end
end
With the broadcaster in hand, now let's start the producer as well
as multiple consumers:
# Start the producer
QueueBroadcaster.start_link()
# Start multiple consumers
Printer.start_link()
Printer.start_link()
Printer.start_link()
Printer.start_link()
At this point, all consumers must have sent their demand which we were not
able to fulfill. Now by calling `QueueBroadcaster.sync_notify/1`, the event
shall be broadcasted to all consumers at once as we have buffered the demand
in the producer:
QueueBroadcaster.sync_notify(:hello_world)
If we had called `QueueBroadcaster.sync_notify(:hello_world)` before any
consumer was available, the event would also have been buffered in our own
queue and served only when demand had been received.
By having control over the demand and queue, the broadcaster has
full control on how to behave when there are no consumers, when the
queue grows too large, and so forth.
## Asynchronous work and `handle_subscribe`
Both `:producer_consumer` and `:consumer` stages have been designed to do
their work in the `c:handle_events/3` callback. This means that, after
`c:handle_events/3` is invoked, both `:producer_consumer` and `:consumer`
stages will immediately send demand upstream and ask for more items, as the
stage that produced the events assumes events have been fully processed by
`c:handle_events/3`.
Such default behaviour makes `:producer_consumer` and `:consumer` stages
unfeasible for doing asynchronous work. However, given `GenStage` was designed
to run with multiple consumers, it is not a problem to perform synchronous or
blocking actions inside `handle_events/3` as you can then start multiple
consumers in order to max both CPU and IO usage as necessary.
On the other hand, if you must perform some work asynchronously,
`GenStage` comes with an option that manually controls how demand
is sent upstream, avoiding the default behaviour where demand is
sent after `c:handle_events/3`. Such can be done by implementing
the `c:handle_subscribe/4` callback and returning `{:manual, state}`
instead of the default `{:automatic, state}`. Once the producer mode
is set to `:manual`, developers must use `GenStage.ask/3` to send
demand upstream when necessary.
Note that when `:max_demand` and `:min_demand` must be manually respected when
manually asking for demand through `GenStage.ask/3`.
For example, the `ConsumerSupervisor` module processes events
asynchronously by starting a process for each event and this is achieved by
manually sending demand to producers. `ConsumerSupervisor`
can be used to distribute work to a limited amount of
processes, behaving similar to a pool where a new process is
started for each event. See the `ConsumerSupervisor` docs for more
information.
Setting the demand to `:manual` in `c:handle_subscribe/4` is not
only useful for asynchronous work but also for setting up other
mechanisms for back-pressure. As an example, let's implement a
consumer that is allowed to process a limited number of events
per time interval. Those are often called rate limiters:
defmodule RateLimiter do
use GenStage
def init(_) do
# Our state will keep all producers and their pending demand
{:consumer, %{}}
end
def handle_subscribe(:producer, opts, from, producers) do
# We will only allow max_demand events every 5000 milliseconds
pending = opts[:max_demand] || 1000
interval = opts[:interval] || 5000
# Register the producer in the state
producers = Map.put(producers, from, {pending, interval})
# Ask for the pending events and schedule the next time around
producers = ask_and_schedule(producers, from)
# Returns manual as we want control over the demand
{:manual, producers}
end
def handle_cancel(_, from, producers) do
# Remove the producers from the map on unsubscribe
{:noreply, [], Map.delete(producers, from)}
end
def handle_events(events, from, producers) do
# Bump the amount of pending events for the given producer
producers = Map.update!(producers, from, fn {pending, interval} ->
{pending + length(events), interval}
end)
# Consume the events by printing them.
IO.inspect(events)
# A producer_consumer would return the processed events here.
{:noreply, [], producers}
end
def handle_info({:ask, from}, producers) do
# This callback is invoked by the Process.send_after/3 message below.
{:noreply, [], ask_and_schedule(producers, from)}
end
defp ask_and_schedule(producers, from) do
case producers do
%{^from => {pending, interval}} ->
# Ask for any pending events
GenStage.ask(from, pending)
# And let's check again after interval
Process.send_after(self(), {:ask, from}, interval)
# Finally, reset pending events to 0
Map.put(producers, from, {0, interval})
%{} ->
producers
end
end
end
Let's subscribe the `RateLimiter` above to the
producer we have implemented at the beginning of the module
documentation:
{:ok, a} = GenStage.start_link(A, 0)
{:ok, b} = GenStage.start_link(RateLimiter, :ok)
# Ask for 10 items every 2 seconds
GenStage.sync_subscribe(b, to: a, max_demand: 10, interval: 2000)
Although the rate limiter above is a consumer, it could be made a
producer-consumer by changing `c:init/1` to return a `:producer_consumer`
and then forwarding the events in `c:handle_events/3`.
## Notifications
`GenStage` also supports the ability to send notifications to all
consumers. Those notifications are sent as regular messages outside
of the demand-driven protocol but respecting the event ordering.
See `sync_notify/3` and `async_notify/2`.
Notifications are useful for out-of-band information, for example,
to notify consumers the producer has sent all events it had to
process or that a new batch of events is starting.
Note the notification system should not be used for broadcasting
events; for such, consider using `GenStage.BroadcastDispatcher`.
## Callbacks
`GenStage` is implemented on top of a `GenServer` with a few additions.
Besides exposing all of the `GenServer` callbacks, it also provides
`handle_demand/2` to be implemented by producers and `handle_events/3` to be
implemented by consumers, as shown above, as well as subscription-related
callbacks. Furthermore, all the callback responses have been modified to
potentially emit events. See the callbacks documentation for more
information.
By adding `use GenStage` to your module, Elixir will automatically
define all callbacks for you except for the following ones:
* `init/1` - must be implemented to choose between `:producer`, `:consumer`, or `:producer_consumer` stages
* `handle_demand/2` - must be implemented by `:producer` stages
* `handle_events/3` - must be implemented by `:producer_consumer` and `:consumer` stages
Although this module exposes functions similar to the ones found in
the `GenServer` API, like `call/3` and `cast/2`, developers can also
rely directly on GenServer functions such as `GenServer.multi_call/4`
and `GenServer.abcast/3` if they wish to.
### Name registration
`GenStage` is bound to the same name registration rules as a `GenServer`.
Read more about it in the `GenServer` docs.
## Message protocol overview
This section will describe the message protocol implemented
by stages. By documenting these messages, we will allow
developers to provide their own stage implementations.
### Back-pressure
When data is sent between stages, it is done by a message
protocol that provides back-pressure. The first step is
for the consumer to subscribe to the producer. Each
subscription has a unique reference.
Once subscribed, the consumer may ask the producer for messages
for the given subscription. The consumer may demand more items
whenever it wants to. A consumer must never receive more data
than it has asked for from any given producer stage.
A consumer may have multiple producers, where each demand is managed
individually (on a per-subscription basis). A producer may have multiple
consumers, where the demand and events are managed and delivered according to
a `GenStage.Dispatcher` implementation.
### Producer messages
The producer is responsible for sending events to consumers
based on demand. These are the messages that consumers can
send to producers:
* `{:"$gen_producer", from :: {consumer_pid, subscription_tag}, {:subscribe, current, options}}` -
sent by the consumer to the producer to start a new subscription.
Before sending, the consumer MUST monitor the producer for clean-up
purposes in case of crashes. The `subscription_tag` is unique to
identify the subscription. It is typically the subscriber monitoring
reference although it may be any term.
Once sent, the consumer MAY immediately send demand to the producer.
The `current` field, when not `nil`, is a two-item tuple containing a
subscription that must be cancelled with the given reason before the
current one is accepted.
Once received, the producer MUST monitor the consumer. However, if
the subscription reference is known, it MUST send a `:cancel` message
to the consumer instead of monitoring and accepting the subscription.
* `{:"$gen_producer", from :: {consumer_pid, subscription_tag}, {:cancel, reason}}` -
sent by the consumer to cancel a given subscription.
Once received, the producer MUST send a `:cancel` reply to the
registered consumer (which may not necessarily be the one received
in the tuple above). Keep in mind, however, there is no guarantee
such messages can be delivered in case the producer crashes before.
If the pair is unknown, the producer MUST send an appropriate cancel
reply.
* `{:"$gen_producer", from :: {consumer_pid, subscription_tag}, {:ask, demand}}` -
sent by consumers to ask demand for a given subscription (identified
by `subscription_tag`).
Once received, the producer MUST send data up to the demand. If the
pair is unknown, the producer MUST send an appropriate cancel reply.
### Consumer messages
The consumer is responsible for starting the subscription
and sending demand to producers. These are the messages that
producers can send to consumers:
* `{:"$gen_consumer", from :: {producer_pid, subscription_tag}, {:notification, message}}` -
notifications sent by producers.
* `{:"$gen_consumer", from :: {producer_pid, subscription_tag}, {:cancel, reason}}` -
sent by producers to cancel a given subscription.
It is used as a confirmation for client cancellations OR
whenever the producer wants to cancel some upstream demand.
* `{:"$gen_consumer", from :: {producer_pid, subscription_tag}, events :: [event, ...]}` -
events sent by producers to consumers.
`subscription_tag` identifies the subscription. The third argument
is a non-empty list of events. If the subscription is unknown, the
events must be ignored and a cancel message must be sent to the producer.
"""
defstruct [:mod, :state, :type, :dispatcher_mod, :dispatcher_state, :buffer,
:buffer_config, events: :forward, monitors: %{}, producers: %{}, consumers: %{}]
@typedoc "The supported stage types."
@type type :: :producer | :consumer | :producer_consumer
@typedoc "The supported init options."
@type options :: keyword()
@typedoc "The stage."
@type stage :: pid | atom | {:global, term} | {:via, module, term} | {atom, node}
@typedoc "The term that identifies a subscription."
@opaque subscription_tag :: reference
@typedoc "The term that identifies a subscription associated with the corresponding producer/consumer."
@type from :: {pid, subscription_tag}
@doc """
Invoked when the server is started.
`start_link/3` (or `start/3`) will block until this callback returns.
`args` is the argument term (second argument) passed to `start_link/3`
(or `start/3`).
In case of successful start, this callback must return a tuple
where the first element is the stage type, which is one of:
* `:producer`
* `:consumer`
* `:producer_consumer` (if the stage is acting as both)
For example:
def init(args) do
{:producer, some_state}
end
The returned tuple may also contain 3 or 4 elements. The third
element may be the `:hibernate` atom or a set of options defined
below.
Returning `:ignore` will cause `start_link/3` to return `:ignore`
and the process will exit normally without entering the loop or
calling `terminate/2`.
Returning `{:stop, reason}` will cause `start_link/3` to return
`{:error, reason}` and the process to exit with reason `reason`
without entering the loop or calling `terminate/2`.
## Options
This callback may return options. Some options are specific to
the chosen stage type while others are shared across all types.
### `:producer` options
* `:demand` - when `:forward`, the demand is always forwarded to
the `c:handle_demand/2` callback. When `:accumulate`, demand is
accumulated until its mode is set to `:forward` via `demand/2`.
This is useful as a synchronization mechanism, where the demand
is accumulated until all consumers are subscribed. Defaults to
`:forward`.
### `:producer` and `:producer_consumer` options
* `:buffer_size` - the size of the buffer to store events without
demand. Can be `:infinity` to signal no limit on the buffer size. Check
the "Buffer events" section of the module documentation. Defaults to
`10_000` for `:producer`, `:infinity` for `:producer_consumer`.
* `:buffer_keep` - returns whether the `:first` or `:last` entries
should be kept on the buffer in case the buffer size is exceeded.
Defaults to `:last`.
* `:dispatcher` - the dispatcher responsible for handling demands.
Defaults to `GenStage.DemandDispatch`. May be either an atom
representing a dispatcher module or a two-element tuple with
the dispatcher module and the dispatcher options.
### `:consumer` and `:producer_consumer` options
* `:subscribe_to` - a list of producers to subscribe to. Each element
represents either the producer module or a tuple with the producer module
and the subscription options (as defined in `sync_subscribe/2`).
"""
@callback init(args :: term) ::
{type, state} |
{type, state, options} |
:ignore |
{:stop, reason :: any} when state: any
@doc """
Invoked on `:producer` stages.
This callback is invoked on `:producer` stages with the demand from
consumers/dispatcher. The producer that implements this callback must either
store the demand, or return the amount of requested events.
Must always be explicitly implemented by `:producer` stages.
## Examples
def handle_demand(demand, state) do
# We check if we're able to satisfy the demand and fetch
# events if we aren't.
events =
if length(state.events) >= demand do
events
else
fetch_events()
end
# We dispatch only the requested number of events.
{to_dispatch, remaining} = Enum.split(events, demand)
{:noreply, to_dispatch, %{state | events: remaining}}
end
"""
@callback handle_demand(demand :: pos_integer, state :: term) ::
{:noreply, [event], new_state} |
{:noreply, [event], new_state, :hibernate} |
{:stop, reason, new_state} when new_state: term, reason: term, event: term
@doc """
Invoked when a consumer subscribes to a producer.
This callback is invoked in both producers and consumers.
`producer_or_consumer` will be `:producer` when this callback is
invoked on a consumer that subscribed to a producer, and `:consumer`
if when this callback is invoked on producers a consumer subscribed to.
For consumers, successful subscriptions must return one of:
* `{:automatic, new_state}` - means the stage implementation will take care
of automatically sending demand to producers. This is the default.
* `{:manual, state}` - means that demand must be sent to producers
explicitly via `ask/3`. `:manual` subscriptions must be cancelled when
`c:handle_cancel/3` is called. `:manual` can be used when a special
behaviour is desired (for example, `ConsumerSupervisor` uses `:manual`
demand in its implementation).
For producers, successful subscriptions must always return
`{:automatic, new_state}`. `:manual` mode is not supported.
If this callback is not implemented, the default implementation by
`use GenStage` will return `{:automatic, state}`.
## Examples
Let's see an example where we define this callback in a consumer that will use
`:manual` mode. In this case, we'll store the subscription (`from`) in the
state in order to be able to use it later on when asking demand via `ask/3`.
def handle_subscribe(:producer, _options, from, state) do
new_state = %{state | subscription: from}
{:manual, new_state
end
"""
@callback handle_subscribe(producer_or_consumer :: :producer | :consumer, options, from, state :: term) ::
{:automatic | :manual, new_state} |
{:stop, reason, new_state} when new_state: term, reason: term
@doc """
Invoked when a consumer is no longer subscribed to a producer.
It receives the cancellation reason, the `from` tuple representing the
cancelled subscription and the state. The `cancel_reason` will be a
`{:cancel, _}` tuple if the reason for cancellation was a `GenStage.cancel/2`
call. Any other value means the cancellation reason was due to an EXIT.
If this callback is not implemented, the default implementation by
`use GenStage` will return `{:noreply, [], state}`.
Return values are the same as `c:handle_cast/2`.
"""
@callback handle_cancel(cancellation_reason :: {:cancel | :down, reason :: term}, from, state :: term) ::
{:noreply, [event], new_state} |
{:noreply, [event], new_state, :hibernate} |
{:stop, reason, new_state} when event: term, new_state: term, reason: term
@doc """
Invoked on `:producer_consumer` and `:consumer` stages to handle events.
Must always be explicitly implemented by such types.
Return values are the same as `c:handle_cast/2`.
"""
@callback handle_events(events :: [event], from, state :: term) ::
{:noreply, [event], new_state} |
{:noreply, [event], new_state, :hibernate} |
{:stop, reason, new_state} when new_state: term, reason: term, event: term
@doc """
Invoked to handle synchronous `call/3` messages.
`call/3` will block until a reply is received (unless the call times out or
nodes are disconnected).
`request` is the request message sent by a `call/3`, `from` is a two-element tuple
containing the caller's PID and a term that uniquely identifies the call, and
`state` is the current state of the `GenStage`.
Returning `{:reply, reply, [events], new_state}` sends the response `reply`
to the caller after events are dispatched (or buffered) and continues the
loop with new state `new_state`. In case you want to deliver the reply before
processing events, use `reply/2` and return `{:noreply, [event],
state}`.
Returning `{:noreply, [event], new_state}` does not send a response to the
caller and processes the given events before continuing the loop with new
state `new_state`. The response must be sent with `reply/2`.
Hibernating is also supported as an atom to be returned from either
`:reply` and `:noreply` tuples.
Returning `{:stop, reason, reply, new_state}` stops the loop and `terminate/2`
is called with reason `reason` and state `new_state`. Then the `reply` is sent
as the response to the call and the process exits with reason `reason`.
Returning `{:stop, reason, new_state}` is similar to
`{:stop, reason, reply, new_state}` except that no reply is sent to the caller.
If this callback is not implemented, the default implementation by
`use GenStage` will return `{:stop, {:bad_call, request}, state}`.
"""
@callback handle_call(request :: term, from :: GenServer.from, state :: term) ::
{:reply, reply, [event], new_state} |
{:reply, reply, [event], new_state, :hibernate} |
{:noreply, [event], new_state} |
{:noreply, [event], new_state, :hibernate} |
{:stop, reason, reply, new_state} |
{:stop, reason, new_state} when reply: term, new_state: term, reason: term, event: term
@doc """
Invoked to handle asynchronous `cast/2` messages.
`request` is the request message sent by a `cast/2` and `state` is the current
state of the `GenStage`.
Returning `{:noreply, [event], new_state}` dispatches the events and continues
the loop with new state `new_state`.
Returning `{:noreply, [event], new_state, :hibernate}` is similar to
`{:noreply, new_state}` except the process is hibernated before continuing the
loop. See the return values for `c:GenServer.handle_call/3` for more information
on hibernation.
Returning `{:stop, reason, new_state}` stops the loop and `terminate/2` is
called with the reason `reason` and state `new_state`. The process exits with
reason `reason`.
If this callback is not implemented, the default implementation by
`use GenStage` will return `{:stop, {:bad_cast, request}, state}`.
"""
@callback handle_cast(request :: term, state :: term) ::
{:noreply, [event], new_state} |
{:noreply, [event], new_state, :hibernate} |
{:stop, reason :: term, new_state} when new_state: term, event: term
@doc """
Invoked to handle all other messages.
`message` is the message and `state` is the current state of the `GenStage`. When
a timeout occurs the message is `:timeout`.
If this callback is not implemented, the default implementation by
`use GenStage` will return `{:noreply, [], state}`.
Return values are the same as `c:handle_cast/2`.
"""
@callback handle_info(message :: term, state :: term) ::
{:noreply, [event], new_state} |
{:noreply, [event], new_state, :hibernate} |
{:stop, reason :: term, new_state} when new_state: term, event: term
@doc """
The same as `c:GenServer.terminate/2`.
"""
@callback terminate(reason, state :: term) ::
term when reason: :normal | :shutdown | {:shutdown, term} | term
@doc """
The same as `c:GenServer.code_change/3`.
"""
@callback code_change(old_vsn, state :: term, extra :: term) ::
{:ok, new_state :: term} |