30日間の無料評価版をお試しいただけます。

 

ステップの処理ロジックを機能させるためには、数多くのメソッドを実装しなくてはいけません。一部のメソッドは、ロウ(行)ステップか、またはキャッシュステップかに応じて異なります。

一般的なメソッド

以下は、ステップのデータ処理方法を定義する一般的なメソッドの例です。

メソッド説明と例
public ETLStepAPIVersion getAPIVersion()

ステップの更新を維持するYellowfinステップAPIバージョンの定義に使用するメソッドです。これは通常、列挙型ETLStepAPIVersion内の最新のバージョンです。APIバージョンは、互換性を判断するために使用されます。

@Override
public ETLStepAPIVersion getAPIVersion() {
    return ETLStepAPIVersion.V1;
}

 

 

public Collection<ETLException> validate()

事前実行検証を行うメソッドです。実装は、必須オプションが設定されているか、資格情報は正確か、ホストに到達可能かを確認しなくてはいけません。エラーは、ETLExceptionのインスタンスに取得され、メソッドから返されるコレクションに追加されます。または、ETLExceptionを構築する代わりに、便宜メソッドであるgetInvalidConfigETLException()を使用することもできます。

@Override
public Collection<ETLException> validate() {
    List<ETLException> validationErrors = new ArrayList<>();
            
    String exampleOption = this.getStepOption("APPEND_VALUE");
    if (exampleOption == null) {
        // Add a generic message "Step not Properly Configured"
        validationErrors.add(this.getInvalidConfigETLException());
    }
  
    try {
        Integer.parseInt(exampleOption);
    } catch (NumberFormatException e) {
        ETLException ve = new ETLException(ETLElement.STEP, getUuid(), null, "Option is not a number", e);
        validationErrors.add(ve);
    }
    return validationErrors;
}

   

public Map<String, String> getValidatedStepOptions()

ステップオプションを有効にするメソッドです。optionKeyからoptionValueへのマッピングは、Yellowfinリポジトリデータベースに保存されます。こちらで、無効なオプション値を削除することができます。this.getStepOptions()により返されるマッピングの操作には影響しません。

@Override
public Map<String, String> getValidatedStepOptions() {
    Map<String, String> stepOptions = this.getStepOptions();
    String exampleOption = stepOptions.get("APPEND_VALUE");
    if (exampleOption == null) {
        // Remove option if the value is no longer set
        stepOptions.remove("APPEND_VALUE");
    } else {           
        try {
            Integer.parseInt(exampleOption);
        } catch (NumberFormatException e) {
            // Remove option if the value is not an integer
            stepOptions.remove("APPEND_VALUE");
        }
    }
    // Return the map of valid options
    return stepOptions;
}
public void setupGeneratedFields()

ステップが新規フィールドの出力を必要とする場合は、こちらのメソッドを実装します。新規フィールド内のデータは、他のフィールドを使用して生成されます。新規フィールドは既存フィールドを置き換えるか、複製する場合もあります。Yellowfinは、各操作に便宜メソッドを提供します。こちらのメソッドは、ETLStepMetadataFieldBeanの新規インスタンスを作成するか、既存フィールドを複製することが予想されます。フィールドが事前に設定されていないか、再度設定する必要がある場合にのみ、こちらのメソッドを実行することが重要です。オプション内の変更が原因でフィールドが再作成された場合は、古いフィールドを削除しなくてはいけません。削除をしない場合、ステップが再設定される度に新規フィールドが生成されることになります。

@Override
public void setupGeneratedFields() throws ETLException {
    if (getStepOption("NEW_FIELD") != null) {
        // The field was already set up.
        return;
    }
 
    ETLStepMetadataFieldBean newField = new ETLStepMetadataFieldBean();
    newField.setFieldName("Concatenated Field");
    newField.setFieldType(ETLDataType.TEXT.name());
 
    // The sort order is 0 based, so the new field will be at the end
    newField.setSortOrder(getDefaultMetadataFields().size());
     
    // Ensure that the new field is output from the step
    newField.setStepIncludeField(true);
    newField.setUserIncludeField(true);
     
    // This method assigns the field a new UUID and
    // adds a Step Option to help reference it elsewhere.
    this.addNewGeneratedField(newField, "NEW_FIELD");
}

上記は、新規フィールドを生成する例です。フィールドを複製するには、以下を使用します。

this.addGeneratedField(newFieldBean, ETLFieldLinkType.DUPLICATE, originalFieldUUID)

 

既存フィールドを置き換えるには、以下を使用します。

this.replaceDefaultField(fieldToReplace)

上記は、元のフィールドにリンクする、新規「置き換え」フィールドを返します。オブジェクトは変更しても構いませんが、linkFieldUUIDとlinkTypeは変更してはいけません。

 

元のフィールドに戻り、置き換えたフィールドを削除するには、以下のコードを使用します。

this.restoreReplacedField(replacementField)

public Integer getMinInputSteps()

public Integer getMaxInputSteps()

