Dynamodb Tracker¶
Keywords: Dynamodb Tracker, Pynamodb, Python, Status Tracking, Error Handling, Failure Handling
Summary¶
Suppose you have many tasks to do, and it is important to track the status of each task. You can use a DynamoDB table to store the status and error information. With the DynamoDB tracker, you can use a simple query to select the unfinished tasks, starting from the last success. You can also select the failed items later to debug, retry, or ignore them.
Source Code¶
Below is the source code of the DynamoDB tracker. It has a unit test that covers 100% of the code.
[2]:
# -*- coding: utf-8 -*-
"""
This module implements the status tracking using DynamoDB as the backend.
"""
import typing as T
import uuid
import traceback
from datetime import datetime, timezone
from functools import cached_property
from contextlib import contextmanager
from pynamodb.models import (
Model,
PAY_PER_REQUEST_BILLING_MODE,
)
from pynamodb.indexes import (
GlobalSecondaryIndex,
KeysOnlyProjection,
)
from pynamodb.connection import Connection
from pynamodb.attributes import (
UnicodeAttribute,
NumberAttribute,
UTCDateTimeAttribute,
MapAttribute,
JSONAttribute,
)
connection = Connection()
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
def utc_now() -> datetime:
return datetime.utcnow().replace(tzinfo=timezone.utc)
class StatusAndTaskIdIndex(GlobalSecondaryIndex):
"""
GSI for query by job_id and status
"""
class Meta:
index_name = "status_and_task_id-index"
projection = KeysOnlyProjection()
value: T.Union[str, UnicodeAttribute] = UnicodeAttribute(hash_key=True)
key: T.Union[str, UnicodeAttribute] = UnicodeAttribute()
class LockError(Exception):
"""
Raised when a task worker is trying to work on a locked task.
"""
pass
class IgnoreError(Exception):
"""
Raised when a task is already in "ignore" status (You need to define).
"""
pass
# update context manager in-memory cache
_update_context: T.Dict[
str, # the Dynamodb item hash key
T.Dict[
str, # item attribute name
T.Dict[
str, # there are three key, "old" (value), "new" (value), "act" (update action)
T.Any,
],
],
] = dict()
class Tracker(Model):
"""
The DynamoDB ORM model for the status tracking. You can use one
DynamoDB table for multiple status tracking jobs.
Concepts:
- job: a high-level description of a job, the similar task on different
items will be grouped into one job. ``job_id`` is the unique identifier
for a job.
- task: a specific task on a specific item. ``task_id`` is the unique identifier
for a task.
- status: an integer value to indicate the status of a task. The closer to
the end, the value should be larger, so we can compare the values.
Attributes:
:param key: The unique identifier of the task. It is a compound key of
job_id and task_id. The format is ``{job_id}{separator}{task_id}``
:param value: Indicate the status of the task. The format is
``{job_id}{separator}{status_code}``.
:param update_time: when the task status is updated
:param retry: how many times the task has been retried
:param lock: a concurrency control mechanism. It is an uuid string.
:param lock_time: when this task is locked.
:param data: arbitrary data in python dictionary.
:param errors: arbitrary error data in python dictionary.
"""
class Meta:
table_name = "tracker-pattern"
region = "us-east-1"
billing_mode = PAY_PER_REQUEST_BILLING_MODE
# define attributes
key: T.Union[str, UnicodeAttribute] = UnicodeAttribute(hash_key=True)
value: T.Union[str, UnicodeAttribute] = UnicodeAttribute()
update_time: T.Union[datetime, UTCDateTimeAttribute] = UTCDateTimeAttribute(
default=utc_now,
)
retry: T.Union[int, NumberAttribute] = NumberAttribute(default=0)
lock: T.Union[T.Optional[str], UnicodeAttribute] = UnicodeAttribute(
default=None,
null=True,
)
lock_time: T.Union[datetime, UTCDateTimeAttribute] = UTCDateTimeAttribute(
default=EPOCH,
)
data: T.Optional[T.Union[dict, JSONAttribute]] = JSONAttribute(
default=None, null=True
)
errors: T.Optional[T.Union[dict, JSONAttribute]] = JSONAttribute(
default=None, null=True
)
status_and_task_id_index = StatusAndTaskIdIndex()
SEP = "____" # the separator string between job_id and task_id
# how many digits the max status code have, this ensures that the
# status can be used in comparison
STATUS_ZERO_PAD = 2
MAX_RETRY = 3 # how many retry is allowed before we ignore it
LOCK_EXPIRE_SECONDS = 900 # how long the lock will expire
DEFAULT_STATUS = 0 # the default status code, means "to do", usually start from 0
@classmethod
def make_key(cls, job_id: str, task_id: str) -> str:
return f"{job_id}{cls.SEP}{task_id}"
@classmethod
def make_value(cls, job_id: str, status: int) -> str:
return f"{job_id}{cls.SEP}{str(status).zfill(cls.STATUS_ZERO_PAD)}"
@cached_property
def job_id(self) -> str:
return self.key.split(self.SEP)[0]
@cached_property
def task_id(self) -> str:
return self.key.split(self.SEP)[1]
@property
def status(self) -> int:
return int(self.value.split(self.SEP)[1])
@classmethod
def get_one(cls, job_id: str, task_id: str) -> "Tracker":
return cls.get(cls.make_key(job_id, task_id))
@classmethod
def make(
cls,
job_id: str,
task_id: str,
status: int,
data: T.Optional[dict] = None,
) -> "Tracker":
"""
A factory method to create new instance of a tracker. It won't save
to DynamoDB.
"""
kwargs = dict(
key=cls.make_key(job_id, task_id),
value=cls.make_value(job_id, status),
)
if data is not None:
kwargs["data"] = data
return cls(**kwargs)
@classmethod
def new(
cls,
job_id: str,
task_id: str,
data: T.Optional[dict] = None,
) -> "Tracker":
"""
A factory method to create new instance of a tracker and save it to
DynamoDB.
"""
obj = cls.make(
job_id=job_id,
task_id=task_id,
status=cls.DEFAULT_STATUS,
data=data,
)
obj.save()
return obj
@classmethod
def delete_all(cls) -> int:
"""
Delete all item in a DynamoDB table.
"""
ith = 0
with cls.batch_write() as batch:
for ith, item in enumerate(cls.scan(), start=1):
batch.delete(item)
return ith
def is_locked(self) -> bool:
"""
Check if the task is locked.
"""
if self.lock is None:
return False
else:
now = utc_now()
return (now - self.lock_time).total_seconds() < self.LOCK_EXPIRE_SECONDS
def _setup_update_context(self):
_update_context[self.key] = dict()
def _rollback_update_context(self):
for attr_name, dct in _update_context[self.key].items():
setattr(self, attr_name, dct["old"])
def _flush_update_context(self):
actions = [dct["act"] for dct in _update_context[self.key].values()]
if len(actions):
# print("flushing update data to Dyanmodb")
# print(f"actions: {actions}")
# pynamodb update API will apply the updated data to the current
# item object.
self.update(actions=actions)
def _teardown_update_context(self):
del _update_context[self.key]
@contextmanager
def update_context(self) -> "Tracker":
"""
A context manager to update the attributes of the task.
If the update fails, the attributes will be rolled back to the original
value.
Usage::
tracker = Tracker.new(job_id="my-job", task_id="task-1")
with tracker.update_context():
tracker.set_status(StatusEnum.s03_in_progress)
tracker.set_data({"foo": "bar"})
"""
try:
self._setup_update_context()
yield self
self._flush_update_context()
except Exception as e:
self._rollback_update_context()
raise e
finally:
self._teardown_update_context()
def set_status(self, status: int) -> "Tracker":
"""
Set the status of the task. Don't do this directly::
self.value = self.make_value(self.job_id, ...)
"""
_update_context[self.key]["value"] = {"old": self.value}
self.value = self.make_value(self.job_id, status)
_update_context[self.key]["value"]["new"] = self.value
_update_context[self.key]["value"]["act"] = Tracker.value.set(self.value)
return self
def set_update_time(self, update_time: T.Optional[datetime] = None) -> "Tracker":
"""
Set the update time of the task. Don't do this directly::
self.update_time = ...
"""
_update_context[self.key]["update_time"] = {"old": self.update_time}
if update_time is None:
update_time = utc_now()
self.update_time = update_time
_update_context[self.key]["update_time"]["new"] = self.update_time
_update_context[self.key]["update_time"]["act"] = Tracker.update_time.set(
self.update_time
)
return self
def set_retry_as_zero(self) -> "Tracker":
"""
Set the retry count to zero. Don't do this directly::
self.retry = 0
"""
_update_context[self.key]["retry"] = {"old": self.retry}
self.retry = 0
_update_context[self.key]["retry"]["new"] = self.retry
_update_context[self.key]["retry"]["act"] = Tracker.retry.set(self.retry)
return self
def set_retry_plus_one(self) -> "Tracker":
"""
Increase the retry count by one. Don't do this directly::
self.retry += 1
"""
_update_context[self.key]["retry"] = {"old": self.retry}
self.retry += 1
_update_context[self.key]["retry"]["new"] = self.retry
_update_context[self.key]["retry"]["act"] = Tracker.retry.set(Tracker.retry + 1)
return self
def set_locked(self) -> "Tracker":
"""
Set the lock of the task. Don't do this directly::
self.lock = ...
self.lock_time = ...
"""
_update_context[self.key]["lock"] = {"old": self.lock}
_update_context[self.key]["lock_time"] = {"old": self.lock_time}
self.lock = uuid.uuid4().hex
self.lock_time = utc_now()
_update_context[self.key]["lock"]["new"] = self.lock
_update_context[self.key]["lock_time"]["new"] = self.lock_time
_update_context[self.key]["lock"]["act"] = Tracker.lock.set(self.lock)
_update_context[self.key]["lock_time"]["act"] = Tracker.lock_time.set(
self.lock_time
)
return self
def set_unlock(self) -> "Tracker":
"""
Set the lock of the task to None. Don't do this directly::
self.lock = None
"""
_update_context[self.key]["lock"] = {"old": self.lock}
self.lock = None
_update_context[self.key]["lock"]["new"] = self.lock
_update_context[self.key]["lock"]["act"] = Tracker.lock.set(self.lock)
return self
def set_data(self, data: T.Optional[dict]) -> "Tracker":
"""
Logically the data attribute should be mutable,
make sure don't edit the old data directly
for example, don't do this::
self.data["foo"] = "bar"
self.set_data(self.data)
Please do this::
new_data = self.data.copy()
new_data["foo"] = "bar"
self.set_data(new_data)
"""
_update_context[self.key]["data"] = {"old": self.data}
self.data = data
_update_context[self.key]["data"]["new"] = self.data
_update_context[self.key]["data"]["act"] = Tracker.data.set(data)
return self
def set_errors(self, errors: T.Optional[dict]) -> "Tracker":
"""
Similar to :meth:`Tracker.set_data`. But it is for errors.
"""
_update_context[self.key]["errors"] = {"old": self.errors}
self.errors = errors
_update_context[self.key]["errors"]["new"] = self.errors
_update_context[self.key]["errors"]["act"] = Tracker.errors.set(errors)
return self
@contextmanager
def start(
self,
in_process_status: int,
failed_status: int,
success_status: int,
ignore_status: int,
) -> "Tracker":
# Handle concurrent lock
if self.is_locked():
raise LockError(f"Task {self.key} is locked.")
# Handle ignore status
if self.status == ignore_status:
raise IgnoreError(
f"Task {self.key} retry count already exceeded {self.MAX_RETRY}, "
f"ignore it."
)
# mark as in progress
with self.update_context():
(self.set_status(in_process_status).set_update_time().set_locked())
try:
self._setup_update_context()
# print("before yield")
yield self
# print("after yield")
(
self.set_status(success_status)
.set_update_time()
.set_unlock()
.set_retry_as_zero()
)
# print("end of success logic")
except Exception as e: # handling user code
# print("begin of error handling logic")
# reset the update context
self._teardown_update_context()
self._setup_update_context()
(
self.set_status(failed_status)
.set_update_time()
.set_unlock()
.set_errors(
{"error": repr(e), "traceback": traceback.format_exc(limit=10)}
)
.set_retry_plus_one()
)
if self.retry >= self.MAX_RETRY:
self.set_status(ignore_status)
# print("end of error handling logic")
raise e
finally:
# print("begin of finally")
self._flush_update_context()
self._teardown_update_context()
# print("end of finally")
@classmethod
def query_by_status(
cls,
job_id: str,
status: T.Union[int, T.List[int]],
limit: int = 10,
) -> T.Iterable["Tracker"]:
if isinstance(status, list):
status_list = status
else:
status_list = [status]
for status in status_list:
yield from cls.status_and_task_id_index.query(
hash_key=cls.make_value(job_id, status),
limit=limit,
)
Usage Example¶
[14]:
import enum
import time
from rich import print as rprint
class StatusEnum(int, enum.Enum):
s00_todo = 0
s03_in_progress = 3
s06_failed = 6
s09_success = 9
s10_ignore = 10
_status_mapper = {
0: "s00_todo",
3: "s03_in_progress",
6: "s06_failed",
9: "s09_success",
10: "s10_ignore",
}
class Task(Tracker):
DEFAULT_STATUS = StatusEnum.s00_todo.value
@property
def status_code(self) -> str:
return _status_mapper[self.status]
def start_job(
self,
) -> "Task":
"""
This is just an example of how to use :meth:`Tracker.start`.
A job should always have four related status codes:
- in process status
- failed status
- success status
- ignore status
If you have multiple type of jobs, I recommend creating multiple
wrapper functions like this for each type of jobs. And ensure that
the "ignore" status value is the largest status value among all,
and use the same "ignore" status value for all type of jobs.
"""
return self.start(
in_process_status=StatusEnum.s03_in_progress.value,
failed_status=StatusEnum.s06_failed.value,
success_status=StatusEnum.s09_success.value,
ignore_status=StatusEnum.s10_ignore.value,
)
Task.create_table(wait=True)
The Happy Path¶
Initialize a new task
Start the job
Do some job
See how’s the status changed in the DynamoDB table
[38]:
job_id = "parse-json"
task_id = "t-1"
task = Task.new(job_id, task_id)
rprint(task.attribute_values)
print(f"before the job start, status is {task.status_code!r}, the concurrency lock is locked: {task.is_locked()}")
with task.start_job():
print(f" the job is in progress, status is {task.status_code!r}, the concurrency lock is locked: {task.is_locked()}")
print(" do some job ...")
task.set_data({"n_records": 100})
print(f"job is finished and succeeded without any error, status is {task.status_code!r}, the concurrency lock is locked: {task.is_locked()}")
rprint(f"the updated task.data: {task.data}")
{ 'lock_time': datetime.datetime(1970, 1, 1, 0, 0, tzinfo=datetime.timezone.utc), 'retry': 0, 'update_time': datetime.datetime(2022, 12, 26, 4, 9, 18, 866585, tzinfo=datetime.timezone.utc), 'key': 'parse-json____t-1', 'value': 'parse-json____00' }
before the job start, status is 's00_todo', the concurrency lock is locked: False
the job is in progress, status is 's03_in_progress', the concurrency lock is locked: True
do some job ...
job is finished and succeeded without any error, status is 's09_success', the concurrency lock is locked: False
the updated task.data: {'n_records': 100}
Error Handling¶
[34]:
job_id = "parse-json"
task_id = "t-2"
task = Task.new(job_id, task_id)
rprint(task.attribute_values)
# define a custom error
class UserError(Exception):
pass
print(f"before the job start, status is {task.status_code}")
try:
with task.start_job():
print(f" the job is in progress, status is {task.status_code}")
print(" do some job ...")
task.set_data({"n_records": 100})
raise UserError("something wrong!")
except UserError:
pass
print(f"job is failed, status is {task.status_code}")
print(f"task.data is not changed: {task.data}")
print(f"task.retry: {task.retry} ")
print("the error is logged in task.error attribute")
print(f"the raised error object: {task.errors['error']}")
rprint(f"the error traceback: {task.errors['traceback']}")
{ 'lock_time': datetime.datetime(1970, 1, 1, 0, 0, tzinfo=datetime.timezone.utc), 'retry': 0, 'update_time': datetime.datetime(2022, 12, 26, 4, 6, 44, 124953, tzinfo=datetime.timezone.utc), 'key': 'parse-json____t-2', 'value': 'parse-json____00' }
before the job start, status is s00_todo
the job is in progress, status is s03_in_progress
do some job ...
job is failed, status is s06_failed
task.data is not changed: None
task.retry: 1
the error is logged in task.error attribute
the raised error object: UserError('something wrong!')
the error traceback: Traceback (most recent call last): File "/var/folders/3y/7t5ll4sn6x76g8rhfqlc36dw0000gn/T/ipykernel_57940/328322325.py", line 416, in start yield self File "/var/folders/3y/7t5ll4sn6x76g8rhfqlc36dw0000gn/T/ipykernel_57940/2331532066.py", line 17, in <cell line: 12> raise UserError("something wrong!") UserError: something wrong!
Catch up from Last Success¶
[39]:
job_id = "copy-files"
Task.make(job_id, "t-1", StatusEnum.s00_todo.value).save()
Task.make(job_id, "t-2", StatusEnum.s03_in_progress.value).save()
Task.make(job_id, "t-3", StatusEnum.s06_failed.value).save()
Task.make(job_id, "t-4", StatusEnum.s09_success.value).save()
Task.make(job_id, "t-5", StatusEnum.s10_ignore.value).save()
rprint(
list(
Task.query_by_status(
job_id,
status=[
StatusEnum.s00_todo.value,
StatusEnum.s06_failed.value,
]
)
)
)
[tracker-pattern<copy-files____t-1>, tracker-pattern<copy-files____t-3>]