MongoTransformer (BaseTransformer)

Transformer for MongoDB backend. Parses lark tree into a dictionary to be passed to pymongo/mongomock. Uses post-processing functions to handle aliasing and some specific edge-cases for MongoDB.

constant_first_comparison(self, arg)

constant_first_comparison: constant OPERATOR ( non_string_value | not_implemented_string )

def constant_first_comparison(self, arg):
    """Build the filter for a comparison written with the constant first.

    Grammar rule:
        constant_first_comparison: constant OPERATOR ( non_string_value | not_implemented_string )

    `arg` is indexed as (constant, operator, right-hand side). The operator
    is translated through `self.operator_map` and the result is then looked
    up in `self.reversed_operator_map`, so the returned dictionary is keyed
    on the right-hand side: `{arg[2]: {<reversed operator>: arg[0]}}`.
    """
    constant, operator, rhs = arg[0], arg[1], arg[2]
    mapped_operator = self.operator_map[operator]
    flipped_operator = self.reversed_operator_map[mapped_operator]
    return {rhs: {flipped_operator: constant}}

expression(self, arg)

expression: expression_clause ( OR expression_clause )

def expression(self, arg):
    """Combine the parsed clauses of an expression.

    Grammar rule:
        expression: expression_clause ( OR expression_clause )

    Two or more clauses are wrapped as `{"$or": arg}`; otherwise the first
    clause is returned as it is.
    """
    if len(arg) > 1:
        # expression with 'OR'
        return {"$or": arg}
    # expression without 'OR'
    return arg[0]

expression_clause(self, arg)

expression_clause: expression_phrase ( AND expression_phrase )*

def expression_clause(self, arg):
    """Combine the parsed phrases of an expression clause.

    Grammar rule:
        expression_clause: expression_phrase ( AND expression_phrase )*

    Two or more phrases are wrapped as `{"$and": arg}`; otherwise the first
    phrase is returned as it is.
    """
    if len(arg) > 1:
        # expression_clause with 'AND'
        return {"$and": arg}
    # expression_clause without 'AND'
    return arg[0]

expression_phrase(self, arg)

expression_phrase: [ NOT ] ( comparison | "(" expression ")" )

def expression_phrase(self, arg):
    """Handle an expression phrase by delegating to `_recursive_expression_phrase`.

    Grammar rule:
        expression_phrase: [ NOT ] ( comparison | "(" expression ")" )

    The parsed children are passed on unchanged and the helper's result is
    returned unchanged.
    """
    phrase = self._recursive_expression_phrase(arg)
    return phrase

fuzzy_string_op_rhs(self, arg)

fuzzy_string_op_rhs: CONTAINS value | STARTS [ WITH ] value | ENDS [ WITH ] value

def fuzzy_string_op_rhs(self, arg):
    """Translate a CONTAINS / STARTS [WITH] / ENDS [WITH] clause into a `$regex` filter.

    Grammar rule:
        fuzzy_string_op_rhs: CONTAINS value | STARTS [ WITH ] value | ENDS [ WITH ] value

    `arg[0]` is the keyword. The value to match is `arg[2]` when `arg[1]` is
    a `WITH` token, and `arg[1]` otherwise.

    Returns:
        `{"$regex": <pattern>}`, where the pattern is the value itself for
        CONTAINS, prefixed with `^` for STARTS and suffixed with `$` for ENDS.
    """
    # The WITH keyword may be omitted, which shifts the position of the value.
    if isinstance(arg[1], Token) and arg[1].type == "WITH":
        pattern = arg[2]
    else:
        pattern = arg[1]

    # NOTE(review): the value is interpolated into the regex without escaping,
    # so any regex metacharacters it contains are passed through as-is --
    # confirm whether that is intended.
    if arg[0] == "CONTAINS":
        regex = f"{pattern}"
    elif arg[0] == "STARTS":
        regex = f"^{pattern}"
    elif arg[0] == "ENDS":
        regex = f"{pattern}$"
    return {"$regex": regex}

known_op_rhs(self, arg)

known_op_rhs: IS ( KNOWN | UNKNOWN )

def known_op_rhs(self, arg):
    """Translate an `IS KNOWN` / `IS UNKNOWN` clause into a placeholder filter.

    Grammar rule:
        known_op_rhs: IS ( KNOWN | UNKNOWN )

    The OPTIMADE spec also requires a type comparison with null, which has to
    be handled in post-processing. The result therefore uses the special key
    "#known", which is meant to be replaced later with the expanded
    dictionary. Its value is the result of comparing `arg[1]` with "KNOWN".
    """
    is_known = arg[1] == "KNOWN"
    return {"#known": is_known}

length_op_rhs(self, arg)

