Browse Source

Committing initial draft of shortenit

flask
Elia El Lazkani 2 years ago
parent
commit
03a38d7160
  1. 3
      .gitignore
  2. 2
      README.rst
  3. 7
      config/config.yaml
  4. 1
      requirements/development.txt
  5. 5
      requirements/requirements.txt
  6. 48
      setup.py
  7. 0
      shortenit/__init__.py
  8. 39
      shortenit/common.py
  9. 22
      shortenit/config.py
  10. 29
      shortenit/counter.py
  11. 57
      shortenit/data.py
  12. 57
      shortenit/db.py
  13. 38
      shortenit/logger.py
  14. 102
      shortenit/main.py
  15. 67
      shortenit/pointer.py
  16. 78
      shortenit/web.py
  17. 110
      templates/index.html

3
.gitignore

@ -0,0 +1,3 @@
.eggs/
*.egg-info
__pycache__/

2
README.rst

@ -2,3 +2,5 @@ shortenit
=========
shortenit is a tool to shorten urls.
NOTE: This is a very early draft project. Contributions are welcome.

7
config/config.yaml

@ -0,0 +1,7 @@
Server:
host: 127.0.0.1
port: 8000
CouchDB:
username: root
password: root
url: http://localhost:5984

1
requirements/development.txt

@ -0,0 +1 @@
setuptools-git

5
requirements/requirements.txt

@ -0,0 +1,5 @@
pyyaml
cloudant
aiohttp
aiohttp-jinja2
trafaret

48
setup.py

@ -0,0 +1,48 @@
from pathlib import Path
from setuptools import setup
#<link rel=stylesheet type=text/css href="{{ url('static', filename='css/custom.css') }}">
here = Path(__file__).absolute().parent
with open(str(here.joinpath('README.rst')), encoding='utf-8') as f:
long_description = f.read()
with open(str(here.joinpath('requirements/requirements.txt')),
encoding='utf-8') as f:
install_requires = f.read()
description = 'Shortenit will shorten that url for you'
setup(
name='shortenit',
exclude_package_data={'': ['.gitignore', 'requirements/', 'setup.py']},
version_format='{tag}_{gitsha}',
setup_requires=['setuptools-git', 'setuptools-git-version'],
license='BSD',
author='Elia El Lazkani',
author_email='eliaellazkani@gmail.com',
url='https://gitlab.com/elazkani/shortenit',
python_requires='>=python3.6',
description=description,
long_description=long_description,
install_requires=install_requires,
entry_points={
'console_scripts': [
'shortenit = shortenit.main:main'
]
},
packages=['shortenit'],
classifiers=[
'Development Status :: 4 - Beta',
'License :: OSI Approved :: BSD License',
'Operating System :: POSIX :: Linux',
'Operating System :: MacOS :: MacOS X',
'Programming Language :: Python',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Environment :: Console',
'Intended Audience :: Information Technology',
'Intended Audience :: System Administrators'
],
zip_safe=False
)

0
shortenit/__init__.py

39
shortenit/common.py

@ -0,0 +1,39 @@
import os
import logging
# Setup logging
logger = logging.getLogger(__name__)
def normalize_path(path: str) -> str:
"""
Method to expand and return an absolute
path from a normal path.
:param path: The path to normalize.
:returns: The absolute path.
"""
logger.debug("Normalizing path: %s", path)
exp_path = os.path.expanduser(path)
abs_path = os.path.abspath(exp_path)
logger.debug("Normalized path: %s", abs_path)
return abs_path
def check_file(path: str) -> str:
"""
Method to normalize the path of a file and
check if the file exists and is a file.
:param path: The file path to check.
:returns: The absolute path of a file.
:raises: FileNotFoundError
"""
logger.debug("Checking file: %s", path)
file_path = normalize_path(path)
if not os.path.exists(file_path) or not os.path.isfile(file_path):
logger.error("File '%s' not found, raising exception", file_path)
raise FileNotFoundError
logger.debug("File '%s' found, returning path", file_path)
return file_path

22
shortenit/config.py

@ -0,0 +1,22 @@
import yaml
class Config:
def __init__(self, config_path: str):
self.config_path = config_path
self.config = None
def get_config(self):
if not self.config:
_config = self.load_config()
self.config = _config
return self.config
def load_config(self):
with open(self.config_path, 'rt') as f:
config = yaml.load(f)
if self.validate_config(config):
return config
def validate_config(self, config):
return True

29
shortenit/counter.py

