# PubSub::Publish

Process

Publishes messages to a Google Cloud Pub/Sub topic. Batches incoming messages and flushes when the batch size is reached or the timeout expires, whichever comes first.

PubSub::Publish
M
S
OK
B
D

# Inputs

IDAbbrevNameTypeDefaultDescription
messageMMessageSTRINGMessage content to publish. Each change adds the new value to the internal buffer.

# Outputs

IDAbbrevNameTypeDefaultDescription
statusSStatusNUMBER0HTTP status code of the last publish request, or of the token request if authentication failed. 0 means no publish has been attempted yet or the last request timed out.
successOKSuccessBOOLEANfalseTrue if the last publish request returned HTTP 200.
bufferedBBufferedNUMBER0Number of messages currently waiting in the buffer.
droppedDDroppedNUMBER0Cumulative count of messages dropped due to buffer overflow, failed token or publish requests, or requests that timed out.

# Configuration

IDNameTypeDefaultUnitDescription
service_account_keyService account keySTRINGFull JSON content of the GCP service account key file used for authentication.

Details:

Required
project_idProject IDSTRINGGoogle Cloud project ID.

Details:

Required
topicTopicSTRINGPub/Sub topic name to publish messages to.

Details:

Required
batch_timeoutBatch timeoutNUMBER1sMaximum time in seconds to hold messages before flushing the buffer.

Details:

> 0
batch_sizeBatch sizeNUMBER10Maximum number of messages in the buffer before an immediate flush is triggered.

Details:

≥ 1
≤ 100
max_buffer_sizeMax buffer sizeNUMBER20Hard cap on how many messages can sit in the buffer. New messages are dropped when this limit is reached.

Details:

≥ 10
≤ 100

# State

IDNameTypeDefaultUnitDescription
tokenTokenSTRINGCached OAuth2 access token.
token_expiryToken expiryNUMBER0Device uptime in milliseconds (same clock as time::uptime()) at which the cached token is treated as expired.
phasePhaseNUMBER0Async operation tracker: 0 = idle, 1 = awaiting token, 2 = awaiting publish response.
bufferBufferSTRINGAccumulated message entries as comma-separated JSON fragments.
buffer_countBuffer countNUMBER0Number of messages currently in the buffer.
pending_payloadPending payloadSTRINGThe full HTTP request body being published.
pending_countPending countNUMBER0Number of messages in the current pending publish batch.
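phase_startedphase_startedNUMBER0Device uptime in milliseconds at which the current async phase began. Used to abandon a request that has been pending for more than 10 seconds.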

# Source Code

View Volang source
// ---- Helper: assemble and sign the service-account JWT ----
// Reads the service account key JSON from configuration and returns the
// compact "header.claims.signature" string used as the OAuth2 assertion.
// The claims ask Google's token endpoint for the Pub/Sub scope and expire
// 3600 after time::now().
fn build_jwt() {
    key_json = config::get("service_account_key")

    // Claim timestamps: issued now, expiring 3600 later (same unit as time::now()).
    issued_at = time::now()
    expires_at = issued_at + 3600

    // Fields taken from the service account key file.
    signing_key = json::get(key_json, "private_key")
    account_email = json::get(key_json, "client_email")
    signing_key_id = json::get(key_json, "private_key_id")

    header_json = str::fmt("""{"alg":"RS256","typ":"JWT","kid":"{}"}""", signing_key_id)
    claims_json = str::fmt("""{"iss":"{}","sub":"{}","scope":"https://www.googleapis.com/auth/pubsub","aud":"https://oauth2.googleapis.com/token","iat":{},"exp":{}}""", account_email, account_email, issued_at, expires_at)

    // The signed portion is base64url(header) + "." + base64url(claims).
    unsigned_token = str::fmt("{}.{}", base64::url_encode(header_json), base64::url_encode(claims_json))

    // NOTE(review): the signature is appended as returned by crypto::rs256_sign,
    // so it is presumably already base64url-encoded - confirm.
    return str::fmt("{}.{}", unsigned_token, crypto::rs256_sign(unsigned_token, signing_key))
}

// ---- Helper: send the pending batch to Pub/Sub ----
// POSTs the request body stored in state "pending_payload" to the configured
// topic's :publish endpoint, authenticated with the cached bearer token.
// Afterwards marks phase 2 (awaiting publish response) and records when the
// phase began.
fn do_publish() {
    publish_url = str::fmt("https://pubsub.googleapis.com/v1/projects/{}/topics/{}:publish", config::get("project_id"), config::get("topic"))
    auth_value = str::fmt("Bearer {}", state::get("token"))

    request = http::client()
    http::set_method(request, "POST")
    http::set_header(request, "Content-Type", "application/json")
    http::set_header(request, "Authorization", auth_value)
    http::set_body(request, state::get("pending_payload"))
    http::call(request, publish_url)

    // The response is delivered to http::on_response, which dispatches on "phase".
    state::set("phase", 2)
    state::set("phase_started", time::uptime())
}

// ---- Helper: exchange a signed JWT for an OAuth2 access token ----
// Posts a jwt-bearer grant to Google's token endpoint, then marks phase 1
// (awaiting token) and records when the phase began. The reply is handled
// in http::on_response.
fn request_token() {
    form_body = str::fmt("grant_type=urn:ietf:params:oauth:grant-type:jwt-bearer&assertion={}", build_jwt())

    request = http::client()
    http::set_method(request, "POST")
    http::set_header(request, "Content-Type", "application/x-www-form-urlencoded")
    http::set_body(request, form_body)
    http::call(request, "https://oauth2.googleapis.com/token")

    state::set("phase", 1)
    state::set("phase_started", time::uptime())
}

// ---- Helper: move the buffer into a pending batch and start publishing ----
// Does nothing while another request is in flight (phase != 0) or when the
// buffer is empty. Otherwise wraps the buffered entries in a
// {"messages":[...]} body, clears the buffer, and either publishes right
// away or first requests a fresh access token.
fn start_flush() {
    if (state::get("phase") != 0) {
        return
    }

    queued = state::get("buffer_count")
    if (queued == 0) {
        return
    }

    // Freeze the current buffer as the outgoing request body, then empty it
    // so new messages can accumulate while the request is in flight.
    state::set("pending_payload", str::fmt("""{"messages":[{}]}""", state::get("buffer")))
    state::set("pending_count", queued)
    state::set("buffer", "")
    state::set("buffer_count", 0)
    output::set("buffered", 0)

    // token_expiry is on the time::uptime() clock; a value in the future
    // means the cached token can still be used.
    valid_until = state::get("token_expiry")
    if (valid_until > time::uptime()) {
        do_publish()
        return
    }

    request_token()
}

// ---- Helper: append one message to the buffer ----
// Base64-encodes `message` into a {"data":"..."} entry and appends it to the
// comma-separated buffer. When the buffer already holds max_buffer_size
// entries, the message is counted as dropped instead. While idle (phase 0),
// reaching batch_size triggers an immediate flush and the first buffered
// message arms the batch timeout.
fn add_to_buffer(message) {
    queued = state::get("buffer_count")

    // Buffer full: count the message as dropped and leave everything else untouched.
    if (queued >= config::get("max_buffer_size")) {
        output::set("dropped", output::get("dropped") + 1)
        return
    }

    entry = str::fmt("""{"data":"{}"}""", base64::encode(message))

    // Entries are joined with commas; the first entry has no leading comma.
    if (queued != 0) {
        state::set("buffer", str::fmt("{},{}", state::get("buffer"), entry))
    } else {
        state::set("buffer", entry)
    }

    queued = queued + 1
    state::set("buffer_count", queued)
    output::set("buffered", queued)

    // While an HTTP call is in flight the callback slot is in use, so the
    // message just accumulates; http::on_response deals with it later.
    if (state::get("phase") != 0) {
        return
    }

    if (queued >= config::get("batch_size")) {
        callback::clear()
        start_flush()
        return
    }

    if (queued == 1) {
        // First message of a new batch starts the flush timer (seconds -> ms).
        callback::set(math::round(config::get("batch_timeout") * 1000), "onFlush", 0)
    }
}

// ---- Flush timeout callback ----
// Registered by name ("onFlush") via callback::set when the batch timeout is
// armed. Starts a flush of whatever is buffered; start_flush itself ignores
// the call if a request is already in flight or the buffer is empty.
// `value` is not used (presumably the third argument passed to
// callback::set, which is always 0 in this file - confirm).
extern fn onFlush(value) {
    start_flush()
}

// ---- Helper: after a request finishes, deal with messages that queued up meanwhile ----
// Flushes immediately when a full batch is waiting; otherwise re-arms the
// batch timeout if anything is buffered.
fn resume_buffered() {
    waiting = state::get("buffer_count")

    if (waiting >= config::get("batch_size")) {
        callback::clear()
        start_flush()
        return
    }

    if (waiting > 0) {
        // Re-arm the timeout; no timer is armed while a request is in flight.
        callback::clear()
        callback::set(math::round(config::get("batch_timeout") * 1000), "onFlush", 0)
    }
}

// ---- Helper: handle the reply to the token request (phase 1) ----
// On HTTP 200, caches the access token for 3500000 ms of uptime and goes on
// to publish the pending batch. Otherwise the pending batch is dropped.
fn handle_token_response(status, body) {
    if (status == 200) {
        state::set("token", json::get(body, "access_token"))
        state::set("token_expiry", time::uptime() + 3500000)
        do_publish()
        return
    }

    // Token request failed - drop the pending batch
    state::set("phase", 0)
    output::set("status", status)
    output::set("success", false)
    output::set("dropped", output::get("dropped") + state::get("pending_count"))
    state::set("pending_count", 0)

    resume_buffered()
}

// ---- Helper: handle the reply to the publish request (phase 2) ----
// Reports the status, counts the batch as dropped on any non-200 reply, and
// invalidates the cached token on 401 so the next flush requests a new one.
fn handle_publish_response(status) {
    state::set("phase", 0)
    output::set("status", status)
    output::set("success", status == 200)

    if (status != 200) {
        // Publish failed - drop the pending batch
        output::set("dropped", output::get("dropped") + state::get("pending_count"))
        if (status == 401) {
            state::set("token_expiry", 0)
        }
    }
    state::set("pending_count", 0)

    resume_buffered()
}

// ---- HTTP response callback ----
// Both the token request and the publish request answer here; the "phase"
// state tells which one is outstanding. Responses arriving while idle
// (phase 0) are ignored.
extern fn http::on_response(status, body, headers) {
    phase = state::get("phase")

    if (phase == 1) {
        handle_token_response(status, body)
    } else if (phase == 2) {
        handle_publish_response(status)
    }
}

// ---- Main: handle incoming message ----
// Runs when an input changes. For the "message" channel it first recovers
// from a request that has been pending for more than 10 seconds, then
// expands the ${CURRENT_TIMESTAMP_MS} placeholder and buffers the message.
channel = input::channel()

if (channel == "message") {
    // Watchdog: if the outstanding token/publish request has not completed
    // within 10000 ms of uptime, give up on it and count its batch as dropped.
    phase = state::get("phase")
    if (phase != 0) {
        phase_started = state::get("phase_started")
        if (time::uptime() - phase_started > 10000) {
            state::set("phase", 0)
            output::set("status", 0)
            output::set("success", false)
            output::set("dropped", output::get("dropped") + state::get("pending_count"))
            state::set("pending_count", 0)

            // Messages that queued up behind the stalled request have no
            // flush timer (add_to_buffer only arms it for the first message
            // of an idle buffer). Without this they would wait until
            // batch_size is reached - or forever when batch_size exceeds
            // max_buffer_size. Same recovery as in http::on_response.
            // NOTE(review): a late reply to the abandoned request would be
            // handled against whatever phase is current when it arrives -
            // confirm whether the runtime cancels it.
            stranded = state::get("buffer_count")
            if (stranded >= config::get("batch_size")) {
                callback::clear()
                start_flush()
            } else if (stranded > 0) {
                callback::clear()
                timeout_ms = math::round(config::get("batch_timeout") * 1000)
                callback::set(timeout_ms, "onFlush", 0)
            }
        }
    }

    message = input::get("message")
    if (message != "") {
        if (str::count(message, "${CURRENT_TIMESTAMP_MS}") > 0) {
            // Millisecond timestamp built from wall-clock seconds plus the
            // sub-second part of the uptime counter. The two clocks are not
            // aligned, so the last three digits are only approximate.
            now_s = time::now()
            uptime_ms = time::uptime()
            ms_part = uptime_ms % 1000
            timestamp_ms = now_s * 1000 + ms_part
            message = str::replace(message, "${CURRENT_TIMESTAMP_MS}", str::fmt("{}", timestamp_ms))
        }

        add_to_buffer(message)
    }
}
Publishes messages to a Google Cloud Pub/Sub topic. Batches incoming messages and flushes when the batch size is reached or the timeout expires, whichever comes first.