public Integer getMinOutputSteps()

public Integer getMaxOutputSteps()

ステップに複数のインプットやアウトプットがある場合、これらのメソッドを上書きしなくてはいけません。Yellowfinは、ステップカテゴリーに基づき最小値/最大値を返すデフォルト実装を提供します。これらの値は、これらのカテゴリーのETLStepCategory列挙型要素内で定義されます。

YFLogger

これはメソッドではありませんが、すべてのステップに共通します。ステップは、YFLoggerを使用して、データトランスフォーメーションログに書き込むことができます。これは、インスタンス変数として宣言する必要があります。YFLoggerは、log4jのロガーclassのラッパです。

private static final YFLogger log = YFLogger.getLogger(TestStep.class.getName());

 

 

ロウ(行)ステップ実装

ロウ(行)ステップは、AbstractETLRowStep classを拡張します。これには、ひとつのメソッド(processWireData())の実装のみが必要です。

 

processWireData()

フレームワークが、processWireData()を呼び出すと、現在のロウ(行)の各カラム(列)からのデータは、すでに適切なWire上にあります。各wireはメタデータフィールドにマッピングされ、this.getWireForField(fieldUUID)を使用してアクセスされます。データはwireから取得され、処理され、同一のwire、または別のwireに戻されます。

 

  • メソッドパラメーター:メソッドはひとつのパラメーター(List<ETLStepMetadataFieldBean>)を持ちます。これらは、インプットステップからのフィールドです。これらは、便宜のために提供されます。これらはwireを取得するために使用されるかもしれませんが、以下に示すように、デフォルトメタデータフィールドの使用をお勧めします。
  • 戻り値:戻り値はbooleanであり、データのロウ(行)を次のステップに出力すべきかどうかを示します。例えばフィルターステップの場合、ロウ(行)内のデータはフィルター条件を満たさないため、falseが返されます。
  • 例外:ETLExceptionInterruptedExceptionをthrowするメソッドです。処理エラーがある場合は、ETLExceptionのインスタンスにラップし、throwすることで、Yellowfinはそれをユーザーに表示することができます。便宜メソッド(this.throwUnhandledETLException(e))を使用することもできます。例外はcatchや、swallowをしてはいけません。interruptedExceptionもcatchされるため、java.lang.Exceptionのcatchはお勧めできません。これが避けられない場合は、InterruptedExceptionをcatchし、異なるcatchブロックにthrowしなくてはいけません。

    コードの例
} catch (InterruptedException e) {
    throw e;
} catch (Exception e) {
    log.error("Error: " + e, e);
    throwUnhandledETLException(e);
}

 

こちらは、特定のフィールドに数値を追加する実装の例です。

@Override
protected boolean processWireData(List<ETLStepMetadataFieldBean> fields)
                                  throws ETLException, InterruptedException {
         
    // The options should've been validated by the validate() method,
    // so no need for further checks here
    String appendFieldUUID = this.getStepOption("APPEND_FIELD");
    String newFieldUUID = this.getStepOption("NEW_FIELD");
    String appendValue = this.getStepOption("APPEND_VALUE");
  
    Wire<Object, String> appendFieldWire = this.getWireForField(appendFieldUUID);
    Wire<Object, String> newFieldWire = this.getWireForField(newFieldUUID);
  
    Object data = appendFieldWire.getValue();
    String newFieldData = null;
    if (data == null) {
        newFieldData = appendValue;
    } else {
        newFieldData = data.toString() + appendValue;
    }
     
    newFieldWire.send(newFieldData);
     
    return true;
}

こちらの例では、すべてのロウ(行)のデータでprocessWireData()が実行されます。実際には、後続のメソッドの呼び出しで変更されないオブジェクトは、メンバー変数にキャッシュしなくてはいけません。例えば、appendFieldUUID、newFieldUUID、appendValue、appnedFieldWire、newFieldWireはメンバー変数にすべきであり、processWoreData()が初めて実行された時にだけ設定されます。

 

 

キャッシュステップ実装

キャッシュステップは、AbstractETLCachedStepを拡張します。ひとつのメソッド(processEndRows())のみ実装しなくてはいけません。キャッシュステップは通常、ひとつ以上のインプットステップを持ちます。データ取得ステップはインプットを持たないため、キャッシュステップとして実装されることが多いです。例えばこれは、SQLクエリーの実行によりデータを生成します。

 

processEndRow()

  • インプットステップ:フレームワークは、プロセスが実行が開始されるとすぐに、インプットステップにprocessEndRows()を呼び出します。インプットステップは、データキャッシングを気にする必要がありません。しかし、実装はwireにデータを配置し、その出力へ送らなくてはいけません。こちらは、データ生成ステップのメソッドの実装例です。
