【Cloudflareのステートマシーン】Workers Workflowsの動作検証ーD1も合わせて使用して処理を試行

はじめに
Cloudflare doc : https://developers.cloudflare.com/workflows/
Workers Workflowsは[step]という機能を使用してstep内で処理が失敗した場合は、状態を維持したまま再試行を実行させたり、[sleep]を実行してstep間で待機させたりできます。
処理を次のstepに安全に進めたり、失敗が続くようなら処理を安全に止めたりできる機能です。
Workers Workflowsの動作検証
クイックスタート
Cloudflareのドキュメントにガイドやクイックスタートが載っていますので、こちらを最初に試してみると理解がより早いかと思います。
Cloudflare doc : https://developers.cloudflare.com/workflows/get-started/guide/
検証簡易処理フロー
CloudflareのD1も合わせて使用して処理を試してみます。

各処理の概要
check parameter
リクエストに必要なパラメータが含まれているか確認
check user
D1にパラメータで渡されたnameが存在するか確認
get api data
CloudflareのAPIからIP情報を取得し、その配列の中からランダムで一つを取得
Cloudflare API URL: https://api.cloudflare.com/client/v4/ips
register or update user
該当のユーザがいなければ新規登録し、該当のユーザがいなければdataカラムとupdated_atカラムを更新
※dataカラムには「get api data」で取得したデータを登録
※updated_atカラムには更新した時間を登録
D1作成
![]() | 参考コラム:【初心者向け】CloudflareでサーバーレスデータベースD1を使ってみよう |
$ npx wrangler d1 create d1-test-workers-workflows
作成したD1にusersテーブルを作成します。
$ npx wrangler d1 execute d1-test-workers-workflows --command ""CREATE TABLE users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
data TEXT UNIQUE NOT NULL,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);"" --remote
Cloudflareの画面からD1が作成されたことを確認します。
・Database

・Table

・Tableのカラム

