defmodule GenStage do @moduledoc ~S""" Stages are data-exchange steps that send and/or receive data from other stages. When a stage sends data, it acts as a producer. When it receives data, it acts as a consumer. Stages may take both producer and consumer roles at once. ## Stage types Besides taking both producer and consumer roles, a stage may be called "source" if it only produces items or called "sink" if it only consumes items. For example, imagine the stages below where A sends data to B that sends data to C: [A] -> [B] -> [C] we conclude that: * A is only a producer (and therefore a source) * B is both producer and consumer * C is only a consumer (and therefore a sink) As we will see in the upcoming Examples section, we must specify the type of the stage when we implement each of them. To start the flow of events, we subscribe consumers to producers. Once the communication channel between them is established, consumers will ask the producers for events. We typically say the consumer is sending demand upstream. Once demand arrives, the producer will emit items, never emitting more items than the consumer asked for. This provides a back-pressure mechanism. A consumer may have multiple producers and a producer may have multiple consumers. When a consumer asks for data, each producer is handled separately, with its own demand. When a producer receives demand and sends data to multiple consumers, the demand is tracked and the events are sent by a dispatcher. This allows producers to send data using different "strategies". See `GenStage.Dispatcher` for more information. Many developers tend to create layers of stages, such as A, B and C, for achieving concurrency. If all you want is concurrency, using processes is enough. They are the primitive for achieving concurrency in Elixir and the VM does all of the work of multiplexing them. Instead, layers in GenStage must be created when there is a need for back-pressure or to route the data in different ways. For example, if you need the data to go over multiple steps but without a need for back-pressure or without a need to break the data apart, do not design it as such: [Producer] -> [Step 1] -> [Step 2] -> [Step 3] Instead it is better to design it as: [Consumer] / [Producer]-<-[Consumer] \ [Consumer] where "Consumer" are multiple processes that subscribe to the same "Producer" and run exactly the same code, with all of transformation steps from above. In such scenarios, you may even find the `Task.async_stream/2` function that ships as part of Elixir to be enough or achieve the flexibility you need with the `ConsumerSupervisor` functionality that is included as part of `GenStage`. ## Example Let's define the simple pipeline below: [A] -> [B] -> [C] where A is a producer that will emit items starting from 0, B is a producer-consumer that will receive those items and multiply them by a given number and C will receive those events and print them to the terminal. Let's start with A. Since A is a producer, its main responsibility is to receive demand and generate events. Those events may be in memory or an external queue system. For simplicity, let's implement a simple counter starting from a given value of `counter` received on `init/1`: defmodule A do use GenStage def start_link(number) do GenStage.start_link(A, number) end def init(counter) do {:producer, counter} end def handle_demand(demand, counter) when demand > 0 do # If the counter is 3 and we ask for 2 items, we will # emit the items 3 and 4, and set the state to 5. events = Enum.to_list(counter..counter+demand-1) {:noreply, events, counter + demand} end end B is a producer-consumer. This means it does not explicitly handle the demand because the demand is always forwarded to its producer. Once A receives the demand from B, it will send events to B which will be transformed by B as desired. In our case, B will receive events and multiply them by a number given on initialization and stored as the state: defmodule B do use GenStage def start_link(number) do GenStage.start_link(B, number) end def init(number) do {:producer_consumer, number} end def handle_events(events, _from, number) do events = Enum.map(events, & &1 * number) {:noreply, events, number} end end C will finally receive those events and print them every second to the terminal: defmodule C do use GenStage def start_link() do GenStage.start_link(C, :ok) end def init(:ok) do {:consumer, :the_state_does_not_matter} end def handle_events(events, _from, state) do # Wait for a second. Process.sleep(1000) # Inspect the events. IO.inspect(events) # We are a consumer, so we would never emit items. {:noreply, [], state} end end Now we can start and connect them: {:ok, a} = A.start_link(0) # starting from zero {:ok, b} = B.start_link(2) # multiply by 2 {:ok, c} = C.start_link() # state does not matter GenStage.sync_subscribe(c, to: b) GenStage.sync_subscribe(b, to: a) Typically, we subscribe from bottom to top. Since A will start producing items only when B connects to it, we want this subscription to happen when the whole pipeline is ready. After you subscribe all of them, demand will start flowing upstream and events downstream. When implementing consumers, we often set the `:max_demand` and `:min_demand` on subscription. The `:max_demand` specifies the maximum amount of events that must be in flow while the `:min_demand` specifies the minimum threshold to trigger for more demand. For example, if `:max_demand` is 1000 and `:min_demand` is 750, the consumer will ask for 1000 events initially and ask for more only after it receives at least 250. In the example above, B is a `:producer_consumer` and therefore acts as a buffer. Getting the proper demand values in B is important: making the buffer too small may make the whole pipeline slower, making the buffer too big may unnecessarily consume memory. When such values are applied to the stages above, it is easy to see the producer works in batches. The producer A ends-up emitting batches of 50 items which will take approximately 50 seconds to be consumed by C, which will then request another batch of 50 items. ## `init` and `:subscribe_to` In the example above, we have started the processes A, B, and C independently and subscribed them later on. But most often it is simpler to subscribe a consumer to its producer on its `c:init/1` callback. This way, if the consumer crashes, restarting the consumer will automatically re-invoke its `c:init/1` callback and resubscribe it to the supervisor. This approach works as long as the producer can be referenced when the consumer starts--such as by name (for a named process) or by pid for a running unnamed process. For example, assuming the process `A` and `B` are started as follows: # Let's call the stage in module A as A GenStage.start_link(A, 0, name: A) # Let's call the stage in module B as B GenStage.start_link(B, 2, name: B) # No need to name consumers as they won't be subscribed to GenStage.start_link(C, :ok) We can now change the `c:init/1` callback for C to the following: def init(:ok) do {:consumer, :the_state_does_not_matter, subscribe_to: [B]} end or: def init(:ok) do {:consumer, :the_state_does_not_matter, subscribe_to: [{B, options}]} end And we will no longer need to call `sync_subscribe/2`. Another advantage of this approach is that it makes it straight-forward to leverage concurrency by simply starting multiple consumers that subscribe to their producer (or producer-consumer). This can be done in the example above by simply calling start link multiple times: # Start 4 consumers GenStage.start_link(C, :ok) GenStage.start_link(C, :ok) GenStage.start_link(C, :ok) GenStage.start_link(C, :ok) In a supervision tree, this is often done by starting multiple workers: children = [ worker(A, [0]), worker(B, [2]), worker(C, []), worker(C, []), worker(C, []), worker(C, []) ] Supervisor.start_link(children, strategy: :one_for_one) In fact, having multiple consumers is often the easiest and simplest way to leverage concurrency in a GenStage pipeline, especially if events can be processed out of order. For example, imagine a scenario where you have a stream of incoming events and you need to access a number of external services per event. Instead of building complex stages that route events through those services, one simple mechanism to leverage concurrency is to start a producer and N consumers and invoke the external services directly for each event in each consumer. N is typically the number of cores (as returned by `System.schedulers_online/0`) but can likely be increased if the consumers are mostly waiting on IO. Another alternative to the scenario above is to use a `ConsumerSupervisor` for consuming the events instead of N consumers. The `ConsumerSupervisor` will start a separate supervised process per event where the number of children is at most `max_demand` and the average amount of children is `(max_demand - min_demand) / 2`. ## Buffering In many situations, producers may attempt to emit events while no consumers have yet subscribed. Similarly, consumers may ask producers for events that are not yet available. In such cases, it is necessary for producers to buffer events until a consumer is available or buffer the consumer demand until events arrive, respectively. As we will see next, buffering events can be done automatically by `GenStage`, while buffering the demand is a case that must be explicitly considered by developers implementing producers. ### Buffering events Due to the concurrent nature of Elixir software, sometimes a producer may dispatch events without consumers to send those events to. For example, imagine a `:consumer` B subscribes to `:producer` A. Next, the consumer B sends demand to A, which starts producing events to satisfy the demand. Now, if the consumer B crashes, the producer may attempt to dispatch the now produced events but it no longer has a consumer to send those events to. In such cases, the producer will automatically buffer the events until another consumer subscribes. The buffer can also be used in cases where external sources only send events in batches larger than asked for. For example, if you are receiving events from an external source that only sends events in batches of 1000 and the internal demand is smaller than that, the buffer allows you to always emit batches of 1000 events even when the consumer has asked for less. In all of those cases when an event cannot be sent immediately by a producer, the event will be automatically stored and sent the next time consumers ask for events. The size of the buffer is configured via the `:buffer_size` option returned by `init/1` and the default value is `10_000`. If the `buffer_size` is exceeded, an error is logged. See the documentation for `c:init/1` for more detailed infromation about the `:buffer_size` option. ### Buffering demand In case consumers send demand and the producer is not yet ready to fill in the demand, producers must buffer the demand until data arrives. As an example, let's implement a producer that broadcasts messages to consumers. For producers, we need to consider two scenarios: 1. what if events arrive and there are no consumers? 2. what if consumers send demand and there are not enough events? One way to implement such a broadcaster is to simply rely on the internal buffer available in `GenStage`, dispatching events as they arrive, as explained in the previous section: defmodule Broadcaster do use GenStage @doc "Starts the broadcaster." def start_link() do GenStage.start_link(__MODULE__, :ok, name: __MODULE__) end @doc "Sends an event and returns only after the event is dispatched." def sync_notify(event, timeout \\ 5000) do GenStage.call(__MODULE__, {:notify, event}, timeout) end def init(:ok) do {:producer, :ok, dispatcher: GenStage.BroadcastDispatcher} end def handle_call({:notify, event}, _from, state) do {:reply, :ok, [event], state} # Dispatch immediately end def handle_demand(_demand, state) do {:noreply, [], state} # We don't care about the demand end end By always sending events as soon as they arrive, if there is any demand, we will serve the existing demand, otherwise the event will be queued in `GenStage`'s internal buffer. In case events are being queued and not being consumed, a log message will be emitted when we exceed the `:buffer_size` configuration. While the implementation above is enough to solve the constraints above, a more robust implementation would have tighter control over the events and demand by tracking this data locally, leaving the `GenStage` internal buffer only for cases where consumers crash without consuming all data. To handle such cases, we will use a two-element tuple as the broadcaster state where the first elemeent is a queue and the second element is the pending demand. When events arrive and there are no consumers, we will store the event in the queue alongside information about the process that broadcasted the event. When consumers send demand and there are not enough events, we will increase the pending demand. Once we have both data and demand, we acknowledge the process that has sent the event to the broadcaster and finally broadcast the event downstream. defmodule QueueBroadcaster do use GenStage @doc "Starts the broadcaster." def start_link() do GenStage.start_link(__MODULE__, :ok, name: __MODULE__) end @doc "Sends an event and returns only after the event is dispatched." def sync_notify(event, timeout \\ 5000) do GenStage.call(__MODULE__, {:notify, event}, timeout) end ## Callbacks def init(:ok) do {:producer, {:queue.new, 0}, dispatcher: GenStage.BroadcastDispatcher} end def handle_call({:notify, event}, from, {queue, pending_demand}) do queue = :queue.in({from, event}, queue) dispatch_events(queue, pending_demand, []) end def handle_demand(incoming_demand, {queue, pending_demand}) do dispatch_events(queue, incoming_demand + pending_demand, []) end defp dispatch_events(queue, 0, events) do {:noreply, Enum.reverse(events), {queue, 0}} end defp dispatch_events(queue, demand, events) do case :queue.out(queue) do {{:value, {from, event}}, queue} -> GenStage.reply(from, :ok) dispatch_events(queue, demand - 1, [event | events]) {:empty, queue} -> {:noreply, Enum.reverse(events), {queue, demand}} end end end Let's also implement a consumer that automatically subscribes to the broadcaster on `c:init/1`. The advantage of doing so on initialization is that, if the consumer crashes while it is supervised, the subscription is automatically re-established when the supervisor restarts it. defmodule Printer do use GenStage @doc "Starts the consumer." def start_link() do GenStage.start_link(__MODULE__, :ok) end def init(:ok) do # Starts a permanent subscription to the broadcaster # which will automatically start requesting items. {:consumer, :ok, subscribe_to: [QueueBroadcaster]} end def handle_events(events, _from, state) do for event <- events do IO.inspect {self(), event} end {:noreply, [], state} end end With the broadcaster in hand, now let's start the producer as well as multiple consumers: # Start the producer QueueBroadcaster.start_link() # Start multiple consumers Printer.start_link() Printer.start_link() Printer.start_link() Printer.start_link() At this point, all consumers must have sent their demand which we were not able to fulfill. Now by calling `QueueBroadcaster.sync_notify/1`, the event shall be broadcasted to all consumers at once as we have buffered the demand in the producer: QueueBroadcaster.sync_notify(:hello_world) If we had called `QueueBroadcaster.sync_notify(:hello_world)` before any consumer was available, the event would also have been buffered in our own queue and served only when demand had been received. By having control over the demand and queue, the broadcaster has full control on how to behave when there are no consumers, when the queue grows too large, and so forth. ## Asynchronous work and `handle_subscribe` Both `:producer_consumer` and `:consumer` stages have been designed to do their work in the `c:handle_events/3` callback. This means that, after `c:handle_events/3` is invoked, both `:producer_consumer` and `:consumer` stages will immediately send demand upstream and ask for more items, as the stage that produced the events assumes events have been fully processed by `c:handle_events/3`. Such default behaviour makes `:producer_consumer` and `:consumer` stages unfeasible for doing asynchronous work. However, given `GenStage` was designed to run with multiple consumers, it is not a problem to perform synchronous or blocking actions inside `handle_events/3` as you can then start multiple consumers in order to max both CPU and IO usage as necessary. On the other hand, if you must perform some work asynchronously, `GenStage` comes with an option that manually controls how demand is sent upstream, avoiding the default behaviour where demand is sent after `c:handle_events/3`. Such can be done by implementing the `c:handle_subscribe/4` callback and returning `{:manual, state}` instead of the default `{:automatic, state}`. Once the producer mode is set to `:manual`, developers must use `GenStage.ask/3` to send demand upstream when necessary. Note that when `:max_demand` and `:min_demand` must be manually respected when manually asking for demand through `GenStage.ask/3`. For example, the `ConsumerSupervisor` module processes events asynchronously by starting a process for each event and this is achieved by manually sending demand to producers. `ConsumerSupervisor` can be used to distribute work to a limited amount of processes, behaving similar to a pool where a new process is started for each event. See the `ConsumerSupervisor` docs for more information. Setting the demand to `:manual` in `c:handle_subscribe/4` is not only useful for asynchronous work but also for setting up other mechanisms for back-pressure. As an example, let's implement a consumer that is allowed to process a limited number of events per time interval. Those are often called rate limiters: defmodule RateLimiter do use GenStage def init(_) do # Our state will keep all producers and their pending demand {:consumer, %{}} end def handle_subscribe(:producer, opts, from, producers) do # We will only allow max_demand events every 5000 milliseconds pending = opts[:max_demand] || 1000 interval = opts[:interval] || 5000 # Register the producer in the state producers = Map.put(producers, from, {pending, interval}) # Ask for the pending events and schedule the next time around producers = ask_and_schedule(producers, from) # Returns manual as we want control over the demand {:manual, producers} end def handle_cancel(_, from, producers) do # Remove the producers from the map on unsubscribe {:noreply, [], Map.delete(producers, from)} end def handle_events(events, from, producers) do # Bump the amount of pending events for the given producer producers = Map.update!(producers, from, fn {pending, interval} -> {pending + length(events), interval} end) # Consume the events by printing them. IO.inspect(events) # A producer_consumer would return the processed events here. {:noreply, [], producers} end def handle_info({:ask, from}, producers) do # This callback is invoked by the Process.send_after/3 message below. {:noreply, [], ask_and_schedule(producers, from)} end defp ask_and_schedule(producers, from) do case producers do %{^from => {pending, interval}} -> # Ask for any pending events GenStage.ask(from, pending) # And let's check again after interval Process.send_after(self(), {:ask, from}, interval) # Finally, reset pending events to 0 Map.put(producers, from, {0, interval}) %{} -> producers end end end Let's subscribe the `RateLimiter` above to the producer we have implemented at the beginning of the module documentation: {:ok, a} = GenStage.start_link(A, 0) {:ok, b} = GenStage.start_link(RateLimiter, :ok) # Ask for 10 items every 2 seconds GenStage.sync_subscribe(b, to: a, max_demand: 10, interval: 2000) Although the rate limiter above is a consumer, it could be made a producer-consumer by changing `c:init/1` to return a `:producer_consumer` and then forwarding the events in `c:handle_events/3`. ## Notifications `GenStage` also supports the ability to send notifications to all consumers. Those notifications are sent as regular messages outside of the demand-driven protocol but respecting the event ordering. See `sync_notify/3` and `async_notify/2`. Notifications are useful for out-of-band information, for example, to notify consumers the producer has sent all events it had to process or that a new batch of events is starting. Note the notification system should not be used for broadcasting events; for such, consider using `GenStage.BroadcastDispatcher`. ## Callbacks `GenStage` is implemented on top of a `GenServer` with a few additions. Besides exposing all of the `GenServer` callbacks, it also provides `handle_demand/2` to be implemented by producers and `handle_events/3` to be implemented by consumers, as shown above, as well as subscription-related callbacks. Furthermore, all the callback responses have been modified to potentially emit events. See the callbacks documentation for more information. By adding `use GenStage` to your module, Elixir will automatically define all callbacks for you except for the following ones: * `init/1` - must be implemented to choose between `:producer`, `:consumer`, or `:producer_consumer` stages * `handle_demand/2` - must be implemented by `:producer` stages * `handle_events/3` - must be implemented by `:producer_consumer` and `:consumer` stages Although this module exposes functions similar to the ones found in the `GenServer` API, like `call/3` and `cast/2`, developers can also rely directly on GenServer functions such as `GenServer.multi_call/4` and `GenServer.abcast/3` if they wish to. ### Name registration `GenStage` is bound to the same name registration rules as a `GenServer`. Read more about it in the `GenServer` docs. ## Message protocol overview This section will describe the message protocol implemented by stages. By documenting these messages, we will allow developers to provide their own stage implementations. ### Back-pressure When data is sent between stages, it is done by a message protocol that provides back-pressure. The first step is for the consumer to subscribe to the producer. Each subscription has a unique reference. Once subscribed, the consumer may ask the producer for messages for the given subscription. The consumer may demand more items whenever it wants to. A consumer must never receive more data than it has asked for from any given producer stage. A consumer may have multiple producers, where each demand is managed individually (on a per-subscription basis). A producer may have multiple consumers, where the demand and events are managed and delivered according to a `GenStage.Dispatcher` implementation. ### Producer messages The producer is responsible for sending events to consumers based on demand. These are the messages that consumers can send to producers: * `{:"$gen_producer", from :: {consumer_pid, subscription_tag}, {:subscribe, current, options}}` - sent by the consumer to the producer to start a new subscription. Before sending, the consumer MUST monitor the producer for clean-up purposes in case of crashes. The `subscription_tag` is unique to identify the subscription. It is typically the subscriber monitoring reference although it may be any term. Once sent, the consumer MAY immediately send demand to the producer. The `current` field, when not `nil`, is a two-item tuple containing a subscription that must be cancelled with the given reason before the current one is accepted. Once received, the producer MUST monitor the consumer. However, if the subscription reference is known, it MUST send a `:cancel` message to the consumer instead of monitoring and accepting the subscription. * `{:"$gen_producer", from :: {consumer_pid, subscription_tag}, {:cancel, reason}}` - sent by the consumer to cancel a given subscription. Once received, the producer MUST send a `:cancel` reply to the registered consumer (which may not necessarily be the one received in the tuple above). Keep in mind, however, there is no guarantee such messages can be delivered in case the producer crashes before. If the pair is unknown, the producer MUST send an appropriate cancel reply. * `{:"$gen_producer", from :: {consumer_pid, subscription_tag}, {:ask, demand}}` - sent by consumers to ask demand for a given subscription (identified by `subscription_tag`). Once received, the producer MUST send data up to the demand. If the pair is unknown, the producer MUST send an appropriate cancel reply. ### Consumer messages The consumer is responsible for starting the subscription and sending demand to producers. These are the messages that producers can send to consumers: * `{:"$gen_consumer", from :: {producer_pid, subscription_tag}, {:notification, message}}` - notifications sent by producers. * `{:"$gen_consumer", from :: {producer_pid, subscription_tag}, {:cancel, reason}}` - sent by producers to cancel a given subscription. It is used as a confirmation for client cancellations OR whenever the producer wants to cancel some upstream demand. * `{:"$gen_consumer", from :: {producer_pid, subscription_tag}, events :: [event, ...]}` - events sent by producers to consumers. `subscription_tag` identifies the subscription. The third argument is a non-empty list of events. If the subscription is unknown, the events must be ignored and a cancel message must be sent to the producer. """ defstruct [:mod, :state, :type, :dispatcher_mod, :dispatcher_state, :buffer, :buffer_config, events: :forward, monitors: %{}, producers: %{}, consumers: %{}] @typedoc "The supported stage types." @type type :: :producer | :consumer | :producer_consumer @typedoc "The supported init options." @type options :: keyword() @typedoc "The stage." @type stage :: pid | atom | {:global, term} | {:via, module, term} | {atom, node} @typedoc "The term that identifies a subscription." @opaque subscription_tag :: reference @typedoc "The term that identifies a subscription associated with the corresponding producer/consumer." @type from :: {pid, subscription_tag} @doc """ Invoked when the server is started. `start_link/3` (or `start/3`) will block until this callback returns. `args` is the argument term (second argument) passed to `start_link/3` (or `start/3`). In case of successful start, this callback must return a tuple where the first element is the stage type, which is one of: * `:producer` * `:consumer` * `:producer_consumer` (if the stage is acting as both) For example: def init(args) do {:producer, some_state} end The returned tuple may also contain 3 or 4 elements. The third element may be the `:hibernate` atom or a set of options defined below. Returning `:ignore` will cause `start_link/3` to return `:ignore` and the process will exit normally without entering the loop or calling `terminate/2`. Returning `{:stop, reason}` will cause `start_link/3` to return `{:error, reason}` and the process to exit with reason `reason` without entering the loop or calling `terminate/2`. ## Options This callback may return options. Some options are specific to the chosen stage type while others are shared across all types. ### `:producer` options * `:demand` - when `:forward`, the demand is always forwarded to the `c:handle_demand/2` callback. When `:accumulate`, demand is accumulated until its mode is set to `:forward` via `demand/2`. This is useful as a synchronization mechanism, where the demand is accumulated until all consumers are subscribed. Defaults to `:forward`. ### `:producer` and `:producer_consumer` options * `:buffer_size` - the size of the buffer to store events without demand. Can be `:infinity` to signal no limit on the buffer size. Check the "Buffer events" section of the module documentation. Defaults to `10_000` for `:producer`, `:infinity` for `:producer_consumer`. * `:buffer_keep` - returns whether the `:first` or `:last` entries should be kept on the buffer in case the buffer size is exceeded. Defaults to `:last`. * `:dispatcher` - the dispatcher responsible for handling demands. Defaults to `GenStage.DemandDispatch`. May be either an atom representing a dispatcher module or a two-element tuple with the dispatcher module and the dispatcher options. ### `:consumer` and `:producer_consumer` options * `:subscribe_to` - a list of producers to subscribe to. Each element represents either the producer module or a tuple with the producer module and the subscription options (as defined in `sync_subscribe/2`). """ @callback init(args :: term) :: {type, state} | {type, state, options} | :ignore | {:stop, reason :: any} when state: any @doc """ Invoked on `:producer` stages. This callback is invoked on `:producer` stages with the demand from consumers/dispatcher. The producer that implements this callback must either store the demand, or return the amount of requested events. Must always be explicitly implemented by `:producer` stages. ## Examples def handle_demand(demand, state) do # We check if we're able to satisfy the demand and fetch # events if we aren't. events = if length(state.events) >= demand do events else fetch_events() end # We dispatch only the requested number of events. {to_dispatch, remaining} = Enum.split(events, demand) {:noreply, to_dispatch, %{state | events: remaining}} end """ @callback handle_demand(demand :: pos_integer, state :: term) :: {:noreply, [event], new_state} | {:noreply, [event], new_state, :hibernate} | {:stop, reason, new_state} when new_state: term, reason: term, event: term @doc """ Invoked when a consumer subscribes to a producer. This callback is invoked in both producers and consumers. `producer_or_consumer` will be `:producer` when this callback is invoked on a consumer that subscribed to a producer, and `:consumer` if when this callback is invoked on producers a consumer subscribed to. For consumers, successful subscriptions must return one of: * `{:automatic, new_state}` - means the stage implementation will take care of automatically sending demand to producers. This is the default. * `{:manual, state}` - means that demand must be sent to producers explicitly via `ask/3`. `:manual` subscriptions must be cancelled when `c:handle_cancel/3` is called. `:manual` can be used when a special behaviour is desired (for example, `ConsumerSupervisor` uses `:manual` demand in its implementation). For producers, successful subscriptions must always return `{:automatic, new_state}`. `:manual` mode is not supported. If this callback is not implemented, the default implementation by `use GenStage` will return `{:automatic, state}`. ## Examples Let's see an example where we define this callback in a consumer that will use `:manual` mode. In this case, we'll store the subscription (`from`) in the state in order to be able to use it later on when asking demand via `ask/3`. def handle_subscribe(:producer, _options, from, state) do new_state = %{state | subscription: from} {:manual, new_state end """ @callback handle_subscribe(producer_or_consumer :: :producer | :consumer, options, from, state :: term) :: {:automatic | :manual, new_state} | {:stop, reason, new_state} when new_state: term, reason: term @doc """ Invoked when a consumer is no longer subscribed to a producer. It receives the cancellation reason, the `from` tuple representing the cancelled subscription and the state. The `cancel_reason` will be a `{:cancel, _}` tuple if the reason for cancellation was a `GenStage.cancel/2` call. Any other value means the cancellation reason was due to an EXIT. If this callback is not implemented, the default implementation by `use GenStage` will return `{:noreply, [], state}`. Return values are the same as `c:handle_cast/2`. """ @callback handle_cancel(cancellation_reason :: {:cancel | :down, reason :: term}, from, state :: term) :: {:noreply, [event], new_state} | {:noreply, [event], new_state, :hibernate} | {:stop, reason, new_state} when event: term, new_state: term, reason: term @doc """ Invoked on `:producer_consumer` and `:consumer` stages to handle events. Must always be explicitly implemented by such types. Return values are the same as `c:handle_cast/2`. """ @callback handle_events(events :: [event], from, state :: term) :: {:noreply, [event], new_state} | {:noreply, [event], new_state, :hibernate} | {:stop, reason, new_state} when new_state: term, reason: term, event: term @doc """ Invoked to handle synchronous `call/3` messages. `call/3` will block until a reply is received (unless the call times out or nodes are disconnected). `request` is the request message sent by a `call/3`, `from` is a two-element tuple containing the caller's PID and a term that uniquely identifies the call, and `state` is the current state of the `GenStage`. Returning `{:reply, reply, [events], new_state}` sends the response `reply` to the caller after events are dispatched (or buffered) and continues the loop with new state `new_state`. In case you want to deliver the reply before processing events, use `reply/2` and return `{:noreply, [event], state}`. Returning `{:noreply, [event], new_state}` does not send a response to the caller and processes the given events before continuing the loop with new state `new_state`. The response must be sent with `reply/2`. Hibernating is also supported as an atom to be returned from either `:reply` and `:noreply` tuples. Returning `{:stop, reason, reply, new_state}` stops the loop and `terminate/2` is called with reason `reason` and state `new_state`. Then the `reply` is sent as the response to the call and the process exits with reason `reason`. Returning `{:stop, reason, new_state}` is similar to `{:stop, reason, reply, new_state}` except that no reply is sent to the caller. If this callback is not implemented, the default implementation by `use GenStage` will return `{:stop, {:bad_call, request}, state}`. """ @callback handle_call(request :: term, from :: GenServer.from, state :: term) :: {:reply, reply, [event], new_state} | {:reply, reply, [event], new_state, :hibernate} | {:noreply, [event], new_state} | {:noreply, [event], new_state, :hibernate} | {:stop, reason, reply, new_state} | {:stop, reason, new_state} when reply: term, new_state: term, reason: term, event: term @doc """ Invoked to handle asynchronous `cast/2` messages. `request` is the request message sent by a `cast/2` and `state` is the current state of the `GenStage`. Returning `{:noreply, [event], new_state}` dispatches the events and continues the loop with new state `new_state`. Returning `{:noreply, [event], new_state, :hibernate}` is similar to `{:noreply, new_state}` except the process is hibernated before continuing the loop. See the return values for `c:GenServer.handle_call/3` for more information on hibernation. Returning `{:stop, reason, new_state}` stops the loop and `terminate/2` is called with the reason `reason` and state `new_state`. The process exits with reason `reason`. If this callback is not implemented, the default implementation by `use GenStage` will return `{:stop, {:bad_cast, request}, state}`. """ @callback handle_cast(request :: term, state :: term) :: {:noreply, [event], new_state} | {:noreply, [event], new_state, :hibernate} | {:stop, reason :: term, new_state} when new_state: term, event: term @doc """ Invoked to handle all other messages. `message` is the message and `state` is the current state of the `GenStage`. When a timeout occurs the message is `:timeout`. If this callback is not implemented, the default implementation by `use GenStage` will return `{:noreply, [], state}`. Return values are the same as `c:handle_cast/2`. """ @callback handle_info(message :: term, state :: term) :: {:noreply, [event], new_state} | {:noreply, [event], new_state, :hibernate} | {:stop, reason :: term, new_state} when new_state: term, event: term @doc """ The same as `c:GenServer.terminate/2`. """ @callback terminate(reason, state :: term) :: term when reason: :normal | :shutdown | {:shutdown, term} | term @doc """ The same as `c:GenServer.code_change/3`. """ @callback code_change(old_vsn, state :: term, extra :: term) :: {:ok, new_state :: term} |