You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
62 lines
2.0 KiB
62 lines
2.0 KiB
from __future__ import annotations |
|
|
|
import time |
|
import typing |
|
import logging |
|
|
|
from cloudant.document import Document |
|
|
|
|
|
class Pointer: |
|
""" |
|
Pointer object. |
|
""" |
|
def __init__(self, pointers_db: object, |
|
identifier: str = None) -> typing.NoReturn: |
|
""" |
|
Initialize the Pointer object. |
|
|
|
:param pointers_db: The Pointer database object. |
|
:param identifier: A uniquely generated ID identifying the pointer object. |
|
""" |
|
self.logger = logging.getLogger(self.__class__.__name__) |
|
self.pointers_db = pointers_db |
|
self.identifier = identifier |
|
self.data_hash = None |
|
self.ttl = None |
|
self.timestamp = time.time() |
|
|
|
def generate_pointer(self, data_hash: str, ttl: time.time) -> Pointer: |
|
""" |
|
Generates a pointer object and saves it into the database. |
|
|
|
:param data_hash: A uniquely generated ID identifying the data object. |
|
:param ttl: The "Time to Live" of the pointer. |
|
:returns: The Pointer object. |
|
""" |
|
self.logger.debug("identifier is %s", self.identifier) |
|
with Document(self.pointers_db, self.identifier) as pointer: |
|
pointer['value'] = data_hash |
|
pointer['ttl'] = ttl |
|
pointer['timestamp'] = self.timestamp |
|
self.data_hash = data_hash |
|
self.ttl = ttl |
|
return self |
|
|
|
def get_pointer(self, identifier: str) -> Pointer: |
|
""" |
|
Retrieve a pointer object from the database. |
|
|
|
:param identifier: A uniquely generated ID identifying the Pointer object. |
|
:returns: The Pointer object requested. |
|
""" |
|
with Document(self.pointers_db, identifier) as pointer: |
|
try: |
|
self.identifier = pointer['_id'] |
|
self.data_hash = pointer['value'] |
|
self.ttl = pointer['ttl'] |
|
self.timestamp = pointer['timestamp'] |
|
return self |
|
except KeyError: |
|
pass |
|
return None
|
|
|