03-5211-7750 平日|09:30~18:00

サーバーレスでデータ収集から可視化まで!【Cloudflare Pipelines】検証レポート

           

サービス資料や
ホワイトペーパーはこちら

           資料を【無料】ダウンロードFREE

はじめに

Cloudflare Pipelinesというリアルタイムのデータ取り込みツールがベータ版でリリースされていました。

Cloudflare Doc:https://developers.cloudflare.com/pipelines/

Cloudflare Blog:https://blog.cloudflare.com/ja-jp/cloudflare-acquires-arroyo-pipelines-streaming-ingestion-beta/

取り込んだデータはR2に保存されます。
保存されたデータは分析ツールなどを使用して分析などに活用することもできます。

今回はCloudflare Pipelinesの動作を検証+R2に保存したデータを分析ツールを用いて視覚化してみようと思います。

【相談無料】Cloudflareの導入や運用について、こちらからご相談いただけます ✉️

Cloudflare Pipelinesの動作検証

項目の場所





Pipelines Test


構成


まずはPipelinesが動作するかテストします。

構成は、jsonデータをPOSTしてpipelinesに渡し、R2に保存するシンプルな形になります。


R2の用意


まずはデータを格納するR2バケットを以下コマンドで作成します。
$ npx wrangler r2 bucket create test-pipelines-data


作成されたか以下で確認します。(画面から確認してもOKです)
$ npx wrangler r2 bucket info test-pipelines-data


Pipelinesの用意


$ npx wrangler pipelines create test-pipelines --r2-bucket test-pipelines-data

認証が入りますので「Allow」を選択します。(R2のAPI Tokenを作成して良いか確認されます)


作成されたか以下コマンドで確認します。(画面から確認してもOKです)
$ npx wrangler pipelines list

また、R2のAPI Tokenが作成されていることも確認します。


コマンドラインで操作を行いたい方は下記ドキュメントを参考にしてください。

Cloudflare Doc:https://developers.cloudflare.com/pipelines/platform/wrangler-commands/

Pipelinesを利用してデータをPOST


まずはデータが取り込めるのかテストします。

Pipelinesのendpointを調べるために以下コマンドを実行します。
$ npx wrangler pipelines list

結果の中にendpointが出力されますのでメモしておきます。

そして以下コマンドを実行して、Pipelinesにデータが取り込まれるか確認します。
curl "https://[ ENDPOINT_URL]" -d '[{"foo": "bar"}]'

>{"success":true,"result":{"committed":1}}

作成したR2の中にデータが入ってるか確認します。
時間がかかる場合がありますので、その場合は少し待ってから確認してください。


アップロードが完了すれば、以下のようになってますのでファイルをダウンロードしてみます。


ダウンロードしたファイルを確認すると以下のようにPOSTしたデータが格納されてることが確認できます。
$ gzcat event_date=2025-07-20_hr=09_01K0KHM6CH9VTT7D0T6C6G0CHP.json.gz
{"foo":"bar"}

Pipelinesを使用してデータをR2に送れることが確認できました。

他のアプリケーションと連携


構成


次はアプリケーションと連携させてみます。
Workersでfrontendとbackendを作成し、backendからPipelinesにデータを送ります。

また、R2のデータをduckdbで取得してmetabaseで分析してみます。



backendの作成


まず、以下コマンドでWorkersを作成します。
$ npm create hono@latest test-pipelines-backend-api
> npx
> "create-hono" test-pipelines-backend-api

create-hono version 0.19.2
Using target directory … test-pipelines-backend-api
Which template do you want to use? cloudflare-workers
Do you want to install project dependencies? Yes
Which package manager do you want to use? npm
Cloning the template
Installing project dependencies
Copied project files

作成されたディレクトリに移動して、workers-typesをインストールします。
$ cd test-pipelines-backend-api/
$ npm install -D @cloudflare/workers-types

`wrangler.jsonc`を以下のように編集します。
※pipelinesのpipelinsは作成しパイプラインのnameを指定してください。
{
"$schema": "node_modules/wrangler/config-schema.json",
"name": "test-pipelines-backend-api",
"main": "src/index.ts",
"compatibility_date": "2025-07-19",
"observability": {
"enabled": true
},
"pipelines": [
{
"pipeline": "test-pipelines",
"binding": "PIPELINE"
}
]
}

