EsRedisClient
Description
I am a dedicated client for handling Redis's Publish/Subscribe (Pub/Sub) functionality. I wrap a standard EsRedisClient to manage the stateful, persistent connection required for receiving messages. When I am created, I put the underlying EsRedisClient into a special 'subscriber mode' where only Pub/Sub-related commands are allowed, ensuring correct usage.
My primary role is to manage the lifecycle of subscriptions and to listen for incoming messages from the Redis server in a background process. When a message arrives, I dispatch it to the appropriate EsRedisSubscription object, which in turn delivers it to the user via an asynchronous EsStream.
Public API and Key Responsibilities
- Creating a Pub/Sub client from an existing EsRedisClient using EsRedisPubSubClient on: aClient, or more commonly, via EsRedisClient>>asPubSubClient.
- Subscribing to channels and patterns with subscribe: and psubscribe:, which return an EsRedisSubscription object.
- Managing a non-blocking listener process that uses select() to efficiently wait for messages from Redis or internal shutdown signals.
- Handling both RESP2 (Array-based) and RESP3 (Push-based) pub/sub messages transparently.
- Ensuring thread-safe management of subscriptions.
Usage Example
| redisClient pubSubClient subscription |
redisClient := EsRedisClient host: '127.0.0.1' port: 6379.
redisClient connect.
pubSubClient := redisClient asPubSubClient.
subscription := pubSubClient subscribe: 'news-channel'.
subscription stream listen: [:message | Transcript show: message payload].
" ... publish messages from another client ... "
subscription unsubscribe.
pubSubClient disconnect. "This also returns the original client to normal mode"
Collaborators
- EsRedisClient: The underlying client used to send commands and access the network socket. It is placed into a special mode while wrapped by me.
- EsRedisSubscription: The object returned to the user that provides the EsStream of messages.
- SciSocketManager: Used by the internal listener process for non-blocking I/O.
Instance Variables
- redisClient - <EsRedisClient> The underlying standard client for communication.
- subscriptions - <Dictionary> A map of channel/pattern names to their EsRedisSubscription objects.
- accessLock - <Semaphore> A lock to ensure thread-safe access to the subscriptions dictionary and listener process state.
- listenerProcess - <Process> The background process that actively listens for incoming messages.
- controlPipe - <Dictionary> A pair of sockets used to signal the listener process to shut down gracefully.
Class Methods
host:port:
Answer a new, configured instance of an EsRedisClient for a TCP connection.
This is a convenience method. For more options, use #withOptions:.
Arguments:
aString - <String> The hostname or IP address of the Redis server.
anInteger - <Integer> The port number of the Redis server.
Answers:
<EsRedisClient> A new, un-connected client instance.
onSocket:
Answer a new EsRedisClient instance that wraps an existing, connected SciSocket.
The client will not own the socket, meaning it will not close it on disconnect.
Arguments:
anSciSocket - <SciSocket> A connected socket instance.
Answers:
<EsRedisClient> A new, connected client instance.
withOptions:
Answer a new, configured instance of an EsRedisClient.
This is the preferred way to create a client.
Arguments:
aEsRedisConnectionOptions - <EsRedisConnectionOptions> The options for the connection.
Answers:
<EsRedisClient> A new, un-connected client instance.
Instance Methods
asPubSubClient
Answers a new EsRedisPubSubClient that wraps the receiver.
This puts the receiver into a special 'subscriber mode' where only
Pub/Sub-related commands are allowed. The client is returned to its
normal state when the pub/sub client is disconnected.
Answers:
<EsRedisPubSubClient> A new pub/sub client wrapping the receiver.
bitmaps
Answer the command group object for Redis Bitmap commands.
Answers:
<EsRedisBitmapCommands>
command:
Private - Execute a raw command. This is the central point for command execution.
This method sends a generic container (Object), allowing the reply handling
logic in OSHiredisReply to determine the correct final container based on
the context (e.g., a temporary override, an explicit request, or the
client's default for strings).
Arguments:
aStringOrSequenceableCollection - <String | SequenceableCollection>
Answers:
<Object | EsFuture>
command:as:
Execute a raw command, specifying the reply container.
This is the central point for command execution.
Arguments:
aStringOrSequenceableCollection - <String | SequenceableCollection>
aContainerClass - <Class> The desired container for the reply.
Answers:
<Object | EsFuture>
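As a rough sketch of how a raw command might be issued, the snippet below sends GET as a collection of strings and requests the reply as a ByteArray. The key 'greeting' is a placeholder, and passing the command as a literal Array is an assumption based on the <String | SequenceableCollection> argument type. It also assumes a synchronous client and a Redis server reachable on the local host.

```smalltalk
| client reply |
client := EsRedisClient host: '127.0.0.1' port: 6379.
client connect.
"Command name followed by its argument; 'greeting' is a hypothetical key."
reply := client command: #('GET' 'greeting') as: ByteArray.
Transcript show: reply printString; cr.
client disconnect.
```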
connect
Establish a connection to the Redis server.
If a socket was provided at creation time, this method verifies the connection.
Otherwise, it creates a new connection using the provided options.
If in async mode, answers a future that completes when connected.
Example:
client connect.
self assert: [client isConnected].
Answers:
<EsRedisClient> The receiver in sync mode.
<EsFuture> A future that completes with the receiver in async mode.
Raises:
<EsRedisConnectionException> If the connection fails in sync mode.
connectionOptions
Answer the connection options for this client.
Answers:
<EsRedisConnectionOptions>
defaultStringReplyContainer
Answer the default container class (<Class>) used for string replies when one
is not explicitly specified in the command. Defaults to String.
Other common options include ByteArray or UnicodeString.
Answers:
<Class>
defaultStringReplyContainer:
Set the default container class used for string replies when one is not
explicitly specified.
Arguments:
aClass - <Class> The class to use (e.g., String, ByteArray, UnicodeString).
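A minimal sketch of switching the default container, assuming a synchronous client, a local Redis server, and a placeholder key named 'greeting'. It sets the default to ByteArray, reads one value, and then restores the documented default of String.

```smalltalk
| client value |
client := EsRedisClient host: '127.0.0.1' port: 6379.
client connect.
"Ask for string replies as ByteArray unless a command requests otherwise."
client defaultStringReplyContainer: ByteArray.
value := client strings get: 'greeting'.
"Restore the documented default container."
client defaultStringReplyContainer: String.
client disconnect.
```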
disconnect
Close the connection to the Redis server.
If in async mode, answers a future that completes when disconnected.
Example:
client disconnect.
self assert: [client isConnected not].
Answers:
<EsRedisClient> The receiver in sync mode.
<EsFuture> A future that completes with the receiver in async mode.
executeWithClient:
Executes aBlock with the receiver as the client argument.
This method provides a polymorphic interface compatible with EsRedisClientPool,
allowing code that requires a dedicated, stateful connection to work seamlessly
with both single clients and client pools without modification. For a single client,
this is equivalent to simply evaluating the block.
Arguments:
aBlock - <Block> A one-argument block that receives the client instance (self).
Answers:
<Object> The result of the block execution.
<EsFuture> If the block returns a future, this method returns that future.
Raises:
<EsRedisConnectionException> If the client is not connected.
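The sketch below shows one way this might be used with a single client, assuming synchronous mode, a local Redis server, and a placeholder key named 'greeting'. According to the description above, the same expression is intended to work when the receiver is an EsRedisClientPool.

```smalltalk
| client value |
client := EsRedisClient host: '127.0.0.1' port: 6379.
client connect.
"The block receives the client itself; its result is answered to the caller."
value := client executeWithClient: [:c | c strings get: 'greeting'].
Transcript show: value printString; cr.
client disconnect.
```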
geos
Answer the command group object for Redis Geo commands.
Answers:
<EsRedisGeoCommands>
hashes
Answer the command group object for Redis Hash commands.
Answers:
<EsRedisHashCommands>
hyperLogLogs
Answer the command group object for Redis HyperLogLog commands.
Answers:
<EsRedisHyperLogLogCommands>
isAsync
Answer true if this client is running in asynchronous mode, false otherwise.
Answers:
<Boolean>
isAsync:
Set whether this client runs in asynchronous mode.
Arguments:
aBoolean - <Boolean>
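A tentative sketch of asynchronous use follows. It rests on two assumptions that this page does not confirm: that the mode can be set before connecting, and that EsFuture understands a then: message taking a one-argument block. Treat both as illustrative rather than as the documented API.

```smalltalk
| client |
client := EsRedisClient host: '127.0.0.1' port: 6379.
"Assumption: async mode may be selected before the connection is made."
client isAsync: true.
"In async mode, connect answers an EsFuture that completes with the client.
 Assumption: EsFuture>>then: registers a completion callback."
client connect then: [:connected |
	Transcript show: connected isConnected printString; cr].
```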
isConnected
Answer true if the client has an active connection to the Redis server.
Answers:
<Boolean>
isSecure
Answer true if the client is running in a secure context, false otherwise.
Answers:
<Boolean>
isSubscriber
Answers whether the client is in a special 'subscriber mode', which
restricts the available commands to only those valid for Pub/Sub. This
state is determined by the presence of a pub/sub wrapper.
Answers:
<Boolean> True if the client is in subscriber mode, false otherwise.
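To illustrate how the flag might be observed, the sketch below prints isSubscriber before and after wrapping the client, and again after the wrapper is disconnected. Going by the descriptions on this page, the three printed values should be false, true, and false, though this has not been run here. It assumes a synchronous client and a local Redis server.

```smalltalk
| client pubSubClient |
client := EsRedisClient host: '127.0.0.1' port: 6379.
client connect.
"No pub/sub wrapper exists yet."
Transcript show: client isSubscriber printString; cr.
"Wrapping the client places it in subscriber mode."
pubSubClient := client asPubSubClient.
Transcript show: client isSubscriber printString; cr.
"Disconnecting the wrapper returns the client to normal mode."
pubSubClient disconnect.
Transcript show: client isSubscriber printString; cr.
```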
json
Answer the command group object for RedisJSON commands.
Answers:
<EsRedisJsonCommands>
keys
Answer the command group object for Redis Key commands.
Answers:
<EsRedisKeyCommands>
lists
Answer the command group object for Redis List commands.
Answers:
<EsRedisListCommands>
pipeline:
Execute a block of commands in a pipeline. The block receives an
EsRedisPipeline instance on which to send commands. This method
delegates the entire execution flow to the pipeline object itself.
Arguments:
aBlock - <Block> A one-argument block that receives the <EsRedisPipeline> instance.
Answers:
<Array> The array of replies in synchronous mode.
<EsFuture> In async mode, a future that completes with the array of replies.
protocolVersion
Answer the current Redis protocol version (RESP) the client is using.
Defaults to 2.
Answers:
<Integer>
protocolVersion:
Switches the connection to a different protocol version by sending the
`HELLO` command to the server. The client's internal state is updated
automatically upon a successful reply.
A connection must be active to use this method.
Arguments:
anInteger - <Integer> The protocol version to set (either 2 or 3).
Answers:
<Object> The reply from the server, typically a Dictionary.
<EsFuture> In async mode, a future that completes with the reply.
Raises:
<EsRedisException> If the client is not connected or if the version is invalid.
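A small sketch of switching an open connection to RESP3, assuming a synchronous client and a local Redis server that accepts the HELLO command. The error handler relies only on the standard description message of exceptions.

```smalltalk
| client reply |
client := EsRedisClient host: '127.0.0.1' port: 6379.
client connect.
[reply := client protocolVersion: 3.
 "On success the client's own protocol version is updated as well."
 Transcript show: client protocolVersion printString; cr]
	on: EsRedisException
	do: [:ex | Transcript show: ex description; cr].
client disconnect.
```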
pubsubs
Answer the command group object for Redis Pub/Sub commands.
Answers:
<EsRedisPubSubCommands>
readerMaxBuffer
Answer the maximum size of the reader's buffer in bytes.
A value of 0 means the buffer is unlimited and will not be freed.
See hiredis documentation for more details. Answers nil if not connected.
readerMaxBuffer:
Set the maximum size of the reader's buffer in bytes.
Set to 0 for an unlimited buffer size to improve performance with large payloads.
This setting should be restored to its default value after the high-payload
operations are complete. Does nothing if not connected.
Arguments:
anInteger - <Integer> The max buffer size in bytes.
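The advice above about restoring the setting can be sketched with ensure:, so the previous limit is put back even if the read fails. The key 'large-payload-key' is a placeholder, and the snippet assumes a synchronous, connected client.

```smalltalk
| client savedMax value |
client := EsRedisClient host: '127.0.0.1' port: 6379.
client connect.
"Remember the current limit before lifting it."
savedMax := client readerMaxBuffer.
client readerMaxBuffer: 0.
value := [client strings get: 'large-payload-key']
	ensure: [client readerMaxBuffer: savedMax].
client disconnect.
```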
reconnect
Re-establish a lost connection to the Redis server using the existing
connection options. If the client has never been connected, this will
result in an error.
Answers:
<EsRedisClient> The receiver in sync mode.
<EsFuture> A future that completes with the receiver in async mode.
Raises:
<EsRedisException> If the client has never been connected or the reconnect fails.
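One way a caller might guard a reconnect attempt is sketched below, assuming a synchronous client that was connected earlier. The handler only reports the failure; a real application would likely add its own retry policy.

```smalltalk
| client |
client := EsRedisClient host: '127.0.0.1' port: 6379.
client connect.
" ... later, after the connection may have been lost ... "
client isConnected ifFalse: [
	[client reconnect]
		on: EsRedisException
		do: [:ex | Transcript show: 'Reconnect failed: ', ex description; cr]].
```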
scriptings
Answer the command group object for Redis Scripting commands.
Answers:
<EsRedisScriptingCommands>
server
Answer the command group object for Redis Server commands.
Answers:
<EsRedisServerCommands>
sets
Answer the command group object for Redis Set commands.
Answers:
<EsRedisSetCommands>
sortedSets
Answer the command group object for Redis Sorted Set commands.
Answers:
<EsRedisSortedSetCommands>
streams
Answer the command group object for Redis Stream commands.
Answers:
<EsRedisStreamCommands>
strings
Answer the command group object for Redis String commands.
Answers:
<EsRedisStringCommands>
transaction:
Execute a block of commands atomically using MULTI/EXEC. This method
delegates the entire execution flow to the transaction object itself.
Arguments:
aBlock - <Block> A one-argument block that receives the <EsRedisTransaction> instance.
Answers:
<Array> The array of replies in synchronous mode.
<EsFuture> In async mode, a future that completes with the array of replies.
usingRawReply:do:
Executes aOneArgumentBlock, using aTwoArgumentBlock to process all Redis
replies within the block's dynamic scope. This method provides the raw,
unconverted <OSHiredisReply> object directly to the processing block, which
is ideal for performance-critical operations like custom binary deserialization,
as it avoids intermediate object creation.
This setting is process-specific and safe for asynchronous use.
Example (for custom deserialization):
| myObject |
myObject := client
    usingRawReply: [:rawReply :context |
        MyObject fromSerializedBytes: rawReply byteArrayValue]
    do: [:c | c strings get: 'my-object-key'].
Arguments:
aTwoArgumentBlock - <Block> A block that accepts two arguments: the raw
<OSHiredisReply> and the <OSHiredisContext>.
aOneArgumentBlock - <Block> A one-argument block that receives the client.
Answers:
<Object> The result of aOneArgumentBlock.
usingReplyConverter:do:
Executes aBlock, using aConverter to process all Redis replies
within the block's dynamic scope. This setting is process-specific
and safe for asynchronous use.
The converter can be a Class that understands the fromRedis*Reply:inContext:
protocol, or it can be a two-argument Block for dynamic, inline reply
processing. When a block is used, it receives the already-converted default
Smalltalk object for further transformation.
Example (using a Class):
| value |
value := client
    usingReplyConverter: ByteArray
    do: [:c | c strings get: 'key'].
Example (using a Block for custom deserialization):
| myObject |
myObject := client
    usingReplyConverter: [:replyBytes :context | MyObject fromSerializedBytes: replyBytes]
    do: [:c | c strings get: 'my-object-key'].
Arguments:
aConverter - <Class | Block> The converter to use. If a Block, it must
accept two arguments: the default-converted Smalltalk object
for the reply, and the <OSHiredisContext>.
aBlock - <Block> A one-argument block that receives the client.
Answers:
<Object> The result of aBlock.
Last modified date: 01/22/2026