services - application services

class DatabaseService[source]

Bases: ContextableService

Postgresql database transport service.

Initializes a connection pool and provides connections and basic execution commands. It can also initialize the database, functions and tables at start.

__init__(app, *, host: str, port: str, database: str, user: str, password: str, fallback_hosts: List[_FallbackHost] = None, root_user: str = '', root_password: str = '', root_database: str = 'postgres', metadata: MetaData = None, init_db: bool = True, init_tables: bool = True, pool_size: int = 10, idle_connection_lifetime: int = 3600, engine_settings: dict = None, extensions: List[str] = None, functions=SQL_FUNCTIONS, echo: bool = False, logger=None)[source]

Initialize.

Parameters:
  • app – web app

  • host – db url or address

  • port – db port

  • database – db name

  • user – db user (non-root)

  • password – db user password (non-root)

  • fallback_hosts – list of fallback host, port configurations

  • root_user – root user is required only for database and extensions initialization

  • root_password – root user is required only for database and extensions initialization

  • root_database – root db is required only for database and extensions initialization

  • metadata – optional SA metadata object

  • init_db – perform database and pg extensions initialization upon start (requires root credentials)

  • init_tables – initialize tables upon start (if not present)

  • pool_size – connection pool size

  • idle_connection_lifetime – connection idle lifetime before recycling

  • engine_settings – additional engine settings kw map

  • extensions – list of pg extensions to init (init_db flag should be enabled)

  • functions – optional function registry for pre-defined functions

  • echo – echo SQL requests in console

  • logger – optional logger
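The parameter notes above imply two dependencies between flags: init_db needs root credentials, and extensions only take effect when init_db is enabled. A minimal sketch of a pre-flight check over a settings dictionary follows. The helper name check_db_settings and the sample values are hypothetical and not part of the service; the service itself may validate its settings differently or not at all.

```python
from typing import Any, Dict, List


def check_db_settings(settings: Dict[str, Any]) -> List[str]:
    """Return warnings for settings that contradict the parameter notes.

    Hypothetical helper, not part of DatabaseService.
    """
    warnings = []
    init_db = settings.get('init_db', True)  # documented default is True
    has_root = bool(settings.get('root_user') and settings.get('root_password'))
    if init_db and not has_root:
        warnings.append('init_db is enabled but root credentials are missing')
    if settings.get('extensions') and not init_db:
        warnings.append('extensions are listed but init_db is disabled')
    return warnings


# Sample keyword arguments (all values are placeholders).
settings = {
    'host': 'localhost', 'port': '5432', 'database': 'app',
    'user': 'app_user', 'password': 'secret',
    'init_db': False, 'extensions': ['uuid-ossp'],
}
problems = check_db_settings(settings)
```

An empty list means the settings look consistent with the descriptions; anything else is worth checking before the app starts.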

property engine

Get sqlalchemy async engine.

See sqlalchemy.AsyncEngine

begin(*args, **kws)[source]

Start an sqlalchemy transaction with auto-commit.

See sqlalchemy.AsyncEngine.begin

async with db.begin() as conn:
    ...  # transaction block

connect(*args, **kws)[source]

Start an sqlalchemy transaction with no auto-commit.

See sqlalchemy.AsyncEngine.connect

async with db.connect() as conn:
    ...  # transaction block
    await conn.commit()

async execute(_DatabaseService__obj, *args, _commit=True, _conn=None, **kws) Result[source]

Execute a single SQL command.

A wrapper around sqlalchemy.AsyncConnection.execute

This method returns a sqlalchemy.Result object, so you can use result.all(), result.first() etc. to retrieve results depending on what you need (these methods are synchronous). These methods return namedtuple rows. To convert a namedtuple to an old-style dictionary use the row._asdict() method.
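The row handling described above can be tried without a database. The sketch below uses a plain collections.namedtuple as a stand-in for a returned row (the Row type and its fields are made up for illustration), and shows attribute access, positional access and conversion with _asdict().

```python
from collections import namedtuple

# Stand-in for a row returned by result.first(); real rows come from sqlalchemy.
Row = namedtuple('Row', ['id', 'name'])
row = Row(id=1, name='alice')

as_dict = row._asdict()      # mapping of field names to values
name_by_attr = row.name      # attribute access
name_by_index = row[1]       # positional access
```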

async fetchrow(_DatabaseService__obj, *args, _commit=True, _conn=None, **kws) dict | None[source]

Execute an SQL command and fetch the first row.

A wrapper around sqlalchemy.AsyncConnection.execute

Returns the first row as dict or None.

async fetch(_DatabaseService__obj, *args, _commit=True, _conn=None, **kws) List[dict][source]

Execute an SQL command and fetch all the results.

A wrapper around sqlalchemy.AsyncConnection.execute

Returns a list of rows as dicts.

async fetchval(_DatabaseService__obj, *args, _commit=True, _conn=None, **kws) Any[source]

Execute an SQL command and fetch the first column of the first row.

A wrapper around sqlalchemy.AsyncConnection.execute

add_table(table: Table) Table[source]

Register a table in the database service metadata.

This method must be called before async init if you want to automatically create this table at start time. SQL services do this automatically for their tables.

class DatabaseMigrationService[source]

Bases: ContextableService

Simple migration tool.

This service does not resolve migration issues; they are left to the developer. You should write the appropriate SQL queries yourself before migrating. The service only maintains the database state and performs the migration instructions from a pre-configured list of migrations. It uses the db_info table to store its current state.

How to set up migrations:

  1. Add DatabaseMigrationService to your configuration file.

  2. Create the file referenced by DatabaseMigrationService.migrations_file. It must contain a json array of migration state objects.

  3. When a migration is required, append a new migration state to the json array, see MigrationState for an example. Note that id must be an incremental integer starting from 0.

Once the app has started, the migration service will automatically perform all the migrations.
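The setup steps above can be sketched as a small script that writes a migrations file and checks the id rule. This is only an illustration: the commands field is a hypothetical name (see MigrationState for the real attributes), check_migration_ids is not part of the service, and the check assumes ids are consecutive and appear in file order.

```python
import json
import tempfile
from pathlib import Path
from typing import List


def check_migration_ids(states: List[dict]) -> None:
    """Raise ValueError unless ids are 0, 1, 2, ... in file order."""
    for expected, state in enumerate(states):
        if state.get('id') != expected:
            raise ValueError(f'expected id {expected}, got {state.get("id")!r}')


# 'commands' is a hypothetical field name used only for illustration.
states = [
    {'id': 0, 'commands': ['CREATE TABLE example (id INT PRIMARY KEY);']},
    {'id': 1, 'commands': ['ALTER TABLE example ADD COLUMN flag BOOLEAN;']},
]

with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / 'migrations.json'
    path.write_text(json.dumps(states, indent=2))   # a json array of objects
    loaded = json.loads(path.read_text())

check_migration_ids(loaded)  # passes silently for a valid file
```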

__init__(app, *, database_service: DatabaseService = None, locks_service: Locks = None, migrations_file='./etc/migrations.json', migrate_on_start: bool = False, logger=None)[source]

Initialize.

Parameters:
  • app

  • database_service

  • locks_service

  • migrations_file – default migrations file

  • migrate_on_start – perform an automatic migration from the current state at async init

  • logger

async migrate(from_: int = None, to_: int = None, migrations_file: str = None) int[source]

Migrate DB from one state to another.

Parameters:
  • from_ – state to migrate from (by default the current state is used)

  • to_ – state to migrate to (by default the last state is used)

  • migrations_file – optional migrations file path

Returns:

current state id

async get_state() int[source]

Get current (supposed) database state.

class FixtureService[source]

Bases: ContextableService

Fixture service.

It can load data from json files using SQLService interface.

How to set up automatic fixtures:

  1. Add FixtureService to your app configuration.

  2. Create your sql service.

  3. Create a .json file inside FixtureService.root_dir location with the name matching your sql service public name, i.e. your_service.service_name.

  4. Write an array of json objects into the file with object attributes same as your service expects in m_create method.

Upon the app start the fixture service will automatically create data from fixture files if there is no data currently in respective tables.
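A minimal sketch of steps 3 and 4, assuming a service whose public name is my_service and whose m_create accepts data and flag attributes (both are hypothetical). It only writes and re-reads the fixture file in a temporary directory; loading it into a table is left to the fixture service.

```python
import json
import tempfile
from pathlib import Path

service_name = 'my_service'   # hypothetical service public name
fixture_rows = [
    {'data': 'first', 'flag': True},
    {'data': 'second', 'flag': False},
]

with tempfile.TemporaryDirectory() as tmp:
    root_dir = Path(tmp) / 'fixtures' / 'sql'   # mirrors the default root_dir
    root_dir.mkdir(parents=True)
    fixture_path = root_dir / f'{service_name}.json'
    fixture_path.write_text(json.dumps(fixture_rows, indent=2))

    # What a loader would read back: a json array of objects.
    loaded = json.loads(fixture_path.read_text())
    fixture_name = fixture_path.name
```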

__init__(app, root_dir: str = './fixtures/sql', fixtures: Collection[str] = None, empty_tables_only: bool = True, load_on_init: bool = True, logger=None)[source]

Initialize.

Parameters:
  • app – web app

  • root_dir – fixtures base dir

  • fixtures – list of fixtures (service names) to load, None - load all

  • empty_tables_only – load only when the service table is empty

  • load_on_init – load all fixtures on service init (when starting the app)

  • logger – optional logger instance

async load_all() None[source]

Load all fixtures from the root dir.

async load_fixture(path: Path) None[source]

Load a particular fixture.

Parameters:

path – fixture path, must contain a json array of objects

class SQLService[source]

Bases: Service, DataStore, Generic[_Id, _Row], ABC

Base SQL service interface with common commands and errors.

Optimized for tables with a single primary key named “id”.

Example of use:

You need to set your own table and, if needed, define column whitelists for different operations.

class MyService(SQLService):
    table = my_table
    select_columns = {'id', 'data', 'flag'}
    insert_columns = {'data', 'flag'}
    update_columns = {'flag'}

Here you can access some basic methods: “exists”, “get”, “create”, “update”, “delete” and their bulk versions (for multiple objects at once).

You can change any of SQL base for these commands by redefining query constructors.

class MyService(SQLService):

    ...

    def _create_get_query(self, id, columns, table=None):
        ... # custom query

    def _create_m_get_query(self, id: list, columns, table=None):
        ... # custom query

Parameters:
  • app – web app instance

  • database_service – database service instance or instance name

  • logger – optional logger instance

DEFAULT_ROW_LIMIT = 24

default row limit on list / iter queries

MAX_ROW_LIMIT = 1000

max page size in list / iter queries

__init__(app, database_service: DatabaseService = None, logger=None)[source]

Initialize.

Parameters:
  • app – web app

  • database_service – database transport service

  • logger – optional logger instance

table = None

set your table here

select_columns = None

you can specify a whitelist of output columns here

insert_columns = None

you may specify insert columns here

update_columns = None

you may specify columns for update here

static get_condition_hook(sql)[source]

Set up specific get conditions.

You can use this hook to impose permission checks based on the current user session, add conditions to filter out data for public (rpc) output etc.

Example:

def get_condition_hook(self, sql):
    sql = sql.where(self.table.c.enabled.is_(True))
    return sql

static insert_condition_hook(sql)[source]

Set up specific insert conditions.

Similar to get_condition_hook().

static update_condition_hook(sql)[source]

Set up specific update conditions.

Similar to get_condition_hook().

static delete_condition_hook(sql)[source]

Set up specific delete conditions.

Similar to get_condition_hook().

async exists(id: _Id, _connection=None) bool[source]

Return True if the object exists, False otherwise.

Parameters:

id – object id

If you have composite primary keys you should pass them as tuples:

await service.exists((1, 'abc'), ...)

async m_exists(id: Collection[_Id], _connection=None) frozenset[_Id][source]

Return a set of existing IDs for a list of IDs.

Parameters:

id – list of object ids

Returns:

a set of existing ids

Note

Be aware that there is a database limit on max number of ids per query (65535 in postgres).
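One way to stay under the limit noted above is to split a large id list into batches before querying. The helper below is a sketch and not part of the service; chunk_ids and MAX_IDS_PER_QUERY are hypothetical names.

```python
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar('T')
MAX_IDS_PER_QUERY = 65535  # limit quoted in the note above


def chunk_ids(ids: Sequence[T], size: int = MAX_IDS_PER_QUERY) -> Iterator[List[T]]:
    """Yield consecutive slices of ids, each at most size items long."""
    if size < 1:
        raise ValueError('size must be positive')
    for start in range(0, len(ids), size):
        yield list(ids[start:start + size])


batches = list(chunk_ids(list(range(150000))))
batch_sizes = [len(batch) for batch in batches]
```

Each batch could then be passed to m_exists separately and the returned sets merged. With composite keys a single id may take up more than one query parameter, so a smaller batch size may be needed.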

If you have composite primary keys you should pass them as tuples:

await service.m_exists([(1, 'abc'), ...])

async get(id: _Id, columns: Collection[str] | Literal['*'] | None = '*', _connection=None) _Row[source]

Return information about an object.

Parameters:
  • id – object id

  • columns – list of columns to return, None for no return, * for all

Raises:

NotFound

If you have composite primary keys you should pass them as tuples:

await service.get((1, 'abc'), ...)

async m_get(id: Collection[_Id] = None, conditions: dict | List[dict] | None = None, columns: Collection[str] | Literal['*'] | None = '*', _connection=None) List[_Row][source]

Return multiple objects.

Parameters:
  • id – list of object ids

  • conditions – filtering conditions.

  • columns – list of columns to return, None for no return, * for all

Returns:

a list of objects found

Note

Be aware that there is a database limit on max number of ids per query (65535 in postgres).

Objects that don’t exist will be skipped. Returns all data at once without pagination. Use SQLService.list if you want pagination or sorting.

If you have composite primary keys you should pass them as tuples:

await service.m_get([(1, 'abc'), ...], ...)

async delete(id: _Id, columns: Collection[str] | Literal['*'] | None = None, _connection=None) _Row | None[source]

Remove a single object from a table.

Parameters:
  • id – object id

  • columns – list of columns to return, None for no return, * for all

Returns:

the deleted object's columns, if requested

Raises:

NotFound – if the object doesn’t exist or was already deleted

If you have composite primary keys you should pass them as tuples:

await service.delete((1, 'abc'), ...)

async m_delete(id: Collection[_Id] = None, conditions: dict | List[dict] | None = None, columns: Collection[str] | Literal['*'] | None = None, _connection=None) List[_Row] | None[source]

Remove multiple objects from a table. Non-existing objects will be skipped.

Parameters:
  • id – object ids

  • conditions – filtering conditions.

  • columns – list of columns to return, None for no return, * for all

Returns:

a list of deleted objects if requested

If you have composite primary keys you should pass them as tuples:

await service.m_delete([(1, 'abc'), ...])

You can also use conditions without primary keys if you provide an empty key list:

await service.m_delete([], conditions={'enabled': False})

static prepare_insert_data(data: dict) _Row[source]

Prepare data before insert.

Parameters:

data – raw data

async create(data: dict, columns: Collection[str] | Literal['*'] | None = '*', _connection=None, on_conflict=None, on_conflict_keys=None, on_conflict_values=None) _Row | None[source]

Create a single object.

Parameters:
  • data – object data

  • columns – columns to return, None for no return, * for all

  • on_conflict – on conflict clause if required (‘do_nothing’, ‘do_update’)

  • on_conflict_keys – list of on conflict constraints

  • on_conflict_values – an object with on conflict values, used only by do_update clause

  • _connection – optional connection object (when using inside a transactional block)

Returns:

inserted object if requested

Customize prepare_insert_data() to normalize and prepare data before insert if you want more control over inserted data.
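A sketch of what such a customization might look like. The class below is a stand-in that does not inherit from SQLService, and the normalization rules (stripping a data string, defaulting flag to False) are hypothetical examples rather than library behaviour.

```python
from typing import Any, Dict


class InsertPreparationSketch:
    """Stand-in class; a real service would subclass SQLService instead."""

    @staticmethod
    def prepare_insert_data(data: Dict[str, Any]) -> Dict[str, Any]:
        prepared = dict(data)                  # copy, so the caller's dict is untouched
        if isinstance(prepared.get('data'), str):
            prepared['data'] = prepared['data'].strip()
        prepared.setdefault('flag', False)     # hypothetical default value
        return prepared


raw = {'data': '  hello  '}
prepared = InsertPreparationSketch.prepare_insert_data(raw)
```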

async m_create(data: Collection[dict], columns: Collection[str] | Literal['*'] | None = '*', _connection=None, on_conflict: str = None, on_conflict_keys: list = None, on_conflict_values: dict = None) List[_Row] | None[source]

Create multiple objects.

Parameters:
  • data – list of objects data

  • columns – columns to return, None for no return, * for all

  • on_conflict – on conflict clause if required (‘do_nothing’, ‘do_update’)

  • on_conflict_keys – list of on conflict constraints

  • on_conflict_values – an object with on conflict values, used only by do_update clause

  • _connection – optional connection object (when using inside a transactional block)

Returns:

inserted objects if requested

Customize prepare_insert_data() to normalize and prepare data before insert if you want more control over inserted data.

static prepare_update_data(data: dict)[source]

Prepare data before update.

Parameters:

data – raw data

async update(id: _Id, data: dict, columns: Collection[str] | Literal['*'] | None = '*', _connection=None) _Row | None[source]

Update a single object. Raises error if object doesn’t exist.

Parameters:
  • id – object id

  • data – data to update

  • columns – columns to return, None for no return, * for all

  • _connection – optional connection object (when using inside a transactional block)

Returns:

updated object if requested

Raises:

NotFound

Customize prepare_update_data() to normalize and prepare data before update if you want more control over updated data.

Use tuples for composite keys:

await service.update((1, 'abc'), ...)

async m_update(id: Collection[_Id], data: dict, conditions: dict | List[dict] | None = None, columns: Collection[str] | Literal['*'] | None = '*', _connection=None) List[_Row] | None[source]

Update multiple objects with the same data. Non-existing objects will be skipped.

Parameters:
  • id – list of ids

  • data – update data

  • columns – columns to return, None for no return, * for all

  • conditions – update conditions

  • _connection – optional connection object (when using inside a transactional block)

Returns:

updated objects if requested

Customize prepare_update_data() to normalize and prepare data before update if you want more control over updated data.

Use tuples for composite keys:

await service.m_update([(1, 'abc'), ...], ...)

You can also use conditions without primary keys if you provide an empty key list:

await service.m_update([], data={'value': 42}, conditions={'enabled': True})

async after_update_hook(*rows: List[_Row]) None[source]

Execute after update service-specific logic.

The method is called after each update or m_update query with the rows which have been updated. It is executed only if rows have been returned by the update method. The rows contain only the columns specified by the columns argument of the query.

One can use this method to implement cache or event/notification system for SQL services.
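A minimal sketch of the cache idea, using a stand-in class with an in-memory dictionary instead of a real SQL service. The class name and the cache layout are hypothetical; only the hook signature follows the one documented here.

```python
import asyncio
from typing import Any, Dict


class CachedServiceSketch:
    """Stand-in for an SQL service; only the hook logic is shown."""

    def __init__(self) -> None:
        self.cache: Dict[Any, dict] = {}

    async def after_update_hook(self, *rows: dict) -> None:
        for row in rows:
            # Rows hold only the requested columns, so 'id' may be absent.
            if 'id' in row:
                self.cache.setdefault(row['id'], {}).update(row)


async def main() -> Dict[Any, dict]:
    service = CachedServiceSketch()
    service.cache[1] = {'id': 1, 'flag': False}
    # The second row has no 'id' column and is therefore ignored.
    await service.after_update_hook({'id': 1, 'flag': True}, {'flag': True})
    return service.cache


cache = asyncio.run(main())
```

Because the hook only receives the returned columns, a cache like this needs the primary key to be included in the columns argument of the update call.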

async after_create_hook(*rows: List[_Row]) None[source]

Execute after create service-specific logic.

The method is called after each create or m_create query with the rows which have been created. It is executed only if rows have been returned by the create method. The rows contain only the columns specified by the columns argument of the query.

One can use this method to implement cache or event/notification system for SQL services.

async list(conditions: dict | List[dict] | None = None, sort: List[_SortDesc | _SortAsc | str] | None = None, offset: int = 0, limit: int = DEFAULT_ROW_LIMIT, count: bool = True, columns: Collection[str] | Literal['*'] | None = '*', _connection=None) _List[source]

List rows with pagination and conditions.

Parameters:
  • conditions – optional query conditions

  • sort – optional row ordering

  • offset – optional row offset

  • limit – optional row limit

  • count – calculate page count

  • columns – columns to return, None for nothing (count only), * for all columns

  • _connection – optional connection object (when using inside a transactional block)

Returns:

This method may return different data depending on the provided params

Condition example:

service.list(
    conditions={
        'tag': ['tag_1', 'tag_2', 'tag_3'],  # IN condition
        'active': True,                      # EQ condition
        'value': {'gt': 41, 'le': 42},       # num conditions
        'text': {'like': 'sht'},             # text field "likeness"
    }
)

Available numeric conditions: gt, lt, ge, le, eq

Available other conditions: like
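One way to read the condition format is as a predicate over a row. The pure-Python sketch below mirrors the example: lists act as IN, scalars as equality, and dictionaries as operator conditions. It is an illustration only and not how the service builds SQL. It assumes that all conditions are combined with AND and that like is a plain substring match; the real behaviour of both may differ.

```python
import operator
from typing import Any, Dict

_OPS = {'gt': operator.gt, 'lt': operator.lt, 'ge': operator.ge,
        'le': operator.le, 'eq': operator.eq}


def matches(row: Dict[str, Any], conditions: Dict[str, Any]) -> bool:
    """Return True if the row satisfies every condition (AND is assumed)."""
    for column, cond in conditions.items():
        value = row.get(column)
        if isinstance(cond, list):              # IN condition
            if value not in cond:
                return False
        elif isinstance(cond, dict):            # operator conditions
            for op, arg in cond.items():
                if op == 'like':
                    # Assumption: substring match, not SQL LIKE patterns.
                    if arg not in str(value):
                        return False
                elif not _OPS[op](value, arg):
                    return False
        elif value != cond:                     # EQ condition
            return False
    return True


rows = [
    {'tag': 'tag_1', 'active': True, 'value': 42, 'text': 'shtetl'},
    {'tag': 'tag_9', 'active': True, 'value': 42, 'text': 'shtetl'},
    {'tag': 'tag_2', 'active': True, 'value': 41, 'text': 'shtetl'},
]
conditions = {
    'tag': ['tag_1', 'tag_2', 'tag_3'],
    'active': True,
    'value': {'gt': 41, 'le': 42},
    'text': {'like': 'sht'},
}
selected = [row for row in rows if matches(row, conditions)]
```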

Sort example:

service.list(
    sort=['tag', {'desc': 'timestamp'}]     # order matters
)

Available sorting conditions: desc, asc (default)
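The effect of the sort list can be reproduced on plain dictionaries to see why the order of keys matters. This sketch is independent of the service; sort_rows is a hypothetical helper that treats a plain string as ascending and a {'desc': column} or {'asc': column} mapping as an explicit direction.

```python
from typing import Any, Dict, List, Union

SortKey = Union[str, Dict[str, str]]


def sort_rows(rows: List[Dict[str, Any]], sort: List[SortKey]) -> List[Dict[str, Any]]:
    result = list(rows)
    # Apply keys from last to first: Python's sort is stable, so the
    # first key in the list ends up as the primary ordering.
    for key in reversed(sort):
        if isinstance(key, str):
            column, descending = key, False      # plain string means asc
        else:
            (direction, column), = key.items()   # e.g. {'desc': 'timestamp'}
            descending = direction == 'desc'
        result.sort(key=lambda row: row[column], reverse=descending)
    return result


rows = [
    {'tag': 'b', 'timestamp': 1},
    {'tag': 'a', 'timestamp': 1},
    {'tag': 'a', 'timestamp': 2},
]
ordered = sort_rows(rows, ['tag', {'desc': 'timestamp'}])
```

Rows are ordered by tag first, and rows with the same tag are ordered by timestamp in descending order.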

You can use this method for counting without returning any results. Just set the limit to zero. Optionally, you can also set the counting precision.

service.list(
    conditions={ ... },
    precision=50,
    limit=0
)

Conversely, if you don’t need counting, you can disable it. No count / page data will be available then.

service.list(
    conditions={ ... },
    count=False
)

Precision uses a number of table samples to estimate the count. If the precision is set to 0 or None, then the exact count will be performed.

Attention

Precision is not working at the moment (the cause is unknown).

If count argument is False, then count, page and pages result values will be None.

If columns is None, then data will be None and on_page will be zero.

{
    count: Optional[int]          #: total rows matching the query, None if count hasn't been requested
    offset: int                   #: row offset for this selection
    page: Optional[int]           #: current page number, None if count hasn't been requested
    pages: Optional[int]          #: total pages, None if count hasn't been requested
    on_page: int                  #: number of rows on this page
    data: Optional[List[dict]]    #: returned rows, None if limit was set to 0
}

async iter(conditions: dict | List[dict] | None = None, sort: List[_SortDesc | _SortAsc | str] | None = None, offset: int = 0, limit: int = DEFAULT_ROW_LIMIT, columns: Collection[str] | Literal['*'] | None = '*') AsyncGenerator[List[_Row], None][source]

Iterate over listed data.

Parameters:
  • conditions – optional query conditions

  • sort – optional row ordering

  • offset – optional row offset

  • limit – optional row limit

  • columns – columns to return, * for all columns

Returns:

This method may return different data depending on the provided params

Same as list() except it provides an async generator over all selected data. At each iteration a batch with max size of limit is returned. You can use this method to asynchronously iterate over selected table data in chunks.

async for batch in sql_service.iter(conditions={'enabled': True}, sort=['created'], limit=100):
    for row in batch:
        ... # batches of max 100 rows will be returned
# the generator will exit after the last batch of data has been returned

service_name = None

you may define a custom service name here

class PermHook[source]

Bases: PublicInterface, ABC

SQL service with a pre-configured user permissions hook.

It uses the ‘user_id’ column by default to check if objects can be edited or viewed by users. You should modify _set_user_condition() if you intend to use different columns.

class PermissionKeys

Bases: object

Permission scopes.

get_request_context() RequestContext | None

Get current user request context.

get_session()

Get current user session.

get_user_id()

Return current session user id.

has_permission(permission: str) bool

Check if a user session has a particular permission.

system_user() bool

Check if user session has the system scope.

class Permission[source]

Bases: Enum

User permissions.

These keys will be joined with <service_name> to create unique permission keys for each service.

property permission_modify: str

Get a modify permission key.

property permission_view: str

Get a view permission key.

update_condition_hook(sql)[source]

Place user condition on update operations.

delete_condition_hook(sql)[source]

Place user condition on delete operations.

get_condition_hook(sql)[source]

Place user condition on get and list operations.