`tsconfig.json`を以下のように編集します。
{
"compilerOptions": {
"target": "ESNext",
"module": "ESNext",
"moduleResolution": "Bundler",
"strict": true,
"skipLibCheck": true,
"lib": ["ESNext"],
"jsx": "react-jsx",
"jsxImportSource": "hono/jsx",
"types": ["@cloudflare/workers-types"]
}
}

`src/index.ts`を以下のように編集します。
import { Hono } from "hono";
import { cors } from "hono/cors";
import { Pipeline } from "cloudflare:pipelines";

type Bindings = {
PIPELINE: Pipeline;
};

const app = new Hono<{ Bindings: Bindings }>();

app.use(
"*",
cors({
origin: "*",
allowMethods: ["POST"],
allowHeaders: ["Content-Type"],
})
);

app.post("/api/v1/survey", async (c) => {
const body = await c.req
.json()
.catch(() => c.json({ error: "Invalid JSON" }, 400));

const requiredKeys = ["user", "q1", "q2", "q3"];
const missingKeys = requiredKeys.filter((key) => !(key in body));

if (missingKeys.length > 0) {
return c.json(
{ error: `Missing required fields: ${missingKeys.join(", ")}` },
400
);
}

const payload = {
user: body.user,
q1: body.q1,
q2: body.q2,
q3: body.q3,
};

try {
await c.env.PIPELINE.send([payload]);
return c.json({ status: "ok" });
} catch (e) {
return c.json({ error: "Failed to send to pipeline" }, 500);
}
});

app.all("/", (c) => {
return c.text("Not Allowed", 405);
});

export default app;

Cloudflareにdeployします。
$ npx wrangler deploy


frontendの作成


簡単なフロント画面を作成していきます。
$ npm create cloudflare@latest -- test-pipelines-front --framework=react

作成されたディレクトリに移動します。
$ cd test-pipelines-front

`src/App.tsx`を以下のように編集します。

リクエストをPOSTするFQDNは、backendの開発用で割り当てられる`workers.dev`のFQDNを指定してください。(本番用でも問題ありませんので、各々の環境に合わせてください。)
import { useState } from "react";

function App() {
const [formData, setFormData] = useState({
user: "",
q1: "",
q2: "",
q3: "",
});

const [message, setMessage] = useState("");

const handleChange = (e: React.ChangeEvent<HTMLInputElement>) => {
setFormData({ ...formData, [e.target.name]: e.target.value });
};

const handleSubmit = async (e: React.FormEvent) => {
e.preventDefault();

// yes/no → 1/0 に変換して送信
const mapYesNoToBinary = (value: string) =>
value === "yes" ? 1 : value === "no" ? 0 : null;

const payload = {
user: formData.user,
q1: mapYesNoToBinary(formData.q1),
q2: mapYesNoToBinary(formData.q2),
q3: mapYesNoToBinary(formData.q3),
};

const res = await fetch(
"https://xxxxxxx.workers.dev/api/v1/survey",
{
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(payload),
}
);

if (res.ok) {
setMessage("送信に成功しました!");
} else {
const err = await res.json();
setMessage(`送信エラー: ${err.error}`);
}
};

return (
<div style={{ padding: "2rem", fontFamily: "sans-serif" }}>
<h1>アンケートフォーム</h1>
<form onSubmit={handleSubmit}>
<div>
<label>
名前:
<input
type="text"
name="user"
value={formData.user}
onChange={handleChange}
required
style={{ marginLeft: "0.5rem" }}
/>
</label>
</div>

<div style={{ marginTop: "1rem" }}>
<p>質問1:サービスに満足していますか?</p>
<label>
<input
type="radio"
name="q1"
value="yes"
onChange={handleChange}
checked={formData.q1 === "yes"}
required
/>
はい
</label>
<label style={{ marginLeft: "1rem" }}>
<input
type="radio"
name="q1"
value="no"
onChange={handleChange}
checked={formData.q1 === "no"}
/>
いいえ
</label>
</div>

<div style={{ marginTop: "1rem" }}>
<p>質問2:また利用したいですか?</p>
<label>
<input
type="radio"
name="q2"
value="yes"
onChange={handleChange}
checked={formData.q2 === "yes"}
required
/>
はい
</label>
<label style={{ marginLeft: "1rem" }}>
<input
type="radio"
name="q2"
value="no"
onChange={handleChange}
checked={formData.q2 === "no"}
/>
いいえ
</label>
</div>

<div style={{ marginTop: "1rem" }}>
<p>質問3:サポートに満足しましたか?</p>
<label>
<input
type="radio"
name="q3"
value="yes"
onChange={handleChange}
checked={formData.q3 === "yes"}
required
/>
はい
</label>
<label style={{ marginLeft: "1rem" }}>
<input
type="radio"
name="q3"
value="no"
onChange={handleChange}
checked={formData.q3 === "no"}
/>
いいえ
</label>
</div>

<div style={{ marginTop: "1.5rem" }}>
<button type="submit">送信</button>
</div>

{message && <p style={{ marginTop: "1rem" }}>{message}</p>}
</form>
</div>
);
}

