My Own Database Trial
This is the lesson that talks about using programs with Data and SQL based around Big Idea 2.4, and I am making my own database using the code from 2.4
"""
These imports define the key objects
"""
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
"""
These object and definitions are used throughout the Jupyter Notebook.
"""
# Setup of key Flask object (app)
app = Flask(__name__)
# Setup SQLAlchemy object and properties for the database (db)
database = 'sqlite:///sqlite.db' # path and filename of databaseapp.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SQLALCHEMY_DATABASE_URI'] = database
app.config['SECRET_KEY'] = 'SECRET_KEY'
db = SQLAlchemy()
# This belongs in place where it runs once per project
db.init_app(app)
""" database dependencies to support sqlite examples """
import datetime
from datetime import datetime
import json
from sqlalchemy.exc import IntegrityError
from werkzeug.security import generate_password_hash, check_password_hash
''' Tutorial: https://www.sqlalchemy.org/library.html#tutorials, try to get into a Python shell and follow along '''
class Phone(db.Model):
__tablename__ = 'Phones' # table name is plural, class name is singular
id = db.Column(db.Integer, primary_key=True)
_company = db.Column(db.String(255), unique=False, nullable=False) #_name
_model = db.Column(db.String(255), unique=True, nullable=False) #_uid
_price = db.Column(db.String(255), unique=False, nullable=False) # _password
_dob = db.Column(db.Date) #_dob
def __init__(self, company, model, price, dob=datetime.today()):
self._company = company # variables with self prefix become part of the object,
self._model = model
self._price = price
# self.set_password(password)
if isinstance(dob, str): # not a date type
dob = date=datetime.today()
self._dob = dob
@property
def company(self):
return self._company
@company.setter
def company(self, company):
self._company = company
@property
def model(self):
return self._model
@model.setter
def model(self, model):
self._model = model
def is_model(self, model):
return self._model == model
@property
def price(self):
return self._price
@price.setter
def price(self, price):
self._price = price
# @property
# def password(self):
# return self._password[0:10] + "..." # because of security only show 1st characters
# # update password, this is conventional setter
# def set_password(self, password):
# """Create a hashed password."""
# self._password = generate_password_hash(password, method='sha256')
# # check password parameter versus stored/encrypted password
# def is_password(self, password):
# """Check against hashed password."""
# result = check_password_hash(self._password, password)
# return result
# dob property is returned as string, to avoid unfriendly outcomes
@property
def dob(self):
dob_string = self._dob.strftime('%m-%d-%Y')
return dob_string
@dob.setter
def dob(self, dob):
if isinstance(dob, str): # not a date type
dob = date=datetime.today()
self._dob = dob
@property
def age(self):
today = datetime.today()
return today.year - self._dob.year - ((today.month, today.day) < (self._dob.month, self._dob.day))
def __str__(self):
return json.dumps(self.read())
def create(self):
try:
db.session.add(self) # add prepares to persist person object to Users table
db.session.commit() # SqlAlchemy "unit of work pattern" requires a manual commit
return self
except IntegrityError:
db.session.remove()
return None
def read(self):
return {
"id": self.id,
"company": self.company,
"model": self.model,
"price": self.price,
"dob": self.dob,
"age": self.age,
}
def update(self, company="", model="", price=""):
"""only updates values with length"""
if len(company) > 0:
self.company = company
if len(model) > 0:
self.model = model
if len(price) > 0:
self.price = price
db.session.commit()
db.session.add(self)
return self
def delete(self):
db.session.delete(self)
db.session.commit()
return None
"""Database Creation and Testing """
# Builds working data for testing
def initPhones():
with app.app_context():
"""Create database and tables"""
db.create_all()
"""Tester data for table"""
p1 = Phone(company='Apple', model='iPhone 14', price='799')
p2 = Phone(company='Apple', model='iPhone 14 Pro', price='999')
p3 = Phone(company='Samsung', model='Galaxy S23', price='799')
p4 = Phone(company='LG', model='Wing', price='999')
p5 = Phone(company='Motorola', model='Razr', price='1399')
p6 = Phone(company='Google', model='Pixel 7', price='599')
phones = [p1, p2, p3, p4, p5, p6]
"""Builds sample user/note(s) data"""
for phone in phones:
try:
'''add user to table'''
object = phone.create()
print(f"Created new uid {object.model}")
except: # error raised if object nit created
'''fails with bad or duplicate data'''
print(f"Records exist uid {phone.model}, or error.")
initPhones()
def find_by_model(model):
with app.app_context():
phone = Phone.query.filter_by(_model=model).first()
return phone # returns user object
# # Check credentials by finding user and verify password
# def check_credentials(model, price):
# # query email and return user record
# phone = find_by_model(company)
# if phone == None:
# return False
# if (phone.is_password(price)):
# return True
# return False
# check_credentials("indi", "123qwerty")
def create():
# optimize user time to see if uid exists
model = input("Enter your model:")
phone = find_by_model(model)
try:
print("Found\n", phone.read())
return
except:
pass # keep going
# request value that ensure creating valid object
company = input("Enter the company:")
price = input("Enter the price")
# Initialize User object before date
phone = Phone(company=company,
model=model,
price=price,
)
# create user.dob, fail with today as dob
# dob = input("Enter your date of birth 'YYYY-MM-DD'")
# try:
# user.dob = datetime.strptime(dob, '%Y-%m-%d').date()
# except ValueError:
# user.dob = datetime.today()
# print(f"Invalid date {dob} require YYYY-mm-dd, date defaulted to {user.dbo}")
# write object to database
with app.app_context():
try:
object = phone.create()
print("Created\n", object.read())
except: # error raised if object not created
print("Unknown error uid {uid}")
create()
# SQLAlchemy extracts all users from database, turns each user into JSON
def read():
with app.app_context():
table = Phone.query.all()
json_ready = [phone.read() for phone in table] # each user adds user.read() to list
return json_ready
# read()
def delete_by_company(): # makes a new function called delete_by_uid
model = input("Enter uid of user to be deleted.") # prompts the user to enter the uid
user = find_by_model(model) # using previous function to locate user by inputted id
with app.app_context():
try:
object = user.delete()
print(f"User with uid --{model}-- has been deleted. ")
db = read()
print(db)
except: # error raised if object not found
(f"No user with uid {model} was found.")
delete_by_company()
import sqlite3
database = 'instance/sqlite.db' # this is location of database
def schema():
# Connect to the database file
conn = sqlite3.connect(database)
# Create a cursor object to execute SQL queries
cursor = conn.cursor()
# Fetch results of Schema
results = cursor.execute("PRAGMA table_info('phones')").fetchall()
# Print the results
for row in results:
print(row)
# Close the database connection
conn.close()
schema()
import sqlite3
def read():
# Connect to the database file
conn = sqlite3.connect(database)
# Create a cursor object to execute SQL queries
cursor = conn.cursor()
# Execute a SELECT statement to retrieve data from a table
results = cursor.execute('SELECT * FROM phones').fetchall()
# Print the results
if len(results) == 0:
print("Table is empty")
else:
for row in results:
print(row)
# Close the cursor and connection objects
cursor.close()
conn.close()
read()
import sqlite3
def create():
company = input("Enter the company:")
model = input("Enter the phone model:")
price = input("Enter the price")
# dob = input("Enter your date of birth 'YYYY-MM-DD'")
# Connect to the database file
conn = sqlite3.connect(database)
# Create a cursor object to execute SQL commands
cursor = conn.cursor()
try:
# Execute an SQL command to insert data into a table
cursor.execute("INSERT INTO phones (_company, _model, _price) VALUES (?, ?, ?)", (company, model, price))
# Commit the changes to the database
conn.commit()
print(f"A new smartphone record {model} has been created")
except sqlite3.Error as error:
print("Error while executing the INSERT:", error)
# Close the cursor and connection objects
cursor.close()
conn.close()
create()
import sqlite3
def update():
model = input("Enter smartphone model to update")
price = input("Enter updated price")
# if len(password) < 2:
# message = "hacked"
# password = 'gothackednewpassword123'
# else:
# message = "successfully updated"
# Connect to the database file
conn = sqlite3.connect(database)
# Create a cursor object to execute SQL commands
cursor = conn.cursor()
try:
# Execute an SQL command to update data in a table
cursor.execute("UPDATE phones SET _price = ? WHERE _model = ?", (price, model))
if cursor.rowcount == 0:
# The uid was not found in the table
print(f"No smartphone model called {model} was found in the table")
else:
print(f"The row with {model} has been successfully updated with a price of {price}")
conn.commit()
except sqlite3.Error as error:
print("Error while executing the UPDATE:", error)
# Close the cursor and connection objects
cursor.close()
conn.close()
update()
import sqlite3
def delete():
model = input("Enter a smartphone model to delete")
# Connect to the database file
conn = sqlite3.connect(database)
# Create a cursor object to execute SQL commands
cursor = conn.cursor()
try:
cursor.execute("DELETE FROM phones WHERE _model = ?", (model,))
if cursor.rowcount == 0:
# The uid was not found in the table
print(f"No smartphone called {model} was found in the table")
else:
# The uid was found in the table and the row was deleted
print(f"The row with {model} was successfully deleted")
conn.commit()
except sqlite3.Error as error:
print("Error while executing the DELETE:", error)
# Close the cursor and connection objects
cursor.close()
conn.close()
delete()
def menu():
operation = input("Enter: (C)reate (R)ead (U)pdate or (D)elete or (S)chema")
if operation.lower() == 'c':
create()
elif operation.lower() == 'r':
read()
elif operation.lower() == 'u':
update()
elif operation.lower() == 'd':
delete()
elif operation.lower() == 's':
schema()
elif len(operation)==0: # Escape Key
return
else:
print("Please enter c, r, u, d, or s")
menu() # recursion, repeat menu
try:
menu() # start menu
except:
print("Perform Jupyter 'Run All' prior to starting menu")
'''reference https://www.sqlitetutorial.net/sqlite-python/sqlite-python-select/'''
'''reference https://stackoverflow.com/questions/65401038/python-sqlite3-print-specific-data-only-from-row'''
'''reference https://www.sqlitetutorial.net/sqlite-select/'''
'''reference https://www.quackit.com/sqlite/tutorial/select_data.cfm'''
'''reference https://pynative.com/python-sqlite-select-from-table/'''
# !CODE SEGMENT 1
# def find():
# findCompany = input("Enter the Name of the Company You Want to Find")
# if findCompany.lower() == 'Apple':
# print()
# AppleResults = cursor.execute("PRAGMA table_info('phones')").fetch(_company)
# !CODE SEGMENT 2
# import sqlite3
# database = 'instance/sqlite.db' # this is location of database
# # Connect to the database file
# def GetData():
# conn= sqlite3.connect(database)
# cursor = conn.cursor()
# query = ("""SELECT _company FROM phones WHERE _company = 'Apple'""")
# cursor.execute(query)
# data = cursor.fetchmany()
# print(data)
# GetData()
# import sqlite3
# from sqlite3 import Error
# !CODE SEGMENT 3
# def create_connection(db_file):
# """ create a database connection to a SQLite database """
# conn = None
# try:
# conn = sqlite3.connect(db_file)
# print(sqlite3.version)
# except Error as e:
# print(e)
# finally:
# if conn:
# conn.close()
# if __name__ == '__main__':
# create_connection(r"///sqlite.db")
def find_by_company(self, company):
with app.app_context():
company = input("Input a company")
phone = Phone.query.filter_by(_company=company).first()
return phone # returns user object
Phone.read()