services - application services¶
- class DatabaseService[source]¶
Bases:
ContextableServicePostgresql database transport service.
Initializes a connection pool and can provide connections and basic execution commands. It also can perform initial database, functions and tables initialization.
- __init__(app, *, host: str, port: str, database: str, user: str, password: str, fallback_hosts: List[_FallbackHost] = None, root_user: str = '', root_password: str = '', root_database: str = 'postgres', metadata: MetaData = None, init_db: bool = True, init_tables: bool = True, pool_size: int = 10, idle_connection_lifetime: int = 3600, engine_settings: dict = None, extensions: List[str] = None, functions=SQL_FUNCTIONS, echo: bool = False, logger=None)[source]¶
Initialize.
- Parameters:
app – web app
host – db url or address
port – db port
database – db name
user – db user (non-root)
password – db user password (non-root)
fallback_hosts – list of fallback host, port configurations
root_user – root user is required only for database and extensions initialization
root_password – root user is required only for database and extensions initialization
root_database – root db is required only for database and extensions initialization
metadata – optional SA metadata object
init_db – perform database and pg extensions initialization upon start (requires root credentials)
init_tables – initialize tables upon start (in not present)
pool_size – connection pool size
idle_connection_lifetime – connection idle lifetime before recycling
engine_settings – additional engine settings kw map
extensions – list of pg extensions to init (init_db flag should be enabled)
functions – optional function registry for pre-defined functions
echo – echo SQL requests in console
logger – optional logger
- property engine¶
Get sqlalchemy async engine.
- begin(*args, **kws)[source]¶
Start an sqlalchemy transaction with auto-commit.
See sqlalchemy.AsyncEngine.begin
async with db.begin() as conn: ... # transaction block
- connect(*args, **kws)[source]¶
Start an sqlalchemy transaction with no auto-commit.
See sqlalchemy.AsyncEngine.connect
async with db.connect() as conn: ... # transaction block await conn.commit()
- async execute(_DatabaseService__obj, *args, _commit=True, _conn=None, **kws) Result[source]¶
Execute a single SQL command.
A wrapper around sqlalchemy.AsyncConnection.execute
- This method will return a sqlalchemy.Result object so you can use result.all() or result.first() etc.
to retrieve results (these methods are synchronous) depending on what you need. These methods return namedtuple rows. To convert a namedtuple to a old-style dictionary use row._asdict() method.
- async fetchrow(_DatabaseService__obj, *args, _commit=True, _conn=None, **kws) dict | None[source]¶
Execute an SQL command and fetch the first row.
A wrapper around sqlalchemy.AsyncConnection.executoe
Returns the first row as dict or None.
- async fetch(_DatabaseService__obj, *args, _commit=True, _conn=None, **kws) List[dict][source]¶
Execute an SQL command and fetch all the results.
A wrapper around sqlalchemy.AsyncConnection.executoe
Returns a list of rows as dicts.
- async fetchval(_DatabaseService__obj, *args, _commit=True, _conn=None, **kws) Any[source]¶
Execute an SQL command and fetch the first column of the first row.
A wrapper around sqlalchemy.AsyncConnection.execute
- class DatabaseMigrationService[source]¶
Bases:
ContextableServiceSimple migration tool.
This service does not resolve migration issues leaving them for the developer. You should create an appropriate SQL queries yourself before migration. The only thing what this service does is maintains the database state and performs listed migration instructions from a pre-configured list of migrations. The service uses db_info table to store its current state.
How to set up migrations:
Add DatabaseMigrationService to your configuration file.
Create DatabaseMigrationService.migrations_file which must be a json array of migration state objects.
When a migration is required you should write a new migration state into a json array, see
MigrationStatefor example. Note that id must be an incremental integer starting from 0.
Once the app has started, the migration service will automatically perform all the migrations.
- __init__(app, *, database_service: DatabaseService = None, locks_service: Locks = None, migrations_file='./etc/migrations.json', migrate_on_start: bool = False, logger=None)[source]¶
Initialize.
- Parameters:
app –
database_service –
locks_service –
migrations_file – default migrations file
migrate_on_start – perform an automatic migration from the current state at async init
logger –
- async migrate(from_: int = None, to_: int = None, migrations_file: str = None) int[source]¶
Migrate DB from one state to another.
- Parameters:
from – state to migrate from (by default the current state is used)
to – state to migrate to (by default the last state is used)
migrations_file – optional migrations file path
- Returns:
current state id
- class FixtureService[source]¶
Bases:
ContextableServiceFixture service.
It can load data from json files using SQLService interface.
How to set up automatic fixtures:
Add FixtureService to your app configuration.
Create your sql service.
- Create a .json file inside FixtureService.root_dir location with the name matching your sql service
public name, i.e. your_service.service_name.
- Write an array of json objects into the file with object attributes same as your service expects
in m_create method.
Upon the app start the fixture service will automatically create data from fixture files if there is no data currently in respective tables.
- __init__(app, root_dir: str = './fixtures/sql', fixtures: Collection[str] = None, empty_tables_only: bool = True, load_on_init: bool = True, logger=None)[source]¶
Initialize.
- Parameters:
app – web app
root_dir – fixtures base dir
fixtures – list of fixtures (service names) to load, None - load all
empty_tables_only – load only when the service table is empty
load_on_init – load all fixtures on service init (when starting the app)
logger – optional logger instance
- class SQLService[source]¶
Bases:
Service,DataStore,Generic[_Id,_Row],ABCBase SQL service interface with common commands and errors.
Optimized for a single primary key only with a name “id”
Example of use:
You need to set your own table and, if needed, define column whitelists for different operations.
class MyService(SQLService): table = my_table select_columns = {'id', 'data', 'flag'} insert_columns = {'data', 'flag'} update_columns = {'flag'}
Here you can access some basic methods: “exists”, “get”, “create”, “update”, “delete” and their bulk versions (for multiple objects at once).
You can change any of SQL base for these commands by redefining query constructors.
class MyService(SQLService): ... def _create_get_query(id, columns, table=None): ... # custom query def _create_m_get_query(self, id: list, columns, table=None): ... # custom query
- Parameters:
app – web app instance
database_service – database service instance or instance name
logger – optional logger instance
- DEFAULT_ROW_LIMIT = 24¶
defaults row limit on list / iter queries
- MAX_ROW_LIMIT = 1000¶
max page size in list / iter queries
- __init__(app, database_service: DatabaseService = None, logger=None)[source]¶
Initialize.
- Parameters:
app – web app
database_service – database transport service
logger – optional logger instance
- table = None¶
here should be your table
- select_columns = None¶
you can specify a whitelist of output columns here
- insert_columns = None¶
you may specify insert columns here
- update_columns = None¶
you may specify columns for update here
- static get_condition_hook(sql)[source]¶
Set up specific get conditions.
You can use this hook to impose permission check based on current user session, add some conditions to filter out data for public (rpc) output etc.
Example:
def get_condition_hook(self, sql): sql = sql.where(self.table.c.enabled.is_(True)) return sql
- static insert_condition_hook(sql)[source]¶
Set up specific insert conditions.
Similar to
get_condition_hook().
- static update_condition_hook(sql)[source]¶
Set up specific update conditions.
Similar to
get_condition_hook().
- static delete_condition_hook(sql)[source]¶
Set up specific delete conditions.
Similar to
get_condition_hook().
- async exists(id: _Id, _connection=None) bool[source]¶
Return True if object exists. False otherwise.
- Parameters:
id – object id
If you have composite primary keys you should pass them as tuples:
await service.exists((1, 'abc'), ...)
- async m_exists(id: Collection[_Id], _connection=None) frozenset[_Id][source]¶
Return a set of existing IDs for a list of IDs.
- Parameters:
id – list of object ids
- Returns:
a set of existing ids
Note
Be aware that there is database limit on max number of ids per query (65535 in postgres).
If you have composite primary keys you should pass them as tuples:
await service.m_exists([(1, 'abc'), ...])
- async get(id: _Id, columns: Collection[str] | Literal['*'] | None = '*', _connection=None) _Row[source]¶
Return information about an object.
- Parameters:
id – object id
columns – list of columns to return, None for no return, * for all
- Raises:
NotFound –
If you have composite primary keys you should pass them as tuples:
await service.get((1, 'abc'), ...)
- async m_get(id: Collection[_Id] = None, conditions: dict | List[dict] | None = None, columns: Collection[str] | Literal['*'] | None = '*', _connection=None) List[_Row][source]¶
Return multiple objects.
- Parameters:
id – object id
conditions – filtering conditions.
columns – list of columns to return, None for no return, * for all
- Returns:
a list of objects found
Note
Be aware that there is a database limit on max number of ids per query (65535 in postgres).
Objects that don’t exist will be skipped. Returns all data at once without pagination. Use SQLService.list if you want pagination or sorting.
If you have composite primary keys you should pass them as tuples
await service.m_get([(1, 'abc'), ...], ...)
- async delete(id: _Id, columns: Collection[str] | Literal['*'] | None = None, _connection=None) _Row | None[source]¶
Remove a single object from a table.
- Parameters:
id – object id
columns – list of columns to return, None for no return, * for all
- Returns:
a deleted object columns if requested
- Raises:
NotFound – if object doesn’t exist or already was deleted
If you have composite primary keys you should pass them as tuples:
await service.delete((1, 'abc'), ...)
- async m_delete(id: Collection[_Id] = None, conditions: dict | List[dict] | None = None, columns: Collection[str] | Literal['*'] | None = None, _connection=None) List[_Row] | None[source]¶
Remove multiple objects from a table. Non-existing objects will be skipped.
- Parameters:
id – object ids
conditions – filtering conditions.
columns – list of columns to return, None for no return, * for all
- Returns:
a list of deleted objects if requested
If you have composite primary keys you should pass them as tuples:
await service.m_delete([(1, 'abc'), ...])
You can also use conditions without primary keys if you provide an empty key list:
await service.m_delete([], conditions={'enabled': False})
- static prepare_insert_data(data: dict) _Row[source]¶
Prepare data before insert.
- Parameters:
data – raw data
- async create(data: dict, columns: Collection[str] | Literal['*'] | None = '*', _connection=None, on_conflict=None, on_conflict_keys=None, on_conflict_values=None) _Row | None[source]¶
Create a single object.
- Parameters:
data – objects data
columns – columns to return, None for no return, * for all
on_conflict – on conflict clause if required (‘do_nothing’, ‘do_update’)
on_conflict_keys – list of on conflict constraints
on_conflict_values – an object with on conflict values, used only by do_update clause
_connection – optional connection object (when using inside a transactional block)
- Returns:
inserted object if requested
Customize
prepare_insert_data()to normalize and prepare data before insert if you want more control over inserted data.
- async m_create(data: Collection[dict], columns: Collection[str] | Literal['*'] | None = '*', _connection=None, on_conflict: str = None, on_conflict_keys: list = None, on_conflict_values: dict = None) List[_Row] | None[source]¶
Create multiple objects.
- Parameters:
data – list of objects data
columns – columns to return, None for no return, * for all
on_conflict – on conflict clause if required (‘do_nothing’, ‘do_update’)
on_conflict_keys – list of on conflict constraints
on_conflict_values – an object with on conflict values, used only by do_update clause
_connection – optional connection object (when using inside a transactional block)
- Returns:
inserted objects if requested
Customize
prepare_insert_data()to normalize and prepare data before insert if you want more control over inserted data.
- static prepare_update_data(data: dict)[source]¶
Prepare data before update.
- Parameters:
data – raw data
- async update(id: _Id, data: dict, columns: Collection[str] | Literal['*'] | None = '*', _connection=None) _Row | None[source]¶
Update a single object. Raises error if object doesn’t exist.
- Parameters:
id – object id
data – data to update
columns – columns to return, None for no return, * for all
_connection – optional connection object (when using inside a transactional block)
- Returns:
inserted object if requested
- Raises:
NotFound –
Customize
prepare_update_data()to normalize and prepare data before update if you want more control over updated data.Use tuples for composite keys:
await service.update((1, 'abc'), ...)
- async m_update(id: Collection[_Id], data: dict, conditions: dict | List[dict] | None = None, columns: Collection[str] | Literal['*'] | None = '*', _connection=None) List[_Row] | None[source]¶
Update multiple objects with the same data. Non-existing objects will be skipped.
- Parameters:
id – list of ids
data – update data
columns – columns to return, None for no return, * for all
conditions – update conditions
_connection – optional connection object (when using inside a transactional block)
- Returns:
inserted objects if requested
Customize
prepare_update_data()to normalize and prepare data before update if you want more control over updated data.Use tuples for composite keys:
await service.m_update([(1, 'abc'), ...], ...)
You can also use conditions without primary keys if you provide an empty key list:
await service.m_update([], data={'value': 42}, conditions={'enabled': True})
- async after_update_hook(*rows: List[_Row]) None[source]¶
Execute after update service-specific logic.
The method is called after each update or m_update query with the rows which has been updated. It is executed only if rows have been returned by the update method. The rows would contain only the columns specified by columns arg of the query.
One can use this method to implement cache or event/notification system for SQL services.
- async after_create_hook(*rows: List[_Row]) None[source]¶
Execute after create service-specific logic.
The method is called after each create or m_create query with the rows which has been updated. It is executed only if rows have been returned by the create method. The rows would contain only the columns specified by columns arg of the query.
One can use this method to implement cache or event/notification system for SQL services.
- async list(conditions: dict | List[dict] | None = None, sort: List[_SortDesc | _SortAsc | str] | None = None, offset: int = 0, limit: int = DEFAULT_ROW_LIMIT, count: bool = True, columns: Collection[str] | Literal['*'] | None = '*', _connection=None) _List[source]¶
List rows with pagination and conditions.
- Parameters:
conditions – optional query conditions
sort – optional row ordering
offset – optional row offset
limit – optional row limit
count – calculate page count
columns – columns to return, None for nothing (count only), * for all columns
_connection – optional connection object (when using inside a transactional block)
- Returns:
This method may return different data depending on the provided params
Condition example:
service.list( conditions={ 'tag': ['tag_1', 'tag_2', 'tag_3'], # IN condition 'active': True, # EQ condition 'value': {'gt': 41, 'le': 42'}, # num conditions, 'text': {'like': 'sht'}, # text field "likeness" } )
Available numeric conditions: gt, lt, ge, le, eq Available other conditions: like
Sort example:
service.list( sort=['tag', {'desc': 'timestamp'}] # order matters )
Available sorting conditions: desc, asc (default)
You can use this method for counting without returning any results. Just set the limit to zero. Optionally, you can also set the counting precision.
service.list( conditions={ ... }, precision=50 limit=0 )
Contrary, if you don’t need counting, you can disable it. No count / page data will be available then.
service.list( conditions={ ... }, count=False )
Precision uses a number of table samples to estimate the count. If the precision is set to 0 or None, then the exact count will be performed.
Attention
Precision is not working at the moment (don’t know why).
If count argument is False, then count, page and pages result values will be None.
If columns is None, then data will be None and on_page will be zero.
{ count: Optional[int] #: total rows matching the query, None if count hasn't been requested offset: int #: row offset for this selection page: Optional[int] #: current page number, None if count hasn't been requested pages: Optional[int] #: total pages, None if count hasn't been requested on_page: int #: number of rows on this page data: Optional[List[dict]] #: returned rows, None if limit was set to 0 }
- async iter(conditions: dict | List[dict] | None = None, sort: List[_SortDesc | _SortAsc | str] | None = None, offset: int = 0, limit: int = DEFAULT_ROW_LIMIT, columns: Collection[str] | Literal['*'] | None = '*') AsyncGenerator[List[_Row], None][source]¶
Iterate over listed data.
- Parameters:
conditions – optional query conditions
sort – optional row ordering
offset – optional row offset
limit – optional row limit
columns – columns to return, * for all columns
- Returns:
This method may return different data depending on the provided params
Same as
list()except it provides an async generator over all selected data. At each iteration a batch with max size of limit is returned. You can use this method to asynchronously iterate over selected table data in chunks.async for batch in sql_service.iter(conditions={'enabled': True}, sort=['created'], limit=100): for row in batch: ... # batches of max 100 rows will be returned # the generator will exit after when last batch of data has been returned
- service_name = None¶
you may define a custom service name here
- class PermHook[source]¶
Bases:
PublicInterface,ABCSql service with a pre-configured user permissions hook.
it uses ‘user_id’ column by default to check if objects can be edited or viewed by users. You should modify _set_user_condition() if you intend to use different columns.
- class PermissionKeys¶
Bases:
objectPermission scopes.
- get_request_context() RequestContext | None¶
Get current user request context.
- get_session()¶
Get current user session.
- get_user_id()¶
Return current session user id.
- has_permission(permission: str) bool¶
Check if a user session has a particular permission.
- system_user() bool¶
Check if user session has the system scope.
- class Permission[source]¶
Bases:
EnumUser permissions.
These keys will be joined with <service_name> to create unique permission keys for each service.
- property permission_modify: str¶
Get a modify permission key.
- property permission_view: str¶
Get a view permission key.