export default App;

Cloudflareにdeployします。
$ npm run deploy

frontendの開発用で割り当てられる`workers.dev`のFQDNにブラウザからアクセスしてみると、シンプルなアンケートフォーム画面が表示されます。


項目を埋めて送信を押すとbackendにデータがPOSTされます。


R2を確認すると、今回送信したデータが格納されています。

分析


frontend -> backend -> pipelines -> R2

の流れが上記で構築できました。

最後にR2に格納されたデータをduckdb + metabaseを使用して分析してみます。

R2 API Tokenの準備


まずはduckdbからR2に接続するために、R2 API Tokenを作成します。

Cloudflareの画面から、

R2 オブジェクト ストレージ -> {}API -> APIトークンの管理 -> User APIトークンを作成する

にて作成が可能です

アクセス許可として「オブジェクトの読み取りと書き込み」を付与して、特定のバケットにて今回作成した「test-pipelines-data」バケットを指定して作成します。


作成後に表示される「アクセス キー ID」、「シークレット アクセス キー」の値をメモしておいてください。
また、Cloudflareの「アカウントID」も必要になりますのでメモしておいてください。

duckdb、metabaseの準備


今回はdockerでduckdbとmetabaseを起動させますが、metabaseからduckdbに接続させるにはコミュニティのドライバが必要になります。

また、dockerだとmetabaseのコンテナにそのままドライバを読み込ませてもうまく動かないようです。

そこでコンテナイメージを一から作成する必要があります。
以下URLに情報がありますので参考にしてください。

metabase duckdb driver:https://github.com/AlexR2D2/metabase_duckdb_driver?tab=readme-ov-file#docker
FROM openjdk:21-buster

ENV MB_PLUGINS_DIR=/home/plugins/

ADD https://downloads.metabase.com/v0.55.4/metabase.jar /home
ADD https://github.com/motherduckdb/metabase_duckdb_driver/releases/download/0.4.1/duckdb.metabase-driver.jar /home/plugins/

RUN chmod 744 /home/plugins/duckdb.metabase-driver.jar

CMD ["java", "-jar", "/home/metabase.jar"]

compose.ymlを作成します。
services:
metabase:
build: .
container_name: metaduck
ports:
- "3000:3000"
volumes:
- metabase_data:/metabase-data
- duckdb_data:/data
depends_on:
- duckdb

duckdb:
image: datacatering/duckdb:v1.3.1
container_name: duckdb
volumes:
- duckdb_data:/data
tty: true

volumes:
metabase_data:
duckdb_data:

docker composeで起動させます。
$ docker compose up -d

duckdbにdummyのデータを入れます。
$ docker exec -it duckdb /duckdb /data/duckdb.db
DuckDB v1.3.1 (Ossivalis) 2063dda3e6
Enter ".help" for usage hints.
D CREATE TABLE dummy (id INTEGER);
D .exit

そしてブラウザからmetabaseにアクセスしてみます。
http://localhost:3000/



初期設定はテストのため、適当に入力しても問題ありません。

「データを追加する」だけは「あとでデータを追加する」を選択してください。


全て完了したら下記のような画面が出てくると思いますので、ここで「データベースの追加」を選択します。


以下のように設定をします
 - データベースのタイプ:DuckDB
 - 表示名:任意の値
 - Database file:/data/duck.db


下に進んで「Init SQL (run on each new DuckDB instance)」の場所に以下の内容を設定します。

