Title: | Lightweight Portable Message Queue Using 'SQLite' |
---|---|
Description: | Temporary and permanent message queues for R. Built on top of 'SQLite' databases. 'SQLite' provides locking, and makes it possible to detect crashed consumers. Crashed jobs can be automatically marked as "failed", or put in the queue again, potentially a limited number of times. |
Authors: | Gábor Csárdi [aut, cre], Posit Software, PBC [cph, fnd] |
Maintainer: | Gábor Csárdi <[email protected]> |
License: | MIT + file LICENSE |
Version: | 1.1.0 |
Built: | 2025-01-08 05:43:37 UTC |
Source: | https://github.com/r-lib/liteq |
Acknowledge that the work on a message has finished successfully
ack(message)
message |
The message object. |
liteq for examples
Other liteq messages: consume(), is_empty(), list_failed_messages(), list_messages(), message_count(), publish(), remove_failed_messages(), requeue_failed_messages(), try_consume()
Blocks and waits for a message if there isn't one to work on currently.
consume(queue, poll_interval = 500)
queue |
The queue object. |
poll_interval |
Poll interval in milliseconds. How often to poll the queue for new jobs, if none are immediately available. |
A message.
liteq for examples
Other liteq messages: ack(), is_empty(), list_failed_messages(), list_messages(), message_count(), publish(), remove_failed_messages(), requeue_failed_messages(), try_consume()
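A blocking worker step built from consume(), ack() and nack() might look like the sketch below. It assumes the message object exposes its body as `msg$message`; the `handle()` function and the queue name are hypothetical and stand in for real work.

```r
library(liteq)

db <- tempfile(fileext = ".db")
q <- ensure_queue("jobs", db = db)
publish(q, title = "job-1", message = "42")

# Hypothetical job handler: doubles a numeric payload, fails otherwise.
handle <- function(msg) {
  value <- suppressWarnings(as.numeric(msg$message))
  if (is.na(value)) stop("payload is not numeric")
  value * 2
}

# consume() would block here, polling every 100 ms, if the queue were empty.
msg <- consume(q, poll_interval = 100)

# Run the handler, then report success or failure back to the queue.
out <- tryCatch(handle(msg), error = function(e) e)
if (inherits(out, "error")) nack(msg) else ack(msg)
```

Keeping ack() and nack() outside the tryCatch() call means an error raised while acknowledging is not mistaken for a job failure.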
It also creates the database, if it does not exist.
create_queue(name = NULL, db = default_db(), crash_strategy = "fail")
name |
Name of the queue. If not specified or NULL, a name is generated randomly. |
db |
Path to the database file. |
crash_strategy |
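What to do with crashed jobs. The default is that they will "fail". Alternatively, crashed jobs can be put in the queue again, potentially a limited number of times. |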
liteq for examples
Other liteq queues: delete_queue(), ensure_queue(), list_queues()
If the queue database is not specified explicitly, then liteq uses this file. Its location is determined via the rappdirs package, see rappdirs::user_data_dir().
default_db()
A character scalar, the name of the default database.
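To keep experiments away from the shared default file, a queue can be pointed at an explicit path instead. A minimal sketch, assuming liteq is installed; the queue name "scratch" is only illustrative.

```r
library(liteq)

# Where liteq would keep queues if `db` were left unspecified.
default_path <- default_db()

# Use a throwaway database file instead of the default one.
db <- tempfile(fileext = ".db")
q <- ensure_queue("scratch", db = db)

# The database file is created when the queue is set up.
file.exists(db)
```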
Delete a queue
delete_queue(queue, force = FALSE)
queue |
The queue to delete. |
force |
Whether to delete the queue even if it contains messages. |
liteq for examples
Other liteq queues: create_queue(), ensure_queue(), list_queues()
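The `force` argument matters when a queue still holds messages. The sketch below deletes such a queue from a temporary database; the queue name and message contents are illustrative.

```r
library(liteq)

db <- tempfile(fileext = ".db")
q <- ensure_queue("jobs", db = db)
publish(q, title = "pending", message = "still waiting")

# The queue still contains a message, so request deletion explicitly.
delete_queue(q, force = TRUE)

# Number of queues left in this database.
remaining <- length(list_queues(db))
```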
If it does not exist, then the queue will be created.
ensure_queue(name, db = default_db(), crash_strategy = "fail")
name |
Name of the queue. If not specified or NULL, a name is generated randomly. |
db |
Path to the database file. |
crash_strategy |
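What to do with crashed jobs. The default is that they will "fail". Alternatively, crashed jobs can be put in the queue again, potentially a limited number of times. |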
The queue object.
liteq for examples
Other liteq queues: create_queue(), delete_queue(), list_queues()
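Because ensure_queue() only creates the queue when it is missing, calling it repeatedly with the same name should be harmless. A small sketch of that behaviour, using a temporary database and an illustrative queue name:

```r
library(liteq)

db <- tempfile(fileext = ".db")

q1 <- ensure_queue("jobs", db = db)  # creates the queue
q2 <- ensure_queue("jobs", db = db)  # finds the existing queue

# Only one queue should be registered in the database.
n_queues <- length(list_queues(db))
```

This makes ensure_queue() a convenient first call in both producer and consumer scripts, since neither needs to know which one starts first.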
Check if a queue is empty
is_empty(queue)
queue |
The queue object. |
Logical, whether the queue is empty.
liteq for examples
Other liteq messages: ack(), consume(), list_failed_messages(), list_messages(), message_count(), publish(), remove_failed_messages(), requeue_failed_messages(), try_consume()
List failed messages in a queue
list_failed_messages(queue)
queue |
The queue object. |
Data frame with columns: id, title, status.
liteq for examples
Other liteq messages: ack(), consume(), is_empty(), list_messages(), message_count(), publish(), remove_failed_messages(), requeue_failed_messages(), try_consume()
List all messages in a queue
list_messages(queue)
queue |
The queue object. |
Data frame with columns: id, title, status.
liteq for examples
Other liteq messages: ack(), consume(), is_empty(), list_failed_messages(), message_count(), publish(), remove_failed_messages(), requeue_failed_messages(), try_consume()
List all queues in a database
list_queues(db = default_db())
db |
The queue database to query. |
A list of liteq_queue objects.
liteq for examples
Other liteq queues: create_queue(), delete_queue(), ensure_queue()
Message queues for R. Built on top of 'SQLite' databases.
liteq works with multiple producer and/or consumer processes accessing
the same queue, via the locking mechanism of 'SQLite'. If a queue is
locked by 'SQLite', a process that tries to access it must wait until
it is unlocked. By default, the maximum waiting time is 10 seconds.
It can be changed via the R_LITEQ_BUSY_TIMEOUT environment variable,
which is given in milliseconds. If you have many concurrent
processes using the same liteq database and see "database locked"
errors, try increasing the timeout value.
# We don't run this, because it writes to the cache directory
db <- tempfile()
q <- ensure_queue("jobs", db = db)
q
list_queues(db)

# Publish two messages
publish(q, title = "First message", message = "Hello world!")
publish(q, title = "Second message", message = "Hello again!")
is_empty(q)
message_count(q)
list_messages(q)

# Consume one
msg <- try_consume(q)
msg
ack(msg)
list_messages(q)
msg2 <- try_consume(q)
nack(msg2)
list_messages(q)

# No more messages
is_empty(q)
try_consume(q)
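The busy timeout described above is configured through an environment variable, so it can be set from within R before the queue is used. A minimal sketch, assuming a 30 second limit is wanted. When exactly liteq reads the variable is not stated here, so setting it before the first queue operation is the cautious choice.

```r
# Allow up to 30 seconds of waiting on a locked database.
# The value is given in milliseconds.
Sys.setenv(R_LITEQ_BUSY_TIMEOUT = "30000")

library(liteq)

db <- tempfile(fileext = ".db")
q <- ensure_queue("jobs", db = db)
```

The same variable can also be set in the shell or in an `.Renviron` file, which is convenient when several worker processes should share one setting.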
## See the manual page
Get the number of messages in a queue.
message_count(queue)
queue |
The queue object. |
Number of messages in the queue.
liteq for examples
Other liteq messages: ack(), consume(), is_empty(), list_failed_messages(), list_messages(), publish(), remove_failed_messages(), requeue_failed_messages(), try_consume()
Report that the work on a message has failed
nack(message)
message |
The message object. |
liteq for examples
Publish messages in a queue
publish(queue, title = "", message = "")
queue |
The queue object. |
title |
The title of the messages. It can be the empty string. |
message |
The body of the messages. It can be the empty string. Must be the same length as title. |
liteq for examples
Other liteq messages: ack(), consume(), is_empty(), list_failed_messages(), list_messages(), message_count(), remove_failed_messages(), requeue_failed_messages(), try_consume()
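Since `title` and `message` are described in the plural and must have matching lengths, several messages can presumably be published in one call. A sketch under that assumption, with illustrative titles and payloads:

```r
library(liteq)

db <- tempfile(fileext = ".db")
q <- ensure_queue("jobs", db = db)

# One call, two messages: element i of `title` pairs with element i of `message`.
publish(
  q,
  title = c("resize", "upload"),
  message = c("image-1.png", "image-2.png")
)

n_messages <- message_count(q)
```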
Remove failed messages from the queue
remove_failed_messages(queue, id = NULL)
queue |
The queue object. |
id |
Ids of the messages to remove. If it is NULL, then all failed messages will be removed. |
liteq for examples
Other liteq messages: ack(), consume(), is_empty(), list_failed_messages(), list_messages(), message_count(), publish(), requeue_failed_messages(), try_consume()
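A failed message stays visible through list_failed_messages() until it is removed or requeued. The sketch below discards all failures by leaving `id` at its default; the queue name and message are illustrative.

```r
library(liteq)

db <- tempfile(fileext = ".db")
q <- ensure_queue("jobs", db = db)
publish(q, title = "bad job", message = "cannot be processed")

# Take the message and report that processing failed.
msg <- try_consume(q)
nack(msg)
failed_before <- nrow(list_failed_messages(q))

# Drop every failed message from the queue.
remove_failed_messages(q)
failed_after <- nrow(list_failed_messages(q))
```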
Requeue failed messages
requeue_failed_messages(queue, id = NULL)
queue |
The queue object. |
id |
Ids of the messages to requeue. If it is NULL, then all failed messages will be requeued. |
liteq for examples
Other liteq messages: ack(), consume(), is_empty(), list_failed_messages(), list_messages(), message_count(), publish(), remove_failed_messages(), try_consume()
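Requeueing gives a failed job another chance: once requeued, the message can be consumed again. A sketch of one retry cycle, assuming the message object exposes its title as `msg$title`; names and contents are illustrative.

```r
library(liteq)

db <- tempfile(fileext = ".db")
q <- ensure_queue("jobs", db = db)
publish(q, title = "flaky job", message = "try me twice")

# First attempt fails.
first <- try_consume(q)
nack(first)

# Put all failed messages back in the queue, then try again.
requeue_failed_messages(q)
retry <- try_consume(q)
retry_title <- retry$title
ack(retry)
```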
Consume a message if there is one available
try_consume(queue)
queue |
The queue object. |
A message, or NULL if there is no message to work on.
liteq for examples
Other liteq messages: ack(), consume(), is_empty(), list_failed_messages(), list_messages(), message_count(), publish(), remove_failed_messages(), requeue_failed_messages()
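Because try_consume() returns NULL instead of blocking, it suits a loop that drains whatever is currently queued and then stops. A sketch of that pattern, assuming the message object exposes its title as `msg$title`; the titles are illustrative.

```r
library(liteq)

db <- tempfile(fileext = ".db")
q <- ensure_queue("jobs", db = db)
for (title in c("a", "b", "c")) {
  publish(q, title = title, message = "payload")
}

# Process messages until try_consume() reports that none are left.
seen <- character()
while (!is.null(msg <- try_consume(q))) {
  seen <- c(seen, msg$title)
  ack(msg)
}
```

Use consume() instead when the worker should keep waiting for new messages rather than exit.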