Skip to content

API Reference

Core Session Management

This module contains the session manager for the MCP ephemeral K8s library. It is used to create and manage MCP servers in a Kubernetes cluster.

KubernetesSessionManager

Bases: BaseModel

Kubernetes session manager for MCP.

This manager creates and manages Kubernetes jobs for MCP sessions. It implements the async context manager protocol for easy resource management.

Source code in src/mcp_ephemeral_k8s/session_manager.py (lines 42-312)
class KubernetesSessionManager(BaseModel):
    """
    Kubernetes session manager for MCP.

    This manager creates and manages Kubernetes jobs for MCP sessions.
    It implements the async context manager protocol for easy resource management.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    namespace: str = Field(default="default", description="The namespace to create resources in")
    # Keyed by the job name Kubernetes reports back when the job is created (see _create_job).
    jobs: dict[str, EphemeralMcpServer] = Field(
        default_factory=dict,
        description="A dictionary mapping between pod names and MCP servers jobs that are running.",
    )
    runtime: KubernetesRuntime = Field(
        default=KubernetesRuntime.KUBECONFIG, description="The runtime to use for the MCP server"
    )
    sleep_time: float = Field(default=1, description="The time to sleep between job status checks")
    max_wait_time: float = Field(default=300, description="The maximum time to wait for a job to complete")
    # Kubernetes API clients; assigned by load_session_manager() unless already set.
    _api_client: ApiClient = PrivateAttr()
    _batch_v1: BatchV1Api = PrivateAttr()
    _core_v1: CoreV1Api = PrivateAttr()
    _rbac_v1: RbacAuthorizationV1Api = PrivateAttr()

    def load_session_manager(self) -> Self:
        """Load the Kubernetes configuration, create the API clients and verify the namespace.

        API clients that are already assigned are kept; newly created ones are built on
        ``self._api_client``.

        Returns:
            This session manager.

        Raises:
            InvalidKubeConfigError: If no usable Kubernetes configuration can be loaded.
            MCPNamespaceNotFoundError: If reading the configured namespace raises any error.
        """
        self._load_kube_config()
        if not hasattr(self, "_api_client"):
            self._api_client = ApiClient()
        if not hasattr(self, "_batch_v1"):
            self._batch_v1 = BatchV1Api(self._api_client)
        if not hasattr(self, "_core_v1"):
            self._core_v1 = CoreV1Api(self._api_client)
        if not hasattr(self, "_rbac_v1"):
            self._rbac_v1 = RbacAuthorizationV1Api(self._api_client)
        # Check if the configured namespace exists using direct read (more efficient than listing all).
        # Every failure here (not only a 404) is reported as MCPNamespaceNotFoundError.
        try:
            self._core_v1.read_namespace(name=self.namespace)
        except Exception as e:
            raise MCPNamespaceNotFoundError(self.namespace) from e
        return self

    def _load_kube_config(self) -> None:
        """Load the Kubernetes client configuration for ``self.runtime``.

        With ``KubernetesRuntime.KUBECONFIG`` the local kubeconfig is tried first, using the
        ``KUBECONFIG`` and ``KUBECONTEXT`` environment variables when they are set. If that
        fails, ``self.runtime`` is switched to ``KubernetesRuntime.INCLUSTER`` and the
        in-cluster configuration is loaded instead.

        Raises:
            InvalidKubeConfigError: If the in-cluster configuration cannot be loaded, or if
                ``self.runtime`` is neither KUBECONFIG nor INCLUSTER.
        """
        if self.runtime == KubernetesRuntime.KUBECONFIG:
            try:
                load_kube_config(
                    config_file=os.environ.get("KUBECONFIG"),
                    context=os.environ.get("KUBECONTEXT"),
                    client_configuration=None,
                    persist_config=False,
                )
            except (FileNotFoundError, OSError, ConfigException) as e:
                logger.warning(f"Failed to load local kubernetes configuration: {e}. Trying in-cluster configuration")
                # Fall through to the in-cluster branch below.
                self.runtime = KubernetesRuntime.INCLUSTER
            else:
                logger.info("Using local kubernetes configuration")
                return
        if self.runtime == KubernetesRuntime.INCLUSTER:
            # load_incluster_config() reports a missing service host/port or token file with
            # ConfigException; catch it too so callers always get InvalidKubeConfigError.
            try:
                load_incluster_config()
            except (FileNotFoundError, OSError, ConfigException) as e:
                msg = "Failed to load in-cluster configuration"
                raise InvalidKubeConfigError(msg) from e
            else:
                logger.info("Using in-cluster kubernetes configuration")
                return
        raise InvalidKubeConfigError(self.runtime)

    async def __aenter__(self) -> Self:
        """Enter the async context manager by loading the configuration and API clients."""
        self.load_session_manager()
        return self

    async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Exit the async context manager, deleting every tracked job.

        Cleanup is best-effort: a failure while deleting one job is logged and the
        remaining jobs are still deleted. The loop walks a snapshot of the job names so
        it stays valid even if ``jobs`` is modified while a deletion is awaited.
        """
        for job_name in list(self.jobs):
            try:
                await self._delete_job(job_name)
            except Exception:
                logger.exception(f"Failed to clean up job {job_name} while closing the session manager")

    async def _create_job(self, config: EphemeralMcpServerConfig) -> EphemeralMcpServer:
        """
        Create a job that will run until explicitly terminated.

        This also creates a dedicated ServiceAccount with RBAC permissions for the spawned pod.

        Args:
            config: The configuration for the MCP servers

        Returns:
            The MCP server instance, named after the job name reported by Kubernetes

        Raises:
            MCPServerCreationError: If the create response carries no job name.
        """
        # Create ServiceAccount and RBAC resources for the job
        service_account_name = create_service_account_for_job(
            core_v1=self._core_v1,
            rbac_v1=self._rbac_v1,
            job_name=config.job_name,
            namespace=self.namespace,
            sa_config=config.sa_config,
        )

        # Create the job with the service account
        job = create_mcp_server_job(config=config, namespace=self.namespace, service_account_name=service_account_name)
        response = self._batch_v1.create_namespaced_job(namespace=self.namespace, body=job)
        logger.info(f"Job '{config.job_name}' created successfully")
        logger.debug(f"Job response: {response}")
        if not response.metadata or not response.metadata.name:
            raise MCPServerCreationError(str(response.metadata))
        return EphemeralMcpServer(config=config, job_name=response.metadata.name)

    async def _get_job_status(self, job_name: str) -> None | client.V1Job:
        """
        Get current status of a job.

        Args:
            job_name: Name of the job

        Returns:
            The job object returned by ``get_mcp_server_job_status`` (may be None)
        """
        return get_mcp_server_job_status(self._batch_v1, job_name, self.namespace)

    async def _check_pod_status(self, job_name: str) -> bool:
        """
        Check the status of pods associated with a job.

        Args:
            job_name: Name of the job/pod

        Returns:
            True if a pod is running and ready (probes successful), False if waiting for pods

        Raises:
            MCPJobError: If a pod is in Failed or Unknown state
        """
        return check_pod_status(self._core_v1, job_name, self.namespace)

    async def _wait_for_job_ready(self, job_name: str) -> None:
        """Wait for a job's pod to be in the running state and ready (probes successful).

        Polls every ``sleep_time`` seconds for at most ``max_wait_time`` seconds.

        Args:
            job_name: Name of the job/pod
        """
        await wait_for_job_ready(
            self._batch_v1, self._core_v1, job_name, self.namespace, self.sleep_time, self.max_wait_time
        )

    async def _wait_for_job_deletion(self, job_name: str) -> None:
        """Wait for a job to be deleted.

        Polls every ``sleep_time`` seconds for at most ``max_wait_time`` seconds.

        Args:
            job_name: Name of the job/pod
        """
        await wait_for_job_deletion(self._batch_v1, job_name, self.namespace, self.sleep_time, self.max_wait_time)

    async def _delete_job(self, job_name: str) -> bool:
        """
        Delete a Kubernetes job and its associated pods.

        This also removes the job's exposed service port (when the job is tracked) and
        deletes the dedicated ServiceAccount and RBAC resources. The job is NOT removed
        from ``jobs`` here.

        Args:
            job_name: Name of the job/pod

        Returns:
            True if both the job and its RBAC resources were deleted successfully, False otherwise
        """
        tracked = self.jobs.get(job_name)

        # Remove service port if it exists; failures are only logged so deletion continues.
        if tracked is not None:
            try:
                self.remove_mcp_server_port(tracked)
            except Exception as e:
                logger.warning(f"Failed to remove MCP server port for job {job_name}: {e}")
        else:
            logger.warning(f"Job {job_name} not found in session manager, skipping port removal")

        # Delete the job and pods
        job_deleted = delete_mcp_server_job(self._core_v1, self._batch_v1, job_name, self.namespace)

        # Delete ServiceAccount and RBAC resources
        if tracked is not None:
            sa_config = tracked.config.sa_config
            cluster_wide = sa_config.cluster_wide if sa_config else True
        else:
            # Default to cluster_wide=True if job not found in tracking
            cluster_wide = True
            logger.warning(f"Job {job_name} config not found, using default cluster_wide=True for RBAC cleanup")

        rbac_deleted = delete_service_account_for_job(
            core_v1=self._core_v1,
            rbac_v1=self._rbac_v1,
            job_name=job_name,
            namespace=self.namespace,
            cluster_wide=cluster_wide,
        )

        return job_deleted and rbac_deleted

    async def create_mcp_server(
        self, config: EphemeralMcpServerConfig, wait_for_ready: bool = True, expose_port: bool = False
    ) -> EphemeralMcpServer:
        """Start a new MCP server using the provided configuration.

        Args:
            config: The configuration for the MCP servers
            wait_for_ready: Whether to wait for the job to be ready before returning a response to the client
            expose_port: Whether to expose the port through a Kubernetes service

        Returns:
            The MCP server instance
        """
        mcp_server = await self._create_job(config)
        # Track the job before waiting, so it is still cleaned up on exit if the wait fails.
        self.jobs[mcp_server.job_name] = mcp_server
        if wait_for_ready:
            await self._wait_for_job_ready(mcp_server.job_name)
            logger.info(f"MCP server {mcp_server.job_name} ready")
        if expose_port:
            self.expose_mcp_server_port(mcp_server)
            logger.info(f"MCP server {mcp_server.job_name} port exposed with service '{mcp_server.job_name}'")
        return mcp_server

    async def delete_mcp_server(self, job_name: str, wait_for_deletion: bool = True) -> EphemeralMcpServer:
        """Delete the MCP server.

        Args:
            job_name: Name of the job/pod
            wait_for_deletion: Whether to wait for the job to be deleted

        Returns:
            A new MCP server instance built from the deleted job's configuration

        Raises:
            MCPJobNotFoundError: If ``job_name`` is not tracked by this session manager.
        """
        if job_name not in self.jobs:
            raise MCPJobNotFoundError(self.namespace, job_name)
        await self._delete_job(job_name)
        if wait_for_deletion:
            await self._wait_for_job_deletion(job_name)
        config = self.jobs[job_name].config
        result = EphemeralMcpServer(config=config, job_name=job_name)
        del self.jobs[job_name]
        return result

    def expose_mcp_server_port(self, mcp_server: EphemeralMcpServer) -> None:
        """Expose the MCP server port to the outside world through a Kubernetes service."""
        expose_mcp_server_port(self._core_v1, mcp_server.job_name, self.namespace, mcp_server.config.port)

    def remove_mcp_server_port(self, mcp_server: EphemeralMcpServer) -> None:
        """Remove the service exposing the MCP server's port (counterpart of expose_mcp_server_port)."""
        remove_mcp_server_port(self._core_v1, mcp_server.job_name, self.namespace)

    async def mount_mcp_server(self, job_name: str) -> tuple[FastMCP, EphemeralMcpServer]:
        """Mount an MCP server over SSE.

        Args:
            job_name: The name of the pod that is running the MCP server.

        Returns:
            A tuple of the proxy server created for the SSE URL and the tracked MCP server.

        Raises:
            MCPJobNotFoundError: If ``job_name`` is not tracked by this session manager.
        """
        if job_name not in self.jobs:
            raise MCPJobNotFoundError(self.namespace, job_name)
        mcp_server = self.jobs[job_name]
        if self.runtime == KubernetesRuntime.KUBECONFIG:
            # TODO: port-forward automatically when running outside the cluster; until then
            # the caller has to forward the server's port to localhost.
            url = f"http://localhost:{mcp_server.config.port}/sse"
            logger.warning(
                f"The MCP server is running locally, port forwarding to localhost is required if you want to access {url=!r} for {job_name=!r}"
            )
        else:
            # Running inside the cluster: connect to the server's own SSE URL.
            url = str(mcp_server.sse_url)
        server = create_proxy_server(url=url)
        logger.info(f"Mounted MCP server {mcp_server.job_name} over SSE")
        return server, mcp_server

