I'm looking for an easy Python way to compare column types from SQLAlchemy to base types. For example, if my column type is a VARCHAR of any length, I want to read it as a string.
I can read the column type okay, but I'm not sure of an easy way to verify its basic type. It would be nice if I could use something like "if isinstance(mycolumn, int)", but I'm new to Python and not sure how this would work.
Here's what I have so far:
from sqlalchemy import MetaData, create_engine
engine = create_engine('mysql+mysqldb://user:pass@localhost:3306/mydb', pool_recycle=3600)
meta = MetaData()
# Passing the engine to reflect() avoids relying on meta.bind, which newer SQLAlchemy releases no longer support
meta.reflect(bind=engine)
datatable = meta.tables['my_data_table']
[c.type for c in datatable.columns]
Output:
[INTEGER(display_width=11), DATE(), VARCHAR(length=127), DOUBLE(precision=None, scale=None, asdecimal=True)]
My end purpose is twofold. First, I want to format the output based on the type when I load it into my jQuery jqGrid. Second, I'm slowly converting non-normalized data tables into a normalized structure, and I want to keep my types consistent (to make sure the numbers in the previous table are stored as numbers and not strings).
Just use the python_type attribute available on SQLAlchemy types (types with no Python equivalent raise NotImplementedError):
[c.type.python_type for c in datatable.columns]
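As a rough sketch of how this could drive type-based formatting, the snippet below builds a stand-in table in memory (so no database is needed) and classifies each column by its python_type. The helper names base_type and kind, and the labels they return, are made up for illustration and are not part of SQLAlchemy. It also shows that isinstance works if you test the column's type object against a generic SQLAlchemy type rather than against int or str.

```python
import datetime

from sqlalchemy import Column, Date, Float, Integer, MetaData, String, Table

# Hypothetical stand-in for the reflected table, so no database is needed
meta = MetaData()
datatable = Table(
    'my_data_table', meta,
    Column('id', Integer),
    Column('day', Date),
    Column('name', String(127)),
    Column('value', Float),
)


def base_type(column):
    """Return the Python type for a column, or None if SQLAlchemy defines none."""
    try:
        return column.type.python_type
    except NotImplementedError:
        return None


def kind(column):
    """Classify a column for display purposes (the labels are illustrative)."""
    pytype = base_type(column)
    if pytype is None:
        return 'unknown'
    if issubclass(pytype, str):
        return 'text'
    if issubclass(pytype, (int, float)):
        return 'number'
    if issubclass(pytype, (datetime.date, datetime.time)):
        return 'date'
    return 'other'


kinds = {c.name: kind(c) for c in datatable.columns}
print(kinds)

# isinstance also works when applied to the SQLAlchemy type object itself
name_is_string = isinstance(datatable.c['name'].type, String)
print(name_is_string)
```

Note that bool is a subclass of int in Python, so a Boolean column would be labeled 'number' by this sketch unless you test for bool first.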
I struggled with the problem of creating SQL tables on the fly with default SQL types. I ended up with the following handy functions for all my Python-type to SQL-type conversion needs. Going from SQL type to Python type is trivial, as explained in the next section.
import sqlalchemy
import numpy as np
import datetime
import decimal

_type_py2sql_dict = {
    int: sqlalchemy.sql.sqltypes.BigInteger,
    str: sqlalchemy.sql.sqltypes.Unicode,
    float: sqlalchemy.sql.sqltypes.Float,
    decimal.Decimal: sqlalchemy.sql.sqltypes.Numeric,
    datetime.datetime: sqlalchemy.sql.sqltypes.DateTime,
    bytes: sqlalchemy.sql.sqltypes.LargeBinary,
    bool: sqlalchemy.sql.sqltypes.Boolean,
    datetime.date: sqlalchemy.sql.sqltypes.Date,
    datetime.time: sqlalchemy.sql.sqltypes.Time,
    datetime.timedelta: sqlalchemy.sql.sqltypes.Interval,
    list: sqlalchemy.sql.sqltypes.ARRAY,
    dict: sqlalchemy.sql.sqltypes.JSON
}

def type_py2sql(pytype):
    '''Return the closest sql type for a given python type'''
    if pytype in _type_py2sql_dict:
        return _type_py2sql_dict[pytype]
    else:
        raise NotImplementedError(
            "You may add custom `sqltype` to `"+str(pytype)+"` assignment in `_type_py2sql_dict`.")

def type_np2py(dtype=None, arr=None):
    '''Return the closest python type for a given numpy dtype'''
    if ((dtype is None and arr is None) or
        (dtype is not None and arr is not None)):
        raise ValueError(
            "Provide either keyword argument `dtype` or `arr`: a numpy dtype or a numpy array.")
    if dtype is None:
        dtype = arr.dtype
    #1) Make a single-entry numpy array of the same dtype
    #2) force the array into a python 'object' dtype
    #3) the array entry should now be the closest python type
    single_entry = np.empty([1], dtype=dtype).astype(object)
    return type(single_entry[0])

def type_np2sql(dtype=None, arr=None):
    '''Return the closest sql type for a given numpy dtype'''
    return type_py2sql(type_np2py(dtype=dtype, arr=arr))
Some use cases:
>>> sqlalchemy.Column(type_py2sql(int))
Column(None, BigInteger(), table=None)
>>> type_py2sql(type('hello'))
sqlalchemy.sql.sqltypes.Unicode
>>> type_np2sql(arr=np.array([1.,2.,3.]))
sqlalchemy.sql.sqltypes.Float
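To illustrate the "tables on the fly" idea, here is a minimal sketch that derives a table definition from one sample record. It repeats a small subset of the mapping so that it runs on its own; the function table_from_record, the table name and the sample record are all hypothetical.

```python
import datetime

import sqlalchemy

# Small subset of the mapping above, repeated so this snippet runs on its own
py2sql = {
    int: sqlalchemy.BigInteger,
    str: sqlalchemy.Unicode,
    float: sqlalchemy.Float,
    datetime.date: sqlalchemy.Date,
}


def table_from_record(name, record, metadata):
    """Build a Table whose column types are inferred from a sample record."""
    columns = [
        sqlalchemy.Column(key, py2sql[type(value)])
        for key, value in record.items()
    ]
    return sqlalchemy.Table(name, metadata, *columns)


metadata = sqlalchemy.MetaData()
record = {
    'id': 1,
    'name': 'widget',
    'value': 2.5,
    'day': datetime.date(2020, 1, 31),
}
table = table_from_record('measurements', record, metadata)

# Map each column name to the name of the SQL type chosen for it
column_types = {c.name: type(c.type).__name__ for c in table.columns}
print(column_types)
```

The lookup uses type(value) exactly, so a value whose type is missing from the mapping raises KeyError rather than silently falling back to a parent type.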
What I did was to map all the sql-types to their equivalent python types. I then printed which python type corresponds to which sql-types and picked the best sql-type for each python type. Here is the code I used to generate this mapping:
#********** SQL to Python: one to one **********
type_sql2py_dict = {}
for key in sqlalchemy.types.__dict__['__all__']:
    sqltype = getattr(sqlalchemy.types, key)
    if 'python_type' in dir(sqltype) and not sqltype.__name__.startswith('Type'):
        try:
            typeinst = sqltype()
        except TypeError as e: #List/array wants inner-type
            typeinst = sqltype(None)
        try:
            type_sql2py_dict[sqltype] = typeinst.python_type
        except NotImplementedError:
            pass

#********** Python to SQL: one to many **********
type_py2sql_dict = {}
for key, val in type_sql2py_dict.items():
    if not val in type_py2sql_dict:
        type_py2sql_dict[val] = [key]
    else:
        type_py2sql_dict[val].append(key)
And here is the output of type_py2sql_dict under sqlalchemy version 1.3.5:
{int: [sqlalchemy.sql.sqltypes.INTEGER,
       sqlalchemy.sql.sqltypes.BIGINT,
       sqlalchemy.sql.sqltypes.SMALLINT,
       sqlalchemy.sql.sqltypes.Integer,
       sqlalchemy.sql.sqltypes.SmallInteger,
       sqlalchemy.sql.sqltypes.BigInteger],
 str: [sqlalchemy.sql.sqltypes.CHAR,
       sqlalchemy.sql.sqltypes.VARCHAR,
       sqlalchemy.sql.sqltypes.NCHAR,
       sqlalchemy.sql.sqltypes.NVARCHAR,
       sqlalchemy.sql.sqltypes.TEXT,
       sqlalchemy.sql.sqltypes.Text,
       sqlalchemy.sql.sqltypes.CLOB,
       sqlalchemy.sql.sqltypes.String,
       sqlalchemy.sql.sqltypes.Unicode,
       sqlalchemy.sql.sqltypes.UnicodeText,
       sqlalchemy.sql.sqltypes.Enum],
 float: [sqlalchemy.sql.sqltypes.FLOAT,
         sqlalchemy.sql.sqltypes.REAL,
         sqlalchemy.sql.sqltypes.Float],
 decimal.Decimal: [sqlalchemy.sql.sqltypes.NUMERIC,
                   sqlalchemy.sql.sqltypes.DECIMAL,
                   sqlalchemy.sql.sqltypes.Numeric],
 datetime.datetime: [sqlalchemy.sql.sqltypes.TIMESTAMP,
                     sqlalchemy.sql.sqltypes.DATETIME,
                     sqlalchemy.sql.sqltypes.DateTime],
 bytes: [sqlalchemy.sql.sqltypes.BLOB,
         sqlalchemy.sql.sqltypes.BINARY,
         sqlalchemy.sql.sqltypes.VARBINARY,
         sqlalchemy.sql.sqltypes.LargeBinary,
         sqlalchemy.sql.sqltypes.Binary],
 bool: [sqlalchemy.sql.sqltypes.BOOLEAN, sqlalchemy.sql.sqltypes.Boolean],
 datetime.date: [sqlalchemy.sql.sqltypes.DATE, sqlalchemy.sql.sqltypes.Date],
 datetime.time: [sqlalchemy.sql.sqltypes.TIME, sqlalchemy.sql.sqltypes.Time],
 datetime.timedelta: [sqlalchemy.sql.sqltypes.Interval],
 list: [sqlalchemy.sql.sqltypes.ARRAY],
 dict: [sqlalchemy.sql.sqltypes.JSON]}
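Since many SQL types collapse onto one Python type, python_type can also serve as a coarse consistency check when moving data between tables, which was the second goal in the question. The sketch below is only one possible approach: the two tables and the helper python_type_mismatches are hypothetical, and columns are matched purely by name.

```python
from sqlalchemy import Column, Float, Integer, MetaData, String, Table

# Hypothetical old and new tables; in practice these would be reflected
meta = MetaData()
old_table = Table(
    'old_data', meta,
    Column('id', Integer),
    Column('amount', String(32)),  # numbers stored as strings
)
new_table = Table(
    'new_data', meta,
    Column('id', Integer),
    Column('amount', Float),
)


def python_type_mismatches(source, target):
    """Return {column name: (source type, target type)} for differing columns."""
    mismatches = {}
    for col in source.columns:
        other = target.columns.get(col.name)
        if other is None:
            continue  # column does not exist in the target table
        src_type = col.type.python_type
        dst_type = other.type.python_type
        if src_type is not dst_type:
            mismatches[col.name] = (src_type, dst_type)
    return mismatches


mismatches = python_type_mismatches(old_table, new_table)
print(mismatches)
```

This compares only the Python-level types, so differences such as VARCHAR length or numeric precision go unnoticed; a stricter check would compare the SQL types themselves.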