03-5211-7750 平日|09:30~18:00

【Cloudflareのメッセージキュー】Queuesを試してみました

検討に役立つお役立ち資料を無料でご利用いただけます

お役立ち資料をダウンロードFREE

はじめに

前から気になっていたのですが、後回しになってしまっていたCloudflareのQueues機能を検証してみました。

Cloudflare公式URL:https://developers.cloudflare.com/queues/

2024/1現在ではまだベータ版の機能になります。
queuesキャプチャ

Workersから連携することでQueueを使用することができます。

Queuesの動作検証

ブラウザからアクセスしてブラウザには「Success!」という文字列を返し、アクセスの情報をQueue側に渡して、ログとしてR2に保存させる処理を実装してみます。

R2作成


CloudflareのR2のバケットを作成します。GUIでも作成できますが今回はAPIを使用してバケットの作成を行います。

$ curl --request POST \
--url https://api.cloudflare.com/client/v4/accounts/{account_id}/r2/buckets \
--header 'Authorization: Bearer {api-token}' \

--header 'Content-Type: application/json' \
--data '{"name": "queues-test-bucket"}'


※api-tokenはR2の画面の「R2 API トークンの管理」から作成できます。今回の権限は「管理者読み取りと書き込み」で作成しました。

レスポンスで「success:true」なことを確認します。

{"success":true,"errors":[],"messages":[],"result":{"name":"queues-test-bucket","creation_date":"2024-01-07T03:57:48.224Z","location":"APAC"}}%


GUI(Cloudflare -> アカウント -> R2)からバケットが作成されていることも確認します。


Workers作成


下記コマンドでWorkersのコードを作成します。

$ npm create cloudflare@latest test-queues


下記内容で作成しております。他はデフォルト。
├ What type of application do you want to create?
│ type Queue consumer & producer Worker

├ Do you want to use TypeScript?
│ yes typescript


wrangler.tomlを修正します。
name = "test-queues"
main = "src/index.ts"
compatibility_date = "2023-12-18"

# Bind a Queue producer. Use this binding to schedule an arbitrary task that may be processed later by a Queue consumer.

# Docs: https://developers.cloudflare.com/queues/get-started
[[queues.producers]]
binding = "TEST_MY_QUEUE"
queue = "test-my-queue"


# Bind a Queue consumer. Queue Consumers can retrieve tasks scheduled by Producers to act on them.
# Docs: https://developers.cloudflare.com/queues/get-started

[[queues.consumers]]
queue = "test-my-queue"
# Optional: Configure batching and retries: https://developers.cloudflare.com/queues/learning/batching-retries/

max_batch_size = 10
max_batch_timeout = 30
max_retries = 5
dead_letter_queue = "test-my-queue-dlq"

[[r2_buckets]]

bucket_name = "queues-test-bucket"
binding = "QUEUES_TEST_BUCKET"


項目の補足

max_batch_size  -> 各バッチで許可されるメッセージの最大数
max_batch_timeout -> バッチがいっぱいになるまで待機する最大秒数
max_retries -> メッセージが失敗した場合、または呼び出された場合のメッセージの最大再試行回数
dead_letter_queue -> 何度か処理に失敗した場合にメッセージを送信する別のキューの名前。これが定義されていない場合、処理に繰り返し失敗するメッセージは最終的に破棄されます。


Cloudflareにログインします。

$ wrangler login


ログインできているか確認します。

$ wrangler whoami


src/index.tsを修正します。

export interface Env {
TEST_MY_QUEUE: Queue<any>;
QUEUES_TEST_BUCKET: R2Bucket;
}

export default {
async fetch(request: Request, env: Env, ctx: ExecutionContext): Promise<Response> {
try {
let log = {
url: request.url,
method: request.method,
headers: Object.fromEntries(request.headers),
};
await env.TEST_MY_QUEUE.send(log);
return new Response('Success!');
} catch (err) {
console.error('An error occurred:', err);
return new Response('Internal Server Error', { status: 500 });
}
},
async queue(batch: MessageBatch<any>, env: Env): Promise<void> {
for (const msg of batch.messages) {

// TODO: do something with the message
msg.ack()
}

const data = JSON.stringify(batch.messages);
await env.QUEUES_TEST_BUCKET.put(`file-${Date.now()}.json`, data, {
httpMetadata: {
contentType: 'application/json',
},
});
},
};


Queue作成


queueを作成します。
$ wrangler queues create test-my-queue
$ wrangler queues create test-my-queue-dlq


コマンドでエラーが出てないことを確認して、
GUI(Cloudflare -> アカウント -> Workers & Pages -> Queues)からQueueが作成されたか確認します。


Cloudflareにdeploy


ここまでできたらWorkersをCloudflareにDeployします。

$ wrangler deploy


GUI(Cloudflare -> アカウント -> Workers & Pages)からWorkersが作成されたことを確認します。