__aenter__() async

Enter the async context manager.

Source code in src/mcp_ephemeral_k8s/session_manager.py (lines 112-115)
async def __aenter__(self) -> Self:
    """Enter the async context manager."""
    self.load_session_manager()
    return self

__aexit__(exc_type, exc_val, exc_tb) async

Exit the async context manager.

Source code in src/mcp_ephemeral_k8s/session_manager.py (lines 117-120)
async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
    """Exit the async context manager, deleting every tracked job.

    Cleanup is best-effort: a failure while deleting one job is logged and the
    remaining jobs are still deleted. The loop walks a snapshot of the job names so
    it stays valid even if ``jobs`` is modified while a deletion is awaited.
    """
    for job_name in list(self.jobs):
        try:
            await self._delete_job(job_name)
        except Exception:
            logger.exception(f"Failed to clean up job {job_name} while closing the session manager")

create_mcp_server(config, wait_for_ready=True, expose_port=False) async

Start a new MCP server using the provided configuration.

Parameters:

Name Type Description Default
config EphemeralMcpServerConfig

The configuration for the MCP servers

required
wait_for_ready bool

Whether to wait for the job to be ready before returning a response to the client

True
expose_port bool

Whether to expose the port through a Kubernetes service

False

Returns:

Type Description
EphemeralMcpServer

The MCP server instance

Source code in src/mcp_ephemeral_k8s/session_manager.py (lines 240-261)
async def create_mcp_server(
    self, config: EphemeralMcpServerConfig, wait_for_ready: bool = True, expose_port: bool = False
) -> EphemeralMcpServer:
    """Launch an MCP server job from ``config`` and start tracking it.

    Args:
        config: Settings describing which MCP server to run.
        wait_for_ready: When True, block until the job's pod is running and ready.
        expose_port: When True, publish the server's port through a Kubernetes service.

    Returns:
        The tracked MCP server instance.
    """
    server = await self._create_job(config)
    name = server.job_name
    # Register before waiting so the job stays tracked even if the wait fails.
    self.jobs[name] = server

    if wait_for_ready:
        await self._wait_for_job_ready(name)
        logger.info(f"MCP server {name} ready")

    if expose_port:
        self.expose_mcp_server_port(server)
        logger.info(f"MCP server {name} port exposed with service '{name}'")

    return server

delete_mcp_server(job_name, wait_for_deletion=True) async

Delete the MCP server.

Parameters:

Name Type Description Default
job_name str

Name of the job/pod

required
wait_for_deletion bool

Whether to wait for the job to be deleted

True

Returns:

Type Description
EphemeralMcpServer

The MCP server instance

Source code in src/mcp_ephemeral_k8s/session_manager.py (lines 263-281)
async def delete_mcp_server(self, job_name: str, wait_for_deletion: bool = True) -> EphemeralMcpServer:
    """Tear down a tracked MCP server and stop tracking it.

    Args:
        job_name: Name of the job/pod to delete.
        wait_for_deletion: When True, block until Kubernetes reports the job gone.

    Returns:
        A fresh MCP server record built from the deleted job's configuration.

    Raises:
        MCPJobNotFoundError: If ``job_name`` is not tracked by this manager.
    """
    if job_name not in self.jobs:
        raise MCPJobNotFoundError(self.namespace, job_name)

    await self._delete_job(job_name)
    if wait_for_deletion:
        await self._wait_for_job_deletion(job_name)

    # Snapshot the config into a new record, then forget the job.
    deleted = EphemeralMcpServer(config=self.jobs[job_name].config, job_name=job_name)
    del self.jobs[job_name]
    return deleted

expose_mcp_server_port(mcp_server)

Expose the MCP server port to the outside world through a Kubernetes service.

Source code in src/mcp_ephemeral_k8s/session_manager.py (lines 283-285)
def expose_mcp_server_port(self, mcp_server: EphemeralMcpServer) -> None:
    """Publish ``mcp_server``'s configured port through a Kubernetes service."""
    job = mcp_server.job_name
    port = mcp_server.config.port
    expose_mcp_server_port(self._core_v1, job, self.namespace, port)

load_session_manager()

Load Kubernetes configuration from default location or from service account if running in cluster.

Source code in src/mcp_ephemeral_k8s/session_manager.py (lines 67-83)
def load_session_manager(self) -> Self:
    """Load Kubernetes configuration from default location or from service account if running in cluster."""
    self._load_kube_config()
    if not hasattr(self, "_api_client"):
        self._api_client = ApiClient()
    if not hasattr(self, "_batch_v1"):
        self._batch_v1 = BatchV1Api(self._api_client)
    if not hasattr(self, "_core_v1"):
        self._core_v1 = CoreV1Api(self._api_client)
    if not hasattr(self, "_rbac_v1"):
        self._rbac_v1 = RbacAuthorizationV1Api(self._api_client)
    # Check if the configured namespace exists using direct read (more efficient than listing all)
    try:
        self._core_v1.read_namespace(name=self.namespace)
    except Exception as e:
        raise MCPNamespaceNotFoundError(self.namespace) from e
    return self

mount_mcp_server(job_name) async

Mount an MCP server over SSE.

Parameters:

Name Type Description Default
job_name str

The name of the pod that is running the MCP server.

required
Source code in src/mcp_ephemeral_k8s/session_manager.py (lines 291-312)
async def mount_mcp_server(self, job_name: str) -> tuple[FastMCP, EphemeralMcpServer]:
    """Mount an MCP server over SSE.

    Args:
        job_name: The name of the pod that is running the MCP server.

    Returns:
        A tuple of the proxy server created for the SSE URL and the tracked MCP server.

    Raises:
        MCPJobNotFoundError: If ``job_name`` is not tracked by this session manager.
    """
    if job_name not in self.jobs:
        raise MCPJobNotFoundError(self.namespace, job_name)
    mcp_server = self.jobs[job_name]
    if self.runtime == KubernetesRuntime.KUBECONFIG:
        # TODO: port-forward automatically when running outside the cluster; until then
        # the caller has to forward the server's port to localhost.
        url = f"http://localhost:{mcp_server.config.port}/sse"
        logger.warning(
            f"The MCP server is running locally, port forwarding to localhost is required if you want to access {url=!r} for {job_name=!r}"
        )
    else:
        # Running inside the cluster: connect to the server's own SSE URL.
        url = str(mcp_server.sse_url)
    server = create_proxy_server(url=url)
    logger.info(f"Mounted MCP server {mcp_server.job_name} over SSE")
    return server, mcp_server

remove_mcp_server_port(mcp_server)

Remove the Kubernetes service that exposes the MCP server's port.

Source code in src/mcp_ephemeral_k8s/session_manager.py (lines 287-289)
def remove_mcp_server_port(self, mcp_server: EphemeralMcpServer) -> None:
    """Remove the service exposing ``mcp_server``'s port (counterpart of expose_mcp_server_port)."""
    job = mcp_server.job_name
    remove_mcp_server_port(self._core_v1, job, self.namespace)

Command Line Interface

McpEphemeralK8s

Bases: BaseSettings

The MCP ephemeral K8s CLI.

