# PubSub::Publish

Przetwarzanie

Publikuje wiadomości do tematu Google Cloud Pub/Sub. Buforuje przychodzące wiadomości i wysyła je po osiągnięciu rozmiaru partii lub upływie limitu czasu, w zależności co nastąpi pierwsze.

PubSub::Publish
M
S
OK
B
D

# Wejścia

IDSkrótNazwaTypDomyślnieOpis
messageMWiadomośćSTRINGTreść wiadomości do opublikowania. Każda zmiana dodaje nową wartość do wewnętrznego bufora.

# Wyjścia

IDSkrótNazwaTypDomyślnieOpis
statusSStatusNUMBER0Kod statusu HTTP ostatniego żądania publikacji. 0 oznacza, że żadna publikacja nie została jeszcze wykonana.
successOKSukcesBOOLEANfalsePrawda, jeśli ostatnie żądanie publikacji zwróciło HTTP 200.
bufferedBBuforowaneNUMBER0Liczba wiadomości aktualnie oczekujących w buforze.
droppedDOdrzuconeNUMBER0Łączna liczba wiadomości odrzuconych z powodu przepełnienia bufora lub błędów publikacji.

# Konfiguracja

IDNazwaTypDomyślnieJednostkaOpis
service_account_keyKlucz konta usługiSTRINGPełna zawartość JSON pliku klucza konta usługi GCP używanego do uwierzytelniania.

Szczegóły:

Wymagane
project_idID projektuSTRINGIdentyfikator projektu Google Cloud.

Szczegóły:

Wymagane
topicTematSTRINGNazwa tematu Pub/Sub, do którego mają być publikowane wiadomości.

Szczegóły:

Wymagane
batch_timeoutLimit czasu partiiNUMBER1sMaksymalny czas w sekundach przechowywania wiadomości przed opróżnieniem bufora.

Szczegóły:

> 0
batch_sizeRozmiar partiiNUMBER10Maksymalna liczba wiadomości w buforze przed natychmiastowym opróżnieniem.

Szczegóły:

≥ 1
≤ 100
max_buffer_sizeMaks. rozmiar buforaNUMBER20Twardy limit liczby wiadomości w buforze. Nowe wiadomości są odrzucane po osiągnięciu tego limitu.

Szczegóły:

≥ 10
≤ 100

# Stan

IDNazwaTypDomyślnieJednostkaOpis
tokenTokenSTRINGZbuforowany token dostępu OAuth2.
token_expiryWygaśnięcie tokenaNUMBER0Znacznik czasu Unix wygaśnięcia zbuforowanego tokena.
phaseFazaNUMBER0Śledzenie operacji asynchronicznej: 0 = bezczynny, 1 = oczekiwanie na token, 2 = oczekiwanie na odpowiedź publikacji.
bufferBuforSTRINGZgromadzone wpisy wiadomości jako fragmenty JSON oddzielone przecinkami.
buffer_countLicznik buforaNUMBER0Liczba wiadomości aktualnie w buforze.
pending_payloadOczekujący ładunekSTRINGPełna treść żądania HTTP w trakcie publikacji.
pending_countLicznik oczekującychNUMBER0Liczba wiadomości w bieżącej oczekującej partii publikacji.
phase_startedphase_startedNUMBER0

# Kod źródłowy

Pokaż kod Volang
// ---- Helper: build JWT and return signed token string ----
fn build_jwt() {
    sa_key = config::get("service_account_key")
    private_key = json::get(sa_key, "private_key")
    client_email = json::get(sa_key, "client_email")
    key_id = json::get(sa_key, "private_key_id")

    now = time::now()
    exp = now + 3600

    header = str::fmt("""{"alg":"RS256","typ":"JWT","kid":"{}"}""", key_id)
    claims = str::fmt("""{"iss":"{}","sub":"{}","scope":"https://www.googleapis.com/auth/pubsub","aud":"https://oauth2.googleapis.com/token","iat":{},"exp":{}}""", client_email, client_email, now, exp)

    header_b64 = base64::url_encode(header)
    claims_b64 = base64::url_encode(claims)
    signing_input = str::fmt("{}.{}", header_b64, claims_b64)
    signature = crypto::rs256_sign(signing_input, private_key)

    return str::fmt("{}.{}", signing_input, signature)
}

// ---- Helper: publish the pending payload ----
fn do_publish() {
    token = state::get("token")
    payload = state::get("pending_payload")

    project_id = config::get("project_id")
    topic = config::get("topic")
    url = str::fmt("https://pubsub.googleapis.com/v1/projects/{}/topics/{}:publish", project_id, topic)

    client = http::client()
    http::set_method(client, "POST")
    http::set_header(client, "Content-Type", "application/json")
    http::set_header(client, "Authorization", str::fmt("Bearer {}", token))
    http::set_body(client, payload)
    http::call(client, url)

    state::set("phase", 2)
    state::set("phase_started", time::uptime())
}

// ---- Helper: request a new OAuth2 access token ----
fn request_token() {
    jwt = build_jwt()

    client = http::client()
    http::set_method(client, "POST")
    http::set_header(client, "Content-Type", "application/x-www-form-urlencoded")
    http::set_body(client, str::fmt("grant_type=urn:ietf:params:oauth:grant-type:jwt-bearer&assertion={}", jwt))
    http::call(client, "https://oauth2.googleapis.com/token")

    state::set("phase", 1)
    state::set("phase_started", time::uptime())
}