@ -0,0 +1,29 @@
import logging
from cloudant.document import Document
class Counter:
def __init__(self, counter_db):
self.logger = logging.getLogger(self.__class__.__name__)
self.counter_db = counter_db
self.counter = None
def get_counter(self) -> int:
with Document(self.counter_db, 'counter') as counter:
self.logger.debug("Counter: %s", counter)
try:
self.counter = counter['value']
except KeyError:
self.logger.warn(
"Counter was not initialized, initializing...")
counter['value'] = 0
try:
counter['value'] += 1
except Exception as e:
self.logger.err(e)
# Need to check if the value exists or not as to not jump values
# which it currently does but it's not a big issue for right now
self.counter = counter['value']
return self.counter

57
shortenit/data.py

@ -0,0 +1,57 @@
import time
import logging
from hashlib import sha256
from cloudant.document import Document
class Data:
def __init__(self, data_db: object,
identifier: str = None,
data: str = None):
self.logger = logging.getLogger(self.__class__.__name__)
self.data_db = data_db
self.identifier = identifier
self.data = data
self.timestamp = time.time()
self.pointers = []
self.data_found = None
self.populate()
def generate_identifier(self):
hash_object = sha256(self.data.encode('utf-8'))
self.identifier = hash_object.hexdigest()
def populate(self, pointer: str = None):
if self.identifier:
self.logger.debug("The identifier is set, retrieving data...")
self.get_data()
elif self.data:
self.logger.debug("The data is set, generating an identifier...")
self.generate_identifier()
self.logger.debug("Attempting to get the data with "
"the identifier generated...")
self.get_data()
if not self.data_found:
self.logger.debug("The data generated is not found, "
"creating...")
self.set_data(pointer)
def get_data(self):
with Document(self.data_db, self.identifier) as data:
try:
self.data = data['value']
self.timestamp = data['timestamp']
self.pointers = data['pointers']
self.data_found = True
except KeyError:
self.data_found = False
def set_data(self, pointer):
with Document(self.data_db, self.identifier) as data:
data['value'] = self.data
data['timestamp'] = self.timestamp
try:
data['pointers'].append(pointer)
except KeyError:
data['pointers'] = [pointer]

57
shortenit/db.py

@ -0,0 +1,57 @@
import logging
from cloudant.client import CouchDB
class DB:
def __init__(self, config: dict) -> None:
self.logger = logging.getLogger(self.__class__.__name__)
self.username = config['username']
self.password = config['password']
self.url = config['url']
self.client = None
self.session = None
def initialize_shortenit(self):
try:
self.counter_db = self.client['counter']
except KeyError:
self.logger.warn(
"The 'counter' database was not found, creating...")
self.counter_db = self.client.create_database('counter')
if self.counter_db.exists():
self.logger.info(
"The 'counter' database was successfully created.")
try:
self.data_db = self.client['data']
except KeyError:
self.logger.warn(
"The 'data' database was not found, creating...")
self.data_db = self.client.create_database('data')
if self.data_db.exists():
self.logger.info(
"The 'data' database was successfully created.")
try:
self.pointers_db = self.client['pointers']
except KeyError:
self.logger.warn(
"The 'pointers' database was not found, creating...")
self.pointers_db = self.client.create_database('pointers')
if self.pointers_db.exists():
self.logger.info(
"The 'pointers' database was successfully created.")
def __enter__(self) -> CouchDB:
"""
"""
self.client = CouchDB(self.username, self.password,
url=self.url, connect=True)
self.session = self.client.session()
return self
def __exit__(self, *args) -> None:
"""
"""
self.client.disconnect()

38
shortenit/logger.py

@ -0,0 +1,38 @@
import os
import yaml
import logging.config
from .common import check_file
def setup_logging(default_path: str = None,
default_level: int = logging.ERROR,
env_key: str = 'LOG_CFG') -> None:
"""
Method that sets the logging system up.
:param default_path: The path to the logger configuration.
:param default_level: The default logging level (DEFAULT: ERROR)
:param env_key: The environment variable specifying the path to the
configuration file.
"""
value = os.getenv(env_key, None)
path = None
if default_path:
try:
path = check_file(default_path)
except FileNotFoundError:
path = None
if value:
try:
path = check_file(value)
except FileNotFoundError:
path = None
if path:
with open(path, mode='r') as f:
config = yaml.safe_load(f.read())
logging.config.dictConfig(config)
else:
_format = '%(asctime)s - %(levelname)s - %(filename)s:' \
'%(name)s.%(funcName)s:%(lineno)d - %(message)s'
logging.basicConfig(level=default_level, format=_format)

102
shortenit/main.py