Source code in src/mcp_ephemeral_k8s/cli.py (lines 45-53)
class McpEphemeralK8s(BaseSettings):
    """The MCP ephemeral K8s CLI."""

    # Option and subcommand names are rendered in kebab-case on the command line.
    model_config = SettingsConfigDict(cli_kebab_case=True)

    # Available subcommands.
    init: CliSubCommand[Init] = Field(description="Initialize the MCP ephemeral K8s client in the current directory")
    serve: CliSubCommand[Serve] = Field(description="Serve the MCP ephemeral K8s client")

    def cli_cmd(self) -> None:
        """Run the selected subcommand through ``CliApp.run_subcommand``."""
        app_model = self
        CliApp.run_subcommand(app_model)

Application Layer

MCP server application, meant to be used as an MCP server that can spawn other MCP servers.

create_mcp_server(ctx, runtime_exec, runtime_mcp, runtime_args='', env=None, wait_for_ready=False) async

Create a new ephemeral MCP server in Kubernetes.

Spawns a new MCP server instance as a Kubernetes Job with the specified configuration. The server runs in an isolated environment and can be customized with different runtime executors, packages, arguments, and environment variables.

Parameters:

Name Type Description Default
runtime_exec str

The executor to use for running the MCP server (e.g., 'uvx' for Python packages, 'npx' for Node.js packages, 'docker' for container images).

required
runtime_mcp str

The MCP package or image to run (e.g., 'mcp-server-sqlite' for a Python-based SQLite MCP server, or '@modelcontextprotocol/server-filesystem' for a Node.js filesystem server).

required
runtime_args str

Optional arguments to pass to the MCP server at runtime. These are appended to the command line when starting the server.

''
env dict[str, str] | None

Optional dictionary of environment variables to set in the server's container. Useful for configuration, authentication tokens, or feature flags.

None
wait_for_ready bool

If True, this call will block until the Kubernetes Job is in a ready state. If False, returns immediately after submitting the Job.

False

Returns:

Type Description
EphemeralMcpServer

An EphemeralMcpServer object containing the created server's configuration, pod name, status, and connection details.

Example usage

To create a filesystem MCP server: create_mcp_server( runtime_exec='npx', runtime_mcp='@modelcontextprotocol/server-filesystem', runtime_args='/tmp', wait_for_ready=True )

Source code in src/mcp_ephemeral_k8s/app/mcp_server.py (lines 92-140)
@mcp.tool("create_mcp_server")
async def create_mcp_server(
    ctx: Context,
    runtime_exec: str,
    runtime_mcp: str,
    runtime_args: str = "",
    env: dict[str, str] | None = None,
    wait_for_ready: bool = False,
) -> EphemeralMcpServer:
    """
    Create a new ephemeral MCP server in Kubernetes.

    Spawns a new MCP server instance as a Kubernetes Job with the specified configuration.
    The server runs in an isolated environment and can be customized with different
    runtime executors, packages, arguments, and environment variables.

    Args:
        runtime_exec: The executor to use for running the MCP server (e.g., 'uvx' for
            Python packages, 'npx' for Node.js packages, 'docker' for container images).
        runtime_mcp: The MCP package or image to run (e.g., 'mcp-server-sqlite' for a
            Python-based SQLite MCP server, or '@modelcontextprotocol/server-filesystem'
            for a Node.js filesystem server).
        runtime_args: Optional arguments to pass to the MCP server at runtime. These are
            appended to the command line when starting the server.
        env: Optional dictionary of environment variables to set in the server's container.
            Useful for configuration, authentication tokens, or feature flags.
        wait_for_ready: If True, this call will block until the Kubernetes Job is in a
            ready state. If False, returns immediately after submitting the Job.

    Returns:
        An EphemeralMcpServer object containing the created server's configuration,
        pod name, status, and connection details.

    Example usage:
        To create a filesystem MCP server: create_mcp_server(
            runtime_exec='npx',
            runtime_mcp='@modelcontextprotocol/server-filesystem',
            runtime_args='/tmp',
            wait_for_ready=True
        )
    """
    # NOTE: the docstring above is the tool description shown to MCP clients.
    server_spec = EphemeralMcpServerConfig(
        runtime_exec=runtime_exec,
        runtime_mcp=runtime_mcp,
        runtime_args=runtime_args,
        env=env,
    )
    manager: KubernetesSessionManager = ctx.request_context.lifespan_context
    # Servers created through this tool are always exposed via a Kubernetes service.
    created = await manager.create_mcp_server(server_spec, wait_for_ready=wait_for_ready, expose_port=True)
    return created

delete_mcp_server(ctx, job_name, wait_for_deletion=False) async

Delete an ephemeral MCP server and clean up its resources.

Terminates a running MCP server instance and removes its associated Kubernetes resources including the Job, Pod, and any exposed Services. This is important for cleaning up resources when an MCP server is no longer needed.

Parameters:

Name Type Description Default
job_name str

The name of the pod/server to delete. This is returned when creating a server and can be retrieved using list_mcp_servers().

required
wait_for_deletion bool

If True, this call will block until the Kubernetes resources are fully deleted and removed from the cluster. If False, initiates deletion and returns immediately.

False

Returns:

Type Description
EphemeralMcpServer

An EphemeralMcpServer object containing the deleted server's final state and metadata before removal.

Example usage

After retrieving the list of servers, delete a specific one by its pod name: delete_mcp_server(job_name='mcp-server-xyz123', wait_for_deletion=True)

Source code in src/mcp_ephemeral_k8s/app/mcp_server.py (lines 143-168)
@mcp.tool("delete_mcp_server")
async def delete_mcp_server(ctx: Context, job_name: str, wait_for_deletion: bool = False) -> EphemeralMcpServer:
    """
    Delete an ephemeral MCP server and clean up its resources.

    Terminates a running MCP server instance and removes its associated Kubernetes
    resources including the Job, Pod, and any exposed Services. This is important
    for cleaning up resources when an MCP server is no longer needed.

    Args:
        job_name: The name of the pod/server to delete. This is returned when creating
            a server and can be retrieved using list_mcp_servers().
        wait_for_deletion: If True, this call will block until the Kubernetes resources
            are fully deleted and removed from the cluster. If False, initiates deletion
            and returns immediately.

    Returns:
        An EphemeralMcpServer object containing the deleted server's final state and
        metadata before removal.

    Example usage:
        After retrieving the list of servers, delete a specific one by its pod name:
        delete_mcp_server(job_name='mcp-server-xyz123', wait_for_deletion=True)
    """
    # The lifespan context is the KubernetesSessionManager yielded by lifespan().
    manager: KubernetesSessionManager = ctx.request_context.lifespan_context
    deleted = await manager.delete_mcp_server(job_name, wait_for_deletion=wait_for_deletion)
    return deleted

get_mcp_server_status(ctx, job_name) async

Get the detailed Kubernetes status of an MCP server.

Retrieves the underlying Kubernetes Job status for a specific MCP server instance. This provides low-level details about the Job's execution state, including conditions, start/completion times, and any failure information.

Parameters:

Name Type Description Default
job_name str

The name of the pod/server to check. This identifier is returned when creating a server and can be retrieved using list_mcp_servers().

required

Returns:

Type Description
V1Job | None

A Kubernetes V1Job object containing the Job's complete status information, or None if the Job is not found. The Job status includes fields such as:

  • active: number of active pods
  • succeeded: number of succeeded pods
  • failed: number of failed pods
  • conditions: detailed state information
  • start_time and completion_time
Example usage

Use this to debug issues with a server or to check if a Job has completed successfully: get_mcp_server_status(job_name='mcp-server-xyz123')

Source code in src/mcp_ephemeral_k8s/app/mcp_server.py (lines 171-198)
@mcp.tool("get_mcp_server_status")
async def get_mcp_server_status(ctx: Context, job_name: str) -> client.V1Job | None:
    """
    Get the detailed Kubernetes status of an MCP server.

    Retrieves the underlying Kubernetes Job status for a specific MCP server instance.
    This provides low-level details about the Job's execution state, including
    conditions, start/completion times, and any failure information.

    Args:
        job_name: The name of the pod/server to check. This identifier is returned
            when creating a server and can be retrieved using list_mcp_servers().

    Returns:
        A Kubernetes V1Job object containing the Job's complete status information,
        or None if the Job is not found. The Job status includes fields such as:
        - active: number of active pods
        - succeeded: number of succeeded pods
        - failed: number of failed pods
        - conditions: detailed state information
        - start_time and completion_time

    Example usage:
        Use this to debug issues with a server or to check if a Job has completed
        successfully: get_mcp_server_status(job_name='mcp-server-xyz123')
    """
    manager: KubernetesSessionManager = ctx.request_context.lifespan_context
    # Delegates to the session manager's (private) job-status helper.
    job_status = await manager._get_job_status(job_name)
    return job_status

get_version()

Get the version of the MCP ephemeral server.

Source code in src/mcp_ephemeral_k8s/app/mcp_server.py (lines 44-47)
@mcp.resource("config://version")
def get_version() -> str:
    """Get the version of the MCP ephemeral server."""
    # Served as the ``config://version`` resource.
    version: str = __version__
    return version

health_check(request) async

Health check endpoint for the MCP server.

Source code in src/mcp_ephemeral_k8s/app/mcp_server.py (lines 238-241)
@mcp.custom_route("/health", methods=["GET"])
async def health_check(request: Request) -> PlainTextResponse:
    """Health check endpoint for the MCP server."""
    # The request is not inspected; the endpoint always answers 200 "OK".
    return PlainTextResponse(content="OK", status_code=200)

lifespan(server) async

Lifecycle hooks for the MCP ephemeral server.

Source code in src/mcp_ephemeral_k8s/app/mcp_server.py (lines 18-26)
@asynccontextmanager
async def lifespan(server: FastMCP) -> AsyncIterator[KubernetesSessionManager]:
    """
    Lifecycle hooks for the MCP ephemeral server.

    Yields a KubernetesSessionManager for the ``default`` namespace that polls job
    status every second and waits at most 60 seconds. Leaving the context exits the
    manager, which deletes the jobs it still tracks. ``server`` is not used.
    """
    manager = KubernetesSessionManager(namespace="default", jobs={}, sleep_time=1, max_wait_time=60)
    async with manager as session_manager:
        yield session_manager

list_mcp_servers(ctx) async

