【Cloudflareのメッセージキュー】Queuesを試してみました
はじめに
Cloudflare公式URL:https://developers.cloudflare.com/queues/
2024/1現在ではまだベータ版の機能になります。
Workersから連携することでQueueを使用することができます。
Queuesの動作検証
R2作成
CloudflareのR2のバケットを作成します。GUIでも作成できますが今回はAPIを使用してバケットの作成を行います。
$ curl --request POST
--url https://api.cloudflare.com/client/v4/accounts/{account_id}/r2/buckets
--header 'Authorization: Bearer {api-token}'
--header 'Content-Type: application/json'
--data '{"name": "queues-test-bucket"}'
※api-tokenはR2の画面の「R2 API トークンの管理」から作成できます。今回の権限は「管理者読み取りと書き込み」で作成しました。
レスポンスで「success:true」なことを確認します。
{"success":true,"errors":[],"messages":[],"result":{"name":"queues-test-bucket","creation_date":"2024-01-07T03:57:48.224Z","location":"APAC"}}%
GUI(Cloudflare -> アカウント -> R2)からバケットが作成されていることも確認します。
Workers作成
下記コマンドでWorkersのコードを作成します。
$ npm create cloudflare@latest test-queues
下記内容で作成しております。他はデフォルト。
├ What type of application do you want to create?
│ type Queue consumer & producer Worker
│
├ Do you want to use TypeScript?
│ yes typescript
wrangler.tomlを修正します。
name = "test-queues"
main = "src/index.ts"
compatibility_date = "2023-12-18"
# Bind a Queue producer. Use this binding to schedule an arbitrary task that may be processed later by a Queue consumer.
# Docs: https://developers.cloudflare.com/queues/get-started
[[queues.producers]]
binding = "TEST_MY_QUEUE"
queue = "test-my-queue"
# Bind a Queue consumer. Queue Consumers can retrieve tasks scheduled by Producers to act on them.
# Docs: https://developers.cloudflare.com/queues/get-started
[[queues.consumers]]
queue = "test-my-queue"
# Optional: Configure batching and retries: https://developers.cloudflare.com/queues/learning/batching-retries/
max_batch_size = 10
max_batch_timeout = 30
max_retries = 5
dead_letter_queue = "test-my-queue-dlq"
[[r2_buckets]]
bucket_name = "queues-test-bucket"
binding = "QUEUES_TEST_BUCKET"
項目の補足
max_batch_size -> 各バッチで許可されるメッセージの最大数
max_batch_timeout -> バッチがいっぱいになるまで待機する最大秒数
max_retries -> メッセージが失敗した場合、または呼び出された場合のメッセージの最大再試行回数
dead_letter_queue -> 何度か処理に失敗した場合にメッセージを送信する別のキューの名前。これが定義されていない場合、処理に繰り返し失敗するメッセージは最終的に破棄されます。
Cloudflareにログインします。
$ wrangler login
ログインできているか確認します。
$ wrangler whoami
src/index.tsを修正します。
export interface Env {
TEST_MY_QUEUE: Queue<any>;
QUEUES_TEST_BUCKET: R2Bucket;
}
export default {
async fetch(request: Request, env: Env, ctx: ExecutionContext): Promise<Response> {
try {
let log = {
url: request.url,
method: request.method,
headers: Object.fromEntries(request.headers),
};
await env.TEST_MY_QUEUE.send(log);
return new Response('Success!');
} catch (err) {
console.error('An error occurred:', err);
return new Response('Internal Server Error', { status: 500 });
}
},
async queue(batch: MessageBatch<any>, env: Env): Promise<void> {
for (const msg of batch.messages) {
// TODO: do something with the message
msg.ack()
}
const data = JSON.stringify(batch.messages);
await env.QUEUES_TEST_BUCKET.put(`file-${Date.now()}.json`, data, {
httpMetadata: {
contentType: 'application/json',
},
});
},
};
Queue作成
queueを作成します。
$ wrangler queues create test-my-queue
$ wrangler queues create test-my-queue-dlq
コマンドでエラーが出てないことを確認して、
GUI(Cloudflare -> アカウント -> Workers & Pages -> Queues)からQueueが作成されたか確認します。
Cloudflareにdeploy
ここまでできたらWorkersをCloudflareにDeployします。
$ wrangler deploy
GUI(Cloudflare -> アカウント -> Workers & Pages)からWorkersが作成されたことを確認します。
また、Queueを確認してみるとtest-my-queueのステータスがアクティブになっていることを確認します。
WorkersでQueueがbindされると、こちらの項目がアクティブになります。
動作確認
ここまで準備ができたのでWorkersのプレビューのURLからアクセスを行ってみます。
※プレビューのURL:******.workers.dev
作成したWorkersの画面から確認ができます。
「Success!」と表示されるかと思います。
queueの設定で下記のようにしたので、30秒待つか、10件queueが溜まるまでqueueの処理は実行されません。
max_batch_size = 10
max_batch_timeout = 30
上記条件が満たされたら作成したR2のバケットを見てみましょう。
ファイルが作成されていることを確認できました。
ダウンロードして中身を確認してみると、アクセス情報がjsonで入っており正常に動作できてることを確認できました。
GUI(Cloudflare -> アカウント -> Workers & Pages -> Queues)から該当のQueueの情報を確認することもできます。
今回は検証という形で簡単な動作を試してみました。
ただ、queueの処理をミスした場合のためにdead_letter_queueを指定してあるのですが、こちらをうまく動作させることができませんでした。
検証の仕方が悪かったかもしれないのですが、まだベータ版ということもあり、また今度試してみようと思います。
GraphQL
GUIからQueueのグラフは確認できますが、CloudflareのGraphQLを使用してQueueの値を取得することも可能です。
Queue GraphQL URL: https://developers.cloudflare.com/queues/reference/metrics/
まず、対象のQueueのqueueIdを調べる必要があります。
これはAPIを使用して確認が可能です。
$ curl --request GET
--url https://api.cloudflare.com/client/v4/accounts/{account_id}/workers/queues
--header 'Authorization: Bearer {api-token}'
--header 'Content-Type: application/json'
api-tokenの権限は「Cloudflare Workers を編集する」で作成しておけば取得できます。
Queue GraphQL URL内の「Get average Queue backlog over time period」を取得する場合、下記内容のjsonファイルを作成します。
$ vi graphql-queue.json
{
"query":
"{ viewer { accounts(filter: {accountTag: $accountTag}) { queueBacklogAdaptiveGroups( limit: 10000 filter: $filter ) { avg { messages bytes } } } } }",
"variables": {
"accountTag":"xxxxxxxxxxxx",
"filter": {
"queueId": "xxxxxxxxxxxxxxx",
"datetime_geq": "2024-01-01T00:00:00Z",
"datetime_leq": "2024-01-05T23:59:59Z"
}
}
}
accountTagにはCloudflareのaccountIdを入れます。
下記コマンドを実行すれば値が取得できますので参考になれば幸いです。
$ cat graphql-queue.json | tr -d 'n' | curl
-X POST
-H "X-Auth-Email: xxx@xxxxx.net"
-H "X-Auth-key: xxxxxxxxxxxxxxxxxxxxxxxxxxx"
-s
-d @-
https://api.cloudflare.com/client/v4/graphql/;
最後に
Workersと組み合わせて使用する必要はありますが、Queueの使い方は色々ありますし、これ以外にも動作を実装できますので楽しみな機能だと思います。
現在Queueの機能は制限が設定されており、メッセージの最大保存期間が4日間などになっておりますので使用する際に確認しておくと良いです。
https://developers.cloudflare.com/queues/platform/limits/
そしてCloudflareなら上記以外でも、
・パフォーマンスの向上
・サイトの信頼性の向上
・セキュリティの向上
が、一つのサービスで実現できますので非常におすすめです。
Cloudflareに、アクセリアの運用サポートをプラスしたCDNサービスを提供しています
移行支援によるスムーズな導入とともに、お客様の運用負担を最小限にとどめながら、WEBサイトのパフォーマンスとセキュリティを最大限に高めます。運用サポートはフルアウトソーシングからミニマムサポートまで、ご要望に合わせてご提供します。
Cloudflare(クラウドフレア)の導入や運用について、またそれ以外のことでもなにか気になることがございましたらお気軽にご相談下さい。
サービスにご興味をお持ちの方は
お気軽にお問い合わせください。
Webからお問い合わせ
お問い合わせお電話からお問い合わせ
平日09:30 〜 18:00
Free Service