Skip to main content

Executing queries in a batch

The Python SDK client allows you to group the execution of multiple queries in a batch. Using a batch to execute code concurrently helps reducing the amount of boilerplate code you would have to write.

Additionally a batch allows you to control how many queries can be executed concurrently, preventing you from overwhelming the Infrahub API server.

A batch is only available with the async version of the SDK client.

Using batches

Using a batch is a 3 step process:

  1. first, we instantiate a batch object
  2. we can then add a tasks to the batch, a task can be any Python Callable that returns an Awaitable, like a coroutine
  3. finally we execute the batch and process the results
import asyncio
from infrahub_sdk import InfrahubClient


async def main():
client = InfrahubClient()
batch = await client.create_batch()

for tag in ["red", "green", "blue", "yellow", "orange"]:
batch.add(task=client.get, kind="BuiltinTag", name__value=tag)

async for _, result in batch.execute():
print(result.name.value)

if __name__ == "__main__":
asyncio.run(main())

Additional arguments passed in the batch.add method call, are being passed as arguments to the task when it gets executed.

Controlling the amount of concurrent queries

By default a batch will execute 5 tasks concurrently, but this can be controlled with the max_concurrent_execution configuration variable for the SDK client.

import asyncio
from infrahub_sdk import Config, InfrahubClient


async def main():
client = InfrahubClient(Config(max_concurrent_execution=10))
batch = await client.create_batch()

for tag in ["red", "green", "blue", "yellow", "orange"]:
batch.add(task=client.get, kind="BuiltinTag", name__value=tag)

async for _, result in batch.execute():
print(result.name.value)

if __name__ == "__main__":
asyncio.run(main())

Handling exceptions in tasks

When a batch executes a task that raises an exception, it will re-raise the exception and this will need to be handled in the calling function. Other tasks that were executing or tasks that have not started yet will not be executed or complete.

import asyncio
from infrahub_sdk import InfrahubClient


async def will_raise(swallowed: bool):
raise Exception()

async def main():
client = InfrahubClient()
batch = await client.create_batch()

batch.add(task=client.get, kind="BuiltinTag", name__value="red")
batch.add(task=will_raise, swallowed=True)
batch.add(task=client.get, kind="BuiltinTag", name__value="green")

async for _, result in batch.execute():
print(result.name.value)

if __name__ == "__main__":
asyncio.run(main())

Alternatively we can choose to return the exception raised in a task. This has the advantage that we can still handle the exception in the calling function and that the other batch tasks will be executed.

import asyncio
from infrahub_sdk import InfrahubClient


async def will_raise(swallowed: bool):
raise Exception()

async def main():
client = InfrahubClient()
batch = await client.create_batch()

batch.add(task=client.get, kind="BuiltinTag", name__value="red")
batch.add(task=will_raise, swallowed=True)
batch.add(task=client.get, kind="BuiltinTag", name__value="green"
)

async for _, result in batch.execute(return_exception=True):
if isinstance(result, Exception):
print("this task has failed")
print(result.name.value)

if __name__ == "__main__":
asyncio.run(main())

Adding extra contexts for tasks

A task can be provided with extra context, through the node argument. The goal is to be able to provide more context when a task has completed. This can be especially useful when a task does not return a useful value.

Imagine a scenario where we want to create 100 tag objects. The save method that we have to call to save the object in the database does not return a useful result. Therefor we will pass the tag object as the node argument to the task, so we can inform the user when the task completes.

import asyncio
from infrahub_sdk import InfrahubClient


async def main():
client = InfrahubClient()
batch = await client.create_batch()

for idx in range(100):
tag = await client.create(kind="BuiltinTag", name=f"tag-{idx}")
batch.add(task=tag.save, node=tag, allow_upsert=True)

async for node, result in batch.execute():
print(f"{node.name.value} was created in Infrahub succesfully")

if __name__ == "__main__":
asyncio.run(main())