List all currently running ephemeral MCP servers.

Retrieves information about all MCP server instances that are currently managed by this controller. Each server entry includes its pod name, configuration details, current status, creation timestamp, and connection information if available.

Returns:

Type Description
list[EphemeralMcpServer]

A list of EphemeralMcpServer objects representing all active MCP servers. Each object contains the server's configuration, runtime status, and metadata.

Example usage

Call this tool to see what MCP servers are currently running before creating new ones or to check the status of existing servers.

Source code in src/mcp_ephemeral_k8s/app/mcp_server.py (lines 71-89)
@mcp.tool("list_mcp_servers")
async def list_mcp_servers(ctx: Context) -> list[EphemeralMcpServer]:
    """
    List all currently running ephemeral MCP servers.

    Retrieves information about all MCP server instances that are currently managed
    by this controller. Each server entry includes its pod name, configuration details,
    current status, creation timestamp, and connection information if available.

    Returns:
        A list of EphemeralMcpServer objects representing all active MCP servers.
        Each object contains the server's configuration, runtime status, and metadata.

    Example usage:
        Call this tool to see what MCP servers are currently running before creating
        new ones or to check the status of existing servers.
    """
    manager: KubernetesSessionManager = ctx.request_context.lifespan_context
    # Return a new list so callers cannot mutate the manager's tracking dict.
    return [*manager.jobs.values()]

list_presets()

List all available preset MCP server configurations.

Returns a list of pre-configured MCP server templates that can be used as examples or starting points for creating new ephemeral MCP servers. Each preset includes the runtime executor (e.g., 'uvx', 'npx'), the MCP package to install, optional runtime arguments, and environment variables.

Returns:

Type Description
list[EphemeralMcpServerConfig]

A list of EphemeralMcpServerConfig objects containing preset configurations.

list[EphemeralMcpServerConfig]

Each config specifies how to run a specific MCP server package.

Example usage

Use this to discover available MCP server configurations before creating one.

Source code in src/mcp_ephemeral_k8s/app/mcp_server.py
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
@mcp.tool("list_presets")
def list_presets() -> list[EphemeralMcpServerConfig]:
    """
    List all available preset MCP server configurations.

    Returns a list of pre-configured MCP server templates that can be used as examples
    or starting points for creating new ephemeral MCP servers. Each preset includes
    the runtime executor (e.g., 'uvx', 'npx'), the MCP package to install, optional
    runtime arguments, and environment variables.

    Returns:
        A list of EphemeralMcpServerConfig objects containing preset configurations.
        Each config specifies how to run a specific MCP server package.

    Example usage:
        Use this to discover available MCP server configurations before creating one.
    """
    # The docstring above is the tool description shown to MCP clients; keep it stable.
    # Presets are a module-level constant in `presets`; the same list object is returned.
    preset_configs = presets.EXAMPLE_MCP_SERVER_CONFIGS
    return preset_configs

main(transport='sse', show_banner=True, allow_origins=None, **transport_kwargs)

Run the FastMCP server.

Source code in src/mcp_ephemeral_k8s/app/mcp_server.py
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
def main(
    transport: Transport = "sse",
    show_banner: bool = True,
    allow_origins: list[str] | None = None,
    **transport_kwargs: Any,
) -> None:
    """Run the FastMCP server.

    Args:
        transport: Transport passed through to ``mcp.run``.
        show_banner: Passed through to ``mcp.run``.
        allow_origins: CORS origins allowed when ``transport == "sse"``. ``None``
            (the default) allows every origin (``["*"]``). An explicit empty list
            is honoured as "no allowed origins" rather than widened to the wildcard.
        **transport_kwargs: Forwarded to ``mcp.run``. A caller-supplied
            ``middleware`` entry takes precedence over the CORS default below.
    """
    # Configure CORS middleware for SSE transport to support browser-based clients
    if transport == "sse":
        # `allow_origins or ["*"]` would turn an explicit [] into "allow everything"
        # (with credentials enabled); only fall back to the wildcard on None.
        origins = ["*"] if allow_origins is None else allow_origins
        cors_middleware = [
            (
                CORSMiddleware,
                (),
                {
                    "allow_origins": origins,
                    "allow_credentials": True,
                    "allow_methods": ["*"],
                    "allow_headers": ["*"],
                },
            )
        ]
        # setdefault: never override middleware the caller passed explicitly.
        transport_kwargs.setdefault("middleware", cors_middleware)

    mcp.run(transport=transport, show_banner=show_banner, **transport_kwargs)

mount_mcp_server(ctx, job_name, name=None) async

Mount a remote MCP server over SSE.

Parameters:

Name Type Description Default
job_name str

The name of the pod that is running the remote MCP server.

required
name str | None

The name of the proxy server.

None
Source code in src/mcp_ephemeral_k8s/app/mcp_server.py
201
202
203
204
205
206
207
208
209
210
211
212
@mcp.tool("mount_mcp_server")
async def mount_mcp_server(ctx: Context, job_name: str, name: str | None = None) -> EphemeralMcpServer:
    """Mount a remote MCP server over SSE.

    Args:
        job_name: The name of the pod that is running the remote MCP server.
        name: The name of the proxy server.
    """
    # The docstring above is the tool description shown to MCP clients; keep it stable.
    manager: KubernetesSessionManager = ctx.request_context.lifespan_context
    # The session manager returns a proxy server plus the matching EphemeralMcpServer record.
    proxy, mounted = await manager.mount_mcp_server(job_name)
    # Attach the proxy to this server, using `name` as the mount prefix.
    mcp.mount(server=proxy, prefix=name, as_proxy=True)
    return mounted

remove_mcp_server_mount(name=None) async

Remove the mount of an MCP server.

Parameters:

Name Type Description Default
name str | None

The name of the server to remove. If None, all mounted servers with a prefix will be removed.

None
Source code in src/mcp_ephemeral_k8s/app/mcp_server.py
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
@mcp.tool("remove_mcp_server_mount")
async def remove_mcp_server_mount(name: str | None = None) -> None:
    """Remove the mount of an MCP server.

    Args:
        name: The name of the server to remove. If None, all mounted servers with a prefix will be removed.
    """
    # The docstring above is the tool description shown to MCP clients; it is left
    # unchanged. The code below is what now matches it.
    #
    # Collect targets first so removing them does not disturb the iteration.
    # With an explicit name, match that prefix exactly. With name=None, select only
    # prefixed mounts. The previous `prefix == name` test also matched unprefixed
    # mounts (prefix None) in that case, which removed them as well.
    if name is None:
        servers_to_remove = [s for s in mcp._mounted_servers if s.prefix is not None]
    else:
        servers_to_remove = [s for s in mcp._mounted_servers if s.prefix == name]

    if not servers_to_remove:
        msg = f"No mounted server found with name {name}" if name is not None else "No mounted servers found"
        raise ValueError(msg)

    # Remove servers from the list
    for server in servers_to_remove:
        mcp._mounted_servers.remove(server)

API Models & Configuration

This module contains the models for the MCP ephemeral K8s library.

EphemeralMcpServer

Bases: BaseModel

The MCP server that is running in a Kubernetes pod.

Source code in src/mcp_ephemeral_k8s/api/ephemeral_mcp_server.py
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
class EphemeralMcpServer(BaseModel):
    """The MCP server that is running in a Kubernetes pod.

    ``url`` and ``sse_url`` are built from the cluster-internal DNS name
    ``<job_name>.<namespace>.svc.cluster.local``. ``namespace`` defaults to
    ``"default"``; callers that run servers elsewhere must pass it so the URL
    resolves.
    """

    job_name: str = Field(
        description="The name of the pod that is running the MCP server", examples=["mcp-ephemeral-k8s-proxy-test"]
    )
    config: EphemeralMcpServerConfig = Field(
        description="The configuration that was used to create the MCP server",
        examples=[
            EphemeralMcpServerConfig(
                runtime_exec="uvx",
                runtime_mcp="mcp-server-fetch",
                port=8000,
                cors_origins=["*"],
            )
        ],
    )
    # Optional with the old implicit value, so existing constructors keep working.
    namespace: str = Field(
        default="default",
        description="The namespace the MCP server job and service run in",
        examples=["default"],
    )

    @computed_field  # type: ignore[prop-decorator]
    @property
    def url(self) -> HttpUrl:
        """The Uniform Resource Locator (URL) for the MCP server."""
        # Use the model's namespace instead of assuming "default"; otherwise the
        # DNS name does not resolve for servers deployed in any other namespace.
        return HttpUrl(f"http://{self.job_name}.{self.namespace}.svc.cluster.local:{self.config.port}/")

    @computed_field  # type: ignore[prop-decorator]
    @property
    def sse_url(self) -> HttpUrl:
        """The Server-Sent Events (SSE) URL for the MCP server."""
        # `url` always ends with "/", so appending "sse" yields ".../sse".
        return HttpUrl(f"{self.url}sse")

sse_url property

The Server-Sent Events (SSE) URL for the MCP server.

url property

The Uniform Resource Locator (URL) for the MCP server.

EphemeralMcpServerConfig

Bases: BaseModel

Configuration for Kubernetes resources.

