gRPC Interface

All capabilities communicate with the Selu orchestrator through a single gRPC service defined in capability.proto. This keeps the interface uniform regardless of what language your capability is written in.

The key messages are shown below. See the full proto reference for the complete definition including artifact messages.

capability.proto (simplified)
syntax = "proto3";

package selu.capability;

service Capability {
  rpc Invoke(InvokeRequest) returns (InvokeResponse);
  rpc StreamInvoke(InvokeRequest) returns (stream InvokeChunk);
  rpc Healthcheck(HealthRequest) returns (HealthResponse);
  rpc UploadInputArtifact(stream UploadInputArtifactChunk) returns (UploadInputArtifactResponse);
  rpc DownloadOutputArtifact(DownloadOutputArtifactRequest) returns (stream ArtifactChunk);
}

message InvokeRequest {
  string tool_name = 1;
  bytes args_json = 2;
  bytes config_json = 3;
  string session_id = 4;
  string capability_id = 5;
  string thread_id = 6;
}

message InvokeResponse {
  bytes result_json = 1;
  string error = 2;
}

message InvokeChunk {
  bytes data = 1;
  bool done = 2;
  string error = 3;
}

message HealthRequest {}

message HealthResponse {
  bool ready = 1;
  string message = 2;
}

| Field | Type | Description |
| --- | --- | --- |
| tool_name | string | Matches the name field in your manifest.yaml tools list or discovered tools. A single capability can expose multiple tools. |
| args_json | bytes | JSON-encoded tool arguments conforming to the tool’s input_schema. Parse this in your handler. |
| config_json | bytes | JSON-encoded configuration object containing credentials and runtime config injected by the orchestrator. |
| session_id | string | Unique session identifier for workspace isolation. Use this for temporary file naming or workspace organization. |
| capability_id | string | The unique identifier of this capability instance. |
| thread_id | string | Conversation thread within the session. Use this to maintain per-conversation state if needed. |
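
As a rough illustration of the session_id and thread_id guidance in the table, the sketch below derives a scratch directory per session and thread. The helper name workspace_dir, the base directory, and the sanitizing rule are assumptions made for this example and are not part of the Selu API.

```python
import re
import tempfile
from pathlib import Path


def workspace_dir(session_id: str, thread_id: str = "", base=None) -> Path:
    """Return (and create) a scratch directory unique to a session and thread.

    Hypothetical helper: the <base>/<session>/<thread> layout is an assumption.
    """
    def safe(part: str) -> str:
        # Keep only letters, digits, dashes and underscores so that an ID
        # containing "/" or ".." cannot escape the base directory.
        cleaned = re.sub(r"[^A-Za-z0-9_-]", "_", part)
        return cleaned or "_"

    root = Path(base) if base else Path(tempfile.gettempdir()) / "capability-workspaces"
    path = root / safe(session_id)
    if thread_id:
        path = path / safe(thread_id)
    path.mkdir(parents=True, exist_ok=True)
    return path
```

A handler could call workspace_dir(request.session_id, request.thread_id) before writing temporary files, so that concurrent sessions do not collide.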

There is no separate credentials field. The orchestrator bundles credentials declared in your manifest.yaml into the config_json object alongside any other runtime configuration. Parse config_json as JSON to access them:

config = json.loads(request.config_json)
api_key = config.get("WEATHER_API_KEY")

Example config_json contents
{
  "WEATHER_API_KEY": "abc123...",
  "PREMIUM_TOKEN": "xyz789..."
}

Only credentials that are currently configured will be included. Optional credentials may be missing if the user hasn’t set them.
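
Because optional credentials can be absent, it may help to separate "present" from "missing but required" in one place. The following is a minimal sketch; the function name load_credentials and the choice to treat an empty config_json as an empty object are assumptions for illustration.

```python
import json


def load_credentials(config_json: bytes, required=(), optional=()):
    """Parse config_json and report which required credentials are missing.

    Returns (present, missing): a dict of the credentials that are set and
    a list of required names that are not.
    """
    # Assumption: an empty payload is treated the same as "{}".
    config = json.loads(config_json) if config_json else {}
    wanted = list(required) + list(optional)
    present = {name: config[name] for name in wanted if config.get(name)}
    missing = [name for name in required if name not in present]
    return present, missing
```

If missing is non-empty, the handler can return an error naming the credentials the user still needs to configure, while optional ones simply stay out of present.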

| Field | Type | Description |
| --- | --- | --- |
| result_json | bytes | JSON-encoded result. This is injected into the LLM conversation as the tool result. Must be valid JSON encoded as bytes. |
| error | string | If non-empty, the invocation failed and this contains the error message shown to the LLM. |

For capabilities that produce incremental output (search results, progress updates, long-running operations), implement StreamInvoke instead of, or in addition to, Invoke. It takes the same InvokeRequest but returns a stream of InvokeChunk messages:

| Field | Type | Description |
| --- | --- | --- |
| data | bytes | A chunk of the result payload. |
| done | bool | true on the final chunk in the stream. |
| error | string | If non-empty, the stream is terminating with an error. |

Streaming example
def StreamInvoke(self, request, context):
    args = json.loads(request.args_json)
    query = args["query"]
    for result in search_incrementally(query):
        yield pb2.InvokeChunk(
            data=json.dumps(result).encode(),
            done=False,
        )
    yield pb2.InvokeChunk(data=b"", done=True)
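
To show how the three InvokeChunk fields fit together on the receiving end, here is a hedged sketch of a consumer. It uses a stand-in Chunk dataclass rather than the generated message, and it assumes each non-empty data payload is a complete JSON document, which matches the example above but is not stated as a protocol requirement.

```python
import json
from dataclasses import dataclass


@dataclass
class Chunk:
    """Stand-in for the generated InvokeChunk message (same three fields)."""
    data: bytes = b""
    done: bool = False
    error: str = ""


def collect_stream(chunks):
    """Gather JSON results from a chunk stream until the final chunk arrives."""
    results = []
    for chunk in chunks:
        if chunk.error:
            # A non-empty error means the stream is terminating abnormally.
            raise RuntimeError(f"Stream failed: {chunk.error}")
        if chunk.data:
            results.append(json.loads(chunk.data))
        if chunk.done:
            break
    return results
```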

The orchestrator calls Healthcheck (lowercase “c”) periodically. If a capability returns ready: false or fails to respond within the timeout, the orchestrator marks it as unhealthy and stops routing invocations until it recovers.

| Field | Type | Description |
| --- | --- | --- |
| ready | bool | true when the capability is ready to accept invocations. |
| message | string | Optional human-readable status detail (e.g. reason for not being ready). |

def Healthcheck(self, request, context):
    return pb2.HealthResponse(ready=True, message="")
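
A capability that depends on slow startup work or an external service may not want to report ready unconditionally. One possible approach, sketched below, is a small thread-safe state object that background code updates and the handler reads. The class name HealthState and its methods are hypothetical.

```python
import threading


class HealthState:
    """Thread-safe readiness flag that a Healthcheck handler could report."""

    def __init__(self):
        self._lock = threading.Lock()
        self._ready = False
        self._message = "starting up"

    def set(self, ready: bool, message: str = "") -> None:
        """Record the current readiness and an optional status detail."""
        with self._lock:
            self._ready = ready
            self._message = message

    def snapshot(self):
        """Return (ready, message) as one consistent pair."""
        with self._lock:
            return self._ready, self._message
```

A Healthcheck handler could then call snapshot() and pass the two values to pb2.HealthResponse as ready and message.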

For capabilities using tool_source: dynamic, implement a discovery tool that returns available tools:

server.py
def Invoke(self, request, context):
    if request.tool_name == "list_tools":
        config = json.loads(request.config_json)
        tools = []

        # Always available
        tools.append({
            "name": "basic_search",
            "description": "Search through available data",
            "input_schema": {
                "type": "object",
                "properties": {
                    "query": {"type": "string", "description": "Search query"}
                },
                "required": ["query"]
            },
            "recommended_policy": "allow"
        })

        # Available only with credentials
        if "PREMIUM_API_KEY" in config:
            tools.append({
                "name": "premium_search",
                "description": "Enhanced search with premium features",
                "input_schema": {
                    "type": "object",
                    "properties": {
                        "query": {"type": "string"},
                        "filters": {"type": "array", "items": {"type": "string"}}
                    },
                    "required": ["query"]
                },
                "recommended_policy": "ask"
            })

        return pb2.InvokeResponse(
            result_json=json.dumps(tools).encode(),
            error="",
        )
    # Handling for the other tool names is omitted from this excerpt.

The discovery tool response must be a JSON array of tool objects, each with:

  • name — Tool identifier
  • description — Human-readable explanation
  • input_schema — JSON Schema for parameters
  • recommended_policy (optional) — Suggested security policy

Your gRPC server must listen on port 50051 inside the container. The orchestrator connects to this port automatically.

Here’s a complete Python example:

server.py
import grpc
from concurrent import futures
import json

import capability_pb2 as pb2
import capability_pb2_grpc as pb2_grpc


class CapabilityServicer(pb2_grpc.CapabilityServicer):
    def Invoke(self, request, context):
        try:
            args = json.loads(request.args_json)
            config = json.loads(request.config_json)
            if request.tool_name == "list_tools":
                return self._handle_discovery(config)
            elif request.tool_name == "weather_lookup":
                return self._handle_weather(args, config)
            else:
                return pb2.InvokeResponse(
                    result_json=b"",
                    error=f"Unknown tool: {request.tool_name}",
                )
        except Exception as e:
            return pb2.InvokeResponse(
                result_json=b"",
                error=f"Internal error: {str(e)}",
            )

    def _handle_discovery(self, config):
        tools = [
            {
                "name": "weather_lookup",
                "description": "Get current weather for a location",
                "input_schema": {
                    "type": "object",
                    "properties": {
                        "location": {
                            "type": "string",
                            "description": "City name or coordinates"
                        }
                    },
                    "required": ["location"]
                },
                "recommended_policy": "allow"
            }
        ]
        return pb2.InvokeResponse(
            result_json=json.dumps(tools).encode(),
            error="",
        )

    def _handle_weather(self, args, config):
        location = args.get("location", "unknown")
        api_key = config.get("WEATHER_API_KEY")
        if not api_key:
            return pb2.InvokeResponse(
                result_json=b"",
                error="Weather API key not configured. Please set WEATHER_API_KEY in your credentials.",
            )
        # Call weather API with api_key...
        weather_data = {"temperature": 22, "conditions": "sunny"}
        return pb2.InvokeResponse(
            result_json=json.dumps(weather_data).encode(),
            error="",
        )

    def Healthcheck(self, request, context):
        return pb2.HealthResponse(ready=True)


def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=4))
    pb2_grpc.add_CapabilityServicer_to_server(CapabilityServicer(), server)
    server.add_insecure_port("[::]:50051")
    print("Starting capability server on port 50051...")
    server.start()
    server.wait_for_termination()


if __name__ == "__main__":
    serve()
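
When debugging a container locally, a quick first check is whether anything is listening on port 50051 at all. The sketch below uses only the standard library; the function name port_is_open is hypothetical, and a successful TCP connection says nothing about whether the Healthcheck RPC would report ready.

```python
import socket


def port_is_open(host: str = "127.0.0.1", port: int = 50051, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts.
        return False
```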

Artifacts are the mechanism for passing files between the orchestrator and capabilities. The proto defines two RPCs for this:

  • UploadInputArtifact — The orchestrator streams a file into the capability before invocation. The capability returns a capability_artifact_id that can be referenced in args_json.
  • DownloadOutputArtifact — After invocation, the orchestrator retrieves output files by streaming them back via ArtifactChunk messages.

Both directions use chunked streaming, so a file never has to fit in a single gRPC message and there is no hard size limit on files.
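
The chunking idea can be sketched independently of the proto messages. The simplified proto above does not show the fields of UploadInputArtifactChunk or ArtifactChunk, so this example only yields raw byte blocks; the helper name iter_file_chunks and the 64 KiB default are arbitrary choices for illustration.

```python
from pathlib import Path


def iter_file_chunks(path, chunk_size: int = 64 * 1024):
    """Yield a file's contents as byte strings of at most chunk_size bytes."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    with Path(path).open("rb") as handle:
        while True:
            block = handle.read(chunk_size)
            if not block:
                # An empty read means end of file.
                break
            yield block
```

Each yielded block would be wrapped in whatever chunk message the full proto defines before being sent on the stream.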

Generate stubs from capability.proto using standard gRPC tooling for your language. Selu publishes the proto file at:

https://github.com/selu-bot/proto/blob/main/capability/v1/capability.proto

For Python:

Terminal window
python -m grpc_tools.protoc \
  -I. \
  --python_out=. \
  --grpc_python_out=. \
  capability.proto

For Go:

Terminal window
protoc --go_out=. --go_opt=paths=source_relative \
  --go-grpc_out=. --go-grpc_opt=paths=source_relative \
  capability.proto

The official Selu SDKs for Go and Python include pre-generated stubs, so you typically do not need to run code generation yourself.

There is no success boolean on InvokeResponse. To determine whether a call succeeded, check whether the error field is empty:

Checking for errors (caller side)
response = stub.Invoke(request)
if response.error:
    print(f"Tool failed: {response.error}")
else:
    result = json.loads(response.result_json)
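
Callers that prefer exceptions to manual checks could wrap this convention in a small helper. This is only a sketch: ToolError and unwrap are hypothetical names, and SimpleNamespace stands in for a real InvokeResponse so the example runs without generated stubs.

```python
import json
from types import SimpleNamespace


class ToolError(Exception):
    """Raised when a response carries a non-empty error field."""


def unwrap(response):
    """Return the decoded result, or raise ToolError if the call failed."""
    if response.error:
        raise ToolError(response.error)
    return json.loads(response.result_json)


# Stand-in object with the same two fields as InvokeResponse.
ok = SimpleNamespace(result_json=b'{"temperature": 22}', error="")
print(unwrap(ok)["temperature"])  # prints 22
```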

When returning errors from your capability, set error to a meaningful message and leave result_json empty:

Returning errors (capability side)
# Good error messages (three independent examples, not one function body)

# Missing credential
return pb2.InvokeResponse(
    result_json=b"",
    error="Weather API key not configured. Please set WEATHER_API_KEY in your credentials.",
)

# Invalid input
return pb2.InvokeResponse(
    result_json=b"",
    error="Invalid location format. Use city name or 'lat,lon' coordinates.",
)

# Upstream failure
return pb2.InvokeResponse(
    result_json=b"",
    error="Weather service temporarily unavailable. Try again in a few minutes.",
)

The LLM will see the error message and use it to explain to the user what went wrong. Keep error messages descriptive and actionable.
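
One way to keep messages consistent is to translate exceptions into text in a single place. The mapping below is an assumption made for illustration, not a Selu convention; describe_failure is a hypothetical helper, and the categories would need adjusting to the exceptions a given capability actually raises.

```python
def describe_failure(exc: Exception) -> str:
    """Turn an exception into a short, actionable message for the error field."""
    if isinstance(exc, KeyError):
        name = exc.args[0] if exc.args else "unknown"
        return f"Missing required argument: {name}."
    if isinstance(exc, TimeoutError):
        return "Upstream service timed out. Try again in a few minutes."
    if isinstance(exc, ValueError):
        return f"Invalid input: {exc}."
    # Unknown failures report only the exception type, so internal details
    # are not echoed into the conversation.
    return f"Internal error: {type(exc).__name__}."
```

The last branch deliberately withholds the exception text. Whether that trade-off is right depends on how much internal detail is safe to show to the LLM and the user.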