# PubSub::Publish
Publishes messages to a Google Cloud Pub/Sub topic. Batches incoming messages and flushes when the batch size is reached or the timeout expires, whichever comes first.
PubSub::Publish
M
S
OK
B
D
# Inputs
| ID | Abbrev | Name | Type | Default | Description |
|---|---|---|---|---|---|
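| message | M | Message | STRING | | Message content to publish. Each change adds the new value to the internal buffer; empty values are ignored. Every occurrence of `${CURRENT_TIMESTAMP_MS}` in the message is replaced with the current timestamp in milliseconds before buffering. |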
# Outputs
| ID | Abbrev | Name | Type | Default | Description |
|---|---|---|---|---|---|
| status | S | Status | NUMBER | 0 | HTTP status code of the last publish request, or of the token request if that request failed. 0 means no publish has been attempted yet or the last request timed out. |
| success | OK | Success | BOOLEAN | false | True if the last publish request returned HTTP 200. |
| buffered | B | Buffered | NUMBER | 0 | Number of messages currently waiting in the buffer. |
| dropped | D | Dropped | NUMBER | 0 | Cumulative count of messages dropped due to buffer overflow, token or publish failures, or request timeouts. |
# Configuration
| ID | Name | Type | Default | Unit | Description |
|---|---|---|---|---|---|
| service_account_key | Service account key | STRING | | | Full JSON content of the GCP service account key file used for authentication. Details: Required |
| project_id | Project ID | STRING | | | Google Cloud project ID. Details: Required |
| topic | Topic | STRING | | | Pub/Sub topic name to publish messages to. Details: Required |
| batch_timeout | Batch timeout | NUMBER | 1 | s | Maximum time in seconds to hold messages before flushing the buffer. Details: > 0 |
| batch_size | Batch size | NUMBER | 10 | | Maximum number of messages in the buffer before an immediate flush is triggered. Details: ≥ 1 ≤ 100 |
| max_buffer_size | Max buffer size | NUMBER | 20 | | Hard cap on how many messages can sit in the buffer. New messages are dropped when this limit is reached. Details: ≥ 10 ≤ 100 |
# State
| ID | Name | Type | Default | Unit | Description |
|---|---|---|---|---|---|
| token | Token | STRING | | | Cached OAuth2 access token. |
| token_expiry | Token expiry | NUMBER | 0 | ms | Uptime value (`time::uptime()`, in milliseconds) after which the cached token is treated as expired. 0 forces a new token request. |
| phase | Phase | NUMBER | 0 | | Async operation tracker: 0 = idle, 1 = awaiting token, 2 = awaiting publish response. |
| buffer | Buffer | STRING | | | Accumulated message entries as comma-separated JSON fragments. |
| buffer_count | Buffer count | NUMBER | 0 | | Number of messages currently in the buffer. |
| pending_payload | Pending payload | STRING | | | The full HTTP request body being published. |
| pending_count | Pending count | NUMBER | 0 | | Number of messages in the current pending publish batch. |
| phase_started | Phase started | NUMBER | 0 | ms | Uptime value (`time::uptime()`) recorded when the current async phase began. A request still pending more than 10000 ms later is abandoned and its batch counted as dropped. |
# Source Code
View Volang source
// ---- Helper: assemble and sign the service-account JWT ----
// Reads the service account key JSON from configuration and builds an RS256
// header plus a Pub/Sub-scoped claim set whose "exp" is 3600 after "iat"
// (both derived from time::now()). Returns the base64url header and claims
// joined by a dot, followed by a dot and the result of crypto::rs256_sign.
fn build_jwt() {
  issued_at = time::now()
  expires_at = issued_at + 3600
  key_json = config::get("service_account_key")
  account = json::get(key_json, "client_email")
  kid = json::get(key_json, "private_key_id")
  pem = json::get(key_json, "private_key")
  // The service account e-mail is used for both the "iss" and the "sub" claim.
  jwt_header = base64::url_encode(str::fmt("""{"alg":"RS256","typ":"JWT","kid":"{}"}""", kid))
  jwt_claims = base64::url_encode(str::fmt("""{"iss":"{}","sub":"{}","scope":"https://www.googleapis.com/auth/pubsub","aud":"https://oauth2.googleapis.com/token","iat":{},"exp":{}}""", account, account, issued_at, expires_at))
  // The signature is computed over "<header>.<claims>" and appended after another dot.
  unsigned = str::fmt("{}.{}", jwt_header, jwt_claims)
  return str::fmt("{}.{}", unsigned, crypto::rs256_sign(unsigned, pem))
}
// ---- Helper: send the pending payload to the topic's publish endpoint ----
// POSTs the "pending_payload" state value as JSON to the configured
// project/topic, authorised with the cached bearer token. Afterwards it marks
// the node as awaiting a publish response (phase 2) and records the uptime at
// which the request was issued.
fn do_publish() {
  endpoint = str::fmt("https://pubsub.googleapis.com/v1/projects/{}/topics/{}:publish", config::get("project_id"), config::get("topic"))
  auth_value = str::fmt("Bearer {}", state::get("token"))
  request = http::client()
  http::set_method(request, "POST")
  http::set_header(request, "Content-Type", "application/json")
  http::set_header(request, "Authorization", auth_value)
  http::set_body(request, state::get("pending_payload"))
  http::call(request, endpoint)
  // Phase 2 tells http::on_response that the next response belongs to a publish call.
  state::set("phase", 2)
  state::set("phase_started", time::uptime())
}
// ---- Helper: exchange a freshly signed JWT for an OAuth2 access token ----
// Sends a form-encoded JWT-bearer grant to the Google token endpoint, then
// marks the node as awaiting a token response (phase 1) and records the uptime
// at which the request was issued.
fn request_token() {
  form_body = str::fmt("grant_type=urn:ietf:params:oauth:grant-type:jwt-bearer&assertion={}", build_jwt())
  request = http::client()
  http::set_method(request, "POST")
  http::set_header(request, "Content-Type", "application/x-www-form-urlencoded")
  http::set_body(request, form_body)
  http::call(request, "https://oauth2.googleapis.com/token")
  // Phase 1 tells http::on_response that the next response carries the token.
  state::set("phase", 1)
  state::set("phase_started", time::uptime())
}
// ---- Helper: move the buffer into a pending batch and start publishing ----
// Does nothing while a request is in flight (phase != 0) or when the buffer is
// empty. Otherwise the buffered entries are wrapped into the request body, the
// buffer is reset, and the batch is either published directly (cached token
// still valid) or held until a new token has been requested.
fn start_flush() {
  if (state::get("phase") != 0) {
    return
  }
  queued = state::get("buffer_count")
  if (queued == 0) {
    return
  }
  // Freeze the current buffer contents as the batch to send.
  state::set("pending_payload", str::fmt("""{"messages":[{}]}""", state::get("buffer")))
  state::set("pending_count", queued)
  // Start a fresh, empty buffer for messages arriving during the request.
  state::set("buffer", "")
  state::set("buffer_count", 0)
  output::set("buffered", 0)
  // token_expiry is compared against uptime; publish directly while it lies in the future.
  if (state::get("token_expiry") > time::uptime()) {
    do_publish()
    return
  }
  request_token()
}
// ---- Helper: append one message to the buffer ----
// message: raw message text; it is base64-encoded and stored as a
//          {"data":"..."} fragment, comma-separated from earlier entries.
// If the buffer already holds max_buffer_size entries the message is dropped
// and the "dropped" output is incremented instead.
// While no request is in flight, reaching batch_size triggers an immediate
// flush, and the first message of a new batch arms the batch timeout.
fn add_to_buffer(message) {
  queued = state::get("buffer_count")
  // At capacity: count the message as dropped and leave the buffer untouched.
  if (queued >= config::get("max_buffer_size")) {
    output::set("dropped", output::get("dropped") + 1)
    return
  }
  fragment = str::fmt("""{"data":"{}"}""", base64::encode(message))
  if (queued != 0) {
    state::set("buffer", str::fmt("{},{}", state::get("buffer"), fragment))
  } else {
    state::set("buffer", fragment)
  }
  queued = queued + 1
  state::set("buffer_count", queued)
  output::set("buffered", queued)
  // A request is in flight: just accumulate, http::on_response deals with the backlog.
  if (state::get("phase") != 0) {
    return
  }
  // Batch is full: cancel the pending timeout and flush right away.
  if (queued >= config::get("batch_size")) {
    callback::clear()
    start_flush()
    return
  }
  // First message of a new batch: start the batch timeout.
  if (queued == 1) {
    callback::set(math::round(config::get("batch_timeout") * 1000), "onFlush", 0)
  }
}
// ---- Flush timeout callback ----
// Registered by name through callback::set(..., "onFlush", 0) whenever the
// batch timeout is armed. The `value` argument is not used; the handler only
// attempts a flush. start_flush() itself returns without doing anything when a
// request is already in flight (phase != 0) or the buffer is empty.
extern fn onFlush(value) {
start_flush()
}
// ---- Helper: handle messages that queued up while a request was in flight ----
// Flushes immediately when at least batch_size messages are waiting; otherwise
// re-arms the batch timeout if anything is waiting at all. Any previously
// registered callback is cleared first in both cases.
fn resume_batching() {
  waiting = state::get("buffer_count")
  if (waiting >= config::get("batch_size")) {
    callback::clear()
    start_flush()
    return
  }
  if (waiting > 0) {
    callback::clear()
    callback::set(math::round(config::get("batch_timeout") * 1000), "onFlush", 0)
  }
}
// ---- HTTP response callback ----
// Dispatches on the stored phase:
//   phase 1 (token request):  on 200 the token is cached and the pending batch
//                             is published; otherwise the batch is dropped.
//   phase 2 (publish request): outputs are updated; a non-200 response drops
//                             the batch, and a 401 also invalidates the token.
// Responses arriving in any other phase are ignored.
extern fn http::on_response(status, body, headers) {
  current = state::get("phase")
  if (current == 1) {
    if (status == 200) {
      // Cache the token; its expiry is tracked as uptime + 3500000.
      state::set("token", json::get(body, "access_token"))
      state::set("token_expiry", time::uptime() + 3500000)
      do_publish()
      return
    }
    // Token request failed: report the status and count the pending batch as dropped.
    state::set("phase", 0)
    output::set("status", status)
    output::set("success", false)
    output::set("dropped", output::get("dropped") + state::get("pending_count"))
    state::set("pending_count", 0)
    resume_batching()
    return
  }
  if (current == 2) {
    state::set("phase", 0)
    output::set("status", status)
    output::set("success", status == 200)
    if (status != 200) {
      // Publish failed: the pending batch is counted as dropped.
      output::set("dropped", output::get("dropped") + state::get("pending_count"))
      if (status == 401) {
        // Zero expiry makes the next flush request a new token.
        state::set("token_expiry", 0)
      }
    }
    state::set("pending_count", 0)
    resume_batching()
  }
}
// ---- Main: handle incoming message ----
// Runs when an input changes; only the "message" channel is handled.
// 1. Watchdog: if a token/publish request has been pending for more than
//    10000 uptime units (ms), it is abandoned, its batch is counted as dropped,
//    and batching is resumed for whatever queued up in the meantime.
// 2. The incoming message (if non-empty) has ${CURRENT_TIMESTAMP_MS}
//    placeholders substituted and is appended to the buffer.
channel = input::channel()
if (channel == "message") {
  phase = state::get("phase")
  if (phase != 0) {
    phase_started = state::get("phase_started")
    if (time::uptime() - phase_started > 10000) {
      // The request never completed: give up on it and report the batch as dropped.
      state::set("phase", 0)
      output::set("status", 0)
      output::set("success", false)
      output::set("dropped", output::get("dropped") + state::get("pending_count"))
      state::set("pending_count", 0)
      // Messages that accumulated during the stalled request have no timer
      // armed (add_to_buffer only arms it for the first message of a batch,
      // and returns early when the buffer is full). Flush or re-arm here,
      // mirroring http::on_response, so the backlog cannot get stuck.
      buffer_count = state::get("buffer_count")
      if (buffer_count >= config::get("batch_size")) {
        callback::clear()
        start_flush()
      } else if (buffer_count > 0) {
        callback::clear()
        timeout_ms = math::round(config::get("batch_timeout") * 1000)
        callback::set(timeout_ms, "onFlush", 0)
      }
    }
  }
  message = input::get("message")
  // Empty messages are ignored.
  if (message != "") {
    if (str::count(message, "${CURRENT_TIMESTAMP_MS}") > 0) {
      // Millisecond timestamp: time::now() scaled by 1000, with the
      // sub-second part taken from the uptime counter.
      now_s = time::now()
      uptime_ms = time::uptime()
      ms_part = uptime_ms % 1000
      timestamp_ms = now_s * 1000 + ms_part
      message = str::replace(message, "${CURRENT_TIMESTAMP_MS}", str::fmt("{}", timestamp_ms))
    }
    add_to_buffer(message)
  }
}