Source code in src/mcp_ephemeral_k8s/api/ephemeral_mcp_server.py
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
class EphemeralMcpServerConfig(BaseModel):
    """Configuration for an ephemeral MCP server and the Kubernetes resources that run it.

    Two modes are supported:

    * Proxy mode: ``runtime_exec`` and ``runtime_mcp`` are both set, and the proxy
      ``image`` launches the MCP package through ``runtime_exec`` (see ``args``).
    * Direct mode: both are ``None`` and ``image`` is itself an MCP server
      (see ``from_docker_image``).
    """

    runtime_exec: str | None = Field(
        description="The runtime to use for the MCP container. When None, the image is assumed to be a MCP server instead of a proxy.",
        examples=["uvx", "npx"],
    )
    runtime_mcp: str | None = Field(
        description="The runtime to use for the MCP container. Can be any supported MCP server runtime loadable via the `runtime_exec`. See the [MCP Server Runtimes](https://github.com/modelcontextprotocol/servers/tree/main) for a list of supported runtimes.",
        examples=["mcp-server-fetch", "@modelcontextprotocol/server-github"],
    )
    runtime_args: str = Field(
        default_factory=str,
        description="The arguments to pass to the MCP server runtime.",
        examples=["--port 8080"],
    )
    image: str = Field(
        default="ghcr.io/bobmerkus/mcp-ephemeral-k8s-proxy:latest",
        description="The image to use for the MCP server proxy",
    )
    entrypoint: list[str] | None = Field(
        default=["mcp-proxy"],
        description="The entrypoint for the MCP container. Normally not changed unless a custom image is used.",
    )
    host: str = Field(default="0.0.0.0", description="The host to expose the MCP server on")  # noqa: S104
    port: int = Field(default=8080, description="The port to expose the MCP server on")
    resource_requests: dict[str, str] = Field(
        default={"cpu": "100m", "memory": "100Mi"}, description="Resource requests for the container"
    )
    resource_limits: dict[str, str] = Field(
        default={"cpu": "200m", "memory": "200Mi"}, description="Resource limits for the container"
    )
    env: dict[str, str] | None = Field(
        default=None,
        description="Environment variables to set for the container",
        examples=[None, {"GITHUB_PERSONAL_ACCESS_TOKEN": "1234567890", "GITHUB_DYNAMIC_TOOLSETS": "1"}],
    )
    cors_origins: list[str] | None = Field(
        default=["*"],
        description="The origins to allow CORS from",
        examples=["*"],
    )
    probe_config: KubernetesProbeConfig = Field(
        default_factory=KubernetesProbeConfig,
        description="The configuration for the Kubernetes probe",
    )
    sa_config: ServiceAccountConfig | None = Field(
        default=None,
        description="ServiceAccount RBAC configuration. If None, uses minimal preset by default.",
    )

    @model_validator(mode="after")
    def validate_runtime_exec(self) -> Self:
        """Validate the runtime configuration.
        Both runtime_exec and runtime_mcp must be specified, or neither.

        Raises:
            MCPInvalidRuntimeError: If exactly one of the two fields is set.
        """
        if self.runtime_exec is not None and self.runtime_mcp is None:
            message = "Invalid runtime: runtime_exec is specified but runtime_mcp is not"
            raise MCPInvalidRuntimeError(runtime_exec=self.runtime_exec, runtime_mcp=self.runtime_mcp, message=message)
        if self.runtime_exec is None and self.runtime_mcp is not None:
            message = "Invalid runtime: runtime_mcp is specified but runtime_exec is not"
            raise MCPInvalidRuntimeError(runtime_exec=self.runtime_exec, runtime_mcp=self.runtime_mcp, message=message)
        return self

    @computed_field  # type: ignore[prop-decorator]
    @property
    def args(self) -> list[str] | None:
        """The arguments to pass to the MCP server.
        [mcp-proxy](https://github.com/sparfenyuk/mcp-proxy?tab=readme-ov-file#21-configuration)

        Returns ``None`` in direct mode (no ``runtime_exec``/``runtime_mcp``), so the
        image's own arguments are used.
        """
        if self.runtime_exec is not None and self.runtime_mcp is not None:
            args = [
                self.runtime_exec,
                self.runtime_mcp,
                "--pass-environment",
                f"--port={self.port}",
                f"--host={self.host}",
            ]
            if self.cors_origins is not None:
                args.extend(["--allow-origin", *self.cors_origins])
            if self.runtime_args:
                # NOTE(review): this appends ONE argv element ("-- <runtime_args>").
                # Container args are not shell-split, so a multi-word runtime_args
                # reaches mcp-proxy as a single argument. Confirm this is what
                # mcp-proxy expects.
                args.append(f"-- {self.runtime_args}")
            return args
        return None

    @computed_field  # type: ignore[prop-decorator]
    @property
    def image_name(self) -> str:
        """The name of the image to use for the MCP server.

        The registry/repository path, the tag (``:tag``) and the digest
        (``@sha256:...``) are stripped. For example, ``ghcr.io/org/img:1.0`` and
        ``ghcr.io/org/img@sha256:ab`` both yield ``img``.
        """
        last_segment = self.image.split("/")[-1]
        # Strip the digest before the tag. A digest carries its own ':' ("@sha256:..."),
        # so splitting on ':' alone would leave "img@sha256", which is not a valid
        # Kubernetes object name for the generated job.
        return last_segment.split("@", 1)[0].split(":", 1)[0]

    @computed_field  # type: ignore[prop-decorator]
    @cached_property
    def job_name(self) -> str:
        """The name of the job to use for the MCP server."""
        # cached_property: generated once per config instance so that every
        # resource built from this config shares the same unique name.
        return generate_unique_id(prefix=self.image_name)

    @classmethod
    def from_docker_image(cls, image: str, entrypoint: list[str] | None = None, **kwargs: Any) -> Self:
        """Create an EphemeralMcpServerConfig from a Docker image.
        The image must be a MCP server image, otherwise an error is raised.

        Raises:
            MCPInvalidRuntimeError: If ``image`` is one of the known proxy images.
        """
        if image.startswith("ghcr.io/bobmerkus/mcp-ephemeral-k8s-proxy") or image.startswith(
            "ghcr.io/sparfenyuk/mcp-proxy"
        ):
            message = "Invalid runtime: image is a proxy image, please use the `runtime_exec` and `runtime_mcp` fields to specify the MCP server to use."
            raise MCPInvalidRuntimeError(runtime_exec=None, runtime_mcp=None, message=message)
        return cls(image=image, entrypoint=entrypoint, runtime_exec=None, runtime_mcp=None, **kwargs)

args property

The arguments to pass to the MCP server (see the mcp-proxy configuration documentation).

image_name property

The name of the image to use for the MCP server.

job_name cached property

The name of the job to use for the MCP server.

from_docker_image(image, entrypoint=None, **kwargs) classmethod

Create an EphemeralMcpServerConfig from a Docker image. The image must be a MCP server image, otherwise an error is raised.

Source code in src/mcp_ephemeral_k8s/api/ephemeral_mcp_server.py
143
144
145
146
147
148
149
150
151
152
153
@classmethod
def from_docker_image(cls, image: str, entrypoint: list[str] | None = None, **kwargs: Any) -> Self:
    """Create an EphemeralMcpServerConfig from a Docker image.
    The image must be a MCP server image, otherwise an error is raised.
    """
    if image.startswith("ghcr.io/bobmerkus/mcp-ephemeral-k8s-proxy") or image.startswith(
        "ghcr.io/sparfenyuk/mcp-proxy"
    ):
        message = "Invalid runtime: image is a proxy image, please use the `runtime_exec` and `runtime_mcp` fields to specify the MCP server to use."
        raise MCPInvalidRuntimeError(runtime_exec=None, runtime_mcp=None, message=message)
    return cls(image=image, entrypoint=entrypoint, runtime_exec=None, runtime_mcp=None, **kwargs)

validate_runtime_exec()

Validate the runtime configuration. Both runtime_exec and runtime_mcp must be specified, or neither.

Source code in src/mcp_ephemeral_k8s/api/ephemeral_mcp_server.py
 98
 99
100
101
102
103
104
105
106
107
108
109
@model_validator(mode="after")
def validate_runtime_exec(self) -> Self:
    """Require ``runtime_exec`` and ``runtime_mcp`` to be set together or not at all.

    Returns:
        The validated model, unchanged.

    Raises:
        MCPInvalidRuntimeError: If exactly one of the two fields is set.
    """
    has_exec = self.runtime_exec is not None
    has_mcp = self.runtime_mcp is not None
    if has_exec != has_mcp:
        # Exactly one is set: report which one is missing.
        message = (
            "Invalid runtime: runtime_exec is specified but runtime_mcp is not"
            if has_exec
            else "Invalid runtime: runtime_mcp is specified but runtime_exec is not"
        )
        raise MCPInvalidRuntimeError(runtime_exec=self.runtime_exec, runtime_mcp=self.runtime_mcp, message=message)
    return self

KubernetesProbeConfig

Bases: BaseModel

Configuration for Kubernetes readiness probe.

The readiness probe is used to determine when a container is ready to accept traffic. These defaults are tuned for MCP server startup, which may take time to install dependencies.

With the defaults, the first probe runs after 10 s and then every 1 s. Up to 300 consecutive failures are tolerated, which is about 5 minutes after the initial delay.

Source code in src/mcp_ephemeral_k8s/api/ephemeral_mcp_server.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
class KubernetesProbeConfig(BaseModel):
    """Readiness-probe settings for the MCP server container.

    Kubernetes uses the readiness probe to decide when the container may receive
    traffic. MCP servers launched through a runtime may need to install
    dependencies before they listen, so the defaults are patient. The first probe
    runs 10 s after start and then once per second, tolerating up to 300
    consecutive failures (roughly five minutes after the initial delay).

    Field names mirror the keyword arguments of ``kubernetes.client.V1Probe``,
    because the job builder splats ``model_dump()`` into that constructor.
    """

    initial_delay_seconds: int = Field(
        default=10,
        description="Seconds to wait before performing the first probe (allows for container startup)",
    )
    period_seconds: int = Field(
        default=1,
        description="How often (in seconds) to perform the probe",
    )
    timeout_seconds: int = Field(
        default=2,
        description="Number of seconds after which the probe times out",
    )
    success_threshold: int = Field(
        default=1,
        description="Minimum consecutive successes for the probe to be considered successful after having failed",
    )
    failure_threshold: int = Field(
        default=300,
        description="Number of consecutive failures before giving up. With period_seconds=1, this allows 5 minutes for server startup",
    )

KubernetesRuntime

Bases: StrEnum

The runtime that is being used for Kubeconfig

Source code in src/mcp_ephemeral_k8s/api/ephemeral_mcp_server.py
16
17
18
19
20
class KubernetesRuntime(StrEnum):
    """The runtime that is being used for Kubeconfig"""

    KUBECONFIG = "KUBECONFIG"
    INCLUSTER = "INCLUSTER"

Exceptions

This module contains the exceptions for the MCP ephemeral K8s library.

InvalidKubeConfigError

Bases: Exception

