Add migrate function:w

This commit is contained in:
Mathieu Comandon 2013-12-11 23:36:23 +01:00
parent 9995481fb1
commit a3c6a90553
2 changed files with 51 additions and 15 deletions

View file

@ -27,7 +27,14 @@ def get_schema(tablename):
query = "pragma table_info('%s')" % tablename
with sql.db_cursor(PGA_DB) as cursor:
for row in cursor.execute(query).fetchall():
tables.append(row)
field = {
'name': row[1],
'type': row[2],
'not_null': row[3],
'default': row[4],
'indexed': row[5]
}
tables.append(field)
return tables
@ -46,9 +53,10 @@ def add_field(tablename, field):
cursor.execute(query)
def create_table(name, field_description):
fields = ", ".join([field_to_string(**f) for f in field_description])
query = "CREATE TABLE IF NOT EXISTS %s (%s)" % (name, fields)
def create_table(name, schema):
fields = ", ".join([field_to_string(**f) for f in schema])
fields = "(%s)" % fields
query = "CREATE TABLE IF NOT EXISTS %s %s" % (name, fields)
LOGGER.debug("[PGAQuery] %s", query)
with sql.db_cursor(PGA_DB) as cursor:
cursor.execute(query)
@ -67,6 +75,17 @@ def create_games(cursor):
cursor.execute(create_game_table_query)
def migrate(table, schema):
existing_schema = get_schema(table)
if existing_schema:
columns = [col['name'] for col in existing_schema]
for field in schema:
if field['name'] not in columns:
add_field(table, field)
else:
create_table(table, schema)
def create_sources(cursor):
create_sources_table_query = """CREATE TABLE IF NOT EXISTS sources (
id INTEGER PRIMARY KEY,

View file

@ -87,6 +87,7 @@ class TestMigration(DatabaseTester):
def setUp(self):
super(TestMigration, self).setUp()
pga.create()
self.tablename = "basetable"
self.schema = [
{
'name': 'id',
@ -100,15 +101,15 @@ class TestMigration(DatabaseTester):
]
def create_table(self):
pga.create_table('basetable', self.schema)
pga.create_table(self.tablename, self.schema)
def test_get_schema(self):
self.create_table()
schema = pga.get_schema('basetable')
self.assertEqual(schema[0][1], 'id')
self.assertEqual(schema[0][2], 'INTEGER')
self.assertEqual(schema[1][1], 'name')
self.assertEqual(schema[1][2], 'TEXT')
schema = pga.get_schema(self.tablename)
self.assertEqual(schema[0]['name'], 'id')
self.assertEqual(schema[0]['type'], 'INTEGER')
self.assertEqual(schema[1]['name'], 'name')
self.assertEqual(schema[1]['type'], 'TEXT')
def test_add_field(self):
self.create_table()
@ -116,10 +117,10 @@ class TestMigration(DatabaseTester):
'name': 'counter',
'type': 'INTEGER'
}
pga.add_field('basetable', field)
schema = pga.get_schema('basetable')
self.assertEqual(schema[2][1], 'counter')
self.assertEqual(schema[2][2], 'INTEGER')
pga.add_field(self.tablename, field)
schema = pga.get_schema(self.tablename)
self.assertEqual(schema[2]['name'], 'counter')
self.assertEqual(schema[2]['type'], 'INTEGER')
def test_cant_add_existing_field(self):
self.create_table()
@ -128,4 +129,20 @@ class TestMigration(DatabaseTester):
'type': 'TEXT'
}
with self.assertRaises(OperationalError):
pga.add_field('basetable', field)
pga.add_field(self.tablename, field)
def test_cant_create_empty_table(self):
with self.assertRaises(OperationalError):
pga.create_table('emptytable', [])
def test_can_know_if_table_exists(self):
self.create_table()
self.assertTrue(pga.get_schema(self.tablename))
self.assertFalse(pga.get_schema('notatable'))
def test_can_migrate(self):
self.create_table()
self.schema += [{'name': 'new_field', 'type': 'TEXT'}]
pga.migrate(self.tablename, self.schema)
schema = pga.get_schema(self.tablename)
self.assertEqual(schema[2]['name'], 'new_field')