# PubSub::Publish
Publikuje wiadomości do tematu Google Cloud Pub/Sub. Buforuje przychodzące wiadomości i wysyła je po osiągnięciu rozmiaru partii lub upływie limitu czasu, w zależności co nastąpi pierwsze.
PubSub::Publish
M
S
OK
B
D
# Wejścia
| ID | Skrót | Nazwa | Typ | Domyślnie | Opis |
|---|---|---|---|---|---|
message | M | Wiadomość | STRING | | Treść wiadomości do opublikowania. Każda zmiana dodaje nową wartość do wewnętrznego bufora. |
# Wyjścia
| ID | Skrót | Nazwa | Typ | Domyślnie | Opis |
|---|---|---|---|---|---|
status | S | Status | NUMBER | 0 | Kod statusu HTTP ostatniego żądania publikacji. 0 oznacza, że żadna publikacja nie została jeszcze wykonana. |
success | OK | Sukces | BOOLEAN | false | Prawda, jeśli ostatnie żądanie publikacji zwróciło HTTP 200. |
buffered | B | Buforowane | NUMBER | 0 | Liczba wiadomości aktualnie oczekujących w buforze. |
dropped | D | Odrzucone | NUMBER | 0 | Łączna liczba wiadomości odrzuconych z powodu przepełnienia bufora lub błędów publikacji. |
# Konfiguracja
| ID | Nazwa | Typ | Domyślnie | Jednostka | Opis |
|---|---|---|---|---|---|
service_account_key | Klucz konta usługi | STRING | | Pełna zawartość JSON pliku klucza konta usługi GCP używanego do uwierzytelniania. Szczegóły: Wymagane | |
project_id | ID projektu | STRING | | Identyfikator projektu Google Cloud. Szczegóły: Wymagane | |
topic | Temat | STRING | | Nazwa tematu Pub/Sub, do którego mają być publikowane wiadomości. Szczegóły: Wymagane | |
batch_timeout | Limit czasu partii | NUMBER | 1 | s | Maksymalny czas w sekundach przechowywania wiadomości przed opróżnieniem bufora. Szczegóły: > 0 |
batch_size | Rozmiar partii | NUMBER | 10 | Maksymalna liczba wiadomości w buforze przed natychmiastowym opróżnieniem. Szczegóły: ≥ 1 ≤ 100 | |
max_buffer_size | Maks. rozmiar bufora | NUMBER | 20 | Twardy limit liczby wiadomości w buforze. Nowe wiadomości są odrzucane po osiągnięciu tego limitu. Szczegóły: ≥ 10 ≤ 100 |
# Stan
| ID | Nazwa | Typ | Domyślnie | Jednostka | Opis |
|---|---|---|---|---|---|
token | Token | STRING | | Zbuforowany token dostępu OAuth2. | |
token_expiry | Wygaśnięcie tokena | NUMBER | 0 | Znacznik czasu Unix wygaśnięcia zbuforowanego tokena. | |
phase | Faza | NUMBER | 0 | Śledzenie operacji asynchronicznej: 0 = bezczynny, 1 = oczekiwanie na token, 2 = oczekiwanie na odpowiedź publikacji. | |
buffer | Bufor | STRING | | Zgromadzone wpisy wiadomości jako fragmenty JSON oddzielone przecinkami. | |
buffer_count | Licznik bufora | NUMBER | 0 | Liczba wiadomości aktualnie w buforze. | |
pending_payload | Oczekujący ładunek | STRING | | Pełna treść żądania HTTP w trakcie publikacji. | |
pending_count | Licznik oczekujących | NUMBER | 0 | Liczba wiadomości w bieżącej oczekującej partii publikacji. | |
phase_started | phase_started | NUMBER | 0 |
# Kod źródłowy
Pokaż kod Volang
// ---- Helper: build JWT and return signed token string ----
fn build_jwt() {
sa_key = config::get("service_account_key")
private_key = json::get(sa_key, "private_key")
client_email = json::get(sa_key, "client_email")
key_id = json::get(sa_key, "private_key_id")
now = time::now()
exp = now + 3600
header = str::fmt("""{"alg":"RS256","typ":"JWT","kid":"{}"}""", key_id)
claims = str::fmt("""{"iss":"{}","sub":"{}","scope":"https://www.googleapis.com/auth/pubsub","aud":"https://oauth2.googleapis.com/token","iat":{},"exp":{}}""", client_email, client_email, now, exp)
header_b64 = base64::url_encode(header)
claims_b64 = base64::url_encode(claims)
signing_input = str::fmt("{}.{}", header_b64, claims_b64)
signature = crypto::rs256_sign(signing_input, private_key)
return str::fmt("{}.{}", signing_input, signature)
}
// ---- Helper: publish the pending payload ----
fn do_publish() {
token = state::get("token")
payload = state::get("pending_payload")
project_id = config::get("project_id")
topic = config::get("topic")
url = str::fmt("https://pubsub.googleapis.com/v1/projects/{}/topics/{}:publish", project_id, topic)
client = http::client()
http::set_method(client, "POST")
http::set_header(client, "Content-Type", "application/json")
http::set_header(client, "Authorization", str::fmt("Bearer {}", token))
http::set_body(client, payload)
http::call(client, url)
state::set("phase", 2)
state::set("phase_started", time::uptime())
}
// ---- Helper: request a new OAuth2 access token ----
fn request_token() {
jwt = build_jwt()
client = http::client()
http::set_method(client, "POST")
http::set_header(client, "Content-Type", "application/x-www-form-urlencoded")
http::set_body(client, str::fmt("grant_type=urn:ietf:params:oauth:grant-type:jwt-bearer&assertion={}", jwt))
http::call(client, "https://oauth2.googleapis.com/token")
state::set("phase", 1)
state::set("phase_started", time::uptime())
}
// ---- Helper: snapshot buffer and begin the publish flow ----
fn start_flush() {
phase = state::get("phase")
if (phase != 0) {
return
}
buffer_count = state::get("buffer_count")
if (buffer_count == 0) {
return
}
// Snapshot buffer into pending payload and reset buffer
buffer = state::get("buffer")
state::set("pending_payload", str::fmt("""{"messages":[{}]}""", buffer))
state::set("pending_count", buffer_count)
state::set("buffer", "")
state::set("buffer_count", 0)
output::set("buffered", 0)
// Check token validity
token_expiry = state::get("token_expiry")
if (token_expiry > time::uptime()) {
do_publish()
} else {
request_token()
}
}
// ---- Helper: add a message to the buffer ----
fn add_to_buffer(message) {
buffer_count = state::get("buffer_count")
// Drop message if buffer is at capacity
if (buffer_count >= config::get("max_buffer_size")) {
output::set("dropped", output::get("dropped") + 1)
return
}
msg_b64 = base64::encode(message)
entry = str::fmt("""{"data":"{}"}""", msg_b64)
buffer = state::get("buffer")
if (buffer_count == 0) {
state::set("buffer", entry)
} else {
state::set("buffer", str::fmt("{},{}", buffer, entry))
}
buffer_count = buffer_count + 1
state::set("buffer_count", buffer_count)
output::set("buffered", buffer_count)
// Only manage callbacks when no HTTP call is in flight (callback slot is free)
if (state::get("phase") == 0) {
if (buffer_count >= config::get("batch_size")) {
callback::clear()
start_flush()
} else if (buffer_count == 1) {
timeout_ms = math::round(config::get("batch_timeout") * 1000)
callback::set(timeout_ms, "onFlush", 0)
}
}
// When phase != 0, messages just accumulate; http::on_response will handle them
}
// ---- Flush timeout callback ----
extern fn onFlush(value) {
start_flush()
}
// ---- HTTP response callback ----
extern fn http::on_response(status, body, headers) {
phase = state::get("phase")
if (phase == 1) {
// Token response
if (status == 200) {
access_token = json::get(body, "access_token")
state::set("token", access_token)
state::set("token_expiry", time::uptime() + 3500000)
do_publish()
} else {
// Token request failed - drop the pending batch
state::set("phase", 0)
output::set("status", status)
output::set("success", false)
output::set("dropped", output::get("dropped") + state::get("pending_count"))
state::set("pending_count", 0)
// Re-arm timeout for any messages that accumulated during the round-trip
buffer_count = state::get("buffer_count")
if (buffer_count >= config::get("batch_size")) {
callback::clear()
start_flush()
} else if (buffer_count > 0) {
callback::clear()
timeout_ms = math::round(config::get("batch_timeout") * 1000)
callback::set(timeout_ms, "onFlush", 0)
}
}
return
}
if (phase == 2) {
// Publish response
state::set("phase", 0)
output::set("status", status)
output::set("success", status == 200)
if (status == 200) {
state::set("pending_count", 0)
} else {
// Publish failed - drop the pending batch
output::set("dropped", output::get("dropped") + state::get("pending_count"))
state::set("pending_count", 0)
if (status == 401) {
state::set("token_expiry", 0)
}
}
// If new messages accumulated during the HTTP round-trip, handle them
buffer_count = state::get("buffer_count")
if (buffer_count >= config::get("batch_size")) {
callback::clear()
start_flush()
} else if (buffer_count > 0) {
// Re-arm timeout in case the previous one fired while publish was in flight
callback::clear()
timeout_ms = math::round(config::get("batch_timeout") * 1000)
callback::set(timeout_ms, "onFlush", 0)
}
return
}
}
// ---- Main: handle incoming message ----
channel = input::channel()
if (channel == "message") {
phase = state::get("phase")
if (phase != 0) {
phase_started = state::get("phase_started")
if (time::uptime() - phase_started > 10000) {
state::set("phase", 0)
output::set("status", 0)
output::set("success", false)
output::set("dropped", output::get("dropped") + state::get("pending_count"))
state::set("pending_count", 0)
}
}
message = input::get("message")
if (message != "") {
if (str::count(message, "${CURRENT_TIMESTAMP_MS}") > 0) {
now_s = time::now()
uptime_ms = time::uptime()
ms_part = uptime_ms % 1000
timestamp_ms = now_s * 1000 + ms_part
message = str::replace(message, "${CURRENT_TIMESTAMP_MS}", str::fmt("{}", timestamp_ms))
}
add_to_buffer(message)
}
}