Exception raised when the kube config is invalid.

Source code in src/mcp_ephemeral_k8s/api/exceptions.py
 6
 7
 8
 9
10
11
class InvalidKubeConfigError(Exception):
    """Raised when the Kubernetes client configuration is unusable.

    The formatted text is stored on ``self.message`` and is also the error's
    single positional argument (and therefore its ``str()``).
    """

    def __init__(self, message: str):
        formatted = "Invalid kube config: {}".format(message)
        self.message = formatted
        super().__init__(formatted)

MCPInvalidRuntimeError

Bases: ValueError

An error that occurs when the runtime is invalid.

Source code in src/mcp_ephemeral_k8s/api/exceptions.py
38
39
40
41
42
class MCPInvalidRuntimeError(ValueError):
    """An error that occurs when the runtime is invalid."""

    def __init__(self, runtime_exec: str | None, runtime_mcp: str | None, message: str) -> None:
        super().__init__(f"Invalid runtime: {runtime_exec=} and {runtime_mcp=} {message}")

MCPJobError

Bases: Exception

Exception raised when the MCP job is in an error state.

Source code in src/mcp_ephemeral_k8s/api/exceptions.py
61
62
63
64
65
66
class MCPJobError(Exception):
    """Raised when an MCP job (or one of its pods) is in an error state.

    The formatted text names the namespace and job and is stored on ``self.message``.
    """

    def __init__(self, namespace: str, job_name: str, message: str):
        location = f"{namespace=} {job_name=}"
        self.message = f"MCP job error: {location} - {message}"
        super().__init__(self.message)

MCPJobNotFoundError

Bases: Exception

Exception raised when the MCP job is not found.

Source code in src/mcp_ephemeral_k8s/api/exceptions.py
30
31
32
33
34
35
class MCPJobNotFoundError(Exception):
    """Raised when the requested MCP job does not exist in the given namespace.

    The formatted text is stored on ``self.message``.
    """

    def __init__(self, namespace: str, job_name: str):
        self.message = "Failed to find MCP job: namespace={!r} job_name={!r}".format(namespace, job_name)
        super().__init__(self.message)

MCPJobTimeoutError

Bases: Exception

Exception raised when the MCP job times out.

Source code in src/mcp_ephemeral_k8s/api/exceptions.py
53
54
55
56
57
58
class MCPJobTimeoutError(Exception):
    """Raised when an MCP job does not become ready within the allowed time.

    The formatted text is stored on ``self.message``.
    """

    def __init__(self, namespace: str, job_name: str):
        self.message = "MCP job timed out: namespace={!r} job_name={!r}".format(namespace, job_name)
        super().__init__(self.message)

MCPNamespaceNotFoundError

Bases: ValueError

An error that occurs when the namespace is not found.

Source code in src/mcp_ephemeral_k8s/api/exceptions.py
45
46
47
48
49
50
class MCPNamespaceNotFoundError(ValueError):
    """Raised when the target Kubernetes namespace does not exist.

    Subclasses ValueError. The formatted text is stored on ``self.message``.
    """

    def __init__(self, namespace: str):
        self.message = "Namespace not found: {}".format(namespace)
        super().__init__(self.message)

MCPPortForwardError

Bases: Exception

Exception raised when the MCP port forward fails.

Source code in src/mcp_ephemeral_k8s/api/exceptions.py
14
15
16
17
18
19
class MCPPortForwardError(Exception):
    """Raised when a port forward to an MCP job cannot be established.

    The formatted text is stored on ``self.message``.
    """

    def __init__(self, job_name: str, namespace: str, port: int):
        self.message = "Failed to create port forward: job_name={!r} namespace={!r} port={!r}".format(
            job_name, namespace, port
        )
        super().__init__(self.message)

MCPServerCreationError

Bases: Exception

Exception raised when the MCP server creation fails.

Source code in src/mcp_ephemeral_k8s/api/exceptions.py
22
23
24
25
26
27
class MCPServerCreationError(Exception):
    """Raised when an MCP server cannot be created.

    The formatted text is stored on ``self.message``.
    """

    def __init__(self, message: str):
        self.message = "Failed to create MCP server: {}".format(message)
        super().__init__(self.message)

Kubernetes Operations

Job Management

check_pod_status(core_v1, job_name, namespace)

Check the status of pods associated with a job.

Parameters:

Name Type Description Default
core_v1 CoreV1Api

The Kubernetes core API client

required
job_name str

Name of the job/pod

required
namespace str

Kubernetes namespace

required

Returns:

Type Description
bool

True if a pod is running and ready (probes successful), False if waiting for pods

Raises:

Type Description
MCPJobError

If a pod is in Failed or Unknown state

Source code in src/mcp_ephemeral_k8s/k8s/job.py
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
def check_pod_status(core_v1: client.CoreV1Api, job_name: str, namespace: str) -> bool:
    """
    Report whether any pod of the given job is running and ready.

    Pods are selected with the label ``job-name=<job_name>``. Pods without a
    status or phase are ignored. Pods in ``Failed``/``Unknown`` are handed to
    ``_handle_failed_pod``.

    Args:
        core_v1: The Kubernetes core API client
        job_name: Name of the job/pod
        namespace: Kubernetes namespace

    Returns:
        True as soon as one pod is ``Running`` and passes ``_is_pod_ready``;
        False if there are no pods yet or none is ready.

    Raises:
        MCPJobError: If a pod is in Failed or Unknown state
    """
    pod_list = core_v1.list_namespaced_pod(namespace=namespace, label_selector=f"job-name={job_name}")
    if not pod_list.items:
        logger.warning(f"No pods found for job '{job_name}', waiting...")
        return False

    for pod in pod_list.items:
        status = pod.status
        if not status or not status.phase:
            continue

        if status.phase in ("Failed", "Unknown"):
            # Error states are delegated; per the Raises contract this surfaces MCPJobError.
            _handle_failed_pod(core_v1, pod, namespace, job_name)

        if status.phase != "Running":
            continue

        if not _is_pod_ready(pod):
            logger.info(f"Job '{job_name}' pod is running but not ready yet (waiting for probes)")
            continue

        logger.info(f"Job '{job_name}' pod is running and ready (probes successful)")
        return True

    return False

create_mcp_server_job(config, namespace, service_account_name=None)

Create a job that will run until explicitly terminated.

Parameters:

Name Type Description Default
config EphemeralMcpServerConfig

The configuration for the MCP server

required
namespace str

Kubernetes namespace

required
service_account_name str | None

Optional ServiceAccount name to use for the pod

None

Returns:

Type Description
V1Job

The MCP server instance

Source code in src/mcp_ephemeral_k8s/k8s/job.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
def create_mcp_server_job(
    config: EphemeralMcpServerConfig, namespace: str, service_account_name: str | None = None
) -> client.V1Job:
    """
    Build the ``V1Job`` manifest for an MCP server; nothing is submitted to the cluster.

    The job runs one container named after ``config.job_name``. Its pods are
    labelled ``app=<job_name>``, use ``restart_policy="Never"`` and a
    ``backoff_limit`` of 10, and carry a TCP readiness probe on ``config.port``.

    Args:
        config: The configuration for the MCP server
        namespace: Kubernetes namespace
        service_account_name: Optional ServiceAccount name to use for the pod

    Returns:
        The job object, ready to be passed to the batch API.
    """
    # config.env is optional; an absent mapping yields an empty env list.
    env_vars = [client.V1EnvVar(name=env_name, value=env_value) for env_name, env_value in (config.env or {}).items()]

    readiness = client.V1Probe(
        tcp_socket=client.V1TCPSocketAction(port=config.port),
        **config.probe_config.model_dump(),
    )
    container = client.V1Container(
        name=config.job_name,
        image=config.image,
        command=config.entrypoint,
        image_pull_policy="IfNotPresent",
        args=config.args,
        resources=client.V1ResourceRequirements(requests=config.resource_requests, limits=config.resource_limits),
        ports=[client.V1ContainerPort(container_port=config.port)],
        env=env_vars,
        readiness_probe=readiness,
    )
    pod_template = client.V1PodTemplateSpec(
        metadata=client.V1ObjectMeta(labels={"app": config.job_name}),
        spec=client.V1PodSpec(
            service_account_name=service_account_name,
            containers=[container],
            restart_policy="Never",
        ),
    )
    return client.V1Job(
        api_version="batch/v1",
        kind="Job",
        metadata=client.V1ObjectMeta(name=config.job_name, namespace=namespace),
        spec=client.V1JobSpec(backoff_limit=10, template=pod_template),
    )

create_proxy_server(url, **kwargs)

Create a proxy server from a remote MCP server over SSE.

Parameters:

Name Type Description Default
url str

The SSE endpoint URL of the remote MCP server

required
**kwargs Any

Additional keyword arguments for SSETransport configuration: sse_read_timeout (SSE read timeout, default 300 s), headers (optional HTTP headers dict), auth (optional authentication) and httpx_client_factory (optional custom HTTPX client factory). Any other keyword is ignored.

{}

Returns:

Type Description
FastMCP

FastMCP proxy server instance

Example

server = create_proxy_server(url="http://pod.default.svc.cluster.local:8080/sse", sse_read_timeout=600.0)

Source code in src/mcp_ephemeral_k8s/k8s/job.py
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
def create_proxy_server(url: str, **kwargs: Any) -> FastMCP:
    """Wrap a remote SSE MCP server in a local FastMCP proxy.

    Args:
        url: The SSE endpoint URL of the remote MCP server
        **kwargs: Options for SSETransport. Only these keys are forwarded; any
            other keyword is silently ignored:
            - sse_read_timeout: SSE read timeout (default: 300s)
            - headers: Optional HTTP headers dict
            - auth: Optional authentication
            - httpx_client_factory: Optional custom HTTPX client factory

    Returns:
        FastMCP proxy server instance

    Example:
        >>> server = create_proxy_server(
        ...     url="http://pod.default.svc.cluster.local:8080/sse",
        ...     sse_read_timeout=600.0
        ... )
    """
    forwarded_keys = ("sse_read_timeout", "headers", "auth", "httpx_client_factory")
    transport_kwargs = {key: kwargs[key] for key in kwargs if key in forwarded_keys}

    logger.debug(f"Creating proxy server for {url} with kwargs: {transport_kwargs}")

    transport = SSETransport(url=url, **transport_kwargs)
    return FastMCP.as_proxy(Client(transport))

