Connector Framework
The Connectivity Service is designed to be extensible. You can build a connector to support a new payment processor, or to support a new payment method for an existing processor. This article describes how to build a connector.
Building a Connector
We are going to create a demo connector which reads payments files from a directory. In this directory, we will deposit fictional payments files to be processed by the connector. Each file will contain a payment to be processed by the connector.
Let's get hands-on, starting with the Loader interface:
type Loader[ConnectorConfig models.ConnectorConfigObject] interface {
// Name has to return the name of the connector. It must be constant and unique
Name() models.ConnectorProvider
// Load is in charge of loading the connector
// It takes a logger and a ConnectorConfig object.
// At this point, the config must have been validated
Load(logger logging.Logger, config ConnectorConfig) connectors.Connector
// ApplyDefaults is used to fill default values of the provided configuration object
ApplyDefaults(t ConnectorConfig) ConnectorConfig
// Extra routes to be added to the connectors manager API
Router(store *storage.Storage) *mux.Router
// AllowTasks defines how many tasks the connector can run.
// If too many tasks are scheduled by the connector,
// the extra ones are set to a pending state and restarted later, once other tasks have terminated.
AllowTasks() int
}
A connector has a name, which is provided by the loader through the Name() method. The loader is also generic over a config type, which must implement the models.ConnectorConfigObject interface. Besides ConnectorName() and Marshal(), this interface has a Validate() method, which the framework uses to check that an external config is valid before loading the connector with it.
type ConnectorConfigObject interface {
ConnectorName() string
Validate() error
Marshal() ([]byte, error)
}
Since some properties of the config may be optional, the loader is also responsible for configuring default values for these properties. This is done using the ApplyDefaults(Config) method.
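To make the roles of ApplyDefaults and Validate concrete, here is a minimal, self-contained sketch. The Config type, its exported fields, and the prepare helper are stand-ins written for this illustration, not framework code, and the order shown (defaults first, then validation) is an assumption about how a caller could combine the two steps.

```go
package main

import (
	"errors"
	"fmt"
)

// Config is a stand-in for a connector configuration (hypothetical fields).
type Config struct {
	Name      string
	Directory string
}

// Validate rejects configurations that cannot be used to load a connector.
func (c Config) Validate() error {
	if c.Directory == "" {
		return errors.New("missing directory to watch")
	}
	return nil
}

// applyDefaults fills optional properties that the caller left empty.
func applyDefaults(cfg Config) Config {
	if cfg.Name == "" {
		cfg.Name = "example"
	}
	return cfg
}

// prepare is a hypothetical helper: it fills in defaults first,
// then validates the resulting configuration.
func prepare(cfg Config) (Config, error) {
	cfg = applyDefaults(cfg)
	if err := cfg.Validate(); err != nil {
		return Config{}, err
	}
	return cfg, nil
}

func main() {
	cfg, err := prepare(Config{Directory: "/tmp/payments"})
	fmt.Println(cfg.Name, cfg.Directory, err)

	_, err = prepare(Config{})
	fmt.Println(err)
}
```

Because the config is passed and returned by value, the caller's original object is left untouched and only the returned copy carries the defaults.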
The framework provides the capability to run tasks, so each connector can start a number of tasks. These tasks will be scheduled by the framework, and if the service is restarted, the tasks will be restarted as well. The number of tasks that a connector can schedule is defined by the AllowTasks() method.
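The effect of this limit can be pictured with a small bookkeeping sketch. The limiter type below is hypothetical and only models the behaviour described in the interface comments (extra tasks wait as pending and start once a slot frees up); the framework's real scheduler is not shown in this article and may work differently.

```go
package main

import "fmt"

// limiter is a hypothetical stand-in for the scheduler's bookkeeping.
type limiter struct {
	max     int             // value a loader would return from AllowTasks()
	active  map[string]bool // tasks currently running
	pending []string        // tasks waiting for a free slot, oldest first
}

func newLimiter(max int) *limiter {
	return &limiter{max: max, active: map[string]bool{}}
}

// schedule starts the task if a slot is free, otherwise queues it.
// It reports whether the task was started immediately.
func (l *limiter) schedule(name string) bool {
	if len(l.active) < l.max {
		l.active[name] = true
		return true
	}
	l.pending = append(l.pending, name)
	return false
}

// done frees the slot held by name and promotes the oldest pending task.
// It returns the promoted task name, or "" if nothing was promoted.
func (l *limiter) done(name string) string {
	delete(l.active, name)
	if len(l.pending) == 0 || len(l.active) >= l.max {
		return ""
	}
	next := l.pending[0]
	l.pending = l.pending[1:]
	l.active[next] = true
	return next
}

func main() {
	l := newLimiter(2)
	fmt.Println(l.schedule("a"), l.schedule("b"), l.schedule("c")) // true true false
	fmt.Println(l.done("a"))                                        // c
}
```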
To implement the Loader interface, you can create your own struct that implements the required methods, or you can use utilities provided by the framework. Here is an example of a basic loader:
// See below in the docs where to add the ConnectorProviderExample enum.
const name = models.ConnectorProviderExample
type Loader struct{}
// Name returns the name of the connector.
func (l *Loader) Name() models.ConnectorProvider {
return name
}
// AllowTasks returns the amount of tasks that are allowed to be scheduled.
func (l *Loader) AllowTasks() int {
return 10
}
// ApplyDefaults applies default values to the configuration.
func (l *Loader) ApplyDefaults(cfg Config) Config {
if cfg.name == "" {
cfg.name = "example"
}
return cfg
}
func (l *Loader) Router(store *storage.Storage) *mux.Router {
return nil
}
func (l *Loader) Load(logger logging.Logger, config Config) connectors.Connector {
return nil // Will be updated in the next section
}
func NewLoader() *Loader {
return &Loader{}
}
type Config struct{
name string
}
func (c Config) ConnectorName() string {
return c.name
}
func (c Config) Validate() error {
return nil
}
func (c Config) Marshal() ([]byte, error) {
return json.Marshal(c)
}
In this example, the name of the connector is "example", and the Config struct only holds that name. We will add logic to these structs later.
To integrate the connector into the core, we need to edit the cmd/connectors/internal/api/module.go file and add the following code to the end of the HTTPModule() method:
...
addConnector[example.Config](example.NewLoader()),
...
As you may have noticed, the Loader has a method named Load:
//...
Load(logger logging.Logger, config Config) connectors.Connector
//...
The Load function takes a logger provided by the framework and a configuration, which is likely provided by the API endpoint. It returns a Connector object that provides an entry point to a payment provider. The Connector interface is as follows:
// Connector provides an entry point to a payment provider.
type Connector interface {
// Install is used to start the connector. The implementation is in charge of scheduling all required resources.
Install(ctx task.ConnectorContext) error
// Uninstall is used to uninstall the connector. It has to close all related resources opened by the connector.
Uninstall(ctx context.Context) error
// UpdateConfig is used to update the configuration of the connector.
UpdateConfig(ctx task.ConnectorContext, config models.ConnectorConfigObject) error
// Resolve is used to recover state of a failed or restarted task
Resolve(descriptor models.TaskDescriptor) task.Task
// InitiatePayment is used to initiate a transfer from the connector to a bank account.
InitiatePayment(ctx task.ConnectorContext, transfer *models.TransferInitiation) error
// ReversePayment is used to reverse a transfer from the connector.
ReversePayment(ctx task.ConnectorContext, transferReversal *models.TransferReversal) error
// CreateExternalBankAccount is used to create a bank account on the connector side.
CreateExternalBankAccount(ctx task.ConnectorContext, bankAccount *models.BankAccount) error
// SupportedCurrenciesAndDecimals returns a map of supported currencies and their decimals.
SupportedCurrenciesAndDecimals() map[string]int
}
When you make a request to http://localhost:8080/api/payments/connectors/example with the POST method, the framework calls the Install() method. Similarly, when you make a request to the same URL with the DELETE method, the framework calls the Uninstall() method.
It is time to create a basic connector that does nothing.
type Connector struct {
logger logging.Logger
cfg Config
}
func (c *Connector) InitiatePayment(ctx task.ConnectorContext, transfer *models.TransferInitiation) error {
return connectors.ErrNotImplemented
}
func (c *Connector) ReversePayment(ctx task.ConnectorContext, transferReversal *models.TransferReversal) error {
return connectors.ErrNotImplemented
}
func (c *Connector) CreateExternalBankAccount(ctx task.ConnectorContext, bankAccount *models.BankAccount) error {
return connectors.ErrNotImplemented
}
func (c *Connector) SupportedCurrenciesAndDecimals() map[string]int {
return currency.ISO4217Currencies
}
func (c *Connector) UpdateConfig(ctx task.ConnectorContext, config models.ConnectorConfigObject) error {
cfg, ok := config.(Config)
if !ok {
return connectors.ErrInvalidConfig
}
c.cfg = cfg
return nil
}
func (c *Connector) Install(ctx task.ConnectorContext) error {
return nil
}
func (c *Connector) Uninstall(ctx context.Context) error {
return nil
}
func (c *Connector) Resolve(descriptor models.TaskDescriptor) task.Task {
return nil // Will be updated in the next section
}
var _ connectors.Connector = &Connector{}
func newConnector(logger logging.Logger, cfg Config) *Connector {
return &Connector{
logger: logger.WithFields(map[string]any{
"component": "connector",
}),
cfg: cfg,
}
}
Then, we need to modify our loader:
// Load returns the connector.
func (l *Loader) Load(logger logging.Logger, config Config) connectors.Connector {
return newConnector(logger, config)
}
Since the connector's name is an enum, you will need to add it in several places:
- internal/storage/migration_v1.x.go: Add a new migration that adds the connector value to the Postgres enum:
migrations.Migration{
Up: func(tx bun.Tx) error {
_, err := tx.Exec(`
ALTER TYPE connector_provider ADD VALUE IF NOT EXISTS 'EXAMPLE';
`)
if err != nil {
return err
}
return nil
},
},
- internal/models/connector.go: Add the enum value to the ConnectorProvider type:
ConnectorProviderExample ConnectorProvider = "EXAMPLE"
Don't forget to update all enum functions below the enum definition.
After running the service, you should see output like this:
stack-all-in-one-1 | [payments_1 ] time="12-12-1128 12:38:13.439134" level=info msg="Restoring state for all connectors" component=connector-manager provider=EXAMPLE
This indicates that your connector is properly integrated. You can install it using this command:
curl http://localhost:8080/api/payments/connectors/example -X POST
The service should display output like this:
stack-all-in-one-1 | [payments_1 ] time="12-12-1128 12:40:04.072596" level=info msg="Install connector " component=connector-manager config="{}" provider=EXAMPLE
stack-all-in-one-1 | [payments_1 ] time="12-12-1128 12:40:04.095978" level=info msg="Connector installed" component=connector-manager provider=EXAMPLE
To uninstall your connector, you can now use the following command:
curl http://localhost:8080/api/payments/connectors/example -X DELETE
The service should display output like this:
stack-all-in-one-1 | [payments_1 ] time="12-12-1128 12:54:32.216901" level=info msg="Uninstalling connector: {<connectorID> EXAMPLE}" component=connector-manager provider=EXAMPLE
stack-all-in-one-1 | [payments_1 ] time="12-12-1128 12:54:32.216928" level=info msg="Stopping scheduler..." component=scheduler connectorID="{<connectorID> EXAMPLE}"
stack-all-in-one-1 | [payments_1 ] time="12-12-1128 12:54:32.221242" level=info msg="Connector {<ConnectorID> EXAMPLE} uninstalled" component=connector-manager provider=EXAMPLE
This indicates that your connector has been successfully uninstalled. You can now continue with the next steps in your tutorial.
It's time to add a bit of logic to our connector.
type Config struct {
name string
Directory string
}
func (c Config) Validate() error {
if c.Directory == "" {
return errors.New("missing directory to watch")
}
return nil
}
Here, we have defined only one property for our connector, called "Directory", which indicates the directory where JSON files will be pushed. Now, let's modify the connector's Install method:
type TaskDescriptor string
func (c *Connector) Install(ctx task.ConnectorContext) error {
taskDescriptor, err := models.EncodeTaskDescriptor(TaskDescriptor("directory"))
if err != nil {
return err
}
return ctx.Scheduler().Schedule(
ctx.Context(),
taskDescriptor,
models.TaskSchedulerOptions{
ScheduleOption: models.OPTIONS_RUN_PERIODICALLY,
Duration: 10 * time.Second,
RestartOption: models.OPTIONS_RESTART_ALWAYS,
},
)
}
The Install method takes a task.ConnectorContext parameter, which has the following interface:
type ConnectorContext interface {
Context() context.Context
Scheduler() Scheduler
}
Essentially, this context provides two things:
- context.Context: If the connector performs long-running processing, it should listen on this context to abort if necessary.
- Scheduler: A scheduler to run tasks.
We made our way up to this point without tasks. A task is like a process that the framework will handle for you. It is essentially a simple function. When installed, a connector has the opportunity to schedule tasks and let the system handle them. A task has a descriptor, which must be immutable and represents a specific task in the system. It can be anything. A task also has a state, which can change and the framework provides necessary APIs to do that. We will discuss that later. Like the descriptor, the state is freely defined by the connector.
In our case, the main task is to list the target directory. Secondary tasks will be defined to read each file in the directory. We can define our task descriptor as a string. The value will be the file name for secondary tasks and a hardcoded value of "directory" for the main task.
In the Install method above, we instruct the framework to create and schedule a task with the descriptor "directory". When scheduling a task, you have to provide scheduler options:
- Schedule options: can be either:
  - OPTIONS_RUN_NOW: The task will be run immediately in an async task
  - OPTIONS_RUN_IN_DURATION: The task will be run after a given duration
  - OPTIONS_RUN_SCHEDULED_AT: The task will be run at a specific time
  - OPTIONS_RUN_PERIODICALLY: The task will be run every given duration
  - OPTIONS_RUN_NOW_SYNC: The task will be run immediately in a sync task and will return the associated error if there is one
- Duration: duration used by the OPTIONS_RUN_IN_DURATION or OPTIONS_RUN_PERIODICALLY schedule options
- Restart options: can be either:
  - OPTIONS_RESTART_ALWAYS: The task will always be restarted
  - OPTIONS_RESTART_NEVER: The task will never be restarted
  - OPTIONS_RESTART_IF_NOT_ACTIVE: The task will be restarted if it is not marked as active in the database
  - OPTIONS_STOP_AND_RESTART: The task will be stopped if it exists and restarted
However, the framework does not yet know the specific logic of the task. To provide this logic, we have to use the Resolve method of the connector: Resolve(descriptor models.TaskDescriptor) task.Task. This method is responsible for providing a task.Task instance given a descriptor. Therefore, when calling ctx.Scheduler().Schedule(...), the framework will call the Resolve method with "directory" as a parameter.
Let's implement the resolve method:
func (c *Connector) Resolve(descriptor models.TaskDescriptor) task.Task {
taskDescriptor, err := models.DecodeTaskDescriptor[TaskDescriptor](descriptor)
if err != nil {
panic(err)
}
if taskDescriptor == "directory" {
return func() {
// TODO
}
}
return func() {
// TODO
}
}
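The Encode/Decode helpers used above are not shown in this article. The sketch below illustrates the round trip they appear to perform, under the assumption that a descriptor travels as raw JSON; the encode and decode functions are illustrative stand-ins, not the models package API.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// TaskDescriptor is the connector-defined descriptor: a plain string here.
type TaskDescriptor string

// encode is a stand-in helper that serializes any descriptor to raw JSON.
func encode[T any](descriptor T) (json.RawMessage, error) {
	b, err := json.Marshal(descriptor)
	return b, err
}

// decode is a stand-in helper that restores a typed descriptor from raw JSON.
func decode[T any](raw json.RawMessage) (T, error) {
	var descriptor T
	err := json.Unmarshal(raw, &descriptor)
	return descriptor, err
}

func main() {
	raw, err := encode(TaskDescriptor("directory"))
	if err != nil {
		panic(err)
	}
	back, err := decode[TaskDescriptor](raw)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(raw), back)
}
```

Because the helpers are generic, a connector could later switch from a string to a struct descriptor without changing how descriptors are stored.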
Now, we have to implement the logic for each task. Let's start with the main task, which reads the directory:
func (c *Connector) Resolve(descriptor models.TaskDescriptor) task.Task {
taskDescriptor, err := models.DecodeTaskDescriptor[TaskDescriptor](descriptor)
if err != nil {
panic(err)
}
if taskDescriptor == "directory" {
return func(
ctx context.Context,
logger logging.Logger,
scheduler task.Scheduler,
) error {
logger.Infof("Opening directory '%s'...", c.cfg.Directory)
dir, err := os.ReadDir(c.cfg.Directory)
if err != nil {
return err
}
logger.Infof("Found %d files", len(dir))
for _, file := range dir {
taskDescriptor, err := models.EncodeTaskDescriptor(TaskDescriptor(file.Name()))
if err != nil {
return err
}
if err := scheduler.Schedule(
ctx,
taskDescriptor,
models.TaskSchedulerOptions{
ScheduleOption: models.OPTIONS_RUN_NOW,
RestartOption: models.OPTIONS_RESTART_IF_NOT_ACTIVE,
},
); err != nil {
return err
}
}
return nil
}
}
return func() error {
return errors.New("not implemented")
}
}
To test our implementation, we can start the server as usual and issue a curl request to install the connector:
curl http://localhost:8080/api/payments/connectors/example -X POST -d '{"directory": "/tmp/payments"}'
You should see something like this in the curl response:
{"data":{"connectorID":"<connectorID>"}}
Don't forget to note the connectorID, as we will need it later.
This instructs the connector to watch the directory /tmp/payments. You should see something like this in the app logs:
stack-all-in-one-1 | [payments_1 ] time="12-12-1128 12:40:04.072596" level=info msg="Install connector " component=connector-manager config="{ /tmp/payments}" provider=EXAMPLE
stack-all-in-one-1 | [payments_1 ] time="12-12-1128 12:40:04.095978" level=info msg="Connector installed" component=connector-manager provider=EXAMPLE
stack-all-in-one-1 | [payments_1 ] time="12-12-1128 12:40:04.096143" level=info msg="Opening directory '/tmp/payments'..." component=scheduler connectorID="{63976463-040a-44c2-9d17-eed95a19331b EXAMPLE}" task-id=6b545863-889c-4418-98b2-8e1a85ed9a49
stack-all-in-one-1 | [payments_1 ] time="12-12-1128 12:40:04.096267" level=error msg="Task terminated with error: open /tmp/payments: no such file or directory" component=scheduler connectorID="{63976463-040a-44c2-9d17-eed95a19331b EXAMPLE}" task-id=6b545863-889c-4418-98b2-8e1a85ed9a49
As expected, the task triggers an error because the /tmp/payments directory does not exist (yet).
You can see the tasks on the API as well:
curl http://localhost:8080/api/payments/connectors/example/<connectorID>/tasks | jq .
which should return something like this:
{
"cursor": {
"pageSize":15,
"hasMore":false,
"data":[
{
"id":"03611faa-56c3-4340-bf34-12bf2f88444a",
"connectorID":"<connectorID>",
"createdAt":"2023-10-09T12:01:11Z",
"updatedAt":"2023-10-09T12:01:11Z",
"descriptor":"directory",
"status":"FAILED",
"state":null,
"error":"open /tmp/payments: no such file or directory"
}
]
}
}
As you can see, a task has an id. This id is simply the descriptor of the task encoded in canonical JSON and then as base64.
To fix the error we see in the logs, we can create the missing directory, uninstall the connector, and reinstall it:
docker exec stack-all-in-one-1 mkdir /tmp/payments
After a few seconds, you should see these log lines:
stack-all-in-one-1 | [payments_1 ] time="12-12-1128 12:47:44.627235" level=info msg="Connector installed" component=connector-manager provider=EXAMPLE
stack-all-in-one-1 | [payments_1 ] time="12-12-1128 12:47:44.627353" level=info msg="Opening directory '/tmp/payments'..." component=scheduler connectorID="{93b44ff6-d939-473d-ba59-c754c7cd8777 EXAMPLE}" task-id=68942fb0-0abb-48e9-9e37-04d4f81c510d
stack-all-in-one-1 | [payments_1 ] time="12-12-1128 12:47:44.627492" level=info msg="Found 0 files" component=scheduler connectorID="{93b44ff6-d939-473d-ba59-c754c7cd8777 EXAMPLE}" task-id=68942fb0-0abb-48e9-9e37-04d4f81c510d
stack-all-in-one-1 | [payments_1 ] time="12-12-1128 12:47:44.627512" level=info msg="Starting task..." component=scheduler connectorID="{93b44ff6-d939-473d-ba59-c754c7cd8777 EXAMPLE}" task-id=68942fb0-0abb-48e9-9e37-04d4f81c510d
Ok, now we have a directory to watch. Let's add some files to it:
echo '{
"created_at": "2019-05-15T14:00:00.000Z",
"reference": "test",
"amount": 100,
"asset": "EUR/2",
"type": "TRANSFER",
"status": "SUCCEEDED",
"scheme": "visa"
}' > /tmp/test.json
docker cp /tmp/test.json stack-all-in-one-1:/tmp/payments/test.json
Now you should see something like this in the logs:
stack-all-in-one-1 | [payments_1 ] time="12-12-1128 12:57:38.178945" level=info msg="Opening directory '/tmp/payments'..." component=scheduler connectorID="{c283ee56-b62c-4403-ac02-759da8d96c05 EXAMPLE}" task-id=675e9627-7aca-4108-a8a2-494532eb9910
stack-all-in-one-1 | [payments_1 ] time="12-12-1128 12:57:48.178829" level=info msg="Found 1 files" component=scheduler connectorID="{c283ee56-b62c-4403-ac02-759da8d96c05 EXAMPLE}" task-id=675e9627-7aca-4108-a8a2-494532eb9910
stack-all-in-one-1 | [payments_1 ] time="12-12-1128 12:57:58.192963" level=info msg="Starting task..." component=scheduler connectorID="{c283ee56-b62c-4403-ac02-759da8d96c05 EXAMPLE}" task-id=33e944bc-31dd-40c6-a987-41e50d3161a3
stack-all-in-one-1 | [payments_1 ] time="12-10-9108 12:12:12.838669" level=error msg="Task terminated with error: not implemented" component=scheduler connectorID="{c283ee56-b62c-4403-ac02-759da8d96c05 EXAMPLE}" task-id=33e944bc-31dd-40c6-a987-41e50d3161a3
The logs show that our connector properly detected the file and triggered a new task for the file.
The task terminates with an error as the Resolve function does not handle the descriptor. We will fix that in the next section.
As you can see, while the first task is still active, the second is flagged as failed with an error message.
Let's implement the second task. We will simply read the file and ingest its content:
return func(
ctx context.Context,
connectorID models.ConnectorID,
logger logging.Logger,
ingester ingestion.Ingester,
) error {
file, err := os.Open(filepath.Join(c.cfg.Directory, string(taskDescriptor)))
if err != nil {
return err
}
// Release the file handle once the task returns.
defer file.Close()
type JsonPayment struct {
CreatedAt time.Time `json:"created_at"`
Reference string `json:"reference"`
Amount *big.Int `json:"amount"`
Type string `json:"type"`
Status string `json:"status"`
Scheme string `json:"scheme"`
Asset string `json:"asset"`
}
jsonPayment := &JsonPayment{}
err = json.NewDecoder(file).Decode(jsonPayment)
if err != nil {
return err
}
return ingester.IngestPayments(ctx, ingestion.PaymentBatch{
{
Payment: &models.Payment{
ID: models.PaymentID{
PaymentReference: models.PaymentReference{
Reference: jsonPayment.Reference,
Type: models.PaymentType(jsonPayment.Type),
},
ConnectorID: connectorID,
},
CreatedAt: jsonPayment.CreatedAt,
Reference: jsonPayment.Reference,
Amount: jsonPayment.Amount,
ConnectorID: connectorID,
Type: models.PaymentType(jsonPayment.Type),
Status: models.PaymentStatus(jsonPayment.Status),
Scheme: models.PaymentScheme(jsonPayment.Scheme),
Asset: models.Asset(jsonPayment.Asset),
},
},
})
}
Now restart the service, uninstall the connector, and reinstall it.
The logs should now show something like this:
stack-all-in-one-1 | [payments_1 ] time="12-10-9108 12:41:42.106212" level=info msg="Opening directory '/tmp/payments'..." component=scheduler provider=EXAMPLE task-id=9c55e905-afc2-4ca3-a6ac-d9934a7dc898
stack-all-in-one-1 | [payments_1 ] time="12-10-9108 12:41:42.111507" level=info msg="Found 1 files" component=scheduler provider=EXAMPLE task-id=9c55e905-afc2-4ca3-a6ac-d9934a7dc898
stack-all-in-one-1 | [payments_1 ] time="12-10-9108 12:41:42.194100" level=debug msg="Publishing message" message_uuid=9e308b6a-052c-4ef7-a617-86a24c3c3625 topic_name=payments
stack-all-in-one-1 | [payments_1 ] time="12-10-9108 12:41:42.233556" level=info msg="Starting task..." component=scheduler provider=EXAMPLE task-id=9c55e905-afc2-4ca3-a6ac-d9934a7dc898
stack-all-in-one-1 | [payments_1 ] time="12-10-9108 12:41:42.234471" level=info msg="Starting task..." component=scheduler provider=EXAMPLE task-id=5b28b138-e814-4674-862e-539fe6c0e4ec
stack-all-in-one-1 | [payments_1 ] time="12-10-9108 12:41:42.235586" level=debug msg="Ingest batch" size=1 startingAt="2023-10-09 12:41:42.235581006 +0000 UTC m=+40.934259811"
stack-all-in-one-1 | [payments_1 ] time="12-10-9108 12:41:42.296278" level=debug msg="Publishing message" message_uuid=22cb3db8-d73c-4b8e-9bb3-fd77ccaca6f9 topic_name=payments
stack-all-in-one-1 | [payments_1 ] time="12-10-9108 12:41:42.297042" level=debug msg="Batch ingested" endedAt="2023-10-09 12:41:42.297037256 +0000 UTC m=+40.995716061" latency=61.45625ms size=1
stack-all-in-one-1 | [payments_1 ] time="12-10-9108 12:41:42.297165" level=info msg="Task terminated with success" component=scheduler provider=EXAMPLE task-id=5b28b138-e814-4674-862e-539fe6c0e4ec
As you can see, this time the second task has been started and successfully completed.
It should have created a payment in the database. Let's check it:
curl http://localhost:8080/api/payments/connectors/example | jq
{
"cursor": {
"pageSize": 15,
"hasMore": false,
"data": [
{
"id": "eyJQcm92aWRlciI6IkVYQU1QTEUiLCJSZWZlcmVuY2UiOiJ0ZXN0IiwiVHlwZSI6IlRSQU5TRkVSIn0=",
"reference": "test",
"type": "TRANSFER",
"connectorID": "<connectorID>",
"status": "SUCCEEDED",
"initialAmount": 100,
"scheme": "visa",
"asset": "EUR/2",
"createdAt": "2019-05-15T14:00:00Z"
}
]
}
}
Lovely! We now have successfully created a payment from a file at this stage.
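The id in the sample response above looks opaque, but it appears to be base64-encoded JSON. The sketch below decodes that exact string with the standard library; the paymentID struct is a stand-in that simply lists the fields visible in the decoded payload, not the framework's type.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
)

// paymentID lists the fields found in the decoded sample id.
type paymentID struct {
	Provider  string
	Reference string
	Type      string
}

// decodePaymentID turns a base64-encoded JSON id back into its fields.
func decodePaymentID(id string) (paymentID, error) {
	var out paymentID
	raw, err := base64.StdEncoding.DecodeString(id)
	if err != nil {
		return out, err
	}
	err = json.Unmarshal(raw, &out)
	return out, err
}

func main() {
	id := "eyJQcm92aWRlciI6IkVYQU1QTEUiLCJSZWZlcmVuY2UiOiJ0ZXN0IiwiVHlwZSI6IlRSQU5TRkVSIn0="
	decoded, err := decodePaymentID(id)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", decoded) // {Provider:EXAMPLE Reference:test Type:TRANSFER}
}
```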
The last important part to cover is the Ingester, which is the component that is responsible for saving the payment object in the database.
In the code of the second task, you may have noticed the following part:
return ingester.IngestPayments(ctx, ingestion.PaymentBatch{
{
Payment: &models.Payment{
ID: models.PaymentID{
PaymentReference: models.PaymentReference{
Reference: jsonPayment.Reference,
Type: models.PaymentType(jsonPayment.Type),
},
ConnectorID: connectorID,
},
CreatedAt: jsonPayment.CreatedAt,
Reference: jsonPayment.Reference,
Amount: jsonPayment.Amount,
ConnectorID: connectorID,
Type: models.PaymentType(jsonPayment.Type),
Status: models.PaymentStatus(jsonPayment.Status),
Scheme: models.PaymentScheme(jsonPayment.Scheme),
Asset: models.Asset(jsonPayment.Asset),
},
},
})
The ingester accepts payments from a task, along with an optional state to be persisted. In our case, we don't alter the state of an existing payment already saved to storage, so the call above passes only the payment batch.
Lifecycle-wise, note that if the connector is restarted, the task will be restarted with the previously saved state.
Wrapping up
In this tutorial, we have seen how to create a connector from scratch. We have seen how to create a connector, how to create a task, and how to use the ingester to save the payment in the database. We have also seen how to use the scheduler to run the task periodically, and how to use the API to manage the connector. The code below is a full recap of the code we have seen in this tutorial.
package example
import (
"context"
"encoding/json"
"errors"
"math/big"
"os"
"path/filepath"
"time"
"github.com/formancehq/payments/cmd/connectors/internal/connectors"
"github.com/formancehq/payments/cmd/connectors/internal/connectors/currency"
"github.com/formancehq/payments/cmd/connectors/internal/ingestion"
"github.com/formancehq/payments/cmd/connectors/internal/storage"
"github.com/formancehq/payments/cmd/connectors/internal/task"
"github.com/formancehq/payments/internal/models"
"github.com/formancehq/stack/libs/go-libs/logging"
"github.com/gorilla/mux" // assumed import path for the mux.Router returned by Router()
)
const Name = models.ConnectorProviderExample
type Loader struct{}
// Name returns the name of the connector.
func (l *Loader) Name() models.ConnectorProvider {
return models.ConnectorProviderExample
}
// AllowTasks returns the amount of tasks that are allowed to be scheduled.
func (l *Loader) AllowTasks() int {
return 10
}
// ApplyDefaults applies default values to the configuration.
func (l *Loader) ApplyDefaults(cfg Config) Config {
if cfg.name == "" {
cfg.name = "example"
}
return cfg
}
func (l *Loader) Router(store *storage.Storage) *mux.Router {
return nil
}
func (l *Loader) Load(logger logging.Logger, config Config) connectors.Connector {
return newConnector(logger, config)
}
func NewLoader() *Loader {
return &Loader{}
}
type Config struct {
name string
Directory string
}
func (c Config) ConnectorName() string {
return c.name
}
func (c Config) Validate() error {
if c.Directory == "" {
return errors.New("missing directory to watch")
}
return nil
}
func (c Config) Marshal() ([]byte, error) {
return json.Marshal(c)
}
type Connector struct {
logger logging.Logger
cfg Config
}
func (c *Connector) InitiatePayment(ctx task.ConnectorContext, transfer *models.TransferInitiation) error {
return connectors.ErrNotImplemented
}
func (c *Connector) CreateExternalBankAccount(ctx task.ConnectorContext, bankAccount *models.BankAccount) error {
return connectors.ErrNotImplemented
}
func (c *Connector) ReversePayment(ctx task.ConnectorContext, transferReversal *models.TransferReversal) error {
return connectors.ErrNotImplemented
}
func (c *Connector) SupportedCurrenciesAndDecimals() map[string]int {
return currency.ISO4217Currencies
}
func (c *Connector) UpdateConfig(ctx task.ConnectorContext, config models.ConnectorConfigObject) error {
cfg, ok := config.(Config)
if !ok {
return connectors.ErrInvalidConfig
}
c.cfg = cfg
return nil
}
type TaskDescriptor string
func (c *Connector) Install(ctx task.ConnectorContext) error {
taskDescriptor, err := models.EncodeTaskDescriptor(TaskDescriptor("directory"))
if err != nil {
return err
}
return ctx.Scheduler().Schedule(
ctx.Context(),
taskDescriptor,
models.TaskSchedulerOptions{
ScheduleOption: models.OPTIONS_RUN_PERIODICALLY,
Duration: 10 * time.Second,
RestartOption: models.OPTIONS_RESTART_ALWAYS,
},
)
}
func (c *Connector) Uninstall(ctx context.Context) error {
return nil
}
func (c *Connector) Resolve(descriptor models.TaskDescriptor) task.Task {
taskDescriptor, err := models.DecodeTaskDescriptor[TaskDescriptor](descriptor)
if err != nil {
panic(err)
}
if taskDescriptor == "directory" {
return func(
ctx context.Context,
logger logging.Logger,
scheduler task.Scheduler,
) error {
logger.Infof("Opening directory '%s'...", c.cfg.Directory)
dir, err := os.ReadDir(c.cfg.Directory)
if err != nil {
return err
}
logger.Infof("Found %d files", len(dir))
for _, file := range dir {
taskDescriptor, err := models.EncodeTaskDescriptor(TaskDescriptor(file.Name()))
if err != nil {
return err
}
if err := scheduler.Schedule(
ctx,
taskDescriptor,
models.TaskSchedulerOptions{
ScheduleOption: models.OPTIONS_RUN_NOW,
RestartOption: models.OPTIONS_RESTART_IF_NOT_ACTIVE,
},
); err != nil {
return err
}
}
return nil
}
}
return func(
ctx context.Context,
connectorID models.ConnectorID,
logger logging.Logger,
ingester ingestion.Ingester,
) error {
file, err := os.Open(filepath.Join(c.cfg.Directory, string(taskDescriptor)))
if err != nil {
return err
}
// Release the file handle once the task returns.
defer file.Close()
type JsonPayment struct {
CreatedAt time.Time `json:"created_at"`
Reference string `json:"reference"`
Amount *big.Int `json:"amount"`
Type string `json:"type"`
Status string `json:"status"`
Scheme string `json:"scheme"`
Asset string `json:"asset"`
}
jsonPayment := &JsonPayment{}
err = json.NewDecoder(file).Decode(jsonPayment)
if err != nil {
return err
}
return ingester.IngestPayments(ctx, ingestion.PaymentBatch{
{
Payment: &models.Payment{
ID: models.PaymentID{
PaymentReference: models.PaymentReference{
Reference: jsonPayment.Reference,
Type: models.PaymentType(jsonPayment.Type),
},
ConnectorID: connectorID,
},
CreatedAt: jsonPayment.CreatedAt,
Reference: jsonPayment.Reference,
Amount: jsonPayment.Amount,
ConnectorID: connectorID,
Type: models.PaymentType(jsonPayment.Type),
Status: models.PaymentStatus(jsonPayment.Status),
Scheme: models.PaymentScheme(jsonPayment.Scheme),
Asset: models.Asset(jsonPayment.Asset),
},
},
})
}
}
var _ connectors.Connector = &Connector{}
func newConnector(logger logging.Logger, cfg Config) *Connector {
return &Connector{
logger: logger.WithFields(map[string]any{
"component": "connector",
}),
cfg: cfg,
}
}