03-5211-7750 平日|09:30~18:00

【Cloudflareのステートマシーン】Workers Workflowsの動作検証ーD1も合わせて使用して処理を試行

           

サービス資料や
ホワイトペーパーはこちら

           資料を【無料】ダウンロードFREE

はじめに

Workers Workflowsが2024年にオープンベータになっていたのですが、こちらの機能がどういったものなのか理解していませんでしたので、この機会に検証を行ってみました。

Cloudflare doc : https://developers.cloudflare.com/workflows/

Workers Workflowsは[step]という機能を使用してstep内で処理が失敗した場合は、状態を維持したまま再試行を実行させたり、[sleep]を実行してstep間で待機させたりできます。
処理を次のstepに安全に進めたり、失敗が続くようなら処理を安全に止めたりできる機能です。

【相談無料】Cloudflareの導入や運用について、こちらからご相談いただけます ✉️

Workers Workflowsの動作検証

クイックスタート


Cloudflareのドキュメントにガイドやクイックスタートが載っていますので、こちらを最初に試してみると理解がより早いかと思います。

Cloudflare doc : https://developers.cloudflare.com/workflows/get-started/guide/

検証簡易処理フロー


CloudflareのD1も合わせて使用して処理を試してみます。

各処理の概要


check parameter
リクエストに必要なパラメータが含まれているか確認

check user
D1にパラメータで渡されたnameが存在するか確認

get api data
CloudflareのAPIからIP情報を取得し、その配列の中からランダムで一つを取得

Cloudflare API URL: https://api.cloudflare.com/client/v4/ips

register or update user
該当のユーザがいなければ新規登録し、該当のユーザがいなければdataカラムとupdated_atカラムを更新

※dataカラムには「get api data」で取得したデータを登録
※updated_atカラムには更新した時間を登録

D1作成


 参考コラム:【初心者向け】CloudflareでサーバーレスデータベースD1を使ってみよう
$ npx wrangler d1 create d1-test-workers-workflows

作成したD1にusersテーブルを作成します。

$ npx wrangler d1 execute d1-test-workers-workflows --command ""CREATE TABLE users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
data TEXT UNIQUE NOT NULL,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);"" --remote

Cloudflareの画面からD1が作成されたことを確認します。

・Database


・Table


・Tableのカラム


Workers Workflows作成


wranglerでCloudflareにログイン実施


$ npx wrangler login
$ npx wrangler whoami

Workersを作成


test-workers-workflowsという名前で作成します。
$ npm create cloudflare@latest test-workers-workflows -- --template "cloudflare/workflows-starter"

今回は動作検証なのでgitとdeployは「no」にしています。
………..
├ Do you want to use git for version control?
│ no git

╰ Application configured

╭ Deploy with Cloudflare Step 3 of 3

├ Do you want to deploy your application?
│ no deploy via `npm run deploy`
………..

test-workflowsディレクトリが作成されますので、移動します。
$ cd test-workers-workflows


index.tsの内容


`/src/index.ts`を下記内容に修正します。
import { WorkflowEntrypoint, WorkflowStep, WorkflowEvent } from 'cloudflare:workers';
import { NonRetryableError } from 'cloudflare:workflows';

type Env = {
MY_WORKFLOW: Workflow<Params>;
DB: D1Database;
};

interface Params {
user: string;
}