Workers Workflows作成
wranglerでCloudflareにログイン実施
$ npx wrangler login
$ npx wrangler whoami
Workersを作成
test-workers-workflowsという名前で作成します。
$ npm create cloudflare@latest test-workers-workflows -- --template "cloudflare/workflows-starter"
今回は動作検証なのでgitとdeployは「no」にしています。
………..
├ Do you want to use git for version control?
│ no git
│
╰ Application configured
╭ Deploy with Cloudflare Step 3 of 3
│
├ Do you want to deploy your application?
│ no deploy via `npm run deploy`
………..
test-workflowsディレクトリが作成されますので、移動します。
$ cd test-workers-workflows
index.tsの内容
`/src/index.ts`を下記内容に修正します。
import { WorkflowEntrypoint, WorkflowStep, WorkflowEvent } from 'cloudflare:workers';
import { NonRetryableError } from 'cloudflare:workflows';
type Env = {
MY_WORKFLOW: Workflow<Params>;
DB: D1Database;
};
interface Params {
user: string;
}
export class MyWorkflow extends WorkflowEntrypoint<Env, Params> {
async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
await step.do('check parameter', async () => {
if (!event.payload.user) {
throw new NonRetryableError("Missing required parameters: 'user'.");
}
return {
inputParams: event.payload,
};
});
const userCheckResult = await step.do(
'check user',
{
retries: {
limit: 5,
delay: '5 second',
backoff: 'exponential',
},
timeout: '1 minutes',
},
async () => {
const userName = event.payload.user;
const { results } = await this.env.DB.prepare('SELECT * FROM users WHERE name = ?').bind(userName).all();
const serializableResults = JSON.parse(JSON.stringify(results));
return { userExists: serializableResults.length > 0, userData: serializableResults };
}
);
const apiData = await step.do(
'get api data',
{
retries: {
limit: 5,
delay: '5 second',
backoff: 'exponential',
},
timeout: '1 minutes',
},
async () => {
let resp = await fetch('https://api.cloudflare.com/client/v4/ips');
let data = await resp.json<any>();
const ipv4List: string[] = data.result.ipv4_cidrs;
const randomIpv4 = ipv4List[Math.floor(Math.random() * ipv4List.length)];
return {
randomIp: randomIpv4,
};
}
);
await step.do(
'register or update user',
{
retries: {
limit: 3,
delay: '10 second',
backoff: 'exponential',
},
timeout: '30 seconds',
},
async () => {
const userName = event.payload.user;
const randomIp = apiData.randomIp;
const { userExists, userData } = userCheckResult;
console.log(`Before Update:`, userData);
if (!userExists) {
await this.env.DB.prepare('INSERT INTO users (name, data, created_at) VALUES (?, ?, ?)')
.bind(userName, randomIp, this.getCurrentTimestamp())
.run();
return { status: 'User created', user: userName, data: randomIp };
} else {
await this.env.DB.prepare('UPDATE users SET data = ?, updated_at = ? WHERE name = ?')
.bind(randomIp, this.getCurrentTimestamp(), userName)
.run();
return {
status: 'User updated',
user: userName,
previousData: userData,
newData: randomIp,
updatedAt: this.getCurrentTimestamp(),
};
}
}
);
}
getCurrentTimestamp(): string {
const now = new Date();
const year = now.getFullYear();
const month = String(now.getMonth() + 1).padStart(2, '0');
const day = String(now.getDate()).padStart(2, '0');
const hours = String(now.getHours()).padStart(2, '0');
const minutes = String(now.getMinutes()).padStart(2, '0');
const seconds = String(now.getSeconds()).padStart(2, '0');
return `${year}-${month}-${day} ${hours}:${minutes}:${seconds}`;
}
}
export default {
async fetch(req: Request, env: Env): Promise<Response> {
if (req.method !== 'POST') {
return new Response('Only POST requests are allowed', { status: 405 });
}
const body = await req.json().catch(() => null);
if (!body) {
return new Response('Invalid JSON body', { status: 400 });
}
const params: Params = body as Params;
const options: WorkflowInstanceCreateOptions<Params> = { params };
let instance = await env.MY_WORKFLOW.create(options);
return Response.json({
result: 'ok',
instanceId: instance.id,
receivedData: body,
});
},
};
コードの一部解説
処理フロー自体は上部に画像がありますのでご参照ください。
下記はWorkflowsを書いた際に調べた箇所の一部になりますので今後書く人の参考になれば幸いです。
リトライなしでの終了
if (!event.payload.user) {
throw new NonRetryableError("Missing required parameters: 'user'.");
}
リトライ処理を入れないとデフォルトのリトライ処理が適用されます。
Cloudflare doc :https://developers.cloudflare.com/workflows/build/sleeping-and-retrying/#retry-steps
リトライさせる必要がない場所はNonRetryableErrorを使用することで処理を終了させられます。
step内でのparameterの受け取り
return {
inputParams: event.payload,
};
渡したparameterはevent.payloadに格納されます。
D1の参照方法
const { results } = await this.env.DB.prepare('SELECT * FROM users WHERE name = ?').bind(userName).all();
env は this.env として参照する必要があります。
returnの形式
const serializableResults = JSON.parse(JSON.stringify(results));
データを JSON に変換できる形式でないとWorkflowsは返せないのでjsonに変換しています。
paramterの渡し方
const options: WorkflowInstanceCreateOptions<Params> = { params };
let instance = await env.MY_WORKFLOW.create(options);
Workers Workflows の create() は WorkflowInstanceCreateOptions
リトライ処理の指定
{
retries: {
limit: 5, //ステップの合計試行回数
delay: '5 second', //試行間の遅延
backoff: 'exponential', //リトライ間隔
},
timeout: '15 minutes', //処理がエラーだと判断するタイムアウト時間
}
backoffに関しては「constant」、「linear」、「exponential」が指定できます。
◆constant
delayで指定した秒数の間隔(下記画像だと3秒)でリトライを実行します。

◆linear
1回目:「delayで指定した秒数」(下記画像だと3秒)を待ってからリトライを実行。
2回目:3秒+3秒を待ってからリトライを実行。
3回目:3秒+3秒+3秒を待ってからリトライを実行します。
再試行毎に待ち時間が長くなります。

◆exponential
1回目:「delayで指定した秒数」(下記画像だと3秒)を待ってからリトライを実行。
2回目:(3秒 * 2)を待ってからリトライを実行。
3回目:((3秒 * 2) * 2)を待ってからリトライを実行します。
指数バックオフの間隔で再試行毎に待ち時間が長くなります。

環境に合わせて最適な間隔を選べます。
wrangler.jsoncを編集
`wrangler.jsonc`のworkersとworkflowsのnameを編集します。この名前で作成されます。
{D1に接続するので下記を追加します。
……
"name": "test-workers-workflows",
……
"workflows": [
{
"name": "test-workflows",
……
}
],
}
"d1_databases": [
{
"binding": "DB",
"database_name": "d1-test-workers-workflows",
"database_id": <d1-test-workers-workflowsのID>
}
]
deploy
今までの内容でCloudflareに適用してみます。
$ npx wrangler deploy
Cloudflareの画面からWorkersとWorkflowsが作成されたことが確認できました。
[アカウント(任意) -> コンピューティング(Workers) -> Workers & Pages]

[アカウント(任意) -> コンピューティング(Workers) -> ワークフロー]

