utils

This submodule implements useful utilities for dealing with OPTIMADE providers, usable in both server and client code.

get_all_databases(include_providers=None, exclude_providers=None, exclude_databases=None, progress=None, skip_ssl=False)

Iterate through all databases reported by registered OPTIMADE providers.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `include_providers` | `Container[str] \| None` | A set/container of provider IDs to include child databases for. | `None` |
| `exclude_providers` | `Container[str] \| None` | A set/container of provider IDs to exclude child databases for. | `None` |
| `exclude_databases` | `Container[str] \| None` | A set/container of specific database URLs to exclude. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `Iterable[str]` | A generator of child database links that obey the given parameters. |

Source code in optimade/utils.py
def get_all_databases(
    include_providers: Container[str] | None = None,
    exclude_providers: Container[str] | None = None,
    exclude_databases: Container[str] | None = None,
    progress: "Optional[rich.Progress]" = None,
    skip_ssl: bool = False,
) -> Iterable[str]:
    """Iterate through all databases reported by registered OPTIMADE providers.

    Parameters:
        include_providers: A set/container of provider IDs to include child databases for.
        exclude_providers: A set/container of provider IDs to exclude child databases for.
        exclude_databases: A set/container of specific database URLs to exclude.

    Returns:
        A generator of child database links that obey the given parameters.

    """
    if progress is not None:
        _task = progress.add_task(
            description="Retrieving all databases from registered OPTIMADE providers...",
            total=None,
        )
    else:
        progress = contextlib.nullcontext()
        progress.print = lambda _: None  # type: ignore[attr-defined]
        progress.advance = lambda *_: None  # type: ignore[attr-defined]
        _task = None

    with progress:
        for provider in get_providers():
            if exclude_providers and provider["id"] in exclude_providers:
                continue
            if include_providers and provider["id"] not in include_providers:
                continue

            try:
                links = get_child_database_links(provider, skip_ssl=skip_ssl)
                for link in links:
                    if link.attributes.base_url:
                        if (
                            exclude_databases
                            and link.attributes.base_url in exclude_databases
                        ):
                            continue
                        yield str(link.attributes.base_url)
                if links and progress is not None:
                    progress.advance(_task, 1)
                    progress.print(
                        f"Retrieved databases from [bold green]{provider['id']}[/bold green]"
                    )
            except RuntimeError as exc:
                if progress is not None:
                    progress.print(
                        f"Unable to retrieve databases from [bold red]{provider['id']}[/bold red]: {exc}",
                    )
                pass
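
The following is a minimal usage sketch, not part of the library source: it collects the child database URLs for a chosen subset of providers. The provider IDs and the excluded URL are illustrative placeholders.

```python
# Usage sketch for get_all_databases; the provider IDs and the excluded URL
# are illustrative placeholders.
from optimade.utils import get_all_databases

urls = list(
    get_all_databases(
        include_providers={"mp", "odbx"},
        exclude_databases={"https://example.org/optimade"},
    )
)
print(f"Found {len(urls)} child databases")
```

Since the function returns a generator, no network requests are made until it is iterated; wrapping it in `list()` forces the provider lookups.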

get_child_database_links(provider, obey_aggregate=True, headers=None, skip_ssl=False)

For a provider, return a list of available child databases.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `provider` | `LinksResource` | The links entry for the provider. | required |
| `obey_aggregate` | `bool` | Whether to only return links that allow aggregation. | `True` |
| `headers` | `dict \| None` | Additional HTTP headers to pass to the provider. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `list[LinksResource]` | A list of the valid links entries from this provider that have `link_type` `"child"`. |

Raises:

| Type | Description |
| --- | --- |
| `RuntimeError` | If the provider's index meta-database is down, invalid, or the request otherwise fails. |

Source code in optimade/utils.py
def get_child_database_links(
    provider: LinksResource,
    obey_aggregate: bool = True,
    headers: dict | None = None,
    skip_ssl: bool = False,
) -> list[LinksResource]:
    """For a provider, return a list of available child databases.

    Arguments:
        provider: The links entry for the provider.
        obey_aggregate: Whether to only return links that allow
            aggregation.
        headers: Additional HTTP headers to pass to the provider.

    Returns:
        A list of the valid links entries from this provider that
        have `link_type` `"child"`.

    Raises:
        RuntimeError: If the provider's index meta-database is down,
            invalid, or the request otherwise fails.

    """
    import requests

    from optimade.models.links import Aggregate, LinkType

    base_url = provider.pop("base_url")
    if base_url is None:
        raise RuntimeError(f"Provider {provider['id']} provides no base URL.")

    links_endp = base_url + "/v1/links"
    try:
        links = requests.get(links_endp, timeout=10, headers=headers)
    except SSLError as exc:
        if skip_ssl:
            links = requests.get(links_endp, timeout=10, headers=headers, verify=False)
        else:
            raise RuntimeError(
                f"SSL error when connecting to provider {provider['id']}. Use `skip_ssl` to ignore."
            ) from exc
    except (requests.ConnectionError, requests.Timeout) as exc:
        raise RuntimeError(f"Unable to connect to provider {provider['id']}") from exc

    if links.status_code != 200:
        raise RuntimeError(
            f"Invalid response from {links_endp} for provider {provider['id']}: {links.content!r}."
        )

    try:
        links_resources = links.json().get("data", [])
        return_links = []
        for link in links_resources:
            link = LinksResource(**link)
            if (
                link.attributes.link_type == LinkType.CHILD
                and link.attributes.base_url is not None
                and (not obey_aggregate or link.attributes.aggregate == Aggregate.OK)
            ):
                return_links.append(link)

        return return_links

    except (ValidationError, json.JSONDecodeError) as exc:
        raise RuntimeError(
            f"Did not understand response from {provider['id']}: {links.content!r}, {exc}"
        )
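
As a minimal sketch (not part of the library source), this helper can be combined with `get_providers` below to list every child database per provider, skipping providers whose index meta-database cannot be reached:

```python
# Sketch: list child databases per registered provider, tolerating failures.
from optimade.utils import get_child_database_links, get_providers

for provider in get_providers():
    try:
        links = get_child_database_links(provider)
    except RuntimeError as exc:
        # Raised when the provider's index meta-database is down or invalid.
        print(f"Skipping {provider['id']}: {exc}")
        continue
    for link in links:
        print(provider["id"], link.attributes.base_url)
```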

get_providers(add_mongo_id=False)

Retrieve Materials-Consortia providers (from https://providers.optimade.org/v1/links).

Fallback order if providers.optimade.org is not available:

  1. Try Materials-Consortia/providers on GitHub.
  2. Try submodule providers' list of providers.
  3. Log warning that providers list from Materials-Consortia is not included in the /links-endpoint.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `add_mongo_id` | `bool` | Whether to populate the `_id` field of the provider with MongoDB ObjectID. | `False` |

Returns:

| Type | Description |
| --- | --- |
| `list` | List of raw JSON-decoded providers including MongoDB object IDs. |

Source code in optimade/utils.py
def get_providers(add_mongo_id: bool = False) -> list:
    """Retrieve Materials-Consortia providers (from https://providers.optimade.org/v1/links).

    Fallback order if providers.optimade.org is not available:

    1. Try Materials-Consortia/providers on GitHub.
    2. Try submodule `providers`' list of providers.
    3. Log warning that providers list from Materials-Consortia is not included in the
       `/links`-endpoint.

    Arguments:
        add_mongo_id: Whether to populate the `_id` field of the provider with MongoDB
            ObjectID.

    Returns:
        List of raw JSON-decoded providers including MongoDB object IDs.

    """
    import json

    import requests

    for provider_list_url in PROVIDER_LIST_URLS:
        try:
            providers = requests.get(provider_list_url, timeout=10).json()
        except (
            requests.exceptions.ConnectionError,
            requests.exceptions.ConnectTimeout,
            json.JSONDecodeError,
            requests.exceptions.SSLError,
        ):
            pass
        else:
            break
    else:
        try:
            from optimade.server.data import providers  # type: ignore
        except ImportError:
            from optimade.server.logger import LOGGER

            LOGGER.warning(
                """Could not retrieve a list of providers!

    Tried the following resources:

{}
    The list of providers will not be included in the `/links`-endpoint.
""".format("".join([f"    * {_}\n" for _ in PROVIDER_LIST_URLS]))
            )
            return []

    providers_list = []
    for provider in providers.get("data", []):
        # Remove/skip "exmpl"
        if provider["id"] == "exmpl":
            continue

        provider.update(provider.pop("attributes", {}))

        # Add MongoDB ObjectId
        if add_mongo_id:
            provider["_id"] = {
                "$oid": mongo_id_for_database(provider["id"], provider["type"])
            }

        providers_list.append(provider)

    return providers_list
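
A short sketch (illustrative, not part of the library source) of how the returned entries can be consumed; because each provider's `attributes` are merged into the top-level dict, fields such as `base_url` can be read directly:

```python
# Sketch: print the id and base URL of every registered provider.
from optimade.utils import get_providers

for provider in get_providers():
    # "attributes" has been merged into the top-level dict, so "base_url"
    # (which may be null for some providers) is available directly.
    print(provider["id"], provider.get("base_url"))
```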

insert_from_jsonl(jsonl_path, create_default_index=False)

Insert OPTIMADE JSON lines data into the database.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `jsonl_path` | `Path` | Path to the JSON lines file. | required |
| `create_default_index` | `bool` | Whether to create a default index on the `id` field. | `False` |

Source code in optimade/utils.py
def insert_from_jsonl(jsonl_path: Path, create_default_index: bool = False) -> None:
    """Insert OPTIMADE JSON lines data into the database.

    Arguments:
        jsonl_path: Path to the JSON lines file.
        create_default_index: Whether to create a default index on the `id` field.

    """
    from collections import defaultdict

    import bson.json_util

    from optimade.server.logger import LOGGER
    from optimade.server.routers import ENTRY_COLLECTIONS

    batch = defaultdict(list)
    batch_size: int = 1000

    # Attempt to treat path as absolute, otherwise join with root directory
    if not jsonl_path.is_file():
        _jsonl_path = Path(__file__).parent.joinpath(jsonl_path)
        if not _jsonl_path.is_file():
            raise FileNotFoundError(
                f"Could not find file {jsonl_path} or {_jsonl_path}"
            )
        jsonl_path = _jsonl_path

    # If the chosen database backend supports it, make the default indices
    if create_default_index:
        for entry_type in ENTRY_COLLECTIONS:
            try:
                ENTRY_COLLECTIONS[entry_type].create_default_index()
            except NotImplementedError:
                pass

    bad_rows: int = 0
    good_rows: int = 0
    with open(jsonl_path) as handle:
        header = handle.readline()
        header_jsonl = json.loads(header)
        assert header_jsonl.get(
            "x-optimade"
        ), "No x-optimade header, not sure if this is a JSONL file"

        for line_no, json_str in enumerate(handle):
            try:
                if json_str.strip():
                    entry = bson.json_util.loads(json_str)
                else:
                    LOGGER.warning("Could not read any data from L%s", line_no)
                    bad_rows += 1
                    continue
            except json.JSONDecodeError:
                from optimade.server.logger import LOGGER

                LOGGER.warning("Could not read entry L%s JSON: '%s'", line_no, json_str)
                bad_rows += 1
                continue
            try:
                id = entry.get("id", None)
                _type = entry.get("type", None)
                if id is None or _type == "info":
                    # assume this is an info endpoint for pre-1.2
                    continue

                inp_data = entry["attributes"]
                inp_data["id"] = id
                # Append the data to the batch
                batch[_type].append(inp_data)
            except Exception as exc:
                LOGGER.warning(f"Error with entry at L{line_no} -- {entry} -- {exc}")
                bad_rows += 1
                continue

            if len(batch[_type]) >= batch_size:
                ENTRY_COLLECTIONS[_type].insert(batch[_type])
                batch[_type] = []

            good_rows += 1

        # Insert any remaining data
        for entry_type in batch:
            ENTRY_COLLECTIONS[entry_type].insert(batch[entry_type])
            batch[entry_type] = []

        if bad_rows:
            LOGGER.warning("Could not read %d rows from the JSONL file", bad_rows)

        LOGGER.info("Inserted %d rows from the JSONL file", good_rows)
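
An end-to-end sketch, assuming a configured OPTIMADE server backend (so that `optimade.server.routers.ENTRY_COLLECTIONS` can be imported) and `bson` installed via `pymongo`; the file contents are illustrative examples of the expected shape, with an `x-optimade` header on the first line followed by one JSON object per entry:

```python
# Sketch: write a tiny OPTIMADE JSON Lines file and insert it into the
# configured backend. The header and entry below are illustrative.
from pathlib import Path

from optimade.utils import insert_from_jsonl

jsonl_path = Path("example.jsonl")
jsonl_path.write_text(
    '{"x-optimade": {"meta": {"api_version": "1.2.0"}}}\n'
    '{"id": "example-1", "type": "structures",'
    ' "attributes": {"chemical_formula_reduced": "ClNa"}}\n'
)

insert_from_jsonl(jsonl_path, create_default_index=True)
```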

mongo_id_for_database(database_id, database_type)

Produce a MongoDB ObjectId for a database

Source code in optimade/utils.py
def mongo_id_for_database(database_id: str, database_type: str) -> str:
    """Produce a MongoDB ObjectId for a database"""
    from bson.objectid import ObjectId

    oid = f"{database_id}{database_type}"
    if len(oid) > 12:
        oid = oid[:12]
    elif len(oid) < 12:
        oid = f"{oid}{'0' * (12 - len(oid))}"

    return str(ObjectId(oid.encode("UTF-8")))
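
A worked example (assuming `bson` is installed via `pymongo`): the concatenated id and type are truncated or zero-padded to exactly 12 bytes, so the resulting ObjectId is simply the hex encoding of those bytes and is fully deterministic.

```python
# Worked example for mongo_id_for_database.
from optimade.utils import mongo_id_for_database

# "exmpl" + "links" is 10 characters, padded with "00" to 12 bytes,
# so the ObjectId is the hex encoding of b"exmpllinks00".
print(mongo_id_for_database("exmpl", "links"))
# expected output: 65786d706c6c696e6b733030
```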