export class MyWorkflow extends WorkflowEntrypoint<Env, Params> {
async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
await step.do('check parameter', async () => {
if (!event.payload.user) {
throw new NonRetryableError("Missing required parameters: 'user'.");
}

return {
inputParams: event.payload,
};
});

const userCheckResult = await step.do(
'check user',
{
retries: {
limit: 5,
delay: '5 second',
backoff: 'exponential',
},
timeout: '1 minutes',
},
async () => {
const userName = event.payload.user;

const { results } = await this.env.DB.prepare('SELECT * FROM users WHERE name = ?').bind(userName).all();

const serializableResults = JSON.parse(JSON.stringify(results));

return { userExists: serializableResults.length > 0, userData: serializableResults };
}
);

const apiData = await step.do(
'get api data',
{
retries: {
limit: 5,
delay: '5 second',
backoff: 'exponential',
},
timeout: '1 minutes',
},
async () => {
let resp = await fetch('https://api.cloudflare.com/client/v4/ips');
let data = await resp.json<any>();

const ipv4List: string[] = data.result.ipv4_cidrs;
const randomIpv4 = ipv4List[Math.floor(Math.random() * ipv4List.length)];
return {
randomIp: randomIpv4,
};
}
);

await step.do(
'register or update user',
{
retries: {
limit: 3,
delay: '10 second',
backoff: 'exponential',
},
timeout: '30 seconds',
},
async () => {
const userName = event.payload.user;
const randomIp = apiData.randomIp;

const { userExists, userData } = userCheckResult;

console.log(`Before Update:`, userData);

if (!userExists) {
await this.env.DB.prepare('INSERT INTO users (name, data, created_at) VALUES (?, ?, ?)')
.bind(userName, randomIp, this.getCurrentTimestamp())
.run();
return { status: 'User created', user: userName, data: randomIp };
} else {
await this.env.DB.prepare('UPDATE users SET data = ?, updated_at = ? WHERE name = ?')
.bind(randomIp, this.getCurrentTimestamp(), userName)
.run();
return {
status: 'User updated',
user: userName,
previousData: userData,
newData: randomIp,
updatedAt: this.getCurrentTimestamp(),
};
}
}
);
}
getCurrentTimestamp(): string {
const now = new Date();
const year = now.getFullYear();
const month = String(now.getMonth() + 1).padStart(2, '0');
const day = String(now.getDate()).padStart(2, '0');
const hours = String(now.getHours()).padStart(2, '0');
const minutes = String(now.getMinutes()).padStart(2, '0');
const seconds = String(now.getSeconds()).padStart(2, '0');
return `${year}-${month}-${day} ${hours}:${minutes}:${seconds}`;
}
}

export default {
async fetch(req: Request, env: Env): Promise<Response> {
if (req.method !== 'POST') {
return new Response('Only POST requests are allowed', { status: 405 });
}

const body = await req.json().catch(() => null);
if (!body) {
return new Response('Invalid JSON body', { status: 400 });
}

const params: Params = body as Params;

const options: WorkflowInstanceCreateOptions<Params> = { params };

let instance = await env.MY_WORKFLOW.create(options);

return Response.json({
result: 'ok',
instanceId: instance.id,
receivedData: body,
});
},
};

コードの一部解説


処理フロー自体は上部に画像がありますのでご参照ください。
下記はWorkflowsを書いた際に調べた箇所の一部になりますので今後書く人の参考になれば幸いです。

リトライなしでの終了
if (!event.payload.user) {
throw new NonRetryableError("Missing required parameters: 'user'.");
}

リトライ処理を入れないとデフォルトのリトライ処理が適用されます。
Cloudflare doc :https://developers.cloudflare.com/workflows/build/sleeping-and-retrying/#retry-steps
リトライさせる必要がない場所はNonRetryableErrorを使用することで処理を終了させられます。

step内でのparameterの受け取り
return {
inputParams: event.payload,
};

渡したparameterはevent.payloadに格納されます。

D1の参照方法
const { results } = await this.env.DB.prepare('SELECT * FROM users WHERE name = ?').bind(userName).all();

env は this.env として参照する必要があります。

returnの形式
const serializableResults = JSON.parse(JSON.stringify(results));

データを JSON に変換できる形式でないとWorkflowsは返せないのでjsonに変換しています。

paramterの渡し方
const options: WorkflowInstanceCreateOptions<Params> = { params };

let instance = await env.MY_WORKFLOW.create(options);