delete_mcp_server_job(core_v1, batch_v1, job_name, namespace)

Delete a Kubernetes job and its associated pods.

Parameters:

Name Type Description Default
core_v1 CoreV1Api

The Kubernetes core API client

required
batch_v1 BatchV1Api

The Kubernetes batch API client

required
job_name str

The name of the job to delete

required
namespace str

The namespace of the job and its pods

required

Returns:

Type Description
bool

True if the job was deleted successfully, False otherwise

Source code in src/mcp_ephemeral_k8s/k8s/job.py
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
def delete_mcp_server_job(
    core_v1: client.CoreV1Api, batch_v1: client.BatchV1Api, job_name: str, namespace: str
) -> bool:
    """
    Delete a Kubernetes job and its associated pods.

    Pods labelled ``app=<job_name>`` are deleted first with a zero grace period.
    Then the job itself is deleted with foreground propagation.

    Args:
        core_v1: The Kubernetes core API client
        batch_v1: The Kubernetes batch API client
        job_name: The name of the job to delete
        namespace: The namespace of the job and its pods

    Returns:
        True if the job was deleted successfully, False otherwise
    """
    try:
        pods = core_v1.list_namespaced_pod(namespace=namespace, label_selector=f"app={job_name}")
        for pod in pods.items:
            if pod.metadata is None:
                continue
            pod_name_to_delete = pod.metadata.name
            if pod_name_to_delete is None:
                continue
            logger.info(f"Deleting pod {pod_name_to_delete}")
            try:
                core_v1.delete_namespaced_pod(
                    name=pod_name_to_delete,
                    namespace=namespace,
                    body=client.V1DeleteOptions(grace_period_seconds=0, propagation_policy="Background"),
                )
            except ApiException as e:
                # A pod that disappeared between the list and the delete is already
                # gone. Previously this 404 aborted the whole function before the job
                # was deleted, leaving the job behind.
                if e.status != 404:
                    raise
                logger.info(f"Pod {pod_name_to_delete} already deleted")
    except ApiException:
        logger.exception("Error deleting pods")
        return False
    try:
        batch_v1.delete_namespaced_job(
            name=job_name, namespace=namespace, body=client.V1DeleteOptions(propagation_policy="Foreground")
        )
        logger.info(f"Job '{job_name}' deleted successfully")
    except ApiException:
        logger.exception("Error deleting job")
        return False
    else:
        return True

expose_mcp_server_port(core_v1, job_name, namespace, port)

Expose the MCP server port to the outside world.

Parameters:

Name Type Description Default
core_v1 CoreV1Api

The Kubernetes core API client

required
job_name str

Name of the pod (job name)

required
namespace str

Kubernetes namespace

required
port int

Port to expose

required
Source code in src/mcp_ephemeral_k8s/k8s/job.py
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
def expose_mcp_server_port(core_v1: client.CoreV1Api, job_name: str, namespace: str, port: int) -> None:
    """
    Create a Service named ``job_name`` in front of the job's pods.

    The Service selects pods labelled ``job-name=<job_name>`` and exposes ``port``.
    No service type is set, so the API server's default type applies.

    Args:
        core_v1: The Kubernetes core API client
        job_name: Name of the pod (job name)
        namespace: Kubernetes namespace
        port: Port to expose
    """
    service = client.V1Service(
        metadata=client.V1ObjectMeta(name=job_name),
        spec=client.V1ServiceSpec(
            selector={"job-name": job_name},
            ports=[client.V1ServicePort(port=port)],
        ),
    )
    core_v1.create_namespaced_service(namespace=namespace, body=service)
    logger.info(f"Service '{job_name}' created successfully")

get_mcp_server_job_status(batch_v1, job_name, namespace)

Get the status of a Kubernetes job.

Parameters:

Name Type Description Default
batch_v1 BatchV1Api

The Kubernetes batch API client

required
job_name str

The name of the job to get the status of

required
namespace str

The namespace of the job

required

Returns:

Type Description
None | V1Job

The status of the job

Source code in src/mcp_ephemeral_k8s/k8s/job.py
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
def get_mcp_server_job_status(batch_v1: client.BatchV1Api, job_name: str, namespace: str) -> None | client.V1Job:
    """
    Read a Kubernetes job and log a summary of its status.

    Args:
        batch_v1: The Kubernetes batch API client
        job_name: The name of the job to get the status of
        namespace: The namespace of the job

    Returns:
        The job object, or None if it could not be read (not found, or any other
        API error).
    """
    try:
        job = cast(client.V1Job, batch_v1.read_namespaced_job(name=job_name, namespace=namespace))
    except ApiException as e:
        if e.status == 404:
            logger.info(f"Job '{job_name}' not found")
        else:
            # Non-404 failures (permissions, server errors, ...) are real problems;
            # logging them at info level made them easy to miss.
            logger.warning(f"Error getting job status: {e}")
        return None

    # Log pod counters; the API leaves them unset (None) when they are zero.
    if job.status is not None:
        active = job.status.active or 0
        succeeded = job.status.succeeded or 0
        failed = job.status.failed or 0

        logger.info(f"Job '{job_name}' status:")
        logger.info(f"Active pods: {active}")
        logger.info(f"Succeeded pods: {succeeded}")
        logger.info(f"Failed pods: {failed}")

    # Log job creation time when available
    if job.metadata is not None and job.metadata.creation_timestamp is not None:
        creation_time = job.metadata.creation_timestamp
        logger.info(f"Creation time: {creation_time}")

    return job

remove_mcp_server_port(core_v1, job_name, namespace)

Remove the Kubernetes Service that exposes the MCP server port.

Parameters:

Name Type Description Default
core_v1 CoreV1Api

The Kubernetes core API client

required
job_name str

Name of the pod

required
namespace str

Kubernetes namespace

required
Source code in src/mcp_ephemeral_k8s/k8s/job.py
335
336
337
338
339
340
341
342
343
344
345
def remove_mcp_server_port(core_v1: client.CoreV1Api, job_name: str, namespace: str) -> None:
    """
    Delete the Service that exposes the MCP server port.

    Args:
        core_v1: The Kubernetes core API client
        job_name: Name of the job; the Service shares this name
        namespace: Kubernetes namespace
    """
    service_name = job_name  # the Service is created with the job's name
    core_v1.delete_namespaced_service(namespace=namespace, name=service_name)
    logger.info(f"Service '{service_name}' deleted successfully")

wait_for_job_deletion(batch_v1, job_name, namespace, sleep_time=1, max_wait_time=60) async

Wait for a job to be deleted.

Parameters:

Name Type Description Default
batch_v1 BatchV1Api

The Kubernetes batch API client

required
job_name str

Name of the job

required
namespace str

Kubernetes namespace

required
sleep_time float

Time to sleep between checks

1
max_wait_time float

Maximum time to wait before timing out

60

Raises:

Type Description
MCPJobTimeoutError

If the job is not deleted within max_wait_time

Source code in src/mcp_ephemeral_k8s/k8s/job.py
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
async def wait_for_job_deletion(
    batch_v1: client.BatchV1Api, job_name: str, namespace: str, sleep_time: float = 1, max_wait_time: float = 60
) -> None:
    """
    Wait for a job to be deleted.

    Only a definitive "not found" (HTTP 404) from the API counts as deleted.
    Other API errors are logged and polling continues until the timeout.

    Args:
        batch_v1: The Kubernetes batch API client
        job_name: Name of the job
        namespace: Kubernetes namespace
        sleep_time: Time to sleep between checks
        max_wait_time: Maximum time to wait before timing out

    Raises:
        MCPJobTimeoutError: If the job is not deleted within max_wait_time
    """
    # Monotonic clock: wall-clock adjustments must not shorten or extend the timeout.
    deadline = time.monotonic() + max_wait_time
    while True:
        if time.monotonic() > deadline:
            raise MCPJobTimeoutError(namespace, job_name)
        try:
            batch_v1.read_namespaced_job(name=job_name, namespace=namespace)
        except ApiException as e:
            if e.status == 404:
                logger.info(f"Job '{job_name}' not found")
                break
            # A transient or permission error says nothing about whether the job is
            # gone; treating it as "deleted" would end the wait prematurely.
            logger.warning(f"Error getting job status: {e}")
        await asyncio.sleep(sleep_time)

wait_for_job_ready(batch_v1, core_v1, job_name, namespace, sleep_time=1, max_wait_time=60) async

Wait for a job's pod to be in the running state and ready (probes successful).

Parameters:

Name Type Description Default
batch_v1 BatchV1Api

The Kubernetes batch API client

required
core_v1 CoreV1Api

The Kubernetes core API client

required
job_name str

Name of the job

required
namespace str

Kubernetes namespace

required
sleep_time float

Time to sleep between checks

1
max_wait_time float

Maximum time to wait before timing out

60

Raises:

Type Description
MCPJobTimeoutError

If the job does not become ready within max_wait_time

Source code in src/mcp_ephemeral_k8s/k8s/job.py
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
async def wait_for_job_ready(
    batch_v1: client.BatchV1Api,
    core_v1: client.CoreV1Api,
    job_name: str,
    namespace: str,
    sleep_time: float = 1,
    max_wait_time: float = 60,
) -> None:
    """
    Wait for a job's pod to be in the running state and ready (probes successful).

    Args:
        batch_v1: The Kubernetes batch API client
        core_v1: The Kubernetes core API client
        job_name: Name of the job
        namespace: Kubernetes namespace
        sleep_time: Time to sleep between checks
        max_wait_time: Maximum time to wait before timing out

    Raises:
        MCPJobTimeoutError: If the job does not become ready within max_wait_time
    """
    # Monotonic clock: wall-clock adjustments must not shorten or extend the timeout.
    deadline = time.monotonic() + max_wait_time
    while True:
        if time.monotonic() > deadline:
            raise MCPJobTimeoutError(namespace, job_name)

        job = get_mcp_server_job_status(batch_v1, job_name, namespace)
        if job is None:
            logger.warning(f"Job '{job_name}' not found, waiting for pod to become ready...")
        elif job.status is None:
            logger.warning(f"Job '{job_name}' status is None, waiting for pod to become ready...")
        elif check_pod_status(core_v1, job_name, namespace):
            # At least one pod is running and ready
            break
        elif job.status.active:
            # Any number of active pods (not only exactly one) means the job is progressing
            logger.info(f"Job '{job_name}' active")
        else:
            logger.warning(f"Job '{job_name}' in unknown state, waiting...")

        await asyncio.sleep(sleep_time)