R2 API TokenとCloudflareのアカウントIDをメモしておいたと思いますのでその値を使用します。
 INSTALL httpfs; LOAD httpfs;

CREATE SECRET (
TYPE r2,
KEY_ID 'R2 API Tokenのアクセス キー ID',
SECRET 'R2 API Tokenのシークレット アクセス キー',
ACCOUNT_ID 'CloudflareのアカウントID'
);

以下のduckdbのドキュメントに設定例が載ってますので参考にしてください。
https://duckdb.org/docs/stable/guides/network_cloud_storage/cloudflare_r2_import.html


入力ができたら「保存」をクリックします。

無事に成功したら管理画面を終了して、新規からSQLクエリを選択します。


データベースは「my_duckdb」を選択して、下記SQLを入れます。

read_json_autoの内容は、R2バケットの中のディレクトリに合わせて変更をしてください。
SELECT * FROM read_json_auto('r2://test-pipelines-data/event_date=2025-*/hr=*/*.json.gz');

上記を実行すると、R2に保存されているデータが表示されます。


上記のデータを利用して、ビジュアライゼーションからアンケートの結果をグラフ化してみます。


SQLを変更することで、別のアンケート分析も可能です。



CORS


今回はWorkersからPipelinesにデータを投入しましたが、Pipelinesにフロントエンドから直接投入する場合にCORSの問題が発生することがあります。
PipelinesのCORSの設定は下記ドキュメントを参考にしてください。

Cloudflare Doc:https://developers.cloudflare.com/pipelines/build-with-pipelines/sources/http/#specifying-cors-settings

シャード


Pipelinesのスループットはシャードを増やすことで向上させることが可能です。

Cloudflare Doc:https://developers.cloudflare.com/pipelines/build-with-pipelines/shards/

まだベータ版なので実際に使うことは少ないと思いますが、本番環境でスループットに問題が出た際の対応策として参考にしてください。

最後に

簡単ではありますがCloudflare Pipelinesの検証を行いました。

ユーザ側で特に難しい設定やインフラの面倒も見なくても、データをPipelinesに投げておけばR2にデータを自動で保存してくれるのは非常に魅力的です。

Pipelinesの今後の展開として、R2のData Catalog機能と統合し、icebergテーブルに直接データを取り込めるようになるようです。
そうなれば、icebergを用いた分析がすぐにできるようになり、さらに便利になります。

今まで分析系のツールを使ったことはありませんでしたが、今回のduckdbやmetabaseに触れることで、新しく興味の幅が広がりました。

また、icebergに関しても興味が出てきたので、PipelinesとData Catalogの統合がされたら勉強してみたいと思っています。

そしてCloudflareなら上記以外でも、

 ・パフォーマンスの向上
 ・サイトの信頼性の向上
 ・セキュリティの向上

が、一つのサービスで実現できますので非常におすすめです。

Cloudflareに、アクセリアの運用サポートをプラスしたCDNサービスを提供しています

アクセリア自社CDNの開発と運用は、20年以上にわたります。それらの経験とノウハウを駆使したプロフェッショナルサポートをパッケージしたサービスが、[Solution CDN]です。
移行支援によるスムーズな導入とともに、お客様の運用負担を最小限に​とどめながら、WEBサイトのパフォーマンスとセキュリティを最大限に高めます。運用サポートはフルアウトソーシングからミニマムサポートまで、ご要望に合わせてご提供します。

Cloudflare(クラウドフレア)の導入や運用について、またそれ以外のことでもなにか気になることがございましたらお気軽にご相談下さい。

Cloudflareの導入・運用について ご相談いただけます。 導入に関するご相談だけでなく、運用についてもご相談ください。

杉木 俊文

アクセリア株式会社 サービス事業本部プラットフォーム部
Contact usお問い合わせ

サービスにご興味をお持ちの方は
お気軽にお問い合わせください。

Webからお問い合わせ

お問い合わせ

お電話からお問い合わせ

03-5211-7750

平日09:30 〜 18:00

Download資料ダウンロード

製品紹介やお役立ち資料を無料でご活用いただけます。

Magazineメルマガ登録

最新の製品情報などタイムリーな情報を配信しています。

Free Service

PageSpeed Insights シミュレータ

CDNによるコンテンツの最適化を行った場合のPageSpeed Insightsのスコアをシミュレートしてレポートします。