Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/workerd/api/container.c++
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,15 @@ Container::Container(rpc::Container::Client rpcClient, bool running)
: rpcClient(IoContext::current().addObject(kj::heap(kj::mv(rpcClient)))),
running(running) {}

void Container::setEgress(jsg::Lock& js, kj::String addr, jsg::Ref<api::Fetcher> binding) {
auto subrequestChannel = binding->getSubrequestChannel(IoContext::current());
subrequestChannel->requireAllowsTransfer();
auto request = rpcClient->setEgressTcpRequest();
auto serviceDesignator = request.initService();
subrequestChannel->writeServiceDesignator(serviceDesignator);
IoContext::current().addTask(request.sendIgnoringResult());
}

void Container::start(jsg::Lock& js, jsg::Optional<StartupOptions> maybeOptions) {
auto flags = FeatureFlags::get(js);
JSG_REQUIRE(!running, Error, "start() cannot be called on a container that is already running.");
Expand Down
5 changes: 5 additions & 0 deletions src/workerd/api/container.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ class Container: public jsg::Object {
jsg::Ref<Fetcher> getTcpPort(jsg::Lock& js, int port);
jsg::Promise<void> setInactivityTimeout(jsg::Lock& js, int64_t durationMs);

void setEgress(jsg::Lock& js, kj::String addr, jsg::Ref<api::Fetcher> binding);

// TODO(containers): listenTcp()

JSG_RESOURCE_TYPE(Container, CompatibilityFlags::Reader flags) {
Expand All @@ -73,6 +75,9 @@ class Container: public jsg::Object {
JSG_METHOD(signal);
JSG_METHOD(getTcpPort);
JSG_METHOD(setInactivityTimeout);
if (flags.getWorkerdExperimental()) {
JSG_METHOD(setEgress);
}
}

void visitForMemoryInfo(jsg::MemoryTracker& tracker) const {
Expand Down
1 change: 1 addition & 0 deletions src/workerd/io/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,7 @@ wd_capnp_library(src = "frankenvalue.capnp")
wd_capnp_library(
src = "container.capnp",
deps = [
":worker-interface_capnp",
"@capnp-cpp//src/capnp/compat:byte-stream_capnp",
],
)
Expand Down
7 changes: 6 additions & 1 deletion src/workerd/io/container.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ $Cxx.namespace("workerd::rpc");
$Cxx.allowCancellation;

using import "/capnp/compat/byte-stream.capnp".ByteStream;
using import "/workerd/io/worker-interface.capnp".ServiceDesignator;

interface Container @0x9aaceefc06523bca {
# RPC interface to talk to a container, for containers attached to Durable Objects.
Expand Down Expand Up @@ -100,7 +101,7 @@ interface Container @0x9aaceefc06523bca {
# attempting to connect.
}

setInactivityTimeout @7 (durationMs :Int64);
setInactivityTimeout @7 (durationMs :Int64);
# Configures the duration where the runtime should shutdown the container after there is
# no connections or activity to the Container.
#
Expand All @@ -110,4 +111,8 @@ interface Container @0x9aaceefc06523bca {
# Note that if there is an open connection to the container, the runtime must not shutdown the container.
# If there is no activity timeout duration configured and no container connection, it's up to the runtime
# to decide when to signal the container to exit.

setEgressTcp @8 (addr: Text, service: ServiceDesignator);
# setEgressTcp will configure the container to send traffic to this
# service
}
4 changes: 4 additions & 0 deletions src/workerd/io/io-channels.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,10 @@ class IoChannelFactory {
// the returned WorkerInterface.
virtual kj::Own<WorkerInterface> startRequest(SubrequestMetadata metadata) = 0;

virtual void writeServiceDesignator(rpc::ServiceDesignator::Builder builder) {
JSG_FAIL_REQUIRE(Error, "You can't transfer to a service across an IO boundary");
}

kj::Own<CapTableEntry> clone() override final {
return kj::addRef(*this);
}
Expand Down
24 changes: 24 additions & 0 deletions src/workerd/io/worker-interface.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,30 @@ using import "/workerd/io/script-version.capnp".ScriptVersion;
using import "/workerd/io/trace.capnp".TagValue;
using import "/workerd/io/trace.capnp".UserSpanData;

struct ServiceDesignator {
# ServiceDesignator serves as a way to transfer binding information to external services

name @0 :Text;
# The service name.

entrypoint @1 :Text;
# A modules-syntax Worker can export multiple named entrypoints. `export default {` specifies
# the default entrypoint, whereas `export let foo = {` defines an entrypoint named `foo`. If
# `entrypoint` is specified here, it names an alternate entrypoint to use on the target worker,
# otherwise the default is used.

props :union {
# Value to provide in `ctx.props` in the target worker
# when invoked by this service.

empty @2 :Void;
# Empty object. (This is the default.)

json @3 :Text;
# A JSON-encoded value.
}
}

# A 128-bit trace ID used to identify traces.
struct TraceId {
high @0 :UInt64;
Expand Down
6 changes: 6 additions & 0 deletions src/workerd/server/container-client.c++
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,12 @@ kj::Promise<void> ContainerClient::setInactivityTimeout(SetInactivityTimeoutCont
co_return;
}

kj::Promise<void> ContainerClient::setEgressTcp(SetEgressTcpContext context) {
// TODO: Implement setEgress
KJ_UNIMPLEMENTED(
"setEgress is not implemented in local development as it's a experimental feature");
}

kj::Promise<void> ContainerClient::getTcpPort(GetTcpPortContext context) {
const auto params = context.getParams();
uint16_t port = params.getPort();
Expand Down
1 change: 1 addition & 0 deletions src/workerd/server/container-client.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class ContainerClient final: public rpc::Container::Server {
kj::Promise<void> getTcpPort(GetTcpPortContext context) override;
kj::Promise<void> listenTcp(ListenTcpContext context) override;
kj::Promise<void> setInactivityTimeout(SetInactivityTimeoutContext context) override;
kj::Promise<void> setEgressTcp(SetEgressTcpContext context) override;

private:
capnp::ByteStreamFactory& byteStreamFactory;
Expand Down
1 change: 1 addition & 0 deletions types/generated-snapshot/experimental/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3827,6 +3827,7 @@ interface Container {
signal(signo: number): void;
getTcpPort(port: number): Fetcher;
setInactivityTimeout(durationMs: number | bigint): Promise<void>;
setEgress(addr: string, binding: Fetcher): void;
}
interface ContainerStartupOptions {
entrypoint?: string[];
Expand Down
1 change: 1 addition & 0 deletions types/generated-snapshot/experimental/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3838,6 +3838,7 @@ export interface Container {
signal(signo: number): void;
getTcpPort(port: number): Fetcher;
setInactivityTimeout(durationMs: number | bigint): Promise<void>;
setEgress(addr: string, binding: Fetcher): void;
}
export interface ContainerStartupOptions {
entrypoint?: string[];
Expand Down
Loading