Let's copy some bytes using Airbyte Protocol and Rust
Replication Driver
Airbyte replication connectors run as containers, so we need a container runtime such as Docker Desktop.
Unlike the official Airbyte Worker job, however, the driver here is agnostic to the container runtime.
The architecture differs in how the driver coordinates the replication process from Source to Destination. The driver plays two roles, Source and Destination, and it runs inside the connector containers.
The driver processes inside the containers coordinate themselves through IPC (inter-process communication). They execute an initialization handshake protocol to start the Source and Destination processes.
If process creation succeeds, the Source driver starts replicating data into the Destination process.
The Source driver will take care of tracking record flow and publish stats and metrics. The Destination driver will take care of state persistence.
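To make the record-tracking idea a little more concrete, here is a minimal Rust sketch of a per-stream counter. The names `Message`, `StreamStats`, and `Tracker` are hypothetical stand-ins invented for this example; the real driver's types and the metrics it publishes may look quite different.

```rust
use std::collections::BTreeMap;

/// Simplified stand-in for an Airbyte protocol message.
/// Hypothetical type for illustration only, not the driver's real one.
#[derive(Debug)]
enum Message {
    Record { stream: String, data: String },
    State(String),
    Log(String),
}

/// Counters kept for a single stream.
#[derive(Debug, Default, PartialEq)]
struct StreamStats {
    records: u64,
    bytes: u64,
}

/// Accumulates record counts and payload sizes per stream.
#[derive(Debug, Default)]
struct Tracker {
    streams: BTreeMap<String, StreamStats>,
}

impl Tracker {
    /// Updates the counters for RECORD messages and ignores everything else.
    /// (In this sketch, state handling is left to the destination side.)
    fn observe(&mut self, message: &Message) {
        if let Message::Record { stream, data } = message {
            let stats = self.streams.entry(stream.clone()).or_default();
            stats.records += 1;
            stats.bytes += data.len() as u64;
        }
    }

    /// Total number of records seen across all streams.
    fn total_records(&self) -> u64 {
        self.streams.values().map(|stats| stats.records).sum()
    }
}
```

A caller would feed every message read from the source connector's output into `observe` and publish the counters periodically or at the end of the run.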
Once the replication is complete, the drivers again coordinate the shutdown procedure through IPC and exit gracefully. This protocol needs a shared volume for the process pipes, so an init command is executed at startup.
With Docker, we can run the replication using Docker Compose.
The end effect is that we run just two containers. 🎉🎉
Now that the introduction is out of the way, let's copy a file from one folder to another using Airbyte replication.
Let's use Airbyte Connector Source File to Airbyte Connector Destination CSV, à la cp local/input.csv local/_airbyte_raw_test.csv! 😃
First, download the sample configuration from the GitHub repository.
git clone git@github.com:replication-rs/airbyte-replication-operator-external.git
cd airbyte-replication-operator-external
There are currently a few scenarios used for testing; these will serve as the E2E tests going forward.
cd e2e/hello-airbyte-file-to-csv/docker
docker compose up
The output should look something like the following.
h7kanna@Harshas-MBP-6 docker % docker compose up
[+] Running 4/2
⠿ Network docker_default Created 0.0s
⠿ Container docker-init-1 Created 0.0s
⠿ Container docker-destination-1 Created 0.1s
⠿ Container docker-source-1 Created 0.1s
Attaching to docker-destination-1, docker-init-1, docker-source-1
docker-init-1 | 2023-03-21T00:58:16.240494Z INFO airbyte_replication_driver: Starting command="init"
docker-init-1 | 2023-03-21T00:58:16.240520Z INFO airbyte::driver::init: Copying driver to /storage/airbyte-replication-driver ...
docker-init-1 | 2023-03-21T00:58:16.259013Z INFO airbyte::driver::init: Driver successfully copied to: "/storage/airbyte-replication-driver"
docker-init-1 | 2023-03-21T00:58:16.259107Z INFO airbyte::driver::init: Permissions set successfully to: "/storage/airbyte-replication-driver"
docker-init-1 exited with code 0
docker-source-1 | 2023-03-21T00:58:16.613952Z INFO airbyte_replication_driver: Starting command="source"
docker-destination-1 | 2023-03-21T00:58:16.615943Z INFO airbyte_replication_driver: Starting command="destination"
docker-destination-1 | 2023-03-21T00:58:16.616358Z INFO airbyte::driver::pipes: Error while removing named pipes /pipes
docker-destination-1 | 2023-03-21T00:58:16.616390Z INFO airbyte::driver::pipes: Created "/pipes/source"
docker-destination-1 | 2023-03-21T00:58:16.616424Z INFO airbyte::driver::pipes: Created named pipe "/pipes/source/stdin.pipe"
docker-destination-1 | 2023-03-21T00:58:16.616432Z INFO airbyte::driver::pipes: Created named pipe "/pipes/source/stdout.pipe"
docker-destination-1 | 2023-03-21T00:58:16.616438Z INFO airbyte::driver::pipes: Created named pipe "/pipes/source/stderr.pipe"
docker-destination-1 | 2023-03-21T00:58:16.616458Z INFO airbyte::driver::pipes: Created "/pipes/destination"
docker-destination-1 | 2023-03-21T00:58:16.616468Z INFO airbyte::driver::pipes: Created named pipe "/pipes/destination/stdin.pipe"
docker-destination-1 | 2023-03-21T00:58:16.616475Z INFO airbyte::driver::pipes: Created named pipe "/pipes/destination/stdout.pipe"
docker-destination-1 | 2023-03-21T00:58:16.616482Z INFO airbyte::driver::pipes: Created named pipe "/pipes/destination/stderr.pipe"
docker-destination-1 | 2023-03-21T00:58:16.616489Z INFO airbyte::driver::pipes: Created named pipe "/pipes/replication.pipe"
docker-destination-1 | 2023-03-21T00:58:16.616497Z INFO airbyte::driver::destination: Replication started at Instant { tv_sec: 42417, tv_nsec: 705905322 }
docker-source-1 | 2023-03-21T00:58:16.616157Z INFO airbyte::driver::source: /pipes/destination/stdin.pipe is not ready No such device or address (os error 6)
docker-source-1 | 2023-03-21T00:58:16.616379Z INFO airbyte::driver::source: Waiting for destination
docker-source-1 | 2023-03-21T00:58:17.621428Z INFO airbyte::driver::source: Waiting for destination client to connect
docker-source-1 | 2023-03-21T00:58:17.623848Z INFO airbyte::driver::source: Destination client connected!
docker-destination-1 | 2023-03-21T00:58:17.621428Z INFO airbyte::driver::destination: Destination AIRBYTE_ENTRYPOINT /airbyte/base.sh
docker-destination-1 | 2023-03-21T00:58:17.621456Z INFO airbyte::driver::destination: Destination command /airbyte/base.sh
docker-destination-1 | 2023-03-21T00:58:17.621542Z INFO airbyte::driver::destination: Destination output tracker spawned
docker-destination-1 | 2023-03-21T00:58:17.622338Z INFO airbyte::driver::destination: Servery key /tmp/.tmpqjfsIU/socket
docker-destination-1 | 2023-03-21T00:58:17.622490Z INFO airbyte::driver::destination: Client initiated
docker-source-1 | 2023-03-21T00:58:17.640206Z INFO airbyte::driver::source: Source AIRBYTE_ENTRYPOINT python /airbyte/integration_code/main.py
docker-source-1 | 2023-03-21T00:58:17.640242Z INFO airbyte::driver::source: Source command python
docker-source-1 | 2023-03-21T00:58:17.642231Z INFO airbyte::driver::source: Source output tracker spawned
docker-destination-1 | 2023-03-21T00:58:18.074393Z INFO airbyte::driver::destination: AirbyteMessage { catalog: None, connection_status: None, control: None, log: Some(AirbyteLogMessage { level: Info, message: "integration args: {catalog=/secrets/catalog-file.json, write=null, config=/secrets/destination-file.json}", stack_trace: None }), record: None, spec: None, state: None, trace: None, type_: Log }
docker-destination-1 | 2023-03-21T00:58:18.074576Z INFO airbyte::driver::destination: AirbyteMessage { catalog: None, connection_status: None, control: None, log: Some(AirbyteLogMessage { level: Info, message: "Running integration: io.airbyte.integrations.destination.csv.CsvDestination", stack_trace: None }), record: None, spec: None, state: None, trace: None, type_: Log }
docker-destination-1 | 2023-03-21T00:58:18.074800Z INFO airbyte::driver::destination: AirbyteMessage { catalog: None, connection_status: None, control: None, log: Some(AirbyteLogMessage { level: Info, message: "Command: WRITE", stack_trace: None }), record: None, spec: None, state: None, trace: None, type_: Log }
docker-destination-1 | 2023-03-21T00:58:18.077523Z INFO airbyte::driver::destination: AirbyteMessage { catalog: None, connection_status: None, control: None, log: Some(AirbyteLogMessage { level: Info, message: "Integration config: IntegrationConfig{command=WRITE, configPath='/secrets/destination-file.json', catalogPath='/secrets/catalog-file.json', statePath='null'}", stack_trace: None }), record: None, spec: None, state: None, trace: None, type_: Log }
docker-destination-1 | 2023-03-21T00:58:18.206642Z INFO airbyte::driver::destination: AirbyteMessage { catalog: None, connection_status: None, control: None, log: Some(AirbyteLogMessage { level: Info, message: "initializing consumer.", stack_trace: None }), record: None, spec: None, state: None, trace: None, type_: Log }
docker-source-1 | 2023-03-21T00:58:18.619138Z INFO airbyte::driver::source: Replication complete
docker-source-1 | 2023-03-21T00:58:18.619162Z INFO airbyte::driver::source: exit status: 0
docker-destination-1 | 2023-03-21T00:58:18.651731Z INFO airbyte::driver::destination: AirbyteMessage { catalog: None, connection_status: None, control: None, log: Some(AirbyteLogMessage { level: Info, message: "Airbyte message consumer: succeeded.", stack_trace: None }), record: None, spec: None, state: None, trace: None, type_: Log }
docker-destination-1 | 2023-03-21T00:58:18.651797Z INFO airbyte::driver::destination: AirbyteMessage { catalog: None, connection_status: None, control: None, log: Some(AirbyteLogMessage { level: Info, message: "finalizing consumer.", stack_trace: None }), record: None, spec: None, state: None, trace: None, type_: Log }
docker-destination-1 | 2023-03-21T00:58:18.655133Z INFO airbyte::driver::destination: AirbyteMessage { catalog: None, connection_status: None, control: None, log: Some(AirbyteLogMessage { level: Info, message: "File output: /local/_airbyte_raw_test.csv", stack_trace: None }), record: None, spec: None, state: None, trace: None, type_: Log }
docker-destination-1 | 2023-03-21T00:58:18.655787Z INFO airbyte::driver::destination: AirbyteMessage { catalog: None, connection_status: None, control: None, log: Some(AirbyteLogMessage { level: Info, message: "Completed integration: io.airbyte.integrations.destination.csv.CsvDestination", stack_trace: None }), record: None, spec: None, state: None, trace: None, type_: Log }
docker-destination-1 | 2023-03-21T00:58:18.687422Z INFO airbyte::driver::destination: Tracking complete
docker-destination-1 | 2023-03-21T00:58:18.687538Z INFO airbyte::driver::destination: exit status: 0
docker-destination-1 | 2023-03-21T00:58:18.687551Z INFO airbyte::driver::destination: Replication completed in 0 minutes
docker-source-1 exited with code 0
docker-destination-1 exited with code 0
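The "is not ready ... (os error 6)" and "Waiting for destination" lines in the log above suggest that the source side retries until the destination's pipe can be opened. A minimal sketch of such a retry loop follows. The function name `wait_until_ready` and its parameters are assumptions made for this example, not the driver's actual code.

```rust
use std::io;
use std::thread::sleep;
use std::time::Duration;

/// Calls `try_connect` until it succeeds or `max_attempts` attempts have
/// been made, sleeping `delay` between attempts. On failure, returns the
/// last error that was observed.
/// Hypothetical helper for illustration; the real driver may differ.
fn wait_until_ready<T, F>(
    max_attempts: u32,
    delay: Duration,
    mut try_connect: F,
) -> io::Result<T>
where
    F: FnMut() -> io::Result<T>,
{
    let mut last_error = io::Error::new(io::ErrorKind::Other, "no attempts were made");
    for attempt in 1..=max_attempts {
        match try_connect() {
            Ok(value) => return Ok(value),
            Err(error) => {
                eprintln!("attempt {}: not ready: {}", attempt, error);
                last_error = error;
                // Do not sleep after the final attempt.
                if attempt < max_attempts {
                    sleep(delay);
                }
            }
        }
    }
    Err(last_error)
}
```

In a real driver the closure would try to open the named pipe. On POSIX systems, opening a FIFO for writing in non-blocking mode while no reader has it open fails with ENXIO, which is the "No such device or address (os error 6)" seen in the log, so retrying with a short delay is one plausible way to wait for the other side.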
Let's verify
chmod +x assert.sh && ./assert.sh
Replication successful.
Please check ./local/_airbyte_raw_test.csv
Cleanup
docker compose down && rm -rf ./local/_airbyte_raw_test.csv
More example compose files here
Replication Operator
Again, because Airbyte replications run as containers, we can run them on any container platform.
My plan is to support running on various platforms like Amazon ECS, Fargate, etc.
For our darling Kubernetes, the Operator component is a Kubernetes Operator that runs the above driver as a Pod and manages the lifecycle of one replication.
The idea is that higher-level orchestrators can use this as a building block to provide features like scheduling, configuration management, UI, etc.
Kubernetes is also one of the pluggable storage options for the replication state.
As an example, KubeVela can be used as that higher-level workflow engine. Check the details here.
A video demo is here.
CLI
The driver has a CLI with some useful operations for viewing the state and progress of replications.
The CLI also provides some scheduling (cron) and config management capabilities for simple use cases.
In the future, I envision an Airbyte Desktop: a way to store your personal data using Airbyte replications in your own data lake.
To list all commands:
./airbyte-replication-driver --help
Example: Check the status of the replication
./airbyte-replication-driver --command state --replication hello-airbyte-file-to-csv --store-path tmp
Finally
Hopefully, the above tutorial has helped you understand how to use the replication Driver/Operator combo.
And more importantly, the value proposition.
So let us start contributing to the awesome Airbyte ecosystem in a unique way.
Shall we?
Check the Roadmap here.
Your feedback is much appreciated.
Yours truly,
Harsha Teja Kanna