@Override
protected void processEndRows() throws ETLException, InterruptedException {
 
    // Get the first output flow;
    // Useful for most steps which have a single output
    String outFlow = getFirstOutputFlow();
 
    // The step outputs four Generated fields.
    List<String> orderedFieldUUIDs = new ArrayList<>(4);
 
    // The step's implementation of setupGeneratedFields()
    // should set these up. Their UUIDs would've been saved as step options.
    orderedFieldUUIDs.add(getStepOption("FIELD1_UUID"));
    orderedFieldUUIDs.add(getStepOption("FIELD2_UUID"));
    orderedFieldUUIDs.add(getStepOption("FIELD3_UUID"));
    orderedFieldUUIDs.add(getStepOption("FIELD4_UUID"));
 
    // Sample Data
    String[] field1_data = {"Adventure", "Relaxation", "Culture", "Family"};
    int[] field2_data = {30, 32, 11, 44};
    Date[] field3_data = {new Date(103882823L), new Date(10388283323L),
                          new Date(103883232823L), new Date(102323882823L)};
    Timestamp[] field4_data = {new Timestamp(103882823L), new Timestamp(10388283323L),
                              new Timestamp(103883232823L), new Timestamp(102323882823L)};
 
    // Generate as many rows as configured in Step Option ROW_COUNT
    int rowCount = 10;//Integer.parseInt(getStepOption("ROW_COUNT"));
    Random random = new Random();
 
    for (int i = 0 ; i < rowCount ; i++) {
        // Data is emitted in packets.
        // This implementation creates a new packet for every row.
        // Data packets can accumulate rows and emit, say, every 20 rows.
        ETLStepResult dataPacket = getFreshDataPacket(outFlow);
 
        Object[] row = new Object[4];
        row[0] = field1_data[random.nextInt(4)];
        row[1] = field2_data[random.nextInt(4)];
        row[2] = field3_data[random.nextInt(4)];
        row[3] = field4_data[random.nextInt(4)];
 
        // Send the row of data from Default Fields to Output Fields
        beginInternalTransmission(row, orderedFieldUUIDs);
 
        // Accumulate transmitted data in a data packet
        endInternalTransmission(dataPacket);
 
        // Emit the packet of data to the next step.
        // This may be done less frequently, after accumulating rows
        emitData(dataPacket);
    }
}

 

  • トランスフォーメーションステップ:フレームワークは、すべてのインプットがキャッシュステップへのデータ送信を終了した時に、キャッシュトランスフォーメーションステップにprocessEndRows()を呼び出します。各インプットステップのデータは、異なるメモリーキャッシュに保存されます。ステップ実装は必ずwireにデータを送信し、ステップからのデータをそのアウトプットへ送らなくてはいけません。以下の例は、Union-Allステップのprocを実装し、データキャッシュがどのように使用されるのかを示しています。
@Override
protected void processEndRows() throws ETLException, InterruptedException {
    // Get input flows which feed data to this step
    Set<String> inputFlowUuids = this.getInputFlowUuids();
 
    // Get the output flow as there can be only one
    String outFlowUuid = getFirstOutputFlow();
 
    // Get a data packet
    ETLStepResult dataPacket = getFreshDataPacket(outFlowUuid);
 
    for (String inputFlowUuid : inputFlowUuids) {
        // Get data of each input from its cache
        ETLDataCache inputData = getDataCache(inputFlowUuid);
 
        // Use this to get the Default Metadata Field corresponding to an Input Field.
        // Input Metadata Field is the field in the input step.
        // Cached data will be in the order of input fields.
        Map<String, String> inputToDefaultFieldMap = getInputToDefaultFieldMap();
 
        // The data will match the Input Metadata Fields
        List<ETLStepMetadataFieldBean> inputFieldList = inputData.getMetadataFields();
        List<String> unionResultFields = new ArrayList<String>();
 
        for(ETLStepMetadataFieldBean fieldBean : inputFieldList){
            // Get the Default Metadata Field for an Input Metadata Field
            String inputFieldUuid = fieldBean.getEtlStepMetadataFieldUUID();
            String defaultFieldUuid = inputToDefaultFieldMap.get(inputFieldUuid);
 
            // Get the Generated Default Metadata Field holding the result of the Union.
            // getUnionFieldForDefaultField() is a method defined in the Step.
            // The result of the union operation is sent to new generated fields.
            // It figures out how a Default Field is linked to the generated field.
            String unionResultField = getUnionFieldForDefaultField(defaultFieldUuid);
 
            // This holds the Union field corresponding to the Input Field.
            // Fields which are excluded from the union will have a null entry.
            unionResultFields.add(unionResultField);   
        }
 
        // Iterate through the cached data
        Iterator<Object[]> it = inputData.iterator();
 
        while (it.hasNext()) {
            Object[] row = it.next();
 
            // Transmit data from:
            // input fields -> default -> generated default (union fields) -> output
            // Data excluded from the union will have a "null" field,
            // so nothing will be transmitted.
            this.beginInternalTransmission(row, unionResultFields);
            this.endInternalTransmission(dataPacket);
        }
    }
 
    // Accumulate all data before emitting to the next step
    emitData(dataPacket);
}