@ -0,0 +1,102 @@
#!/usr/bin/env python3
import argparse
import logging
import pathlib
import asyncio
import time
from .data import Data
from .pointer import Pointer
from .config import Config
from .counter import Counter
from .db import DB
from .logger import setup_logging
from .web import Web, SiteHandler
PROJECT_ROOT = pathlib.Path(__file__).parent.parent
CONFIGURATION = f'{PROJECT_ROOT}/config/config.yaml'
# Setup logging
logger = logging.getLogger(__name__)
def main() -> None:
"""
Main method
"""
parser = argument_parse()
args = parser.parse_args()
verbosity_level = verbosity(args.verbose)
setup_logging(args.logger, verbosity_level)
config = Config(CONFIGURATION).get_config()
db_config = config.get('CouchDB', None)
server_config = config.get('Server', None)
if db_config:
with DB(db_config) as db:
db.initialize_shortenit()
loop = asyncio.get_event_loop()
handler = SiteHandler(db, shorten_url, lenghten_url)
web = Web(loop, handler)
web.host = server_config.get('host', None)
web.port = server_config.get('port', None)
web.start_up()
def shorten_url(database: DB, data: str, ttl: time.time):
counter = Counter(database.counter_db)
data = Data(database.data_db,
data=data)
data.populate()
pointer = Pointer(database.pointers_db, counter)
pointer.generate_pointer(
data.identifier,
ttl
)
data.set_data(pointer.identifier)
return pointer.identifier
def lenghten_url(database: DB, identifier: str):
pointer = Pointer(database.pointers_db)
pointer.get_pointer(identifier)
data = Data(database.data_db, identifier=pointer.data_hash)
data.populate()
return data.data
def argument_parse() -> argparse.ArgumentParser:
"""
Method to extract the arguments from the command line.
:returns: The argument parser.
"""
parser = argparse.ArgumentParser(
description="Generates rundeck resources "
"file from different API sources.")
parser.add_argument(
'-v', '--verbose', action='count', default=0,
help='Verbosity level to use.')
parser.add_argument(
'-l', '--logger', type=str,
help='The logger YAML configuration file.')
return parser
def verbosity(verbose: int):
"""
Method to set the verbosity.
:param verbose: The verbosity set by user.
:returns: The verbosity level.
"""
if verbose == 0:
return logging.ERROR
elif verbose == 1:
return logging.WARNING
elif verbose == 2:
return logging.INFO
elif verbose > 2:
return logging.DEBUG

67
shortenit/pointer.py

@ -0,0 +1,67 @@
import time
import logging
from .counter import Counter
from cloudant.document import Document
CHARS = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ'
class Pointer:
def __init__(self, pointers_db: object,
counter: Counter = None) -> None:
self.logger = logging.getLogger(self.__class__.__name__)
self.pointers_db = pointers_db
self.counter = counter
self.identifier = None
self.data_hash = None
self.ttl = None
self.timestamp = time.time()
def generate_pointer(self, data_hash: str, ttl: time.time):
self.logger.debug("Generating new counter...")
counter = self.counter.get_counter()
self.logger.debug("Encoding the counter into an ID")
self.identifier = Pointer.encode(counter)
self.logger.debug("Encoded counter is %s", self.identifier)
with Document(self.pointers_db, self.identifier) as pointer:
pointer['value'] = data_hash
pointer['ttl'] = ttl
pointer['timestamp'] = self.timestamp
self.data_hash = data_hash
self.ttl = ttl
return self
def get_pointer(self, identifier: str):
with Document(self.pointers_db, identifier) as pointer:
try:
self.identifier = pointer['_id']
self.data_hash = pointer['value']
self.ttl = pointer['ttl']
self.timestamp = pointer['timestamp']
return self
except KeyError:
pass
return None
@staticmethod
def encode(counter):
sign = '-' if counter < 0 else ''
counter = abs(counter)
result = ''
while counter > 0:
counter, remainder = divmod(counter, len(CHARS))
result = CHARS[remainder]+result
return sign+result
@staticmethod
def decode(counter):
return int(counter, len(CHARS))
@staticmethod
def padding(counter: str, count: int=6):
if len(counter) < count:
pad = '0' * (count - len(counter))
return f"{pad}{counter}"
return f"{counter}"

78
shortenit/web.py

