Files
devopsexam/database.py

273 lines
5.9 KiB
Python

import os
import sqlite3
import hashlib
import datetime
import psycopg2
user_db_file_location = "database_file/users.db"
note_db_file_location = "database_file/notes.db"
image_db_file_location = "database_file/images.db"
_schema_initialized = False
def use_postgres():
return os.getenv("DATABASE_URL") is not None
def placeholder():
return "%s" if use_postgres() else "?"
def get_connection(sqlite_db_file=None):
global _schema_initialized
database_url = os.getenv("DATABASE_URL")
if use_postgres():
conn = psycopg2.connect(database_url)
if not _schema_initialized:
init_postgres_schema(conn)
_schema_initialized = True
return conn
return sqlite3.connect(sqlite_db_file)
def init_postgres_schema(conn):
c = conn.cursor()
c.execute("""
CREATE TABLE IF NOT EXISTS users (
id TEXT PRIMARY KEY,
pw TEXT NOT NULL
);
""")
c.execute("""
CREATE TABLE IF NOT EXISTS notes (
"user" TEXT NOT NULL,
timestamp TEXT NOT NULL,
note TEXT NOT NULL,
note_id TEXT PRIMARY KEY
);
""")
c.execute("""
CREATE TABLE IF NOT EXISTS images (
uid TEXT PRIMARY KEY,
owner TEXT NOT NULL,
name TEXT NOT NULL,
timestamp TEXT NOT NULL
);
""")
conn.commit()
c.close()
def list_users():
_conn = get_connection(user_db_file_location)
_c = _conn.cursor()
_c.execute("SELECT id FROM users;")
result = [x[0] for x in _c.fetchall()]
_conn.close()
return result
def verify(id, pw):
_conn = get_connection(user_db_file_location)
_c = _conn.cursor()
_c.execute(
f"SELECT pw FROM users WHERE id = {placeholder()};",
(id.upper(),)
)
row = _c.fetchone()
_conn.close()
if row is None:
return False
return row[0] == hashlib.sha256(pw.encode()).hexdigest()
def delete_user_from_db(id):
user_id = id.upper()
_conn = get_connection(user_db_file_location)
_c = _conn.cursor()
_c.execute(
f"DELETE FROM users WHERE id = {placeholder()};",
(user_id,)
)
_conn.commit()
_conn.close()
# when we delete a user from USERS, delete all notes owned by the user
_conn = get_connection(note_db_file_location)
_c = _conn.cursor()
_c.execute(
f'DELETE FROM notes WHERE "user" = {placeholder()};',
(user_id,)
)
_conn.commit()
_conn.close()
# when we delete a user from USERS, delete all image records owned by the user
_conn = get_connection(image_db_file_location)
_c = _conn.cursor()
_c.execute(
f"DELETE FROM images WHERE owner = {placeholder()};",
(user_id,)
)
_conn.commit()
_conn.close()
def add_user(id, pw):
_conn = get_connection(user_db_file_location)
_c = _conn.cursor()
_c.execute(
f"INSERT INTO users VALUES ({placeholder()}, {placeholder()});",
(id.upper(), hashlib.sha256(pw.encode()).hexdigest())
)
_conn.commit()
_conn.close()
def read_note_from_db(id):
_conn = get_connection(note_db_file_location)
_c = _conn.cursor()
_c.execute(
f'SELECT note_id, timestamp, note FROM notes WHERE "user" = {placeholder()};',
(id.upper(),)
)
result = _c.fetchall()
_conn.close()
return result
def match_user_id_with_note_id(note_id):
# Given the note id, confirm if the current user is the owner of the note being operated.
_conn = get_connection(note_db_file_location)
_c = _conn.cursor()
_c.execute(
f'SELECT "user" FROM notes WHERE note_id = {placeholder()};',
(note_id,)
)
row = _c.fetchone()
_conn.close()
if row is None:
return None
return row[0]
def write_note_into_db(id, note_to_write):
_conn = get_connection(note_db_file_location)
_c = _conn.cursor()
current_timestamp = str(datetime.datetime.now())
note_id = hashlib.sha1((id.upper() + current_timestamp).encode()).hexdigest()
_c.execute(
f"INSERT INTO notes VALUES ({placeholder()}, {placeholder()}, {placeholder()}, {placeholder()});",
(id.upper(), current_timestamp, note_to_write, note_id)
)
_conn.commit()
_conn.close()
def delete_note_from_db(note_id):
_conn = get_connection(note_db_file_location)
_c = _conn.cursor()
_c.execute(
f"DELETE FROM notes WHERE note_id = {placeholder()};",
(note_id,)
)
_conn.commit()
_conn.close()
def image_upload_record(uid, owner, image_name, timestamp):
_conn = get_connection(image_db_file_location)
_c = _conn.cursor()
_c.execute(
f"INSERT INTO images VALUES ({placeholder()}, {placeholder()}, {placeholder()}, {placeholder()});",
(uid, owner, image_name, timestamp)
)
_conn.commit()
_conn.close()
def list_images_for_user(owner):
_conn = get_connection(image_db_file_location)
_c = _conn.cursor()
_c.execute(
f"SELECT uid, timestamp, name FROM images WHERE owner = {placeholder()};",
(owner,)
)
result = _c.fetchall()
_conn.close()
return result
def match_user_id_with_image_uid(image_uid):
# Given the image uid, confirm if the current user is the owner of the image being operated.
_conn = get_connection(image_db_file_location)
_c = _conn.cursor()
_c.execute(
f"SELECT owner FROM images WHERE uid = {placeholder()};",
(image_uid,)
)
row = _c.fetchone()
_conn.close()
if row is None:
return None
return row[0]
def delete_image_from_db(image_uid):
_conn = get_connection(image_db_file_location)
_c = _conn.cursor()
_c.execute(
f"DELETE FROM images WHERE uid = {placeholder()};",
(image_uid,)
)
_conn.commit()
_conn.close()
if __name__ == "__main__":
print(list_users())