length_op_rhs: LENGTH [ OPERATOR ] value

def length_op_rhs(self, arg):
    """Translate a `LENGTH [ OPERATOR ] value` clause into a `$size` filter.

    Grammar rule:
        length_op_rhs: LENGTH [ OPERATOR ] value

    Returns:
        `{"$size": value}` when no operator is given or the operator is `=`.
        For any other operator found in `self.operator_map` (except `!=`),
        `{"$size": {<mapped operator>: value}}`.

    Raises:
        NotImplementedError: for `!=` and for operators that are not in
            `self.operator_map`.
    """
    # A bare `LENGTH value` and an explicit `LENGTH = value` are equivalent.
    if len(arg) == 2 or (len(arg) == 3 and arg[1] == "="):
        return {"$size": arg[-1]}

    if arg[1] in self.operator_map and arg[1] != "!=":
        # create an invalid query that needs to be post-processed
        # e.g. {'$size': {'$gt': 2}}, which is not allowed by Mongo.
        return {"$size": {self.operator_map[arg[1]]: arg[-1]}}

    raise NotImplementedError(
        f"Operator {arg[1]} not implemented for LENGTH filter."
    )

postprocess(self, query)

Used to post-process the final parsed query.

def postprocess(self, query):
    """Run the post-processing steps over the final parsed query.

    When `self.mapper` is truthy, the length aliases and then the normal
    aliases are applied first. The relationship-filtering, length-operator,
    unknown-or-null, id and date steps then run in that order. Each step
    receives the output of the previous one, and the last result is returned.
    """
    stages = []
    if self.mapper:
        # important to apply length alias before normal aliases
        stages.extend((self._apply_length_aliases, self._apply_aliases))

    stages.extend(
        (
            self._apply_relationship_filtering,
            self._apply_length_operators,
            self._apply_unknown_or_null_filter,
            self._apply_mongo_id_filter,
            self._apply_mongo_date_filter,
        )
    )

    for stage in stages:
        query = stage(query)
    return query

property_first_comparison(self, arg)

property_first_comparison: property ( value_op_rhs | known_op_rhs | fuzzy_string_op_rhs | set_op_rhs | set_zip_op_rhs | length_op_rhs )

def property_first_comparison(self, arg):
    """Pair a property with its already-parsed right-hand side.

    Grammar rule:
        property_first_comparison: property ( value_op_rhs | known_op_rhs | fuzzy_string_op_rhs | set_op_rhs |
        set_zip_op_rhs | length_op_rhs )

    Returns a single-entry dictionary mapping `arg[0]` to `arg[1]`.
    """
    prop, rhs = arg[0], arg[1]
    return {prop: rhs}

property_zip_addon(self, arg)

property_zip_addon: ":" property (":" property)*

def property_zip_addon(self, arg):
    """Not implemented: always raises `NotImplementedError`.

    Grammar rule:
        property_zip_addon: ":" property (":" property)*
    """
    raise NotImplementedError()

set_op_rhs(self, arg)

set_op_rhs: HAS ( [ OPERATOR ] value | ALL value_list | ANY value_list | ONLY value_list )

def set_op_rhs(self, arg):
    """Translate a `HAS ...` clause into a MongoDB set filter.

    Grammar rule:
        set_op_rhs: HAS ( [ OPERATOR ] value | ALL value_list | ANY value_list | ONLY value_list )

    Returns:
        - `HAS value`: `{"$in": [value]}`
        - `HAS ALL values`: `{"$all": values}`
        - `HAS ANY values`: `{"$in": values}`
        - `HAS ONLY values`: `{"$all": values, "$size": len(values)}`

    Raises:
        NotImplementedError: for `HAS OPERATOR value`, i.e. any three-element
            `arg` whose second element is not ALL, ANY or ONLY.
    """
    if len(arg) == 2:
        # only value without OPERATOR
        return {"$in": arg[1:]}

    if arg[1] == "ALL":
        return {"$all": arg[2]}

    if arg[1] == "ANY":
        return {"$in": arg[2]}

    if arg[1] == "ONLY":
        # every listed value must be present and nothing else may be
        return {"$all": arg[2], "$size": len(arg[2])}

    # value with OPERATOR
    raise NotImplementedError(
        f"set_op_rhs not implemented for use with OPERATOR. Given: {arg}"
    )

set_zip_op_rhs(self, arg)

set_zip_op_rhs: property_zip_addon HAS ( value_zip | ONLY value_zip_list | ALL value_zip_list | ANY value_zip_list )

def set_zip_op_rhs(self, arg):
    """Not implemented: always raises `NotImplementedError`.

    Grammar rule:
        set_zip_op_rhs: property_zip_addon HAS ( value_zip | ONLY value_zip_list | ALL value_zip_list |
        ANY value_zip_list )
    """
    raise NotImplementedError()

