Queue
Note: I'm in the process of rewriting this documentation after the release of v0.6
Installation
pip install litequeue
Use cases
You can use this to implement a persistent queue. It also has timing metrics for the messages, and the API to mark a message as done lets you specify the message_id to be set as done.
Since it's all based on SQLite / SQL, it is easily extendable.
Tasks/messages are always passed as strings, so you can use JSON data as messages. Messages are interpreted as tasks, so after you pop
a message, you need to mark it as done when you finish processing it.
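Since messages must be strings, structured tasks can be serialized to JSON before putting them and decoded after popping. A minimal sketch; the helper names encode_task and decode_task are hypothetical and not part of litequeue:

```python
from __future__ import annotations

import json
from typing import Any


def encode_task(payload: dict[str, Any]) -> str:
    """Serialize a structured task into the string form a queue message requires."""
    # sort_keys gives a deterministic string for the same payload
    return json.dumps(payload, sort_keys=True)


def decode_task(data: str) -> dict[str, Any]:
    """Decode the string data of a popped message back into a dict."""
    return json.loads(data)


task = {"image_id": 42, "action": "resize"}
message = encode_task(task)
print(message)  # {"action": "resize", "image_id": 42}
assert decode_task(message) == task
```

With a queue q, these helpers would be used as q.put(encode_task(task)) and decode_task(q.pop().data).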
Differences with a normal Python queue.Queue
- Persistence
- Different API to mark messages as done (you tell it which message_id to set as done)
- Timing metrics. As long as messages are still in the queue or not pruned, you can see how long they have been there or how long they took to finish.
- Easy to extend using SQL
- Messages/elements/tasks in the queue are always strings
Messages data
- data (text): the message itself, it must be a string
- message_id (text): a UUID v7 generated when the message is put in the queue.
- status (int): status of the message. 0 = free, 1 = locked (the message is being processed), 2 = done (the message has been processed, and it can be deleted), 3 = failed.
- in_time (int): the Unix epoch time when the message was inserted in the queue (nanoseconds)
- lock_time (int): the Unix epoch (nanoseconds) time when the message was locked for processing
- done_time (int): the Unix epoch time (nanoseconds) when the message was marked as done/processed
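To make the fields above concrete, here is a hypothetical SQLite table holding them, plus an enum for the status codes. This is an illustrative sketch, not litequeue's actual schema. It uses uuid4 for the message_id only because uuid.uuid7 is not available in older Python versions.

```python
import sqlite3
import time
import uuid
from enum import IntEnum


class Status(IntEnum):
    """Status codes as described in the list above."""
    FREE = 0
    LOCKED = 1
    DONE = 2
    FAILED = 3


# Hypothetical table layout; column names follow the fields listed above
SCHEMA = """
CREATE TABLE IF NOT EXISTS queue (
    data       TEXT    NOT NULL,
    message_id TEXT    NOT NULL UNIQUE,
    status     INTEGER NOT NULL DEFAULT 0,
    in_time    INTEGER NOT NULL,  -- Unix epoch, nanoseconds
    lock_time  INTEGER,           -- NULL until the message is locked
    done_time  INTEGER            -- NULL until the message is done
)
"""

conn = sqlite3.connect(":memory:")
conn.execute(SCHEMA)
conn.execute(
    "INSERT INTO queue (data, message_id, status, in_time) VALUES (?, ?, ?, ?)",
    ("hello", uuid.uuid4().hex, int(Status.FREE), time.time_ns()),
)
row = conn.execute(
    "SELECT data, status, lock_time, done_time FROM queue"
).fetchone()
print(row)  # ('hello', 0, None, None)
```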
Architecture
SQLite does not have row-level locks, so we can't use a pattern like SELECT ... FOR UPDATE SKIP LOCKED. The current litequeue implementation marks a message as locked first and then returns it, and the application is in charge of marking it as done. The problem with this approach is that the application could crash while processing the message/task, so the message would stay locked forever. The messages table has in_time and lock_time columns (both are Unix epochs in nanoseconds). To counter the lock + crash problem, some logic could be implemented like:
time_locked = time.time_ns() - lock_time  # time since the message was locked, in nanoseconds
if time_locked > threshold_ns:
    handle_stale(message)  # placeholder: delete it, modify it, or add it again
With that pattern, you can check all the tasks that have been locked for more than X seconds and do whatever you need with them.
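The stale-lock check above can be expressed as a single SQL statement. A minimal sketch, assuming the hypothetical table layout below (not necessarily litequeue's schema). Of the delete/modify/add-again options, it picks "modify": stale messages are reset to free (status 0) so another worker can pick them up. The function name requeue_stale is hypothetical.

```python
import sqlite3
import time
from typing import Optional

SCHEMA = """
CREATE TABLE queue (
    data TEXT NOT NULL,
    message_id TEXT NOT NULL UNIQUE,
    status INTEGER NOT NULL DEFAULT 0,
    in_time INTEGER NOT NULL,
    lock_time INTEGER,
    done_time INTEGER
)
"""
NS_PER_SECOND = 1_000_000_000


def requeue_stale(conn: sqlite3.Connection, threshold_seconds: float,
                  now_ns: Optional[int] = None) -> int:
    """Reset messages locked for longer than threshold_seconds back to free.

    Returns the number of messages released.
    """
    if now_ns is None:
        now_ns = time.time_ns()
    cutoff = now_ns - int(threshold_seconds * NS_PER_SECOND)
    cur = conn.execute(
        "UPDATE queue SET status = 0, lock_time = NULL "
        "WHERE status = 1 AND lock_time < ?",
        (cutoff,),
    )
    conn.commit()
    return cur.rowcount


conn = sqlite3.connect(":memory:")
conn.execute(SCHEMA)
now = time.time_ns()
ten_minutes_ago = now - 10 * 60 * NS_PER_SECOND
conn.executemany(
    "INSERT INTO queue (data, message_id, status, in_time, lock_time) "
    "VALUES (?, ?, 1, ?, ?)",
    [("stuck", "id-1", ten_minutes_ago, ten_minutes_ago),
     ("busy", "id-2", now, now)],
)
released = requeue_stale(conn, threshold_seconds=60, now_ns=now)
print(released)  # 1
```

Doing the check and the reset in one UPDATE statement makes them atomic, so the same stale message is not released twice by concurrent checkers.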
Examples
Initialize a queue and put 4 messages. Each time you put a message in the queue, it returns the rowid of the message you just inserted.
Put messages
from litequeue import LiteQueue
q = LiteQueue(":memory:")
q.put("hello")
q.put("world")
q.put("foo")
q.put("bar")
# 4 <- ID of the last row modified
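Returning the rowid maps naturally onto sqlite3's Cursor.lastrowid. A minimal sketch of such a put, assuming the hypothetical table below; it is not necessarily how litequeue implements put().

```python
import sqlite3
import time
import uuid

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE queue (data TEXT NOT NULL, message_id TEXT NOT NULL UNIQUE, "
    "status INTEGER NOT NULL DEFAULT 0, in_time INTEGER NOT NULL, "
    "lock_time INTEGER, done_time INTEGER)"
)


def put(conn: sqlite3.Connection, data: str) -> int:
    """Insert a free message and return the rowid SQLite assigned to it."""
    if not isinstance(data, str):
        raise TypeError("queue messages must be strings")
    cur = conn.execute(
        "INSERT INTO queue (data, message_id, in_time) VALUES (?, ?, ?)",
        (data, uuid.uuid4().hex, time.time_ns()),
    )
    conn.commit()
    return cur.lastrowid


rowids = [put(conn, word) for word in ("hello", "world", "foo", "bar")]
print(rowids)  # [1, 2, 3, 4]
```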
Pop messages
Now we can use the q.pop() method to retrieve the next message. Each message gets a message_id when it is created. The .pop() method marks the message as locked and returns it with all its data.
q.pop()
# {'message': 'hello', 'message_id': '7da620ac542acd76c806dbcf00218426', ...}
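As the Architecture section describes, a pop marks the message as locked before returning it. A minimal sketch of that lock-then-return step with the standard sqlite3 module; the table layout and the pop function are hypothetical. Because SQLite has no row-level locks, the sketch takes the database write lock up front with BEGIN IMMEDIATE, so two connections cannot select and lock the same message.

```python
import sqlite3
import time
from typing import Optional

conn = sqlite3.connect(":memory:", isolation_level=None)  # manage transactions manually
conn.execute(
    "CREATE TABLE queue (data TEXT NOT NULL, message_id TEXT NOT NULL UNIQUE, "
    "status INTEGER NOT NULL DEFAULT 0, in_time INTEGER NOT NULL, "
    "lock_time INTEGER, done_time INTEGER)"
)


def pop(conn: sqlite3.Connection) -> Optional[dict]:
    """Lock the oldest free message and return it, or None if nothing is free."""
    conn.execute("BEGIN IMMEDIATE")  # take the write lock before reading
    try:
        row = conn.execute(
            "SELECT rowid, data, message_id FROM queue "
            "WHERE status = 0 ORDER BY rowid LIMIT 1"
        ).fetchone()
        if row is not None:
            lock_time = time.time_ns()
            conn.execute(
                "UPDATE queue SET status = 1, lock_time = ? WHERE rowid = ?",
                (lock_time, row[0]),
            )
        conn.execute("COMMIT")
    except Exception:
        conn.execute("ROLLBACK")
        raise
    if row is None:
        return None
    return {"data": row[1], "message_id": row[2], "status": 1, "lock_time": lock_time}


now = time.time_ns()
conn.executemany(
    "INSERT INTO queue (data, message_id, in_time) VALUES (?, ?, ?)",
    [("hello", "id-1", now), ("world", "id-2", now)],
)
print(pop(conn)["data"])  # hello
print(pop(conn)["data"])  # world
print(pop(conn))          # None
```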
Printing the queue
The queue object implements a __repr__ method, so you can use print(q) to check its contents.
print(q)
# LiteQueue(Connection='sqlite3.Connection(...)', items=[{'done_time': None,
# 'in_time': 1612711137,
# 'lock_time': 1612711137,
# 'message': 'hello',
# 'status': 1,
# 'message_id': '7da620ac542acd76c806dbcf00218426'},
# ...
Message processing
If we pop all the messages and then try to pop another one, it will return None.
# pop remaining
for _ in range(3):
    q.pop()
assert q.pop() is None
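Since pop() returns None once no free messages are left, a worker can drain the queue with a simple loop, marking each message done only after it has been processed. A sketch: drain is a hypothetical helper, and FakeQueue is a stand-in that only mimics the pop()/done() usage shown in this document, so the example runs without a database.

```python
from collections import deque
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class FakeMessage:
    data: str
    message_id: str


class FakeQueue:
    """Stand-in with the same pop()/done() shape as the examples above."""

    def __init__(self, items: List[str]) -> None:
        self._free = deque(FakeMessage(d, f"id-{i}") for i, d in enumerate(items))
        self.done_ids: List[str] = []

    def pop(self) -> Optional[FakeMessage]:
        return self._free.popleft() if self._free else None

    def done(self, message_id: str) -> None:
        self.done_ids.append(message_id)


def drain(q, handle: Callable[[str], None]) -> int:
    """Pop and process messages until pop() returns None; return how many were done."""
    processed = 0
    while (task := q.pop()) is not None:
        handle(task.data)        # if this raises, the message stays locked
        q.done(task.message_id)  # only mark done after successful processing
        processed += 1
    return processed


seen: List[str] = []
q = FakeQueue(["hello", "world", "foo"])
print(drain(q, seen.append))  # 3
print(seen)                   # ['hello', 'world', 'foo']
```

If handle raises, the exception propagates and the message is left locked, which is exactly the lock + crash case discussed in the Architecture section.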
Now we will insert 4 more messages. The last put returns 8, which means the last message inserted has a rowid of 8. Then we will pop() a message and save it in a variable called task. The fields of a popped task are accessed as attributes (task.data, task.message_id).
q.put("hello")
q.put("world")
q.put("foo")
q.put("bar")
# 8 <- ID of the last row modified
task = q.pop()
assert task.data == "hello"
Peek a message
With the q.peek() method you can look at the next message to be processed. The method returns the message but doesn't pop it from the queue. Since we have already popped the "hello" message, peek() will return the "world" message.
q.peek()
# {'message': 'world',
# 'message_id': '44cbc85f12b62891aa596b91f14183e5',
# 'status': 0,
# 'in_time': 1612711138,
# 'lock_time': None,
# 'done_time': None}
# next one that is free
assert q.peek().data == "world"
# status = 0 = free
assert q.peek().status == 0
Now we'll go back to the message we previously popped from the queue. We will mark it as done with the q.done(message_id) method. After that, we can use the q.get(message_id) method to check that it has been marked as done ('status' = 2).
task.data, task.message_id
# ('hello', 'c9b9ef76e3a77cc66dd749d485613ec1')
q.done(task.message_id)
# 8 <- ID of the last row modified
q.get(task.message_id)
# {'message': 'hello',
# 'message_id': 'c9b9ef76e3a77cc66dd749d485613ec1',
# 'status': 2, <---- status is now 2 (DONE)
# 'in_time': 1612711138,
# 'lock_time': 1612711138,
# 'done_time': 1612711138}
already_done = q.get(task.message_id)
# status = 2 = done
assert already_done.status == 2
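Marking a message as done amounts to one UPDATE that sets status 2 and stamps done_time. A minimal sketch with a hypothetical table and done function; unlike the q.done() example above, it returns the number of rows updated (0 for an unknown message_id) rather than a rowid.

```python
import sqlite3
import time

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE queue (data TEXT NOT NULL, message_id TEXT NOT NULL UNIQUE, "
    "status INTEGER NOT NULL DEFAULT 0, in_time INTEGER NOT NULL, "
    "lock_time INTEGER, done_time INTEGER)"
)


def done(conn: sqlite3.Connection, message_id: str) -> int:
    """Mark a message as done (status 2); return how many rows were updated."""
    cur = conn.execute(
        "UPDATE queue SET status = 2, done_time = ? WHERE message_id = ?",
        (time.time_ns(), message_id),
    )
    conn.commit()
    return cur.rowcount


now = time.time_ns()
conn.execute(
    "INSERT INTO queue (data, message_id, status, in_time, lock_time) "
    "VALUES ('hello', 'id-1', 1, ?, ?)",
    (now, now),
)
print(done(conn, "id-1"))        # 1
print(done(conn, "missing-id"))  # 0
```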
Message timing data
Timing data is recorded automatically when a message is put in the queue, locked, and marked as done. The timestamps are in nanoseconds, so convert the differences to seconds:
in_time = already_done.in_time
lock_time = already_done.lock_time
done_time = already_done.done_time
# the timestamps are in nanoseconds: subtract first, then convert to seconds
processing_s = (done_time - lock_time) * 1e-9
queued_s = (done_time - in_time) * 1e-9
print(
    f"Task {already_done.message_id} took {processing_s} seconds to get done and was in the queue for {queued_s} seconds"
)
# Task c9b9ef76e3a77cc66dd749d485613ec1 took X seconds to get done and was in the queue for Y seconds
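The same calculation as a small helper, assuming nanosecond integer timestamps as described in Messages data. Subtracting the integers before converting to seconds keeps the arithmetic exact until the final division; the function name timing_seconds is hypothetical.

```python
from typing import Tuple

NS_PER_SECOND = 1_000_000_000


def timing_seconds(in_time: int, lock_time: int, done_time: int) -> Tuple[float, float]:
    """Return (processing time, total time in queue) in seconds.

    All arguments are Unix epoch timestamps in nanoseconds.
    """
    processing = (done_time - lock_time) / NS_PER_SECOND
    total = (done_time - in_time) / NS_PER_SECOND
    return processing, total


in_time = 1_700_000_000_000_000_000
lock_time = in_time + 2_500_000_000    # locked 2.5 s after insertion
done_time = lock_time + 1_000_000_000  # done 1 s after locking
print(timing_seconds(in_time, lock_time, done_time))  # (1.0, 3.5)
```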
Check queue size
We can get the queue size using the q.qsize() method. It ignores finished items, so the real number of rows in the SQLite database can be larger than the number returned.
To remove the messages marked as done ('status' = 2), use the q.prune() method. This removes those messages permanently.
assert q.qsize() == 7
next_one_msg = q.peek().data
next_one_id = q.peek().message_id
task = q.pop()
assert task.data == next_one_msg
assert task.message_id == next_one_id
# remove finished items
q.prune()
print(q)
# LiteQueue(Connection='sqlite3.Connection(...)', items=[{'done_time': None,
# 'in_time': 1612711137,
# 'lock_time': 1612711137,
# 'message': 'hello',
# 'status': 1,
# 'message_id': '7da620ac542acd76c806dbcf00218426'},
# {'done_time': None,
# 'in_time': 1612711137,
# 'lock_time': 1612711137,
# 'message': 'world',
# 'status': 1,
# 'message_id': 'a593292cfc8d2f3949eab857eafaf608'},
# {'done_time': None,
# 'in_time': 1612711137,
# 'lock_time': 1612711137,
# 'message': 'foo',
# 'status': 1,
# 'message_id': '17e843a29770df8438ad72bbcf059bf5'},
# ...
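qsize() and prune() can be understood as a COUNT that skips finished messages and a DELETE of them. A minimal sketch against a hypothetical table. Whether litequeue also counts failed messages (status 3) in the size is not stated above; this sketch does count them.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE queue (data TEXT NOT NULL, status INTEGER NOT NULL DEFAULT 0)")
conn.executemany(
    "INSERT INTO queue (data, status) VALUES (?, ?)",
    [("free", 0), ("locked", 1), ("finished", 2)],
)


def qsize(conn: sqlite3.Connection) -> int:
    """Count messages that are not done (status 2 is excluded)."""
    return conn.execute("SELECT COUNT(*) FROM queue WHERE status != 2").fetchone()[0]


def prune(conn: sqlite3.Connection) -> int:
    """Permanently delete done messages; return how many rows were removed."""
    cur = conn.execute("DELETE FROM queue WHERE status = 2")
    conn.commit()
    return cur.rowcount


def total_rows(conn: sqlite3.Connection) -> int:
    return conn.execute("SELECT COUNT(*) FROM queue").fetchone()[0]


print(qsize(conn), total_rows(conn))  # 2 3
print(prune(conn))                    # 1
print(qsize(conn), total_rows(conn))  # 2 2
```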
Set a max queue size
If you specify a maxsize when you initialize the queue, it will create a trigger that raises an error when that size is reached. In Python, this surfaces as an sqlite3.IntegrityError exception.
q = LiteQueue(":memory:", maxsize=50)
for i in range(50):
    q.put(f"data_{i}")
assert q.qsize() == 50
An error is raised when the queue has reached its size limit.
import sqlite3
try:
    q.put("new")
except sqlite3.IntegrityError:  # max len reached
    print("test pass")
# test pass
When we pop an item, we can add another one. Note that q.put() returns the rowid of the latest inserted message; it does not represent the current queue size.
q.pop()
# {'message': 'data_0',
# 'message_id': '08b201c31099a296ef37f23b5257e5b6'}
# Now we can put another message without error
q.put("hello")
# 51
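The maxsize behaviour described above can be reproduced with a BEFORE INSERT trigger that calls RAISE(ABORT, ...). SQLite reports that as a constraint error, which Python's sqlite3 raises as IntegrityError. A minimal sketch with a hypothetical table, trigger name and create_queue helper. It counts only free messages (status 0), which is consistent with the example above where popping one message makes room for another; whether litequeue's trigger uses exactly this condition is an assumption.

```python
import sqlite3


def create_queue(maxsize: int) -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:", isolation_level=None)
    conn.execute("CREATE TABLE queue (data TEXT NOT NULL, status INTEGER NOT NULL DEFAULT 0)")
    # Triggers cannot take bound parameters, so the limit is formatted in;
    # int() guarantees that only an integer ends up in the SQL text.
    conn.execute(
        f"""
        CREATE TRIGGER maxsize_guard BEFORE INSERT ON queue
        WHEN (SELECT COUNT(*) FROM queue WHERE status = 0) >= {int(maxsize)}
        BEGIN
            SELECT RAISE(ABORT, 'Max queue length reached');
        END
        """
    )
    return conn


conn = create_queue(maxsize=3)
for i in range(3):
    conn.execute("INSERT INTO queue (data) VALUES (?)", (f"data_{i}",))

try:
    conn.execute("INSERT INTO queue (data) VALUES ('new')")
except sqlite3.IntegrityError as exc:
    print("rejected:", exc)

# "Pop" the oldest free message by locking it; that frees one slot
conn.execute(
    "UPDATE queue SET status = 1 "
    "WHERE rowid = (SELECT MIN(rowid) FROM queue WHERE status = 0)"
)
conn.execute("INSERT INTO queue (data) VALUES ('hello')")
print(conn.execute("SELECT COUNT(*) FROM queue").fetchone()[0])  # 4
```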
Empty queues
We can check if a queue is empty using the q.empty() method.
# Check if a queue is empty
assert not q.empty()
q2 = LiteQueue(":memory:")
assert q2.empty()
Disclaimer
I'm still designing the internal structure of litequeue, the message metadata, and how messages are created / locked / deleted, so changes can be expected. However, the main functionality and the exposed API of put() / pop() / done() / get() should stay the same. The changes will be mostly internal or will add new methods to the queue. Feedback is welcome!
Alternatives
- Huey: Huey is a task queue implemented in Python, with multiple backends (Redis/SQLite/in-memory). Huey is a more "complete" task queue: it includes a lot of functionality that is missing from litequeue. The scope of Huey is much bigger; it lets you decorate functions, run tasks periodically, etc. litequeue tries to "just" be a primitive queue implementation on which to build other tools. Even though it's written in Python, litequeue is easy to port to other programming languages, and multiple processes can interact with the same persistent queue.