また、Queueを確認してみるとtest-my-queueのステータスがアクティブになっていることを確認します。
WorkersでQueueがbindされると、こちらの項目がアクティブになります。



動作確認


ここまで準備ができたのでWorkersのプレビューのURLからアクセスを行ってみます。
※プレビューのURL:******.workers.dev
作成したWorkersの画面から確認ができます。

「Success!」と表示されるかと思います。

queueの設定で下記のようにしたので、30秒待つか、10件queueが溜まるまでqueueの処理は実行されません。
  max_batch_size = 10
max_batch_timeout = 30


上記条件が満たされたら作成したR2のバケットを見てみましょう。

ファイルが作成されていることを確認できました。
ダウンロードして中身を確認してみると、アクセス情報がjsonで入っており正常に動作できてることを確認できました。

GUI(Cloudflare -> アカウント -> Workers & Pages -> Queues)から該当のQueueの情報を確認することもできます。




今回は検証という形で簡単な動作を試してみました。

ただ、queueの処理をミスした場合のためにdead_letter_queueを指定してあるのですが、こちらをうまく動作させることができませんでした。
検証の仕方が悪かったかもしれないのですが、まだベータ版ということもあり、また今度試してみようと思います。

GraphQL


GUIからQueueのグラフは確認できますが、CloudflareのGraphQLを使用してQueueの値を取得することも可能です。

Queue GraphQL URL: https://developers.cloudflare.com/queues/reference/metrics/

まず、対象のQueueのqueueIdを調べる必要があります。
これはAPIを使用して確認が可能です。
$ curl --request GET \
--url https://api.cloudflare.com/client/v4/accounts/{account_id}/workers/queues \
--header 'Authorization: Bearer {api-token}' \
--header 'Content-Type: application/json'


api-tokenの権限は「Cloudflare Workers を編集する」で作成しておけば取得できます。

Queue GraphQL URL内の「Get average Queue backlog over time period」を取得する場合、下記内容のjsonファイルを作成します。

$ vi graphql-queue.json

{
"query":
"{ viewer { accounts(filter: {accountTag: $accountTag}) { queueBacklogAdaptiveGroups( limit: 10000 filter: $filter ) { avg { messages bytes } } } } }",
"variables": {
"accountTag":"xxxxxxxxxxxx",
"filter": {
"queueId": "xxxxxxxxxxxxxxx",
"datetime_geq": "2024-01-01T00:00:00Z",
"datetime_leq": "2024-01-05T23:59:59Z"
}
}
}


accountTagにはCloudflareのaccountIdを入れます。

下記コマンドを実行すれば値が取得できますので参考になれば幸いです。
$ cat graphql-queue.json | tr -d '\n' | curl \
-X POST \
-H "X-Auth-Email: xxx@xxxxx.net" \
-H "X-Auth-key: xxxxxxxxxxxxxxxxxxxxxxxxxxx" \
-s \
-d @- \
https://api.cloudflare.com/client/v4/graphql/;

最後に

今回はCloudflareのQueueの検証をしてみました。
Workersと組み合わせて使用する必要はありますが、Queueの使い方は色々ありますし、これ以外にも動作を実装できますので楽しみな機能だと思います。

現在Queueの機能は制限が設定されており、メッセージの最大保存期間が4日間などになっておりますので使用する際に確認しておくと良いです。

https://developers.cloudflare.com/queues/platform/limits/

そしてCloudflareなら上記以外でも、

 ・パフォーマンスの向上
 ・サイトの信頼性の向上
 ・セキュリティの向上

が、一つのサービスで実現できますので非常におすすめです。

Cloudflareに、アクセリアの運用サポートをプラスしたCDNサービスを提供しています

アクセリア自社CDNの開発と運用は、20年以上にわたります。それらの経験とノウハウを駆使したプロフェッショナルサポートをパッケージしたサービスが、[Solution CDN]です。
移行支援によるスムーズな導入とともに、お客様の運用負担を最小限に​とどめながら、WEBサイトのパフォーマンスとセキュリティを最大限に高めます。運用サポートはフルアウトソーシングからミニマムサポートまで、ご要望に合わせてご提供します。

Cloudflare(クラウドフレア)の導入や運用について、またそれ以外のことでもなにか気になることがございましたらお気軽にご相談下さい。

Solution CDNサービスバナー

杉木 俊文

アクセリア株式会社 サービス事業本部ネットワーク運用部 所属
Contact usお問い合わせ

サービスにご興味をお持ちの方は
お気軽にお問い合わせください。

Webからお問い合わせ

お問い合わせ

お電話からお問い合わせ

03-5211-7750

平日09:30 〜 18:00

Download資料ダウンロード

製品紹介やお役立ち資料を無料でご活用いただけます。

Magazineメルマガ登録

最新の製品情報などタイムリーな情報を配信しています。

Free Service

PageSpeed Insights シミュレータ

CDNによるコンテンツの最適化を行った場合のPageSpeed Insightsのスコアをシミュレートしてレポートします。