@ -0,0 +1,78 @@
import os
import logging
import aiohttp_jinja2
import jinja2
import trafaret as t
from aiohttp import web
from pathlib import Path
class Web:
def __init__(self, loop, handler):
self.logger = logging.getLogger(self.__class__.__name__)
self.loop = loop
self.app = None
self.host = None
self.port = None
self.handler = handler
self.router = None
self.loader = None
def start_up(self):
self.loop.run_until_complete(self.init())
web.run_app(self.app, host=self.host, port=self.port)
async def init(self):
self.app = web.Application(loop=self.loop)
templates = Path(__file__).absolute().parent.parent.joinpath('templates')
self.loader = jinja2.FileSystemLoader(str(templates))
self.logger.debug(str(templates))
aiohttp_jinja2.setup(self.app,
loader=self.loader)
self.setup_routes()
def setup_routes(self):
self.router = self.app.router
self.router.add_get('/', self.handler.index,
name='index')
self.router.add_post('/shortenit', self.handler.shortenit,
name='shortenit')
self.router.add_get('/{identifier}', self.handler.redirect,
name='redirect')
class SiteHandler:
def __init__(self, database, shorten_url, lenghten_url):
self.logger = logging.getLogger(self.__class__.__name__)
self.database = database
self.shorten_url = shorten_url
self.lenghten_url = lenghten_url
self.shortenit_load_format = t.Dict({
t.Key('url'): t.URL,
t.Key('timestamp'): t.Int
})
async def shortenit(self, request):
data = await request.json()
self.logger.debug(data)
try:
data = self.shortenit_load_format(data)
except t.DataError:
raise web.HTTPBadRequest('URL is not valid')
short_url = self.shorten_url(self.database, data['url'], data['timestamp'])
self.logger.debug(short_url)
return web.json_response({"url": short_url})
async def redirect(self, request):
identifier = request.match_info['identifier']
url = self.lenghten_url(self.database, identifier)
if not url:
raise web.HTTPNotFound()
return web.HTTPFound(location=url)
@aiohttp_jinja2.template('index.html')
async def index(self, request):
return {}

110
templates/index.html

@ -0,0 +1,110 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1, shrink-to-fit=no">
<meta name="description" content="The personal URL Shortener">
<meta name="author" content="">
<title>ShortenIt</title>
<!-- Bootstrap core CSS -->
<link rel="stylesheet" href="https://stackpath.bootstrapcdn.com/bootstrap/4.3.1/css/bootstrap.min.css" integrity="sha384-ggOyR0iXCbMQv3Xipma34MD+dH/1fQ784/j6cY/iJTQUOhcWr7x9JvoRxT2MZw1T" crossorigin="anonymous">
<script src="https://code.jquery.com/jquery-3.4.1.min.js" integrity="sha256-CSXorXvZcTkaix6Yvo6HppcZGetbYMGWSFlBw8HfCJo=" crossorigin="anonymous"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/popper.js/1.14.7/umd/popper.min.js" integrity="sha384-UO2eT0CpHqdSJQ6hJty5KVphtPhzWj9WO1clHTMGa3JDZwrnQq4sF86dIHNDz0W1" crossorigin="anonymous"></script>
<script src="https://stackpath.bootstrapcdn.com/bootstrap/4.3.1/js/bootstrap.min.js" integrity="sha384-JjSmVgyd0p3pXB1rRibZUAYoIIy6OrQ6VrjIEaFf/nJGzIxFDsf4x0xIM+B07jRM" crossorigin="anonymous"></script>
</head>
<script type="text/javascript">
var href = window.location.href;
$(function() {
$('#submitButton').click(function() {
$.ajax({
type: "POST",
url: "/shortenit",
data: JSON.stringify({'url' : $('#url-input').val(),
'timestamp': Date.now()}),
success: returnSuccess,
dataType: 'json',
contentType: "application/json",
});
});
});
function returnSuccess(data, textStatus, jqXHR) {
var url = href.concat(data.url);
console.log(window.location.href)
if(data.url) {
document.getElementById("url-result").value = url;
} else {
document.getElementById("url-result").value = "Please enter a valid URL!";
}
}
function copyToClipboard() {
/* Get the text field */
var copyText = document.querySelector("#url-result");
/* Select the text field */
copyText.select();
copyText.setSelectionRange(0, 99999); /*For mobile devices*/
/* Copy the text inside the text field */
document.execCommand("copy");
/* Alert the copied text */
alert("Copied the text: " + copyText.value);
}
</script>
<body>
<!-- Page Content -->
<div class="container">
<div class="row">
<div class="col-lg-12 text-center">
<h1 class="mt-5">Shorten It!</h1>
</div>
</div>
<div class="row">
<div class="col-lg-12 form-group">
<div class="form-row">
<div class="col-9">
<input type="text" class="form-control" name="url" id="url-input" placeholder="https://www.duckduckgo.com" />
</div>
<div class="col-3">
<button id="submitButton" type="button" class="btn btn-outline-primary">Shorten URL</button>
</div>
</div>
</div>
</div>
<!-- <a href="#" id="url-result">Enter URL</a> -->
<div class="row">
<div class="col-lg-12 form-group">
<div class="form-row">
<div class="col-9">
<input type="text" id="url-result" class="form-control" readonly>
</div>
<div class="col-3">
<button class="btn btn-outline-primary" type="button" id="copy-button" data-toggle="tooltip" onclick="copyToClipboard()" data-placement="button" title="Copy to Clipboard">Copy</button>
</div>
</div>
</div>
</div>
<!-- Bootstrap core JavaScript -->
</body>
</html>
Loading…
Cancel
Save