value_list(self, arg)

value_list: [ OPERATOR ] value ( "," [ OPERATOR ] value )*

def value_list(self, arg):
    """Validate a parsed value list and return it unchanged.

    Grammar rule:
        value_list: [ OPERATOR ] value ( "," [ OPERATOR ] value )*

    NOTE: there is no support for the optional OPERATOR yet, so this takes
    the parsed values and raises an error if one is being used.

    Returns:
        `arg` itself, when none of its entries is an operator.

    Raises:
        NotImplementedError: if the string form of any entry is a key of
            `self.operator_map`.
    """
    for value in arg:
        if str(value) in self.operator_map:
            raise NotImplementedError(
                f"OPERATOR {value} inside value_list {arg} not implemented."
            )

    return arg

value_zip(self, arg)

value_zip: [ OPERATOR ] value ":" [ OPERATOR ] value (":" [ OPERATOR ] value)*

def value_zip(self, arg):
    """Not implemented: always raises `NotImplementedError`.

    Grammar rule:
        value_zip: [ OPERATOR ] value ":" [ OPERATOR ] value (":" [ OPERATOR ] value)*
    """
    raise NotImplementedError()

value_zip_list(self, arg)

value_zip_list: value_zip ( "," value_zip )*

def value_zip_list(self, arg):
    """Not implemented: always raises `NotImplementedError`.

    Grammar rule:
        value_zip_list: value_zip ( "," value_zip )*
    """
    raise NotImplementedError()

recursive_postprocessing(filter_, condition, replacement)

Recursively descend into the query, checking each dictionary (contained in a list, or as an entry in another dictionary) for the condition passed. If the condition is true, apply the replacement to the dictionary.


Parameters:

- filter_ (list/dict): the filter_ to process.

- condition (callable): a function that returns True if the replacement function should be applied. It should take as arguments the property and expression from the filter_, as would be returned by iterating over filter_.items().

- replacement (callable): a function that returns the processed dictionary. It should take as arguments the dictionary to modify, the property and the expression (as described above).



For the simple case of replacing one field name with another, the following functions could be used:

def condition(prop, expr):
    return prop == "field_name_old"

def replacement(d, prop, expr):
    d["field_name_new"] = d.pop(prop)
    return d

filter_ = recursive_postprocessing(
    filter_, condition, replacement
)
def recursive_postprocessing(filter_, condition, replacement):
    """Recursively descend into the query, checking each dictionary
    (contained in a list, or as an entry in another dictionary) for
    the condition passed. If the condition is true, apply the
    replacement to the dictionary.

    Lists are processed element by element. For a dictionary, a deep copy
    is made and every `(prop, expr)` pair of the original is checked: if
    `condition(prop, expr)` is true, the copy is replaced by the return
    value of `replacement(copy, prop, expr)`; otherwise, if `expr` is a
    list, each of its elements is processed recursively. Any other value
    is returned unchanged.

    Arguments:
        filter_ (list/dict): the filter_ to process.
        condition (callable): a function that returns True if the
            replacement function should be applied. It should take
            as arguments the property and expression from the filter_,
            as would be returned by iterating over `filter_.items()`.
        replacement (callable): a function that returns the processed
            dictionary. It should take as arguments the dictionary
            to modify, the property and the expression (as described
            above). Its return value becomes the working dictionary,
            so it must return the dictionary rather than only mutating
            it in place.

    Returns:
        A new list or dictionary for list/dict input, or `filter_` itself
        for any other type.

    Example:
        For the simple case of replacing one field name with
        another, the following functions could be used:

        def condition(prop, expr):
            return prop == "field_name_old"

        def replacement(d, prop, expr):
            d["field_name_new"] = d.pop(prop)
            return d

        filter_ = recursive_postprocessing(
            filter_, condition, replacement
        )

    """
    if isinstance(filter_, list):
        return [recursive_postprocessing(q, condition, replacement) for q in filter_]

    if isinstance(filter_, dict):
        # Work on a deep copy, so the dictionary being iterated over is not
        # changed underneath the loop. NOTE: copying at every level of
        # recursion can be costly in time and memory if the filter_ is
        # *heavily* nested.
        _cached_filter = copy.deepcopy(filter_)
        for prop, expr in filter_.items():
            if condition(prop, expr):
                _cached_filter = replacement(_cached_filter, prop, expr)
            elif isinstance(expr, list):
                _cached_filter[prop] = [
                    recursive_postprocessing(q, condition, replacement) for q in expr
                ]
        return _cached_filter

    return filter_