Workers Workflows の create() は WorkflowInstanceCreateOptions 型のデータを受け取るようになっていますので、WorkflowInstanceCreateOptions の形式に合わせる必要があります。

リトライ処理の指定
{
retries: {
limit: 5, //ステップの合計試行回数
delay: '5 second', //試行間の遅延
backoff: 'exponential', //リトライ間隔
},
timeout: '15 minutes', //処理がエラーだと判断するタイムアウト時間
}

backoffに関しては「constant」、「linear」、「exponential」が指定できます。

◆constant
delayで指定した秒数の間隔(下記画像だと3秒)でリトライを実行します。

◆linear
 1回目:「delayで指定した秒数」(下記画像だと3秒)を待ってからリトライを実行。
 2回目:3秒+3秒を待ってからリトライを実行。
 3回目:3秒+3秒+3秒を待ってからリトライを実行します。
再試行毎に待ち時間が長くなります。

◆exponential
 1回目:「delayで指定した秒数」(下記画像だと3秒)を待ってからリトライを実行。
 2回目:(3秒 * 2)を待ってからリトライを実行。
 3回目:((3秒 * 2) * 2)を待ってからリトライを実行します。
指数バックオフの間隔で再試行毎に待ち時間が長くなります。

環境に合わせて最適な間隔を選べます。

wrangler.jsoncを編集


`wrangler.jsonc`のworkersとworkflowsのnameを編集します。この名前で作成されます。
{
……
"name": "test-workers-workflows",
……
"workflows": [
{
"name": "test-workflows",
……
}
],
}
D1に接続するので下記を追加します。
	"d1_databases": [
{
"binding": "DB",
"database_name": "d1-test-workers-workflows",
"database_id": <d1-test-workers-workflowsのID>
}
]

deploy


今までの内容でCloudflareに適用してみます。
$ npx wrangler deploy

Cloudflareの画面からWorkersとWorkflowsが作成されたことが確認できました。

[アカウント(任意) -> コンピューティング(Workers) -> Workers & Pages]

[アカウント(任意) -> コンピューティング(Workers) -> ワークフロー]

Workflowsは下記コマンドでも確認可能です。
$ npx wrangler workflows list



Workflows動作確認


下記コマンドでWorkflowsをtriggerできます。
$npx wrangler workflows trigger test-workflows '{
"user": "user01"
}'

Cloudflareの画面からWorkflowsを開くと動いたのが確認できました。

処理の一覧も確認できます。

処理自体の内容も、クリックすると確認できます。

下記コマンドでも詳細を確認することができます。
$ npx wrangler@latest workflows instances describe test-workflows latest
or
$ npx wrangler@latest workflows instances describe test-workflows 該当のworkflowsのid

また、WorkersのURLにリクエストを送ってもWorkflowsを起動できます。
curl -X POST test-workers-workflows.xxxxxxxxxxxxxx.workers.dev 
-H "Content-Type: application/json"
--data '{
"user": "user01"
}'

Workflowsの実行結果などはCloudflareの画面からグラフで確認することもできます。

D1側にも下記コマンドでデータが登録されたことを確認できました。
$ npx wrangler d1 execute d1-test-workers-workflows --command "SELECT * FROM users;" --remote


リトライの確認


Cloudflareのguideに載っている50%の確率で失敗する処理を全てのasync()内に入れて試してみます。

Cloudflare guide url: https://developers.cloudflare.com/workflows/get-started/guide/#1-define-your-workflow
if (Math.random() > 0.5) {
throw new Error('test error 01');
}

deploy


上記を加えたらCloudflareに適用してみます。
$ npx wrangler deploy

Workflows実行


curlコマンドでリクエストを送り、Workflowsを実行させてみます。
curl -X POST test-workers-workflows.xxxxxxxxxxxxxx.workers.dev 
-H "Content-Type: application/json"
--data '{
"user": "user01"
}'

下記コマンドで結果を確認してみます。
※50%で各処理失敗するようになってるので動作確認する際には何回か試してください。
$ npx wrangler@latest workflows instances describe test-workflow <WorkflowsのID>


