Skip to content

Modules

This module contains the session manager for the MCP ephemeral K8s library. It is used to create and manage MCP servers in a Kubernetes cluster.

KubernetesSessionManager

Bases: BaseModel

Kubernetes session manager for MCP.

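This manager creates and manages Kubernetes jobs for MCP sessions. It implements the (synchronous) context manager protocol, so it is used with a plain `with` statement: entering loads the Kubernetes configuration and clients, and exiting deletes the jobs it created.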

Source code in src/mcp_ephemeral_k8s/session_manager.py
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
class KubernetesSessionManager(BaseModel):
    """
    Kubernetes session manager for MCP.

    This manager creates and manages Kubernetes jobs for MCP sessions.
    It implements the (synchronous) context manager protocol: entering the
    context loads the Kubernetes configuration and API clients, and leaving it
    deletes every job that is still registered in ``jobs``.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    namespace: str = Field(default="default", description="The namespace to create resources in")
    jobs: dict[str, EphemeralMcpServer] = Field(
        default_factory=dict,
        description="A dictionary mapping between pod names and MCP servers jobs that are running.",
    )
    runtime: KubernetesRuntime = Field(
        default=KubernetesRuntime.KUBECONFIG, description="The runtime to use for the MCP server"
    )
    sleep_time: float = Field(default=1, description="The time to sleep between job status checks")
    max_wait_time: float = Field(default=300, description="The maximum time to wait for a job to complete")
    # API clients are created lazily by `load_session_manager`, not at construction time.
    _api_client: ApiClient = PrivateAttr()
    _batch_v1: BatchV1Api = PrivateAttr()
    _core_v1: CoreV1Api = PrivateAttr()

    def load_session_manager(self) -> Self:
        """Load the Kubernetes configuration, create the API clients and validate the namespace.

        Clients that have already been set on this instance are reused.

        Returns:
            This session manager.

        Raises:
            MCPNamespaceNotFoundError: If ``namespace`` is not among the namespaces listed by the cluster.
        """
        self._load_kube_config()
        if not hasattr(self, "_api_client"):
            self._api_client = ApiClient()
        if not hasattr(self, "_batch_v1"):
            self._batch_v1 = BatchV1Api(self._api_client)
        if not hasattr(self, "_core_v1"):
            self._core_v1 = CoreV1Api(self._api_client)
        # check if the configured namespace exists
        namespaces = self._core_v1.list_namespace().items
        if self.namespace not in [namespace.metadata.name for namespace in namespaces if namespace.metadata]:
            raise MCPNamespaceNotFoundError(self.namespace)
        return self

    def _load_kube_config(self) -> None:
        """Load Kubernetes configuration from default location or from service account if running in cluster.

        With the ``KUBECONFIG`` runtime the local configuration is tried first
        (honouring the ``KUBECONFIG`` and ``KUBECONTEXT`` environment variables);
        if that fails, ``runtime`` is switched to ``INCLUSTER`` and the in-cluster
        configuration is loaded instead.

        Raises:
            InvalidKubeConfigError: If ``runtime`` is neither ``KUBECONFIG`` nor ``INCLUSTER``.
        """
        if self.runtime == KubernetesRuntime.KUBECONFIG:
            try:
                load_kube_config(
                    config_file=os.environ.get("KUBECONFIG"),
                    context=os.environ.get("KUBECONTEXT"),
                    client_configuration=None,
                    persist_config=False,
                )
                logger.info("Using local kubernetes configuration")
                return  # noqa: TRY300
            except Exception:
                # Deliberate fallback: any failure of the local config leads to the in-cluster attempt below.
                logger.warning("Failed to load local kubernetes configuration, trying in-cluster configuration")
                self.runtime = KubernetesRuntime.INCLUSTER
        if self.runtime == KubernetesRuntime.INCLUSTER:
            load_incluster_config()
            logger.info("Using in-cluster kubernetes configuration")
            return
        raise InvalidKubeConfigError(self.runtime)

    def __enter__(self) -> Self:
        """Enter the context manager by loading the configuration and API clients."""
        self.load_session_manager()
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Exit the context manager and delete every registered job.

        Deletion is attempted for all jobs even when one of them fails, so a
        single failing job cannot leak the remaining ones. The first failure is
        logged and re-raised once every job has been attempted.
        """
        first_error: Exception | None = None
        # Iterate over a snapshot of the names so the loop does not depend on `jobs` staying unchanged.
        for job_name in list(self.jobs):
            try:
                self._delete_job(job_name)
            except Exception as exc:
                logger.exception(f"Failed to delete job {job_name} during cleanup")
                if first_error is None:
                    first_error = exc
        if first_error is not None:
            raise first_error

    def _create_job(self, config: EphemeralMcpServerConfig) -> EphemeralMcpServer:
        """
        Create a job that will run until explicitly terminated.

        Args:
            config: The configuration for the MCP servers

        Returns:
            The MCP server instance

        Raises:
            MCPServerCreationError: If the API response carries no job name.
        """
        job = create_mcp_server_job(config, self.namespace)
        response = self._batch_v1.create_namespaced_job(namespace=self.namespace, body=job)
        logger.debug(f"Job response: {response}")
        if not response.metadata or not response.metadata.name:
            raise MCPServerCreationError(str(response.metadata))
        # Log the name reported by the API: `config.job_name` is recomputed on every
        # access and may therefore not match the job that was actually created.
        created_name = response.metadata.name
        logger.info(f"Job '{created_name}' created successfully")
        return EphemeralMcpServer(config=config, pod_name=created_name)

    def _get_job_status(self, pod_name: str) -> None | client.V1Job:
        """
        Get current status of a job.

        Args:
            pod_name: Name of the pod

        Returns:
            The job object returned by the status helper, or None
        """
        return get_mcp_server_job_status(self._batch_v1, pod_name, self.namespace)

    def _check_pod_status(self, pod_name: str) -> bool:
        """
        Check the status of pods associated with a job.

        Args:
            pod_name: Name of the job/pod

        Returns:
            True if a pod is running and ready (probes successful), False if waiting for pods

        Raises:
            MCPJobError: If a pod is in Failed or Unknown state
        """
        return check_pod_status(self._core_v1, pod_name, self.namespace)

    def _wait_for_job_ready(self, pod_name: str) -> None:
        """Wait for a job's pod to be in the running state and ready (probes successful).

        Polling interval and timeout are taken from ``sleep_time`` and ``max_wait_time``.

        Args:
            pod_name: Name of the job/pod
        """
        wait_for_job_ready(self._batch_v1, self._core_v1, pod_name, self.namespace, self.sleep_time, self.max_wait_time)

    def _wait_for_job_deletion(self, pod_name: str) -> None:
        """Wait for a job to be deleted.

        Polling interval and timeout are taken from ``sleep_time`` and ``max_wait_time``.

        Args:
            pod_name: Name of the job/pod
        """
        wait_for_job_deletion(self._batch_v1, pod_name, self.namespace, self.sleep_time, self.max_wait_time)

    def _delete_job(self, pod_name: str) -> bool:
        """
        Delete a Kubernetes job and its associated pods.

        Args:
            pod_name: Name of the job/pod

        Returns:
            True if the job was deleted successfully, False otherwise
        """
        # Removing the exposed port is best-effort: a failure (including an unknown
        # pod name) is only logged so that the job deletion itself is still attempted.
        try:
            self.remove_mcp_server_port(self.jobs[pod_name])
        except Exception:
            logger.warning(f"Failed to remove MCP server port for job {pod_name}")
        return delete_mcp_server_job(self._core_v1, self._batch_v1, pod_name, self.namespace)

    def create_mcp_server(
        self, config: EphemeralMcpServerConfig, wait_for_ready: bool = True, expose_port: bool = False
    ) -> EphemeralMcpServer:
        """Start a new MCP server using the provided configuration.

        Args:
            config: The configuration for the MCP servers
            wait_for_ready: Whether to wait for the job to be ready
            expose_port: Whether to expose the MCP server port after the job has been created

        Returns:
            The MCP server instance
        """
        mcp_server = self._create_job(config)
        # Register the server before waiting so it is cleaned up on exit even if waiting fails.
        self.jobs[mcp_server.pod_name] = mcp_server
        if wait_for_ready:
            self._wait_for_job_ready(mcp_server.pod_name)
        if expose_port:
            self.expose_mcp_server_port(mcp_server)
        return mcp_server

    def delete_mcp_server(self, pod_name: str, wait_for_deletion: bool = True) -> EphemeralMcpServer:
        """Delete the MCP server.

        Args:
            pod_name: Name of the job/pod
            wait_for_deletion: Whether to wait for the job to be deleted

        Returns:
            The MCP server instance

        Raises:
            MCPJobNotFoundError: If ``pod_name`` is not registered in ``jobs``.
        """
        if pod_name in self.jobs:
            self._delete_job(pod_name)
            if wait_for_deletion:
                self._wait_for_job_deletion(pod_name)
            config = self.jobs[pod_name].config
            result = EphemeralMcpServer(config=config, pod_name=pod_name)
            # Only forget the job once deletion (and the optional wait) went through.
            del self.jobs[pod_name]
            return result
        raise MCPJobNotFoundError(self.namespace, pod_name)

    def expose_mcp_server_port(self, mcp_server: EphemeralMcpServer) -> None:
        """Expose the MCP server port to the outside world."""
        # Inside the method body this name resolves to the module-level helper, not to this method.
        expose_mcp_server_port(self._core_v1, mcp_server.pod_name, self.namespace, mcp_server.config.port)

    def remove_mcp_server_port(self, mcp_server: EphemeralMcpServer) -> None:
        """Remove the MCP server port from the outside world."""
        # Inside the method body this name resolves to the module-level helper, not to this method.
        remove_mcp_server_port(self._core_v1, mcp_server.pod_name, self.namespace)

__enter__()

Enter the context manager.

Source code in src/mcp_ephemeral_k8s/session_manager.py
 99
100
101
102
def __enter__(self) -> Self:
    """Enter the context manager.

    Delegates to ``load_session_manager`` before handing the manager to the
    ``with`` block.

    Returns:
        This session manager.
    """
    self.load_session_manager()
    return self

__exit__(exc_type, exc_val, exc_tb)

Exit the context manager.

Source code in src/mcp_ephemeral_k8s/session_manager.py
104
105
106
107
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
    """Exit the context manager and delete every registered job.

    Deletion is attempted for all jobs even when one of them fails, so a
    single failing job cannot leak the remaining ones. The first failure is
    logged and re-raised once every job has been attempted.

    Args:
        exc_type: Type of the exception raised inside the ``with`` block, if any (unused).
        exc_val: The exception instance raised inside the ``with`` block, if any (unused).
        exc_tb: The traceback of that exception, if any (unused).
    """
    first_error: Exception | None = None
    # Iterate over a snapshot of the names so the loop does not depend on `jobs` staying unchanged.
    for job_name in list(self.jobs):
        try:
            self._delete_job(job_name)
        except Exception as exc:
            logger.exception(f"Failed to delete job {job_name} during cleanup")
            if first_error is None:
                first_error = exc
    if first_error is not None:
        raise first_error

create_mcp_server(config, wait_for_ready=True, expose_port=False)

Start a new MCP server using the provided configuration.

Parameters:

Name Type Description Default
config EphemeralMcpServerConfig

The configuration for the MCP servers

required
wait_for_ready bool

Whether to wait for the job to be ready

True
expose_port bool

Whether to expose the MCP server port after the job has been created

False

Returns:

Type Description
EphemeralMcpServer

The MCP server instance

Source code in src/mcp_ephemeral_k8s/session_manager.py
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
def create_mcp_server(
    self, config: EphemeralMcpServerConfig, wait_for_ready: bool = True, expose_port: bool = False
) -> EphemeralMcpServer:
    """Create a job for a new MCP server and register it with this manager.

    Args:
        config: The configuration for the MCP servers
        wait_for_ready: Whether to block until the job's pod is ready
        expose_port: Whether to expose the MCP server port once the job exists

    Returns:
        The MCP server instance that was created
    """
    server = self._create_job(config)
    name = server.pod_name

    # Track the server first, so it stays registered even if one of the steps below raises.
    self.jobs[name] = server

    if wait_for_ready:
        self._wait_for_job_ready(name)
    if expose_port:
        self.expose_mcp_server_port(server)
    return server

delete_mcp_server(pod_name, wait_for_deletion=True)

Delete the MCP server.

Parameters:

Name Type Description Default
pod_name str

Name of the job/pod

required
wait_for_deletion bool

Whether to wait for the job to be deleted

True

Returns:

Type Description
EphemeralMcpServer

The MCP server instance

Source code in src/mcp_ephemeral_k8s/session_manager.py
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
def delete_mcp_server(self, pod_name: str, wait_for_deletion: bool = True) -> EphemeralMcpServer:
    """Delete a registered MCP server and forget it.

    Args:
        pod_name: Name of the job/pod
        wait_for_deletion: Whether to block until the job has been deleted

    Returns:
        A server instance describing the deleted job (same config and pod name)

    Raises:
        MCPJobNotFoundError: If ``pod_name`` is not registered with this manager
    """
    # Guard clause: unknown names are rejected before anything is touched.
    if pod_name not in self.jobs:
        raise MCPJobNotFoundError(self.namespace, pod_name)

    self._delete_job(pod_name)
    if wait_for_deletion:
        self._wait_for_job_deletion(pod_name)

    # Build the return value before dropping the entry from the registry.
    deleted = EphemeralMcpServer(config=self.jobs[pod_name].config, pod_name=pod_name)
    self.jobs.pop(pod_name)
    return deleted

expose_mcp_server_port(mcp_server)

Expose the MCP server port to the outside world.

Source code in src/mcp_ephemeral_k8s/session_manager.py
226
227
228
def expose_mcp_server_port(self, mcp_server: EphemeralMcpServer) -> None:
    """Expose the MCP server port to the outside world.

    Args:
        mcp_server: The server whose pod name and configured port are passed to the helper.
    """
    # Inside the method body this name resolves to the module-level helper of the
    # same name, not to this method, so the call is not recursive.
    expose_mcp_server_port(self._core_v1, mcp_server.pod_name, self.namespace, mcp_server.config.port)

load_session_manager()

Load Kubernetes configuration from default location or from service account if running in cluster.

Source code in src/mcp_ephemeral_k8s/session_manager.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
def load_session_manager(self) -> Self:
    """Prepare the manager for use against the cluster.

    Loads the Kubernetes configuration, creates any API client that has not
    been set on this instance yet, and verifies that the configured namespace
    is among the namespaces listed by the cluster.

    Returns:
        This session manager.

    Raises:
        MCPNamespaceNotFoundError: If the configured namespace is not listed.
    """
    self._load_kube_config()

    # Clients that are already present on the instance are reused.
    if not hasattr(self, "_api_client"):
        self._api_client = ApiClient()
    if not hasattr(self, "_batch_v1"):
        self._batch_v1 = BatchV1Api(self._api_client)
    if not hasattr(self, "_core_v1"):
        self._core_v1 = CoreV1Api(self._api_client)

    # check if the configured namespace exists
    known_names = {item.metadata.name for item in self._core_v1.list_namespace().items if item.metadata}
    if self.namespace in known_names:
        return self
    raise MCPNamespaceNotFoundError(self.namespace)

remove_mcp_server_port(mcp_server)

Remove the MCP server port from the outside world.

Source code in src/mcp_ephemeral_k8s/session_manager.py
230
231
232
def remove_mcp_server_port(self, mcp_server: EphemeralMcpServer) -> None:
    """Remove the MCP server port from the outside world.

    Args:
        mcp_server: The server whose pod name is passed to the helper.
    """
    # Inside the method body this name resolves to the module-level helper of the
    # same name, not to this method, so the call is not recursive.
    remove_mcp_server_port(self._core_v1, mcp_server.pod_name, self.namespace)

MCP server application, meant to be used as an MCP server that can spawn other MCP servers.

create_mcp_server(ctx, runtime_exec, runtime_mcp, env=None, wait_for_ready=False)

Create a new MCP server.

Parameters:

Name Type Description Default
runtime_exec str

The runtime to use for the MCP server (e.g. "uvx", "npx", "go run").

required
runtime_mcp str

The MCP server to launch through `runtime_exec` (e.g. "mcp-server-fetch").

required
env dict[str, str] | None

The environment variables to set for the MCP server.

None
wait_for_ready bool

Whether to wait for the MCP server to be ready before returning.

False
Source code in src/mcp_ephemeral_k8s/app/mcp.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
@mcp.tool("create_mcp_server")
def create_mcp_server(
    ctx: Context,
    runtime_exec: str,
    runtime_mcp: str,
    env: dict[str, str] | None = None,
    wait_for_ready: bool = False,
) -> EphemeralMcpServer:
    """Create a new MCP server.

    Args:
        runtime_exec: The runtime to use for the MCP server (e.g. "uvx", "npx", "go run").
        runtime_mcp: The runtime to use for the MCP server (e.g. "mcp-server-fetch").
        env: The environment variables to set for the MCP server.
        wait_for_ready: Whether to wait for the MCP server to be ready before returning.
    """
    # NOTE: the docstring above is left untouched on purpose; it may be surfaced to
    # clients as the tool description — confirm before rewording it.
    # Only the runtime pair and the environment come from the caller; every other
    # setting keeps the defaults of EphemeralMcpServerConfig.
    config = EphemeralMcpServerConfig(runtime_exec=runtime_exec, runtime_mcp=runtime_mcp, env=env)
    # The lifespan context is expected to be the session manager yielded by the server's lifespan hook.
    session_manager: KubernetesSessionManager = ctx.request_context.lifespan_context
    # `expose_port` is not forwarded, so the session manager's default applies.
    return session_manager.create_mcp_server(config, wait_for_ready=wait_for_ready)

delete_mcp_server(ctx, pod_name, wait_for_deletion=False)

Delete an MCP server.

Parameters:

Name Type Description Default
pod_name str

The name of the MCP server to delete.

required
wait_for_deletion bool

Whether to wait for the MCP server to be deleted before returning.

False
Source code in src/mcp_ephemeral_k8s/app/mcp.py
67
68
69
70
71
72
73
74
75
76
@mcp.tool("delete_mcp_server")
def delete_mcp_server(ctx: Context, pod_name: str, wait_for_deletion: bool = False) -> EphemeralMcpServer:
    """Delete an MCP server.

    Args:
        pod_name: The name of the MCP server to delete.
        wait_for_deletion: Whether to wait for the MCP server to be deleted before returning.
    """
    # NOTE: the docstring above is left untouched on purpose; it may be surfaced to
    # clients as the tool description — confirm before rewording it.
    # The lifespan context is expected to be the session manager yielded by the server's lifespan hook.
    session_manager: KubernetesSessionManager = ctx.request_context.lifespan_context
    # Unlike the session manager method (which waits by default), this tool defaults to not waiting.
    return session_manager.delete_mcp_server(pod_name, wait_for_deletion=wait_for_deletion)

get_mcp_server_status(ctx, pod_name)

Get the status of an MCP server.

Parameters:

Name Type Description Default
pod_name str

The name of the MCP server to get the status of.

required
Source code in src/mcp_ephemeral_k8s/app/mcp.py
79
80
81
82
83
84
85
86
87
@mcp.tool("get_mcp_server_status")
def get_mcp_server_status(ctx: Context, pod_name: str) -> client.V1Job | None:
    """Get the status of an MCP server.

    Args:
        pod_name: The name of the MCP server to get the status of.
    """
    # NOTE: the docstring above is left untouched on purpose; it may be surfaced to
    # clients as the tool description — confirm before rewording it.
    # The lifespan context is expected to be the session manager yielded by the server's lifespan hook.
    session_manager: KubernetesSessionManager = ctx.request_context.lifespan_context
    # NOTE(review): this reaches into a private method of the session manager;
    # a public accessor would make the dependency explicit.
    return session_manager._get_job_status(pod_name)

get_version()

Get the version of the MCP ephemeral server.

Source code in src/mcp_ephemeral_k8s/app/mcp.py
26
27
28
29
@mcp.resource("config://version")
def get_version() -> str:
    """Get the version of the MCP ephemeral server."""
    # Served under the "config://version" resource URI; returns the module-level `__version__`.
    return __version__

lifespan(server) async

Lifecycle hooks for the MCP ephemeral server.

Source code in src/mcp_ephemeral_k8s/app/mcp.py
13
14
15
16
17
18
19
@asynccontextmanager
async def lifespan(server: FastMCP) -> AsyncIterator[KubernetesSessionManager]:
    """
    Lifecycle hooks for the MCP ephemeral server.

    Opens a KubernetesSessionManager for the lifetime of the server and yields
    it as the lifespan context; leaving the ``with`` block runs the manager's
    ``__exit__`` cleanup.
    """
    # The manager is a synchronous context manager, so a plain `with` is used inside this async generator.
    # Settings are fixed here: "default" namespace, 1 second between status checks, 60 seconds maximum wait.
    with KubernetesSessionManager(namespace="default", jobs={}, sleep_time=1, max_wait_time=60) as session_manager:
        yield session_manager

list_mcp_servers(ctx)

List all running MCP servers.

Source code in src/mcp_ephemeral_k8s/app/mcp.py
39
40
41
42
43
@mcp.tool("list_mcp_servers")
def list_mcp_servers(ctx: Context) -> list[EphemeralMcpServer]:
    """List all running MCP servers."""
    # The lifespan context is expected to be the session manager yielded by the server's lifespan hook.
    session_manager: KubernetesSessionManager = ctx.request_context.lifespan_context
    # Reports the servers registered with this session manager; the cluster itself is not queried here.
    return list(session_manager.jobs.values())

list_presets()

List all preset configurations.

Source code in src/mcp_ephemeral_k8s/app/mcp.py
33
34
35
36
@mcp.resource("config://presets")
def list_presets() -> list[EphemeralMcpServerConfig]:
    """List all preset configurations."""
    # Served under the "config://presets" resource URI. The list is written out by hand,
    # so a preset added to the `presets` module must also be added here to be listed.
    return [presets.FETCH, presets.GITHUB, presets.GITLAB, presets.GIT, presets.TIME, presets.BEDROCK_KB_RETRIEVAL]

FastAPI application for the MCP ephemeral server.

lifespan(app) async

Lifecycle hooks for the MCP ephemeral server.

Source code in src/mcp_ephemeral_k8s/app/fastapi.py
14
15
16
17
18
19
20
21
22
23
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
    """
    Lifecycle hooks for the MCP ephemeral server.

    Opens a KubernetesSessionManager for the lifetime of the application and
    publishes it as ``app.state.session_manager``. The attribute is removed
    again on shutdown, also when shutdown is triggered by an exception.
    """
    with KubernetesSessionManager() as session_manager:
        app.state.session_manager = session_manager
        try:
            yield
        finally:
            # Drop the reference before the manager's own cleanup runs, so nothing
            # can reach a manager whose jobs are being deleted, even on the error path.
            del app.state.session_manager

This module contains the models for the MCP ephemeral K8s library.

EphemeralMcpServer

Bases: BaseModel

The MCP server that is running in a Kubernetes pod.

Source code in src/mcp_ephemeral_k8s/api/ephemeral_mcp_server.py
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
class EphemeralMcpServer(BaseModel):
    """The MCP server that is running in a Kubernetes pod."""

    pod_name: str = Field(
        description="The name of the pod that is running the MCP server", examples=["mcp-ephemeral-k8s-proxy-test"]
    )
    config: EphemeralMcpServerConfig = Field(
        description="The configuration that was used to create the MCP server",
        examples=[
            EphemeralMcpServerConfig(
                runtime_exec="uvx",
                runtime_mcp="mcp-server-fetch",
                port=8000,
                cors_origins=["*"],
            )
        ],
    )
    # Previously hard-coded as "default" inside `url`; the default keeps existing
    # callers and URLs unchanged while allowing servers in other namespaces.
    namespace: str = Field(
        default="default",
        description="The namespace that the MCP server is running in",
        examples=["default"],
    )

    @computed_field  # type: ignore[prop-decorator]
    @property
    def url(self) -> HttpUrl:
        """The Uniform Resource Locator (URL) for the MCP server."""
        # Cluster-internal DNS name of the form <name>.<namespace>.svc.cluster.local; ends with "/".
        return HttpUrl(f"http://{self.pod_name}.{self.namespace}.svc.cluster.local:{self.config.port}/")

    @computed_field  # type: ignore[prop-decorator]
    @property
    def sse_url(self) -> HttpUrl:
        """The Server-Sent Events (SSE) URL for the MCP server."""
        # `url` already ends with "/", so the path segment is appended without a separator.
        return HttpUrl(f"{self.url}sse")

sse_url property

The Server-Sent Events (SSE) URL for the MCP server.

url property

The Uniform Resource Locator (URL) for the MCP server.

EphemeralMcpServerConfig

Bases: BaseModel

Configuration for Kubernetes resources.

Source code in src/mcp_ephemeral_k8s/api/ephemeral_mcp_server.py
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
class EphemeralMcpServerConfig(BaseModel):
    """Configuration for Kubernetes resources."""

    # `runtime_exec` and `runtime_mcp` have no default: both must be passed explicitly
    # (possibly as None), and `validate_runtime_exec` requires them to be set together.
    runtime_exec: str | None = Field(
        description="The runtime to use for the MCP container. When None, the image is assumed to be a MCP server instead of a proxy.",
        examples=["uvx", "npx"],
    )
    runtime_mcp: str | None = Field(
        description="The runtime to use for the MCP container. Can be any supported MCP server runtime loadable via the `runtime_exec`. See the [MCP Server Runtimes](https://github.com/modelcontextprotocol/servers/tree/main) for a list of supported runtimes.",
        examples=["mcp-server-fetch", "@modelcontextprotocol/server-github"],
    )
    image: str = Field(
        default="ghcr.io/bobmerkus/mcp-ephemeral-k8s-proxy:latest",
        description="The image to use for the MCP server proxy",
    )
    entrypoint: list[str] | None = Field(
        default=["mcp-proxy"],
        description="The entrypoint for the MCP container. Normally not changed unless a custom image is used.",
    )
    host: str = Field(default="0.0.0.0", description="The host to expose the MCP server on")  # noqa: S104
    port: int = Field(default=8080, description="The port to expose the MCP server on")
    resource_requests: dict[str, str] = Field(
        default={"cpu": "100m", "memory": "100Mi"}, description="Resource requests for the container"
    )
    resource_limits: dict[str, str] = Field(
        default={"cpu": "200m", "memory": "200Mi"}, description="Resource limits for the container"
    )
    env: dict[str, str] | None = Field(
        default=None,
        description="Environment variables to set for the container",
        examples=[None, {"GITHUB_PERSONAL_ACCESS_TOKEN": "1234567890", "GITHUB_DYNAMIC_TOOLSETS": "1"}],
    )
    cors_origins: list[str] | None = Field(
        default=["*"],
        description="The origins to allow CORS from",
        examples=["*"],
    )
    probe_config: KubernetesProbeConfig = Field(
        default_factory=KubernetesProbeConfig,
        description="The configuration for the Kubernetes probe",
    )

    @model_validator(mode="after")
    def validate_runtime_exec(self) -> Self:
        """Validate the runtime configuration.
        Both runtime_exec and runtime_mcp must be specified, or neither.

        Raises:
            MCPInvalidRuntimeError: If exactly one of the two is specified.
        """
        if self.runtime_exec is not None and self.runtime_mcp is None:
            message = "Invalid runtime: runtime_exec is specified but runtime_mcp is not"
            raise MCPInvalidRuntimeError(runtime_exec=self.runtime_exec, runtime_mcp=self.runtime_mcp, message=message)
        if self.runtime_exec is None and self.runtime_mcp is not None:
            message = "Invalid runtime: runtime_mcp is specified but runtime_exec is not"
            raise MCPInvalidRuntimeError(runtime_exec=self.runtime_exec, runtime_mcp=self.runtime_mcp, message=message)
        return self

    @computed_field  # type: ignore[prop-decorator]
    @property
    def args(self) -> list[str] | None:
        """The arguments to pass to the MCP server.
        [mcp-proxy](https://github.com/sparfenyuk/mcp-proxy?tab=readme-ov-file#21-configuration)"""
        # Without a runtime pair the image is run as-is and no proxy arguments are produced.
        if self.runtime_exec is not None and self.runtime_mcp is not None:
            args = [
                "--pass-environment",
                f"--sse-port={self.port}",
                f"--sse-host={self.host}",
                self.runtime_exec,
                self.runtime_mcp,
            ]
            # Skip the flag for None and for an empty list: a bare "--allow-origin"
            # without any value is not a usable argument.
            if self.cors_origins:
                args.extend(["--allow-origin", *self.cors_origins])
            return args
        return None

    @computed_field  # type: ignore[prop-decorator]
    @property
    def image_name(self) -> str:
        """The name of the image to use for the MCP server."""
        # Last path segment of the image reference, without registry/repository prefix.
        last_segment = self.image.rsplit("/", 1)[-1]
        # Strip a digest ("@sha256:...") before the tag so that digest-pinned
        # references yield "name" rather than "name@sha256".
        return last_segment.split("@", 1)[0].split(":", 1)[0]

    @computed_field  # type: ignore[prop-decorator]
    @property
    def job_name(self) -> str:
        """The name of the job to use for the MCP server."""
        # NOTE(review): the id is generated on every access, so two reads of this
        # property presumably return different names — confirm against `generate_unique_id`.
        return generate_unique_id(prefix=self.image_name)

    @classmethod
    def from_docker_image(cls, image: str, entrypoint: list[str] | None = None, **kwargs: Any) -> Self:
        """Create an EphemeralMcpServerConfig from a Docker image.
        The image must be a MCP server image, otherwise an error is raised.

        Args:
            image: The image reference of the MCP server.
            entrypoint: Optional entrypoint override; None is passed through as-is.
            **kwargs: Further fields forwarded to the model constructor.

        Returns:
            A configuration with both ``runtime_exec`` and ``runtime_mcp`` set to None.

        Raises:
            MCPInvalidRuntimeError: If ``image`` is one of the known proxy images.
        """
        proxy_image_prefixes = (
            "ghcr.io/bobmerkus/mcp-ephemeral-k8s-proxy",
            "ghcr.io/sparfenyuk/mcp-proxy",
        )
        if image.startswith(proxy_image_prefixes):
            message = "Invalid runtime: image is a proxy image, please use the `runtime_exec` and `runtime_mcp` fields to specify the MCP server to use."
            raise MCPInvalidRuntimeError(runtime_exec=None, runtime_mcp=None, message=message)
        return cls(image=image, entrypoint=entrypoint, runtime_exec=None, runtime_mcp=None, **kwargs)

args property

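The arguments to pass to the MCP server; they follow the command-line options of mcp-proxy (see the configuration section of the mcp-proxy README). The value is None when no runtime is configured.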

image_name property

The name of the image to use for the MCP server.

job_name property

The name of the job to use for the MCP server.

from_docker_image(image, entrypoint=None, **kwargs) classmethod

Create an EphemeralMcpServerConfig from a Docker image. The image must be a MCP server image, otherwise an error is raised.

Source code in src/mcp_ephemeral_k8s/api/ephemeral_mcp_server.py
116
117
118
119
120
121
122
123
124
125
126
@classmethod
def from_docker_image(cls, image: str, entrypoint: list[str] | None = None, **kwargs: Any) -> Self:
    """Build a configuration for an image that is itself an MCP server.

    Proxy images are rejected, because they need ``runtime_exec`` and
    ``runtime_mcp`` to know which MCP server to start.

    Args:
        image: The image reference of the MCP server.
        entrypoint: Optional entrypoint override; None is passed through as-is.
        **kwargs: Further fields forwarded to the model constructor.

    Returns:
        A configuration with both ``runtime_exec`` and ``runtime_mcp`` set to None.

    Raises:
        MCPInvalidRuntimeError: If ``image`` is one of the known proxy images.
    """
    proxy_image_prefixes = (
        "ghcr.io/bobmerkus/mcp-ephemeral-k8s-proxy",
        "ghcr.io/sparfenyuk/mcp-proxy",
    )
    # Happy path first: anything that is not a proxy image is accepted as-is.
    if not image.startswith(proxy_image_prefixes):
        return cls(image=image, entrypoint=entrypoint, runtime_exec=None, runtime_mcp=None, **kwargs)

    message = "Invalid runtime: image is a proxy image, please use the `runtime_exec` and `runtime_mcp` fields to specify the MCP server to use."
    raise MCPInvalidRuntimeError(runtime_exec=None, runtime_mcp=None, message=message)

validate_runtime_exec()

Validate the runtime configuration. Both runtime_exec and runtime_mcp must be specified, or neither.

Source code in src/mcp_ephemeral_k8s/api/ephemeral_mcp_server.py
73
74
75
76
77
78
79
80
81
82
83
84
@model_validator(mode="after")
def validate_runtime_exec(self) -> Self:
    """Check that the runtime pair is consistent.

    ``runtime_exec`` and ``runtime_mcp`` must either both be set or both be None.

    Returns:
        The validated model.

    Raises:
        MCPInvalidRuntimeError: If exactly one of the two is specified.
    """
    exec_given = self.runtime_exec is not None
    mcp_given = self.runtime_mcp is not None

    # Both set or both missing: nothing to complain about.
    if exec_given == mcp_given:
        return self

    if exec_given:
        message = "Invalid runtime: runtime_exec is specified but runtime_mcp is not"
    else:
        message = "Invalid runtime: runtime_mcp is specified but runtime_exec is not"
    raise MCPInvalidRuntimeError(runtime_exec=self.runtime_exec, runtime_mcp=self.runtime_mcp, message=message)

KubernetesProbeConfig

Bases: BaseModel

The configuration for the Kubernetes probe.

Source code in src/mcp_ephemeral_k8s/api/ephemeral_mcp_server.py
21
22
23
24
25
26
27
28
class KubernetesProbeConfig(BaseModel):
    """The configuration for the Kubernetes probe."""

    # All values are plain integers with defaults, so the model can be built without arguments.
    # NOTE(review): the field names mirror the Kubernetes probe settings of the same
    # name; presumably they are copied onto the container probes when the job is
    # built — confirm in the job builder.
    initial_delay_seconds: int = Field(default=10, description="The initial delay seconds for the probe")
    period_seconds: int = Field(default=1, description="The period seconds for the probe")
    timeout_seconds: int = Field(default=2, description="The timeout seconds for the probe")
    success_threshold: int = Field(default=1, description="The success threshold for the probe")
    failure_threshold: int = Field(default=300, description="The failure threshold for the probe")

KubernetesRuntime

Bases: str, Enum

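The source from which the Kubernetes client configuration is loaded: a local kubeconfig (`KUBECONFIG`) or the in-cluster configuration (`INCLUSTER`).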

Source code in src/mcp_ephemeral_k8s/api/ephemeral_mcp_server.py
14
15
16
17
18
class KubernetesRuntime(str, Enum):
    """The runtime that is being used for Kubeconfig"""

    # Members are also `str` instances, so they compare equal to their plain string values.
    # The session manager loads the local kubeconfig for this member ...
    KUBECONFIG = "KUBECONFIG"
    # ... and the in-cluster configuration for this one.
    INCLUSTER = "INCLUSTER"

This module contains the exceptions for the MCP ephemeral K8s library.

InvalidKubeConfigError

Bases: Exception

Exception raised when the kube config is invalid.

Source code in src/mcp_ephemeral_k8s/api/exceptions.py
 6
 7
 8
 9
10
11
class InvalidKubeConfigError(Exception):
    """Raised when no usable Kubernetes configuration can be selected.

    Attributes:
        message: The full, prefixed error text (identical to ``str(error)``).
    """

    def __init__(self, message: str):
        """Store and forward the prefixed error text.

        Args:
            message: Detail appended after the "Invalid kube config: " prefix.
        """
        text = f"Invalid kube config: {message}"
        super().__init__(text)
        self.message = text

MCPInvalidRuntimeError

Bases: ValueError

An error that occurs when the runtime is invalid.

Source code in src/mcp_ephemeral_k8s/api/exceptions.py
38
39
40
41
42
class MCPInvalidRuntimeError(ValueError):
    """An error that occurs when the runtime is invalid."""

    def __init__(self, runtime_exec: str | None, runtime_mcp: str | None, message: str) -> None:
        super().__init__(f"Invalid runtime: {runtime_exec=} and {runtime_mcp=} {message}")

MCPJobError

Bases: Exception

Exception raised when the MCP job is in an error state.

Source code in src/mcp_ephemeral_k8s/api/exceptions.py
61
62
63
64
65
66
class MCPJobError(Exception):
    """Raised when an MCP job ends up in an error state.

    Attributes:
        message: The full error text (identical to ``str(error)``).
    """

    def __init__(self, namespace: str, pod_name: str, message: str):
        """Compose the error text.

        Args:
            namespace: Namespace of the failing job.
            pod_name: Name of the failing job/pod.
            message: Description of the failure.
        """
        text = f"MCP job error: {namespace=} {pod_name=} - {message}"
        super().__init__(text)
        self.message = text

MCPJobNotFoundError

Bases: Exception

Exception raised when the MCP job is not found.

Source code in src/mcp_ephemeral_k8s/api/exceptions.py
30
31
32
33
34
35
class MCPJobNotFoundError(Exception):
    """Raised when a requested MCP job does not exist.

    Attributes:
        message: The full error text (identical to ``str(error)``).
    """

    def __init__(self, namespace: str, pod_name: str):
        """Compose the error text.

        Args:
            namespace: Namespace that was searched.
            pod_name: Name of the job/pod that was not found.
        """
        text = f"Failed to find MCP job: {namespace=} {pod_name=}"
        super().__init__(text)
        self.message = text

MCPJobTimeoutError

Bases: Exception

Exception raised when the MCP job times out.

Source code in src/mcp_ephemeral_k8s/api/exceptions.py
53
54
55
56
57
58
class MCPJobTimeoutError(Exception):
    """Raised when waiting for an MCP job takes too long.

    Attributes:
        message: The full error text (identical to ``str(error)``).
    """

    def __init__(self, namespace: str, pod_name: str):
        """Compose the error text.

        Args:
            namespace: Namespace of the job.
            pod_name: Name of the job/pod that timed out.
        """
        text = f"MCP job timed out: {namespace=} {pod_name=}"
        super().__init__(text)
        self.message = text

MCPNamespaceNotFoundError

Bases: ValueError

An error that occurs when the namespace is not found.

Source code in src/mcp_ephemeral_k8s/api/exceptions.py
45
46
47
48
49
50
class MCPNamespaceNotFoundError(ValueError):
    """Raised when the configured namespace does not exist.

    Attributes:
        message: The full error text (identical to ``str(error)``).
    """

    def __init__(self, namespace: str):
        """Compose the error text.

        Args:
            namespace: The namespace that could not be found.
        """
        text = f"Namespace not found: {namespace}"
        super().__init__(text)
        self.message = text

MCPPortForwardError

Bases: Exception

Exception raised when the MCP port forward fails.

Source code in src/mcp_ephemeral_k8s/api/exceptions.py
14
15
16
17
18
19
class MCPPortForwardError(Exception):
    """Signals that setting up a port forward to an MCP pod failed.

    The formatted text, naming the pod, namespace and port, is stored on
    ``message``.
    """

    def __init__(self, pod_name: str, namespace: str, port: int) -> None:
        # Build the text once so the attribute and the Exception args agree.
        details = f"Failed to create port forward: {pod_name=} {namespace=} {port=}"
        super().__init__(details)
        self.message = details

MCPServerCreationError

Bases: Exception

Exception raised when the MCP server creation fails.

Source code in src/mcp_ephemeral_k8s/api/exceptions.py
22
23
24
25
26
27
class MCPServerCreationError(Exception):
    """Signals that an MCP server could not be created.

    The caller-supplied reason is prefixed and stored on ``message``.
    """

    def __init__(self, message: str) -> None:
        # Build the text once so the attribute and the Exception args agree.
        details = f"Failed to create MCP server: {message}"
        super().__init__(details)
        self.message = details

check_pod_status(core_v1, pod_name, namespace)

Check the status of pods associated with a job.

Parameters:

Name Type Description Default
core_v1 CoreV1Api

The Kubernetes core API client

required
pod_name str

Name of the job/pod

required
namespace str

Kubernetes namespace

required

Returns:

Type Description
bool

True if a pod is running and ready (probes successful), False if waiting for pods

Raises:

Type Description
MCPJobError

If a pod is in Failed or Unknown state

Source code in src/mcp_ephemeral_k8s/k8s/job.py
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
def check_pod_status(core_v1: client.CoreV1Api, pod_name: str, namespace: str) -> bool:  # noqa: C901
    """
    Check the status of pods associated with a job.

    Pods are looked up with the label selector ``job-name=<pod_name>``. Pods
    without a reported phase are ignored.

    Args:
        core_v1: The Kubernetes core API client
        pod_name: Name of the job/pod
        namespace: Kubernetes namespace

    Returns:
        True if a pod is running and ready (probes successful), False if waiting for pods

    Raises:
        MCPJobError: If a pod is in Failed or Unknown state. The pod logs are
            appended to the error message when they can be retrieved.
    """
    pods = core_v1.list_namespaced_pod(namespace=namespace, label_selector=f"job-name={pod_name}")
    if not pods.items:
        logger.warning(f"No pods found for job '{pod_name}', waiting...")
        return False

    for pod in pods.items:
        if not (pod.status and pod.status.phase):
            continue
        phase = pod.status.phase

        if phase in ("Failed", "Unknown"):
            message = f"Pod is in error state: {phase}"
            if pod.metadata is not None and pod.metadata.name is not None:
                logger.error(f"Pod {pod.metadata.name} in error state: {phase}")
                try:
                    logs = core_v1.read_namespaced_pod_log(name=pod.metadata.name, namespace=namespace)
                except ApiException as e:
                    # Fetching logs is best effort: a failure here must not
                    # replace the documented MCPJobError with an ApiException.
                    logger.warning(f"Could not read logs of pod {pod.metadata.name}: {e}")
                else:
                    logger.error(f"Logs: {logs}")
                    message = f"{message}. Logs: {logs}"
            raise MCPJobError(namespace, pod_name, message)

        if phase == "Running":
            # A running pod only counts once its "Ready" condition is "True".
            conditions = pod.status.conditions or []
            if any(condition.type == "Ready" and condition.status == "True" for condition in conditions):
                logger.info(f"Job '{pod_name}' pod is running and ready (probes successful)")
                return True
            logger.info(f"Job '{pod_name}' pod is running but not ready yet (waiting for probes)")

    return False

create_mcp_server_job(config, namespace)

Create a job that will run until explicitly terminated.

Parameters:

Name Type Description Default
config EphemeralMcpServerConfig

The configuration for the MCP server

required
namespace str

Kubernetes namespace

required

Returns:

Type Description
V1Job

The Kubernetes Job object describing the MCP server

Source code in src/mcp_ephemeral_k8s/k8s/job.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
def create_mcp_server_job(config: EphemeralMcpServerConfig, namespace: str) -> client.V1Job:
    """
    Assemble the ``batch/v1`` Job object that describes an MCP server.

    This function only builds and returns the object; it makes no API call itself.

    Args:
        config: The configuration for the MCP server
        namespace: Kubernetes namespace

    Returns:
        The Job object, with a single container and a TCP readiness probe on ``config.port``
    """
    # Environment variables: dict -> list of V1EnvVar (empty list when unset).
    env_vars = [client.V1EnvVar(name=name, value=val) for name, val in (config.env or {}).items()]

    # Readiness is a TCP check on the server port, tuned by the probe config.
    probe = client.V1Probe(
        tcp_socket=client.V1TCPSocketAction(port=config.port),
        **config.probe_config.model_dump(),
    )

    container = client.V1Container(
        name=config.job_name,
        image=config.image,
        command=config.entrypoint,
        image_pull_policy="IfNotPresent",
        args=config.args,
        resources=client.V1ResourceRequirements(requests=config.resource_requests, limits=config.resource_limits),
        ports=[client.V1ContainerPort(container_port=config.port)],
        env=env_vars,
        readiness_probe=probe,
    )

    # Pods are labelled app=<job_name>.
    pod_template = client.V1PodTemplateSpec(
        metadata=client.V1ObjectMeta(labels={"app": config.job_name}),
        spec=client.V1PodSpec(containers=[container], restart_policy="Never"),
    )

    return client.V1Job(
        api_version="batch/v1",
        kind="Job",
        metadata=client.V1ObjectMeta(name=config.job_name, namespace=namespace),
        spec=client.V1JobSpec(backoff_limit=10, template=pod_template),
    )

delete_mcp_server_job(core_v1, batch_v1, pod_name, namespace)

Delete a Kubernetes job and its associated pods.

Parameters:

Name Type Description Default
core_v1 CoreV1Api

The Kubernetes core API client

required
batch_v1 BatchV1Api

The Kubernetes batch API client

required
pod_name str

The name of the pod to delete

required
namespace str

The namespace of the pod

required

Returns:

Type Description
bool

True if the job was deleted successfully, False otherwise

Source code in src/mcp_ephemeral_k8s/k8s/job.py
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
def delete_mcp_server_job(
    core_v1: client.CoreV1Api, batch_v1: client.BatchV1Api, pod_name: str, namespace: str
) -> bool:
    """
    Delete a Kubernetes job and its associated pods.

    Pods labelled ``app=<pod_name>`` are deleted first, then the job named
    ``pod_name``. A pod that is already gone when its delete is issued (HTTP
    404) is skipped, so it does not prevent the job from being deleted.

    Args:
        core_v1: The Kubernetes core API client
        batch_v1: The Kubernetes batch API client
        pod_name: The name of the job; also the value of the pods' ``app`` label
        namespace: The namespace of the job and its pods

    Returns:
        True if the job was deleted successfully, False if listing or deleting
        pods, or deleting the job, failed with an API error
    """
    try:
        pods = core_v1.list_namespaced_pod(namespace=namespace, label_selector=f"app={pod_name}")
        for pod in pods.items:
            if pod.metadata is None or pod.metadata.name is None:
                continue
            pod_name_to_delete = pod.metadata.name
            logger.info(f"Deleting pod {pod_name_to_delete}")
            try:
                core_v1.delete_namespaced_pod(
                    name=pod_name_to_delete,
                    namespace=namespace,
                    body=client.V1DeleteOptions(grace_period_seconds=0, propagation_policy="Background"),
                )
            except ApiException as e:
                # The pod may vanish between list and delete; that is the
                # desired end state, so only other errors abort the cleanup.
                if e.status != 404:
                    raise
                logger.info(f"Pod {pod_name_to_delete} already deleted")
    except ApiException as e:
        logger.info(f"Error deleting pods: {e}")
        return False

    try:
        batch_v1.delete_namespaced_job(
            name=pod_name, namespace=namespace, body=client.V1DeleteOptions(propagation_policy="Foreground")
        )
    except ApiException as e:
        logger.info(f"Error deleting job: {e}")
        return False

    logger.info(f"Job '{pod_name}' deleted successfully")
    return True

expose_mcp_server_port(core_v1, pod_name, namespace, port)

Expose the MCP server port to the outside world.

Parameters:

Name Type Description Default
core_v1 CoreV1Api

The Kubernetes core API client

required
pod_name str

Name of the pod

required
namespace str

Kubernetes namespace

required
port int

Port to expose

required
Source code in src/mcp_ephemeral_k8s/k8s/job.py
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
def expose_mcp_server_port(core_v1: client.CoreV1Api, pod_name: str, namespace: str, port: int) -> None:
    """
    Create a Kubernetes Service that publishes the MCP server port.

    The Service is named ``pod_name`` and selects pods labelled ``app=<pod_name>``.

    Args:
        core_v1: The Kubernetes core API client
        pod_name: Name of the pod
        namespace: Kubernetes namespace
        port: Port to expose
    """
    service_spec = client.V1ServiceSpec(
        selector={"app": pod_name},
        ports=[client.V1ServicePort(port=port)],
    )
    service = client.V1Service(metadata=client.V1ObjectMeta(name=pod_name), spec=service_spec)
    core_v1.create_namespaced_service(namespace=namespace, body=service)
    logger.info(f"Service '{pod_name}' created successfully")

get_mcp_server_job_status(batch_v1, pod_name, namespace)

Get the status of a Kubernetes job.

Parameters:

Name Type Description Default
batch_v1 BatchV1Api

The Kubernetes batch API client

required
pod_name str

The name of the pod to get the status of

required
namespace str

The namespace of the pod

required

Returns:

Type Description
None | V1Job

The job object, or None if the job was not found or could not be read

Source code in src/mcp_ephemeral_k8s/k8s/job.py
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
def get_mcp_server_job_status(batch_v1: client.BatchV1Api, pod_name: str, namespace: str) -> None | client.V1Job:
    """
    Read a Kubernetes job and log its pod counters and creation time.

    Args:
        batch_v1: The Kubernetes batch API client
        pod_name: The name of the job to read
        namespace: The namespace of the job

    Returns:
        The job object, or None when the read fails with an API error
        (a 404 "not found" as well as any other API error)
    """
    try:
        job = batch_v1.read_namespaced_job(name=pod_name, namespace=namespace)
    except ApiException as e:
        if e.status == 404:
            logger.info(f"Job '{pod_name}' not found")
        else:
            logger.info(f"Error getting job status: {e}")
        return None

    status = job.status
    if status is not None:
        # Counters the API leaves unset are reported as zero.
        active = 0 if status.active is None else status.active
        succeeded = 0 if status.succeeded is None else status.succeeded
        failed = 0 if status.failed is None else status.failed

        logger.info(f"Job '{pod_name}' status:")
        logger.info(f"Active pods: {active}")
        logger.info(f"Succeeded pods: {succeeded}")
        logger.info(f"Failed pods: {failed}")

    metadata = job.metadata
    creation_time = None if metadata is None else metadata.creation_timestamp
    if creation_time is not None:
        logger.info(f"Creation time: {creation_time}")

    return job

remove_mcp_server_port(core_v1, pod_name, namespace)

Remove the MCP server port from the outside world.

Parameters:

Name Type Description Default
core_v1 CoreV1Api

The Kubernetes core API client

required
pod_name str

Name of the pod

required
namespace str

Kubernetes namespace

required
Source code in src/mcp_ephemeral_k8s/k8s/job.py
292
293
294
295
296
297
298
299
300
301
302
def remove_mcp_server_port(core_v1: client.CoreV1Api, pod_name: str, namespace: str) -> None:
    """
    Delete the Kubernetes Service named ``pod_name`` from ``namespace``.

    Args:
        core_v1: The Kubernetes core API client
        pod_name: Name of the pod; the Service to delete has the same name
        namespace: Kubernetes namespace
    """
    core_v1.delete_namespaced_service(namespace=namespace, name=pod_name)
    logger.info(f"Service '{pod_name}' deleted successfully")

wait_for_job_deletion(batch_v1, pod_name, namespace, sleep_time=1, max_wait_time=60)

Wait for a job to be deleted.

Parameters:

Name Type Description Default
batch_v1 BatchV1Api

The Kubernetes batch API client

required
pod_name str

Name of the pod

required
namespace str

Kubernetes namespace

required
sleep_time float

Time to sleep between checks

1
max_wait_time float

Maximum time to wait before timing out

60

Raises:

Type Description
MCPJobTimeoutError

If the job is not deleted within max_wait_time

Source code in src/mcp_ephemeral_k8s/k8s/job.py
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
def wait_for_job_deletion(
    batch_v1: client.BatchV1Api, pod_name: str, namespace: str, sleep_time: float = 1, max_wait_time: float = 60
) -> None:
    """
    Wait for a job to be deleted.

    Polls ``get_mcp_server_job_status`` until it returns None, sleeping
    ``sleep_time`` seconds between polls.

    Args:
        batch_v1: The Kubernetes batch API client
        pod_name: Name of the pod
        namespace: Kubernetes namespace
        sleep_time: Time to sleep between checks
        max_wait_time: Maximum time to wait before timing out

    Raises:
        MCPJobTimeoutError: If the job is not deleted within max_wait_time
    """
    # Use the monotonic clock so wall-clock adjustments (NTP, manual changes)
    # can neither shorten nor extend the timeout.
    deadline = time.monotonic() + max_wait_time
    while True:
        if time.monotonic() > deadline:
            raise MCPJobTimeoutError(namespace, pod_name)
        if get_mcp_server_job_status(batch_v1, pod_name, namespace) is None:
            return
        time.sleep(sleep_time)

wait_for_job_ready(batch_v1, core_v1, pod_name, namespace, sleep_time=1, max_wait_time=60)

Wait for a job's pod to be in the running state and ready (probes successful).

Parameters:

Name Type Description Default
batch_v1 BatchV1Api

The Kubernetes batch API client

required
core_v1 CoreV1Api

The Kubernetes core API client

required
pod_name str

Name of the pod

required
namespace str

Kubernetes namespace

required
sleep_time float

Time to sleep between checks

1
max_wait_time float

Maximum time to wait before timing out

60

Raises:

Type Description
MCPJobTimeoutError

If the job does not become ready within max_wait_time

Source code in src/mcp_ephemeral_k8s/k8s/job.py
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
def wait_for_job_ready(
    batch_v1: client.BatchV1Api,
    core_v1: client.CoreV1Api,
    pod_name: str,
    namespace: str,
    sleep_time: float = 1,
    max_wait_time: float = 60,
) -> None:
    """
    Wait for a job's pod to be in the running state and ready (probes successful).

    Each iteration reads the job, then asks ``check_pod_status`` whether a pod
    is ready, sleeping ``sleep_time`` seconds between iterations. Exceptions
    raised by ``check_pod_status`` are not caught here.

    Args:
        batch_v1: The Kubernetes batch API client
        core_v1: The Kubernetes core API client
        pod_name: Name of the pod
        namespace: Kubernetes namespace
        sleep_time: Time to sleep between checks
        max_wait_time: Maximum time to wait before timing out

    Raises:
        MCPJobTimeoutError: If the job does not become ready within max_wait_time
    """
    # Use the monotonic clock so wall-clock adjustments (NTP, manual changes)
    # can neither shorten nor extend the timeout.
    deadline = time.monotonic() + max_wait_time
    while True:
        if time.monotonic() > deadline:
            raise MCPJobTimeoutError(namespace, pod_name)

        job = get_mcp_server_job_status(batch_v1, pod_name, namespace)
        if job is None:
            logger.warning(f"Job '{pod_name}' not found, waiting for pod to become ready...")
        elif job.status is None:
            logger.warning(f"Job '{pod_name}' status is None, waiting for pod to become ready...")
        elif check_pod_status(core_v1, pod_name, namespace):
            # A pod is running and ready.
            return
        elif job.status.active == 1:
            logger.info(f"Job '{pod_name}' active")
        else:
            logger.warning(f"Job '{pod_name}' in unknown state, waiting...")

        time.sleep(sleep_time)

This module contains a utility function to generate unique identifiers for MCP ephemeral K8s resources based on RFC 1123 Label Names.

generate_unique_id(prefix=None, max_length=63)

Generate a unique identifier that follows the Kubernetes naming rules (RFC 1123 Label Names).

RFC 1123 Label Names must:

- Contain only lowercase alphanumeric characters or '-'
- Start with an alphanumeric character
- End with an alphanumeric character
- Be at most 63 characters

Parameters:

Name Type Description Default
prefix str | None

Optional prefix for the ID. Will be converted to lowercase and non-compliant characters will be replaced with dashes.

None
max_length int

Maximum length of the generated ID, defaults to 63 (K8s limit).

63

Returns:

Type Description
str

A unique RFC 1123 compliant identifier string.

Source code in src/mcp_ephemeral_k8s/k8s/uid.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
def generate_unique_id(prefix: str | None = None, max_length: int = 63) -> str:
    """
    Generate a unique identifier that follows the Kubernetes naming rules (RFC 1123 Label Names).

    RFC 1123 Label Names must:
    - Contain only lowercase alphanumeric characters or '-'
    - Start with an alphanumeric character
    - End with an alphanumeric character
    - Be at most 63 characters

    Args:
        prefix: Optional prefix for the ID. Will be converted to lowercase and non-compliant
                characters will be replaced with dashes.
        max_length: Maximum length of the generated ID, defaults to 63 (K8s limit).

    Returns:
        A unique RFC 1123 compliant identifier string.
    """
    # Process prefix if provided
    processed_prefix = ""
    if prefix:
        # Convert to lowercase and replace invalid characters
        processed_prefix = "".join(
            c if c.isalnum() and c.islower() else (c.lower() if c.isalnum() else "-") for c in prefix
        )

        # Ensure prefix starts with alphanumeric
        if processed_prefix and not processed_prefix[0].isalnum():
            processed_prefix = f"p{processed_prefix}"

        # Add separator
        if processed_prefix:
            processed_prefix = f"{processed_prefix}-"

    # Generate a unique part (timestamp + random)
    timestamp = str(int(time.time()))
    random_chars = "".join(random.choices(string.ascii_lowercase + string.digits, k=8))  # noqa: S311
    unique_part = f"{timestamp}-{random_chars}"

    # Combine and ensure max length
    full_id = f"{processed_prefix}{unique_part}"
    if len(full_id) > max_length:
        # If too long, truncate the ID but keep the random part
        chars_to_keep = max_length - len(random_chars) - 1
        full_id = f"{full_id[:chars_to_keep]}-{random_chars}"

    # Ensure ID ends with alphanumeric
    if not full_id[-1].isalnum():
        full_id = f"{full_id[:-1]}{random.choice(string.ascii_lowercase)}"  # noqa: S311

    return full_id