{ "cells": [ { "cell_type": "markdown", "source": [ "# DynamoDB Tracker\n", "\n", "Keywords: DynamoDB Tracker, PynamoDB, Python, Status Tracking, Error Handling, Failure Handling\n", "\n", "\n", "## Summary\n", "\n", "Suppose you have many tasks to run and need to track the status of each one. You can use a DynamoDB table to store the status and error information. With the DynamoDB tracker, a simple query selects the unfinished tasks so that you can resume from the last success. You can also select the failed tasks to debug, retry, or ignore them later.\n", "\n", "\n", "## Source Code\n", "\n", "Below is the source code of the DynamoDB tracker. It has a unit test that covers 100% of the code." ], "metadata": { "collapsed": false, "pycharm": { "name": "#%% md\n" } } }, { "cell_type": "code", "execution_count": 2, "outputs": [], "source": [ "# -*- coding: utf-8 -*-\n", "\n", "\"\"\"\n", "This module implements the status tracking using DynamoDB as the backend.\n", "\"\"\"\n", "\n", "import typing as T\n", "import uuid\n", "import traceback\n", "from datetime import datetime, timezone\n", "from functools import cached_property\n", "from contextlib import contextmanager\n", "\n", "from pynamodb.models import (\n", "    Model,\n", "    PAY_PER_REQUEST_BILLING_MODE,\n", ")\n", "from pynamodb.indexes import (\n", "    GlobalSecondaryIndex,\n", "    KeysOnlyProjection,\n", ")\n", "from pynamodb.connection import Connection\n", "from pynamodb.attributes import (\n", "    UnicodeAttribute,\n", "    NumberAttribute,\n", "    UTCDateTimeAttribute,\n", "    MapAttribute,\n", "    JSONAttribute,\n", ")\n", "\n", "connection = Connection()\n", "\n", "\n", "EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)\n", "\n", "\n", "def utc_now() -> datetime:\n", "    return datetime.utcnow().replace(tzinfo=timezone.utc)\n", "\n", "\n", "class StatusAndTaskIdIndex(GlobalSecondaryIndex):\n", "    \"\"\"\n", "    GSI for querying by job_id and status.\n", "    \"\"\"\n", "\n", "    class Meta:\n", "        index_name = 
\"status_and_task_id-index\"\n", "        projection = KeysOnlyProjection()\n", "\n", "    value: T.Union[str, UnicodeAttribute] = UnicodeAttribute(hash_key=True)\n", "    key: T.Union[str, UnicodeAttribute] = UnicodeAttribute()\n", "\n", "\n", "class LockError(Exception):\n", "    \"\"\"\n", "    Raised when a task worker is trying to work on a locked task.\n", "    \"\"\"\n", "\n", "    pass\n", "\n", "\n", "class IgnoreError(Exception):\n", "    \"\"\"\n", "    Raised when a task is already in the \"ignore\" status (you need to define this status yourself).\n", "    \"\"\"\n", "\n", "    pass\n", "\n", "\n", "# update context manager in-memory cache\n", "_update_context: T.Dict[\n", "    str,  # the DynamoDB item hash key\n", "    T.Dict[\n", "        str,  # item attribute name\n", "        T.Dict[\n", "            str,  # there are three keys: \"old\" (value), \"new\" (value), \"act\" (update action)\n", "            T.Any,\n", "        ],\n", "    ],\n", "] = dict()\n", "\n", "\n", "class Tracker(Model):\n", "    \"\"\"\n", "    The DynamoDB ORM model for the status tracking. You can use one\n", "    DynamoDB table for multiple status tracking jobs.\n", "\n", "    Concepts:\n", "\n", "    - job: a high-level description of a job. Similar tasks on different\n", "        items are grouped into one job. ``job_id`` is the unique identifier\n", "        for a job.\n", "    - task: a specific task on a specific item. ``task_id`` is the unique identifier\n", "        for a task.\n", "    - status: an integer value that indicates the status of a task. The closer a\n", "        task is to the end, the larger the value should be, so we can compare values.\n", "\n", "    Attributes:\n", "\n", "    :param key: The unique identifier of the task. It is a compound key of\n", "        job_id and task_id. The format is ``{job_id}{separator}{task_id}``\n", "    :param value: Indicate the status of the task. The format is\n", "        ``{job_id}{separator}{status_code}``.\n", "    :param update_time: when the task status is updated\n", "    :param retry: how many times the task has been retried\n", "    :param lock: a concurrency control mechanism. 
It is a UUID string.\n", "    :param lock_time: when this task is locked.\n", "    :param data: arbitrary data as a Python dictionary.\n", "    :param errors: arbitrary error data as a Python dictionary.\n", "    \"\"\"\n", "\n", "    class Meta:\n", "        table_name = \"tracker-pattern\"\n", "        region = \"us-east-1\"\n", "        billing_mode = PAY_PER_REQUEST_BILLING_MODE\n", "\n", "    # define attributes\n", "    key: T.Union[str, UnicodeAttribute] = UnicodeAttribute(hash_key=True)\n", "    value: T.Union[str, UnicodeAttribute] = UnicodeAttribute()\n", "    update_time: T.Union[datetime, UTCDateTimeAttribute] = UTCDateTimeAttribute(\n", "        default=utc_now,\n", "    )\n", "    retry: T.Union[int, NumberAttribute] = NumberAttribute(default=0)\n", "    lock: T.Union[T.Optional[str], UnicodeAttribute] = UnicodeAttribute(\n", "        default=None,\n", "        null=True,\n", "    )\n", "    lock_time: T.Union[datetime, UTCDateTimeAttribute] = UTCDateTimeAttribute(\n", "        default=EPOCH,\n", "    )\n", "    data: T.Optional[T.Union[dict, JSONAttribute]] = JSONAttribute(\n", "        default=None, null=True\n", "    )\n", "    errors: T.Optional[T.Union[dict, JSONAttribute]] = JSONAttribute(\n", "        default=None, null=True\n", "    )\n", "\n", "    status_and_task_id_index = StatusAndTaskIdIndex()\n", "\n", "    SEP = \"____\"  # the separator string between job_id and task_id\n", "    # how many digits the max status code has, this ensures that the\n", "    # status can be used in comparison\n", "    STATUS_ZERO_PAD = 2\n", "    MAX_RETRY = 3  # how many retries are allowed before we ignore the task\n", "    LOCK_EXPIRE_SECONDS = 900  # how many seconds until the lock expires\n", "    DEFAULT_STATUS = 0  # the default status code, meaning \"to do\"; usually starts from 0\n", "\n", "    @classmethod\n", "    def make_key(cls, job_id: str, task_id: str) -> str:\n", "        return f\"{job_id}{cls.SEP}{task_id}\"\n", "\n", "    @classmethod\n", "    def make_value(cls, job_id: str, status: int) -> str:\n", "        return f\"{job_id}{cls.SEP}{str(status).zfill(cls.STATUS_ZERO_PAD)}\"\n", "\n", "    @cached_property\n", "    def job_id(self) -> 
str:\n", "        return self.key.split(self.SEP)[0]\n", "\n", "    @cached_property\n", "    def task_id(self) -> str:\n", "        return self.key.split(self.SEP)[1]\n", "\n", "    @property\n", "    def status(self) -> int:\n", "        return int(self.value.split(self.SEP)[1])\n", "\n", "    @classmethod\n", "    def get_one(cls, job_id: str, task_id: str) -> \"Tracker\":\n", "        return cls.get(cls.make_key(job_id, task_id))\n", "\n", "    @classmethod\n", "    def make(\n", "        cls,\n", "        job_id: str,\n", "        task_id: str,\n", "        status: int,\n", "        data: T.Optional[dict] = None,\n", "    ) -> \"Tracker\":\n", "        \"\"\"\n", "        A factory method to create a new instance of a tracker. It won't save\n", "        to DynamoDB.\n", "        \"\"\"\n", "        kwargs = dict(\n", "            key=cls.make_key(job_id, task_id),\n", "            value=cls.make_value(job_id, status),\n", "        )\n", "        if data is not None:\n", "            kwargs[\"data\"] = data\n", "        return cls(**kwargs)\n", "\n", "    @classmethod\n", "    def new(\n", "        cls,\n", "        job_id: str,\n", "        task_id: str,\n", "        data: T.Optional[dict] = None,\n", "    ) -> \"Tracker\":\n", "        \"\"\"\n", "        A factory method to create a new instance of a tracker and save it to\n", "        DynamoDB.\n", "        \"\"\"\n", "        obj = cls.make(\n", "            job_id=job_id,\n", "            task_id=task_id,\n", "            status=cls.DEFAULT_STATUS,\n", "            data=data,\n", "        )\n", "        obj.save()\n", "        return obj\n", "\n", "    @classmethod\n", "    def delete_all(cls) -> int:\n", "        \"\"\"\n", "        Delete all items in a DynamoDB table.\n", "        \"\"\"\n", "        ith = 0\n", "        with cls.batch_write() as batch:\n", "            for ith, item in enumerate(cls.scan(), start=1):\n", "                batch.delete(item)\n", "        return ith\n", "\n", "    def is_locked(self) -> bool:\n", "        \"\"\"\n", "        Check if the task is locked.\n", "        \"\"\"\n", "        if self.lock is None:\n", "            return False\n", "        else:\n", "            now = utc_now()\n", "            return (now - self.lock_time).total_seconds() < self.LOCK_EXPIRE_SECONDS\n", "\n", "    def _setup_update_context(self):\n", "        _update_context[self.key] = dict()\n", "\n", "    def _rollback_update_context(self):\n", "        for attr_name, 
dct in _update_context[self.key].items():\n", "            setattr(self, attr_name, dct[\"old\"])\n", "\n", "    def _flush_update_context(self):\n", "        actions = [dct[\"act\"] for dct in _update_context[self.key].values()]\n", "        if len(actions):\n", "            # print(\"flushing update data to DynamoDB\")\n", "            # print(f\"actions: {actions}\")\n", "            # pynamodb update API will apply the updated data to the current\n", "            # item object.\n", "            self.update(actions=actions)\n", "\n", "    def _teardown_update_context(self):\n", "        del _update_context[self.key]\n", "\n", "    @contextmanager\n", "    def update_context(self) -> \"Tracker\":\n", "        \"\"\"\n", "        A context manager to update the attributes of the task.\n", "        If the update fails, the attributes will be rolled back to the original\n", "        value.\n", "\n", "        Usage::\n", "\n", "            tracker = Tracker.new(job_id=\"my-job\", task_id=\"task-1\")\n", "            with tracker.update_context():\n", "                tracker.set_status(StatusEnum.s03_in_progress)\n", "                tracker.set_data({\"foo\": \"bar\"})\n", "        \"\"\"\n", "        try:\n", "            self._setup_update_context()\n", "            yield self\n", "            self._flush_update_context()\n", "        except Exception as e:\n", "            self._rollback_update_context()\n", "            raise e\n", "        finally:\n", "            self._teardown_update_context()\n", "\n", "    def set_status(self, status: int) -> \"Tracker\":\n", "        \"\"\"\n", "        Set the status of the task. Don't do this directly::\n", "\n", "            self.value = self.make_value(self.job_id, ...)\n", "        \"\"\"\n", "        _update_context[self.key][\"value\"] = {\"old\": self.value}\n", "        self.value = self.make_value(self.job_id, status)\n", "        _update_context[self.key][\"value\"][\"new\"] = self.value\n", "        _update_context[self.key][\"value\"][\"act\"] = Tracker.value.set(self.value)\n", "        return self\n", "\n", "    def set_update_time(self, update_time: T.Optional[datetime] = None) -> \"Tracker\":\n", "        \"\"\"\n", "        Set the update time of the task. 
Don't do this directly::\n", "\n", "            self.update_time = ...\n", "        \"\"\"\n", "        _update_context[self.key][\"update_time\"] = {\"old\": self.update_time}\n", "        if update_time is None:\n", "            update_time = utc_now()\n", "        self.update_time = update_time\n", "        _update_context[self.key][\"update_time\"][\"new\"] = self.update_time\n", "        _update_context[self.key][\"update_time\"][\"act\"] = Tracker.update_time.set(\n", "            self.update_time\n", "        )\n", "        return self\n", "\n", "    def set_retry_as_zero(self) -> \"Tracker\":\n", "        \"\"\"\n", "        Set the retry count to zero. Don't do this directly::\n", "\n", "            self.retry = 0\n", "        \"\"\"\n", "        _update_context[self.key][\"retry\"] = {\"old\": self.retry}\n", "        self.retry = 0\n", "        _update_context[self.key][\"retry\"][\"new\"] = self.retry\n", "        _update_context[self.key][\"retry\"][\"act\"] = Tracker.retry.set(self.retry)\n", "        return self\n", "\n", "    def set_retry_plus_one(self) -> \"Tracker\":\n", "        \"\"\"\n", "        Increase the retry count by one. Don't do this directly::\n", "\n", "            self.retry += 1\n", "        \"\"\"\n", "        _update_context[self.key][\"retry\"] = {\"old\": self.retry}\n", "        self.retry += 1\n", "        _update_context[self.key][\"retry\"][\"new\"] = self.retry\n", "        _update_context[self.key][\"retry\"][\"act\"] = Tracker.retry.set(Tracker.retry + 1)\n", "        return self\n", "\n", "    def set_locked(self) -> \"Tracker\":\n", "        \"\"\"\n", "        Set the lock of the task. 
Don't do this directly::\n", "\n", "            self.lock = ...\n", "            self.lock_time = ...\n", "        \"\"\"\n", "        _update_context[self.key][\"lock\"] = {\"old\": self.lock}\n", "        _update_context[self.key][\"lock_time\"] = {\"old\": self.lock_time}\n", "        self.lock = uuid.uuid4().hex\n", "        self.lock_time = utc_now()\n", "        _update_context[self.key][\"lock\"][\"new\"] = self.lock\n", "        _update_context[self.key][\"lock_time\"][\"new\"] = self.lock_time\n", "        _update_context[self.key][\"lock\"][\"act\"] = Tracker.lock.set(self.lock)\n", "        _update_context[self.key][\"lock_time\"][\"act\"] = Tracker.lock_time.set(\n", "            self.lock_time\n", "        )\n", "        return self\n", "\n", "    def set_unlock(self) -> \"Tracker\":\n", "        \"\"\"\n", "        Set the lock of the task to None. Don't do this directly::\n", "\n", "            self.lock = None\n", "        \"\"\"\n", "        _update_context[self.key][\"lock\"] = {\"old\": self.lock}\n", "        self.lock = None\n", "        _update_context[self.key][\"lock\"][\"new\"] = self.lock\n", "        _update_context[self.key][\"lock\"][\"act\"] = Tracker.lock.set(self.lock)\n", "        return self\n", "\n", "    def set_data(self, data: T.Optional[dict]) -> \"Tracker\":\n", "        \"\"\"\n", "        Logically the data attribute should be treated as immutable.\n", "        Make sure you don't edit the old data in place.\n", "        For example, don't do this::\n", "\n", "            self.data[\"foo\"] = \"bar\"\n", "            self.set_data(self.data)\n", "\n", "        Please do this::\n", "\n", "            new_data = self.data.copy()\n", "            new_data[\"foo\"] = \"bar\"\n", "            self.set_data(new_data)\n", "        \"\"\"\n", "        _update_context[self.key][\"data\"] = {\"old\": self.data}\n", "        self.data = data\n", "        _update_context[self.key][\"data\"][\"new\"] = self.data\n", "        _update_context[self.key][\"data\"][\"act\"] = Tracker.data.set(data)\n", "        return self\n", "\n", "    def set_errors(self, errors: T.Optional[dict]) -> \"Tracker\":\n", "        \"\"\"\n", "        Similar to :meth:`Tracker.set_data`. 
But it is for errors.\n", "        \"\"\"\n", "        _update_context[self.key][\"errors\"] = {\"old\": self.errors}\n", "        self.errors = errors\n", "        _update_context[self.key][\"errors\"][\"new\"] = self.errors\n", "        _update_context[self.key][\"errors\"][\"act\"] = Tracker.errors.set(errors)\n", "        return self\n", "\n", "    @contextmanager\n", "    def start(\n", "        self,\n", "        in_process_status: int,\n", "        failed_status: int,\n", "        success_status: int,\n", "        ignore_status: int,\n", "    ) -> \"Tracker\":\n", "        # Handle concurrent lock\n", "        if self.is_locked():\n", "            raise LockError(f\"Task {self.key} is locked.\")\n", "\n", "        # Handle ignore status\n", "        if self.status == ignore_status:\n", "            raise IgnoreError(\n", "                f\"Task {self.key} retry count already exceeded {self.MAX_RETRY}, \"\n", "                f\"ignore it.\"\n", "            )\n", "\n", "        # mark as in progress\n", "        with self.update_context():\n", "            (self.set_status(in_process_status).set_update_time().set_locked())\n", "\n", "        try:\n", "            self._setup_update_context()\n", "            # print(\"before yield\")\n", "            yield self\n", "            # print(\"after yield\")\n", "            (\n", "                self.set_status(success_status)\n", "                .set_update_time()\n", "                .set_unlock()\n", "                .set_retry_as_zero()\n", "            )\n", "            # print(\"end of success logic\")\n", "        except Exception as e:  # handling user code\n", "            # print(\"begin of error handling logic\")\n", "            # reset the update context\n", "            self._teardown_update_context()\n", "            self._setup_update_context()\n", "            (\n", "                self.set_status(failed_status)\n", "                .set_update_time()\n", "                .set_unlock()\n", "                .set_errors(\n", "                    {\"error\": repr(e), \"traceback\": traceback.format_exc(limit=10)}\n", "                )\n", "                .set_retry_plus_one()\n", "            )\n", "            if self.retry >= self.MAX_RETRY:\n", "                self.set_status(ignore_status)\n", "            # print(\"end of error handling logic\")\n", "            raise e\n", "        finally:\n", "            # print(\"begin of finally\")\n", "            self._flush_update_context()\n", "            self._teardown_update_context()\n", "            # print(\"end of finally\")\n", "\n", "    
@classmethod\n", "    def query_by_status(\n", "        cls,\n", "        job_id: str,\n", "        status: T.Union[int, T.List[int]],\n", "        limit: int = 10,\n", "    ) -> T.Iterable[\"Tracker\"]:\n", "        if isinstance(status, list):\n", "            status_list = status\n", "        else:\n", "            status_list = [status]\n", "        for status in status_list:\n", "            yield from cls.status_and_task_id_index.query(\n", "                hash_key=cls.make_value(job_id, status),\n", "                limit=limit,\n", "            )" ], "metadata": { "collapsed": false, "pycharm": { "name": "#%%\n" } } }, { "cell_type": "markdown", "source": [ "## Usage Example" ], "metadata": { "collapsed": false, "pycharm": { "name": "#%% md\n" } } }, { "cell_type": "code", "execution_count": 14, "outputs": [], "source": [ "import enum\n", "import time\n", "from rich import print as rprint\n", "\n", "class StatusEnum(int, enum.Enum):\n", "    s00_todo = 0\n", "    s03_in_progress = 3\n", "    s06_failed = 6\n", "    s09_success = 9\n", "    s10_ignore = 10\n", "\n", "_status_mapper = {\n", "    0: \"s00_todo\",\n", "    3: \"s03_in_progress\",\n", "    6: \"s06_failed\",\n", "    9: \"s09_success\",\n", "    10: \"s10_ignore\",\n", "}\n", "\n", "class Task(Tracker):\n", "    DEFAULT_STATUS = StatusEnum.s00_todo.value\n", "\n", "    @property\n", "    def status_code(self) -> str:\n", "        return _status_mapper[self.status]\n", "\n", "    def start_job(\n", "        self,\n", "    ) -> \"Task\":\n", "        \"\"\"\n", "        This is just an example of how to use :meth:`Tracker.start`.\n", "\n", "        A job should always have four related status codes:\n", "\n", "        - in process status\n", "        - failed status\n", "        - success status\n", "        - ignore status\n", "\n", "        If you have multiple types of jobs, I recommend creating multiple\n", "        wrapper functions like this for each type of job. 
And ensure that\n", "        the \"ignore\" status value is the largest status value among all,\n", "        and use the same \"ignore\" status value for all types of jobs.\n", "        \"\"\"\n", "        return self.start(\n", "            in_process_status=StatusEnum.s03_in_progress.value,\n", "            failed_status=StatusEnum.s06_failed.value,\n", "            success_status=StatusEnum.s09_success.value,\n", "            ignore_status=StatusEnum.s10_ignore.value,\n", "        )\n", "\n", "Task.create_table(wait=True)" ], "metadata": { "collapsed": false, "pycharm": { "name": "#%%\n" } } }, { "cell_type": "markdown", "source": [ "### The Happy Path\n", "\n", "1. Initialize a new task\n", "2. Start the job\n", "3. Do some work\n", "4. See how the status changes in the DynamoDB table" ], "metadata": { "collapsed": false, "pycharm": { "name": "#%% md\n" } } }, { "cell_type": "code", "execution_count": 38, "outputs": [ { "data": { "text/plain": "\u001B[1m{\u001B[0m\n \u001B[32m'lock_time'\u001B[0m: \u001B[1;35mdatetime.datetime\u001B[0m\u001B[1m(\u001B[0m\u001B[1;36m1970\u001B[0m, \u001B[1;36m1\u001B[0m, \u001B[1;36m1\u001B[0m, \u001B[1;36m0\u001B[0m, \u001B[1;36m0\u001B[0m, \u001B[33mtzinfo\u001B[0m=\u001B[35mdatetime\u001B[0m.timezone.utc\u001B[1m)\u001B[0m,\n \u001B[32m'retry'\u001B[0m: \u001B[1;36m0\u001B[0m,\n \u001B[32m'update_time'\u001B[0m: \u001B[1;35mdatetime.datetime\u001B[0m\u001B[1m(\u001B[0m\u001B[1;36m2022\u001B[0m, \u001B[1;36m12\u001B[0m, \u001B[1;36m26\u001B[0m, \u001B[1;36m4\u001B[0m, \u001B[1;36m9\u001B[0m, \u001B[1;36m18\u001B[0m, \u001B[1;36m866585\u001B[0m, \u001B[33mtzinfo\u001B[0m=\u001B[35mdatetime\u001B[0m.timezone.utc\u001B[1m)\u001B[0m,\n \u001B[32m'key'\u001B[0m: \u001B[32m'parse-json____t-1'\u001B[0m,\n \u001B[32m'value'\u001B[0m: \u001B[32m'parse-json____00'\u001B[0m\n\u001B[1m}\u001B[0m\n", "text/html": "
{\n    'lock_time': datetime.datetime(1970, 1, 1, 0, 0, tzinfo=datetime.timezone.utc),\n    'retry': 0,\n    'update_time': datetime.datetime(2022, 12, 26, 4, 9, 18, 866585, tzinfo=datetime.timezone.utc),\n    'key': 'parse-json____t-1',\n    'value': 'parse-json____00'\n}\n
\n" }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "before the job start, status is 's00_todo', the concurrency lock is locked: False\n", " the job is in progress, status is 's03_in_progress', the concurrency lock is locked: True\n", " do some job ...\n", "job is finished and succeeded without any error, status is 's09_success', the concurrency lock is locked: False\n" ] }, { "data": { "text/plain": "the updated task.data: \u001B[1m{\u001B[0m\u001B[32m'n_records'\u001B[0m: \u001B[1;36m100\u001B[0m\u001B[1m}\u001B[0m\n", "text/html": "
the updated task.data: {'n_records': 100}\n
\n" }, "metadata": {}, "output_type": "display_data" } ], "source": [ "job_id = \"parse-json\"\n", "task_id = \"t-1\"\n", "\n", "task = Task.new(job_id, task_id)\n", "rprint(task.attribute_values)\n", "\n", "print(f\"before the job start, status is {task.status_code!r}, the concurrency lock is locked: {task.is_locked()}\")\n", "with task.start_job():\n", "    print(f\" the job is in progress, status is {task.status_code!r}, the concurrency lock is locked: {task.is_locked()}\")\n", "    print(\" do some job ...\")\n", "    task.set_data({\"n_records\": 100})\n", "print(f\"job is finished and succeeded without any error, status is {task.status_code!r}, the concurrency lock is locked: {task.is_locked()}\")\n", "rprint(f\"the updated task.data: {task.data}\")" ], "metadata": { "collapsed": false, "pycharm": { "name": "#%%\n" } } }, { "cell_type": "markdown", "source": [ "### Error Handling" ], "metadata": { "collapsed": false, "pycharm": { "name": "#%% md\n" } } }, { "cell_type": "code", "execution_count": 34, "outputs": [ { "data": { "text/plain": "\u001B[1m{\u001B[0m\n \u001B[32m'lock_time'\u001B[0m: \u001B[1;35mdatetime.datetime\u001B[0m\u001B[1m(\u001B[0m\u001B[1;36m1970\u001B[0m, \u001B[1;36m1\u001B[0m, \u001B[1;36m1\u001B[0m, \u001B[1;36m0\u001B[0m, \u001B[1;36m0\u001B[0m, \u001B[33mtzinfo\u001B[0m=\u001B[35mdatetime\u001B[0m.timezone.utc\u001B[1m)\u001B[0m,\n \u001B[32m'retry'\u001B[0m: \u001B[1;36m0\u001B[0m,\n \u001B[32m'update_time'\u001B[0m: \u001B[1;35mdatetime.datetime\u001B[0m\u001B[1m(\u001B[0m\u001B[1;36m2022\u001B[0m, \u001B[1;36m12\u001B[0m, \u001B[1;36m26\u001B[0m, \u001B[1;36m4\u001B[0m, \u001B[1;36m6\u001B[0m, \u001B[1;36m44\u001B[0m, \u001B[1;36m124953\u001B[0m, \u001B[33mtzinfo\u001B[0m=\u001B[35mdatetime\u001B[0m.timezone.utc\u001B[1m)\u001B[0m,\n \u001B[32m'key'\u001B[0m: \u001B[32m'parse-json____t-2'\u001B[0m,\n \u001B[32m'value'\u001B[0m: \u001B[32m'parse-json____00'\u001B[0m\n\u001B[1m}\u001B[0m\n", "text/html": "
{\n    'lock_time': datetime.datetime(1970, 1, 1, 0, 0, tzinfo=datetime.timezone.utc),\n    'retry': 0,\n    'update_time': datetime.datetime(2022, 12, 26, 4, 6, 44, 124953, tzinfo=datetime.timezone.utc),\n    'key': 'parse-json____t-2',\n    'value': 'parse-json____00'\n}\n
\n" }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "before the job start, status is s00_todo\n", " the job is in progress, status is s03_in_progress\n", " do some job ...\n", "job is failed, status is s06_failed\n", "task.data is not changed: None\n", "task.retry: 1 \n", "the error is logged in task.error attribute\n", "the raised error object: UserError('something wrong!')\n" ] }, { "data": { "text/plain": "the error traceback: Traceback \u001B[1m(\u001B[0mmost recent call last\u001B[1m)\u001B[0m:\n File \u001B[32m\"/var/folders/3y/7t5ll4sn6x76g8rhfqlc36dw0000gn/T/ipykernel_57940/328322325.py\"\u001B[0m, line \u001B[1;36m416\u001B[0m, in start\n yield self\n File \u001B[32m\"/var/folders/3y/7t5ll4sn6x76g8rhfqlc36dw0000gn/T/ipykernel_57940/2331532066.py\"\u001B[0m, line \u001B[1;36m17\u001B[0m, in \u001B[1m<\u001B[0m\u001B[1;95mcell\u001B[0m\u001B[39m line: \u001B[0m\n\u001B[1;36m12\u001B[0m\u001B[1m>\u001B[0m\n raise \u001B[1;35mUserError\u001B[0m\u001B[1m(\u001B[0m\u001B[32m\"something wrong!\"\u001B[0m\u001B[1m)\u001B[0m\nUserError: something wrong!\n\n", "text/html": "
the error traceback: Traceback (most recent call last):\n  File \"/var/folders/3y/7t5ll4sn6x76g8rhfqlc36dw0000gn/T/ipykernel_57940/328322325.py\", line 416, in start\n    yield self\n  File \"/var/folders/3y/7t5ll4sn6x76g8rhfqlc36dw0000gn/T/ipykernel_57940/2331532066.py\", line 17, in <cell line: \n12>\n    raise UserError(\"something wrong!\")\nUserError: something wrong!\n\n
\n" }, "metadata": {}, "output_type": "display_data" } ], "source": [ "job_id = \"parse-json\"\n", "task_id = \"t-2\"\n", "\n", "task = Task.new(job_id, task_id)\n", "rprint(task.attribute_values)\n", "\n", "# define a custom error\n", "class UserError(Exception):\n", "    pass\n", "\n", "print(f\"before the job start, status is {task.status_code}\")\n", "try:\n", "    with task.start_job():\n", "        print(f\" the job is in progress, status is {task.status_code}\")\n", "        print(\" do some job ...\")\n", "        task.set_data({\"n_records\": 100})\n", "        raise UserError(\"something wrong!\")\n", "except UserError:\n", "    pass\n", "\n", "print(f\"job is failed, status is {task.status_code}\")\n", "print(f\"task.data is not changed: {task.data}\")\n", "print(f\"task.retry: {task.retry} \")\n", "print(\"the error is logged in task.error attribute\")\n", "print(f\"the raised error object: {task.errors['error']}\")\n", "rprint(f\"the error traceback: {task.errors['traceback']}\")" ], "metadata": { "collapsed": false, "pycharm": { "name": "#%%\n" } } }, { "cell_type": "markdown", "source": [ "## Catch up from Last Success" ], "metadata": { "collapsed": false, "pycharm": { "name": "#%% md\n" } } }, { "cell_type": "code", "execution_count": 39, "outputs": [ { "data": { "text/plain": "\u001B[1m[\u001B[0mtracker-pattern\u001B[1m<\u001B[0m\u001B[1;95mcopy-files____t-\u001B[0m\u001B[1;36m1\u001B[0m\u001B[1m>\u001B[0m, tracker-pattern\u001B[1m<\u001B[0m\u001B[1;95mcopy-files____t-\u001B[0m\u001B[1;36m3\u001B[0m\u001B[1m>\u001B[0m\u001B[1m]\u001B[0m\n", "text/html": "
[tracker-pattern<copy-files____t-1>, tracker-pattern<copy-files____t-3>]\n
\n" }, "metadata": {}, "output_type": "display_data" } ], "source": [ "job_id = \"copy-files\"\n", "\n", "Task.make(job_id, \"t-1\", StatusEnum.s00_todo.value).save()\n", "Task.make(job_id, \"t-2\", StatusEnum.s03_in_progress.value).save()\n", "Task.make(job_id, \"t-3\", StatusEnum.s06_failed.value).save()\n", "Task.make(job_id, \"t-4\", StatusEnum.s09_success.value).save()\n", "Task.make(job_id, \"t-5\", StatusEnum.s10_ignore.value).save()\n", "\n", "rprint(\n", "    list(\n", "        Task.query_by_status(\n", "            job_id,\n", "            status=[\n", "                StatusEnum.s00_todo.value,\n", "                StatusEnum.s06_failed.value,\n", "            ]\n", "        )\n", "    )\n", ")" ], "metadata": { "collapsed": false, "pycharm": { "name": "#%%\n" } } } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 2 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython2", "version": "2.7.6" } }, "nbformat": 4, "nbformat_minor": 0 }