Workflowsは下記コマンドでも確認可能です。
$ npx wrangler workflows list

Workflows動作確認
下記コマンドでWorkflowsをtriggerできます。
$npx wrangler workflows trigger test-workflows '{
"user": "user01"
}'

Cloudflareの画面からWorkflowsを開くと動いたのが確認できました。

処理の一覧も確認できます。

処理自体の内容も、クリックすると確認できます。

下記コマンドでも詳細を確認することができます。
$ npx wrangler@latest workflows instances describe test-workflows latest
or
$ npx wrangler@latest workflows instances describe test-workflows 該当のworkflowsのid

また、WorkersのURLにリクエストを送ってもWorkflowsを起動できます。
curl -X POST test-workers-workflows.xxxxxxxxxxxxxx.workers.dev
-H "Content-Type: application/json"
--data '{
"user": "user01"
}'
Workflowsの実行結果などはCloudflareの画面からグラフで確認することもできます。

D1側にも下記コマンドでデータが登録されたことを確認できました。
$ npx wrangler d1 execute d1-test-workers-workflows --command "SELECT * FROM users;" --remote

リトライの確認
Cloudflareのguideに載っている50%の確率で失敗する処理を全てのasync()内に入れて試してみます。
Cloudflare guide url: https://developers.cloudflare.com/workflows/get-started/guide/#1-define-your-workflow
if (Math.random() > 0.5) {
throw new Error('test error 01');
}
deploy
上記を加えたらCloudflareに適用してみます。
$ npx wrangler deploy
Workflows実行
curlコマンドでリクエストを送り、Workflowsを実行させてみます。
curl -X POST test-workers-workflows.xxxxxxxxxxxxxx.workers.dev
-H "Content-Type: application/json"
--data '{
"user": "user01"
}'
下記コマンドで結果を確認してみます。
※50%で各処理失敗するようになってるので動作確認する際には何回か試してください。
$ npx wrangler@latest workflows instances describe test-workflow <WorkflowsのID>

エラーが出た場合、リトライが実行されていることが確認できました。
Workflowsの記述ルール
Cloudflare doc : https://developers.cloudflare.com/workflows/build/rules-of-workflows/
・処理の冪等性の確保
・stepは細かく設定すること
・step外の状態に依存しない
など
他のプログラムを書くときでも参考にできそうな内容ですので、こちらは何度か目を通して理解しておくことをお勧めいたします。
Workers Workflowsのメリット
自前で実装するよりWorkflowsを使用して書いた方がシンプルに書け、複雑性が少なくなるメリットがあると感じました。
またsleepの時間も長く書けるのもWorkflowsならではと思ってます。
そして、Cloudflareの画面やコマンドからどこまで処理が進んで失敗したのかや、何回失敗したのかなどが視覚化されているので原因調査がしやすいです。
上記から開発側も運用側も楽になる機能になっていると感じました。
最後に
Workers Workflowsは、通常のWorkersと同一でCPU時間とリクエスト数で決まるようで無料ではありません。
価格: https://developers.cloudflare.com/workflows/reference/pricing/
2025/03段階ではまだオープンベータの機能となっておりますので、対応してない機能などもあります。
ひとつ見つけたのですが、現状ではWorkflowsの削除はできないようです。
(Cloudflareの画面からも削除の項目は見つけられませんでした。)
$ npx wrangler workflows delete workflows名

本番リリースされれば上記も解消されると思います。
またもっと機能も追加されたり、制限も増加されると思いますので本番リリースされたらまた検証を行っていきたいと思っています。
今回紹介した動作以外にもCloudflareの公式サイトにて、チュートリアルやサンプルの紹介も載っていますので参考にしてみてください。
チュートリアルURL: https://developers.cloudflare.com/workflows/tutorials/
サンプルURL: https://developers.cloudflare.com/workflows/examples/
そしてCloudflareなら上記以外でも、
・パフォーマンスの向上
・サイトの信頼性の向上
・セキュリティの向上
が、一つのサービスで実現できますので非常におすすめです。
Cloudflareに、アクセリアの運用サポートをプラスしたCDNサービスを提供しています
移行支援によるスムーズな導入とともに、お客様の運用負担を最小限にとどめながら、WEBサイトのパフォーマンスとセキュリティを最大限に高めます。運用サポートはフルアウトソーシングからミニマムサポートまで、ご要望に合わせてご提供します。
Cloudflare(クラウドフレア)の導入や運用について、またそれ以外のことでもなにか気になることがございましたらお気軽にご相談下さい。
サービスにご興味をお持ちの方は
お気軽にお問い合わせください。
Webからお問い合わせ
お問い合わせお電話からお問い合わせ
平日09:30 〜 18:00
Free Service