// ---- Helper: snapshot buffer and begin the publish flow ----
fn start_flush() {
    phase = state::get("phase")
    if (phase != 0) {
        return
    }

    buffer_count = state::get("buffer_count")
    if (buffer_count == 0) {
        return
    }

    // Snapshot buffer into pending payload and reset buffer
    buffer = state::get("buffer")
    state::set("pending_payload", str::fmt("""{"messages":[{}]}""", buffer))
    state::set("pending_count", buffer_count)
    state::set("buffer", "")
    state::set("buffer_count", 0)
    output::set("buffered", 0)

    // Check token validity
    token_expiry = state::get("token_expiry")
    if (token_expiry > time::uptime()) {
        do_publish()
    } else {
        request_token()
    }
}

// ---- Helper: add a message to the buffer ----
fn add_to_buffer(message) {
    buffer_count = state::get("buffer_count")

    // Drop message if buffer is at capacity
    if (buffer_count >= config::get("max_buffer_size")) {
        output::set("dropped", output::get("dropped") + 1)
        return
    }

    msg_b64 = base64::encode(message)
    entry = str::fmt("""{"data":"{}"}""", msg_b64)

    buffer = state::get("buffer")

    if (buffer_count == 0) {
        state::set("buffer", entry)
    } else {
        state::set("buffer", str::fmt("{},{}", buffer, entry))
    }

    buffer_count = buffer_count + 1
    state::set("buffer_count", buffer_count)
    output::set("buffered", buffer_count)

    // Only manage callbacks when no HTTP call is in flight (callback slot is free)
    if (state::get("phase") == 0) {
        if (buffer_count >= config::get("batch_size")) {
            callback::clear()
            start_flush()
        } else if (buffer_count == 1) {
            timeout_ms = math::round(config::get("batch_timeout") * 1000)
            callback::set(timeout_ms, "onFlush", 0)
        }
    }
    // When phase != 0, messages just accumulate; http::on_response will handle them
}

// ---- Flush timeout callback ----
extern fn onFlush(value) {
    start_flush()
}

// ---- HTTP response callback ----
extern fn http::on_response(status, body, headers) {
    phase = state::get("phase")

    if (phase == 1) {
        // Token response
        if (status == 200) {
            access_token = json::get(body, "access_token")
            state::set("token", access_token)
            state::set("token_expiry", time::uptime() + 3500000)
            do_publish()
        } else {
            // Token request failed - drop the pending batch
            state::set("phase", 0)
            output::set("status", status)
            output::set("success", false)
            output::set("dropped", output::get("dropped") + state::get("pending_count"))
            state::set("pending_count", 0)

            // Re-arm timeout for any messages that accumulated during the round-trip
            buffer_count = state::get("buffer_count")
            if (buffer_count >= config::get("batch_size")) {
                callback::clear()
                start_flush()
            } else if (buffer_count > 0) {
                callback::clear()
                timeout_ms = math::round(config::get("batch_timeout") * 1000)
                callback::set(timeout_ms, "onFlush", 0)
            }
        }
        return
    }

    if (phase == 2) {
        // Publish response
        state::set("phase", 0)
        output::set("status", status)
        output::set("success", status == 200)

        if (status == 200) {
            state::set("pending_count", 0)
        } else {
            // Publish failed - drop the pending batch
            output::set("dropped", output::get("dropped") + state::get("pending_count"))
            state::set("pending_count", 0)
            if (status == 401) {
                state::set("token_expiry", 0)
            }
        }

        // If new messages accumulated during the HTTP round-trip, handle them
        buffer_count = state::get("buffer_count")
        if (buffer_count >= config::get("batch_size")) {
            callback::clear()
            start_flush()
        } else if (buffer_count > 0) {
            // Re-arm timeout in case the previous one fired while publish was in flight
            callback::clear()
            timeout_ms = math::round(config::get("batch_timeout") * 1000)
            callback::set(timeout_ms, "onFlush", 0)
        }
        return
    }
}

// ---- Main: handle incoming message ----
channel = input::channel()

if (channel == "message") {
    phase = state::get("phase")
    if (phase != 0) {
        phase_started = state::get("phase_started")
        if (time::uptime() - phase_started > 10000) {
            state::set("phase", 0)
            output::set("status", 0)
            output::set("success", false)
            output::set("dropped", output::get("dropped") + state::get("pending_count"))
            state::set("pending_count", 0)
        }
    }

    message = input::get("message")
    if (message != "") {
        if (str::count(message, "${CURRENT_TIMESTAMP_MS}") > 0) {
            now_s = time::now()
            uptime_ms = time::uptime()
            ms_part = uptime_ms % 1000
            timestamp_ms = now_s * 1000 + ms_part
            message = str::replace(message, "${CURRENT_TIMESTAMP_MS}", str::fmt("{}", timestamp_ms))
        }

        add_to_buffer(message)
    }
}
Publikuje wiadomości do tematu Google Cloud Pub/Sub. Buforuje przychodzące wiadomości i wysyła je po osiągnięciu rozmiaru partii lub upływie limitu czasu, w zależności co nastąpi pierwsze.