RBAC Management

Helper functions for managing RBAC resources for spawned MCP server pods.

RBACPreset

Bases: StrEnum

Preset RBAC configurations for service accounts.

Source code in src/mcp_ephemeral_k8s/k8s/rbac.py
14
15
16
17
18
class RBACPreset(StrEnum):
    """
    Preset RBAC configurations for service accounts.

    A preset is translated into concrete RBAC rules by ``_get_rbac_rules_by_preset``;
    that helper defines the exact permissions each preset grants.
    """

    # NOTE(review): by name, the narrower permission set — confirm against _get_rbac_rules_by_preset.
    MINIMAL = "minimal"
    # NOTE(review): by name, the broader permission set — confirm against _get_rbac_rules_by_preset.
    EXTENSIVE = "extensive"

ServiceAccountConfig

Bases: BaseModel

Configuration for ServiceAccount RBAC permissions.

Source code in src/mcp_ephemeral_k8s/k8s/rbac.py
29
30
31
32
33
34
35
36
37
38
39
class ServiceAccountConfig(BaseModel):
    """RBAC settings applied to the ServiceAccount created for a spawned MCP server job."""

    # Which rule set to grant; defaults to the minimal preset.
    preset: RBACPreset = Field(
        description="The RBAC preset to use for the service account",
        default=RBACPreset.MINIMAL,
    )
    # True -> ClusterRole/ClusterRoleBinding, False -> namespaced Role/RoleBinding.
    cluster_wide: bool = Field(
        description="Whether to create ClusterRole/ClusterRoleBinding (True) or Role/RoleBinding (False)",
        default=True,
    )

UnknownRBACPresetError

Bases: ValueError

Exception raised when an unknown RBAC preset is encountered.

Source code in src/mcp_ephemeral_k8s/k8s/rbac.py
21
22
23
24
25
26
class UnknownRBACPresetError(ValueError):
    """Raised when an RBAC preset has no known rule set."""

    def __init__(self, preset: RBACPreset) -> None:
        message = f"Unknown RBAC preset: {preset}"
        super().__init__(message)
        # Keep the offending preset available to handlers.
        self.preset = preset

create_service_account_for_job(core_v1, rbac_v1, job_name, namespace, cluster_wide=True, sa_config=None)

Create a ServiceAccount and RBAC resources for a spawned MCP server job.

This creates a ServiceAccount, a Role/ClusterRole with permissions based on the ServiceAccountConfig, and a RoleBinding/ClusterRoleBinding that binds the two.

Parameters:

Name Type Description Default
core_v1 CoreV1Api

The Kubernetes core API client

required
rbac_v1 RbacAuthorizationV1Api

The Kubernetes RBAC API client

required
job_name str

The name of the job (used for naming resources)

required
namespace str

Kubernetes namespace

required
cluster_wide bool

Whether to create ClusterRole/ClusterRoleBinding (default: True). Note: this parameter is deprecated and has no effect when sa_config is provided.

True
sa_config ServiceAccountConfig | None

ServiceAccount configuration with RBAC preset (default: minimal preset)

None

Returns:

Type Description
str

The name of the created ServiceAccount

Source code in src/mcp_ephemeral_k8s/k8s/rbac.py
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
def create_service_account_for_job(
    core_v1: client.CoreV1Api,
    rbac_v1: client.RbacAuthorizationV1Api,
    job_name: str,
    namespace: str,
    cluster_wide: bool = True,
    sa_config: ServiceAccountConfig | None = None,
) -> str:
    """
    Create a ServiceAccount and RBAC resources for a spawned MCP server job.

    This creates:
    - A ServiceAccount
    - A Role/ClusterRole with permissions based on the ServiceAccountConfig
    - A RoleBinding/ClusterRoleBinding

    Args:
        core_v1: The Kubernetes core API client
        rbac_v1: The Kubernetes RBAC API client
        job_name: The name of the job (used for naming resources)
        namespace: Kubernetes namespace
        cluster_wide: Whether to create ClusterRole/ClusterRoleBinding (default: True).
                     Only used when sa_config is None; deprecated in favour of
                     sa_config.cluster_wide.
        sa_config: ServiceAccount configuration with RBAC preset (default: minimal preset)

    Returns:
        The name of the created ServiceAccount
    """
    service_account_name = f"{job_name}-sa"

    # Create ServiceAccount
    _create_service_account(core_v1, service_account_name, namespace, job_name)

    # Without an explicit config, honour the legacy `cluster_wide` argument instead of
    # silently falling back to the model default (which is always cluster-wide).
    if sa_config is None:
        sa_config = ServiceAccountConfig(cluster_wide=cluster_wide)

    rules = _get_rbac_rules_by_preset(sa_config.preset)

    # Create Role/ClusterRole and RoleBinding/ClusterRoleBinding
    if sa_config.cluster_wide:
        _create_cluster_role_and_binding(rbac_v1, job_name, namespace, service_account_name, rules)
    else:
        _create_role_and_binding(rbac_v1, job_name, namespace, service_account_name, rules)

    return service_account_name

delete_service_account_for_job(core_v1, rbac_v1, job_name, namespace, cluster_wide=True)

Delete the ServiceAccount and RBAC resources for a spawned MCP server job.

Parameters:

Name Type Description Default
core_v1 CoreV1Api

The Kubernetes core API client

required
rbac_v1 RbacAuthorizationV1Api

The Kubernetes RBAC API client

required
job_name str

The name of the job

required
namespace str

Kubernetes namespace

required
cluster_wide bool

Whether ClusterRole/ClusterRoleBinding were created (default: True)

True

Returns:

Type Description
bool

True if all resources were deleted successfully, False otherwise

Source code in src/mcp_ephemeral_k8s/k8s/rbac.py
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
def delete_service_account_for_job(
    core_v1: client.CoreV1Api,
    rbac_v1: client.RbacAuthorizationV1Api,
    job_name: str,
    namespace: str,
    cluster_wide: bool = True,
) -> bool:
    """
    Tear down the ServiceAccount and RBAC objects created for an MCP server job.

    The role and its binding are removed first, then the ServiceAccount.

    Args:
        core_v1: The Kubernetes core API client
        rbac_v1: The Kubernetes RBAC API client
        job_name: The name of the job
        namespace: Kubernetes namespace
        cluster_wide: Whether ClusterRole/ClusterRoleBinding were created (default: True)

    Returns:
        True if all resources were deleted successfully, False otherwise
    """
    rbac_success = (
        _delete_cluster_role_and_binding(rbac_v1, job_name)
        if cluster_wide
        else _delete_role_and_binding(rbac_v1, job_name, namespace)
    )
    sa_success = _delete_service_account(core_v1, f"{job_name}-sa", namespace)
    return rbac_success and sa_success

Utilities

This module contains a utility function to generate unique identifiers for MCP ephemeral K8s resources based on RFC 1123 Label Names.

generate_unique_id(prefix=None, max_length=63)

Generate a unique identifier that follows the Kubernetes naming rules (RFC 1123 Label Names).

RFC 1123 Label Names must contain only lowercase alphanumeric characters or '-', start with an alphanumeric character, end with an alphanumeric character, and be at most 63 characters long.

Parameters:

Name Type Description Default
prefix str | None

Optional prefix for the ID. Will be converted to lowercase and non-compliant characters will be replaced with dashes.

None
max_length int

Maximum length of the generated ID, defaults to 63 (K8s limit).

63

Returns:

Type Description
str

A unique RFC 1123 compliant identifier string.

Source code in src/mcp_ephemeral_k8s/k8s/uid.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
def generate_unique_id(prefix: str | None = None, max_length: int = 63) -> str:
    """
    Generate a unique identifier that follows the Kubernetes naming rules (RFC 1123 Label Names).

    RFC 1123 Label Names must:
    - Contain only lowercase alphanumeric characters or '-'
    - Start with an alphanumeric character
    - End with an alphanumeric character
    - Be at most 63 characters

    Args:
        prefix: Optional prefix for the ID. Will be converted to lowercase and non-compliant
                characters will be replaced with dashes.
        max_length: Maximum length of the generated ID, defaults to 63 (K8s limit).

    Returns:
        A unique RFC 1123 compliant identifier string.
    """
    # Process prefix if provided
    processed_prefix = ""
    if prefix:
        # Convert to lowercase and replace invalid characters
        processed_prefix = "".join(
            c if c.isalnum() and c.islower() else (c.lower() if c.isalnum() else "-") for c in prefix
        )

        # Ensure prefix starts with alphanumeric
        if processed_prefix and not processed_prefix[0].isalnum():
            processed_prefix = f"p{processed_prefix}"

        # Add separator
        if processed_prefix:
            processed_prefix = f"{processed_prefix}-"

    # Generate a unique part (timestamp + random)
    timestamp = str(int(time.time()))
    random_chars = "".join(random.choices(string.ascii_lowercase + string.digits, k=8))  # noqa: S311
    unique_part = f"{timestamp}-{random_chars}"

    # Combine and ensure max length
    full_id = f"{processed_prefix}{unique_part}"
    if len(full_id) > max_length:
        # If too long, truncate the ID but keep the random part
        chars_to_keep = max_length - len(random_chars) - 1
        full_id = f"{full_id[:chars_to_keep]}-{random_chars}"

    # Ensure ID ends with alphanumeric
    if not full_id[-1].isalnum():
        full_id = f"{full_id[:-1]}{random.choice(string.ascii_lowercase)}"  # noqa: S311

    return full_id

Integrations & Presets