EsRedisPipeline
Description
Represents a Redis pipeline for executing multiple commands in a single network round-trip. This class is central to improving performance for bulk operations. It inherits from the abstract superclass <EsRedisCommandBatcher> and shares a common structure with <EsRedisTransaction>, but differs in its execution semantics: a pipeline optimizes for performance, a transaction for atomicity.
You don't create instances of <EsRedisPipeline> directly. Instead, an instance is created for you when you send the EsRedisClient>>pipeline: message. This pipeline object is then passed into your code block.
The pipeline works by intercepting command group messages (like strings, hashes, etc.) and creating temporary, rewired command groups. When you send a command to one of these groups (e.g., pipe strings set: 'k' value: 'v'), the command is queued within the pipeline instead of being sent to the server immediately. When your block finishes, the client automatically executes all queued commands in a single batch.
Nested Operations
Pipelines can contain transactions. When a transaction is initiated within a pipeline, all of its commands (MULTI, the commands in the block, and EXEC) are queued into the pipeline's command batch. This allows for an atomic sequence of operations to be executed efficiently as part of a larger batch of commands. Nesting a pipeline within another pipeline is also supported, though less common.
The pipeline behaves differently depending on whether the client is in synchronous or asynchronous mode.
Asynchronous Pipelining
When the client is in asynchronous mode (isAsync: true), the pipelining behavior is enhanced:
- Each command you send within the pipeline: block immediately returns an individual <EsFuture> that will be completed with that specific command's result.
- The pipeline: message itself returns a "master" <EsFuture>. This master future is completed with an array of all the results from the pipeline, but only after all the individual commands have finished.
This dual-future mechanism provides flexibility, allowing you to coordinate actions based on the completion of individual commands or wait for the entire batch to finish.
Instance Variables
- client <EsRedisClient>: A reference to the client that created this pipeline.
- commandCount <Integer>: The number of commands that have been queued.
- replyConverters <OrderedCollection>: A collection of the reply converter classes for each queued command.
- masterPromise <EsPromise>: The promise associated with the entire pipeline operation. It is completed with an array of all results when the pipeline finishes.
- commandPromises <OrderedCollection>: A collection of EsPromise instances, where each promise corresponds to a single command queued in the pipeline. This allows each command in an async pipeline to return its own future.
Example
Synchronous Example
| results |
results := client pipeline: [:pipe |
    pipe strings set: 'key1' value: 'value1'.
    pipe strings set: 'key2' value: 'value2'.
    pipe lists lpush: 'a_list' values: #('item1').
    pipe strings get: 'key1'.
    "Commands are sent in a batch when the block returns"
].
"results => #('OK' 'OK' 1 'value1')"
Asynchronous Example
| fSet fGet fList fPipeline |
client isAsync: true.
"The pipeline: message returns a master future for all results."
fPipeline := client pipeline: [:pipe |
    "Each command returns its own future."
    fSet := pipe strings set: 'key1' value: 'value1'.
    fList := pipe lists lpush: 'a_list' values: #('item1').
    fGet := pipe strings get: 'key1'.
].
"You can wait on individual futures."
self assert: fSet waitFor equals: 'OK'.
self assert: fGet waitFor equals: 'value1'.
"Or wait on the master future to get all results at once."
self assert: fPipeline waitFor equals: #('OK' 1 'value1').
Nested Transaction in Pipeline Example
| results txResults |
results := self client pipeline: [:pipe |
    pipe strings set: 'before' value: '1'.
    pipe transaction: [:tx |
        tx strings set: 'tx_key' value: 10.
        tx strings incr: 'tx_key'.
    ].
    pipe strings set: 'after' value: '2'.
].
"The EXEC reply is the array at index 5: index 1 is the first SET, and indexes 2 to 4 hold the replies to MULTI and the two queued commands."
txResults := results at: 5.
self assert: [txResults = #('OK' 11)].
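Nested Pipeline Example
The description states that nesting a pipeline within another pipeline is supported, and the pipeline: instance method is documented as a no-op that runs the block against the current pipeline. The following is a minimal sketch of that case, not a verified example: it assumes a synchronous client held in a variable named client, and it assumes the inner commands simply join the outer batch. The key names are hypothetical.

```smalltalk
| results |
results := client pipeline: [:outer |
    outer strings set: 'outer_key' value: 'a'.
    "Assumption: the inner block runs against the same pipeline instance,
     so its commands are queued into the outer batch."
    outer pipeline: [:inner |
        inner strings set: 'inner_key' value: 'b'.
        inner strings get: 'inner_key'.
    ].
    outer strings get: 'outer_key'.
].
"If the assumption holds, results contains one reply per command, in the order the commands were sent."
```

The sketch does not use the value answered by the inner pipeline: send, because the documentation does not say what a nested call answers.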
Class Methods
None
Instance Methods
pipeline:
Delegation - Execute a block of commands in a pipeline. The block receives an <EsRedisPipeline> instance on which to send commands. This method delegates the entire execution flow to the pipeline object itself. For nested pipelines, this is a no-op that simply executes the block against the current pipeline instance.
Arguments:
aBlock - <Block> A one-argument block that receives the <EsRedisPipeline> instance.
Answers:
<Array> In synchronous mode, the array of replies.
<EsFuture> In asynchronous mode, a future that completes with the array of replies.
Last modified date: 01/22/2026