エラーが出た場合、リトライが実行されていることが確認できました。

Workflowsの記述ルール

Workflowsのstepのコードの書き方には気をつけるべきルールが公式にまとめてあります。

Cloudflare doc : https://developers.cloudflare.com/workflows/build/rules-of-workflows/

・処理の冪等性の確保
・stepは細かく設定すること
・step外の状態に依存しない
など

他のプログラムを書くときでも参考にできそうな内容ですので、こちらは何度か目を通して理解しておくことをお勧めいたします。

Workers Workflowsのメリット

最初はWorkflowsの機能は頑張れば自前でも実装できるかもと感じていたのですが、
自前で実装するよりWorkflowsを使用して書いた方がシンプルに書け、複雑性が少なくなるメリットがあると感じました。
またsleepの時間も長く書けるのもWorkflowsならではと思ってます。

そして、Cloudflareの画面やコマンドからどこまで処理が進んで失敗したのかや、何回失敗したのかなどが視覚化されているので原因調査がしやすいです。

上記から開発側も運用側も楽になる機能になっていると感じました。

最後に

簡単ではありますがWorkers Workflowsの検証をさせていただきました。

Workers Workflowsは、通常のWorkersと同一でCPU時間とリクエスト数で決まるようで無料ではありません。

価格: https://developers.cloudflare.com/workflows/reference/pricing/

2025/03段階ではまだオープンベータの機能となっておりますので、対応してない機能などもあります。
ひとつ見つけたのですが、現状ではWorkflowsの削除はできないようです。
(Cloudflareの画面からも削除の項目は見つけられませんでした。)
$ npx wrangler workflows delete workflows名

本番リリースされれば上記も解消されると思います。
またもっと機能も追加されたり、制限も増加されると思いますので本番リリースされたらまた検証を行っていきたいと思っています。

今回紹介した動作以外にもCloudflareの公式サイトにて、チュートリアルやサンプルの紹介も載っていますので参考にしてみてください。

チュートリアルURL: https://developers.cloudflare.com/workflows/tutorials/
サンプルURL: https://developers.cloudflare.com/workflows/examples/

そしてCloudflareなら上記以外でも、

 ・パフォーマンスの向上
 ・サイトの信頼性の向上
 ・セキュリティの向上

が、一つのサービスで実現できますので非常におすすめです。

Cloudflareに、アクセリアの運用サポートをプラスしたCDNサービスを提供しています

アクセリア自社CDNの開発と運用は、20年以上にわたります。それらの経験とノウハウを駆使したプロフェッショナルサポートをパッケージしたサービスが、[Solution CDN]です。
移行支援によるスムーズな導入とともに、お客様の運用負担を最小限に​とどめながら、WEBサイトのパフォーマンスとセキュリティを最大限に高めます。運用サポートはフルアウトソーシングからミニマムサポートまで、ご要望に合わせてご提供します。

Cloudflare(クラウドフレア)の導入や運用について、またそれ以外のことでもなにか気になることがございましたらお気軽にご相談下さい。

Cloudflareの導入・運用について ご相談いただけます。 導入に関するご相談だけでなく、運用についてもご相談ください。

杉木 俊文

アクセリア株式会社 サービス事業本部ネットワーク運用部 所属
Contact usお問い合わせ

サービスにご興味をお持ちの方は
お気軽にお問い合わせください。

Webからお問い合わせ

お問い合わせ

お電話からお問い合わせ

03-5211-7750

平日09:30 〜 18:00

Download資料ダウンロード

製品紹介やお役立ち資料を無料でご活用いただけます。

Magazineメルマガ登録

最新の製品情報などタイムリーな情報を配信しています。

Free Service

PageSpeed Insights シミュレータ

CDNによるコンテンツの最適化を行った場合のPageSpeed Insightsのスコアをシミュレートしてレポートします。